Source code for ralph.git.executor

"""GitExecutor — serialized gate for GitPython operations.

GitPython is not thread-safe when multiple threads share a single .git/ directory.
GitExecutor wraps all repo-mutating operations in a single-threaded executor,
ensuring only one git operation runs at a time regardless of how many asyncio
tasks or threads call it concurrently.
"""

import asyncio
from collections.abc import Callable
from concurrent.futures import ThreadPoolExecutor
from typing import TypeVar

T = TypeVar("T")


[docs] class GitExecutor: """Serialized gate for GitPython operations. All calls to `run()` are executed sequentially in a single background thread (max_workers=1), preventing concurrent GitPython access. Attributes: _executor: Single-threaded executor that serializes all git ops. """ def __init__(self) -> None: self._executor = ThreadPoolExecutor(max_workers=1)
[docs] def run(self, op: Callable[[], T]) -> T: """Run a git operation synchronously, serialized with all other ops. Args: op: Zero-argument callable returning T. Returns: Result of op(). Raises: Any exception raised by op(). """ future = self._executor.submit(op) return future.result()
[docs] async def arun(self, op: Callable[[], T]) -> T: """Run a git operation asynchronously without blocking the event loop. Uses loop.run_in_executor which runs op in the thread pool. Combined with max_workers=1, this ensures serialization even from coroutines. Args: op: Zero-argument callable returning T. Returns: Awaitable result of op(). """ loop = asyncio.get_running_loop() future = loop.run_in_executor(self._executor, op) return await future
def shutdown(self, wait: bool = True) -> None: self._executor.shutdown(wait=wait)