Source code for ralph.display.parallel_display

"""Parallel display adapter: always emit log-first, copy-paste-safe transcript lines."""

from __future__ import annotations

import contextlib
import queue
import time
import uuid
from pathlib import Path
from typing import TYPE_CHECKING, Literal, cast

from ralph.display.activity_model import ActivityEventKind
from ralph.display.activity_router import ActivityRouter
from ralph.display.content_condenser import CondenseOptions, condense_content
from ralph.display.context import DisplayContext
from ralph.display.lifecycle_filter import is_bare_lifecycle as _is_bare_lifecycle
from ralph.display.long_content_summary import build_headline_or_placeholder
from ralph.display.plain_renderer import (
    ActivityLineOptions as _ActivityLineOptions,
)
from ralph.display.plain_renderer import (
    PhaseCloseOptions,
    PlainLogRenderer,
)
from ralph.display.plain_renderer import (
    PhaseCounters as _PhaseCounters,
)
from ralph.display.raw_overflow import DEFAULT_MAX_OVERFLOW_FILE_BYTES, RawOverflowLog
from ralph.display.subscriber import PipelineSubscriber
from ralph.display.tool_args import format_tool_input, friendly_tool_name

if TYPE_CHECKING:
    from types import TracebackType

    from rich.console import Console

    from ralph.display.phase_lifecycle import PhaseExitModel
    from ralph.display.phase_status import PhaseIterationContext
    from ralph.display.plain_renderer import RunStartOrientation
    from ralph.display.snapshot import PipelineSnapshot
    from ralph.pipeline.worker_state import WorkerStatus
    from ralph.policy.models import PipelinePolicy

# Capacity of the snapshot queue that ParallelDisplay creates when the caller
# does not supply its own PipelineSubscriber.
_DEFAULT_SNAPSHOT_QUEUE_MAXSIZE: int = 64
# Size guard for per-unit raw overflow logs: passed to RawOverflowLog as
# ``max_bytes`` and compared against the on-disk file size before appending.
_MAX_OVERFLOW_FILE_BYTES: int = DEFAULT_MAX_OVERFLOW_FILE_BYTES
# Minimum gap, in time.monotonic() seconds, between two "dropped lines"
# warnings for the same unit.
_DROP_DEBOUNCE_SECONDS: float = 1.0
# Stand-in "last warned" timestamp for units that have never been warned;
# -inf makes the elapsed time infinite so the first warning is never debounced.
_NEVER_WARNED: float = float("-inf")


def strip_markup(line: str) -> str:
    """Return *line* as plain text with Rich markup tags removed.

    Module-level convenience wrapper that delegates to
    ``PlainLogRenderer.strip_markup``.
    """
    plain_text = PlainLogRenderer.strip_markup(line)
    return plain_text
class ParallelDisplay:
    """Log-first display adapter for parallel pipeline workers.

    Structured agent events are received through an ``ActivityRouter`` (which
    also owns the per-unit buffers, see ``get_buffer``) and written out as
    transcript lines by a ``PlainLogRenderer``. Content that the condenser
    flags as condensed is additionally appended to a per-unit
    ``RawOverflowLog``. Pipeline snapshots and tool activity are forwarded to
    a ``PipelineSubscriber``.

    Instances are context managers: ``__enter__`` calls :meth:`start` and
    ``__exit__`` calls :meth:`stop`.
    """

    __slots__ = (
        "_activity_router",
        "_ctx",
        "_drop_last_warned",
        "_overflow_logs",
        "_overflow_warned",
        "_plain_renderer",
        "_subscriber",
        "_workspace_root",
    )

    def __init__(
        self,
        display_context: DisplayContext,
        *,
        subscriber: PipelineSubscriber | None = None,
        workspace_root: Path | None = None,
        run_id: str | None = None,
        pipeline_policy: PipelinePolicy | None = None,
    ) -> None:
        """Wire up the renderer, activity router and snapshot subscriber.

        Args:
            display_context: Shared display configuration; must be a
                ``DisplayContext`` instance.
            subscriber: Pre-built subscriber to use. When omitted, one is
                created around a bounded snapshot queue.
            workspace_root: Root used for overflow logs and the default
                subscriber. Defaults to the current working directory.
            run_id: Run identifier for the default subscriber. A random
                UUID4 string is generated when omitted.
            pipeline_policy: Forwarded to the default subscriber.

        Raises:
            TypeError: If ``display_context`` is not a ``DisplayContext``.
        """
        if not isinstance(display_context, DisplayContext):
            raise TypeError("display_context is required")
        self._ctx = display_context
        self._plain_renderer = PlainLogRenderer(self._ctx)
        self._workspace_root: Path = (
            workspace_root if workspace_root is not None else Path.cwd()
        )
        # Per-unit raw overflow logs, created lazily on first use.
        self._overflow_logs: dict[str, RawOverflowLog] = {}
        # Units for which the "overflow log full" notice was already emitted.
        self._overflow_warned: set[str] = set()
        # Per-unit monotonic timestamp of the last drop warning; a missing
        # entry is treated as _NEVER_WARNED.
        self._drop_last_warned: dict[str, float] = {}
        self._activity_router: ActivityRouter = ActivityRouter(
            on_event=self._emit_activity_event,
            raw_overflow_callback=self._raw_overflow_write,
        )
        if subscriber is not None:
            self._subscriber = subscriber
        else:
            snapshot_q: queue.Queue[PipelineSnapshot] = queue.Queue(
                maxsize=_DEFAULT_SNAPSHOT_QUEUE_MAXSIZE
            )
            effective_run_id = run_id if run_id is not None else str(uuid.uuid4())
            self._subscriber = PipelineSubscriber(
                queue=snapshot_q,
                workspace_root=self._workspace_root,
                run_id=effective_run_id,
                on_snapshot=self._plain_renderer.emit_snapshot,
                pipeline_policy=pipeline_policy,
            )

    @property
    def _console(self) -> Console:
        """Return the console held by the display context."""
        return self._ctx.console

    def _get_overflow_log(self, unit_id: str) -> RawOverflowLog:
        """Return the overflow log for ``unit_id``, creating it on first use."""
        if unit_id not in self._overflow_logs:
            self._overflow_logs[unit_id] = RawOverflowLog(
                self._workspace_root, unit_id, max_bytes=_MAX_OVERFLOW_FILE_BYTES
            )
        return self._overflow_logs[unit_id]

    def _raw_overflow_write(self, unit_id: str, raw_line: str) -> None:
        """Write a raw malformed line to the per-unit overflow log for diagnosis."""
        overflow = self._get_overflow_log(unit_id)
        overflow.append(raw_line)

    def _check_overflow_size(self, unit_id: str, overflow: RawOverflowLog) -> None:
        """Disable an overflow log that reached the size guard and say so once.

        The notice is emitted at most once per unit. Any ``OSError`` raised
        while inspecting the file (or while emitting) is suppressed so a
        filesystem problem cannot break event rendering.
        """
        if unit_id in self._overflow_warned:
            return
        with contextlib.suppress(OSError):
            if (
                overflow.path.exists()
                and overflow.path.stat().st_size >= _MAX_OVERFLOW_FILE_BYTES
            ):
                self._overflow_warned.add(unit_id)
                overflow.disable()
                self._plain_renderer.emit_activity_line(
                    unit_id,
                    "progress",
                    f"[overflow log full, raw content for {unit_id} discarded]",
                )

    def _emit_drop_warning(self, unit_id: str) -> None:
        """Emit a debounced warning for lines dropped from the unit's buffer.

        The debounce window is checked *before* the drop counter is consumed.
        Consuming first would throw away every count read while a warning is
        being debounced, so the next warning would under-report.
        NOTE(review): this relies on the buffer accumulating drops until
        ``consume_drop_delta`` is called -- confirm against the buffer type.
        """
        buffer = self._activity_router.get_buffer(unit_id)
        now = time.monotonic()
        last = self._drop_last_warned.get(unit_id, _NEVER_WARNED)
        if now - last < _DROP_DEBOUNCE_SECONDS:
            # Too soon: leave the counter untouched so these drops are
            # included in the next warning that is actually emitted.
            return
        delta = buffer.consume_drop_delta()
        if delta <= 0:
            return
        self._drop_last_warned[unit_id] = now
        self._plain_renderer.emit_warn_line(
            unit_id,
            "progress",
            f"dropped {delta} lines since last flush",
        )

    def _emit_activity_event(
        self,
        unit_id: str,
        kind: ActivityEventKind,
        content: str | None,
        raw_ref: str | None,
        metadata: dict[str, object],
    ) -> None:
        """Render one structured activity event as a transcript line.

        Args:
            unit_id: Worker/unit the event belongs to.
            kind: Event kind; ``TOOL_USE`` and ``TOOL_RESULT`` get extra
                handling (see below).
            content: Event text; ``None`` is treated as an empty string.
            raw_ref: Accepted for the router callback signature; not used.
            metadata: Extra event data. For ``TOOL_USE`` the ``"input"``
                entry is read for the tool arguments.
        """
        text = content or ""
        tool_signature: tuple[str, str] | None = None
        if kind is ActivityEventKind.TOOL_USE:
            # For tool calls the content is the tool name: show a friendly
            # name plus formatted arguments, and report the call to the
            # subscriber.
            original_name = text
            text = friendly_tool_name(text)
            input_obj = metadata.get("input")
            args_str = format_tool_input(input_obj)
            if args_str:
                text = f"{text} {args_str}"
            input_dict: dict[str, object] = (
                cast("dict[str, object]", input_obj)
                if isinstance(input_obj, dict)
                else {}
            )
            # Missing or falsy fields are normalised to "".
            tool_path = str(input_dict.get("path", "") or "")
            tool_workdir = str(input_dict.get("workdir", "") or "")
            tool_command = str(input_dict.get("command", "") or "")
            tool_pattern = str(input_dict.get("pattern", "") or "")
            tool_signature = (original_name, tool_path)
            # Best effort: a failing subscriber must not block rendering.
            with contextlib.suppress(Exception):
                self._subscriber.record_activity(
                    unit_id=unit_id,
                    line=text,
                    tool_name=original_name,
                    path=tool_path or None,
                    workdir=tool_workdir or None,
                    command=tool_command or None,
                    pattern=tool_pattern or None,
                )

        overflow = self._get_overflow_log(unit_id)
        overflow_ref = overflow.relative_reference(self._workspace_root)
        visible, condensed_flag, summary_line, ai_summary_line = cast(
            "tuple[str, bool, str | None, str | None]",
            condense_content(
                text,
                options=CondenseOptions(
                    soft_limit=self._ctx.condenser_soft_limit,
                    hard_limit=self._ctx.condenser_hard_limit,
                    summary=True,
                    overflow_ref=overflow_ref,
                ),
            ),
        )
        if condensed_flag:
            # Keep the full text in the overflow log; the size guard runs
            # first so a full log is disabled before the append.
            self._check_overflow_size(unit_id, overflow)
            overflow.append(text)

        effective_summary_line = summary_line
        if (
            kind is ActivityEventKind.TOOL_RESULT
            and summary_line is None
            and text.strip()
            and len(text) >= self._ctx.tool_result_headline_min_chars
        ):
            # Long tool results without a condenser summary still get a
            # headline (or placeholder).
            effective_summary_line = build_headline_or_placeholder(
                text, max_chars=self._ctx.headline_max_chars
            )

        self._plain_renderer.emit_activity_line(
            unit_id,
            kind.value,
            visible,
            options=_ActivityLineOptions(
                condensed_ref=overflow_ref if condensed_flag else None,
                condensed_flag=condensed_flag,
                summary_line=effective_summary_line,
                ai_summary_line=ai_summary_line,
                tool_signature=tool_signature,
            ),
        )
        self._emit_drop_warning(unit_id)

    @property
    def activity_router(self) -> ActivityRouter:
        """Return the router that feeds events into this display."""
        return self._activity_router

    @property
    def mode(self) -> Literal["compact", "medium", "wide"]:
        """Return the layout mode reported by the display context."""
        return self._ctx.mode

    @property
    def subscriber(self) -> PipelineSubscriber:
        """Return the pipeline subscriber in use."""
        return self._subscriber

    def start(self) -> None:
        """Do nothing; present so the display can be used as a context manager."""
        return None

    def stop(self) -> None:
        """Flush any blocks still buffered in the plain renderer."""
        self._plain_renderer.flush_blocks()

    def emit(self, unit_id: str | None, line: str) -> None:
        """Emit a raw line directly to the plain renderer.

        Bare lifecycle tokens (e.g. prefixed transcript noise) are silently
        dropped before reaching the renderer. If ``unit_id`` is ``None`` (or
        otherwise falsy), the line is attributed to ``"run"``.
        """
        if _is_bare_lifecycle(line):
            return
        self._plain_renderer.emit_log_line(unit_id or "run", line)

    def emit_parsed_event(
        self,
        unit_id: str,
        kind: ActivityEventKind,
        content: str | None,
        metadata: dict[str, object],
    ) -> None:
        """Route a pre-parsed agent event through the structured activity path.

        ``LIFECYCLE`` and ``UNKNOWN`` events whose content is a bare lifecycle
        token are dropped.
        """
        if (
            kind in (ActivityEventKind.LIFECYCLE, ActivityEventKind.UNKNOWN)
            and content is not None
            and _is_bare_lifecycle(content)
        ):
            return
        self._emit_activity_event(unit_id, kind, content, None, metadata)

    def set_status(self, unit_id: str, status: WorkerStatus) -> None:
        """Emit a status line for ``unit_id`` using ``str(status)``."""
        self._plain_renderer.emit_status_line(unit_id, str(status))

    def emit_analysis_result(
        self,
        phase: str,
        decision: str,
        reason: str | None = None,
    ) -> None:
        """Record an analysis decision on the subscriber (best effort)."""
        with contextlib.suppress(Exception):
            self._subscriber.record_analysis(phase, decision, reason)

    def emit_run_start(self, orientation: RunStartOrientation) -> None:
        """Emit a one-time run-start orientation block at pipeline start."""
        with contextlib.suppress(Exception):
            self._plain_renderer.emit_run_start(orientation)

    def begin_phase(self, phase: str) -> None:
        """Start timing a new phase and reset its counters."""
        with contextlib.suppress(Exception):
            self._plain_renderer.begin_phase(phase)

    @property
    def last_phase_elapsed_seconds(self) -> float:
        """Return elapsed time of the most recently closed phase in seconds."""
        return self._plain_renderer.last_phase_elapsed_seconds

    @property
    def last_phase_counters(self) -> _PhaseCounters | None:
        """Return the counters from the most recently closed phase, if available.

        Returns None when no phase has been closed yet.
        """
        return self._plain_renderer.last_phase_counters

    @property
    def last_phase_artifact_outcome(self) -> str:
        """Return the artifact outcome from the most recently closed phase."""
        return self._plain_renderer.last_phase_artifact_outcome

    @property
    def phase_close_emitted(self) -> bool:
        """Return True when emit_phase_close_from_exit was called for the current phase."""
        return self._plain_renderer.phase_close_emitted

    def record_artifact_outcome(self, outcome: str) -> None:
        """Record artifact outcome without emitting a log line."""
        with contextlib.suppress(Exception):
            self._plain_renderer.record_artifact_outcome(outcome)

    def emit_phase_close(
        self,
        phase: str,
        produced: str,
        *,
        phase_role: str | None = None,
        iteration_context: PhaseIterationContext | None = None,
        exit_trigger: str | None = None,
    ) -> None:
        """Emit a single-line recap at the end of a phase."""
        with contextlib.suppress(Exception):
            self._plain_renderer.emit_phase_close(
                phase,
                produced,
                options=PhaseCloseOptions(
                    phase_role=phase_role,
                    iteration_context=iteration_context,
                    exit_trigger=exit_trigger,
                ),
            )

    def emit_phase_close_from_exit(self, exit_model: PhaseExitModel) -> None:
        """Emit a phase-close recap from a PhaseExitModel."""
        with contextlib.suppress(Exception):
            self._plain_renderer.emit_phase_close_from_exit(exit_model)

    def emit_run_end(
        self,
        *,
        phase: str,
        total_agent_calls: int = 0,
        pr_url: str | None = None,
        exit_trigger: str | None = None,
        outer_dev_iteration: int | None = None,
    ) -> None:
        """Emit a one-time run-end orientation block at pipeline stop."""
        with contextlib.suppress(Exception):
            self._plain_renderer.emit_run_end(
                phase=phase,
                total_agent_calls=total_agent_calls,
                pr_url=pr_url,
                exit_trigger=exit_trigger,
                outer_dev_iteration=outer_dev_iteration,
            )

    @property
    def console(self) -> Console:
        """Expose console for external renderers."""
        return self._ctx.console

    def __enter__(self) -> ParallelDisplay:
        """Start the display and return it."""
        self.start()
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        """Stop the display; exceptions are not suppressed."""
        del exc_type, exc_val, exc_tb
        self.stop()
# Names exported by ``from ralph.display.parallel_display import *``.
__all__ = ["ParallelDisplay", "strip_markup"]