"""Generic execution phase handler.
Handles any phase with role='execution'. Behavior is determined by the drain's
artifact contract:
- Drain produces artifact_type='plan': plan validation, noop detection, plan draft
management (PreparePromptEffect clears stale drafts).
- Drain produces artifact_type='development_result': plan INPUT validation (noop
short-circuit + work-unit policy check) before validating the output artifact.
- All other drains: validate the configured output artifact contract only.
On PreparePromptEffect: clears a stale plan draft when the phase produces a plan.
On InvokeAgentEffect: validates the output artifact contract, with type-specific
pre-validation for plan and development_result drains.
"""
from __future__ import annotations
import json
import re
from contextlib import suppress
from datetime import datetime
from difflib import SequenceMatcher
from pathlib import Path
from typing import TYPE_CHECKING
from loguru import logger
from ralph.mcp.artifacts.development_result import DevelopmentResult
from ralph.mcp.artifacts.plan import (
PLAN_ARTIFACT_PATH,
PLAN_DRAFT_PATH,
PlanArtifactValidationError,
is_noop_plan,
load_plan_draft,
normalize_plan_artifact_content,
)
from ralph.phases.artifacts import (
PhaseArtifactError,
artifact_validation_failure_event,
load_phase_artifact,
unwrap_phase_artifact_content,
)
from ralph.phases.required_artifacts import (
build_missing_input_hint,
build_proof_failure_hint,
build_required_artifacts,
build_retry_hint,
resolve_phase_required_artifact,
resolve_required_artifact,
retry_hint_path,
)
from ralph.pipeline.effects import Effect, InvokeAgentEffect, PreparePromptEffect
from ralph.pipeline.events import Event, PipelineEvent
from ralph.pipeline.work_units import WorkUnitsValidationError, parse_work_units_from_artifact
from ralph.policy.validation import PolicyValidationError, validate_work_units_against_policy
if TYPE_CHECKING:
from ralph.phases import PhaseContext
from ralph.phases.required_artifacts import RequiredArtifact
from ralph.policy.models import (
ArtifactProofPolicy,
ArtifactsPolicy,
PhaseDefinition,
PipelinePolicy,
)
# [docs]  (Sphinx "viewcode" link marker left over from HTML extraction)
def handle_execution_phase(effect: Effect, ctx: PhaseContext) -> list[Event]:
    """Handle an effect for any phase whose role is 'execution'.

    A ``PreparePromptEffect`` clears a stale plan draft (only when the phase's
    required artifact is a plan) and reports that the prompt was prepared. An
    ``InvokeAgentEffect`` validates what the agent produced, with additional
    checks for ``plan`` and ``development_result`` artifacts. Every other
    effect type is ignored.

    Args:
        effect: The effect that triggered this phase.
        ctx: Phase context with workspace and policy.

    Returns:
        List of events to emit; empty when the effect type is not handled.
    """
    if isinstance(effect, PreparePromptEffect):
        phase = effect.phase
        phase_def = ctx.pipeline_policy.phases.get(phase)
        # Without a phase definition the phase name is used as the drain name.
        drain = phase if phase_def is None else phase_def.drain
        ra = resolve_phase_required_artifact(
            ctx.pipeline_policy, ctx.artifacts_policy, phase=phase, drain=drain
        )
        produces_plan = ra is not None and ra.artifact_type == "plan"
        if produces_plan:
            _clear_stale_plan_draft_if_needed(ctx)
        logger.info("Execution phase '{}': preparing prompt", phase)
        return [PipelineEvent.PROMPT_PREPARED]

    if not isinstance(effect, InvokeAgentEffect):
        return []

    phase = effect.phase
    logger.info("Execution phase '{}': validating output artifact after agent run", phase)
    phase_def = ctx.pipeline_policy.phases.get(phase)
    drain = phase if phase_def is None else phase_def.drain
    ra = resolve_phase_required_artifact(
        ctx.pipeline_policy, ctx.artifacts_policy, phase=phase, drain=drain
    )

    # No artifact contract for this phase: nothing to validate.
    if ra is None:
        return [PipelineEvent.AGENT_SUCCESS]

    # Plan-producing phases are fully handled by the plan validator.
    if ra.artifact_type == "plan":
        return _validate_plan_output(effect, ctx, ra, phase_def)

    is_development_result = ra.artifact_type == "development_result"
    if is_development_result:
        plan_result = _validate_plan_input(effect, ctx)
        if plan_result is not None:
            # An empty list signals a no-op plan, which counts as success.
            return plan_result or [PipelineEvent.AGENT_SUCCESS]

    failure = _validate_output_artifact(effect, ctx, ra)
    if failure is not None:
        return failure

    # Proof checks only apply to development results with a proof policy.
    if (
        is_development_result
        and phase_def is not None
        and phase_def.artifact_proof_policy is not None
    ):
        proof_failure = _validate_development_result_proof(
            ctx, phase, phase_def.artifact_proof_policy, ra
        )
        if proof_failure is not None:
            return proof_failure

    return [PipelineEvent.AGENT_SUCCESS]
def _clear_stale_plan_draft_if_needed(ctx: PhaseContext) -> None:
    """Remove the plan draft when it is not newer than the plan artifact.

    Nothing happens unless both the draft and the plan artifact exist, the
    draft loads, and its ``updated_at`` field is a string parseable by
    ``datetime.fromisoformat``. The draft's timestamp is then compared with the
    plan artifact file's modification time.

    Args:
        ctx: Phase context whose workspace holds the draft and the artifact.
    """
    workspace = ctx.workspace
    if not (workspace.exists(PLAN_DRAFT_PATH) and workspace.exists(PLAN_ARTIFACT_PATH)):
        return

    draft = load_plan_draft(Path(workspace.absolute_path(".agent/artifacts")))
    if draft is None:
        return

    stamp = draft.get("updated_at")
    if not isinstance(stamp, str):
        return

    try:
        draft_epoch = datetime.fromisoformat(stamp).timestamp()
    except ValueError:
        # Unparseable timestamp: leave the draft alone.
        return

    plan_epoch = Path(workspace.absolute_path(PLAN_ARTIFACT_PATH)).stat().st_mtime
    if draft_epoch <= plan_epoch:
        logger.info("Clearing stale plan draft at {}", PLAN_DRAFT_PATH)
        workspace.remove(PLAN_DRAFT_PATH)
def _validate_plan_output(
    effect: InvokeAgentEffect,
    ctx: PhaseContext,
    ra: RequiredArtifact,
    phase_def: PhaseDefinition | None,
) -> list[Event]:
    """Validate the plan artifact produced by a planning-type phase.

    Args:
        effect: The agent invocation whose output is being checked.
        ctx: Phase context with workspace and policy.
        ra: Required-artifact contract describing where the plan lives.
        phase_def: Definition of the current phase, if one is configured; its
            ``on_success`` transition names the phase used for the work-unit
            policy check.

    Returns:
        ``[AGENT_SUCCESS]`` for a valid or no-op plan, otherwise a single
        artifact-validation-failure event (a retry hint is written as well).
    """
    phase = effect.phase
    if not ctx.workspace.exists(ra.json_path):
        detail = (
            f"Missing required plan artifact at {ra.json_path}; "
            "the agent must submit plan before declaring completion"
        )
        logger.warning("Planning agent completed without producing {}", ra.json_path)
        _write_retry_hint(ctx, phase, detail)
        return [artifact_validation_failure_event(phase=phase, reason=detail)]
    try:
        artifact_wrapper = load_phase_artifact(ctx.workspace, ra.json_path)
        raw_content = unwrap_phase_artifact_content(
            artifact_wrapper, expected_type=ra.artifact_type
        )
        if is_noop_plan(raw_content):
            logger.info("Planning produced a no-op plan — skipping development iteration")
            return [PipelineEvent.AGENT_SUCCESS]
        artifact = normalize_plan_artifact_content(raw_content)
        parsed = parse_work_units_from_artifact(artifact)
        if parsed is not None:
            # Work units are checked against the phase that will consume them,
            # falling back to this phase when no successor is configured.
            successor = _transitions_on_success(phase_def)
            validate_work_units_against_policy(
                parsed, ctx.pipeline_policy, phase=successor or phase
            )
    except (
        json.JSONDecodeError,
        PlanArtifactValidationError,
        # Caught here as in the sibling validators, so a malformed artifact
        # wrapper becomes a retryable validation failure instead of escaping.
        PhaseArtifactError,
        ValueError,
        WorkUnitsValidationError,
        PolicyValidationError,
    ) as exc:
        logger.warning("Invalid plan artifact: {}", exc)
        _write_retry_hint(ctx, phase, str(exc))
        return [
            artifact_validation_failure_event(
                phase=phase,
                reason=f"Invalid plan artifact: {exc}",
            )
        ]
    return [PipelineEvent.AGENT_SUCCESS]
def _validate_plan_input(effect: InvokeAgentEffect, ctx: PhaseContext) -> list[Event] | None:
    """Check the plan artifact that a development-type phase consumes.

    Returns:
        A list with one failure event when the plan is missing or invalid, an
        empty list when the plan is a no-op (the caller should report
        AGENT_SUCCESS immediately), or ``None`` when the plan is valid.
    """
    phase = effect.phase

    plan_present = ctx.workspace.exists(PLAN_ARTIFACT_PATH)
    if not plan_present:
        producer = _find_plan_producing_phase(ctx.pipeline_policy, ctx.artifacts_policy)
        reason = f"Missing planning artifact at {PLAN_ARTIFACT_PATH}"
        hint_text = build_missing_input_hint(phase, producer, PLAN_ARTIFACT_PATH)
        # Writing the hint is best-effort; the failure event is emitted regardless.
        with suppress(Exception):
            ctx.workspace.write(retry_hint_path(phase), hint_text)
        return [artifact_validation_failure_event(phase=phase, reason=reason)]

    try:
        wrapper = load_phase_artifact(ctx.workspace, PLAN_ARTIFACT_PATH)
        plan_content = unwrap_phase_artifact_content(wrapper, expected_type="plan")
        if is_noop_plan(plan_content):
            return []
        # Legacy work-units payloads are parsed as-is; everything else is normalized first.
        if _is_legacy_work_units_payload(plan_content):
            plan = plan_content
        else:
            plan = normalize_plan_artifact_content(plan_content)
        work_units = parse_work_units_from_artifact(plan)
        if work_units is not None:
            validate_work_units_against_policy(work_units, ctx.pipeline_policy, phase=phase)
    except (
        json.JSONDecodeError,
        PlanArtifactValidationError,
        PhaseArtifactError,
        ValueError,
        WorkUnitsValidationError,
        PolicyValidationError,
    ) as exc:
        logger.warning("Invalid development phase evidence: {}", exc)
        _write_retry_hint(ctx, phase, str(exc))
        failure = artifact_validation_failure_event(
            phase=phase,
            reason=f"Invalid development evidence: {exc}",
        )
        return [failure]
    return None
def _validate_output_artifact(
    effect: InvokeAgentEffect, ctx: PhaseContext, ra: RequiredArtifact
) -> list[Event] | None:
    """Check the phase's output artifact against its contract.

    An artifact that exists is always loaded, type-checked and, when the
    contract has a normalizer, passed through it. An absent artifact is an
    error only when ``ra.artifact_required`` is true.

    Returns:
        A list with one failure event when the artifact is invalid or a
        required artifact is missing; ``None`` otherwise.
    """
    phase = effect.phase

    if ctx.workspace.exists(ra.json_path):
        try:
            wrapper = load_phase_artifact(ctx.workspace, ra.json_path)
            payload = unwrap_phase_artifact_content(wrapper, expected_type=ra.artifact_type)
            if ra.normalizer is not None:
                # Called for its validation side effect; the result is discarded.
                ra.normalizer(payload)
        except (json.JSONDecodeError, PhaseArtifactError, ValueError) as exc:
            detail = str(exc)
            logger.warning(
                "Invalid {} artifact in execution phase '{}': {}", ra.artifact_type, phase, detail
            )
            _write_retry_hint(ctx, phase, detail)
            failure = artifact_validation_failure_event(
                phase=phase,
                reason=f"Invalid {ra.artifact_type} artifact: {detail}",
            )
            return [failure]
        return None

    if ra.artifact_required:
        detail = (
            f"Missing required artifact at {ra.json_path}; "
            f"the agent must submit {ra.artifact_type} before declaring completion"
        )
        logger.warning("Execution phase '{}' missing required artifact at {}", phase, ra.json_path)
        _write_retry_hint(ctx, phase, detail)
        return [artifact_validation_failure_event(phase=phase, reason=detail)]

    logger.debug(
        "Execution phase '{}': optional artifact at {} absent — treating as success",
        phase,
        ra.json_path,
    )
    return None
def _write_retry_hint(ctx: PhaseContext, phase: str, detail: str) -> None:
    """Write a retry hint for ``phase`` describing ``detail``; write errors are ignored.

    The hint is built with the required-artifact registry when one can be
    derived from ``ctx.artifacts_policy``; an ``AttributeError`` while building
    it falls back to no registry.
    """
    target = retry_hint_path(phase)

    registry = None
    with suppress(AttributeError):
        registry = build_required_artifacts(ctx.artifacts_policy)

    message = build_retry_hint(phase, detail, registry=registry)

    # Best-effort: failing to persist the hint must not fail the phase.
    with suppress(Exception):
        ctx.workspace.write(target, message)
def _write_proof_failure_hint(ctx: PhaseContext, phase: str, detail: str) -> None:
    """Write a proof-failure hint for ``phase``; any error while writing is ignored."""
    destination = retry_hint_path(phase)
    message = build_proof_failure_hint(phase, detail)
    # Best-effort: failing to persist the hint must not fail the phase.
    with suppress(Exception):
        ctx.workspace.write(destination, message)
def _step_proof_errors(required_refs: frozenset[str], submitted_list: list[str]) -> list[str]:
    """Compare submitted plan_item references with the required plan-step references.

    Args:
        required_refs: References every plan step must be proven under.
        submitted_list: ``plan_item`` values from the submitted proof, in order.

    Returns:
        Error messages, in the order duplicates, missing steps, unknown
        references; empty when the submission matches exactly.
    """
    unique_submitted = frozenset(submitted_list)
    problems: list[str] = []

    # A set smaller than the list means at least one entry was repeated.
    if len(unique_submitted) != len(submitted_list):
        problems.append("PROOF INVALID: Duplicate plan_item entries found in plan_items_proven.")

    unproven = required_refs - unique_submitted
    if unproven:
        problems.append(
            "PROOF INCOMPLETE: The following plan step(s) have no proof entry: "
            f'{sorted(unproven)}. Each plan_item must exactly match "Step N: <title>".'
        )

    unknown = unique_submitted - required_refs
    if unknown:
        problems.append(
            "PROOF INVALID: Unknown plan_item reference(s) not matching any plan step: "
            f"{sorted(unknown)}."
        )

    return problems
def _work_unit_proof_errors(required_refs: frozenset[str], submitted_list: list[str]) -> list[str]:
    """Compare submitted plan_item references with the plan's work-unit ids.

    Unlike the step check, only a non-empty submission of known ids is
    required; ids from ``required_refs`` that were not submitted are not
    reported.

    Args:
        required_refs: Valid work-unit ``unit_id`` values.
        submitted_list: ``plan_item`` values from the submitted proof, in order.

    Returns:
        Error messages, in the order duplicates, empty submission, unknown
        references; empty when the submission is acceptable.
    """
    unique_submitted = frozenset(submitted_list)
    problems: list[str] = []

    # A set smaller than the list means at least one entry was repeated.
    if len(unique_submitted) != len(submitted_list):
        problems.append("PROOF INVALID: Duplicate plan_item entries found in plan_items_proven.")

    if len(unique_submitted) == 0:
        problems.append(
            "PROOF INCOMPLETE: plan_items_proven is empty. The agent must prove at least "
            "one work unit. Each plan_item must exactly match a work_unit unit_id from the plan."
        )

    unknown = unique_submitted - required_refs
    if unknown:
        problems.append(
            "PROOF INVALID: Unknown plan_item reference(s) not matching any work_unit unit_id: "
            f"{sorted(unknown)}. Valid unit_ids: {sorted(required_refs)}."
        )

    return problems
def _normalize_analysis_proof_ref(value: str) -> str:
    """Return ``value`` case-folded, with non-word runs collapsed to single spaces and trimmed."""
    folded = value.casefold()
    # Every run of non-word characters (punctuation and whitespace) becomes one space.
    without_punctuation = re.sub(r"[^\w]+", " ", folded)
    single_spaced = re.sub(r"\s+", " ", without_punctuation)
    return single_spaced.strip()
def _match_analysis_proof_ref(required_refs: frozenset[str], submitted_ref: str) -> str | None:
    """Resolve a submitted reference to one of ``required_refs``, tolerating small differences.

    References are compared after normalization. A unique normalized-equal
    reference wins outright; several normalized-equal references are treated as
    ambiguous. Otherwise the closest reference by ``SequenceMatcher`` ratio is
    accepted only when it scores at least 0.88 and leads the runner-up by at
    least 0.03.

    Args:
        required_refs: Canonical references to match against.
        submitted_ref: Reference text supplied in the proof.

    Returns:
        The matching canonical reference, or ``None`` when nothing matches
        confidently or unambiguously.
    """
    needle = _normalize_analysis_proof_ref(submitted_ref)
    if not needle:
        return None

    identical = [
        candidate
        for candidate in required_refs
        if _normalize_analysis_proof_ref(candidate) == needle
    ]
    if identical:
        # More than one normalized-equal candidate is ambiguous.
        return identical[0] if len(identical) == 1 else None

    # Rank by similarity; ties are ordered by the reference text itself.
    ranked = sorted(
        (
            (
                SequenceMatcher(
                    None,
                    needle,
                    _normalize_analysis_proof_ref(candidate),
                ).ratio(),
                candidate,
            )
            for candidate in required_refs
        ),
        reverse=True,
    )
    if not ranked:
        return None

    top_score, top_ref = ranked[0]
    runner_up_score = ranked[1][0] if len(ranked) > 1 else 0.0
    if top_score >= 0.88 and top_score - runner_up_score >= 0.03:
        return top_ref
    return None
def _analysis_proof_errors(required_refs: frozenset[str], submitted_list: list[str]) -> list[str]:
    """Compare submitted how_to_fix references with the prior analysis items.

    Each submitted reference is first resolved to a canonical item through
    ``_match_analysis_proof_ref``; the checks then run on the resolved items.

    Args:
        required_refs: Canonical how_to_fix items that must be addressed.
        submitted_list: ``how_to_fix_item`` values from the submitted proof.

    Returns:
        Error messages, in the order duplicates, missing items, unknown
        references; empty when every item is addressed exactly once.
    """
    resolved: list[str] = []
    unknown: list[str] = []
    for submitted in submitted_list:
        canonical = _match_analysis_proof_ref(required_refs, submitted)
        if canonical is not None:
            resolved.append(canonical)
        else:
            unknown.append(submitted)

    problems: list[str] = []

    # Two submissions resolving to the same canonical item count as a duplicate.
    if len(resolved) > len(set(resolved)):
        problems.append(
            "PROOF INVALID: Duplicate how_to_fix_item entries found in analysis_items_addressed."
        )

    unaddressed = required_refs - frozenset(resolved)
    if unaddressed:
        problems.append(
            "PROOF INCOMPLETE: The following how_to_fix item(s) have no proof entry: "
            f"{sorted(unaddressed)}. Each how_to_fix_item must exactly match the prior analysis text."
        )

    if unknown:
        problems.append(
            "PROOF INVALID: Unknown how_to_fix_item reference(s) not matching any prior "
            f"analysis item: {sorted(frozenset(unknown))}."
        )

    return problems
def _plan_proof_errors(ctx: PhaseContext, dev_result: DevelopmentResult) -> list[str]:
    """Check the plan proof in ``dev_result`` against the plan's steps or work units.

    Step references take precedence; work-unit ids are consulted only when the
    plan yields no step references. With neither available there is nothing to
    check and no errors are returned.
    """
    strategies = (
        (_get_canonical_step_refs, _step_proof_errors),
        (_get_canonical_work_unit_ids, _work_unit_proof_errors),
    )
    for collect_refs, find_errors in strategies:
        canonical = collect_refs(ctx)
        if canonical:
            proven = [entry.plan_item for entry in dev_result.plan_items_proven]
            return find_errors(canonical, proven)
    return []
def _get_canonical_step_refs(ctx: PhaseContext) -> frozenset[str]:
    """Return ``"Step <number>: <title>"`` references for every step of the plan artifact.

    The result is empty when the plan artifact is absent or a no-op, when
    ``steps`` is not a non-empty list, when any step is not a dict carrying
    both ``number`` and ``title``, or when reading the artifact raises.
    """
    try:
        if not ctx.workspace.exists(PLAN_ARTIFACT_PATH):
            return frozenset()

        wrapper = load_phase_artifact(ctx.workspace, PLAN_ARTIFACT_PATH)
        plan = unwrap_phase_artifact_content(wrapper, expected_type="plan")
        if is_noop_plan(plan):
            return frozenset()

        steps = plan.get("steps")
        if not isinstance(steps, list) or not steps:
            return frozenset()

        labels: set[str] = set()
        for step in steps:
            # One malformed step invalidates the whole set of references.
            if not isinstance(step, dict) or "number" not in step or "title" not in step:
                return frozenset()
            labels.add(f"Step {step['number']}: {step['title']}")
        return frozenset(labels)
    except Exception:
        # Best-effort lookup: any failure means "no step references available".
        return frozenset()
def _get_canonical_work_unit_ids(ctx: PhaseContext) -> frozenset[str]:
    """Return the ``unit_id`` of every work unit parsed from the plan artifact.

    The result is empty when the plan artifact is absent or a no-op, when no
    work units are parsed from it, or when reading the artifact raises.
    """
    # Best-effort lookup: any failure falls through to the empty result.
    with suppress(Exception):
        if ctx.workspace.exists(PLAN_ARTIFACT_PATH):
            wrapper = load_phase_artifact(ctx.workspace, PLAN_ARTIFACT_PATH)
            plan = unwrap_phase_artifact_content(wrapper, expected_type="plan")
            if not is_noop_plan(plan):
                parsed = parse_work_units_from_artifact(plan)
                if parsed is not None and parsed.work_units:
                    return frozenset(unit.unit_id for unit in parsed.work_units)
    return frozenset()
def _get_canonical_analysis_how_to_fix_refs(ctx: PhaseContext, phase: str) -> frozenset[str]:
    """Return the ``how_to_fix`` items from the analysis phase that loops back to ``phase``.

    Only the first phase with role ``"analysis"`` whose ``on_loopback``
    transition equals ``phase`` is consulted. The result is empty when no such
    phase exists, its artifact is unresolved or absent, ``how_to_fix`` is not a
    list, or anything raises along the way. Falsy items are dropped and the
    rest are converted with ``str``.
    """
    try:
        analysis_def = next(
            (
                candidate
                for candidate in ctx.pipeline_policy.phases.values()
                if candidate.role == "analysis" and candidate.transitions.on_loopback == phase
            ),
            None,
        )
        if analysis_def is None:
            return frozenset()

        ra = resolve_required_artifact(ctx.artifacts_policy, drain=analysis_def.drain)
        if ra is None or not ctx.workspace.exists(ra.json_path):
            return frozenset()

        wrapper = load_phase_artifact(ctx.workspace, ra.json_path)
        analysis = unwrap_phase_artifact_content(wrapper, expected_type=ra.artifact_type)
        fixes = analysis.get("how_to_fix")
        if isinstance(fixes, list):
            return frozenset(str(entry) for entry in fixes if entry)
        return frozenset()
    except Exception:
        # Best-effort lookup: any failure means "no analysis items available".
        return frozenset()
def _validate_development_result_proof(
    ctx: PhaseContext,
    phase: str,
    proof_policy: ArtifactProofPolicy,
    ra: RequiredArtifact,
) -> list[Event] | None:
    """Enforce the phase's proof policy on a development_result artifact.

    Args:
        ctx: Phase context with workspace and policy.
        phase: Name of the phase being validated.
        proof_policy: Policy stating whether plan proof and/or analysis proof
            is required.
        ra: Required-artifact contract locating the development_result.

    Returns:
        A list with one failure event when proof errors were found (a
        proof-failure hint is written too); ``None`` when the proof is
        acceptable or the artifact could not be loaded for checking.
    """
    try:
        artifact_wrapper = load_phase_artifact(ctx.workspace, ra.json_path)
        raw_content = unwrap_phase_artifact_content(
            artifact_wrapper, expected_type=ra.artifact_type
        )
        dev_result = DevelopmentResult.model_validate(raw_content)
    except Exception as exc:
        # Still best-effort (no failure event), but no longer silent: skipping
        # a configured proof policy should be visible in the logs.
        logger.warning(
            "Execution phase '{}': skipping proof validation; could not load {} artifact: {}",
            phase,
            ra.artifact_type,
            exc,
        )
        return None
    errors: list[str] = []
    if proof_policy.require_plan_proof:
        errors.extend(_plan_proof_errors(ctx, dev_result))
    if proof_policy.require_analysis_proof:
        required_refs = _get_canonical_analysis_how_to_fix_refs(ctx, phase)
        # With no canonical analysis items there is nothing to prove against.
        if required_refs:
            errors.extend(
                _analysis_proof_errors(
                    required_refs,
                    [a.how_to_fix_item for a in dev_result.analysis_items_addressed],
                )
            )
    if not errors:
        return None
    detail = "\n".join(errors)
    _write_proof_failure_hint(ctx, phase, detail)
    return [artifact_validation_failure_event(phase=phase, reason=detail)]
def _is_legacy_work_units_payload(content: dict[str, object]) -> bool:
    """Return True when ``content`` has a ``work_units`` key but no ``summary`` key."""
    if "summary" in content:
        return False
    return "work_units" in content
def _transitions_on_success(phase_def: PhaseDefinition | None) -> str | None:
    """Return the phase's ``on_success`` transition, or ``None`` when there is no phase definition."""
    return None if phase_def is None else phase_def.transitions.on_success
def _find_plan_producing_phase(
    pipeline_policy: PipelinePolicy, artifacts_policy: ArtifactsPolicy
) -> str:
    """Find the phase name that produces plan artifacts.

    Artifact contracts of type ``"plan"`` are scanned in order, and for each
    the first phase whose drain equals the contract's drain is returned. When
    nothing matches, the pipeline's entry phase is returned.
    """
    producers = (
        phase_name
        for contract in artifacts_policy.artifacts.values()
        if contract.artifact_type == "plan"
        for phase_name, phase_def in pipeline_policy.phases.items()
        if phase_def.drain == contract.drain
    )
    # Take the first match, if any; the fallback is only read when none exists.
    for phase_name in producers:
        return phase_name
    return pipeline_policy.entry_phase