"""MCP bridge.
Bridges Ralph's phase system with MCP (Model Context Protocol) clients.
Exposes tools for agent interactions, artifact submission, and state queries.
"""
from __future__ import annotations
from typing import TYPE_CHECKING, Protocol, cast
from loguru import logger
from ralph.mcp.artifacts._bridge_artifact_deps import BridgeArtifactDeps
from ralph.mcp.artifacts._bridge_config import BridgeConfig
from ralph.mcp.artifacts._bridge_error import BridgeError
from ralph.mcp.artifacts._mcp_tool import MCPTool
from ralph.mcp.artifacts.store import (
ArtifactExistsError,
ArtifactNotFoundError,
ArtifactSubmitOptions,
get_artifact,
list_artifacts,
submit_artifact,
)
from ralph.mcp.protocol.transport import MCPMessage, StdioTransport
if TYPE_CHECKING:
class _ToolHandler(Protocol):
"""Protocol for MCP tool handler callables."""
def __call__(self, *args: object, **kwargs: object) -> dict[str, object]: ...
class _MethodDispatcher(Protocol):
"""Protocol for MCP method dispatcher callables."""
def __call__(self, message: MCPMessage, /) -> MCPMessage: ...
[docs]
class MCPBridge:
"""MCP bridge for Ralph.
Bridges the phase system with MCP by exposing tools to agents,
managing artifact lifecycle, and handling MCP protocol messages.
"""
def __init__(self, config: BridgeConfig) -> None:
"""Initialize MCP bridge.
Args:
config: Bridge configuration.
"""
self._config = config
self._tools: dict[str, MCPTool] = {}
self._transport = config.transport or StdioTransport(["echo", "noop"])
self._running = False
[docs]
def submit_artifact_mcp(
self,
name: str,
artifact_type: str,
content: dict[str, object],
metadata: dict[str, object] | None = None,
) -> dict[str, object]:
"""Submit an artifact via MCP.
Args:
name: Artifact name.
artifact_type: Type of artifact.
content: Artifact content.
metadata: Optional metadata.
Returns:
Artifact submission result.
"""
try:
artifact = submit_artifact(
self._config.artifact_dir,
name,
artifact_type,
content,
ArtifactSubmitOptions(
metadata=metadata,
persistence=self._config.artifact_deps.persistence,
),
)
return {"success": True, "artifact": artifact.to_dict()}
except ArtifactExistsError as exc:
return {"success": False, "error": str(exc)}
[docs]
def get_artifact_mcp(self, name: str) -> dict[str, object]:
"""Get an artifact via MCP.
Args:
name: Artifact name.
Returns:
Artifact data.
"""
try:
artifact = get_artifact(
self._config.artifact_dir,
name,
backend=self._config.artifact_deps.backend,
)
return {"success": True, "artifact": artifact.to_dict()}
except ArtifactNotFoundError as exc:
return {"success": False, "error": str(exc)}
[docs]
def list_artifacts_mcp(self) -> dict[str, object]:
"""List all artifacts via MCP.
Returns:
List of artifacts.
"""
artifacts = list_artifacts(
self._config.artifact_dir,
backend=self._config.artifact_deps.backend,
)
return {
"success": True,
"artifacts": [a.to_dict() for a in artifacts],
}
[docs]
async def handle_message(self, message: MCPMessage) -> MCPMessage | None:
"""Handle an incoming MCP message.
Args:
message: The MCP message to process.
Returns:
Optional response message.
"""
handler = self._method_dispatchers.get(message.method)
if handler is not None:
return handler(message)
logger.warning("Unknown MCP method: {}", message.method)
return MCPMessage(
method=message.method,
error={"code": -32601, "message": f"Method not found: {message.method}"},
msg_id=message.msg_id,
)
def _dispatch_tools_list(self, message: MCPMessage) -> MCPMessage:
"""Handle tools/list method."""
tools = [
{
"name": t.name,
"description": t.description,
"inputSchema": t.input_schema,
}
for t in self._tools.values()
]
return MCPMessage(
method="tools/list",
params={"tools": tools},
msg_id=message.msg_id,
)
def _dispatch_tools_call(self, message: MCPMessage) -> MCPMessage:
"""Handle tools/call method."""
if not message.params:
return MCPMessage(
method="tools/call",
error={"code": -32600, "message": "Invalid request"},
msg_id=message.msg_id,
)
tool_name = cast("str", message.params.get("name", "")) or ""
arguments = cast("dict[str, object]", message.params.get("arguments", {}))
result = self.tool_called(tool_name, arguments)
return MCPMessage(
method="tools/call",
params={"content": [result]},
msg_id=message.msg_id,
)
def _dispatch_artifacts_submit(self, message: MCPMessage) -> MCPMessage:
"""Handle artifacts/submit method."""
if not message.params:
return MCPMessage(
method="artifacts/submit",
error={"code": -32600, "message": "Invalid request"},
msg_id=message.msg_id,
)
result = self.submit_artifact_mcp(
name=cast("str", message.params.get("name", "")),
artifact_type=cast("str", message.params.get("type", "unknown")),
content=cast("dict[str, object]", message.params.get("content", {})),
metadata=cast("dict[str, object] | None", message.params.get("metadata")),
)
return MCPMessage(
method="artifacts/submit",
params=result,
msg_id=message.msg_id,
)
def _dispatch_artifacts_get(self, message: MCPMessage) -> MCPMessage:
"""Handle artifacts/get method."""
name = cast("str", message.params.get("name", "")) if message.params else ""
result = self.get_artifact_mcp(name)
return MCPMessage(
method="artifacts/get",
params=result,
msg_id=message.msg_id,
)
def _dispatch_artifacts_list(self, message: MCPMessage) -> MCPMessage:
"""Handle artifacts/list method."""
result = self.list_artifacts_mcp()
return MCPMessage(
method="artifacts/list",
params=result,
msg_id=message.msg_id,
)
@property
def _method_dispatchers(self) -> dict[str, _MethodDispatcher]:
"""Return method dispatchers mapping."""
return {
"tools/list": self._dispatch_tools_list,
"tools/call": self._dispatch_tools_call,
"artifacts/submit": self._dispatch_artifacts_submit,
"artifacts/get": self._dispatch_artifacts_get,
"artifacts/list": self._dispatch_artifacts_list,
}
[docs]
def start(self) -> None:
"""Start the MCP bridge."""
if isinstance(self._transport, StdioTransport):
self._transport.start()
self._running = True
logger.info("MCP bridge started")
[docs]
async def run(self) -> None:
"""Run the bridge message loop."""
self.start()
async for message in self._transport.recv():
response = await self.handle_message(message)
if response:
await self._transport.send(response)
[docs]
async def close(self) -> None:
"""Close the bridge and transport."""
self._running = False
await self._transport.close()
logger.info("MCP bridge closed")
__all__ = ["BridgeArtifactDeps", "BridgeConfig", "BridgeError", "MCPBridge"]