Source code for ralph.agents.chain

"""Agent fallback chain management with strict drain-to-chain binding.

This module handles the agent fallback chain — the ordered list of agents
to try when an agent fails. It supports retry logic and exponential backoff.

IMPORTANT: This module implements STRICT drain-to-chain binding. Every drain
must have an explicit binding in AgentsPolicy or startup validation fails.
There is NO permissive fallback resolution — no sibling fallback, no inference,
no default chains. If a drain is not bound, DrainNotBoundError is raised.
"""

from __future__ import annotations

from typing import TYPE_CHECKING

from ralph.policy.models import AgentChainConfig, AgentsPolicy, DrainName

from .agent_chain import AgentChain
from .drain_not_bound_error import DrainNotBoundError
from .unknown_agent_error import UnknownAgentError

if TYPE_CHECKING:
    from ralph.config.models import UnifiedConfig

__all__ = [
    "AgentChain",
    "ChainManager",
    "DrainNotBoundError",
    "UnknownAgentError",
    "create_chain_from_config",
]


[docs] class ChainManager: """Manages agent chains with strict drain-to-chain binding. ChainManager is constructed with an AgentsPolicy and provides lookup of chains by drain name. Drain resolution is STRICT — there is no fallback or inference. If a drain is not explicitly bound, DrainNotBoundError is raised. Attributes: agents_policy: The agents policy containing chains and drain bindings. """ def __init__(self, agents_policy: AgentsPolicy) -> None: """Initialize ChainManager with an AgentsPolicy. Args: agents_policy: Validated agents policy with chain and drain definitions. """ self._policy = agents_policy
[docs] @classmethod def from_config(cls, config: UnifiedConfig) -> ChainManager: """Create ChainManager from a legacy UnifiedConfig. This is a compatibility shim that converts the old UnifiedConfig format to the new AgentsPolicy format. Args: config: Legacy unified configuration. Returns: ChainManager instance. """ agent_chains = dict(config.agent_chains) agent_drains = dict(config.agent_drains) policy = AgentsPolicy( agent_chains=agent_chains, agent_drains=agent_drains, ) return cls(policy)
[docs] def chain_for_drain(self, drain: DrainName) -> AgentChainConfig: """Get the chain configuration for a drain. This is the STRICT drain resolution — no fallback, no inference. If the drain is not explicitly bound in agents.toml, DrainNotBoundError is raised at startup before any agent is invoked. Args: drain: Drain name to look up. Returns: AgentChainConfig for the bound chain. Raises: DrainNotBoundError: If the drain is not explicitly bound. """ binding = self._policy.agent_drains.get(drain) if binding is None: raise DrainNotBoundError( drain=drain, available_drains=set(self._policy.agent_drains.keys()), ) chain = self._policy.agent_chains.get(binding.chain) if chain is None: msg = ( f"Drain '{drain}' references chain '{binding.chain}' " f"which is not defined in agent_chains" ) raise ValueError(msg) return chain
[docs] def chain_config_for_drain(self, drain: DrainName) -> AgentChainConfig: """Alias for chain_for_drain for clarity.""" return self.chain_for_drain(drain)
[docs] def validate(self) -> list[str]: """Validate the policy for internal consistency. Returns: List of validation error messages (empty if valid). """ errors: list[str] = [] for drain, binding in self._policy.agent_drains.items(): if binding.chain not in self._policy.agent_chains: errors.append(f"Drain '{drain}' references unknown chain '{binding.chain}'") for name, chain in self._policy.agent_chains.items(): if not chain.agents: errors.append(f"Chain '{name}' has no agents") return errors
[docs] def create_chain_from_config( config: UnifiedConfig, chain_name: str, ) -> AgentChain | None: """Create an AgentChain from UnifiedConfig. Args: config: Unified configuration. chain_name: Name of the chain in agent_chains. Returns: AgentChain instance or None if chain not found. """ chain_config = config.agent_chains.get(chain_name) if chain_config is None: return None return AgentChain( agents=chain_config.agents, max_retries=config.general.max_retries, retry_delay_ms=config.general.retry_delay_ms, backoff_multiplier=config.general.backoff_multiplier, max_backoff_ms=config.general.max_backoff_ms, )