Source code for ralph.process._liveness_probe
"""Protocol for checking liveness of agent processes."""
from __future__ import annotations
from typing import TYPE_CHECKING, Protocol, runtime_checkable
if TYPE_CHECKING:
from ralph.process.child_liveness import ChildActivitySnapshot
[docs]
@runtime_checkable
class LivenessProbe(Protocol):
    """Structural interface for checking liveness of tracked agent processes.

    Any object that provides both methods below satisfies this protocol.
    Because the class is decorated with ``runtime_checkable``, conformance can
    also be tested with ``isinstance``; that check only verifies that the
    methods exist, not that their signatures match.
    """

    def any_agent_active(self, label_prefix: str) -> bool:
        """Return True if any tracked process whose label starts with
        ``label_prefix`` is running.
        """
        ...

    def child_snapshot(self, scope_prefix: str) -> ChildActivitySnapshot:
        """Return a freshness-aware snapshot for all children matching
        ``scope_prefix``.
        """
        ...