"""Progress display utilities for Ralph Workflow pipeline.
This module provides a RalphProgress context manager that wraps rich.Progress
for multi-task display, with tqdm fallback for non-TTY environments.
The RalphProgress provides:
- Overall pipeline progress (top-level task)
- Current phase progress (sub-task)
- Current agent progress (leaf task)
When running in a non-TTY environment (e.g., CI, redirected output),
tqdm is used as a fallback to ensure progress is still visible.
Example::
ctx = make_display_context()
with RalphProgress(ctx) as progress:
pipeline_task = progress.add_task("Pipeline", total=100)
with progress.phase(pipeline_task, "Planning"):
phase_task = progress.add_task(pipeline_task, "Agent", total=50)
# ... do work ...
progress.update(phase_task, advance=10)
"""
from __future__ import annotations
import sys
from contextlib import contextmanager
from importlib import import_module
from io import TextIOBase
from typing import TYPE_CHECKING, cast
from ralph.display._progress_singleton import ProgressSingleton
from ralph.display.progress_protocols import TaskID
if TYPE_CHECKING:
from collections.abc import Iterator
from types import ModuleType
from ralph.display.context import DisplayContext
from ralph.display.progress_protocols import (
_ColumnFactory,
_ConsoleFactory,
_ConsoleProto,
_GetIPython,
_ProgressFactory,
_ProgressProto,
_TqdmFactory,
_TqdmProto,
)
def _module_attr(module: ModuleType, attribute: str) -> object | None:
namespace = cast("dict[str, object]", module.__dict__)
return namespace.get(attribute)
def load_rich_components() -> (
tuple[
_ConsoleFactory,
_ProgressFactory,
tuple[
_ColumnFactory,
_ColumnFactory,
_ColumnFactory,
_ColumnFactory,
_ColumnFactory,
_ColumnFactory,
_ColumnFactory,
],
]
| None
):
"""Lazily import Rich display factories; returns None when Rich is not installed."""
try:
console_module = import_module("rich.console")
progress_module = import_module("rich.progress")
except ImportError:
return None
console_factory = cast("_ConsoleFactory", _module_attr(console_module, "Console"))
progress_factory = cast("_ProgressFactory", _module_attr(progress_module, "Progress"))
columns = (
cast("_ColumnFactory", _module_attr(progress_module, "SpinnerColumn")),
cast("_ColumnFactory", _module_attr(progress_module, "TextColumn")),
cast("_ColumnFactory", _module_attr(progress_module, "BarColumn")),
cast("_ColumnFactory", _module_attr(progress_module, "TaskProgressColumn")),
cast("_ColumnFactory", _module_attr(progress_module, "MofNCompleteColumn")),
cast("_ColumnFactory", _module_attr(progress_module, "TimeElapsedColumn")),
cast("_ColumnFactory", _module_attr(progress_module, "TimeRemainingColumn")),
)
return console_factory, progress_factory, columns
def load_tqdm_factory() -> _TqdmFactory | None:
"""Lazily import the tqdm progress-bar factory; returns None when tqdm is not installed."""
try:
tqdm_module = import_module("tqdm")
except ImportError:
return None
return cast("_TqdmFactory", _module_attr(tqdm_module, "tqdm"))
def load_get_ipython() -> _GetIPython | None:
"""Lazily import IPython's get_ipython; returns None when IPython is not installed."""
try:
ipython_module = import_module("IPython")
except ImportError:
return None
candidate = _module_attr(ipython_module, "get_ipython")
if candidate is None or not callable(candidate):
return None
return cast("_GetIPython", candidate)
RICH_AVAILABLE = load_rich_components() is not None
GET_IPYTHON = load_get_ipython()
IPYTHON_AVAILABLE = GET_IPYTHON is not None
__all__ = [
"RalphProgress",
"TaskID",
"get_progress",
]
[docs]
class RalphProgress:
"""Multi-task progress display for Ralph Workflow pipeline.
RalphProgress manages a hierarchy of progress tasks:
1. Pipeline-level progress (overall completion)
2. Phase-level progress (current phase advancement)
3. Agent-level progress (agent output lines/events)
Uses rich.Progress when running in a TTY environment,
falls back to tqdm when running in non-TTY (CI, redirected output).
Note:
RalphProgress requires a DisplayContext to be provided. The context
is used to obtain the shared console. Instances are cached by
console identity via _ProgressSingleton.
"""
def __init__(self, context: DisplayContext) -> None:
"""Initialize RalphProgress.
Args:
context: DisplayContext providing the console for progress display.
"""
self._context = context
self._console: _ConsoleProto | None = None
self._progress: _ProgressProto | None = None
self._tqdm: _TqdmProto | None = None
self._is_jupyter = self._check_jupyter()
def _check_jupyter(self) -> bool:
"""Check if running in a Jupyter environment.
Returns:
True if in Jupyter, False otherwise.
"""
if not IPYTHON_AVAILABLE:
return False
try:
return GET_IPYTHON is not None and GET_IPYTHON() is not None
except Exception:
return False
def _is_tty(self) -> bool:
"""Check if output is a TTY.
Returns:
True if stderr is a TTY and rich is available.
"""
if not RICH_AVAILABLE:
return False
stderr = sys.stderr
return isinstance(stderr, TextIOBase) and stderr.isatty()
@contextmanager
def _rich_progress(self) -> Iterator[_ProgressProto]:
"""Provide rich.Progress context manager.
Yields:
Configured rich Progress instance.
"""
rich_components = load_rich_components()
if rich_components is None:
raise RuntimeError("rich is unavailable")
_, progress_factory, columns = rich_components
(
spinner_column,
text_column,
bar_column,
task_progress_column,
mofn_column,
elapsed_column,
remaining_column,
) = columns
console: _ConsoleProto = cast("_ConsoleProto", self._context.console)
progress = progress_factory(
spinner_column(),
text_column("[theme.cat.meta]{task.description}[/theme.cat.meta]"),
bar_column(),
task_progress_column(),
mofn_column(),
elapsed_column(),
remaining_column(),
console=console,
transient=False,
auto_refresh=True,
)
self._console = console
self._progress = progress
with progress:
yield progress
self._progress = None
self._console = None
@contextmanager
def _tqdm_progress(self) -> Iterator[_TqdmProto]:
"""Provide tqdm context manager for non-TTY environments.
Yields:
Configured tqdm instance.
"""
tqdm_factory = load_tqdm_factory()
if tqdm_factory is None:
raise RuntimeError("tqdm is unavailable")
bar = tqdm_factory(
desc="Ralph Workflow",
unit="iter",
file=sys.stderr,
leave=True,
bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}]",
)
self._tqdm = bar
try:
yield bar
finally:
bar.close()
self._tqdm = None
def __enter__(self) -> RalphProgress:
"""Enter the progress context.
Returns:
Self for use as context manager.
"""
return self
def __exit__(self, *args: object) -> None:
"""Exit the progress context."""
pass
[docs]
@contextmanager
def phase(self, task: TaskID, phase_name: str) -> Iterator[_ProgressProto | None]:
"""Context manager for a phase within a pipeline task.
Args:
task: Parent pipeline task ID.
phase_name: Name of the current phase.
Yields:
Progress for adding phase subtasks.
"""
if self._progress is not None:
self._progress.add_task(
f" {phase_name}",
parent=task,
total=None,
)
try:
yield self._progress
finally:
pass
else:
yield None
[docs]
def add_task(
self,
description: str,
total: int | None = 100,
completed: int = 0,
parent: TaskID | None = None,
) -> TaskID:
"""Add a new progress task.
Args:
description: Task description.
total: Total units of work.
completed: Initial completed units.
parent: Parent task ID for nested tasks.
Returns:
Task ID for use in update/remove.
"""
if self._progress is not None:
return self._progress.add_task(
description,
total=total,
completed=completed,
parent=parent,
)
else:
return TaskID(0)
[docs]
def update(
self,
task: TaskID,
completed: int | None = None,
advance: int = 0,
description: str | None = None,
) -> None:
"""Update a progress task.
Args:
task: Task ID to update.
completed: New completed value (absolute).
advance: Amount to advance (relative).
description: New description.
"""
if self._progress is not None:
self._progress.update(
task_id=task,
completed=completed,
advance=advance if advance else None,
description=description,
)
elif self._tqdm is not None:
if advance:
self._tqdm.update(advance)
if completed is not None:
self._tqdm.n = completed
self._tqdm.refresh()
[docs]
def get_progress(context: DisplayContext) -> RalphProgress:
"""Get the default RalphProgress instance for the given context.
Args:
context: DisplayContext providing the console for progress display.
Different contexts (identified by console identity) yield separate
RalphProgress instances.
Returns:
RalphProgress instance for the given context.
"""
return ProgressSingleton.get(context)
ProgressSingleton._factory = RalphProgress