"""Polished artifact block renderers for plan/analysis/commit/fix artifacts.
These renderers read artifact files and emit rich, titled blocks that are
clearly delimited in the transcript. All output is markup-free and
highlight-free for copy-paste safety.
"""
from __future__ import annotations
import json
from datetime import UTC, datetime
from pathlib import Path
from typing import TYPE_CHECKING, cast
from rich.rule import Rule
from ralph.display.artifact_reader import (
read_latest_analysis_decision,
read_plan_artifact,
)
from ralph.display.phase_banner import phase_style
from ralph.mcp.artifacts.commit_message import read_commit_message_artifact
from ralph.mcp.artifacts.handoffs import (
ensure_markdown_handoff_from_artifact,
handoff_path_for_artifact,
)
if TYPE_CHECKING:
from rich.console import Console
from ralph.display.context import DisplayContext
_ARTIFACTS_DIR = ".agent/artifacts"
def _read_text_defensive(path: Path) -> str | None:
try:
content = path.read_text(encoding="utf-8")
except (FileNotFoundError, OSError, PermissionError):
return None
return content
def _read_markdown_handoff(workspace_root: Path, artifact_type: str) -> str | None:
relative_path = handoff_path_for_artifact(artifact_type)
if relative_path is None:
return None
candidate = workspace_root / relative_path
markdown = _read_text_defensive(candidate)
if markdown is None:
return None
stripped = markdown.strip()
return stripped or None
def _regenerated_markdown_handoff(
workspace_root: Path,
artifact_type: str,
artifact_path: Path,
) -> str | None:
artifact_content = _read_text_defensive(artifact_path)
if artifact_content is None:
return None
try:
created_path = ensure_markdown_handoff_from_artifact(
workspace_root,
artifact_type,
artifact_content,
)
except (json.JSONDecodeError, OSError, PermissionError, TypeError, ValueError):
return None
if created_path is None:
return None
regenerated = _read_text_defensive(Path(created_path))
if regenerated is None:
return None
stripped = regenerated.strip()
return stripped or None
def _resolve_authoritative_markdown_handoff(
workspace_root: Path,
artifact_type: str,
artifact_path: Path,
) -> str | None:
regenerated = _regenerated_markdown_handoff(workspace_root, artifact_type, artifact_path)
if regenerated is not None:
return regenerated
return _read_markdown_handoff(workspace_root, artifact_type)
def _render_titled_lines(
title: str,
style_phase: str,
lines: list[str],
console: Console,
) -> None:
console.print()
console.print(Rule(title, style=phase_style(style_phase)), markup=False, highlight=False)
for line in lines:
console.print(line, markup=False, highlight=False)
console.print(Rule(style=phase_style(style_phase)), markup=False, highlight=False)
def _render_text_block(
title: str,
body: str,
style_phase: str,
console: Console,
*,
indent: bool = False,
) -> None:
lines = [line.rstrip() for line in body.splitlines() if line.strip()]
if indent:
lines = [f" {lines[0]}", *[f" {line}" for line in lines[1:]]] if lines else []
_render_titled_lines(title, style_phase, lines, console)
def _read_json_defensive(path: Path) -> dict[str, object] | None:
"""Read JSON file defensively, returning None on any error."""
raw = _read_text_defensive(path)
if raw is None:
return None
try:
parsed_obj: object = json.loads(raw)
except json.JSONDecodeError:
return None
if not isinstance(parsed_obj, dict):
return None
return cast("dict[str, object]", parsed_obj)
[docs]
def render_missing_plan_hint(display_context: DisplayContext) -> None:
"""Emit a plain INFO line when the plan artifact is absent at phase completion.
Call this from the planning phase completion handler when plan.json is not
on disk so the log stream always has a [plan] entry rather than silence.
"""
timestamp = datetime.now(UTC).isoformat()
display_context.console.print(
f"{timestamp} INFO META [plan] (no plan artifact on disk)",
markup=False,
highlight=False,
no_wrap=True,
)
[docs]
def render_plan_artifact(
workspace_root: Path,
display_context: DisplayContext,
) -> None:
"""Render the agent-facing plan handoff, falling back to the JSON summary.
Prefer the authoritative Markdown handoff regenerated from ``plan.json`` when
that artifact exists. Fall back to ``.agent/PLAN.md`` only when there is no
structured artifact available. Missing artifacts emit a hint line.
"""
markdown = _resolve_authoritative_markdown_handoff(
workspace_root,
"plan",
workspace_root / _ARTIFACTS_DIR / "plan.json",
)
if markdown:
_render_text_block("PLAN", markdown, "execution", display_context.console)
return
plan = read_plan_artifact(workspace_root)
if plan is None:
render_missing_plan_hint(display_context)
return
lines: list[str] = []
if plan.summary:
lines.append(f" Context: {plan.summary}")
if plan.scope_items:
lines.append(" Scope:")
lines.extend(f" - {item}" for item in plan.scope_items)
if plan.total_steps > 0:
lines.append(f" Steps: {plan.total_steps}")
if plan.risks_mitigations:
lines.append(" Risks:")
lines.extend(f" - {risk}" for risk in plan.risks_mitigations)
_render_titled_lines("PLAN", "execution", lines, display_context.console)
[docs]
def render_analysis_decision(
workspace_root: Path,
drain: str,
display_context: DisplayContext,
) -> None:
"""Render an analysis decision artifact as a titled block."""
artifact_type = _analysis_handoff_artifact_type(drain)
if artifact_type is not None:
markdown = _resolve_authoritative_markdown_handoff(
workspace_root,
artifact_type,
workspace_root / _ARTIFACTS_DIR / f"{artifact_type}.json",
)
if markdown:
_render_text_block(
f"ANALYSIS: {drain}",
markdown,
"analysis",
display_context.console,
)
return
summary = read_latest_analysis_decision(workspace_root, drain)
if summary is None:
return
lines = [f" decision: {summary.decision}"]
if summary.reason:
lines.append(f" reason: {summary.reason}")
_render_titled_lines(
f"ANALYSIS: {drain}",
"analysis",
lines,
display_context.console,
)
[docs]
def render_commit_message(
workspace_root: Path,
display_context: DisplayContext,
) -> None:
"""Render the commit message artifact as a titled block."""
try:
message = read_commit_message_artifact(workspace_root)
except Exception:
message = None
if message is None:
return
_render_text_block(
"COMMIT MESSAGE",
message,
"commit",
display_context.console,
indent=True,
)
def _analysis_handoff_artifact_type(drain: str) -> str | None:
# Derive artifact type using the {drain}_decision naming convention.
# Canonical drains (development_analysis, review_analysis) have registered
# handoff paths; custom drain names fall through to read_latest_analysis_decision
# when no handoff file is found.
return f"{drain}_decision"
[docs]
def render_development_artifact(
workspace_root: Path,
display_context: DisplayContext,
) -> None:
"""Render development results using the authoritative Markdown handoff."""
markdown = _resolve_authoritative_markdown_handoff(
workspace_root,
"development_result",
workspace_root / _ARTIFACTS_DIR / "development_result.json",
)
if markdown:
_render_text_block("DEVELOPMENT RESULT", markdown, "execution", display_context.console)
return
found = _read_json_defensive(workspace_root / _ARTIFACTS_DIR / "development_result.json")
if found is None:
return
_render_text_block(
"DEVELOPMENT RESULT",
json.dumps(found, indent=2),
"execution",
display_context.console,
)
[docs]
def render_review_artifact(
workspace_root: Path,
display_context: DisplayContext,
) -> None:
"""Render review findings using the authoritative Markdown handoff."""
markdown = _resolve_authoritative_markdown_handoff(
workspace_root,
"issues",
workspace_root / _ARTIFACTS_DIR / "issues.json",
)
if markdown:
_render_text_block("REVIEW ISSUES", markdown, "review", display_context.console)
return
found = _read_json_defensive(workspace_root / _ARTIFACTS_DIR / "issues.json")
if found is None:
return
_render_text_block(
"REVIEW ISSUES",
json.dumps(found, indent=2),
"review",
display_context.console,
)
[docs]
def render_fix_artifact(
workspace_root: Path,
display_context: DisplayContext,
) -> None:
"""Render fix result artifacts as a titled block."""
markdown = _resolve_authoritative_markdown_handoff(
workspace_root,
"fix_result",
workspace_root / _ARTIFACTS_DIR / "fix_result.json",
)
if markdown:
_render_text_block("FIX", markdown, "fix", display_context.console)
return
found = _first_json_candidate(
workspace_root / _ARTIFACTS_DIR / "fix_result.json",
workspace_root / _ARTIFACTS_DIR / "issues.json",
)
if found is None:
return
lines = _render_fix_json_summary(found)
_render_titled_lines("FIX", "fix", lines, display_context.console)
def _first_json_candidate(*candidates: Path) -> dict[str, object] | None:
for candidate in candidates:
found = _read_json_defensive(candidate)
if found is not None:
return found
return None
def _render_fix_json_summary(found: dict[str, object]) -> list[str]:
if "issues" in found and isinstance(found["issues"], list):
return _render_issues_summary(found["issues"])
if "fixed" in found:
return _render_fixed_summary(found["fixed"])
return [f" Fix artifact: {list(found.keys())[:5]}"]
def _render_issues_summary(issues: list[object]) -> list[str]:
lines = [f" {len(issues)} issue(s) addressed:"]
for issue in issues[:10]:
if isinstance(issue, dict):
desc_obj = issue.get("description") or issue.get("message") or str(issue)
else:
desc_obj = str(issue)
lines.append(f" - {str(desc_obj)[:120]}")
return lines
def _render_fixed_summary(fixed: object) -> list[str]:
if isinstance(fixed, list):
lines = [f" {len(fixed)} item(s) fixed:"]
lines.extend(f" - {str(item)[:120]}" for item in fixed[:10])
return lines
return [f" Fixed: {fixed}"]
__all__ = [
"render_analysis_decision",
"render_commit_message",
"render_development_artifact",
"render_fix_artifact",
"render_missing_plan_hint",
"render_plan_artifact",
"render_review_artifact",
]