Source code for ralph.display.subscriber

"""Queue-backed asyncio+threading-safe state→snapshot bridge."""

from __future__ import annotations

import threading
from datetime import UTC, datetime
from queue import Full, Queue
from typing import TYPE_CHECKING

from loguru import logger

from ralph.agents.idle_watchdog import WaitingStatusEvent, WaitingStatusKind
from ralph.display.artifact_reader import (
    PlanSummary,
    read_latest_analysis_decision,
    read_plan_artifact,
)
from ralph.display.lifecycle_filter import is_bare_lifecycle
from ralph.display.prompt_reader import find_prompt_path, read_prompt_preview
from ralph.display.snapshot import PipelineSnapshot, SnapshotContext, snapshot_from_state
from ralph.policy.models import ROLE_REVIEW
from ralph.process.manager import get_process_manager

if TYPE_CHECKING:
    from collections.abc import Callable
    from pathlib import Path

    from ralph.pipeline.state import PipelineState
    from ralph.policy.models import PipelinePolicy


_DECISION_LOG_MAX = 16


def _format_waiting_status_line(event: object) -> str:
    """Build the human-readable line for a WaitingStatusEvent."""
    assert isinstance(event, WaitingStatusEvent)
    cum = f"{event.cumulative_seconds:.0f}"
    ceil = f"{event.ceiling_seconds:.0f}"
    run = f"{event.current_run_seconds:.0f}"
    if event.kind == WaitingStatusKind.ENTERED:
        return f"Background child work started waiting (cumulative={cum}s, ceiling={ceil}s)"
    if event.kind == WaitingStatusKind.PROGRESS:
        delta = event.diagnostic.get("workspace_event_delta")
        alive_by = event.diagnostic.get("alive_by")
        parts = [f"run={run}s", f"cumulative={cum}s", f"ceiling={ceil}s"]
        if delta is not None:
            parts.append(f"workspace_events_since_wait={delta}")
        if alive_by is not None:
            parts.append(f"alive_by={alive_by}")
        return f"Background child work still active ({', '.join(parts)})"
    if event.kind == WaitingStatusKind.SUSPECTED_FROZEN:
        evidence = str(event.diagnostic.get("evidence", "unknown"))
        alive_by = event.diagnostic.get("alive_by")
        suffix = f", alive_by={alive_by}" if alive_by is not None else ""
        return (
            f"Background child work may be frozen"
            f" (cumulative={cum}s, ceiling={ceil}s, evidence={evidence}{suffix})"
        )
    if event.kind == WaitingStatusKind.EXITED:
        return f"Background child work resumed activity (run={run}s, cumulative={cum}s)"
    scoped = event.diagnostic.get("scoped_child_active", "?")
    oldest_val = event.diagnostic.get("oldest_child_seconds")
    oldest_part = (
        f", oldest_child_seconds={float(oldest_val):.0f}s" if oldest_val is not None else ""
    )
    return (
        f"Background child work hit hard ceiling"
        f" (cumulative={cum}s, ceiling={ceil}s,"
        f" scoped_child_active={scoped}{oldest_part})"
    )


def _now_iso() -> str:
    return datetime.now(UTC).isoformat()


[docs] class PipelineSubscriber: """Receives PipelineState after each reducer reduce and enqueues a PipelineSnapshot. Thread and asyncio safe: notify() only calls put_nowait() which is documented as thread-safe and never blocks. Prompt preview and the plan artifact are read once at construction and cached for the lifetime of the subscriber. The subscriber additionally exposes record_activity and record_analysis to receive lightweight presentation events that should flow into the same snapshot queue without breaking the notify(state) contract. """ def __init__( self, *, queue: Queue[PipelineSnapshot], workspace_root: Path, run_id: str, _prompt_reader: Callable[[Path], tuple[str, ...]] = read_prompt_preview, on_snapshot: Callable[[PipelineSnapshot], None] | None = None, pipeline_policy: PipelinePolicy | None = None, ) -> None: self._queue = queue self._run_id = run_id self._workspace_root = workspace_root self._dropped_count = 0 self._lock = threading.Lock() self._on_snapshot = on_snapshot self._pipeline_policy: PipelinePolicy | None = pipeline_policy prompt_path = find_prompt_path(workspace_root) self._prompt_path: str | None = str(prompt_path) if prompt_path is not None else None self._prompt_preview: tuple[str, ...] = ( _prompt_reader(prompt_path) if prompt_path is not None else () ) plan = read_plan_artifact(workspace_root) or PlanSummary() self._plan_summary: str | None = plan.summary self._plan_scope_items: tuple[str, ...] = plan.scope_items self._plan_total_steps: int = plan.total_steps self._plan_risks: tuple[str, ...] = plan.risks_mitigations self._last_plan_refresh_marker: int | None = self._plan_refresh_marker() self._previous_phase: str | None = None self._active_agent: str | None = None self._active_tool: str | None = None self._active_path: str | None = None self._active_unit_id: str | None = None self._active_workdir: str | None = None self._active_command: str | None = None self._active_pattern: str | None = None self._last_activity_line: str | None = None self._waiting_status_line: str | None = None self._analysis_phase: str | None = None self._analysis_decision: str | None = None self._analysis_reason: str | None = None self._decision_log: list[tuple[str, str, str, str]] = [] self._mcp_restart_count: int = 0 self._last_state: PipelineState | None = None self._snapshot_cache_state: PipelineState | None = None self._snapshot_cache: PipelineSnapshot | None = None @property def queue(self) -> Queue[PipelineSnapshot]: return self._queue def _invalidate_snapshot_cache_locked(self) -> None: self._snapshot_cache_state = None self._snapshot_cache = None @property def dropped_count(self) -> int: return self._dropped_count @property def decision_log(self) -> tuple[tuple[str, str, str, str], ...]: with self._lock: return tuple(self._decision_log) @property def plan_risks(self) -> tuple[str, ...]: with self._lock: return self._plan_risks @property def last_state(self) -> PipelineState | None: with self._lock: return self._last_state @property def run_id(self) -> str: return self._run_id @property def pipeline_policy(self) -> PipelinePolicy | None: return self._pipeline_policy @property def last_tool_name(self) -> str | None: """The most recently recorded tool name.""" with self._lock: return self._active_tool @property def last_tool_path(self) -> str | None: """The most recently recorded tool path argument.""" with self._lock: return self._active_path @property def waiting_status_line(self) -> str | None: """The current waiting-status line for debug breadcrumbs.""" with self._lock: return self._waiting_status_line
[docs] def notify(self, state: PipelineState) -> None: """Build a PipelineSnapshot from state and enqueue it non-blocking. Never blocks. On queue.Full, increments dropped_count silently. Safe to call from both sync (runner.py) and async (coordinator.py) contexts. """ with self._lock: self._record_state_transitions_locked(state) self._refresh_plan_from_disk_locked(state) self._refresh_analysis_for_phase_change_locked(state) self._active_agent = state.current_agent() or self._active_agent self._last_state = state snapshot = self._build_snapshot_locked(state) if snapshot is not None: self._publish(snapshot)
[docs] def build_snapshot(self, state: PipelineState) -> PipelineSnapshot | None: """Project the subscriber's accumulated state into a snapshot. Read-only: does not mutate any internal state. Safe to call from external code (such as the end-of-run summary path) without breaking the notify(state) contract. """ with self._lock: return self._build_snapshot_locked(state)
def record_activity( self, *, unit_id: str, line: str, agent_name: str = "", tool_name: str | None = None, path: str | None = None, workdir: str | None = None, command: str | None = None, pattern: str | None = None, ) -> None: with self._lock: self._invalidate_snapshot_cache_locked() self._active_unit_id = unit_id self._active_agent = agent_name or self._active_agent if tool_name is not None: self._active_tool = tool_name if path: self._active_path = path if workdir: self._active_workdir = workdir if command: self._active_command = command if pattern: self._active_pattern = pattern # Never store bare lifecycle markers as the last activity line — # they carry no user payload and would overwrite a richer previous value. if line and not is_bare_lifecycle(line): self._last_activity_line = line snapshot = self._build_snapshot_locked(self._last_state) if snapshot is not None: self._publish(snapshot)
[docs] def record_waiting_status( self, event: object, *, unit_id: str | None = None, agent_name: str | None = None, ) -> None: """Record a waiting-status event from IdleWatchdog and push a fresh snapshot.""" if not isinstance(event, WaitingStatusEvent): return line = _format_waiting_status_line(event) snapshots_to_publish: list[PipelineSnapshot] = [] with self._lock: self._invalidate_snapshot_cache_locked() if unit_id is not None: self._active_unit_id = unit_id if agent_name is not None: self._active_agent = agent_name self._waiting_status_line = line if event.kind in (WaitingStatusKind.SUSPECTED_FROZEN, WaitingStatusKind.HARD_STOP): self._append_decision_log_locked( phase=self._previous_phase or "unknown", decision=event.kind.name, reason=line, ) snapshot = self._build_snapshot_locked(self._last_state) if snapshot is not None: snapshots_to_publish.append(snapshot) if event.kind == WaitingStatusKind.EXITED: self._waiting_status_line = None cleared = self._build_snapshot_locked(self._last_state) if cleared is not None: snapshots_to_publish.append(cleared) for s in snapshots_to_publish: self._publish(s)
[docs] def record_analysis(self, phase: str, decision: str, reason: str | None = None) -> None: """Record an analysis result; updates the analysis panel and decision log.""" with self._lock: self._invalidate_snapshot_cache_locked() self._analysis_phase = phase self._analysis_decision = decision self._analysis_reason = reason self._append_decision_log_locked( phase=phase, decision=decision, reason=reason or "", ) snapshot = self._build_snapshot_locked(self._last_state) if snapshot is not None: self._publish(snapshot)
[docs] def record_mcp_restart(self, restart_count: int) -> None: """Record the current MCP server restart count and push a fresh snapshot.""" with self._lock: self._mcp_restart_count = restart_count snapshot = self._build_snapshot_locked(self._last_state) if snapshot is not None: self._publish(snapshot)
[docs] def record_permission_prompt_action( self, *, agent_name: str, prompt_summary: str, selected_option: str, ) -> None: """Record an auto-answered permission prompt for visibility and auditing.""" line = f"Ralph auto-answered permission prompt: {prompt_summary}{selected_option}" with self._lock: self._invalidate_snapshot_cache_locked() self._active_agent = agent_name or self._active_agent self._last_activity_line = line self._append_decision_log_locked( phase=self._previous_phase or "unknown", decision="permission_prompt_auto_answered", reason=f"{prompt_summary} -> {selected_option}", ) snapshot = self._build_snapshot_locked(self._last_state) if snapshot is not None: self._publish(snapshot)
def _record_state_transitions_locked(self, state: PipelineState) -> None: prev = self._previous_phase cur = state.phase if prev is None or prev == cur: self._previous_phase = cur return policy = self._pipeline_policy prev_role = policy.phases[prev].role if policy and prev in policy.phases else None cur_role = policy.phases[cur].role if policy and cur in policy.phases else None if prev_role == "analysis": # Determine whether the transition is a "proceed" (toward commit) or "revise" commit_role = "commit" decision = "proceed" if cur_role == commit_role else "revise" self._append_decision_log_locked( phase=prev, decision=decision, reason=f"-> {cur}", ) elif prev_role == ROLE_REVIEW: # review role: going to execution-role means revise; commit-role means proceed decision = "revise" if cur_role == "execution" else "proceed" self._append_decision_log_locked( phase=prev, decision=decision, reason=f"-> {cur}", ) elif cur_role == "terminal": cur_def = policy.phases[cur] if policy and cur in policy.phases else None terminal_outcome = cur_def.terminal_outcome if cur_def else None decision = terminal_outcome or cur self._append_decision_log_locked( phase=cur, decision=decision, reason=state.last_error or "", ) if ( self._last_state is not None and self._last_state.pr_url is None and state.pr_url is not None ): self._append_decision_log_locked( phase=state.phase, decision="pr_opened", reason=state.pr_url, ) self._previous_phase = cur def _refresh_plan_from_disk_locked(self, state: PipelineState) -> None: del state marker = self._plan_refresh_marker() if marker == self._last_plan_refresh_marker: return self._invalidate_snapshot_cache_locked() plan = read_plan_artifact(self._workspace_root) or PlanSummary() self._plan_summary = plan.summary self._plan_scope_items = plan.scope_items self._plan_total_steps = plan.total_steps self._plan_risks = plan.risks_mitigations self._last_plan_refresh_marker = marker def _plan_refresh_marker(self) -> int | None: plan_path = self._workspace_root / ".agent" / "artifacts" / "plan.json" try: return plan_path.stat().st_mtime_ns except OSError: return None def _refresh_analysis_for_phase_change_locked(self, state: PipelineState) -> None: prev = self._previous_phase cur = state.phase if prev == cur: return policy = self._pipeline_policy # Refresh analysis artifact when entering or leaving an analysis-role phase for phase_name in (cur, prev): if phase_name is None: continue phase_def = policy.phases.get(phase_name) if policy else None if phase_def is not None and phase_def.role == "analysis": summary = read_latest_analysis_decision(self._workspace_root, phase_name) if summary is not None: self._analysis_phase = phase_name self._analysis_decision = summary.decision self._analysis_reason = summary.reason def _append_decision_log_locked( self, *, phase: str, decision: str, reason: str, ) -> None: self._invalidate_snapshot_cache_locked() self._decision_log.append((phase, decision, reason, _now_iso())) if len(self._decision_log) > _DECISION_LOG_MAX: self._decision_log = self._decision_log[-_DECISION_LOG_MAX:] def _build_snapshot_locked( self, state: PipelineState | None, ) -> PipelineSnapshot | None: if state is None: return None if self._snapshot_cache_state is state and self._snapshot_cache is not None: return self._snapshot_cache snapshot = snapshot_from_state( state, SnapshotContext( prompt_path=self._prompt_path, prompt_preview=self._prompt_preview, run_id=self._run_id, pipeline_policy=self._pipeline_policy, plan_summary=self._plan_summary, plan_scope_items=self._plan_scope_items, plan_total_steps=self._plan_total_steps, plan_risks=self._plan_risks, active_agent=self._active_agent, active_tool=self._active_tool, active_path=self._active_path, active_unit_id=self._active_unit_id, active_workdir=self._active_workdir, active_command=self._active_command, active_pattern=self._active_pattern, last_activity_line=self._last_activity_line, waiting_status_line=self._waiting_status_line, analysis_phase=self._analysis_phase, analysis_decision=self._analysis_decision, analysis_reason=self._analysis_reason, decision_log=tuple(self._decision_log), mcp_restart_count=self._mcp_restart_count, active_process_labels=tuple( r.label for r in get_process_manager().list_active() if r.label is not None ), ), ) self._snapshot_cache_state = state self._snapshot_cache = snapshot return snapshot def _publish(self, snapshot: PipelineSnapshot) -> None: if self._on_snapshot is not None: try: self._on_snapshot(snapshot) except Exception as exc: logger.bind(event="subscriber_callback_error").error( "callback failed: {err}", err=exc ) self._enqueue(snapshot) def _enqueue(self, snapshot: PipelineSnapshot) -> None: try: self._queue.put_nowait(snapshot) except Full: # Silent backpressure - keep count for end-of-run diagnostics only. # Do NOT emit per-drop logs into the user transcript. self._dropped_count += 1
__all__ = ["PipelineSubscriber"]