# Source code for ralph.pipeline.parallel.scheduler

"""Wave scheduler for parallel work-unit execution.

Provides ``schedule_next_wave``, which selects the next batch of ready work units
given the set of already-completed unit IDs, the full plan, currently running unit
IDs, and the maximum worker concurrency. Units are ready when all their declared
dependencies are in ``completed``.
"""

from ralph.pipeline.work_units import WorkUnit


def schedule_next_wave(
    completed: set[str],
    all_units: tuple[WorkUnit, ...],
    currently_running: set[str],
    max_workers: int,
) -> list[WorkUnit]:
    """Return ready work units that can be launched in the next wave.

    A unit is ready when it is neither finished nor in flight and every ID in
    its ``dependencies`` is present in ``completed``.

    Args:
        completed: IDs of units that have already finished.
        all_units: Every unit in the plan; each exposes ``unit_id`` and
            ``dependencies``.
        currently_running: IDs of units that are executing right now. Each one
            counts against ``max_workers``.
        max_workers: Upper bound on the number of concurrently running units.

    Returns:
        At most ``max_workers - len(currently_running)`` ready units, ordered
        by ``unit_id``. The list is empty when no slot is free or when no unit
        is ready.
    """
    free_slots = max_workers - len(currently_running)
    if free_slots <= 0:
        # Every worker is busy (or the limit is non-positive): nothing to launch.
        return []

    candidates = [
        unit
        for unit in all_units
        if unit.unit_id not in completed
        and unit.unit_id not in currently_running
        and all(dep in completed for dep in unit.dependencies)
    ]

    # Order by ID so the chosen wave does not depend on the order of all_units.
    ordered = sorted(candidates, key=lambda unit: unit.unit_id)
    return ordered[:free_slots]