Source code for ralph.display.activity_router

"""Activity router: parser → ActivityModel → RingBuffer."""

from __future__ import annotations

import contextlib
from typing import TYPE_CHECKING, cast

from ralph.agents.parsers import (
    ClaudeParser,
    CodexParser,
    GeminiParser,
    GenericParser,
    OpenCodeParser,
)
from ralph.display.activity_model import (
    ActivityEventKind,
    ActivityProvider,
    EventOptions,
    make_event,
    render_event_line,
)
from ralph.display.ring_buffer import PARALLEL_DISPLAY_BUFFER_SIZE, RingBuffer

if TYPE_CHECKING:
    from collections.abc import Callable

    from ralph.agents.parsers.base import AgentParser

PARSERS: dict[ActivityProvider, type[AgentParser]] = {
    ActivityProvider.AGY: GenericParser,
    ActivityProvider.CLAUDE: ClaudeParser,
    ActivityProvider.OPENCODE: OpenCodeParser,
    ActivityProvider.CODEX: CodexParser,
    ActivityProvider.GEMINI: GeminiParser,
    ActivityProvider.GENERIC: cast("type[AgentParser]", GenericParser),
}


def _default_parser_factory(provider: ActivityProvider) -> AgentParser:
    parser_cls = PARSERS.get(provider, cast("type[AgentParser]", GenericParser))
    return parser_cls()


[docs] def detect_provider_from_command(command: list[str]) -> ActivityProvider: """Infer the ``ActivityProvider`` from the agent command executable name.""" if not command: return ActivityProvider.GENERIC argv0 = command[0] # Map substrings to providers (checked in order) substrings_to_provider: list[tuple[str, ActivityProvider]] = [ ("agy", ActivityProvider.AGY), ("claude", ActivityProvider.CLAUDE), ("opencode", ActivityProvider.OPENCODE), ("codex", ActivityProvider.CODEX), ("aider", ActivityProvider.CODEX), ("gemini", ActivityProvider.GEMINI), ] for substring, provider in substrings_to_provider: if substring in argv0: return provider return ActivityProvider.GENERIC
[docs] def map_parser_type_to_kind(parser_type: str) -> ActivityEventKind: """Convert a parser output type string to the canonical ``ActivityEventKind``.""" mapping: dict[str, ActivityEventKind] = { "text": ActivityEventKind.TEXT, "thinking": ActivityEventKind.THINKING, "tool_use": ActivityEventKind.TOOL_USE, "tool_result": ActivityEventKind.TOOL_RESULT, "error": ActivityEventKind.ERROR, "status": ActivityEventKind.STATUS, "lifecycle": ActivityEventKind.LIFECYCLE, } return mapping.get(parser_type, ActivityEventKind.UNKNOWN)
[docs] class ActivityRouter: """Wire a per-unit parser to its RingBuffer via the typed activity model. Each *unit_id* owns an isolated parser instance and an isolated ring buffer so output from different workers never interferes. Parser exceptions are caught per-line and recorded as ERROR events — a malformed line must never crash the caller. """ def __init__( self, *, parser_factory: Callable[[ActivityProvider], AgentParser] | None = None, buffer_factory: Callable[[], RingBuffer] | None = None, on_event: ( Callable[[str, ActivityEventKind, str | None, str | None, dict[str, object]], None] | None ) = None, raw_overflow_callback: Callable[[str, str], None] | None = None, ) -> None: self._parser_factory = parser_factory or _default_parser_factory self._buffer_factory = buffer_factory or ( lambda: RingBuffer(maxsize=PARALLEL_DISPLAY_BUFFER_SIZE) ) self._parsers: dict[str, AgentParser] = {} self._buffers: dict[str, RingBuffer] = {} self._on_event = on_event self._raw_overflow_callback = raw_overflow_callback def get_buffer(self, unit_id: str) -> RingBuffer: if unit_id not in self._buffers: self._buffers[unit_id] = self._buffer_factory() return self._buffers[unit_id]
[docs] def push_raw_line( self, unit_id: str, raw_line: str, *, provider: ActivityProvider = ActivityProvider.GENERIC, raw_reference: str | None = None, ) -> None: """Never raises — parser failures are converted to ERROR events.""" buffer = self.get_buffer(unit_id) try: parser = self._parsers.get(unit_id) if parser is None: parser = self._parser_factory(provider) self._parsers[unit_id] = parser lines = list(parser.parse(iter([raw_line]))) for out in lines: kind = map_parser_type_to_kind(out.type) event = make_event( provider=provider, kind=kind, options=EventOptions( content=out.content, metadata=out.metadata or {}, source=unit_id, ), ) rendered = render_event_line(event.kind, event.content, timestamp=event.timestamp) buffer.enqueue(rendered) if self._on_event is not None: self._on_event(unit_id, kind, event.content, raw_reference, out.metadata or {}) except Exception as exc: if self._raw_overflow_callback is not None: with contextlib.suppress(Exception): self._raw_overflow_callback(unit_id, raw_line) error_event = make_event( provider=provider, kind=ActivityEventKind.ERROR, options=EventOptions( content=f"parser error: {exc}", source=unit_id, ), ) rendered = render_event_line( error_event.kind, error_event.content, timestamp=error_event.timestamp ) buffer.enqueue(rendered) if self._on_event is not None: self._on_event( unit_id, ActivityEventKind.ERROR, error_event.content, raw_reference, {} )
__all__ = [ "PARSERS", "ActivityRouter", "detect_provider_from_command", "map_parser_type_to_kind", ]