Source code for ralph.agents.execution_state.opencode_execution_strategy

"""Execution strategy for Opencode agents."""

from __future__ import annotations

from typing import TYPE_CHECKING, cast

from ralph.process.child_liveness import classify_child_snapshot

from ._helpers import (
    _AGENT_LABEL_PREFIX,
    _classify_opencode_child_signal,
    _evidence_precedence,
    _non_blank_output_signal,
    _os_descendant_state,
    _probe_check_quiet,
    _route_opencode_line_to_registry,
)
from .agent_execution_state import AgentExecutionState

if TYPE_CHECKING:
    from ralph.agents.activity import AgentActivitySignal
    from ralph.agents.completion_signals import CompletionSignals
    from ralph.process.child_liveness import ChildLivenessRegistry
    from ralph.process.liveness import LivenessProbe

    from ._live_descendant_handle import _LiveDescendantHandle


[docs] class OpenCodeExecutionStrategy: """OpenCode-aware strategy. Idle classification checks the injectable LivenessProbe before falling back to the psutil-based has_live_descendants(), so unit tests can inject a FakeLivenessProbe without spawning real processes. Exit classification uses evidence precedence: 1. terminal_ack_seen or schema-valid required artifact -> TERMINAL_COMPLETE 2. fresh progress in registry -> WAITING_ON_CHILD 3. live OS descendants with no fresh progress -> RESUMABLE_CONTINUE (stale) 4. else -> RESUMABLE_CONTINUE ``label_scope`` narrows the Ralph-tracked liveness check to processes whose labels start with ``agent:{label_scope}:``. When no scope is available, the empty-prefix registry-wide snapshot is consulted; the strategy never returns ACTIVE based on a never-matching sentinel. """ def __init__( self, *, label_scope: str | None = None, registry: ChildLivenessRegistry | None = None, ) -> None: self._label_scope = label_scope self._registry = registry def _active_label_prefix(self) -> str | None: if self._label_scope is None: return None return f"{_AGENT_LABEL_PREFIX}{self._label_scope}:"
[docs] def classify_activity_line(self, line: str) -> AgentActivitySignal | None: """Classify OpenCode output for idle-watchdog activity.""" signal = _classify_opencode_child_signal(line) if signal is not None: return signal return _non_blank_output_signal(line)
[docs] def observe_line(self, line: str) -> None: """Route a parsed output line into the child liveness registry.""" registry = cast("ChildLivenessRegistry | None", getattr(self, "_registry", None)) if registry is None: return _route_opencode_line_to_registry(line, registry, self._active_label_prefix() or "")
def classify_quiet( self, handle: _LiveDescendantHandle, liveness_probe: LivenessProbe, ) -> AgentExecutionState: prefix = self._active_label_prefix() probe_prefix = prefix if prefix is not None else "" scoped_child_evidence_stale = False registry = cast("ChildLivenessRegistry | None", getattr(self, "_registry", None)) if registry is not None: had_scoped_records = registry.has_records(probe_prefix) try: reg_snap = registry.snapshot(probe_prefix) verdict = classify_child_snapshot(reg_snap) if verdict.all_children_terminal: return AgentExecutionState.ACTIVE if verdict.deferral_allowed: return AgentExecutionState.WAITING_ON_CHILD if had_scoped_records: scoped_child_evidence_stale = True except Exception: pass probe_state, probe_stale = _probe_check_quiet(liveness_probe, probe_prefix) if probe_state is not None: return probe_state scoped_child_evidence_stale = scoped_child_evidence_stale or probe_stale if scoped_child_evidence_stale: return AgentExecutionState.ACTIVE return _os_descendant_state(handle, AgentExecutionState.ACTIVE) def classify_exit( self, handle: _LiveDescendantHandle, completion_signals: CompletionSignals, liveness_probe: LivenessProbe | None = None, ) -> AgentExecutionState: registry = cast("ChildLivenessRegistry | None", getattr(self, "_registry", None)) label_prefix = self._active_label_prefix() return _evidence_precedence( handle, completion_signals, liveness_probe, label_prefix, registry=registry ) def supports_session_continuation(self) -> bool: return True def supports_completion_enforcement(self) -> bool: return False