Source code for ralph.prompts._policy_flag_set

"""PolicyFlagSet — set of Ralph policy flags."""

from __future__ import annotations

from typing import TYPE_CHECKING

from ralph.mcp.protocol.capability_mapping import SessionDrain
from ralph.prompts._policy_flag import PolicyFlag

if TYPE_CHECKING:
    from collections.abc import Iterable, Iterator


[docs] class PolicyFlagSet: """Set of Ralph policy flags.""" def __init__(self, values: Iterable[PolicyFlag] | None = None) -> None: self._values = frozenset(values or ()) def contains(self, flag: PolicyFlag) -> bool: return flag in self._values def insert(self, flag: PolicyFlag) -> None: self._values = frozenset((*self._values, flag)) def __iter__(self) -> Iterator[PolicyFlag]: return iter(self._values) def iter(self) -> Iterable[PolicyFlag]: return iter(self._values) def to_vec(self) -> tuple[PolicyFlag, ...]: return tuple(self._values) @classmethod def defaults_for_drain(cls, drain: SessionDrain) -> PolicyFlagSet: return cls(DEFAULT_POLICY_FLAGS.get(drain, ())) @classmethod def from_identifiers(cls, identifiers: Iterable[str] | None) -> PolicyFlagSet: if not identifiers: return cls() values: list[PolicyFlag] = [] for identifier in identifiers: try: values.append(PolicyFlag(identifier)) except ValueError: continue return cls(values)
DEFAULT_POLICY_FLAGS: dict[SessionDrain, tuple[PolicyFlag, ...]] = { SessionDrain.PLANNING: (PolicyFlag.NO_EDIT,), SessionDrain.ANALYSIS: (PolicyFlag.NO_EDIT,), SessionDrain.DEVELOPMENT_ANALYSIS: (PolicyFlag.NO_EDIT,), SessionDrain.REVIEW_ANALYSIS: (PolicyFlag.NO_EDIT,), SessionDrain.REVIEW: (PolicyFlag.NO_EDIT,), SessionDrain.DEVELOPMENT: (PolicyFlag.ALLOW_SHELL,), SessionDrain.FIX: (PolicyFlag.ALLOW_SHELL,), SessionDrain.COMMIT: (PolicyFlag.ALLOW_GIT_WRITE,), } __all__ = ["DEFAULT_POLICY_FLAGS", "PolicyFlagSet"]