# Source code for ralph.mcp.tools.workspace._grep_handlers

"""Grep/content-search handler."""

from __future__ import annotations

import re
from typing import TYPE_CHECKING

from ralph.mcp.tools.coordination import (
    CoordinationSessionLike,
    InvalidParamsError,
    ToolContent,
    ToolResult,
    require_capability,
)
from ralph.mcp.tools.workspace._list_ops import (
    _collect_files_recursive,
    match_glob,
)
from ralph.mcp.tools.workspace._utils import (
    _GREP_DEFAULT_LIMIT,
    _MAX_PATTERN_LENGTH,
    WORKSPACE_READ_CAPABILITY,
    _int_param,
    _tool_json,
    normalize_relative_path,
    required_string_param,
)

if TYPE_CHECKING:
    from ralph.workspace import Workspace


def _compile_grep_pattern(
    pattern: str,
    *,
    is_regex: bool,
    case_sensitive: bool,
    whole_word: bool,
) -> re.Pattern[str]:
    """Compile a grep search pattern to a regex.

    Args:
        pattern: Search text. Treated as a regular expression when
            ``is_regex`` is true, otherwise as a literal string that is
            escaped before compiling.
        is_regex: Compile ``pattern`` as-is instead of escaping it.
        case_sensitive: When false, the pattern is compiled with
            ``re.IGNORECASE``.
        whole_word: For literal patterns, require that the match is neither
            directly preceded nor directly followed by a word character.
            Ignored when ``is_regex`` is true; regex callers must add their
            own anchors.

    Returns:
        The compiled pattern.

    Raises:
        InvalidParamsError: If ``is_regex`` is true and ``pattern`` is not a
            valid regular expression.
    """
    flags = 0 if case_sensitive else re.IGNORECASE
    if is_regex:
        try:
            return re.compile(pattern, flags)
        except re.error as exc:
            raise InvalidParamsError(f"Invalid regex pattern: {exc}") from exc
    escaped = re.escape(pattern)
    if whole_word:
        # Lookarounds instead of \b: \b needs a word character on one side of
        # the boundary, so a literal that starts or ends with punctuation
        # (e.g. "c++") would only match when glued to another word. The
        # lookarounds express "no word character adjacent", which is the same
        # as \b for literals with word characters at both ends.
        escaped = r"(?<!\w)" + escaped + r"(?!\w)"
    return re.compile(escaped, flags)


def _collect_files_for_grep(workspace: Workspace, normalized: str) -> list[str]:
    """Return the files under ``normalized`` that grep should consider.

    ``workspace.iter_files`` is tried first and its result is materialised
    into a list. If that call, or iterating its result, raises any
    ``Exception``, the listing is produced by ``_collect_files_recursive``
    instead.
    """
    try:
        # Materialise inside the try so errors raised lazily during iteration
        # also trigger the fallback.
        found = [*workspace.iter_files(normalized)]
    except Exception:
        found = _collect_files_recursive(workspace, normalized)
    return found


def _search_file_content(
    workspace: Workspace,
    file_path: str,
    compiled: re.Pattern[str],
    context_before: int,
    context_after: int,
    _max_file_bytes: int,
) -> list[dict[str, object]] | None:
    """Find every line of one file that matches ``compiled``.

    Args:
        workspace: Object providing ``stat(path)`` and ``read(path)``.
        file_path: Path handed to ``workspace.stat`` / ``workspace.read`` and
            echoed back in each match record.
        compiled: Pattern applied with ``search`` to each line; the line is
            searched with its line terminator still attached.
        context_before: Number of lines preceding a match to include.
        context_after: Number of lines following a match to include.
        _max_file_bytes: Files whose reported integer ``size_bytes`` exceeds
            this value are skipped.

    Returns:
        ``None`` when the file is skipped: ``stat`` raised, the entry is a
        directory, it is larger than ``_max_file_bytes``, or ``read`` raised.
        Otherwise a list (possibly empty) with one dict per matching line,
        holding ``path``, ``line`` (1-based), ``text``, ``context_before`` and
        ``context_after``; all text has trailing ``\\n``/``\\r`` removed.
    """
    try:
        info = workspace.stat(file_path)
    except Exception:
        return None

    if info.get("type") == "dir":
        return None
    reported_size = info.get("size_bytes", 0)
    if isinstance(reported_size, int) and reported_size > _max_file_bytes:
        return None

    try:
        text = workspace.read(file_path)
    except Exception:  # also covers UnicodeDecodeError from undecodable files
        return None

    raw_lines = text.splitlines(keepends=True)
    total = len(raw_lines)
    hits: list[dict[str, object]] = []
    for idx, raw in enumerate(raw_lines):
        if compiled.search(raw) is None:
            continue
        before_start = max(0, idx - context_before)
        # Clamp to idx + 1 so a negative context_after produces an empty
        # window rather than a slice end counted from the end of the list.
        after_stop = max(idx + 1, min(total, idx + 1 + context_after))
        record: dict[str, object] = {
            "path": file_path,
            "line": idx + 1,
            "text": raw.rstrip("\n\r"),
            "context_before": [
                prev.rstrip("\n\r") for prev in raw_lines[before_start:idx]
            ],
            "context_after": [
                nxt.rstrip("\n\r") for nxt in raw_lines[idx + 1 : after_stop]
            ],
        }
        hits.append(record)
    return hits


def handle_grep_files(
    session: CoordinationSessionLike,
    workspace: Workspace,
    params: dict[str, object],
) -> ToolResult:
    """Search file contents for a pattern and return line-level matches.

    Keys read from ``params``:
        pattern (required): Search text.
        path (required): Base path; normalised with
            ``normalize_relative_path`` before files are collected.
        regex: Truthiness selects regex mode (default ``True``).
        case_sensitive: Truthiness selects case-sensitive matching
            (default ``True``).
        whole_word: Truthiness is forwarded to ``_compile_grep_pattern``
            (default ``False``).
        include / exclude: Non-empty lists of glob patterns checked with
            ``match_glob``; any other value is treated as "no filter".
        context_before / context_after: Context line counts (default ``0``).
        limit: Maximum number of matches (default ``_GREP_DEFAULT_LIMIT``).
        max_file_bytes: Size cap per file (default ``5_000_000``).

    Returns:
        A non-error ``ToolResult`` whose single text content is the
        ``_tool_json`` encoding of a dict with ``pattern``, ``base`` (the
        path as supplied), ``matches``, ``truncated`` and ``skipped_files``.

    Raises:
        InvalidParamsError: If the pattern is longer than
            ``_MAX_PATTERN_LENGTH`` or is rejected by
            ``_compile_grep_pattern``.
        NOTE(review): ``require_capability`` and the parameter helpers
        presumably raise on failure as well — confirm against their
        definitions.
    """
    require_capability(session, WORKSPACE_READ_CAPABILITY, "Content search")

    pattern = required_string_param(params, "pattern")
    path = required_string_param(params, "path")
    normalized = normalize_relative_path(path)

    is_regex = bool(params.get("regex", True))
    case_sensitive = bool(params.get("case_sensitive", True))
    whole_word = bool(params.get("whole_word", False))

    def _glob_list(key: str) -> list[str] | None:
        # Only a non-empty list counts as a filter; everything else is None.
        raw = params.get(key)
        if raw and isinstance(raw, list):
            return [str(item) for item in raw]
        return None

    include = _glob_list("include")
    exclude = _glob_list("exclude")

    context_before = _int_param(params, "context_before", 0)
    context_after = _int_param(params, "context_after", 0)
    limit = _int_param(params, "limit", _GREP_DEFAULT_LIMIT)
    max_file_bytes = _int_param(params, "max_file_bytes", 5_000_000)

    if len(pattern) > _MAX_PATTERN_LENGTH:
        raise InvalidParamsError(
            f"Pattern exceeds maximum length of {_MAX_PATTERN_LENGTH} characters"
        )

    compiled = _compile_grep_pattern(
        pattern,
        is_regex=is_regex,
        case_sensitive=case_sensitive,
        whole_word=whole_word,
    )

    matches: list[dict[str, object]] = []
    skipped_files = 0
    truncated = False
    for candidate in _collect_files_for_grep(workspace, normalized):
        if include is not None and all(
            not match_glob(candidate, glob) for glob in include
        ):
            continue
        if exclude is not None and any(
            match_glob(candidate, glob) for glob in exclude
        ):
            continue

        found = _search_file_content(
            workspace,
            candidate,
            compiled,
            context_before,
            context_after,
            max_file_bytes,
        )
        if found is None:
            skipped_files += 1
            continue

        for hit in found:
            matches.append(hit)
            if len(matches) >= limit:
                truncated = True
                break
        else:
            # Limit not reached in this file; move on to the next one.
            continue
        break

    result = {
        "pattern": pattern,
        "base": path,
        "matches": matches,
        "truncated": truncated,
        "skipped_files": skipped_files,
    }
    payload = ToolContent.text_content(_tool_json(result))
    return ToolResult(content=[payload], is_error=False)