# Source code for ralph.display.ring_buffer

"""Bounded ring buffer with drop-oldest policy and dropped-item counter.

Used by ParallelDisplay to absorb burst output from agents without OOM risk.
Thread-safe via threading.Lock — NOT asyncio-safe.
"""

import collections
import threading

# Buffer capacity, in items. Not referenced in the visible part of this module;
# presumably passed as `maxsize` to the RingBuffer used by ParallelDisplay --
# TODO confirm against the caller.
PARALLEL_DISPLAY_BUFFER_SIZE: int = 1000


[docs] class RingBuffer: """Thread-safe bounded ring buffer with drop-oldest overflow policy. When the buffer is full and a new item arrives, the oldest item is dropped and `dropped_count` is incremented. Args: maxsize: Maximum number of items to retain. """ def __init__(self, maxsize: int) -> None: self._buf: collections.deque[str] = collections.deque(maxlen=maxsize) self._lock = threading.Lock() self._dropped_count: int = 0 def enqueue(self, item: str) -> None: with self._lock: if len(self._buf) == self._buf.maxlen: self._dropped_count += 1 self._buf.append(item)
[docs] def drain(self) -> list[str]: """Destructively return and clear buffered items.""" with self._lock: items = list(self._buf) self._buf.clear() return items
[docs] def snapshot(self, n: int | None = None) -> list[str]: """Return buffered items without clearing them. `drain()` is destructive; `snapshot()` is the non-destructive, read-only view for callers that need to inspect buffered output. """ with self._lock: items = list(self._buf) if n is None: return items if n == 0: return [] return items[-n:]
[docs] def consume_drop_delta(self) -> int: """Return and zero the dropped_count atomically. Each call returns how many items were dropped since the last call (or since construction). Thread-safe. """ with self._lock: delta = self._dropped_count self._dropped_count = 0 return delta
@property def dropped_count(self) -> int: with self._lock: return self._dropped_count