Source code for ralph.pipeline.handoffs

"""Pure helpers for phase handoff and drain resolution.

This module centralizes the policy-driven routing and phase-to-drain lookup
used across the reducer and runtime. Keeping these helpers pure makes the
handoff contract easy to unit test and keeps runtime injection limited to the
policy data loaded at the composition root.
"""

from __future__ import annotations

from typing import TYPE_CHECKING

from ralph.pipeline import progress
from ralph.pipeline.exhausted_analysis_bypass_result import ExhaustedAnalysisBypassResult
from ralph.pipeline.exhausted_analysis_skip import ExhaustedAnalysisSkip
from ralph.policy.models import PhaseLoopPolicy

if TYPE_CHECKING:
    from ralph.config.enums import PipelinePhase
    from ralph.pipeline.state import PipelineState
    from ralph.policy.models import PipelinePolicy


Handoff = ExhaustedAnalysisBypassResult


[docs] def resolve_phase_drain( phase: PipelinePhase, pipeline_policy: PipelinePolicy, ) -> str | None: """Return the configured drain for a phase.""" phase_def = pipeline_policy.phases.get(phase) return phase_def.drain if phase_def is not None else None
[docs] def resolve_next_phase( current_phase: PipelinePhase, signal: str, pipeline_policy: PipelinePolicy, ) -> PipelinePhase: """Resolve the next phase based on a signal and the pipeline policy.""" phase_def = pipeline_policy.phases.get(current_phase) if phase_def is None: msg = f"Cannot resolve transition: phase '{current_phase}' not found" raise ValueError(msg) transitions = phase_def.transitions target: str | None if signal == "success": target = transitions.on_success elif signal == "failure": target = transitions.on_failure elif signal == "loopback": target = transitions.on_loopback else: msg = f"Unknown signal: {signal}" raise ValueError(msg) if target is None: msg = ( f"No '{signal}' transition defined for phase '{current_phase}'. " f"Define on_{signal} in pipeline.toml or set the phase to terminal." ) raise ValueError(msg) terminal_states = pipeline_policy.terminal_states() if target in terminal_states: return target if target not in pipeline_policy.phases: msg = ( f"Transition from '{current_phase}' on signal '{signal}' " f"references unknown phase '{target}'. " f"Either add '{target}' to pipeline.phases or use the declared terminal phase." ) raise ValueError(msg) return target
[docs] def resolve_exhausted_analysis_bypass( state: PipelineState, candidate_phase: PipelinePhase, pipeline_policy: PipelinePolicy, ) -> ExhaustedAnalysisBypassResult: """Resolve exhausted-analysis re-entry from one canonical policy-driven helper. The helper accepts an analysis phase candidate and follows that phase's declared success transition whenever its loop budget is already exhausted. Each skipped analysis phase has its loop counter reset to 0 and its skip metadata recorded so reducer and runner paths can reuse the exact same bypass contract. """ current_state = state current_target = candidate_phase skipped: list[ExhaustedAnalysisSkip] = [] visited: set[str] = set() while current_target not in visited: visited.add(current_target) phase_def = pipeline_policy.phases.get(current_target) if phase_def is None or not isinstance(phase_def.loop_policy, PhaseLoopPolicy): break iteration_field = phase_def.loop_policy.iteration_state_field current_iteration = current_state.get_loop_iteration(iteration_field) max_iterations = progress.resolve_analysis_cap( current_state, iteration_field, pipeline_policy, ) if not progress.should_skip_analysis_reentry(current_iteration, max_iterations): break next_target = resolve_next_phase(current_target, "success", pipeline_policy) skipped.append( ExhaustedAnalysisSkip( phase=current_target, target_phase=next_target, iteration_field=iteration_field, iteration_value=current_iteration, max_iterations=max_iterations, ) ) current_state = current_state.with_loop_iteration(iteration_field, 0).copy_with( review_outcome=None ) current_target = next_target return ExhaustedAnalysisBypassResult( state=current_state, target_phase=current_target, skipped=tuple(skipped), )
[docs] def resolve_post_commit_phase( state: PipelineState, pipeline_policy: PipelinePolicy, ) -> PipelinePhase: """Resolve next phase for a successful commit with optional budget guards. Routing is driven by post_commit_routes in policy, matched by phase name and budget_state. This works for any commit-role phase, not just the canonical development_commit/review_commit names. """ phase_def = pipeline_policy.phases.get(state.phase) is_commit_phase = phase_def is not None and phase_def.role == "commit" if is_commit_phase: budget_state = _compute_budget_state(state, pipeline_policy) if budget_state is not None: for route in pipeline_policy.post_commit_routes: if route.when.phase == state.phase and route.when.budget_state == budget_state: return route.target return resolve_next_phase(state.phase, "success", pipeline_policy)
def _compute_budget_state(state: PipelineState, pipeline_policy: PipelinePolicy) -> str | None: """Determine the budget_state label for the current commit phase. Uses only policy-declared counter names from pipeline.budget_counters. Counter identity is fully generic: any counter name declared with tracks_budget=True is eligible. Returns: 'remaining' — the phase's own budget counter still has budget left 'exhausted' — this counter is at 0 but another tracked counter has budget 'no_review' — this counter is at 0 and no other tracked counter has budget None — no tracked budget counter governs this phase """ lifecycle = pipeline_policy.lifecycle_phases.get(state.phase) if lifecycle is not None: counter = lifecycle.increments_counter else: phase_def = pipeline_policy.phases.get(state.phase) if phase_def is None or phase_def.commit_policy is None: return None counter = ( phase_def.commit_policy.route_counter or phase_def.commit_policy.increments_counter ) if not counter or counter == "none": return None tracked_cfg = pipeline_policy.budget_counters.get(counter) if tracked_cfg is None or not tracked_cfg.tracks_budget: return None if state.get_budget_remaining(counter) > 0: return "remaining" for other_name, other_cfg in pipeline_policy.budget_counters.items(): if ( other_name != counter and other_cfg.tracks_budget and state.get_budget_remaining(other_name) > 0 ): return "exhausted" return "no_review" __all__ = [ "ExhaustedAnalysisBypassResult", "ExhaustedAnalysisSkip", "Handoff", "resolve_exhausted_analysis_bypass", "resolve_next_phase", "resolve_phase_drain", "resolve_post_commit_phase", ]