Source code for ralph.interrupt.state

"""Shared process-local interrupt state helpers."""

from __future__ import annotations

import threading

_USER_INTERRUPT_OCCURRED = threading.Event()


[docs] def request_user_interrupt() -> None: """Record that a user interrupt has been requested.""" _USER_INTERRUPT_OCCURRED.set()
[docs] def user_interrupted_occurred() -> bool: """Return ``True`` if any user interrupt has occurred in this process.""" return _USER_INTERRUPT_OCCURRED.is_set()
__all__ = ["request_user_interrupt", "user_interrupted_occurred"]