Source code for ralph.git.wrapper

"""Git wrapper helpers for blocking commits during agent phases."""

from __future__ import annotations

from pathlib import Path

from git import GitCommandError, Repo

MARKER_FILENAME = "no_agent_commit"
TRACK_FILENAME = "git-wrapper-dir.txt"
HEAD_OID_FILENAME = "head-oid.txt"
HOOKS_STATE_FILENAME = "hooks-path-state"
HOOKS_DIR_NAME = "hooks"


[docs] class GitHelpers: """Simple placeholder for Git wrapper state tracking.""" real_git: Path | None wrapper_dir: Path | None wrapper_repo_root: Path | None def __init__(self) -> None: self.real_git = None self.wrapper_dir = None self.wrapper_repo_root = None
[docs] def start_agent_phase(repo_root: Path | str, helpers: GitHelpers | None = None) -> None: """Enable git protections for an agent phase.""" repo = Repo(repo_root) helpers = helpers or GitHelpers() helpers.wrapper_repo_root = Path(repo_root) ralph_dir = _ensure_ralph_dir(repo) helpers.wrapper_dir = ralph_dir _write_marker(ralph_dir) _write_track_file(ralph_dir) _capture_head_oid(repo, ralph_dir) _store_previous_hooks_path(repo, ralph_dir) _set_hooks_path(repo, ralph_dir)
[docs] def end_agent_phase(repo_root: Path | str, helpers: GitHelpers | None = None) -> None: """Remove agent-phase protections and restore git state.""" repo = Repo(repo_root) helpers = helpers or GitHelpers() helpers.wrapper_repo_root = Path(repo_root) ralph_dir = _ralph_dir_from_repo(repo) _restore_hooks_path(repo, ralph_dir) _remove_marker(ralph_dir) _remove_head_oid(ralph_dir) _remove_track_file(ralph_dir)
[docs] def detect_unauthorized_commit(repo_root: Path | str) -> bool: """Return True if the HEAD OID no longer matches the stored baseline.""" repo = Repo(repo_root) ralph_dir = _ralph_dir_from_repo(repo) head_file = ralph_dir / HEAD_OID_FILENAME if not head_file.exists(): return False stored_oid = head_file.read_text().strip() if not stored_oid: return False try: current_head = repo.head.commit.hexsha except (ValueError, GitCommandError): return False return current_head != stored_oid
def _ensure_ralph_dir(repo: Repo) -> Path: git_dir = Path(repo.git_dir) ralph_dir = git_dir / "ralph" ralph_dir.mkdir(parents=True, exist_ok=True) return ralph_dir def _ralph_dir_from_repo(repo: Repo) -> Path: return Path(repo.git_dir) / "ralph" def _write_marker(ralph_dir: Path) -> None: (ralph_dir / MARKER_FILENAME).write_text("") def _write_track_file(ralph_dir: Path) -> None: (ralph_dir / TRACK_FILENAME).write_text(str(ralph_dir)) def _capture_head_oid(repo: Repo, ralph_dir: Path) -> None: try: oid = repo.head.commit.hexsha except (ValueError, GitCommandError): return (ralph_dir / HEAD_OID_FILENAME).write_text(f"{oid}\n") def _set_hooks_path(repo: Repo, ralph_dir: Path) -> None: hooks_dir = ralph_dir / HOOKS_DIR_NAME hooks_dir.mkdir(parents=True, exist_ok=True) repo.git.config("--local", "core.hooksPath", str(hooks_dir)) def _read_hooks_path(repo: Repo) -> str | None: try: value = repo.git.config("--local", "--get", "core.hooksPath") except GitCommandError as exc: if exc.status == 1: return None raise return value.strip() def _store_previous_hooks_path(repo: Repo, ralph_dir: Path) -> None: state_path = ralph_dir / HOOKS_STATE_FILENAME if state_path.exists(): return hooks_value = _read_hooks_path(repo) if hooks_value is None: state_path.write_text("missing\n") else: state_path.write_text(f"value\n{hooks_value}\n") def _restore_hooks_path(repo: Repo, ralph_dir: Path) -> None: state_path = ralph_dir / HOOKS_STATE_FILENAME if not state_path.exists(): return lines = state_path.read_text().splitlines() if not lines: _unset_hooks_path(repo) state_path.unlink() return if lines[0] == "missing": _unset_hooks_path(repo) elif lines[0] == "value" and len(lines) > 1: repo.git.config("--local", "core.hooksPath", lines[1]) state_path.unlink() def _unset_hooks_path(repo: Repo) -> None: missing_key_status = 5 try: repo.git.config("--local", "--unset-all", "core.hooksPath") except GitCommandError as exc: if exc.status == missing_key_status: return raise def _remove_marker(ralph_dir: Path) -> None: path = ralph_dir / MARKER_FILENAME if path.exists(): path.unlink() def _remove_head_oid(ralph_dir: Path) -> None: path = ralph_dir / HEAD_OID_FILENAME if path.exists(): path.unlink() def _remove_track_file(ralph_dir: Path) -> None: path = ralph_dir / TRACK_FILENAME if path.exists(): path.unlink()