Source code for ralph.agents.system_clock
"""Production clock for the agent timeout subsystem."""
from __future__ import annotations
import threading
import time
from ralph.agents.clock import Clock
__all__ = ["SystemClock"]
[docs]
class SystemClock(Clock):
"""Production Clock: uses real wall-clock time."""
[docs]
def monotonic(self) -> float:
return time.monotonic()
[docs]
def sleep(self, seconds: float) -> None:
# threading.Event().wait is interruptible (preserves SIGINT semantics)
# and matches the existing lines_event.wait() pattern in invoke.py.
threading.Event().wait(seconds)
[docs]
def wait_for_event(self, event: threading.Event, seconds: float) -> bool:
return event.wait(seconds)