# Source code for ralph.agents.system_clock

"""Production clock for the agent timeout subsystem."""

from __future__ import annotations

import threading
import time

from ralph.agents.clock import Clock

__all__ = ["SystemClock"]


class SystemClock(Clock):
    """Production Clock: uses real wall-clock time.

    Every method delegates directly to the standard library
    (``time.monotonic`` and ``threading.Event.wait``); the class keeps no
    state of its own.
    """

    def monotonic(self) -> float:
        """Return the current reading of ``time.monotonic()``, in seconds."""
        return time.monotonic()

    def sleep(self, seconds: float) -> None:
        """Block the calling thread for up to *seconds* seconds.

        Args:
            seconds: Timeout handed to ``threading.Event.wait``.
        """
        # threading.Event().wait is interruptible (preserves SIGINT semantics)
        # and matches the existing lines_event.wait() pattern in invoke.py.
        # The event is created here and never set, so the wait can only end
        # by timing out.
        threading.Event().wait(seconds)

    def wait_for_event(self, event: threading.Event, seconds: float) -> bool:
        """Wait on *event* for at most *seconds* seconds.

        Args:
            event: The event to wait on.
            seconds: Timeout handed to ``event.wait``.

        Returns:
            Whatever ``event.wait(seconds)`` returns: ``True`` if the event
            was set, ``False`` if the timeout elapsed first.
        """
        return event.wait(seconds)