Source code for ralph.agents.invoke._commands

"""Command building for agent invocation."""

from __future__ import annotations

import json
import shlex
import shutil
import tempfile
from pathlib import Path
from typing import TYPE_CHECKING, cast

from ralph.agents.invoke._errors import UnsupportedMcpTransportError
from ralph.agents.invoke._process_reader import _agent_command_name
from ralph.agents.invoke._types import _BuildCommandOptions
from ralph.config.enums import AgentTransport
from ralph.mcp.transport.claude import claude_mcp_config

if TYPE_CHECKING:
    from ralph.config.models import AgentConfig

_MODELED_FLAG_PARTS = 2


def _agent_transport(config: AgentConfig) -> AgentTransport:
    transport = config.transport
    if transport is None:
        return AgentTransport.GENERIC
    return transport


def _shell_single_quote(value: str) -> str:
    escaped = value.replace("'", "'\\''")
    return f"'{escaped}'"


def _interactive_stop_sentinel_path(session_id: str) -> Path:
    return Path(tempfile.gettempdir()) / f"ralph-claude-interactive-{session_id}.done"


def _interactive_stop_hook_settings(sentinel_path: Path) -> str:
    command = f"touch {_shell_single_quote(str(sentinel_path))}"
    settings: dict[str, object] = {
        "hooks": {
            "Stop": [
                {
                    "hooks": [
                        {
                            "type": "command",
                            "command": command,
                        }
                    ]
                }
            ]
        }
    }
    return json.dumps(settings)


def _resolve_prompt_path(prompt_file: str, workspace_path: Path | None) -> Path:
    prompt_path = Path(prompt_file)
    if prompt_path.is_absolute() or workspace_path is None:
        return prompt_path
    return workspace_path / prompt_path


def _sidecar_path_for_prompt(prompt_path: Path) -> Path | None:
    if not prompt_path.name.endswith("_prompt.md"):
        return None
    normalized = prompt_path.stem.removesuffix("_prompt")
    return prompt_path.parent / f"{normalized}_multimodal_handoff.json"


def _read_multimodal_sidecar(
    prompt_file: str,
    workspace_path: Path | None,
) -> list[dict[str, object]] | None:
    resolved = _resolve_prompt_path(prompt_file, workspace_path)
    sidecar = _sidecar_path_for_prompt(resolved)
    if sidecar is None or not sidecar.exists():
        return None
    try:
        data: dict[str, object] = json.loads(sidecar.read_text(encoding="utf-8"))
        artifacts = data.get("artifacts")
        if isinstance(artifacts, list) and artifacts:
            return cast("list[dict[str, object]]", artifacts)
        return None
    except Exception:
        return None


def _build_multimodal_appendix(artifacts: list[dict[str, object]]) -> str:
    lines = [
        "",
        "",
        "## Multimodal Artifacts",
        "",
        "The following artifacts are available via Ralph's MCP surface.",
        "Retrieve each artifact by calling the read_media tool"
        " with path=<ralph://media/...> replay handle:",
        "",
    ]
    for entry in artifacts:
        modality = entry.get("modality", "unknown")
        title = entry.get("title", "untitled")
        uri = entry.get("uri", "")
        delivery = entry.get("delivery", "resource_reference_replay")
        block_type = entry.get("block_type", "")
        reason = entry.get("reason", "")
        failure_kind = entry.get("failure_kind", "")
        lines.append(f"- [{modality}] {title}")
        lines.append(f"  path={uri}")
        lines.append(f"  Delivery: {delivery}")
        if block_type:
            lines.append(f"  Block-type: {block_type}")
        if failure_kind == "unsupported_runtime_seam":
            reason_suffix = f" Reason: {reason}" if reason else ""
            lines.append(
                f"  Note: the upstream artifact exists but cannot be delivered"
                f" through the active runtime seam.{reason_suffix}"
                " Do not use read_media, replay handles, or typed blocks for this artifact."
            )
        elif delivery == "resource_reference_replay":
            lines.append(
                "  Note: if the artifact is from a previous session it may not be"
                " replayable; read_media will return an explicit"
                " missing_replay_source failure in that case."
            )
        elif delivery == "typed_block":
            block_type_hint = f" (block_type={block_type!r})" if block_type else ""
            lines.append(
                f"  Note: call read_media with this path to receive a typed block"
                f"{block_type_hint} for direct delivery to the model."
            )
        elif delivery == "resource_reference":
            lines.append(
                "  Note: this artifact references an external URI; the model may"
                " access it directly via the URI without calling read_media."
            )
        elif delivery == "unsupported":
            reason_suffix = f" Reason: {reason}" if reason else ""
            lines.append(
                f"  Note: this modality is unsupported by the active provider;"
                f"{reason_suffix}"
                " read_media will return an explicit unsupported_modality failure."
            )
        lines.append("")
    return "\n".join(lines)


def _extend_claude_transport_flags(
    cmd: list[str],
    transport: AgentTransport,
    build_options: _BuildCommandOptions,
) -> None:
    if (
        transport not in (AgentTransport.CLAUDE, AgentTransport.CLAUDE_INTERACTIVE)
        or build_options.mcp_endpoint is None
    ):
        return

    # Claude/CCS non-interactive MCP mode is brittle around `--tools ""` combined
    # with `--allowedTools`. We only emit the tool restriction flags when live MCP
    # tool discovery succeeds and yields a non-empty allowlist; otherwise we keep the
    # strict MCP server isolation but avoid the known empty-tool edge case entirely.
    cmd.extend(
        [
            "--mcp-config",
            claude_mcp_config(
                build_options.mcp_endpoint,
                workspace_path=build_options.workspace_path,
            ),
            "--strict-mcp-config",
        ]
    )
    if build_options.allowed_mcp_tool_names:
        cmd.extend(
            [
                "--tools",
                "",
                "--allowedTools",
                ",".join(build_options.allowed_mcp_tool_names),
            ]
        )


def _append_transport_prompt_arg(
    cmd: list[str],
    transport: AgentTransport,
    prompt_file: str,
    build_options: _BuildCommandOptions,
) -> None:
    if (
        transport in (AgentTransport.CLAUDE, AgentTransport.CLAUDE_INTERACTIVE)
        and build_options.mcp_endpoint
    ):
        cmd.append("--")
        prompt_text = _load_prompt_text(prompt_file, build_options.workspace_path)
        cmd.append(prompt_text)
        return
    cmd.append(prompt_file)


def _load_prompt_text(prompt_file: str, workspace_path: Path | None) -> str:
    text = _resolve_prompt_path(prompt_file, workspace_path).read_text(encoding="utf-8")
    artifacts = _read_multimodal_sidecar(prompt_file, workspace_path)
    if artifacts:
        text += _build_multimodal_appendix(artifacts)
    return text


def _split_optional_flag(flag: str | None) -> list[str]:
    if not flag:
        return []
    return shlex.split(flag)


def _normalize_opencode_model_flag(model_flag: str) -> list[str]:
    parts = model_flag.split()
    if len(parts) == _MODELED_FLAG_PARTS and parts[0] in {"-m", "--model"}:
        return [parts[0], parts[1].removeprefix("opencode/")]
    return parts


def _build_opencode_command(
    config: AgentConfig,
    prompt_file: str,
    *,
    options: _BuildCommandOptions,
) -> list[str]:
    prompt_text = _load_prompt_text(prompt_file, options.workspace_path)
    cmd = [_agent_command_name(config), "run"]
    if options.pure:
        cmd.append("--pure")
    cmd.extend(["--format", "json"])

    if config.session_flag and options.session_id:
        cmd.extend(config.session_flag.format(options.session_id).split())

    cmd.extend(_split_optional_flag(config.yolo_flag))

    if options.verbose and config.verbose_flag:
        cmd.append(config.verbose_flag)

    effective_model = options.model_flag or config.model_flag
    if effective_model:
        cmd.extend(_normalize_opencode_model_flag(effective_model))

    cmd.append(prompt_text)
    return cmd


def _build_codex_command(
    config: AgentConfig,
    prompt_file: str,
    *,
    options: _BuildCommandOptions,
) -> list[str]:
    prompt_text = _load_prompt_text(prompt_file, options.workspace_path)
    cmd = config.cmd.split()
    if config.output_flag is not None:
        cmd.append(config.output_flag)

    cmd.extend(_split_optional_flag(config.yolo_flag))

    effective_model = options.model_flag or config.model_flag
    if effective_model:
        cmd.extend(effective_model.split())

    cmd.append(prompt_text)
    return cmd


def _build_agy_command(
    config: AgentConfig,
    prompt_file: str,
    *,
    options: _BuildCommandOptions,
) -> list[str]:
    """Build the AGY command line.

    AGY uses: agy [--dangerously-skip-permissions] [--add-dir <path>] [--verbose] --print <prompt>
    """
    cmd = config.cmd.split()
    cmd.extend(_split_optional_flag(config.yolo_flag))
    if config.session_flag and options.session_id:
        cmd.extend(config.session_flag.format(options.session_id).split())
    if options.workspace_path is not None:
        cmd.extend(["--add-dir", str(options.workspace_path)])
    if options.verbose and config.verbose_flag:
        cmd.append(config.verbose_flag)
    if config.print_flag:
        cmd.append(config.print_flag)
    prompt_text = _load_prompt_text(prompt_file, options.workspace_path)
    cmd.append(prompt_text)
    return cmd


def _build_claude_interactive_command(
    config: AgentConfig,
    prompt_file: str,
    *,
    options: _BuildCommandOptions,
) -> list[str]:
    cmd = config.cmd.split()
    cmd.extend(_split_optional_flag(config.yolo_flag))
    _extend_claude_transport_flags(cmd, AgentTransport.CLAUDE_INTERACTIVE, options)
    if options.verbose and config.verbose_flag:
        cmd.append(config.verbose_flag)
    if config.session_flag and options.session_id:
        cmd.extend(config.session_flag.format(options.session_id).split())
    elif options.initial_session_id is not None:
        cmd.extend(["--session-id", options.initial_session_id])
    if options.settings_json is not None:
        cmd.extend(["--settings", options.settings_json])
    if options.system_prompt_file:
        cmd.extend(["--append-system-prompt-file", options.system_prompt_file])
    effective_model = options.model_flag or config.model_flag
    if effective_model:
        cmd.extend(effective_model.split())
    _append_transport_prompt_arg(cmd, AgentTransport.CLAUDE_INTERACTIVE, prompt_file, options)
    return cmd


def _build_command(
    config: AgentConfig,
    prompt_file: str,
    *,
    options: _BuildCommandOptions | None = None,
) -> list[str]:
    """Build the command line for agent invocation.

    Args:
        config: Agent configuration.
        prompt_file: Path to prompt file.

    Returns:
        List of command arguments.
    """
    build_options = options or _BuildCommandOptions()
    transport = _agent_transport(config)
    if build_options.mcp_endpoint and transport == AgentTransport.GENERIC:
        raise UnsupportedMcpTransportError(
            "Ralph MCP endpoint provided for agent without a supported transport adapter"
        )

    if transport == AgentTransport.OPENCODE:
        return _build_opencode_command(
            config,
            prompt_file,
            options=build_options,
        )

    if transport == AgentTransport.CODEX:
        return _build_codex_command(
            config,
            prompt_file,
            options=build_options,
        )

    if transport == AgentTransport.CLAUDE_INTERACTIVE:
        return _build_claude_interactive_command(
            config,
            prompt_file,
            options=build_options,
        )

    if transport == AgentTransport.AGY:
        return _build_agy_command(
            config,
            prompt_file,
            options=build_options,
        )

    cmd = config.cmd.split()
    if transport == AgentTransport.CLAUDE and config.output_flag is not None:
        cmd.append(config.output_flag)

    if config.print_flag:
        cmd.append(config.print_flag)

    if config.streaming_flag:
        cmd.append(config.streaming_flag)

    if config.session_flag and build_options.session_id:
        cmd.extend(config.session_flag.format(build_options.session_id).split())

    cmd.extend(_split_optional_flag(config.yolo_flag))

    if build_options.verbose and config.verbose_flag:
        cmd.append(config.verbose_flag)

    _extend_claude_transport_flags(cmd, transport, build_options)

    if transport == AgentTransport.CLAUDE and build_options.system_prompt_file:
        cmd.extend(["--append-system-prompt-file", build_options.system_prompt_file])

    effective_model = build_options.model_flag or config.model_flag
    if effective_model:
        cmd.extend(effective_model.split())

    _append_transport_prompt_arg(cmd, transport, prompt_file, build_options)
    return cmd


def _command_for_log(config: AgentConfig, cmd: list[str], prompt_file: str) -> str:
    logged_cmd = list(cmd)
    if (
        _agent_transport(config)
        in {
            AgentTransport.OPENCODE,
            AgentTransport.CODEX,
            AgentTransport.CLAUDE,
            AgentTransport.CLAUDE_INTERACTIVE,
            AgentTransport.AGY,
        }
        and logged_cmd
    ):
        logged_cmd[-1] = prompt_file
    return " ".join(logged_cmd)


[docs] def check_agent_available(config: AgentConfig) -> bool: """Check if an agent command is available. Args: config: Agent configuration. Returns: True if agent command exists and is executable. """ cmd = config.cmd.split() if not cmd: return False return shutil.which(cmd[0]) is not None