Source code for ralph.mcp.server.factory_impl

"""Concrete MCP server factory that allocates dynamically bound localhost endpoints.

``DynamicBindingMcpServerFactory`` is the production implementation of
``McpServerFactory``. It reserves a unique TCP port per worker session, starts an
MCP server subprocess via ``lifecycle.start_mcp_server``, and returns a
``McpServerHandle`` that callers can use to reach the server or shut it down.
"""

from __future__ import annotations

from dataclasses import replace
from threading import Lock
from typing import TYPE_CHECKING, Protocol

if TYPE_CHECKING:
    from collections.abc import Callable

    from ralph.mcp.protocol._session_bridge_like import SessionBridgeLike
from ralph.mcp.protocol.session import AgentSession
from ralph.mcp.server import lifecycle
from ralph.mcp.server._bridge_with_process import _BridgeWithProcess
from ralph.mcp.server.factory import McpServerFactory, McpServerHandle

if TYPE_CHECKING:
    from ralph.mcp.protocol.startup import WorkspaceLike

    class StartServer(Protocol):
        """Callable signature for the MCP server start function."""

        def __call__(
            self,
            session: AgentSession,
            workspace: WorkspaceLike,
            *,
            deps: lifecycle.LifecycleDeps | None = None,
        ) -> SessionBridgeLike: ...


[docs] class DynamicBindingMcpServerFactory(McpServerFactory): """Build MCP server handles with dynamically allocated localhost endpoints.""" def __init__( self, workspace: WorkspaceLike, *, reserve_port: Callable[[], int] | None = None, start_server: StartServer = lifecycle.start_mcp_server, lifecycle_deps: lifecycle.LifecycleDeps | None = None, ) -> None: self.workspace = workspace self._start_server = start_server self._base_deps = lifecycle_deps or lifecycle._default_lifecycle_deps() self._reserve_port = reserve_port or self._base_deps.reserve_port self._allocated_endpoints: set[str] = set() self._allocation_lock = Lock() def build(self, session: object) -> McpServerHandle: agent_session = self._coerce_session(session) bridge = self._start_server( agent_session, self.workspace, deps=replace(self._base_deps, reserve_port=self._reserve_unique_port), ) pid = self._bridge_pid(bridge) return McpServerHandle( endpoint=bridge.agent_endpoint_uri(), pid=pid, shutdown=bridge.shutdown, ) def _reserve_unique_port(self) -> int: while True: port = self._reserve_port() endpoint = f"http://127.0.0.1:{port}/mcp" with self._allocation_lock: if endpoint in self._allocated_endpoints: continue self._allocated_endpoints.add(endpoint) return port @staticmethod def _bridge_pid(bridge: SessionBridgeLike) -> int: if not isinstance(bridge, _BridgeWithProcess): msg = "MCP server bridge must expose process.pid" raise TypeError(msg) return bridge.process.pid @staticmethod def _coerce_session(session: object) -> AgentSession: if isinstance(session, AgentSession): return session msg = "DynamicBindingMcpServerFactory.build requires an AgentSession" raise TypeError(msg)
__all__ = ["DynamicBindingMcpServerFactory"]