Source code for ralph.files.operations

"""File capture and state-tracking helpers for Ralph checkpoints."""

from __future__ import annotations

import hashlib
from dataclasses import dataclass
from pathlib import Path

from ralph.files._file_snapshot import FileSnapshot
from ralph.files._file_state_issue import FileStateIssue
from ralph.files._file_state_kind import FileStateKind

DEFAULT_TRACKED_FILES: tuple[Path, ...] = (
    Path("PROMPT.md"),
    Path(".agent/PLAN.md"),
    Path(".agent/ISSUES.md"),
    Path(".agent/DEVELOPMENT_RESULT.md"),
    Path(".agent/FIX_RESULT.md"),
    Path(".agent/DEVELOPMENT_ANALYSIS_DECISION.md"),
    Path(".agent/REVIEW_ANALYSIS_DECISION.md"),
    Path(".agent/config.toml"),
    Path(".agent/start_commit"),
    Path(".agent/NOTES.md"),
    Path(".agent/status"),
)

_READ_CHUNK_SIZE = 1024 * 1024


[docs] @dataclass(frozen=True) class FileSystemState: """Snapshots for tracked Ralph files rooted at a workspace path.""" root: Path files: dict[Path, FileSnapshot]
[docs] def calculate_checksum(path: Path | str) -> str: """Return the SHA-256 checksum for a file.""" digest = hashlib.sha256() with Path(path).open("rb") as handle: for chunk in iter(lambda: handle.read(_READ_CHUNK_SIZE), b""): digest.update(chunk) return digest.hexdigest()
[docs] def capture_file_snapshot(path: Path | str, *, root: Path | str | None = None) -> FileSnapshot: """Capture the current state of a file relative to a workspace root.""" candidate = Path(path) workspace_root = Path(root) if root is not None else candidate.parent relative_path = _relative_path(candidate, workspace_root) if not candidate.exists(): return FileSnapshot(path=relative_path, checksum="", size=0, exists=False) return FileSnapshot( path=relative_path, checksum=calculate_checksum(candidate), size=candidate.stat().st_size, exists=True, )
[docs] def capture_file_system_state( root: Path | str, *, tracked_paths: list[Path | str] | tuple[Path | str, ...] = DEFAULT_TRACKED_FILES, ) -> FileSystemState: """Capture snapshots for tracked files under a workspace root.""" workspace_root = Path(root) snapshots = { relative_path: capture_file_snapshot(workspace_root / relative_path, root=workspace_root) for relative_path in (_relative_path(Path(path), workspace_root) for path in tracked_paths) } return FileSystemState(root=workspace_root, files=snapshots)
[docs] def validate_file_system_state( state: FileSystemState, root: Path | str | None = None, ) -> list[FileStateIssue]: """Compare current tracked files with a captured checkpoint snapshot.""" workspace_root = Path(root) if root is not None else state.root issues: list[FileStateIssue] = [] for relative_path, snapshot in state.files.items(): current_snapshot = capture_file_snapshot( workspace_root / relative_path, root=workspace_root, ) if snapshot.exists and not current_snapshot.exists: issues.append(FileStateIssue(kind=FileStateKind.MISSING, path=relative_path)) continue if not snapshot.exists and current_snapshot.exists: issues.append(FileStateIssue(kind=FileStateKind.UNEXPECTED, path=relative_path)) continue if ( snapshot.exists and current_snapshot.exists and current_snapshot.checksum != snapshot.checksum ): issues.append(FileStateIssue(kind=FileStateKind.CHANGED, path=relative_path)) return issues
def _relative_path(path: Path, root: Path) -> Path: """Normalize a path to be relative to the workspace root when possible.""" if path.is_absolute(): return path.relative_to(root) return path __all__ = [ "DEFAULT_TRACKED_FILES", "FileSnapshot", "FileStateIssue", "FileStateKind", "FileSystemState", "calculate_checksum", "capture_file_snapshot", "capture_file_system_state", "validate_file_system_state", ]