Source code for ralph.phases.analysis

"""Shared analysis logic for parsing analysis decisions.

The analysis phase reads a typed artifact submitted by the agent via MCP
and extracts the decision field to route the pipeline.

Decision routing is driven entirely by policy: the phase's decisions table
in pipeline.toml maps raw status strings (from the agent artifact) to
PhaseDecisionRoute targets. The reducer routes via decisions[status].target
directly, so the raw status string is passed through as-is.

Decisions are raw strings from the agent artifact mapped to routes in policy.
The BaseModel AnalysisDecision in ralph.mcp.artifacts.typed_artifacts is the
artifact schema; routing uses the raw status string, not a StrEnum.
"""

from __future__ import annotations

import json
from contextlib import suppress
from typing import TYPE_CHECKING

from loguru import logger

from ralph.mcp.artifacts.plan import PLAN_ARTIFACT_PATH, PlanArtifactValidationError, is_noop_plan
from ralph.phases.artifacts import (
    PhaseArtifactError,
    artifact_validation_failure_event,
    decision_vocabulary_for_drain,
    load_phase_artifact,
    unwrap_phase_artifact_content,
)
from ralph.phases.required_artifacts import (
    build_required_artifacts,
    build_retry_hint,
    resolve_required_artifact,
    retry_hint_path,
)
from ralph.pipeline.effects import Effect, InvokeAgentEffect, PreparePromptEffect
from ralph.pipeline.events import AnalysisDecisionEvent, Event, PipelineEvent

if TYPE_CHECKING:
    from ralph.phases import PhaseContext


[docs] def parse_analysis_decision_status( ctx: PhaseContext, drain_name: str, *, phase_name: str | None = None, ) -> str | None: """Parse the raw decision status string from the MCP artifact. Reads the artifact file from the workspace and extracts the status field. The raw status string is returned directly — the reducer looks up the target in ``phase_def.decisions[status].target``. The ``drain_name`` is used to locate the artifact and vocabulary; the ``phase_name`` (defaults to ``drain_name`` when omitted) is used to look up the phase's decisions table in pipeline policy. Args: ctx: Phase context with workspace and pipeline_policy. drain_name: Name of the drain (used for artifact path and vocabulary lookup). phase_name: Name of the phase in pipeline policy (defaults to drain_name). Returns: Raw status string, or None if parsing fails (caller should emit PhaseFailureEvent). """ policy_phase_name = phase_name if phase_name is not None else drain_name artifact_type = f"{drain_name}_decision" ra = resolve_required_artifact(ctx.artifacts_policy, drain=drain_name) artifact_path = ra.json_path if ra is not None else f".agent/artifacts/{artifact_type}.json" if not ctx.workspace.exists(artifact_path): logger.warning( "No analysis artifact found at {}. Cannot determine decision status.", artifact_path, ) return None try: artifact = load_phase_artifact(ctx.workspace, artifact_path) content = unwrap_phase_artifact_content(artifact, expected_type=artifact_type) status = str(content.get("status", "")).lower() vocabulary = decision_vocabulary_for_drain(ctx.artifacts_policy, drain_name, artifact_type) if vocabulary and status not in vocabulary: logger.warning( "Analysis artifact at {} used status '{}' outside allowed vocabulary {}.", artifact_path, status, vocabulary, ) return None # Validate that the status exists in the policy decisions table. phase_def = ctx.pipeline_policy.phases.get(policy_phase_name) if phase_def is not None and phase_def.decisions and status not in phase_def.decisions: logger.warning( "Phase '{}' has no policy route for status '{}'. " "Add it to phases.{}.decisions or update the artifact decision_vocabulary.", policy_phase_name, status, policy_phase_name, ) return None return status except Exception as exc: logger.warning( "Failed to parse analysis artifact at {}: {}.", artifact_path, exc, ) return None
def _has_noop_plan(ctx: PhaseContext) -> bool: """Return True if a noop plan artifact is present in the workspace. Used to short-circuit analysis phases when the upstream execution produced no meaningful work (e.g. a planning artifact that declares skip=True). Only fires when the plan artifact exists and is parseable as a noop. """ if not ctx.workspace.exists(PLAN_ARTIFACT_PATH): return False with suppress( json.JSONDecodeError, PlanArtifactValidationError, PhaseArtifactError, ValueError, Exception, ): wrapper = load_phase_artifact(ctx.workspace, PLAN_ARTIFACT_PATH) raw = unwrap_phase_artifact_content(wrapper, expected_type="plan") return is_noop_plan(raw) return False
[docs] def handle_generic_analysis_phase(effect: Effect, ctx: PhaseContext) -> list[Event]: """Generic handler for analysis-role phases registered via register_role_handlers. Used for policy-declared analysis phases whose names are not the canonical ``development_analysis`` or ``review_analysis``. The handler: - Uses ``effect.phase`` as the pipeline policy phase name. - Uses ``effect.drain`` (if set) or ``effect.phase`` as the drain name for artifact path and vocabulary lookup. - Emits ``AnalysisDecisionEvent`` with the raw decision string, letting the reducer route directly through ``phase_def.decisions[status].target``. Args: effect: The effect that triggered this phase. ctx: Phase context with workspace and policy. Returns: List of events to emit. """ if isinstance(effect, PreparePromptEffect): return [PipelineEvent.PROMPT_PREPARED] if isinstance(effect, InvokeAgentEffect): phase_name = str(effect.phase) drain_name = effect.drain if effect.drain is not None else phase_name # Short-circuit when the upstream execution was a noop. No analysis # decision artifact will have been produced for a noop plan. if _has_noop_plan(ctx): logger.info("Analysis phase '{}': plan is a no-op — skipping analysis", phase_name) return [AnalysisDecisionEvent(phase=phase_name, decision="completed")] try: registry = build_required_artifacts(ctx.artifacts_policy) except AttributeError: registry = None ra = resolve_required_artifact(ctx.artifacts_policy, drain=drain_name) artifact_path = ( ra.json_path if ra is not None else f".agent/artifacts/{drain_name}_decision.json" ) if not ctx.workspace.exists(artifact_path): detail = ( f"Missing required analysis artifact at {artifact_path}; " f"the agent must submit {drain_name}_decision before declaring completion" ) logger.warning( "Analysis phase '{}' completed without required artifact at {}", phase_name, artifact_path, ) with suppress(Exception): ctx.workspace.write( retry_hint_path(phase_name), build_retry_hint(phase_name, detail, registry=registry), ) return [artifact_validation_failure_event(phase=phase_name, reason=detail)] status = parse_analysis_decision_status(ctx, drain_name, phase_name=phase_name) if status is None: # parse_analysis_decision_status already logged the warning return [ artifact_validation_failure_event( phase=phase_name, reason=f"Unroutable analysis decision for phase '{phase_name}'", ) ] logger.info("Analysis phase '{}' decision: {}", phase_name, status) return [AnalysisDecisionEvent(phase=phase_name, decision=status)] return []