# Source code for ralph.cli.commands.commit

"""Commit plumbing commands for Ralph CLI.

This module implements commit-related commands for generating
and applying commit messages.
"""

from __future__ import annotations

import json
import typing
import uuid
from collections import deque
from dataclasses import dataclass, field
from inspect import signature
from pathlib import Path
from typing import TYPE_CHECKING, cast

from rich.text import Text

from ralph.agents.invoke import (
    AgentInvocationError,
    InvokeOptions,
    InvokeRuntimeOptions,
    build_invoke_options_from_config,
    extract_session_id,
    invoke_agent,
)
from ralph.agents.parsers import AgentOutputLine, AgentParser, get_parser
from ralph.agents.registry import AgentRegistry
from ralph.cli.commands._commit_agent_attempt import CommitAgentAttempt
from ralph.cli.commands._commit_attempt_context import CommitAttemptContext
from ralph.cli.commands._commit_chain_config import CommitChainConfig
from ralph.cli.commands._commit_plumbing_options import CommitPlumbingOptions
from ralph.config.enums import AgentTransport
from ralph.config.loader import load_config
from ralph.display.artifact_renderer import render_commit_message
from ralph.display.context import DisplayContext, make_display_context
from ralph.executor.process import ProcessRunOptions, run_process
from ralph.git.operations import (
    create_commit,
    find_repo_root,
    has_staged_changes,
    stage_all,
)
from ralph.mcp.artifacts.commit_message import (
    delete_commit_message_artifacts,
    read_commit_message_artifact,
)
from ralph.mcp.protocol.session import MCP_ENDPOINT_ENV, MCP_RUN_ID_ENV, AgentSession
from ralph.mcp.server.lifecycle import McpServerExtras, SessionBridgeLike, start_mcp_server
from ralph.mcp.session_plan import build_session_mcp_plan
from ralph.mcp.tools.names import SUBMIT_ARTIFACT_TOOL, claude_tool_name_prefix
from ralph.policy.loader import load_agents_policy_for_workspace_scope
from ralph.policy.models import AgentChainConfig, AgentDrainConfig
from ralph.prompts.commit import (
    CommitPromptPayloadConfig,
    prompt_commit_message,
    prompt_commit_message_for_opencode,
)
from ralph.prompts.materialize import submit_artifact_tool_name_for_transport
from ralph.prompts.payload_refs import sanitize_surrogates as _sanitize_surrogates
from ralph.prompts.system_prompt import materialize_system_prompt
from ralph.prompts.template_registry import TemplateRegistry, default_template_dirs
from ralph.workspace.fs import FsWorkspace
from ralph.workspace.scope import resolve_workspace_scope

__all__ = ["CommitPlumbingOptions", "commit_plumbing"]

if TYPE_CHECKING:
    from collections.abc import Iterable, Iterator

    from ralph.config.models import AgentConfig, UnifiedConfig
    from ralph.display.context import DisplayContext
    from ralph.policy.models import AgentsPolicy


class _RepoHeadProtocol(typing.Protocol):
    """Structural type for the ``head`` attribute of a repository object."""

    def is_valid(self) -> bool:
        """Tell whether the head can serve as a diff base.

        ``_working_tree_diff`` diffs against ``HEAD`` only when this is true
        and falls back to the staged diff otherwise.
        """


class _RepoGitProtocol(typing.Protocol):
    """Structural type for the ``git`` attribute of a repository object."""

    def diff(self, *_args: object, **_kwargs: object) -> str:
        """Return diff text for the given arguments (e.g. ``"HEAD"`` or ``"--cached"``)."""


class _RepoProtocol(typing.Protocol):
    """Structural type for the repository object produced by the ``Repo`` factory."""

    # Exposes ``is_valid()``; consulted before choosing the diff base.
    head: _RepoHeadProtocol
    # Exposes ``diff(...)``; used to obtain the diff text.
    git: _RepoGitProtocol


class _RepoFactoryProtocol(typing.Protocol):
    """Structural type for a callable that builds a repository object."""

    def __call__(self, *_args: object, **_kwargs: object) -> _RepoProtocol:
        """Build a repository object; this module calls it with the repository root path."""

# Maximum number of staged files to display in output
_MAX_DISPLAY_FILES = 5
# Agent tried when neither the "commit" nor the "review" drain yields a
# commit-capable agent.
_DEFAULT_COMMIT_AGENT = "claude"
# Minimum ``config.general.verbosity`` at which agent activity is echoed to the console.
_VERBOSE_THRESHOLD = 2
# A commit artifact that starts with this prefix (compared case-insensitively,
# after stripping) is treated as a request to skip the commit.
_SKIP_PREFIX = "skip:"
# Cap on key=value pairs when summarizing metadata that has none of the preferred keys.
_MAX_METADATA_PARTS = 5
# Failure reason that marks an attempt as retryable: the agent ran but no
# commit_message artifact could be read afterwards.
_MISSING_COMMIT_ARTIFACT_REASON = "agent completed without writing a commit_message artifact"
# Only the most recent N rendered / raw agent output lines are retained per attempt.
_MAX_COMMIT_PARSED_OUTPUT_LINES = 128
_MAX_COMMIT_RAW_OUTPUT_LINES = 256

# Optional repository factory. When it is not None, ``_working_tree_diff`` obtains
# the diff through it instead of spawning ``git``. It is None by default.
Repo: _RepoFactoryProtocol | None = None


@dataclass(frozen=True)
class CommitAgentResult:
    """Aggregated result returned after all commit-message agent attempts complete.

    At most one of ``message`` / ``skipped`` is meaningful: callers check
    ``skipped`` first, then treat an empty ``message`` as failure and print
    ``failure_details``.
    """

    # Generated commit message; empty when no attempt produced one.
    message: str = ""
    # True when the agent's artifact asked for the commit to be skipped.
    skipped: bool = False
    # One human-readable report per failed attempt, in attempt order.
    failure_details: list[str] = field(default_factory=list)


def commit_plumbing(
    *,
    options: CommitPlumbingOptions | None = None,
    display_context: DisplayContext | None = None,
) -> None:
    """Handle commit plumbing operations.

    Resolves the repository root and configuration, then dispatches on the
    option flags in this order: show the stored commit message, generate
    (and optionally apply) a commit message through the agent chain, or —
    when neither flag is set — only warn if nothing is staged. No commit is
    created on that last path.

    Every failure is reported on the console; nothing is raised to the caller
    for a missing repository or an unreadable configuration.

    Args:
        options: Commit plumbing options. Defaults to ``CommitPlumbingOptions()``.
        display_context: Display context for consistent rendering.
            If None, a context is created using make_display_context().
    """
    ctx = display_context if display_context is not None else make_display_context()
    console = ctx.console
    opts = options or CommitPlumbingOptions()

    # CLI boundary: any failure to locate the repository becomes a message.
    try:
        repo_root = find_repo_root()
    except Exception as e:
        console.print(
            _styled_commit_status("Error", f"Not in a git repository: {e}", "theme.status.error")
        )
        return

    # Load configuration
    try:
        # An explicit config path takes the place of workspace-scope resolution.
        workspace_scope = (
            None if opts.config_path is not None else resolve_workspace_scope(repo_root)
        )
        config = load_config(opts.config_path, opts.cli_overrides, workspace_scope=workspace_scope)
    except Exception as e:
        console.print(_styled_commit_status("Error loading config", str(e), "theme.status.error"))
        return

    if opts.show_commit_msg:
        _show_commit_message(repo_root, display_context=ctx)
        return

    if opts.generate_commit_msg or opts.generate_commit:
        _handle_agent_commit_generation(
            repo_root=repo_root,
            config=config,
            options=opts,
            display_context=ctx,
        )
        return

    if not has_staged_changes(repo_root):
        console.print(Text("No staged changes to commit", style="theme.status.warning"))
        return
def _handle_agent_commit_generation( *, repo_root: Path, config: UnifiedConfig, options: CommitPlumbingOptions, display_context: DisplayContext, ) -> None: ctx = display_context console = ctx.console generate = options.generate_commit_msg or options.generate_commit apply = options.generate_commit git_user_name = config.general.git_user_name git_user_email = config.general.git_user_email if not generate: return delete_commit_message_artifacts(repo_root) diff = working_tree_diff(repo_root) if not diff.strip(): console.print(Text("No changes to commit", style="theme.status.warning")) return registry = AgentRegistry.from_config(config) agents = _resolve_commit_message_agents(config, registry) if not agents: console.print( Text( "No commit-capable agents available in commit/review drains", style="theme.status.error", ) ) return workspace_scope = resolve_workspace_scope(repo_root) result = _generate_commit_message_with_chain( diff=diff, repo_root=repo_root, chain_config=CommitChainConfig( registry=registry, agents=agents, verbose=config.general.verbosity >= _VERBOSE_THRESHOLD, agents_policy=load_agents_policy_for_workspace_scope(workspace_scope, config=config), general_config=config.general, ), display_context=ctx, ) if result.skipped: delete_commit_message_artifacts(repo_root) console.print(Text("Skipping commit: agent requested skip", style="theme.status.warning")) return if not result.message: console.print( Text( "Failed to generate commit message from commit drain agents", style="theme.status.error", ) ) _print_commit_failure_details(result.failure_details, display_context=ctx) return persisted_message = read_commit_message_artifact(repo_root) if persisted_message is None: console.print( Text("Failed to persist generated commit message", style="theme.status.error") ) return # Use the shared render_commit_message for consistent UI console.print(Text("\nGenerated commit message:", style="theme.status.success")) render_commit_message(repo_root, ctx) if apply: try: 
stage_all(repo_root) sha = create_commit( repo_root, persisted_message, author_name=git_user_name, author_email=git_user_email, ) delete_commit_message_artifacts(repo_root) console.print( _styled_commit_status( "Created commit", sha[:8], "theme.status.success", leading_newline=True ) ) except Exception as e: console.print( _styled_commit_status( "Commit failed", str(e), "theme.status.error", leading_newline=True ) ) def _resolve_chain_agent_names(config: object, drain_name: str) -> list[str]: raw_agent_drains_obj: object = getattr(config, "agent_drains", {}) raw_agent_drains = ( cast("dict[str, object]", raw_agent_drains_obj) if isinstance(raw_agent_drains_obj, dict) else {} ) raw_agent_chains_obj: object = getattr(config, "agent_chains", {}) raw_agent_chains = ( cast("dict[str, object]", raw_agent_chains_obj) if isinstance(raw_agent_chains_obj, dict) else {} ) drain_binding = raw_agent_drains.get(drain_name) if isinstance(drain_binding, AgentDrainConfig): chain_name = drain_binding.chain elif isinstance(drain_binding, str): chain_name = drain_binding else: return [] chain_value = raw_agent_chains.get(chain_name) if isinstance(chain_value, AgentChainConfig): return list(chain_value.agents) if isinstance(chain_value, list): return list(chain_value) return [] def _resolve_commit_message_agents(config: UnifiedConfig, registry: AgentRegistry) -> list[str]: commit_chain = _resolve_chain_agent_names(config, "commit") review_chain = _resolve_chain_agent_names(config, "review") commit_candidates = [ name for name in commit_chain if _commit_drain_agent_supported(registry, name) ] if commit_candidates: return commit_candidates review_candidates = [ name for name in review_chain if _commit_drain_agent_supported(registry, name) ] if review_candidates: return review_candidates default_candidates = [_DEFAULT_COMMIT_AGENT] return [name for name in default_candidates if _commit_drain_agent_supported(registry, name)] def _commit_drain_agent_supported(registry: AgentRegistry, 
agent_name: str) -> bool: cfg = registry.get(agent_name) return cfg is not None and bool(cfg.can_commit) def _working_tree_diff(repo_root: Path) -> str: if Repo is not None: repo = Repo(repo_root) if repo.head.is_valid(): return _sanitize_surrogates(repo.git.diff("HEAD")) or "(no diff available)" return _sanitize_surrogates(repo.git.diff("--cached")) or "(no diff available)" head_check = run_process( "git", ["rev-parse", "--verify", "HEAD"], options=ProcessRunOptions(cwd=repo_root), ) if head_check.returncode == 0: result = run_process("git", ["diff", "HEAD"], options=ProcessRunOptions(cwd=repo_root)) else: result = run_process( "git", ["diff", "--cached"], options=ProcessRunOptions(cwd=repo_root), ) if result.returncode != 0: return "(no diff available)" return _sanitize_surrogates(result.stdout) or "(no diff available)" def _commit_submit_artifact_tool_names( registry: AgentRegistry, agents: list[str], ) -> tuple[str, ...]: names: list[str] = [] for agent_name in agents: agent = registry.get(agent_name) if agent is None: continue tool_name = submit_artifact_tool_name_for_transport(agent.transport) if tool_name not in names: names.append(tool_name) return tuple(names) or (SUBMIT_ARTIFACT_TOOL,) def _is_opencode_agent(agent: AgentConfig | None) -> bool: return agent is not None and agent.transport == AgentTransport.OPENCODE def _commit_prompt_for_agent( agent: AgentConfig, diff: str, *, template_registry: TemplateRegistry, repo_root: Path, ) -> str: payload_output_dir = repo_root / ".agent" / "tmp" / "prompt_payloads" if _is_opencode_agent(agent): return prompt_commit_message_for_opencode( diff, submit_artifact_tool_name=SUBMIT_ARTIFACT_TOOL, payload_config=CommitPromptPayloadConfig( output_dir=payload_output_dir, name_prefix="commit_plumbing", ), ) return prompt_commit_message( diff, template_registry=template_registry, submit_artifact_tool_names=_submit_artifact_tool_names_for_transport(agent.transport), payload_config=CommitPromptPayloadConfig( 
output_dir=payload_output_dir, name_prefix="commit_plumbing", ), ) def _submit_artifact_tool_names_for_transport( transport: AgentTransport | None, ) -> tuple[str, ...]: if transport in (AgentTransport.CLAUDE, AgentTransport.CLAUDE_INTERACTIVE): return SUBMIT_ARTIFACT_TOOL.prompt_aliases( tool_name_prefix=claude_tool_name_prefix(), ) return (SUBMIT_ARTIFACT_TOOL,) def _generate_commit_message_with_chain( *, diff: str, repo_root: Path, chain_config: CommitChainConfig, display_context: DisplayContext, ) -> CommitAgentResult: template_dirs = (repo_root / ".agent" / "prompts" / "commit", *default_template_dirs(repo_root)) template_registry = TemplateRegistry(template_dirs=template_dirs) start_commit_bridge_params = signature(start_commit_bridge).parameters if "agents_policy" in start_commit_bridge_params: bridge = start_commit_bridge(repo_root, agents_policy=chain_config.agents_policy) else: legacy_start_commit_bridge = cast( "typing.Callable[[Path], SessionBridgeLike]", start_commit_bridge, ) bridge = legacy_start_commit_bridge(repo_root) extra_env = _commit_bridge_env(bridge) failure_details: list[str] = [] try: for agent_name in chain_config.agents: cfg = chain_config.registry.get(agent_name) if cfg is None: continue prompt = _commit_prompt_for_agent( cfg, diff, template_registry=template_registry, repo_root=repo_root, ) prompt_file = write_commit_prompt_file(repo_root, prompt) attempt_ctx = CommitAttemptContext( repo_root=repo_root, verbose=chain_config.verbose, extra_env=extra_env, general_config=chain_config.general_config, ) result = _generate_commit_message_with_agent( cfg, prompt_file=prompt_file, attempt_context=attempt_ctx, display_context=display_context, ) failure_details.extend(result.failure_details) if result.skipped: return CommitAgentResult(skipped=True, failure_details=failure_details) if result.message: return CommitAgentResult(message=result.message) finally: bridge.shutdown() return CommitAgentResult(failure_details=failure_details) def 
_generate_commit_message_with_agent( agent: AgentConfig, *, prompt_file: str, attempt_context: CommitAttemptContext, display_context: DisplayContext, ) -> CommitAgentResult: failure_details: list[str] = [] initial_attempt = invoke_commit_agent_attempt( agent, prompt_file=prompt_file, attempt_context=attempt_context, display_context=display_context, ) if initial_attempt.failure_detail: failure_details.append(initial_attempt.failure_detail) else: return _finalize_commit_attempt(initial_attempt, failure_details) if not _is_missing_commit_artifact_failure(initial_attempt.failure_detail): return CommitAgentResult(failure_details=failure_details) if initial_attempt.resume_session_id: session_retry = invoke_commit_agent_attempt( agent, prompt_file=prompt_file, attempt_context=attempt_context, session_id=initial_attempt.resume_session_id, display_context=display_context, ) if session_retry.failure_detail: failure_details.append(session_retry.failure_detail) else: return _finalize_commit_attempt(session_retry, failure_details) if not _is_missing_commit_artifact_failure(session_retry.failure_detail): return CommitAgentResult(failure_details=failure_details) summary_prompt_file = write_commit_prompt_file( attempt_context.repo_root, _summarized_retry_prompt( _read_retry_prompt_text(prompt_file), initial_attempt.parsed_output, ), ) summary_retry = invoke_commit_agent_attempt( agent, prompt_file=summary_prompt_file, attempt_context=attempt_context, session_id=initial_attempt.resume_session_id, display_context=display_context, ) if summary_retry.failure_detail: failure_details.append(summary_retry.failure_detail) else: return _finalize_commit_attempt(summary_retry, failure_details) return CommitAgentResult(failure_details=failure_details) def _is_skip_response(text: str) -> bool: return text.strip().lower().startswith(_SKIP_PREFIX) def invoke_commit_agent_attempt( agent: AgentConfig, *, prompt_file: str, attempt_context: CommitAttemptContext, session_id: str | None = None, 
display_context: DisplayContext, ) -> CommitAgentAttempt: """Run one commit-agent invocation attempt and return its result.""" delete_commit_message_artifacts(attempt_context.repo_root) system_prompt = materialize_system_prompt( workspace_root=attempt_context.repo_root, name="commit", default_current_prompt="Commit message generation task.", ) if attempt_context.general_config is not None: options = build_invoke_options_from_config( attempt_context.general_config, InvokeRuntimeOptions( verbose=attempt_context.verbose, workspace_path=attempt_context.repo_root, extra_env=attempt_context.extra_env, pure=_is_opencode_agent(agent), session_id=session_id, system_prompt_file=system_prompt, ), ) else: options = InvokeOptions( verbose=attempt_context.verbose, workspace_path=attempt_context.repo_root, extra_env=attempt_context.extra_env, pure=_is_opencode_agent(agent), session_id=session_id, system_prompt_file=system_prompt, ) try: lines = invoke_agent( agent, prompt_file, options=options, ) except AgentInvocationError as exc: return CommitAgentAttempt( failure_detail=_format_agent_invocation_failure( agent.cmd, prompt_file, exc, parsed_output=[] ) ) try: parsed_output, raw_output, resume_session_id = collect_commit_agent_output( lines, parser_type=str(agent.json_parser), agent_name=agent.cmd.split()[0], verbose=attempt_context.verbose, display_context=display_context, ) except AgentInvocationError as exc: return CommitAgentAttempt( failure_detail=_format_agent_invocation_failure( agent.cmd, prompt_file, exc, parsed_output=_parsed_output_from_invocation_error(exc), ) ) try: artifact_message = read_commit_message_artifact(attempt_context.repo_root) except Exception as exc: return CommitAgentAttempt( failure_detail=_format_commit_agent_failure( agent.cmd, prompt_file, parsed_output, str(exc) ), parsed_output=parsed_output, raw_output=raw_output, resume_session_id=resume_session_id, ) if not artifact_message: return CommitAgentAttempt( 
failure_detail=_format_commit_agent_failure( agent.cmd, prompt_file, parsed_output, _MISSING_COMMIT_ARTIFACT_REASON, ), parsed_output=parsed_output, raw_output=raw_output, resume_session_id=resume_session_id, ) if _is_skip_response(artifact_message): return CommitAgentAttempt(skipped=True, parsed_output=parsed_output, raw_output=raw_output) return CommitAgentAttempt( message=artifact_message, parsed_output=parsed_output, raw_output=raw_output ) def _finalize_commit_attempt( attempt: CommitAgentAttempt, failure_details: list[str], ) -> CommitAgentResult: if attempt.skipped: return CommitAgentResult(skipped=True, failure_details=failure_details) return CommitAgentResult(message=attempt.message, failure_details=failure_details) def _is_missing_commit_artifact_failure(detail: str) -> bool: return _MISSING_COMMIT_ARTIFACT_REASON in detail def _summarized_retry_prompt(base_prompt: str, parsed_output: list[str]) -> str: output_lines = "\n".join(parsed_output[-12:]) if parsed_output else "(no output captured)" example_content: dict[str, str] = { "type": "commit", "subject": "type(scope): description", } example_arguments: dict[str, str] = { "artifact_type": "commit_message", "content": json.dumps(example_content), } example_payload = json.dumps(example_arguments) return ( f"{base_prompt}\n\n" "RETRY CONTEXT:\n" "Previous attempt failed to submit the required commit_message artifact.\n" "Treat the prior conversational output as a failure, not as permission " "to ask the user a question.\n" "Do not repeat the mistake. 
Submit the artifact now.\n" 'Call the submit-artifact MCP tool with artifact_type="commit_message" ' "and put the commit payload in the content field as a JSON string.\n" "Example MCP arguments:\n" f"{example_payload}\n" "If the submit-artifact MCP tool is still unavailable, write the raw commit payload JSON " "to .agent/tmp/commit_message.json instead.\n" "Write only the inner payload object, such as " '{"type":"commit","subject":"fix(scope): message"}, without artifact metadata.\n' "Do not use content_path for this retry.\n" "Do not use Bash, python, tee, printf, shell redirection, or any file-writing path " "other than writing .agent/tmp/commit_message.json directly.\n" "Message quality mistakes to avoid:\n" "- Bad: chore: update files -> Good: feat(mcp): add structured commit retries\n" "- Bad: fix: stuff -> Good: fix(parser): preserve prefixed transcript lines\n" "- Use chore only for repo maintenance, not meaningful code changes.\n" "- Omit the scope when the change spans multiple subsystems.\n" "- Include a body when the why is not obvious from the subject alone.\n" "Previous output summary:\n" f"{output_lines}\n" ) def _read_retry_prompt_text(prompt_file: str) -> str: path = Path(prompt_file) if not path.exists(): return "" return path.read_text(encoding="utf-8") def _write_commit_prompt_file(repo_root: Path, prompt: str) -> str: prompt_dir = repo_root / ".agent" / "tmp" prompt_dir.mkdir(parents=True, exist_ok=True) for stale_path in prompt_dir.glob("commit_prompt*.md"): stale_path.unlink(missing_ok=True) prompt_path = prompt_dir / f"commit_prompt_{uuid.uuid4().hex}.md" prompt_path.write_text(prompt, encoding="utf-8") return str(prompt_path) def _show_commit_message(repo_root: Path, *, display_context: DisplayContext) -> None: ctx = display_context console = ctx.console commit_message = read_commit_message_artifact(repo_root) if commit_message is None: console.print(Text("No commit message generated yet", style="theme.status.error")) return # Use the shared 
render_commit_message for consistent UI render_commit_message(repo_root, ctx) def _print_commit_failure_details( failure_details: list[str], *, display_context: DisplayContext, ) -> None: ctx = display_context console = ctx.console for detail in failure_details: console.print(Text(detail, style="theme.status.error")) def _format_agent_invocation_failure( agent_name: str, prompt_file: str, exc: AgentInvocationError, *, parsed_output: list[str] | None = None, ) -> str: stderr = exc.stderr.strip() or "(no stderr)" lines = [ f"Agent: {agent_name}", f"Prompt file: {prompt_file}", f"Exit code: {exc.returncode}", ] if parsed_output: lines.extend(["Agent output:", *parsed_output]) lines.extend(["Stderr:", stderr]) return "\n".join(lines) def _format_commit_agent_failure( agent_name: str, prompt_file: str, parsed_output: list[str], reason: str, ) -> str: lines = [ f"Agent: {agent_name}", f"Prompt file: {prompt_file}", f"Reason: {reason}", ] if parsed_output: lines.extend(["Agent output:", *parsed_output]) else: lines.append("Agent output: (no output captured)") return "\n".join(lines) def collect_commit_agent_output( lines: Iterable[object], *, parser_type: str, agent_name: str, verbose: bool, display_context: DisplayContext, ) -> tuple[list[str], list[str], str | None]: """Consume agent output lines, returning (parsed_lines, raw_lines, resume_session_id).""" ctx = display_context console = ctx.console parser = _resolve_commit_parser(parser_type) parsed_output: deque[str] = deque(maxlen=_MAX_COMMIT_PARSED_OUTPUT_LINES) raw_output: deque[str] = deque(maxlen=_MAX_COMMIT_RAW_OUTPUT_LINES) resume_session_id: str | None = None try: def _raw_lines() -> Iterator[str]: for line in lines: raw_line = str(line) raw_output.append(raw_line) session_id = extract_session_id((raw_line,)) nonlocal resume_session_id if session_id is not None: resume_session_id = session_id yield raw_line for parsed_line in parser.parse(_raw_lines()): rendered = _render_commit_agent_activity_line(parsed_line, 
agent_name) if rendered is None: continue parsed_output.append(rendered.plain) if verbose: console.print(rendered) except AgentInvocationError as exc: raise _invocation_error_with_output(exc, list(parsed_output)) from exc return list(parsed_output), list(raw_output), resume_session_id def _resolve_commit_parser(parser_type: str) -> AgentParser: try: return get_parser(parser_type) except ValueError: return get_parser("generic") def _render_commit_agent_activity_line(output: AgentOutputLine, agent_name: str) -> Text | None: rendered: Text | None = None if output.type == "text": content = output.content.strip() if content: rendered = _styled_commit_prefix(agent_name, "theme.text.emphasis") rendered.append(content) elif output.type == "tool_use": tool_name = output.content.strip() or "unknown-tool" summary = _tool_input_summary(output.metadata) rendered = _styled_commit_prefix(f"{agent_name} tool", "theme.phase.review_analysis") rendered.append(tool_name) if summary: rendered.append(f" ({summary})") elif output.type == "tool_result": result = output.content.strip() or _event_summary(output) if result: rendered = _styled_commit_prefix(f"{agent_name} tool result", "theme.text.muted") rendered.append(result) elif output.type == "error": error = output.content.strip() or "unknown error" rendered = _styled_commit_prefix(f"{agent_name} error", "theme.status.error") rendered.append(error) else: rendered = _styled_commit_prefix(f"{agent_name} {output.type}", "theme.text.muted") rendered.append(_event_summary(output)) return rendered def _styled_commit_prefix(label: str, style: str) -> Text: text = Text() text.append(f"{label}:", style=style) text.append(" ") return text def _styled_commit_status( label: str, detail: str, style: str, *, leading_newline: bool = False, ) -> Text: text = Text() if leading_newline: text.append("\n") text.append(f"{label}:", style=style) text.append(" ") text.append(detail) return text def _event_summary(output: AgentOutputLine) -> str: content = 
output.content.strip() if content: return content if output.metadata: summary = _metadata_summary(output.metadata) if summary: return summary return "(no details)" def _tool_input_summary(metadata: dict[str, object]) -> str: input_obj = metadata.get("input") if isinstance(input_obj, dict): return _metadata_summary(cast("dict[str, object]", input_obj)) return "" def _metadata_summary(metadata: dict[str, object]) -> str: preferred_keys = ( "status", "summary", "phase", "tool", "name", "command", "workdir", "path", "result", "output", "error", "message", ) parts: list[str] = [] for key in preferred_keys: if key not in metadata: continue value = _format_metadata_value(metadata[key]) if value: parts.append(f"{key}={value}") if parts: return "; ".join(parts) for key, value_obj in metadata.items(): value = _format_metadata_value(value_obj) if value: parts.append(f"{key}={value}") if len(parts) >= _MAX_METADATA_PARTS: break return "; ".join(parts) def _format_metadata_value(value: object) -> str: match value: case str(): return value.strip() case bool(): return "true" if value else "false" case int() | float(): return str(value) case dict() | list() | tuple(): return json.dumps(value, default=str, sort_keys=True) case _: return "" def _invocation_error_with_output( exc: AgentInvocationError, parsed_output: list[str], ) -> AgentInvocationError: return AgentInvocationError( exc.agent_name, exc.returncode, exc.stderr, parsed_output=list(parsed_output), ) def _parsed_output_from_invocation_error(exc: AgentInvocationError) -> list[str]: parsed_output: list[str] = exc.parsed_output return parsed_output def _start_commit_bridge(repo_root: Path, *, agents_policy: AgentsPolicy) -> SessionBridgeLike: session_mcp_plan = build_session_mcp_plan( transport=None, drain="commit", workspace_path=repo_root, agents_policy=agents_policy, ) session = AgentSession( session_id=f"commit-{uuid.uuid4().hex[:8]}", run_id=str(uuid.uuid4()), drain="commit", 
capabilities=set(session_mcp_plan.capabilities), model_identity=session_mcp_plan.model_identity, stored_capability_profile=session_mcp_plan.capability_profile, ) workspace = FsWorkspace(repo_root) return start_mcp_server( session, workspace, extras=McpServerExtras(extra_env=session_mcp_plan.server_env) ) def _commit_bridge_env(bridge: SessionBridgeLike) -> dict[str, str]: return { MCP_ENDPOINT_ENV: bridge.agent_endpoint_uri(), MCP_RUN_ID_ENV: "commit-plumbing", } start_commit_bridge = _start_commit_bridge write_commit_prompt_file = _write_commit_prompt_file working_tree_diff = _working_tree_diff render_commit_agent_activity_line = _render_commit_agent_activity_line