Source code for biblicus.evaluation.benchmark_runner

"""
Multi-category benchmark runner for the Biblicus Document Understanding Benchmark.

This module orchestrates benchmarking across multiple document categories:
- Forms (FUNSD)
- Academic papers (Scanned ArXiv)
- Receipts (SROIE)

Each category has its own primary metric and evaluation approach, with results
aggregated into a unified benchmark report.
"""

from __future__ import annotations

import json
import time
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional

import yaml

from biblicus.corpus import Corpus
from biblicus.evaluation.ocr_benchmark import OCRBenchmark


[docs] @dataclass class CategoryConfig: """Configuration for a single benchmark category.""" name: str dataset: str primary_metric: str pipelines: List[object] = field(default_factory=list) corpus_path: Optional[Path] = None ground_truth_subdir: Optional[str] = None subset_size: Optional[int] = None tags: List[str] = field(default_factory=list)
[docs] @dataclass class BenchmarkConfig: """Configuration for a complete benchmark run.""" benchmark_name: str categories: Dict[str, CategoryConfig] pipelines: List[Path] aggregate_weights: Dict[str, float] output_dir: Path = Path("results")
[docs] @classmethod def load(cls, config_path: Path) -> "BenchmarkConfig": """ Load benchmark configuration from YAML file. :param config_path: Path to configuration file. :type config_path: Path :return: Loaded configuration. :rtype: BenchmarkConfig """ with open(config_path, "r", encoding="utf-8") as f: data = yaml.safe_load(f) categories = {} for name, cat_data in data.get("categories", {}).items(): categories[name] = CategoryConfig( name=name, dataset=cat_data["dataset"], corpus_path=Path(cat_data.get("corpus_path", f"corpora/{cat_data['dataset']}_benchmark")), ground_truth_subdir=cat_data.get("ground_truth_subdir", f"{cat_data['dataset']}_ground_truth"), primary_metric=cat_data.get("primary_metric", "f1_score"), subset_size=cat_data.get("subset_size"), tags=cat_data.get("tags", []), ) pipelines = [Path(p) for p in data.get("pipelines", [])] aggregate_weights = data.get("aggregate_weights", { "forms": 0.40, "academic": 0.35, "receipts": 0.25, }) return cls( benchmark_name=data.get("benchmark_name", "standard"), categories=categories, pipelines=pipelines, aggregate_weights=aggregate_weights, output_dir=Path(data.get("output_dir", "results")), )
[docs] @dataclass class CategoryResult: """Results for a single category.""" category_name: str dataset: str documents_evaluated: int pipelines: List[Dict[str, Any]] best_pipeline: str best_score: float primary_metric: str primary_score: float processing_time_seconds: float
def run_category(output_dir: Path, cat_config: CategoryConfig, runner: object) -> CategoryResult: """ Execute a single benchmark category using a provided runner. This utility is intentionally lightweight so tests can exercise the primary-metric selection logic without needing full corpus assets. :param output_dir: Directory where intermediate artifacts may be written. :type output_dir: Path :param cat_config: Parsed category configuration. :type cat_config: CategoryConfig :param runner: Object exposing ``run_pipeline(name)`` returning metrics. :type runner: object :return: CategoryResult summarizing the best pipeline. :rtype: CategoryResult """ _ = output_dir start_time = time.time() pipelines: List[Dict[str, Any]] = [] best_pipeline = "" best_score = float("-inf") documents_evaluated = 0 for pipeline in cat_config.pipelines: pipeline_name = getattr(pipeline, "name", str(pipeline)) metrics_obj = runner.run_pipeline(pipeline_name) metrics = { "f1": getattr(metrics_obj, "avg_f1", None), "recall": getattr(metrics_obj, "avg_recall", None), "precision": getattr(metrics_obj, "avg_precision", None), "wer": getattr(metrics_obj, "avg_word_error_rate", None), "lcs_ratio": getattr(metrics_obj, "avg_lcs_ratio", None), "bigram_overlap": getattr(metrics_obj, "avg_bigram_overlap", None), "sequence_accuracy": getattr(metrics_obj, "avg_sequence_accuracy", None), } documents_evaluated = getattr(metrics_obj, "total_documents", documents_evaluated) pipelines.append({"name": pipeline_name, "metrics": metrics}) primary_value = metrics.get(cat_config.primary_metric) if primary_value is None: primary_value = getattr(metrics_obj, f"avg_{cat_config.primary_metric}", None) if primary_value is None: continue if primary_value > best_score: best_score = primary_value best_pipeline = pipeline_name processing_time = time.time() - start_time if best_score == float("-inf"): best_score = 0.0 return CategoryResult( category_name=cat_config.name, dataset=cat_config.dataset, documents_evaluated=documents_evaluated, pipelines=pipelines, best_pipeline=best_pipeline, best_score=best_score, primary_metric=cat_config.primary_metric, primary_score=best_score, processing_time_seconds=processing_time, )
[docs] @dataclass class BenchmarkResult: """Complete benchmark results across all categories.""" benchmark_version: str = "1.0.0" benchmark_name: str = "" timestamp: str = "" categories: Dict[str, CategoryResult] = field(default_factory=dict) aggregate: Dict[str, float] = field(default_factory=dict) recommendations: Dict[str, str] = field(default_factory=dict) total_documents: int = 0 total_processing_time_seconds: float = 0.0
[docs] def to_json(self, path: Path) -> None: """Export results to JSON file.""" data = { "benchmark_version": self.benchmark_version, "benchmark_name": self.benchmark_name, "timestamp": self.timestamp, "total_documents": self.total_documents, "total_processing_time_seconds": self.total_processing_time_seconds, "categories": {}, "aggregate": self.aggregate, "recommendations": self.recommendations, } for cat_name, cat_result in self.categories.items(): data["categories"][cat_name] = { "dataset": cat_result.dataset, "documents_evaluated": cat_result.documents_evaluated, "primary_metric": cat_result.primary_metric, "best_pipeline": cat_result.best_pipeline, "best_score": cat_result.best_score, "processing_time_seconds": cat_result.processing_time_seconds, "pipelines": cat_result.pipelines, } path.parent.mkdir(parents=True, exist_ok=True) with open(path, "w", encoding="utf-8") as f: json.dump(data, f, indent=2)
[docs] def to_markdown(self, path: Path) -> None: """Export results to Markdown file.""" lines = [ "# Biblicus Document Understanding Benchmark Results", "", f"**Benchmark:** {self.benchmark_name}", f"**Date:** {self.timestamp}", f"**Total Documents:** {self.total_documents}", f"**Processing Time:** {self.total_processing_time_seconds:.1f}s", "", "## Executive Summary", "", "| Category | Dataset | Docs | Best Pipeline | Score | Metric |", "|----------|---------|------|---------------|-------|--------|", ] for cat_name, cat_result in self.categories.items(): lines.append( f"| {cat_name.title()} | {cat_result.dataset} | {cat_result.documents_evaluated} | " f"{cat_result.best_pipeline} | {cat_result.best_score:.3f} | {cat_result.primary_metric} |" ) lines.extend([ "", "## Aggregate Score", "", f"**Weighted Score:** {self.aggregate.get('weighted_score', 0):.3f}", "", "**Weights:**", ]) for cat, weight in self.aggregate.get("weights", {}).items(): lines.append(f"- {cat.title()}: {weight:.0%}") lines.extend([ "", "## Recommendations", "", ]) for rec_type, pipeline in self.recommendations.items(): rec_label = rec_type.replace("_", " ").title() lines.append(f"- **{rec_label}:** {pipeline}") lines.extend([ "", "## Category Details", "", ]) for cat_name, cat_result in self.categories.items(): lines.extend([ f"### {cat_name.title()} ({cat_result.dataset})", "", f"Primary Metric: {cat_result.primary_metric}", "", "| Pipeline | F1 | Recall | Precision | WER | LCS |", "|----------|-----|--------|-----------|-----|-----|", ]) for pipeline in cat_result.pipelines: metrics = pipeline.get("metrics", {}) lines.append( f"| {pipeline['name']} | " f"{metrics.get('f1', 0):.3f} | " f"{metrics.get('recall', 0):.3f} | " f"{metrics.get('precision', 0):.3f} | " f"{metrics.get('wer', 0):.3f} | " f"{metrics.get('lcs_ratio', 0):.3f} |" ) lines.append("") path.parent.mkdir(parents=True, exist_ok=True) with open(path, "w", encoding="utf-8") as f: f.write("\n".join(lines))
[docs] def print_summary(self) -> None: """Print summary to console.""" print("\n" + "=" * 70) print(f"BIBLICUS DOCUMENT UNDERSTANDING BENCHMARK: {self.benchmark_name}") print("=" * 70) print(f"Timestamp: {self.timestamp}") print(f"Total Documents: {self.total_documents}") print(f"Processing Time: {self.total_processing_time_seconds:.1f}s") print() print("Category Results:") print("-" * 70) for cat_name, cat_result in self.categories.items(): print(f" {cat_name.title():12} | {cat_result.dataset:15} | " f"{cat_result.documents_evaluated:4} docs | " f"Best: {cat_result.best_pipeline} ({cat_result.best_score:.3f} {cat_result.primary_metric})") print() print(f"Aggregate Score: {self.aggregate.get('weighted_score', 0):.3f}") print() print("Recommendations:") for rec_type, pipeline in self.recommendations.items(): print(f" {rec_type.replace('_', ' ').title():20}: {pipeline}") print("=" * 70)
[docs] class BenchmarkRunner: """ Orchestrates multi-category benchmarking. Usage: config = BenchmarkConfig.load("configs/benchmark/standard.yaml") runner = BenchmarkRunner(config) results = runner.run_all() results.to_json(Path("results/benchmark.json")) """ def __init__(self, config: BenchmarkConfig): """ Initialize the benchmark runner. :param config: Benchmark configuration. :type config: BenchmarkConfig """ self.config = config
[docs] def run_all(self) -> BenchmarkResult: """ Run benchmark across all configured categories. :return: Complete benchmark results. :rtype: BenchmarkResult """ result = BenchmarkResult( benchmark_name=self.config.benchmark_name, timestamp=datetime.utcnow().isoformat() + "Z", ) start_time = time.time() for cat_name, cat_config in self.config.categories.items(): print(f"\n{'='*60}") print(f"Running {cat_name.upper()} benchmark ({cat_config.dataset})...") print(f"{'='*60}") try: cat_result = self.run_category(cat_config) result.categories[cat_name] = cat_result result.total_documents += cat_result.documents_evaluated except Exception as e: print(f" ERROR: Failed to run {cat_name} benchmark: {e}") continue result.total_processing_time_seconds = time.time() - start_time # Calculate aggregate score result.aggregate = self._calculate_aggregate(result.categories) # Generate recommendations result.recommendations = self._generate_recommendations(result.categories) return result
[docs] def run_category(self, cat_config: CategoryConfig) -> CategoryResult: """ Run benchmark for a single category. :param cat_config: Category configuration. :type cat_config: CategoryConfig :return: Category results. :rtype: CategoryResult """ start_time = time.time() # Open corpus if not cat_config.corpus_path.exists(): raise FileNotFoundError(f"Corpus not found: {cat_config.corpus_path}") corpus = Corpus.open(cat_config.corpus_path) # Get ground truth directory gt_dir = corpus.meta_dir / cat_config.ground_truth_subdir if not gt_dir.exists(): raise FileNotFoundError(f"Ground truth directory not found: {gt_dir}") # Initialize benchmark benchmark = OCRBenchmark(corpus) pipeline_results: List[Dict[str, Any]] = [] best_pipeline = "" best_score = 0.0 for pipeline_path in self.config.pipelines: pipeline_name = pipeline_path.stem print(f"\n Testing pipeline: {pipeline_name}") try: # Load pipeline config with open(pipeline_path, "r", encoding="utf-8") as f: pipeline_config = yaml.safe_load(f) # Run extraction extractor_id = pipeline_config.get("extractor_id", "pipeline") config = pipeline_config.get("config", {}) # Build extraction snapshot snapshot = corpus.extract(extractor_id=extractor_id, config=config) # Evaluate against ground truth report = benchmark.evaluate_extraction( snapshot_reference=snapshot.snapshot_id, ground_truth_dir=gt_dir, ) # Extract metrics metrics = { "f1": report.avg_f1, "recall": report.avg_recall, "precision": report.avg_precision, "wer": report.avg_word_error_rate, "lcs_ratio": report.avg_lcs_ratio, "bigram_overlap": report.avg_bigram_overlap, "sequence_accuracy": report.avg_sequence_accuracy, } pipeline_results.append({ "name": pipeline_name, "metrics": metrics, "documents_evaluated": report.total_documents, }) # Check if this is the best pipeline for primary metric primary_score = metrics.get(cat_config.primary_metric, metrics.get("f1", 0)) if primary_score > best_score: best_score = primary_score best_pipeline = pipeline_name print(f" {cat_config.primary_metric}: {primary_score:.3f}") except Exception as e: print(f" ERROR: {e}") continue processing_time = time.time() - start_time return CategoryResult( category_name=cat_config.name, dataset=cat_config.dataset, documents_evaluated=pipeline_results[0]["documents_evaluated"] if pipeline_results else 0, pipelines=pipeline_results, best_pipeline=best_pipeline, best_score=best_score, primary_metric=cat_config.primary_metric, primary_score=best_score, processing_time_seconds=processing_time, )
def _calculate_aggregate(self, categories: Dict[str, CategoryResult]) -> Dict[str, float]: """Calculate weighted aggregate score.""" weighted_sum = 0.0 total_weight = 0.0 weights = {} for cat_name, cat_result in categories.items(): weight = self.config.aggregate_weights.get(cat_name, 0.0) if weight > 0: weighted_sum += cat_result.best_score * weight total_weight += weight weights[cat_name] = weight weighted_score = weighted_sum / total_weight if total_weight > 0 else 0.0 return { "weighted_score": weighted_score, "weights": weights, } def _generate_recommendations(self, categories: Dict[str, CategoryResult]) -> Dict[str, str]: """Generate pipeline recommendations based on results.""" recommendations = {} # Best overall (highest aggregate across categories) pipeline_scores: Dict[str, List[float]] = {} for cat_result in categories.values(): for pipeline in cat_result.pipelines: name = pipeline["name"] if name not in pipeline_scores: pipeline_scores[name] = [] pipeline_scores[name].append(pipeline["metrics"].get("f1", 0)) if pipeline_scores: best_overall = max(pipeline_scores.items(), key=lambda x: sum(x[1]) / len(x[1])) recommendations["best_overall"] = best_overall[0] # Best for layout (highest LCS ratio) best_lcs = "" best_lcs_score = 0.0 for cat_result in categories.values(): for pipeline in cat_result.pipelines: lcs = pipeline["metrics"].get("lcs_ratio", 0) if lcs > best_lcs_score: best_lcs_score = lcs best_lcs = pipeline["name"] if best_lcs: recommendations["best_for_layout"] = best_lcs # Best for recall best_recall = "" best_recall_score = 0.0 for cat_result in categories.values(): for pipeline in cat_result.pipelines: recall = pipeline["metrics"].get("recall", 0) if recall > best_recall_score: best_recall_score = recall best_recall = pipeline["name"] if best_recall: recommendations["best_for_completeness"] = best_recall # Best for precision best_precision = "" best_precision_score = 0.0 for cat_result in categories.values(): for pipeline in cat_result.pipelines: precision = pipeline["metrics"].get("precision", 0) if precision > best_precision_score: best_precision_score = precision best_precision = pipeline["name"] if best_precision: recommendations["best_for_accuracy"] = best_precision return recommendations