Source code for ralph.logging_worker_sink
"""Per-worker log sink helpers for Ralph Workflow."""
from __future__ import annotations
from dataclasses import dataclass
from typing import TYPE_CHECKING, cast
from loguru import logger
if TYPE_CHECKING:
from collections.abc import Mapping
from pathlib import Path
[docs]
@dataclass(frozen=True)
class WorkerSinkHandle:
"""Handle returned by ``bind_worker_sink`` to identify a per-worker loguru sink."""
sink_id: int
log_path: Path
[docs]
def bind_worker_sink(
unit_id: str,
log_dir: Path,
run_id: str = "default",
) -> WorkerSinkHandle:
"""Add a per-worker loguru sink that filters to ``unit_id`` and returns its handle."""
worker_log_dir = log_dir / run_id / "workers"
worker_log_dir.mkdir(parents=True, exist_ok=True)
log_path = worker_log_dir / f"unit-{unit_id}.log"
def worker_filter(record: object) -> bool:
record_mapping = cast("Mapping[str, object]", record)
extra = cast("Mapping[str, object]", record_mapping["extra"])
unit_id_value = extra.get("unit_id")
return isinstance(unit_id_value, str) and unit_id_value == unit_id
sink_id = logger.add(log_path, filter=worker_filter, format="{time} {level} {message}")
return WorkerSinkHandle(sink_id=sink_id, log_path=log_path)
[docs]
def remove_worker_sink(handle: WorkerSinkHandle) -> None:
"""Remove the per-worker loguru sink identified by ``handle``."""
logger.remove(handle.sink_id)
__all__ = ["WorkerSinkHandle", "bind_worker_sink", "remove_worker_sink"]