"""Centralized required-artifact metadata for all pipeline phases.
Artifact metadata is split across two policy surfaces. ``artifacts.toml`` owns
artifact type, JSON path, markdown handoff path, and schema normalizer lookup.
``pipeline.toml`` owns whether a phase's output artifact is required for
success. There are no built-in override tables — artifact paths must be
declared in ``artifacts.toml`` and requiredness must be declared on the phase
definition.
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import TYPE_CHECKING
from ralph.mcp.artifacts.development_result import normalize_development_result_content
from ralph.mcp.artifacts.typed_artifacts import (
normalize_fix_result_content,
normalize_issues_content,
)
from ralph.recovery.retry_prompt import build_retry_error_block
if TYPE_CHECKING:
from collections.abc import Callable
from ralph.policy.models import ArtifactsPolicy, PipelinePolicy
# Normalizers keyed by artifact_type — used by build_required_artifacts()
_ARTIFACT_TYPE_NORMALIZERS: dict[str, Callable[[dict[str, object]], dict[str, object]]] = {
"development_result": normalize_development_result_content,
"fix_result": normalize_fix_result_content,
"issues": normalize_issues_content,
}
[docs]
@dataclass(frozen=True)
class RequiredArtifact:
"""Metadata about an artifact contract for a pipeline phase.
When artifact_required is False, an absent artifact does not fail the phase;
a present artifact is still validated.
"""
phase: str
artifact_type: str
json_path: str
markdown_path: str | None
normalizer: Callable[[dict[str, object]], dict[str, object]] | None
artifact_required: bool = True
[docs]
def build_required_artifacts(
artifacts_policy: ArtifactsPolicy,
) -> dict[str, RequiredArtifact]:
"""Build a drain-keyed artifact registry from ArtifactsPolicy.
The registry contains artifact identity and path metadata only. Callers that
need phase-specific requiredness must use resolve_phase_required_artifact().
"""
result: dict[str, RequiredArtifact] = {}
for contract in artifacts_policy.artifacts.values():
drain = str(contract.drain)
artifact_type = contract.artifact_type
json_path = contract.artifact_json_path or f".agent/artifacts/{artifact_type}.json"
markdown_path = contract.markdown_summary_path
normalizer = _ARTIFACT_TYPE_NORMALIZERS.get(artifact_type)
result[drain] = RequiredArtifact(
phase=drain,
artifact_type=artifact_type,
json_path=json_path,
markdown_path=markdown_path,
normalizer=normalizer,
artifact_required=True,
)
return result
[docs]
def resolve_required_artifact(
artifacts_policy: ArtifactsPolicy,
*,
drain: str,
) -> RequiredArtifact | None:
"""Resolve artifact identity/path metadata for a drain from artifacts.toml."""
try:
registry = build_required_artifacts(artifacts_policy)
return registry.get(drain)
except AttributeError:
return None
[docs]
def resolve_phase_required_artifact(
pipeline_policy: PipelinePolicy,
artifacts_policy: ArtifactsPolicy,
*,
phase: str,
drain: str | None = None,
) -> RequiredArtifact | None:
"""Resolve the artifact contract for a phase, including phase-owned requiredness."""
phase_def = pipeline_policy.phases.get(phase)
effective_drain = drain or (phase_def.drain if phase_def is not None else phase)
ra = resolve_required_artifact(artifacts_policy, drain=effective_drain)
if ra is None:
return None
required = phase_def.artifact_required if phase_def is not None else True
return RequiredArtifact(
phase=phase,
artifact_type=ra.artifact_type,
json_path=ra.json_path,
markdown_path=ra.markdown_path,
normalizer=ra.normalizer,
artifact_required=required,
)
[docs]
def retry_hint_path(phase: str) -> str:
"""Return the workspace-relative path for the retry hint file for a phase."""
return f".agent/tmp/last_retry_error_{phase}.txt"
[docs]
def build_retry_hint(
phase: str,
detail: str,
*,
registry: dict[str, RequiredArtifact] | None = None,
) -> str:
"""Build a retry hint message for a phase that failed to submit a required artifact.
Args:
phase: Pipeline phase name.
detail: Error detail message.
registry: Optional policy-derived artifact registry. When provided,
the hint includes the specific artifact type and path.
"""
ra = registry.get(phase) if registry is not None else None
if ra is None:
return build_retry_error_block(
failure_summary=(
"the required artifact was not submitted before completion was declared"
),
detail=detail,
)
return build_retry_error_block(
failure_summary=(
f"required artifact '{ra.artifact_type}' at '{ra.json_path}' "
"was not submitted or was invalid"
),
detail=detail,
)
[docs]
def build_proof_failure_hint(phase: str, detail: str) -> str:
"""Build a retry hint for a phase that submitted proof but failed validation."""
return build_retry_error_block(
failure_summary="proof entries are incomplete or invalid",
detail=detail,
)
__all__ = [
"RequiredArtifact",
"build_missing_input_hint",
"build_proof_failure_hint",
"build_required_artifacts",
"build_retry_hint",
"resolve_phase_required_artifact",
"resolve_required_artifact",
"retry_hint_path",
]