# Source code for ralph.language_detector.scanner

"""Workspace scanning utilities for language detection."""

from __future__ import annotations

from collections import deque
from pathlib import PurePosixPath
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from collections.abc import Iterator

    from ralph.workspace.protocol import Workspace

# Upper bound on the number of files visited by a single scan.
MAX_FILES_TO_SCAN = 2000

# Deepest directory level (root is level 0) searched for signature files.
MAX_SIGNATURE_SEARCH_DEPTH = 6

# Directory names that are never descended into. Names are compared after
# lower-casing, so every entry here is lower-case.
SKIP_DIR_NAMES: set[str] = {
    # Python environments and caches
    "__pycache__",
    ".venv",
    "env",
    "venv",
    # dependency trees
    "node_modules",
    "vendor",
    # build output
    "build",
    "dist",
    "target",
}

# Lower-cased file names whose presence identifies a project's ecosystem.
SIGNATURE_FILE_NAMES: set[str] = {
    # Rust
    "cargo.toml",
    # Python
    "pipfile",
    "pyproject.toml",
    "requirements.txt",
    "setup.py",
    # JavaScript / TypeScript
    "bun.lock",
    "bun.lockb",
    "package-lock.json",
    "package.json",
    "pnpm-lock.yaml",
    "yarn.lock",
    # Ruby
    "gemfile",
    # Go
    "go.mod",
    # Java / JVM
    "build.gradle",
    "build.gradle.kts",
    "pom.xml",
    # PHP
    "composer.json",
}

# Lower-cased directory names that count as evidence of a test suite.
TEST_DIRECTORY_NAMES: set[str] = {"__tests__", "spec", "test", "tests"}


def normalize_path(path: str) -> str:
    """Return ``path`` in canonical POSIX form, using ``""`` for the root.

    The value is round-tripped through ``PurePosixPath``; a result of
    ``"."`` (or an empty string) denotes the root and is reported as ``""``.
    """
    posix_text = str(PurePosixPath(path))
    if posix_text in ("", "."):
        return ""
    return posix_text
def join_path(parent: str, child: str) -> str:
    """Return the normalised POSIX path of ``child`` located under ``parent``.

    An empty ``parent`` stands for the root, in which case only ``child`` is
    normalised.
    """
    combined = PurePosixPath(parent) / child if parent else child
    return normalize_path(str(combined))
def should_skip_dir_name(name: str) -> bool:
    """Tell whether a directory called ``name`` should be excluded from scans.

    The comparison is case-insensitive. A directory is excluded when its name
    starts with a dot or is listed in ``SKIP_DIR_NAMES``.
    """
    folded = name.lower()
    if folded.startswith("."):
        return True
    return folded in SKIP_DIR_NAMES
def iter_files(workspace: Workspace, root: str = "") -> Iterator[str]:
    """Walk ``root`` breadth-first and yield file paths, capped at ``MAX_FILES_TO_SCAN``.

    Directories rejected by ``should_skip_dir_name`` are not descended into,
    and a directory whose listing raises ``FileNotFoundError`` is ignored.
    ``workspace.list_dir`` is expected to return entry names relative to the
    listed directory.
    """
    pending: deque[str] = deque((normalize_path(root),))
    seen: set[str] = set()
    emitted = 0

    while pending and emitted < MAX_FILES_TO_SCAN:
        directory = pending.popleft()
        if directory in seen:
            continue
        seen.add(directory)

        try:
            names = workspace.list_dir(directory)
        except FileNotFoundError:
            continue

        for name in names:
            path = join_path(directory, name)
            if workspace.is_dir(path):
                if not should_skip_dir_name(name):
                    pending.append(path)
            elif workspace.is_file(path):
                emitted += 1
                yield path
                if emitted >= MAX_FILES_TO_SCAN:
                    # Budget exhausted: nothing more will be yielded.
                    return
def count_extensions(workspace: Workspace, root: str = "") -> dict[str, int]:
    """Tally the files found by ``iter_files`` by lower-cased extension.

    The leading dot is stripped from each extension, and files without a
    suffix are not counted.
    """
    tally: dict[str, int] = {}
    for file_path in iter_files(workspace, root):
        dotted = PurePosixPath(file_path).suffix
        if dotted:
            key = dotted[1:].lower()
            if key in tally:
                tally[key] += 1
            else:
                tally[key] = 1
    return tally
def collect_signature_files(workspace: Workspace, root: str = "") -> dict[str, list[str]]:
    """Find signature files under ``root`` and group their paths by lower-cased name.

    The search is breadth-first. Directories rejected by
    ``should_skip_dir_name`` are not entered, and neither are directories
    nested more than ``MAX_SIGNATURE_SEARCH_DEPTH`` levels below ``root``.
    A directory whose listing raises ``FileNotFoundError`` is ignored.
    """
    found: dict[str, list[str]] = {}
    frontier: deque[tuple[str, int]] = deque()
    frontier.append((normalize_path(root), 0))

    while frontier:
        directory, level = frontier.popleft()
        try:
            names = workspace.list_dir(directory)
        except FileNotFoundError:
            continue

        for name in names:
            key = name.lower()
            path = join_path(directory, name)
            if workspace.is_dir(path):
                if level < MAX_SIGNATURE_SEARCH_DEPTH and not should_skip_dir_name(key):
                    frontier.append((path, level + 1))
            elif workspace.is_file(path) and key in SIGNATURE_FILE_NAMES:
                if key not in found:
                    found[key] = []
                found[key].append(path)

    return found
# File-name suffixes that mark a JavaScript/TypeScript test file. The JSX
# variants sit alongside the TSX ones so React tests written in plain
# JavaScript are recognised too.
_JS_TS_TEST_SUFFIXES: tuple[str, ...] = (
    ".test.js",
    ".spec.js",
    ".test.jsx",
    ".spec.jsx",
    ".test.ts",
    ".spec.ts",
    ".test.tsx",
    ".spec.tsx",
)

# Languages whose test convention is fully described by a set of suffixes.
_TEST_SUFFIXES_BY_LANGUAGE: dict[str, tuple[str, ...]] = {
    "Go": ("_test.go",),
    "PHP": ("test.php", "spec.php"),
    "Ruby": ("_spec.rb", "_test.rb"),
    "JavaScript": _JS_TS_TEST_SUFFIXES,
    "TypeScript": _JS_TS_TEST_SUFFIXES,
}


def is_test_file_name(file_name: str, primary_language: str, path_components: list[str]) -> bool:
    """Return True if ``file_name`` matches the test file convention for ``primary_language``.

    Args:
        file_name: Base name of the file; compared case-insensitively.
        primary_language: Language label such as ``"Python"`` or ``"Rust"``.
            Labels without a dedicated rule fall back to a substring check
            for ``"test"`` or ``"spec"``.
        path_components: Components of the file's path. Only the Rust and
            Java rules consult them, and they are matched as given (no
            lower-casing is applied here).

    Returns:
        ``True`` when the name, or for Rust/Java its location, follows the
        language's test naming convention.
    """
    lower_name = file_name.lower()

    suffixes = _TEST_SUFFIXES_BY_LANGUAGE.get(primary_language)
    if suffixes is not None:
        return lower_name.endswith(suffixes)

    if primary_language == "Python":
        return (
            lower_name.startswith("test_") and lower_name.endswith(".py")
        ) or lower_name.endswith("_test.py")

    if primary_language == "Rust":
        return (
            lower_name == "tests.rs"
            or lower_name.endswith("_test.rs")
            or (lower_name.endswith(".rs") and "tests" in path_components)
        )

    if primary_language == "Java":
        # Any file below a path containing both "src" and "test" counts,
        # as does any *Test.java file regardless of location.
        return ("src" in path_components and "test" in path_components) or lower_name.endswith(
            "test.java"
        )

    # Unknown language: fall back to a loose substring heuristic.
    return "test" in lower_name or "spec" in lower_name
def detect_tests(workspace: Workspace, root: str = "", primary_language: str = "Unknown") -> bool:
    """Report whether anything under ``root`` looks like a test directory or test file.

    The walk is breadth-first and stops at the first hit: a directory whose
    lower-cased name is in ``TEST_DIRECTORY_NAMES``, or a file accepted by
    ``is_test_file_name`` for ``primary_language``. Directories rejected by
    ``should_skip_dir_name`` are not entered, and at most
    ``MAX_FILES_TO_SCAN`` files are examined.
    """
    pending: deque[str] = deque((normalize_path(root),))
    examined = 0

    while pending and examined < MAX_FILES_TO_SCAN:
        directory = pending.popleft()
        try:
            names = workspace.list_dir(directory)
        except FileNotFoundError:
            continue

        for name in names:
            path = join_path(directory, name)
            lowered = name.lower()

            if workspace.is_dir(path):
                if lowered in TEST_DIRECTORY_NAMES:
                    return True
                if not should_skip_dir_name(lowered):
                    pending.append(path)
            elif workspace.is_file(path):
                examined += 1
                parts = [segment.lower() for segment in PurePosixPath(path).parts]
                if is_test_file_name(name, primary_language, parts):
                    return True
                if examined >= MAX_FILES_TO_SCAN:
                    # File budget spent without finding a test.
                    return False

    return False