# Source code for ralph.agents.invoke

"""Subprocess-based agent invocation with streaming NDJSON parsing.

This module handles invoking AI agents as subprocesses, parsing their
streaming NDJSON output, and managing the lifecycle of the process.

Key features:
- Line-by-line streaming from subprocess stdout to parser
- tqdm progress bar (or rich when TTY)
- loguru structured logging for every NDJSON line
- watchdog workspace monitoring for file-change events during execution
"""

from __future__ import annotations

import contextlib
import os
import shutil
import subprocess
from pathlib import Path
from typing import TYPE_CHECKING, cast
from uuid import uuid4

from loguru import logger

from ralph.agents.completion_signals import evaluate_completion
from ralph.agents.execution_state import strategy_for_transport
from ralph.agents.idle_watchdog import WatchdogFireReason
from ralph.agents.invoke._commands import (
    _agent_transport,
    _build_command,
    _build_opencode_command,
    _command_for_log,
    _interactive_stop_hook_settings,
    _interactive_stop_sentinel_path,
    check_agent_available,
)
from ralph.agents.invoke._completion import (
    _check_process_result,
    _CompletionCheckOptions,
    _wait_for_descendants_then_recheck,
)
from ralph.agents.invoke._errors import (
    AgentInactivityTimeoutError,
    AgentInvocationError,
    InactivityTimeoutOpts,
    InteractivePermissionPromptError,
    OpenCodeResumableExitError,
    UnsupportedMcpTransportError,
    _IdleStreamTimeoutError,
)
from ralph.agents.invoke._options import (
    InvokeRuntimeOptions,
    _log_workspace_completion,
    _policy_from_options,
    build_invoke_options_from_config,
)
from ralph.agents.invoke._process_reader import (
    _read_lines_from_process,
    _run_subprocess_and_read_lines,
)
from ralph.agents.invoke._pty_helpers import (
    _extract_choice_menu_state,
    _interactive_auto_response_for_prompt,
    _is_permission_prompt_line,
    _pending_vt_snapshot_line,
    _permission_prompt_action_message,
    _plan_choice_menu_response,
)
from ralph.agents.invoke._pty_reader import _run_pty_and_read_lines as _run_pty_and_read_lines_impl
from ralph.agents.invoke._session import _bounded_output_lines, extract_session_id
from ralph.agents.invoke._types import (
    InvokeOptions,
    ResolvedInvocationRuntime,
    _AgentRunCtx,
    _BuildCommandOptions,
    _ProcessReaderCtx,
    _PtyExtras,
)
from ralph.agents.invoke._workspace import WorkspaceMonitor
from ralph.config.enums import AgentTransport
from ralph.mcp.protocol.env import AGENT_LABEL_SCOPE_ENV, MCP_ENDPOINT_ENV, MCP_RUN_ID_ENV
from ralph.mcp.protocol.startup import (
    PreflightError,
    ensure_no_preflight_error,
    extract_preflight_tool_names,
    initialize_request,
    initialized_notification,
    parse_http_endpoint,
    post_http_jsonrpc_with_session,
    tools_list_request,
)
from ralph.mcp.session_plan import effective_session_mcp_plan_from_servers
from ralph.mcp.tools.names import claude_tool_name
from ralph.mcp.transport.agy import (
    agy_workspace_mcp_endpoint,
    load_existing_agy_upstream_servers,
)
from ralph.mcp.transport.claude import load_existing_claude_upstream_servers
from ralph.mcp.transport.codex import prepare_codex_home_with_upstreams
from ralph.mcp.transport.common import (
    mcp_toml_as_upstreams,
    set_upstream_mcp_config,
)
from ralph.mcp.transport.common import (
    merge_mcp_toml_into_upstreams as _merge_mcp_toml_into_upstreams,
)
from ralph.mcp.transport.opencode import build_opencode_provider_config
from ralph.process.child_liveness import ChildLivenessRegistry
from ralph.process.liveness import DefaultLivenessProbe
from ralph.process.manager import get_process_manager
from ralph.timeout_defaults import (
    CHILD_EXIT_RECONCILE_SECONDS,
    CHILD_HEARTBEAT_TTL_SECONDS,
    CHILD_PROGRESS_TTL_SECONDS,
    CHILD_STALE_LABEL_TTL_SECONDS,
)

if TYPE_CHECKING:
    from collections.abc import Iterator, Mapping
    from pathlib import Path

    from ralph.agents.timeout_clock import Clock
    from ralph.config.models import AgentConfig
    from ralph.mcp.upstream.config import UpstreamMcpServer


def _make_child_registry(opts: InvokeOptions) -> ChildLivenessRegistry:
    """Build a fresh ``ChildLivenessRegistry`` for one invocation.

    Each timing value is taken from ``opts`` when it is set there; an option
    left as ``None`` falls back to the matching constant imported from
    ``ralph.timeout_defaults``.
    """
    # ``is None`` (not truthiness) so an explicit 0 in the options is honoured.
    progress_ttl = (
        CHILD_PROGRESS_TTL_SECONDS
        if opts.child_progress_ttl_seconds is None
        else opts.child_progress_ttl_seconds
    )
    heartbeat_ttl = (
        CHILD_HEARTBEAT_TTL_SECONDS
        if opts.child_heartbeat_ttl_seconds is None
        else opts.child_heartbeat_ttl_seconds
    )
    stale_label_ttl = (
        CHILD_STALE_LABEL_TTL_SECONDS
        if opts.child_stale_label_ttl_seconds is None
        else opts.child_stale_label_ttl_seconds
    )
    exit_reconcile = (
        CHILD_EXIT_RECONCILE_SECONDS
        if opts.child_exit_reconcile_seconds is None
        else opts.child_exit_reconcile_seconds
    )
    return ChildLivenessRegistry(
        progress_ttl=progress_ttl,
        heartbeat_ttl=heartbeat_ttl,
        stale_label_ttl=stale_label_ttl,
        exit_reconcile=exit_reconcile,
    )


def _start_workspace_monitor(workspace_path: Path | None) -> WorkspaceMonitor | None:
    """Start workspace monitoring if path provided."""
    if workspace_path is None:
        return None
    monitor = WorkspaceMonitor(workspace_path)
    monitor.start()
    return monitor


def _stop_workspace_monitor(monitor: WorkspaceMonitor | None) -> None:
    """Stop workspace monitoring."""
    if monitor is not None:
        monitor.stop()


def _clear_session_completion_sentinel(workspace_path: Path, run_id: str) -> None:
    """Delete only the current run's completion sentinel."""
    sentinel_path = workspace_path / f".agent/completion_seen_{run_id}.json"
    sentinel_path.unlink(missing_ok=True)


def _apply_upstream_env(
    upstreams: tuple[UpstreamMcpServer, ...],
    workspace_path: Path | None,
    runtime_env: dict[str, str],
    server_env: dict[str, str],
) -> None:
    """Write the effective upstream MCP server set into both environment dicts.

    The servers returned by ``mcp_toml_as_upstreams(workspace_path)`` are
    combined with the agent-provided *upstreams* via
    ``effective_session_mcp_plan_from_servers``; the resulting
    ``effective_servers`` are then stored in *runtime_env* and *server_env*
    (both mutated in place) through ``set_upstream_mcp_config``.
    """
    workspace_servers = mcp_toml_as_upstreams(workspace_path)
    plan = effective_session_mcp_plan_from_servers(
        workspace_servers,
        agent_upstream_servers=upstreams,
    )
    # Same order as before: agent-facing env first, then the server env.
    for target_env in (runtime_env, server_env):
        set_upstream_mcp_config(target_env, plan.effective_servers)


def _prepare_interactive_claude_options(opts: InvokeOptions, config: AgentConfig) -> InvokeOptions:
    """Fill in session, stop-sentinel and settings values for interactive Claude.

    For any transport other than ``CLAUDE_INTERACTIVE`` the given *opts* object
    is returned as-is. Otherwise a new ``InvokeOptions`` is returned that copies
    every field listed below from *opts* and replaces three of them:

    * ``initial_session_id`` -- ``opts.session_id``, else
      ``opts.initial_session_id``, else a freshly generated UUID string.
    * ``stop_sentinel_path`` -- the caller's value, else the path derived from
      the resolved session id.
    * ``settings_json`` -- the caller's value, else the stop-hook settings built
      for that sentinel path.

    ``session_id`` itself is carried over unchanged.
    """
    is_interactive_claude = _agent_transport(config) == AgentTransport.CLAUDE_INTERACTIVE
    if not is_interactive_claude:
        return opts

    resolved_session = opts.session_id or opts.initial_session_id or str(uuid4())
    sentinel = opts.stop_sentinel_path or _interactive_stop_sentinel_path(resolved_session)
    hook_settings = opts.settings_json or _interactive_stop_hook_settings(sentinel)

    return InvokeOptions(
        # --- values resolved above -------------------------------------
        initial_session_id=resolved_session,
        stop_sentinel_path=sentinel,
        settings_json=hook_settings,
        # --- identity / command-shaping options, copied verbatim --------
        model_flag=opts.model_flag,
        session_id=opts.session_id,
        verbose=opts.verbose,
        show_progress=opts.show_progress,
        workspace_path=opts.workspace_path,
        extra_env=opts.extra_env,
        pure=opts.pure,
        system_prompt_file=opts.system_prompt_file,
        # --- timing options, copied verbatim ----------------------------
        idle_timeout_seconds=opts.idle_timeout_seconds,
        drain_window_seconds=opts.drain_window_seconds,
        max_waiting_on_child_seconds=opts.max_waiting_on_child_seconds,
        idle_poll_interval_seconds=opts.idle_poll_interval_seconds,
        parent_exit_grace_seconds=opts.parent_exit_grace_seconds,
        descendant_wait_timeout_seconds=opts.descendant_wait_timeout_seconds,
        descendant_wait_poll_seconds=opts.descendant_wait_poll_seconds,
        process_exit_wait_seconds=opts.process_exit_wait_seconds,
        max_session_seconds=opts.max_session_seconds,
        waiting_status_interval_seconds=opts.waiting_status_interval_seconds,
        suspect_waiting_on_child_seconds=opts.suspect_waiting_on_child_seconds,
        child_progress_ttl_seconds=opts.child_progress_ttl_seconds,
        child_heartbeat_ttl_seconds=opts.child_heartbeat_ttl_seconds,
        child_stale_label_ttl_seconds=opts.child_stale_label_ttl_seconds,
        child_exit_reconcile_seconds=opts.child_exit_reconcile_seconds,
        max_waiting_on_child_no_progress_seconds=opts.max_waiting_on_child_no_progress_seconds,
        # --- listeners and completion tracking, copied verbatim ---------
        waiting_listener=opts.waiting_listener,
        required_artifact=opts.required_artifact,
        explicit_completion_seen=opts.explicit_completion_seen,
        captured_session_id=opts.captured_session_id,
        permission_prompt_listener=opts.permission_prompt_listener,
    )


def invoke_agent(
    config: AgentConfig,
    prompt_file: str,
    *,
    options: InvokeOptions | None = None,
    _clock: Clock | None = None,
) -> Iterator[str]:
    """Invoke agent, yield output lines as they arrive.

    This is a generator function, so none of the work below happens until the
    caller starts iterating. The transport reported by
    ``_agent_transport(config)`` selects the reader: ``CLAUDE_INTERACTIVE`` and
    ``AGY`` go through ``run_pty_and_read_lines``; every other transport goes
    through ``run_subprocess_and_read_lines``.

    Args:
        config: Agent configuration specifying command and flags.
        prompt_file: Path to PROMPT.md file to pass to agent.
        options: Optional invocation options; ``InvokeOptions()`` is used when
            omitted.
        _clock: Injectable Clock for testing; production callers omit this.

    Yields:
        Raw agent output lines (before parsing).

    Raises:
        AgentInvocationError: If agent exits with non-zero code.
    """
    opts = _prepare_interactive_claude_options(options or InvokeOptions(), config)
    runtime = resolve_invocation_runtime(
        config,
        opts.extra_env,
        opts.workspace_path,
        system_prompt_file=opts.system_prompt_file,
    )
    runtime_env = runtime.agent_env
    mcp_endpoint = runtime.mcp_endpoint
    # Called through the public alias so monkeypatches of that name take effect.
    allowed_mcp_tool_names = provider_allowed_mcp_tool_names(config, mcp_endpoint)
    cmd = _build_command(
        config,
        prompt_file,
        options=_BuildCommandOptions(
            model_flag=opts.model_flag,
            session_id=opts.session_id,
            verbose=opts.verbose,
            pure=opts.pure,
            mcp_endpoint=mcp_endpoint,
            allowed_mcp_tool_names=allowed_mcp_tool_names,
            system_prompt_file=opts.system_prompt_file,
            workspace_path=opts.workspace_path,
            initial_session_id=opts.initial_session_id,
            settings_json=opts.settings_json,
            stop_sentinel_path=opts.stop_sentinel_path,
        ),
    )
    logger.info("Invoking agent: {}", _command_for_log(config, cmd, prompt_file))

    label_scope = None
    if runtime_env is not None:
        label_scope = runtime_env.get(str(AGENT_LABEL_SCOPE_ENV))
    registry = _make_child_registry(opts)
    execution_strategy = strategy_for_transport(
        _agent_transport(config),
        label_scope=label_scope,
        registry=registry,
    )
    liveness_probe = DefaultLivenessProbe(registry=registry)

    # Everything that can fail without the monitor is computed BEFORE the
    # monitor is started, and everything that needs the monitor is built
    # inside the ``try``: once ``start()`` has run, ``finally`` is guaranteed
    # to stop it, so a failure while assembling the run context cannot leak a
    # running monitor.
    policy = _policy_from_options(opts)
    monitor = _start_workspace_monitor(opts.workspace_path)
    try:
        ctx = _AgentRunCtx(
            config=config,
            show_progress=opts.show_progress,
            extra_env=runtime_env,
            workspace_path=opts.workspace_path,
            policy=policy,
            execution_strategy=execution_strategy,
            liveness_probe=liveness_probe,
            waiting_listener=opts.waiting_listener,
            monitor=monitor,
            required_artifact=opts.required_artifact,
            clock=_clock,
            evaluate_completion_fn=evaluate_completion,
        )
        transport = _agent_transport(config)
        if transport == AgentTransport.CLAUDE_INTERACTIVE:
            extras = _PtyExtras(
                expected_session_id=opts.session_id or opts.initial_session_id,
                stop_sentinel_path=opts.stop_sentinel_path,
            )
            lines_iter = run_pty_and_read_lines(cmd, ctx, extras)
            yield from lines_iter
        elif transport == AgentTransport.AGY:
            # Reuse the caller-supplied run id when present; otherwise mint one.
            run_id = (opts.extra_env or {}).get(str(MCP_RUN_ID_ENV)) or str(uuid4())
            if opts.workspace_path is not None:
                _clear_session_completion_sentinel(opts.workspace_path, run_id)
            # The workspace MCP endpoint context is only entered when both an
            # endpoint and a workspace are available; otherwise a no-op context
            # keeps the ``with`` below uniform.
            mcp_ctx = (
                agy_workspace_mcp_endpoint(opts.workspace_path, runtime.mcp_endpoint)
                if runtime.mcp_endpoint and opts.workspace_path
                else contextlib.nullcontext()
            )
            with mcp_ctx:
                yield from run_pty_and_read_lines(
                    cmd,
                    ctx,
                    _PtyExtras(expected_session_id=run_id),
                )
        else:
            yield from run_subprocess_and_read_lines(cmd, ctx)
        # Reached only when the reader finished without raising.
        _log_workspace_completion(monitor)
    finally:
        _stop_workspace_monitor(monitor)


def resolve_invocation_runtime(
    config: AgentConfig,
    extra_env: dict[str, str] | None,
    workspace_path: Path | None,
    *,
    _base_env: Mapping[str, str] | None = None,
    system_prompt_file: str | None = None,
) -> ResolvedInvocationRuntime:
    """Build the runtime configuration needed to launch an agent.

    Works on a copy of *extra_env* (the caller's dict is not modified), reads
    the MCP endpoint from it, and applies the setup for the transport reported
    by ``_agent_transport(config)``.

    Args:
        config: Agent configuration; only its transport is consulted here.
        extra_env: Extra environment for the agent, or ``None``.
        workspace_path: Workspace directory handed to the upstream loaders.
        _base_env: Environment used for fallback lookups of
            ``OPENCODE_CONFIG_CONTENT`` and ``CODEX_HOME``; defaults to
            ``os.environ``.
        system_prompt_file: Optional system prompt file, forwarded to the
            CODEX home preparation.

    Returns:
        A ``ResolvedInvocationRuntime``. When no endpoint is configured and the
        transport needs no further setup, only ``agent_env`` is populated.

    Raises:
        UnsupportedMcpTransportError: An endpoint is configured for a transport
            that has no wiring branch below.
    """
    base_env = cast("Mapping[str, str]", os.environ) if _base_env is None else _base_env
    agent_env = dict(extra_env or {})
    mcp_server_env: dict[str, str] = {}
    endpoint = agent_env.get(MCP_ENDPOINT_ENV)
    transport = _agent_transport(config)
    claude_transports = (AgentTransport.CLAUDE, AgentTransport.CLAUDE_INTERACTIVE)

    # Early exit: without an endpoint there is nothing to wire, except that
    # Claude transports always fall through to the final return and CODEX still
    # needs its home prepared when a system prompt file was supplied.
    if not endpoint:
        codex_needs_home = transport == AgentTransport.CODEX and system_prompt_file is not None
        if transport not in claude_transports and not codex_needs_home:
            return ResolvedInvocationRuntime(agent_env=agent_env or None)

    # Transport-specific setup
    if transport == AgentTransport.OPENCODE:
        if endpoint is None:
            raise RuntimeError("endpoint must be set for OPENCODE transport")
        existing_config = (
            agent_env.get("OPENCODE_CONFIG_CONTENT")
            or base_env.get("OPENCODE_CONFIG_CONTENT")
        )
        provider_config, upstreams = build_opencode_provider_config(
            existing_config,
            endpoint,
        )
        agent_env["OPENCODE_CONFIG_CONTENT"] = provider_config
        _apply_upstream_env(upstreams, workspace_path, agent_env, mcp_server_env)
    elif transport == AgentTransport.CODEX:
        codex_home, upstreams = prepare_codex_home_with_upstreams(
            endpoint,
            workspace_path=workspace_path,
            existing_home=agent_env.get("CODEX_HOME") or base_env.get("CODEX_HOME"),
            system_prompt_file=system_prompt_file,
        )
        agent_env["CODEX_HOME"] = codex_home
        _apply_upstream_env(upstreams, workspace_path, agent_env, mcp_server_env)
    elif transport in claude_transports:
        if endpoint:
            claude_upstreams = load_existing_claude_upstream_servers(workspace_path)
            _apply_upstream_env(claude_upstreams, workspace_path, agent_env, mcp_server_env)
    elif transport == AgentTransport.AGY:
        # AGY upstream servers from existing config files are loaded for Ralph
        # upstream proxy. Ralph injects its own run-scoped endpoint via
        # agy_workspace_mcp_endpoint in invoke_agent().
        agy_upstreams = load_existing_agy_upstream_servers(workspace_path)
        _apply_upstream_env(agy_upstreams, workspace_path, agent_env, mcp_server_env)
    elif endpoint is not None:
        # Unsupported transport with endpoint
        raise UnsupportedMcpTransportError(
            f"Agent transport '{transport}' does not declare how to receive Ralph MCP wiring"
        )

    return ResolvedInvocationRuntime(
        agent_env=agent_env or None,
        server_env=mcp_server_env or None,
        mcp_endpoint=endpoint,
    )


def _provider_allowed_mcp_tool_names(
    config: AgentConfig,
    endpoint: str | None,
) -> tuple[str, ...]:
    """Return the Claude-style names of the tools visible at *endpoint*.

    An empty tuple is returned when there is no endpoint, when the transport
    is neither ``CLAUDE`` nor ``CLAUDE_INTERACTIVE``, or when discovery raises
    ``PreflightError`` / ``ValueError`` (which is logged as a warning).
    """
    if endpoint is None:
        return ()
    if _agent_transport(config) not in (
        AgentTransport.CLAUDE,
        AgentTransport.CLAUDE_INTERACTIVE,
    ):
        return ()
    try:
        # Looked up via the public alias so a monkeypatched replacement is used.
        discovered = discover_http_mcp_tool_names(endpoint)
    except (PreflightError, ValueError) as exc:
        logger.warning("Failed to discover Ralph MCP tools for provider allowlist: {}", exc)
        return ()
    return tuple(map(claude_tool_name, discovered))


def _discover_http_mcp_tool_names(endpoint: str) -> list[str]:
    """List tool names from an HTTP MCP endpoint.

    Sends three JSON-RPC messages in order -- initialize, the initialized
    notification, then tools/list -- threading the session id returned by each
    call into the next. Each response's ``"error"`` entry is passed to
    ``ensure_no_preflight_error`` before continuing.
    """
    target = parse_http_endpoint(endpoint)

    init_reply, session_id = post_http_jsonrpc_with_session(
        endpoint,
        target,
        initialize_request(),
    )
    ensure_no_preflight_error("HTTP MCP initialize", init_reply.get("error"))

    ack_reply, session_id = post_http_jsonrpc_with_session(
        endpoint,
        target,
        initialized_notification(),
        session_id=session_id,
    )
    ensure_no_preflight_error("HTTP MCP notifications/initialized", ack_reply.get("error"))

    tools_reply, _ = post_http_jsonrpc_with_session(
        endpoint,
        target,
        tools_list_request(),
        session_id=session_id,
    )
    ensure_no_preflight_error("HTTP MCP tools/list", tools_reply.get("error"))
    return extract_preflight_tool_names(tools_reply.get("result"), "HTTP MCP")


# Public aliases — test-accessible names and monkeypatch interception points.
# Internal callers must use the public name so that monkeypatches intercept correctly.
# Each public name below is bound to the private implementation imported (or
# defined) earlier in this module. invoke_agent() looks up
# run_pty_and_read_lines, run_subprocess_and_read_lines and
# provider_allowed_mcp_tool_names through these module-level names, and
# _provider_allowed_mcp_tool_names() does the same for
# discover_http_mcp_tool_names.
bounded_output_lines = _bounded_output_lines
run_pty_and_read_lines = _run_pty_and_read_lines_impl
run_subprocess_and_read_lines = _run_subprocess_and_read_lines
pending_vt_snapshot_line = _pending_vt_snapshot_line
extract_choice_menu_state = _extract_choice_menu_state
plan_choice_menu_response = _plan_choice_menu_response
permission_prompt_action_message = _permission_prompt_action_message
is_permission_prompt_line = _is_permission_prompt_line
interactive_auto_response_for_prompt = _interactive_auto_response_for_prompt
build_command = _build_command
BuildCommandOptions = _BuildCommandOptions
command_for_log = _command_for_log
provider_allowed_mcp_tool_names = _provider_allowed_mcp_tool_names
discover_http_mcp_tool_names = _discover_http_mcp_tool_names
build_opencode_command = _build_opencode_command
CompletionCheckOptions = _CompletionCheckOptions
check_process_result = _check_process_result
IdleStreamTimeoutError = _IdleStreamTimeoutError
ProcessReaderCtx = _ProcessReaderCtx
read_lines_from_process = _read_lines_from_process
wait_for_descendants_then_recheck = _wait_for_descendants_then_recheck
policy_from_options = _policy_from_options
merge_mcp_toml_into_upstreams = _merge_mcp_toml_into_upstreams

# Re-export all public types and error classes
# "shutil" and "subprocess" are the standard-library modules imported at the
# top of this file, exported here by name -- presumably so callers/tests can
# reach them via this module; confirm before removing.
# NOTE(review): merge_mcp_toml_into_upstreams is aliased above but is not
# listed in __all__ -- confirm whether that omission is intentional.
__all__ = [
    "AgentInactivityTimeoutError",
    "AgentInvocationError",
    "BuildCommandOptions",
    "CompletionCheckOptions",
    "IdleStreamTimeoutError",
    "InactivityTimeoutOpts",
    "InteractivePermissionPromptError",
    "InvokeOptions",
    "InvokeRuntimeOptions",
    "OpenCodeResumableExitError",
    "ProcessReaderCtx",
    "ResolvedInvocationRuntime",
    "UnsupportedMcpTransportError",
    "WatchdogFireReason",
    "WorkspaceMonitor",
    "bounded_output_lines",
    "build_command",
    "build_invoke_options_from_config",
    "build_opencode_command",
    "check_agent_available",
    "check_process_result",
    "command_for_log",
    "discover_http_mcp_tool_names",
    "extract_choice_menu_state",
    "extract_session_id",
    "get_process_manager",
    "interactive_auto_response_for_prompt",
    "invoke_agent",
    "is_permission_prompt_line",
    "pending_vt_snapshot_line",
    "permission_prompt_action_message",
    "plan_choice_menu_response",
    "policy_from_options",
    "provider_allowed_mcp_tool_names",
    "read_lines_from_process",
    "resolve_invocation_runtime",
    "run_pty_and_read_lines",
    "run_subprocess_and_read_lines",
    "shutil",
    "subprocess",
    "wait_for_descendants_then_recheck",
]