Source code for ralph.testing.fake_agent_executor

"""In-process fake executor for unit-testing parallel pipeline logic.

Provides ``FakeAgentExecutor`` and ``FakeRun``. Seed a ``FakeAgentExecutor`` with a
mapping of ``unit_id`` to ``FakeRun`` instances; the executor replays the seeded
output lines and exit code, emitting the correct ``WorkerStatus`` transitions, without
spawning any subprocess or real agent process.
"""

from __future__ import annotations

import asyncio
from typing import TYPE_CHECKING

from ralph.agents.executor import ExecutorError, WorkerResult
from ralph.pipeline.worker_state import WorkerStatus
from ralph.testing.fake_run import FakeRun

if TYPE_CHECKING:
    from collections.abc import Callable

    from ralph.pipeline.work_units import WorkUnit


[docs] class FakeAgentExecutor: """In-process agent executor that replays seeded FakeRun scripts without subprocesses.""" def __init__(self, runs: dict[str, FakeRun]) -> None: self._runs = runs self.calls: list[WorkUnit] = [] self.outputs_emitted: dict[str, list[str]] = {} self.statuses_emitted: dict[str, list[WorkerStatus]] = {} async def run( self, unit: WorkUnit, *, on_output: Callable[[str], None], on_status: Callable[[WorkerStatus], None], ) -> WorkerResult: self.calls.append(unit) seed = self._runs.get(unit.unit_id) if seed is None: raise ExecutorError(f"No FakeRun seeded for unit_id={unit.unit_id!r}") if seed.raise_on_start is not None: raise seed.raise_on_start emitted_outputs: list[str] = [] emitted_statuses: list[WorkerStatus] = [] on_status(WorkerStatus.RUNNING) emitted_statuses.append(WorkerStatus.RUNNING) if seed.side_effect is not None: seed.side_effect() for line in seed.outputs: on_output(line) emitted_outputs.append(line) await asyncio.sleep(0) final_status = WorkerStatus.SUCCEEDED if seed.exit_code == 0 else WorkerStatus.FAILED on_status(final_status) emitted_statuses.append(final_status) self.outputs_emitted[unit.unit_id] = emitted_outputs self.statuses_emitted[unit.unit_id] = emitted_statuses final_message = seed.outputs[-1] if seed.outputs else "" return WorkerResult( unit_id=unit.unit_id, exit_code=seed.exit_code, final_message=final_message, duration_ms=seed.duration_ms, )
__all__ = ["FakeAgentExecutor", "FakeRun"]