Source code for ralph.agents.execution_state.claude_execution_strategy

"""Execution strategy for Claude agents."""

from __future__ import annotations

import json
from typing import cast

from ralph.agents.activity import AgentActivityKind, AgentActivitySignal

from ._helpers import _classify_claude_json_object, _classify_claude_prefixed_line
from .generic_execution_strategy import GenericExecutionStrategy


[docs] class ClaudeExecutionStrategy(GenericExecutionStrategy): """Claude-aware activity classification for watchdog control flow."""
[docs] def classify_activity_line(self, line: str) -> AgentActivitySignal | None: stripped = line.strip() if not stripped: return None prefixed_signal = _classify_claude_prefixed_line(stripped) if prefixed_signal is not None: return prefixed_signal try: parsed = cast("object", json.loads(stripped, strict=False)) except json.JSONDecodeError: return AgentActivitySignal(AgentActivityKind.OUTPUT_LINE, raw=line) if not isinstance(parsed, dict): return AgentActivitySignal(AgentActivityKind.OUTPUT_LINE, raw=line) obj = cast("dict[str, object]", parsed) return _classify_claude_json_object(obj, line)