"""Generic NDJSON parser for other agents.
This parser handles NDJSON output from agents that don't have
a dedicated parser. It attempts to extract text content and
error information from common NDJSON formats, with robust delta
accumulation for streaming text responses.
"""
from __future__ import annotations
import json
from typing import TYPE_CHECKING, Final, cast
from .agent_output_line import AgentOutputLine
from .text_accumulator import TextAccumulator
if TYPE_CHECKING:
from collections.abc import Iterator
# Threshold: content shorter than this without paragraph boundary
# is treated as a streaming delta and accumulated.
# Content at or above this threshold is treated as a standalone message.
_SHORT_CONTENT_THRESHOLD = 200
# Bare JSON event type values that carry no user payload — suppress silently.
# Only exact matches after str.lower() on the "type" field are suppressed;
# longer strings like "starting the analysis" are never touched.
_LIFECYCLE_EVENT_TYPES: Final[frozenset[str]] = frozenset(
{
"start",
"begin",
"ready",
"thread.started",
"turn.started",
"message_start",
"message_stop",
"heartbeat",
"content_block_start",
"content_block_stop",
"user",
"assistant",
"thinking",
"message_delta",
}
)
class GenericParser:
    """Generic NDJSON parser for unknown or simple agent formats.

    Every non-blank input line is classified, in this order:

    1. Not a JSON object (invalid JSON, or a JSON scalar/array) -> ``raw``
    2. Lowercased ``type`` in ``_LIFECYCLE_EVENT_TYPES`` -> dropped silently
    3. Lowercased ``type`` in ``_STOP_TYPES`` -> ``stop``
    4. ``type`` contains ``"error"`` or a truthy ``error`` field -> ``error``
    5. Text found in a high-priority field -> ``text`` (possibly buffered)
    6. Text found in ``thought`` / ``reasoning`` -> ``thinking``
    7. Anything else -> ``unknown`` with the parsed object as metadata

    Text deltas are accumulated into coherent blocks before emission, flushing on:

    - ``\\n\\n`` paragraph boundary (incremental surfacing of long responses)
    - Stop/done markers (end of message)
    - Any raw, error, thinking or unknown line (pending text is emitted first)
    - Iterator exhaustion (final flush via ``_flush_all_accumulators()``)

    Short content (below threshold) that doesn't end with ``\\n\\n`` is treated
    as a streaming delta and accumulated. Content at or above the threshold,
    or ending with ``\\n\\n``, is emitted immediately.

    Field priority for content extraction:

    1. content, text, message, output, response, result (type='text')
    2. thought, reasoning (type='thinking') — only when no higher-priority field matches
    """

    # Lowercased "type" values that mark the end of a message/stream.
    _STOP_TYPES: frozenset[str] = frozenset({"stop", "done", "complete", "finish", "end"})

    def __init__(self) -> None:
        # Buffer for pending short text deltas; created lazily on the first
        # delta and reset to None whenever it is flushed.
        self._text_accumulator: TextAccumulator | None = None

    def parse(self, lines: Iterator[str]) -> Iterator[AgentOutputLine]:
        """Parse generic streaming NDJSON lines.

        Args:
            lines: Iterator of raw lines from agent stdout.

        Yields:
            Normalized AgentOutputLine instances.
        """
        for line in lines:
            stripped = line.strip()
            if not stripped:
                continue

            obj = self._decode_object(stripped)
            if obj is None:
                # Not a JSON object: surface the line verbatim, after any
                # pending text so output order matches input order.
                yield from self._flush_accumulator()
                yield AgentOutputLine(type="raw", content=stripped, raw=stripped)
                continue

            # Suppress lifecycle-only events that carry no user payload.
            # Pending text is deliberately left buffered here.
            type_val = str(obj.get("type", "")).lower()
            if type_val in _LIFECYCLE_EVENT_TYPES:
                continue

            # Stop/done markers take precedence over error and content checks.
            if self._is_stop(obj):
                yield from self._flush_accumulator()
                yield AgentOutputLine(type="stop", raw=stripped)
                continue

            if self._is_error(obj):
                # Flush pending text before emitting the error.
                yield from self._flush_accumulator()
                yield AgentOutputLine(
                    type="error",
                    content=self._extract_error(obj),
                    raw=stripped,
                    metadata=obj,
                )
                continue

            # High-priority text fields.
            content = self._extract_content(obj)
            if content:
                yield from self._process_content(content, stripped)
                continue

            # thought/reasoning map to 'thinking' — lower priority than text fields.
            thinking = self._extract_thinking_content(obj)
            if thinking:
                yield from self._flush_accumulator()
                yield AgentOutputLine(type="thinking", content=thinking, raw=stripped, metadata=obj)
                continue

            # Valid JSON object with nothing extractable: keep it as metadata.
            yield from self._flush_accumulator()
            yield AgentOutputLine(type="unknown", raw=stripped, metadata=obj)

        # Input exhausted: emit whatever is still buffered.
        yield from self._flush_all_accumulators()

    @staticmethod
    def _decode_object(text: str) -> dict[str, object] | None:
        """Decode ``text`` as a JSON object, or return None if it is not one.

        ``strict=False`` lets string values contain raw control characters.
        Besides ``json.JSONDecodeError`` (a ``ValueError`` subclass),
        ``json.loads`` can raise a plain ``ValueError`` for an integer literal
        longer than the interpreter's int-string digit limit, and
        ``RecursionError`` for very deeply nested input. All of these are
        treated as "not parseable" so the caller can fall back to raw output
        instead of crashing on hostile or corrupted agent output.

        Args:
            text: A stripped, non-empty input line.

        Returns:
            The parsed dict (possibly empty), or None when ``text`` is not
            valid JSON or decodes to something other than a JSON object.
        """
        try:
            parsed: object = json.loads(text, strict=False)
        except (ValueError, RecursionError):
            return None
        if not isinstance(parsed, dict):
            return None
        return cast("dict[str, object]", parsed)

    def _is_short_content(self, content: str) -> bool:
        """Return True if content appears to be a short streaming delta.

        Content shorter than the threshold that doesn't end with a paragraph
        boundary is treated as a potential streaming delta.
        """
        if len(content) >= _SHORT_CONTENT_THRESHOLD:
            return False
        return not content.endswith("\n\n")

    def _process_content(self, content: str, raw: str) -> Iterator[AgentOutputLine]:
        """Process text content with delta accumulation and paragraph-boundary flush.

        Args:
            content: Extracted text content.
            raw: Raw line for tracking.

        Yields:
            AgentOutputLine instances, possibly flushing accumulated content.
        """
        if not content:
            return

        # Paragraph boundary: flush pending text, then emit this chunk without
        # its trailing "\n\n" (nothing is emitted if only the boundary remains).
        if content.endswith("\n\n"):
            yield from self._flush_accumulator()
            emit_content = content[:-2]
            if emit_content:
                yield AgentOutputLine(type="text", content=emit_content, raw=raw)
            return

        # Short content without paragraph boundary -> treat as streaming delta.
        # Whatever the accumulator chooses to yield is passed straight through.
        if self._is_short_content(content):
            if self._text_accumulator is None:
                self._text_accumulator = TextAccumulator()
            yield from self._text_accumulator.accumulate(
                content, raw, kind="text", keep_current_when_empty=False
            )
            return

        # Content at or above the length threshold -> standalone message.
        # Flush any pending accumulator first, then emit immediately.
        yield from self._flush_accumulator()
        yield AgentOutputLine(type="text", content=content, raw=raw)

    def _flush_accumulator(self) -> Iterator[AgentOutputLine]:
        """Flush the single text accumulator and remove it."""
        if self._text_accumulator is None:
            return
        acc = self._text_accumulator
        # Detach before flushing so the parser state is already clean even if
        # the consumer stops iterating part-way through the flush.
        self._text_accumulator = None
        yield from acc.flush(kind="text")

    def _flush_all_accumulators(self) -> Iterator[AgentOutputLine]:
        """Flush all pending accumulators on stop or iterator exhaustion."""
        yield from self._flush_accumulator()

    def _extract_content(self, obj: dict[str, object]) -> str:
        """Extract text content from JSON object using high-priority fields.

        Fields are tried in order: content, text, message, output, response,
        result. A field matches when it is a non-empty string, or a dict whose
        ``text`` or ``content`` entry (checked in that order, independently)
        is a non-empty string.

        Args:
            obj: Parsed JSON object.

        Returns:
            Extracted text content or empty string.
        """
        for field_name in ("content", "text", "message", "output", "response", "result"):
            value = obj.get(field_name)
            if isinstance(value, str) and value:
                return value
            if isinstance(value, dict):
                # Sometimes content is nested. Check each key on its own so a
                # non-string "text" cannot hide a usable "content".
                for nested_key in ("text", "content"):
                    nested = value.get(nested_key)
                    if isinstance(nested, str) and nested:
                        return nested
        return ""

    def _extract_thinking_content(self, obj: dict[str, object]) -> str:
        """Extract reasoning/thought text from low-priority fields.

        Only called when no high-priority content field matched.

        Args:
            obj: Parsed JSON object.

        Returns:
            Thinking content string, or empty string if not present.
        """
        for field_name in ("thought", "reasoning"):
            value = obj.get(field_name)
            if isinstance(value, str) and value:
                return value
        return ""

    def _is_error(self, obj: dict[str, object]) -> bool:
        """Check if object represents an error.

        Args:
            obj: Parsed JSON object.

        Returns:
            True if the lowercased ``type`` contains ``"error"`` or the
            ``error`` field is truthy.
        """
        type_val = str(obj.get("type", "")).lower()
        return "error" in type_val or bool(obj.get("error"))

    def _extract_error(self, obj: dict[str, object]) -> str:
        """Extract error message from object.

        Candidates are tried in order and the first non-empty one wins:
        a string ``error``; ``error["message"]`` then ``error["type"]`` when
        ``error`` is a dict; top-level ``message``; top-level ``msg``.
        ``None`` and empty values are skipped rather than stringified, so a
        JSON ``null`` message never surfaces as the literal text ``"None"``.
        Non-string candidates are converted with ``str()``.

        Args:
            obj: Parsed JSON object.

        Returns:
            Error message string, or ``"unknown error"`` if nothing usable
            was found.
        """
        error = obj.get("error")
        if isinstance(error, str) and error:
            return error

        candidates: list[object] = []
        if isinstance(error, dict):
            candidates.extend((error.get("message"), error.get("type")))
        candidates.extend((obj.get("message"), obj.get("msg")))

        for candidate in candidates:
            if candidate is None:
                continue
            text = candidate if isinstance(candidate, str) else str(candidate)
            if text:
                return text
        return "unknown error"

    def _is_stop(self, obj: dict[str, object]) -> bool:
        """Check if object represents end of output.

        Args:
            obj: Parsed JSON object.

        Returns:
            True if the lowercased ``type`` is one of ``_STOP_TYPES``.
        """
        type_val = str(obj.get("type", "")).lower()
        return type_val in self._STOP_TYPES
# Names exported by ``from <this module> import *``: only the parser class.
__all__ = ["GenericParser"]