# Source code for ralph.agents.parsers.claude

"""Parser for Claude's NDJSON streaming format."""

from __future__ import annotations

import json
import re
from typing import TYPE_CHECKING, Final, cast

from .agent_output_line import AgentOutputLine
from .base import stringify_text_blocks
from .text_accumulator import TextAccumulator

if TYPE_CHECKING:
    from collections.abc import Iterator

# Recognises a leading "claude" or "claude/<model>" token. The lookahead requires the
# token to be followed by a space, a colon, or the end of the line, so e.g. "claudex"
# does not match.
_CLAUDE_PREFIX_RE: Final[re.Pattern[str]] = re.compile(
    r"^claude(?:/[^:\s]+)?(?=[ :]|$)"
)

# Bare event names that the Claude CLI prints as lifecycle markers; they carry no user
# payload. Any other free text following "claude/<model>: " is treated as type='text'
# (the safe default).
_LIFECYCLE_MARKERS: Final[frozenset[str]] = frozenset(
    (
        "assistant",
        "content_block_start",
        "content_block_stop",
        "message_delta",
        "message_start",
        "message_stop",
        "thinking",
        "user",
    )
)


class ClaudeParser:
    """Parser for Claude's NDJSON streaming output with robust delta accumulation.

    Text deltas are accumulated into coherent blocks before emission, flushing on:

    - ``content_block_stop`` (end of a content block)
    - ``message_stop`` (end of the message)
    - ``\\n\\n`` paragraph boundary (incremental surfacing of long responses)

    Thinking deltas (``thinking_delta``) are accumulated separately from text deltas
    and emitted as ``type="thinking"`` lines.

    JSON ``null`` field values are treated like missing fields; they are never turned
    into the literal text ``"None"``.
    """

    _LIFECYCLE_EVENT_TYPES: Final[frozenset[str]] = frozenset(
        {"message_start", "message_stop", "content_block_stop"}
    )

    def __init__(self) -> None:
        """Create a parser with no pending accumulators and no active message."""
        # Accumulators keyed by (message_id, content_block_index)
        self._text_accumulator: dict[tuple[str, int], TextAccumulator] = {}
        self._thinking_accumulator: dict[tuple[str, int], TextAccumulator] = {}
        # Buckets for deltas that arrive without a tracked (message, block) key.
        self._fallback_accumulator: TextAccumulator | None = None
        self._fallback_thinking_accumulator: TextAccumulator | None = None
        self._current_message_id: str | None = None
        # Never populated inside this class (only cleared on message_stop); kept so the
        # attribute remains available to any external code that reads it.
        self._seen_content_blocks: set[tuple[str, int]] = set()

    @staticmethod
    def _text_of(value: object, default: str = "") -> str:
        """Return ``str(value)``, or *default* when *value* is ``None``.

        ``dict.get(key, default)`` only applies its default to *missing* keys, so a
        JSON ``null`` would otherwise be stringified to ``"None"``.
        """
        return default if value is None else str(value)

    @staticmethod
    def _error_text(error: object, fallback_key: str, default: str) -> str:
        """Extract a human-readable message from an ``error`` payload.

        Args:
            error: The value of the event's ``error`` field.
            fallback_key: Key consulted on a dict payload when ``message`` is absent.
            default: Text returned when *error* is neither a dict nor a non-empty string.

        Returns:
            ``message`` (or *fallback_key*) of a dict payload, ``"unknown error"`` when
            the dict has neither, the payload itself when it is a non-empty string, and
            *default* otherwise.
        """
        if isinstance(error, dict):
            detail = error.get("message")
            if detail is None:
                detail = error.get(fallback_key)
            return "unknown error" if detail is None else str(detail)
        if isinstance(error, str) and error:
            # A plain-string payload still carries the message; do not discard it.
            return error
        return default

    def parse(self, lines: Iterator[str]) -> Iterator[AgentOutputLine]:
        """Parse Claude streaming NDJSON lines.

        Blank lines are skipped. Prefixed transcript lines (``claude...``) are handled
        first; everything else is decoded as JSON. Lines that are not valid JSON, or
        whose JSON value is not an object, are emitted as ``type="raw"``.
        """
        for line in lines:
            stripped = line.strip()
            if not stripped:
                continue

            prefixed_lines = self._parse_prefixed_transcript_line(stripped)
            if prefixed_lines is not None:
                yield from prefixed_lines
                continue

            try:
                parsed: object = json.loads(stripped, strict=False)
            except json.JSONDecodeError:
                yield AgentOutputLine(type="raw", content=stripped, raw=stripped)
                continue

            if not isinstance(parsed, dict):
                yield AgentOutputLine(type="raw", content=stripped, raw=stripped)
                continue

            obj = cast("dict[str, object]", parsed)
            yield from self._parse_top_level_object(obj, stripped)

        # Final flush: if iterator exhausted with pending accumulators, flush them all
        yield from self._flush_all_accumulators()

    def _parse_top_level_object(
        self,
        obj: dict[str, object],
        raw: str,
    ) -> Iterator[AgentOutputLine]:
        """Route one decoded top-level object to lifecycle handling or dispatch."""
        event_type = self._text_of(obj.get("type"), "unknown")
        lifecycle_result = self._handle_lifecycle_event(obj, event_type)
        if lifecycle_result is not None:
            yield from lifecycle_result
            return
        yield from self._dispatch_top_level_event(obj, raw, event_type)

    def _handle_lifecycle_event(
        self,
        obj: dict[str, object],
        event_type: str,
    ) -> Iterator[AgentOutputLine] | None:
        """Handle ``message_start`` / ``message_stop`` / ``content_block_stop``.

        Returns:
            An iterator of flushed lines (possibly empty) when *event_type* is a
            lifecycle event, or ``None`` when the caller should dispatch it normally.
        """
        if event_type == "message_start":
            self._record_message_start(obj)
            return iter(())
        if event_type == "message_stop":
            # The flush generator runs lazily when the caller iterates it; it walks the
            # accumulator dicts only, so resetting the message state first is safe.
            flushed = self._flush_all_accumulators()
            self._current_message_id = None
            self._seen_content_blocks.clear()
            return flushed
        if event_type == "content_block_stop":
            return self._flush_content_block(obj)
        return None

    def _record_message_start(self, obj: dict[str, object]) -> None:
        """Remember the id of the message that is starting, if one is present."""
        message = obj.get("message")
        if not isinstance(message, dict):
            return
        msg_id = self._text_of(message.get("id"))
        if msg_id:
            self._current_message_id = msg_id

    def _flush_content_block(self, obj: dict[str, object]) -> Iterator[AgentOutputLine]:
        """Flush the text and thinking accumulators of the block that just ended."""
        index = obj.get("index")
        if not isinstance(index, int) or self._current_message_id is None:
            return
        key = (self._current_message_id, index)
        if key in self._text_accumulator:
            yield from self._flush_text_accumulator(key)
        if key in self._thinking_accumulator:
            yield from self._flush_thinking_accumulator(key)

    def _dispatch_top_level_event(
        self,
        obj: dict[str, object],
        raw: str,
        event_type: str,
    ) -> Iterator[AgentOutputLine]:
        """Dispatch a non-lifecycle top-level event by its ``type``.

        Unrecognised types are passed through as a line whose ``type`` is the event
        type and whose metadata is the whole object.
        """
        if event_type == "stream_event":
            event = obj.get("event")
            if isinstance(event, dict):
                yield from self._parse_stream_inner(event, raw)
            else:
                yield AgentOutputLine(type="stream_event", raw=raw, metadata=obj)
            return
        if event_type == "content_block_delta":
            yield from self._parse_content_block_delta(obj, raw)
            return
        if event_type == "content_block_start":
            self._track_content_block_start(obj)
            yield from self._parse_content_block_start(obj, raw)
            return
        if event_type == "assistant":
            yield from self._parse_assistant_message(obj, raw)
            return
        if event_type == "result":
            yield from self._parse_result_event(obj, raw)
            return
        if event_type == "error":
            yield from self._parse_error_event(obj, raw)
            return
        yield AgentOutputLine(type=event_type, raw=raw, metadata=obj)

    def _track_content_block_start(self, obj: dict[str, object]) -> None:
        """Create the keyed accumulator for a starting text or thinking block.

        Does nothing unless a message id is active and the event has an integer index.
        """
        content_block = obj.get("content_block")
        if not isinstance(content_block, dict) or self._current_message_id is None:
            return
        index = obj.get("index")
        if not isinstance(index, int):
            return
        block_type = self._text_of(content_block.get("type"))
        key = (self._current_message_id, index)
        if block_type == "text":
            if key not in self._text_accumulator:
                self._text_accumulator[key] = TextAccumulator()
        elif block_type == "thinking" and key not in self._thinking_accumulator:
            self._thinking_accumulator[key] = TextAccumulator()

    def _parse_stream_inner(
        self,
        event: dict[str, object],
        raw: str,
    ) -> Iterator[AgentOutputLine]:
        """Parse the inner ``event`` object of a ``stream_event`` wrapper.

        NOTE(review): lifecycle events arriving inside a wrapper are ignored here, so
        they neither record a message id nor flush accumulators; wrapped deltas
        therefore end up in the fallback accumulators unless a top-level
        ``message_start`` was seen. Confirm this is intended.
        """
        event_type = self._text_of(event.get("type"), "unknown")
        if event_type == "content_block_delta":
            yield from self._parse_content_block_delta(event, raw)
            return
        if event_type == "content_block_start":
            # Track content block start for text and thinking blocks
            self._track_content_block_start(event)
            yield from self._parse_stream_content_block_start(event, raw)
            return
        if event_type == "error":
            yield from self._parse_stream_error(event, raw)
            return
        if event_type in self._LIFECYCLE_EVENT_TYPES:
            return
        yield AgentOutputLine(type=event_type, raw=raw, metadata=event)

    def _parse_content_block_delta(
        self,
        obj: dict[str, object],
        raw: str,
    ) -> Iterator[AgentOutputLine]:
        """Accumulate a ``content_block_delta``; only text and thinking deltas count.

        A delta without a ``type`` is treated as ``text_delta`` when it has a ``text``
        field. Other delta types are ignored.
        """
        delta = obj.get("delta")
        if not isinstance(delta, dict):
            return

        implied_type = "text_delta" if "text" in delta else ""
        delta_type = self._text_of(delta.get("type"), implied_type)
        if delta_type == "thinking_delta":
            yield from self._accumulate_thinking_delta(obj, delta, raw)
            return
        if delta_type != "text_delta":
            return

        text = self._text_of(delta.get("text"))
        if not text:
            return

        # Get block index for accumulator keying
        index = obj.get("index")
        if isinstance(index, int) and self._current_message_id is not None:
            accumulator = self._text_accumulator.get((self._current_message_id, index))
            if accumulator is not None:
                yield from accumulator.accumulate(
                    text, raw, kind="text", keep_current_when_empty=False
                )
                return

        # No keyed content block context - accumulate in a fallback stream bucket.
        if self._fallback_accumulator is None:
            self._fallback_accumulator = TextAccumulator()
        yield from self._fallback_accumulator.accumulate(
            text, raw, kind="text", keep_current_when_empty=True
        )

    def _accumulate_thinking_delta(
        self,
        obj: dict[str, object],
        delta: dict[str, object],
        raw: str,
    ) -> Iterator[AgentOutputLine]:
        """Accumulate a ``thinking_delta`` into its keyed or fallback accumulator."""
        # thinking_delta uses field key "thinking" (not "text")
        payload = delta.get("thinking")
        if payload is None:
            payload = delta.get("text")
        text = self._text_of(payload)

        # Skip whitespace-only deltas with no paragraph-boundary markers.
        if not text.strip() and "\n\n" not in text:
            return

        index = obj.get("index")
        if isinstance(index, int) and self._current_message_id is not None:
            accumulator = self._thinking_accumulator.get((self._current_message_id, index))
            if accumulator is not None:
                yield from accumulator.accumulate(
                    text, raw, kind="thinking", keep_current_when_empty=False
                )
                return

        # Fallback thinking accumulator
        if self._fallback_thinking_accumulator is None:
            self._fallback_thinking_accumulator = TextAccumulator()
        yield from self._fallback_thinking_accumulator.accumulate(
            text, raw, kind="thinking", keep_current_when_empty=True
        )

    def _flush_text_accumulator(self, key: tuple[str, int]) -> Iterator[AgentOutputLine]:
        """Flush a single text accumulator and remove it."""
        acc = self._text_accumulator.pop(key, None)
        if acc is None:
            return
        yield from acc.flush(kind="text")

    def _flush_thinking_accumulator(self, key: tuple[str, int]) -> Iterator[AgentOutputLine]:
        """Flush a single thinking accumulator and remove it."""
        acc = self._thinking_accumulator.pop(key, None)
        if acc is None:
            return
        yield from acc.flush(kind="thinking", require_strip=True)

    def _flush_fallback_accumulator(self) -> Iterator[AgentOutputLine]:
        """Flush and discard the fallback text accumulator, if any."""
        acc = self._fallback_accumulator
        if acc is None:
            return
        self._fallback_accumulator = None
        yield from acc.flush(kind="text")

    def _flush_fallback_thinking_accumulator(self) -> Iterator[AgentOutputLine]:
        """Flush and discard the fallback thinking accumulator, if any."""
        acc = self._fallback_thinking_accumulator
        if acc is None:
            return
        self._fallback_thinking_accumulator = None
        yield from acc.flush(kind="thinking", require_strip=True)

    def _flush_all_accumulators(self) -> Iterator[AgentOutputLine]:
        """Flush all pending accumulators on message_stop or iterator exhaustion."""
        # Snapshot the keys: each flush pops its entry from the dict being walked.
        for key in list(self._text_accumulator):
            yield from self._flush_text_accumulator(key)
        for key in list(self._thinking_accumulator):
            yield from self._flush_thinking_accumulator(key)
        yield from self._flush_fallback_accumulator()
        yield from self._flush_fallback_thinking_accumulator()

    def _parse_result_event(
        self,
        obj: dict[str, object],
        raw: str,
    ) -> Iterator[AgentOutputLine]:
        """Emit an error line for ``subtype == "error"``, else the non-empty result text."""
        subtype = self._text_of(obj.get("subtype"))
        if subtype == "error":
            error = self._text_of(obj.get("error"), "unknown error")
            yield AgentOutputLine(type="error", content=error, raw=raw, metadata=obj)
            return
        result = self._text_of(obj.get("result"))
        if result:
            yield AgentOutputLine(type="text", content=result, raw=raw, metadata=obj)

    def _parse_content_block_start(
        self,
        obj: dict[str, object],
        raw: str,
    ) -> Iterator[AgentOutputLine]:
        """Emit whatever the ``content_block`` of a block-start event carries."""
        content_block = obj.get("content_block")
        if not isinstance(content_block, dict):
            return
        yield from self._parse_content_block(content_block, raw)

    def _parse_error_event(
        self,
        obj: dict[str, object],
        raw: str,
    ) -> Iterator[AgentOutputLine]:
        """Emit an error line for a top-level ``error`` event."""
        error_msg = self._error_text(obj.get("error"), "type", "unknown")
        yield AgentOutputLine(type="error", content=error_msg, raw=raw, metadata=obj)

    def _parse_stream_content_block_start(
        self,
        event: dict[str, object],
        raw: str,
    ) -> Iterator[AgentOutputLine]:
        """Stream-wrapped variant of :meth:`_parse_content_block_start` (same logic)."""
        yield from self._parse_content_block_start(event, raw)

    def _parse_stream_error(
        self,
        event: dict[str, object],
        raw: str,
    ) -> Iterator[AgentOutputLine]:
        """Emit an error line for an ``error`` event inside a ``stream_event``."""
        error_msg = self._error_text(event.get("error"), "code", "unknown error")
        yield AgentOutputLine(type="error", content=error_msg, raw=raw, metadata=event)

    def _parse_assistant_message(
        self,
        obj: dict[str, object],
        raw: str,
    ) -> Iterator[AgentOutputLine]:
        """Emit one line per content block of a complete ``assistant`` message."""
        message = obj.get("message")
        if not isinstance(message, dict):
            return
        content = message.get("content")
        if not isinstance(content, list):
            return
        yield from self._parse_message_content(content, raw)

    def _parse_message_content(
        self,
        content: list[object],
        raw: str,
    ) -> Iterator[AgentOutputLine]:
        """Convert the content blocks of a complete message into output lines.

        Non-dict entries are skipped; empty text and whitespace-only thinking blocks
        produce nothing; unknown block types produce an ``error`` line.
        """
        for block in content:
            if not isinstance(block, dict):
                continue
            block_obj = cast("dict[str, object]", block)
            block_type = self._text_of(block_obj.get("type"))

            if block_type == "text":
                text = self._text_of(block_obj.get("text"))
                if text:
                    yield AgentOutputLine(type="text", content=text, raw=raw, metadata=block_obj)
                continue

            if block_type == "tool_use":
                tool_name = self._text_of(block_obj.get("name"), "unknown")
                yield AgentOutputLine(
                    type="tool_use", content=tool_name, raw=raw, metadata=block_obj
                )
                continue

            if block_type == "tool_result":
                yield from self._parse_tool_result(block_obj, raw)
                continue

            if block_type == "thinking":
                payload = block_obj.get("thinking")
                if payload is None:
                    payload = block_obj.get("text")
                text = self._text_of(payload)
                # Skip whitespace-only thinking content — carries no user payload.
                if text.strip():
                    yield AgentOutputLine(
                        type="thinking", content=text, raw=raw, metadata=block_obj
                    )
                continue

            # Non-text, non-tool, non-thinking block types (e.g., image) are rejected
            yield AgentOutputLine(
                type="error",
                content=f"unsupported content block type '{block_type}' in agent output",
                raw=raw,
                metadata=block_obj,
            )

    def _parse_plain_text_prefix(self, raw: str, text: str) -> list[AgentOutputLine]:
        """Turn the text after ``claude...: `` into a text line, dropping lifecycle noise."""
        if text in _LIFECYCLE_MARKERS or text.startswith("system (status="):
            return []
        return [AgentOutputLine(type="text", content=text, raw=raw)]

    def _parse_structured_remainder(self, raw: str, remainder: str) -> list[AgentOutputLine] | None:
        """Parse role-message, status and error forms that follow the ``claude`` prefix.

        Returns ``None`` when *remainder* matches none of the known forms.
        """
        for role in ("user", "assistant"):
            role_prefix = f" {role}: message="
            if remainder.startswith(role_prefix):
                return self._parse_prefixed_message_line(raw, remainder[len(role_prefix) :])
        if remainder.startswith((" message_delta", " system: status=")):
            return []
        if remainder.startswith(" ✗: "):
            return [AgentOutputLine(type="error", content=remainder[4:], raw=raw)]
        return None

    def _parse_prefixed_transcript_line(self, raw: str) -> list[AgentOutputLine] | None:
        """Parse a transcript line that starts with a ``claude`` prefix.

        Returns:
            The parsed lines (empty for suppressed lines), or ``None`` when *raw* is
            not a recognised prefixed line and should be parsed as JSON instead.
        """
        if raw.startswith("[claude]:"):
            return []
        m = _CLAUDE_PREFIX_RE.match(raw)
        if m is None:
            return None
        remainder = raw[m.end() :]
        if remainder.startswith(": "):
            return self._parse_plain_text_prefix(raw, remainder[2:])
        if remainder.startswith(" tool: "):
            return self._parse_prefixed_tool_line(raw, remainder[7:])
        return self._parse_structured_remainder(raw, remainder)

    def _parse_prefixed_tool_line(self, raw: str, payload: str) -> list[AgentOutputLine]:
        """Parse ``<tool name> (<args>)`` into a single ``tool_use`` line."""
        payload = payload.strip()
        tool_name, has_details, detail_suffix = payload.partition(" (")
        metadata: dict[str, object] = {}
        if has_details and detail_suffix.endswith(")"):
            # Drop the closing parenthesis; the argument text is kept verbatim.
            metadata["input"] = {"args": detail_suffix[:-1]}
        return [
            AgentOutputLine(
                type="tool_use",
                content=tool_name.strip() or "unknown",
                raw=raw,
                metadata=metadata,
            )
        ]

    def _parse_prefixed_message_line(
        self, raw: str, json_payload: str
    ) -> list[AgentOutputLine] | None:
        """Parse the JSON after ``message=``; ``None`` if it is not a JSON object."""
        try:
            parsed: object = json.loads(json_payload)
        except json.JSONDecodeError:
            return None
        if not isinstance(parsed, dict):
            return None
        content = parsed.get("content")
        if not isinstance(content, list):
            return []
        return list(self._parse_message_content(content, raw))

    def _parse_content_block(
        self,
        content_block: dict[str, object],
        raw: str,
    ) -> Iterator[AgentOutputLine]:
        """Convert the ``content_block`` of a block-start event into output lines."""
        block_type = self._text_of(content_block.get("type"), "unknown")

        if block_type == "text":
            text = self._text_of(content_block.get("text"))
            if text:
                yield AgentOutputLine(type="text", content=text, raw=raw, metadata=content_block)
            return

        if block_type == "tool_use":
            tool_name = self._text_of(content_block.get("name"), "unknown")
            yield AgentOutputLine(
                type="tool_use", content=tool_name, raw=raw, metadata=content_block
            )
            return

        if block_type == "tool_result":
            yield from self._parse_tool_result(content_block, raw)
            return

        # thinking blocks are handled via delta accumulation; no emission at block_start
        if block_type == "thinking":
            return

        # Non-text, non-tool, non-thinking block types (e.g., image) are rejected
        yield AgentOutputLine(
            type="error",
            content=f"unsupported content block type '{block_type}' in agent output",
            raw=raw,
            metadata=content_block,
        )

    def _parse_tool_result(
        self,
        block: dict[str, object],
        raw: str,
    ) -> Iterator[AgentOutputLine]:
        """Parse a tool_result content block into a single ``tool_result`` line.

        Missing content yields an empty string, list content is rendered by
        ``stringify_text_blocks`` (presumably summarising non-text parts — see that
        helper), and any other content is passed through ``str``.
        """
        content = block.get("content")
        if content is None:
            yield AgentOutputLine(type="tool_result", content="", raw=raw, metadata=block)
            return

        if isinstance(content, list):
            tool_result = stringify_text_blocks(content, require_text_type=True)
            yield AgentOutputLine(
                type="tool_result",
                content=tool_result,
                raw=raw,
                metadata=block,
            )
            return

        # String content is passed through
        yield AgentOutputLine(type="tool_result", content=str(content), raw=raw, metadata=block)