Source code for ralph.pipeline.worker_state

"""Worker execution state model for parallel pipeline workers."""

from datetime import datetime

from pydantic import ConfigDict, Field

from ralph.pipeline.worker_status import WorkerStatus
from ralph.pydantic_compat import RalphBaseModel


class WorkerState(RalphBaseModel):
    """Immutable snapshot of a single parallel worker's execution state.

    Instances are frozen (``frozen=True``); use :meth:`copy_with` to derive
    an updated snapshot instead of assigning to fields.

    Attributes:
        unit_id: Identifier of the work unit this worker is executing.
        status: Current execution status.
        started_at: When the worker started execution.
        finished_at: When the worker finished execution.
        exit_code: Process exit code, if finished.
        error_message: Human-readable error description, if failed.
        worker_namespace: Filesystem path to the worker's per-worker
            namespace under ``.agent/workers/<unit_id>/`` in the shared
            checkout.
        log_file: Path to the worker's log file.
    """

    model_config = ConfigDict(frozen=True, extra="ignore")

    unit_id: str = Field(..., min_length=1)
    status: WorkerStatus = WorkerStatus.PENDING
    started_at: datetime | None = None
    finished_at: datetime | None = None
    exit_code: int | None = None
    error_message: str | None = None
    worker_namespace: str | None = None
    log_file: str | None = None

    def copy_with(self, **updates: object) -> "WorkerState":
        """Return a copy with the given fields replaced.

        ``model_copy(update=...)`` stores update values without validating
        them, so a plain copy could carry an empty ``unit_id`` or a value
        that is not a ``WorkerStatus``. The merged field values are
        therefore validated before the copy is made.

        Args:
            **updates: Field names mapped to their replacement values.
                Names that are not model fields are dropped, mirroring the
                model's ``extra="ignore"`` setting.

        Returns:
            A new snapshot with the validated replacements applied; the
            original instance is left untouched.

        Raises:
            pydantic.ValidationError: If the resulting state violates a
                field constraint or type.
        """
        if not updates:
            return self.model_copy()

        field_names = type(self).model_fields
        candidate = {name: getattr(self, name) for name in field_names}
        candidate.update(updates)
        # Validation happens on the merged state so constraints such as
        # ``min_length`` and enum membership are enforced on the new values.
        validated = type(self).model_validate(candidate)

        # Apply only validated values for known fields; going through
        # ``model_copy`` keeps the remaining copy semantics unchanged.
        checked = {
            name: getattr(validated, name)
            for name in updates
            if name in field_names
        }
        return self.model_copy(update=checked)
# Public names of this module. WorkerStatus is imported from
# ralph.pipeline.worker_status and re-exported alongside WorkerState.
__all__ = ["WorkerState", "WorkerStatus"]