feat(metrics): Add Risk-Control Metric Suite
Issue Link / Problem Description
- Fixes #2279
- Problem: The `ragas` library currently excels at evaluating the quality of generated answers but lacks metrics to assess a RAG system's trustworthiness and risk-control mechanisms. Specifically, it cannot measure a system's ability to recognize uncertainty and proactively abstain from answering when the retrieved context is insufficient or irrelevant. This is a critical capability for deploying reliable RAG systems in production and safety-critical domains.
Changes Made
- Added `_risk_control.py`: Introduced a new file `src/ragas/metrics/_risk_control.py` which contains the implementation for a new suite of four interconnected metrics (see the sketch after this list):
  - `Risk`: Measures the probability of a "risky" answer (lower is better).
  - `Carefulness`: Measures the ability to correctly discard unanswerable questions.
  - `Alignment`: Measures the overall accuracy of the keep/discard decision.
  - `Coverage`: Measures the proportion of questions the system attempts to answer.
- Added `risk_control_suite` factory function: This function efficiently initializes all four metrics, sharing a single calculation pass over the dataset to improve performance.
- Updated `metrics/__init__.py`: Exposed the new metrics (`Risk`, `Carefulness`, `Alignment`, `Coverage`) and the `risk_control_suite` factory function to make them accessible to users.
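For concreteness, here is a rough sketch of how these four scores could be computed. The `Record` fields, the reading of a "risky" answer as "kept despite being unanswerable", and the edge-case defaults (0.0 / 1.0) are illustrative assumptions, not the PR's actual schema:

```python
from dataclasses import dataclass


@dataclass
class Record:
    kept: bool        # did the system attempt an answer for this question?
    answerable: bool  # does the retrieved context actually support an answer?


def risk_control_scores(records: list[Record]) -> dict[str, float]:
    total = len(records)
    kept = [r for r in records if r.kept]
    unanswerable = [r for r in records if not r.answerable]

    risky_kept = sum(1 for r in kept if not r.answerable)        # kept despite being unanswerable
    correctly_discarded = sum(1 for r in unanswerable if not r.kept)
    aligned = sum(1 for r in records if r.kept == r.answerable)  # keep answerable, discard unanswerable

    return {
        "risk": risky_kept / len(kept) if kept else 0.0,                        # lower is better
        "carefulness": correctly_discarded / len(unanswerable) if unanswerable else 1.0,
        "alignment": aligned / total if total else 0.0,
        "coverage": len(kept) / total if total else 0.0,
    }
```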
Testing
How to Test
- [x] Automated tests added/updated
- Automated: A new test file `tests/unit/test_risk_control.py` has been added. It includes comprehensive unit tests that verify:
  - Correct calculation of all four metrics on a sample dataset.
  - Correct handling of edge cases (e.g., no "kept" answers, no "unanswerable" questions).
  - Proper error handling for missing required columns.
- To run the tests: `pytest tests/unit/test_risk_control.py`
Hi @anistark,
Thank you so much for the insightful feedback on the initial proposal. It helped me understand the core ragas evaluation architecture much more deeply.
Based on your feedback, I've completely refactored the implementation to address every concern. Before I update the PR, I wanted to propose the new design here to make sure we're aligned.
Understanding the Challenge: A Corpus-Level Metric
The core challenge, as I now understand it, is that metrics like Risk and Carefulness are fundamentally corpus-level (or dataset-level). The score is an aggregate (Total UK / Total Kept) and cannot be calculated from a single row in isolation. This contrasts with row-level metrics like faithfulness.
The evaluate() function, however, is designed to call _single_turn_ascore() on a per-row basis. The design challenge is to bridge this gap in an efficient and stateless way that respects the ragas architecture.
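To make the mismatch concrete, here is a minimal illustration; `rows` and its keys are hypothetical, not the real sample schema:

```python
# Illustrative contrast only.

def row_level_score(row: dict) -> float:
    """A row-level metric: the score depends only on this single row."""
    return float(row["faithful"])  # hypothetical per-row judgement


def corpus_level_risk(rows: list[dict]) -> float:
    """Risk is a ratio over *all* rows (Total UK / Total Kept), so no single row can yield it."""
    kept = [r for r in rows if r["kept"]]
    risky_kept = [r for r in kept if not r["answerable"]]  # "UK": unanswerable but kept
    return len(risky_kept) / len(kept) if kept else 0.0
```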
Proposed Solution: Lazy, Cached Corpus Calculation
The proposed solution is to make the metric objects themselves stateless and perform a lazy, one-time calculation for the entire dataset on the first row processed. The results are then cached for all subsequent rows in that evaluation run.
This is achieved by leveraging the internal __ragas_dataset__ attribute that the evaluate function attaches to each sample.
Here’s a snapshot of the new design pattern in _risk_control.py:
```python
from weakref import WeakKeyDictionary

from ragas.dataset_schema import SingleTurnSample
# ... other imports

# A module-level cache to store results per dataset object
_calculator_cache: WeakKeyDictionary[Dataset, dict[str, float]] = WeakKeyDictionary()


def _calculate_scores_for_dataset(dataset: Dataset) -> dict[str, float]:
    """
    Performs the one-time, full-dataset calculation and caches the result.
    """
    if dataset in _calculator_cache:
        return _calculator_cache[dataset]
    # ... full scan and calculation logic ...
    _calculator_cache[dataset] = scores
    return scores


@dataclass
class Risk(SingleTurnMetric):
    name: str = "risk"
    # ...

    async def _single_turn_ascore(self, sample: SingleTurnSample, callbacks: Callbacks) -> float:
        """
        For each row, get a reference to the parent dataset and compute scores.
        The calculation only runs on the first call; subsequent calls are instant cache hits.
        """
        dataset = getattr(sample, "__ragas_dataset__")
        scores = _calculate_scores_for_dataset(dataset)
        return scores["risk"]


# Users now import the singleton instances directly, no factory needed.
risk = Risk()
carefulness = Carefulness()
```
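Usage would then just mean importing the singletons and passing them to `evaluate()`; a minimal sketch, assuming the singletons get re-exported from `ragas.metrics`:

```python
from ragas import evaluate
from ragas.metrics import risk, carefulness  # assumes the singletons are re-exported

# `dataset` is assumed to be a ragas-compatible evaluation dataset prepared elsewhere.
results = evaluate(dataset, metrics=[risk, carefulness])
```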
@AlanPonnachan This sounds like a good plan.
A few thoughts:
- We won't be able to use metrics outside `evaluate()`, as there'll be no `__ragas_dataset__`. So we also won't be able to test individual metrics in isolation.
- The cache won't update if the dataset changes:
```python
dataset = Dataset.from_list([...])
results1 = evaluate(dataset, [risk])   # Calculates and caches
dataset = dataset.add_row(new_data)    # Dataset changed
results2 = evaluate(dataset, [risk])   # Uses old cached results
```
- If ragas changes its internals, e.g. renames `__ragas_dataset__` to `__dataset__`, this'll break. That said, such a rename would likely be updated alongside this code, but it's better not to hardcode it.
How about a hybrid approach that combines a factory for control with singleton instances for standard use?
@anistark
I've designed a new hybrid solution that I believe addresses every point.
The new design isolates the core calculation logic into a pure, standalone function that is completely independent of the ragas framework. The metric classes then act as thin, thread-safe wrappers around this testable core.
This is managed through a factory function, risk_control_suite(), which creates a shared cache for a single evaluate() run, ensuring efficiency and safety.
```python
# FILE: src/ragas/metrics/_risk_control.py
import asyncio
from dataclasses import dataclass, field
# ... other imports (Dataset, Metric, SingleTurnMetric, SingleTurnSample, Callbacks)


def _calculate_scores_for_dataset(dataset: Dataset) -> dict[str, float]:
    """The PURE, TESTABLE core logic. No framework dependencies."""
    # ... (full calculation logic) ...


@dataclass
class _RiskCache:
    scores: dict[str, float] | None = None
    lock: asyncio.Lock = field(default_factory=asyncio.Lock)


def risk_control_suite() -> list[Metric]:
    """Factory to create the suite with a shared, run-specific cache."""
    cache = _RiskCache()
    return [Risk(cache=cache), Carefulness(cache=cache), ...]


@dataclass(kw_only=True)
class Risk(SingleTurnMetric):
    cache: _RiskCache = field(default_factory=_RiskCache)

    async def _single_turn_ascore(self, sample: SingleTurnSample, callbacks: Callbacks) -> float:
        async with self.cache.lock:  # Prevent race conditions
            if self.cache.scores is None:
                dataset = getattr(sample, "__ragas_dataset__")
                self.cache.scores = _calculate_scores_for_dataset(dataset)
            return self.cache.scores["risk"]
```
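For reference, intended usage of the factory would look roughly like this (a sketch; `dataset` is assumed to be a ragas-compatible evaluation dataset prepared elsewhere, and the `risk_control_suite` import assumes the `metrics/__init__.py` export from this PR):

```python
from ragas import evaluate
from ragas.metrics import risk_control_suite  # assumes the export added in metrics/__init__.py

metrics = risk_control_suite()               # fresh shared cache for this run
results = evaluate(dataset, metrics=metrics)

# For a second run (or a modified dataset), build a new suite so the cache is rebuilt.
results_again = evaluate(dataset, metrics=risk_control_suite())
```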
How This Addresses Your Feedback:
- Testability: The core logic now lives in `_calculate_scores_for_dataset`, which is a pure function. Our unit tests can call it directly, making the metric fully testable in isolation (see the sketch after this list).
- Cache Staleness: The cache is no longer global. The `risk_control_suite()` factory creates a fresh cache for each `evaluate()` run, so state cannot leak between runs. This guarantees correctness.
- Fragile `__ragas_dataset__`: The dependency is now architecturally contained. The core, testable logic is pure, and the fragile `getattr` call is isolated in a thin adapter layer (`_single_turn_ascore`), minimizing the "blast radius" of any future internal changes in ragas. (This seems to be a necessary trade-off for any corpus-level metric that needs to integrate seamlessly into the existing row-level `evaluate()` loop.)
- Hybrid Approach: This is exactly the combination you suggested. The `risk_control_suite()` factory provides the controlled, efficient path for users, while the metrics remain robust. I've also added an `asyncio.Lock` to ensure it's safe for parallel execution.
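A minimal sketch of what testing the pure core in isolation could look like; the column names and score keys are assumptions, not the PR's final schema:

```python
from datasets import Dataset

from ragas.metrics._risk_control import _calculate_scores_for_dataset


def test_scores_without_evaluate_or_ragas_dataset_attr():
    # The pure core is exercised directly: no evaluate(), no __ragas_dataset__ attribute.
    dataset = Dataset.from_list([
        {"answer_kept": True, "is_answerable": True},
        {"answer_kept": True, "is_answerable": False},
        {"answer_kept": False, "is_answerable": False},
    ])
    scores = _calculate_scores_for_dataset(dataset)
    assert set(scores) == {"risk", "carefulness", "alignment", "coverage"}
```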
This design feels like a great balance of robustness and simplicity. If this direction looks good to you, I will update the PR with this final implementation.