Source code for ralph.pipeline.parallel.worker_runtime

"""Early worker-runtime seam for dedicated parallel worker execution."""

from __future__ import annotations

from dataclasses import dataclass
from pathlib import Path
from typing import TYPE_CHECKING

from loguru import logger

from ralph.agents.invoke import AgentInvocationError, invoke_agent
from ralph.agents.registry import AgentRegistry
from ralph.config.loader import load_config
from ralph.pipeline.agent_execution_deps import AgentExecutionDeps
from ralph.pipeline.checkpoint import worker_checkpoint_path
from ralph.pipeline.effect_router import determine_effect_from_policy
from ralph.pipeline.effects import InvokeAgentEffect
from ralph.pipeline.events import Event, PipelineEvent
from ralph.pipeline.parallel.worker_manifest import ParallelWorkerManifest
from ralph.pipeline.phase_agent_handler import phase_event_after_agent_run
from ralph.pipeline.prompt_prep import session_capabilities_for_agent_phase
from ralph.pipeline.state_init import create_initial_state
from ralph.policy.loader import load_policy_for_workspace_scope
from ralph.prompts.debug_dump import worker_multimodal_sidecar_path, worker_prompt_dump_path
from ralph.prompts.materialize import materialize_prompt_for_phase
from ralph.prompts.system_prompt import worker_current_prompt_path, worker_system_prompt_path
from ralph.workspace import FsWorkspace
from ralph.workspace.scope import WorkspaceScope

if TYPE_CHECKING:
    from ralph.config.models import UnifiedConfig
    from ralph.display.context import DisplayContext
    from ralph.pipeline.state import PipelineState
    from ralph.policy.models import PolicyBundle


[docs] @dataclass(frozen=True) class WorkerRuntimePaths: """Worker-local filesystem paths for prompt and checkpoint runtime state.""" checkpoint_path: Path current_prompt_path: Path prompt_dump_path: Path system_prompt_path: Path multimodal_sidecar_path: Path
[docs] def build_worker_runtime_paths( *, workspace_root: Path, worker_namespace: Path, phase: str, ) -> WorkerRuntimePaths: """Return the namespaced runtime paths a parallel worker should own.""" del workspace_root return WorkerRuntimePaths( checkpoint_path=worker_checkpoint_path(worker_namespace), current_prompt_path=worker_current_prompt_path(worker_namespace), prompt_dump_path=worker_prompt_dump_path(worker_namespace, phase), system_prompt_path=worker_system_prompt_path(worker_namespace, phase), multimodal_sidecar_path=worker_multimodal_sidecar_path(worker_namespace, phase), )
def _state_for_worker_manifest( manifest: ParallelWorkerManifest, *, config: UnifiedConfig, policy_bundle: PolicyBundle, ) -> PipelineState: initial_state = create_initial_state( config, agents_policy=policy_bundle.agents, pipeline_policy=policy_bundle.pipeline, ) unit = manifest.to_work_unit() return initial_state.copy_with( phase=manifest.phase, current_drain=manifest.drain, work_units=(unit,), )
[docs] def run_parallel_worker_from_manifest( *, manifest_path: Path, display_context: DisplayContext ) -> int: """Execute one manifest-backed worker flow without entering the shared run loop.""" manifest = ParallelWorkerManifest.load(manifest_path) workspace_root = Path(manifest.workspace_root) worker_namespace = Path(manifest.worker_namespace) workspace_scope = WorkspaceScope.for_same_workspace_worker( repo_root=workspace_root, allowed_directories=tuple(manifest.allowed_directories), worker_namespace=worker_namespace, ) config = load_config( Path(manifest.config_path) if manifest.config_path is not None else None, manifest.cli_overrides, workspace_scope=workspace_scope, ) policy_bundle = load_policy_for_workspace_scope(workspace_scope, config=config) state = _state_for_worker_manifest(manifest, config=config, policy_bundle=policy_bundle) effect = determine_effect_from_policy(state, policy_bundle, workspace_scope, config=config) if not isinstance(effect, InvokeAgentEffect): logger.error( "Parallel worker manifest resolved unsupported effect for phase={} effect={}", manifest.phase, type(effect).__name__, ) return 1 workspace = FsWorkspace(workspace_scope.root, allowed_roots=workspace_scope.allowed_roots) agent = AgentRegistry.from_config(config).get(effect.agent_name) prompt_path = materialize_prompt_for_phase( phase=manifest.phase, workspace=workspace, pipeline_policy=policy_bundle.pipeline, session_caps=session_capabilities_for_agent_phase( manifest.drain, agent=agent, agents_policy=policy_bundle.agents, ), workspace_root=workspace_root, artifacts_policy=policy_bundle.artifacts, worker_namespace=worker_namespace, work_unit=manifest.to_work_unit(), ) rendered_prompt = workspace.read(prompt_path) resolved_prompt_path = Path(manifest.prompt_file) resolved_prompt_path.parent.mkdir(parents=True, exist_ok=True) resolved_prompt_path.write_text(rendered_prompt, encoding="utf-8") worker_effect = InvokeAgentEffect( agent_name=effect.agent_name, phase=effect.phase, prompt_file=str(resolved_prompt_path), drain=effect.drain, chain_name=effect.chain_name, ) deps = AgentExecutionDeps( invoke_agent=invoke_agent, agent_invocation_error=AgentInvocationError, agent_registry=AgentRegistry, ) event: Event = execute_agent_effect( worker_effect, config, deps, workspace_scope, display_context=display_context, state=state, policy_bundle=policy_bundle, worker_namespace=worker_namespace, worker_artifact_dir=Path(manifest.worker_artifact_dir), parallel_worker=True, ) if event == PipelineEvent.AGENT_SUCCESS: event = phase_event_after_agent_run( effect=worker_effect, config=config, policy_bundle=policy_bundle, workspace=workspace, workspace_scope=workspace_scope, display_context=display_context, state=state, ) return 0 if event == PipelineEvent.AGENT_SUCCESS else 1
def execute_agent_effect( effect: InvokeAgentEffect, config: UnifiedConfig, deps: AgentExecutionDeps, workspace_scope: WorkspaceScope, **opts: object, ) -> PipelineEvent: """Delegate worker-mode agent execution through the standard runner seam.""" from ralph.pipeline.runner import execute_agent_effect as runner_execute_agent_effect return runner_execute_agent_effect(effect, config, deps, workspace_scope, **opts) __all__ = [ "WorkerRuntimePaths", "build_worker_runtime_paths", "run_parallel_worker_from_manifest", ]