"""Precondition validation before performing a git rebase."""
from __future__ import annotations
from pathlib import Path
from typing import TYPE_CHECKING
from git import InvalidGitRepositoryError, Repo
from git.exc import GitCommandError
from ralph.git.rebase._concurrent_operation import _ConcurrentOperation
if TYPE_CHECKING:
from collections.abc import Sequence
from git.config import GitConfigParser
# Entries under the git directory whose existence is treated as a rebase
# already being in progress (see _detect_concurrent_operation).
REBASE_APPLY_DIR = "rebase-apply"
REBASE_MERGE_DIR = "rebase-merge"
# Lock files under the git directory; if any exists, the repository is
# reported as being in use by another Git process.
_LOCK_FILES = ("index.lock", "packed-refs.lock", "HEAD.lock")
[docs]
class RebasePreconditionError(Exception):
    """Signal that a rebase must not start because a precondition check failed."""
[docs]
def check_rebase_preconditions(repo_root: Path | str) -> None:
    """Verify that the repository at ``repo_root`` can begin a rebase.

    The repository is opened and its basic state validated first. Any
    operation already in progress aborts the check, after which the remaining
    precondition checks run one after another in a fixed order.

    Args:
        repo_root: Path to the git repository.

    Raises:
        RebasePreconditionError: When the repository is not ready to rebase.
    """
    repo = _open_repo(repo_root)
    _validate_git_state(repo)

    in_progress = _detect_concurrent_operation(repo)
    if in_progress:
        message = (
            f"Cannot start rebase: {in_progress.description} already in progress. "
            "Please complete or abort the current operation first."
        )
        raise RebasePreconditionError(message)

    # Order matters: the first failing check determines the error reported.
    remaining_checks = (
        _ensure_git_identity,
        _ensure_clean_worktree,
        _check_shallow_clone,
        _check_worktree_conflicts,
        _check_submodule_state,
        _check_sparse_checkout_state,
    )
    for run_check in remaining_checks:
        run_check(repo)
def _open_repo(repo_root: Path | str) -> Repo:
    """Open the repository at ``repo_root``.

    Args:
        repo_root: Path to the git repository.

    Returns:
        The opened ``Repo`` object.

    Raises:
        RebasePreconditionError: When the path does not exist or is not a git
            repository.
    """
    # Function-local import keeps this fix self-contained; the module already
    # imports from ``git.exc`` at the top of the file.
    from git.exc import NoSuchPathError

    try:
        return Repo(repo_root)
    except (InvalidGitRepositoryError, NoSuchPathError) as exc:
        # A missing path raises NoSuchPathError rather than
        # InvalidGitRepositoryError; both mean "no usable repository here".
        raise RebasePreconditionError(f"Not a git repository: {exc}") from exc
def _validate_git_state(repo: Repo) -> None:
    """Touch HEAD, the HEAD commit's tree, and the index to surface breakage.

    Each access is attempted in turn; a failure is re-raised as a
    ``RebasePreconditionError`` naming the part that could not be read.

    Raises:
        RebasePreconditionError: When HEAD, the HEAD commit/tree, or the index
            cannot be accessed.
    """
    try:
        head_ref = repo.head
    except (GitCommandError, ValueError, TypeError) as err:
        raise RebasePreconditionError(f"Repository HEAD is invalid: {err}") from err

    try:
        # Resolving the commit and then its tree forces object lookups.
        _ = head_ref.commit.tree
    except (GitCommandError, ValueError, OSError) as err:
        raise RebasePreconditionError(f"Object database corruption: {err}") from err

    try:
        _ = repo.index
    except (GitCommandError, OSError) as err:
        raise RebasePreconditionError(f"Repository index is corrupted: {err}") from err
def _detect_concurrent_operation(repo: Repo) -> _ConcurrentOperation | None:
    """Look inside the git directory for signs of another operation.

    Checks run in a fixed order and the first match wins: rebase state
    directories, merge / cherry-pick / revert head files, bisect files, lock
    files, and finally any entry whose name contains ``REBASE``, ``MERGE`` or
    ``CHERRY``.

    Returns:
        A ``_ConcurrentOperation`` for the first match, or ``None`` when
        nothing was found.
    """
    root = _git_dir(repo)

    def present(*names: str) -> bool:
        # True when at least one of the named entries exists in the git dir.
        return any((root / name).exists() for name in names)

    if present(REBASE_MERGE_DIR, REBASE_APPLY_DIR):
        return _ConcurrentOperation("rebase", "rebase")

    head_markers = (
        ("MERGE_HEAD", "merge"),
        ("CHERRY_PICK_HEAD", "cherry-pick"),
        ("REVERT_HEAD", "revert"),
    )
    for marker, kind in head_markers:
        if present(marker):
            return _ConcurrentOperation(kind, kind)

    if present("BISECT_LOG", "BISECT_START", "BISECT_NAMES"):
        return _ConcurrentOperation("bisect", "bisect")

    if present(*_LOCK_FILES):
        return _ConcurrentOperation("lock", "another Git process")

    # Catch-all: any leftover entry that looks operation-related by name.
    suspicious = ("REBASE", "MERGE", "CHERRY")
    for child in root.iterdir():
        child_name = child.name
        if any(token in child_name for token in suspicious):
            return _ConcurrentOperation("unknown", f"unknown operation: {child_name}")

    return None
def _ensure_git_identity(repo: Repo) -> None:
    """Require both ``user.name`` and ``user.email`` to be configured.

    Raises:
        RebasePreconditionError: When either value is missing or blank.
    """
    config = repo.config_reader()
    name = _read_config_value(config, "user", "name")
    address = _read_config_value(config, "user", "email")
    if name and address:
        return
    raise RebasePreconditionError(
        "Git identity is not configured. Please set user.name and user.email:\n "
        'git config --global user.name "Your Name"\n '
        'git config --global user.email "you@example.com"',
    )
def _ensure_clean_worktree(repo: Repo) -> None:
    """Reject a working tree that ``repo.is_dirty`` reports as dirty.

    Untracked files and submodules are included in the dirtiness check.

    Raises:
        RebasePreconditionError: When the working tree is not clean.
    """
    has_changes = repo.is_dirty(untracked_files=True, submodules=True)
    if not has_changes:
        return
    raise RebasePreconditionError(
        "Working tree is not clean. Please commit or stash changes before rebasing."
    )
def _check_shallow_clone(repo: Repo) -> None:
    """Refuse to rebase when a ``shallow`` file exists in the git directory.

    The number reported in the error message is the line count of that file.

    Raises:
        RebasePreconditionError: When the ``shallow`` file exists, or exists
            but cannot be read.
    """
    marker = _git_dir(repo) / "shallow"
    if not marker.exists():
        return

    try:
        raw = marker.read_text()
    except OSError as err:
        raise RebasePreconditionError(f"Failed to read shallow clone metadata: {err}") from err

    entry_count = len(raw.splitlines())
    raise RebasePreconditionError(
        f"Repository is a shallow clone with {entry_count} commits. "
        "Rebasing may fail due to missing history. "
        "Consider running: git fetch --unshallow"
    )
def _check_worktree_conflicts(repo: Repo) -> None:
    """Refuse to rebase a branch that a linked worktree also has checked out.

    Reads the ``HEAD`` file of every directory under ``<git dir>/worktrees``
    and compares the symbolic ref it names with the current branch. The check
    is skipped when HEAD is detached, when the active branch name cannot be
    determined, or when there is no ``worktrees`` directory.

    Raises:
        RebasePreconditionError: When a worktree's HEAD points at the current
            branch.
    """
    if repo.head.is_detached:
        return
    try:
        branch_name = repo.active_branch.name
    except (TypeError, GitCommandError):
        return

    worktrees_dir = _git_dir(repo) / "worktrees"
    if not worktrees_dir.is_dir():
        return

    target_ref = f"refs/heads/{branch_name}"
    ref_prefix = "ref:"
    for entry in worktrees_dir.iterdir():
        if not entry.is_dir():
            continue
        try:
            # A missing HEAD raises FileNotFoundError (an OSError), so no
            # separate existence check is needed.
            content = (entry / "HEAD").read_text()
        except OSError:
            continue

        head_value = content.strip()
        if not head_value.startswith(ref_prefix):
            # Not a symbolic ref (e.g. a detached worktree holding a SHA).
            continue
        # Compare the whole ref name: a substring test would wrongly match
        # "refs/heads/feat" against "refs/heads/feature".
        if head_value[len(ref_prefix):].strip() == target_ref:
            raise RebasePreconditionError(
                f"Branch '{branch_name}' is already checked out in worktree '{entry.name}'. "
                "Use 'git worktree add' to create a new worktree for this branch."
            )
def _check_submodule_state(repo: Repo) -> None:
    """Check that submodules declared in ``.gitmodules`` appear initialized.

    Nothing is checked when the working tree has no ``.gitmodules`` file.
    Otherwise the git directory must contain ``modules``, and every
    ``path = ...`` value found in ``.gitmodules`` must exist relative to the
    working tree.

    Raises:
        RebasePreconditionError: When ``modules`` is missing or a declared
            submodule path does not exist.
    """
    root = _worktree(repo)
    manifest = root / ".gitmodules"
    if not manifest.exists():
        return

    if not (_git_dir(repo) / "modules").exists():
        raise RebasePreconditionError(
            "Submodules are not initialized. Run: git submodule update --init --recursive"
        )

    key_marker = "path ="
    for raw_line in manifest.read_text().splitlines():
        if key_marker not in raw_line:
            continue
        # Everything after the marker is the declared submodule path.
        declared = raw_line.partition(key_marker)[2].strip()
        if declared and not (root / declared).exists():
            raise RebasePreconditionError(
                f"Submodule '{declared}' is not initialized. Run: "
                "git submodule update --init --recursive"
            )
def _check_sparse_checkout_state(repo: Repo) -> None:
    """Check that an enabled sparse checkout has a non-empty pattern file.

    Sparse checkout counts as enabled when either of the two config flags read
    below is truthy. In that case ``info/sparse-checkout`` under the git
    directory must exist and contain non-whitespace text.

    Raises:
        RebasePreconditionError: When sparse checkout is enabled but the
            pattern file is missing or empty.
    """
    config = repo.config_reader()
    # NOTE(review): git documents cone mode under ``core.sparseCheckoutCone``;
    # confirm that the ``extensions`` section is the intended place to look.
    flags = (
        _config_value_as_bool(config, "core", "sparseCheckout"),
        _config_value_as_bool(config, "extensions", "sparseCheckoutCone"),
    )
    if not any(flags):
        return

    patterns_file = _git_dir(repo) / "info" / "sparse-checkout"
    if not patterns_file.exists():
        raise RebasePreconditionError(
            "Sparse checkout is enabled but not configured. Run: git sparse-checkout init"
        )

    if patterns_file.read_text().strip():
        return
    raise RebasePreconditionError(
        "Sparse checkout configuration is empty. Run: git sparse-checkout set <patterns>"
    )
def _read_config_value(reader: GitConfigParser, section: str, key: str) -> str | None:
    """Read a config value and normalize it to a stripped string.

    Args:
        reader: Config reader exposing ``get_value(section, key)``.
        section: Config section name.
        key: Option name within the section.

    Returns:
        ``"true"`` / ``"false"`` for boolean values, the stripped string form
        of any other value, or ``None`` when the lookup fails, yields
        ``None``, or yields only whitespace.
    """
    try:
        value = reader.get_value(section, key)
    except Exception:
        # Deliberate best effort: any lookup failure counts as "not set".
        return None

    if value is None:
        return None
    # Handle booleans before any truthiness test; otherwise ``False`` would be
    # reported as missing and the "false" result could never be produced.
    if isinstance(value, bool):
        return "true" if value else "false"
    stripped = str(value).strip()
    return stripped or None
def _config_value_as_bool(reader: GitConfigParser, section: str, key: str) -> bool:
    """Interpret a config value as a boolean flag.

    Returns:
        ``True`` when the value, compared case-insensitively, is one of
        ``true``, ``1``, ``yes`` or ``on``; ``False`` otherwise, including
        when the value is unavailable.
    """
    text = _read_config_value(reader, section, key)
    truthy_spellings = ("true", "1", "yes", "on")
    return text is not None and text.lower() in truthy_spellings
def _git_dir(repo: Repo) -> Path:
    """Return ``repo.git_dir`` as a ``Path``.

    Raises:
        RebasePreconditionError: When ``repo.git_dir`` is empty or unset.
    """
    location = repo.git_dir
    if location:
        return Path(location)
    raise RebasePreconditionError("Cannot determine .git directory for repository")
def _worktree(repo: Repo) -> Path:
    """Return the working tree directory of ``repo`` as a ``Path``.

    Falls back to the parent of the git directory when
    ``repo.working_tree_dir`` is empty or unset.
    """
    tree = repo.working_tree_dir
    return Path(tree) if tree else _git_dir(repo).parent