Source code for ralph.pipeline.prompt_prep

"""Prompt materialization helpers for the pipeline runner."""

from __future__ import annotations

import os
from pathlib import Path
from typing import TYPE_CHECKING

from ralph.mcp.protocol.capability_mapping import DrainClass, SessionDrain
from ralph.mcp.protocol.env import WORKER_NAMESPACE_ENV
from ralph.phases.required_artifacts import resolve_phase_required_artifact
from ralph.pipeline.effect_router import agents_for_phase
from ralph.pipeline.effects import InvokeAgentEffect, PreparePromptEffect
from ralph.pipeline.handoffs import resolve_phase_drain
from ralph.prompts.materialize import (
    collect_media_entries_for_phase,
    materialize_prompt_for_phase,
    tool_name_prefix_for_transport,
)
from ralph.prompts.types import SessionCapabilities
from ralph.workspace import FsWorkspace

if TYPE_CHECKING:
    from collections.abc import Mapping
    from typing import Protocol

    from ralph.config.models import AgentConfig, UnifiedConfig
    from ralph.pipeline.effects import Effect
    from ralph.pipeline.state import PipelineState
    from ralph.policy.models import AgentsPolicy, ArtifactsPolicy, PipelinePolicy, PolicyBundle
    from ralph.prompts.materialize import PromptPhaseContext, PromptPhaseOptions
    from ralph.workspace.scope import WorkspaceScope

    class _RegistryLike(Protocol):
        def get(self, name: str) -> AgentConfig | None: ...

    class _MaterializePromptFn(Protocol):
        def __call__(
            self,
            context: PromptPhaseContext | None = ...,
            options: PromptPhaseOptions | None = ...,
            **kwargs: object,
        ) -> str: ...


def _session_drain_for_drain_class(drain_class: DrainClass) -> SessionDrain:
    return {
        DrainClass.PLANNING: SessionDrain.PLANNING,
        DrainClass.DEVELOPMENT: SessionDrain.DEVELOPMENT,
        DrainClass.ANALYSIS: SessionDrain.ANALYSIS,
        DrainClass.REVIEW: SessionDrain.REVIEW,
        DrainClass.FIX: SessionDrain.FIX,
        DrainClass.COMMIT: SessionDrain.COMMIT,
    }[drain_class]


def _fallback_prompt_session_drain_for_role(role: str | None) -> SessionDrain:
    if role == "execution":
        return SessionDrain.DEVELOPMENT
    if role == "analysis":
        return SessionDrain.ANALYSIS
    if role == "review":
        return SessionDrain.REVIEW
    if role == "commit":
        return SessionDrain.COMMIT
    if role == "fix":
        return SessionDrain.FIX
    return SessionDrain.ANALYSIS


def _prompt_session_drain_for_phase(
    drain: str | None,
    *,
    phase: str | None = None,
    pipeline_policy: PipelinePolicy | None = None,
    agents_policy: AgentsPolicy | None = None,
) -> SessionDrain:
    """Return the prompt capability profile for a policy drain."""
    candidate_drains: list[str] = []
    if drain is not None:
        candidate_drains.append(drain)

    phase_def = None
    if phase is not None and pipeline_policy is not None and hasattr(pipeline_policy, "phases"):
        phase_def = pipeline_policy.phases.get(phase)
        phase_drain = phase_def.drain if phase_def is not None else None
        if phase_drain is not None and phase_drain not in candidate_drains:
            candidate_drains.append(phase_drain)

    for candidate in candidate_drains:
        try:
            return SessionDrain(candidate)
        except ValueError:
            if agents_policy is not None:
                drain_cfg = agents_policy.agent_drains.get(candidate)
                if drain_cfg is not None:
                    drain_class = drain_cfg.capability_class or drain_cfg.drain_class
                    if drain_class is not None:
                        return _session_drain_for_drain_class(DrainClass(drain_class))

    if phase_def is not None:
        return _fallback_prompt_session_drain_for_role(phase_def.role)

    return SessionDrain("cli")


[docs] def session_capabilities_for_agent_phase( drain: str | None, *, phase: str | None = None, pipeline_policy: PipelinePolicy | None = None, agent: AgentConfig | None = None, agents_policy: AgentsPolicy | None = None, ) -> SessionCapabilities: """Return prompt session capabilities with the effective transport tool prefix.""" tool_name_prefix = "" if agent is not None: tool_name_prefix = tool_name_prefix_for_transport(agent.transport) return SessionCapabilities.defaults_for_drain( _prompt_session_drain_for_phase( drain, phase=phase, pipeline_policy=pipeline_policy, agents_policy=agents_policy, ), tool_name_prefix=tool_name_prefix, )
def _prompt_changed_since_last_materialization(workspace_root: Path) -> bool: prompt_path = workspace_root / "PROMPT.md" current_prompt_path = workspace_root / ".agent" / "CURRENT_PROMPT.md" if not prompt_path.exists() or not current_prompt_path.exists(): return False try: return prompt_path.read_text(encoding="utf-8") != current_prompt_path.read_text( encoding="utf-8" ) except OSError: return False def _should_resume_existing_planning_phase_name( *, phase: str, drain: str | None, state: PipelineState | None, pipeline_policy: PipelinePolicy, artifacts_policy: ArtifactsPolicy, ) -> bool: if state is None or state.phase != phase: return False if state.previous_phase is not None or state.checkpoint_saved_count <= 0: return False effective_drain = drain or resolve_phase_drain(phase, pipeline_policy) or phase required_artifact = resolve_phase_required_artifact( pipeline_policy, artifacts_policy, phase=phase, drain=effective_drain, ) return bool(required_artifact is not None and required_artifact.artifact_type == "plan") def _should_resume_existing_planning_phase( *, effect: InvokeAgentEffect, state: PipelineState, policy_bundle: PolicyBundle, ) -> bool: return _should_resume_existing_planning_phase_name( phase=effect.phase, drain=effect.drain, state=state, pipeline_policy=policy_bundle.pipeline, artifacts_policy=policy_bundle.artifacts, ) def _materialize_prepared_prompt( effect: PreparePromptEffect, pipeline_policy: PipelinePolicy, artifacts_policy: ArtifactsPolicy, workspace_scope: WorkspaceScope, agents_policy: AgentsPolicy | None = None, state: PipelineState | None = None, env: Mapping[str, str] | None = None, materialize_fn: _MaterializePromptFn | None = None, *, registry: _RegistryLike | None = None, config: UnifiedConfig | None = None, ) -> None: env_map = os.environ if env is None else env workspace = FsWorkspace( workspace_scope.root, allowed_roots=workspace_scope.allowed_roots, ) worker_ns_str = env_map.get(WORKER_NAMESPACE_ENV) worker_namespace = Path(worker_ns_str) if worker_ns_str else None work_unit = None if worker_namespace is not None and state is not None and len(state.work_units) == 1: work_unit = state.work_units[0] phase_drain = effect.drain or resolve_phase_drain(effect.phase, pipeline_policy) or effect.phase agent = None if registry is not None and config is not None: agent_names = agents_for_phase( config, effect.phase, agents_policy=agents_policy, pipeline_policy=pipeline_policy, ) if agent_names: agent = registry.get(agent_names[0]) media_entries = collect_media_entries_for_phase(workspace, effect.phase) or None _mat = materialize_fn or materialize_prompt_for_phase _mat( phase=effect.phase, workspace=workspace, pipeline_policy=pipeline_policy, session_caps=session_capabilities_for_agent_phase( phase_drain, phase=effect.phase, pipeline_policy=pipeline_policy, agent=agent, agents_policy=agents_policy, ), workspace_root=workspace_scope.root, artifacts_policy=artifacts_policy, worker_namespace=worker_namespace, previous_phase=effect.previous_phase, work_unit=work_unit, resume_existing_phase=( not _prompt_changed_since_last_materialization(workspace_scope.root) and _should_resume_existing_planning_phase_name( phase=effect.phase, drain=effect.drain, state=state, pipeline_policy=pipeline_policy, artifacts_policy=artifacts_policy, ) ), multimodal_entries=media_entries, ) def _materialize_agent_prompt_if_needed( effect: Effect, state: PipelineState, workspace: FsWorkspace, policy_bundle: PolicyBundle, registry: _RegistryLike, *, materialize_fn: _MaterializePromptFn | None = None, ) -> None: if not isinstance(effect, InvokeAgentEffect): return agent = registry.get(effect.agent_name) media_entries = collect_media_entries_for_phase(workspace, effect.phase) or None _mat = materialize_fn or materialize_prompt_for_phase _mat( phase=effect.phase, workspace=workspace, pipeline_policy=policy_bundle.pipeline, session_caps=session_capabilities_for_agent_phase( effect.drain or resolve_phase_drain(effect.phase, policy_bundle.pipeline) or effect.phase, phase=effect.phase, pipeline_policy=policy_bundle.pipeline, agent=agent, agents_policy=policy_bundle.agents, ), workspace_root=workspace.root, artifacts_policy=policy_bundle.artifacts, previous_phase=state.previous_phase, resume_existing_phase=( not _prompt_changed_since_last_materialization(workspace.root) and _should_resume_existing_planning_phase( effect=effect, state=state, policy_bundle=policy_bundle, ) ), multimodal_entries=media_entries, ) materialize_prepared_prompt = _materialize_prepared_prompt materialize_agent_prompt_if_needed = _materialize_agent_prompt_if_needed prompt_session_drain_for_phase = _prompt_session_drain_for_phase