Source code for ware_ops_algos.algorithms.scheduling.scheduling

import heapq
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Optional

from ware_ops_algos.algorithms import Algorithm, I, O
from ware_ops_algos.algorithms.algorithm import SchedulingSolution, Job, Route, AlgorithmSolution, PickList
from ware_ops_algos.domain_models import OrdersDomain, Resources, Resource, Order


@dataclass
class SequencingInput:
    """Input bundle for a sequencing step: pick lists together with routes."""

    # Pick lists handed to the sequencing step.
    pick_lists: list[PickList]

    # Routes supplied alongside the pick lists.
    # NOTE(review): presumably one route per pick list -- confirm with callers.
    routes: list[Route]
@dataclass
class Sequencing:
    """Container for an ordered collection of routes."""

    # NOTE(review): the field name refers to pick lists, but the annotation
    # is `list[Route]` -- confirm which of the two is intended.
    sequenced_pick_lists: list[Route]
@dataclass
class SequencingSolution(AlgorithmSolution):
    """Result object produced by pick-list sequencing algorithms."""

    # Pick lists in their sequenced order; defaults to None when not provided.
    sequencing: Optional[list[PickList]] = None
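# NOTE(review): PickList vs Route sequencing is unresolved -- `Sequencing` stores
# `list[Route]` while `SequencingSolution.sequencing` stores `list[PickList]`.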
class PickListSequencer(Algorithm[list[PickList], SequencingSolution], ABC):
    """
    Base class for pick-list sequencers.

    A sequencer receives a list of pick lists and returns them in sorted
    order, wrapped in a `SequencingSolution`.
    """

    def __init__(self, orders: OrdersDomain, resources: Resources):
        """
        Store the domains the sequencer may consult.

        Args:
            orders: Orders domain, kept as `self.orders_domain`.
            resources: Resources domain, kept as `self.resource_domain`.
        """
        super().__init__()
        self.orders_domain, self.resource_domain = orders, resources
class EDDSequencer(PickListSequencer):
    """Sequence pick lists by ascending `earliest_due_date`."""

    def _sort_pick_lists(self, pick_lists: list[PickList]):
        """Return a new list of `pick_lists` ordered by `earliest_due_date`.

        The input list is left untouched; ties keep their input order
        because the sort is stable.
        """

        def due_date(pick_list):
            return pick_list.earliest_due_date

        ordered = list(pick_lists)
        ordered.sort(key=due_date)
        return ordered

    def _run(self, input_data: list[PickList]) -> SequencingSolution:
        """Sort the given pick lists and wrap them in a `SequencingSolution`."""
        # NOTE(review): unlike PriorityScheduling._run, no `algo_name` is
        # passed to the solution here -- confirm whether that is intended.
        return SequencingSolution(sequencing=self._sort_pick_lists(input_data))
@dataclass
class SchedulingInput:
    """Minimal set of data the priority scheduler consumes."""

    # Routes to schedule; the scheduler reads each route's `pick_list`,
    # `item_sequence` and `distance`.
    routes: list[Route]

    # Orders domain; its `orders` collection is indexed by `order_id`.
    orders: OrdersDomain

    # Resources domain; each entry of its `resources` is treated as a picker.
    resources: Resources
class PriorityScheduling(Algorithm[SchedulingInput, SchedulingSolution], ABC):
    """
    Priority-rule list scheduler.

    One job is created per route. The jobs are ordered once by a rule that
    the subclass provides through `_sorted_jobs`, and are then handed out in
    that order to whichever picker becomes free first.
    """

    @abstractmethod
    def _sorted_jobs(self, jobs: list[dict], order_by_id: dict[int, Order], resources: Resources) -> list[dict]:
        """
        Return `jobs` in the order in which they should be dispatched.

        Args:
            jobs: Job dicts built by `_run`, each with the keys ``idx``,
                ``solution``, ``order_numbers``, ``release`` and ``first_due``.
            order_by_id: Orders indexed by their `order_id`.
            resources: Resources domain holding the available pickers.
        """

    def _run(self, input_data: SchedulingInput) -> SchedulingSolution:
        """
        Build one job per route, order the jobs and assign them to pickers.

        Each job goes to the picker with the smallest availability time. It
        starts at the later of its release and that availability time, plus
        the picker's `tour_setup_time` when that is truthy.

        Args:
            input_data: Routes, orders and resources to schedule.

        Returns:
            A `SchedulingSolution` whose jobs appear in dispatch order.

        Raises:
            ValueError: Propagated from `_processing_time` when a picker has
                no positive `speed` or no `time_per_pick`.
            IndexError: From `heapq.heappop` when there are jobs to dispatch
                but no resources.
        """
        routes = input_data.routes
        orders = input_data.orders
        resources = input_data.resources

        # Sorted order numbers per route, aligned with `routes` by position.
        order_numbers_per_route = [
            sorted(route.pick_list.order_numbers) for route in routes
        ]

        order_by_id = {}
        for order in orders.orders:
            order_by_id[order.order_id] = order

        # Release and due date are read straight from the pick list.
        # `_latest_order_arrival` and `_first_due_date` derive such values
        # from the orders instead, but are not called here.
        jobs = [
            {
                "idx": position,
                "solution": route,
                "order_numbers": numbers,
                "release": route.pick_list.release,
                "first_due": route.pick_list.earliest_due_date,
            }
            for position, (route, numbers) in enumerate(
                zip(routes, order_numbers_per_route)
            )
        ]

        # The ordering rule is applied exactly once.
        dispatch_order = self._sorted_jobs(jobs, order_by_id, resources)

        # Min-heap of (time the picker is free, picker position, picker).
        # The unique position breaks ties, so pickers are never compared.
        free_pickers = [
            (0.0, position, picker)
            for position, picker in enumerate(resources.resources)
        ]
        heapq.heapify(free_pickers)

        scheduled: list[Job] = []
        for job in dispatch_order:
            free_at, picker_pos, picker = heapq.heappop(free_pickers)

            route = job["solution"]
            release = job["release"]
            pick_count = len(route.item_sequence)
            # Called first so that an invalid picker raises before any
            # arithmetic with its attributes.
            duration = self._processing_time(route, picker, pick_count)

            begin = max(release, free_at)
            setup = picker.tour_setup_time
            if setup:
                begin += setup
            finish = begin + duration

            travel = route.distance / picker.speed
            handling = picker.time_per_pick * pick_count

            scheduled.append(
                Job(
                    batch_idx=job["idx"],
                    picker_id=picker.id,
                    start_time=begin,
                    end_time=finish,
                    release_time=release,
                    distance=route.distance,
                    n_picks=pick_count,
                    travel_time=travel,
                    handling_time=handling,
                    route=route,
                )
            )
            # The picker becomes available again once this job ends.
            heapq.heappush(free_pickers, (finish, picker_pos, picker))

        return SchedulingSolution(jobs=scheduled, algo_name=self.algo_name)

    # ---- helpers ----

    @staticmethod
    def _latest_order_arrival(order_numbers: list[int], order_by_id: dict[int, Order]) -> float:
        """
        Return the latest `order_date` among the referenced orders.

        Order numbers missing from `order_by_id` and orders whose
        `order_date` is None are skipped; 0.0 is returned when nothing is left.
        """
        known_orders = (order_by_id[n] for n in order_numbers if n in order_by_id)
        arrivals = [o.order_date for o in known_orders if o.order_date is not None]
        return max(arrivals, default=0.0)

    @staticmethod
    def _first_due_date(order_numbers: list[int], order_by_id: dict[int, Order]) -> float:
        """
        Return the earliest `due_date` among the referenced orders.

        Order numbers missing from `order_by_id` and orders whose `due_date`
        is None are skipped; infinity is returned when nothing is left.
        """
        known_orders = (order_by_id[n] for n in order_numbers if n in order_by_id)
        due_dates = [o.due_date for o in known_orders if o.due_date is not None]
        return min(due_dates, default=float("inf"))

    @staticmethod
    def _processing_time(sol: Route, picker: Resource, n_picks: int) -> float:
        """
        Return travel time plus handling time of `sol` for `picker`.

        Travel time is `sol.distance / picker.speed`; handling time is
        `picker.time_per_pick * n_picks`.

        Raises:
            ValueError: If `picker.speed` is None or not positive, or if
                `picker.time_per_pick` is None.
        """
        speed = picker.speed
        if speed is None or speed <= 0:
            raise ValueError(f"Resource {picker.id} needs a positive 'speed'.")

        per_pick = picker.time_per_pick
        if per_pick is None:
            raise ValueError(f"Resource {picker.id} needs 'time_per_pick'.")

        travel = sol.distance / speed
        handling = per_pick * n_picks
        return travel + handling
class SPTScheduling(PriorityScheduling):
    """Shortest Processing Time first."""

    def _sorted_jobs(self, jobs, order_by_id, resources):
        """Order jobs by ascending processing time.

        Processing times are evaluated for the first resource only, which
        serves as the reference picker for every job.
        """
        reference_picker = resources.resources[0]

        def duration(job):
            route = job["solution"]
            return self._processing_time(
                route, reference_picker, len(route.item_sequence)
            )

        return sorted(jobs, key=duration)
class LPTScheduling(PriorityScheduling):
    """Longest Processing Time first (often better for makespan)."""

    def _sorted_jobs(self, jobs, order_by_id, resources):
        """Order jobs by descending processing time.

        Processing times are evaluated for the first resource only, which
        serves as the reference picker for every job.
        """
        reference_picker = resources.resources[0]

        def duration(job):
            route = job["solution"]
            return self._processing_time(
                route, reference_picker, len(route.item_sequence)
            )

        return sorted(jobs, key=duration, reverse=True)
class EDDScheduling(PriorityScheduling):
    """Earliest Due Date first (good for tardiness)."""

    def _sorted_jobs(self, jobs, order_by_id, resources):
        """Order jobs by ascending ``first_due``; ties keep their input order."""
        ordered = list(jobs)
        ordered.sort(key=lambda job: job["first_due"])
        return ordered
class ERDScheduling(PriorityScheduling):
    """Earliest Release Date first (start as soon as available)."""

    def _sorted_jobs(self, jobs, order_by_id, resources):
        """Order jobs by ascending ``release``; ties keep their input order."""
        ordered = list(jobs)
        ordered.sort(key=lambda job: job["release"])
        return ordered
class FIFOScheduling(PriorityScheduling):
    """By smallest order number in batch (proxy for input order)."""

    def _sorted_jobs(self, jobs, order_by_id, resources):
        """Order jobs by the smallest order number each one contains.

        Jobs whose ``order_numbers`` is empty (or otherwise falsy) get an
        infinite key and therefore sort last.
        """

        def smallest_order_number(job):
            numbers = job["order_numbers"]
            if numbers:
                return min(numbers)
            return float("inf")

        return sorted(jobs, key=smallest_order_number)