Source code for ralph.pipeline.runner

"""Pipeline runner: orchestration glue that wires extracted submodules together.

This module coordinates effect dispatch, step execution, and policy resolution.
Heavy lifting is delegated to focused submodules; runner.py owns only the
plumbing that connects them.
"""

from __future__ import annotations

from inspect import signature
from typing import TYPE_CHECKING, cast

from git import InvalidGitRepositoryError, Repo
from loguru import logger

from ralph.agents.invoke import AgentInvocationError, invoke_agent
from ralph.agents.registry import AgentRegistry
from ralph.agents.subprocess_executor import SubprocessAgentExecutor
from ralph.config.enums import Verbosity
from ralph.display.artifact_renderer import render_commit_message
from ralph.display.context import install_width_refresher, make_display_context
from ralph.display.phase_banner import show_phase_close_banner, show_phase_transition
from ralph.executor.process import run_process_async
from ralph.git.operations import create_commit, stage_all
from ralph.interrupt.asyncio_bridge import install_signal_handlers
from ralph.mcp.protocol.startup import heartbeat_policy_from_env
from ralph.mcp.server.factory_impl import DynamicBindingMcpServerFactory
from ralph.mcp.server.lifecycle import (
    check_mcp_bridge_health,
    shutdown_mcp_server,
    start_mcp_server,
)
from ralph.mcp.session_plan import build_session_mcp_plan
from ralph.phases import handle_phase, register_role_handlers
from ralph.pipeline import checkpoint as ckpt
from ralph.pipeline._runner_interrupt import (
    handle_keyboard_interrupt as _handle_keyboard_interrupt,
)
from ralph.pipeline._runner_mcp_validation import (
    default_probe_agent_transports as _default_probe_agent_transports,
)
from ralph.pipeline._runner_mcp_validation import (
    default_validate_mcp as _default_validate_mcp,
)
from ralph.pipeline._runner_mcp_validation import (
    run_custom_mcp_validation,
)
from ralph.pipeline._runner_session import (
    apply_session_capture as _apply_session_capture,
)
from ralph.pipeline._runner_session import (
    set_last_captured_session_id as _set_last_captured_session_id,
)
from ralph.pipeline._runner_state_helpers import (
    notify_pipeline_subscriber as _notify_pipeline_subscriber,
)
from ralph.pipeline._runner_state_helpers import (
    reset_phase_chain_for_recovery as _reset_phase_chain_for_recovery,
)
from ralph.pipeline.activity_stream import (
    MAX_METADATA_SUMMARY_LENGTH,
    MAX_TEXT_LENGTH,
    MAX_TOOL_RESULT_BRIEF,
    metadata_summary,
    record_activity_on_subscriber,
    render_agent_activity_line,
    terminal_width,
    truncate,
)
from ralph.pipeline.agent_execution_deps import AgentExecutionDeps
from ralph.pipeline.commit_executor import (
    cleanup_commit_message_artifacts,
    commit_effect,
    default_mcp_capabilities_for_phase,
    phase_output_artifact_paths,
    repo_has_commit_work,
)
from ralph.pipeline.commit_executor import (
    execute_commit_effect as _ee_execute_commit_effect,
)
from ralph.pipeline.cycle_baseline import (
    clear_cycle_baseline,
    read_cycle_baseline,
    write_cycle_baseline,
)
from ralph.pipeline.effect_executor import (
    execute_agent_effect as _ee_execute_agent_effect,
)
from ralph.pipeline.effect_router import (
    determine_effect_from_policy,
)
from ralph.pipeline.effects import (
    CommitEffect,
    EarlySkipCommitEffect,
    Effect,
    ExhaustedAnalysisPhaseAdvanceEffect,
    ExitFailureEffect,
    ExitSuccessEffect,
    FanOutEffect,
    InvokeAgentEffect,
    PreparePromptEffect,
    SaveCheckpointEffect,
)
from ralph.pipeline.events import Event, PhaseFailureEvent, PipelineEvent
from ralph.pipeline.fan_out import execute_fan_out_sync as _fan_out_execute_fan_out_sync
from ralph.pipeline.handoffs import resolve_exhausted_analysis_bypass, resolve_phase_drain
from ralph.pipeline.legacy_console_display import (
    LegacyConsoleDisplay,
    emit_display_line,
    resolve_display,
    status_text,
)
from ralph.pipeline.phase_agent_handler import (
    phase_event_after_agent_run,
)
from ralph.pipeline.phase_entry_cleaner import clear_phase_entry_drains
from ralph.pipeline.phase_transition import (
    PENDING_PHASE_TRANSITION_METADATA_ATTR,
    PendingPhaseTransitionMetadata,
    clear_phase_materialization_outputs,
    emit_final_summary,
    record_phase_transition_metadata,
    show_phase_start_with_context,
    skipped_exhausted_analysis_info,
)
from ralph.pipeline.phase_transition import (
    emit_phase_transition_if_changed as _pt_emit_phase_transition_if_changed,
)
from ralph.pipeline.prompt_prep import (
    _materialize_prepared_prompt as _materialize_prepared_prompt_impl,
)
from ralph.pipeline.prompt_prep import (
    materialize_agent_prompt_if_needed,
    prompt_session_drain_for_phase,
)
from ralph.pipeline.reducer import reduce as reducer_reduce
from ralph.pipeline.run_loop import run
from ralph.pipeline.state import CommitState, PipelineState
from ralph.pipeline.state_init import create_initial_state
from ralph.policy.loader import (
    load_policy_for_workspace_scope,
)
from ralph.policy.loader import (
    load_policy_or_die as _dir_load_policy_or_die,
)
from ralph.process.manager import process_phase_scope
from ralph.process.mcp_supervisor import McpSupervisor
from ralph.prompts.materialize import MissingPlanHandoffError, materialize_prompt_for_phase
from ralph.prompts.system_prompt import materialize_system_prompt
from ralph.recovery.classifier import FailureContext
from ralph.workspace import FsWorkspace
from ralph.workspace.scope import WorkspaceScope, resolve_workspace_scope

if TYPE_CHECKING:
    from collections.abc import Callable, Mapping
    from pathlib import Path
    from typing import Protocol

    from ralph.config.models import AgentConfig, UnifiedConfig
    from ralph.display.context import DisplayContext
    from ralph.display.parallel_display import ParallelDisplay
    from ralph.pipeline.agent_execution_deps import (
        _CheckMcpBridgeHealthFn,
        _McpSupervisorFactory,
        _ShutdownMcpServerFn,
        _StartMcpServerFn,
    )
    from ralph.policy.models import (
        AgentsPolicy,
        ArtifactsPolicy,
        PipelinePolicy,
        PolicyBundle,
    )
    from ralph.recovery.controller import RecoveryController


__all__ = [
    "MAX_METADATA_SUMMARY_LENGTH",
    "MAX_TEXT_LENGTH",
    "MAX_TOOL_RESULT_BRIEF",
    "PENDING_PHASE_TRANSITION_METADATA_ATTR",
    "AgentRegistry",
    "DynamicBindingMcpServerFactory",
    "McpSupervisor",
    "PendingPhaseTransitionMetadata",
    "SubprocessAgentExecutor",
    "available_width",
    "build_session_mcp_plan",
    "check_mcp_bridge_health",
    "clear_cycle_baseline",
    "commit_effect",
    "create_initial_state",
    "default_mcp_capabilities_for_phase",
    "emit_final_summary",
    "emit_phase_transition_if_changed",
    "execute_agent_effect",
    "execute_commit_effect",
    "handle_phase",
    "heartbeat_policy_from_env",
    "install_signal_handlers",
    "install_width_refresher",
    "make_display_context",
    "materialize_prompt_for_phase",
    "materialize_system_prompt",
    "metadata_summary",
    "phase_output_artifact_paths",
    "prompt_session_drain_for_phase",
    "record_activity_on_subscriber",
    "reducer_reduce",
    "register_role_handlers",
    "render_agent_activity_line",
    "render_commit_message",
    "repo_has_commit_work",
    "resolve_display",
    "resolve_workspace_scope",
    "run",
    "run_process_async",
    "show_phase_close_banner",
    "show_phase_transition",
    "shutdown_mcp_server",
    "skipped_exhausted_analysis_info",
    "start_mcp_server",
    "terminal_width",
    "truncate",
]


if TYPE_CHECKING:

    class _PipelineSubscriber(Protocol):
        def notify(self, state: PipelineState) -> None: ...

    class _RegistryLike(Protocol):
        def get(self, name: str) -> AgentConfig | None: ...

    class _AgentRegistryFactory(Protocol):
        @classmethod
        def from_config(cls, config: UnifiedConfig) -> _RegistryLike: ...

    class _ExecuteEffectKwargsFn(Protocol):
        def __call__(
            self,
            effect: Effect,
            config: UnifiedConfig,
            workspace_scope: WorkspaceScope,
            **kwargs: object,
        ) -> Event: ...

    class _ConnectivityMonitorLike(Protocol):
        @property
        def current_state(self) -> object: ...

        def add_listener(self, cb: Callable[[object], None]) -> Callable[[], None]: ...


_LEGACY_EXECUTE_EFFECT_ARITY = 3
_POLICY_LOADER_CONFIG_ARITY = 2

load_policy_or_die = _dir_load_policy_or_die

VALIDATE_MCP = _default_validate_mcp
PROBE_AGENT_TRANSPORTS = _default_probe_agent_transports
_VALIDATE_MCP = _default_validate_mcp
_PROBE_AGENT_TRANSPORTS = _default_probe_agent_transports


def _validate_custom_mcp_servers(workspace_root: Path) -> int:
    effective_validate = (
        VALIDATE_MCP if VALIDATE_MCP is not _default_validate_mcp else _VALIDATE_MCP
    )
    effective_probe = (
        PROBE_AGENT_TRANSPORTS
        if PROBE_AGENT_TRANSPORTS is not _default_probe_agent_transports
        else _PROBE_AGENT_TRANSPORTS
    )
    return run_custom_mcp_validation(workspace_root, effective_validate, effective_probe)


validate_custom_mcp_servers = _validate_custom_mcp_servers


def _execute_effect(
    effect: Effect,
    config: UnifiedConfig,
    workspace_scope: WorkspaceScope,
    *,
    display: ParallelDisplay | LegacyConsoleDisplay | None = None,
    verbosity: Verbosity = Verbosity.VERBOSE,
    state: PipelineState | None = None,
    policy_bundle: PolicyBundle | None = None,
) -> PipelineEvent:
    deps = AgentExecutionDeps(
        invoke_agent=invoke_agent,
        agent_invocation_error=AgentInvocationError,
        agent_registry=AgentRegistry,
        show_phase_start_cb=show_phase_start_with_context,
        set_session_id_cb=_set_last_captured_session_id,
    )
    if isinstance(effect, InvokeAgentEffect):
        return execute_agent_effect(
            effect,
            config,
            deps,
            workspace_scope,
            display=display,
            verbosity=verbosity,
            state=state,
            policy_bundle=policy_bundle,
        )
    if isinstance(effect, CommitEffect):
        return execute_commit_effect(
            effect, create_commit, stage_all, workspace_scope.root, display, verbosity=verbosity
        )
    if isinstance(effect, EarlySkipCommitEffect):
        logger.info("Skipping commit early: worktree is clean")
        _cleanup_commit_message_artifacts(workspace_scope.root)
        return PipelineEvent.COMMIT_SKIPPED
    if isinstance(effect, ExhaustedAnalysisPhaseAdvanceEffect):
        if state is not None and policy_bundle is not None:
            bypass = resolve_exhausted_analysis_bypass(state, effect.phase, policy_bundle.pipeline)
            logger.info(
                "Skipping exhausted analysis phase '{}' and reducing PHASE_ADVANCE to '{}'",
                effect.phase,
                bypass.target_phase,
            )
        else:
            logger.warning(
                "Skipping exhausted analysis phase '{}' without routing context", effect.phase
            )
        return PipelineEvent.PHASE_ADVANCE
    if isinstance(effect, SaveCheckpointEffect):
        return PipelineEvent.CHECKPOINT_SAVED

    logger.warning("Unknown effect type: {}", type(effect))
    return PipelineEvent.AGENT_FAILURE


def _execute_effect_with_optional_display(
    effect: Effect,
    config: UnifiedConfig,
    workspace_scope: WorkspaceScope,
    *,
    display: ParallelDisplay | LegacyConsoleDisplay | None = None,
    display_context: DisplayContext | None = None,
    verbosity: Verbosity = Verbosity.VERBOSE,
    state: PipelineState | None = None,
    policy_bundle: PolicyBundle | None = None,
) -> Event:
    fn = execute_effect
    params = signature(fn).parameters
    accepts_kwargs = any(p.kind == p.VAR_KEYWORD for p in params.values())
    all_opts: dict[str, object] = {
        "display": display,
        "display_context": display_context,
        "verbosity": verbosity,
        "state": state,
        "policy_bundle": policy_bundle,
    }
    supported = all_opts if accepts_kwargs else {k: v for k, v in all_opts.items() if k in params}
    return cast("_ExecuteEffectKwargsFn", fn)(effect, config, workspace_scope, **supported)


def execute_effect_with_optional_display(
    effect: Effect,
    config: UnifiedConfig,
    workspace_scope: WorkspaceScope,
    *,
    display: ParallelDisplay | LegacyConsoleDisplay | None = None,
    display_context: DisplayContext | None = None,
    verbosity: Verbosity = Verbosity.VERBOSE,
    state: PipelineState | None = None,
    policy_bundle: PolicyBundle | None = None,
) -> Event:
    """Execute an effect and return the resulting event, optionally routing output to a display."""
    return _execute_effect_with_optional_display(
        effect,
        config,
        workspace_scope,
        display=display,
        display_context=display_context,
        verbosity=verbosity,
        state=state,
        policy_bundle=policy_bundle,
    )


def _invoke_execute_effect_with_optional_display(
    effect: Effect,
    config: UnifiedConfig,
    workspace_scope: WorkspaceScope,
    *,
    display: ParallelDisplay | LegacyConsoleDisplay | None,
    display_context: DisplayContext | None = None,
    verbosity: Verbosity,
    state: PipelineState,
    policy_bundle: PolicyBundle,
) -> Event:
    return execute_effect_with_optional_display(
        effect,
        config,
        workspace_scope,
        display=display,
        display_context=display_context,
        verbosity=verbosity,
        state=state,
        policy_bundle=policy_bundle,
    )


def _reduce_runtime_recovery(
    state: PipelineState,
    pipeline_policy: PipelinePolicy,
    *,
    reason: str,
    recovery: RecoveryController | None = None,
    exc: BaseException | None = None,
) -> tuple[PipelineState, list[Effect]]:
    if recovery is not None:
        raw_failure: BaseException | str = exc if exc is not None else reason
        new_state, effects, _ = recovery.handle(
            state,
            raw_failure,
            FailureContext(phase=state.phase, agent=state.current_agent()),
        )
        if state.work_units and not new_state.work_units:
            new_state = new_state.copy_with(work_units=state.work_units)
        return new_state, effects
    failure_event = PhaseFailureEvent(
        phase=state.phase,
        reason=reason,
        recoverable=True,
    )
    recovered_state, effects = reducer_reduce(state, failure_event, pipeline_policy, recovery=None)
    return recovered_state, effects


def _save_checkpoint_or_log(
    state: PipelineState,
    *,
    message: str,
) -> None:
    try:
        ckpt.save(state)
    except Exception as exc:
        logger.exception(message, phase=state.phase, err=exc)


def _maybe_clear_invoke_agent_entry_drains(
    effect: Effect,
    state: PipelineState,
    workspace: FsWorkspace,
    policy_bundle: PolicyBundle,
) -> None:
    if isinstance(effect, InvokeAgentEffect):
        is_resume = (
            state.phase == effect.phase
            and state.previous_phase is None
            and state.checkpoint_saved_count > 0
        )
        if not is_resume:
            clear_phase_entry_drains(
                workspace,
                str(effect.phase),
                state.previous_phase,
                policy_bundle.pipeline,
                policy_bundle.artifacts,
            )


def _run_pipeline_step(
    *,
    state: PipelineState,
    policy_bundle: PolicyBundle,
    workspace_scope: WorkspaceScope,
    config: UnifiedConfig,
    display: ParallelDisplay | LegacyConsoleDisplay,
    display_context: DisplayContext,
    verbosity: Verbosity,
    registry: _RegistryLike,
    pipeline_subscriber: _PipelineSubscriber | None,
    recovery_controller: RecoveryController | None = None,
    config_path: Path | None = None,
    cli_overrides: dict[str, object] | None = None,
    _monitor_stop_cb: Callable[[], None] | None = None,
) -> PipelineState | int:
    try:
        effect = call_determine_effect_from_policy(state, policy_bundle, workspace_scope, config)
        inline_result = handle_inline_effect(
            effect=effect,
            state=state,
            pipeline_policy=policy_bundle.pipeline,
            artifacts_policy=policy_bundle.artifacts,
            agents_policy=policy_bundle.agents,
            registry=registry,
            config=config,
            workspace_scope=workspace_scope,
            display=display,
            pipeline_subscriber=pipeline_subscriber,
        )
        if inline_result is not None:
            return inline_result

        if isinstance(effect, FanOutEffect):
            return execute_fan_out_sync(
                effect=effect,
                state=state,
                display=display,
                policy_bundle=policy_bundle,
                workspace_scope=workspace_scope,
                pipeline_subscriber=pipeline_subscriber,
                config=config,
                config_path=config_path,
                cli_overrides=cli_overrides,
                monitor_stop_cb=_monitor_stop_cb,
            )

        with process_phase_scope(state.phase):
            workspace = FsWorkspace(
                workspace_scope.root,
                allowed_roots=workspace_scope.allowed_roots,
            )
            _maybe_clear_invoke_agent_entry_drains(
                effect,
                state,
                workspace,
                policy_bundle,
            )
            _mat_fn = (
                materialize_prompt_for_phase
                if materialize_prompt_for_phase is not _original_materialize_prompt_for_phase
                else None
            )
            materialize_agent_prompt_if_needed(
                effect, state, workspace, policy_bundle, registry, materialize_fn=_mat_fn
            )
            event = invoke_execute_effect_with_optional_display(
                effect,
                config,
                workspace_scope,
                display=display,
                display_context=display_context,
                verbosity=verbosity,
                state=state,
                policy_bundle=policy_bundle,
            )
            if isinstance(effect, InvokeAgentEffect):
                state = _apply_session_capture(state)
            if isinstance(effect, InvokeAgentEffect) and event == PipelineEvent.AGENT_SUCCESS:
                if recovery_controller is not None:
                    recovery_controller.reset_backoff(effect.phase, effect.agent_name)
                _hp_fn = handle_phase if handle_phase is not _original_handle_phase else None
                event = phase_event_after_agent_run(
                    effect=effect,
                    config=config,
                    policy_bundle=policy_bundle,
                    workspace=workspace,
                    workspace_scope=workspace_scope,
                    display=display,
                    display_context=display_context,
                    verbosity=verbosity,
                    state=state,
                    handle_phase_fn=_hp_fn,
                )

        _commit_phase_def = policy_bundle.pipeline.phases.get(state.phase)
        if (
            isinstance(effect, CommitEffect)
            and _commit_phase_def is not None
            and _commit_phase_def.role == "commit"
            and event in (PipelineEvent.COMMIT_SUCCESS, PipelineEvent.COMMIT_SKIPPED)
        ):
            clear_cycle_baseline(workspace_scope.root)
        next_state, _ = reducer_reduce(
            state,
            event,
            policy_bundle.pipeline,
            recovery=recovery_controller,
        )
        skipped_phases = record_phase_transition_metadata(
            display,
            state,
            event,
            next_state,
            policy_bundle.pipeline,
        )
        for skipped_phase in skipped_phases:
            clear_phase_materialization_outputs(workspace, skipped_phase)
        _notify_pipeline_subscriber(pipeline_subscriber, next_state)
        _save_checkpoint_or_log(
            next_state,
            message=(
                "Checkpoint save failed in phase={phase}: {err} -- continuing without checkpoint"
            ),
        )
        return next_state
    except KeyboardInterrupt:
        raise
    except BaseException as exc:
        logger.exception(
            "Pipeline step crashed in phase={phase}: {err}",
            phase=state.phase,
            err=exc,
        )
        recovered_state, _recv_effects = _reduce_runtime_recovery(
            state,
            policy_bundle.pipeline,
            reason=f"Pipeline step crashed: {type(exc).__name__}: {exc}",
            recovery=recovery_controller,
            exc=exc,
        )
        for _eff in _recv_effects:
            if isinstance(_eff, ExitFailureEffect):
                emit_display_line(
                    display, None, status_text("Recovery exhausted", _eff.reason, "red")
                )
                return 1
        _notify_pipeline_subscriber(pipeline_subscriber, recovered_state)
        _save_checkpoint_or_log(
            recovered_state,
            message="Checkpoint save failed while recording recovery in phase={phase}: {err}",
        )
        return recovered_state


def _load_policy_bundle_for_run(
    workspace_scope: WorkspaceScope,
    config: UnifiedConfig,
) -> PolicyBundle:
    if load_policy_or_die is not _dir_load_policy_or_die:
        effective_policy_dir = workspace_scope.resolve_agent_file("pipeline.toml").parent
        loader = load_policy_or_die
        params = signature(loader).parameters
        if "config" in params:
            return loader(effective_policy_dir, config=config)

        positional = [
            param
            for param in params.values()
            if param.kind in (param.POSITIONAL_ONLY, param.POSITIONAL_OR_KEYWORD)
        ]
        if (
            any(param.kind == param.VAR_KEYWORD for param in params.values())
            or len(positional) >= _POLICY_LOADER_CONFIG_ARITY
        ):
            return loader(effective_policy_dir, config=config)
        return loader(effective_policy_dir)

    return load_policy_for_workspace_scope(workspace_scope, config=config)


def _handle_inline_effect(
    *,
    effect: Effect,
    state: PipelineState,
    pipeline_policy: PipelinePolicy,
    artifacts_policy: ArtifactsPolicy,
    workspace_scope: WorkspaceScope,
    agents_policy: AgentsPolicy | None = None,
    registry: _RegistryLike | None = None,
    config: UnifiedConfig | None = None,
    display: ParallelDisplay | LegacyConsoleDisplay | None = None,
    pipeline_subscriber: _PipelineSubscriber | None = None,
    dashboard_subscriber: _PipelineSubscriber | None = None,
) -> PipelineState | int | None:
    effective_subscriber = dashboard_subscriber or pipeline_subscriber

    if isinstance(effect, SaveCheckpointEffect):
        ckpt.save(state)
        new_state, _ = reducer_reduce(state, PipelineEvent.CHECKPOINT_SAVED, pipeline_policy)
        _notify_pipeline_subscriber(effective_subscriber, new_state)
        return new_state

    if isinstance(effect, PreparePromptEffect):
        if not effect.skip_materialization:
            # Phase-agnostic resume guard: suppress clearing when restoring a checkpoint
            is_resume = (
                state is not None
                and str(state.phase) == str(effect.phase)
                and state.previous_phase is None
                and state.checkpoint_saved_count > 0
            )
            if not is_resume:
                _entry_ws = FsWorkspace(
                    workspace_scope.root, allowed_roots=workspace_scope.allowed_roots
                )
                clear_phase_entry_drains(
                    _entry_ws,
                    str(effect.phase),
                    effect.previous_phase,
                    pipeline_policy,
                    artifacts_policy,
                )
            try:
                materialize_prepared_prompt(
                    effect,
                    pipeline_policy,
                    artifacts_policy,
                    workspace_scope,
                    agents_policy,
                    state=state,
                    registry=registry,
                    config=config,
                )
            except MissingPlanHandoffError as exc:
                if state.phase != pipeline_policy.recovery.failed_route:
                    raise
                logger.warning(
                    "Missing plan handoff for phase={phase}: {err}; re-routing to entry phase",
                    phase=effect.phase,
                    err=exc,
                )
                current_epoch = state.recovery_epoch if isinstance(state.recovery_epoch, int) else 0
                recovered_state = state.copy_with(
                    phase=pipeline_policy.entry_phase,
                    previous_phase=state.phase,
                    last_error=str(exc),
                    recovery_epoch=current_epoch + 1,
                )
                ckpt.save(recovered_state)
                _notify_pipeline_subscriber(effective_subscriber, recovered_state)
                return recovered_state
        prepared_state = state
        if state.phase == pipeline_policy.recovery.failed_route:
            prepared_state = _reset_phase_chain_for_recovery(state, effect.phase)
            target_phase_def = pipeline_policy.phases.get(effect.phase)
            if target_phase_def is not None and target_phase_def.role == "commit":
                prepared_state = prepared_state.copy_with(commit=CommitState())
            if target_phase_def is not None and target_phase_def.role == "execution":
                clear_cycle_baseline(workspace_scope.root)
                write_start_commit_if_absent(workspace_scope.root)
        updated_state = prepared_state.copy_with(
            phase=effect.phase,
            current_drain=effect.drain or resolve_phase_drain(effect.phase, pipeline_policy),
        )
        ckpt.save(updated_state)
        _notify_pipeline_subscriber(effective_subscriber, updated_state)
        return updated_state

    if isinstance(effect, ExitSuccessEffect):
        emit_display_line(display, None, "[green]Pipeline completed successfully.[/green]")
        return 0

    if isinstance(effect, ExitFailureEffect):
        emit_display_line(
            display,
            None,
            status_text("Recovery triggered", effect.reason, "yellow"),
        )
        current_epoch = state.recovery_epoch if isinstance(state.recovery_epoch, int) else 0
        recovered_state = state.copy_with(
            phase=pipeline_policy.recovery.failed_route,
            previous_phase=state.phase,
            last_error=effect.reason,
            recovery_epoch=current_epoch + 1,
        )
        ckpt.save(recovered_state)
        _notify_pipeline_subscriber(effective_subscriber, recovered_state)
        return recovered_state

    return None


def _call_determine_effect_from_policy(
    state: PipelineState,
    policy_bundle: PolicyBundle,
    workspace_scope: WorkspaceScope,
    config: UnifiedConfig,
) -> Effect:
    fn = determine_effect_from_policy
    params = signature(fn).parameters
    if "config" in params:
        return fn(state, policy_bundle, workspace_scope, config=config)

    positional = [
        param
        for param in params.values()
        if param.kind in (param.POSITIONAL_ONLY, param.POSITIONAL_OR_KEYWORD)
    ]
    if (
        any(param.kind == param.VAR_POSITIONAL for param in params.values())
        or len(positional) >= _LEGACY_EXECUTE_EFFECT_ARITY
    ):
        return fn(state, policy_bundle, workspace_scope)
    return fn(state, policy_bundle)



_original_start_mcp_server = start_mcp_server
_original_shutdown_mcp_server = shutdown_mcp_server
_original_check_mcp_bridge_health = check_mcp_bridge_health
_original_materialize_system_prompt = materialize_system_prompt
_original_mcp_supervisor = McpSupervisor
_original_heartbeat_policy_from_env = heartbeat_policy_from_env
_original_materialize_prompt_for_phase = materialize_prompt_for_phase
_original_handle_phase = handle_phase
_original_render_commit_message = render_commit_message
_original_show_phase_close_banner = show_phase_close_banner
_original_show_phase_transition = show_phase_transition
_cleanup_commit_message_artifacts = cleanup_commit_message_artifacts


def execute_fan_out_sync(
    *,
    effect: FanOutEffect,
    state: PipelineState,
    display: ParallelDisplay | LegacyConsoleDisplay,
    **opts: object,
) -> PipelineState:
    """Execute fan-out synchronously, forwarding current module globals as injectable overrides."""
    return _fan_out_execute_fan_out_sync(
        effect=effect,
        state=state,
        display=display,
        _install_signal_handlers=install_signal_handlers,
        _executor_cls=SubprocessAgentExecutor,
        _mcp_factory_cls=DynamicBindingMcpServerFactory,
        _run_process_async=run_process_async,
        _reducer_reduce=reducer_reduce,
        **opts,
    )


def materialize_prepared_prompt(
    effect: PreparePromptEffect,
    pipeline_policy: PipelinePolicy,
    artifacts_policy: ArtifactsPolicy,
    workspace_scope: WorkspaceScope,
    agents_policy: AgentsPolicy | None = None,
    state: PipelineState | None = None,
    env: Mapping[str, str] | None = None,
    *,
    registry: _RegistryLike | None = None,
    config: UnifiedConfig | None = None,
) -> None:
    """Delegate to _materialize_prepared_prompt, injecting the patchable prompt function."""
    _mat_fn = (
        materialize_prompt_for_phase
        if materialize_prompt_for_phase is not _original_materialize_prompt_for_phase
        else None
    )
    _materialize_prepared_prompt_impl(
        effect,
        pipeline_policy,
        artifacts_policy,
        workspace_scope,
        agents_policy=agents_policy,
        state=state,
        env=env,
        materialize_fn=_mat_fn,
        registry=registry,
        config=config,
    )


[docs] def available_width(prefix_len: int) -> int: """Return usable terminal width minus prefix and padding.""" return max(40, terminal_width() - prefix_len - 2)
[docs] def execute_agent_effect( effect: InvokeAgentEffect, config: UnifiedConfig, deps: AgentExecutionDeps, workspace_scope: WorkspaceScope, **opts: object, ) -> PipelineEvent: """Execute an agent-invocation effect, injecting any patched MCP lifecycle hooks.""" effective_start_fn = ( start_mcp_server if start_mcp_server is not _original_start_mcp_server else None ) effective_shutdown_fn = ( shutdown_mcp_server if shutdown_mcp_server is not _original_shutdown_mcp_server else None ) effective_health_fn = ( check_mcp_bridge_health if check_mcp_bridge_health is not _original_check_mcp_bridge_health else None ) effective_materialize_fn = ( materialize_system_prompt if materialize_system_prompt is not _original_materialize_system_prompt else None ) effective_supervisor = McpSupervisor if McpSupervisor is not _original_mcp_supervisor else None effective_heartbeat_fn = ( heartbeat_policy_from_env if heartbeat_policy_from_env is not _original_heartbeat_policy_from_env else None ) effective_deps = AgentExecutionDeps( invoke_agent=deps.invoke_agent, agent_invocation_error=deps.agent_invocation_error, agent_registry=deps.agent_registry, show_phase_start_cb=deps.show_phase_start_cb or show_phase_start_with_context, set_session_id_cb=deps.set_session_id_cb, start_mcp_server_fn=cast( "_StartMcpServerFn | None", deps.start_mcp_server_fn or effective_start_fn ), shutdown_mcp_server_fn=cast( "_ShutdownMcpServerFn | None", deps.shutdown_mcp_server_fn or effective_shutdown_fn, ), check_mcp_bridge_health_fn=cast( "_CheckMcpBridgeHealthFn | None", deps.check_mcp_bridge_health_fn or effective_health_fn, ), materialize_system_prompt_fn=deps.materialize_system_prompt_fn or effective_materialize_fn, mcp_supervisor_factory=cast( "_McpSupervisorFactory | None", deps.mcp_supervisor_factory or effective_supervisor ), heartbeat_policy_from_env_fn=deps.heartbeat_policy_from_env_fn or effective_heartbeat_fn, ) return _ee_execute_agent_effect(effect, config, effective_deps, workspace_scope, **opts)
[docs] def execute_commit_effect( effect: CommitEffect, create_commit_fn: Callable[[Path | str, str], str], stage_all_fn: Callable[[Path | str], None], repo_root: Path, display: ParallelDisplay | LegacyConsoleDisplay | None = None, **opts: object, ) -> PipelineEvent: """Execute a commit effect, injecting any patched render_commit_message hook.""" effective_render_fn = ( render_commit_message if render_commit_message is not _original_render_commit_message else None ) return _ee_execute_commit_effect( effect, repo_root, display, create_commit_fn=create_commit_fn, stage_all_fn=stage_all_fn, has_commit_work_fn=repo_has_commit_work, render_commit_message_fn=opts.pop("render_commit_message_fn", None) or effective_render_fn, **opts, )
[docs] def emit_phase_transition_if_changed( display: ParallelDisplay | LegacyConsoleDisplay, previous_phase: str, state: PipelineState, *, verbosity: Verbosity, pipeline_policy: PipelinePolicy, ) -> str: """Emit phase-transition banners if the active phase changed, injecting patched banner hooks.""" _close_fn = ( show_phase_close_banner if show_phase_close_banner is not _original_show_phase_close_banner else None ) _trans_fn = ( show_phase_transition if show_phase_transition is not _original_show_phase_transition else None ) return _pt_emit_phase_transition_if_changed( display, previous_phase, state, verbosity=verbosity, pipeline_policy=pipeline_policy, show_close_banner_fn=_close_fn, show_transition_fn=_trans_fn, )
def write_start_commit_if_absent(workspace_root: Path) -> None: """Record the current HEAD as the cycle baseline if no baseline exists yet. This is best effort: if the baseline cannot be written, log a warning and continue without aborting the pipeline. """ if read_cycle_baseline(workspace_root) is not None: return repo: Repo | None = None try: repo = Repo(workspace_root) except InvalidGitRepositoryError: return try: if not repo.head.is_valid(): return write_cycle_baseline(workspace_root, repo.head.commit.hexsha, force=True) except OSError as exc: logger.warning( "Could not write cycle baseline in {}: {} — continuing without baseline", workspace_root, exc, ) finally: close = cast("Callable[[], object] | None", getattr(repo, "close", None)) if callable(close): close() call_determine_effect_from_policy = _call_determine_effect_from_policy invoke_execute_effect_with_optional_display = _invoke_execute_effect_with_optional_display handle_inline_effect = _handle_inline_effect run_pipeline_step = _run_pipeline_step execute_effect = _execute_effect notify_pipeline_subscriber = _notify_pipeline_subscriber handle_keyboard_interrupt = _handle_keyboard_interrupt save_checkpoint_or_log = _save_checkpoint_or_log load_policy_bundle_for_run = _load_policy_bundle_for_run