Source code for ralph.agents.idle_watchdog.idle_watchdog

"""Idle watchdog for detecting stalled agents."""

from __future__ import annotations

from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Protocol

from loguru import logger

from ralph.agents.execution_state import AgentExecutionState
from ralph.process.child_liveness import AliveBy

from .corroboration_snapshot import CorroborationSnapshot, WaitingCorroborator
from .waiting_status_event import WaitingStatusEvent, WaitingStatusListener
from .waiting_status_kind import WaitingStatusKind
from .watchdog_fire_reason import WatchdogFireReason
from .watchdog_verdict import WatchdogVerdict

if TYPE_CHECKING:
    from collections.abc import Callable

    from .timeout_policy import TimeoutPolicy

    class Clock(Protocol):
        """Structural type for the time source the watchdog reads.

        Only ``monotonic()`` is required: IdleWatchdog measures every elapsed
        interval as the difference between two readings of it.
        """

        def monotonic(self) -> float:
            """Return the current monotonic timestamp, in seconds."""
            ...


@dataclass
class IdleWatchdog:
    """Tracks agent idle time and decides when to fire the timeout.

    The watchdog owns the last_activity timestamp; the caller's loop must NEVER
    mutate `_last_activity` directly. Activity must flow through
    `record_activity()`, which preserves the cumulative WAITING_ON_CHILD
    ceiling while advancing the idle baseline. Direct resets here previously
    caused a false-negative bug where WAITING_ON_CHILD deferred the deadline
    forever.

    Cumulative WAITING_ON_CHILD time is an absolute ceiling that is preserved
    across every transition (heartbeat activity, drain windows, classify_quiet
    outcomes). Once recorded, cumulative time never decays during the session —
    this mirrors max_session_seconds semantics so neither ceiling can be
    defeated by a process that alternates between producing output and waiting
    on children.

    The session ceiling (max_session_seconds) is checked first on every
    evaluate() call and cannot be defeated by activity — record_activity()
    does not reset it.

    Status events are emitted via the optional listener:
    - ENTERED once when WAITING_ON_CHILD deferral begins.
    - PROGRESS at most once per waiting_status_interval_seconds (rate-limited).
    - SUSPECTED_FROZEN once per WAITING run when suspect threshold is crossed.
    - EXITED when transitioning out of WAITING_ON_CHILD.
    - HARD_STOP immediately before returning FIRE for CHILDREN_PERSIST_TOO_LONG.

    Every event's ``cumulative_seconds`` is the banked WAITING_ON_CHILD total
    plus the current run, with each run counted exactly once.

    Listener exceptions are caught and logged at DEBUG; they never propagate.
    """

    _config: TimeoutPolicy
    _clock: Clock
    _last_activity: float = field(init=False)
    _session_started_at: float = field(init=False)
    _waiting_on_child_started_at: float | None = field(default=None, init=False)
    _cumulative_waiting_on_child_seconds: float = field(default=0.0, init=False)
    _in_drain_window: bool = field(default=False, init=False)
    _drain_started_at: float | None = field(default=None, init=False)
    _last_fire_reason: WatchdogFireReason | None = field(default=None, init=False)
    _last_waiting_status_at: float | None = field(default=None, init=False)
    _suspicion_announced_for_run: bool = field(default=False, init=False)

    def __init__(
        self,
        config: TimeoutPolicy,
        clock: Clock,
        listener: WaitingStatusListener | None = None,
        *,
        corroborator: WaitingCorroborator | None = None,
    ) -> None:
        """Create a watchdog whose idle and session baselines start now.

        Args:
            config: Timeout policy supplying every ceiling, threshold and interval.
            clock: Monotonic time source; all elapsed times are differences of
                its readings.
            listener: Optional receiver of WaitingStatusEvent notifications.
            corroborator: Optional callable returning a CorroborationSnapshot,
                used to choose the effective WAITING_ON_CHILD ceiling and to
                enrich event diagnostics.
        """
        # The explicit __init__ is kept by @dataclass (it never overwrites a
        # user-defined one); the field declarations above still drive repr/eq.
        self._config = config
        self._clock = clock
        self._listener = listener
        self._corroborator = corroborator
        now = clock.monotonic()
        self._last_activity = now
        self._session_started_at = now
        self._waiting_on_child_started_at = None
        self._cumulative_waiting_on_child_seconds = 0.0
        self._in_drain_window = False
        self._drain_started_at = None
        self._last_fire_reason = None
        self._last_waiting_status_at = None
        self._suspicion_announced_for_run = False
        self._entry_corroboration: CorroborationSnapshot | None = None
        self._log = logger.bind(component="idle_watchdog")

    @property
    def last_fire_reason(self) -> WatchdogFireReason | None:
        """The reason the watchdog fired, or None if it hasn't fired yet."""
        return self._last_fire_reason

    @property
    def cumulative_waiting_on_child_seconds(self) -> float:
        """Seconds spent in WAITING_ON_CHILD across all *completed* runs.

        A run still in progress is only folded in when it ends (activity,
        drain transition, or a short-idle evaluate()).
        """
        return self._cumulative_waiting_on_child_seconds

    def record_activity(self) -> None:
        """Record that the agent produced output; resets idle/drain/child state.

        Does NOT reset _session_started_at — the session ceiling is absolute
        and cannot be defeated by heartbeat activity.

        Does NOT reset _cumulative_waiting_on_child_seconds. Cumulative is a
        true absolute ceiling (parallel to the session ceiling) and never
        decays during the session.
        """
        now = self._clock.monotonic()
        self._accumulate_waiting_run(now)
        self._last_activity = now
        self._in_drain_window = False
        self._drain_started_at = None

    def _accumulate_waiting_run(self, now: float) -> None:
        """Close the current WAITING run, banking its elapsed time.

        Called on every transition OUT of the WAITING_ON_CHILD state so the
        cumulative total is preserved across WAITING<->ACTIVE oscillation.
        Double-counting is prevented by only calling this on transitions (not
        on consecutive WAITING evaluations). No-op when no run is open.

        Emits an EXITED event if we were actually in a WAITING run. The event
        is emitted *before* the run is added to the cumulative total, because
        `_emit` reports ``cumulative + current_run_seconds``; emitting after
        the addition would count the closing run twice.
        """
        if self._waiting_on_child_started_at is None:
            return
        current_run_elapsed = max(0.0, now - self._waiting_on_child_started_at)
        idle_elapsed = now - self._last_activity
        self._emit(
            WaitingStatusKind.EXITED,
            current_run_seconds=current_run_elapsed,
            idle_elapsed=idle_elapsed,
        )
        self._cumulative_waiting_on_child_seconds += current_run_elapsed
        self._waiting_on_child_started_at = None
        self._last_waiting_status_at = None
        self._suspicion_announced_for_run = False
        self._entry_corroboration = None

    def _safe_corroborate(self) -> CorroborationSnapshot:
        """Call the corroborator safely, returning an empty snapshot on None or error."""
        if self._corroborator is None:
            return CorroborationSnapshot()
        try:
            return self._corroborator()
        except Exception:
            # Best-effort diagnostics: a failing corroborator must never break the watchdog.
            self._log.debug("idle watchdog: corroborator raised (suppressed)")
            return CorroborationSnapshot()

    def _build_corroboration_diag(
        self,
        current: CorroborationSnapshot,
    ) -> dict[str, str | int | float | bool]:
        """Build a diagnostic dict comparing a snapshot to the run's entry baseline.

        Delta keys (``workspace_event_delta``,
        ``terminal_child_events_since_entry``) are only present when both the
        current and the entry snapshot carry the underlying counter; other
        keys are copied from ``current`` when they are not None.
        """
        diag: dict[str, str | int | float | bool] = {}
        entry = self._entry_corroboration
        if (
            current.workspace_event_count is not None
            and entry is not None
            and entry.workspace_event_count is not None
        ):
            diag["workspace_event_delta"] = (
                current.workspace_event_count - entry.workspace_event_count
            )
        if current.oldest_child_seconds is not None:
            diag["oldest_child_seconds"] = current.oldest_child_seconds
        if current.scoped_child_active is not None:
            diag["scoped_child_active"] = current.scoped_child_active
        if current.scoped_child_count is not None:
            diag["scoped_child_count"] = current.scoped_child_count
        if (
            current.terminal_child_events_total is not None
            and entry is not None
            and entry.terminal_child_events_total is not None
        ):
            diag["terminal_child_events_since_entry"] = (
                current.terminal_child_events_total - entry.terminal_child_events_total
            )
        if current.last_activity_was_meaningful is False:
            diag["lifecycle_only_activity"] = True
        if current.alive_by is not None:
            diag["alive_by"] = current.alive_by
        return diag

    def _build_evidence_string(
        self,
        diag: dict[str, str | int | float | bool],
    ) -> str:
        """Compose a human-readable evidence label for a SUSPECTED_FROZEN event.

        Returns ``"+"``-joined tokens for each corroborating signal present in
        ``diag``, or ``"time_only"`` when nothing beyond elapsed time supports
        the suspicion.
        """
        suspect = self._config.suspect_waiting_on_child_seconds
        tokens: list[str] = []
        ws_delta = diag.get("workspace_event_delta")
        oldest = diag.get("oldest_child_seconds")
        if (
            isinstance(ws_delta, int | float)
            and ws_delta == 0
            and isinstance(oldest, int | float)
            and suspect is not None
            and oldest >= suspect
        ):
            tokens.append("time_and_workspace_quiet")
        if diag.get("scoped_child_active") is True:
            tokens.append("time_and_scoped_child_active")
        if diag.get("lifecycle_only_activity") is True:
            tokens.append("time_and_lifecycle_only")
        return "+".join(tokens) if tokens else "time_only"

    def _emit(
        self,
        kind: WaitingStatusKind,
        current_run_seconds: float,
        idle_elapsed: float,
        *,
        ceiling_seconds: float | None = None,
        diagnostic: dict[str, str | int | float | bool] | None = None,
    ) -> None:
        """Build and dispatch a WaitingStatusEvent to the listener.

        The event's ``cumulative_seconds`` is the banked cumulative total plus
        ``current_run_seconds``; callers must therefore pass a run that has not
        yet been added to the cumulative total.

        Never propagates listener exceptions; logs at DEBUG if one is raised.
        """
        if self._listener is None:
            return
        candidate_total = self._cumulative_waiting_on_child_seconds + current_run_seconds
        event = WaitingStatusEvent(
            kind=kind,
            cumulative_seconds=candidate_total,
            current_run_seconds=current_run_seconds,
            idle_elapsed_seconds=idle_elapsed,
            ceiling_seconds=(
                self._config.max_waiting_on_child_seconds
                if ceiling_seconds is None
                else ceiling_seconds
            ),
            suspect_threshold_seconds=self._config.suspect_waiting_on_child_seconds,
            # Copy so later mutation of the caller's dict cannot alter the event.
            diagnostic=dict(diagnostic) if diagnostic else {},
        )
        try:
            self._listener(event)
        except Exception:
            self._log.debug("idle watchdog: listener raised (suppressed)")

    def evaluate(
        self,
        *,
        classify_quiet: Callable[[], AgentExecutionState],
    ) -> WatchdogVerdict:
        """Evaluate whether the watchdog should fire, wait, or continue.

        The session ceiling is checked first (before idle deadline) because it
        is absolute and activity cannot reset it.

        Args:
            classify_quiet: Called only when the idle deadline has elapsed;
                returns the current AgentExecutionState to distinguish
                child-wait from stall. Also called on every drain-window tick
                to detect newly appearing children (which abort the drain and
                resume deferral).

        Returns:
            CONTINUE: keep running normally.
            WAITING_ON_CHILD: idle deadline elapsed; children still active;
                last_activity not reset.
            FIRE: idle deadline elapsed with no valid deferral; caller must
                terminate.
        """
        now = self._clock.monotonic()

        # Absolute session ceiling: checked before anything activity can influence.
        if self._config.max_session_seconds is not None:
            session_elapsed = now - self._session_started_at
            if session_elapsed >= self._config.max_session_seconds:
                self._last_fire_reason = WatchdogFireReason.SESSION_CEILING_EXCEEDED
                idle_elapsed = now - self._last_activity
                self._log.warning(
                    "idle watchdog: FIRE reason={} session_elapsed={}s"
                    " idle_elapsed={}s cumulative_waiting={}s",
                    WatchdogFireReason.SESSION_CEILING_EXCEEDED,
                    round(session_elapsed, 1),
                    round(idle_elapsed, 1),
                    round(self._cumulative_waiting_on_child_seconds, 1),
                )
                return WatchdogVerdict.FIRE

        if self._config.idle_timeout_seconds is None:
            return WatchdogVerdict.CONTINUE

        idle_elapsed = now - self._last_activity
        if idle_elapsed < self._config.idle_timeout_seconds:
            # Not idle long enough: bank any WAITING run that is still open.
            self._accumulate_waiting_run(now)
            return WatchdogVerdict.CONTINUE

        if self._in_drain_window:
            return self._handle_drain_window(now, classify_quiet)

        quiet_state = classify_quiet()
        if quiet_state == AgentExecutionState.WAITING_ON_CHILD:
            return self._handle_waiting_branch(now)
        return self._handle_active_branch(now)

    def _handle_drain_window(
        self,
        now: float,
        classify_quiet: Callable[[], AgentExecutionState],
    ) -> WatchdogVerdict:
        """Handle evaluation while in the drain window.

        Re-consults classify_quiet on every tick. If children appear during
        the drain window, the drain is abandoned and we fall back to
        WAITING_ON_CHILD deferral to prevent false-positive fires while
        children are alive. Otherwise returns CONTINUE until the window
        elapses, then FIRE with NO_OUTPUT_DEADLINE.
        """
        # Invariant: _in_drain_window is only set together with _drain_started_at.
        assert self._drain_started_at is not None
        quiet_state = classify_quiet()
        if quiet_state == AgentExecutionState.WAITING_ON_CHILD:
            self._in_drain_window = False
            self._drain_started_at = None
            self._log.info(
                "idle watchdog: drain window abandoned"
                " (children reappeared), switching to WAITING_ON_CHILD"
            )
            return self._handle_waiting_branch(now)

        drain_elapsed = now - self._drain_started_at
        if drain_elapsed < self._config.drain_window_seconds:
            self._log.debug(
                "idle watchdog: drain window active drain_elapsed={}s window={}s",
                round(drain_elapsed, 3),
                self._config.drain_window_seconds,
            )
            return WatchdogVerdict.CONTINUE

        idle_elapsed = now - self._last_activity
        self._last_fire_reason = WatchdogFireReason.NO_OUTPUT_DEADLINE
        self._log.warning(
            "idle watchdog: FIRE reason={} idle_elapsed={}s cumulative_waiting={}s",
            WatchdogFireReason.NO_OUTPUT_DEADLINE,
            round(idle_elapsed, 1),
            round(self._cumulative_waiting_on_child_seconds, 1),
        )
        return WatchdogVerdict.FIRE

    # Liveness signals showing the child exists but is not making forward progress.
    _NON_PROGRESS_ALIVE_BY_VALUES = frozenset(
        [
            AliveBy.FRESH_HEARTBEAT_ONLY,
            AliveBy.STALE_LABEL_ONLY,
            AliveBy.OS_DESCENDANT_ONLY_STALE_PROGRESS,
        ]
    )

    def _effective_waiting_ceiling(
        self,
        corroboration: CorroborationSnapshot,
    ) -> float:
        """Compute the effective waiting ceiling based on corroboration.

        Returns the no-progress ceiling when it is configured and the child is
        alive but not making forward progress (heartbeat-only, stale-label, or
        OS-descendant-only). Otherwise, including when ``alive_by`` is None or
        FRESH_PROGRESS, returns the standard full ceiling.
        """
        no_progress_ceiling = self._config.max_waiting_on_child_no_progress_seconds
        if (
            no_progress_ceiling is not None
            and corroboration.alive_by in self._NON_PROGRESS_ALIVE_BY_VALUES
        ):
            return no_progress_ceiling
        return self._config.max_waiting_on_child_seconds

    def _handle_waiting_branch(self, now: float) -> WatchdogVerdict:
        """Handle the WAITING_ON_CHILD deferral branch.

        Accumulates time within the current run WITHOUT mutating the
        cumulative total (which is only updated on transition out of WAITING).
        The ceiling check uses cumulative + current-run total to avoid
        double-counting.

        Emits structured status events (ENTERED, PROGRESS, SUSPECTED_FROZEN,
        HARD_STOP) rather than per-tick debug spam. Status emission cadence is
        governed by waiting_status_interval_seconds and does NOT affect
        ceiling math.

        When max_waiting_on_child_no_progress_seconds is set and corroboration
        shows non-progress evidence (heartbeat-only, stale-label, or
        OS-descendant-only), the shorter no-progress ceiling is used instead
        of the full ceiling.
        """
        idle_elapsed = now - self._last_activity

        if self._waiting_on_child_started_at is None:
            # First WAITING tick of a new run: capture the baseline snapshot
            # that later diagnostics are diffed against.
            self._entry_corroboration = self._safe_corroborate()
            self._waiting_on_child_started_at = now
            self._last_waiting_status_at = now
            self._suspicion_announced_for_run = False
            self._log.info(
                "idle watchdog: entering WAITING_ON_CHILD deferral idle_elapsed={}s cumulative={}s",
                round(idle_elapsed, 1),
                round(self._cumulative_waiting_on_child_seconds, 1),
            )
            entry_ceiling = self._effective_waiting_ceiling(self._entry_corroboration)
            self._emit(
                WaitingStatusKind.ENTERED,
                current_run_seconds=0.0,
                idle_elapsed=idle_elapsed,
                ceiling_seconds=entry_ceiling,
            )

        current_run_elapsed = now - self._waiting_on_child_started_at
        candidate_total = self._cumulative_waiting_on_child_seconds + current_run_elapsed
        current_corr = self._safe_corroborate()
        effective_ceiling = self._effective_waiting_ceiling(current_corr)

        if candidate_total >= effective_ceiling:
            self._last_fire_reason = WatchdogFireReason.CHILDREN_PERSIST_TOO_LONG
            corr_diag_hs = self._build_corroboration_diag(current_corr)
            corr_diag_hs["evidence"] = self._build_evidence_string(corr_diag_hs)
            diag: dict[str, str | int | float | bool] = {
                "cumulative": round(candidate_total, 1),
                "run_elapsed": round(current_run_elapsed, 1),
                "idle_elapsed": round(idle_elapsed, 1),
                "ceiling": effective_ceiling,
                "effective_ceiling": (
                    "no_progress"
                    if effective_ceiling < self._config.max_waiting_on_child_seconds
                    else "standard"
                ),
            }
            if self._config.suspect_waiting_on_child_seconds is not None:
                diag["suspect_threshold"] = self._config.suspect_waiting_on_child_seconds
            # Corroboration keys never override the ceiling bookkeeping above.
            for key, value in corr_diag_hs.items():
                diag.setdefault(key, value)
            self._emit(
                WaitingStatusKind.HARD_STOP,
                current_run_seconds=current_run_elapsed,
                idle_elapsed=idle_elapsed,
                ceiling_seconds=effective_ceiling,
                diagnostic=diag,
            )
            self._log.warning(
                "idle watchdog: FIRE reason={} idle_elapsed={}s cumulative_waiting={}s",
                WatchdogFireReason.CHILDREN_PERSIST_TOO_LONG,
                round(idle_elapsed, 1),
                round(candidate_total, 1),
            )
            return WatchdogVerdict.FIRE

        if (
            self._config.suspect_waiting_on_child_seconds is not None
            and not self._suspicion_announced_for_run
            and candidate_total >= self._config.suspect_waiting_on_child_seconds
        ):
            self._suspicion_announced_for_run = True
            corr_diag_sf = self._build_corroboration_diag(current_corr)
            corr_diag_sf["evidence"] = self._build_evidence_string(corr_diag_sf)
            self._log.warning(
                "idle watchdog: SUSPECTED_FROZEN candidate_total={}s suspect={}s ceiling={}s",
                round(candidate_total, 1),
                self._config.suspect_waiting_on_child_seconds,
                effective_ceiling,
            )
            self._emit(
                WaitingStatusKind.SUSPECTED_FROZEN,
                current_run_seconds=current_run_elapsed,
                idle_elapsed=idle_elapsed,
                ceiling_seconds=effective_ceiling,
                diagnostic=corr_diag_sf,
            )

        # Set on run entry above, so always present inside a WAITING run.
        assert self._last_waiting_status_at is not None
        if now - self._last_waiting_status_at >= self._config.waiting_status_interval_seconds:
            self._last_waiting_status_at = now
            corr_diag_pr = self._build_corroboration_diag(current_corr)
            if effective_ceiling < self._config.max_waiting_on_child_seconds:
                corr_diag_pr["effective_ceiling"] = "no_progress"
            self._log.info(
                "idle watchdog: WAITING_ON_CHILD progress cumulative={}s ceiling={}s",
                round(candidate_total, 1),
                round(effective_ceiling, 1),
            )
            self._emit(
                WaitingStatusKind.PROGRESS,
                current_run_seconds=current_run_elapsed,
                idle_elapsed=idle_elapsed,
                ceiling_seconds=effective_ceiling,
                diagnostic=corr_diag_pr,
            )

        return WatchdogVerdict.WAITING_ON_CHILD

    def _handle_active_branch(self, now: float) -> WatchdogVerdict:
        """Handle the case where the agent appears active (no children visible).

        Accumulates any elapsed WAITING run time before entering the drain
        window. When drain_window_seconds is 0 (or negative, which would
        otherwise open a drain window that can only end in a FIRE on a later
        tick), fires immediately without a drain window.
        """
        idle_elapsed = now - self._last_activity
        self._accumulate_waiting_run(now)
        if self._config.drain_window_seconds <= 0.0:
            self._last_fire_reason = WatchdogFireReason.NO_OUTPUT_DEADLINE
            self._log.warning(
                "idle watchdog: FIRE reason={} idle_elapsed={}s cumulative_waiting={}s",
                WatchdogFireReason.NO_OUTPUT_DEADLINE,
                round(idle_elapsed, 1),
                round(self._cumulative_waiting_on_child_seconds, 1),
            )
            return WatchdogVerdict.FIRE

        self._in_drain_window = True
        self._drain_started_at = now
        self._log.info(
            "idle watchdog: entering drain window idle_elapsed={}s cumulative_waiting={}s",
            round(idle_elapsed, 1),
            round(self._cumulative_waiting_on_child_seconds, 1),
        )
        return WatchdogVerdict.CONTINUE