Source code for ralph.pipeline.parallel.worker_session
"""Factory helpers for building per-worker MCP session bundles.
Provides ``build_worker_session``, which constructs an ``AgentSession``,
starts an MCP server for it via ``McpServerFactory``, and returns a
``WorkerSessionBundle`` containing the session, its server handle, and the
workspace scope that the worker should operate in.
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import TYPE_CHECKING
from uuid import uuid4
from ralph.mcp.multimodal.capabilities import (
UNKNOWN_IDENTITY,
MultimodalModelIdentity,
ResolvedCapabilityProfile,
)
from ralph.mcp.protocol.session import AgentSession
from ralph.pipeline.parallel.worker_session_bundle import WorkerSessionBundle
if TYPE_CHECKING:
from pathlib import Path
from ralph.mcp.server.factory import McpServerFactory
from ralph.pipeline.work_units import WorkUnit
from ralph.workspace.scope import WorkspaceScope
[docs]
@dataclass(frozen=True)
class WorkerSessionConfig:
"""Optional session contract parameters for a parallel worker session."""
worker_artifact_dir: Path | None = None
worker_namespace: Path | None = None
session_drain: str = ""
session_capabilities: frozenset[str] = frozenset()
session_model_identity: MultimodalModelIdentity | None = None
session_capability_profile: ResolvedCapabilityProfile | None = None
[docs]
def build_worker_session(
unit: WorkUnit,
mcp_factory: McpServerFactory,
workspace_scope: WorkspaceScope,
config: WorkerSessionConfig | None = None,
) -> WorkerSessionBundle:
"""Create an AgentSession, start an MCP server, and return the worker bundle.
Pass a ``WorkerSessionConfig`` to propagate session contract parameters
(drain, capabilities, model identity, capability profile) from the parent
phase's ``SessionMcpPlan`` so the worker exposes the same multimodal
capability surface as serial execution.
"""
cfg = config if config is not None else WorkerSessionConfig()
session_id = f"dev-{unit.unit_id}-{uuid4().hex[:8]}"
session = AgentSession(
session_id=session_id,
run_id=unit.unit_id,
drain=cfg.session_drain,
capabilities=set(cfg.session_capabilities),
parallel_worker=True,
worker_artifact_dir=cfg.worker_artifact_dir,
worker_namespace=cfg.worker_namespace,
model_identity=(
cfg.session_model_identity
if cfg.session_model_identity is not None
else UNKNOWN_IDENTITY
),
stored_capability_profile=cfg.session_capability_profile,
)
mcp_handle = mcp_factory.build(session)
return WorkerSessionBundle(
session=session,
mcp_handle=mcp_handle,
workspace_scope=workspace_scope,
)
__all__ = ["WorkerSessionBundle", "WorkerSessionConfig", "build_worker_session"]