"""Immutable pipeline snapshot models.
This module projects pipeline state into a presentation-agnostic data shape
consumed by display panels and subscribers.
"""
from __future__ import annotations
from dataclasses import dataclass
from datetime import UTC, datetime
from typing import TYPE_CHECKING
from ralph.display.budget_progress import BudgetProgress
from ralph.display.pipeline_snapshot import PipelineSnapshot
from ralph.display.worker_snapshot import WorkerSnapshot
from ralph.pipeline.progress import review_issues_found as _review_issues_found
from ralph.pipeline.worker_state import WorkerState, WorkerStatus
if TYPE_CHECKING:
from ralph.pipeline.state import PipelineState
from ralph.policy.models import PipelinePolicy
_STATUS_TO_SEMANTIC: dict[str, str] = {
"PENDING": "pending",
"RUNNING": "running",
"SUCCEEDED": "success",
"FAILED": "error",
"CANCELLED": "skipped",
}
[docs]
@dataclass(frozen=True)
class SnapshotContext:
"""Display context for building a PipelineSnapshot from a PipelineState.
All fields are optional so callers can populate only what they know.
"""
prompt_path: str | None = None
prompt_preview: tuple[str, ...] = ()
run_id: str | None = None
pipeline_policy: PipelinePolicy | None = None
plan_summary: str | None = None
plan_scope_items: tuple[str, ...] = ()
plan_total_steps: int = 0
plan_current_step: int | None = None
plan_risks: tuple[str, ...] = ()
active_agent: str | None = None
active_tool: str | None = None
active_path: str | None = None
active_unit_id: str | None = None
active_workdir: str | None = None
active_command: str | None = None
active_pattern: str | None = None
last_activity_line: str | None = None
waiting_status_line: str | None = None
analysis_phase: str | None = None
analysis_decision: str | None = None
analysis_reason: str | None = None
decision_log: tuple[tuple[str, str, str, str], ...] = ()
mcp_restart_count: int = 0
active_process_labels: tuple[str, ...] = ()
[docs]
def snapshot_from_state(
state: PipelineState,
context: SnapshotContext | None = None,
) -> PipelineSnapshot:
"""Project PipelineState into an immutable pipeline snapshot."""
effective_context = context or SnapshotContext()
created_at = datetime.now(UTC)
workers = _snapshot_workers(state)
pipeline_policy = effective_context.pipeline_policy
fallover_tuples = tuple(
(fo.phase, fo.from_agent, fo.to_agent, fo.timestamp_iso) for fo in state.fallover_history
)
is_terminal_success = False
is_terminal_failure = False
current_phase_role: str | None = None
previous_phase_role: str | None = None
terminal_failure_route: str | None = None
if pipeline_policy is not None:
phase_def = pipeline_policy.phases.get(state.phase)
prev_def = (
pipeline_policy.phases.get(state.previous_phase) if state.previous_phase else None
)
if phase_def is not None:
current_phase_role = phase_def.role
is_terminal_success = (
phase_def.role == "terminal" and phase_def.terminal_outcome == "success"
) or state.phase == pipeline_policy.terminal_phase
is_terminal_failure = (
phase_def.role == "terminal" and phase_def.terminal_outcome == "failure"
)
if prev_def is not None:
previous_phase_role = prev_def.role
for _pname, _pdef in pipeline_policy.phases.items():
if _pdef.role == "terminal" and _pdef.terminal_outcome == "failure":
terminal_failure_route = _pname
break
if terminal_failure_route is None:
terminal_failure_route = pipeline_policy.recovery.failed_route
budget_progress: dict[str, BudgetProgress] = {}
if pipeline_policy is not None:
for bp_name, bp_cfg in pipeline_policy.budget_counters.items():
budget_progress[bp_name] = BudgetProgress(
completed=state.get_outer_progress(bp_name),
cap=state.get_budget_cap(bp_name),
description=bp_cfg.description or bp_name,
tracks_budget=bp_cfg.tracks_budget,
)
else:
for bp_name, cap in state.budget_caps.items():
budget_progress[bp_name] = BudgetProgress(
completed=state.get_outer_progress(bp_name),
cap=cap,
description=bp_name,
tracks_budget=False,
)
return PipelineSnapshot(
phase=state.phase,
previous_phase=state.previous_phase,
review_issues_found=_review_issues_found(state, pipeline_policy),
interrupted_by_user=state.interrupted_by_user,
last_error=state.last_error,
pr_url=state.pr_url,
push_count=state.push_count,
total_agent_calls=state.metrics.total_agent_calls,
total_continuations=state.metrics.total_continuations,
total_fallbacks=state.metrics.total_fallbacks,
total_retries=state.metrics.total_retries,
workers=workers,
prompt_path=effective_context.prompt_path,
prompt_preview=effective_context.prompt_preview,
run_id=effective_context.run_id,
created_at=created_at,
plan_summary=effective_context.plan_summary,
plan_scope_items=effective_context.plan_scope_items,
plan_total_steps=effective_context.plan_total_steps,
plan_current_step=effective_context.plan_current_step,
plan_risks=effective_context.plan_risks,
active_agent=effective_context.active_agent,
active_tool=effective_context.active_tool,
active_path=effective_context.active_path,
active_unit_id=effective_context.active_unit_id,
active_workdir=effective_context.active_workdir,
active_command=effective_context.active_command,
active_pattern=effective_context.active_pattern,
last_activity_line=effective_context.last_activity_line,
waiting_status_line=effective_context.waiting_status_line,
analysis_phase=effective_context.analysis_phase,
analysis_decision=effective_context.analysis_decision,
analysis_reason=effective_context.analysis_reason,
decision_log=tuple(effective_context.decision_log),
recovery_cycle_count=state.recovery_cycle_count,
recovery_cycle_cap=state.recovery_cycle_cap,
fallover_history=fallover_tuples,
last_failure_category=state.last_failure_category,
last_connectivity_state=state.last_connectivity_state,
is_terminal_success=is_terminal_success,
is_terminal_failure=is_terminal_failure,
current_phase_role=current_phase_role,
previous_phase_role=previous_phase_role,
terminal_failure_route=terminal_failure_route,
budget_progress=budget_progress,
outer_dev_iteration=next(
(
bp.completed
for bp in budget_progress.values()
if bp.tracks_budget and isinstance(bp.completed, int) and bp.completed > 0
),
None,
),
mcp_restart_count=effective_context.mcp_restart_count,
active_process_labels=effective_context.active_process_labels,
)
def _snapshot_workers(state: PipelineState) -> tuple[WorkerSnapshot, ...]:
worker_states = state.worker_states
seen: set[str] = set()
snapshots: list[WorkerSnapshot] = []
for work_unit in state.work_units:
worker = worker_states.get(work_unit.unit_id)
if worker is None:
worker = WorkerState(unit_id=work_unit.unit_id)
snapshots.append(_snapshot_worker(work_unit.description, worker))
seen.add(work_unit.unit_id)
for unit_id, worker in worker_states.items():
if unit_id in seen:
continue
snapshots.append(_snapshot_worker(worker.unit_id, worker))
return tuple(snapshots)
def _snapshot_worker(description: str, worker: WorkerState) -> WorkerSnapshot:
status = worker.status.value if isinstance(worker.status, WorkerStatus) else str(worker.status)
return WorkerSnapshot(
unit_id=worker.unit_id,
description=description,
status=status,
status_semantic=_STATUS_TO_SEMANTIC.get(status, "info"),
started_at=worker.started_at,
finished_at=worker.finished_at,
elapsed_s=_elapsed_seconds(worker),
exit_code=worker.exit_code,
error_message=worker.error_message,
)
def _elapsed_seconds(worker: WorkerState) -> float:
if worker.started_at is None:
return 0.0
if worker.finished_at is not None:
return (worker.finished_at - worker.started_at).total_seconds()
return (datetime.now(UTC) - worker.started_at).total_seconds()
__all__ = [
"BudgetProgress",
"PipelineSnapshot",
"SnapshotContext",
"WorkerSnapshot",
"snapshot_from_state",
]