"""Trackable workflow instance model for orchestration use cases.
Exposes the minimum product-facing information an external orchestrator needs
to monitor a running Ralph Workflow instance: stable identity, lifecycle status,
current pipeline stage, and recent operational activity.
"""
from __future__ import annotations
from dataclasses import FrozenInstanceError
from importlib import import_module
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from ralph._supervising_tracker import WorkflowInstanceTracker
from ralph.display.snapshot import PipelineSnapshot
from ralph.instance_status import InstanceStatus
else:
InstanceStatus = import_module("ralph.instance_status").InstanceStatus
__all__ = [
"InstanceStatus",
"WorkflowInstanceTracker",
"WorkflowInstanceView",
"instance_view_from_snapshot",
]
_UNSET_PHASE = "__unset__"
# =============================================================================
# WorkflowInstanceView
# =============================================================================
[docs]
class WorkflowInstanceView:
"""Immutable view of a single Ralph Workflow instance for orchestration.
Attributes:
instance_id: Stable orchestration identity assigned at tracker construction,
or the runtime run_id when projected directly from a snapshot.
For tracker-based supervision, this is always a non-empty str.
run_id: Optional runtime identifier copied from the live pipeline snapshot.
This may be None before startup or when the underlying system does not
assign a runtime identity. It is separate from the stable instance_id
so that a supervising orchestrator can track the same instance across
restarts or reconnects without confusion.
lifecycle_status: Observable lifecycle state of the instance.
current_stage: Active pipeline stage name, or None when no stage is active
(including before startup, after terminal states, and when phase is unset).
recent_activity: Recent operational output, ordered oldest to newest.
"""
_current_stage: str | None
_instance_id: str
_lifecycle_status: InstanceStatus
_recent_activity: tuple[str, ...]
_run_id: str | None
__slots__ = (
"_current_stage",
"_instance_id",
"_lifecycle_status",
"_recent_activity",
"_run_id",
)
def __init__(
self,
instance_id: str,
run_id: str | None,
lifecycle_status: InstanceStatus,
current_stage: str | None,
recent_activity: tuple[str, ...],
) -> None:
object.__setattr__(self, "_instance_id", instance_id)
object.__setattr__(self, "_run_id", run_id)
object.__setattr__(self, "_lifecycle_status", lifecycle_status)
object.__setattr__(self, "_current_stage", current_stage)
object.__setattr__(self, "_recent_activity", recent_activity)
@property
def instance_id(self) -> str:
return self._instance_id
@property
def run_id(self) -> str | None:
return self._run_id
@property
def lifecycle_status(self) -> InstanceStatus:
return self._lifecycle_status
@property
def current_stage(self) -> str | None:
return self._current_stage
@property
def recent_activity(self) -> tuple[str, ...]:
return self._recent_activity
def __setattr__(self, name: str, value: object) -> None:
raise FrozenInstanceError(f"cannot set attribute '{name}'")
def __repr__(self) -> str:
return (
f"WorkflowInstanceView(instance_id={self._instance_id!r}, "
f"run_id={self._run_id!r}, lifecycle_status={self._lifecycle_status!r}, "
f"current_stage={self._current_stage!r}, recent_activity={self._recent_activity!r})"
)
# =============================================================================
# Snapshot projection helpers
# =============================================================================
[docs]
def instance_view_from_snapshot(
snapshot: PipelineSnapshot,
*,
_instance_id_override: str | None = None,
) -> WorkflowInstanceView:
"""Project a PipelineSnapshot into a WorkflowInstanceView.
When called with ``_instance_id_override``, that stable identity is
used and ``snapshot.run_id`` is copied to the view's ``run_id`` field.
This form is used internally by ``WorkflowInstanceTracker`` to preserve
the orchestrator-assigned identity while exposing the runtime ``run_id``
separately.
When called without an identity override (the default), the view's
``instance_id`` is taken directly from ``snapshot.run_id``. This form
is only valid when ``snapshot.run_id`` is not None. If ``snapshot.run_id``
is None and no override is provided, a ``ValueError`` is raised because
the supervising contract requires a stable orchestrator-facing identity.
Args:
snapshot: The pipeline snapshot to project.
_instance_id_override: Stable identity to use instead of snapshot.run_id.
Should be supplied by WorkflowInstanceTracker or when the caller
needs to project a snapshot without a runtime identity.
Raises:
ValueError: If ``snapshot.run_id`` is None and no ``_instance_id_override``
is provided. The supervising contract requires a stable identity.
"""
lifecycle_status = _lifecycle_status(snapshot)
if _instance_id_override is not None:
instance_id = _instance_id_override
elif snapshot.run_id is not None:
instance_id = snapshot.run_id
else:
raise ValueError(
"instance_view_from_snapshot requires either a non-None "
"snapshot.run_id or an explicit _instance_id_override. "
"For tracker-based supervision, use "
"WorkflowInstanceTracker.update_from_snapshot instead."
)
return WorkflowInstanceView(
instance_id=instance_id,
run_id=snapshot.run_id,
lifecycle_status=lifecycle_status,
current_stage=_current_stage(snapshot, lifecycle_status),
recent_activity=_recent_activity(snapshot),
)
def _lifecycle_status(snapshot: PipelineSnapshot) -> InstanceStatus:
if snapshot.is_terminal_success:
return InstanceStatus.COMPLETED
if snapshot.is_terminal_failure or snapshot.interrupted_by_user:
return InstanceStatus.FAILED
if snapshot.run_id is None:
return InstanceStatus.NOT_STARTED
if snapshot.waiting_status_line is not None:
return InstanceStatus.WAITING
return InstanceStatus.ACTIVE
def _current_stage(snapshot: PipelineSnapshot, status: InstanceStatus) -> str | None:
if status in (
InstanceStatus.COMPLETED,
InstanceStatus.FAILED,
InstanceStatus.NOT_STARTED,
):
return None
phase = snapshot.phase
if not phase or phase == _UNSET_PHASE:
return None
return phase
def _recent_activity(snapshot: PipelineSnapshot) -> tuple[str, ...]:
lines: list[str] = []
for phase, decision, reason, timestamp in snapshot.decision_log[-5:]:
parts = [timestamp, phase, decision]
if reason:
parts.append(reason)
lines.append(" | ".join(parts))
if snapshot.waiting_status_line is not None:
lines.append(snapshot.waiting_status_line)
elif snapshot.last_activity_line is not None:
lines.append(snapshot.last_activity_line)
return tuple(lines)
def __getattr__(name: str) -> object:
if name == "WorkflowInstanceTracker":
from ralph._supervising_tracker import WorkflowInstanceTracker
return WorkflowInstanceTracker
raise AttributeError(f"module {__name__!r} has no attribute {name!r}")