Source code for ralph.pipeline.work_units

"""Planning work_units parsing and validation.

This module provides a typed parser for work_units[] declared in planning
artifacts. It intentionally focuses on schema and graph validation; execution
fanout remains orchestrator-owned.
"""

from __future__ import annotations

from pathlib import PurePosixPath
from typing import TYPE_CHECKING

from pydantic import ConfigDict, Field, model_validator

from ralph.pipeline.work_unit import WorkUnit
from ralph.pipeline.work_units_validation_error import WorkUnitsValidationError
from ralph.pydantic_compat import RalphBaseModel

if TYPE_CHECKING:
    from collections.abc import Mapping

__all__ = [
    "RESERVED_EDIT_PATHS",
    "WorkUnit",
    "WorkUnitsPlan",
    "WorkUnitsValidationError",
    "parse_work_units_from_artifact",
    "validate_for_same_workspace",
]

RESERVED_EDIT_PATHS: frozenset[str] = frozenset(
    {
        ".agent",
        ".git",
        # Defense-in-depth: deny .worktrees entries; not a supported edit area.
        ".worktrees",
        ".",
        "",
    }
)


[docs] class WorkUnitsPlan(RalphBaseModel): """Typed representation of work_units[] in planning artifacts.""" model_config = ConfigDict(frozen=True) work_units: list[WorkUnit] = Field(default_factory=list) @model_validator(mode="after") def validate_graph(self) -> WorkUnitsPlan: ids = [unit.unit_id for unit in self.work_units] seen: set[str] = set() for unit_id in ids: if unit_id in seen: raise WorkUnitsValidationError(f"Duplicate work unit id: {unit_id}") seen.add(unit_id) known = set(ids) dependency_map: dict[str, list[str]] = {} for unit in self.work_units: dependency_map[unit.unit_id] = unit.dependencies if unit.unit_id in unit.dependencies: raise WorkUnitsValidationError(f"Work unit '{unit.unit_id}' depends on itself") for dependency in unit.dependencies: if dependency not in known: raise WorkUnitsValidationError( f"Unknown dependency '{dependency}' in work unit '{unit.unit_id}'" ) _validate_acyclic(dependency_map) return self
[docs] def validate_for_same_workspace(plan: WorkUnitsPlan) -> None: """Validate that a plan is safe for same-workspace parallel execution. Enforces rules that apply specifically when workers share the same checkout: - Every unit must declare at least one allowed_directory. - No unit may declare a reserved path (.agent, .git, .worktrees, ., ""). - No two units may have overlapping edit areas (prefix-overlap by path segments). Raises: WorkUnitsValidationError: with a human-readable message naming the problematic units/paths and suggesting a fix. """ for unit in plan.work_units: if not unit.allowed_directories: raise WorkUnitsValidationError( f"Work unit '{unit.unit_id}' does not declare any allowed_directories. " "Each unit must declare the subdirectories it is permitted to edit. " "Choose disjoint subdirectories or merge the work units." ) for d in unit.allowed_directories: _check_reserved(unit.unit_id, d) _check_no_overlap(plan.work_units)
def _check_reserved(unit_id: str, directory: str) -> None: p = PurePosixPath(directory) if not p.parts: raise WorkUnitsValidationError( f"Work unit '{unit_id}' declares an empty allowed_directory. " "The empty string is a reserved path. Choose a specific subdirectory." ) first_part = p.parts[0] if p.parts else "" normalized = str(p) if first_part == ".worktrees": raise WorkUnitsValidationError( f"Work unit '{unit_id}' declares reserved path {directory!r} as an edit area. " "Path '.worktrees' is reserved (defense-in-depth) and may not be used as an " "allowed_directory. Choose a project-owned subdirectory." ) if normalized in RESERVED_EDIT_PATHS or first_part in {".agent", ".git"}: raise WorkUnitsValidationError( f"Work unit '{unit_id}' declares reserved path {directory!r} as an edit area. " "Reserved paths (.agent, .git, .) may not be declared as " "allowed_directories. Choose a project-owned subdirectory." ) def _check_no_overlap(units: list[WorkUnit]) -> None: """Detect prefix-overlap between any two units' allowed_directories.""" all_dirs: list[tuple[str, str, tuple[str, ...]]] = [] def sort_key(unit: WorkUnit) -> str: return unit.unit_id for unit in sorted(units, key=sort_key): for d in sorted(unit.allowed_directories): parts = PurePosixPath(d).parts all_dirs.append((unit.unit_id, d, parts)) for i, (uid1, d1, p1) in enumerate(all_dirs): for uid2, d2, p2 in all_dirs[i + 1 :]: if uid1 == uid2: continue if _path_parts_overlap(p1, p2): raise WorkUnitsValidationError( f"Work unit '{uid1}' edit area '{d1}' overlaps with " f"work unit '{uid2}' edit area '{d2}'. " "Choose disjoint subdirectories or merge the work units." ) def _path_parts_overlap(a: tuple[str, ...], b: tuple[str, ...]) -> bool: """Return True only when one path is a strict prefix of the other (segment-aware). 'src/api' and 'src/api2' do NOT overlap (different second segment). 'src/api' and 'src/api/auth' DO overlap (a is a prefix of b). 'src/api' and 'src/api' DO overlap (exact match). """ shorter, longer = (a, b) if len(a) <= len(b) else (b, a) return longer[: len(shorter)] == shorter def _validate_acyclic(dependency_map: dict[str, list[str]]) -> None: visiting: set[str] = set() visited: set[str] = set() def dfs(node: str) -> None: if node in visited: return if node in visiting: raise WorkUnitsValidationError(f"Dependency cycle detected at '{node}'") visiting.add(node) for dependency in dependency_map.get(node, []): dfs(dependency) visiting.remove(node) visited.add(node) for node in dependency_map: dfs(node)
[docs] def parse_work_units_from_artifact(artifact: Mapping[str, object]) -> WorkUnitsPlan | None: """Parse and validate work_units[] from a planning artifact payload. Returns None when the artifact does not declare work_units. """ raw = artifact.get("work_units") if raw is None: return None try: payload: dict[str, object] = {"work_units": raw} return WorkUnitsPlan.model_validate(payload) except Exception as exc: # pragma: no cover - converted to domain error below if isinstance(exc, WorkUnitsValidationError): raise raise WorkUnitsValidationError(str(exc)) from exc