Source code for ralph.display.pipeline_snapshot
"""Immutable pipeline state snapshot for transcript rendering."""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from datetime import datetime
from ralph.display.budget_progress import BudgetProgress
from ralph.display.worker_snapshot import WorkerSnapshot
[docs]
@dataclass(frozen=True, slots=True)
class PipelineSnapshot:
"""Immutable pipeline state snapshot for transcript rendering."""
phase: str
previous_phase: str | None
review_issues_found: bool
interrupted_by_user: bool
last_error: str | None
pr_url: str | None
push_count: int
total_agent_calls: int
total_continuations: int
total_fallbacks: int
total_retries: int
workers: tuple[WorkerSnapshot, ...]
prompt_path: str | None
prompt_preview: tuple[str, ...]
run_id: str | None
created_at: datetime
plan_summary: str | None = None
plan_scope_items: tuple[str, ...] = ()
plan_total_steps: int = 0
plan_current_step: int | None = None
plan_risks: tuple[str, ...] = ()
active_agent: str | None = None
active_tool: str | None = None
active_path: str | None = None
active_unit_id: str | None = None
active_workdir: str | None = None
active_command: str | None = None
active_pattern: str | None = None
last_activity_line: str | None = None
waiting_status_line: str | None = None
analysis_phase: str | None = None
analysis_decision: str | None = None
analysis_reason: str | None = None
decision_log: tuple[tuple[str, str, str, str], ...] = field(default_factory=tuple)
recovery_cycle_count: int = 0
recovery_cycle_cap: int = 200
fallover_history: tuple[tuple[str, str, str, str], ...] = field(default_factory=tuple)
last_failure_category: str | None = None
last_connectivity_state: str = "unknown"
is_terminal_success: bool = False
is_terminal_failure: bool = False
current_phase_role: str | None = None
previous_phase_role: str | None = None
terminal_failure_route: str | None = None
budget_progress: dict[str, BudgetProgress] = field(default_factory=dict)
outer_dev_iteration: int | None = None
mcp_restart_count: int = 0
active_process_labels: tuple[str, ...] = ()