# Source code for biblicus.extraction

"""
Text extraction snapshots for Biblicus.
"""

from __future__ import annotations

import json
import os
import sys
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

from pydantic import BaseModel, ConfigDict, Field

from .corpus import Corpus
from .errors import ExtractionSnapshotFatalError
from .extractors import get_extractor
from .extractors.pipeline import PipelineExtractorConfig, PipelineStageSpec
from .models import CatalogItem, ExtractionStageOutput
from .retrieval import hash_text
from .time import utc_now_iso


class ExtractionConfigurationManifest(BaseModel):
    """
    Reproducible configuration for an extraction plugin snapshot.

    :ivar configuration_id: Deterministic configuration identifier.
    :vartype configuration_id: str
    :ivar extractor_id: Extractor plugin identifier.
    :vartype extractor_id: str
    :ivar name: Human-readable configuration name.
    :vartype name: str
    :ivar created_at: International Organization for Standardization 8601 timestamp.
    :vartype created_at: str
    :ivar configuration: Extractor-specific configuration values.
    :vartype configuration: dict[str, Any]
    """

    # Reject unknown keys so persisted manifests stay strictly schema-validated.
    model_config = ConfigDict(extra="forbid")

    configuration_id: str
    extractor_id: str
    name: str
    created_at: str
    configuration: Dict[str, Any] = Field(default_factory=dict)
class ExtractionStageResult(BaseModel):
    """
    Per-item result record for a single pipeline stage.

    :ivar stage_index: One-based pipeline stage index.
    :vartype stage_index: int
    :ivar extractor_id: Extractor identifier for the stage.
    :vartype extractor_id: str
    :ivar status: Stage status, extracted, skipped, or errored.
    :vartype status: str
    :ivar text_relpath: Relative path to the stage text artifact, when extracted.
    :vartype text_relpath: str or None
    :ivar text_characters: Character count of the extracted text.
    :vartype text_characters: int
    :ivar producer_extractor_id: Extractor identifier that produced the text content.
    :vartype producer_extractor_id: str or None
    :ivar source_stage_index: Optional stage index that supplied the text for selection-style extractors.
    :vartype source_stage_index: int or None
    :ivar confidence: Optional confidence score from 0.0 to 1.0.
    :vartype confidence: float or None
    :ivar metadata_relpath: Relative path to the stage metadata artifact, when present.
    :vartype metadata_relpath: str or None
    :ivar error_type: Optional error type name for errored stages.
    :vartype error_type: str or None
    :ivar error_message: Optional error message for errored stages.
    :vartype error_message: str or None
    """

    # Reject unknown keys so persisted manifests stay strictly schema-validated.
    model_config = ConfigDict(extra="forbid")

    stage_index: int = Field(ge=1)
    extractor_id: str
    status: str
    text_relpath: Optional[str] = None
    text_characters: int = Field(default=0, ge=0)
    producer_extractor_id: Optional[str] = None
    source_stage_index: Optional[int] = Field(default=None, ge=1)
    confidence: Optional[float] = Field(default=None, ge=0.0, le=1.0)
    metadata_relpath: Optional[str] = None
    error_type: Optional[str] = None
    error_message: Optional[str] = None
class ExtractionItemResult(BaseModel):
    """
    Per-item result record for an extraction snapshot.

    :ivar item_id: Item identifier.
    :vartype item_id: str
    :ivar status: Final result status, extracted, skipped, or errored.
    :vartype status: str
    :ivar final_text_relpath: Relative path to the final extracted text artifact, when extracted.
    :vartype final_text_relpath: str or None
    :ivar final_metadata_relpath: Relative path to the final metadata artifact, when present.
    :vartype final_metadata_relpath: str or None
    :ivar final_stage_index: Pipeline stage index that produced the final text.
    :vartype final_stage_index: int or None
    :ivar final_stage_extractor_id: Extractor identifier of the stage that produced the final text.
    :vartype final_stage_extractor_id: str or None
    :ivar final_producer_extractor_id: Extractor identifier that produced the final text content.
    :vartype final_producer_extractor_id: str or None
    :ivar final_source_stage_index: Optional stage index that supplied the final text for selection-style extractors.
    :vartype final_source_stage_index: int or None
    :ivar error_type: Optional error type name when no extracted text was produced.
    :vartype error_type: str or None
    :ivar error_message: Optional error message when no extracted text was produced.
    :vartype error_message: str or None
    :ivar stage_results: Per-stage results recorded for this item.
    :vartype stage_results: list[ExtractionStageResult]
    """

    # Reject unknown keys so persisted manifests stay strictly schema-validated.
    model_config = ConfigDict(extra="forbid")

    item_id: str
    status: str
    final_text_relpath: Optional[str] = None
    final_metadata_relpath: Optional[str] = None
    final_stage_index: Optional[int] = Field(default=None, ge=1)
    final_stage_extractor_id: Optional[str] = None
    final_producer_extractor_id: Optional[str] = None
    final_source_stage_index: Optional[int] = Field(default=None, ge=1)
    error_type: Optional[str] = None
    error_message: Optional[str] = None
    stage_results: List[ExtractionStageResult] = Field(default_factory=list)
class ExtractionSnapshotManifest(BaseModel):
    """
    Immutable record describing an extraction snapshot.

    :ivar snapshot_id: Unique snapshot identifier.
    :vartype snapshot_id: str
    :ivar configuration: Configuration manifest for this snapshot.
    :vartype configuration: ExtractionConfigurationManifest
    :ivar corpus_uri: Canonical uniform resource identifier for the corpus root.
    :vartype corpus_uri: str
    :ivar catalog_generated_at: Catalog timestamp used for the snapshot.
    :vartype catalog_generated_at: str
    :ivar created_at: International Organization for Standardization 8601 timestamp for snapshot creation.
    :vartype created_at: str
    :ivar items: Per-item results.
    :vartype items: list[ExtractionItemResult]
    :ivar stats: Snapshot statistics.
    :vartype stats: dict[str, Any]
    """

    # Reject unknown keys so persisted manifests stay strictly schema-validated.
    model_config = ConfigDict(extra="forbid")

    snapshot_id: str
    configuration: ExtractionConfigurationManifest
    corpus_uri: str
    catalog_generated_at: str
    created_at: str
    items: List[ExtractionItemResult] = Field(default_factory=list)
    stats: Dict[str, Any] = Field(default_factory=dict)
def create_extraction_configuration_manifest(
    *, extractor_id: str, name: str, configuration: Dict[str, Any]
) -> ExtractionConfigurationManifest:
    """
    Create a deterministic extraction configuration manifest.

    The configuration identifier hashes a canonical (sorted-key) JSON
    rendering of the extractor identifier, name, and configuration, so the
    same inputs always produce the same identifier.

    :param extractor_id: Extractor plugin identifier.
    :type extractor_id: str
    :param name: Human configuration name.
    :type name: str
    :param configuration: Extractor configuration.
    :type configuration: dict[str, Any]
    :return: Configuration manifest.
    :rtype: ExtractionConfigurationManifest
    """
    canonical_payload = {
        "extractor_id": extractor_id,
        "name": name,
        "configuration": configuration,
    }
    deterministic_id = hash_text(json.dumps(canonical_payload, sort_keys=True))
    return ExtractionConfigurationManifest(
        configuration_id=deterministic_id,
        extractor_id=extractor_id,
        name=name,
        created_at=utc_now_iso(),
        configuration=configuration,
    )
def create_extraction_snapshot_manifest(
    corpus: Corpus, *, configuration: ExtractionConfigurationManifest
) -> ExtractionSnapshotManifest:
    """
    Create a new extraction snapshot manifest for a corpus.

    The snapshot identifier hashes the configuration identifier together with
    the catalog generation timestamp, making the snapshot unique per
    (configuration, catalog) pair.

    :param corpus: Corpus associated with the snapshot.
    :type corpus: Corpus
    :param configuration: Configuration manifest.
    :type configuration: ExtractionConfigurationManifest
    :return: Snapshot manifest.
    :rtype: ExtractionSnapshotManifest
    """
    catalog = corpus.load_catalog()
    identity_key = f"{configuration.configuration_id}:{catalog.generated_at}"
    return ExtractionSnapshotManifest(
        snapshot_id=hash_text(identity_key),
        configuration=configuration,
        corpus_uri=corpus.uri,
        catalog_generated_at=catalog.generated_at,
        created_at=utc_now_iso(),
        items=[],
        stats={},
    )
def write_extraction_snapshot_manifest(
    *, snapshot_dir: Path, manifest: ExtractionSnapshotManifest
) -> None:
    """
    Persist an extraction snapshot manifest to a snapshot directory.

    Writes ``manifest.json`` (pretty-printed, trailing newline) inside the
    snapshot directory.

    :param snapshot_dir: Extraction snapshot directory.
    :type snapshot_dir: Path
    :param manifest: Snapshot manifest to write.
    :type manifest: ExtractionSnapshotManifest
    :return: None.
    :rtype: None
    """
    serialized = manifest.model_dump_json(indent=2)
    (snapshot_dir / "manifest.json").write_text(serialized + "\n", encoding="utf-8")
def write_extraction_latest_pointer(
    *, extractor_dir: Path, manifest: ExtractionSnapshotManifest
) -> None:
    """
    Persist the latest pointer for an extractor.

    Writes ``latest.json`` in the extractor directory with the snapshot
    identifier and its creation timestamp.

    :param extractor_dir: Extractor directory containing snapshots.
    :type extractor_dir: Path
    :param manifest: Snapshot manifest used for the pointer.
    :type manifest: ExtractionSnapshotManifest
    :return: None.
    :rtype: None
    """
    pointer = {"snapshot_id": manifest.snapshot_id, "created_at": manifest.created_at}
    payload = json.dumps(pointer, indent=2) + "\n"
    (extractor_dir / "latest.json").write_text(payload, encoding="utf-8")
def _ensure_extraction_alias_snapshot_dir(
    *,
    corpus: Corpus,
    stage_extractor_id: str,
    manifest: ExtractionSnapshotManifest,
) -> Path:
    # Mirror the pipeline snapshot under the stage extractor's identifier so
    # artifacts can also be resolved by the extractor that produced them.
    alias_dir = corpus.extraction_snapshot_dir(
        extractor_id=stage_extractor_id, snapshot_id=manifest.snapshot_id
    )
    alias_dir.mkdir(parents=True, exist_ok=True)
    write_extraction_snapshot_manifest(snapshot_dir=alias_dir, manifest=manifest)
    write_extraction_latest_pointer(extractor_dir=alias_dir.parent, manifest=manifest)
    # Record where the alias came from so the link is traceable on disk.
    alias_record = {
        "source_extractor_id": manifest.configuration.extractor_id,
        "source_snapshot_id": manifest.snapshot_id,
        "stage_extractor_id": stage_extractor_id,
        "created_at": manifest.created_at,
    }
    (alias_dir / "alias.json").write_text(
        json.dumps(alias_record, indent=2) + "\n", encoding="utf-8"
    )
    return alias_dir


def _write_alias_text_artifact(*, alias_snapshot_dir: Path, item: CatalogItem, text: str) -> str:
    # Store the alias copy of the final text under text/<item id>.txt.
    relpath = str(Path("text") / f"{item.id}.txt")
    destination = alias_snapshot_dir / relpath
    destination.parent.mkdir(parents=True, exist_ok=True)
    destination.write_text(text, encoding="utf-8")
    return relpath


def _write_alias_metadata_artifact(
    *, alias_snapshot_dir: Path, item: CatalogItem, metadata: Dict[str, Any]
) -> Optional[str]:
    # Empty metadata produces no artifact at all.
    if not metadata:
        return None
    relpath = str(Path("metadata") / f"{item.id}.json")
    destination = alias_snapshot_dir / relpath
    destination.parent.mkdir(parents=True, exist_ok=True)
    destination.write_text(json.dumps(metadata, indent=2), encoding="utf-8")
    return relpath
def write_extracted_text_artifact(*, snapshot_dir: Path, item: CatalogItem, text: str) -> str:
    """
    Write an extracted text artifact for an item into the snapshot directory.

    The artifact is stored at ``text/<item id>.txt`` relative to the snapshot
    directory; parent directories are created as needed.

    :param snapshot_dir: Extraction snapshot directory.
    :type snapshot_dir: Path
    :param item: Catalog item being extracted.
    :type item: CatalogItem
    :param text: Extracted text.
    :type text: str
    :return: Relative path to the stored text artifact.
    :rtype: str
    """
    relpath = str(Path("text") / f"{item.id}.txt")
    destination = snapshot_dir / relpath
    destination.parent.mkdir(parents=True, exist_ok=True)
    destination.write_text(text, encoding="utf-8")
    return relpath
def _pipeline_stage_dir_name(*, stage_index: int, extractor_id: str) -> str:
    """
    Build a stable directory name for a pipeline stage.

    The index is zero-padded to two digits so stage directories sort in
    pipeline order.

    :param stage_index: One-based pipeline stage index.
    :type stage_index: int
    :param extractor_id: Extractor identifier for the stage.
    :type extractor_id: str
    :return: Directory name for the stage.
    :rtype: str
    """
    padded_index = format(stage_index, "02d")
    return f"{padded_index}-{extractor_id}"
def write_pipeline_stage_text_artifact(
    *,
    snapshot_dir: Path,
    stage_index: int,
    extractor_id: str,
    item: CatalogItem,
    text: str,
) -> str:
    """
    Write a pipeline stage text artifact for an item.

    The artifact lives at ``stages/<NN>-<extractor>/text/<item id>.txt``
    relative to the snapshot directory.

    :param snapshot_dir: Extraction snapshot directory.
    :type snapshot_dir: Path
    :param stage_index: One-based pipeline stage index.
    :type stage_index: int
    :param extractor_id: Extractor identifier for the stage.
    :type extractor_id: str
    :param item: Catalog item being extracted.
    :type item: CatalogItem
    :param text: Extracted text content.
    :type text: str
    :return: Relative path to the stored stage text artifact.
    :rtype: str
    """
    stage_dir_name = _pipeline_stage_dir_name(stage_index=stage_index, extractor_id=extractor_id)
    relpath = str(Path("stages") / stage_dir_name / "text" / f"{item.id}.txt")
    destination = snapshot_dir / relpath
    destination.parent.mkdir(parents=True, exist_ok=True)
    destination.write_text(text, encoding="utf-8")
    return relpath
def write_extracted_metadata_artifact(
    *, snapshot_dir: Path, item: CatalogItem, metadata: Dict[str, Any]
) -> Optional[str]:
    """
    Write an extracted metadata artifact for an item into the snapshot directory.

    Empty metadata produces no artifact; otherwise the artifact is stored at
    ``metadata/<item id>.json`` relative to the snapshot directory.

    :param snapshot_dir: Extraction snapshot directory.
    :type snapshot_dir: Path
    :param item: Catalog item being extracted.
    :type item: CatalogItem
    :param metadata: Metadata dictionary to persist.
    :type metadata: dict[str, Any]
    :return: Relative path to the stored metadata artifact, or None if empty.
    :rtype: str or None
    """
    if not metadata:
        return None
    relpath = str(Path("metadata") / f"{item.id}.json")
    destination = snapshot_dir / relpath
    destination.parent.mkdir(parents=True, exist_ok=True)
    destination.write_text(json.dumps(metadata, indent=2), encoding="utf-8")
    return relpath
def write_pipeline_stage_metadata_artifact(
    *,
    snapshot_dir: Path,
    stage_index: int,
    extractor_id: str,
    item: CatalogItem,
    metadata: Dict[str, Any],
) -> Optional[str]:
    """
    Write a pipeline stage metadata artifact for an item.

    Empty metadata produces no artifact; otherwise the artifact lives at
    ``stages/<NN>-<extractor>/metadata/<item id>.json`` relative to the
    snapshot directory.

    :param snapshot_dir: Extraction snapshot directory.
    :type snapshot_dir: Path
    :param stage_index: One-based pipeline stage index.
    :type stage_index: int
    :param extractor_id: Extractor identifier for the stage.
    :type extractor_id: str
    :param item: Catalog item being extracted.
    :type item: CatalogItem
    :param metadata: Metadata dictionary to persist.
    :type metadata: dict[str, Any]
    :return: Relative path to the stored stage metadata artifact, or None if empty.
    :rtype: str or None
    """
    if not metadata:
        return None
    stage_dir_name = _pipeline_stage_dir_name(stage_index=stage_index, extractor_id=extractor_id)
    relpath = str(Path("stages") / stage_dir_name / "metadata" / f"{item.id}.json")
    destination = snapshot_dir / relpath
    destination.parent.mkdir(parents=True, exist_ok=True)
    destination.write_text(json.dumps(metadata, indent=2), encoding="utf-8")
    return relpath
def _final_output_from_stages(
    stage_outputs: List[ExtractionStageOutput],
) -> Optional[ExtractionStageOutput]:
    """
    Select the final pipeline output for an item.

    The final output is the last extracted stage output in pipeline order.

    :param stage_outputs: Extracted outputs produced by pipeline stages.
    :type stage_outputs: list[biblicus.models.ExtractionStageOutput]
    :return: Final stage output or None when no stages produced extracted text.
    :rtype: biblicus.models.ExtractionStageOutput or None
    """
    return stage_outputs[-1] if stage_outputs else None
def build_extraction_snapshot(
    corpus: Corpus,
    *,
    extractor_id: str,
    configuration_name: str,
    configuration: Dict[str, Any],
    force: bool = False,
    max_workers: int = 1,
) -> ExtractionSnapshotManifest:
    """
    Build an extraction snapshot for a corpus using the pipeline extractor.

    :param corpus: Corpus to extract from.
    :type corpus: Corpus
    :param extractor_id: Extractor plugin identifier (must be ``pipeline``).
    :type extractor_id: str
    :param configuration_name: Human-readable configuration name.
    :type configuration_name: str
    :param configuration: Extractor configuration mapping.
    :type configuration: dict[str, Any]
    :param force: Whether to reprocess items even if artifacts already exist.
    :type force: bool
    :param max_workers: Maximum number of concurrent workers.
    :type max_workers: int
    :return: Extraction snapshot manifest describing the build.
    :rtype: ExtractionSnapshotManifest
    :raises KeyError: If the extractor identifier is unknown.
    :raises ValueError: If the extractor configuration is invalid.
    :raises OSError: If the snapshot directory or artifacts cannot be written.
    :raises ExtractionSnapshotFatalError: If the extractor is not the pipeline.
    """
    if max_workers < 1:
        raise ValueError("max_workers must be at least 1")
    extractor = get_extractor(extractor_id)
    parsed_config = extractor.validate_config(configuration)
    config_manifest = create_extraction_configuration_manifest(
        extractor_id=extractor_id,
        name=configuration_name,
        configuration=parsed_config.model_dump(),
    )
    manifest = create_extraction_snapshot_manifest(corpus, configuration=config_manifest)
    snapshot_dir = corpus.extraction_snapshot_dir(
        extractor_id=extractor_id, snapshot_id=manifest.snapshot_id
    )
    if snapshot_dir.exists():
        # Resume mode: reuse the previously written manifest for this snapshot
        # identity when one is present.
        try:
            manifest = corpus.load_extraction_snapshot_manifest(
                extractor_id=extractor_id, snapshot_id=manifest.snapshot_id
            )
        except FileNotFoundError:
            # Re-raise only if a manifest file exists but could not be loaded;
            # a directory without a manifest just means the build was interrupted.
            manifest_path = snapshot_dir / "manifest.json"
            if manifest_path.exists():
                raise
    else:
        snapshot_dir.mkdir(parents=True, exist_ok=False)
    catalog = corpus.load_catalog()
    if extractor_id != "pipeline":
        raise ExtractionSnapshotFatalError("Extraction snapshots must use the pipeline extractor")
    pipeline_config = (
        parsed_config
        if isinstance(parsed_config, PipelineExtractorConfig)
        else PipelineExtractorConfig.model_validate(parsed_config)
    )
    # Validate every stage configuration up front so bad configs fail fast.
    validated_stages: List[Tuple[PipelineStageSpec, BaseModel]] = []
    for stage in pipeline_config.stages:
        stage_extractor = get_extractor(stage.extractor_id)
        parsed_stage_config = stage_extractor.validate_config(stage.configuration)
        validated_stages.append((stage, parsed_stage_config))
    # Previous per-item results (from a resumed manifest) keyed by item id.
    previous_items = {item.item_id: item for item in (manifest.items or [])}
    extracted_items: List[ExtractionItemResult] = []
    extracted_count = 0
    skipped_count = 0
    errored_count = 0
    extracted_nonempty_count = 0
    extracted_empty_count = 0
    already_text_item_count = 0
    needs_extraction_item_count = 0
    converted_item_count = 0
    total_item_count = len(catalog.items)
    # Progress logging frequency scales with corpus size.
    if total_item_count <= 25:
        log_interval = 1
    elif total_item_count <= 100:
        log_interval = 10
    else:
        log_interval = 25
    start_time = time.perf_counter()
    processed_count = 0
    print(
        f"[extract] building snapshot {manifest.snapshot_id} items={total_item_count} "
        f"workers={max_workers}",
        flush=True,
        file=sys.stderr,
    )

    def _write_partial_manifest() -> None:
        # Persist an interim manifest so an interrupted build can resume.
        stats = {
            "total_items": total_item_count,
            "already_text_items": already_text_item_count,
            "needs_extraction_items": needs_extraction_item_count,
            "extracted_items": extracted_count,
            "extracted_nonempty_items": extracted_nonempty_count,
            "extracted_empty_items": extracted_empty_count,
            "skipped_items": skipped_count,
            "errored_items": errored_count,
            "converted_items": converted_item_count,
        }
        partial_manifest = manifest.model_copy(update={"items": extracted_items, "stats": stats})
        write_extraction_snapshot_manifest(snapshot_dir=snapshot_dir, manifest=partial_manifest)

    # `lock` guards result aggregation; `progress_lock` guards the heartbeat's
    # view of the current item/stage labels.
    lock = threading.Lock()
    progress_lock = threading.Lock()
    current_item_id: Optional[str] = None
    current_stage_label: Optional[str] = None
    stop_event = threading.Event()

    def _heartbeat() -> None:
        # Emit a liveness line every 30 seconds until the build finishes.
        while not stop_event.wait(30):
            with progress_lock:
                active_item = current_item_id
                active_stage = current_stage_label
                processed = processed_count
            elapsed = time.perf_counter() - start_time
            print(
                f"[extract] heartbeat processed={processed}/{total_item_count} "
                f"active_item={active_item or 'none'} stage={active_stage or 'none'} "
                f"elapsed={elapsed:.1f}s",
                flush=True,
                file=sys.stderr,
            )

    heartbeat_thread = threading.Thread(target=_heartbeat, daemon=True)
    heartbeat_thread.start()

    def _load_stage_cache(
        *, stage_index: int, extractor_id: str, item: CatalogItem
    ) -> Optional[Tuple[ExtractionStageResult, ExtractionStageOutput]]:
        # Reuse a previously written stage artifact for this item if one exists
        # in this snapshot, or migrate one from a sibling (legacy) snapshot.
        stage_dir_name = _pipeline_stage_dir_name(
            stage_index=stage_index, extractor_id=extractor_id
        )
        text_relpath = str(Path("stages") / stage_dir_name / "text" / f"{item.id}.txt")
        text_path = snapshot_dir / text_relpath
        if not text_path.is_file():
            legacy_text_path = next(
                (
                    path
                    for path in snapshot_dir.parent.glob(
                        f"*/stages/{stage_dir_name}/text/{item.id}.txt"
                    )
                    if path.is_file()
                ),
                None,
            )
            if legacy_text_path is None:
                return None
            # Copy the legacy artifact into this snapshot.
            text_value = legacy_text_path.read_text(encoding="utf-8")
            text_path.parent.mkdir(parents=True, exist_ok=True)
            text_path.write_text(text_value, encoding="utf-8")
        else:
            text_value = text_path.read_text(encoding="utf-8")
        metadata_relpath = str(Path("stages") / stage_dir_name / "metadata" / f"{item.id}.json")
        metadata_path = snapshot_dir / metadata_relpath
        metadata_value: Dict[str, Any] = {}
        if metadata_path.is_file():
            metadata_value = json.loads(metadata_path.read_text(encoding="utf-8"))
        else:
            legacy_metadata_path = next(
                (
                    path
                    for path in snapshot_dir.parent.glob(
                        f"*/stages/{stage_dir_name}/metadata/{item.id}.json"
                    )
                    if path.is_file()
                ),
                None,
            )
            if legacy_metadata_path is not None:
                metadata_value = json.loads(legacy_metadata_path.read_text(encoding="utf-8"))
                metadata_path.parent.mkdir(parents=True, exist_ok=True)
                metadata_path.write_text(
                    legacy_metadata_path.read_text(encoding="utf-8"), encoding="utf-8"
                )
        stage_result = ExtractionStageResult(
            stage_index=stage_index,
            extractor_id=extractor_id,
            status="extracted",
            text_relpath=text_relpath,
            text_characters=len(text_value),
            producer_extractor_id=extractor_id,
            source_stage_index=None,
            confidence=None,
            metadata_relpath=metadata_relpath if metadata_path.is_file() else None,
            error_type=None,
            error_message=None,
        )
        stage_output = ExtractionStageOutput(
            stage_index=stage_index,
            extractor_id=extractor_id,
            status="extracted",
            text=text_value,
            text_characters=len(text_value),
            producer_extractor_id=extractor_id,
            source_stage_index=None,
            confidence=None,
            metadata=metadata_value,
            error_type=None,
            error_message=None,
        )
        return stage_result, stage_output

    def _build_item_result(item: CatalogItem) -> Tuple[ExtractionItemResult, Dict[str, int]]:
        # Process one catalog item through the pipeline and return its result
        # plus the stat deltas to fold into the running totals.
        nonlocal current_item_id
        nonlocal current_stage_label
        media_type = item.media_type
        item_is_text = media_type == "text/markdown" or media_type.startswith("text/")
        stats_delta = {
            "already_text_items": 1 if item_is_text else 0,
            "needs_extraction_items": 0 if item_is_text else 1,
            "extracted_items": 0,
            "extracted_nonempty_items": 0,
            "extracted_empty_items": 0,
            "skipped_items": 0,
            "errored_items": 0,
            "converted_items": 0,
        }
        with progress_lock:
            current_item_id = item.id
            current_stage_label = "prepare"
        final_text_relpath = str(Path("text") / f"{item.id}.txt")
        final_metadata_relpath = str(Path("metadata") / f"{item.id}.json")
        final_text_path = snapshot_dir / final_text_relpath
        if not force and not final_text_path.is_file():
            # Migrate a final artifact from a sibling (legacy) snapshot if present.
            legacy_text_path = next(
                (
                    path
                    for path in snapshot_dir.parent.glob(f"*/text/{item.id}.txt")
                    if path.is_file()
                ),
                None,
            )
            if legacy_text_path is not None:
                legacy_text = legacy_text_path.read_text(encoding="utf-8")
                final_text_path.parent.mkdir(parents=True, exist_ok=True)
                final_text_path.write_text(legacy_text, encoding="utf-8")
                legacy_metadata_path = (
                    legacy_text_path.parent.parent / "metadata" / f"{item.id}.json"
                )
                if legacy_metadata_path.is_file():
                    legacy_metadata_dest = snapshot_dir / final_metadata_relpath
                    legacy_metadata_dest.parent.mkdir(parents=True, exist_ok=True)
                    legacy_metadata_dest.write_text(
                        legacy_metadata_path.read_text(encoding="utf-8"),
                        encoding="utf-8",
                    )
        if not force and final_text_path.is_file():
            # Final artifact already exists: count it and refresh alias copies
            # without re-running any extractor.
            final_text_value = final_text_path.read_text(encoding="utf-8")
            cached_item = previous_items.get(item.id)
            if cached_item and cached_item.final_stage_extractor_id:
                alias_snapshot_dir = _ensure_extraction_alias_snapshot_dir(
                    corpus=corpus,
                    stage_extractor_id=cached_item.final_stage_extractor_id,
                    manifest=manifest,
                )
                _write_alias_text_artifact(
                    alias_snapshot_dir=alias_snapshot_dir,
                    item=item,
                    text=final_text_value,
                )
                metadata_value: Dict[str, Any] = {}
                metadata_path = snapshot_dir / final_metadata_relpath
                if metadata_path.is_file():
                    metadata_value = json.loads(metadata_path.read_text(encoding="utf-8"))
                _write_alias_metadata_artifact(
                    alias_snapshot_dir=alias_snapshot_dir,
                    item=item,
                    metadata=metadata_value,
                )
            stats_delta["extracted_items"] = 1
            if final_text_value.strip():
                stats_delta["extracted_nonempty_items"] = 1
                if not item_is_text:
                    stats_delta["converted_items"] = 1
            else:
                stats_delta["extracted_empty_items"] = 1
            if cached_item is not None:
                return cached_item, stats_delta
            return (
                ExtractionItemResult(
                    item_id=item.id,
                    status="extracted",
                    final_text_relpath=final_text_relpath,
                    final_metadata_relpath=(
                        final_metadata_relpath
                        if (snapshot_dir / final_metadata_relpath).is_file()
                        else None
                    ),
                    final_stage_index=None,
                    final_stage_extractor_id=None,
                    final_producer_extractor_id=None,
                    final_source_stage_index=None,
                    error_type=None,
                    error_message=None,
                    stage_results=[],
                ),
                stats_delta,
            )
        # Run every pipeline stage in order, caching where possible.
        stage_results: List[ExtractionStageResult] = []
        stage_outputs: List[ExtractionStageOutput] = []
        last_error_type: Optional[str] = None
        last_error_message: Optional[str] = None
        for stage_index, (stage, parsed_stage_config) in enumerate(validated_stages, start=1):
            with progress_lock:
                current_stage_label = f"{stage.extractor_id}:{stage_index}"
            if not force:
                cached = _load_stage_cache(
                    stage_index=stage_index,
                    extractor_id=stage.extractor_id,
                    item=item,
                )
                if cached:
                    with progress_lock:
                        current_stage_label = f"{stage.extractor_id}:{stage_index}:cache"
                    cached_result, cached_output = cached
                    stage_results.append(cached_result)
                    stage_outputs.append(cached_output)
                    continue
            try:
                stage_extractor = get_extractor(stage.extractor_id)
                extracted_text = stage_extractor.extract_text(
                    corpus=corpus,
                    item=item,
                    config=parsed_stage_config,
                    previous_extractions=stage_outputs,
                )
            except Exception as extraction_error:
                # Fatal errors abort the whole build; anything else is recorded
                # as an errored stage and the pipeline continues.
                if isinstance(extraction_error, ExtractionSnapshotFatalError):
                    raise
                last_error_type = extraction_error.__class__.__name__
                last_error_message = str(extraction_error)
                stage_results.append(
                    ExtractionStageResult(
                        stage_index=stage_index,
                        extractor_id=stage.extractor_id,
                        status="errored",
                        text_relpath=None,
                        text_characters=0,
                        producer_extractor_id=None,
                        source_stage_index=None,
                        error_type=last_error_type,
                        error_message=last_error_message,
                    )
                )
                continue
            if extracted_text is None:
                # Extractor declined this item (e.g. unsupported media type).
                stage_results.append(
                    ExtractionStageResult(
                        stage_index=stage_index,
                        extractor_id=stage.extractor_id,
                        status="skipped",
                        text_relpath=None,
                        text_characters=0,
                        producer_extractor_id=None,
                        source_stage_index=None,
                        error_type=None,
                        error_message=None,
                    )
                )
                continue
            relpath = write_pipeline_stage_text_artifact(
                snapshot_dir=snapshot_dir,
                stage_index=stage_index,
                extractor_id=stage.extractor_id,
                item=item,
                text=extracted_text.text,
            )
            metadata_relpath = write_pipeline_stage_metadata_artifact(
                snapshot_dir=snapshot_dir,
                stage_index=stage_index,
                extractor_id=stage.extractor_id,
                item=item,
                metadata=extracted_text.metadata,
            )
            text_characters = len(extracted_text.text)
            stage_results.append(
                ExtractionStageResult(
                    stage_index=stage_index,
                    extractor_id=stage.extractor_id,
                    status="extracted",
                    text_relpath=relpath,
                    text_characters=text_characters,
                    producer_extractor_id=extracted_text.producer_extractor_id,
                    source_stage_index=extracted_text.source_stage_index,
                    confidence=extracted_text.confidence,
                    metadata_relpath=metadata_relpath,
                    error_type=None,
                    error_message=None,
                )
            )
            stage_outputs.append(
                ExtractionStageOutput(
                    stage_index=stage_index,
                    extractor_id=stage.extractor_id,
                    status="extracted",
                    text=extracted_text.text,
                    text_characters=text_characters,
                    producer_extractor_id=extracted_text.producer_extractor_id,
                    source_stage_index=extracted_text.source_stage_index,
                    confidence=extracted_text.confidence,
                    metadata=extracted_text.metadata,
                    error_type=None,
                    error_message=None,
                )
            )
        final_output = _final_output_from_stages(stage_outputs)
        if final_output is None:
            # No stage produced text: errored if any stage raised, else skipped.
            status = "errored" if last_error_type else "skipped"
            if status == "errored":
                stats_delta["errored_items"] = 1
            else:
                stats_delta["skipped_items"] = 1
            return (
                ExtractionItemResult(
                    item_id=item.id,
                    status=status,
                    final_text_relpath=None,
                    final_metadata_relpath=None,
                    final_stage_index=None,
                    final_stage_extractor_id=None,
                    final_producer_extractor_id=None,
                    final_source_stage_index=None,
                    error_type=last_error_type if status == "errored" else None,
                    error_message=last_error_message if status == "errored" else None,
                    stage_results=stage_results,
                ),
                stats_delta,
            )
        final_text = final_output.text or ""
        final_text_relpath = write_extracted_text_artifact(
            snapshot_dir=snapshot_dir, item=item, text=final_text
        )
        final_metadata_relpath = write_extracted_metadata_artifact(
            snapshot_dir=snapshot_dir, item=item, metadata=final_output.metadata
        )
        # Mirror final artifacts under the producing stage extractor's snapshot.
        alias_snapshot_dir = _ensure_extraction_alias_snapshot_dir(
            corpus=corpus,
            stage_extractor_id=final_output.extractor_id,
            manifest=manifest,
        )
        _write_alias_text_artifact(
            alias_snapshot_dir=alias_snapshot_dir,
            item=item,
            text=final_text,
        )
        _write_alias_metadata_artifact(
            alias_snapshot_dir=alias_snapshot_dir,
            item=item,
            metadata=final_output.metadata,
        )
        stats_delta["extracted_items"] = 1
        if final_text.strip():
            stats_delta["extracted_nonempty_items"] = 1
            if not item_is_text:
                stats_delta["converted_items"] = 1
        else:
            stats_delta["extracted_empty_items"] = 1
        return (
            ExtractionItemResult(
                item_id=item.id,
                status="extracted",
                final_text_relpath=final_text_relpath,
                final_metadata_relpath=final_metadata_relpath,
                final_stage_index=final_output.stage_index,
                final_stage_extractor_id=final_output.extractor_id,
                final_producer_extractor_id=final_output.producer_extractor_id,
                final_source_stage_index=final_output.source_stage_index,
                error_type=None,
                error_message=None,
                stage_results=stage_results,
            ),
            stats_delta,
        )

    def _apply_result(item_result: ExtractionItemResult, stats_delta: Dict[str, int]) -> None:
        # Fold one item's deltas into the running totals. Caller must hold `lock`.
        nonlocal extracted_count
        nonlocal skipped_count
        nonlocal errored_count
        nonlocal extracted_nonempty_count
        nonlocal extracted_empty_count
        nonlocal already_text_item_count
        nonlocal needs_extraction_item_count
        nonlocal converted_item_count
        nonlocal processed_count
        extracted_items.append(item_result)
        extracted_count += stats_delta["extracted_items"]
        skipped_count += stats_delta["skipped_items"]
        errored_count += stats_delta["errored_items"]
        extracted_nonempty_count += stats_delta["extracted_nonempty_items"]
        extracted_empty_count += stats_delta["extracted_empty_items"]
        already_text_item_count += stats_delta["already_text_items"]
        needs_extraction_item_count += stats_delta["needs_extraction_items"]
        converted_item_count += stats_delta["converted_items"]
        processed_count += 1
        if processed_count % log_interval == 0 or processed_count == total_item_count:
            elapsed = time.perf_counter() - start_time
            rate = processed_count / elapsed if elapsed > 0 else 0.0
            print(
                f"[extract] processed {processed_count}/{total_item_count} "
                f"extracted={extracted_count} skipped={skipped_count} errored={errored_count} "
                f"elapsed={elapsed:.1f}s rate={rate:.2f}/s",
                flush=True,
                file=sys.stderr,
            )
            _write_partial_manifest()

    def _process_and_record(item: CatalogItem) -> None:
        item_result, stats_delta = _build_item_result(item)
        with lock:
            _apply_result(item_result, stats_delta)

    if max_workers == 1:
        # Sequential path: no executor overhead for single-worker builds.
        try:
            for item in catalog.items.values():
                _process_and_record(item)
        finally:
            stop_event.set()
            heartbeat_thread.join(timeout=1)
    else:
        try:
            with ThreadPoolExecutor(max_workers=max_workers) as executor:
                futures = [
                    executor.submit(_build_item_result, item) for item in catalog.items.values()
                ]
                for future in as_completed(futures):
                    item_result, stats_delta = future.result()
                    with lock:
                        _apply_result(item_result, stats_delta)
        finally:
            stop_event.set()
            heartbeat_thread.join(timeout=1)
    stats = {
        "total_items": total_item_count,
        "already_text_items": already_text_item_count,
        "needs_extraction_items": needs_extraction_item_count,
        "extracted_items": extracted_count,
        "extracted_nonempty_items": extracted_nonempty_count,
        "extracted_empty_items": extracted_empty_count,
        "skipped_items": skipped_count,
        "errored_items": errored_count,
        "converted_items": converted_item_count,
    }
    manifest = manifest.model_copy(update={"items": extracted_items, "stats": stats})
    write_extraction_snapshot_manifest(snapshot_dir=snapshot_dir, manifest=manifest)
    write_extraction_latest_pointer(extractor_dir=snapshot_dir.parent, manifest=manifest)
    # Auto-sync catalog to Amplify if configured
    if os.getenv("AMPLIFY_AUTO_SYNC_CATALOG", "false").lower() == "true":
        try:
            from .sync.amplify_publisher import AmplifyPublisher

            publisher = AmplifyPublisher(corpus.name)
            catalog_path = corpus.root / "catalog.json"
            if catalog_path.exists():
                result = publisher.sync_catalog(catalog_path, force=False)
                if not result.skipped:
                    print(
                        f"✓ Synced catalog: {result.created} created, {result.updated} updated, {result.deleted} deleted",
                        file=sys.stderr,
                    )
        except ImportError:
            # AmplifyPublisher not available, skip sync
            pass
        except Exception as e:
            # Don't fail extraction if sync fails
            print(f"Warning: Catalog sync failed: {e}", file=sys.stderr)
    return manifest
def load_or_build_extraction_snapshot(
    corpus: Corpus,
    *,
    extractor_id: str,
    configuration_name: str,
    configuration: Dict[str, Any],
    max_workers: int = 1,
) -> ExtractionSnapshotManifest:
    """
    Load an extraction snapshot if it exists or build it when missing.

    :param corpus: Corpus to extract from.
    :type corpus: Corpus
    :param extractor_id: Extractor plugin identifier (must be ``pipeline``).
    :type extractor_id: str
    :param configuration_name: Human-readable configuration name.
    :type configuration_name: str
    :param configuration: Extractor configuration mapping.
    :type configuration: dict[str, Any]
    :param max_workers: Maximum number of concurrent workers.
    :type max_workers: int
    :return: Extraction snapshot manifest describing the build.
    :rtype: ExtractionSnapshotManifest
    """
    # Recompute the deterministic snapshot identity so we can probe for an
    # existing manifest before doing any extraction work.
    candidate = create_extraction_snapshot_manifest(
        corpus,
        configuration=create_extraction_configuration_manifest(
            extractor_id=extractor_id,
            name=configuration_name,
            configuration=configuration,
        ),
    )
    snapshot_id = candidate.snapshot_id
    snapshot_dir = corpus.extraction_snapshot_dir(
        extractor_id=extractor_id, snapshot_id=snapshot_id
    )
    if (snapshot_dir / "manifest.json").is_file():
        print(f"[extract] reusing snapshot {snapshot_id}", flush=True, file=sys.stderr)
        return corpus.load_extraction_snapshot_manifest(
            extractor_id=extractor_id, snapshot_id=snapshot_id
        )
    print(f"[extract] building snapshot {snapshot_id}", flush=True, file=sys.stderr)
    return build_extraction_snapshot(
        corpus,
        extractor_id=extractor_id,
        configuration_name=configuration_name,
        configuration=configuration,
        max_workers=max_workers,
    )