Source code for ralph.agents.post_exit_watchdog

"""Post-exit watchdog for agent subprocess and parent-exit timeout enforcement.

PostExitWatchdog owns all wall-clock timeout decisions that occur after the
agent's stdout stream has closed (EOF): the post-EOF process-exit hang window,
the parent-exit grace window, and the descendant-quiesce window.

All wall-clock decisions go through the injected Clock so the watchdog is fully
testable without real sleeps (FakeClock) per CLAUDE.md test performance policy.

This module is the counterpart to ralph.agents.idle_watchdog.IdleWatchdog,
which owns in-stream idle/deadline timeouts.  Every wall-clock timeout fire
path in the agent invocation system routes through one of these two watchdogs;
no ad-hoc clock.monotonic()/clock.sleep() loops are allowed in invoke.py.

Fire-reason precedence:
  1. PROCESS_EXIT_HANG  — PostExitWatchdog.wait_for_process_exit()
  2. DESCENDANT_HANG    — PostExitWatchdog.wait_descendant_quiesce()
  3. (parent-exit grace does not fire; it returns verdicts that map to
     TERMINAL_COMPLETE / WAITING_ON_CHILD / RESUMABLE_CONTINUE)
"""

from __future__ import annotations

from dataclasses import dataclass
from typing import TYPE_CHECKING

from loguru import logger

from ralph.agents.execution_state import AgentExecutionState
from ralph.agents.idle_watchdog import WatchdogFireReason
from ralph.agents.post_exit_verdict import PostExitVerdict

if TYPE_CHECKING:
    from collections.abc import Callable

    from ralph.agents.clock import Clock
    from ralph.agents.idle_watchdog import TimeoutPolicy

__all__ = [
    "PostExitVerdict",
    "PostExitWatchdog",
]


[docs] @dataclass class PostExitWatchdog: """Post-exit wall-clock watchdog for process-exit, parent-exit grace, and descendant-wait. All three wait methods poll at policy.descendant_wait_poll_seconds intervals and delegate wall-clock reading to self._clock so tests can use FakeClock for deterministic, sub-second test runs. Attributes: last_verdict_reason: Set by wait_for_process_exit() when returning FIRE_PROCESS_EXIT_HANG; None otherwise. Exposed so integration tests can assert the correct reason without introspecting private state. """ _policy: TimeoutPolicy _clock: Clock last_verdict_reason: PostExitVerdict | None = None def __init__(self, policy: TimeoutPolicy, clock: Clock) -> None: self._policy = policy self._clock = clock self.last_verdict_reason = None self._log = logger.bind(component="post_exit_watchdog")
[docs] def wait_for_process_exit(self, predicate_exit_observed: Callable[[], bool]) -> PostExitVerdict: """Wait for the subprocess to exit within process_exit_wait_seconds. Checks the predicate BEFORE the first sleep so an already-exited process returns immediately without consuming any clock budget. Args: predicate_exit_observed: Returns True when the subprocess has exited. Returns: CONTINUE: subprocess exited (predicate returned True) before deadline. FIRE_PROCESS_EXIT_HANG: deadline elapsed with predicate still False. """ deadline = self._clock.monotonic() + self._policy.process_exit_wait_seconds # Check-before-first-sleep preserves the original do-check-first ordering. if predicate_exit_observed(): self.last_verdict_reason = PostExitVerdict.CONTINUE return PostExitVerdict.CONTINUE while self._clock.monotonic() < deadline: self._clock.sleep(self._policy.descendant_wait_poll_seconds) if predicate_exit_observed(): self.last_verdict_reason = PostExitVerdict.CONTINUE return PostExitVerdict.CONTINUE self.last_verdict_reason = PostExitVerdict.FIRE_PROCESS_EXIT_HANG self._log.warning( "post-exit watchdog: FIRE reason={} elapsed={}s", WatchdogFireReason.PROCESS_EXIT_HANG, self._policy.process_exit_wait_seconds, ) return PostExitVerdict.FIRE_PROCESS_EXIT_HANG
[docs] def wait_parent_exit_grace( self, classify_exit_state: Callable[[], AgentExecutionState] ) -> PostExitVerdict: """Wait up to parent_exit_grace_seconds for completion signals or children. Polls classify_exit_state() at descendant_wait_poll_seconds intervals. The grace window covers the race where MCP-driven background subagents have been launched but not yet registered with the ProcessManager. Args: classify_exit_state: Returns the current AgentExecutionState by consulting evaluate_completion + execution_strategy.classify_exit. Returns: SIGNALS_PRESENT: classify_exit_state returned TERMINAL_COMPLETE. CHILDREN_ACTIVE: classify_exit_state returned WAITING_ON_CHILD. QUIESCED_NO_SIGNALS: deadline elapsed with RESUMABLE_CONTINUE. """ deadline = self._clock.monotonic() + self._policy.parent_exit_grace_seconds while self._clock.monotonic() < deadline: state = classify_exit_state() if state == AgentExecutionState.TERMINAL_COMPLETE: self.last_verdict_reason = PostExitVerdict.SIGNALS_PRESENT return PostExitVerdict.SIGNALS_PRESENT if state == AgentExecutionState.WAITING_ON_CHILD: self.last_verdict_reason = PostExitVerdict.CHILDREN_ACTIVE return PostExitVerdict.CHILDREN_ACTIVE self._clock.sleep(self._policy.descendant_wait_poll_seconds) # Final recheck after deadline. state = classify_exit_state() if state == AgentExecutionState.TERMINAL_COMPLETE: self.last_verdict_reason = PostExitVerdict.SIGNALS_PRESENT return PostExitVerdict.SIGNALS_PRESENT if state == AgentExecutionState.WAITING_ON_CHILD: self.last_verdict_reason = PostExitVerdict.CHILDREN_ACTIVE return PostExitVerdict.CHILDREN_ACTIVE self.last_verdict_reason = PostExitVerdict.QUIESCED_NO_SIGNALS return PostExitVerdict.QUIESCED_NO_SIGNALS
[docs] def wait_descendant_quiesce( self, classify_exit_state: Callable[[], AgentExecutionState] ) -> PostExitVerdict: """Wait for descendant processes to quiesce within descendant_wait_timeout_seconds. Polls classify_exit_state() at descendant_wait_poll_seconds intervals. If WAITING_ON_CHILD persists for the full deadline, FIRE_DESCENDANT_HANG is returned so the caller can fall back to RESUMABLE_CONTINUE semantics (raise OpenCodeResumableExitError) rather than treating this as silent success. Args: classify_exit_state: Returns the current AgentExecutionState. Returns: SIGNALS_PRESENT: TERMINAL_COMPLETE seen during polling. QUIESCED_NO_SIGNALS: RESUMABLE_CONTINUE seen (tree quiet before deadline). FIRE_DESCENDANT_HANG: deadline elapsed with WAITING_ON_CHILD persistent. """ deadline = self._clock.monotonic() + self._policy.descendant_wait_timeout_seconds while self._clock.monotonic() < deadline: state = classify_exit_state() if state == AgentExecutionState.TERMINAL_COMPLETE: self.last_verdict_reason = PostExitVerdict.SIGNALS_PRESENT return PostExitVerdict.SIGNALS_PRESENT if state == AgentExecutionState.RESUMABLE_CONTINUE: self.last_verdict_reason = PostExitVerdict.QUIESCED_NO_SIGNALS return PostExitVerdict.QUIESCED_NO_SIGNALS self._clock.sleep(self._policy.descendant_wait_poll_seconds) # Final recheck after deadline. state = classify_exit_state() if state == AgentExecutionState.TERMINAL_COMPLETE: self.last_verdict_reason = PostExitVerdict.SIGNALS_PRESENT return PostExitVerdict.SIGNALS_PRESENT if state == AgentExecutionState.WAITING_ON_CHILD: self.last_verdict_reason = PostExitVerdict.FIRE_DESCENDANT_HANG self._log.warning( "post-exit watchdog: FIRE reason={} elapsed={}s", WatchdogFireReason.DESCENDANT_HANG, self._policy.descendant_wait_timeout_seconds, ) return PostExitVerdict.FIRE_DESCENDANT_HANG self.last_verdict_reason = PostExitVerdict.QUIESCED_NO_SIGNALS return PostExitVerdict.QUIESCED_NO_SIGNALS