Source code for ralph.mcp.tools.workspace._read_handlers

"""Read, stat, list, and search handler functions."""

from __future__ import annotations

import fnmatch
from typing import TYPE_CHECKING

from ralph.mcp.tools.coordination import (
    CoordinationSessionLike,
    InvalidParamsError,
    ToolContent,
    ToolError,
    ToolResult,
    require_capability,
)
from ralph.mcp.tools.workspace._list_ops import (
    _collect_matching_files,
    _list_dir_recursive_output,
    list_dir_flat,
)
from ralph.mcp.tools.workspace._utils import (
    _GREP_DEFAULT_LIMIT,
    FULL_READ_DEFAULT_MAX_BYTES,
    WORKSPACE_METADATA_READ_CAPABILITY,
    WORKSPACE_READ_CAPABILITY,
    _int_opt_param,
    _int_param,
    _ReadSelector,
    _tool_json,
    join_path,
    normalize_relative_path,
    required_string_param,
)
from ralph.workspace.skip import RECURSIVE_SKIP_DIRECTORY_NAMES

if TYPE_CHECKING:
    from ralph.workspace import Workspace


def _dispatch_partial_read(
    workspace: Workspace,
    normalized: str,
    path: str,
    sel: _ReadSelector,
) -> ToolResult:
    """Validate selectors and dispatch to byte-window or line-based partial read."""
    line_range = (sel.start is not None) or (sel.end is not None)
    byte_window = (sel.off is not None and sel.off > 0) or (sel.lim is not None)
    head_mode = sel.head is not None
    tail_mode = sel.tail is not None

    if line_range and byte_window:
        raise InvalidParamsError("Cannot combine line_start/line_end with offset/limit")
    if byte_window and (head_mode or tail_mode):
        raise InvalidParamsError("Cannot combine offset/limit with head/tail")
    if line_range and (head_mode or tail_mode):
        raise InvalidParamsError("Cannot combine line_start/line_end with head/tail")

    if byte_window:
        byte_offset = sel.off if sel.off is not None else 0
        content, meta = workspace.read_bytes(normalized, offset=byte_offset, limit=sel.lim)
        payload: dict[str, object] = {
            "path": path,
            "content": content,
            "total_bytes": meta.get("total_bytes"),
            "returned_bytes": meta.get("returned_bytes"),
            "truncated": meta.get("truncated"),
        }
    else:
        content, meta = workspace.read_lines(
            normalized, start=sel.start, end=sel.end, head=sel.head, tail=sel.tail
        )
        payload = {
            "path": path,
            "content": content,
            "total_lines": meta.get("total_lines"),
            "returned_lines": meta.get("returned_lines"),
            "truncated": meta.get("truncated"),
        }
    return ToolResult(content=[ToolContent.text_content(_tool_json(payload))], is_error=False)


[docs] def handle_read_file( session: CoordinationSessionLike, workspace: Workspace, params: dict[str, object], ) -> ToolResult: """Read a UTF-8 file from the workspace. Full-file reads (no partial params) return a plain text block for UTF-8 files at or below max_bytes (default 5_000_000). The JSON envelope only appears when truncated is True OR when an error occurs (binary_or_invalid_utf8). Partial-read parameter groups (line_start/line_end, offset/limit, head, tail) are mutually exclusive; combining any two raises InvalidParams. Optional param max_bytes overrides the default ceiling for full-file reads. """ require_capability(session, WORKSPACE_READ_CAPABILITY, "Workspace read") path = required_string_param(params, "path") normalized = normalize_relative_path(path) sel = _ReadSelector.from_params(params) if sel.is_active(): return _dispatch_partial_read(workspace, normalized, path, sel) max_bytes = _int_param(params, "max_bytes", FULL_READ_DEFAULT_MAX_BYTES) try: stat_result: dict[str, object] = workspace.stat(normalized) except Exception: stat_result = {} file_type = stat_result.get("type", "") size_bytes = stat_result.get("size_bytes") if file_type == "file" and isinstance(size_bytes, int) and size_bytes > max_bytes: head_value = max(1, max_bytes // 256) content, _meta = workspace.read_lines(normalized, head=head_value) payload = { "path": path, "content": content, "truncated": True, "total_bytes": size_bytes, "max_bytes": max_bytes, "reason": "oversize", } return ToolResult(content=[ToolContent.text_content(_tool_json(payload))], is_error=False) try: content = workspace.read(normalized) except UnicodeDecodeError as exc: payload = { "status": "binary_or_invalid_utf8", "path": path, "error": str(exc), "byte_offset": exc.start, } return ToolResult(content=[ToolContent.text_content(_tool_json(payload))], is_error=True) except FileNotFoundError as exc: raise ToolError(f"Failed to read file '{path}': {exc}") from exc except Exception as exc: raise ToolError(f"Failed to read file '{path}': {exc}") from exc return ToolResult(content=[ToolContent.text_content(content)], is_error=False)
[docs] def handle_read_multiple_files( session: CoordinationSessionLike, workspace: Workspace, params: dict[str, object], ) -> ToolResult: """Read multiple workspace files in one call and return per-file results.""" require_capability(session, WORKSPACE_READ_CAPABILITY, "Read multiple files") paths_param = params.get("paths") if not isinstance(paths_param, list): raise InvalidParamsError("Missing 'paths' parameter as list of strings") paths = [str(p) for p in paths_param] results: list[dict[str, object]] = [] for p in paths: normalized = normalize_relative_path(p) try: content = workspace.read(normalized) results.append({"path": p, "content": content}) except Exception as exc: results.append({"path": p, "error": str(exc)}) payload = _tool_json({"files": results}) return ToolResult(content=[ToolContent.text_content(payload)], is_error=False)
[docs] def handle_stat( session: CoordinationSessionLike, workspace: Workspace, params: dict[str, object], ) -> ToolResult: """Return file metadata (type, size, timestamps) for a workspace path.""" require_capability(session, WORKSPACE_METADATA_READ_CAPABILITY, "Workspace metadata read") path = required_string_param(params, "path") normalized = normalize_relative_path(path) try: stat_result = workspace.stat(normalized) except Exception as exc: raise ToolError(f"Failed to stat '{path}': {exc}") from exc return ToolResult(content=[ToolContent.text_content(_tool_json(stat_result))], is_error=False)
[docs] def handle_list_allowed_roots( session: CoordinationSessionLike, workspace: Workspace, params: dict[str, object], ) -> ToolResult: """Return the list of workspace paths the session is permitted to access.""" require_capability(session, WORKSPACE_READ_CAPABILITY, "List allowed roots") try: roots = workspace.allowed_roots() except Exception as exc: raise ToolError(f"Failed to list allowed roots: {exc}") from exc payload = _tool_json({"allowed_roots": roots}) return ToolResult(content=[ToolContent.text_content(payload)], is_error=False)
[docs] def handle_list_directory( session: CoordinationSessionLike, workspace: Workspace, params: dict[str, object], ) -> ToolResult: """List entries in a workspace directory, optionally recursive.""" require_capability(session, WORKSPACE_READ_CAPABILITY, "Directory listing") path = required_string_param(params, "path") recursive = bool(params.get("recursive", False)) output = ( list_dir_flat(workspace, path) if not recursive else _list_dir_recursive_output(workspace, path) ) return ToolResult(content=[ToolContent.text_content(output)], is_error=False)
[docs] def handle_list_directory_recursive( session: CoordinationSessionLike, workspace: Workspace, params: dict[str, object], ) -> ToolResult: """Return a flat listing of all entries under a workspace directory.""" require_capability(session, WORKSPACE_READ_CAPABILITY, "Recursive directory listing") path = required_string_param(params, "path") output = _list_dir_recursive_output(workspace, path) return ToolResult(content=[ToolContent.text_content(output)], is_error=False)
def _build_directory_tree( workspace: Workspace, path: str, current_depth: int, max_depth: int | None, exclude_patterns: list[str] | None, ) -> dict[str, object]: """Build a recursive directory tree structure.""" normalized = normalize_relative_path(path) name = normalized.split("/")[-1] if normalized else path def should_exclude(entry_name: str, entry_path: str) -> bool: if not exclude_patterns: return False for pat in exclude_patterns: if fnmatch.fnmatchcase(entry_name, pat) or fnmatch.fnmatchcase(entry_path, pat): return True return False is_dir = workspace.is_dir(normalized) if not is_dir: return {"name": name, "type": "file"} if max_depth is not None and current_depth >= max_depth: return {"name": name, "type": "dir", "children": []} entries: list[dict[str, object]] = [] try: dir_entries = workspace.list_dir(normalized) except Exception: dir_entries = [] for entry in sorted(dir_entries): entry_path = join_path(normalized, entry) if should_exclude(entry, entry_path): continue if entry in RECURSIVE_SKIP_DIRECTORY_NAMES: continue child = _build_directory_tree( workspace, entry_path, current_depth + 1, max_depth, exclude_patterns ) entries.append(child) return {"name": name, "type": "dir", "children": entries}
[docs] def handle_directory_tree( session: CoordinationSessionLike, workspace: Workspace, params: dict[str, object], ) -> ToolResult: """Return a nested JSON directory tree for a workspace path.""" require_capability(session, WORKSPACE_READ_CAPABILITY, "Directory tree") path = required_string_param(params, "path") max_depth = _int_opt_param(params, "max_depth") exclude_patterns = params.get("exclude_patterns") if exclude_patterns and isinstance(exclude_patterns, list): exclude_patterns = [str(p) for p in exclude_patterns] else: exclude_patterns = None try: tree = _build_directory_tree(workspace, path, 0, max_depth, exclude_patterns) except Exception as exc: raise ToolError(f"Failed to build directory tree for '{path}': {exc}") from exc return ToolResult(content=[ToolContent.text_content(_tool_json(tree))], is_error=False)
[docs] def handle_search_files( session: CoordinationSessionLike, workspace: Workspace, params: dict[str, object], ) -> ToolResult: """Search for files matching a glob pattern within a workspace directory.""" require_capability(session, WORKSPACE_READ_CAPABILITY, "File search") pattern = required_string_param(params, "pattern") path = required_string_param(params, "path") normalized = normalize_relative_path(path) exclude_param = params.get("exclude") exclude = ( [str(p) for p in exclude_param] if exclude_param and isinstance(exclude_param, list) else None ) limit = _int_param(params, "limit", _GREP_DEFAULT_LIMIT) matches = _collect_matching_files(workspace, normalized, pattern, exclude=exclude) truncated = len(matches) > limit if truncated: matches = matches[:limit] output = { "pattern": pattern, "base": path, "matches": matches, "truncated": truncated, } return ToolResult(content=[ToolContent.text_content(_tool_json(output))], is_error=False)