Source code for ralph.cli.commands.diagnose

"""Diagnose command for Ralph Workflow CLI.

This module implements diagnostic commands to check the
environment and configuration.
"""

from __future__ import annotations

from importlib import import_module
from pathlib import Path
from typing import TYPE_CHECKING, cast

from rich.panel import Panel
from rich.table import Table
from rich.text import Text

from ralph.agents.availability import check_agent_availability
from ralph.agents.registry import AgentRegistry
from ralph.config.loader import load_config
from ralph.display.context import make_display_context
from ralph.git.operations import find_repo_root, is_repo_clean
from ralph.mcp.session_plan import resolve_effective_session_mcp_plan
from ralph.mcp.transport.agy import load_existing_agy_upstream_servers
from ralph.mcp.transport.claude import load_existing_claude_upstream_servers
from ralph.mcp.transport.common import mcp_toml_as_upstreams
from ralph.mcp.upstream.agent_probe import probe_agent_transports
from ralph.mcp.upstream.validation import validate_upstream_mcp_servers
from ralph.onboarding import (
    GETTING_STARTED_DOC,
    INIT_COMMAND,
    RUN_COMMAND,
    starter_prompt_validation_hint,
)
from ralph.policy.loader import (
    load_policy,
    load_policy_for_workspace_scope,
)
from ralph.policy.validation import (
    PolicyValidationError,
    validate_agent_chains_satisfiable,
    validate_recovery_config,
)
from ralph.workspace.scope import WorkspaceScope, resolve_workspace_scope

if TYPE_CHECKING:
    from types import ModuleType

    from rich.console import Console

    from ralph.display.context import DisplayContext
    from ralph.mcp.upstream.config import UpstreamMcpServer


def _module_attr(module: ModuleType, attribute: str) -> object:
    """Fetch ``attribute`` straight from ``module``'s namespace dictionary.

    Args:
        module: Imported module whose namespace is inspected.
        attribute: Name to look up in the module namespace.

    Returns:
        The object bound to ``attribute`` in the module namespace.

    Raises:
        KeyError: If the module namespace has no such name.
    """
    members = cast("dict[str, object]", vars(module))
    return members[attribute]


def _load_starter_prompt_sentinel() -> str:
    """Return ``STARTER_PROMPT_SENTINEL`` from ``ralph.cli.commands.init``.

    The init command module is imported at call time via ``import_module``
    rather than at module import time.

    Returns:
        The sentinel value, cast to ``str`` for the type checker.
    """
    init_module = import_module("ralph.cli.commands.init")
    sentinel = _module_attr(init_module, "STARTER_PROMPT_SENTINEL")
    return cast("str", sentinel)


def diagnose_command(
    config_path: Path | None = None,
    cli_overrides: dict[str, object] | None = None,
    *,
    display_context: DisplayContext | None = None,
) -> int:
    """Run every diagnostic section and fold the results into an exit code.

    Sections are rendered in a fixed order: git repository, configuration,
    agents, MCP servers, workspace files, pre-flight validation, and finally
    the "Next steps" panel. Every section runs even when an earlier one failed.

    Args:
        config_path: Optional path to config file.
        cli_overrides: CLI flag overrides.
        display_context: Display context for consistent rendering. When None,
            one is created with make_display_context().

    Returns:
        2 when pre-flight validation failed; otherwise 1 when any other check
        failed (including a missing agent); otherwise 0.
    """
    ctx = make_display_context() if display_context is None else display_context
    console = ctx.console

    banner = Text()
    banner.append("Ralph Workflow Diagnostics", style="theme.banner.title")
    console.print()
    console.print(banner)
    console.print()

    workspace_scope = resolve_workspace_scope()

    # Each check is invoked unconditionally so its table is always rendered;
    # the results are only combined afterwards.
    section_results: list[bool] = []
    section_results.append(_check_git_repo(display_context=ctx))
    section_results.append(_check_configuration(config_path, cli_overrides, display_context=ctx))
    agent_missing = check_agents(cli_overrides, display_context=ctx)
    section_results.append(not agent_missing)
    section_results.append(_check_mcp_servers(workspace_scope, display_context=ctx))
    section_results.append(_check_workspace_files(display_context=ctx))
    config_ok = all(section_results)

    # Pre-flight validation using policy system
    validation_ok = _run_preflight_validation(
        config_path, cli_overrides, workspace_scope, display_context=ctx
    )

    # Inspect PROMPT.md so the next-steps panel can tell "missing" apart from
    # "still the untouched starter template".
    prompt_path = workspace_scope.root / "PROMPT.md"
    prompt_exists = prompt_path.exists()
    prompt_has_sentinel = False
    if prompt_exists:
        try:
            starter_marker = _load_starter_prompt_sentinel()
            prompt_has_sentinel = starter_marker in prompt_path.read_text(encoding="utf-8")
        except Exception:
            # Best effort: any failure to load the sentinel or read the file
            # simply leaves prompt_has_sentinel as False.
            pass

    _print_next_steps_panel(
        build_next_steps(
            validation_ok=validation_ok,
            agent_missing=agent_missing,
            prompt_exists=prompt_exists,
            prompt_has_sentinel=prompt_has_sentinel,
        ),
        display_context=ctx,
    )
    console.print()

    if not validation_ok:
        return 2
    return 0 if config_ok else 1
def build_next_steps(
    *,
    validation_ok: bool,
    agent_missing: bool,
    prompt_exists: bool,
    prompt_has_sentinel: bool,
) -> list[str]:
    """Translate the diagnostic outcome into an ordered list of remediation lines.

    The order is: PROMPT.md guidance, agent installation, pre-flight
    validation. When none of those apply, the single step is to run the
    pipeline.

    Args:
        validation_ok: Whether pre-flight validation passed.
        agent_missing: Whether any configured agent is missing from PATH.
        prompt_exists: Whether PROMPT.md exists in the workspace.
        prompt_has_sentinel: Whether PROMPT.md still contains the starter sentinel.

    Returns:
        List of human-readable remediation lines (never empty).
    """
    install_agent_hint = (
        "Install at least one supported agent: "
        "Claude Code (https://docs.claude.com/claude-code), "
        "Codex CLI (https://codex.openai.com), "
        "OpenCode (https://opencode.ai), "
        "or Google Anti Gravity (https://github.com/google-antigravity/antigravity-cli)."
    )
    preflight_hint = (
        "Pre-flight validation failed: see the Pre-flight Validation table above. "
        "Fix policy errors with `ralph --regenerate-config` if config files were edited."
    )

    remediation: list[str] = []

    if prompt_exists:
        # An existing prompt only needs attention while it is still the starter template.
        if prompt_has_sentinel:
            remediation.append(starter_prompt_validation_hint())
    else:
        remediation.append(
            f"Run `{INIT_COMMAND}` to scaffold PROMPT.md and project config files."
        )

    if agent_missing:
        remediation.append(install_agent_hint)

    if not validation_ok:
        remediation.append(preflight_hint)

    if remediation:
        return remediation
    return [f"Run `{RUN_COMMAND}` to start the pipeline."]
def _print_next_steps_panel(steps: list[str], *, display_context: DisplayContext) -> None:
    """Render the remediation steps as a bulleted "Next steps" panel.

    Args:
        steps: Remediation lines to show, one bullet per entry.
        display_context: DisplayContext providing the console for output.
    """
    console = display_context.console

    body = Text()
    for position, step in enumerate(steps):
        # Bullets are newline-separated; the first bullet has no leading break.
        separator = "\n" if position > 0 else ""
        body.append(f"{separator} • {step}")
    body.append("\n\n")

    # Trailing pointer to the getting-started document, rendered in the muted style.
    muted = "theme.text.muted"
    for fragment in (
        "New to Ralph Workflow? ",
        GETTING_STARTED_DOC,
        " — step-by-step walkthrough.",
    ):
        body.append(fragment, style=muted)

    panel = Panel(body, title="Next steps", border_style="theme.phase.planning", padding=(1, 2))
    console.print(panel)


def _run_preflight_validation(
    config_path: Path | None,
    cli_overrides: dict[str, object] | None,
    workspace_scope: WorkspaceScope,
    *,
    display_context: DisplayContext,
) -> bool:
    """Validate the policy configuration and print a "Pre-flight Validation" table.

    Validation is skipped (and reported as passing) when none of the known
    policy files exist, i.e. the project has not been initialized yet.

    Args:
        config_path: Optional path to config file. When given, policy files are
            looked up next to it; otherwise they are resolved through
            ``workspace_scope``.
        cli_overrides: CLI flag overrides.
        workspace_scope: Workspace scope.
        display_context: DisplayContext providing the console for output.

    Returns:
        True if validation passes or is skipped, False on a policy validation
        error or any other exception.
    """
    console = display_context.console
    report = Table(title="Pre-flight Validation", show_header=False)
    report.add_column("Check", style="theme.cat.meta")
    report.add_column("Status")

    policy_file_names = (
        "ralph-workflow.toml",
        "agents.toml",
        "pipeline.toml",
        "artifacts.toml",
    )

    try:
        # The loaded config feeds both the agent registry and the policy loader.
        config = load_config(config_path, cli_overrides, workspace_scope=workspace_scope)
        registry = AgentRegistry.from_config(config)

        # Work out where policy files live and whether any of them exist.
        if config_path is None:
            policy_dir = workspace_scope.resolve_agent_file("pipeline.toml").parent
            initialized = any(
                workspace_scope.resolve_agent_file(name).exists() for name in policy_file_names
            )
        else:
            policy_dir = config_path.parent
            initialized = any((policy_dir / name).exists() for name in policy_file_names)

        if not initialized:
            report.add_row(
                "Pre-flight",
                Text(
                    "Skipped: project is not initialized yet (run `ralph --init`)",
                    style="theme.status.warning",
                ),
            )
            console.print(report)
            return True

        # Load the policy bundle from the same place the files were found.
        if config_path is None:
            bundle = load_policy_for_workspace_scope(workspace_scope, config=config)
        else:
            bundle = load_policy(policy_dir, config=config)

        # Both validators signal failure by raising.
        validate_agent_chains_satisfiable(bundle, registry)
        validate_recovery_config(bundle)

        report.add_row("Agent chains", Text("Satisfiable", style="theme.status.success"))
        report.add_row("Recovery config", Text("Valid", style="theme.status.success"))
        console.print(report)
        return True
    except PolicyValidationError as policy_error:
        report.add_row(
            "Policy validation",
            _status_text("Failed", policy_error.message, "theme.status.error"),
        )
        console.print(report)
        return False
    except Exception as unexpected:
        report.add_row("Pre-flight", _status_text("Error", str(unexpected), "theme.status.error"))
        console.print(report)
        return False


def _check_git_repo(*, display_context: DisplayContext) -> bool:
    """Report the git repository root and working-tree state.

    Args:
        display_context: DisplayContext providing the console for output.

    Returns:
        False when the repository root cannot be determined. True otherwise,
        including when the working tree is dirty or its state cannot be read.
    """
    console = display_context.console
    summary = Table(title="Git Repository", show_header=False)
    summary.add_column("Check", style="theme.cat.meta")
    summary.add_column("Status")

    try:
        repo_root = find_repo_root()
    except Exception as lookup_error:
        summary.add_row(
            "Repository", _status_text("Error", str(lookup_error), "theme.status.error")
        )
        console.print(summary)
        return False
    summary.add_row("Repository root", str(repo_root))

    # A dirty or unreadable working tree is reported but does not fail the check.
    try:
        if is_repo_clean(repo_root):
            tree_cell = Text("Clean", style="theme.status.success")
        else:
            tree_cell = Text("Has uncommitted changes", style="theme.status.warning")
    except Exception as status_error:
        tree_cell = _status_text("Error", str(status_error), "theme.status.error")
    summary.add_row("Working tree", tree_cell)

    console.print(summary)
    return True


def _check_configuration(
    config_path: Path | None,
    cli_overrides: dict[str, object] | None,
    *,
    display_context: DisplayContext,
) -> bool:
    """Load the configuration and print a few key settings.

    Args:
        config_path: Optional path to config file. When None, the workspace
            scope is resolved and passed to the loader instead.
        cli_overrides: CLI flag overrides.
        display_context: DisplayContext providing the console for output.

    Returns:
        True if the configuration loaded and its settings could be read,
        False otherwise.
    """
    console = display_context.console
    summary = Table(title="Configuration", show_header=False)
    summary.add_column("Check", style="theme.cat.meta")
    summary.add_column("Status")

    loaded = True
    try:
        if config_path is None:
            scope = resolve_workspace_scope()
        else:
            scope = None
        config = load_config(config_path, cli_overrides, workspace_scope=scope)
        summary.add_row("Config loaded", Text("Success", style="theme.status.success"))
        general = config.general
        summary.add_row("Developer iters", str(general.developer_iters))
        summary.add_row("Checkpoint enabled", str(general.workflow.checkpoint_enabled))
    except Exception as load_error:
        summary.add_row(
            "Config loaded", _status_text("Error", str(load_error), "theme.status.error")
        )
        loaded = False

    console.print(summary)
    return loaded
def check_agents(
    cli_overrides: dict[str, object] | None,
    *,
    display_context: DisplayContext,
) -> bool:
    """Check agent availability and return True if any agent is missing from PATH.

    Prints an "Agents" table with one row per agent listed by the registry.
    An agent for which the availability check reported nothing is rendered as
    "missing" and is also counted as missing in the return value, so the
    returned flag always agrees with what the table shows.

    Args:
        cli_overrides: CLI flag overrides.
        display_context: DisplayContext providing the console for output.

    Returns:
        True if at least one agent is missing from PATH (or the check itself
        failed with an exception), False otherwise.
    """
    c = display_context.console
    table = Table(title="Agents")
    table.add_column("Agent", style="theme.cat.meta")
    table.add_column("Config")
    table.add_column("PATH")

    any_missing = False
    try:
        config = load_config(None, cli_overrides, workspace_scope=resolve_workspace_scope())
        registry = AgentRegistry.from_config(config)
        agent_names = registry.list_agents()
        if not agent_names:
            table.add_row(
                "(none)", Text("No agents configured", style="theme.status.warning"), "-"
            )
        else:
            path_by_name: dict[str, Text] = {}
            for name, status in check_agent_availability(registry):
                if status == "available":
                    path_by_name[name] = Text("on PATH", style="theme.status.success")
                else:
                    path_by_name[name] = Text("missing", style="theme.status.warning")
                    any_missing = True

            for name in agent_names:
                agent = registry.get(name)
                cmd = agent.cmd if agent else ""
                path_status = path_by_name.get(name)
                if path_status is None:
                    # No availability result for this agent: it is shown as
                    # missing, so it must be reported as missing as well.
                    path_status = Text("missing", style="theme.status.warning")
                    any_missing = True
                config_cell = _status_text("Configured", cmd, "theme.status.success")
                table.add_row(name, config_cell, path_status)
    except Exception as e:
        # A failure to inspect agents is treated the same as a missing agent.
        table.add_row("Agents", _status_text("Error", str(e), "theme.status.error"), "-")
        c.print(table)
        return True

    c.print(table)
    return any_missing
def _check_mcp_servers(
    workspace_scope: WorkspaceScope,
    *,
    display_context: DisplayContext,
) -> bool:
    """Render custom MCP server health and per-agent transport compatibility.

    Prints the effective session inventory, then the custom server health
    table, and, only when at least one custom server is healthy, the agent
    transport compatibility table.

    Args:
        workspace_scope: Workspace scope.
        display_context: DisplayContext providing the console for output.

    Returns:
        True if check passed, False otherwise.
    """
    console = display_context.console
    root = workspace_scope.root

    _print_effective_session_mcp_inventory(console, root)
    table_ok, healthy = _render_custom_mcp_server_table(console, root)
    if table_ok and healthy:
        _print_agent_transport_compatibility(console, healthy, root)
        return True
    # Either validation failed or there is nothing healthy to probe.
    return table_ok


def _render_custom_mcp_server_table(
    console: Console, workspace_root: Path
) -> tuple[bool, tuple[UpstreamMcpServer, ...]]:
    """Print the custom MCP health table.

    Args:
        console: Console to print the table to.
        workspace_root: Workspace root the custom server definitions are read from.

    Returns:
        A pair ``(ok, healthy_servers)``. ``ok`` is False only when the
        validator itself raised; ``healthy_servers`` holds the configured
        servers whose validation entry is marked ok.
    """
    upstreams = mcp_toml_as_upstreams(workspace_root)

    health = Table(title="Custom MCP Servers")
    health.add_column("Server", style="theme.cat.meta")
    for heading in ("Transport", "Status", "Tools", "Detail"):
        health.add_column(heading)

    if not upstreams:
        health.add_row(
            "(none)",
            "-",
            Text("No custom MCP servers configured", style="theme.status.warning"),
            "-",
            "-",
        )
        console.print(health)
        return True, ()

    try:
        report = validate_upstream_mcp_servers(upstreams, strict=False)
    except Exception as exc:
        health.add_row(
            "(validator)",
            "-",
            _status_text("Error", str(exc), "theme.status.error"),
            "-",
            "-",
        )
        console.print(health)
        return False, ()

    for entry in report.servers:
        if entry.ok:
            status = Text("ok", style="theme.status.success")
        else:
            status = Text("failed", style="theme.status.error")

        # Detail combines the error (if any) with the secret key names.
        error_text = entry.error or ""
        if entry.secret_keys:
            env_note = "env: " + ",".join(entry.secret_keys)
            detail = f"{error_text} ({env_note})" if error_text else env_note
        else:
            detail = error_text

        health.add_row(
            entry.name,
            entry.transport,
            status,
            str(entry.tool_count),
            detail or "-",
        )
    console.print(health)

    passing_names = {entry.name for entry in report.servers if entry.ok}
    return True, tuple(server for server in upstreams if server.name in passing_names)


def _print_agent_transport_compatibility(
    console: Console,
    healthy_servers: tuple[UpstreamMcpServer, ...],
    workspace_root: Path,
) -> None:
    """Print per-agent MCP transport compatibility for healthy custom servers.

    Args:
        console: Console to print the table to.
        healthy_servers: Servers to probe; one table row is printed per server.
        workspace_root: Workspace root passed to the probe as ``workspace_path``.
    """
    matrix = Table(title="Agent Transport Compatibility")
    matrix.add_column("Server", style="theme.cat.meta")
    for heading in ("Claude", "Codex", "OpenCode", "AGY"):
        matrix.add_column(heading)

    # Index probe results as server name -> transport value -> rendered cell.
    cells_by_server: dict[str, dict[str, Text]] = {}
    for probe in probe_agent_transports(healthy_servers, workspace_path=workspace_root):
        if not probe.ok:
            cell = Text("✗", style="theme.status.error")
        elif probe.note:
            # Succeeded, but with a note attached: shown as a warning dash.
            cell = Text("-", style="theme.status.warning")
        else:
            cell = Text("✓", style="theme.status.success")
        cells_by_server.setdefault(probe.server_name, {})[probe.transport.value] = cell

    for server in healthy_servers:
        row = cells_by_server.get(server.name, {})
        matrix.add_row(
            server.name,
            *(row.get(key, Text("-")) for key in ("claude", "codex", "opencode", "agy")),
        )
    console.print(matrix)


def _print_effective_session_mcp_inventory(console: Console, workspace_root: Path) -> None:
    """Print the "Effective Session MCP Inventory" table.

    Args:
        console: Console to print the table to.
        workspace_root: Workspace root used to load existing Claude and AGY
            upstream servers and to resolve the effective session plan.
    """
    claude_servers = tuple(load_existing_claude_upstream_servers(workspace_root))
    agy_servers = tuple(load_existing_agy_upstream_servers(workspace_root))
    plan = resolve_effective_session_mcp_plan(
        workspace_root,
        agent_upstream_servers=claude_servers + agy_servers,
    )

    inventory = Table(title="Effective Session MCP Inventory")
    inventory.add_column("Server", style="theme.cat.meta")
    for heading in ("Origin", "Transport", "Exposure"):
        inventory.add_column(heading)

    if not plan.effective_servers:
        inventory.add_row("(none)", "-", "-", "No effective session MCP servers")
    else:
        for server in plan.effective_servers:
            inventory.add_row(
                server.name,
                server.origin,
                server.transport,
                _inventory_exposure(server.origin),
            )
    console.print(inventory)


def _inventory_exposure(origin: str) -> str:
    """Return the exposure label for a server origin.

    Only the origin ``"custom"`` gets the custom label; every other origin is
    labelled as upstream.
    """
    return (
        "proxied via ralph_custom__*"
        if origin == "custom"
        else "proxied via ralph_upstream__*"
    )


def _check_workspace_files(*, display_context: DisplayContext) -> bool:
    """List the expected workspace files with their presence and size.

    The paths are relative, so they are resolved against the current working
    directory of the process.

    Args:
        display_context: DisplayContext providing the console for output.

    Returns:
        Always True; missing files are reported as warnings only.
    """
    console = display_context.console
    listing = Table(title="Workspace Files", show_header=False)
    listing.add_column("File", style="theme.cat.meta")
    listing.add_column("Status")

    expected_files = (
        ("PROMPT.md", "Implementation prompt"),
        (".agent/ralph-workflow.toml", "Local config"),
        (".agent/checkpoint.json", "Checkpoint"),
    )
    for relative_name, description in expected_files:
        label = Text()
        label.append(f"{relative_name} ({description})")

        candidate = Path(relative_name)
        if not candidate.exists():
            listing.add_row(label, Text("Not found", style="theme.status.warning"))
            continue

        byte_count = candidate.stat().st_size
        listing.add_row(
            label, _status_text("Exists", f"{byte_count} bytes", "theme.status.success")
        )

    console.print(listing)
    return True


def _status_text(label: str, detail: str, style: str) -> Text:
    """Build a ``"<label>: <detail>"`` cell where only the label part is styled.

    Args:
        label: Short status word; rendered with ``style`` and a trailing colon.
        detail: Free-form detail appended unstyled after a single space.
        style: Style name applied to the label.

    Returns:
        The assembled Text.
    """
    # Text.append returns the Text itself, which allows chaining.
    return Text().append(f"{label}:", style=style).append(" ").append(detail)


# Public aliases for the private check helpers.
check_git_repo = _check_git_repo
check_configuration = _check_configuration
check_mcp_servers = _check_mcp_servers
check_workspace_files = _check_workspace_files