Source code for ralph.pipeline.waiting_dispatch

"""Dispatch waiting status events to pipeline subscribers."""

from __future__ import annotations

from typing import TYPE_CHECKING

from loguru import logger

if TYPE_CHECKING:
    from ralph.display.subscriber import PipelineSubscriber


[docs] def dispatch_waiting_event( event: object, *, subscriber: PipelineSubscriber | None, unit_id: str, agent_name: str, ) -> None: """Dispatch a WaitingStatusEvent to the subscriber. Exposed as a free function so tests can exercise it without a full pipeline. """ if subscriber is not None: try: subscriber.record_waiting_status(event, unit_id=unit_id, agent_name=agent_name) except Exception: logger.debug("dispatch_waiting_event.record_waiting_status failed", exc_info=True)