"""MCP exec tool handler.
Ports the Rust MCP `exec` tool so agents can execute bounded subprocesses
from the workspace root after capability checks and blacklist filtering.
"""
from __future__ import annotations
import shlex
import subprocess
from pathlib import Path
from typing import TYPE_CHECKING, Protocol, cast, runtime_checkable
from ralph.mcp.tools._exec_completed_process import _CompletedProcessAdapter
from ralph.mcp.tools._exec_execution_error import ExecutionError
from ralph.mcp.tools._exec_params import ExecParams
from ralph.mcp.tools._exec_run_deps import CwdProvider, ExecRunDeps
from ralph.mcp.tools.coordination import (
CapabilityDeniedError,
CoordinationSessionLike,
InvalidParamsError,
ToolContent,
ToolResult,
require_capability,
)
from ralph.process.manager import SpawnOptions, get_process_manager
if TYPE_CHECKING:
from collections.abc import Mapping
PROCESS_EXEC_BOUNDED_CAPABILITY = "ProcessExecBounded"
DEFAULT_TIMEOUT_MS = 30_000
_TIMEOUT_NOTE_THRESHOLD_MS = 60_000
_KILL_SIGNAL_ARG_COUNT = 2
_ARCHIVE_EXTENSIONS = (".tar", ".zip", ".gz", ".bz2", ".xz")
_ARCHIVE_EXTRACT_FLAGS = ("-x", "--extract", "-d", "--delete")
_SHELL_OPERATOR_TOKENS = frozenset({"|", "||", "&&", ";", "&", ">", ">>", "<", "<<"})
_EXEC_USAGE_EXAMPLES = (
'Examples: {"command": "python -m pytest"}, '
'{"command": ["python", "-m", "pytest"]}, '
'{"argv": ["python", "-m", "pytest"]}.'
)
_BLACKLIST_DESCRIPTIONS = {
"version_control": "version control system",
"privilege_escalation": "privilege escalation",
"destructive_system": "destructive system operation",
"network_exfiltration": "network/exfiltration",
"package_manager": "package manager",
"container_escape": "container/VM escape",
"multi_file_operation": "multi-file operation",
}
_VERSION_CONTROL_COMMANDS = {"git", "svn", "hg", "fossil", "bzr", "darcs"}
_PRIVILEGE_ESCALATION_COMMANDS = {"sudo", "su", "doas", "pkexec", "runuser"}
_DESTRUCTIVE_SYSTEM_COMMANDS = {"shutdown", "reboot", "halt", "poweroff", "killall"}
_NETWORK_TUNNEL_COMMANDS = {"nc", "ncat", "netcat", "socat"}
_REMOTE_NETWORK_COMMANDS = {"ssh", "scp", "rsync"}
_CONTAINER_COMMANDS = {"docker", "podman", "chroot", "nsenter", "unshare"}
_PACKAGE_MANAGERS = {"apt", "yum", "dnf", "pacman", "brew"}
[docs]
@runtime_checkable
class WorkspaceWithRoot(Protocol):
"""Workspace surface required for command execution."""
@property
def root(self) -> Path:
"""Return the absolute workspace root path."""
...
[docs]
def parse_exec_params(params: Mapping[str, object]) -> ExecParams:
"""Parse and validate exec tool parameters."""
command_tokens = _parse_exec_command_tokens(params)
args = _parse_exec_args(params.get("args"))
command = command_tokens[0] if command_tokens else ""
merged_args = [*command_tokens[1:], *args]
timeout_value = params.get("timeout_ms", DEFAULT_TIMEOUT_MS)
timeout_ms = (
timeout_value
if isinstance(timeout_value, int) and timeout_value >= 0
else DEFAULT_TIMEOUT_MS
)
return ExecParams(command=command, args=merged_args, timeout_ms=timeout_ms)
def _parse_exec_command_tokens(params: Mapping[str, object]) -> list[str]:
command_value = params.get("command")
if isinstance(command_value, str):
return _parse_shell_words(command_value, field_name="command")
if isinstance(command_value, list):
return _coerce_argv_tokens(command_value, field_name="command")
if command_value is not None:
raise InvalidParamsError(
"'command' must be a string or string array. " + _EXEC_USAGE_EXAMPLES
)
argv_value = params.get("argv")
if isinstance(argv_value, str):
return _parse_shell_words(argv_value, field_name="argv")
if isinstance(argv_value, list):
return _coerce_argv_tokens(argv_value, field_name="argv")
if argv_value is not None:
raise InvalidParamsError("'argv' must be a string or string array. " + _EXEC_USAGE_EXAMPLES)
raise InvalidParamsError("Missing 'command' or 'argv' parameter. " + _EXEC_USAGE_EXAMPLES)
def _parse_exec_args(args_value: object) -> list[str]:
if isinstance(args_value, list):
return [value for value in args_value if isinstance(value, str)]
if isinstance(args_value, str):
return _parse_shell_words(args_value, field_name="args")
return []
def _coerce_argv_tokens(values: list[object], *, field_name: str) -> list[str]:
tokens = [value for value in values if isinstance(value, str)]
if not tokens:
raise InvalidParamsError(f"{field_name} must include at least one string token")
if any(token in _SHELL_OPERATOR_TOKENS for token in tokens):
raise InvalidParamsError(
f"{field_name} must not use shell control operators: exec does not run a shell. "
"Pass a plain command and arguments instead."
)
return tokens
def _parse_shell_words(value: str, *, field_name: str) -> list[str]:
stripped = value.strip()
if not stripped:
return []
try:
lexer = shlex.shlex(stripped, posix=True, punctuation_chars="|&;<>")
lexer.whitespace_split = True
lexer.commenters = ""
tokens = list(lexer)
except ValueError as exc:
raise InvalidParamsError(f"Malformed {field_name} value: {exc}") from exc
if any(token in _SHELL_OPERATOR_TOKENS for token in tokens):
raise InvalidParamsError(
f"{field_name} must not use shell control operators: exec does not run a shell. "
"Pass a plain command and arguments instead."
)
return tokens
[docs]
def check_command(command: str, args: list[str]) -> str | None:
"""Return a denial reason when a command matches the blacklist policy."""
cmd = command.strip()
if not cmd:
return None
for checker in (
check_version_control,
check_privilege_escalation,
check_destructive_system,
check_network_exfiltration,
check_package_manager,
check_container_escape,
check_multi_file_operation,
):
reason = checker(cmd, args)
if reason:
return reason
return None
def _description(key: str) -> str:
return _BLACKLIST_DESCRIPTIONS.get(key, "operation")
def _command_key(command: str) -> str:
return command.strip().lower()
def _lower_args(args: list[str]) -> list[str]:
return [arg.lower() for arg in args]
def _contains_any(arg_list: list[str], targets: set[str]) -> bool:
return any(arg in targets for arg in arg_list)
def check_version_control(command: str, _args: list[str]) -> str | None:
"""Return a denial reason if the command is a version control tool."""
key = _command_key(command)
if key in _VERSION_CONTROL_COMMANDS:
desc = _description("version_control")
return (
f"Command '{command}' is blacklisted: {desc} commands must go through "
"Ralph's git capabilities"
)
return None
def check_privilege_escalation(command: str, _args: list[str]) -> str | None:
"""Return a denial reason if the command is a privilege escalation tool."""
key = _command_key(command)
if key in _PRIVILEGE_ESCALATION_COMMANDS:
desc = _description("privilege_escalation")
return f"Command '{command}' is blacklisted: {desc} is not allowed"
return None
def check_destructive_system(command: str, args: list[str]) -> str | None:
"""Return a denial reason if the command is a destructive system operation."""
key = _command_key(command)
args_lower = _lower_args(args)
desc = _description("destructive_system")
if _is_destructive_rm(key, args, args_lower):
return f"Command 'rm' with recursive force flag targeting root/home is blacklisted: {desc}"
if key in {"mkfs", "dd"} and any(
arg.startswith("/dev/") or "of=/dev/" in arg for arg in args_lower
):
return f"Command '{command}' targeting devices is blacklisted: {desc}"
if key in _DESTRUCTIVE_SYSTEM_COMMANDS:
return f"Command '{command}' is blacklisted: {desc} is not allowed"
if _is_init_kill(key, args_lower):
return f"Command 'kill -9 1' (init) is blacklisted: {desc} is not allowed"
return None
def _is_destructive_rm(key: str, args: list[str], args_lower: list[str]) -> bool:
return (
key == "rm"
and _contains_any(args_lower, {"-rf", "-r", "-f"})
and any(
target == "/"
or target.startswith("/.")
or target.startswith("~")
or target.startswith("/home")
for target in args
)
)
def _is_init_kill(key: str, args_lower: list[str]) -> bool:
return (
key == "kill"
and len(args_lower) >= _KILL_SIGNAL_ARG_COUNT
and args_lower[0] == "-9"
and args_lower[1] == "1"
)
def check_network_exfiltration(command: str, args: list[str]) -> str | None:
"""Return a denial reason if the command could exfiltrate data over the network."""
key = _command_key(command)
args_lower = _lower_args(args)
if key in {"curl", "wget"}:
if any(_is_external_url(arg) for arg in args):
desc = _description("network_exfiltration")
return (
f"Command '{command}' to external URLs is blacklisted: {desc} risk. "
"Use Ralph's HTTP capabilities instead."
)
return None
if key in _NETWORK_TUNNEL_COMMANDS:
desc = _description("network_exfiltration")
return f"Command '{command}' is blacklisted: {desc} is not allowed"
if key in _REMOTE_NETWORK_COMMANDS:
joined = " ".join(args_lower)
if "@" in joined or ":/" in joined or "::" in joined:
desc = _description("network_exfiltration")
return f"Command '{command}' to remote hosts is blacklisted: {desc} is not allowed"
return None
def _is_external_url(arg: str) -> bool:
token = arg.strip()
if not token or token.startswith("-"):
return False
lower = token.lower()
if "localhost" in lower or "127.0.0.1" in lower:
return False
return lower.startswith("http://") or lower.startswith("https://") or "://" in lower
def check_package_manager(command: str, args: list[str]) -> str | None:
"""Return a denial reason if the command invokes a package manager install."""
key = _command_key(command)
args_lower = _lower_args(args)
desc = _description("package_manager")
if key in _PACKAGE_MANAGERS and any(
flag in args_lower for flag in ("install", "update", "upgrade", "remove", "-s", "--sync")
):
return (
f"Command '{command}' with install/update is blacklisted: {desc} "
"operations require Ralph's approval"
)
if (
key in {"pip", "pip3"}
and "install" in args_lower
and any(flag in args_lower for flag in ("--user", "-g", "--global"))
):
return (
f"Command '{key} install --user/-g' is blacklisted: {desc} operations "
"require Ralph's approval"
)
if key == "npm" and "install" in args_lower and "-g" in args_lower:
return (
f"Command 'npm install -g' is blacklisted: {desc} operations require Ralph's approval"
)
if key == "cargo" and args_lower and args_lower[0] == "install":
return f"Command 'cargo install' is blacklisted: {desc} operations require Ralph's approval"
if key == "gem" and "install" in args_lower and "--user-install" not in args_lower:
return (
"Command 'gem install' (global) is blacklisted: "
f"{desc} operations require Ralph's approval"
)
return None
def check_container_escape(command: str, _args: list[str]) -> str | None:
"""Return a denial reason if the command could escape container isolation."""
key = _command_key(command)
if key in _CONTAINER_COMMANDS:
desc = _description("container_escape")
return f"Command '{command}' is blacklisted: {desc} is not allowed"
return None
def check_multi_file_operation(command: str, args: list[str]) -> str | None:
"""Return a denial reason if the command performs bulk file operations."""
key = _command_key(command)
args_lower = _lower_args(args)
desc = _description("multi_file_operation")
checks = (
(
key == "find" and any(flag in args_lower for flag in ("-exec", "-delete")),
"Command 'find' with -exec/-delete is blacklisted: "
f"{desc} must go through Ralph's workspace write",
),
(
key == "xargs"
and any(flag in args_lower for flag in ("rm", "mv", "cp", "chmod", "chown")),
"Command 'xargs' with destructive commands is blacklisted: "
f"{desc} must go through Ralph's workspace write",
),
(
key == "sed" and "-i" in args_lower,
f"Command 'sed -i' is blacklisted: {desc} must go through Ralph's workspace write",
),
(
key == "awk" and ("-i" in args_lower or "-inplace" in args_lower),
f"Command 'awk -i' is blacklisted: {desc} must go through Ralph's workspace write",
),
(
key in {"rename", "mmv"},
f"Command '{command}' is blacklisted: {desc} must go through Ralph's workspace write",
),
(
key in {"chmod", "chown"} and any(flag in args_lower for flag in ("-r", "-R")),
"Command '"
f"{command} -R' is blacklisted: {desc} must go through Ralph's workspace write",
),
(
_has_recursive_glob_copy(key, args, args_lower),
"Command '"
f"{command}' with recursive glob is blacklisted: "
f"{desc} must go through Ralph's workspace write",
),
(
_extracts_archive_in_place(key, args_lower),
"Command '"
f"{command}' extracting archives in-place is blacklisted: "
f"{desc} must go through Ralph's workspace write",
),
)
for applies, message in checks:
if applies:
return message
return None
def _has_recursive_glob_copy(key: str, args: list[str], args_lower: list[str]) -> bool:
if key not in {"cp", "mv"}:
return False
has_glob = any("*" in arg or "?" in arg for arg in args)
has_recursive = any(flag in args_lower for flag in ("-r", "-rf", "-R", "-f"))
return has_glob and has_recursive
def _extracts_archive_in_place(key: str, args_lower: list[str]) -> bool:
if key not in {"tar", "zip", "unzip"}:
return False
has_extract_flag = any(
any(flag in arg for flag in _ARCHIVE_EXTRACT_FLAGS) for arg in args_lower
)
has_archive = any(arg.endswith(ext) for arg in args_lower for ext in _ARCHIVE_EXTENSIONS)
return has_extract_flag and has_archive
[docs]
def apply_exec_policy(command: str, args: list[str]) -> None:
"""Apply command policy and raise if the command is denied."""
reason = check_command(command, args)
if reason is None:
return
raise CapabilityDeniedError(f"Command '{command}' denied by policy: {reason}")
def _workspace_root(workspace: object, *, cwd_provider: CwdProvider = Path.cwd) -> Path:
if isinstance(workspace, WorkspaceWithRoot):
return workspace.root
root_value = cast("Path | str | None", getattr(workspace, "root", None))
if isinstance(root_value, Path):
return root_value
if isinstance(root_value, str):
return Path(root_value)
return cwd_provider()
[docs]
def run_command(
command: str,
args: list[str],
workspace: object,
timeout_ms: int,
deps: ExecRunDeps | None = None,
) -> _CompletedProcessAdapter:
"""Execute a subprocess in the workspace root."""
resolved_deps = deps or ExecRunDeps()
cwd_provider = resolved_deps.cwd_provider or Path.cwd
command_runner = resolved_deps.runner or _run_subprocess
cwd = _workspace_root(workspace, cwd_provider=cwd_provider)
timeout_seconds = timeout_ms / 1000 if timeout_ms > 0 else None
try:
return command_runner([command, *args], cwd, timeout_seconds)
except FileNotFoundError as exc:
raise ExecutionError(f"Failed to execute '{command}': {exc}") from exc
except PermissionError as exc:
raise ExecutionError(f"Failed to execute '{command}': {exc}") from exc
except subprocess.TimeoutExpired as exc:
raise ExecutionError(
f"Failed to execute '{command}': timed out after {timeout_ms}ms"
) from exc
except OSError as exc:
raise ExecutionError(f"Failed to execute '{command}': {exc}") from exc
def _run_subprocess(
command: list[str], cwd: Path, timeout_seconds: float | None
) -> _CompletedProcessAdapter:
handle = get_process_manager().spawn(
command,
SpawnOptions(
cwd=str(cwd),
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
label=f"mcp-exec:{command[0]}",
),
)
try:
stdout, stderr = handle.communicate(timeout=timeout_seconds)
except subprocess.TimeoutExpired:
handle.terminate(grace_period_s=0)
raise
return _CompletedProcessAdapter(
stdout=stdout or b"",
stderr=stderr or b"",
returncode=handle.returncode or 0,
)
[docs]
def handle_exec_command(
session: CoordinationSessionLike,
workspace: object,
params: Mapping[str, object],
) -> ToolResult:
"""Execute a bounded subprocess in the workspace root."""
require_capability(session, PROCESS_EXEC_BOUNDED_CAPABILITY, "Command execution")
parsed = parse_exec_params(params)
apply_exec_policy(parsed.command, parsed.args)
output = run_command(parsed.command, parsed.args, workspace, parsed.timeout_ms)
return ToolResult(
content=[
ToolContent.text_content(
format_exec_result(parsed.command, parsed.args, output, parsed.timeout_ms)
)
],
is_error=output.returncode != 0,
)
__all__ = [
"DEFAULT_TIMEOUT_MS",
"PROCESS_EXEC_BOUNDED_CAPABILITY",
"ExecParams",
"ExecRunDeps",
"ExecutionError",
"WorkspaceWithRoot",
"apply_exec_policy",
"check_command",
"format_exec_result",
"handle_exec_command",
"parse_exec_params",
"run_command",
]