Source code for ralph.git.operations

"""Git operations for ralph pipeline via GitPython.

This module provides a high-level interface for git operations,
wrapping GitPython to provide the functionality needed by the pipeline.
"""

from __future__ import annotations

import re
import time
from pathlib import Path
from typing import TYPE_CHECKING, cast

from git import Actor, InvalidGitRepositoryError, Repo
from git.exc import GitCommandError
from loguru import logger

if TYPE_CHECKING:
    from collections.abc import Callable

_LOCK_PATH_PATTERN = re.compile(r"Unable to create '([^']+\.lock)'")
_RECOVERABLE_GIT_LOCK_FILES = frozenset({"index.lock", "HEAD.lock", "packed-refs.lock"})
_STALE_GIT_LOCK_AGE_SECONDS = 10.0


[docs] class GitOperationError(Exception): """Raised when a git operation fails. Attributes: operation: Name of the operation that failed. message: Error message describing the failure. """ def __init__(self, operation: str, message: str) -> None: """Initialize git operation error. Args: operation: Name of the operation. message: Error message. """ self.operation = operation self.message = message super().__init__(f"Git {operation} failed: {message}")
def _close_repo(repo: Repo | None) -> None: close = cast("Callable[[], object] | None", getattr(repo, "close", None)) if callable(close): close() def _parse_lock_path_from_error(error_text: str) -> Path | None: match = _LOCK_PATH_PATTERN.search(error_text) if match is None: return None lock_path = Path(match.group(1)) if lock_path.name not in _RECOVERABLE_GIT_LOCK_FILES: return None return lock_path def _recover_stale_git_lock( operation: str, error: Exception, *, stale_lock_age_seconds: float = _STALE_GIT_LOCK_AGE_SECONDS, ) -> bool: error_text = str(error) lock_path = _parse_lock_path_from_error(error_text) if lock_path is None: return False if lock_path.exists(): age_seconds = time.time() - lock_path.stat().st_mtime if age_seconds < stale_lock_age_seconds: return False try: lock_path.unlink() except OSError: return False logger.warning( "Recovered stale git lock for {} by removing {} (age={:.1f}s)", operation, lock_path, age_seconds, ) return True logger.warning( "Retrying {} after transient git lock contention; lock path already disappeared: {}", operation, lock_path, ) return True def _run_git_operation_with_stale_lock_recovery[T]( operation: str, action: Callable[[], T], ) -> T: try: return action() except GitCommandError as exc: if not _recover_stale_git_lock(operation, exc): raise return action()
[docs] def find_repo_root(start: Path | str = Path()) -> Path: """Locate git repo root from start path. Args: start: Starting path for the search. Returns: Path to the repository root. Raises: GitOperationError: If not inside a git repository. """ repo: Repo | None = None try: repo = Repo(start, search_parent_directories=True) if repo.working_tree_dir: return Path(repo.working_tree_dir).resolve() return Path(repo.working_dir).resolve() except InvalidGitRepositoryError as exc: raise GitOperationError("find_repo_root", "Not inside a git repository") from exc finally: _close_repo(repo)
[docs] def find_main_worktree_root(start: Path | str = Path()) -> Path: """Find the primary worktree root for the current repository. For linked worktrees, this resolves to the main checkout that owns the shared git common directory. For ordinary repositories, it matches the active repository root. This helper detects linked git worktrees only as a workspace-root resolver and is NEVER used by the same-workspace parallel worker path. Parallel v1 workers always share the canonical repo_root; this function MUST NOT be invoked by ``ralph.pipeline.parallel.*`` modules. Callers in that package violate the same-workspace isolation contract. """ repo: Repo | None = None try: repo = Repo(start, search_parent_directories=True) common_dir = Path(repo.common_dir).resolve() return common_dir.parent.resolve() except InvalidGitRepositoryError as exc: raise GitOperationError("find_main_worktree_root", "Not inside a git repository") from exc finally: _close_repo(repo)
[docs] def is_repo_clean(repo_root: Path | str) -> bool: """Check if repository has uncommitted changes. Args: repo_root: Path to the repository root. Returns: True if repository is clean (no uncommitted changes). """ repo: Repo | None = None try: repo = Repo(repo_root) return not repo.is_dirty() finally: _close_repo(repo)
[docs] def has_uncommitted_changes(repo_root: Path | str) -> bool: """Return True when the working tree has uncommitted work. Includes staged diff, unstaged diff, and untracked files. This is the authoritative skip check for commit phases: if this returns False, there is literally nothing for a commit agent to package up. """ repo: Repo | None = None try: repo = Repo(repo_root) return repo.is_dirty(untracked_files=True) finally: _close_repo(repo)
[docs] def has_commits_since(repo_root: Path | str, baseline_sha: str | None) -> bool: """Return True when HEAD is ahead of ``baseline_sha``. When ``baseline_sha`` is None the caller has no prior baseline (first run), so we conservatively return True to allow the caller to proceed. """ if baseline_sha is None: return True repo: Repo | None = None try: repo = Repo(repo_root) return any(True for _ in repo.iter_commits(f"{baseline_sha}..HEAD")) except Exception: return True finally: _close_repo(repo)
[docs] def has_staged_changes(repo_root: Path | str) -> bool: """Check if repository has staged changes. Args: repo_root: Path to the repository root. Returns: True if there are staged changes. """ repo: Repo | None = None try: repo = Repo(repo_root) return bool(repo.index.diff("HEAD")) or bool(repo.untracked_files) finally: _close_repo(repo)
[docs] def get_staged_files(repo_root: Path | str) -> list[str]: """Get list of staged files. Args: repo_root: Path to the repository root. Returns: List of staged file paths. """ repo: Repo | None = None try: repo = Repo(repo_root) staged = repo.index.diff("HEAD") return [item.a_path for item in staged if item.a_path] if staged else [] finally: _close_repo(repo)
[docs] def stage_all(repo_root: Path | str) -> None: """Stage all changes (git add -A). Args: repo_root: Path to the repository root. """ repo: Repo | None = None try: repo = Repo(repo_root) def _stage() -> None: _ = cast("str", repo.git.add(A=True)) _run_git_operation_with_stale_lock_recovery("stage_all", _stage) logger.debug("Staged all changes in {}", repo_root) except Exception as exc: raise GitOperationError("stage_all", str(exc)) from exc finally: _close_repo(repo)
[docs] def stage_files(repo_root: Path | str, files: list[str]) -> None: """Stage only the provided repository-relative paths. Uses ``git add --all -- <paths>`` so modified, untracked, and deleted files are all handled consistently for the selected scope. """ if not files: logger.debug("No files requested for selective staging in {}", repo_root) return repo: Repo | None = None try: repo = Repo(repo_root) def _stage() -> None: _ = cast("str", repo.git.add("--all", "--", *files)) _run_git_operation_with_stale_lock_recovery( "stage_files", _stage ) logger.debug("Staged {} selected paths in {}", len(files), repo_root) except Exception as exc: raise GitOperationError("stage_files", str(exc)) from exc finally: _close_repo(repo)
[docs] def create_commit( repo_root: Path | str, message: str, author_name: str | None = None, author_email: str | None = None, ) -> str: """Create a git commit. Args: repo_root: Path to the repository root. message: Commit message. author_name: Optional author name override. author_email: Optional author email override. Returns: SHA of the created commit. Raises: GitOperationError: If commit fails. """ repo: Repo | None = None try: repo = Repo(repo_root) if not author_name or not author_email: try: config = repo.config_reader() author_name = author_name or str(config.get_value("user", "name", "Ralph")) author_email = author_email or str(config.get_value("user", "email", "ralph@ai")) except Exception: author_name = author_name or "Ralph" author_email = author_email or "ralph@ai" actor = Actor(author_name, author_email) commit = _run_git_operation_with_stale_lock_recovery( "create_commit", lambda: repo.index.commit(message, author=actor, committer=actor), ) logger.info( "Created commit {}: {}", commit.hexsha[:8], message.splitlines()[0] if message else "(no message)", ) return commit.hexsha except Exception as exc: raise GitOperationError("create_commit", str(exc)) from exc finally: _close_repo(repo)
[docs] def push( repo_root: Path | str, remote: str = "origin", branch: str | None = None, ) -> None: """Push current branch to remote. Args: repo_root: Path to the repository root. remote: Remote name to push to. branch: Optional branch name override. Raises: GitOperationError: If push fails. """ repo: Repo | None = None try: repo = Repo(repo_root) active_branch = branch or repo.active_branch.name repo.remote(remote).push(active_branch) logger.info("Pushed {} to {}/{}", active_branch, remote, active_branch) except Exception as exc: raise GitOperationError("push", str(exc)) from exc finally: _close_repo(repo)
[docs] def get_head_sha(repo_root: Path | str) -> str: """Return current HEAD commit SHA. Args: repo_root: Path to the repository root. Returns: SHA of the current HEAD commit. """ repo: Repo | None = None try: repo = Repo(repo_root) return repo.head.commit.hexsha finally: _close_repo(repo)
[docs] def merge_base( repo_root: Path | str, ref_a: str, ref_b: str, ) -> str: """Return merge-base SHA between two refs. Args: repo_root: Path to the repository root. ref_a: First ref (branch, tag, SHA). ref_b: Second ref (branch, tag, SHA). Returns: SHA of the merge base commit. Raises: GitOperationError: If merge base cannot be determined. """ repo: Repo | None = None try: repo = Repo(repo_root) bases = repo.merge_base(ref_a, ref_b) if not bases: msg = f"No merge base between {ref_a} and {ref_b}" raise GitOperationError("merge_base", msg) return bases[0].hexsha except Exception as exc: raise GitOperationError("merge_base", str(exc)) from exc finally: _close_repo(repo)
[docs] def get_current_branch(repo_root: Path | str) -> str: """Get the current branch name. Args: repo_root: Path to the repository root. Returns: Name of the current branch. """ repo: Repo | None = None try: repo = Repo(repo_root) return repo.active_branch.name finally: _close_repo(repo)
[docs] def get_commits_between( repo_root: Path | str, from_ref: str, to_ref: str, ) -> list[str]: """Get list of commit SHAs between two refs. Args: repo_root: Path to the repository root. from_ref: Starting ref (exclusive). to_ref: Ending ref (inclusive). Returns: List of commit SHAs in reverse chronological order. """ repo: Repo | None = None try: repo = Repo(repo_root) commits = repo.iter_commits(f"{from_ref}..{to_ref}") return [c.hexsha for c in commits] finally: _close_repo(repo)
[docs] def append_to_gitignore(repo_root: Path | str, patterns: list[str]) -> None: """Append patterns to .gitignore. Args: repo_root: Path to the repository root. patterns: List of patterns to add. """ gitignore_path = Path(repo_root) / ".gitignore" existing = set() if gitignore_path.exists(): existing = set(gitignore_path.read_text().splitlines()) new_patterns = [p for p in patterns if p not in existing] if new_patterns: with gitignore_path.open("a", encoding="utf-8") as f: if new_patterns[0]: f.write("\n") f.write("\n".join(new_patterns)) logger.debug("Appended {} patterns to .gitignore", len(new_patterns))