Source code for ralph.agents.timeout_clock
"""Fake test clock for the agent timeout subsystem."""
from __future__ import annotations
from typing import TYPE_CHECKING
from ralph.agents.clock import Clock
from ralph.agents.system_clock import SystemClock
if TYPE_CHECKING:
import threading
__all__ = ["Clock", "FakeClock", "SystemClock"]
[docs]
class FakeClock:
"""Test Clock: advances logical time synchronously without real waits."""
def __init__(self, start: float = 0.0) -> None:
self._now: float = start
def monotonic(self) -> float:
return self._now
def sleep(self, seconds: float) -> None:
self._now += seconds
[docs]
def advance(self, seconds: float) -> None:
"""Advance logical time by seconds without blocking."""
self._now += seconds
def wait_for_event(self, event: threading.Event, seconds: float) -> bool:
self._now += seconds
return event.is_set()