"""Dependency-injected interrupt orchestration helpers.
This module centralizes what should happen when Ralph receives a user interrupt:
record it, stop optional connectivity waits, try a graceful shutdown first, and
escalate to a forced kill plus hard exit on a second interrupt. Keeping these
actions behind an injectable controller makes the behavior testable without
real signals.
"""
from __future__ import annotations
import os
import signal
from contextlib import suppress
from dataclasses import dataclass
from typing import TYPE_CHECKING, cast
from ralph.interrupt.state import request_user_interrupt
from ralph.process.manager import get_process_manager
if TYPE_CHECKING:
from collections.abc import Callable, Iterable
from types import FrameType
from ralph.interrupt.signal_getter import SignalGetter
from ralph.interrupt.signal_setter import SignalSetter
from ralph.process.manager import ProcessManager
_DEFAULT_SIGNAL_GETTER = cast("SignalGetter", signal.getsignal)
_DEFAULT_SIGNAL_SETTER = cast("SignalSetter", signal.signal)
INTERRUPT_EXIT_CODE = 130
[docs]
@dataclass(frozen=True)
class InterruptController:
"""Coordinate graceful and forced interrupt handling through injected seams."""
shutdown_all: Callable[[float], None]
record_interrupt: Callable[[], None] = request_user_interrupt
stop_connectivity: Callable[[], None] | None = None
kill_process_group: Callable[[int, int], None] | None = None
hard_exit: Callable[[int], None] | None = None
[docs]
def begin_interrupt(self, *, grace_period_s: float) -> None:
"""Record the interrupt and attempt graceful tracked-process shutdown."""
self.record_interrupt()
if self.stop_connectivity is not None:
with suppress(Exception):
self.stop_connectivity()
self.shutdown_all(grace_period_s)
[docs]
def force_interrupt(self, *, bridge_pids: Iterable[int] = ()) -> None:
"""Escalate to immediate tracked-process termination."""
self.record_interrupt()
if self.stop_connectivity is not None:
with suppress(Exception):
self.stop_connectivity()
self.shutdown_all(0)
kill_process_group = self.kill_process_group or os.killpg
for pid in bridge_pids:
with suppress(ProcessLookupError, PermissionError):
kill_process_group(pid, signal.SIGKILL)
[docs]
def force_exit(self, *, bridge_pids: Iterable[int] = ()) -> None:
"""Force-kill tracked work and exit with the canonical interrupt code."""
self.force_interrupt(bridge_pids=bridge_pids)
hard_exit = self.hard_exit or os._exit
hard_exit(INTERRUPT_EXIT_CODE)
[docs]
def install_force_kill_handler(
on_force_interrupt: Callable[[], None],
*,
signal_getter: SignalGetter = _DEFAULT_SIGNAL_GETTER,
signal_setter: SignalSetter = _DEFAULT_SIGNAL_SETTER,
) -> Callable[[], None]:
"""Install a temporary SIGINT handler that escalates to forced termination."""
previous = signal_getter(signal.SIGINT)
def _handler(signum: int, frame: FrameType | None) -> None:
del signum, frame
on_force_interrupt()
signal_setter(signal.SIGINT, _handler)
def _restore() -> None:
signal_setter(signal.SIGINT, previous)
return _restore
[docs]
def controller_from_process_manager(
*,
process_manager: ProcessManager | None = None,
stop_connectivity: Callable[[], None] | None = None,
record_interrupt: Callable[[], None] = request_user_interrupt,
kill_process_group: Callable[[int, int], None] | None = None,
hard_exit: Callable[[int], None] | None = None,
) -> InterruptController:
"""Build an :class:`InterruptController` from a ProcessManager instance."""
manager = process_manager or get_process_manager()
def _shutdown_all(grace_period_s: float) -> None:
manager.shutdown_all(grace_period_s=grace_period_s)
return InterruptController(
shutdown_all=_shutdown_all,
record_interrupt=record_interrupt,
stop_connectivity=stop_connectivity,
kill_process_group=kill_process_group,
hard_exit=hard_exit,
)
__all__ = [
"INTERRUPT_EXIT_CODE",
"InterruptController",
"controller_from_process_manager",
"install_force_kill_handler",
]