Source code for ralph.testing.fake_agent_executor
"""In-process fake executor for unit-testing parallel pipeline logic.
Provides ``FakeAgentExecutor`` and ``FakeRun``. Seed a ``FakeAgentExecutor`` with a
mapping of ``unit_id`` to ``FakeRun`` instances; the executor replays the seeded
output lines and exit code, emitting the correct ``WorkerStatus`` transitions, without
spawning any subprocess or real agent process.
"""
from __future__ import annotations
import asyncio
from typing import TYPE_CHECKING
from ralph.agents.executor import ExecutorError, WorkerResult
from ralph.pipeline.worker_state import WorkerStatus
from ralph.testing.fake_run import FakeRun
if TYPE_CHECKING:
from collections.abc import Callable
from ralph.pipeline.work_units import WorkUnit
[docs]
class FakeAgentExecutor:
    """Scripted stand-in for an agent executor that never starts a subprocess.

    Each call to :meth:`run` looks up the ``FakeRun`` seeded for the unit's
    ``unit_id`` and replays it through the callbacks supplied by the caller.

    Attributes:
        calls: Every unit handed to :meth:`run`, in call order. A unit is
            recorded even when that call goes on to raise.
        outputs_emitted: ``unit_id`` -> lines that were passed to
            ``on_output``. Written only once a run has replayed to the end.
        statuses_emitted: ``unit_id`` -> statuses that were passed to
            ``on_status``. Written only once a run has replayed to the end.
    """

    def __init__(self, runs: dict[str, FakeRun]) -> None:
        """Keep the seeded scripts and start with empty recordings.

        Args:
            runs: Mapping of ``unit_id`` to the ``FakeRun`` replayed for it.
                The mapping is held by reference, not copied.
        """
        self.calls: list[WorkUnit] = []
        self.outputs_emitted: dict[str, list[str]] = {}
        self.statuses_emitted: dict[str, list[WorkerStatus]] = {}
        self._runs = runs

    async def run(
        self,
        unit: WorkUnit,
        *,
        on_output: Callable[[str], None],
        on_status: Callable[[WorkerStatus], None],
    ) -> WorkerResult:
        """Replay the script seeded for ``unit`` and return its result.

        The replay order is: ``RUNNING`` status, the script's optional
        ``side_effect``, one ``on_output`` call per seeded line, then a final
        ``SUCCEEDED`` status when the seeded exit code is ``0`` and ``FAILED``
        otherwise.

        Args:
            unit: Work unit to "execute"; only its ``unit_id`` is read.
            on_output: Called once per seeded output line, in order.
            on_status: Called with each status transition.

        Returns:
            A ``WorkerResult`` carrying the seeded exit code and duration, with
            the last seeded output line (or ``""`` when there are none) as the
            final message.

        Raises:
            ExecutorError: No script was seeded for ``unit.unit_id``.
            BaseException: Whatever the script's ``raise_on_start`` holds, if
                set; it is raised before any callback is invoked.
        """
        # Record the call first so that failing runs are still observable.
        self.calls.append(unit)

        key = unit.unit_id
        script = self._runs.get(key)
        if script is None:
            raise ExecutorError(f"No FakeRun seeded for unit_id={unit.unit_id!r}")

        startup_error = script.raise_on_start
        if startup_error is not None:
            raise startup_error

        lines_sent: list[str] = []
        transitions: list[WorkerStatus] = []

        def announce(status: WorkerStatus) -> None:
            # Notify the caller first; log the transition only if that succeeded.
            on_status(status)
            transitions.append(status)

        announce(WorkerStatus.RUNNING)

        hook = script.side_effect
        if hook is not None:
            hook()

        for text in script.outputs:
            on_output(text)
            lines_sent.append(text)
            # Yield to the event loop after every line so other tasks can run.
            await asyncio.sleep(0)

        if script.exit_code == 0:
            outcome = WorkerStatus.SUCCEEDED
        else:
            outcome = WorkerStatus.FAILED
        announce(outcome)

        # Recordings are published only after the whole script was replayed.
        self.outputs_emitted[key] = lines_sent
        self.statuses_emitted[key] = transitions

        last_line = ""
        if script.outputs:
            last_line = script.outputs[-1]

        return WorkerResult(
            unit_id=key,
            exit_code=script.exit_code,
            final_message=last_line,
            duration_ms=script.duration_ms,
        )
# Public names of this module. ``FakeRun`` is imported from
# ``ralph.testing.fake_run`` above and re-exported here.
__all__ = [
    "FakeAgentExecutor",
    "FakeRun",
]