Source code for ralph.mcp.tools.workspace._media_handlers

"""Public media handler functions: read_media, read_image, persist_upstream_media."""

from __future__ import annotations

from pathlib import PurePosixPath
from typing import TYPE_CHECKING

from ralph.mcp.multimodal.resources import (
    MediaSource,
    build_media_identity,
    parse_media_uri,
)
from ralph.mcp.tools.coordination import (
    CoordinationSessionLike,
    ToolContent,
    ToolResult,
    require_capability,
)
from ralph.mcp.tools.workspace._media_blocks import (
    _handle_replay_uri,
    _handle_workspace_media,
)
from ralph.mcp.tools.workspace._media_io import (
    _persist_media_session_entry,
    _write_durable_media_cache,
)
from ralph.mcp.tools.workspace._media_session import (
    _get_media_manifest,
    _get_session_capability_profile,
    _workspace_artifact_loader,
)
from ralph.mcp.tools.workspace._utils import (
    _SUPPORTED_IMAGE_MIME_TYPES,
    MEDIA_READ_CAPABILITY,
    infer_image_mime_type,
    normalize_relative_path,
    required_string_param,
)

if TYPE_CHECKING:
    from ralph.workspace import Workspace


[docs] def handle_read_media( session: CoordinationSessionLike, workspace: Workspace, params: dict[str, object], *, max_inline_bytes: int = 5_242_880, ) -> ToolResult: """Read a media file or replay a stored artifact handle. Accepts either: - a workspace file path (e.g., ``screenshots/shot.png``) - a ``ralph://media/{artifact_id}`` replay handle from a prior session When given a replay handle, rehydrates the artifact from the live session manifest and returns the same typed block that was originally emitted. Invalid or unrecognised handles return an explicit structured failure. For workspace paths, delivery mode is determined by the session's model identity via the capability matrix: INLINE_IMAGE, TYPED_BLOCK, RESOURCE_REFERENCE_REPLAY, or UNSUPPORTED. """ require_capability(session, MEDIA_READ_CAPABILITY, "Media read") path = required_string_param(params, "path") if path.startswith("ralph://media/"): return _handle_replay_uri(session, workspace, path) return _handle_workspace_media(session, workspace, path, max_inline_bytes)
[docs] def handle_read_image( session: CoordinationSessionLike, workspace: Workspace, params: dict[str, object], *, max_inline_bytes: int = 5_242_880, ) -> ToolResult: """Read an image file and return it as a capability-aware content block. Requires MediaRead capability. Validates that the file is a supported image format, then delegates to the shared workspace media handler for delivery decision (inline image, typed block, or explicit unsupported/error). This is a compatibility alias over ``_handle_workspace_media`` that restricts inputs to image formats only while preserving the same truthful delivery contract as ``read_media``. """ require_capability(session, MEDIA_READ_CAPABILITY, "Image read") path = required_string_param(params, "path") normalized = normalize_relative_path(path) mime_type = infer_image_mime_type(normalized or path) if mime_type is None: suffix = PurePosixPath(path).suffix.lower() or "(none)" return ToolResult( content=[ ToolContent.text_content( f"Unsupported image format '{suffix}'. " f"Supported: {', '.join(sorted(_SUPPORTED_IMAGE_MIME_TYPES.keys()))}" ) ], is_error=True, ) return _handle_workspace_media(session, workspace, path, max_inline_bytes)
def _extract_resource_reference_replay_blocks( result: object, ) -> list[dict[str, str]]: """Extract resource_reference_replay blocks from a normalized upstream result.""" if not isinstance(result, dict): return [] raw_content: object = result.get("content") if not isinstance(raw_content, list): return [] blocks: list[dict[str, str]] = [] for item in raw_content: if not isinstance(item, dict): continue block: dict[str, str] = {k: str(v) for k, v in item.items() if isinstance(v, str)} if ( block.get("type") == "resource_reference" and block.get("delivery") == "resource_reference_replay" ): blocks.append(block) return blocks def _extract_resource_reference_blocks( result: object, ) -> list[dict[str, str]]: """Extract URI-backed resource_reference blocks from a normalized upstream result. These blocks reference external URIs (not Ralph-owned artifacts) and cannot be replayed across sessions. They are synthesized as unsupported_runtime_seam entries at the cross-session handoff boundary. """ if not isinstance(result, dict): return [] raw_content: object = result.get("content") if not isinstance(raw_content, list): return [] blocks: list[dict[str, str]] = [] for item in raw_content: if not isinstance(item, dict): continue block: dict[str, str] = {k: str(v) for k, v in item.items() if isinstance(v, str)} if ( block.get("type") == "resource_reference" and block.get("delivery") == "resource_reference" ): blocks.append(block) return blocks
[docs] def persist_upstream_media_artifacts( result: object, session: object, workspace: Workspace, ) -> None: """Persist upstream embedded media artifacts to the durable cache and session index. Called after normalize_upstream_content_blocks so that: - resource_reference_replay blocks (backed by ralph://media/... URIs stored in the session manifest) are written to the durable cache and session index, enabling cross-session replay of artifacts from upstream embedded-data blocks. - URI-backed resource_reference blocks (delivery='resource_reference') reference external URIs and cannot be replayed across sessions. These are synthesized as unsupported_runtime_seam entries so the failure is explicit at invoke time. """ replay_blocks = _extract_resource_reference_replay_blocks(result) uri_blocks = _extract_resource_reference_blocks(result) if not replay_blocks and not uri_blocks: return manifest = _get_media_manifest(session) profile = _get_session_capability_profile(session) if replay_blocks and manifest is not None: for block in replay_blocks: uri = block.get("uri", "") artifact_id = parse_media_uri(uri) if artifact_id is None: continue entry = manifest.get(artifact_id) if entry is None: continue verdict = profile.verdict_for(entry.modality) raw_bytes = entry.load_bytes() if raw_bytes is None: continue cache_path = _write_durable_media_cache(workspace, artifact_id, raw_bytes) identity_key = entry.identity_key or build_media_identity( modality=entry.modality, mime_type=entry.mime_type, title=entry.title, source=MediaSource(raw_bytes=raw_bytes), ) entry.set_replay_source( cache_path=cache_path, byte_loader=_workspace_artifact_loader(workspace, cache_path, ""), ) _persist_media_session_entry( session, workspace, { "uri": uri, "mime_type": entry.mime_type, "title": entry.title, "modality": entry.modality, "delivery": "resource_reference_replay", "reason": verdict.reason, "source_path": "", "cache_path": cache_path, "source_uri": "", "block_type": verdict.block_type or "", "identity_key": identity_key, }, ) if uri_blocks: for block in uri_blocks: uri = block.get("uri", "") modality = block.get("modality", "unknown") title = block.get("title", uri.rsplit("/", maxsplit=1)[-1] or "untitled") mime_type = block.get("mimeType", "application/octet-stream") source_uri = uri reason = ( f"Active runtime seam cannot carry {modality} content through the handoff path. " f"External URI-backed artifacts are not replayable across sessions." ) _persist_media_session_entry( session, workspace, { "uri": uri, "mime_type": mime_type, "title": title, "modality": modality, "delivery": "unsupported", "reason": reason, "source_path": "", "cache_path": "", "source_uri": source_uri, "block_type": "", "failure_kind": "unsupported_runtime_seam", "identity_key": build_media_identity( modality=modality, mime_type=mime_type, title=title, source=MediaSource(source_uri=source_uri), ), }, )