Source code for ralph.recovery.events

"""Structured failure events and event bus for recovery observability."""

from __future__ import annotations

import contextlib
from collections.abc import Callable

from loguru import logger

from ralph.recovery.failure_event import FailureEvent
from ralph.recovery.fallover_event import FalloverEvent

__all__ = ["FailureEvent", "FailureEventBus", "FalloverEvent", "UnsubscribeFn"]

UnsubscribeFn = Callable[[], None]

_AnyListener = Callable[[FailureEvent | FalloverEvent], None]


[docs] class FailureEventBus: """Simple publish/subscribe bus for failure and fallover events.""" def __init__(self) -> None: self._listeners: list[_AnyListener] = [] def publish(self, evt: FailureEvent | FalloverEvent) -> None: if isinstance(evt, FailureEvent): category: str = evt.category agent_field: str | None = evt.agent counted: bool | None = evt.counted_against_budget else: category = "fallover" agent_field = evt.from_agent counted = None logger.bind(recovery=True).debug( "Recovery event: category={} phase={} agent={} counted={}", category, evt.phase, agent_field, counted, ) for listener in list(self._listeners): try: listener(evt) except Exception: logger.debug("FailureEventBus listener raised", exc_info=True)
[docs] def subscribe(self, cb: _AnyListener) -> UnsubscribeFn: """Register a listener. Returns a callable that unsubscribes it.""" self._listeners.append(cb) def _unsubscribe() -> None: with contextlib.suppress(ValueError): self._listeners.remove(cb) return _unsubscribe