"""Grep/content-search handler."""
from __future__ import annotations
import re
from typing import TYPE_CHECKING
from ralph.mcp.tools.coordination import (
CoordinationSessionLike,
InvalidParamsError,
ToolContent,
ToolResult,
require_capability,
)
from ralph.mcp.tools.workspace._list_ops import (
_collect_files_recursive,
match_glob,
)
from ralph.mcp.tools.workspace._utils import (
_GREP_DEFAULT_LIMIT,
_MAX_PATTERN_LENGTH,
WORKSPACE_READ_CAPABILITY,
_int_param,
_tool_json,
normalize_relative_path,
required_string_param,
)
if TYPE_CHECKING:
from ralph.workspace import Workspace
def _compile_grep_pattern(
    pattern: str,
    *,
    is_regex: bool,
    case_sensitive: bool,
    whole_word: bool,
) -> re.Pattern[str]:
    """Compile a grep search pattern to a regex.

    Args:
        pattern: Text to search for. Interpreted as regular-expression syntax
            when ``is_regex`` is true; otherwise treated as a literal string
            and escaped before compiling.
        is_regex: Whether ``pattern`` is a regular expression.
        case_sensitive: When false, the pattern is compiled with
            ``re.IGNORECASE``.
        whole_word: Require a word boundary on both sides of the match. This
            applies to literal and regex patterns alike; a regex is wrapped in
            a non-capturing group so alternations are bounded as a whole.

    Returns:
        The compiled pattern.

    Raises:
        InvalidParamsError: If ``is_regex`` is true and the expression (or its
            whole-word wrapped form) does not compile.
    """
    flags = 0 if case_sensitive else re.IGNORECASE

    if not is_regex:
        literal = re.escape(pattern)
        if whole_word:
            literal = r"\b" + literal + r"\b"
        return re.compile(literal, flags)

    try:
        # Validate the caller's expression on its own first, so something like
        # "a)|(b" cannot become valid merely because of the wrapping below.
        compiled = re.compile(pattern, flags)
        if whole_word:
            # NOTE: expressions that are only valid standalone (e.g. ones that
            # start with inline global flags) are rejected here rather than
            # having whole_word silently dropped.
            compiled = re.compile(r"\b(?:" + pattern + r")\b", flags)
    except re.error as exc:
        raise InvalidParamsError(f"Invalid regex pattern: {exc}") from exc
    return compiled
def _collect_files_for_grep(workspace: Workspace, normalized: str) -> list[str]:
    """Return the file paths found under ``normalized`` for a grep run.

    The workspace's own ``iter_files`` is tried first and its result is
    materialised into a list. If that raises for any reason, the listing comes
    from ``_collect_files_recursive`` instead.
    """
    try:
        listing = [*workspace.iter_files(normalized)]
    except Exception:
        # Best-effort fallback: any failure of iter_files (including while it
        # is being consumed) switches to the recursive collector.
        listing = _collect_files_recursive(workspace, normalized)
    return listing
def _search_file_content(
    workspace: Workspace,
    file_path: str,
    compiled: re.Pattern[str],
    context_before: int,
    context_after: int,
    _max_file_bytes: int,
) -> list[dict[str, object]] | None:
    """Search a single file for matches; returns None if the file should be skipped.

    Args:
        workspace: Object providing ``stat(path)`` and ``read(path)``.
        file_path: Path of the file to search, passed through to ``workspace``.
        compiled: Compiled pattern applied to each line with ``search``.
        context_before: Number of preceding lines to attach to each match.
            Negative values behave like 0.
        context_after: Number of following lines to attach to each match.
            Negative values behave like 0.
        _max_file_bytes: Files whose reported ``size_bytes`` exceeds this are
            skipped.

    Returns:
        A list with one dict per matching line (keys ``path``, ``line``,
        ``text``, ``context_before``, ``context_after``; ``line`` is 1-based),
        or ``None`` when the file is skipped: ``stat`` failed, the entry is a
        directory, it is larger than ``_max_file_bytes``, or ``read`` failed.
    """
    try:
        file_stat = workspace.stat(file_path)
    except Exception:
        return None
    if file_stat.get("type") == "dir":
        return None
    size_bytes = file_stat.get("size_bytes", 0)
    if isinstance(size_bytes, int) and size_bytes > _max_file_bytes:
        return None
    try:
        content = workspace.read(file_path)
    except Exception:
        # Deliberate best-effort: undecodable or unreadable files are skipped.
        return None

    # Strip the "\n"/"\r" terminator once per line, up front. Matching runs on
    # the stripped text so patterns such as "\s" or "[^x]" cannot match on the
    # line terminator itself, and context lines are not re-stripped per match.
    texts = [raw.rstrip("\n\r") for raw in content.splitlines(keepends=True)]

    # Clamp so a negative context size yields empty context instead of being
    # read as a from-the-end slice bound.
    before = max(0, context_before)
    after = max(0, context_after)

    matches: list[dict[str, object]] = []
    for index, text in enumerate(texts):
        if not compiled.search(text):
            continue
        matches.append(
            {
                "path": file_path,
                "line": index + 1,
                "text": text,
                "context_before": texts[max(0, index - before):index],
                "context_after": texts[index + 1:index + 1 + after],
            }
        )
    return matches
[docs]
def handle_grep_files(
    session: CoordinationSessionLike,
    workspace: Workspace,
    params: dict[str, object],
) -> ToolResult:
    """Run a content search over workspace files and report matching lines.

    The session must hold the workspace-read capability. ``params`` must
    contain the string entries ``pattern`` and ``path``; it may also contain
    ``regex`` (default true), ``case_sensitive`` (default true),
    ``whole_word`` (default false), ``include`` / ``exclude`` (lists of glob
    patterns; any other value is ignored), ``context_before``,
    ``context_after``, ``limit`` and ``max_file_bytes``.

    Returns:
        A non-error ``ToolResult`` whose single text content is the JSON
        rendering of the pattern, the base path as given, the collected
        matches, a ``truncated`` flag (set once the number of collected
        matches reaches ``limit``) and the number of skipped files.

    Raises:
        InvalidParamsError: If the pattern is longer than
            ``_MAX_PATTERN_LENGTH`` characters.
    """

    def glob_list(key: str) -> list[str] | None:
        # Only a non-empty list counts as a filter; anything else means "no filter".
        raw = params.get(key)
        if not raw or not isinstance(raw, list):
            return None
        return [str(item) for item in raw]

    require_capability(session, WORKSPACE_READ_CAPABILITY, "Content search")

    pattern = required_string_param(params, "pattern")
    path = required_string_param(params, "path")
    normalized = normalize_relative_path(path)

    search_flags = {
        "is_regex": bool(params.get("regex", True)),
        "case_sensitive": bool(params.get("case_sensitive", True)),
        "whole_word": bool(params.get("whole_word", False)),
    }
    include = glob_list("include")
    exclude = glob_list("exclude")

    context_before = _int_param(params, "context_before", 0)
    context_after = _int_param(params, "context_after", 0)
    limit = _int_param(params, "limit", _GREP_DEFAULT_LIMIT)
    max_file_bytes = _int_param(params, "max_file_bytes", 5_000_000)

    if len(pattern) > _MAX_PATTERN_LENGTH:
        raise InvalidParamsError(
            f"Pattern exceeds maximum length of {_MAX_PATTERN_LENGTH} characters"
        )

    compiled = _compile_grep_pattern(pattern, **search_flags)

    collected: list[dict[str, object]] = []
    skipped_files = 0
    hit_limit = False

    for candidate in _collect_files_for_grep(workspace, normalized):
        wanted = not include or any(match_glob(candidate, glob) for glob in include)
        if not wanted:
            continue
        if exclude and any(match_glob(candidate, glob) for glob in exclude):
            continue

        found = _search_file_content(
            workspace,
            candidate,
            compiled,
            context_before,
            context_after,
            max_file_bytes,
        )
        if found is None:
            skipped_files += 1
            continue

        for hit in found:
            collected.append(hit)
            if len(collected) >= limit:
                hit_limit = True
                break
        else:
            # This file was consumed without reaching the limit; move on.
            continue
        # The limit was reached inside this file: stop scanning further files.
        break

    payload = {
        "pattern": pattern,
        "base": path,
        "matches": collected,
        "truncated": hit_limit,
        "skipped_files": skipped_files,
    }
    body = ToolContent.text_content(_tool_json(payload))
    return ToolResult(content=[body], is_error=False)