Source code for ralph.interrupt.asyncio_bridge

"""Asyncio signal bridge for graceful-then-forced SIGINT handling.

Uses ``loop.add_signal_handler()`` — not ``signal.signal()`` — to stay
compatible with the asyncio event loop.

Signal handling contract:

* First ``SIGINT`` records the interrupt, requests graceful tracked-process
  shutdown, and cancels ``root_task``.
* Second ``SIGINT`` force-kills tracked child processes and exits with code 130.

``bridge.pids`` stays synchronized by subscribing to ProcessManager lifecycle
Events, so callers must not register or deregister PIDs manually.
"""

from __future__ import annotations

import signal
from dataclasses import dataclass, field
from typing import TYPE_CHECKING

from loguru import logger

from ralph.interrupt.controller import InterruptController, controller_from_process_manager
from ralph.process.manager import ProcessEvent, ProcessStatus, get_process_manager

if TYPE_CHECKING:
    import asyncio
    from collections.abc import Callable


[docs] @dataclass class SignalBridge: """Bridge that routes OS signals to asyncio task cancellation and process cleanup.""" pids: set[int] = field(default_factory=set) _interrupt_count: int = field(default=0, init=False) _unsubscribe: object = field(default=None, init=False) _connectivity_stop: Callable[[], None] | None = field(default=None, init=False) def register_pid(self, pid: int) -> None: self.pids.add(pid) def deregister_pid(self, pid: int) -> None: self.pids.discard(pid) def _on_process_event(self, event: ProcessEvent) -> None: if event.new_status == ProcessStatus.RUNNING: self.pids.add(event.record.pid) elif event.new_status in ( ProcessStatus.EXITED, ProcessStatus.KILLED, ProcessStatus.FAILED, ): self.pids.discard(event.record.pid)
[docs] def install_signal_handlers( loop: asyncio.AbstractEventLoop, root_task: asyncio.Task[object], bridge: SignalBridge, controller: InterruptController | None = None, ) -> None: """Register SIGINT and SIGTERM handlers that cancel ``root_task`` and forward to child PIDs.""" pm = get_process_manager() bridge._unsubscribe = pm.register_listener(bridge._on_process_event) active_controller = controller or controller_from_process_manager( process_manager=pm, stop_connectivity=bridge._connectivity_stop, ) def _first_sigint() -> None: bridge._interrupt_count += 1 try: active_controller.begin_interrupt(grace_period_s=pm.policy.default_grace_period_s) except Exception: logger.warning("Interrupt controller raised during SIGINT") root_task.cancel() loop.add_signal_handler(signal.SIGINT, _second_sigint) def _second_sigint() -> None: active_controller.force_exit(bridge_pids=list(bridge.pids)) loop.add_signal_handler(signal.SIGINT, _first_sigint)
__all__ = ["SignalBridge", "install_signal_handlers"]