Source code for ralph.process.mcp_supervisor
"""Active MCP server supervision during agent execution.
The McpSupervisor runs a background thread that polls bridge health on a
fixed interval while an agent attempt is executing. When the MCP server
crashes, the supervisor restarts it on the stable endpoint so the agent
can continue. If the restart budget is exhausted, the error is stored and
re-raised when the context manager exits.
"""
from __future__ import annotations
import threading
from datetime import timedelta
from typing import TYPE_CHECKING
from loguru import logger
from ralph.mcp.server.lifecycle import McpServerError, RestartAwareMcpBridge
if TYPE_CHECKING:
from collections.abc import Callable
# Delay between two consecutive health checks when the caller does not pass
# ``check_interval`` to :class:`McpSupervisor`.
DEFAULT_INTERVAL = timedelta(milliseconds=2000)
[docs]
class McpSupervisor:
    """Background-thread supervisor for an active MCP server bridge.

    Usage::

        with McpSupervisor(bridge, on_restart=subscriber.record_mcp_restart):
            output = invoke_agent(...)
            stream_output(output)

    While the ``with`` block runs, a daemon thread calls
    ``bridge.check_health_and_restart_if_needed()`` once per
    ``check_interval`` (a :class:`~datetime.timedelta`). After each restart
    the optional ``on_restart`` callback receives the bridge's current
    ``restart_count``. If the restart budget is exhausted, the
    :class:`~ralph.mcp.server.lifecycle.McpServerError` is stored, logged,
    passed to the optional ``on_error`` callback, and re-raised when the
    context manager exits — taking priority over any agent-level error raised
    inside the ``with`` block.

    Any other exception raised during a check (for example by the
    ``on_restart`` callback) is logged and polling continues, so a single
    failing check does not end supervision for the rest of the run.

    The polling thread is created once in ``__init__`` and started in
    ``__enter__``, so an instance can be entered only once.
    """

    def __init__(
        self,
        bridge: RestartAwareMcpBridge,
        *,
        check_interval: timedelta = DEFAULT_INTERVAL,
        on_restart: Callable[[int], None] | None = None,
        on_error: Callable[[McpServerError], None] | None = None,
    ) -> None:
        """Create a supervisor; polling starts when the context is entered.

        Args:
            bridge: Bridge whose health is checked and which performs restarts.
            check_interval: Delay between consecutive health checks.
            on_restart: Called with ``bridge.restart_count`` after each restart.
            on_error: Called with the error once the restart budget is exhausted.
        """
        self._bridge = bridge
        self._check_interval = check_interval
        self._on_restart = on_restart
        self._on_error = on_error
        # Written by the supervisor thread; read by ``__exit__`` after joining it.
        self._mcp_error: McpServerError | None = None
        # Set by ``__exit__`` to stop polling. ``_run`` waits on it as its
        # interval sleep, so shutdown does not have to wait a full interval.
        self._done = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True, name="mcp-supervisor")

    def _do_check_once(self) -> bool:
        """Execute one health check. Returns True if a restart occurred.

        Raises:
            McpServerError: The restart budget is exhausted. The error is
                stored in ``_mcp_error``, logged, and passed to ``on_error``
                before being re-raised.
        """
        try:
            restarted = self._bridge.check_health_and_restart_if_needed()
        except McpServerError as exc:
            self._mcp_error = exc
            # Log before invoking the callback so the failure is recorded even
            # if the callback itself misbehaves.
            logger.error(
                "MCP server restart budget exhausted during active agent run; restart_count={}: {}",
                exc.restart_count,
                exc,
            )
            if self._on_error is not None:
                try:
                    self._on_error(exc)
                except Exception:
                    # A faulty callback must not replace the budget error.
                    logger.exception("MCP supervisor on_error callback failed")
            raise
        if restarted and self._on_restart is not None:
            self._on_restart(self._bridge.restart_count)
        return restarted

    def __enter__(self) -> McpSupervisor:
        """Start the background polling thread and return this supervisor."""
        self._thread.start()
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: object,
    ) -> None:
        """Stop the polling thread and surface any budget-exhaustion error.

        Raises:
            McpServerError: If the restart budget was exhausted while the
                ``with`` block ran. It is raised even when the block itself
                raised, taking priority over that exception.
        """
        self._done.set()
        # Bounded join: a hung health check must not block the caller forever.
        join_timeout_s = self._check_interval.total_seconds() * 2 + 1.0
        self._thread.join(timeout=join_timeout_s)
        if self._thread.is_alive():
            logger.warning(
                "MCP supervisor thread did not stop within {:.1f}s; a late MCP error may be missed",
                join_timeout_s,
            )
        if self._mcp_error is not None:
            raise self._mcp_error

    def _run(self) -> None:
        """Poll the bridge until ``_done`` is set or the restart budget is exhausted."""
        interval_s = self._check_interval.total_seconds()
        # ``Event.wait`` returns True once ``_done`` is set (ending the loop);
        # otherwise it times out after one interval and a check runs.
        while not self._done.wait(timeout=interval_s):
            try:
                self._do_check_once()
            except McpServerError:
                # Already stored and logged by ``_do_check_once``; ``__exit__``
                # re-raises it. The budget is exhausted, so stop polling.
                return
            except Exception:
                # Unexpected failure (e.g. a faulty ``on_restart`` callback):
                # record it and keep supervising instead of letting the
                # thread die silently.
                logger.exception("MCP supervisor health check failed unexpectedly; continuing")
# Names exported by ``from ralph.process.mcp_supervisor import *``.
__all__ = [
    "DEFAULT_INTERVAL",
    "McpSupervisor",
]