"""In-memory child liveness lease registry and canonical evidence classifier.
This module is the single source of truth for child-evidence freshness decisions.
``classify_child_snapshot()`` is the canonical verdict function: both the
in-stream idle-watchdog path (``execution_state.classify_quiet``) and the post-exit
path (``execution_state.classify_exit`` / ``invoke._evidence_precedence``) must call
this function rather than re-encoding the stale-vs-fresh precedence rules independently.
Evidence is tracked per-child with heartbeat, progress, and terminal-ack signals
using an injectable clock so tests can use deterministic FakeClock-compatible sources.
No on-disk persistence: the registry is instantiated per invoke and lives only as
long as the invocation.
"""
from __future__ import annotations
import time as _time
from typing import TYPE_CHECKING
from ralph.process._alive_by import AliveBy
from ralph.process._child_activity_snapshot import ChildActivitySnapshot
from ralph.process._child_evidence_verdict import ChildEvidenceVerdict
from ralph.process._child_liveness_record import ChildLivenessRecord
from ralph.process._mutable_record import MutableRecord
if TYPE_CHECKING:
from collections.abc import Callable
# Public API: the registry, the canonical classifier, and the record/verdict
# types it works with (re-exported from their defining ralph.process modules).
__all__ = [
"AliveBy",
"ChildActivitySnapshot",
"ChildEvidenceVerdict",
"ChildLivenessRecord",
"ChildLivenessRegistry",
"classify_child_snapshot",
]
[docs]
def classify_child_snapshot(
    snapshot: ChildActivitySnapshot,
    *,
    has_os_descendants: bool = False,
) -> ChildEvidenceVerdict:
    """Turn a child-activity snapshot into a typed liveness verdict.

    This function owns the stale-vs-fresh precedence rules; the execution_state
    and invoke corroboration paths must delegate here instead of duplicating
    them. Rules are evaluated in order and the first match wins:

    1. Every tracked child is terminal (``terminal_count > 0`` and
       ``active_count == 0``): no liveness, no deferral, and
       ``all_children_terminal=True``.
    2. Fresh progress: ``AliveBy.FRESH_PROGRESS``, deferral allowed.
    3. Fresh heartbeat on a tracked process: ``AliveBy.FRESH_HEARTBEAT_ONLY``,
       deferral allowed.
    4. Tracked process with neither: ``AliveBy.STALE_LABEL_ONLY``; deferral is
       allowed only while ``snapshot.has_fresh_label`` is true.
    5. No scoped Ralph evidence but OS descendants exist:
       ``AliveBy.OS_DESCENDANT_ONLY_STALE_PROGRESS``, deferral allowed.
    6. Otherwise: no liveness and no deferral.

    Args:
        snapshot: Freshness-aware aggregate snapshot from a registry or probe.
        has_os_descendants: Whether a process-tree scan found OS-level
            descendants. Consulted only when rules 1-4 do not apply.

    Returns:
        The ChildEvidenceVerdict for the first matching rule.
    """
    s = snapshot

    # Rule 1: every Ralph-tracked child has reported a terminal state.
    if s.terminal_count > 0 and s.active_count == 0:
        return ChildEvidenceVerdict(
            alive_by=None, deferral_allowed=False, all_children_terminal=True
        )

    if s.has_fresh_progress:
        alive_by, deferral_allowed = AliveBy.FRESH_PROGRESS, True
    elif s.has_fresh_heartbeat and s.has_process:
        alive_by, deferral_allowed = AliveBy.FRESH_HEARTBEAT_ONLY, True
    elif s.has_process:
        # Registered child without fresh heartbeat/progress: defer only inside
        # the stale-label grace window, so a newly registered child that has
        # not sent its first heartbeat yet is not timed out prematurely.
        alive_by, deferral_allowed = AliveBy.STALE_LABEL_ONLY, s.has_fresh_label
    elif has_os_descendants:
        # No scoped Ralph evidence at all; fall back to the OS process tree.
        alive_by, deferral_allowed = AliveBy.OS_DESCENDANT_ONLY_STALE_PROGRESS, True
    else:
        alive_by, deferral_allowed = None, False
    return ChildEvidenceVerdict(alive_by=alive_by, deferral_allowed=deferral_allowed)
[docs]
class ChildLivenessRegistry:
    """In-memory registry of active child leases with freshness tracking.

    All methods are synchronous and safe to call from the main thread. The registry
    is not thread-safe by design: the invoke loop drives all operations from a
    single call site.

    Freshness rules (inclusive: an age equal to the TTL is still fresh):

    * progress is fresh while ``now - last_progress_at <= progress_ttl``;
    * a heartbeat is fresh while ``now - last_heartbeat_at <= heartbeat_ttl``;
    * a label is fresh while ``now - started_at <= stale_label_ttl`` or while
      the heartbeat is fresh.

    Args:
        progress_ttl: Seconds since the last progress signal before progress is stale.
        heartbeat_ttl: Seconds since the last heartbeat before the heartbeat is stale.
        stale_label_ttl: Grace period (seconds) measured from registration
            (``started_at``). Within it, a child with no fresh heartbeat still has
            a fresh label, and ``prune_stale()`` keeps a child that has never
            reported progress.
        exit_reconcile: Window (seconds) after a terminal ack during which the
            terminal record is retained (and counted in ``terminal_count``)
            before ``prune_stale()`` drops it.
        now: Callable returning current monotonic time; defaults to time.monotonic.
    """

    def __init__(
        self,
        *,
        progress_ttl: float,
        heartbeat_ttl: float,
        stale_label_ttl: float,
        exit_reconcile: float,
        now: Callable[[], float] = _time.monotonic,
    ) -> None:
        self._progress_ttl = progress_ttl
        self._heartbeat_ttl = heartbeat_ttl
        self._stale_label_ttl = stale_label_ttl
        self._exit_reconcile = exit_reconcile
        self._now = now
        # Keyed by child_id; registering an existing id replaces its record.
        self._records: dict[str, MutableRecord] = {}

    @staticmethod
    def _is_within(timestamp: float | None, ttl: float, now: float) -> bool:
        """Return True when *timestamp* is set and at most *ttl* seconds before *now*."""
        return timestamp is not None and (now - timestamp) <= ttl

    def register_child(
        self,
        child_id: str,
        scope_prefix: str,
        *,
        pid: int | None = None,
        phase: str = "spawned",
    ) -> None:
        """Register a new child, stamping ``started_at`` with the current clock.

        Registering an id that already exists replaces its record, which also
        clears any heartbeat, progress, or terminal state recorded for it.
        """
        t = self._now()
        self._records[child_id] = MutableRecord(
            child_id=child_id,
            scope_prefix=scope_prefix,
            pid=pid,
            started_at=t,
            last_known_phase=phase,
        )

    def record_heartbeat(self, child_id: str) -> None:
        """Record a heartbeat for a child (advances last_heartbeat_at only).

        Unknown (never registered or already pruned) child ids are ignored.
        """
        rec = self._records.get(child_id)
        if rec is None:
            return
        rec.last_heartbeat_at = self._now()

    def record_progress(self, child_id: str, *, phase: str | None = None) -> None:
        """Record progress for a child (advances both progress and heartbeat).

        Args:
            child_id: Child to update; unknown ids are ignored.
            phase: When given, replaces the child's ``last_known_phase``.
        """
        rec = self._records.get(child_id)
        if rec is None:
            return
        t = self._now()
        # Progress implies liveness, so the heartbeat is stamped with the same time.
        rec.last_progress_at = t
        rec.last_heartbeat_at = t
        if phase is not None:
            rec.last_known_phase = phase

    def record_terminal_ack(self, child_id: str, *, terminal_state: str = "complete") -> None:
        """Record that a child has terminated.

        Stamps ``last_ack_at`` and sets ``terminal_state``. A repeated ack for
        the same child restarts its ``exit_reconcile`` window. Unknown ids are
        ignored.
        """
        rec = self._records.get(child_id)
        if rec is None:
            return
        rec.last_ack_at = self._now()
        rec.terminal_state = terminal_state

    def has_records(self, scope_prefix: str) -> bool:
        """Return True when any stored record's scope_prefix starts with *scope_prefix*.

        Unlike ``snapshot()``, this does not prune first. Records that the next
        snapshot would drop as stale therefore still count here.
        """
        return any(rec.scope_prefix.startswith(scope_prefix) for rec in self._records.values())

    def snapshot(self, scope_prefix: str) -> ChildActivitySnapshot:
        """Return an aggregated freshness snapshot for all children matching scope_prefix.

        A record matches when its own ``scope_prefix`` starts with the requested
        *scope_prefix*, so ``""`` matches every record.

        Side effect: calls ``prune_stale()`` first. That removes stale records
        across *all* scopes, not only the one being queried.
        """
        now = self._now()
        self.prune_stale(now)
        active_count = 0
        terminal_count = 0
        has_fresh_label = False
        has_fresh_heartbeat = False
        has_fresh_progress = False
        oldest_live_child_seconds: float | None = None
        for rec in self._records.values():
            if not rec.scope_prefix.startswith(scope_prefix):
                continue
            if rec.terminal_state is not None:
                # prune_stale() above already dropped terminal records whose ack
                # is outside exit_reconcile. A terminal record left here is still
                # inside the reconcile window (or has no ack timestamp). It is
                # counted, but it contributes no liveness evidence.
                terminal_count += 1
                continue

            active_count += 1
            child_age = now - rec.started_at
            if oldest_live_child_seconds is None or child_age > oldest_live_child_seconds:
                oldest_live_child_seconds = child_age

            heartbeat_fresh = self._is_within(rec.last_heartbeat_at, self._heartbeat_ttl, now)
            # The label is fresh during the registration grace window. A fresh
            # heartbeat also keeps it fresh afterwards, which suppresses
            # false-positive timeouts during transient network/lifecycle events.
            if heartbeat_fresh or child_age <= self._stale_label_ttl:
                has_fresh_label = True
            # record_progress() also stamps the heartbeat, so this flag is set
            # for progressing children too. The classifier checks progress first.
            if heartbeat_fresh:
                has_fresh_heartbeat = True
            if self._is_within(rec.last_progress_at, self._progress_ttl, now):
                has_fresh_progress = True

        return ChildActivitySnapshot(
            scope_prefix=scope_prefix,
            has_process=active_count > 0,
            has_fresh_label=has_fresh_label,
            has_fresh_heartbeat=has_fresh_heartbeat,
            has_fresh_progress=has_fresh_progress,
            oldest_live_child_seconds=oldest_live_child_seconds,
            active_count=active_count,
            terminal_count=terminal_count,
        )

    def _should_prune(self, rec: MutableRecord, t: float) -> bool:
        """Return True when *rec* has no evidence fresh enough to keep it at time *t*."""
        if rec.terminal_state is not None:
            # Terminal records live for exit_reconcile seconds after the ack. A
            # terminal record without an ack timestamp is never pruned here.
            return rec.last_ack_at is not None and not self._is_within(
                rec.last_ack_at, self._exit_reconcile, t
            )
        if self._is_within(rec.last_heartbeat_at, self._heartbeat_ttl, t):
            # A fresh heartbeat keeps any non-terminal child, whatever its progress.
            return False
        if rec.last_progress_at is not None:
            return not self._is_within(rec.last_progress_at, self._progress_ttl, t)
        # Never reported progress: keep it only during the registration grace window.
        return not self._is_within(rec.started_at, self._stale_label_ttl, t)

    def prune_stale(self, now: float | None = None) -> int:
        """Remove records whose evidence is fully stale.

        A record is pruned when either of the following holds:

        - It has a terminal state AND its ack is older than ``exit_reconcile``.
          A terminal record with no ack timestamp is never pruned.
        - It has no terminal state, its heartbeat is not fresh, AND one of:

          - it has reported progress, and that progress is older than
            ``progress_ttl``;
          - it has never reported progress, and it was registered more than
            ``stale_label_ttl`` ago.

        Args:
            now: Clock reading to evaluate against; defaults to the registry clock.

        Returns:
            Number of records pruned.
        """
        t = self._now() if now is None else now
        to_prune = [
            child_id for child_id, rec in self._records.items() if self._should_prune(rec, t)
        ]
        for child_id in to_prune:
            del self._records[child_id]
        return len(to_prune)