Source code for ralph.cli.commands.smoke

"""Manual smoke tests for expensive agent-runtime checks.

These smoke tests are intentionally excluded from the verify pipeline because they
consume live agent tokens. They exist to help operators validate real-world agent
behavior, especially interactive-Claude parity, when changing the runtime.
"""

from __future__ import annotations

from dataclasses import dataclass
from pathlib import Path
from typing import TYPE_CHECKING
from uuid import uuid4

from rich.panel import Panel
from rich.table import Table

from ralph.agents.execution_state import strategy_for_transport
from ralph.agents.invoke import (
    AgentInvocationError,
    InvokeOptions,
    InvokeRuntimeOptions,
    OpenCodeResumableExitError,
    build_invoke_options_from_config,
    extract_session_id,
    invoke_agent,
)
from ralph.agents.parsers import get_parser
from ralph.agents.registry import AgentRegistry
from ralph.cli.commands.smoke_run_params import SmokeRunParams
from ralph.config.enums import AgentTransport
from ralph.config.loader import load_config
from ralph.display.context import DisplayContext, make_display_context
from ralph.mcp.artifacts.smoke_test_result import (
    SMOKE_TEST_RESULT_ARTIFACT_TYPE,
    read_smoke_test_result_artifact,
)
from ralph.mcp.protocol.session import MCP_ENDPOINT_ENV, MCP_RUN_ID_ENV, AgentSession
from ralph.mcp.server.lifecycle import McpServerExtras, SessionBridgeLike, start_mcp_server
from ralph.mcp.session_plan import build_session_mcp_plan
from ralph.pipeline.activity_stream import stream_parsed_agent_activity
from ralph.policy.loader import load_agents_policy_for_workspace_scope
from ralph.prompts.materialize import submit_artifact_tool_name_for_transport
from ralph.workspace.fs import FsWorkspace
from ralph.workspace.scope import resolve_workspace_scope

if TYPE_CHECKING:
    from ralph.config.models import AgentConfig, UnifiedConfig

# --- Workspace-relative paths used by the smoke run -------------------------
# Everything the smoke agent reads or writes lives under this scratch directory.
_SMOKE_RELATIVE_DIR = Path("tmp") / "interactive-claude-smoke"
# File the agent is asked to create; its existence is one of the pass criteria.
_SMOKE_OUTPUT_FILE = _SMOKE_RELATIVE_DIR / "todo-list.js"
# Prompt handed to the agent, written fresh at the start of every run.
_PROMPT_FILE = _SMOKE_RELATIVE_DIR / "PROMPT.md"

# --- Agent / session identity -----------------------------------------------
# Registry key of the agent exercised by the smoke test.
_INTERACTIVE_AGENT = "claude/haiku"
# Run ID exported to the agent environment via the MCP run-id variable.
_SMOKE_RUN_ID = "interactive-claude-smoke"

# --- Runtime limits applied on top of the configured invoke options ---------
_SMOKE_IDLE_TIMEOUT_SECONDS = 30.0
_SMOKE_MAX_SESSION_SECONDS = 120.0
# Upper bound on invoke attempts (initial turn plus resumes).
_SMOKE_MAX_TURNS = 5

# --- Output-volume thresholds -----------------------------------------------
# At most this many lines are kept as the "observed output" sample.
_MAX_MEANINGFUL_OUTPUT_LINES = 8
# Fewer non-blank rendered lines than this is reported as a break.
_MIN_MEANINGFUL_OUTPUT_LINES = 3
# More non-blank rendered lines than this is reported as an overrun.
_MAX_VISIBLE_OUTPUT_LINES = 80

# Rubric text shown at the top of the rendered report.
_HEADLESS_SEMANTIC_GUIDE = (
    "session capture, tool activity, completion signal, parser events, and tmp/ artifact creation"
)

# Lower-case substrings; a transcript line containing any of them is treated
# as an unexpected permission prompt.
_PERMISSION_PROMPT_MARKERS = ("allow?", "approve", "permission", "y/n")
# Lower-case substrings; a transcript line containing any of them is treated
# as crash-like output.
_CRASH_MARKERS = ("traceback", "fatal", "segmentation fault", "panic", "crash")


@dataclass(frozen=True)
class SmokeRunResult:
    """Observed results from the interactive Claude smoke run.

    One immutable record per exercised agent; consumed by the report and
    table renderers in this module.
    """

    # Registry name of the agent that was run.
    agent_name: str
    # Transport label; "generic" when the agent config has no transport.
    transport: str
    # Absolute path of the file the agent was asked to create.
    output_file: Path
    # Whether ``output_file`` existed after the run.
    file_created: bool
    # Session ID observed during the run, or None when none was seen.
    session_id: str | None
    # True when a "Task declared complete:" marker appeared in the transcript.
    explicit_completion_seen: bool
    # Line count recorded for the run (populated by the smoke runner).
    raw_line_count: int
    # Number of events the agent's parser produced from the transcript.
    parsed_event_count: int
    # True when at least one transcript line was classified as tool use.
    tool_activity_seen: bool
    # True when a smoke_test_result artifact was readable from the workspace.
    artifact_submitted: bool
    # Short sample of output lines shown in the report.
    meaningful_output_lines: list[str]
    # Human-readable descriptions of every detected break; empty on success.
    errors: list[str]
def _build_smoke_prompt(output_relpath: str, *, submit_artifact_tool_name: str) -> str:
    """Return the prompt used for the parity smoke test.

    Args:
        output_relpath: Workspace-relative path the agent must create.
        submit_artifact_tool_name: Name of the tool the agent must call to
            submit its smoke-test result artifact.
    """
    return (
        "Create a small JavaScript todo list implementation at "
        f"`{output_relpath}`.\n\n"
        "Requirements:\n"
        "- Keep it tiny: one file only.\n"
        "- Export a small in-memory todo list API.\n"
        "- Do not touch files outside tmp/.\n"
        "- Use the headless semantic guide as a rubric: session capture, tool activity, "
        "completion signal, parser events, and tmp artifact creation.\n"
        f"- Call `{submit_artifact_tool_name}` with "
        f'artifact_type="{SMOKE_TEST_RESULT_ARTIFACT_TYPE}" '
        "and report what worked and what broke.\n"
        "- When finished, call declare_complete.\n"
    )


def _count_parsed_events(config: AgentConfig, lines: list[str]) -> int:
    """Return how many events the agent's configured parser yields for ``lines``."""
    parser = get_parser(config.json_parser)
    return sum(1 for _ in parser.parse(iter(lines)))


def _tool_activity_seen(config: AgentConfig, lines: list[str]) -> bool:
    """Return True when any line is classified as ``tool_use`` for the agent's transport."""
    strategy = strategy_for_transport(config.transport)
    for line in lines:
        signal = strategy.classify_activity_line(line)
        if signal is not None and signal.kind.value == "tool_use":
            return True
    return False


def _meaningful_output_lines(config: AgentConfig, lines: list[str]) -> list[str]:
    """Return up to ``_MAX_MEANINGFUL_OUTPUT_LINES`` ``"<type>: <content>"`` samples.

    Only parsed events of a displayable type with non-blank content are kept.
    """
    parser = get_parser(config.json_parser)
    collected: list[str] = []
    for parsed in parser.parse(iter(lines)):
        content = parsed.content.strip()
        if parsed.type in {"text", "thinking", "tool_use", "tool_result", "error"} and content:
            collected.append(f"{parsed.type}: {content}")
        if len(collected) >= _MAX_MEANINGFUL_OUTPUT_LINES:
            break
    return collected


def _detect_break_indicators(lines: list[str]) -> list[str]:
    """Scan the transcript for permission-prompt and crash markers.

    Matching is a case-insensitive substring test against each stripped line.
    Returns at most one error message per marker family.
    """
    errors: list[str] = []
    lowered = [line.strip().lower() for line in lines]
    if any(any(marker in line for marker in _PERMISSION_PROMPT_MARKERS) for line in lowered):
        errors.append("unexpected permission prompt observed in transcript")
    if any(any(marker in line for marker in _CRASH_MARKERS) for line in lowered):
        errors.append("crash-like transcript output observed")
    return errors


def _start_smoke_bridge(repo_root: Path, *, config: UnifiedConfig) -> SessionBridgeLike:
    """Start an MCP server for a throwaway "development" session rooted at ``repo_root``.

    The caller owns the returned bridge and is responsible for shutting it down.
    """
    workspace_scope = resolve_workspace_scope(repo_root)
    agents_policy = load_agents_policy_for_workspace_scope(workspace_scope, config=config)
    session_mcp_plan = build_session_mcp_plan(
        transport=None,
        drain="development",
        workspace_path=repo_root,
        agents_policy=agents_policy,
    )
    session = AgentSession(
        session_id=f"smoke-{uuid4().hex[:8]}",
        run_id=str(uuid4()),
        drain="development",
        capabilities=set(session_mcp_plan.capabilities),
        model_identity=session_mcp_plan.model_identity,
        stored_capability_profile=session_mcp_plan.capability_profile,
    )
    workspace = FsWorkspace(repo_root)
    return start_mcp_server(
        session, workspace, extras=McpServerExtras(extra_env=session_mcp_plan.server_env)
    )


def _smoke_bridge_env(bridge: SessionBridgeLike) -> dict[str, str]:
    """Return the environment variables that point the agent at ``bridge``."""
    return {
        MCP_ENDPOINT_ENV: bridge.agent_endpoint_uri(),
        MCP_RUN_ID_ENV: _SMOKE_RUN_ID,
    }


def _with_session_id(options: InvokeOptions, session_id: str | None) -> InvokeOptions:
    """Return a copy of ``options`` whose ``session_id`` is replaced by ``session_id``.

    Every other field is carried over unchanged.
    """
    return InvokeOptions(
        model_flag=options.model_flag,
        session_id=session_id,
        verbose=options.verbose,
        show_progress=options.show_progress,
        workspace_path=options.workspace_path,
        extra_env=options.extra_env,
        idle_timeout_seconds=options.idle_timeout_seconds,
        drain_window_seconds=options.drain_window_seconds,
        max_waiting_on_child_seconds=options.max_waiting_on_child_seconds,
        idle_poll_interval_seconds=options.idle_poll_interval_seconds,
        parent_exit_grace_seconds=options.parent_exit_grace_seconds,
        descendant_wait_timeout_seconds=options.descendant_wait_timeout_seconds,
        descendant_wait_poll_seconds=options.descendant_wait_poll_seconds,
        process_exit_wait_seconds=options.process_exit_wait_seconds,
        max_session_seconds=options.max_session_seconds,
        waiting_status_interval_seconds=options.waiting_status_interval_seconds,
        suspect_waiting_on_child_seconds=options.suspect_waiting_on_child_seconds,
        child_progress_ttl_seconds=options.child_progress_ttl_seconds,
        child_heartbeat_ttl_seconds=options.child_heartbeat_ttl_seconds,
        child_stale_label_ttl_seconds=options.child_stale_label_ttl_seconds,
        child_exit_reconcile_seconds=options.child_exit_reconcile_seconds,
        max_waiting_on_child_no_progress_seconds=options.max_waiting_on_child_no_progress_seconds,
        pure=options.pure,
        system_prompt_file=options.system_prompt_file,
        waiting_listener=options.waiting_listener,
        required_artifact=options.required_artifact,
        explicit_completion_seen=options.explicit_completion_seen,
        captured_session_id=options.captured_session_id,
        initial_session_id=options.initial_session_id,
        settings_json=options.settings_json,
        stop_sentinel_path=options.stop_sentinel_path,
    )


def _execute_smoke_turns(
    params: SmokeRunParams,
    current_session_id: str | None,
) -> tuple[list[str], list[str], str | None, AgentInvocationError | None]:
    """Execute smoke test turns and return collected lines and state.

    The agent is invoked up to ``_SMOKE_MAX_TURNS`` times. A resumable exit
    triggers another turn on the captured session; any other invocation error
    or a clean finish ends the loop.

    Returns:
        A tuple of (raw transcript lines from every turn, rendered output
        lines from every turn, last known session ID, the exception raised by
        the *last* attempted turn or None when that turn finished cleanly).
    """
    all_lines: list[str] = []
    live_output_lines: list[str] = []
    final_exception: AgentInvocationError | None = None
    for _attempt in range(_SMOKE_MAX_TURNS):
        raw_lines: list[str] = []
        rendered_lines: list[str] = []
        # Only the outcome of the latest turn counts: a resumable exit that is
        # followed by a successful resume must not be reported as a break.
        final_exception = None
        try:
            line_iter = invoke_agent(
                params.config,
                str(params.prompt_file),
                options=_with_session_id(params.options, current_session_id),
            )
            stream_parsed_agent_activity(
                line_iter,
                parser_type=str(params.config.json_parser),
                agent_name=params.agent_name,
                display=None,
                transport=params.config.transport,
                display_context=params.display_context,
                raw_output_sink=raw_lines,
                rendered_output_sink=rendered_lines,
                session_id_sink=None,
            )
        except OpenCodeResumableExitError as exc:
            # Must be handled before AgentInvocationError so the resume path wins.
            all_lines.extend(raw_lines)
            live_output_lines.extend(rendered_lines)
            # Keep the previously known session ID when this turn exposed none,
            # so the next turn resumes instead of starting from scratch.
            current_session_id = (
                exc.resumable_session_id or extract_session_id(raw_lines) or current_session_id
            )
            final_exception = exc
            continue
        except AgentInvocationError as exc:
            # Fall back to the output attached to the exception when nothing
            # reached the raw sink before the failure.
            all_lines.extend(raw_lines or exc.parsed_output)
            live_output_lines.extend(rendered_lines)
            final_exception = exc
            break
        else:
            all_lines.extend(raw_lines)
            live_output_lines.extend(rendered_lines)
            break
    return all_lines, live_output_lines, current_session_id, final_exception


def _detect_smoke_errors(
    params: SmokeRunParams,
    lines: list[str],
    live_output_lines: list[str],
    session_id: str | None,
    final_exception: AgentInvocationError | None,
) -> list[str]:
    """Detect errors in smoke run results.

    Args:
        params: Run parameters (agent config, expected output file, workspace root).
        lines: Raw transcript lines collected across all turns.
        live_output_lines: Rendered, operator-visible output lines.
        session_id: Session ID observed for the run, if any.
        final_exception: Invocation error that ended the run, if any.

    Returns:
        One message per failed check, in a fixed check order; empty when the
        run satisfied every check.
    """
    errors = _detect_break_indicators(lines)
    if final_exception is not None:
        errors.append(str(final_exception))
    if not params.output_file.exists():
        errors.append("expected todo-list.js was not created")
    if session_id is None:
        errors.append("session ID was not observed")
    explicit_completion_seen = any("Task declared complete:" in line for line in lines)
    if not explicit_completion_seen:
        errors.append("declare_complete marker was not observed")
    parsed_event_count = _count_parsed_events(params.config, lines) if lines else 0
    if parsed_event_count == 0:
        errors.append("no parser events were observed")
    tool_activity_seen = _tool_activity_seen(params.config, lines) if lines else False
    if not tool_activity_seen:
        errors.append("no tool activity was observed")
    if not read_smoke_test_result_artifact(params.workspace_root):
        errors.append("smoke_test_result artifact was not submitted")
    # Filter the rendered output once; both volume checks work on this list.
    visible_output = [line for line in live_output_lines if line.strip()]
    meaningful_output = visible_output[:_MAX_MEANINGFUL_OUTPUT_LINES]
    if len(meaningful_output) < _MIN_MEANINGFUL_OUTPUT_LINES:
        # Derive the number from the constant so message and threshold cannot drift.
        errors.append(
            f"fewer than {_MIN_MEANINGFUL_OUTPUT_LINES} meaningful output lines were observed"
        )
    if len(visible_output) > _MAX_VISIBLE_OUTPUT_LINES:
        errors.append(
            "interactive output overran into too many visible lines; "
            "semantic output parity is still insufficient"
        )
    return errors


def _run_smoke_agent(params: SmokeRunParams) -> SmokeRunResult:
    """Run the smoke agent and return results.

    Executes the turns, derives the observed signals from the transcript and
    packages them, together with the detected errors, into a ``SmokeRunResult``.
    """
    all_lines, live_output_lines, current_session_id, final_exception = _execute_smoke_turns(
        params, None
    )
    lines = all_lines
    session_id = current_session_id or extract_session_id(lines)
    explicit_completion_seen = any("Task declared complete:" in line for line in lines)
    parsed_event_count = _count_parsed_events(params.config, lines) if lines else 0
    tool_activity_seen = _tool_activity_seen(params.config, lines) if lines else False
    visible_output = [line for line in live_output_lines if line.strip()]
    meaningful_output_lines = visible_output[:_MAX_MEANINGFUL_OUTPUT_LINES]
    if not meaningful_output_lines:
        # Nothing was rendered live; fall back to a sample parsed from the raw transcript.
        meaningful_output_lines = _meaningful_output_lines(params.config, lines) if lines else []
    errors = _detect_smoke_errors(
        params,
        lines,
        live_output_lines,
        session_id,
        final_exception,
    )
    config = params.config
    transport_name = config.transport.value if config.transport is not None else "generic"
    return SmokeRunResult(
        agent_name=params.agent_name,
        transport=transport_name,
        output_file=params.output_file,
        file_created=params.output_file.exists(),
        session_id=session_id,
        explicit_completion_seen=explicit_completion_seen,
        # NOTE(review): despite the field name this counts non-blank *rendered*
        # lines, not raw transcript lines -- confirm that this is intended.
        raw_line_count=len(visible_output),
        parsed_event_count=parsed_event_count,
        tool_activity_seen=tool_activity_seen,
        artifact_submitted=read_smoke_test_result_artifact(params.workspace_root) is not None,
        meaningful_output_lines=meaningful_output_lines,
        errors=errors,
    )


def _render_smoke_report(results: list[SmokeRunResult]) -> str:
    """Render a human-readable parity report.

    For each result the report lists what worked, a sample of the observed
    output and the detected breaks, plus an extra red-flag note for errors
    mentioning "no output" or "overran". Trailing whitespace is stripped.
    """
    lines = [
        "Interactive Claude parity smoke report",
        "",
        f"Headless semantic guide: {_HEADLESS_SEMANTIC_GUIDE}",
        "",
    ]
    for result in results:
        lines.append(f"Agent: {result.agent_name} ({result.transport})")
        lines.append("Observed working:")
        working: list[str] = []
        if result.file_created:
            working.append(f"- created {result.output_file}")
        if result.session_id is not None:
            working.append(f"- session ID observed: {result.session_id}")
        if result.explicit_completion_seen:
            working.append("- declare_complete marker observed")
        if result.parsed_event_count > 0:
            working.append(f"- parser emitted {result.parsed_event_count} event(s)")
        if result.tool_activity_seen:
            working.append("- tool activity observed")
        if result.artifact_submitted:
            working.append("- smoke_test_result artifact submitted")
        lines.extend(working or ["- none"])
        lines.append("Observed output:")
        lines.extend([f"- {line}" for line in result.meaningful_output_lines] or ["- none"])
        lines.append("Observed breaks:")
        lines.extend([f"- {error}" for error in result.errors] or ["- No breaks observed"])
        if any("no output" in error.lower() for error in result.errors):
            lines.append(
                "- HUGE RED FLAG: repeated 'idle watchdog: drain window active' logs "
                "before firing mean the interpreter lost semantic visibility while "
                "the watchdog kept doing its job."
            )
        if any("overran" in error.lower() for error in result.errors):
            lines.append(
                "- HUGE RED FLAG: the interactive stream printed too many visible "
                "lines without enough semantic compression, so operator-visible "
                "parity is still broken."
            )
        lines.append("")
    return "\n".join(lines).rstrip()


def _render_smoke_table(results: list[SmokeRunResult], *, display_context: DisplayContext) -> None:
    """Print a summary table followed by the detailed report panel to the console."""
    console = display_context.console
    table = Table(title="Interactive Claude parity smoke test", show_lines=False)
    table.add_column("Agent")
    table.add_column("Transport")
    table.add_column("File")
    table.add_column("Session")
    table.add_column("Parser events")
    table.add_column("Tool activity")
    table.add_column("Artifact")
    table.add_column("Breaks")
    for result in results:
        table.add_row(
            result.agent_name,
            result.transport,
            "yes" if result.file_created else "no",
            result.session_id or "missing",
            str(result.parsed_event_count),
            "yes" if result.tool_activity_seen else "no",
            "yes" if result.artifact_submitted else "no",
            "none" if not result.errors else "; ".join(result.errors),
        )
    console.print(table)
    console.print(Panel(_render_smoke_report(results), title="Detailed report"))
def smoke_interactive_claude_command(*, display_context: DisplayContext | None = None) -> int:
    """Run a token-consuming manual parity smoke test for interactive Claude.

    Writes the smoke prompt into the workspace scratch directory, starts an
    MCP bridge, runs the configured smoke agent with smoke-specific idle and
    session limits, and prints a summary table plus a detailed report.

    Args:
        display_context: Display context to render with; a fresh one is
            created when omitted.

    Returns:
        0 when no run reported errors, 1 otherwise.

    Raises:
        RuntimeError: If the smoke agent is not present in the agent registry.
    """
    ctx = display_context if display_context is not None else make_display_context()
    workspace_scope = resolve_workspace_scope()
    workspace_root = workspace_scope.root
    smoke_dir = workspace_root / _SMOKE_RELATIVE_DIR
    smoke_dir.mkdir(parents=True, exist_ok=True)
    prompt_file = workspace_root / _PROMPT_FILE
    output_file = workspace_root / _SMOKE_OUTPUT_FILE

    config = load_config(None, {}, workspace_scope=workspace_scope)
    registry = AgentRegistry.from_config(config)
    agent_config = registry.get(_INTERACTIVE_AGENT)
    if agent_config is None:
        raise RuntimeError(
            f"Smoke test agent '{_INTERACTIVE_AGENT}' is unavailable in the registry"
        )

    submit_artifact_tool_name = submit_artifact_tool_name_for_transport(agent_config.transport)
    prompt_file.write_text(
        _build_smoke_prompt(
            _SMOKE_OUTPUT_FILE.as_posix(),
            submit_artifact_tool_name=submit_artifact_tool_name,
        ),
        encoding="utf-8",
    )

    bridge = _start_smoke_bridge(workspace_root, config=config)
    try:
        # Remove output left over from a previous run so that the "file
        # created" check reflects this run only.
        output_file.unlink(missing_ok=True)
        options = build_invoke_options_from_config(
            config.general,
            InvokeRuntimeOptions(
                verbose=False,
                show_progress=False,
                workspace_path=workspace_root,
                extra_env=_smoke_bridge_env(bridge),
                pure=agent_config.transport == AgentTransport.OPENCODE,
            ),
        )
        # Copy the configured options, overriding only the idle timeout and the
        # session cap with the tighter smoke-test limits.
        options = InvokeOptions(
            model_flag=options.model_flag,
            session_id=options.session_id,
            verbose=options.verbose,
            show_progress=options.show_progress,
            workspace_path=options.workspace_path,
            extra_env=options.extra_env,
            idle_timeout_seconds=_SMOKE_IDLE_TIMEOUT_SECONDS,
            drain_window_seconds=options.drain_window_seconds,
            max_waiting_on_child_seconds=options.max_waiting_on_child_seconds,
            idle_poll_interval_seconds=options.idle_poll_interval_seconds,
            parent_exit_grace_seconds=options.parent_exit_grace_seconds,
            descendant_wait_timeout_seconds=options.descendant_wait_timeout_seconds,
            descendant_wait_poll_seconds=options.descendant_wait_poll_seconds,
            process_exit_wait_seconds=options.process_exit_wait_seconds,
            max_session_seconds=_SMOKE_MAX_SESSION_SECONDS,
            waiting_status_interval_seconds=options.waiting_status_interval_seconds,
            suspect_waiting_on_child_seconds=options.suspect_waiting_on_child_seconds,
            child_progress_ttl_seconds=options.child_progress_ttl_seconds,
            child_heartbeat_ttl_seconds=options.child_heartbeat_ttl_seconds,
            child_stale_label_ttl_seconds=options.child_stale_label_ttl_seconds,
            child_exit_reconcile_seconds=options.child_exit_reconcile_seconds,
            max_waiting_on_child_no_progress_seconds=options.max_waiting_on_child_no_progress_seconds,
            pure=options.pure,
            system_prompt_file=options.system_prompt_file,
            waiting_listener=options.waiting_listener,
            required_artifact=options.required_artifact,
            explicit_completion_seen=options.explicit_completion_seen,
            captured_session_id=options.captured_session_id,
            # These three were previously dropped by the copy, silently
            # resetting whatever the configuration had supplied for them.
            initial_session_id=options.initial_session_id,
            settings_json=options.settings_json,
            stop_sentinel_path=options.stop_sentinel_path,
        )
        results = [
            _run_smoke_agent(
                SmokeRunParams(
                    agent_name=_INTERACTIVE_AGENT,
                    config=agent_config,
                    workspace_root=workspace_root,
                    prompt_file=prompt_file,
                    output_file=output_file,
                    options=options,
                    display_context=ctx,
                )
            )
        ]
    finally:
        # Always tear the bridge down, even when the run raised.
        bridge.shutdown()

    _render_smoke_table(results, display_context=ctx)
    return 0 if all(not result.errors for result in results) else 1
# Public aliases for the private helpers, exported through ``__all__`` below.
build_smoke_prompt = _build_smoke_prompt
render_smoke_report = _render_smoke_report

# Names exported by ``from ralph.cli.commands.smoke import *``.
__all__ = [
    "SmokeRunParams",
    "SmokeRunResult",
    "build_smoke_prompt",
    "render_smoke_report",
    "smoke_interactive_claude_command",
]