"""Canonical workflow progress accounting.
This module owns all mutations for workflow progress fields, including completed
outer progress counters, inner analysis-loop counters, routing budgets, review-issue
flags tied to progress boundaries, and checkpoint-facing progress mirrors derived
from canonical state and active policy.
Contract:
* Outer progress counters are named by policy (via budget_counters); the runtime
never assumes a specific counter name.
* Analysis loopbacks mutate only the inner loop counter for the current cycle or pass.
* Capped analysis loopback preserves outer progress and carries the inner loop counter
to the cap until analysis or commit outcome resets it.
* Skipped commits honour the commit_policy.skipped_advances_progress flag; when
true (the default), a skip still advances outer progress and ends the current inner
loop, so routing can distinguish a consumed-but-skipped iteration from one that never ran.
* Checkpoint mirrors derive from canonical ``PipelineState`` and policy-declared budget
counters: the first budget-tracked counter (in commit-phase BFS order) maps to
``actual_developer_runs``; the second maps to ``actual_reviewer_runs``.
"""
from __future__ import annotations
from dataclasses import replace
from typing import TYPE_CHECKING
from ralph.pipeline.handoffs import resolve_phase_drain
from ralph.pipeline.state import CommitState, PipelineState
from ralph.policy.models import (
LifecyclePhasePolicy,
PhaseCommitPolicy,
PhaseDefinition,
PhaseLoopPolicy,
)
if TYPE_CHECKING:
from ralph.checkpoint.run_context import RunContext
from ralph.config.enums import PipelinePhase
from ralph.policy.models import PipelinePolicy
[docs]
def review_issues_found(state: PipelineState, policy: PipelinePolicy | None = None) -> bool:
"""Return True when the current review outcome indicates issues were found.
When policy is provided, checks the active review phase's clean_outcome to
determine whether the stored review_outcome represents an issues-found state.
When policy is None or no review phase with clean_outcome is declared, falls
back to checking whether review_outcome is non-None.
"""
if state.review_outcome is None:
return False
if policy is not None:
for phase_def in policy.phases.values():
if (
isinstance(phase_def, PhaseDefinition)
and phase_def.role == "review"
and phase_def.clean_outcome is not None
):
return state.review_outcome != phase_def.clean_outcome
return True
[docs]
def resolve_analysis_cap(
state: PipelineState,
iteration_field: str,
policy: PipelinePolicy,
) -> int:
"""Resolve the effective analysis cap from canonical state/policy sources."""
cap_value = state.loop_caps.get(iteration_field)
if cap_value is not None:
return cap_value
if iteration_field in policy.loop_counters:
return policy.loop_counters[iteration_field].default_max
msg = (
f"Analysis loop counter '{iteration_field}' is not declared in pipeline.loop_counters. "
"Analysis caps must come from a declared loop counter."
)
raise ValueError(msg)
[docs]
def is_final_analysis_iteration(current_iteration: int, max_iterations: int) -> bool:
"""Return True when the current analysis state should be treated as final.
This intentionally matches the user-facing label semantics.
"""
return current_iteration >= max_iterations - 1
[docs]
def should_skip_analysis_reentry(current_iteration: int, max_iterations: int) -> bool:
"""Return True when the *next* attempt to enter analysis must be skipped.
``current_iteration`` stores completed loopbacks, while the visible FINAL label is
rendered for the current run. That means re-entry should skip only *after* the
final labeled run has already happened.
"""
return is_final_analysis_iteration(current_iteration - 1, max_iterations)
[docs]
def advance_phase(
state: PipelineState,
target_phase: PipelinePhase,
*,
policy: PipelinePolicy | None,
) -> PipelineState:
"""Advance phases while applying only canonical routing-budget bookkeeping."""
if policy is None:
msg = (
f"advance_phase requires PipelinePolicy to advance to '{target_phase}'; "
"pass the active pipeline policy to resolve drain and commit-role semantics"
)
raise ValueError(msg)
updates: dict[str, object] = {
"phase": target_phase,
"previous_phase": state.phase,
"last_agent_session_id": None,
"session_preserve_retry_pending": False,
}
phase_def = policy.phases.get(target_phase)
is_commit_phase = phase_def is not None and phase_def.role == "commit"
if is_commit_phase:
updates["commit"] = CommitState()
updates["current_drain"] = resolve_phase_drain(target_phase, policy)
return state.copy_with(**updates)
[docs]
def apply_analysis_success(
state: PipelineState,
advanced_state: PipelineState,
*,
policy: PipelinePolicy | None = None,
) -> PipelineState:
"""Reset inner-loop progress when analysis exits successfully to commit/approval."""
result = advanced_state.copy_with(review_outcome=None)
if policy is not None:
phase_def = policy.phases.get(state.phase)
if phase_def is not None and isinstance(phase_def.loop_policy, PhaseLoopPolicy):
iteration_field = phase_def.loop_policy.iteration_state_field
result = result.with_loop_iteration(iteration_field, 0)
return result
[docs]
def apply_analysis_loopback(
state: PipelineState,
advanced_state: PipelineState,
iteration_field: str,
*,
max_iterations: int,
review_outcome: str | None = None,
) -> PipelineState:
"""Apply canonical loopback bookkeeping for an analysis phase."""
clamped = max(0, min(state.get_loop_iteration(iteration_field) + 1, max_iterations))
result = advanced_state.with_loop_iteration(iteration_field, clamped)
if review_outcome is not None:
result = result.copy_with(review_outcome=review_outcome)
return result
[docs]
def apply_commit_outcome(
state: PipelineState,
advanced_state: PipelineState,
*,
skipped: bool,
policy: PipelinePolicy | None = None,
) -> PipelineState:
"""Apply canonical outer-progress semantics for commit success vs skip.
Policy is required. Lifecycle-owned accounting takes precedence when the
current phase is declared in pipeline.lifecycle_phases. Otherwise commit_policy
drives the legacy fallback behavior.
"""
if policy is None:
msg = f"apply_commit_outcome requires PipelinePolicy for commit-role phase {state.phase!r}"
raise ValueError(msg)
lifecycle = policy.lifecycle_phases.get(state.phase)
if lifecycle is not None:
return _apply_lifecycle_completion_policy(state, advanced_state, lifecycle)
phase_def = policy.phases.get(state.phase)
if phase_def is not None and phase_def.commit_policy is not None:
return _apply_commit_outcome_policy_driven(
state, advanced_state, skipped, phase_def.commit_policy
)
return advanced_state
def _apply_lifecycle_completion_policy(
state: PipelineState,
advanced_state: PipelineState,
lifecycle: LifecyclePhasePolicy,
) -> PipelineState:
"""Apply lifecycle-owned counter and loop-reset semantics."""
result = advanced_state
for field_name in lifecycle.loop_resets:
result = result.with_loop_iteration(field_name, 0)
counter = lifecycle.increments_counter
if counter is None or counter == "none":
return result
return result.with_outer_progress(counter, state.get_outer_progress(counter) + 1)
def _apply_commit_outcome_policy_driven(
state: PipelineState,
advanced_state: PipelineState,
skipped: bool,
commit_policy: object,
) -> PipelineState:
"""Apply commit outcome using policy-declared commit_policy."""
if not isinstance(commit_policy, PhaseCommitPolicy):
return advanced_state
# Reset loop counters declared in loop_resets via with_loop_iteration
# (handles both legacy typed fields and custom dict-based fields)
result = advanced_state
for field_name in commit_policy.loop_resets:
result = result.with_loop_iteration(field_name, 0)
counter = commit_policy.increments_counter
if counter is None:
return result
advances = not skipped or commit_policy.skipped_advances_progress
if advances:
return result.with_outer_progress(
counter,
state.get_outer_progress(counter) + 1,
)
return result
[docs]
def apply_budget_counter_increment(
state: PipelineState,
advanced_state: PipelineState,
counter: str | None,
) -> PipelineState:
"""Increment a policy-declared budget counter when a lifecycle route completes."""
if counter is None or counter == "none":
return advanced_state
return advanced_state.with_outer_progress(counter, state.get_outer_progress(counter) + 1)
def _tracked_budget_counters_in_commit_order(policy: PipelinePolicy) -> list[str]:
"""Return tracked budget counter names in the order their commit phases appear in BFS."""
phases = policy.phases
visited: set[str] = set()
queue: list[str] = [policy.entry_phase]
result: list[str] = []
seen: set[str] = set()
while queue:
current = queue.pop(0)
if current in visited or current not in phases:
continue
visited.add(current)
phase_def = phases[current]
counter: str | None = None
lifecycle = policy.lifecycle_phases.get(current)
if lifecycle is not None:
counter = lifecycle.increments_counter
elif phase_def.role == "commit" and phase_def.commit_policy is not None:
counter = phase_def.commit_policy.increments_counter
if (
counter
and counter != "none"
and counter not in seen
and counter in policy.budget_counters
and policy.budget_counters[counter].tracks_budget
):
result.append(counter)
seen.add(counter)
t = phase_def.transitions
next_phases: list[str] = [
ph for ph in [t.on_success, t.on_failure, t.on_loopback] if ph and ph not in visited
]
next_phases.extend(
dr.target for dr in phase_def.decisions.values() if dr.target not in visited
)
next_phases.extend(tgt for tgt in phase_def.bypass_routes.values() if tgt not in visited)
queue.extend(next_phases)
return result
[docs]
def derive_run_context_progress(
state: PipelineState,
run_context: RunContext,
policy: PipelinePolicy | None = None,
) -> RunContext:
"""Derive checkpoint-facing progress mirrors from canonical pipeline state.
Resolves counter names by BFS through commit phases in the active policy;
the first budget-tracked counter maps to actual_developer_runs, the second to
actual_reviewer_runs. When policy is None, both fields are set to 0.
"""
if policy is not None:
tracked = _tracked_budget_counters_in_commit_order(policy)
dev_counter = tracked[0] if len(tracked) > 0 else None
rev_counter = tracked[1] if len(tracked) > 1 else None
return replace(
run_context,
actual_developer_runs=state.get_outer_progress(dev_counter) if dev_counter else 0,
actual_reviewer_runs=state.get_outer_progress(rev_counter) if rev_counter else 0,
)
return replace(
run_context,
actual_developer_runs=0,
actual_reviewer_runs=0,
)