Source code for ralph.agents.parsers.text_accumulator
"""Shared text delta accumulator for parser implementations."""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import TYPE_CHECKING
from .agent_output_line import AgentOutputLine
if TYPE_CHECKING:
from collections.abc import Iterator
[docs]
@dataclass(slots=True)
class TextAccumulator:
"""Shared delta accumulator for paragraph-boundary text flushing."""
buffer: str = ""
raw_lines: list[str] = field(default_factory=list)
[docs]
def accumulate(
self,
text: str,
raw: str,
*,
kind: str = "text",
keep_current_when_empty: bool,
) -> Iterator[AgentOutputLine]:
"""Append text/raw and yield an AgentOutputLine if a paragraph boundary is reached.
Args:
text: Text delta to append to the buffer.
raw: Raw JSON line to track.
kind: Output type for the emitted line ('text' or 'thinking').
keep_current_when_empty: When True, always keep the current raw line in the
tail after a flush even if remaining buffer is empty (unconditional rule).
When False, only keep it when remaining is non-empty.
"""
self.buffer += text
self.raw_lines.append(raw)
if "\n\n" in self.buffer:
parts = self.buffer.split("\n\n", 1)
flushed = parts[0]
remaining = parts[1]
if flushed:
flushed_raw = "\n".join(self.raw_lines[:-1])
yield AgentOutputLine(type=kind, content=flushed, raw=flushed_raw)
self.buffer = remaining
self.raw_lines = [raw] if (remaining or keep_current_when_empty) else []
[docs]
def flush(
self, *, kind: str = "text", require_strip: bool = False
) -> Iterator[AgentOutputLine]:
"""Yield remaining buffer content as an AgentOutputLine if non-empty, then reset.
Args:
kind: Output type for the emitted line ('text' or 'thinking').
require_strip: When True, only emit if buffer.strip() is non-empty (for
thinking accumulators that should suppress whitespace-only content).
"""
check = self.buffer.strip() if require_strip else self.buffer
if check:
raw_joined = "\n".join(self.raw_lines) if self.raw_lines else ""
yield AgentOutputLine(type=kind, content=self.buffer, raw=raw_joined)
self.buffer = ""
self.raw_lines = []