"""Helpers for executing external processes with captured output."""
from __future__ import annotations
import asyncio
import os
import subprocess
from typing import TYPE_CHECKING
from ralph.executor._process_error_details import ProcessErrorDetails
from ralph.executor._process_result import ProcessResult
from ralph.executor._process_run_options import ProcessRunOptions
from ralph.process.manager import ProcessManager, SpawnOptions, get_process_manager
if TYPE_CHECKING:
from collections.abc import Mapping, Sequence
from pathlib import Path
# Standard unix timeout exit code
TIMEOUT_EXIT_CODE = 124
[docs]
class ProcessExecutionError(RuntimeError):
"""Raised when a process cannot be started or exceeds its timeout."""
def __init__(
self,
command: tuple[str, ...],
message: str,
details: ProcessErrorDetails | None = None,
) -> None:
self.command = command
payload = details or ProcessErrorDetails()
self.timed_out = payload.timed_out
self.timeout = payload.timeout
self.stdout = payload.stdout
self.stderr = payload.stderr
super().__init__(message)
[docs]
@classmethod
def from_timeout(
cls,
command: tuple[str, ...],
*,
timeout: float | None,
stdout: str,
stderr: str,
) -> ProcessExecutionError:
"""Build a timeout error with captured partial output."""
executable = command[0]
message = f"Failed to execute '{executable}': timed out"
if timeout is not None:
message = f"{message} after {timeout}s"
return cls(
command,
message,
ProcessErrorDetails(
timed_out=True,
timeout=timeout,
stdout=stdout,
stderr=stderr,
),
)
[docs]
@classmethod
def from_os_error(
cls,
command: tuple[str, ...],
error: OSError,
) -> ProcessExecutionError:
"""Build an execution error from an OS-level failure."""
return cls(command, f"Failed to execute '{command[0]}': {error}")
[docs]
async def run_process_async(
command: str,
args: Sequence[str] = (),
*,
cwd: str | Path | None = None,
env: Mapping[str, str] | None = None,
timeout: float | None = None,
_pm: ProcessManager | None = None,
) -> ProcessResult:
"""Run a process asynchronously and capture its output."""
cmd = _normalize_command(command, args)
pm = _pm if _pm is not None else get_process_manager()
try:
handle = await pm.spawn_async(
cmd,
SpawnOptions(
cwd=_normalize_cwd(cwd),
env=_build_env(env),
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
),
)
except OSError as exc:
raise ProcessExecutionError.from_os_error(cmd, exc) from exc
communicate_task: asyncio.Task[tuple[bytes, bytes]]
communicate_task = asyncio.create_task(handle.communicate())
done, _pending = await asyncio.wait({communicate_task}, timeout=timeout)
if communicate_task not in done:
await handle.terminate(grace_period_s=0)
stdout_bytes, stderr_bytes = await communicate_task
# Return exit code TIMEOUT_EXIT_CODE on timeout (standard unix timeout exit code)
# instead of raising an exception, so callers can handle it gracefully
return ProcessResult(
command=cmd,
returncode=TIMEOUT_EXIT_CODE,
stdout=_decode_output(stdout_bytes),
stderr=_decode_output(stderr_bytes),
)
stdout_bytes, stderr_bytes = communicate_task.result()
rc = handle.returncode if handle.returncode is not None else -1
return ProcessResult(
command=cmd,
returncode=rc,
stdout=_decode_output(stdout_bytes),
stderr=_decode_output(stderr_bytes),
)
[docs]
def run_process(
command: str,
args: Sequence[str] = (),
*,
options: ProcessRunOptions | None = None,
_pm: ProcessManager | None = None,
) -> ProcessResult:
"""Run a process synchronously, optionally capturing output.
When ``options.capture_output`` is ``False`` the child process inherits the
parent's stdout/stderr so output streams directly to the terminal. The
returned ``ProcessResult`` will have empty ``stdout`` and ``stderr`` strings.
"""
effective_options = options or ProcessRunOptions()
cmd = _normalize_command(command, args)
pm = _pm if _pm is not None else get_process_manager()
pipe = subprocess.PIPE if effective_options.capture_output else None
try:
handle = pm.spawn(
cmd,
SpawnOptions(
cwd=_normalize_cwd(effective_options.cwd),
env=_build_env(effective_options.env),
stdout=pipe,
stderr=pipe,
),
)
except OSError as exc:
raise ProcessExecutionError.from_os_error(cmd, exc) from exc
try:
stdout_bytes, stderr_bytes = handle.communicate(timeout=effective_options.timeout)
except subprocess.TimeoutExpired:
handle.terminate(grace_period_s=0)
stdout_bytes, stderr_bytes = handle.communicate()
# Return exit code TIMEOUT_EXIT_CODE on timeout (standard unix timeout exit code)
# instead of raising an exception, so callers can handle it gracefully
return ProcessResult(
command=cmd,
returncode=TIMEOUT_EXIT_CODE,
stdout=_decode_output(stdout_bytes),
stderr=_decode_output(stderr_bytes),
)
rc = handle.returncode if handle.returncode is not None else -1
return ProcessResult(
command=cmd,
returncode=rc,
stdout=_decode_output(stdout_bytes),
stderr=_decode_output(stderr_bytes),
)
def _normalize_command(command: str, args: Sequence[str]) -> tuple[str, ...]:
return (command, *args)
def _normalize_cwd(cwd: str | Path | None) -> str | None:
if cwd is None:
return None
return str(cwd)
def _build_env(env: Mapping[str, str] | None) -> dict[str, str]:
merged = dict(os.environ)
if env is not None:
merged.update(env)
return merged
def _decode_output(output: str | bytes | None) -> str:
if output is None:
return ""
if isinstance(output, bytes):
return output.decode("utf-8", errors="replace")
return output
__all__ = [
"ProcessExecutionError",
"ProcessResult",
"ProcessRunOptions",
"run_process",
"run_process_async",
]