Source code for ralph.pipeline.phase_entry_cleaner

"""Phase-entry drain clearing for fresh phase entries.

This module handles the clearing of drain artifact files (JSON + Markdown handoff)
when a pipeline phase is genuinely entered fresh — on program start, cross-phase
transition, or last-commit re-entry — as opposed to same-phase retry or analysis
loopback where the existing context should be preserved.
"""

from __future__ import annotations

from typing import TYPE_CHECKING

from ralph.phases.required_artifacts import build_required_artifacts

if TYPE_CHECKING:
    from ralph.policy.models import ArtifactsPolicy, PipelinePolicy
    from ralph.workspace.protocol import Workspace


__all__ = [
    "clear_phase_entry_drains",
    "is_fresh_phase_entry",
]


[docs] def is_fresh_phase_entry( entering_phase: str, previous_phase: str | None, pipeline_policy: PipelinePolicy, ) -> bool: """Return True when entering a phase via genuine fresh entry. Fresh entry means program start, cross-phase transition, or last-commit re-entry. Clearing is suppressed for same-phase retry, analysis loopback, and checkpoint restore (resume). The resume case is handled by the caller via state-field check, not by this function. Args: entering_phase: The phase being entered. previous_phase: The phase that preceded this entry (from PreparePromptEffect). pipeline_policy: The active pipeline policy. Returns: True for genuine fresh entry; False for same-phase or analysis loopback. """ # Same-phase retry or on_loopback self-reference: preserve context if previous_phase == entering_phase: return False # Analysis loopback back into the execution phase: preserve context if previous_phase is not None: prev_def = pipeline_policy.phases.get(previous_phase) if prev_def is not None and prev_def.role == "analysis": loopback_target = prev_def.transitions.on_loopback if loopback_target == entering_phase: return False # All other cases are fresh entry: # - previous_phase is None (program start or last-commit→planning) # - previous_phase is an unrelated normal phase (cross-phase transition) # - previous_phase not in pipeline_policy.phases (unknown previous treated as fresh) return True
def _clear_drain( workspace: Workspace, drain_name: str, artifacts_policy: ArtifactsPolicy, ) -> None: """Clear the primary artifact files (JSON + Markdown) for a single drain. Uses the same path resolution as phase_output_artifact_paths in commit_executor.py. Args: workspace: The workspace to operate on. drain_name: The drain name to clear artifacts for. artifacts_policy: The active artifacts policy for path resolution. """ required_artifacts = build_required_artifacts(artifacts_policy) required_artifact = required_artifacts.get(drain_name) if required_artifact is None: return # Clear the primary JSON artifact if workspace.exists(required_artifact.json_path): workspace.remove(required_artifact.json_path) # Clear the Markdown handoff if one exists md_path = required_artifact.markdown_path if md_path is not None and workspace.exists(md_path): workspace.remove(md_path)
[docs] def clear_phase_entry_drains( workspace: Workspace, phase_name: str, previous_phase: str | None, pipeline_policy: PipelinePolicy, artifacts_policy: ArtifactsPolicy, ) -> None: """Clear declared drain artifacts on genuine fresh phase entry. Clears the primary JSON and Markdown handoff for each drain listed in the phase's clear_drains_on_fresh_entry field, but only when is_fresh_phase_entry returns True. Args: workspace: The workspace to operate on. phase_name: The phase being entered. previous_phase: The previous phase from PreparePromptEffect. pipeline_policy: The active pipeline policy. artifacts_policy: The active artifacts policy. """ phase_def = pipeline_policy.phases.get(phase_name) if phase_def is None: return if not is_fresh_phase_entry(phase_name, previous_phase, pipeline_policy): return for drain in phase_def.clear_drains_on_fresh_entry: _clear_drain(workspace, drain, artifacts_policy)