Source code for ralph.mcp.tools.workspace._utils

"""Workspace utility functions, constants, and helpers."""

from __future__ import annotations

import json
from pathlib import PurePosixPath
from typing import TYPE_CHECKING, NamedTuple, cast

from ralph.mcp.artifacts.policy_outcomes import is_policy_approved
from ralph.mcp.tools.coordination import (
    CapabilityDeniedError,
    InvalidParamsError,
    ToolError,
)

if TYPE_CHECKING:
    from collections.abc import Callable

    from ralph.workspace import Workspace

WORKSPACE_READ_CAPABILITY = "WorkspaceRead"
WORKSPACE_WRITE_TRACKED_CAPABILITY = "WorkspaceWriteTracked"
WORKSPACE_WRITE_EPHEMERAL_CAPABILITY = "WorkspaceWriteEphemeral"
WORKSPACE_METADATA_READ_CAPABILITY = "WorkspaceMetadataRead"
WORKSPACE_EDIT_CAPABILITY = "WorkspaceEdit"
WORKSPACE_DELETE_CAPABILITY = "WorkspaceDelete"
MEDIA_READ_CAPABILITY = "media.read"

_SUPPORTED_IMAGE_MIME_TYPES: dict[str, str] = {
    ".png": "image/png",
    ".jpg": "image/jpeg",
    ".jpeg": "image/jpeg",
    ".gif": "image/gif",
    ".webp": "image/webp",
}

_GREP_DEFAULT_LIMIT = 1000
_MAX_PATTERN_LENGTH = 1000
FULL_READ_DEFAULT_MAX_BYTES = 5_000_000


def _attribute_value(
    obj: object, attribute_name: str, default: object | None = None
) -> object | None:
    return cast("object | None", getattr(obj, attribute_name, default))


[docs] def required_string_param(params: dict[str, object], name: str) -> str: """Return a required string parameter, raising if it is missing.""" value = params.get(name) if not isinstance(value, str): raise InvalidParamsError(f"Missing '{name}' parameter") return value
def _tool_json(data: dict[str, object]) -> str: """Serialize a result dict to a JSON string for ToolResult content.""" return json.dumps(data) def _int_param(params: dict[str, object], name: str, default: int = 0) -> int: """Extract an int parameter from params dict with a default.""" value = params.get(name, default) if isinstance(value, int): return value return int(str(value)) def _int_opt_param(params: dict[str, object], name: str) -> int | None: """Extract an optional int parameter from params dict.""" value = params.get(name) if value is None: return None if isinstance(value, int): return value return int(str(value)) def normalize_relative_path(path: str) -> str: normalized = str(PurePosixPath(path)) if normalized in ("", "."): return "" return normalized def join_path(base: str, entry: str) -> str: if not base: return normalize_relative_path(entry) return normalize_relative_path(str(PurePosixPath(base) / entry)) def list_dir_entries(workspace: Workspace, path: str) -> list[str]: try: return workspace.list_dir(path) except Exception as exc: raise ToolError(f"Failed to list directory '{path}': {exc}") from exc def is_parallel_worker(session: object) -> bool: flag = _attribute_value(session, "is_parallel_worker", False) if callable(flag): try: executable = cast("Callable[[], object]", flag) return bool(executable()) except TypeError: return False return bool(flag) def check_edit_area_restriction(session: object, path: str) -> None: if not is_parallel_worker(session): return checker = _attribute_value(session, "check_edit_area") if not callable(checker): return callable_checker = cast("Callable[[str], object]", checker) outcome = callable_checker(path) if is_policy_approved(outcome): return raise CapabilityDeniedError(f"Write to '{path}' denied: edit area restriction") def _write_file_to_workspace(workspace: Workspace, path: str, content: str) -> None: try: workspace.write(path, content) except Exception as exc: raise ToolError(f"Failed to write file '{path}': {exc}") from exc def is_path_git_tracked(workspace: Workspace, path: str) -> bool: normalized = normalize_relative_path(path) if not normalized: return False try: exists = workspace.exists(normalized) except ValueError: return False if not exists: return False candidate = normalized.replace("\\", "/") return ( ".agent/" not in candidate and "/target/" not in candidate and "node_modules/" not in candidate ) def infer_image_mime_type(path: str) -> str | None: suffix = PurePosixPath(path).suffix.lower() return _SUPPORTED_IMAGE_MIME_TYPES.get(suffix) class _ReadSelector(NamedTuple): """Normalized partial-read selectors for read_file.""" start: int | None end: int | None off: int | None lim: int | None head: int | None tail: int | None @classmethod def from_params(cls, params: dict[str, object]) -> _ReadSelector: """Extract and normalize selectors from raw MCP params. Treats 0 as absent for all params except offset (offset=0 is a valid start-of-file position). Inert zero defaults sent by brokers are normalized to None so they do not trigger mode selection. """ def _n(v: int | None) -> int | None: return None if v == 0 else v return cls( start=_n(_int_opt_param(params, "line_start")), end=_n(_int_opt_param(params, "line_end")), off=_int_opt_param(params, "offset"), lim=_n(_int_opt_param(params, "limit")), head=_n(_int_opt_param(params, "head")), tail=_n(_int_opt_param(params, "tail")), ) def is_active(self) -> bool: """Return True when at least one partial-read mode is requested.""" line_range = (self.start is not None) or (self.end is not None) byte_window = (self.off is not None and self.off > 0) or (self.lim is not None) return line_range or byte_window or (self.head is not None) or (self.tail is not None)