"""MCP capability mapping for Ralph sessions.
Ports the Rust capability-mapping layer used to translate session drain and
policy outcomes into MCP access-control decisions.
"""
from __future__ import annotations
from enum import Enum, StrEnum
from importlib import import_module
from typing import TYPE_CHECKING, cast
from ralph.mcp.protocol._access_decision import AccessDecision
from ralph.mcp.protocol._access_denied_code import AccessDeniedCode
from ralph.mcp.protocol._access_mode import AccessMode
from ralph.mcp.protocol._drain_class import DrainClass
from ralph.mcp.protocol._mcp_capability import McpCapability
from ralph.mcp.protocol._policy_mode import PolicyMode
from ralph.mcp.protocol._policy_outcome import PolicyOutcome
from ralph.mcp.protocol._policy_outcome_status import PolicyOutcomeStatus
from ralph.mcp.protocol._session_drain import SessionDrain
if TYPE_CHECKING:
from ralph.policy.models import AgentsPolicy
[docs]
class Capability(StrEnum):
"""Internal Ralph capability vocabulary."""
WORKSPACE_READ = "workspace.read"
WORKSPACE_WRITE_EPHEMERAL = "workspace.write_ephemeral"
WORKSPACE_WRITE_TRACKED = "workspace.write_tracked"
WORKSPACE_METADATA_READ = "workspace.metadata_read"
WORKSPACE_EDIT = "workspace.edit"
WORKSPACE_DELETE = "workspace.delete"
PROCESS_EXEC_BOUNDED = "process.exec_bounded"
PROCESS_EXEC_UNBOUNDED = "process.exec_unbounded"
ARTIFACT_SUBMIT = "artifact.submit"
ARTIFACT_PLAN_READ = "artifact.plan_read"
ARTIFACT_PLAN_WRITE = "artifact.plan_write"
RUN_REPORT_PROGRESS = "run.report_progress"
GIT_STATUS_READ = "git.status_read"
GIT_DIFF_READ = "git.diff_read"
GIT_WRITE = "git.write"
ENV_READ = "env.read"
ENV_WRITE = "env.write"
UPSTREAM_TOOL_USE = "upstream.tool_use"
WEB_SEARCH = "web.search"
WEB_VISIT = "web.visit"
MEDIA_READ = "media.read"
ARTIFACT_PLAN_SUBMIT = "artifact.plan_submit"
MCP_TO_RALPH_CAPABILITY_MAP: dict[McpCapability, Capability] = {
McpCapability.FILE_READ: Capability.WORKSPACE_READ,
McpCapability.GIT_READ: Capability.GIT_STATUS_READ,
McpCapability.PROCESS_EXEC: Capability.PROCESS_EXEC_BOUNDED,
McpCapability.ARTIFACT_SUBMIT: Capability.ARTIFACT_SUBMIT,
McpCapability.ARTIFACT_PLAN_READ: Capability.ARTIFACT_PLAN_READ,
McpCapability.ARTIFACT_PLAN_WRITE: Capability.ARTIFACT_PLAN_WRITE,
McpCapability.WORKSPACE_READ: Capability.WORKSPACE_READ,
McpCapability.WORKSPACE_WRITE_EPHEMERAL: Capability.WORKSPACE_WRITE_EPHEMERAL,
McpCapability.WORKSPACE_WRITE_TRACKED: Capability.WORKSPACE_WRITE_TRACKED,
McpCapability.GIT_STATUS_READ: Capability.GIT_STATUS_READ,
McpCapability.GIT_WRITE: Capability.GIT_WRITE,
McpCapability.ENV_READ: Capability.ENV_READ,
McpCapability.ENV_WRITE: Capability.ENV_WRITE,
McpCapability.PROCESS_EXEC_BOUNDED: Capability.PROCESS_EXEC_BOUNDED,
McpCapability.PROCESS_EXEC_UNBOUNDED: Capability.PROCESS_EXEC_UNBOUNDED,
McpCapability.RUN_REPORT_PROGRESS: Capability.RUN_REPORT_PROGRESS,
McpCapability.UPSTREAM_TOOL_USE: Capability.UPSTREAM_TOOL_USE,
McpCapability.WEB_SEARCH: Capability.WEB_SEARCH,
McpCapability.WEB_VISIT: Capability.WEB_VISIT,
McpCapability.MEDIA_READ: Capability.MEDIA_READ,
McpCapability.WORKSPACE_METADATA_READ: Capability.WORKSPACE_METADATA_READ,
McpCapability.WORKSPACE_EDIT: Capability.WORKSPACE_EDIT,
McpCapability.WORKSPACE_DELETE: Capability.WORKSPACE_DELETE,
# Legacy alias kept for backward-compatibility; canonical capability is plan_write.
McpCapability.ARTIFACT_PLAN_SUBMIT: Capability.ARTIFACT_PLAN_WRITE,
}
_RALPH_CAPABILITY_ALIASES: dict[str, Capability] = {
"process_exec_bounded": Capability.PROCESS_EXEC_BOUNDED,
"process_exec_unbounded": Capability.PROCESS_EXEC_UNBOUNDED,
"process.exec_bounded": Capability.PROCESS_EXEC_BOUNDED,
"process.exec_unbounded": Capability.PROCESS_EXEC_UNBOUNDED,
"git.status.read": Capability.GIT_STATUS_READ,
"git.status_read": Capability.GIT_STATUS_READ,
"git.diff.read": Capability.GIT_DIFF_READ,
"git.diff_read": Capability.GIT_DIFF_READ,
"web.search": Capability.WEB_SEARCH,
"web_search": Capability.WEB_SEARCH,
"web.visit": Capability.WEB_VISIT,
"web_visit": Capability.WEB_VISIT,
"media.read": Capability.MEDIA_READ,
"media_read": Capability.MEDIA_READ,
"workspace.metadata_read": Capability.WORKSPACE_METADATA_READ,
"workspace.metadata.read": Capability.WORKSPACE_METADATA_READ,
"artifact.plan_read": Capability.ARTIFACT_PLAN_READ,
"artifact.plan_write": Capability.ARTIFACT_PLAN_WRITE,
"workspace.edit": Capability.WORKSPACE_EDIT,
"workspace.delete": Capability.WORKSPACE_DELETE,
"artifact.plan_submit": Capability.ARTIFACT_PLAN_WRITE,
}
_MCP_CAPABILITY_ALIASES: dict[str, McpCapability] = {
"workspace.read": McpCapability.WORKSPACE_READ,
"workspace.write_ephemeral": McpCapability.WORKSPACE_WRITE_EPHEMERAL,
"workspace.write_tracked": McpCapability.WORKSPACE_WRITE_TRACKED,
"artifact.submit": McpCapability.ARTIFACT_SUBMIT,
"artifact.plan_read": McpCapability.ARTIFACT_PLAN_READ,
"artifact.plan_write": McpCapability.ARTIFACT_PLAN_WRITE,
"workspace.coordination": McpCapability.WORKSPACE_COORDINATION,
"git.read": McpCapability.GIT_READ,
"git.status.read": McpCapability.GIT_STATUS_READ,
"git.status_read": McpCapability.GIT_STATUS_READ,
"git.write": McpCapability.GIT_WRITE,
"env.read": McpCapability.ENV_READ,
"env.write": McpCapability.ENV_WRITE,
"process.exec": McpCapability.PROCESS_EXEC,
"process.exec_bounded": McpCapability.PROCESS_EXEC_BOUNDED,
"process.exec_unbounded": McpCapability.PROCESS_EXEC_UNBOUNDED,
"process_exec_bounded": McpCapability.PROCESS_EXEC_BOUNDED,
"process_exec_unbounded": McpCapability.PROCESS_EXEC_UNBOUNDED,
"run.report_progress": McpCapability.RUN_REPORT_PROGRESS,
"file.read": McpCapability.FILE_READ,
"file.write": McpCapability.FILE_WRITE,
"upstream.tool_use": McpCapability.UPSTREAM_TOOL_USE,
"upstream_tool_use": McpCapability.UPSTREAM_TOOL_USE,
"web.search": McpCapability.WEB_SEARCH,
"web_search": McpCapability.WEB_SEARCH,
"web.visit": McpCapability.WEB_VISIT,
"web_visit": McpCapability.WEB_VISIT,
"media.read": McpCapability.MEDIA_READ,
"media_read": McpCapability.MEDIA_READ,
"workspace.metadata_read": McpCapability.WORKSPACE_METADATA_READ,
"workspace.edit": McpCapability.WORKSPACE_EDIT,
"workspace.delete": McpCapability.WORKSPACE_DELETE,
"artifact.plan_submit": McpCapability.ARTIFACT_PLAN_WRITE,
}
def _policy_validation_error_type() -> type[Exception]:
return cast(
"type[Exception]",
import_module("ralph.policy.validation").PolicyValidationError,
)
_APPROVED_POLICY_VALUES = {"approved", "allow", "allowed"}
_DENIED_POLICY_VALUES = {"denied", "deny", "denied_by_policy"}
_APPROVED_WITH_RESTRICTION_VALUES = {
"approvedwithrestriction",
"approved_with_restriction",
"allow_with_restriction",
"allowed_with_restriction",
}
def normalize_token(value: str) -> str:
"""Normalize a capability or policy token to lowercase with underscores."""
return value.strip().replace("-", "_").replace(" ", "_").lower()
def extract_text_field(value: object, field_name: str) -> str | None:
"""Extract a named string field from a dict or object attribute, returning None if absent."""
if isinstance(value, dict):
field_value = value.get(field_name)
else:
field_value = getattr(value, field_name, None)
return field_value if isinstance(field_value, str) else None
def extract_named_value(value: object) -> str | None:
"""Extract the canonical string value from a string, Enum, or structured object."""
if isinstance(value, str):
return value
if isinstance(value, Enum):
enum_value = cast("object", value.value)
if isinstance(enum_value, str):
return enum_value
for field_name in ("status", "name", "value"):
field_value = extract_text_field(value, field_name)
if field_value is not None:
return field_value
return None
def coerce_session_drain(value: SessionDrain | str) -> SessionDrain:
"""Coerce a string or SessionDrain to a SessionDrain, raising ValueError for unknown values."""
if isinstance(value, SessionDrain):
return value
normalized = normalize_token(value)
aliases = {
"planning": SessionDrain.PLANNING,
"development": SessionDrain.DEVELOPMENT,
"development_analysis": SessionDrain.DEVELOPMENT_ANALYSIS,
"development_commit": SessionDrain.DEVELOPMENT_COMMIT,
"analysis": SessionDrain.ANALYSIS,
"review": SessionDrain.REVIEW,
"review_analysis": SessionDrain.REVIEW_ANALYSIS,
"review_commit": SessionDrain.REVIEW_COMMIT,
"fix": SessionDrain.FIX,
"commit": SessionDrain.COMMIT,
}
try:
return aliases[normalized]
except KeyError as exc:
raise ValueError(f"Unknown session drain: {value!r}") from exc
def coerce_capability(value: Capability | str) -> Capability:
"""Coerce a string or Capability to a Capability enum, raising ValueError for unknown values."""
if isinstance(value, Capability):
return value
normalized = normalize_token(value)
for capability in Capability:
if normalize_token(capability.value) == normalized:
return capability
try:
return _RALPH_CAPABILITY_ALIASES[normalized]
except KeyError as exc:
raise ValueError(f"Unknown Ralph capability: {value!r}") from exc
def coerce_mcp_capability(value: McpCapability | str) -> McpCapability:
"""Coerce a string or McpCapability to a McpCapability enum."""
if isinstance(value, McpCapability):
return value
normalized = normalize_token(value)
for capability in McpCapability:
if normalize_token(capability.value) == normalized:
return capability
try:
return _MCP_CAPABILITY_ALIASES[normalized]
except KeyError as exc:
raise ValueError(f"Unknown McpCapability: {value!r}") from exc
def normalize_policy_outcome(value: object) -> PolicyOutcome:
"""Normalize any policy outcome representation to a PolicyOutcome."""
if isinstance(value, PolicyOutcome):
return value
if value is True:
return PolicyOutcome(status=PolicyOutcomeStatus.APPROVED)
status_value = extract_named_value(value)
normalized_status = normalize_token(status_value) if status_value is not None else ""
reason = extract_text_field(value, "reason")
restriction = extract_text_field(value, "restriction")
status = resolved_policy_status(value, normalized_status, reason)
if status is not None:
return PolicyOutcome(status=status, reason=reason, restriction=restriction)
raise ValueError(f"Unsupported policy outcome: {value!r}")
def resolved_policy_status(
value: object,
normalized_status: str,
reason: str | None,
) -> PolicyOutcomeStatus | None:
"""Resolve a normalized status string to a PolicyOutcomeStatus, or None if unrecognized."""
if normalized_status in _APPROVED_POLICY_VALUES:
return PolicyOutcomeStatus.APPROVED
if normalized_status in _APPROVED_WITH_RESTRICTION_VALUES:
return PolicyOutcomeStatus.APPROVED_WITH_RESTRICTION
if normalized_status in _DENIED_POLICY_VALUES:
return PolicyOutcomeStatus.DENIED
if isinstance(value, dict) and "reason" in value:
return PolicyOutcomeStatus.DENIED
if reason is not None:
return PolicyOutcomeStatus.DENIED
return None
def drain_class_for_drain_name(
name: str,
agents_policy: AgentsPolicy | None = None,
) -> DrainClass:
"""Resolve the drain class for any policy-declared drain name.
Resolution order:
1. Explicit drain_class on the AgentDrainConfig (highest priority).
2. Drain name itself is a valid DrainClass value (fallback).
3. PolicyValidationError when neither applies.
"""
policy_validation_error = _policy_validation_error_type()
if agents_policy is not None:
drain_cfg = agents_policy.agent_drains.get(name)
if drain_cfg is not None and drain_cfg.drain_class is not None:
try:
return DrainClass(drain_cfg.drain_class)
except ValueError as err:
raise policy_validation_error(
f"Drain '{name}' has invalid drain_class '{drain_cfg.drain_class}'; "
f"expected one of: planning, development, analysis, review, fix, commit."
) from err
try:
return DrainClass(name)
except ValueError:
pass
raise policy_validation_error(
f"Drain '{name}' has no drain_class declared in agents.toml; "
f"add drain_class = '<class>' under [agent_drains.{name}] "
f"(one of: planning, development, analysis, review, fix, commit)."
)
[docs]
def drain_class_for_session(
drain: SessionDrain | str,
agents_policy: AgentsPolicy | None = None,
) -> DrainClass:
"""Classify a session drain into its drain class.
Resolution is policy-defined only: callers must supply ``agents_policy`` and
the drain must be declared there with an explicit ``drain_class``.
"""
if agents_policy is None:
raise _policy_validation_error_type()(
f"Drain {drain!r} cannot resolve drain_class without agents_policy; "
"pass the active agents policy so drain_class is read from declarations"
)
return drain_class_for_drain_name(str(drain), agents_policy)
[docs]
def drain_to_access_mode(
drain: SessionDrain | str,
agents_policy: AgentsPolicy | None = None,
) -> AccessMode:
"""Determine the MCP access mode for a session drain."""
if drain_class_for_session(drain, agents_policy).allows_write():
return AccessMode.READ_WRITE
return AccessMode.READ_ONLY
[docs]
def drain_to_policy_mode(
drain: SessionDrain | str,
agents_policy: AgentsPolicy | None = None,
) -> PolicyMode:
"""Map a session drain to the matching policy mode.
Accepts any policy-declared drain name by resolving its class through
drain_class_for_session. DrainClass and PolicyMode share the same
vocabulary, so the mapping is a direct value lookup.
"""
dc = drain_class_for_session(drain, agents_policy)
return PolicyMode(dc.value)
[docs]
def lookup_ralph_capability(capability: McpCapability | str) -> Capability | None:
"""Look up the Ralph capability mapped from an MCP capability."""
try:
normalized_capability = coerce_mcp_capability(capability)
except ValueError:
return None
return MCP_TO_RALPH_CAPABILITY_MAP.get(normalized_capability)
[docs]
def policy_from_outcome(outcome: object) -> AccessDecision:
"""Convert a Ralph policy outcome to an MCP access decision."""
normalized_outcome = normalize_policy_outcome(outcome)
if normalized_outcome.status in {
PolicyOutcomeStatus.APPROVED,
PolicyOutcomeStatus.APPROVED_WITH_RESTRICTION,
}:
return AccessDecision.allow()
reason = normalized_outcome.reason or "denied"
return AccessDecision.deny(reason, AccessDeniedCode.CAPABILITY_DENIED)
[docs]
def evaluate_workspace_write(ephemeral: object, tracked: object) -> AccessDecision:
"""Evaluate the composite workspace-write policy."""
ephemeral_outcome = normalize_policy_outcome(ephemeral)
tracked_outcome = normalize_policy_outcome(tracked)
allowed_statuses = {
PolicyOutcomeStatus.APPROVED,
PolicyOutcomeStatus.APPROVED_WITH_RESTRICTION,
}
if ephemeral_outcome.status in allowed_statuses or tracked_outcome.status in allowed_statuses:
return AccessDecision.allow()
return AccessDecision.deny(
"Workspace write capability not granted",
AccessDeniedCode.CAPABILITY_DENIED,
)
[docs]
def evaluate_mapped_capability(
capability: McpCapability | str,
mapped_outcome: tuple[Capability | str, object] | None,
) -> AccessDecision:
"""Evaluate access for a capability that maps directly to a Ralph capability."""
try:
normalized_capability = coerce_mcp_capability(capability)
except ValueError:
return AccessDecision.deny(
f"Unknown capability: {capability!r}",
AccessDeniedCode.CAPABILITY_DENIED,
)
if mapped_outcome is None:
return AccessDecision.deny(
f"Unknown capability: {normalized_capability.value}",
AccessDeniedCode.CAPABILITY_DENIED,
)
mapped_capability, outcome = mapped_outcome
coerce_capability(mapped_capability)
return policy_from_outcome(outcome)
[docs]
def check_mcp_capability_policy(
capability: McpCapability | str,
ephemeral: object,
tracked: object,
mapped_outcome: tuple[Capability | str, object] | None,
) -> AccessDecision:
"""Decide access for an MCP capability from session policy outcomes."""
try:
normalized_capability = coerce_mcp_capability(capability)
except ValueError:
return AccessDecision.deny(
"Unrecognized McpCapability "
f"{capability!r}: ralph-workflow has not been updated "
"to handle this capability variant",
AccessDeniedCode.CAPABILITY_DENIED,
)
if normalized_capability in {McpCapability.WORKSPACE_WRITE_ANY, McpCapability.FILE_WRITE}:
return evaluate_workspace_write(ephemeral, tracked)
if normalized_capability is McpCapability.WORKSPACE_COORDINATION:
return AccessDecision.allow()
if normalized_capability in {
McpCapability.FILE_READ,
McpCapability.GIT_READ,
McpCapability.PROCESS_EXEC,
McpCapability.ARTIFACT_SUBMIT,
McpCapability.WORKSPACE_READ,
McpCapability.WORKSPACE_WRITE_EPHEMERAL,
McpCapability.WORKSPACE_WRITE_TRACKED,
McpCapability.GIT_STATUS_READ,
McpCapability.GIT_WRITE,
McpCapability.ENV_READ,
McpCapability.ENV_WRITE,
McpCapability.PROCESS_EXEC_BOUNDED,
McpCapability.PROCESS_EXEC_UNBOUNDED,
McpCapability.RUN_REPORT_PROGRESS,
McpCapability.UPSTREAM_TOOL_USE,
McpCapability.WEB_SEARCH,
McpCapability.WEB_VISIT,
McpCapability.MEDIA_READ,
McpCapability.WORKSPACE_METADATA_READ,
McpCapability.WORKSPACE_EDIT,
McpCapability.WORKSPACE_DELETE,
McpCapability.ARTIFACT_PLAN_SUBMIT,
}:
return evaluate_mapped_capability(normalized_capability, mapped_outcome)
return AccessDecision.deny(
"Unrecognized McpCapability "
f"{normalized_capability.value!r}: ralph-workflow has not been updated "
"to handle this capability variant",
AccessDeniedCode.CAPABILITY_DENIED,
)
__all__ = [
"MCP_TO_RALPH_CAPABILITY_MAP",
"AccessDecision",
"AccessDeniedCode",
"AccessMode",
"Capability",
"DrainClass",
"McpCapability",
"PolicyMode",
"PolicyOutcome",
"PolicyOutcomeStatus",
"SessionDrain",
"check_mcp_capability_policy",
"drain_class_for_session",
"drain_to_access_mode",
"drain_to_policy_mode",
"evaluate_mapped_capability",
"evaluate_workspace_write",
"lookup_ralph_capability",
"policy_from_outcome",
]