Source code for ralph.mcp.artifacts.audit_adapter

"""Audit adapter utilities for MCP records."""

from __future__ import annotations

from threading import Lock
from typing import TYPE_CHECKING, Protocol

from ralph.mcp.artifacts._agent_session_id import AgentSessionId
from ralph.mcp.artifacts._audit_correlation import AuditCorrelation
from ralph.mcp.artifacts._audit_metadata import AuditMetadata
from ralph.mcp.artifacts._mcp_audit_correlation import McpAuditCorrelation
from ralph.mcp.artifacts._mcp_audit_event_type import McpAuditEventType
from ralph.mcp.artifacts._mcp_audit_record import McpAuditRecord
from ralph.mcp.artifacts._ralph_audit_record import RalphAuditRecord
from ralph.mcp.protocol.capability_mapping import (
    AccessDecision,
    PolicyMode,
    PolicyOutcome,
    PolicyOutcomeStatus,
    lookup_ralph_capability,
)
from ralph.mcp.protocol.capability_mapping import (
    Capability as RalphCapability,
)

if TYPE_CHECKING:

    class AuditSink(Protocol):
        """Protocol defining the MCP audit sink contract."""

        def emit(self, record: McpAuditRecord) -> None: ...

        def flush(self) -> None: ...


[docs] def outcome_from_decision(decision: AccessDecision) -> PolicyOutcome: """Convert an access decision into a policy outcome.""" if decision.is_allowed(): return PolicyOutcome(status=PolicyOutcomeStatus.APPROVED) reason = decision.reason or "denied" return PolicyOutcome(status=PolicyOutcomeStatus.DENIED, reason=reason)
[docs] def resolve_audit_capability(record: McpAuditRecord) -> RalphCapability: """Map MCP capability to Ralph capability or fall back to workspace read.""" if record.capability is None: return RalphCapability.WORKSPACE_READ mapped = lookup_ralph_capability(record.capability) return mapped or RalphCapability.WORKSPACE_READ
def event_type_label(event_type: McpAuditEventType) -> str: """Return the lowercase label for an MCP audit event.""" return event_type.value def default_description(record: McpAuditRecord) -> str: """Build a default description when no metadata details are provided.""" if record.decision.is_allowed(): return f"MCP tool '{record.tool_name}' executed successfully" reason = record.decision.reason or "denied" return f"MCP tool '{record.tool_name}' access denied: {reason}" def resolve_description(record: McpAuditRecord) -> str: """Prefer metadata details but fall back to the default description.""" return record.metadata.details or default_description(record) def resolve_correlation(record: McpAuditRecord) -> AuditCorrelation | None: """Construct correlation metadata for Ralph audit records.""" corr = record.metadata.correlation policy_mode = None if corr.policy_mode is not None: policy_mode = ( corr.policy_mode.value if isinstance(corr.policy_mode, PolicyMode) else str(corr.policy_mode) ) if not any([corr.run_id, corr.generation, corr.drain, policy_mode]): return None return AuditCorrelation( run_id=corr.run_id, generation=corr.generation, drain=corr.drain, policy_mode=policy_mode, ) def to_ralph_record(record: McpAuditRecord) -> RalphAuditRecord: """Translate an MCP audit record into Ralph's domain model.""" return RalphAuditRecord( session_id=AgentSessionId.from_string(record.session_id), timestamp=record.timestamp_nanos // 1_000_000_000, capability=resolve_audit_capability(record), outcome=outcome_from_decision(record.decision), description=resolve_description(record), event_type=event_type_label(record.metadata.event_type), correlation=resolve_correlation(record), )
[docs] class RalphAuditSinkAdapter: """Adapter that buffers Ralph audit records produced by MCP.""" def __init__(self) -> None: self._records: list[RalphAuditRecord] = [] self._lock = Lock()
[docs] def emit(self, record: McpAuditRecord) -> None: """Store a converted audit record in the buffer.""" with self._lock: self._records.append(to_ralph_record(record))
[docs] def drain_records(self) -> list[RalphAuditRecord]: """Return buffered records and clear the buffer.""" with self._lock: drained = list(self._records) self._records.clear() return drained
[docs] def flush(self) -> None: """No-op flush since records are held in memory."""
__all__ = [ "AgentSessionId", "AuditCorrelation", "AuditMetadata", "AuditSink", "McpAuditCorrelation", "McpAuditEventType", "McpAuditRecord", "RalphAuditRecord", "RalphAuditSinkAdapter", "outcome_from_decision", "resolve_audit_capability", ]