"""Failure classification logic and rules."""
from __future__ import annotations
import errno
import socket
import urllib.error
from loguru import logger
from .classified_failure import ClassifiedFailure
from .failure_category import FailureCategory
from .failure_details import contains_casefolded_marker, failure_detail_parts
# Network/transport error substrings that indicate environmental faults
_TRANSPORT_SUBSTRINGS: frozenset[str] = frozenset(
{
"connection reset",
"socket hang up",
"ECONNREFUSED",
"broken pipe",
"Network is unreachable",
"Temporary failure in name resolution",
"Connection refused",
"Connection timed out",
"Name or service not known",
"nodename nor servname provided",
}
)
# OSError error numbers that indicate environmental faults
_ENV_ERRNOS: frozenset[int] = frozenset(
{
errno.ENETUNREACH,
errno.ECONNREFUSED,
errno.EHOSTUNREACH,
errno.ENETDOWN,
errno.EPIPE,
errno.ENOSPC,
}
)
# Markers showing that a resume was attempted with an agent session ID that is
# stale or invalid. Within this module the tuple is only ever handed to
# `contains_casefolded_marker`.
SESSION_NOT_FOUND_SUBSTRINGS: tuple[str, ...] = (
    "No conversation found with session ID:",
    "Session not found",
    "Unknown session",
    "session does not exist",
)
_MISSING_ARTIFACT_SUBSTRINGS: frozenset[str] = frozenset(
{
"Missing required artifact",
"Artifact not found at .agent/artifacts/",
"required_artifact_missing",
"Missing/invalid issues artifact",
"Missing required analysis artifact",
"Missing fix_result artifact",
}
)
# Typed *ValidationError class names that should route to ARTIFACT_VALIDATION.
# Matched by type(exc).__name__ to avoid circular imports between ralph.recovery
# and ralph.mcp/ralph.pipeline.
_ARTIFACT_VALIDATION_TYPE_NAMES: frozenset[str] = frozenset(
{
"PlanArtifactValidationError",
"DevelopmentResultValidationError",
"TypedArtifactValidationError",
"SmokeTestResultValidationError",
"ProductSpecValidationError",
"WorkUnitsValidationError",
}
)
def _is_environmental_exc(exc: BaseException) -> bool:
"""Return True if this exception is clearly an environmental/network fault."""
if isinstance(exc, ConnectionError | TimeoutError):
return True
if isinstance(exc, socket.gaierror):
return True
if isinstance(exc, urllib.error.URLError):
return True
return isinstance(exc, OSError) and exc.errno in _ENV_ERRNOS
def _message_looks_environmental(msg: str) -> bool:
    """Tell whether *msg* mentions any known transport-fault fragment.

    Both the message and every entry of ``_TRANSPORT_SUBSTRINGS`` are
    lower-cased before the substring test, so the match ignores case.
    """
    haystack = msg.lower()
    for marker in _TRANSPORT_SUBSTRINGS:
        if marker.lower() in haystack:
            return True
    return False
def _is_user_config_exc(exc: BaseException) -> bool:
type_name = type(exc).__name__
return type_name in {
"DrainNotBoundError",
"CheckpointPolicyMismatchError",
"PolicyValidationError",
"UnknownAgentError",
"McpConfigError",
}
def _is_user_config_message(raw_message: str) -> bool:
config_substrings = (
"is not bound",
"is not defined",
"Checkpoint was saved at phase",
"no longer exists in pipeline.toml",
)
return any(s in raw_message for s in config_substrings)
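# NOTE(review): a stray "[docs]" marker stood here; it appears to be a link label left over from a rendered documentation page rather than source code - confirm and remove.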
def is_missing_artifact_message(raw_message: str) -> bool:
    """Tell whether *raw_message* reports a missing required artifact.

    The message qualifies when it contains, case-sensitively, at least one
    entry of ``_MISSING_ARTIFACT_SUBSTRINGS``.
    """
    for marker in _MISSING_ARTIFACT_SUBSTRINGS:
        if marker in raw_message:
            return True
    return False
def _is_artifact_validation_message(raw_message: str) -> bool:
    """Tell whether *raw_message* describes an artifact/proof validation failure.

    A missing-artifact message (see ``is_missing_artifact_message``) counts as
    a validation failure as well; otherwise the message must contain one of
    the case-sensitive validation fragments listed below.
    """
    if is_missing_artifact_message(raw_message):
        return True

    validation_markers = (
        # Invalid or mismatched artifacts.
        "Invalid plan artifact:",
        "Invalid development evidence:",
        "Missing/invalid issues artifact:",
        "Artifact type mismatch:",
        # Malformed JSON payloads.
        "Artifact content must be a JSON object",
        "must be valid JSON text",
        "must be a JSON object",
        # Proof validation.
        "PROOF INVALID:",
        "PROOF INCOMPLETE:",
        "proof entries are incomplete or invalid",
        # Broken cross-references.
        "how_to_fix item",
        "Unknown plan_item reference",
        "Unknown how_to_fix_item reference",
    )
    return any(marker in raw_message for marker in validation_markers)
def _is_stale_session_message(raw_message: str) -> bool:
    """Tell whether *raw_message* says a stale agent session ID was used.

    The message is wrapped in a one-element tuple and checked against
    ``SESSION_NOT_FOUND_SUBSTRINGS`` by ``contains_casefolded_marker``.
    """
    message_parts = (raw_message,)
    return contains_casefolded_marker(message_parts, SESSION_NOT_FOUND_SUBSTRINGS)
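# NOTE(review): a stray "[docs]" marker stood here; it appears to be a link label left over from a rendered documentation page rather than source code - confirm and remove.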
class FailureClassifier:
    """Classify failures into categories for intelligent recovery routing.

    This is a pure, stateless classifier. All classification rules are
    encapsulated here so new failure modes are added once, not at call sites.
    """

    def classify(
        self,
        exc: BaseException | str,
        *,
        phase: str,
        agent: str | None,
    ) -> ClassifiedFailure:
        """Classify a failure and package the verdict as a ClassifiedFailure.

        Args:
            exc: The failure, either as an exception or as a plain message.
                For an exception the raw message is ``"<TypeName>: <text>"``,
                or just ``"<TypeName>"`` when ``str(exc)`` is empty.
            phase: Phase in which the failure occurred; always recorded as
                ``attributed_phase``.
            agent: Agent involved, if any; recorded as ``attributed_agent``
                only when the failure is categorised as an AGENT fault.

        Returns:
            A ClassifiedFailure carrying the category, a human-readable
            reason, the budget/session flags, the raw message and the
            original exception (``None`` when *exc* was a string).
        """
        original: BaseException | None
        if not isinstance(exc, str):
            text = str(exc)
            name = type(exc).__name__
            raw_message = f"{name}: {text}" if text else name
            original = exc
            detail_parts = failure_detail_parts(exc)
        else:
            raw_message = exc
            original = None
            detail_parts = [exc]

        # Only a real exception object takes part in type-based rules.
        typed_exc = exc if isinstance(exc, BaseException) else None
        category, counts, reset_session = self._categorize(
            typed_exc, raw_message, detail_parts
        )

        if category == FailureCategory.AMBIGUOUS:
            # Nothing matched: surface it so a human can add a rule.
            logger.warning(
                "Ambiguous failure classification in phase={} agent={}: "
                "{} [flagged_for_review=true]",
                phase,
                agent,
                raw_message[:200],
            )

        reason = self._build_reason(category, raw_message)
        blamed_agent = agent if category == FailureCategory.AGENT else None
        return ClassifiedFailure(
            category=category,
            reason=reason,
            attributed_agent=blamed_agent,
            attributed_phase=phase,
            counts_against_budget=counts,
            original_exception=original,
            raw_message=raw_message,
            reset_session=reset_session,
        )

    def _categorize_exc(
        self,
        exc: BaseException,
        raw_message: str,
        detail_parts: list[str],
    ) -> tuple[FailureCategory, bool, bool] | None:
        """Apply the exception-type rules, in priority order.

        Returns:
            ``(category, counts_against_budget, reset_session)`` when a
            type-based rule matches, otherwise ``None`` so the caller can fall
            back to the message-based rules.
        """
        exc_type_name = type(exc).__name__

        if _is_user_config_exc(exc):
            return FailureCategory.USER_CONFIG, False, False

        # Typed *ValidationError exceptions are routed by type name BEFORE the
        # environmental check, so a future validation error that wraps an
        # OSError is still caught here.
        if exc_type_name in _ARTIFACT_VALIDATION_TYPE_NAMES:
            return FailureCategory.ARTIFACT_VALIDATION, False, False

        if _is_environmental_exc(exc):
            return FailureCategory.ENVIRONMENTAL, False, False

        if exc_type_name == "AgentInactivityTimeoutError":
            return FailureCategory.AGENT, True, False

        if exc_type_name != "AgentInvocationError":
            return None

        # AgentInvocationError: a stale session always blames the agent and
        # requests a session reset.
        stale_session = contains_casefolded_marker(
            detail_parts, SESSION_NOT_FOUND_SUBSTRINGS
        )
        if stale_session:
            return FailureCategory.AGENT, True, stale_session

        # A transport-looking message is left to the message-based rules.
        if _message_looks_environmental(raw_message):
            return None

        lowered = raw_message.lower()
        agent_symptoms = ("empty", "no output", "timed out")
        if any(symptom in lowered for symptom in agent_symptoms):
            return FailureCategory.AGENT, True, stale_session
        return None

    def _categorize(
        self,
        exc: BaseException | None,
        raw_message: str,
        detail_parts: list[str],
    ) -> tuple[FailureCategory, bool, bool]:
        """Pick the category: type-based rules first, then message-based ones.

        Returns:
            ``(category, counts_against_budget, reset_session)``. When no rule
            matches the category is AMBIGUOUS with both flags False.
        """
        if exc is not None:
            typed_verdict = self._categorize_exc(exc, raw_message, detail_parts)
            if typed_verdict is not None:
                return typed_verdict

        # A stale session outranks every other message-based rule.
        if contains_casefolded_marker(detail_parts, SESSION_NOT_FOUND_SUBSTRINGS):
            return FailureCategory.AGENT, True, True

        # Remaining rules share the same flags; the first match wins.
        message_rules = (
            (_message_looks_environmental, FailureCategory.ENVIRONMENTAL),
            (_is_user_config_message, FailureCategory.USER_CONFIG),
            (_is_artifact_validation_message, FailureCategory.ARTIFACT_VALIDATION),
        )
        for matches, matched_category in message_rules:
            if matches(raw_message):
                return matched_category, False, False

        return FailureCategory.AMBIGUOUS, False, False

    def _build_reason(self, category: FailureCategory, raw_message: str) -> str:
        """Compose the reason text: a category label plus the message.

        The message is cut to its first 300 characters; an empty message is
        rendered as ``"(no message)"``. A category without a label falls back
        to ``"Unknown fault"``.
        """
        labels = {
            FailureCategory.ENVIRONMENTAL: "Environmental fault",
            FailureCategory.AGENT: "Agent fault",
            FailureCategory.USER_CONFIG: "Configuration fault",
            FailureCategory.ARTIFACT_VALIDATION: "Artifact validation fault",
            FailureCategory.AMBIGUOUS: "Ambiguous fault (flagged for review)",
        }
        label = labels.get(category, "Unknown fault")
        if raw_message:
            detail = raw_message[:300]
        else:
            detail = "(no message)"
        return f"{label}: {detail}"