Source code for ralph.workspace.memory

"""In-memory workspace for testing.

This module provides the MemoryWorkspace implementation that
stores file contents in memory for test isolation.
"""

from __future__ import annotations

from pathlib import PurePosixPath


[docs] class MemoryWorkspace: """In-memory workspace for test isolation. This workspace stores all file contents in a dictionary, making it suitable for unit testing without filesystem operations. All paths are normalized to POSIX-style relative paths. """ def __init__(self, root: str = "/workspace") -> None: """Initialize empty in-memory workspace.""" self._storage: dict[str, str] = {} self._dirs: set[str] = {""} # Root is always present self._root = PurePosixPath(root) def _normalize(self, path: str) -> str: """Normalize path to POSIX-style relative path. Args: path: Input path. Returns: Normalized path. """ normalized = str(PurePosixPath(path)) return "" if normalized == "." else normalized def _ensure_parent(self, path: str) -> None: """Ensure parent directory exists. Args: path: File path. """ p = PurePosixPath(path) if p.parent == PurePosixPath("."): return for parent in reversed(p.parents[:-1]): normalized_parent = self._normalize(str(parent)) if normalized_parent: self._dirs.add(normalized_parent)
[docs] def read(self, path: str) -> str: """Read file contents. Args: path: Relative path to the file. Returns: File contents as string. Raises: FileNotFoundError: If file doesn't exist. """ normalized = self._normalize(path) if normalized not in self._storage: msg = f"File not found: {path}" raise FileNotFoundError(msg) return self._storage[normalized]
[docs] def write(self, path: str, content: str) -> None: """Write content to file. Args: path: Relative path to the file. content: Content to write. """ normalized = self._normalize(path) self._ensure_parent(normalized) self._storage[normalized] = content
[docs] def append(self, path: str, content: str) -> None: """Append content to file. Args: path: Relative path to the file. content: Content to append. """ normalized = self._normalize(path) self._ensure_parent(normalized) if normalized in self._storage: self._storage[normalized] += content else: self._storage[normalized] = content
[docs] def exists(self, path: str) -> bool: """Check if file exists. Args: path: Relative path to check. Returns: True if file exists. """ return self._normalize(path) in self._storage
[docs] def remove(self, path: str) -> None: """Remove a file. Args: path: Relative path to the file. """ normalized = self._normalize(path) self._storage.pop(normalized, None)
[docs] def list_dir(self, path: str) -> list[str]: """List directory contents. Args: path: Relative path to the directory. Returns: List of file/directory names. """ normalized = self._normalize(path) if normalized and not normalized.endswith("/"): normalized += "/" result: list[str] = [] for key in self._storage: if key == normalized: continue if normalized == "" or key.startswith(normalized): remainder = key[len(normalized) :] first_part = remainder.split("/")[0] if first_part and first_part not in result: result.append(first_part) # Also check directories for d in self._dirs: if normalized == "" or d.startswith(normalized): if d == normalized: continue if normalized == "" or d.startswith(normalized): remainder = d[len(normalized) :].lstrip("/") first_part = remainder.split("/")[0] if first_part and first_part not in result: result.append(first_part) return sorted(result)
[docs] def is_dir(self, path: str) -> bool: """Check if path is a directory. Args: path: Relative path to check. Returns: True if path is a directory. """ normalized = self._normalize(path) return normalized in self._dirs
[docs] def is_file(self, path: str) -> bool: """Check if path is a file. Args: path: Relative path to check. Returns: True if path is a file. """ return self.exists(path)
[docs] def clear(self) -> None: """Clear all stored contents.""" self._storage.clear() self._dirs = {""}
[docs] def create_dir(self, path: str) -> None: """Create a directory. Args: path: Relative path to the directory. """ normalized = self._normalize(path) if normalized: self._dirs.add(normalized)
[docs] def absolute_path(self, path: str) -> str: """Return an absolute-like path string for the workspace.""" return str(self._root / self._normalize(path))
[docs] def read_lines( self, path: str, *, start: int | None = None, end: int | None = None, head: int | None = None, tail: int | None = None, ) -> tuple[str, dict[str, object]]: """Read lines from a file with slicing support. Args: path: Relative path to the file. start: 1-based line number to start from (inclusive). end: 1-based line number to end at (inclusive). head: Return only the first N lines. tail: Return only the last N lines. Returns: Tuple of (text content, metadata dict) where metadata has total_lines, returned_lines, truncated keys. Raises: ValueError: If conflicting params are supplied. FileNotFoundError: If file doesn't exist. """ # Count how many mode groups are specified has_range = (start is not None) or (end is not None) has_head = head is not None has_tail = tail is not None mode_count = sum(1 for m in (has_range, has_head, has_tail) if m) if mode_count > 1: raise ValueError("Only one of (start/end range), head, or tail may be specified") content = self.read(path) all_lines = content.splitlines(keepends=True) total_lines = len(all_lines) returned_lines: list[str] truncated = False if head is not None: returned_lines = all_lines[:head] if total_lines > head: truncated = True elif tail is not None: returned_lines = all_lines[-tail:] if total_lines > tail: truncated = True elif start is not None or end is not None: start_idx = (start - 1) if start is not None else 0 end_idx = end if end is not None else total_lines start_idx = max(0, start_idx) end_idx = min(total_lines, end_idx) returned_lines = all_lines[start_idx:end_idx] if end_idx < total_lines: truncated = True else: returned_lines = all_lines return "".join(returned_lines), { "total_lines": total_lines, "returned_lines": len(returned_lines), "truncated": truncated, }
[docs] def read_bytes( self, path: str, *, offset: int = 0, limit: int | None = None, ) -> tuple[str, dict[str, object]]: """Read a byte window from a file, decoded as UTF-8.""" normalized = self._normalize(path) if normalized not in self._storage: raise FileNotFoundError(f"File not found: {path}") raw = self._storage[normalized].encode("utf-8") total_bytes = len(raw) sliced = raw[offset : offset + limit] if limit is not None else raw[offset:] returned_bytes = len(sliced) truncated = (offset + returned_bytes) < total_bytes text = sliced.decode("utf-8") return text, { "total_bytes": total_bytes, "returned_bytes": returned_bytes, "truncated": truncated, }
[docs] def stat(self, path: str) -> dict[str, object]: """Get file metadata/stat data. Args: path: Relative path to the file. Returns: Dict with type ('file'|'dir'|'missing'), size_bytes, created_unix, modified_unix, mode. """ normalized = self._normalize(path) if normalized in self._dirs: return { "type": "dir", "size_bytes": 0, "created_unix": None, "modified_unix": None, "mode": None, } if normalized in self._storage: return { "type": "file", "size_bytes": len(self._storage[normalized]), "created_unix": None, "modified_unix": None, "mode": None, } return {"type": "missing"}
[docs] def mkdirs(self, path: str) -> None: """Create a directory and all parent directories. Args: path: Relative path to the directory to create. """ normalized = self._normalize(path) if normalized: parts = normalized.split("/") for i in range(len(parts)): self._dirs.add("/".join(parts[: i + 1]))
[docs] def move(self, src: str, dest: str, *, overwrite: bool = False) -> None: """Move a file or directory. Args: src: Source path. dest: Destination path. overwrite: Whether to overwrite existing destination. Raises: FileExistsError: If dest exists and overwrite is False. """ src_norm = self._normalize(src) dest_norm = self._normalize(dest) if (dest_norm in self._storage or dest_norm in self._dirs) and not overwrite: raise FileExistsError(f"Destination '{dest}' already exists") if src_norm in self._storage: self._storage[dest_norm] = self._storage.pop(src_norm) elif src_norm in self._dirs: self._dirs.discard(src_norm) self._dirs.add(dest_norm) else: raise FileNotFoundError(f"Source '{src}' not found")
[docs] def copy(self, src: str, dest: str, *, overwrite: bool = False) -> None: """Copy a file or directory. Args: src: Source path. dest: Destination path. overwrite: Whether to overwrite existing destination. Raises: FileExistsError: If dest exists and overwrite is False. """ src_norm = self._normalize(src) dest_norm = self._normalize(dest) if (dest_norm in self._storage or dest_norm in self._dirs) and not overwrite: raise FileExistsError(f"Destination '{dest}' already exists") if src_norm in self._storage: self._storage[dest_norm] = self._storage[src_norm] elif src_norm in self._dirs: self._dirs.add(dest_norm) else: raise FileNotFoundError(f"Source '{src}' not found")
[docs] def delete(self, path: str, *, recursive: bool = False) -> None: """Delete a file or directory. Args: path: Relative path to delete. recursive: If True, delete directories recursively. Raises: IsADirectoryError: If path is a directory and recursive is False. """ normalized = self._normalize(path) if normalized in self._dirs: if not recursive: raise IsADirectoryError(f"Path '{path}' is a directory, use recursive=True") self._dirs.discard(normalized) self._storage.pop(normalized, None) elif normalized in self._storage: del self._storage[normalized] else: raise FileNotFoundError(f"Path '{path}' not found")
[docs] def allowed_roots(self) -> list[str]: """Return the list of allowed workspace root paths. Returns: List of string paths from configured allowed roots. """ return [str(self._root)]
[docs] def iter_files(self, base: str) -> tuple[str, ...]: """Iterate over file paths under a base directory. Args: base: Base directory path to search under. Yields: File paths relative to workspace root, honoring skip patterns. """ normalized = self._normalize(base) if normalized and not normalized.endswith("/"): normalized += "/" skip_names = frozenset( { ".git", ".hg", ".mypy_cache", ".pytest_cache", ".ruff_cache", ".svn", ".venv", "__pycache__", "node_modules", "target", } ) results: list[str] = [] for key in self._storage: if normalized == "" or key.startswith(normalized): remainder = key[len(normalized) :] parts = remainder.split("/") if len(parts) == 1 or parts[0] not in skip_names: results.append(key) return tuple(results)