"""Public policy validation API functions."""
from __future__ import annotations
from importlib import import_module
from typing import TYPE_CHECKING, Protocol, cast
from ralph.onboarding import (
STARTER_PROMPT_SENTINEL,
missing_prompt_validation_hint,
starter_prompt_validation_hint,
)
from ralph.policy.validation._checkpoint_policy_mismatch_error import (
CheckpointPolicyMismatchError,
)
from ralph.policy.validation._phase_validators import (
_validate_analysis_phase,
_validate_commit_cleanup_phase,
_validate_commit_phase_loop_resets,
_validate_commit_phase_post_commit_routes,
_validate_loop_policy_role,
_validate_parallelization_consistency,
_validate_review_phase,
_validate_skip_invocation_has_on_success,
_validate_terminal_phase,
_validate_verification_phase,
)
from ralph.policy.validation._pipeline_validators import (
_validate_no_legacy_phase_constants,
_validate_post_commit_routes_complete,
_validate_reachability,
_validate_recovery_failed_route,
_validate_review_phase_outcome_complete,
_validate_shared_drain_history_consistency,
_validate_terminal_failure_phase_declared,
_validate_tracked_counters_have_positive_max,
)
from ralph.policy.validation._policy_validation_error import PolicyValidationError
if TYPE_CHECKING:
from collections.abc import Callable
from ralph.agents.registry import AgentRegistry
from ralph.pipeline.state import PipelineState
from ralph.pipeline.work_units import WorkUnitsPlan
from ralph.policy.models._pipeline_policy import PipelinePolicy
from ralph.policy.models._policy_bundle import PolicyBundle
from ralph.workspace.scope import WorkspaceScope
class _WorkUnitsModule(Protocol):
    """Structural type for the attributes this module reads from ``ralph.pipeline.work_units``.

    The work-units module is loaded dynamically with ``import_module`` and the
    result is ``cast`` to this protocol so the two attributes below are typed.
    """

    # Exception class caught around the same-workspace validation call.
    WorkUnitsValidationError: type[Exception]
    # Validator invoked with the parsed work-units plan.
    validate_for_same_workspace: Callable[[object], object]
[docs]
def validate_phase_exists_in_policy(
    phase: str,
    policy: PipelinePolicy,
) -> None:
    """Ensure ``phase`` is one of the phases declared by ``policy``.

    Args:
        phase: Phase name to look up.
        policy: Pipeline policy whose ``phases`` mapping is consulted.

    Raises:
        CheckpointPolicyMismatchError: If ``phase`` is not a key of
            ``policy.phases``; the error carries the set of valid phase names.
    """
    declared_phases = policy.phases
    if phase in declared_phases:
        return
    raise CheckpointPolicyMismatchError(
        checkpoint_phase=phase,
        valid_phases=set(declared_phases),
    )
[docs]
def validate_checkpoint_compatible(
    checkpoint_phase: str,
    bundle: PolicyBundle,
) -> None:
    """Check that a checkpoint's phase still exists in the bundle's pipeline policy.

    Args:
        checkpoint_phase: Phase name recorded in the checkpoint.
        bundle: Policy bundle whose ``pipeline`` policy is checked.

    Raises:
        CheckpointPolicyMismatchError: Propagated from
            ``validate_phase_exists_in_policy`` when the phase is unknown.
    """
    pipeline_policy = bundle.pipeline
    validate_phase_exists_in_policy(checkpoint_phase, pipeline_policy)
[docs]
def validate_drain_bound(
    drain: str,
    bundle: PolicyBundle,
) -> None:
    """Ensure ``drain`` has an entry in the bundle's agent drain bindings.

    Args:
        drain: Drain name to look up.
        bundle: Policy bundle whose ``agents.agent_drains`` mapping is consulted.

    Raises:
        ValueError: If ``drain`` is not bound; the message lists the
            available drain names in sorted order.
    """
    bound_drains = bundle.agents.agent_drains
    if drain in bound_drains:
        return
    available = sorted(bound_drains.keys())
    raise ValueError(
        f"Drain '{drain}' is not bound in agents.toml. "
        f"Available drains: {available}"
    )
[docs]
def validate_chain_exists(
    chain: str,
    bundle: PolicyBundle,
) -> None:
    """Ensure ``chain`` is one of the agent chains defined in the bundle.

    Args:
        chain: Agent chain name to look up.
        bundle: Policy bundle whose ``agents.agent_chains`` mapping is consulted.

    Raises:
        ValueError: If ``chain`` is not defined; the message lists the
            available chain names in sorted order.
    """
    defined_chains = bundle.agents.agent_chains
    if chain in defined_chains:
        return
    available = sorted(defined_chains.keys())
    raise ValueError(
        f"Chain '{chain}' is not defined in agents.toml. "
        f"Available chains: {available}"
    )
[docs]
def validate_drain_contracts(bundle: PolicyBundle) -> None:
    """Enforce explicit drain bindings when sibling-drain inference is forbidden.

    Nothing is checked unless ``bundle.agents.forbid_sibling_drain_inference``
    is truthy. Otherwise every drain used by a non-terminal pipeline phase must
    (1) be bound in ``bundle.agents.agent_drains`` and (2) have a non-``None``
    ``drain_class``. The binding check runs first, so a missing binding is
    reported before a missing drain class.

    Args:
        bundle: Policy bundle providing the pipeline phases and agent drains.

    Raises:
        PolicyValidationError: If any required drain is unbound, or if any
            bound required drain has no ``drain_class``.
    """
    agents = bundle.agents
    if not agents.forbid_sibling_drain_inference:
        return

    pipeline = bundle.pipeline
    needed_drains: set[str] = set()
    for phase_name, phase_def in pipeline.phases.items():
        # Terminal phases (by name or by role) do not need a drain binding.
        if phase_name == pipeline.terminal_phase or phase_def.role == "terminal":
            continue
        needed_drains.add(phase_def.drain)

    bindings = agents.agent_drains

    missing_bindings = [name for name in needed_drains if name not in bindings]
    if missing_bindings:
        raise PolicyValidationError(
            "Implicit sibling-drain inference is forbidden, but the following "
            f"pipeline drains lack explicit chain bindings: {sorted(missing_bindings)}. "
            "Each drain used by a non-terminal pipeline phase must have an explicit "
            "'chain' binding in agents.toml when forbid_sibling_drain_inference=true."
        )

    # Every needed drain is bound at this point, so direct indexing is safe.
    missing_class = [
        name for name in needed_drains if bindings[name].drain_class is None
    ]
    if missing_class:
        raise PolicyValidationError(
            "Implicit sibling-drain inference is forbidden, but the following "
            f"pipeline drains have no explicit drain_class: {sorted(missing_class)}. "
            "Set drain_class on each drain in agents.toml "
            "(one of: planning, development, analysis, review, fix, commit)."
        )
def _work_units_validation_deps() -> tuple[type[Exception], Callable[[object], object]]:
    """Load ``ralph.pipeline.work_units`` at call time and return what validation needs.

    Returns:
        A ``(WorkUnitsValidationError, validate_for_same_workspace)`` pair taken
        from the dynamically imported module.
    """
    # NOTE(review): the import is deferred to call time, presumably to avoid an
    # import cycle with ralph.pipeline.work_units -- confirm.
    work_units_module = cast("_WorkUnitsModule", import_module("ralph.pipeline.work_units"))
    error_type = work_units_module.WorkUnitsValidationError
    same_workspace_validator = work_units_module.validate_for_same_workspace
    return (error_type, same_workspace_validator)
[docs]
def validate_cli_counter_overrides(
    policy: PipelinePolicy,
    cli_counter_overrides: dict[str, int],
    errors: list[str],
) -> None:
    """Record an error if any CLI counter override names an undeclared budget counter.

    Args:
        policy: Pipeline policy whose ``budget_counters`` keys are the declared names.
        cli_counter_overrides: Mapping of counter name to override value; only
            the keys are inspected.
        errors: Accumulator list. At most one message is appended, covering
            all unknown names (sorted).
    """
    known_counters = set(policy.budget_counters)
    unrecognized = sorted(
        name for name in cli_counter_overrides if name not in known_counters
    )
    if not unrecognized:
        return

    # Show a placeholder rather than an empty list when nothing is declared.
    declared_display = sorted(known_counters) if known_counters else ["(none declared)"]
    errors.append(
        f"--counter override(s) {unrecognized} are not declared in pipeline.budget_counters. "
        f"Declared counters: {declared_display}. "
        "Add [budget_counters.<name>] to pipeline.toml or remove the --counter flag."
    )
[docs]
def validate_policy_completeness(
    bundle: PolicyBundle,
    *,
    cli_counter_overrides: dict[str, int] | None = None,
) -> None:
    """Run every completeness check on the bundle and report all failures at once.

    Each phase is validated according to its role, then the pipeline-wide
    validators run, then (if provided and non-empty) the CLI counter overrides
    are checked. Validators append messages to a shared list; a single
    exception is raised at the end if anything was recorded.

    Args:
        bundle: Policy bundle to validate.
        cli_counter_overrides: Optional mapping of counter name to override
            value taken from the command line.

    Raises:
        PolicyValidationError: With ``source="completeness"`` and one line per
            collected error, if any check failed.
    """
    policy = bundle.pipeline
    terminal_phase = policy.terminal_phase
    problems: list[str] = []

    # Checks applied to every non-terminal phase that has a role, in this order.
    common_phase_checks = (
        _validate_loop_policy_role,
        _validate_skip_invocation_has_on_success,
        _validate_parallelization_consistency,
    )
    # Pipeline-wide checks, in this order.
    pipeline_checks = (
        _validate_recovery_failed_route,
        _validate_no_legacy_phase_constants,
        _validate_reachability,
        _validate_post_commit_routes_complete,
        _validate_review_phase_outcome_complete,
        _validate_terminal_failure_phase_declared,
        _validate_tracked_counters_have_positive_max,
        _validate_shared_drain_history_consistency,
    )

    for phase_name, phase_def in policy.phases.items():
        # Terminal phases (by name or by role) get only the terminal check.
        if phase_name == terminal_phase or phase_def.role == "terminal":
            _validate_terminal_phase(phase_name, phase_def, problems)
            continue

        role = phase_def.role
        if role is None:
            problems.append(
                f"phases.{phase_name}: 'role' is required. "
                "Set role='execution'|'analysis'|'review'|'commit'|'verification'|'terminal'. "
                "Run `ralph --regenerate-config` to get an updated pipeline.toml template."
            )
            continue

        if role == "analysis":
            _validate_analysis_phase(phase_name, phase_def, bundle, problems)
        elif role == "review":
            _validate_review_phase(phase_name, phase_def, problems)
        elif role == "commit":
            if phase_def.commit_policy is not None:
                _validate_commit_phase_loop_resets(phase_name, phase_def, policy, problems)
                _validate_commit_phase_post_commit_routes(
                    phase_name, phase_def, policy, problems
                )
            else:
                problems.append(
                    f"phases.{phase_name}: role='commit' requires commit_policy "
                    "(requires_artifact, increments_counter, loop_resets)"
                )
        elif role == "verification":
            _validate_verification_phase(phase_name, phase_def, policy, problems)
        elif role == "commit_cleanup":
            _validate_commit_cleanup_phase(phase_name, phase_def, bundle, problems)

        for phase_check in common_phase_checks:
            phase_check(phase_name, phase_def, problems)

    for pipeline_check in pipeline_checks:
        pipeline_check(policy, problems)

    if cli_counter_overrides:
        validate_cli_counter_overrides(policy, cli_counter_overrides, problems)

    if not problems:
        return
    report_lines = "\n".join(f" {problem}" for problem in problems)
    raise PolicyValidationError(
        "Policy completeness validation failed:\n" + report_lines,
        source="completeness",
    )
[docs]
def get_drain_resolution_matrix(bundle: PolicyBundle) -> dict[str, dict[str, str]]:
    """Build a per-drain summary of the chain each drain resolves to.

    Args:
        bundle: Policy bundle providing ``agents.agent_drains`` and
            ``agents.agent_chains``.

    Returns:
        A mapping from drain name to a dict with string values for
        ``"chain"`` (the bound chain name), ``"agents"`` (comma-joined agent
        names) and ``"max_retries"``. The last two are empty strings when the
        chain lookup yields nothing (or a falsy config).
    """
    agents_policy = bundle.agents

    def _row(chain_name: str) -> dict[str, str]:
        # One matrix row; blank agent/retry cells when the chain is unresolved.
        chain = agents_policy.agent_chains.get(chain_name)
        if chain:
            agent_list = ",".join(chain.agents)
            retries = str(chain.max_retries)
        else:
            agent_list = retries = ""
        return {"chain": chain_name, "agents": agent_list, "max_retries": retries}

    return {
        drain_name: _row(drain_config.chain)
        for drain_name, drain_config in agents_policy.agent_drains.items()
    }
[docs]
def validate_work_units_against_policy(
    work_units: WorkUnitsPlan,
    pipeline_policy: PipelinePolicy,
    *,
    phase: str,
) -> None:
    """Check a work-units plan against the parallelization policy of ``phase``.

    Plans with zero or one work unit are accepted without further checks.
    Otherwise the phase must declare a parallelization policy, the number of
    units must not exceed ``max_work_units`` or ``max_parallel_workers``, every
    unit must declare ``allowed_directories`` when the policy requires it, and
    the plan must pass the same-workspace validator from
    ``ralph.pipeline.work_units``.

    Args:
        work_units: Parsed plan; its ``work_units`` sequence is inspected.
        pipeline_policy: Pipeline policy whose ``phases`` mapping is consulted.
        phase: Name of the active phase.

    Raises:
        PolicyValidationError: If any of the checks above fails. Errors from
            the same-workspace validator are re-raised as this type with the
            original exception chained.
    """
    units = work_units.work_units
    unit_count = len(units)
    if unit_count <= 1:
        return

    phase_def = pipeline_policy.phases.get(phase)
    # An unknown phase is treated the same as one without parallelization.
    fan_out = None if phase_def is None else phase_def.parallelization
    if fan_out is None:
        raise PolicyValidationError(
            f"Phase {phase!r} does not declare parallelization but the plan declares "
            f"{unit_count} work_units; the active transition policy must explicitly "
            f"enable same-workspace fan-out via [phases.{phase}.parallelization]"
        )

    if unit_count > fan_out.max_work_units:
        raise PolicyValidationError(
            f"work_units count {unit_count} exceeds cap {fan_out.max_work_units}"
        )
    if unit_count > fan_out.max_parallel_workers:
        raise PolicyValidationError(
            "Planning artifact declares "
            f"{unit_count} work_units, exceeding "
            f"max_parallel_workers={fan_out.max_parallel_workers}"
        )

    if fan_out.require_allowed_directories:
        # Report the first unit (in plan order) that lacks allowed_directories.
        undeclared = next((unit for unit in units if not unit.allowed_directories), None)
        if undeclared is not None:
            raise PolicyValidationError(
                f"Work unit '{undeclared.unit_id}' must declare allowed_directories"
            )

    validation_error_type, same_workspace_check = _work_units_validation_deps()
    try:
        same_workspace_check(work_units)
    except validation_error_type as exc:
        raise PolicyValidationError(str(exc)) from exc
[docs]
def validate_agent_chains_satisfiable(
    bundle: PolicyBundle,
    agent_registry: AgentRegistry,
) -> None:
    """Ensure every agent named by any chain can be looked up in the registry.

    Args:
        bundle: Policy bundle whose ``agents.agent_chains`` are scanned.
        agent_registry: Registry queried with ``get(agent_name)``; a ``None``
            result marks the agent as unknown.

    Raises:
        PolicyValidationError: Listing every unknown chain/agent pair, joined
            with ``"; "``.
    """
    unresolved = [
        f"chain '{chain_name}' references unknown agent '{agent_name}'"
        for chain_name, chain_config in bundle.agents.agent_chains.items()
        for agent_name in chain_config.agents
        if agent_registry.get(agent_name) is None
    ]
    if not unresolved:
        return
    details = "; ".join(unresolved)
    raise PolicyValidationError(
        "Agent chains reference unknown agents (check configuration, not PATH): "
        + details
    )
[docs]
def validate_recovery_config(bundle: PolicyBundle) -> None:
    """Reject any agent chain configured with a negative ``max_retries``.

    Args:
        bundle: Policy bundle whose ``agents.agent_chains`` are checked.

    Raises:
        PolicyValidationError: For the first chain (in mapping order) whose
            ``max_retries`` is below zero.
    """
    first_invalid = next(
        (
            (chain_name, chain_config.max_retries)
            for chain_name, chain_config in bundle.agents.agent_chains.items()
            if chain_config.max_retries < 0
        ),
        None,
    )
    if first_invalid is None:
        return
    bad_chain, bad_retries = first_invalid
    raise PolicyValidationError(
        f"Chain '{bad_chain}' has invalid "
        f"max_retries={bad_retries}; must be >= 0"
    )
[docs]
def validate_checkpoint_against_policy(
    state: PipelineState,
    bundle: PolicyBundle,
) -> None:
    """Check that a restored checkpoint state can be resumed under ``bundle``.

    Three checks run in order: the policy format version (only when the
    pipeline policy exposes a non-``None`` ``entry_block``), the checkpoint
    phase, and the checkpoint's current drain (when one is recorded).

    Args:
        state: Checkpoint state providing ``policy_format_version``, ``phase``
            and ``current_drain``.
        bundle: Policy bundle to validate against.

    Raises:
        PolicyValidationError: If the checkpoint's format version is not 2
            while the policy defines an entry block, or if the recorded drain
            is not bound in ``bundle.agents.agent_drains``.
        CheckpointPolicyMismatchError: Propagated from
            ``validate_phase_exists_in_policy`` when the phase is unknown.
    """
    # A pipeline policy without an ``entry_block`` attribute is treated like
    # one whose entry block is None.
    entry_block = getattr(bundle.pipeline, "entry_block", None)
    uses_block_format = entry_block is not None
    if uses_block_format and state.policy_format_version != 2:
        raise PolicyValidationError(
            "Cannot resume from this obsolete checkpoint: it was saved before "
            "the block-based policy format redesign. Preserve the checkpoint "
            "for inspection and start fresh with --no-resume or regenerate "
            "policy/config before rerunning."
        )

    validate_phase_exists_in_policy(state.phase, bundle.pipeline)

    checkpoint_drain = state.current_drain
    if checkpoint_drain is None:
        return
    bound_drains = bundle.agents.agent_drains
    if checkpoint_drain not in bound_drains:
        raise PolicyValidationError(
            f"Checkpoint references drain '{checkpoint_drain}' which is not bound "
            f"in agents.toml. Available drains: {sorted(bound_drains.keys())}"
        )