Source code for ralph.process.manager._singleton
"""Module-level ProcessManager singleton and related helpers."""
from __future__ import annotations
import atexit
import contextlib
from typing import TYPE_CHECKING
from loguru import logger
from ralph.process.manager._process_manager import ProcessManager
from ralph.process.manager._process_termination_error import ProcessTerminationError
if TYPE_CHECKING:
from collections.abc import Generator
from ralph.process.manager._process_manager_policy import ProcessManagerPolicy
class _ProcessManagerState:
"""Mutable holder for module-level singleton and registration flag."""
instance: ProcessManager | None = None
atexit_registered: bool = False
_pm_state = _ProcessManagerState()
def _atexit_shutdown() -> None:
try:
pm = _pm_state.instance
if pm is None:
return
pm.shutdown_all(grace_period_s=0.5)
except BaseException:
pass
[docs]
def get_process_manager(*, policy: ProcessManagerPolicy | None = None) -> ProcessManager:
"""Return the module-level ProcessManager singleton, creating it on first call."""
if _pm_state.instance is None:
_pm_state.instance = ProcessManager(policy=policy)
if not _pm_state.atexit_registered:
atexit.register(_atexit_shutdown)
_pm_state.atexit_registered = True
return _pm_state.instance
def reset_process_manager() -> None:
"""Replace the singleton with a fresh instance. Call from test teardown."""
_pm_state.instance = None
@contextlib.contextmanager
def process_phase_scope(phase_name: str) -> Generator[None, None, None]:
"""Context manager that tears down all processes labeled 'phase:<phase_name>' on exit."""
try:
yield
finally:
try:
get_process_manager().shutdown_all_for_label(
f"phase:{phase_name}",
grace_period_s=get_process_manager().policy.default_grace_period_s,
)
except ProcessTerminationError as exc:
logger.warning(
"phase:{} cleanup could not terminate all processes: {}", phase_name, exc
)