# Source code for ralph.agents.invoke._workspace

"""Workspace monitoring for file changes during agent execution."""

from __future__ import annotations

import importlib
from typing import TYPE_CHECKING, Protocol, cast

from loguru import logger

from ralph.agents.invoke._has_src_path import _HasSrcPath

if TYPE_CHECKING:
    from pathlib import Path

if TYPE_CHECKING:
    # These protocols exist only for static type checking; none of them is
    # defined at runtime, so they may be referenced from annotations only.

    class _HasStop(Protocol):
        """Minimal shutdown surface of a watchdog Observer-like object."""

        def stop(self) -> None:
            """Stop the object."""

        def join(self, _timeout: float | None = None) -> None:
            """Join the object, optionally passing a timeout."""

    class _ObserverProtocol(_HasStop, Protocol):
        """Full Observer-like surface this module relies on.

        Extends ``_HasStop`` with the two calls needed to begin watching.
        """

        def schedule(self, _event_handler: object, path: str, **_kwargs: object) -> None:
            """Register an event handler for ``path``."""

        def start(self) -> None:
            """Start the object."""

    class _WatchdogObserversModule(Protocol):
        """Typed view of the optional ``watchdog.observers`` module."""

        # The only module attribute accessed here: the observer class itself.
        Observer: type[_ObserverProtocol]


# Upper bound on how many distinct changed paths a WorkspaceMonitor retains.
# Once the bound is exceeded, record_event evicts the oldest-recorded paths
# first, so memory stays bounded however many files change.
_MAX_WORKSPACE_CHANGED_FILES = 512


def _make_change_tracker(monitor: WorkspaceMonitor) -> object:
    """Build an event handler that reports changed paths to ``monitor``.

    Args:
        monitor: The monitor whose ``record_event`` receives each path.

    Returns:
        A handler object exposing ``dispatch`` and ``on_any_event``.
    """

    class _ChangeTrackerHandler:
        """Handler that forwards the ``src_path`` of matching events."""

        def on_any_event(self, event: object) -> None:
            """Record the event's ``src_path`` when it matches ``_HasSrcPath``."""
            # Events that do not satisfy _HasSrcPath are ignored silently.
            if not isinstance(event, _HasSrcPath):
                return
            monitor.record_event(event.src_path)

        def dispatch(self, event: object) -> None:
            """Route every dispatched event through ``on_any_event``."""
            self.on_any_event(event)

    tracker = _ChangeTrackerHandler()
    return tracker


def _create_watchdog_observer() -> _ObserverProtocol | None:
    """Construct a watchdog observer when the optional dependency is installed."""
    try:
        observers_module = cast(
            "_WatchdogObserversModule",
            importlib.import_module("watchdog.observers"),
        )
    except ImportError:
        return None
    return observers_module.Observer()


class WorkspaceMonitor:
    """Monitors workspace directory for file changes during agent execution.

    This allows the pipeline to detect when an agent has completed significant
    work by watching for file modifications in the workspace.

    Watching depends on an observer from ``_create_watchdog_observer``; when
    none is available, ``start`` does nothing and no events are recorded.
    """

    def __init__(self, workspace_path: Path) -> None:
        """Initialize workspace monitor.

        Args:
            workspace_path: Path to the workspace directory to monitor.
        """
        self._workspace = workspace_path
        # Set only while an observer has been started successfully.
        self._observer: _HasStop | None = None
        self._event_count = 0
        # Insertion-ordered dict used as an ordered set of paths, oldest
        # first, so the least recently changed path can be evicted cheaply.
        self._seen_files: dict[str, None] = {}

    def start(self) -> None:
        """Start monitoring the workspace for file changes.

        Does nothing when monitoring is already running or when no observer
        can be created. Errors raised while scheduling or starting the
        observer propagate to the caller, and in that case the monitor stays
        in the stopped state.
        """
        if self._observer is not None:
            # Already running: starting again would orphan the live observer.
            return

        observer = _create_watchdog_observer()
        if observer is None:
            return

        handler = _make_change_tracker(self)
        observer.schedule(handler, str(self._workspace), recursive=True)
        observer.start()
        # Publish the observer only once it is running, so stop() never
        # tries to join an observer that failed to start.
        self._observer = observer
        logger.debug("Started workspace monitoring: {}", self._workspace)

    def record_event(self, src_path: str) -> None:
        """Record a file change event.

        Args:
            src_path: Path to the changed file.
        """
        # Remove and re-insert so a repeated path moves to the newest position.
        self._seen_files.pop(src_path, None)
        self._seen_files[src_path] = None
        # Keep the remembered paths bounded by dropping the oldest ones.
        while len(self._seen_files) > _MAX_WORKSPACE_CHANGED_FILES:
            oldest = next(iter(self._seen_files))
            del self._seen_files[oldest]
        # Every event is counted, including repeats and evicted paths.
        self._event_count += 1

    def stop(self) -> None:
        """Stop monitoring the workspace.

        Does nothing when monitoring is not running.
        """
        observer = self._observer
        if observer is None:
            return

        observer.stop()
        # Bounded join (presumably seconds, as with threading.Thread.join)
        # so shutdown cannot block indefinitely.
        observer.join(5)
        self._observer = None
        logger.debug(
            "Stopped workspace monitoring: {} ({} events)",
            self._workspace,
            self._event_count,
        )

    @property
    def event_count(self) -> int:
        """Number of file change events detected."""
        return self._event_count

    @property
    def changed_files(self) -> set[str]:
        """Set of file paths that changed during monitoring.

        Only the most recently changed paths are retained, up to
        ``_MAX_WORKSPACE_CHANGED_FILES``; older ones are dropped. The
        returned set is a copy, so mutating it does not affect the monitor.
        """
        return set(self._seen_files)