"""Parser for Codex's NDJSON streaming format."""
from __future__ import annotations
import json
from typing import TYPE_CHECKING, Final, cast
from .agent_output_line import AgentOutputLine
from .text_accumulator import TextAccumulator
if TYPE_CHECKING:
from collections.abc import Iterator
[docs]
class CodexParser:
"""Parser for Codex's NDJSON streaming output with robust delta accumulation.
Text deltas are accumulated into coherent blocks before emission, flushing on:
- ``response.completed`` / ``turn.completed`` / ``message_stop`` (end of message)
- ``\\n\\n`` paragraph boundary (incremental surfacing of long responses)
- Iterator exhaustion (final flush via ``_flush_all_accumulators()``)
"""
_STOP_EVENT_TYPES: Final[frozenset[str]] = frozenset(
{"turn.completed", "message_stop", "done", "stop", "response.completed"}
)
# Lifecycle-only events that carry no user payload — suppress silently.
_LIFECYCLE_EVENT_TYPES: Final[frozenset[str]] = frozenset(
{"thread.started", "turn.started", "message_start", "ping", "heartbeat", "ready"}
)
def __init__(self) -> None:
# Accumulator keyed by response id or synthetic stream key
self._text_accumulator: dict[str, TextAccumulator] = {}
self._current_response_id: str | None = None
self._stream_counter = 0
[docs]
def parse(self, lines: Iterator[str]) -> Iterator[AgentOutputLine]:
"""Parse Codex streaming NDJSON lines."""
for line in lines:
stripped = line.strip()
if stripped.startswith("data:"):
stripped = stripped.removeprefix("data:").strip()
if not stripped:
continue
if stripped == "[DONE]":
yield AgentOutputLine(type="stop", raw=stripped)
continue
try:
parsed: object = json.loads(stripped, strict=False)
except json.JSONDecodeError:
yield AgentOutputLine(type="raw", content=stripped, raw=stripped)
continue
if not isinstance(parsed, dict):
yield AgentOutputLine(type="raw", content=stripped, raw=stripped)
continue
obj = cast("dict[str, object]", parsed)
yield from self._parse_object(obj, stripped)
# Final flush: if iterator exhausted with pending accumulators, flush them all
yield from self._flush_all_accumulators()
def _parse_object(self, obj: dict[str, object], stripped: str) -> Iterator[AgentOutputLine]:
"""Parse a JSON object into AgentOutputLine instances."""
event_type = str(obj.get("type", "unknown"))
# Handle lifecycle/flush events
if event_type in self._STOP_EVENT_TYPES:
yield from self._flush_all_accumulators()
self._current_response_id = None
yield AgentOutputLine(type="stop", raw=stripped, metadata=obj)
return
# Suppress known lifecycle events that carry no user payload
if event_type in self._LIFECYCLE_EVENT_TYPES:
return
handler_map = {
"text": self._parse_text_content,
"content": self._parse_text_content,
"text_delta": self._parse_text_delta,
"response.output_text": self._parse_text_content,
"response.output_text.delta": self._parse_text_delta,
"tool_use": self._parse_tool_use,
"tool_result": self._parse_tool_result,
"tool_result_delta": self._parse_tool_result,
"error": self._parse_error,
"error_delta": self._parse_error,
"assistant": self._parse_assistant,
"item.started": self._parse_item_event,
"item.completed": self._parse_item_event,
"result": self._parse_result,
"turn.failed": self._parse_turn_failed,
}
handler = handler_map.get(event_type)
if handler:
yield from handler(obj, stripped)
return
yield AgentOutputLine(type=event_type, raw=stripped, metadata=obj)
def _parse_text_content(
self,
obj: dict[str, object],
stripped: str,
) -> Iterator[AgentOutputLine]:
content = str(obj.get("content", "") or obj.get("text", ""))
if content:
yield AgentOutputLine(type="text", content=content, raw=stripped)
def _parse_text_delta(self, obj: dict[str, object], stripped: str) -> Iterator[AgentOutputLine]:
delta_val = obj.get("delta")
if isinstance(delta_val, dict):
delta_obj = cast("dict[str, object]", delta_val)
content_val = delta_obj.get("content") or delta_obj.get("text")
content = str(content_val or "")
elif isinstance(delta_val, str):
content = delta_val
else:
content = ""
if not content:
return
# Get response id for accumulator keying
response_id = str(obj.get("response_id", obj.get("responseId", "")) or "")
if not response_id:
if self._current_response_id:
response_id = self._current_response_id
else:
# No active response context, yield immediately
yield AgentOutputLine(type="text", content=content, raw=stripped)
return
key = response_id
if key not in self._text_accumulator:
self._text_accumulator[key] = TextAccumulator()
yield from self._text_accumulator[key].accumulate(
content, stripped, kind="text", keep_current_when_empty=True
)
def _flush_accumulator(self, key: str) -> Iterator[AgentOutputLine]:
"""Flush a single accumulator and remove it."""
if key not in self._text_accumulator:
return
acc = self._text_accumulator.pop(key)
yield from acc.flush(kind="text")
def _flush_all_accumulators(self) -> Iterator[AgentOutputLine]:
"""Flush all pending accumulators on stop or iterator exhaustion."""
for key in list(self._text_accumulator.keys()):
yield from self._flush_accumulator(key)
def _parse_tool_use(self, obj: dict[str, object], stripped: str) -> Iterator[AgentOutputLine]:
tool_name = str(obj.get("tool", obj.get("name", "unknown")))
tool_input = obj.get("input", {})
yield AgentOutputLine(
type="tool_use",
content=tool_name,
raw=stripped,
metadata={"tool": tool_name, "input": tool_input},
)
def _parse_tool_result(
self,
obj: dict[str, object],
stripped: str,
) -> Iterator[AgentOutputLine]:
result = str(obj.get("result", obj.get("content", obj.get("output", ""))))
yield AgentOutputLine(type="tool_result", content=result, raw=stripped, metadata=obj)
def _parse_error(self, obj: dict[str, object], stripped: str) -> Iterator[AgentOutputLine]:
error_val = obj.get("error")
if isinstance(error_val, dict):
error_msg = str(cast("dict[str, object]", error_val).get("message", ""))
else:
error_msg = str(obj.get("message") or error_val or "unknown error")
yield AgentOutputLine(type="error", content=error_msg, raw=stripped, metadata=obj)
def _parse_turn_failed(
self,
obj: dict[str, object],
stripped: str,
) -> Iterator[AgentOutputLine]:
error_message = str(obj.get("error", "turn failed"))
yield AgentOutputLine(type="error", content=error_message, raw=stripped, metadata=obj)
def _parse_assistant(self, obj: dict[str, object], stripped: str) -> Iterator[AgentOutputLine]:
content = str(obj.get("content", ""))
if content:
yield AgentOutputLine(type="text", content=content, raw=stripped)
yield AgentOutputLine(type="assistant", raw=stripped, metadata=obj)
def _parse_result(self, obj: dict[str, object], stripped: str) -> Iterator[AgentOutputLine]:
result = str(obj.get("result", ""))
if result:
yield AgentOutputLine(type="text", content=result, raw=stripped, metadata=obj)
def _parse_item_event(self, obj: dict[str, object], stripped: str) -> Iterator[AgentOutputLine]:
item_obj = obj.get("item")
if not isinstance(item_obj, dict):
yield AgentOutputLine(type=str(obj.get("type", "item")), raw=stripped, metadata=obj)
return
item_type = str(item_obj.get("type", "unknown"))
text = str(item_obj.get("text", ""))
# reasoning items map to 'thinking' so the display applies the thinking-preview treatment.
if item_type == "reasoning" and text:
yield AgentOutputLine(type="thinking", content=text, raw=stripped, metadata=item_obj)
return
if item_type == "agent_message" and text:
yield AgentOutputLine(type="text", content=text, raw=stripped, metadata=item_obj)
return
if item_type == "mcp_tool_call":
tool_name = str(item_obj.get("tool", "unknown"))
arguments: object = item_obj.get("arguments", {})
yield AgentOutputLine(
type="tool_use",
content=tool_name,
raw=stripped,
metadata={"tool": tool_name, "input": arguments},
)
return
if item_type == "command_execution":
command = str(item_obj.get("command", ""))
if command:
yield AgentOutputLine(
type="tool_use",
content="bash",
raw=stripped,
metadata=item_obj,
)
else:
yield AgentOutputLine(
type="item_command_execution",
raw=stripped,
metadata=item_obj,
)
return
if item_type in {"mcp_tool_result", "tool_result", "mcp_result"}:
tool_name = str(item_obj.get("tool", "unknown"))
result_obj = item_obj.get("result", item_obj.get("output", item_obj.get("content", "")))
content = result_obj if isinstance(result_obj, str) else ""
yield AgentOutputLine(
type="tool_result",
content=content,
raw=stripped,
metadata={"tool": tool_name, "result": result_obj},
)
return
yield AgentOutputLine(type=f"item_{item_type}", raw=stripped, metadata=item_obj)