Source code for ralph.mcp.artifacts.commit_message

"""Commit-message artifact helpers.

Canonical commit messages are stored as MCP-style JSON artifacts in
`.agent/tmp/commit_message.json`. The commit artifact content follows a
structured schema with either a `commit` or `skip` variant. A plain-text
mirror in `.agent/tmp/commit-message.txt` is maintained for CLI compatibility.
"""

from __future__ import annotations

import json
import re
from datetime import UTC, datetime
from typing import TYPE_CHECKING, cast

from ralph.mcp.artifacts.file_backend import DEFAULT_FILE_BACKEND, FileBackend
from ralph.mcp.artifacts.store import Artifact

if TYPE_CHECKING:
    from collections.abc import Callable
    from pathlib import Path

# Repo-relative location of the canonical JSON artifact.
COMMIT_MESSAGE_ARTIFACT = ".agent/tmp/commit_message.json"
# Repo-relative location of the plain-text mirror of the rendered message.
COMMIT_MESSAGE_TEXT = ".agent/tmp/commit-message.txt"
# Values stored in the artifact's ``artifact_type`` and ``name`` fields.
COMMIT_MESSAGE_TYPE = "commit_message"
COMMIT_MESSAGE_NAME = "commit_message"
# The two payload variants, selected by the payload's "type" field.
_COMMIT_KIND = "commit"
_SKIP_KIND = "skip"
# Prefix that marks a plain-text message as a skip; it is compared against the
# upper-cased text, so the match is case-insensitive.
_SKIP_PREFIX = "SKIP:"
# Structured alternative to a single "body" field; sections render in this order.
_DETAILED_BODY_KEYS = ("body_summary", "body_details", "body_footer")
# The only values accepted for the "reason" of an "excluded_files" entry.
_EXCLUDED_FILE_REASONS = frozenset({"internal_ignore", "not_task_related", "sensitive", "deferred"})
# Conventional-commit subject: a known type, an optional "(scope)", an optional
# "!" marker, then ": " and a description that starts with a lowercase letter
# or digit and is at least two characters long. "." does not match newlines, so
# a subject must be a single line.
_COMMIT_SUBJECT_PATTERN = re.compile(
    r"^(feat|fix|docs|refactor|test|style|perf|build|ci|chore)(\([a-z0-9/_-]+\))?(!)?: [a-z0-9].+"
)


def _now_iso() -> str:
    return datetime.now(tz=UTC).isoformat()


def commit_message_artifact_path(repo_root: Path) -> Path:
    """Locate the canonical commit-message JSON artifact under ``repo_root``."""
    return repo_root.joinpath(COMMIT_MESSAGE_ARTIFACT)
def commit_message_text_path(repo_root: Path) -> Path:
    """Locate the plain-text commit-message mirror under ``repo_root``."""
    return repo_root.joinpath(COMMIT_MESSAGE_TEXT)
def write_commit_message_artifact(
    repo_root: Path,
    message: str | dict[str, object],
    *,
    backend: FileBackend = DEFAULT_FILE_BACKEND,
    now_iso: Callable[[], str] = _now_iso,
) -> None:
    """Store ``message`` as the JSON artifact and refresh its plain-text mirror.

    The parent directories of both files are created first. ``message`` is then
    normalized, wrapped in an ``Artifact`` whose creation and update timestamps
    are both the value returned by ``now_iso``, and written as indented JSON.
    Finally the rendered plain-text form is written to the mirror file.

    Raises:
        ValueError: If ``message`` does not pass normalization.
    """
    json_target = commit_message_artifact_path(repo_root)
    text_target = commit_message_text_path(repo_root)
    for target in (json_target, text_target):
        backend.mkdir(target.parent, parents=True, exist_ok=True)

    payload = normalize_commit_message_content(message)
    stamp = now_iso()
    record = Artifact(
        name=COMMIT_MESSAGE_NAME,
        artifact_type=COMMIT_MESSAGE_TYPE,
        content=payload,
        created_at=stamp,
        updated_at=stamp,
    )

    serialized = json.dumps(record.to_dict(), indent=2)
    backend.write_text(json_target, serialized, encoding="utf-8")

    rendered = render_commit_message_content(payload)
    backend.write_text(text_target, rendered, encoding="utf-8")
def read_commit_message_artifact(
    repo_root: Path, *, backend: FileBackend = DEFAULT_FILE_BACKEND
) -> str | None:
    """Return the stored commit message, preferring the JSON artifact.

    When the JSON artifact exists and yields a renderable payload, its rendered
    text is returned. Otherwise the plain-text mirror is consulted; its
    stripped contents are returned, or ``None`` when the mirror is missing or
    blank.
    """
    json_source = commit_message_artifact_path(repo_root)
    if backend.exists(json_source):
        rendered = _read_commit_message_text_from_json_path(json_source, backend=backend)
        if rendered is not None:
            return rendered

    text_source = commit_message_text_path(repo_root)
    if backend.exists(text_source):
        mirror = backend.read_text(text_source, encoding="utf-8").strip()
        if mirror:
            return mirror
    return None
def read_commit_message_from_path(
    message_file: Path, *, backend: FileBackend = DEFAULT_FILE_BACKEND
) -> str | None:
    """Return the commit message held in ``message_file`` as plain text.

    A payload that normalizes successfully is rendered to text. If it does not,
    an existing non-``.json`` file is returned as its raw stripped contents.
    ``None`` is returned for a missing file, a blank file, or a ``.json`` file
    without a usable payload.
    """
    structured = read_commit_message_payload_from_path(message_file, backend=backend)
    if structured is not None:
        return render_commit_message_content(structured)

    if backend.exists(message_file) and message_file.suffix != ".json":
        raw_text = backend.read_text(message_file, encoding="utf-8").strip()
        return raw_text if raw_text else None
    return None
def read_commit_message_payload_from_path(
    message_file: Path, *, backend: FileBackend = DEFAULT_FILE_BACKEND
) -> dict[str, object] | None:
    """Load ``message_file`` and return its normalized commit-message payload.

    ``.json`` files are parsed as artifact or raw payload JSON; any other file
    is treated as a plain-text message. ``None`` is returned when the file is
    missing, blank, or does not normalize.
    """
    if not backend.exists(message_file):
        return None
    if message_file.suffix == ".json":
        return _read_commit_message_payload_from_json_path(message_file, backend=backend)

    raw_text = backend.read_text(message_file, encoding="utf-8").strip()
    if not raw_text:
        return None
    try:
        normalized = normalize_commit_message_content(raw_text)
    except ValueError:
        # Text that is not a valid subject or skip marker has no structured form.
        return None
    return normalized
# File names under `.agent/tmp` that `delete_commit_message_artifacts` also
# removes. Despite the name, each entry is joined to the directory verbatim and
# is not expanded as a glob pattern.
_LEGACY_STALE_GLOBS = ( "commit_message.xml.processed", "commit_message.xsd", "commit_diff.txt", "commit_diff.model_safe.txt", )
def delete_commit_message_artifacts(
    repo_root: Path, *, backend: FileBackend = DEFAULT_FILE_BACKEND
) -> None:
    """Delete the JSON artifact, its text mirror, and the legacy stale files.

    Each candidate is unlinked only if the backend reports that it exists.
    """
    doomed = [
        commit_message_artifact_path(repo_root),
        commit_message_text_path(repo_root),
    ]
    legacy_dir = repo_root / ".agent" / "tmp"
    doomed.extend(legacy_dir / legacy_name for legacy_name in _LEGACY_STALE_GLOBS)

    for candidate in doomed:
        if backend.exists(candidate):
            backend.unlink(candidate)
def _normalize_text_commit_message(text: str) -> dict[str, object]:
    """Convert a plain-text message into a ``skip`` or ``commit`` payload."""
    trimmed = text.strip()
    if not trimmed:
        raise ValueError("commit_message content cannot be empty")

    is_skip = trimmed.upper().startswith(_SKIP_PREFIX)
    if not is_skip:
        _validate_commit_subject(trimmed)
        return {"type": _COMMIT_KIND, "subject": trimmed}

    skip_reason = trimmed[len(_SKIP_PREFIX) :].strip()
    if not skip_reason:
        raise ValueError("skip commit_message content requires a reason")
    return {"type": _SKIP_KIND, "reason": skip_reason}


def normalize_commit_message_content(content: str | dict[str, object]) -> dict[str, object]:
    """Check ``content`` and return it in canonical dictionary form.

    Accepted inputs:

    * a string: either ``SKIP: <reason>`` (prefix matched case-insensitively)
      or a conventional-commit subject;
    * a legacy dictionary with a non-empty ``message`` string, which is
      normalized as if that string had been passed directly;
    * a dictionary whose ``type`` is ``commit`` or ``skip``.

    Raises:
        ValueError: If the content is empty, has the wrong shape, carries an
            unsupported field, or names an unknown ``type``.
    """
    if isinstance(content, str):
        return _normalize_text_commit_message(content)
    if not isinstance(content, dict):
        raise ValueError("commit_message content must be a dictionary")

    legacy_text = content.get("message")
    if isinstance(legacy_text, str) and legacy_text.strip():
        return normalize_commit_message_content(legacy_text)
    if "message" in content:
        raise ValueError("legacy commit_message payload must use a non-empty 'message' string")

    variant = _required_string_field(content, "type")
    if variant == _SKIP_KIND:
        skip_reason = _required_string_field(content, "reason")
        _reject_unknown_fields(content, {"type", "reason"})
        return {"type": _SKIP_KIND, "reason": skip_reason}
    if variant != _COMMIT_KIND:
        raise ValueError("commit_message content type must be 'commit' or 'skip'")
    return _normalize_commit_payload(content)
def render_commit_message_content(content: dict[str, object]) -> str:
    """Turn a commit-message payload into its plain-text form.

    The payload is normalized first. A ``skip`` payload becomes
    ``SKIP: <reason>``. A ``commit`` payload becomes its subject, followed by a
    blank line and the body when a body is present.

    Raises:
        ValueError: If ``content`` does not pass normalization.
    """
    payload = normalize_commit_message_content(content)
    if cast("str", payload["type"]) == _SKIP_KIND:
        skip_reason = cast("str", payload["reason"])
        return f"{_SKIP_PREFIX} {skip_reason}"

    headline = cast("str", payload["subject"])
    details = _render_commit_body(payload)
    if details:
        return f"{headline}\n\n{details}"
    return headline
def _read_commit_message_text_from_json_path(
    message_file: Path, *, backend: FileBackend = DEFAULT_FILE_BACKEND
) -> str | None:
    """Read a JSON message file and return its rendered text, or ``None``."""
    payload = _read_commit_message_payload_from_json_path(message_file, backend=backend)
    if payload is None:
        return None
    return render_commit_message_content(payload)


def _read_commit_message_payload_from_json_path(
    message_file: Path, *, backend: FileBackend = DEFAULT_FILE_BACKEND
) -> dict[str, object] | None:
    """Read a JSON message file and return its normalized payload.

    The file may hold either a serialized ``Artifact`` of the commit-message
    type or a raw commit-message payload. If the artifact interpretation fails
    at any step, the decoded JSON is normalized directly as a raw payload.

    Returns:
        The normalized payload, or ``None`` when the file is not valid JSON or
        does not contain a usable commit message.
    """
    try:
        decoded: object = json.loads(backend.read_text(message_file, encoding="utf-8"))
    except (TypeError, json.JSONDecodeError):
        return None

    if not isinstance(decoded, dict):
        # Valid JSON is not necessarily an object, and ``cast`` performs no
        # runtime check. Only a bare JSON string can still describe a message
        # (it is normalized like plain text); arrays, numbers, booleans and
        # null are rejected here instead of being handed to
        # ``Artifact.from_dict``.
        if isinstance(decoded, str):
            try:
                return normalize_commit_message_content(decoded)
            except ValueError:
                return None
        return None

    payload = cast("dict[str, object]", decoded)
    try:
        artifact = Artifact.from_dict(payload)
    except (KeyError, TypeError, ValueError):
        return _normalize_raw_commit_message_payload(payload)
    if artifact.artifact_type != COMMIT_MESSAGE_TYPE:
        return _normalize_raw_commit_message_payload(payload)
    try:
        return normalize_commit_message_content(artifact.content)
    except ValueError:
        return _normalize_raw_commit_message_payload(payload)


def _render_raw_commit_message_payload(payload: dict[str, object]) -> str | None:
    """Render a raw payload to text, or return ``None`` if it is invalid."""
    normalized = _normalize_raw_commit_message_payload(payload)
    if normalized is None:
        return None
    return render_commit_message_content(normalized)


def _normalize_raw_commit_message_payload(payload: dict[str, object]) -> dict[str, object] | None:
    """Normalize ``payload``, mapping a validation failure to ``None``."""
    try:
        return normalize_commit_message_content(payload)
    except ValueError:
        return None


def _normalize_commit_payload(content: dict[str, object]) -> dict[str, object]:
    """Validate a ``commit`` payload and return only its recognized fields.

    Raises:
        ValueError: If the subject is missing or malformed, ``body`` is
            combined with any detailed body field, ``files`` is an empty list,
            a field has the wrong shape, or an unsupported field is present.
    """
    normalized: dict[str, object] = {
        "type": _COMMIT_KIND,
        "subject": _required_string_field(content, "subject"),
    }
    body = _optional_string_field(content, "body")
    detailed_values = {
        key: value
        for key in _DETAILED_BODY_KEYS
        if (value := _optional_string_field(content, key)) is not None
    }
    # A free-form body and the structured body sections are mutually exclusive.
    if body is not None and detailed_values:
        raise ValueError("Use either 'body' or the detailed body fields, not both")
    if body is not None:
        normalized["body"] = body
    normalized.update(detailed_values)

    files = _optional_string_list(content, "files")
    if files is not None:
        if not files:
            raise ValueError("commit_message 'files' must not be empty when provided")
        normalized["files"] = files

    excluded_files = _optional_excluded_files(content)
    if excluded_files is not None:
        normalized["excluded_files"] = excluded_files

    allowed_fields = {"type", "subject", "body", *_DETAILED_BODY_KEYS, "files", "excluded_files"}
    _reject_unknown_fields(content, allowed_fields)
    return normalized


def _render_commit_body(content: dict[str, object]) -> str:
    """Return the commit body text, or an empty string when there is none.

    A ``body`` field is returned as is. Otherwise the detailed body sections
    that are present are joined with blank lines, in ``_DETAILED_BODY_KEYS``
    order.
    """
    body = _optional_string_field(content, "body")
    if body is not None:
        return body
    sections = [
        value
        for key in _DETAILED_BODY_KEYS
        if (value := _optional_string_field(content, key)) is not None
    ]
    return "\n\n".join(sections)


def _required_string_field(content: dict[str, object], field: str) -> str:
    """Return the stripped value of ``field``, which must be a non-blank string.

    The ``subject`` field is additionally checked against the conventional
    commit format.

    Raises:
        ValueError: If the field is missing, not a string, blank, or (for
            ``subject``) not in conventional commit format.
    """
    value = content.get(field)
    if not isinstance(value, str) or not value.strip():
        raise ValueError(f"commit_message payloads require a non-empty '{field}'")
    normalized = value.strip()
    if field == "subject":
        _validate_commit_subject(normalized)
    return normalized


def _validate_commit_subject(subject: str) -> None:
    """Raise ``ValueError`` unless ``subject`` fully matches the subject pattern."""
    if not _COMMIT_SUBJECT_PATTERN.fullmatch(subject):
        raise ValueError(
            "commit_message subjects must use conventional commit format "
            "like 'fix(parser): preserve prefixed transcript lines'"
        )


def _optional_string_field(content: dict[str, object], field: str) -> str | None:
    """Return the stripped value of ``field``, or ``None`` when absent or null.

    Raises:
        ValueError: If the field is present but is not a non-blank string.
    """
    value = content.get(field)
    if value is None:
        return None
    if not isinstance(value, str) or not value.strip():
        raise ValueError(f"commit_message field '{field}' must be a non-empty string when provided")
    return value.strip()


def _optional_string_list(content: dict[str, object], field: str) -> list[str] | None:
    """Return ``field`` as a list of stripped strings, or ``None`` when absent.

    An empty list is returned unchanged; callers decide whether it is allowed.

    Raises:
        ValueError: If the field is not a list, or contains an item that is
            not a non-blank string.
    """
    value = content.get(field)
    if value is None:
        return None
    if not isinstance(value, list):
        raise ValueError(f"commit_message field '{field}' must be an array of strings")
    normalized: list[str] = []
    for item in value:
        if not isinstance(item, str) or not item.strip():
            raise ValueError(f"commit_message field '{field}' must contain only non-empty strings")
        normalized.append(item.strip())
    return normalized


def _optional_excluded_files(content: dict[str, object]) -> list[dict[str, object]] | None:
    """Return the validated ``excluded_files`` entries, or ``None`` when absent.

    Each entry must be an object with exactly a ``path`` and a ``reason``, and
    the reason must be one of ``_EXCLUDED_FILE_REASONS``.

    Raises:
        ValueError: If the field is not a list or any entry is malformed.
    """
    value = content.get("excluded_files")
    if value is None:
        return None
    if not isinstance(value, list):
        raise ValueError("commit_message field 'excluded_files' must be an array")
    normalized: list[dict[str, object]] = []
    for item in value:
        if not isinstance(item, dict):
            raise ValueError("commit_message 'excluded_files' entries must be objects")
        path = _required_string_field(item, "path")
        reason = _required_string_field(item, "reason")
        if reason not in _EXCLUDED_FILE_REASONS:
            raise ValueError(
                "commit_message excluded_files reason must be one of "
                + ", ".join(sorted(_EXCLUDED_FILE_REASONS))
            )
        _reject_unknown_fields(item, {"path", "reason"})
        normalized.append({"path": path, "reason": reason})
    return normalized


def _reject_unknown_fields(content: dict[str, object], allowed: set[str]) -> None:
    """Raise ``ValueError`` naming every key of ``content`` not in ``allowed``."""
    unexpected = sorted(key for key in content if key not in allowed)
    if unexpected:
        formatted = ", ".join(unexpected)
        raise ValueError(f"commit_message payload contains unsupported field(s): {formatted}")