Source code for biblicus.evaluation.retrieval

"""
Evaluation utilities for Biblicus retrieval snapshots.
"""

from __future__ import annotations

import json
import time
from pathlib import Path
from typing import Dict, List, Optional

from pydantic import BaseModel, ConfigDict, Field, model_validator

from ..constants import DATASET_SCHEMA_VERSION
from ..corpus import Corpus
from ..models import QueryBudget, RetrievalResult, RetrievalSnapshot
from ..retrievers import get_retriever
from ..time import utc_now_iso


class EvaluationQuery(BaseModel):
    """
    Query record for retrieval evaluation.

    :ivar query_id: Unique identifier for the query.
    :vartype query_id: str
    :ivar query_text: Natural language query to execute.
    :vartype query_text: str
    :ivar expected_item_id: Optional expected item identifier.
    :vartype expected_item_id: str or None
    :ivar expected_source_uri: Optional expected source URI.
    :vartype expected_source_uri: str or None
    :ivar kind: Query kind (gold or synthetic).
    :vartype kind: str
    """

    model_config = ConfigDict(extra="forbid")

    query_id: str
    query_text: str
    expected_item_id: Optional[str] = None
    expected_source_uri: Optional[str] = None
    kind: str = Field(default="gold")

    @model_validator(mode="after")
    def _require_expectation(self) -> "EvaluationQuery":
        if not self.expected_item_id and not self.expected_source_uri:
            raise ValueError(
                "Evaluation queries must include expected_item_id or expected_source_uri"
            )
        return self


class EvaluationDataset(BaseModel):
    """
    Dataset for retrieval evaluation.

    :ivar schema_version: Dataset schema version.
    :vartype schema_version: int
    :ivar name: Dataset name.
    :vartype name: str
    :ivar description: Optional description.
    :vartype description: str or None
    :ivar queries: List of evaluation queries.
    :vartype queries: list[EvaluationQuery]
    """

    model_config = ConfigDict(extra="forbid")

    schema_version: int = Field(ge=1)
    name: str
    description: Optional[str] = None
    queries: List[EvaluationQuery] = Field(default_factory=list)

    @model_validator(mode="after")
    def _enforce_schema_version(self) -> "EvaluationDataset":
        if self.schema_version != DATASET_SCHEMA_VERSION:
            raise ValueError(f"Unsupported dataset schema version: {self.schema_version}")
        return self
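
# Example dataset document accepted by load_dataset below (a sketch;
# the field values are hypothetical, and schema_version must equal
# DATASET_SCHEMA_VERSION to validate):
#
#     {
#       "schema_version": 1,
#       "name": "smoke-test",
#       "description": "Tiny dataset for quick checks",
#       "queries": [
#         {
#           "query_id": "q1",
#           "query_text": "covenant at sinai",
#           "expected_item_id": "item-001"
#         }
#       ]
#     }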


class EvaluationResult(BaseModel):
    """
    Result bundle for a retrieval evaluation.

    :ivar dataset: Dataset metadata.
    :vartype dataset: dict[str, object]
    :ivar retriever_id: Retriever identifier.
    :vartype retriever_id: str
    :ivar snapshot_id: Retrieval snapshot identifier.
    :vartype snapshot_id: str
    :ivar evaluated_at: ISO 8601 evaluation timestamp.
    :vartype evaluated_at: str
    :ivar metrics: Quality metrics for retrieval.
    :vartype metrics: dict[str, float]
    :ivar system: System metrics for retrieval.
    :vartype system: dict[str, float]
    """

    model_config = ConfigDict(extra="forbid")

    dataset: Dict[str, object]
    retriever_id: str
    snapshot_id: str
    evaluated_at: str
    metrics: Dict[str, float]
    system: Dict[str, float]
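
# Shape sketch of a serialized result (all values hypothetical; the
# retriever and snapshot identifiers come from the snapshot manifest):
#
#     {
#       "dataset": {"name": "smoke-test", "description": null, "queries": 1},
#       "retriever_id": "bm25",
#       "snapshot_id": "snap-123",
#       "evaluated_at": "2024-01-01T00:00:00Z",
#       "metrics": {"hit_rate": 1.0, ...},
#       "system": {"average_latency_milliseconds": 4.2, ...}
#     }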


def load_dataset(path: Path) -> EvaluationDataset:
    """
    Load an evaluation dataset from a JSON file.

    :param path: Path to the dataset JSON file.
    :type path: Path
    :return: Parsed evaluation dataset.
    :rtype: EvaluationDataset
    """
    data = json.loads(path.read_text(encoding="utf-8"))
    return EvaluationDataset.model_validate(data)
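
# Usage sketch (the path is hypothetical):
#
#     dataset = load_dataset(Path("eval/queries.json"))
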
def evaluate_snapshot(
    *,
    corpus: Corpus,
    snapshot: RetrievalSnapshot,
    dataset: EvaluationDataset,
    budget: QueryBudget,
) -> EvaluationResult:
    """
    Evaluate a retrieval snapshot against a dataset.

    :param corpus: Corpus associated with the snapshot.
    :type corpus: Corpus
    :param snapshot: Retrieval snapshot manifest.
    :type snapshot: RetrievalSnapshot
    :param dataset: Evaluation dataset.
    :type dataset: EvaluationDataset
    :param budget: Evidence selection budget.
    :type budget: QueryBudget
    :return: Evaluation result bundle.
    :rtype: EvaluationResult
    """
    retriever = get_retriever(snapshot.configuration.retriever_id)
    latency_seconds: List[float] = []
    hit_count = 0
    reciprocal_ranks: List[float] = []
    for query in dataset.queries:
        timer_start = time.perf_counter()
        result = retriever.query(
            corpus, snapshot=snapshot, query_text=query.query_text, budget=budget
        )
        elapsed_seconds = time.perf_counter() - timer_start
        latency_seconds.append(elapsed_seconds)
        expected_rank = _expected_rank(result, query)
        if expected_rank is not None:
            hit_count += 1
            reciprocal_ranks.append(1.0 / expected_rank)
        else:
            reciprocal_ranks.append(0.0)
    total_queries = max(len(dataset.queries), 1)
    max_total_items = float(budget.max_total_items)
    hit_rate = hit_count / total_queries
    precision_at_max_total_items = hit_count / (total_queries * max_total_items)
    mean_reciprocal_rank = sum(reciprocal_ranks) / total_queries
    metrics = {
        "hit_rate": hit_rate,
        "precision_at_max_total_items": precision_at_max_total_items,
        "mean_reciprocal_rank": mean_reciprocal_rank,
    }
    system = {
        "average_latency_milliseconds": _average_latency_milliseconds(latency_seconds),
        "percentile_95_latency_milliseconds": _percentile_95_latency_milliseconds(
            latency_seconds
        ),
        "index_bytes": float(_snapshot_artifact_bytes(corpus, snapshot)),
    }
    dataset_meta = {
        "name": dataset.name,
        "description": dataset.description,
        "queries": len(dataset.queries),
    }
    return EvaluationResult(
        dataset=dataset_meta,
        retriever_id=snapshot.configuration.retriever_id,
        snapshot_id=snapshot.snapshot_id,
        evaluated_at=utc_now_iso(),
        metrics=metrics,
        system=system,
    )
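
# Usage sketch, assuming a corpus, snapshot, and budget constructed
# elsewhere (see biblicus.corpus and biblicus.models):
#
#     result = evaluate_snapshot(
#         corpus=corpus, snapshot=snapshot, dataset=dataset, budget=budget
#     )
#     print(result.metrics["mean_reciprocal_rank"])
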
def _expected_rank(result: RetrievalResult, query: EvaluationQuery) -> Optional[int]:
    """
    Locate the first evidence rank that matches the expected item or source.

    :param result: Retrieval result for a query.
    :type result: RetrievalResult
    :param query: Evaluation query definition.
    :type query: EvaluationQuery
    :return: Rank of the first matching evidence item, or None.
    :rtype: int or None
    """
    for evidence in result.evidence:
        if query.expected_item_id and evidence.item_id == query.expected_item_id:
            return evidence.rank
        if query.expected_source_uri and evidence.source_uri == query.expected_source_uri:
            return evidence.rank
    return None


def _average_latency_milliseconds(latencies: List[float]) -> float:
    """
    Compute the average latency in milliseconds.

    :param latencies: Latency samples in seconds.
    :type latencies: list[float]
    :return: Average latency in milliseconds.
    :rtype: float
    """
    if not latencies:
        return 0.0
    return sum(latencies) / len(latencies) * 1000.0


def _percentile_95_latency_milliseconds(latencies: List[float]) -> float:
    """
    Compute the 95th percentile latency in milliseconds.

    :param latencies: Latency samples in seconds.
    :type latencies: list[float]
    :return: 95th percentile latency in milliseconds.
    :rtype: float
    """
    if not latencies:
        return 0.0
    sorted_latencies = sorted(latencies)
    percentile_index = int(round(0.95 * (len(sorted_latencies) - 1)))
    return sorted_latencies[percentile_index] * 1000.0


def _snapshot_artifact_bytes(corpus: Corpus, snapshot: RetrievalSnapshot) -> int:
    """
    Sum artifact sizes for a retrieval snapshot.

    :param corpus: Corpus that owns the artifacts.
    :type corpus: Corpus
    :param snapshot: Retrieval snapshot manifest.
    :type snapshot: RetrievalSnapshot
    :return: Total artifact bytes.
    :rtype: int
    """
    total_bytes = 0
    for artifact_relpath in snapshot.snapshot_artifacts:
        artifact_path = corpus.root / artifact_relpath
        if artifact_path.exists():
            total_bytes += artifact_path.stat().st_size
    return total_bytes
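
# Sanity check of the index math in _percentile_95_latency_milliseconds
# (illustrative): with 10 samples, int(round(0.95 * 9)) == 9, so the
# slowest sample is reported; with 21 samples, int(round(0.95 * 20)) == 19.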