Source code for ralph.agents.agent_chain

"""Agent fallback chain with retry and backoff behavior."""

from __future__ import annotations

import time
from typing import TYPE_CHECKING

from loguru import logger

if TYPE_CHECKING:
    from collections.abc import Callable


[docs] class AgentChain: """Manages agent fallback chain with retry logic. The chain maintains an ordered list of agents and handles: - Current agent selection - Retry counting and limits - Exponential backoff between retries - Fallback to next agent on exhaustion Attributes: agents: List of agent names in the chain. current_index: Index of the currently selected agent. retries: Number of retries for current agent. max_retries: Maximum retries before falling back. retry_delay_ms: Base delay between retries in milliseconds. backoff_multiplier: Multiplier for exponential backoff. max_backoff_ms: Maximum backoff delay in milliseconds. """ def __init__( self, agents: list[str], max_retries: int = 3, retry_delay_ms: int = 1000, backoff_multiplier: float = 2.0, max_backoff_ms: int = 60000, ) -> None: """Initialize agent chain. Args: agents: List of agent names in fallback order. max_retries: Maximum retries per agent before fallback. retry_delay_ms: Base delay between retries in milliseconds. backoff_multiplier: Multiplier for exponential backoff. max_backoff_ms: Maximum backoff delay in milliseconds. """ self.agents = agents self.current_index = 0 self.retries = 0 self.max_retries = max_retries self.retry_delay_ms = retry_delay_ms self.backoff_multiplier = backoff_multiplier self.max_backoff_ms = max_backoff_ms @property def current_agent(self) -> str | None: """Get the current agent name. Returns: Agent name or None if chain is exhausted. """ if not self.agents or self.current_index >= len(self.agents): return None return self.agents[self.current_index] @property def is_exhausted(self) -> bool: """Check if all agents in chain are exhausted. Returns: True if no agents remain. """ return self.current_agent is None
[docs] def can_retry(self) -> bool: """Check if current agent can be retried. Returns: True if retries remain for current agent. """ return self.retries < self.max_retries
[docs] def advance(self) -> bool: """Advance to the next agent in the chain. Returns: True if advanced successfully, False if chain exhausted. """ if self.current_index + 1 < len(self.agents): self.current_index += 1 self.retries = 0 logger.debug("Advanced to next agent: {}", self.current_agent) return True logger.debug("Agent chain exhausted") return False
[docs] def record_retry(self) -> None: """Record a retry attempt for current agent.""" self.retries += 1 logger.debug( "Retry {} of {} for agent {}", self.retries, self.max_retries, self.current_agent, )
[docs] def calculate_backoff(self) -> float: """Calculate backoff delay in seconds. Returns: Backoff delay in seconds. """ delay = self.retry_delay_ms * (self.backoff_multiplier**self.retries) return min(delay, self.max_backoff_ms) / 1000.0
[docs] def wait_backoff(self, *, _sleep: Callable[[float], None] = time.sleep) -> None: """Wait for the backoff period.""" backoff = self.calculate_backoff() logger.debug("Backing off for {:.2f} seconds", backoff) _sleep(backoff)