# Source code for ralph.policy._render_text

"""Text rendering helpers for policy explanations."""

from __future__ import annotations

from typing import Any

from ralph.policy.explain import PhaseExplanation, PolicyExplanation

# Display label for each known phase role, keyed by the role name.
# _render_phase_text looks a phase's role up here and falls back to the raw
# role string (or "unknown" when the role is empty) if there is no entry.
_ROLE_LABELS: dict[str, str] = {
    "execution": "execution (agent runs code)",
    "analysis": "analysis (agent reviews output, decides next step)",
    "review": "review (agent performs code review)",
    "commit": "commit (agent commits changes)",
    "verification": "verification (automated gate)",
    "terminal": "terminal (pipeline ends here)",
    "fanout_join": "fanout-join (waits for parallel workers)",
}


def render_explanation_text(exp: object) -> str:
    """Build the complete plain-text report for a policy explanation.

    Args:
        exp: The object to render. Anything that is not a
            ``PolicyExplanation`` produces an empty string.

    Returns:
        The report sections joined with newlines. The report always ends
        with an empty line followed by a 70-character ``=`` rule.
    """
    if not isinstance(exp, PolicyExplanation):
        return ""

    out: list[str] = []

    # Sections attempted for every explanation; each helper decides for
    # itself whether it has anything to emit.
    for section in (
        _render_header,
        _render_authored_blocks_text,
        _render_lifecycle_text,
        _render_terminal_outcomes_text,
        _render_phases_text,
        _render_loop_counters_text,
        _render_budget_counters_text,
    ):
        section(exp, out)

    if exp.post_commit_routes:
        _render_post_commit_routes_text(exp, out)

    # The plural collection wins; the single-execution field is only
    # consulted when the collection is empty.
    if exp.parallel_executions:
        _render_parallel_executions_text(exp, out)
    elif exp.parallel_execution is not None:
        _render_parallel_text(exp, out)

    if exp.recovery is not None:
        _render_recovery_text(exp, out)

    out += ["", "=" * 70]
    return "\n".join(out)
def _render_header(exp: PolicyExplanation, lines: list[str]) -> None:
    """Append the report banner and the entry/terminal summary to *lines*.

    The ``Entry block`` line is emitted only when ``exp.entry_block`` is
    truthy; the entry and terminal phase lines are always emitted.
    """
    lines.append("=" * 70)
    lines.append("RALPH WORKFLOW — ACTIVE POLICY EXPLANATION")
    lines.append("=" * 70)
    lines.append("")
    if exp.entry_block:
        lines.append(f"Entry block : {exp.entry_block}")
    lines.append(f"Entry phase : {exp.entry_phase}")
    lines.append(f"Terminal phase: {exp.terminal_phase}")


def _render_authored_blocks_text(exp: PolicyExplanation, lines: list[str]) -> None:
    """Append the AUTHORED BLOCKS section, one line per block name.

    Emits nothing when ``exp.authored_blocks`` is empty.
    """
    if not exp.authored_blocks:
        return
    lines.append("")
    lines.append("-" * 70)
    lines.append("AUTHORED BLOCKS")
    lines.append("-" * 70)
    lines.extend(f" {block_name}" for block_name in exp.authored_blocks)


def _render_lifecycle_text(exp: PolicyExplanation, lines: list[str]) -> None:
    """Append the LIFECYCLE COMPLETION section.

    For each lifecycle explanation this writes one summary line naming the
    completion block, the phase it compiles to and the counter it
    increments (``(none)`` when no counter is set), followed by optional
    ``before_complete`` / ``after_complete`` lines.  Emits nothing when
    ``exp.lifecycle_explanations`` is empty.
    """
    if not exp.lifecycle_explanations:
        return
    lines.append("")
    lines.append("-" * 70)
    lines.append("LIFECYCLE COMPLETION")
    lines.append("-" * 70)
    for lifecycle in exp.lifecycle_explanations:
        counter = lifecycle.increments_counter or "(none)"
        lines.append(
            f" {lifecycle.lifecycle_name}: completion block "
            f"'{lifecycle.completion_block}' compiles to phase "
            f"'{lifecycle.completion_phase}' and increments {counter}"
        )
        if lifecycle.before_complete:
            lines.append(f" before_complete: {', '.join(lifecycle.before_complete)}")
        if lifecycle.after_complete:
            lines.append(f" after_complete : {', '.join(lifecycle.after_complete)}")


def _render_terminal_outcomes_text(exp: PolicyExplanation, lines: list[str]) -> None:
    """Append the ``Terminal outcomes:`` list, one ``outcome phase`` row each.

    Emits nothing when ``exp.terminal_outcomes`` is empty.
    """
    if not exp.terminal_outcomes:
        return
    lines.append("")
    lines.append("Terminal outcomes:")
    # Pad to 9 and add an explicit space: outcomes shorter than ten
    # characters still put the phase in the same column as a plain 10-wide
    # pad would, while longer outcomes no longer run straight into the
    # phase name.
    lines.extend(f" {to.outcome:9s} {to.phase}" for to in exp.terminal_outcomes)


def _render_phases_text(exp: PolicyExplanation, lines: list[str]) -> None:
    """Append the PHASES heading, then delegate each phase to
    ``_render_phase_text``.

    The heading is written even when ``exp.phases`` is empty.
    """
    lines.append("")
    lines.append("-" * 70)
    lines.append("PHASES")
    lines.append("-" * 70)
    for phase in exp.phases:
        _render_phase_text(phase, lines)


def _render_loop_counters_text(exp: PolicyExplanation, lines: list[str]) -> None:
    """Append the LOOP COUNTERS section with each counter's name and maximum.

    A counter's description, when present, is appended after an em dash.
    Emits nothing when ``exp.loop_counters`` is empty.
    """
    if not exp.loop_counters:
        return
    lines.append("")
    lines.append("-" * 70)
    lines.append("LOOP COUNTERS")
    lines.append("-" * 70)
    for lc in exp.loop_counters:
        desc = f" — {lc.description}" if lc.description else ""
        lines.append(f" {lc.name}: max={lc.default_max}{desc}")


def _render_budget_counters_text(exp: PolicyExplanation, lines: list[str]) -> None:
    """Append the BUDGET COUNTERS section.

    Each row states whether the counter tracks budget, its default maximum
    and, when present, its description.  Emits nothing when
    ``exp.budget_counters`` is empty.
    """
    if not exp.budget_counters:
        return
    lines.append("")
    lines.append("-" * 70)
    lines.append("BUDGET COUNTERS")
    lines.append("-" * 70)
    for bc in exp.budget_counters:
        tracked = "tracked (exhaustion matters)" if bc.tracks_budget else "not tracked"
        desc = f" — {bc.description}" if bc.description else ""
        lines.append(f" {bc.name}: {tracked}, default_max={bc.default_max}{desc}")
def render_explanation_sentences(phase: PhaseExplanation) -> list[str]:
    """Generate explanation sentences for a phase per Required Product Outcome D.

    Sentences are produced in a fixed order: decision routes, terminal
    outcome, bypass routes (sorted by outcome), loopback, verification
    gate, rejected parallel execution, then post-commit routes.  A category
    contributes nothing when the phase carries no data for it.

    Args:
        phase: The phase whose routing should be explained.

    Returns:
        The sentences, each starting with ``Explanation:``.
    """
    sentences: list[str] = []

    # ``decisions`` is only iterated when truthy, so a missing mapping is
    # treated the same as an empty one.
    sentences.extend(
        f"Explanation: phase '{phase.name}' routes to "
        f"'{target}' because the configured decision was "
        f"'{decision_name}'."
        for decision_name, target in (phase.decisions or {}).items()
    )

    if phase.terminal_outcome:
        sentences.append(
            f"Explanation: when reached, the run terminates because "
            f"the workflow policy declares phase '{phase.name}' as a "
            f"terminal '{phase.terminal_outcome}' outcome."
        )

    # Sorted so the sentence order does not depend on mapping order.
    sentences.extend(
        f"Explanation: phase '{phase.name}' bypasses to '{target}' "
        f"when the configured outcome is '{outcome}'."
        for outcome, target in sorted(phase.bypass_routes.items())
    )

    if phase.on_loopback and phase.loop_policy is not None:
        sentences.append(
            f"Explanation: phase '{phase.name}' loops back to "
            f"'{phase.on_loopback}' until "
            f"{phase.loop_policy.max_iterations} attempts are exhausted, "
            f"after which the run terminates."
        )

    if phase.verification is not None:
        v = phase.verification
        failure_target = v.on_failure_route or "pipeline failure"
        sentences.append(
            f"Explanation: phase '{phase.name}' must satisfy a {v.kind} "
            f"verification gate before {v.gate_for}; failure routes to "
            f"'{failure_target}'."
        )
        # An explicit failure route gets its own, second sentence.
        if v.on_failure_route:
            sentences.append(
                f"Explanation: phase '{phase.name}' fails verification "
                f"→ routes to '{v.on_failure_route}' because the policy "
                f"declares verification.on_failure_route"
            )

    if not phase.has_parallelization and phase.role not in {"terminal", "fanout_join"}:
        sentences.append(
            f"Explanation: parallel execution is rejected at phase "
            f"'{phase.name}' because no parallelization policy is declared"
        )

    sentences.extend(
        f"Explanation: after commit phase '{phase.name}' with budget_state "
        f"'{budget_state}' → routes to '{target}' because the workflow "
        f"policy declares this post_commit_route"
        for budget_state, target in phase.post_commit_routes_info
    )
    return sentences
def _render_post_commit_routes_text(exp: object, lines: list[str]) -> None:
    """Append the POST-COMMIT ROUTES section, one row per route.

    Does nothing when *exp* is not a ``PolicyExplanation``.
    """
    if not isinstance(exp, PolicyExplanation):
        return
    lines.append("")
    lines.append("-" * 70)
    lines.append("POST-COMMIT ROUTES")
    lines.append("-" * 70)
    lines.extend(
        f" phase {route.phase} (budget={route.budget_state}) → {route.target}"
        for route in exp.post_commit_routes
    )


def _append_parallel_execution_details(pe: Any, lines: list[str]) -> None:
    """Append the detail rows for one parallel-execution entry to *lines*.

    Shared by the single-entry and multi-entry renderers so both always
    produce the same rows.  *pe* only needs the attributes read below.
    """
    verify = "yes" if pe.post_fanout_verification else "no"
    req = "yes" if pe.require_allowed_directories else "no"
    lines.append(f" Fanout phase : {pe.phase}")
    lines.append(f" Max workers : {pe.max_parallel_workers}")
    lines.append(f" Max work units: {pe.max_work_units}")
    lines.append(f" Require allowed_directories: {req}")
    lines.append(f" post_fanout_verify: {verify}")
    lines.append(
        f" When is parallel execution allowed? "
        f"When the planning artifact declares multiple work_units "
        f"(up to {pe.max_work_units}) for phase '{pe.phase}'."
    )


def _render_parallel_executions_text(exp: object, lines: list[str]) -> None:
    """Append one PARALLEL EXECUTION section covering every entry in
    ``exp.parallel_executions``.

    The heading is written once, followed by the detail rows of each entry.
    Does nothing when *exp* is not a ``PolicyExplanation``.
    """
    if not isinstance(exp, PolicyExplanation):
        return
    lines.append("")
    lines.append("-" * 70)
    lines.append("PARALLEL EXECUTION")
    lines.append("-" * 70)
    for pe in exp.parallel_executions:
        _append_parallel_execution_details(pe, lines)


def _render_parallel_text(exp: object, lines: list[str]) -> None:
    """Append the PARALLEL EXECUTION section for ``exp.parallel_execution``.

    Does nothing when *exp* is not a ``PolicyExplanation`` or when no
    single parallel execution is set.
    """
    if not isinstance(exp, PolicyExplanation):
        return
    pe = exp.parallel_execution
    if pe is None:
        return
    lines.append("")
    lines.append("-" * 70)
    lines.append("PARALLEL EXECUTION")
    lines.append("-" * 70)
    _append_parallel_execution_details(pe, lines)


def _render_recovery_text(exp: object, lines: list[str]) -> None:
    """Append the RECOVERY POLICY section.

    Lists the cycle cap, the terminal recovery route and the categories on
    which the session is preserved (``none`` when that list joins to an
    empty string).  Does nothing when *exp* is not a ``PolicyExplanation``
    or has no recovery policy.
    """
    if not isinstance(exp, PolicyExplanation):
        return
    r = exp.recovery
    if r is None:
        return
    lines.append("")
    lines.append("-" * 70)
    lines.append("RECOVERY POLICY")
    lines.append("-" * 70)
    lines.append(f" Max recovery cycles : {r.cycle_cap}")
    lines.append(f" Terminal failure route: {r.terminal_recovery_route}")
    lines.append(f" Session preserved on: {', '.join(r.preserve_session_on_categories) or 'none'}")


def _render_phase_routing(phase: PhaseExplanation, lines: list[str]) -> None:
    """Append the routing rows of a phase.

    Covers the terminal outcome or success route, the failure route,
    loopback, bypass routes, workflow fallback and the decisions table.
    """
    # A terminal outcome takes the place of the success route.
    if phase.terminal_outcome:
        lines.append(f" Terminal outcome: {phase.terminal_outcome}")
    elif phase.on_success:
        lines.append(f" On success → {phase.on_success}")

    if phase.on_failure:
        lines.append(f" On failure → {phase.on_failure}")
    elif not phase.is_terminal:
        # Non-terminal phases without a failure route fail the pipeline.
        lines.append(" On failure → pipeline fails (no on_failure route)")

    if phase.on_loopback:
        lines.append(f" On loopback → {phase.on_loopback}")

    lines.extend(
        f" Bypass [{outcome}] → {target}"
        for outcome, target in phase.bypass_routes.items()
    )

    if phase.workflow_fallback is not None:
        fallback_target, fallback_note = phase.workflow_fallback
        lines.append(f" Workflow fallback (chain exhausted) → {fallback_target}")
        if fallback_note:
            lines.append(f" Note: {fallback_note}")

    if phase.decisions:
        lines.append(" Decisions:")
        # Pad to 19 and add an explicit space: short decision names keep
        # the 20-wide column, while names of twenty or more characters no
        # longer run straight into the target.
        lines.extend(
            f" {decision:19s} {target}"
            for decision, target in phase.decisions.items()
        )


def _render_phase_commit(phase: PhaseExplanation, lines: list[str]) -> None:
    """Append the commit-policy rows of a phase.

    Does nothing when the phase has no commit policy.
    """
    cp = phase.commit_policy
    if cp is None:
        return
    counter = cp.increments_counter
    counter_str = f"increments '{counter}'" if counter else "no counter incremented"
    lines.append(f" Commit : {counter_str}")
    if cp.loop_resets:
        lines.append(f" resets loop counters: {cp.loop_resets}")
    req = "yes" if cp.requires_artifact else "no"
    lines.append(f" requires artifact: {req}")
    lines.append(" When is commit required? When this phase is active and the agent")
    lines.append(" produces changes that need to be committed.")


def _render_phase_review(phase: PhaseExplanation, lines: list[str]) -> None:
    """Append the clean/issues outcome rows; only phases whose role is
    ``review`` emit anything."""
    if phase.role != "review":
        return
    if phase.clean_outcome:
        lines.append(f" Clean outcome: {phase.clean_outcome}")
    if phase.issues_outcome:
        lines.append(f" Issues outcome: {phase.issues_outcome}")


def _render_phase_verification(phase: PhaseExplanation, lines: list[str]) -> None:
    """Append the verification-gate summary of a phase.

    Writes one summary row plus an explanatory row for the ``artifact`` and
    ``none`` kinds.  Does nothing when the phase has no verification.
    """
    v = phase.verification
    if v is None:
        return
    if v.on_failure_route:
        on_fail_str = f"on_failure_route='{v.on_failure_route}'"
    else:
        on_fail_str = "no on_failure_route (pipeline fails)"
    lines.append(f" Verification: kind={v.kind}, gates={v.gate_for}, {on_fail_str}")
    if v.kind == "artifact":
        lines.append(
            " An artifact file must be present and non-empty before advancement."
        )
    elif v.kind == "none":
        lines.append(" Declarative gate — always passes; use for documentation.")


def _render_phase_text(phase: object, lines: list[str]) -> None:
    """Append the full description of one phase to *lines*.

    Writes the phase header (name with ENTRY/TERMINAL badges, role label,
    drain), the agent chain and retry rows, the skipped-invocation notice,
    routing, loop policy, commit, review and verification rows, and finally
    the explanation sentences.  Does nothing when *phase* is not a
    ``PhaseExplanation``.
    """
    if not isinstance(phase, PhaseExplanation):
        return

    badges: list[str] = []
    if phase.is_entry:
        badges.append("ENTRY")
    if phase.is_terminal:
        badges.append("TERMINAL")
    badge_str = f" [{', '.join(badges)}]" if badges else ""
    # Unknown roles fall back to the raw role text, or "unknown" when empty.
    role_label = _ROLE_LABELS.get(phase.role or "", phase.role or "unknown")

    lines.append("")
    lines.append(f" Phase: {phase.name}{badge_str}")
    lines.append(f" Role : {role_label}")
    lines.append(f" Drain : {phase.drain}")

    # NOTE(review): the Retry row is placed inside the chain branch based on
    # a whitespace-collapsed listing — confirm the nesting against upstream.
    if phase.chain:
        agent_str = ", ".join(phase.agents) if phase.agents else "(none)"
        lines.append(f" Chain : {phase.chain} → agents: [{agent_str}]")
        fallback = ", then fall back to next agent" if len(phase.agents) > 1 else ", then fail"
        lines.append(f" Retry : up to {phase.max_retries} retries per agent{fallback}")

    if phase.skip_invocation:
        lines.append(" Invocation : SKIPPED — routing proceeds without invoking an agent")

    _render_phase_routing(phase, lines)

    if phase.loop_policy is not None:
        lp = phase.loop_policy
        lines.append(
            f" Loop : counter='{lp.iteration_state_field}', max={lp.max_iterations}"
        )
        if lp.loopback_review_outcome:
            lines.append(
                f" loopback sets review_outcome='{lp.loopback_review_outcome}'"
            )

    _render_phase_commit(phase, lines)
    _render_phase_review(phase, lines)
    _render_phase_verification(phase, lines)
    lines.extend(render_explanation_sentences(phase))