Source code for ralph.git.rebase.rebase_kinds

"""Classification helpers for Git rebase outcomes."""

from __future__ import annotations

from collections.abc import Callable, Mapping
from dataclasses import dataclass, field
from types import MappingProxyType
from typing import cast

from ralph.git.rebase._rebase_kind import RebaseKind


[docs] @dataclass(frozen=True) class RebaseErrorKind: """Payload for a classified rebase failure.""" kind: RebaseKind metadata: Mapping[str, object] = field(default_factory=dict) def __post_init__(self) -> None: object.__setattr__(self, "metadata", _freeze_metadata(self.metadata))
def _freeze_metadata(metadata: Mapping[str, object]) -> Mapping[str, object]: # Copy to prevent modification of the original dictionary before freezing. metadata_dict: dict[str, object] = dict(metadata) return cast("Mapping[str, object]", MappingProxyType(metadata_dict))
[docs] def classify_rebase_error(stderr: str, stdout: str) -> RebaseErrorKind: """Translate git rebase stderr/stdout into a concrete rebase failure kind.""" output = f"{stderr}\n{stdout}".strip() for classifier in _REBASE_CLASSIFIERS: result = classifier(output) if result is not None: return result return RebaseErrorKind( kind=RebaseKind.UNKNOWN, metadata={"details": _extract_error_line(output)}, )
def _classify_invalid_revision(output: str) -> RebaseErrorKind | None: triggers = [ "invalid revision", "unknown revision", "bad revision", "ambiguous revision", "not found", "does not exist", "no such ref", ] if not any(trigger in output for trigger in triggers): return None revision = _extract_revision(output) or "unknown" return RebaseErrorKind(kind=RebaseKind.INVALID_REVISION, metadata={"revision": revision}) def _classify_shallow_or_missing_history(output: str) -> RebaseErrorKind | None: triggers = ["shallow", "depth", "unreachable", "needed single revision", "does not have"] if not any(trigger in output for trigger in triggers): return None return RebaseErrorKind( kind=RebaseKind.REPOSITORY_CORRUPT, metadata={"details": _extract_error_line(output)}, ) def _classify_worktree_conflict(output: str) -> RebaseErrorKind | None: triggers = ["worktree", "checked out", "another branch", "already checked out"] if not any(trigger in output for trigger in triggers): return None return RebaseErrorKind( kind=RebaseKind.CONCURRENT_OPERATION, metadata={"operation": "branch checked out in another worktree"}, ) def _classify_submodule_conflict(output: str) -> RebaseErrorKind | None: if ".gitmodules" not in output and "submodule" not in output: return None return RebaseErrorKind( kind=RebaseKind.CONTENT_CONFLICT, metadata={"files": _extract_conflict_files(output)}, ) def _classify_dirty_working_tree(output: str) -> RebaseErrorKind | None: triggers = ["dirty", "uncommitted changes", "local changes", "cannot rebase"] if not any(trigger in output for trigger in triggers): return None return RebaseErrorKind(kind=RebaseKind.DIRTY_WORKING_TREE) def _classify_concurrent_operation(output: str) -> RebaseErrorKind | None: triggers = [ "rebase in progress", "merge in progress", "cherry-pick in progress", "revert in progress", "bisect in progress", "Another git process", "Locked", ] if not any(trigger in output for trigger in triggers): return None operation = _extract_operation(output) or "unknown" return RebaseErrorKind(kind=RebaseKind.CONCURRENT_OPERATION, metadata={"operation": operation}) def _classify_repository_corruption(output: str) -> RebaseErrorKind | None: triggers = [ "corrupt", "object not found", "missing object", "invalid object", "bad object", "disk full", "filesystem", ] if not any(trigger in output for trigger in triggers): return None return RebaseErrorKind( kind=RebaseKind.REPOSITORY_CORRUPT, metadata={"details": _extract_error_line(output)}, ) def _classify_environment_failure(output: str) -> RebaseErrorKind | None: triggers = ["user.name", "user.email", "author", "committer", "terminal", "editor"] if not any(trigger in output for trigger in triggers): return None return RebaseErrorKind( kind=RebaseKind.ENVIRONMENT_FAILURE, metadata={"reason": _extract_error_line(output)}, ) def _classify_hook_rejection(output: str) -> RebaseErrorKind | None: if "pre-rebase" not in output and "hook" not in output and "rejected by" not in output: return None return RebaseErrorKind( kind=RebaseKind.HOOK_REJECTION, metadata={"hook_name": _extract_hook_name(output)}, ) def _classify_content_conflict(output: str) -> RebaseErrorKind | None: triggers = ["Conflict", "conflict", "Resolve", "Merge conflict"] if not any(trigger in output for trigger in triggers): return None return RebaseErrorKind( kind=RebaseKind.CONTENT_CONFLICT, metadata={"files": _extract_conflict_files(output)}, ) def _classify_patch_failure(output: str) -> RebaseErrorKind | None: triggers = ["patch does not apply", "patch failed", "hunk failed", "context mismatch", "fuzz"] if not any(trigger in output for trigger in triggers): return None return RebaseErrorKind( kind=RebaseKind.PATCH_APPLICATION_FAILED, metadata={"details": _extract_error_line(output)}, ) def _classify_interactive_stop(output: str) -> RebaseErrorKind | None: triggers = ["Stopped at", "paused", "edit command"] if not any(trigger in output for trigger in triggers): return None return RebaseErrorKind( kind=RebaseKind.INTERACTIVE_STOP, metadata={"command": _extract_command(output)}, ) def _classify_empty_commit(output: str) -> RebaseErrorKind | None: triggers = ["empty", "no changes", "already applied"] if not any(trigger in output for trigger in triggers): return None return RebaseErrorKind(kind=RebaseKind.EMPTY_COMMIT) def _classify_autostash_failure(output: str) -> RebaseErrorKind | None: triggers = ["autostash", "stash"] if not any(trigger in output for trigger in triggers): return None return RebaseErrorKind( kind=RebaseKind.AUTOSTASH_FAILED, metadata={"details": _extract_error_line(output)}, ) def _classify_commit_creation_failure(output: str) -> RebaseErrorKind | None: triggers = [ "pre-commit", "commit-msg", "prepare-commit-msg", "post-commit", "signing", "GPG", ] if not any(trigger in output for trigger in triggers): return None return RebaseErrorKind( kind=RebaseKind.COMMIT_CREATION_FAILED, metadata={"details": _extract_error_line(output)}, ) def _classify_reference_update_failure(output: str) -> RebaseErrorKind | None: triggers = ["cannot lock", "ref update", "packed-refs", "reflog"] if not any(trigger in output for trigger in triggers): return None return RebaseErrorKind( kind=RebaseKind.REFERENCE_UPDATE_FAILED, metadata={"details": _extract_error_line(output)}, ) _Classifier = Callable[[str], RebaseErrorKind | None] _REBASE_CLASSIFIERS: tuple[_Classifier, ...] = ( _classify_invalid_revision, _classify_shallow_or_missing_history, _classify_worktree_conflict, _classify_submodule_conflict, _classify_dirty_working_tree, _classify_concurrent_operation, _classify_repository_corruption, _classify_environment_failure, _classify_hook_rejection, _classify_content_conflict, _classify_patch_failure, _classify_interactive_stop, _classify_empty_commit, _classify_autostash_failure, _classify_commit_creation_failure, _classify_reference_update_failure, ) def _extract_revision(output: str) -> str | None: patterns = [ ("invalid revision '", "'"), ("unknown revision '", "'"), ("bad revision '", "'"), ("branch '", "' not found"), ("upstream branch '", "' not found"), ("revision ", " not found"), ("'", "'"), ] for start, end in patterns: start_idx = output.find(start) if start_idx == -1: continue after = output[start_idx + len(start) :] end_idx = after.find(end) if end_idx == -1: continue candidate = after[:end_idx] if candidate: return candidate for line in output.splitlines(): if "not found" not in line and "does not exist" not in line: continue single_quote_start = line.find("'") if single_quote_start != -1: next_quote = line.find("'", single_quote_start + 1) if next_quote != -1: candidate = line[single_quote_start + 1 : next_quote] if candidate: return candidate double_quote_start = line.find('"') if double_quote_start != -1: next_quote = line.find('"', double_quote_start + 1) if next_quote != -1: candidate = line[double_quote_start + 1 : next_quote] if candidate: return candidate return None def _extract_operation(output: str) -> str | None: mappings = [ ("rebase in progress", "rebase"), ("merge in progress", "merge"), ("cherry-pick in progress", "cherry-pick"), ("revert in progress", "revert"), ("bisect in progress", "bisect"), ] for pattern, name in mappings: if pattern in output: return name return None def _extract_hook_name(output: str) -> str: hooks = ["pre-rebase", "pre-commit", "commit-msg", "post-commit"] for hook in hooks: if hook in output: return hook return "hook" def _extract_command(output: str) -> str: commands = ["edit", "reword", "break", "exec"] for command in commands: if command in output: return command return "unknown" def _extract_error_line(output: str) -> str: for line in output.splitlines(): stripped = line.strip() if not stripped: continue lowered = stripped.lower() if lowered.startswith("hint:") or lowered.startswith("note:"): continue return stripped return output.strip() def _extract_conflict_files(output: str) -> list[str]: files: list[str] = [] for line in output.splitlines(): if not any(keyword in line for keyword in ("CONFLICT", "Conflict", "Merge conflict")): continue marker = line.find("in ") if marker == -1: continue path = line[marker + 3 :].strip() if path: files.append(path) return files __all__ = ["RebaseErrorKind", "RebaseKind", "classify_rebase_error"]