"""Activity stream rendering and artifact handoff for the pipeline runner."""
from __future__ import annotations
import json
import shutil
from contextlib import suppress
from dataclasses import dataclass
from importlib import import_module
from typing import TYPE_CHECKING, Protocol, cast
from loguru import logger
from rich.text import Text
from ralph.agents.invoke import extract_session_id
from ralph.agents.parsers import AgentOutputLine, AgentParser, get_parser
from ralph.config.enums import AgentTransport, Verbosity
from ralph.display.activity_router import map_parser_type_to_kind
from ralph.display.artifact_renderer import (
render_analysis_decision,
render_development_artifact,
render_fix_artifact,
render_plan_artifact,
render_review_artifact,
)
from ralph.phases.required_artifacts import resolve_phase_required_artifact
from ralph.pipeline.artifact_handoff_context import ArtifactHandoffContext
from ralph.pipeline.events import PipelineEvent
from ralph.pipeline.legacy_console_display import (
LegacyConsoleDisplay,
emit_display_line,
get_display_context,
subscriber_for_display,
)
if TYPE_CHECKING:
from collections import deque
from collections.abc import Callable, Iterable, Iterator
from pathlib import Path
from ralph.display.artifact_reader import PlanSummary
from ralph.display.context import DisplayContext
from ralph.display.parallel_display import ParallelDisplay
from ralph.display.subscriber import PipelineSubscriber
from ralph.phases.required_artifacts import RequiredArtifact
from ralph.pipeline.events import Event
if TYPE_CHECKING:

    class _ReadPlanArtifactFn(Protocol):
        """Call signature that ``_read_plan_artifact_func`` casts its result to."""

        def __call__(self, workspace_root: Path) -> PlanSummary | None: ...

    class _ParallelDisplayModule(Protocol):
        """Structural type for a module object exposing a ``ParallelDisplay`` class."""

        ParallelDisplay: type[ParallelDisplay]
# Character budgets used when rendering one activity line.
# Upper bound for plain text content (see _render_text_line).
_MAX_TEXT_LENGTH = 200
# Upper bound for the tool-input summary (see _render_tool_use_line / _tool_input_summary).
_MAX_TOOL_INPUT_LENGTH = 120
# Upper bound for a tool result of ordinary size (see _render_tool_result_line).
_MAX_TOOL_RESULT_LENGTH = 150
# Tighter upper bound applied to tool results longer than the threshold below.
_MAX_TOOL_RESULT_BRIEF = 80
# Stripped result length above which the brief bound is used instead.
_TOOL_RESULT_BRIEF_THRESHOLD = 500
# Maximum number of ``key=value`` parts collected into a summary.
_MAX_METADATA_PARTS = 3
# Upper bound for the joined metadata summary (see _metadata_summary).
_MAX_METADATA_SUMMARY_LENGTH = 120
def _parallel_display_cls() -> type[ParallelDisplay]:
    """Import ``ralph.display.parallel_display`` at call time and return its ``ParallelDisplay``.

    The module is loaded through ``import_module`` rather than a top-level
    import (presumably to avoid an import cycle -- confirm).
    """
    loaded = import_module("ralph.display.parallel_display")
    return cast("_ParallelDisplayModule", loaded).ParallelDisplay
def _read_plan_artifact_func() -> _ReadPlanArtifactFn:
    """Import ``ralph.display.artifact_reader`` at call time and return ``read_plan_artifact``."""
    reader_module = import_module("ralph.display.artifact_reader")
    reader = reader_module.read_plan_artifact
    return cast("_ReadPlanArtifactFn", reader)
def _terminal_width() -> int:
    """Return the terminal width in columns, using 80 when the reported width is 0."""
    columns = shutil.get_terminal_size().columns
    return columns if columns else 80
def _available_width(prefix_len: int) -> int:
    """Return the columns left after a prefix of ``prefix_len`` plus 2, never below 40."""
    remaining = _terminal_width() - prefix_len - 2
    return remaining if remaining > 40 else 40
@dataclass(frozen=True)
class _ArtifactRenderCtx:
    """Immutable bundle of the inputs ``_render_success_artifact`` needs."""

    # Directory the artifact renderers read from; ``ra.json_path`` is joined onto it.
    workspace_root: Path
    # Display context handed to every ``render_*_artifact`` call.
    display_context: DisplayContext
    # Display that may expose ``record_artifact_outcome``; can be None.
    display: ParallelDisplay | LegacyConsoleDisplay | None
    # Outcome lines are skipped when this is ``Verbosity.QUIET``.
    verbosity: Verbosity
    # Required-artifact descriptor resolved for the phase.
    ra: RequiredArtifact
[docs]
def render_phase_artifact_handoff(
    phase: str,
    event: Event,
    workspace_root: Path,
    display: ParallelDisplay | LegacyConsoleDisplay | None,
    ctx: ArtifactHandoffContext | None = None,
) -> None:
    """Render the artifact handoff panel after a phase completes.

    Args:
        phase: Name of the phase that finished; also used as the drain when
            the context does not supply one.
        event: Pipeline event the phase ended with.
        workspace_root: Directory passed on to the artifact renderers.
        display: Active display, if any; used to resolve the display context
            and to record the artifact outcome.
        ctx: Optional handoff context; a default ``ArtifactHandoffContext`` is
            created when it is omitted (or falsy).

    Behaviour:
        * No required artifact resolved (including "no policy bundle"): nothing
          is rendered unless the event is ``AGENT_SUCCESS`` and a policy bundle
          exists. In that case a phase whose role is ``"analysis"`` renders the
          analysis decision; any other role only emits a debug log line.
        * Required artifact whose type ends with ``_analysis_decision``: the
          analysis decision is rendered regardless of the event.
        * Any other required artifact: rendered only on ``AGENT_SUCCESS``.
    """
    handoff = ctx or ArtifactHandoffContext()
    resolved_display_ctx = get_display_context(display, handoff.display_context)
    drain_name = handoff.drain or phase
    bundle = handoff.policy_bundle

    required = None
    if bundle is not None:
        required = resolve_phase_required_artifact(
            bundle.pipeline,
            bundle.artifacts,
            phase=phase,
            drain=drain_name,
        )

    if required is None:
        # Without a declared artifact only a successful analysis phase has
        # something to show.
        if event != PipelineEvent.AGENT_SUCCESS or bundle is None:
            return
        phase_def = bundle.pipeline.phases.get(phase)
        role = None if phase_def is None else phase_def.role
        if role != "analysis":
            logger.debug(
                "policy: no renderer for phase '{}' (role={});"
                " skipping artifact handoff render",
                phase,
                role,
            )
            return
        render_analysis_decision(workspace_root, drain_name, resolved_display_ctx)
        return

    kind = required.artifact_type
    if kind.endswith("_analysis_decision"):
        render_analysis_decision(workspace_root, drain_name, resolved_display_ctx)
        return

    if event == PipelineEvent.AGENT_SUCCESS:
        render_ctx = _ArtifactRenderCtx(
            workspace_root=workspace_root,
            display_context=resolved_display_ctx,
            display=display,
            verbosity=handoff.verbosity,
            ra=required,
        )
        _render_success_artifact(kind, render_ctx)
def _render_success_artifact(artifact_type: str, ctx: _ArtifactRenderCtx) -> None:
    """Render the artifact of a successful phase and record a short outcome line.

    Handles the artifact types ``"plan"``, ``"development_result"``,
    ``"issues"`` and ``"fix_result"``; any other type is ignored. Failures
    while computing or recording the outcome are suppressed, so only the
    ``render_*_artifact`` calls themselves can propagate an exception.
    """

    def _emit_close(produced: str) -> None:
        # Outcome lines are skipped in quiet mode and for displays that do not
        # expose ``record_artifact_outcome``.
        if ctx.verbosity == Verbosity.QUIET:
            return
        if not hasattr(ctx.display, "record_artifact_outcome"):
            return
        with suppress(Exception):
            cast("ParallelDisplay", ctx.display).record_artifact_outcome(produced)

    root = ctx.workspace_root
    shown_on = ctx.display_context

    if artifact_type == "plan":
        render_plan_artifact(root, shown_on)
        with suppress(Exception):
            plan = _read_plan_artifact_func()(root)
            if plan is None:
                _emit_close("(no plan artifact on disk)")
            else:
                _emit_close(
                    f"{plan.total_steps} step(s), {len(plan.risks_mitigations)} risk(s)"
                )
    elif artifact_type == "development_result":
        render_development_artifact(root, shown_on)
        result_file = root / ctx.ra.json_path
        _emit_close("result produced" if result_file.exists() else "no result artifact")
    elif artifact_type == "issues":
        render_review_artifact(root, shown_on)
        with suppress(Exception):
            issue_count = _count_issues(root / ctx.ra.json_path)
            _emit_close(f"{issue_count} issue(s)")
    elif artifact_type == "fix_result":
        render_fix_artifact(root, shown_on)
        _emit_close("applied")
def _count_issues(issues_path: Path) -> int:
    """Return the number of issues stored in the JSON file at ``issues_path``.

    Accepted layouts are a top-level list, ``{"content": [...]}`` and
    ``{"content": {"issues": [...]}}``. A missing file, unreadable or invalid
    JSON, or any other layout yields 0.
    """
    if not issues_path.exists():
        return 0
    try:
        payload = cast("object", json.loads(issues_path.read_text(encoding="utf-8")))
        # Unwrap the optional {"content": ...} envelope.
        if isinstance(payload, dict):
            payload = cast("dict[str, object]", payload).get("content")
        # Unwrap the optional {"issues": ...} mapping inside the content.
        if isinstance(payload, dict):
            payload = cast("dict[str, object]", payload).get("issues")
        if isinstance(payload, list):
            return len(payload)
    except Exception:
        return 0
    return 0
[docs]
def stream_parsed_agent_activity(
    lines: Iterable[object],
    parser_type: str,
    agent_name: str,
    display: ParallelDisplay | LegacyConsoleDisplay | None = None,
    **kwargs: object,
) -> None:
    """Stream and render parsed agent output lines.

    Args:
        lines: Raw agent output; every item is converted with ``str()``.
        parser_type: Parser to use, unless the transport forces
            ``"claude_interactive"``.
        agent_name: Name used as line prefix and as the activity unit id.
        display: Target display. A ``ParallelDisplay`` receives structured
            parsed events; any other value goes through ``emit_display_line``.
        **kwargs: Optional extras read by key:
            ``transport`` (``AgentTransport``), ``display_context``,
            ``raw_output_sink`` (every raw line is appended),
            ``rendered_output_sink`` (plain text of every rendered line is
            appended) and ``session_id_sink`` (called with each session id
            extracted from a raw line).
    """
    transport = cast("AgentTransport | None", kwargs.get("transport"))
    display_context = cast("DisplayContext | None", kwargs.get("display_context"))
    raw_sink = cast("deque[str] | list[str] | None", kwargs.get("raw_output_sink"))
    rendered_sink = cast("deque[str] | list[str] | None", kwargs.get("rendered_output_sink"))
    on_session_id = cast("Callable[[str], None] | None", kwargs.get("session_id_sink"))

    if transport == AgentTransport.CLAUDE_INTERACTIVE:
        parser_key = "claude_interactive"
    else:
        parser_key = parser_type
    parser = _resolve_parser(parser_key)

    def _tee_lines() -> Iterator[str]:
        # Feed the parser while mirroring raw text and session ids to the sinks.
        for raw in lines:
            text = str(raw)
            if raw_sink is not None:
                raw_sink.append(text)
            found = extract_session_id((text,))
            if found is not None and on_session_id is not None:
                on_session_id(found)
            yield text

    parallel_cls = _parallel_display_cls()
    subscriber = subscriber_for_display(display)

    for item in parser.parse(_tee_lines()):
        rendered = _render_agent_activity_line(item, agent_name)
        if rendered is not None and rendered_sink is not None:
            rendered_sink.append(rendered.plain)

        if isinstance(display, parallel_cls):
            # Parallel displays get the structured event, not the rendered text.
            kind = map_parser_type_to_kind(item.type)
            display.emit_parsed_event(agent_name, kind, item.content, item.metadata or {})
        elif rendered is not None:
            emit_display_line(display, None, rendered, display_context)

        if subscriber is not None:
            _record_activity_on_subscriber(subscriber, item, rendered, agent_name)
def _record_activity_on_subscriber(
    subscriber: PipelineSubscriber,
    parsed_line: AgentOutputLine,
    rendered: Text | None,
    agent_name: str,
) -> None:
    """Forward one parsed line to ``subscriber.record_activity`` on a best-effort basis.

    Args:
        subscriber: Receiver of the activity record.
        parsed_line: Parsed agent output line being reported.
        rendered: Rendered form of the line, or None when nothing was rendered.
        agent_name: Used both as ``unit_id`` and as ``agent_name`` of the record.

    The recorded line is the stripped content for non-empty ``thinking``
    lines, otherwise the plain text of ``rendered`` (empty when it is None).
    The tool name comes from the ``tool`` metadata entry, falling back to the
    stripped content of ``tool_use`` lines. Any exception is logged at debug
    level and swallowed so that activity recording never interrupts streaming.
    """
    try:
        if parsed_line.type == "thinking" and parsed_line.content.strip():
            line_text = parsed_line.content.strip()
        else:
            line_text = "" if rendered is None else rendered.plain

        # Treat missing metadata as empty, as the streaming loop does, instead
        # of failing on ``.get`` and silently dropping the whole record.
        metadata = parsed_line.metadata or {}

        tool_name: str | None = None
        metadata_tool = metadata.get("tool")
        if isinstance(metadata_tool, str) and metadata_tool.strip():
            tool_name = metadata_tool.strip()
        elif parsed_line.type == "tool_use":
            stripped = parsed_line.content.strip()
            if stripped:
                tool_name = stripped

        path = _format_metadata_value(metadata.get("path")) or None
        workdir = _format_metadata_value(metadata.get("workdir")) or None
        command = _format_metadata_value(metadata.get("command")) or None
        subscriber.record_activity(
            unit_id=agent_name,
            line=line_text,
            agent_name=agent_name,
            tool_name=tool_name,
            path=path,
            workdir=workdir,
            command=command,
        )
    except Exception:
        # loguru does not honour ``exc_info=True``; ``opt(exception=True)`` is
        # what attaches the active traceback to the record.
        logger.opt(exception=True).debug("subscriber.record_activity failed")
def _resolve_parser(parser_type: str) -> AgentParser:
    """Return the parser for ``parser_type``, or the generic parser if it is unknown.

    ``get_parser`` signals an unknown type with ``ValueError``; that case is
    logged as a warning before falling back.
    """
    try:
        parser = get_parser(parser_type)
    except ValueError:
        logger.warning("Unknown parser '{}'; falling back to generic", parser_type)
        parser = get_parser("generic")
    return parser
def _truncate(text: str, max_length: int) -> str:
    """Shorten ``text`` to at most ``max_length`` characters, ending in an ellipsis.

    Args:
        text: String to shorten.
        max_length: Maximum length of the result, ellipsis included.

    Returns:
        ``text`` unchanged when it already fits or when ``max_length`` is 1 or
        less (no room for content plus ellipsis); otherwise the first
        ``max_length - 1`` characters followed by ``"…"``.
    """
    if max_length <= 1 or len(text) <= max_length:
        return text
    # Reserve one character for the ellipsis so the result never exceeds the
    # width budget the caller computed.
    return text[: max_length - 1] + "…"
def _render_agent_activity_line(output: AgentOutputLine, agent_name: str) -> Text | None:
    """Render one parsed line with the renderer matching ``output.type``.

    ``text`` is rendered white; ``thinking``, ``assistant`` and ``result`` are
    rendered dim; ``tool_use``, ``tool_result`` and ``error`` have dedicated
    renderers. Every other type is rendered as a metadata event line. Returns
    None when the selected renderer has nothing to show.
    """
    kind = output.type
    if kind == "text":
        return _render_text_line(agent_name, output.content, "white")
    if kind in ("thinking", "assistant", "result"):
        return _render_text_line(agent_name, output.content, "dim")
    if kind == "tool_use":
        return _render_tool_use_line(agent_name, output)
    if kind == "tool_result":
        return _render_tool_result_line(agent_name, output.content)
    if kind == "error":
        return _render_error_line(agent_name, output.content)
    return _render_metadata_event_line(agent_name, output)
def _render_text_line(agent_name: str, content: str, style: str) -> Text | None:
    """Render ``"<agent_name>: <content>"``, or return None for blank content.

    The content is stripped and truncated to the smaller of
    ``_MAX_TEXT_LENGTH`` and the width left after the prefix.
    """
    body = content.strip()
    if body == "":
        return None
    line = _styled_prefix(agent_name, style)
    budget = min(_MAX_TEXT_LENGTH, _available_width(len(agent_name) + 2))
    line.append(_truncate(body, budget))
    return line
def _render_tool_use_line(agent_name: str, output: AgentOutputLine) -> Text:
    """Render ``"<agent_name> tool: <tool>"`` plus an optional input summary.

    The tool name is the stripped content, or ``"unknown-tool"`` when blank.
    A non-empty input summary is appended in parentheses, truncated to the
    smaller of ``_MAX_TOOL_INPUT_LENGTH`` and the remaining width.
    """
    label = f"{agent_name} tool"
    name = output.content.strip() or "unknown-tool"
    line = _styled_prefix(label, "magenta")
    line.append(name, style="bold magenta")

    summary = _tool_input_summary(output.metadata)
    if not summary:
        return line

    used_columns = len(label) + len(name) + 4
    budget = min(_MAX_TOOL_INPUT_LENGTH, _available_width(used_columns))
    line.append(f" ({_truncate(summary, budget)})", style="dim")
    return line
def _render_tool_result_line(agent_name: str, content: str) -> Text | None:
    """Render ``"<agent_name> result: <content>"``, or return None for blank content.

    Results longer than ``_TOOL_RESULT_BRIEF_THRESHOLD`` characters get the
    tighter ``_MAX_TOOL_RESULT_BRIEF`` cap; others get
    ``_MAX_TOOL_RESULT_LENGTH``. The cap is further limited by the width left
    after the prefix.
    """
    body = content.strip()
    if not body:
        return None

    label = f"{agent_name} result"
    line = _styled_prefix(label, "dim")

    cap = _MAX_TOOL_RESULT_LENGTH
    if len(body) > _TOOL_RESULT_BRIEF_THRESHOLD:
        cap = _MAX_TOOL_RESULT_BRIEF

    budget = min(cap, _available_width(len(label) + 2))
    line.append(_truncate(body, budget), style="dim")
    return line
def _render_error_line(agent_name: str, content: str) -> Text:
    """Render ``"<agent_name> ✗: <error>"`` in red; blank content becomes ``"unknown error"``.

    The message is appended in full, without truncation.
    """
    message = content.strip()
    if not message:
        message = "unknown error"
    line = _styled_prefix(f"{agent_name} ✗", "red")
    line.append(message, style="bold red")
    return line
def _render_metadata_event_line(agent_name: str, output: AgentOutputLine) -> Text:
    """Render ``"<agent_name>: <type>"`` plus an optional metadata summary, all dim."""
    details = _metadata_summary(output.metadata)
    line = _styled_prefix(agent_name, "dim")
    line.append(output.type, style="dim")
    if not details:
        return line
    line.append(f" ({details})", style="dim")
    return line
def _tool_input_summary(metadata: dict[str, object]) -> str:
    """Summarise the ``input`` entry of tool metadata as a short string.

    Returns ``""`` when the metadata is empty or its ``input`` is not a dict.
    A non-empty string under ``input["args"]`` is returned verbatim; otherwise
    a ``key=value`` summary of the preferred input keys is built.
    """
    payload = metadata.get("input") if metadata else None
    if not isinstance(payload, dict):
        return ""

    raw_args = payload.get("args")
    if isinstance(raw_args, str) and raw_args:
        return raw_args

    return _kv_summary(
        payload,
        preferred_keys=("command", "workdir", "path", "file_path", "pattern", "name"),
        max_parts=_MAX_METADATA_PARTS,
        max_length=_MAX_TOOL_INPUT_LENGTH,
    )
def _metadata_summary(metadata: dict[str, object]) -> str:
    """Build a short ``key=value`` summary of event metadata, or ``""`` when it is empty."""
    if not metadata:
        return ""
    # Keys are consulted in this order; the first few with usable values win.
    summary_keys = (
        "status",
        "summary",
        "phase",
        "decision",
        "message",
        "event",
        "tool",
        "path",
        "workdir",
        "command",
    )
    return _kv_summary(
        metadata,
        preferred_keys=summary_keys,
        max_parts=_MAX_METADATA_PARTS,
        max_length=_MAX_METADATA_SUMMARY_LENGTH,
    )
def _kv_summary(
    values: dict[str, object],
    *,
    preferred_keys: tuple[str, ...],
    max_parts: int,
    max_length: int,
) -> str:
    """Join ``key=value`` pairs for the first usable preferred keys.

    Keys are visited in ``preferred_keys`` order; keys whose value
    ``_format_metadata_value`` rejects are skipped. Collection stops once
    ``max_parts`` pairs were gathered. The comma-joined result is truncated
    to ``max_length``; ``""`` is returned when no pair qualified.
    """
    chosen: list[str] = []
    for name in preferred_keys:
        formatted = _format_metadata_value(values.get(name))
        if formatted is not None:
            chosen.append(f"{name}={formatted}")
            if len(chosen) >= max_parts:
                break
    if not chosen:
        return ""
    return _truncate(", ".join(chosen), max_length)
def _format_metadata_value(value: object) -> str | None:
if value is None:
return None
if isinstance(value, str) and value:
return value
return None
def _styled_prefix(label: str, style: str) -> Text:
    """Return a new ``Text`` holding ``"<label>: "`` styled as ``bold <style>``."""
    # ``Text.append`` returns the same Text instance, so the call can be chained.
    return Text().append(f"{label}: ", style=f"bold {style}")
# Underscore-free aliases exposing selected private helpers and constants under
# public names (presumably for tests or sibling modules -- confirm with callers).
render_agent_activity_line = _render_agent_activity_line
record_activity_on_subscriber = _record_activity_on_subscriber
metadata_summary = _metadata_summary
truncate = _truncate
available_width = _available_width
terminal_width = _terminal_width
MAX_TEXT_LENGTH = _MAX_TEXT_LENGTH
MAX_TOOL_RESULT_BRIEF = _MAX_TOOL_RESULT_BRIEF
MAX_METADATA_SUMMARY_LENGTH = _MAX_METADATA_SUMMARY_LENGTH