# Source code for ralph.agents.parsers.claude_interactive

"""Convert interactive Claude transcript lines into agent parser output."""

from __future__ import annotations

from typing import TYPE_CHECKING

from .agent_output_line import AgentOutputLine
from .claude_interactive_transcript_parser import ClaudeInteractiveTranscriptParser
from .interactive_transcript_event import InteractiveTranscriptEvent
from .text_accumulator import TextAccumulator

if TYPE_CHECKING:
    from collections.abc import Iterator


class ClaudeInteractiveParser:
    """Translate interactive Claude transcript lines into ``AgentOutputLine`` items.

    Raw lines are fed to a ``ClaudeInteractiveTranscriptParser``. The events it
    returns are either buffered ("output" and "thinking") or cause the buffers
    to be flushed before the event itself is handled.
    """

    def __init__(self) -> None:
        """Create the transcript parser and the two (text, thinking) buffers."""
        self._parser = ClaudeInteractiveTranscriptParser()
        self._text_accumulator = TextAccumulator()
        self._thinking_accumulator = TextAccumulator()

    def parse(self, lines: Iterator[str]) -> Iterator[AgentOutputLine]:
        """Yield agent output lines for the given raw transcript lines.

        Args:
            lines: Raw transcript lines, each passed to the transcript
                parser's ``feed`` method.

        Yields:
            ``text`` / ``thinking`` lines for buffered content, plus one
            ``tool_use`` or ``tool_result`` line per matching event.
        """
        for raw in lines:
            for event in self._parser.feed(raw):
                kind = event.kind
                if kind == "output":
                    self._accumulate(self._text_accumulator, event.text, raw)
                elif kind == "thinking":
                    self._accumulate(self._thinking_accumulator, event.text, raw)
                else:
                    # Every other event kind ends the current run of buffered
                    # content, so emit what has been collected first.
                    yield from self._flush_accumulators()
                    if kind == "tool_use":
                        # Keep only what follows the first ":" (stripped);
                        # text without a colon is used unchanged.
                        _, colon, remainder = event.text.partition(":")
                        tool_name = remainder.strip() if colon else event.text
                        yield AgentOutputLine(
                            type="tool_use",
                            content=tool_name,
                            raw=raw,
                            metadata={"tool": tool_name},
                        )
                    elif kind == "tool_result":
                        yield AgentOutputLine(
                            type="tool_result",
                            content=event.text,
                            raw=raw,
                        )
                    # "session" and any other kind produce no line of their own.

        # Emit whatever is still buffered once the input is exhausted.
        yield from self._flush_accumulators()

    @staticmethod
    def _accumulate(accumulator: TextAccumulator, text: str, raw: str) -> None:
        """Append *text* plus a newline to the buffer and remember *raw*."""
        accumulator.buffer += text + "\n"
        accumulator.raw_lines.append(raw)

    @staticmethod
    def _drain(
        accumulator: TextAccumulator, line_type: str
    ) -> Iterator[AgentOutputLine]:
        """Yield the stripped buffer as one line (if non-empty), then reset it."""
        content = accumulator.buffer.strip()
        if content:
            yield AgentOutputLine(
                type=line_type,
                content=content,
                raw="\n".join(accumulator.raw_lines),
            )
        accumulator.buffer = ""
        accumulator.raw_lines = []

    def _flush_accumulators(self) -> Iterator[AgentOutputLine]:
        """Emit buffered text, then buffered thinking, clearing both buffers."""
        yield from self._drain(self._text_accumulator, "text")
        yield from self._drain(self._thinking_accumulator, "thinking")
# Public names of this module: the parser defined here plus the transcript
# parser and event type it imports.
__all__ = [
    "ClaudeInteractiveParser",
    "ClaudeInteractiveTranscriptParser",
    "InteractiveTranscriptEvent",
]