Source code for ralph.checkpoint.builder

"""Builder helpers for Python checkpoint payload extensions."""

from __future__ import annotations

from dataclasses import dataclass, field, replace
from typing import TYPE_CHECKING

from ralph.checkpoint.checkpoint_payload import CheckpointPayload
from ralph.checkpoint.execution_history import ExecutionHistory
from ralph.checkpoint.run_context import RunContext
from ralph.pipeline.progress import derive_run_context_progress

if TYPE_CHECKING:
    from ralph.pipeline.state import PipelineState
    from ralph.policy.models import PipelinePolicy


@dataclass(frozen=True)
class CheckpointBuilder:
    """Immutable fluent builder for assembling enriched Python checkpoint payloads.

    The dataclass is frozen, so each setter-style method returns a *new*
    builder with exactly one field replaced and leaves the receiver untouched.
    Call :meth:`build` once the pipeline state has been attached.
    """

    # Pipeline state to snapshot; build() refuses to run while this is None.
    _state: PipelineState | None = None
    # Run lineage metadata; build() falls back to RunContext.new() when unset.
    _run_context: RunContext | None = None
    # Execution history stored in the payload; defaults to ExecutionHistory.new().
    _execution_history: ExecutionHistory = field(default_factory=ExecutionHistory.new)
    # Working directory string recorded in the payload.
    _working_dir: str = ""
    # Optional policy forwarded to derive_run_context_progress() by build().
    _policy: PipelinePolicy | None = None

    @classmethod
    def new(cls) -> CheckpointBuilder:
        """Create a fresh checkpoint builder with every field at its default."""
        return cls()

    def pipeline_policy(self, policy: PipelinePolicy) -> CheckpointBuilder:
        """Attach the pipeline policy for policy-driven progress derivation.

        Returns a new builder; all other fields are carried over unchanged.
        """
        # replace() copies every init field, so fields added later (or by a
        # subclass) are preserved without having to be listed here.
        return replace(self, _policy=policy)

    def state(self, state: PipelineState) -> CheckpointBuilder:
        """Attach the pipeline state.

        Returns a new builder; all other fields are carried over unchanged.
        """
        return replace(self, _state=state)

    def run_context(self, run_context: RunContext) -> CheckpointBuilder:
        """Attach run lineage metadata.

        Returns a new builder; all other fields are carried over unchanged.
        """
        return replace(self, _run_context=run_context)

    def execution_history(self, execution_history: ExecutionHistory) -> CheckpointBuilder:
        """Attach bounded execution history.

        Returns a new builder; all other fields are carried over unchanged.
        """
        return replace(self, _execution_history=execution_history)

    def working_dir(self, working_dir: str) -> CheckpointBuilder:
        """Attach the working directory captured for the checkpoint.

        Returns a new builder; all other fields are carried over unchanged.
        """
        return replace(self, _working_dir=working_dir)

    def build(self) -> CheckpointPayload:
        """Build the checkpoint payload or raise if required state is missing.

        The attached run context (or a fresh ``RunContext.new()`` when none was
        attached) is passed through ``derive_run_context_progress`` together
        with the state and the optional policy; the value it returns is what
        ends up in the payload's ``run_context``.

        Returns:
            A ``CheckpointPayload`` carrying the state, the derived run
            context, the execution history and the working directory.

        Raises:
            ValueError: If no pipeline state was attached via :meth:`state`.
        """
        if self._state is None:
            raise ValueError("CheckpointBuilder requires pipeline state before build()")

        # Explicit None test (not truthiness) so a supplied context is never
        # discarded merely because it happens to evaluate as falsy.
        run_context = self._run_context if self._run_context is not None else RunContext.new()
        normalized_context = derive_run_context_progress(self._state, run_context, self._policy)
        return CheckpointPayload(
            state=self._state,
            run_context=normalized_context,
            execution_history=self._execution_history,
            working_dir=self._working_dir,
        )