Source code for ralph.agents.parsers.gemini

"""Parser for Gemini's SSE+JSON streaming format.

Gemini emits Server-Sent Events (SSE) where each event contains a JSON payload.
This parser handles the SSE format and normalizes Gemini output to AgentOutputLine
instances with robust delta accumulation.
"""

from __future__ import annotations

import json
from typing import TYPE_CHECKING, Final, cast

from .agent_output_line import AgentOutputLine
from .text_accumulator import TextAccumulator

# Decoded JSON values are typed as plain ``object`` so that code must narrow
# them (e.g. with ``isinstance``) before relying on a concrete type.
JsonValue = object
# A decoded JSON object: string keys mapped to not-yet-narrowed values.
JsonDict = dict[str, JsonValue]

if TYPE_CHECKING:
    from collections.abc import Iterator


# Event "type" values that GeminiParser._parse_object drops without emitting
# any output line: they carry lifecycle metadata only, never user payload.
_LIFECYCLE_EVENT_TYPES: Final[frozenset[str]] = frozenset(
    (
        "heartbeat",
        "message_start",
        "message_started",
        "ping",
        "ready",
        "start",
        "thread.started",
        "turn.started",
    )
)


class GeminiParser:
    """Parser for Gemini's SSE+JSON streaming output with robust delta accumulation.

    Gemini uses SSE with ``data:`` lines containing JSON payloads. Each payload
    has a "type" (or "event") field indicating what kind of content it carries.

    Text deltas from ``text``/``content``/``block``/``content_block`` events are
    handed to a ``TextAccumulator`` and surfaced when:
    - a ``done`` / ``stop`` / ``message_end`` event arrives (end of message)
    - the accumulator itself decides to emit (e.g. on a ``\\n\\n`` paragraph
      boundary; that decision lives in ``TextAccumulator.accumulate``, not here)
    - the input iterator is exhausted (final flush via
      ``_flush_all_accumulators()``)

    All other events (tool calls, errors, messages, candidates responses) are
    emitted immediately and do not flush pending accumulated text.
    """

    _STOP_EVENT_TYPES: Final[frozenset[str]] = frozenset({"done", "stop", "message_end"})

    def __init__(self) -> None:
        # Single accumulator for gemini's sequential text streaming
        self._text_accumulator: TextAccumulator | None = None

    @staticmethod
    def _coerce_text(value: object) -> str:
        """Return ``value`` as text, mapping JSON ``null`` (``None``) to ``""``.

        A bare ``str(None)`` would leak the literal string ``"None"`` into
        user-visible output whenever a payload carries an explicit ``null``.
        """
        return "" if value is None else str(value)

    def parse(self, lines: Iterator[str]) -> Iterator[AgentOutputLine]:
        """Parse Gemini SSE streaming lines.

        Args:
            lines: Iterator of raw lines from Gemini stdout (SSE format).

        Yields:
            Normalized AgentOutputLine instances. Lines that are not valid JSON,
            or that decode to something other than a JSON object, are yielded
            as ``type="raw"``.
        """
        for line in lines:
            stripped = line.strip()
            if not stripped:
                continue

            # Handle SSE format: "data: <json>"
            if stripped.startswith("data:"):
                stripped = stripped[5:].strip()
            # An empty data field or the "[DONE]" sentinel carries no payload;
            # the sentinel is skipped with or without the "data:" prefix.
            if not stripped or stripped == "[DONE]":
                continue

            try:
                decoded = json.loads(stripped, strict=False)
            except json.JSONDecodeError:
                yield AgentOutputLine(type="raw", content=stripped, raw=stripped)
                continue

            # json.loads also accepts scalars, arrays and null; only objects
            # support the ``.get`` lookups used downstream, so anything else is
            # surfaced as a raw line instead of raising AttributeError.
            if not isinstance(decoded, dict):
                yield AgentOutputLine(type="raw", content=stripped, raw=stripped)
                continue

            yield from self._parse_object(cast("JsonDict", decoded), stripped)

        # Final flush: if iterator exhausted with pending accumulators, flush them all
        yield from self._flush_all_accumulators()

    def _parse_object(self, obj: JsonDict, stripped: str) -> Iterator[AgentOutputLine]:
        """Parse a JSON object into AgentOutputLine instances.

        Args:
            obj: Parsed JSON object.
            stripped: Original stripped line.

        Yields:
            AgentOutputLine instances.
        """
        event_type = str(obj.get("type", obj.get("event", "unknown")))

        # Suppress lifecycle-only events that carry no user payload.
        if event_type in _LIFECYCLE_EVENT_TYPES:
            return

        # Handle stop events - flush accumulators first
        if event_type in self._STOP_EVENT_TYPES:
            yield from self._flush_all_accumulators()
            yield AgentOutputLine(type="stop", raw=stripped, metadata=obj)
            return

        if event_type in ("text", "content"):
            yield from self._parse_text_content(obj, stripped)
        elif event_type in ("block", "content_block"):
            yield from self._parse_block(obj, stripped)
        elif event_type in ("tool_call", "tool_use"):
            yield from self._parse_tool_call(obj, stripped)
        elif event_type in ("tool_result", "function_call"):
            yield from self._parse_tool_result(obj, stripped)
        elif event_type in ("error", "error_details"):
            yield from self._parse_error(obj, stripped)
        elif event_type in ("candidate", "prompt_feedback"):
            yield AgentOutputLine(type=event_type, raw=stripped, metadata=obj)
        elif event_type in ("message", "server_message"):
            yield from self._parse_message(obj, stripped)
        elif "candidates" in obj:
            # Gemini API response format: {candidates: [{content: {parts: [...]}}]}
            yield from self._parse_candidates_response(obj, stripped)
        else:
            # Unknown event types pass through with their payload as metadata.
            yield AgentOutputLine(type=event_type, raw=stripped, metadata=obj)

    def _accumulate_text(self, content: str, stripped: str) -> Iterator[AgentOutputLine]:
        """Feed a text delta to the accumulator, creating it on first use.

        Args:
            content: Non-empty text delta to append.
            stripped: Original stripped line the delta came from.

        Yields:
            Whatever lines the accumulator decides to emit for this delta.
        """
        if self._text_accumulator is None:
            self._text_accumulator = TextAccumulator()
        yield from self._text_accumulator.accumulate(
            content, stripped, kind="text", keep_current_when_empty=True
        )

    def _parse_text_content(self, obj: JsonDict, stripped: str) -> Iterator[AgentOutputLine]:
        """Parse text/content event with delta accumulation.

        The text is taken from the first ``parts`` entry when present, otherwise
        from the ``content`` field, otherwise from ``text``. Events without any
        text yield nothing.
        """
        content = self._extract_first_part_text(obj)
        if not content:
            content = self._coerce_text(obj.get("content", "") or obj.get("text", ""))
        if not content:
            return
        yield from self._accumulate_text(content, stripped)

    def _parse_block(self, obj: JsonDict, stripped: str) -> Iterator[AgentOutputLine]:
        """Parse block/content_block event with delta accumulation.

        The text is taken from the first ``parts`` entry when present, otherwise
        from the ``content`` field. Events without any text yield nothing.
        """
        content = self._extract_first_part_text(obj)
        if not content:
            content = self._coerce_text(obj.get("content", ""))
        if not content:
            return
        yield from self._accumulate_text(content, stripped)

    def _parse_tool_call(self, obj: JsonDict, stripped: str) -> Iterator[AgentOutputLine]:
        """Parse tool_call/tool_use event.

        The tool name comes from ``name``, then ``tool``, then the nested
        ``function_call.name``. Arguments come from ``args``/``arguments``
        (stringified with ``str``) or, failing that, from a dict-valued
        ``function_call.args`` (serialized as JSON).
        """
        function_call_obj: JsonValue | None = obj.get("function_call")
        func_call: JsonDict | None = (
            cast("JsonDict", function_call_obj) if isinstance(function_call_obj, dict) else None
        )
        nested_name = func_call.get("name", "") if func_call is not None else ""
        tool_name = self._coerce_text(obj.get("name", "") or obj.get("tool", "") or nested_name)

        args_source = obj.get("args") or obj.get("arguments")
        args_str = ""
        if args_source:
            args_str = str(args_source)
        elif func_call is not None:
            func_args = func_call.get("args")
            if isinstance(func_args, dict):
                args_str = json.dumps(cast("JsonDict", func_args))

        yield AgentOutputLine(
            type="tool_use",
            content=tool_name,
            raw=stripped,
            metadata={"tool": tool_name, "args": args_str},
        )

    def _parse_tool_result(self, obj: JsonDict, stripped: str) -> Iterator[AgentOutputLine]:
        """Parse tool_result/function_call event.

        The result text is the first truthy value among ``response``,
        ``result`` and ``content``; the whole payload is attached as metadata.
        """
        result = self._coerce_text(
            obj.get("response", "") or obj.get("result", "") or obj.get("content", "")
        )
        yield AgentOutputLine(type="tool_result", content=result, raw=stripped, metadata=obj)

    def _parse_error(self, obj: JsonDict, stripped: str) -> Iterator[AgentOutputLine]:
        """Parse error/error_details event.

        A dict-valued ``error`` contributes its ``message``; any other truthy
        value is stringified; a missing or falsy ``error`` becomes
        ``"unknown error"``.
        """
        error_val = obj.get("error")
        if isinstance(error_val, dict):
            error_msg = self._coerce_text(cast("JsonDict", error_val).get("message", ""))
        else:
            error_msg = str(error_val) if error_val else "unknown error"
        yield AgentOutputLine(type="error", content=error_msg, raw=stripped, metadata=obj)

    def _parse_part_text(self, part: JsonDict, stripped: str) -> Iterator[AgentOutputLine]:
        """Emit the text of one ``parts`` entry, if it has any.

        Args:
            part: A single dict entry from a ``parts`` list.
            stripped: Original stripped line the part came from.

        Yields:
            One ``thinking`` line when the part has ``thought`` set to ``True``,
            one ``text`` line otherwise; nothing when the text is empty or null.
        """
        text = self._coerce_text(part.get("text", ""))
        if not text:
            return
        # thought=True parts are reasoning tokens, not user-visible text.
        line_type = "thinking" if part.get("thought") is True else "text"
        yield AgentOutputLine(type=line_type, content=text, raw=stripped)

    def _parse_message(self, obj: JsonDict, stripped: str) -> Iterator[AgentOutputLine]:
        """Parse message/server_message event.

        Each dict entry in ``parts`` may contribute a text/thinking line and a
        ``tool_use`` line (from its ``function_call``). A non-empty top-level
        ``content`` is emitted afterwards as a ``text`` line. These lines are
        emitted directly and do not go through the text accumulator.
        """
        parts = obj.get("parts")
        if isinstance(parts, list):
            for part in parts:
                if not isinstance(part, dict):
                    continue
                part_dict = cast("JsonDict", part)
                yield from self._parse_part_text(part_dict, stripped)

                function_call_obj = part_dict.get("function_call")
                func: JsonDict | None = (
                    cast("JsonDict", function_call_obj)
                    if isinstance(function_call_obj, dict)
                    else None
                )
                if func:
                    tool_name = self._coerce_text(func.get("name", ""))
                    args_str = self._coerce_text(func.get("args", ""))
                    yield AgentOutputLine(
                        type="tool_use",
                        content=tool_name,
                        raw=stripped,
                        metadata={"tool": tool_name, "args": args_str},
                    )

        content = self._coerce_text(obj.get("content", ""))
        if content:
            yield AgentOutputLine(type="text", content=content, raw=stripped)

    def _parse_candidates_response(
        self, obj: JsonDict, stripped: str
    ) -> Iterator[AgentOutputLine]:
        """Parse a Gemini API candidates-style response.

        Handles: {"candidates": [{"content": {"parts": [{"text": "...", "thought": true}]}}]}
        Only the first candidate is inspected. Parts with thought=True are
        emitted as 'thinking'; others as 'text'. Any structural mismatch
        (missing or wrongly typed level) yields nothing.
        """
        candidates_val = obj.get("candidates")
        if not isinstance(candidates_val, list) or not candidates_val:
            return
        candidate = candidates_val[0]
        if not isinstance(candidate, dict):
            return
        content_val = cast("JsonDict", candidate).get("content")
        if not isinstance(content_val, dict):
            return
        parts_val = cast("JsonDict", content_val).get("parts")
        if not isinstance(parts_val, list):
            return
        for part in parts_val:
            if isinstance(part, dict):
                yield from self._parse_part_text(cast("JsonDict", part), stripped)

    def _extract_first_part_text(self, obj: JsonDict) -> str:
        """Return the text for the first part entry, if present.

        Returns ``""`` when ``parts`` is missing, empty, not a list, when its
        first entry is not a dict, or when that entry's ``text`` is absent or
        null.
        """
        parts_val: JsonValue | None = obj.get("parts")
        if isinstance(parts_val, list) and parts_val:
            first_part = parts_val[0]
            if isinstance(first_part, dict):
                return self._coerce_text(cast("JsonDict", first_part).get("text", ""))
        return ""

    def _flush_accumulator(self) -> Iterator[AgentOutputLine]:
        """Flush the single text accumulator and clear it."""
        if self._text_accumulator is None:
            return
        acc = self._text_accumulator
        # Detach before flushing so the next text delta starts a fresh accumulator.
        self._text_accumulator = None
        yield from acc.flush(kind="text")

    def _flush_all_accumulators(self) -> Iterator[AgentOutputLine]:
        """Flush all pending accumulators on stop or iterator exhaustion."""
        yield from self._flush_accumulator()