"""Helpers for reading persisted MCP artifacts inside phase handlers."""
from __future__ import annotations
import json
from typing import TYPE_CHECKING, cast
from ralph.pipeline.events import PhaseFailureEvent
from ralph.recovery.classifier import FailureCategory
if TYPE_CHECKING:
from collections.abc import Mapping
from ralph.policy.models import ArtifactContract
from ralph.workspace.protocol import Workspace
[docs]
class PhaseArtifactError(ValueError):
"""Raised when a phase artifact is missing or malformed."""
[docs]
def load_phase_artifact(workspace: Workspace, path: str) -> dict[str, object]:
"""Load a persisted MCP artifact wrapper from the workspace."""
try:
content = workspace.read(path)
except (FileNotFoundError, OSError) as exc:
raise PhaseArtifactError(f"Artifact not found at {path}") from exc
try:
raw_obj: object = json.loads(content)
except (TypeError, json.JSONDecodeError) as exc:
raise PhaseArtifactError(f"Artifact at {path} must be valid JSON text") from exc
if not isinstance(raw_obj, dict):
raise PhaseArtifactError(f"Artifact at {path} must be a JSON object")
return cast("dict[str, object]", raw_obj)
[docs]
def unwrap_phase_artifact_content(
artifact: Mapping[str, object],
*,
expected_type: str | None = None,
) -> dict[str, object]:
"""Return the inner content payload from a persisted artifact wrapper."""
artifact_type = artifact.get("type")
if expected_type is not None and artifact_type is not None and artifact_type != expected_type:
raise PhaseArtifactError(
f"Artifact type mismatch: expected {expected_type}, got {artifact_type!r}"
)
content = artifact.get("content")
if content is None and artifact_type is None:
return dict(artifact)
if not isinstance(content, dict):
raise PhaseArtifactError("Artifact content must be a JSON object")
return cast("dict[str, object]", content)
[docs]
def artifact_validation_failure_event(
phase: str,
reason: str,
*,
retry_in_session: bool = True,
) -> PhaseFailureEvent:
"""Build a typed phase failure event for artifact/proof validation issues."""
return PhaseFailureEvent(
phase=phase,
reason=reason,
recoverable=True,
retry_in_session=retry_in_session,
failure_category=FailureCategory.ARTIFACT_VALIDATION,
)
[docs]
def artifact_contract_for_drain(
artifacts_policy: object,
drain: str,
artifact_type: str,
) -> ArtifactContract | None:
"""Find the artifact contract for a drain/type pair if one exists."""
raw_artifacts: object = getattr(artifacts_policy, "artifacts", None)
if not isinstance(raw_artifacts, dict):
return None
artifacts = cast("dict[str, object]", raw_artifacts)
for contract in artifacts.values():
contract_drain = cast("object", getattr(contract, "drain", None))
contract_artifact_type = cast("object", getattr(contract, "artifact_type", None))
if (
isinstance(contract_drain, str)
and isinstance(contract_artifact_type, str)
and contract_drain == drain
and contract_artifact_type == artifact_type
):
return cast("ArtifactContract", contract)
return None
[docs]
def decision_vocabulary_for_drain(
artifacts_policy: object,
drain: str,
artifact_type: str,
) -> list[str]:
"""Return the allowed decision strings for a given drain and artifact type."""
contract = artifact_contract_for_drain(artifacts_policy, drain, artifact_type)
vocabulary: object = (
getattr(contract, "decision_vocabulary", []) if contract is not None else []
)
return list(vocabulary) if isinstance(vocabulary, list) else []