import heapq
import random
import time
from copy import deepcopy
from enum import Enum
from abc import ABC, abstractmethod
from itertools import combinations
from typing import Optional, Type
import pandas as pd
from ware_ops_algos.algorithms import Algorithm
from ware_ops_algos.algorithms.batching.batching_utils import CapacityChecker
from ware_ops_algos.algorithms.routing.routing import Routing
from ware_ops_algos.algorithms.algorithm import BatchingSolution, BatchObject, WarehouseOrder
from ware_ops_algos.domain_models import Order, PickCart, Articles, DimensionType, Box
[docs]
class Batching(Algorithm[list[WarehouseOrder], BatchingSolution], ABC):
"""Batching class to batch orders"""
def __init__(self, pick_cart: PickCart, articles: Articles, **kwargs):
# instance attributes
super().__init__(**kwargs)
# self.picker = picker
# self.picker_capa = pick_cart.
[docs]
self.execution_time = None
[docs]
self.pick_cart = pick_cart
[docs]
self.articles = articles
[docs]
self.capacity_checker = CapacityChecker(pick_cart=pick_cart,
articles=articles)
def _run(self, input_data: list[WarehouseOrder]) -> BatchingSolution:
pass
[docs]
class PriorityBatching(Batching):
"""Priority batching class to batch orders based on sorting criterion."""
@abstractmethod
def _sorted_orders(self) -> list[WarehouseOrder]:
pass
def _run(self, input_data: list[WarehouseOrder]) -> BatchingSolution:
self.order_list = input_data
sorted_orders = self._sorted_orders()
batched_list: list[BatchObject] = []
batch_number = 0
current_batch: list[WarehouseOrder] = []
for order in sorted_orders:
# Check if we can add this order to current batch
if self.capacity_checker.can_add_order(current_batch, order):
current_batch.append(order)
else:
# Current batch is full - start new batch
if current_batch:
batched_list.append(BatchObject(batch_id=batch_number, orders=current_batch))
batch_number += 1
# Check if order fits alone
if self.capacity_checker.can_add_order([], order):
current_batch = [order]
else:
print(f"Order {order.order_id} exceeds capacity, excluded")
current_batch = []
if current_batch:
batched_list.append(BatchObject(batch_id=batch_number, orders=current_batch))
return BatchingSolution(batches=batched_list)
[docs]
class OrderNrFifoBatching(PriorityBatching):
"""First in First out batching based on order number."""
[docs]
algo_name = "OrderNrFiFoBatching"
def _sorted_orders(self) -> list[WarehouseOrder]:
return sorted(self.order_list, key=lambda o: o.order_id)
[docs]
class FifoBatching(PriorityBatching):
"""
First in First out batching class to batch orders
"""
[docs]
algo_name = "FiFoBatching"
# def __init__(self, capacity):
# super().__init__(capacity)
def _sorted_orders(self) -> list[WarehouseOrder]:
return sorted(self.order_list, key=lambda o: o.order_date)
[docs]
class RandomBatching(PriorityBatching):
"""
First in First out batching class to batch orders
"""
[docs]
algo_name = "RandomBatching"
def __init__(self, pick_cart: PickCart, articles: Articles, seed=44):
super().__init__(pick_cart, articles)
def _sorted_orders(self):
"""
Sorting the batching list
"""
shuffled = self.order_list.copy()
random.Random(self.seed).shuffle(shuffled)
return shuffled
[docs]
class DueDateBatching(PriorityBatching):
"""
First in First out batching class to batch orders
"""
[docs]
algo_name = "DueDateBatching"
def __init__(self, pick_cart: PickCart, articles: Articles):
super().__init__(pick_cart, articles)
def _sorted_orders(self) -> list[WarehouseOrder]:
return sorted(self.order_list, key=lambda o: o.due_date)
[docs]
class SavingsBatching(Batching, ABC):
"""
Base class for savings-based batching algorithms.
"""
def __init__(self,
pick_cart: PickCart,
articles: Articles,
routing_class: Type[Routing],
routing_class_kwargs,
time_limit: float | None = None):
super().__init__(pick_cart, articles)
[docs]
self.routing_class = routing_class
[docs]
self.routing_class_kwargs = routing_class_kwargs
self._router = routing_class(**routing_class_kwargs)
self._route_cache = {}
[docs]
self.time_limit = time_limit
[docs]
self.algo_name = f"{self.routing_class.algo_name}_SavingsBatching"
def _calc_dist_with_routing_algo(self, orders: list[WarehouseOrder]) -> float:
"""Compute route distance for a list of orders, with caching."""
key = tuple(sorted(o.order_id for o in orders))
if key not in self._route_cache:
# router = self.routing_class(
# **self.routing_class_kwargs
# )
self._router.reset_parameters()
batches = [BatchObject(batch_id=0, orders=orders)]
pick_lists = []
for batch in batches:
pick_list = []
for order in batch.orders:
for pos in order.pick_positions:
pick_list.append(pos)
pick_lists.append(pick_list)
routing_sol = self._router.solve(pick_lists[0])
self._route_cache[key] = routing_sol.route.distance
return self._route_cache[key]
def _calculate_saving(self, batch_a: BatchObject, batch_b: BatchObject) -> float:
"""Calculate saving from merging two batches."""
orders_a = batch_a.orders
orders_b = batch_b.orders
merged_orders = orders_a + orders_b
# Capacity check
# total_amount = sum(pos.in_store for o in merged_orders for pos in o.pick_positions)
# if total_amount > self.picker_capa:
# return 0
if not self.capacity_checker.orders_fit(merged_orders):
return 0
dist_a = self._calc_dist_with_routing_algo(orders_a)
dist_b = self._calc_dist_with_routing_algo(orders_b)
dist_comb = self._calc_dist_with_routing_algo(merged_orders)
return dist_a + dist_b - dist_comb
[docs]
class ClarkAndWrightBatching(SavingsBatching):
def _run(self, input_data: list[WarehouseOrder]) -> BatchingSolution:
self.order_list = input_data
start_time = time.time()
batches = {i: BatchObject(batch_id=i, orders=[order])
for i, order in enumerate(self.order_list)}
batch_counter = len(batches)
savings_heap = []
for id_a, id_b in combinations(batches.keys(), 2):
saving = self._calculate_saving(batches[id_a], batches[id_b])
if saving > 0:
pair = (min(id_a, id_b), max(id_a, id_b))
heapq.heappush(savings_heap, (-saving, pair[0], pair[1]))
while savings_heap:
if self.time_limit and (time.time() - start_time) > self.time_limit:
break
neg_saving, id_a, id_b = heapq.heappop(savings_heap)
if id_a not in batches or id_b not in batches:
continue
if -neg_saving <= 0:
break
batch_a = batches.pop(id_a)
batch_b = batches.pop(id_b)
merged_batch = BatchObject(batch_id=batch_counter, orders=batch_a.orders + batch_b.orders)
batches[batch_counter] = merged_batch
for other_id in batches:
if other_id == batch_counter:
continue
saving = self._calculate_saving(merged_batch, batches[other_id])
if saving > 0:
pair = (min(batch_counter, other_id), max(batch_counter, other_id))
heapq.heappush(savings_heap, (-saving, pair[0], pair[1]))
batch_counter += 1
return BatchingSolution(batches=list(batches.values()))
[docs]
class SeedCriteria(str, Enum):
[docs]
FEWEST_POSITIONS = "fewest_positions"
[docs]
MOST_POSITIONS = "most_positions"
[docs]
CLOSEST_TO_DEPOT = "closest_to_depot"
[docs]
class SimilarityMeasure(str, Enum):
[docs]
SHARED_ARTICLES = "shared_articles"
[docs]
MIN_DISTANCE = "min_distance"
[docs]
class SeedBatching(Batching):
def __init__(
self,
pick_cart: PickCart,
articles: Articles,
distance_matrix: Optional[pd.DataFrame] = None,
seed_criterion: SeedCriteria = SeedCriteria.RANDOM,
similarity_measure: SimilarityMeasure = SimilarityMeasure.SHARED_ARTICLES,
start_node: Optional[tuple[int, int]] = None,
):
super().__init__(pick_cart, articles)
[docs]
self.distance_matrix = distance_matrix
[docs]
self.similarity_measure = similarity_measure
[docs]
self.seed_criterion = seed_criterion
[docs]
self.start_node = start_node
[docs]
self.algo_name = f"{self.seed_criterion.value}_{self.similarity_measure.value}_SeedBatching"
@staticmethod
[docs]
def shared_article_similarity(seed_order: WarehouseOrder, other_order: WarehouseOrder):
seed_articles = {pos.article_id for pos in seed_order.pick_positions}
other_articles = {pos.article_id for pos in other_order.pick_positions}
return -len(seed_articles & other_articles)
[docs]
def min_distance_similarity(self, seed_order: WarehouseOrder, other_order: WarehouseOrder):
return min(
self.distance_matrix.at[pos_a.pick_node, pos_b.pick_node]
for pos_a in seed_order.pick_positions
for pos_b in other_order.pick_positions
)
@staticmethod
[docs]
def rand_seed_order(candidate_orders: list[WarehouseOrder]) -> WarehouseOrder:
return random.choice(candidate_orders)
@staticmethod
[docs]
def most_positions_seed_order(candidate_orders: list[WarehouseOrder]) -> WarehouseOrder:
return max(candidate_orders, key=lambda o: len(o.pick_positions))
@staticmethod
[docs]
def fewest_positions_seed_order(candidate_orders: list[WarehouseOrder]) -> WarehouseOrder:
return min(candidate_orders, key=lambda o: len(o.pick_positions))
[docs]
def closest_to_depot_seed_order(self, candidate_orders: list[WarehouseOrder]) -> WarehouseOrder:
return min(
candidate_orders,
key=lambda o: min(
self.distance_matrix.at[self.start_node, pos.pick_node]
for pos in o.pick_positions
),
)
[docs]
def similarity(self, seed_order: WarehouseOrder, other_order: WarehouseOrder):
if self.similarity_measure == SimilarityMeasure.SHARED_ARTICLES:
return self.shared_article_similarity(seed_order, other_order)
elif self.similarity_measure == SimilarityMeasure.MIN_DISTANCE:
return self.min_distance_similarity(seed_order, other_order)
else:
raise ValueError("Invalid similarity measure.")
[docs]
def get_seed_order(self,
seed_criterion: SeedCriteria,
candidate_orders: list[WarehouseOrder]):
if seed_criterion == SeedCriteria.RANDOM:
seed_order = self.rand_seed_order(candidate_orders)
elif seed_criterion == SeedCriteria.MOST_POSITIONS:
seed_order = self.most_positions_seed_order(candidate_orders)
elif seed_criterion == SeedCriteria.FEWEST_POSITIONS:
seed_order = self.fewest_positions_seed_order(candidate_orders)
elif seed_criterion == SeedCriteria.CLOSEST_TO_DEPOT:
seed_order = self.closest_to_depot_seed_order(candidate_orders)
else:
raise ValueError("Invalid seed criterion.")
return seed_order
def _run(self, input_data: list[WarehouseOrder]) -> BatchingSolution:
self.order_list = input_data
remaining_orders = self.order_list.copy()
batched_list: list[BatchObject] = []
while remaining_orders:
seed_order = self.get_seed_order(self.seed_criterion, remaining_orders)
current_batch = [seed_order]
current_capa = sum(pos.in_store for pos in seed_order.pick_positions)
remaining_orders.remove(seed_order)
sorted_remaining = sorted(
remaining_orders,
key=lambda o: self.similarity(seed_order, o)
)
for candidate in sorted_remaining:
# demand = sum(pos.in_store for pos in candidate.pick_positions)
# if current_capa + demand <= self.picker_capa:
# current_batch.append(candidate)
# current_capa += demand
if self.capacity_checker.can_add_order(current_batch, candidate):
current_batch.append(candidate)
for o in current_batch:
if o in remaining_orders:
remaining_orders.remove(o)
batched_list.append(BatchObject(batch_id=len(batched_list), orders=current_batch))
return BatchingSolution(batches=batched_list)
[docs]
class LocalSearchBatching(Batching):
"""Order batching via local search with swap and shift neighborhoods.
Generates an initial solution using a configurable start batching algorithm,
then iteratively improves it by swapping orders between batches and shifting
orders from one batch to another until no improvement is found or the time
limit is exceeded.
Based on:
Henn, S., Koch, S., Doerner, K.F. et al.
Metaheuristics for the Order Batching Problem in Manual Order Picking Systems.
Bus Res 3, 82–105 (2010). https://doi.org/10.1007/BF03342717
Note: While Henn et al. present an iterated local search approach, we only implemented the local search component.
"""
def __init__(self,
pick_cart: PickCart,
articles: Articles,
routing_class: type[Routing],
routing_class_kwargs: dict,
start_batching_class: type[Batching],
start_batching_kwargs: dict = None,
time_limit: float = 120.0):
super().__init__(pick_cart, articles)
[docs]
self.routing_class = routing_class
[docs]
self.routing_class_kwargs = routing_class_kwargs
[docs]
self.start_batching_class = start_batching_class
[docs]
self.start_batching_kwargs = start_batching_kwargs or {}
[docs]
self.time_limit = time_limit
self._route_cache = {}
self._router = routing_class(**self.routing_class_kwargs)
self._start_time = None
[docs]
self.algo_name = f"{self.routing_class.algo_name}_{self.start_batching_class.algo_name}_LocalSearchBatching"
def _run(self, input_data: list[WarehouseOrder]) -> BatchingSolution:
self.order_list = input_data
start_batches = self._create_start_batches()
batches = self._local_search(start_batches)
return BatchingSolution(batches=batches)
def _create_start_batches(self) -> list[BatchObject]:
batching_instance: Batching = self.start_batching_class(
pick_cart=self.pick_cart,
articles=self.articles,
**self.start_batching_kwargs
)
batching_sol = batching_instance.solve(self.order_list)
return batching_sol.batches
def _time_limit_exceeded(self) -> bool:
"""Check if time limit has been exceeded."""
return time.time() - self._start_time > self.time_limit
def _local_search(self, batches: list[BatchObject]) -> list[BatchObject]:
self._start_time = time.time()
initial_cost = sum(self._batch_cost_from_orders(b.orders) for b in batches)
print(f"\n{'=' * 60}")
print(f"Local Search Started")
print(f"{'=' * 60}")
print(f"Initial solution: {len(batches)} batches, cost: {initial_cost:.2f}")
print(f"Time limit: {self.time_limit}s")
print(f"{'=' * 60}\n")
iteration = 0
swap_improvements = 0
shift_improvements = 0
while True:
if self._time_limit_exceeded():
elapsed = time.time() - self._start_time
print(f"\n⏱️ Time limit exceeded after {elapsed:.2f}s")
break
iteration += 1
overall_improved = False
iter_start_cost = sum(self._batch_cost_from_orders(b.orders) for b in batches)
# Exhaust all SWAP improvements
swap_count = 0
swap_improved = True
while swap_improved and not self._time_limit_exceeded():
batches, swap_improved = self._swap(batches)
if swap_improved:
swap_count += 1
swap_improvements += 1
overall_improved = True
# Exhaust all SHIFT improvements
shift_count = 0
shift_improved = True
while shift_improved and not self._time_limit_exceeded():
batches, shift_improved = self._shift(batches)
if shift_improved:
shift_count += 1
shift_improvements += 1
overall_improved = True
if overall_improved:
iter_end_cost = sum(self._batch_cost_from_orders(b.orders) for b in batches)
improvement = iter_start_cost - iter_end_cost
elapsed = time.time() - self._start_time
print(f"Iteration {iteration}: "
f"swaps={swap_count}, shifts={shift_count} | "
f"cost: {iter_end_cost:.2f} "
f"(Δ {improvement:+.2f}) | "
f"{len(batches)} batches | "
f"cache: {len(self._route_cache)} | "
f"{elapsed:.1f}s")
if not overall_improved:
break
# Final summary
final_cost = sum(self._batch_cost_from_orders(b.orders) for b in batches)
total_improvement = initial_cost - final_cost
elapsed = time.time() - self._start_time
print(f"\n{'=' * 60}")
print(f"Local Search Completed")
print(f"{'=' * 60}")
print(f"Iterations: {iteration}")
print(f"Total improvements: {swap_improvements + shift_improvements} "
f"(swaps: {swap_improvements}, shifts: {shift_improvements})")
print(f"Initial cost: {initial_cost:.2f}")
print(f"Final cost: {final_cost:.2f}")
print(f"Total improvement: {total_improvement:.2f} ({100 * total_improvement / initial_cost:.1f}%)")
print(f"Final batches: {len(batches)}")
print(f"Route cache size: {len(self._route_cache)}")
print(f"Time elapsed: {elapsed:.2f}s")
print(f"{'=' * 60}\n")
return batches
def _swap(self, batches: list[BatchObject]) -> tuple[list[BatchObject], bool]:
for i in range(len(batches)):
for j in range(i + 1, len(batches)):
if self._time_limit_exceeded():
return batches, False
batch_i = batches[i]
batch_j = batches[j]
old_cost_i = self._batch_cost_from_orders(batch_i.orders)
old_cost_j = self._batch_cost_from_orders(batch_j.orders)
old_total = old_cost_i + old_cost_j
for i_idx, order_i in enumerate(batch_i.orders):
for j_idx, order_j in enumerate(batch_j.orders):
temp_orders_i = batch_i.orders[:i_idx] + [order_j] + batch_i.orders[i_idx + 1:]
temp_orders_j = batch_j.orders[:j_idx] + [order_i] + batch_j.orders[j_idx + 1:]
if not self.capacity_checker.orders_fit(temp_orders_i) or \
not self.capacity_checker.orders_fit(temp_orders_j):
continue
# Compute new costs
new_cost_i = self._batch_cost_from_orders(temp_orders_i)
new_cost_j = self._batch_cost_from_orders(temp_orders_j)
new_total = new_cost_i + new_cost_j
if new_total < old_total - 1e-6:
batch_i.orders[i_idx] = order_j
batch_j.orders[j_idx] = order_i
return batches, True
return batches, False
def _shift(self, batches: list[BatchObject]) -> tuple[list[BatchObject], bool]:
for i in range(len(batches)):
batch_i = batches[i]
for order_idx, order in enumerate(batch_i.orders):
for j in range(len(batches)):
if self._time_limit_exceeded():
return batches, False
if i == j:
continue
batch_j = batches[j]
# Skip if source batch would become empty
if len(batch_i.orders) <= 1:
continue
# Check capacity BEFORE computing costs
temp_orders_i = batch_i.orders[:order_idx] + batch_i.orders[order_idx + 1:]
temp_orders_j = batch_j.orders + [order]
if not self.capacity_checker.orders_fit(temp_orders_i) or \
not self.capacity_checker.orders_fit(temp_orders_j):
continue
# Compute costs
old_cost_i = self._batch_cost_from_orders(batch_i.orders)
old_cost_j = self._batch_cost_from_orders(batch_j.orders)
old_total = old_cost_i + old_cost_j
new_cost_i = self._batch_cost_from_orders(temp_orders_i)
new_cost_j = self._batch_cost_from_orders(temp_orders_j)
new_total = new_cost_i + new_cost_j
if new_total < old_total - 1e-6:
# Apply improvement IN PLACE
batch_i.orders.pop(order_idx)
batch_j.orders.append(order)
# Clean up empty batches if needed
if not batch_i.orders:
batches.pop(i)
return batches, True
return batches, False
def _batch_cost_from_orders(self, orders: list[WarehouseOrder]) -> float:
"""Calculate routing cost for a list of orders with caching."""
if not orders:
return 0.0
key = tuple(sorted(o.order_id for o in orders))
if key not in self._route_cache:
self._router.reset_parameters()
pick_list = [pos for order in orders for pos in order.pick_positions]
sol = self._router.solve(pick_list)
self._route_cache[key] = sol.route.distance
return self._route_cache[key]
def _nn_cost(self, pick_list: list) -> float:
if not pick_list:
return 0.0
remaining_indices = [self._router._node_to_idx[pos.pick_node] for pos in pick_list]
source_idx = self._router._node_to_idx[self._router.start_node]
end_idx = self._router._node_to_idx[self._router.end_node]
dist = self._router._dist_array
cost = 0.0
while remaining_indices:
distances = dist[source_idx, remaining_indices]
best = int(distances.argmin())
cost += distances[best]
source_idx = remaining_indices[best]
remaining_indices.pop(best) # O(n) but preserves order → consistent costs
cost += dist[source_idx, end_idx]
return cost
def _batch_cost_from_orders_estimator(self, orders: list[WarehouseOrder]) -> float:
if not orders:
return 0.0
key = tuple(sorted(o.order_id for o in orders))
if key not in self._route_cache:
pick_list = [pos for order in orders for pos in order.pick_positions]
self._route_cache[key] = self._nn_cost(pick_list)
return self._route_cache[key]
def _batch_cost(self, batch: BatchObject) -> float:
"""Calculate routing cost for a batch with caching."""
return self._batch_cost_from_orders(batch.orders)