"""Runtime options and policy building for agent invocation."""
from __future__ import annotations
from dataclasses import dataclass
from typing import TYPE_CHECKING
from loguru import logger
from ralph.agents.idle_watchdog import TimeoutPolicy, WaitingStatusListener
from ralph.agents.invoke._types import InvokeOptions
if TYPE_CHECKING:
from collections.abc import Callable
from pathlib import Path
from ralph.agents.invoke._workspace import WorkspaceMonitor
from ralph.config.models import GeneralConfig
from ralph.phases.required_artifacts import RequiredArtifact
[docs]
@dataclass(frozen=True)
class InvokeRuntimeOptions:
    """Immutable bundle of the per-invocation settings that are not timeouts.

    Every field has a default, so ``InvokeRuntimeOptions()`` is a valid
    "nothing customised" value. Instances are frozen: assigning to a field
    after construction raises ``dataclasses.FrozenInstanceError``.

    Field order is part of the positional-constructor interface and must not
    be rearranged.
    """

    # Boolean switches.
    verbose: bool = False
    show_progress: bool = True

    # Optional values; ``None`` means "not supplied".
    workspace_path: Path | None = None
    extra_env: dict[str, str] | None = None

    pure: bool = False

    session_id: str | None = None
    system_prompt_file: str | None = None

    # Optional hooks. ``permission_prompt_listener`` is a callable that takes
    # a single ``str`` and returns nothing.
    waiting_listener: WaitingStatusListener | None = None
    permission_prompt_listener: Callable[[str], None] | None = None

    required_artifact: RequiredArtifact | None = None
[docs]
def build_invoke_options_from_config(
    general_config: GeneralConfig,
    runtime: InvokeRuntimeOptions | None = None,
) -> InvokeOptions:
    """Combine runtime options and configured timeouts into ``InvokeOptions``.

    Args:
        general_config: Source of every timeout / TTL value; each one is read
            from an ``agent_*`` attribute of this object.
        runtime: The non-timeout options. When ``None``, a default-constructed
            ``InvokeRuntimeOptions`` is used instead.

    Returns:
        An ``InvokeOptions`` populated from both inputs.
    """
    if runtime is None:
        runtime = InvokeRuntimeOptions()
    cfg = general_config

    return InvokeOptions(
        # Runtime (non-timeout) fields are copied across under the same name.
        verbose=runtime.verbose,
        show_progress=runtime.show_progress,
        workspace_path=runtime.workspace_path,
        extra_env=runtime.extra_env,
        pure=runtime.pure,
        session_id=runtime.session_id,
        system_prompt_file=runtime.system_prompt_file,
        waiting_listener=runtime.waiting_listener,
        permission_prompt_listener=runtime.permission_prompt_listener,
        required_artifact=runtime.required_artifact,
        # Timeout fields come from the config. Most keywords map to the config
        # attribute of the same name prefixed with ``agent_``; the exceptions
        # are called out individually below.
        idle_timeout_seconds=cfg.agent_idle_timeout_seconds,
        # Config attribute carries an extra ``idle_`` segment.
        drain_window_seconds=cfg.agent_idle_drain_window_seconds,
        # Config attribute carries an extra ``idle_`` segment.
        max_waiting_on_child_seconds=cfg.agent_idle_max_waiting_on_child_seconds,
        idle_poll_interval_seconds=cfg.agent_idle_poll_interval_seconds,
        parent_exit_grace_seconds=cfg.agent_parent_exit_grace_seconds,
        descendant_wait_timeout_seconds=cfg.agent_descendant_wait_timeout_seconds,
        descendant_wait_poll_seconds=cfg.agent_descendant_wait_poll_seconds,
        process_exit_wait_seconds=cfg.agent_process_exit_wait_seconds,
        max_session_seconds=cfg.agent_max_session_seconds,
        waiting_status_interval_seconds=cfg.agent_waiting_status_interval_seconds,
        suspect_waiting_on_child_seconds=cfg.agent_suspect_waiting_on_child_seconds,
        # Word order differs between the keyword and the config attribute.
        max_waiting_on_child_no_progress_seconds=(
            cfg.agent_idle_no_progress_waiting_on_child_seconds
        ),
        child_progress_ttl_seconds=cfg.agent_child_progress_ttl_seconds,
        child_heartbeat_ttl_seconds=cfg.agent_child_heartbeat_ttl_seconds,
        child_stale_label_ttl_seconds=cfg.agent_child_stale_label_ttl_seconds,
        child_exit_reconcile_seconds=cfg.agent_child_exit_reconcile_seconds,
    )
def _policy_from_options(opts: InvokeOptions) -> TimeoutPolicy:
    """Translate ``InvokeOptions`` into a ``TimeoutPolicy``.

    ``idle_timeout_seconds`` is always taken from ``opts``. Every other field
    uses the value on ``opts`` when it is not ``None`` and otherwise the value
    a ``TimeoutPolicy`` built with only that idle timeout would carry.

    Two fields get extra treatment relative to the waiting-on-child ceiling
    (the resolved ``max_waiting_on_child_seconds``):

    * ``suspect_waiting_on_child_seconds`` is cleared to ``None`` when it is
      greater than or equal to the ceiling.
    * ``max_waiting_on_child_no_progress_seconds`` passes through unchanged
      when set on ``opts``; the policy default is only adopted when a ceiling
      exists and the default does not exceed it, otherwise it is ``None``.

    Args:
        opts: Invocation options holding the optional timeout overrides.

    Returns:
        The resolved ``TimeoutPolicy``.
    """
    # A policy built from the idle timeout alone supplies the fallback values.
    defaults = TimeoutPolicy(idle_timeout_seconds=opts.idle_timeout_seconds)

    child_wait_ceiling = (
        defaults.max_waiting_on_child_seconds
        if opts.max_waiting_on_child_seconds is None
        else opts.max_waiting_on_child_seconds
    )

    # Suspicion threshold: override first, then default, then drop it entirely
    # if it could never fire before the ceiling (e.g. tests with a small max).
    suspect_after = opts.suspect_waiting_on_child_seconds
    if suspect_after is None:
        suspect_after = defaults.suspect_waiting_on_child_seconds
    if (
        suspect_after is not None
        and child_wait_ceiling is not None
        and suspect_after >= child_wait_ceiling
    ):
        suspect_after = None

    # No-progress limit: an explicit override is used as-is; the default is
    # only accepted when it fits under an existing ceiling.
    no_progress_limit = opts.max_waiting_on_child_no_progress_seconds
    if no_progress_limit is None and child_wait_ceiling is not None:
        default_no_progress = defaults.max_waiting_on_child_no_progress_seconds
        if default_no_progress is not None and default_no_progress <= child_wait_ceiling:
            no_progress_limit = default_no_progress

    return TimeoutPolicy(
        idle_timeout_seconds=opts.idle_timeout_seconds,
        drain_window_seconds=(
            defaults.drain_window_seconds
            if opts.drain_window_seconds is None
            else opts.drain_window_seconds
        ),
        max_waiting_on_child_seconds=child_wait_ceiling,
        max_session_seconds=(
            defaults.max_session_seconds
            if opts.max_session_seconds is None
            else opts.max_session_seconds
        ),
        idle_poll_interval_seconds=(
            defaults.idle_poll_interval_seconds
            if opts.idle_poll_interval_seconds is None
            else opts.idle_poll_interval_seconds
        ),
        parent_exit_grace_seconds=(
            defaults.parent_exit_grace_seconds
            if opts.parent_exit_grace_seconds is None
            else opts.parent_exit_grace_seconds
        ),
        descendant_wait_timeout_seconds=(
            defaults.descendant_wait_timeout_seconds
            if opts.descendant_wait_timeout_seconds is None
            else opts.descendant_wait_timeout_seconds
        ),
        descendant_wait_poll_seconds=(
            defaults.descendant_wait_poll_seconds
            if opts.descendant_wait_poll_seconds is None
            else opts.descendant_wait_poll_seconds
        ),
        process_exit_wait_seconds=(
            defaults.process_exit_wait_seconds
            if opts.process_exit_wait_seconds is None
            else opts.process_exit_wait_seconds
        ),
        waiting_status_interval_seconds=(
            defaults.waiting_status_interval_seconds
            if opts.waiting_status_interval_seconds is None
            else opts.waiting_status_interval_seconds
        ),
        suspect_waiting_on_child_seconds=suspect_after,
        max_waiting_on_child_no_progress_seconds=no_progress_limit,
    )
def _log_workspace_completion(monitor: WorkspaceMonitor | None) -> None:
"""Log workspace changes if monitoring.
Args:
monitor: Workspace monitor instance.
"""
if monitor is None:
return
logger.debug(
"Agent completed. Workspace changes: {} files, {} events",
len(monitor.changed_files),
monitor.event_count,
)