Source code for ralph.guidelines.stack

"""Stack-guided review guidelines for the Python port."""

from __future__ import annotations

from dataclasses import dataclass, field
from importlib import import_module
from typing import TYPE_CHECKING, cast

from ralph.guidelines._detected_stack import DetectedStack
from ralph.guidelines._detected_stack_like import _DetectedStackLike

if TYPE_CHECKING:
    from collections.abc import Callable, Iterable, Sequence
    from types import ModuleType
    from typing import Protocol

    from ralph.guidelines._guideline_source import GuidelineSource
    from ralph.workspace.protocol import Workspace

    class _HandlerCallable(Protocol):
        """Protocol for language handler callables.

        Exists only for type checkers (it is declared under ``TYPE_CHECKING``).
        It is the static type given to the guideline classes returned by
        ``_load_guideline_class``: anything callable with arbitrary positional
        and keyword arguments that returns a ``GuidelineSource``.
        """

        def __call__(self, *args: object, **kwargs: object) -> GuidelineSource: ...

    class _StackDetector(Protocol):
        """Protocol for detect_stack_with_workspace callables.

        Exists only for type checkers (it is declared under ``TYPE_CHECKING``).
        The result is typed as ``object`` because the caller validates it with
        an ``isinstance`` check before using it.
        """

        def __call__(self, workspace: Workspace, root: str) -> object: ...


# Names of the guideline categories, listed in the same order in which
# ``_guideline_categories`` reports them. Every name is also a field of
# ``StackGuidelines``.
CATEGORY_FIELDS = (
    "quality_checks",
    "security_checks",
    "performance_checks",
    "testing_checks",
    "documentation_checks",
    "idioms",
    "anti_patterns",
    "concurrency_checks",
    "resource_checks",
    "observability_checks",
    "secrets_checks",
    "api_design_checks",
)

# Language-independent baseline checks, one list per guideline category.
# ``StackGuidelines`` copies each list into the matching field when no explicit
# value is supplied, so these module-level lists are never mutated by merging.
BASE_QUALITY_CHECKS = [
    "Code follows consistent style and formatting",
    "Functions have single responsibility",
    "Error handling is comprehensive",
    "No dead code or unused imports",
]
BASE_SECURITY_CHECKS = [
    "No hardcoded secrets or credentials",
    "Input validation on external data",
    "Proper authentication/authorization checks",
]
BASE_PERFORMANCE_CHECKS = [
    "No obvious performance bottlenecks",
    "Efficient data structures used",
]
BASE_TESTING_CHECKS = [
    "Tests cover main functionality",
    "Edge cases are tested",
]
BASE_DOCUMENTATION_CHECKS = [
    "Public APIs are documented",
    "Complex logic has explanatory comments",
]
BASE_IDIOMS = ["Code follows language conventions"]
BASE_ANTI_PATTERNS = ["Avoid code duplication"]
BASE_CONCURRENCY_CHECKS = [
    "Shared mutable state is properly synchronized",
    "No potential deadlocks (lock ordering)",
]
BASE_RESOURCE_CHECKS = [
    "Resources are properly closed/released",
    "No resource leaks in error paths",
]
BASE_OBSERVABILITY_CHECKS = [
    "Errors are logged with context",
    "Critical operations have appropriate logging",
]
BASE_SECRETS_CHECKS = [
    "Secrets loaded from environment/config, not hardcoded",
    "Sensitive data not logged or exposed in errors",
]
BASE_API_DESIGN_CHECKS = [
    "API follows consistent naming conventions",
    "Breaking changes are clearly documented",
]

# Frameworks recognised for each language. ``_frameworks_for_language`` and
# ``_frameworks_for_signature`` compare against these names case-insensitively
# (via ``casefold``). A language missing from the map (for example Rust) simply
# has no recognised frameworks.
LANGUAGE_FRAMEWORK_MAP: dict[str, tuple[str, ...]] = {
    "Python": ("Django", "FastAPI", "Flask"),
    "JavaScript": (
        "React",
        "Vue",
        "Angular",
        "Svelte",
        "Next.js",
        "Nuxt",
        "Express",
        "Fastify",
        "NestJS",
        "Gatsby",
    ),
    "Go": ("Gin", "Echo", "Fiber", "Gorilla", "Chi"),
    "Java": ("Spring",),
    "PHP": ("Laravel", "Symfony"),
    "Ruby": ("Rails", "Sinatra"),
}

# Marker files probed by ``_fallback_detect_stack`` and the language each one
# implies. Insertion order matters: the fallback walks this mapping in order
# and the first language it finds becomes the primary language.
SIGNATURE_LANGUAGE_MAP: dict[str, str] = {
    "setup.py": "Python",
    "pyproject.toml": "Python",
    "requirements.txt": "Python",
    "Pipfile": "Python",
    "Cargo.toml": "Rust",
    "package.json": "JavaScript",
    "tsconfig.json": "TypeScript",
    "go.mod": "Go",
    "pom.xml": "Java",
    "build.gradle": "Java",
    "build.gradle.kts": "Kotlin",
    "Gemfile": "Ruby",
    "composer.json": "PHP",
}


def _load_guideline_class(module_name: str, class_name: str) -> _HandlerCallable:
    """Import ``module_name`` and return its attribute ``class_name``.

    The import happens at call time, so a guideline module is only loaded when
    its language is actually needed.

    Raises:
        ImportError: If ``module_name`` cannot be imported.
        KeyError: If the module has no attribute called ``class_name``.
    """
    loaded = _module_attr(import_module(module_name), class_name)
    return cast("_HandlerCallable", loaded)


def _module_attr(module: ModuleType, attribute: str) -> object:
    namespace = cast("dict[str, object]", module.__dict__)
    return namespace[attribute]


def _dedupe_strings(values: Sequence[str]) -> list[str]:
    deduped: list[str] = []
    seen: set[str] = set()
    for value in values:
        if value in seen:
            continue
        seen.add(value)
        deduped.append(value)
    return deduped


def _guideline_categories(source: GuidelineSource) -> tuple[tuple[str, Sequence[str]], ...]:
    def _category(name: str) -> Sequence[str]:
        value: object = getattr(source, name, ())
        if isinstance(value, list | tuple) and all(isinstance(item, str) for item in value):
            return cast("Sequence[str]", value)
        return ()

    return (
        ("quality_checks", _category("quality_checks")),
        ("security_checks", _category("security_checks")),
        ("performance_checks", _category("performance_checks")),
        ("testing_checks", _category("testing_checks")),
        ("documentation_checks", _category("documentation_checks")),
        ("idioms", _category("idioms")),
        ("anti_patterns", _category("anti_patterns")),
        ("concurrency_checks", _category("concurrency_checks")),
        ("resource_checks", _category("resource_checks")),
        ("observability_checks", _category("observability_checks")),
        ("secrets_checks", _category("secrets_checks")),
        ("api_design_checks", _category("api_design_checks")),
    )


def _stack_from_detected(source: _DetectedStackLike) -> DetectedStack:
    """Copy a detector result into this package's own ``DetectedStack``.

    The language and framework collections are copied into new lists, so the
    returned stack does not share them with ``source``.
    """
    primary = source.primary_language
    secondary = list(source.secondary_languages)
    frameworks = list(source.frameworks)
    return DetectedStack(
        primary_language=primary,
        secondary_languages=secondary,
        frameworks=frameworks,
    )


def _frameworks_for_language(language: str, frameworks: Iterable[str]) -> list[str]:
    """Keep only the frameworks that ``LANGUAGE_FRAMEWORK_MAP`` lists for ``language``.

    Names are compared case-insensitively, but the returned entries keep the
    spelling and order they had in ``frameworks``. A language that is not in
    the map yields an empty list.
    """
    recognised = {name.casefold() for name in LANGUAGE_FRAMEWORK_MAP.get(language, ())}
    matches: list[str] = []
    for candidate in frameworks:
        if candidate.casefold() in recognised:
            matches.append(candidate)
    return matches


def _language_group(language: str) -> str:
    if language == "Kotlin":
        return "Java"
    return language


def _rust_handler(frameworks: Iterable[str], _: bool) -> GuidelineSource:
    """Build the Rust guideline source.

    Both parameters exist only to match the shared handler signature; neither
    is forwarded to ``RustGuidelines``.
    """
    return _load_guideline_class("ralph.guidelines.rust", "RustGuidelines")()


def _python_handler(frameworks: Iterable[str], _: bool) -> GuidelineSource:
    """Build the Python guideline source for the Python frameworks in ``frameworks``.

    The boolean parameter is unused; it is there to match the shared handler
    signature.
    """
    relevant = _frameworks_for_language("Python", frameworks)
    guideline_cls = _load_guideline_class("ralph.guidelines.python", "PythonGuidelines")
    return guideline_cls(frameworks=relevant)


def _javascript_handler(frameworks: Iterable[str], typescript: bool) -> GuidelineSource:
    """Build the JavaScript guideline source.

    Only the JavaScript frameworks found in ``frameworks`` are forwarded, and
    ``typescript`` is passed through unchanged to ``JavaScriptGuidelines``.
    """
    relevant = _frameworks_for_language("JavaScript", frameworks)
    guideline_cls = _load_guideline_class(
        "ralph.guidelines.javascript",
        "JavaScriptGuidelines",
    )
    return guideline_cls(frameworks=relevant, typescript=typescript)


def _go_handler(frameworks: Iterable[str], _: bool) -> GuidelineSource:
    """Build the Go guideline source for the Go frameworks in ``frameworks``.

    The boolean parameter is unused; it is there to match the shared handler
    signature.
    """
    relevant = _frameworks_for_language("Go", frameworks)
    guideline_cls = _load_guideline_class("ralph.guidelines.go", "GoGuidelines")
    return guideline_cls(frameworks=relevant)


def _java_handler(frameworks: Iterable[str], _: bool) -> GuidelineSource:
    """Build the Java guideline source for the Java frameworks in ``frameworks``.

    The boolean parameter is unused; it is there to match the shared handler
    signature.
    """
    relevant = _frameworks_for_language("Java", frameworks)
    guideline_cls = _load_guideline_class("ralph.guidelines.java", "JavaGuidelines")
    return guideline_cls(frameworks=relevant)


def _php_handler(frameworks: Iterable[str], _: bool) -> GuidelineSource:
    """Build the PHP guideline source for the PHP frameworks in ``frameworks``.

    The boolean parameter is unused; it is there to match the shared handler
    signature.
    """
    relevant = _frameworks_for_language("PHP", frameworks)
    guideline_cls = _load_guideline_class("ralph.guidelines.php", "PHPGuidelines")
    return guideline_cls(frameworks=relevant)


def _ruby_handler(frameworks: Iterable[str], _: bool) -> GuidelineSource:
    """Build the Ruby guideline source for the Ruby frameworks in ``frameworks``.

    The boolean parameter is unused; it is there to match the shared handler
    signature.
    """
    relevant = _frameworks_for_language("Ruby", frameworks)
    guideline_cls = _load_guideline_class("ralph.guidelines.ruby", "RubyGuidelines")
    return guideline_cls(frameworks=relevant)


# Dispatch table from language name to the function that builds that
# language's guideline source. Every handler takes the detected frameworks and
# a boolean that ``get_stack_guidelines`` sets when TypeScript was detected.
# ``get_stack_guidelines`` looks handlers up by ``_language_group(language)``,
# so a Kotlin stack is resolved through the "Java" entry.
LANGUAGE_HANDLERS: dict[str, Callable[[Iterable[str], bool], GuidelineSource]] = {
    "Rust": _rust_handler,
    "Python": _python_handler,
    "JavaScript": _javascript_handler,
    "Go": _go_handler,
    "Java": _java_handler,
    "Kotlin": _java_handler,
    "PHP": _php_handler,
    "Ruby": _ruby_handler,
}


@dataclass
class StackGuidelines:
    """Merged review guidelines accumulated from all detected language handlers.

    Each category starts as a fresh copy of the matching ``BASE_*`` list and
    grows as language-specific sources are folded in with ``merge_from``.
    """

    quality_checks: Sequence[str] = field(default_factory=lambda: list(BASE_QUALITY_CHECKS))
    security_checks: Sequence[str] = field(default_factory=lambda: list(BASE_SECURITY_CHECKS))
    performance_checks: Sequence[str] = field(default_factory=lambda: list(BASE_PERFORMANCE_CHECKS))
    testing_checks: Sequence[str] = field(default_factory=lambda: list(BASE_TESTING_CHECKS))
    documentation_checks: Sequence[str] = field(
        default_factory=lambda: list(BASE_DOCUMENTATION_CHECKS)
    )
    idioms: Sequence[str] = field(default_factory=lambda: list(BASE_IDIOMS))
    anti_patterns: Sequence[str] = field(default_factory=lambda: list(BASE_ANTI_PATTERNS))
    concurrency_checks: Sequence[str] = field(default_factory=lambda: list(BASE_CONCURRENCY_CHECKS))
    resource_checks: Sequence[str] = field(default_factory=lambda: list(BASE_RESOURCE_CHECKS))
    observability_checks: Sequence[str] = field(
        default_factory=lambda: list(BASE_OBSERVABILITY_CHECKS)
    )
    secrets_checks: Sequence[str] = field(default_factory=lambda: list(BASE_SECRETS_CHECKS))
    api_design_checks: Sequence[str] = field(default_factory=lambda: list(BASE_API_DESIGN_CHECKS))

    def __post_init__(self) -> None:
        """Record the initial entries of every category for duplicate detection."""
        known: dict[str, set[str]] = {}
        for name, entries in _guideline_categories(self):
            known[name] = set(entries)
        # Not a dataclass field: this is bookkeeping used only by merge_from.
        self._seen = known

    def merge_from(self, other: GuidelineSource) -> None:
        """Append the entries of ``other`` that this object does not hold yet.

        Categories are merged one by one. New entries are appended in the order
        ``other`` lists them, and an entry already present (or repeated inside
        ``other``) is added at most once.
        """
        incoming = dict(_guideline_categories(other))
        for name, current in _guideline_categories(self):
            additions = incoming[name]
            if additions:
                # Fields are annotated as Sequence but are appended to in place.
                destination = cast("list[str]", current)
                known = self._seen[name]
                for entry in additions:
                    if entry in known:
                        continue
                    known.add(entry)
                    destination.append(entry)

    def summary(self) -> str:
        """Return a one-line count of quality checks, security checks and anti-patterns."""
        quality = len(self.quality_checks)
        security = len(self.security_checks)
        anti = len(self.anti_patterns)
        return f"{quality} quality checks, {security} security checks, {anti} anti-patterns"

    def total_checks(self) -> int:
        """Return the number of entries summed over all categories."""
        total = 0
        for _, entries in _guideline_categories(self):
            total += len(entries)
        return total
def get_stack_guidelines(workspace: Workspace, root: str = "") -> StackGuidelines:
    """Build merged review guidelines from the stack detected in ``workspace``.

    The primary language is handled first, followed by the secondary languages
    in detection order. Languages that share a handler group (see
    ``_language_group``) are merged only once, and languages without an entry
    in ``LANGUAGE_HANDLERS`` are skipped.

    TypeScript has no handler of its own; it switches the JavaScript handler
    into TypeScript mode. When JavaScript was detected too, the TypeScript
    entry is dropped and the JavaScript entry covers both. When JavaScript was
    not detected, TypeScript is routed to the JavaScript handler so that a
    TypeScript-only project still receives its guidelines.

    Args:
        workspace: Workspace that is inspected to detect the stack.
        root: Root path handed through to the stack detector.

    Returns:
        A ``StackGuidelines`` containing the base checks plus everything
        contributed by the handlers of the detected languages.
    """
    stack = _detect_stack_with_workspace(workspace, root)
    detected = [lang for lang in (stack.primary_language, *stack.secondary_languages) if lang]
    typescript_enabled = "TypeScript" in detected
    has_javascript = "JavaScript" in detected

    guidelines = StackGuidelines()
    processed: set[str] = set()
    for language in detected:
        if language == "TypeScript":
            if has_javascript:
                # The JavaScript entry already covers it, in TypeScript mode.
                continue
            # TypeScript-only stack: use the JavaScript handler in its place.
            handled_as = "JavaScript"
        else:
            handled_as = language

        key = _language_group(handled_as)
        if key in processed:
            continue
        handler = LANGUAGE_HANDLERS.get(key)
        if handler is None:
            continue
        frameworks = _frameworks_for_language(key, stack.frameworks)
        guidelines.merge_from(handler(frameworks, typescript_enabled))
        processed.add(key)
    return guidelines
def _detect_stack_with_workspace(workspace: Workspace, root: str) -> DetectedStack:
    """Detect the stack via ``ralph.language_detector``, with a signature-file fallback.

    The fallback is used when the detector module cannot be imported, when it
    exposes no callable named ``detect_stack_with_workspace``, or when that
    callable returns something that is not a ``_DetectedStackLike``. The
    fallback only receives ``workspace``; ``root`` is not passed to it.
    """
    try:
        detector_module = import_module("ralph.language_detector")
    except ImportError:
        return _fallback_detect_stack(workspace)

    namespace: dict[str, object | None] = detector_module.__dict__
    candidate = namespace.get("detect_stack_with_workspace")
    if not callable(candidate):  # also covers a missing entry (None)
        return _fallback_detect_stack(workspace)

    detected = cast("_StackDetector", candidate)(workspace, root)
    if isinstance(detected, _DetectedStackLike):
        return _stack_from_detected(detected)
    return _fallback_detect_stack(workspace)


def _fallback_detect_stack(workspace: Workspace) -> DetectedStack:
    """Guess the stack from the signature files that exist in ``workspace``.

    ``SIGNATURE_LANGUAGE_MAP`` is walked in order. The first language found
    becomes the primary language and the remaining distinct languages become
    secondary ones. A default ``DetectedStack()`` is returned when no signature
    file exists.
    """
    found_languages: list[str] = []
    found_frameworks: list[str] = []
    for signature, language in SIGNATURE_LANGUAGE_MAP.items():
        if workspace.exists(signature):
            found_languages.append(language)
            found_frameworks += _frameworks_for_signature(workspace, signature, language)

    if not found_languages:
        return DetectedStack()

    primary, *secondary = _dedupe_strings(found_languages)
    return DetectedStack(
        primary_language=primary,
        secondary_languages=secondary,
        frameworks=_dedupe_strings(found_frameworks),
    )


def _frameworks_for_signature(workspace: Workspace, signature: str, language: str) -> list[str]:
    """Return the known frameworks of ``language`` whose name occurs in ``signature``.

    The file content and the framework names are case-folded and compared by
    plain substring search. A signature file that cannot be found yields an
    empty list.
    """
    try:
        haystack = workspace.read(signature).casefold()
    except FileNotFoundError:
        return []

    hits: list[str] = []
    for name in LANGUAGE_FRAMEWORK_MAP.get(_language_group(language), ()):
        if name.casefold() in haystack:
            hits.append(name)
    return hits