"""MCP tool call coordination handlers.
Ports the Rust coordination handlers that support progress reporting,
completion declaration, workspace coordination, and environment reads.
"""
from __future__ import annotations
import contextlib
import json
import os
import time
from pathlib import Path
from typing import TYPE_CHECKING
from ralph.mcp.artifacts.policy_outcomes import is_policy_approved
from ralph.mcp.multimodal import ImageContent
from .capability_denied_error import CapabilityDeniedError
from .coordination_session_like import CoordinationSessionLike
from .invalid_params_error import InvalidParamsError
from .tool_content import ToolContent
from .tool_error import ToolError
from .tool_result import ContentBlock, ToolResult
from .workspace_like import WorkspaceLike
if TYPE_CHECKING:
from collections.abc import Callable
# Capability names handed to ``require_capability`` by the handlers in this module.
# Checked by ``handle_report_progress``.
RUN_REPORT_PROGRESS_CAPABILITY = "run.report_progress"
# Checked by both ``handle_declare_complete`` and ``handle_coordinate``.
ARTIFACT_SUBMIT_CAPABILITY = "artifact.submit"
# Checked by ``handle_read_env``.
ENV_READ_CAPABILITY = "env.read"
# Path template for the completion sentinel; ``{run_id}`` is substituted by
# ``_write_completion_sentinel`` and the result is resolved through
# ``workspace.absolute_path``.
_COMPLETION_SENTINEL_RELPATHFMT = ".agent/completion_seen_{run_id}.json"
def _timestamp() -> int:
    """Return the wall-clock time as whole seconds since the UNIX epoch.

    The fractional part reported by ``time.time()`` is truncated by ``int``.
    """
    seconds_since_epoch = time.time()
    return int(seconds_since_epoch)
def _parameter_as_string(params: dict[str, object], name: str) -> str:
    """Fetch ``params[name]`` and insist that it is a string.

    Args:
        params: Tool-call parameters.
        name: Key to look up in ``params``.

    Returns:
        The value stored under ``name`` (an empty string is accepted).

    Raises:
        InvalidParamsError: If the key is absent or its value is not a ``str``.
            The message reads "Missing ..." in both cases.
    """
    candidate = params.get(name)
    if isinstance(candidate, str):
        return candidate
    raise InvalidParamsError(f"Missing '{name}' parameter")
def _is_approved(outcome: object) -> bool:
    """Tell whether a capability-check outcome counts as approved.

    This is a thin indirection over ``is_policy_approved``; the decision
    itself is made entirely by that function.
    """
    approved = is_policy_approved(outcome)
    return approved
def _serialize_payload(payload: object) -> str:
    """Render a coordination payload as text.

    JSON is preferred (non-ASCII characters are emitted as-is). When
    ``json.dumps`` rejects the value with ``TypeError`` or ``ValueError``,
    the plain ``str()`` form is returned instead.
    """
    try:
        rendered = json.dumps(payload, ensure_ascii=False)
    except (TypeError, ValueError):
        # Not JSON-serializable (or otherwise rejected): fall back to str().
        rendered = str(payload)
    return rendered
def _write_completion_sentinel(
    workspace: WorkspaceLike | None,
    run_id: str,
    *,
    _write_fn: Callable[[str, str], None] | None = None,
) -> None:
    """Persist a small JSON marker recording that ``run_id`` declared completion.

    Args:
        workspace: Used to resolve the sentinel path via ``absolute_path``.
            When ``None`` the function does nothing.
        run_id: Substituted into the sentinel file name and stored in the
            JSON body as ``{"run_id": ...}``.
        _write_fn: Optional writer called as ``_write_fn(path, text)`` in
            place of the default UTF-8 file write.

    Errors raised by the write are not handled here; they propagate to the
    caller.
    """
    if workspace is None:
        return
    target = workspace.absolute_path(_COMPLETION_SENTINEL_RELPATHFMT.format(run_id=run_id))
    body = json.dumps({"run_id": run_id}, ensure_ascii=False)
    if _write_fn is None:
        Path(target).write_text(body, encoding="utf-8")
    else:
        _write_fn(target, body)
[docs]
def format_progress_text(status: str, note: str, timestamp: int) -> str:
    """Compose the two-line text returned for a progress report.

    The first line echoes ``status``, ``note`` and ``timestamp``; the second
    line is a fixed marker stating that a progress event was emitted.
    """
    summary_line = f"Progress reported: status='{status}', note='{note}', timestamp={timestamp}"
    marker_line = "[Progress event emitted to pipeline]"
    return "\n".join((summary_line, marker_line))
[docs]
def require_capability(session: CoordinationSessionLike, capability: str, action: str) -> None:
    """Ensure the session is allowed to use ``capability``.

    Args:
        session: Object whose ``check_capability`` is queried.
        capability: Capability name to check.
        action: Human-readable description used at the start of the error message.

    Raises:
        CapabilityDeniedError: If the outcome returned by the session is not
            approved; the message includes the outcome's ``repr``.
    """
    verdict = session.check_capability(capability)
    if not _is_approved(verdict):
        raise CapabilityDeniedError(f"{action} requires capability '{capability}': {verdict!r}")
[docs]
def handle_report_progress(
    session: CoordinationSessionLike,
    _workspace: WorkspaceLike,
    params: dict[str, object],
    *,
    now_fn: Callable[[], int] = _timestamp,
) -> ToolResult:
    """Handle a progress report from the agent.

    Args:
        session: Must be approved for ``RUN_REPORT_PROGRESS_CAPABILITY``.
        _workspace: Unused.
        params: Requires a string ``status``; ``note`` is optional and is
            treated as an empty string when missing or not a string.
        now_fn: Supplies the timestamp embedded in the response.

    Returns:
        A non-error ``ToolResult`` with a single text block.

    Raises:
        CapabilityDeniedError: If the capability check is not approved.
        InvalidParamsError: If ``status`` is missing or not a string.
    """
    require_capability(session, RUN_REPORT_PROGRESS_CAPABILITY, "Progress reporting")
    status = _parameter_as_string(params, "status")
    raw_note = params.get("note", "")
    if not isinstance(raw_note, str):
        # Non-string notes are dropped rather than rejected.
        raw_note = ""
    report = format_progress_text(status, raw_note, now_fn())
    return ToolResult(content=[ToolContent.text_content(report)], is_error=False)
[docs]
def handle_declare_complete(
    session: CoordinationSessionLike,
    workspace: WorkspaceLike,
    params: dict[str, object],
    *,
    now_fn: Callable[[], int] = _timestamp,
) -> ToolResult:
    """Handle the agent's declaration that its task is finished.

    Args:
        session: Must be approved for ``ARTIFACT_SUBMIT_CAPABILITY``; its
            ``run_id`` and ``session_id`` are read.
        workspace: Where the completion sentinel is written.
        params: Optional string ``summary``; a missing or non-string value
            becomes ``"No summary provided"``.
        now_fn: Supplies the timestamp embedded in the response.

    Returns:
        A non-error ``ToolResult`` with a single text block.

    Raises:
        CapabilityDeniedError: If the capability check is not approved.
    """
    require_capability(session, ARTIFACT_SUBMIT_CAPABILITY, "Task completion")
    default_summary = "No summary provided"
    summary = params.get("summary", default_summary)
    if not isinstance(summary, str):
        summary = default_summary
    # The sentinel is best-effort: an OSError while writing it must not fail
    # the completion call.
    with contextlib.suppress(OSError):
        _write_completion_sentinel(workspace, session.run_id)
    headline = (
        f"Task declared complete: session_id={session.session_id}, "
        f"summary='{summary}', timestamp={now_fn()}"
    )
    text = "\n".join((headline, "[Completion event emitted to pipeline]"))
    return ToolResult(content=[ToolContent.text_content(text)], is_error=False)
[docs]
def format_coordination_text(
    action: str,
    session_id: str,
    timestamp: int,
    work_unit_id: str | None,
    payload: object | None,
) -> str:
    """Compose the two-line text returned for a coordination action.

    The first line lists ``action``, ``session_id`` and ``timestamp``,
    followed by ``work_unit_id`` and the serialized ``payload`` when each is
    not ``None``. The second line is a fixed marker stating that a
    coordination event was emitted.
    """
    fields = [
        f"Coordination action '{action}' processed: session_id={session_id}",
        f"timestamp={timestamp}",
    ]
    if work_unit_id is not None:
        fields.append(f"work_unit_id={work_unit_id}")
    if payload is not None:
        fields.append(f"payload={_serialize_payload(payload)}")
    headline = ", ".join(fields)
    return headline + "\n[Coordination event emitted to pipeline]"
[docs]
def handle_coordinate(
    session: CoordinationSessionLike,
    _workspace: WorkspaceLike,
    params: dict[str, object],
    *,
    now_fn: Callable[[], int] = _timestamp,
) -> ToolResult:
    """Handle a coordination action and echo it back as text.

    Args:
        session: Must be approved for ``ARTIFACT_SUBMIT_CAPABILITY``; its
            ``session_id`` is included in the response.
        _workspace: Unused.
        params: Requires a string ``action``. ``work_unit_id`` is used only
            when it is a string; ``payload`` is passed through as-is.
        now_fn: Supplies the timestamp embedded in the response.

    Returns:
        A non-error ``ToolResult`` with a single text block.

    Raises:
        CapabilityDeniedError: If the capability check is not approved.
        InvalidParamsError: If ``action`` is missing or not a string.
    """
    require_capability(session, ARTIFACT_SUBMIT_CAPABILITY, "Workspace coordination")
    action = _parameter_as_string(params, "action")
    unit = params.get("work_unit_id")
    if not isinstance(unit, str):
        # Anything other than a string is treated as "no work unit".
        unit = None
    extra = params.get("payload")
    text = format_coordination_text(
        action=action,
        session_id=session.session_id,
        timestamp=now_fn(),
        work_unit_id=unit,
        payload=extra,
    )
    return ToolResult(content=[ToolContent.text_content(text)], is_error=False)
[docs]
def handle_read_env(
    session: CoordinationSessionLike,
    _workspace: WorkspaceLike,
    params: dict[str, object],
    *,
    env: dict[str, str] | os._Environ[str] = os.environ,
) -> ToolResult:
    """Look up one environment variable and return it as ``NAME=value`` text.

    Args:
        session: Must be approved for ``ENV_READ_CAPABILITY``.
        _workspace: Unused.
        params: Requires a string ``name``.
        env: Mapping to read from; defaults to ``os.environ``.

    Returns:
        A non-error ``ToolResult`` with a single text block. A variable that
        is not set is reported with the value ``[not found]``.

    Raises:
        CapabilityDeniedError: If the capability check is not approved.
        InvalidParamsError: If ``name`` is missing or not a string.
    """
    require_capability(session, ENV_READ_CAPABILITY, "Environment variable read")
    variable = _parameter_as_string(params, "name")
    looked_up = read_env_value(env, variable)
    line = f"{variable}={looked_up}"
    return ToolResult(content=[ToolContent.text_content(line)], is_error=False)
def read_env_value(env: dict[str, str] | os._Environ[str], name: str) -> str:
    """Look up ``name`` in ``env``, substituting ``'[not found]'`` when it is absent."""
    placeholder = "[not found]"
    return env.get(name, placeholder)
# Names exported by ``from <this module> import *``. Besides the handlers,
# formatters and capability constants defined here, the list re-exports the
# error, content and protocol types imported at the top of the file.
# NOTE(review): ``read_env_value`` is defined in this module without a leading
# underscore but is not listed here — confirm whether that is intentional.
__all__ = [
    "ARTIFACT_SUBMIT_CAPABILITY",
    "ENV_READ_CAPABILITY",
    "RUN_REPORT_PROGRESS_CAPABILITY",
    "CapabilityDeniedError",
    "ContentBlock",
    "CoordinationSessionLike",
    "ImageContent",
    "InvalidParamsError",
    "ToolContent",
    "ToolError",
    "ToolResult",
    "WorkspaceLike",
    "format_coordination_text",
    "format_progress_text",
    "handle_coordinate",
    "handle_declare_complete",
    "handle_read_env",
    "handle_report_progress",
    "require_capability",
]