"""Run pipeline command for Ralph Workflow CLI.
This module implements the main pipeline execution command.
"""
from __future__ import annotations
import os
import shutil
from importlib import import_module
from inspect import signature
from pathlib import Path
from typing import TYPE_CHECKING, NamedTuple, Protocol, cast
from loguru import logger
from rich.panel import Panel
from rich.text import Text
from ralph.agents.registry import AgentRegistry
from ralph.cli.commands._execute_pipeline_request import _ExecutePipelineRequest
from ralph.cli.commands._load_result import _LoadResult
from ralph.cli.commands._policy_preflight_request import _PolicyPreflightRequest
from ralph.cli.commands._preflight_request import _PreflightRequest
from ralph.cli.commands._run_func_state import _RUN_FUNC_UNSET, _RunFuncState
from ralph.config.loader import load_config
from ralph.display.context import make_display_context
from ralph.mcp.protocol.env import RALPH_PARALLEL_WORKER_MANIFEST_ENV
from ralph.onboarding import GETTING_STARTED_DOC, fresh_workspace_next_steps
from ralph.pipeline import checkpoint as ckpt
from ralph.pipeline.parallel.worker_runtime import run_parallel_worker_from_manifest
from ralph.policy.loader import (
load_policy as _dir_load_policy,
)
from ralph.policy.loader import (
load_policy_for_workspace_scope,
)
from ralph.policy.validation import (
CheckpointPolicyMismatchError,
PolicyValidationError,
validate_agent_chains_satisfiable,
validate_checkpoint_against_policy,
validate_drain_contracts,
validate_policy_completeness,
validate_recovery_config,
validate_required_inputs,
)
from ralph.workspace.scope import resolve_workspace_scope
if TYPE_CHECKING:
from ralph.cli.commands._legacy_run_pipeline_kwargs import _LegacyRunPipelineKwargs
from ralph.config.enums import Verbosity
from ralph.config.models import UnifiedConfig
from ralph.display.context import DisplayContext
from ralph.pipeline.state import PipelineState
from ralph.policy.models import PolicyBundle
if TYPE_CHECKING:
class _RunnerFunc(Protocol):
def __call__(
self,
config: UnifiedConfig,
initial_state: PipelineState | None,
**kwargs: object,
) -> int: ...
class _RunnerModule(Protocol):
"""Typed accessor for the lazily imported pipeline runner module."""
run: _RunnerFunc
_state = _RunFuncState()
def _get_run_func() -> _RunnerFunc | None:
"""Return the pipeline runner callable, importing it lazily on first call.
The module-level ``_state.run_func`` is set so tests can inject a fake runner
via ``monkeypatch.setattr(_state, 'run_func', ...)``. A sentinel distinguishes
"not yet loaded" from the genuine ``None`` produced by an ImportError, ensuring
repeated calls do not retry the import after a failure.
"""
if _state.run_func is not _RUN_FUNC_UNSET:
return cast("_RunnerFunc | None", _state.run_func)
try:
module = cast("_RunnerModule", import_module("ralph.pipeline.runner"))
except ImportError:
_state.run_func = None
return None
_state.run_func = module.run
return module.run
ConfigOverrides = dict[str, object]
# Exit codes
_EXIT_SUCCESS = 0
_EXIT_CONFIG_ERROR = 1
_EXIT_INTERRUPT = 130
_EXIT_PREFLIGHT = 2
load_policy = _dir_load_policy
_GENERATED_AGENT_STATE_DIRS: tuple[str, ...] = (
"artifacts",
"tmp",
"prompt_history",
"workers",
)
def _validate_custom_mcp_servers(workspace_root: Path) -> int:
module = import_module("ralph.pipeline.runner")
return cast("int", module.validate_custom_mcp_servers(workspace_root))
validate_custom_mcp_servers = _validate_custom_mcp_servers
_GENERATED_AGENT_STATE_FILES: tuple[str, ...] = (
"CURRENT_PROMPT.md",
"PLAN.md",
"ISSUES.md",
"DEVELOPMENT_RESULT.md",
"FIX_RESULT.md",
"DEVELOPMENT_ANALYSIS_DECISION.md",
"REVIEW_ANALYSIS_DECISION.md",
"checkpoint.json",
"rebase_checkpoint.json",
"rebase_checkpoint.json.bak",
"rebase.lock",
"start_commit",
)
[docs]
class RunPipelineRequest(NamedTuple):
"""Parameters for a pipeline run request."""
config_path: Path | None = None
cli_overrides: ConfigOverrides | None = None
dry_run: bool = False
resume: bool = False
verbosity: Verbosity | None = None
counter_overrides: dict[str, int] | None = None
inline_prompt: str | None = None
parallel_worker_manifest: Path | None = None
def _prompt_changed_since_last_materialization(workspace_root: Path) -> bool:
prompt_path = workspace_root / "PROMPT.md"
current_prompt_path = workspace_root / ".agent" / "CURRENT_PROMPT.md"
if not prompt_path.exists() or not current_prompt_path.exists():
return False
try:
return prompt_path.read_text(encoding="utf-8") != current_prompt_path.read_text(
encoding="utf-8"
)
except OSError:
return False
def _clear_generated_pipeline_state(workspace_root: Path) -> None:
agent_dir = workspace_root / ".agent"
for relative_dir in _GENERATED_AGENT_STATE_DIRS:
shutil.rmtree(agent_dir / relative_dir, ignore_errors=True)
for relative_file in _GENERATED_AGENT_STATE_FILES:
(agent_dir / relative_file).unlink(missing_ok=True)
def _invalidate_pipeline_state_if_prompt_changed(workspace_root: Path) -> bool:
if not _prompt_changed_since_last_materialization(workspace_root):
return False
_clear_generated_pipeline_state(workspace_root)
return True
def _load_configuration(
config_path: Path | None,
cli_overrides: ConfigOverrides,
resume: bool,
*,
display_context: DisplayContext,
inline_prompt: str | None = None,
) -> _LoadResult | int:
"""Load configuration and resolve workspace scope.
Returns:
_LoadResult on success, or int error code on failure.
"""
console = display_context.console
try:
workspace_scope = None if config_path is not None else resolve_workspace_scope()
config = load_config(config_path, cli_overrides, workspace_scope=workspace_scope)
except Exception as e:
logger.error("Failed to load configuration: {}", e)
return _EXIT_CONFIG_ERROR
initial_state: PipelineState | None = None
policy_bundle: PolicyBundle | None = None
if (
workspace_scope is not None
and inline_prompt is None
and _invalidate_pipeline_state_if_prompt_changed(workspace_scope.root)
):
console.print(
Text(
"PROMPT.md changed since the last materialized run context; "
"cleared saved pipeline state and caches.",
style="theme.status.warning",
)
)
if workspace_scope is not None:
try:
if load_policy is not _dir_load_policy:
policy_dir = workspace_scope.resolve_agent_file("pipeline.toml").parent
policy_bundle = load_policy(policy_dir, config=config)
else:
policy_bundle = load_policy_for_workspace_scope(workspace_scope, config=config)
except Exception as e:
logger.warning("Failed to load policy bundle: {}", e)
err_text = Text()
err_text.append("Preflight error:", style="theme.status.error")
err_text.append(f" {e}")
console.print(err_text, soft_wrap=True)
return _EXIT_PREFLIGHT
if resume:
initial_state = ckpt.load()
if initial_state is None:
console.print(Text("No checkpoint found to resume from", style="theme.status.warning"))
return _LoadResult(
config=config,
workspace_scope=workspace_scope,
initial_state=initial_state,
policy_bundle=policy_bundle,
)
def _print_not_initialized_panel(*, display_context: DisplayContext) -> None:
"""Print a friendly 'not initialized' panel for completely fresh workspaces."""
console = display_context.console
content = Text()
content.append(
"Ralph Workflow orchestrates AI coding agents through a "
"planning → development loop "
"driven by your PROMPT.md.\n\n"
)
content.append("Next steps:\n", style="theme.banner.title")
for index, line in enumerate(fresh_workspace_next_steps(), start=1):
content.append(f" {index}. {line}\n")
content.append("\nDocs: ", style="theme.text.muted")
content.append(GETTING_STARTED_DOC, style="theme.text.muted")
content.append(" — step-by-step walkthrough for new users", style="theme.text.muted")
panel = Panel(
content,
title="Ralph Workflow is not initialized here yet",
border_style="theme.status.warning",
padding=(1, 2),
)
console.print(panel)
def _validate_loaded_policy_bundle(policy_bundle: PolicyBundle) -> None:
"""Validate cross-drain policy contracts for an already loaded bundle."""
validate_drain_contracts(policy_bundle)
def _run_policy_preflight_checks(
request: _PolicyPreflightRequest,
*,
display_context: DisplayContext,
) -> int:
"""Run policy-backed preflight checks against the already loaded bundle."""
console = display_context.console
try:
agent_registry = AgentRegistry.from_config(request.config)
validate_agent_chains_satisfiable(request.policy_bundle, agent_registry)
except PolicyValidationError as e:
console.print(_preflight_error_text(e.message), soft_wrap=True)
return _EXIT_PREFLIGHT
try:
validate_recovery_config(request.policy_bundle)
except PolicyValidationError as e:
console.print(_preflight_error_text(e.message), soft_wrap=True)
return _EXIT_PREFLIGHT
if request.counter_overrides:
try:
validate_policy_completeness(
request.policy_bundle,
cli_counter_overrides=request.counter_overrides,
)
except PolicyValidationError as e:
console.print(_preflight_error_text(e.message), soft_wrap=True)
return _EXIT_PREFLIGHT
if request.initial_state is not None:
try:
validate_checkpoint_against_policy(request.initial_state, request.policy_bundle)
except CheckpointPolicyMismatchError as e:
console.print(_checkpoint_mismatch_text(str(e)), soft_wrap=True)
return _EXIT_PREFLIGHT
except PolicyValidationError as e:
console.print(_preflight_error_text(e.message), soft_wrap=True)
return _EXIT_PREFLIGHT
return _EXIT_SUCCESS
def _run_preflight_checks(
request: _PreflightRequest,
*,
display_context: DisplayContext,
) -> int:
"""Run all preflight validation checks.
Returns:
_EXIT_SUCCESS if all checks pass, _EXIT_PREFLIGHT if any check fails.
"""
console = display_context.console
# validate_required_inputs requires workspace_scope
if request.workspace_scope is not None and request.inline_prompt is None:
# Fresh-state detection: workspace has neither PROMPT.md nor .agent
prompt_path = request.workspace_scope.root / "PROMPT.md"
agent_dir = request.workspace_scope.root / ".agent"
if not prompt_path.exists() and not agent_dir.exists():
_print_not_initialized_panel(display_context=display_context)
return _EXIT_PREFLIGHT
try:
validate_required_inputs(request.workspace_scope)
except PolicyValidationError as e:
console.print(_preflight_error_text(e.message), soft_wrap=True)
return _EXIT_PREFLIGHT
if validate_custom_mcp_servers(request.workspace_scope.root) != _EXIT_SUCCESS:
console.print(
_preflight_error_text("Custom MCP validation failed — see logs"),
soft_wrap=True,
)
return _EXIT_PREFLIGHT
# Only run policy-based validations if we have a loaded policy bundle.
if request.policy_bundle is not None:
loaded_policy_bundle = cast("PolicyBundle", request.policy_bundle)
try:
validate_loaded_policy_bundle(loaded_policy_bundle)
except PolicyValidationError as e:
console.print(_preflight_error_text(e.message), soft_wrap=True)
return _EXIT_PREFLIGHT
return _run_policy_preflight_checks(
_PolicyPreflightRequest(
config=request.config,
policy_bundle=loaded_policy_bundle,
initial_state=request.initial_state,
counter_overrides=request.counter_overrides,
),
display_context=display_context,
)
return _EXIT_SUCCESS
[docs]
def print_dry_run(
initial_state: PipelineState | None,
config: UnifiedConfig,
policy_bundle: PolicyBundle | None,
*,
display_context: DisplayContext,
) -> None:
"""Print dry-run information."""
console = display_context.console
console.print(Text("Dry run mode", style="theme.cat.meta"))
fallback_phase = policy_bundle.pipeline.entry_phase if policy_bundle is not None else "unknown"
phase = initial_state.phase if initial_state else fallback_phase
console.print(_detail_text("Phase", phase))
console.print(_detail_text("Iterations", str(config.general.developer_iters)))
def _execute_pipeline(
request: _ExecutePipelineRequest,
*,
display_context: DisplayContext,
) -> int:
"""Execute the pipeline.
Returns:
Exit code from pipeline runner.
"""
console = display_context.console
run_func = _get_run_func()
if run_func is None:
logger.error("Pipeline runner is unavailable")
console.print(Text("Pipeline runner is unavailable", style="theme.status.error"))
return _EXIT_CONFIG_ERROR
try:
kwargs: dict[str, object] = {}
runner_params = signature(run_func).parameters
if request.verbosity is not None and "verbosity" in runner_params:
kwargs["verbosity"] = request.verbosity
if request.policy_bundle is not None and "policy_bundle" in runner_params:
kwargs["policy_bundle"] = request.policy_bundle
if "display_context" in runner_params:
kwargs["display_context"] = display_context
if request.counter_overrides and "counter_overrides" in runner_params:
kwargs["counter_overrides"] = request.counter_overrides
if request.config_path is not None and "config_path" in runner_params:
kwargs["config_path"] = request.config_path
if request.cli_overrides is not None and "cli_overrides" in runner_params:
kwargs["cli_overrides"] = request.cli_overrides
return run_func(request.config, request.initial_state, **kwargs)
except KeyboardInterrupt:
console.print(Text("\nInterrupted by user", style="theme.status.warning"))
if request.initial_state is not None:
_save_interrupt_checkpoint(request.initial_state)
return _EXIT_INTERRUPT
except CheckpointPolicyMismatchError as e:
console.print(_checkpoint_mismatch_text(str(e)))
return _EXIT_PREFLIGHT
except PolicyValidationError as e:
console.print(_pipeline_config_error_text(e.message))
return _EXIT_PREFLIGHT
except Exception as e:
logger.exception("Pipeline execution failed: {}")
console.print(_status_text("Pipeline failed", str(e), "theme.status.error"))
return _EXIT_CONFIG_ERROR
def _save_interrupt_checkpoint(initial_state: PipelineState) -> None:
"""Save checkpoint on interrupt."""
try:
update_data: ConfigOverrides = {"interrupted_by_user": True}
interrupted_state = initial_state.model_copy(update=update_data)
ckpt.save(interrupted_state)
except Exception:
logger.warning("Checkpoint save failed during interrupt", exc_info=True)
def _preflight_error_text(message: str) -> Text:
text = Text()
text.append("Preflight error:", style="theme.status.error")
text.append(f" {message}")
return text
def _checkpoint_mismatch_text(message: str) -> Text:
text = Text()
text.append("Checkpoint mismatch:", style="theme.status.error")
text.append(f" {message}")
return text
def _pipeline_config_error_text(message: str) -> Text:
text = Text()
text.append("Pipeline configuration error:", style="theme.status.error")
text.append(f" {message}")
return text
def _status_text(label: str, detail: str, style: str) -> Text:
text = Text()
text.append(f"{label}:", style=style)
text.append(" ")
text.append(detail)
return text
def _detail_text(label: str, detail: str) -> Text:
text = Text()
text.append(f" {label}: ")
text.append(detail)
return text
# Backward compatibility: expose run_pipeline for direct invocation
[docs]
def run_pipeline(
request: RunPipelineRequest | None = None,
*,
display_context: DisplayContext | None = None,
**kwargs: _LegacyRunPipelineKwargs,
) -> int:
"""Run the Ralph Workflow pipeline (backward compatibility wrapper).
Args:
request: RunPipelineRequest namedtuple with all pipeline options.
display_context: Display context for consistent rendering. If None, a default
context is created using make_display_context().
**kwargs: Additional keyword arguments for backward compatibility.
Accepted keys: config_path, cli_overrides, dry_run, resume, verbosity,
counter_overrides, inline_prompt.
Returns:
Exit code (0 for success, non-zero for failure).
"""
ctx = display_context if display_context is not None else make_display_context()
if request is None:
request = RunPipelineRequest(
config_path=cast("Path | None", kwargs.get("config_path")),
cli_overrides=cast("ConfigOverrides | None", kwargs.get("cli_overrides")),
dry_run=cast("bool", kwargs.get("dry_run", False)),
resume=cast("bool", kwargs.get("resume", False)),
verbosity=cast("Verbosity | None", kwargs.get("verbosity")),
counter_overrides=cast("dict[str, int] | None", kwargs.get("counter_overrides")),
inline_prompt=cast("str | None", kwargs.get("inline_prompt")),
parallel_worker_manifest=(
Path(cast("str", kwargs["parallel_worker_manifest"]))
if isinstance(kwargs.get("parallel_worker_manifest"), str)
else cast("Path | None", kwargs.get("parallel_worker_manifest"))
),
)
effective_request = request
effective_counter_overrides = effective_request.counter_overrides or {}
effective_parallel_worker_manifest = effective_request.parallel_worker_manifest
if effective_parallel_worker_manifest is None:
manifest_from_env = os.environ.get(str(RALPH_PARALLEL_WORKER_MANIFEST_ENV))
if manifest_from_env:
effective_parallel_worker_manifest = Path(manifest_from_env)
if effective_parallel_worker_manifest is not None:
return run_parallel_worker_from_manifest(
manifest_path=effective_parallel_worker_manifest,
display_context=ctx,
)
if effective_request.inline_prompt is not None:
workspace_scope = resolve_workspace_scope()
current_prompt_path = workspace_scope.root / ".agent" / "CURRENT_PROMPT.md"
current_prompt_path.parent.mkdir(parents=True, exist_ok=True)
current_prompt_path.write_text(effective_request.inline_prompt, encoding="utf-8")
# Phase 1: Load configuration
load_result = _load_configuration(
effective_request.config_path,
effective_request.cli_overrides or {},
effective_request.resume,
display_context=ctx,
inline_prompt=effective_request.inline_prompt,
)
if isinstance(load_result, int):
return load_result
# Phase 2: Preflight validation (before any pipeline activity)
preflight_result = _run_preflight_checks(
_PreflightRequest(
config=load_result.config,
workspace_scope=load_result.workspace_scope,
policy_bundle=load_result.policy_bundle,
initial_state=load_result.initial_state,
counter_overrides=effective_counter_overrides,
inline_prompt=effective_request.inline_prompt,
parallel_worker_manifest=effective_request.parallel_worker_manifest,
),
display_context=ctx,
)
if preflight_result != _EXIT_SUCCESS:
return preflight_result
# Phase 3: Handle dry-run
if effective_request.dry_run:
print_dry_run(
load_result.initial_state,
load_result.config,
load_result.policy_bundle,
display_context=ctx,
)
return _EXIT_SUCCESS
# Phase 4: Execute pipeline
return _execute_pipeline(
_ExecutePipelineRequest(
config=load_result.config,
initial_state=load_result.initial_state,
policy_bundle=load_result.policy_bundle,
verbosity=effective_request.verbosity,
counter_overrides=effective_counter_overrides,
config_path=effective_request.config_path,
cli_overrides=effective_request.cli_overrides,
parallel_worker_manifest=effective_request.parallel_worker_manifest,
),
display_context=ctx,
)
validate_loaded_policy_bundle = _validate_loaded_policy_bundle
state = _state
invalidate_pipeline_state_if_prompt_changed = _invalidate_pipeline_state_if_prompt_changed