Source code for ralph.mcp.upstream.client

"""HTTP and stdio clients for proxying calls to upstream MCP servers.

Provides ``HttpUpstreamClient`` and ``StdioUpstreamClient``, both implementing
``UpstreamMcpClient``. ``make_upstream_client`` selects the right implementation
from the server's transport field. Internal helpers handle JSON-RPC framing,
legacy SSE endpoints, and multimodal content-block normalization.

Multimodal normalization is done at the registry level via
``normalize_upstream_content_blocks()``, not inside individual clients.
"""

from __future__ import annotations

import base64
import json
from collections.abc import Callable, Mapping
from typing import TYPE_CHECKING, cast

import httpx

from ralph.mcp.multimodal.artifacts import infer_modality_and_mime
from ralph.mcp.multimodal.resources import MediaEntryExtras, MediaSource, build_media_identity
from ralph.mcp.protocol.startup import (
    initialize_request,
    initialized_notification,
    legacy_sse_jsonrpc_exchange,
    looks_like_legacy_sse_endpoint,
)
from ralph.mcp.upstream._has_media_manifest import HasMediaManifest
from ralph.mcp.upstream._stdio_upstream_client import StdioUpstreamClient
from ralph.mcp.upstream._upstream_mcp_client import UpstreamMcpClient
from ralph.mcp.upstream.models import UpstreamCallError, UpstreamTool

if TYPE_CHECKING:
    from ralph.mcp.upstream.config import UpstreamMcpServer

JsonObject = dict[str, object]
JsonRpcCaller = Callable[[str, JsonObject], JsonObject]


[docs] class HttpUpstreamClient: """Upstream MCP client that communicates over HTTP JSON-RPC.""" def __init__( self, server: UpstreamMcpServer, *, caller: JsonRpcCaller | None = None, ) -> None: self._server = server self._caller: JsonRpcCaller = ( caller if caller is not None else _make_http_caller(server.url or "") ) def list_tools(self) -> list[UpstreamTool]: try: result = self._caller("tools/list", {}) except UpstreamCallError: raise except Exception as exc: raise UpstreamCallError( f"upstream server '{self._server.name}' tools/list failed: {exc}" ) from exc return _parse_tools(result) def call_tool(self, name: str, arguments: JsonObject) -> object: try: result = self._caller("tools/call", {"name": name, "arguments": arguments}) except UpstreamCallError: raise except Exception as exc: raise UpstreamCallError( f"upstream server '{self._server.name}' tool '{name}' failed: {exc}" ) from exc return result
[docs] def make_upstream_client( server: UpstreamMcpServer, *, caller: JsonRpcCaller | None = None, ) -> HttpUpstreamClient | StdioUpstreamClient: """Instantiate the appropriate upstream client for the server's transport.""" if server.transport == "http": return HttpUpstreamClient(server, caller=caller) return StdioUpstreamClient(server, caller=caller)
def _parse_tools(result: JsonObject) -> list[UpstreamTool]: raw_tools = result.get("tools") if not isinstance(raw_tools, list): return [] tools: list[UpstreamTool] = [] for item in raw_tools: if not isinstance(item, Mapping): continue item_map = cast("Mapping[str, object]", item) name = item_map.get("name") if not isinstance(name, str) or not name: continue description_raw = item_map.get("description") description = str(description_raw) if description_raw is not None else "" schema_raw = item_map.get("inputSchema") or item_map.get("input_schema") if isinstance(schema_raw, Mapping): input_schema: dict[str, object] = dict(cast("Mapping[str, object]", schema_raw)) else: input_schema = {} tools.append(UpstreamTool(name=name, description=description, input_schema=input_schema)) return tools def _json_rpc_result(raw: object, context: str) -> JsonObject: if not isinstance(raw, Mapping): raise UpstreamCallError(f"unexpected response type from {context}") raw_map = cast("Mapping[str, object]", raw) err = raw_map.get("error") if err is not None: raise UpstreamCallError(f"JSON-RPC error from {context}: {err}") result = raw_map.get("result") if isinstance(result, Mapping): return dict(cast("Mapping[str, object]", result)) return {} _UPSTREAM_MEDIA_BLOCK_TYPES: frozenset[str] = frozenset( {"image", "audio", "video", "pdf", "document"} ) _MAX_EXTENSION_LEN = 6 _DEFAULT_MIME_BY_BLOCK_TYPE: dict[str, str] = { "image": "image/png", "audio": "audio/mpeg", "video": "video/mp4", "pdf": "application/pdf", "document": "application/octet-stream", } def _modality_from_mime(mime_type: str) -> str | None: """Return modality for a MIME type by prefix, or None if unrecognized.""" if mime_type.startswith("image/"): return "image" if mime_type.startswith("audio/"): return "audio" if mime_type.startswith("video/"): return "video" if mime_type == "application/pdf": return "pdf" if mime_type in ( "application/vnd.openxmlformats-officedocument.wordprocessingml.document", "application/vnd.openxmlformats-officedocument.presentationml.presentation", "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", ): return "document" return None def _extract_mime(block: Mapping[str, object], block_type: str) -> str: """Extract MIME type from an upstream block, falling back by block_type.""" mime = block.get("mimeType") if isinstance(mime, str) and mime: return mime source = block.get("source") if isinstance(source, Mapping): media_type = source.get("media_type") if isinstance(media_type, str) and media_type: return media_type upstream_uri = _extract_uri(block) if upstream_uri: stem = upstream_uri.split("?")[0].split("#")[0] if "." in stem: ext = "." + stem.rsplit(".", 1)[-1] if len(ext) <= _MAX_EXTENSION_LEN: inferred = infer_modality_and_mime(ext) if inferred: return inferred[1] return _DEFAULT_MIME_BY_BLOCK_TYPE.get(block_type, "application/octet-stream") def _extract_uri(block: Mapping[str, object]) -> str | None: """Extract URI from URI-backed upstream block, or None if not URI-backed.""" uri = block.get("uri") if isinstance(uri, str) and uri: return uri source = block.get("source") if isinstance(source, Mapping): src_uri = source.get("uri") if isinstance(src_uri, str) and src_uri: return src_uri return None def _extract_data(block: Mapping[str, object]) -> bytes | None: """Extract raw bytes from embedded-data upstream block, or None if not embedded.""" data = block.get("data") if isinstance(data, str) and data: try: return base64.b64decode(data) except Exception: return None source = block.get("source") if isinstance(source, Mapping): src_data = source.get("data") if isinstance(src_data, str) and src_data: try: return base64.b64decode(src_data) except Exception: return None return None def _get_block_title(block: Mapping[str, object], tool_name: str, idx: int, block_type: str) -> str: """Get display title for a normalized block.""" title = block.get("title") or block.get("name") if isinstance(title, str) and title: return title return f"{tool_name}_{block_type}_{idx}" def _normalize_media_block( block: Mapping[str, object], block_type: str, idx: int, server_name: str, tool_name: str, _session: HasMediaManifest | None, ) -> dict[str, object]: """Normalize an upstream media block into a resource_reference content block. Handles image/audio/video/pdf/document blocks with either URI-backed or embedded-data shapes: - Embedded-data blocks (with 'data'/'source.data'): bytes are stored in the session manifest as a Ralph-owned ralph://media/... artifact. Delivery is 'resource_reference_replay' — the agent can call read_media with the returned URI to retrieve the artifact. - URI-backed blocks (with 'uri'/'source.uri'): the original upstream URI is preserved as-is. Delivery is 'resource_reference' — the URI points to an external resource, not a Ralph-owned artifact. """ mime_type = _extract_mime(block, block_type) derived_modality = _modality_from_mime(mime_type) if derived_modality is not None and derived_modality != block_type: raise UpstreamCallError( f"upstream server '{server_name}' tool '{tool_name}' returned " f"content block (type='{block_type}') with inconsistent MIME type '{mime_type}' " f"(derived modality '{derived_modality}' != declared type '{block_type}') " f"at index {idx}." ) title = _get_block_title(block, tool_name, idx, block_type) upstream_uri = _extract_uri(block) raw_bytes = _extract_data(block) if raw_bytes is not None: if _session is None: raise UpstreamCallError( f"upstream server '{server_name}' tool '{tool_name}' returned " f"embedded {block_type} content block at index {idx} " f"but no active session is available to store the artifact bytes. " f"Embedded media requires an active session manifest." ) entry = _session.media_manifest.add( title=title, mime_type=mime_type, modality=block_type, raw_bytes=raw_bytes, extras=MediaEntryExtras( identity_key=build_media_identity( modality=block_type, mime_type=mime_type, title=title, source=MediaSource(raw_bytes=raw_bytes), ), ), ) uri = entry.uri delivery = "resource_reference_replay" elif upstream_uri is not None: uri = upstream_uri delivery = "resource_reference" else: raise UpstreamCallError( f"upstream server '{server_name}' tool '{tool_name}' returned " f"content block (type='{block_type}') at index {idx} with neither " f"'uri'/'source.uri' nor 'data'/'source.data' — cannot normalize." ) return { "type": "resource_reference", "uri": uri, "mimeType": mime_type, "title": title, "modality": block_type, "delivery": delivery, } def _get_content_list(result: JsonObject) -> list[object] | None: """Extract content list from result, returning None if not a valid list of blocks.""" content = result.get("content") if not isinstance(content, list): return None return list(content)
[docs] def normalize_upstream_content_blocks( result: JsonObject, server_name: str, tool_name: str, session: HasMediaManifest | None = None, ) -> None: """Normalize upstream tool result content blocks into the multimodal contract. - text blocks: pass through unchanged. - resource_reference blocks: pass through unchanged. - image/audio/video/pdf/document blocks: normalized to resource_reference. URI-backed blocks preserve the upstream URI; embedded-data blocks store bytes in the session manifest (ralph://media/... URI) when available. - Other types: raise UpstreamCallError with a clear explanation. Modifies the result dict in place. """ content_blocks = _get_content_list(result) if content_blocks is None: return normalized: list[object] = [] for idx, block in enumerate(content_blocks): if not isinstance(block, Mapping): normalized.append(block) continue block_type = block.get("type") if not isinstance(block_type, str): normalized.append(block) continue if block_type in ("text", "resource_reference"): normalized.append(block) elif block_type in _UPSTREAM_MEDIA_BLOCK_TYPES: normalized.append( _normalize_media_block(block, block_type, idx, server_name, tool_name, session) ) else: raise UpstreamCallError( f"upstream server '{server_name}' tool '{tool_name}' returned " f"unsupported content block (type='{block_type}') at index {idx}. " f"Accepted types: text, resource_reference, " f"image, audio, video, pdf, document." ) result["content"] = normalized
def _make_http_caller(url: str) -> JsonRpcCaller: def _call(method: str, params: JsonObject) -> JsonObject: payload_obj: dict[str, object] = { "jsonrpc": "2.0", "id": 2, "method": method, "params": params, } if looks_like_legacy_sse_endpoint(url): responses = legacy_sse_jsonrpc_exchange( url, (initialize_request(), initialized_notification(), payload_obj), timeout_s=30.0, ) return _json_rpc_result(responses[-1], f"'{url}'") try: response = httpx.post( url, content=json.dumps(payload_obj, separators=(",", ":")).encode(), headers={"Content-Type": "application/json"}, timeout=30.0, ) response.raise_for_status() except httpx.HTTPError as exc: raise UpstreamCallError(f"HTTP request to '{url}' failed: {exc}") from exc raw: object = json.loads(response.content) return _json_rpc_result(raw, f"'{url}'") return _call __all__ = [ "HasMediaManifest", "HttpUpstreamClient", "JsonObject", "JsonRpcCaller", "StdioUpstreamClient", "UpstreamCallError", "UpstreamMcpClient", "make_upstream_client", "normalize_upstream_content_blocks", ]