"""
Corpus storage and ingestion for Biblicus.
"""
from __future__ import annotations
import hashlib
import json
import mimetypes
import shutil
import uuid
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Sequence
from urllib.parse import quote, unquote, urlparse
import yaml
from pydantic import ValidationError
from .constants import (
ANALYSIS_DIR_NAME,
CORPUS_DIR_NAME,
DEFAULT_RAW_DIR,
EXTRACTED_DIR_NAME,
GRAPH_DIR_NAME,
LEGACY_CORPUS_DIR_NAME,
RETRIEVAL_DIR_NAME,
SCHEMA_VERSION,
SIDECAR_SUFFIX,
)
from .errors import IngestCollisionError
from .frontmatter import parse_front_matter, render_front_matter
from .hook_manager import HookManager
from .hooks import HookPoint
from .ignore import load_corpus_ignore_spec
from .models import (
CatalogItem,
CorpusCatalog,
CorpusConfig,
ExtractionSnapshotListEntry,
ExtractionSnapshotReference,
IngestResult,
RemoteSourcePullResult,
RetrievalSnapshot,
)
from .remote_sources import AzureBlobRemoteSource, S3RemoteSource
from .sources import _media_type_from_filename, load_source
from .time import utc_now_iso
from .uris import corpus_ref_to_path, normalize_corpus_uri
from .user_config import resolve_source_profile
def _sha256_bytes(data: bytes) -> str:
"""
Compute a Secure Hash Algorithm 256 digest for byte content.
:param data: Input bytes.
:type data: bytes
:return: Secure Hash Algorithm 256 hex digest.
:rtype: str
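
    Example (doctest sketch; ``b"abc"`` hashes to the published SHA-256 test vector):

    >>> _sha256_bytes(b"abc")[:8]
    'ba7816bf'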
"""
return hashlib.sha256(data).hexdigest()
def _write_stream_and_hash(
stream, destination_path: Path, *, chunk_size: int = 1024 * 1024
) -> Dict[str, object]:
"""
Write a binary stream to disk while computing a digest.
:param stream: Binary stream to read from.
:type stream: object
:param destination_path: Destination path to write to.
:type destination_path: Path
:param chunk_size: Chunk size for reads.
:type chunk_size: int
:return: Mapping containing sha256 and bytes_written.
:rtype: dict[str, object]
:raises OSError: If the destination cannot be written.
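
    Example (doctest sketch writing an in-memory stream to a temporary file):

    >>> import io, tempfile
    >>> with tempfile.TemporaryDirectory() as tmp:
    ...     _write_stream_and_hash(io.BytesIO(b"abc"), Path(tmp) / "out.bin")["bytes_written"]
    3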
"""
hasher = hashlib.sha256()
bytes_written = 0
with destination_path.open("wb") as destination_handle:
while True:
chunk = stream.read(chunk_size)
if not chunk:
break
hasher.update(chunk)
destination_handle.write(chunk)
bytes_written += len(chunk)
return {"sha256": hasher.hexdigest(), "bytes_written": bytes_written}
def _sanitize_filename(name: str) -> str:
"""
Sanitize a filename into a portable, filesystem-friendly form.
:param name: Raw filename.
:type name: str
:return: Sanitized filename.
:rtype: str
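
    Example (doctest sketch; disallowed characters collapse to underscores):

    >>> _sanitize_filename("report: draft/v2.md")
    'report_ draft_v2.md'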
"""
allowed_characters = set("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-._() ")
sanitized_name = "".join(
(character if character in allowed_characters else "_") for character in name
).strip()
return sanitized_name or "file"
def _preferred_extension_for_media_type(media_type: str) -> Optional[str]:
"""
Return a preferred filename extension for a media type.
:param media_type: Internet Assigned Numbers Authority media type.
:type media_type: str
:return: Preferred extension or None.
:rtype: str or None
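
    Example (doctest sketch using entries from the override table):

    >>> _preferred_extension_for_media_type("image/jpeg")
    '.jpg'
    >>> _preferred_extension_for_media_type("audio/x-wav")
    '.wav'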
"""
media_type_overrides = {
"image/jpeg": ".jpg",
"audio/mpeg": ".mp3",
"audio/ogg": ".ogg",
"audio/wav": ".wav",
"audio/x-wav": ".wav",
}
if media_type in media_type_overrides:
return media_type_overrides[media_type]
return mimetypes.guess_extension(media_type)
def _ensure_filename_extension(filename: str, *, media_type: str) -> str:
"""
Ensure a usable filename extension for a media type.
:param filename: Raw filename.
:type filename: str
:param media_type: Internet Assigned Numbers Authority media type.
:type media_type: str
:return: Filename with a compatible extension.
:rtype: str
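
    Example (doctest sketch; markdown always gains a ``.md`` extension):

    >>> _ensure_filename_extension("notes", media_type="text/markdown")
    'notes.md'
    >>> _ensure_filename_extension("photo.jpeg", media_type="image/jpeg")
    'photo.jpeg'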
"""
raw_name = filename.strip()
if media_type == "text/markdown":
if raw_name.lower().endswith((".md", ".markdown")):
return raw_name
return raw_name + ".md"
    if Path(raw_name).suffix:
        if "%2F" not in raw_name and "%3A" not in raw_name:
            return raw_name
        # A percent-encoded source URI can carry a misleading suffix; decode it
        # and only keep the name as-is when the decoded path has a real suffix.
        decoded = unquote(raw_name)
        parsed = urlparse(decoded)
        decoded_path = parsed.path if parsed.scheme else decoded
        if Path(decoded_path).suffix:
            return raw_name
ext = _preferred_extension_for_media_type(media_type)
if not ext:
return raw_name
return raw_name + ext
def _encode_source_uri_for_filename(source_uri: str) -> str:
"""
Percent-encode a source uniform resource identifier for filename use.
:param source_uri: Source uniform resource identifier to encode.
:type source_uri: str
:return: Percent-encoded uniform resource identifier safe for filenames.
:rtype: str
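
    Example (doctest sketch; the uniform resource identifier is hypothetical):

    >>> _encode_source_uri_for_filename("https://example.com/a b.pdf")
    'https%3A%2F%2Fexample.com%2Fa%20b.pdf'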
"""
return quote(source_uri, safe="")
def _storage_filename_for_ingest(
*, filename: Optional[str], media_type: str, source_uri: Optional[str]
) -> str:
"""
Derive a collision-safe filename for corpus storage.
If a source uniform resource identifier is provided, the full uniform resource identifier is
percent-encoded to namespace the stored file, preventing collisions between identical basenames
from different sources. When no uniform resource identifier is available, fall back to a
sanitized filename.
:param filename: Optional filename hint from the caller.
:type filename: str or None
:param media_type: Media type of the payload.
:type media_type: str
:param source_uri: Optional source uniform resource identifier for provenance.
:type source_uri: str or None
:return: Storage filename with an appropriate extension, or an empty string when no hint exists.
:rtype: str
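
    Example (doctest sketch; the uniform resource identifier is hypothetical):

    >>> _storage_filename_for_ingest(
    ...     filename="paper.pdf",
    ...     media_type="application/pdf",
    ...     source_uri="https://example.com/paper.pdf",
    ... )
    'https%3A%2F%2Fexample.com%2Fpaper.pdf--paper.pdf'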
"""
base_name = ""
if source_uri:
base_name = _encode_source_uri_for_filename(source_uri)
if filename and not source_uri.startswith("file:"):
sanitized = _sanitize_filename(filename)
if sanitized:
base_name = f"{base_name}--{sanitized}"
if not base_name and filename:
base_name = _sanitize_filename(filename)
if not base_name:
return ""
if len(base_name) > 180:
digest = hashlib.sha256(base_name.encode("utf-8")).hexdigest()
base_name = f"hash-{digest}"
return _ensure_filename_extension(base_name, media_type=media_type)
def _merge_tags(explicit: Sequence[str], from_frontmatter: Any) -> List[str]:
"""
Merge tags from explicit input and front matter values.
:param explicit: Explicit tags provided by callers.
:type explicit: Sequence[str]
:param from_frontmatter: Tags from front matter.
:type from_frontmatter: Any
:return: Deduplicated tag list preserving order.
:rtype: list[str]
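
    Example (doctest sketch; order is preserved and duplicates collapse):

    >>> _merge_tags(["alpha", "beta"], ["beta", "gamma"])
    ['alpha', 'beta', 'gamma']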
"""
merged_tags: List[str] = []
for explicit_tag in explicit:
cleaned_tag = explicit_tag.strip()
if cleaned_tag:
merged_tags.append(cleaned_tag)
if isinstance(from_frontmatter, str):
merged_tags.append(from_frontmatter)
elif isinstance(from_frontmatter, list):
for item in from_frontmatter:
if isinstance(item, str) and item.strip():
merged_tags.append(item.strip())
seen_tags = set()
deduplicated_tags: List[str] = []
for tag_value in merged_tags:
if tag_value not in seen_tags:
seen_tags.add(tag_value)
deduplicated_tags.append(tag_value)
return deduplicated_tags
def _sidecar_path_for(content_path: Path) -> Path:
"""
Compute the sidecar metadata path for a content file.
:param content_path: Path to the content file.
:type content_path: Path
:return: Sidecar path.
:rtype: Path
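
    Example (doctest sketch against the configured suffix constant):

    >>> _sidecar_path_for(Path("raw/item.pdf")).name == "item.pdf" + SIDECAR_SUFFIX
    True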
"""
return content_path.with_name(content_path.name + SIDECAR_SUFFIX)
def _load_sidecar(content_path: Path) -> Dict[str, Any]:
"""
Load sidecar metadata for a content file.
:param content_path: Path to the content file.
:type content_path: Path
:return: Parsed sidecar metadata.
:rtype: dict[str, Any]
:raises ValueError: If the sidecar content is not a mapping.
"""
path = _sidecar_path_for(content_path)
if not path.is_file():
return {}
data = yaml.safe_load(path.read_text(encoding="utf-8")) or {}
if not isinstance(data, dict):
raise ValueError(f"Sidecar metadata must be a mapping/object: {path}")
return dict(data)
def _write_sidecar(content_path: Path, metadata: Dict[str, Any]) -> None:
"""
Write a sidecar metadata file.
:param content_path: Path to the content file.
:type content_path: Path
:param metadata: Metadata to serialize.
:type metadata: dict[str, Any]
:return: None.
:rtype: None
"""
path = _sidecar_path_for(content_path)
text = yaml.safe_dump(
metadata,
sort_keys=False,
allow_unicode=True,
default_flow_style=False,
).strip()
path.write_text(text + "\n", encoding="utf-8")
def _ensure_biblicus_block(
metadata: Dict[str, Any], *, item_id: str, source_uri: str
) -> Dict[str, Any]:
"""
Ensure the biblicus metadata block exists and is populated.
:param metadata: Existing metadata.
:type metadata: dict[str, Any]
:param item_id: Item identifier to store.
:type item_id: str
:param source_uri: Source uniform resource identifier to store.
:type source_uri: str
:return: Updated metadata mapping.
:rtype: dict[str, Any]
"""
updated_metadata = dict(metadata)
existing_biblicus = updated_metadata.get("biblicus")
if not isinstance(existing_biblicus, dict):
existing_biblicus = {}
biblicus_block = dict(existing_biblicus)
biblicus_block["id"] = item_id
biblicus_block["source"] = source_uri
updated_metadata["biblicus"] = biblicus_block
return updated_metadata
def _update_biblicus_block(metadata: Dict[str, Any], updates: Dict[str, Any]) -> Dict[str, Any]:
    """
    Merge non-None update values into the biblicus metadata block.
    :param metadata: Existing metadata.
    :type metadata: dict[str, Any]
    :param updates: Candidate updates; keys with None values are skipped.
    :type updates: dict[str, Any]
    :return: Updated metadata mapping.
    :rtype: dict[str, Any]
    """
updated_metadata = dict(metadata)
existing_biblicus = updated_metadata.get("biblicus")
if not isinstance(existing_biblicus, dict):
existing_biblicus = {}
biblicus_block = dict(existing_biblicus)
for key, value in updates.items():
if value is not None:
biblicus_block[key] = value
updated_metadata["biblicus"] = biblicus_block
return updated_metadata
def _parse_uuid_prefix(filename: str) -> Optional[str]:
"""
Extract a universally unique identifier prefix from a filename, if present.
:param filename: Filename to inspect.
:type filename: str
:return: Universally unique identifier string or None.
:rtype: str or None
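
    Example (doctest sketch; the identifier shown is illustrative):

    >>> _parse_uuid_prefix("0d6f37f8-9a2f-4f3e-8d6a-2b9c1e4f5a6b--notes.md")
    '0d6f37f8-9a2f-4f3e-8d6a-2b9c1e4f5a6b'
    >>> _parse_uuid_prefix("notes.md") is None
    True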
"""
if len(filename) < 36:
return None
prefix = filename[:36]
try:
return str(uuid.UUID(prefix))
except ValueError:
return None
def _merge_metadata(front: Dict[str, Any], side: Dict[str, Any]) -> Dict[str, Any]:
"""
Merge front matter and sidecar metadata.
:param front: Front matter metadata.
:type front: dict[str, Any]
:param side: Sidecar metadata.
:type side: dict[str, Any]
:return: Merged metadata.
:rtype: dict[str, Any]
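
    Example (doctest sketch; front matter wins for overlapping scalar keys, tags merge):

    >>> merged = _merge_metadata({"title": "A", "tags": ["x"]}, {"tags": ["y"], "author": "B"})
    >>> (merged["title"], merged["tags"], merged["author"])
    ('A', ['x', 'y'], 'B')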
"""
merged_metadata: Dict[str, Any] = dict(front)
front_biblicus = merged_metadata.get("biblicus")
sidecar_biblicus = side.get("biblicus")
if isinstance(front_biblicus, dict) or isinstance(sidecar_biblicus, dict):
merged_biblicus: Dict[str, Any] = {}
if isinstance(front_biblicus, dict):
merged_biblicus.update(front_biblicus)
if isinstance(sidecar_biblicus, dict):
merged_biblicus.update(sidecar_biblicus)
merged_metadata["biblicus"] = merged_biblicus
merged_tags = _merge_tags(_merge_tags([], front.get("tags")), side.get("tags"))
if merged_tags:
merged_metadata["tags"] = merged_tags
for metadata_key, metadata_value in side.items():
if metadata_key in {"biblicus", "tags"}:
continue
if metadata_key in merged_metadata:
continue
merged_metadata[metadata_key] = metadata_value
return merged_metadata
class Corpus:
"""
Local corpus manager for Biblicus.
:ivar root: Corpus root directory.
:vartype root: Path
:ivar meta_dir: Metadata directory under the corpus root.
:vartype meta_dir: Path
:ivar raw_dir: Raw item directory under the corpus root.
:vartype raw_dir: Path
:ivar config: Parsed corpus config, if present.
:vartype config: CorpusConfig or None
"""
def __init__(self, root: Path):
"""
Initialize a corpus wrapper around a filesystem path.
:param root: Corpus root directory.
:type root: Path
"""
self.root = root
self.meta_dir = self.root / CORPUS_DIR_NAME
self.config = self._load_config()
self.raw_dir = self._resolve_raw_dir()
self._hooks = self._load_hooks()
    def _ensure_local_ingest_allowed(self) -> None:
        """
        Guard against local ingest on a corpus backed by a remote source.
        :return: None.
        :rtype: None
        :raises ValueError: If the corpus is configured with a remote source.
        """
if self.config is not None and self.config.source is not None:
raise ValueError(
"Local ingest is disabled because this corpus is backed by a remote source. "
"Use `biblicus source pull` to refresh the corpus."
)
def _resolve_raw_dir(self) -> Path:
"""
Resolve the raw directory path for the corpus.
:return: Raw directory path.
:rtype: Path
"""
raw_dir = DEFAULT_RAW_DIR
if self.config is not None and isinstance(self.config.raw_dir, str):
raw_dir = self.config.raw_dir.strip() or DEFAULT_RAW_DIR
if raw_dir == ".":
return self.root
return self.root / raw_dir
@property
def extracted_dir(self) -> Path:
"""
Location of extraction artifacts for the corpus.
:return: Extracted artifacts directory.
:rtype: Path
"""
return self.root / EXTRACTED_DIR_NAME
@property
def graph_dir(self) -> Path:
"""
Location of graph artifacts for the corpus.
:return: Graph artifacts directory.
:rtype: Path
"""
return self.root / GRAPH_DIR_NAME
@property
def retrieval_dir(self) -> Path:
"""
Location of retrieval artifacts for the corpus.
:return: Retrieval artifacts directory.
:rtype: Path
"""
return self.root / RETRIEVAL_DIR_NAME
@property
def analysis_dir(self) -> Path:
"""
Location of analysis artifacts for the corpus.
:return: Analysis artifacts directory.
:rtype: Path
"""
return self.root / ANALYSIS_DIR_NAME
@property
def uri(self) -> str:
"""
Return the canonical uniform resource identifier for the corpus root.
:return: Corpus uniform resource identifier.
:rtype: str
"""
return self.root.as_uri()
def _load_config(self) -> Optional[CorpusConfig]:
"""
Load the corpus config if it exists.
:return: Parsed corpus config or None.
:rtype: CorpusConfig or None
:raises ValueError: If the config schema is invalid.
"""
path = self.meta_dir / "config.json"
if not path.is_file():
legacy_path = self.root / LEGACY_CORPUS_DIR_NAME / "config.json"
if not legacy_path.is_file():
return None
path = legacy_path
data = json.loads(path.read_text(encoding="utf-8"))
try:
return CorpusConfig.model_validate(data)
except ValidationError as exc:
has_hook_error = any(
isinstance(error.get("loc"), tuple)
and error.get("loc")
and error.get("loc")[0] == "hooks"
for error in exc.errors()
)
if has_hook_error:
raise ValueError(f"Invalid hook specification: {exc}") from exc
raise ValueError(f"Invalid corpus config: {exc}") from exc
def _load_hooks(self) -> Optional[HookManager]:
"""
Load the hook manager from config if hooks are configured.
:return: Hook manager or None.
:rtype: HookManager or None
:raises ValueError: If hook specifications are invalid.
"""
if self.config is None or not self.config.hooks:
return None
return HookManager.from_config(
corpus_root=self.root,
corpus_uri=self.uri,
hook_specs=self.config.hooks,
)
    def _reserved_dir_names(self) -> List[str]:
        """
        Return directory names reserved for corpus metadata and derived artifacts.
        :return: Reserved directory names.
        :rtype: list[str]
        """
return [
CORPUS_DIR_NAME,
LEGACY_CORPUS_DIR_NAME,
EXTRACTED_DIR_NAME,
GRAPH_DIR_NAME,
RETRIEVAL_DIR_NAME,
ANALYSIS_DIR_NAME,
]
    def _is_reserved_path(self, path: Path) -> bool:
        """
        Return whether a path falls inside a reserved corpus folder.
        :param path: Absolute or corpus-relative path to check.
        :type path: Path
        :return: True when the path is reserved.
        :rtype: bool
        """
if not path.is_absolute():
candidate = (self.root / path).resolve()
else:
candidate = path.resolve()
try:
relative = candidate.relative_to(self.root)
except ValueError:
return False
if not relative.parts:
return False
if relative.parts[0] in self._reserved_dir_names():
return True
return relative.name == ".biblicusignore"
    def _raw_relpath(self, *, output_name: str, storage_subdir: Optional[str]) -> str:
        """
        Build a corpus-relative storage path under the configured raw directory.
        :param output_name: Output filename or relative path.
        :type output_name: str
        :param storage_subdir: Optional subdirectory under the raw root.
        :type storage_subdir: str or None
        :return: Corpus-relative path.
        :rtype: str
        """
relpath = Path(output_name)
if storage_subdir:
relpath = Path(storage_subdir) / relpath
raw_dir_name = DEFAULT_RAW_DIR
if self.config is not None and isinstance(self.config.raw_dir, str):
raw_dir_name = self.config.raw_dir.strip() or DEFAULT_RAW_DIR
if raw_dir_name and raw_dir_name != ".":
relpath = Path(raw_dir_name) / relpath
return str(relpath)
    def _raw_prefix_for_storage(self, storage_subdir: str) -> Path:
        """
        Resolve the corpus-relative prefix for a storage subdirectory.
        :param storage_subdir: Storage subdirectory name.
        :type storage_subdir: str
        :return: Corpus-relative prefix path.
        :rtype: Path
        """
raw_dir_name = DEFAULT_RAW_DIR
if self.config is not None and isinstance(self.config.raw_dir, str):
raw_dir_name = self.config.raw_dir.strip() or DEFAULT_RAW_DIR
if raw_dir_name and raw_dir_name != ".":
return Path(raw_dir_name) / storage_subdir
return Path(storage_subdir)
@classmethod
def find(cls, start: Path) -> "Corpus":
"""
Locate a corpus by searching upward from a path.
:param start: Starting path to search.
:type start: Path
:return: Located corpus instance.
:rtype: Corpus
:raises FileNotFoundError: If no corpus config is found.
"""
start = start.resolve()
for candidate in [start, *start.parents]:
if (candidate / CORPUS_DIR_NAME / "config.json").is_file():
return cls(candidate)
if (candidate / LEGACY_CORPUS_DIR_NAME / "config.json").is_file():
return cls(candidate)
raise FileNotFoundError(
f"Not a Biblicus corpus (no metadata or legacy config found from {start})"
)
@classmethod
def open(cls, ref: str | Path) -> "Corpus":
"""
Open a corpus from a path or uniform resource identifier reference.
:param ref: Filesystem path or file:// uniform resource identifier.
:type ref: str or Path
:return: Opened corpus instance.
:rtype: Corpus
"""
return cls.find(corpus_ref_to_path(ref))
@classmethod
def init(cls, root: Path, *, force: bool = False) -> "Corpus":
"""
Initialize a new corpus on disk.
:param root: Corpus root directory.
:type root: Path
:param force: Whether to overwrite existing config.
:type force: bool
:return: Initialized corpus instance.
:rtype: Corpus
:raises FileExistsError: If the corpus already exists and force is False.
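
        Example (illustrative usage; the path is hypothetical):

        >>> corpus = Corpus.init(Path("/tmp/demo-corpus"))  # doctest: +SKIP
        >>> corpus.has_items()  # doctest: +SKIP
        False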
"""
root = root.resolve()
root.mkdir(parents=True, exist_ok=True)
corpus = cls(root)
corpus.meta_dir.mkdir(parents=True, exist_ok=True)
if corpus.raw_dir != corpus.root:
corpus.raw_dir.mkdir(parents=True, exist_ok=True)
config_path = corpus.meta_dir / "config.json"
if config_path.exists() and not force:
raise FileExistsError(f"Corpus already exists at {root}")
config = CorpusConfig(
schema_version=SCHEMA_VERSION,
created_at=utc_now_iso(),
corpus_uri=normalize_corpus_uri(root),
raw_dir=DEFAULT_RAW_DIR,
)
config_path.write_text(config.model_dump_json(indent=2) + "\n", encoding="utf-8")
corpus._init_catalog()
return corpus
@property
def catalog_path(self) -> Path:
"""
Return the path to the corpus catalog file.
:return: Catalog file path.
:rtype: Path
"""
return self.meta_dir / "catalog.json"
def _init_catalog(self) -> None:
"""
Initialize the catalog if it does not already exist.
:return: None.
:rtype: None
"""
if self.catalog_path.exists():
return
catalog = CorpusCatalog(
schema_version=SCHEMA_VERSION,
generated_at=utc_now_iso(),
corpus_uri=normalize_corpus_uri(self.root),
raw_dir=DEFAULT_RAW_DIR,
latest_snapshot_id=None,
items={},
order=[],
)
self._write_catalog(catalog)
def _load_catalog(self) -> CorpusCatalog:
"""
Read and validate the corpus catalog file.
:return: Parsed corpus catalog.
:rtype: CorpusCatalog
:raises FileNotFoundError: If the catalog file does not exist.
:raises ValueError: If the catalog schema is invalid.
"""
if not self.catalog_path.is_file():
raise FileNotFoundError(f"Missing corpus catalog: {self.catalog_path}")
catalog_data = json.loads(self.catalog_path.read_text(encoding="utf-8"))
return CorpusCatalog.model_validate(catalog_data)
def load_catalog(self) -> CorpusCatalog:
"""
Load the current corpus catalog.
:return: Parsed corpus catalog.
:rtype: CorpusCatalog
:raises FileNotFoundError: If the catalog file does not exist.
:raises ValueError: If the catalog schema is invalid.
"""
return self._load_catalog()
def has_items(self) -> bool:
"""
Return whether the corpus catalog contains any items.
:return: True when the catalog has at least one item.
:rtype: bool
"""
catalog = self._load_catalog()
return bool(catalog.items)
def catalog_generated_at(self) -> str:
"""
Return the catalog generation timestamp.
:return: International Organization for Standardization 8601 timestamp.
:rtype: str
"""
return self._load_catalog().generated_at
def _write_catalog(self, catalog: CorpusCatalog) -> None:
"""
Atomically write a corpus catalog to disk.
:param catalog: Catalog to persist.
:type catalog: CorpusCatalog
:return: None.
:rtype: None
"""
temp_path = self.catalog_path.with_suffix(".json.tmp")
temp_path.write_text(catalog.model_dump_json(indent=2) + "\n", encoding="utf-8")
temp_path.replace(self.catalog_path)
def _find_item_by_source_uri(self, source_uri: str) -> Optional[CatalogItem]:
"""
Locate an existing catalog item by source uniform resource identifier.
:param source_uri: Source uniform resource identifier to search for.
:type source_uri: str
:return: Matching catalog item or None.
:rtype: CatalogItem or None
"""
if not source_uri:
return None
self._init_catalog()
catalog = self._load_catalog()
for item in catalog.items.values():
if item.source_uri == source_uri:
return item
return None
@property
def snapshots_dir(self) -> Path:
"""
Location of retrieval snapshot manifests.
:return: Path to the snapshots directory.
:rtype: Path
"""
return self.retrieval_dir
@property
def extraction_snapshots_dir(self) -> Path:
"""
Location of extraction snapshot artifacts.
:return: Path to the extraction snapshots directory.
:rtype: Path
"""
return self.extracted_dir
@property
def analysis_runs_dir(self) -> Path:
"""
Location of analysis snapshot artifacts.
:return: Path to the analysis snapshots directory.
:rtype: Path
"""
return self.analysis_dir
@property
def graph_snapshots_dir(self) -> Path:
"""
Location of graph snapshot artifacts.
:return: Path to the graph snapshots directory.
:rtype: Path
"""
return self.graph_dir
def analysis_run_dir(self, *, analysis_id: str, snapshot_id: str) -> Path:
"""
Resolve an analysis snapshot directory.
:param analysis_id: Analysis backend identifier.
:type analysis_id: str
:param snapshot_id: Analysis snapshot identifier.
:type snapshot_id: str
:return: Analysis snapshot directory.
:rtype: Path
"""
return self.analysis_runs_dir / analysis_id / snapshot_id
def graph_snapshot_dir(self, *, extractor_id: str, snapshot_id: str) -> Path:
"""
Resolve a graph snapshot directory.
:param extractor_id: Graph extractor identifier.
:type extractor_id: str
:param snapshot_id: Graph snapshot identifier.
:type snapshot_id: str
:return: Graph snapshot directory.
:rtype: Path
"""
return self.graph_snapshots_dir / extractor_id / snapshot_id
def _ensure_snapshots_dir(self) -> None:
"""
Ensure the retrieval snapshots directory exists.
:return: None.
:rtype: None
"""
self.retrieval_dir.mkdir(parents=True, exist_ok=True)
def write_snapshot(self, snapshot: RetrievalSnapshot) -> None:
"""
Persist a retrieval snapshot manifest and update the catalog pointer.
:param snapshot: Snapshot manifest to persist.
:type snapshot: RetrievalSnapshot
:return: None.
:rtype: None
"""
self._ensure_snapshots_dir()
retriever_id = snapshot.configuration.retriever_id
snapshot_dir = self.retrieval_dir / retriever_id / snapshot.snapshot_id
snapshot_dir.mkdir(parents=True, exist_ok=True)
path = snapshot_dir / "manifest.json"
path.write_text(snapshot.model_dump_json(indent=2) + "\n", encoding="utf-8")
latest_path = self.retrieval_dir / retriever_id / "latest.json"
latest_path.write_text(
json.dumps(
{"snapshot_id": snapshot.snapshot_id, "created_at": snapshot.created_at},
indent=2,
)
+ "\n",
encoding="utf-8",
)
catalog = self._load_catalog()
catalog.latest_snapshot_id = snapshot.snapshot_id
self._write_catalog(catalog)
def load_snapshot(self, snapshot_id: str) -> RetrievalSnapshot:
"""
Load a retrieval snapshot manifest by identifier.
:param snapshot_id: Snapshot identifier.
:type snapshot_id: str
:return: Parsed snapshot manifest.
:rtype: RetrievalSnapshot
:raises FileNotFoundError: If the snapshot manifest does not exist.
"""
legacy_path = self.snapshots_dir / f"{snapshot_id}.json"
if legacy_path.is_file():
data = json.loads(legacy_path.read_text(encoding="utf-8"))
return RetrievalSnapshot.model_validate(data)
if not self.retrieval_dir.is_dir():
raise FileNotFoundError(f"Missing snapshot manifest for: {snapshot_id}")
for retriever_dir in sorted(self.retrieval_dir.iterdir()):
if not retriever_dir.is_dir():
continue
manifest_path = retriever_dir / snapshot_id / "manifest.json"
if manifest_path.is_file():
data = json.loads(manifest_path.read_text(encoding="utf-8"))
return RetrievalSnapshot.model_validate(data)
raise FileNotFoundError(f"Missing snapshot manifest for: {snapshot_id}")
@property
def latest_snapshot_id(self) -> Optional[str]:
"""
Latest retrieval snapshot identifier recorded in the catalog.
:return: Latest snapshot identifier or None.
:rtype: str or None
"""
return self._load_catalog().latest_snapshot_id
def _upsert_catalog_item(self, item: CatalogItem) -> None:
"""
Upsert a catalog item and reset the latest run pointer.
:param item: Catalog item to insert or update.
:type item: CatalogItem
:return: None.
:rtype: None
"""
self._init_catalog()
catalog = self._load_catalog()
catalog.items[item.id] = item
ordered_ids = [item_id for item_id in catalog.order if item_id != item.id]
ordered_ids.insert(0, item.id)
catalog.order = ordered_ids
catalog.generated_at = utc_now_iso()
catalog.latest_snapshot_id = None
self._write_catalog(catalog)
def ingest_item(
self,
data: bytes,
*,
filename: Optional[str] = None,
media_type: str = "application/octet-stream",
title: Optional[str] = None,
tags: Sequence[str] = (),
metadata: Optional[Dict[str, Any]] = None,
source_uri: str = "unknown",
storage_subdir: Optional[str] = "imports",
) -> IngestResult:
"""
Ingest a single raw item into the corpus.
This is the modality-neutral primitive: callers provide bytes + a media type.
Higher-level conveniences (ingest_note, ingest_source, and related methods) build on top.
:param data: Raw item bytes.
:type data: bytes
:param filename: Optional filename for the stored item.
:type filename: str or None
:param media_type: Internet Assigned Numbers Authority media type for the item.
:type media_type: str
:param title: Optional title metadata.
:type title: str or None
:param tags: Tags to associate with the item.
:type tags: Sequence[str]
:param metadata: Optional metadata mapping.
:type metadata: dict[str, Any] or None
:param source_uri: Source uniform resource identifier for provenance.
:type source_uri: str
:param storage_subdir: Optional subdirectory under the raw root.
:type storage_subdir: str or None
:return: Ingestion result summary.
:rtype: IngestResult
:raises ValueError: If markdown is not Unicode Transformation Format 8.
:raises IngestCollisionError: If a source uniform resource identifier is already ingested.
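
        Example (illustrative usage; bytes and uniform resource identifier are hypothetical):

        >>> corpus.ingest_item(  # doctest: +SKIP
        ...     b"%PDF-1.7 ...",
        ...     filename="paper.pdf",
        ...     media_type="application/pdf",
        ...     source_uri="https://example.com/paper.pdf",
        ... )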
"""
self._ensure_local_ingest_allowed()
existing_item = self._find_item_by_source_uri(source_uri)
if existing_item is not None:
raise IngestCollisionError(
source_uri=source_uri,
existing_item_id=existing_item.id,
existing_relpath=existing_item.relpath,
)
item_id = str(uuid.uuid4())
storage_filename = _storage_filename_for_ingest(
filename=filename, media_type=media_type, source_uri=source_uri
)
if media_type == "text/markdown":
output_name = f"{item_id}--{storage_filename}" if storage_filename else f"{item_id}.md"
else:
if storage_filename:
output_name = f"{item_id}--{storage_filename}"
else:
extension = _preferred_extension_for_media_type(media_type) or ""
output_name = f"{item_id}{extension}" if extension else f"{item_id}"
relpath = self._raw_relpath(output_name=output_name, storage_subdir=storage_subdir)
output_path = self.root / relpath
output_path.parent.mkdir(parents=True, exist_ok=True)
resolved_title = title.strip() if isinstance(title, str) and title.strip() else None
resolved_tags = list(tags)
metadata_input: Dict[str, Any] = dict(metadata or {})
if resolved_title and "title" not in metadata_input:
metadata_input["title"] = resolved_title
if resolved_tags and "tags" not in metadata_input:
metadata_input["tags"] = list(resolved_tags)
if self._hooks is not None:
mutation = self._hooks.run_ingest_hooks(
hook_point=HookPoint.before_ingest,
filename=filename,
media_type=media_type,
title=resolved_title,
tags=list(resolved_tags),
metadata=dict(metadata_input),
source_uri=source_uri,
)
if mutation.add_tags:
for tag in mutation.add_tags:
if tag not in resolved_tags:
resolved_tags.append(tag)
frontmatter: Dict[str, Any] = {}
if media_type == "text/markdown":
try:
markdown_text = data.decode("utf-8")
except UnicodeDecodeError as decode_error:
raise ValueError(
"Markdown must be Unicode Transformation Format 8"
) from decode_error
parsed_document = parse_front_matter(markdown_text)
frontmatter = dict(parsed_document.metadata)
merged_tags = _merge_tags(resolved_tags, frontmatter.get("tags"))
if merged_tags:
frontmatter["tags"] = merged_tags
resolved_tags = merged_tags
if resolved_title and not (
isinstance(frontmatter.get("title"), str) and frontmatter.get("title").strip()
):
frontmatter["title"] = resolved_title
title_value = frontmatter.get("title")
if isinstance(title_value, str) and title_value.strip():
resolved_title = title_value.strip()
frontmatter = _ensure_biblicus_block(
frontmatter, item_id=item_id, source_uri=source_uri
)
rendered_document = render_front_matter(frontmatter, parsed_document.body)
data_to_write = rendered_document.encode("utf-8")
else:
data_to_write = data
sha256_digest = _sha256_bytes(data_to_write)
output_path.write_bytes(data_to_write)
if media_type != "text/markdown":
sidecar: Dict[str, Any] = {}
sidecar["media_type"] = media_type
if resolved_tags:
sidecar["tags"] = resolved_tags
if metadata_input:
for metadata_key, metadata_value in metadata_input.items():
if metadata_key in {"tags", "biblicus"}:
continue
sidecar[metadata_key] = metadata_value
sidecar["biblicus"] = {"id": item_id, "source": source_uri}
_write_sidecar(output_path, sidecar)
frontmatter = sidecar
if self._hooks is not None:
mutation = self._hooks.run_ingest_hooks(
hook_point=HookPoint.after_ingest,
filename=filename,
media_type=media_type,
title=resolved_title,
tags=list(resolved_tags),
metadata=dict(metadata_input),
source_uri=source_uri,
item_id=item_id,
relpath=relpath,
)
if mutation.add_tags:
updated_tags = list(resolved_tags)
for tag in mutation.add_tags:
if tag not in updated_tags:
updated_tags.append(tag)
resolved_tags = updated_tags
sidecar_metadata = _load_sidecar(output_path)
sidecar_metadata["tags"] = resolved_tags
if media_type != "text/markdown":
sidecar_metadata["media_type"] = media_type
sidecar_metadata["biblicus"] = {"id": item_id, "source": source_uri}
_write_sidecar(output_path, sidecar_metadata)
frontmatter = _merge_metadata(
frontmatter if isinstance(frontmatter, dict) else {}, sidecar_metadata
)
created_at = utc_now_iso()
item_record = CatalogItem(
id=item_id,
relpath=relpath,
sha256=sha256_digest,
bytes=len(data_to_write),
media_type=media_type,
title=resolved_title,
tags=list(resolved_tags),
metadata=dict(frontmatter or {}),
created_at=created_at,
source_uri=source_uri,
)
self._upsert_catalog_item(item_record)
return IngestResult(item_id=item_id, relpath=relpath, sha256=sha256_digest)
def ingest_item_stream(
self,
stream,
*,
filename: Optional[str] = None,
media_type: str = "application/octet-stream",
tags: Sequence[str] = (),
metadata: Optional[Dict[str, Any]] = None,
source_uri: str = "unknown",
storage_subdir: Optional[str] = "imports",
) -> IngestResult:
"""
Ingest a binary item from a readable stream.
This method is intended for large non-markdown items. It writes bytes to disk incrementally
while computing a checksum.
:param stream: Readable binary stream.
:type stream: object
:param filename: Optional filename for the stored item.
:type filename: str or None
:param media_type: Internet Assigned Numbers Authority media type for the item.
:type media_type: str
:param tags: Tags to associate with the item.
:type tags: Sequence[str]
:param metadata: Optional metadata mapping.
:type metadata: dict[str, Any] or None
:param source_uri: Source uniform resource identifier for provenance.
:type source_uri: str
:param storage_subdir: Optional subdirectory under the raw root.
:type storage_subdir: str or None
:return: Ingestion result summary.
:rtype: IngestResult
:raises ValueError: If the media_type is text/markdown.
"""
self._ensure_local_ingest_allowed()
if media_type == "text/markdown":
raise ValueError("Stream ingestion is not supported for Markdown")
existing_item = self._find_item_by_source_uri(source_uri)
if existing_item is not None:
raise IngestCollisionError(
source_uri=source_uri,
existing_item_id=existing_item.id,
existing_relpath=existing_item.relpath,
)
item_id = str(uuid.uuid4())
storage_filename = _storage_filename_for_ingest(
filename=filename, media_type=media_type, source_uri=source_uri
)
if storage_filename:
output_name = f"{item_id}--{storage_filename}"
else:
extension = _preferred_extension_for_media_type(media_type) or ""
output_name = f"{item_id}{extension}" if extension else f"{item_id}"
relpath = self._raw_relpath(output_name=output_name, storage_subdir=storage_subdir)
output_path = self.root / relpath
output_path.parent.mkdir(parents=True, exist_ok=True)
resolved_tags = list(tags)
metadata_input: Dict[str, Any] = dict(metadata or {})
if resolved_tags and "tags" not in metadata_input:
metadata_input["tags"] = list(resolved_tags)
if self._hooks is not None:
mutation = self._hooks.run_ingest_hooks(
hook_point=HookPoint.before_ingest,
filename=filename,
media_type=media_type,
title=None,
tags=list(resolved_tags),
metadata=dict(metadata_input),
source_uri=source_uri,
)
if mutation.add_tags:
for tag in mutation.add_tags:
if tag not in resolved_tags:
resolved_tags.append(tag)
write_result = _write_stream_and_hash(stream, output_path)
sha256_digest = str(write_result["sha256"])
bytes_written = int(write_result["bytes_written"])
sidecar: Dict[str, Any] = {}
sidecar["media_type"] = media_type
if resolved_tags:
sidecar["tags"] = resolved_tags
if metadata_input:
for metadata_key, metadata_value in metadata_input.items():
if metadata_key in {"tags", "biblicus"}:
continue
sidecar[metadata_key] = metadata_value
sidecar["biblicus"] = {"id": item_id, "source": source_uri}
_write_sidecar(output_path, sidecar)
if self._hooks is not None:
mutation = self._hooks.run_ingest_hooks(
hook_point=HookPoint.after_ingest,
filename=filename,
media_type=media_type,
title=None,
tags=list(resolved_tags),
metadata=dict(metadata_input),
source_uri=source_uri,
item_id=item_id,
relpath=relpath,
)
if mutation.add_tags:
updated_tags = list(resolved_tags)
for tag in mutation.add_tags:
if tag not in updated_tags:
updated_tags.append(tag)
resolved_tags = updated_tags
sidecar["tags"] = resolved_tags
_write_sidecar(output_path, sidecar)
created_at = utc_now_iso()
item_record = CatalogItem(
id=item_id,
relpath=relpath,
sha256=sha256_digest,
bytes=bytes_written,
media_type=media_type,
title=None,
tags=list(resolved_tags),
metadata=dict(sidecar or {}),
created_at=created_at,
source_uri=source_uri,
)
self._upsert_catalog_item(item_record)
return IngestResult(item_id=item_id, relpath=relpath, sha256=sha256_digest)
def ingest_note(
self,
text: str,
*,
title: Optional[str] = None,
tags: Sequence[str] = (),
source_uri: Optional[str] = None,
) -> IngestResult:
"""
Ingest a text note as Markdown.
:param text: Note content.
:type text: str
:param title: Optional title metadata.
:type title: str or None
:param tags: Tags to associate with the note.
:type tags: Sequence[str]
:param source_uri: Optional source uniform resource identifier for provenance.
:type source_uri: str or None
:return: Ingestion result summary.
:rtype: IngestResult
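
        Example (illustrative usage; the corpus path is hypothetical):

        >>> corpus = Corpus.open("/path/to/corpus")  # doctest: +SKIP
        >>> corpus.ingest_note("# Standup notes", title="Standup", tags=["work"])  # doctest: +SKIP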
"""
if source_uri is None:
digest_source = (title or "") + "\n" + text
digest = hashlib.sha256(digest_source.encode("utf-8")).hexdigest()
source_uri = f"text:{digest}"
data = text.encode("utf-8")
return self.ingest_item(
data,
filename=None,
media_type="text/markdown",
title=title,
tags=tags,
metadata=None,
source_uri=source_uri,
storage_subdir="notes",
)
    def _register_existing_file(
        self,
        *,
        path: Path,
        tags: Sequence[str],
        metadata: Optional[Dict[str, Any]],
        source_uri: str,
    ) -> IngestResult:
        """
        Register a file that already lives under the corpus root, in place.
        :param path: File path inside the corpus root.
        :type path: Path
        :param tags: Tags to associate with the item.
        :type tags: Sequence[str]
        :param metadata: Optional metadata mapping.
        :type metadata: dict[str, Any] or None
        :param source_uri: Source uniform resource identifier for provenance.
        :type source_uri: str
        :return: Ingestion result summary.
        :rtype: IngestResult
        :raises IngestCollisionError: If the source uniform resource identifier is already ingested.
        :raises ValueError: If the file lies inside a reserved corpus folder.
        """
sanitized_name = _sanitize_filename(path.name)
if sanitized_name != path.name:
destination = path.with_name(sanitized_name)
if destination.exists():
raise IngestCollisionError(
source_uri=source_uri,
existing_item_id="unknown",
existing_relpath=str(destination.relative_to(self.root)),
)
path = path.rename(destination)
if self._is_reserved_path(path):
raise ValueError("Cannot ingest files inside reserved corpus folders")
existing_item = self._find_item_by_source_uri(source_uri)
if existing_item is not None:
raise IngestCollisionError(
source_uri=source_uri,
existing_item_id=existing_item.id,
existing_relpath=existing_item.relpath,
)
data = path.read_bytes()
relpath = str(path.relative_to(self.root))
media_type, _ = mimetypes.guess_type(path.name)
media_type = media_type or "application/octet-stream"
if path.suffix.lower() in {".md", ".markdown"}:
media_type = "text/markdown"
frontmatter: Dict[str, Any] = {}
markdown_body: Optional[str] = None
if media_type == "text/markdown":
try:
decoded = data.decode("utf-8")
parsed_document = parse_front_matter(decoded)
except UnicodeDecodeError as decode_error:
raise ValueError(
"Markdown file must be Unicode Transformation Format 8"
) from decode_error
frontmatter = dict(parsed_document.metadata)
markdown_body = parsed_document.body
sidecar = _load_sidecar(path)
merged_metadata = _merge_metadata(frontmatter, sidecar)
resolved_tags = _merge_tags(_merge_tags([], tags), merged_metadata.get("tags"))
if metadata:
for metadata_key, metadata_value in metadata.items():
if metadata_key in {"tags", "biblicus"}:
continue
merged_metadata[metadata_key] = metadata_value
biblicus_block = merged_metadata.get("biblicus")
item_id: Optional[str] = None
if isinstance(biblicus_block, dict):
biblicus_id = biblicus_block.get("id")
if isinstance(biblicus_id, str):
try:
item_id = str(uuid.UUID(biblicus_id))
except ValueError:
item_id = None
if item_id is None:
item_id = str(uuid.uuid4())
if media_type == "text/markdown":
updated_metadata: Dict[str, Any] = dict(merged_metadata)
if resolved_tags:
updated_metadata["tags"] = resolved_tags
updated_metadata = _ensure_biblicus_block(
updated_metadata, item_id=item_id, source_uri=source_uri
)
if markdown_body is None:
markdown_body = ""
rendered_document = render_front_matter(updated_metadata, markdown_body)
path.write_text(rendered_document, encoding="utf-8")
data = rendered_document.encode("utf-8")
merged_metadata = updated_metadata
else:
sidecar_metadata: Dict[str, Any] = dict(merged_metadata)
sidecar_metadata["biblicus"] = {"id": item_id, "source": source_uri}
if resolved_tags:
sidecar_metadata["tags"] = resolved_tags
sidecar_metadata["media_type"] = media_type
_write_sidecar(path, sidecar_metadata)
merged_metadata = sidecar_metadata
sha256_digest = _sha256_bytes(data)
created_at = utc_now_iso()
item_record = CatalogItem(
id=item_id,
relpath=relpath,
sha256=sha256_digest,
bytes=len(data),
media_type=media_type,
title=merged_metadata.get("title")
if isinstance(merged_metadata.get("title"), str)
else None,
tags=list(resolved_tags),
metadata=dict(merged_metadata),
created_at=created_at,
source_uri=source_uri,
)
self._upsert_catalog_item(item_record)
return IngestResult(item_id=item_id, relpath=relpath, sha256=sha256_digest)
def ingest_source(
self,
source: str | Path,
*,
tags: Sequence[str] = (),
source_uri: Optional[str] = None,
allow_external: bool = False,
) -> IngestResult:
"""
Ingest a file path or uniform resource locator source.
:param source: File path or uniform resource locator.
:type source: str or Path
:param tags: Tags to associate with the item.
:type tags: Sequence[str]
:param source_uri: Optional override for the source uniform resource identifier.
:type source_uri: str or None
:param allow_external: Whether to ingest files outside the corpus root by copying them into imports.
:type allow_external: bool
:return: Ingestion result summary.
:rtype: IngestResult
"""
candidate_path = Path(source) if isinstance(source, str) and "://" not in source else None
if isinstance(source, Path) or (candidate_path is not None and candidate_path.exists()):
path = source if isinstance(source, Path) else candidate_path
assert isinstance(path, Path)
path = path.resolve()
if not allow_external and not path.is_relative_to(self.root):
raise ValueError(
"Local ingest requires the file to be inside the corpus root. "
"Move the file into the corpus and run reindex."
)
resolved_source_uri = source_uri or path.as_uri()
data = path.read_bytes()
media_type, _ = mimetypes.guess_type(path.name)
media_type = media_type or "application/octet-stream"
if path.suffix.lower() in {".md", ".markdown"}:
media_type = "text/markdown"
return self.ingest_item(
data,
filename=path.name,
media_type=media_type,
title=None,
tags=tags,
metadata=None,
source_uri=resolved_source_uri,
storage_subdir="imports",
)
payload = load_source(source, source_uri=source_uri)
return self.ingest_item(
payload.data,
filename=payload.filename,
media_type=payload.media_type,
title=None,
tags=tags,
metadata=None,
source_uri=payload.source_uri,
storage_subdir="imports",
)
def import_tree(self, source_root: Path, *, tags: Sequence[str] = ()) -> Dict[str, int]:
"""
Import a folder tree into the corpus, preserving relative paths and provenance.
Imported content must already live under the corpus root. The import registers files
in-place and writes sidecars when needed.
:param source_root: Root directory of the folder tree to import.
:type source_root: Path
:param tags: Tags to associate with imported items.
:type tags: Sequence[str]
:return: Import statistics.
:rtype: dict[str, int]
:raises FileNotFoundError: If the source_root does not exist.
:raises ValueError: If the source root is outside the corpus root.
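
        Example (illustrative usage; folder name and counts are hypothetical):

        >>> corpus.import_tree(corpus.root / "inbox", tags=["import"])  # doctest: +SKIP
        {'scanned': 5, 'ignored': 1, 'imported': 4}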
"""
self._ensure_local_ingest_allowed()
source_root = source_root.resolve()
if not source_root.is_dir():
raise FileNotFoundError(f"Import source root does not exist: {source_root}")
if not source_root.is_relative_to(self.root):
raise ValueError(
"Import requires the source folder to live inside the corpus root. "
"Move it under the corpus and reindex."
)
ignore_spec = load_corpus_ignore_spec(self.root)
stats = {"scanned": 0, "ignored": 0, "imported": 0}
for source_path in sorted(source_root.rglob("*")):
if not source_path.is_file():
continue
relative_root_path = source_path.relative_to(self.root)
relative_source_path = relative_root_path.as_posix()
stats["scanned"] += 1
if ignore_spec.matches(relative_source_path):
stats["ignored"] += 1
continue
if self._is_reserved_path(relative_root_path):
stats["ignored"] += 1
continue
self._register_existing_file(
path=source_path,
tags=tags,
metadata=None,
source_uri=source_path.as_uri(),
)
stats["imported"] += 1
return stats
def _import_file(
self,
*,
source_path: Path,
import_id: str,
relative_source_path: str,
tags: Sequence[str],
) -> None:
"""
Import a single file into the corpus under an import namespace.
:param source_path: Source file path to import.
:type source_path: Path
:param import_id: Import identifier.
:type import_id: str
:param relative_source_path: Relative path within the imported tree.
:type relative_source_path: str
:param tags: Tags to apply.
:type tags: Sequence[str]
:return: None.
:rtype: None
:raises ValueError: If a markdown file cannot be decoded as Unicode Transformation Format 8.
"""
item_id = str(uuid.uuid4())
destination_relpath = self._raw_relpath(
output_name=relative_source_path,
storage_subdir=str(Path("imports") / import_id),
)
destination_path = (self.root / destination_relpath).resolve()
destination_path.parent.mkdir(parents=True, exist_ok=True)
raw_bytes = source_path.read_bytes()
sha256_digest = _sha256_bytes(raw_bytes)
media_type, _ = mimetypes.guess_type(source_path.name)
media_type = media_type or "application/octet-stream"
if source_path.suffix.lower() in {".md", ".markdown"}:
media_type = "text/markdown"
title: Optional[str] = None
frontmatter_metadata: Dict[str, Any] = {}
if media_type == "text/markdown":
try:
text = raw_bytes.decode("utf-8")
except UnicodeDecodeError as decode_error:
raise ValueError(
f"Markdown file must be Unicode Transformation Format 8: {relative_source_path}"
) from decode_error
parsed_document = parse_front_matter(text)
frontmatter_metadata = dict(parsed_document.metadata)
title_value = frontmatter_metadata.get("title")
if isinstance(title_value, str) and title_value.strip():
title = title_value.strip()
destination_path.write_bytes(raw_bytes)
sidecar: Dict[str, Any] = {}
if tags:
sidecar["tags"] = [t.strip() for t in tags if isinstance(t, str) and t.strip()]
if media_type != "text/markdown":
sidecar["media_type"] = media_type
sidecar["biblicus"] = {"id": item_id, "source": source_path.as_uri()}
_write_sidecar(destination_path, sidecar)
merged_metadata = _merge_metadata(frontmatter_metadata, sidecar)
resolved_tags = _merge_tags([], merged_metadata.get("tags"))
item_record = CatalogItem(
id=item_id,
relpath=destination_relpath,
sha256=sha256_digest,
bytes=len(raw_bytes),
media_type=media_type,
title=title,
tags=list(resolved_tags),
metadata=dict(merged_metadata or {}),
created_at=utc_now_iso(),
source_uri=source_path.as_uri(),
)
self._upsert_catalog_item(item_record)
def list_items(self, *, limit: int = 50) -> List[CatalogItem]:
"""
List items from the catalog.
:param limit: Maximum number of items to return.
:type limit: int
:return: Catalog items ordered by recency.
:rtype: list[CatalogItem]
"""
catalog = self._load_catalog()
ordered_ids = catalog.order[:limit] if catalog.order else list(catalog.items.keys())[:limit]
collected_items: List[CatalogItem] = []
for item_id in ordered_ids:
item = catalog.items.get(item_id)
if item is not None:
collected_items.append(item)
return collected_items
def get_item(self, item_id: str) -> CatalogItem:
"""
Fetch a catalog item by identifier.
:param item_id: Item identifier.
:type item_id: str
:return: Catalog item.
:rtype: CatalogItem
:raises KeyError: If the item identifier is unknown.
"""
catalog = self._load_catalog()
item = catalog.items.get(item_id)
if item is None:
raise KeyError(f"Unknown item identifier: {item_id}")
return item
def create_crawl_id(self) -> str:
"""
Create a new crawl identifier.
:return: Crawl identifier.
:rtype: str
"""
return str(uuid.uuid4())
def ingest_crawled_payload(
self,
*,
crawl_id: str,
relative_path: str,
data: bytes,
filename: str,
media_type: str,
source_uri: str,
tags: Sequence[str],
) -> None:
"""
Ingest a crawled payload under a crawl import namespace.
:param crawl_id: Crawl identifier used to group crawled artifacts.
:type crawl_id: str
:param relative_path: Relative path within the crawl prefix.
:type relative_path: str
:param data: Raw payload bytes.
:type data: bytes
:param filename: Suggested filename from the payload metadata.
:type filename: str
:param media_type: Internet Assigned Numbers Authority media type.
:type media_type: str
:param source_uri: Source uniform resource identifier (typically an http or https uniform resource locator).
:type source_uri: str
:param tags: Tags to attach to the stored item.
:type tags: Sequence[str]
:return: None.
:rtype: None
"""
self._ensure_local_ingest_allowed()
_ = filename
item_id = str(uuid.uuid4())
destination_relpath = self._raw_relpath(
output_name=relative_path,
storage_subdir=str(Path("imports") / "crawl" / crawl_id),
)
destination_path = (self.root / destination_relpath).resolve()
destination_path.parent.mkdir(parents=True, exist_ok=True)
destination_path.write_bytes(data)
sha256_digest = _sha256_bytes(data)
sidecar: Dict[str, Any] = {}
sidecar["tags"] = [t.strip() for t in tags if isinstance(t, str) and t.strip()]
sidecar["media_type"] = media_type
sidecar["biblicus"] = {"id": item_id, "source": source_uri}
_write_sidecar(destination_path, sidecar)
merged_metadata = _merge_metadata({}, sidecar)
resolved_tags = _merge_tags([], merged_metadata.get("tags"))
item_record = CatalogItem(
id=item_id,
relpath=destination_relpath,
sha256=sha256_digest,
bytes=len(data),
media_type=media_type,
title=None,
tags=list(resolved_tags),
metadata=dict(merged_metadata or {}),
created_at=utc_now_iso(),
source_uri=source_uri,
)
self._upsert_catalog_item(item_record)
def pull_source(
self, *, tag_resolver: Optional[Callable[[str], List[str]]] = None
) -> RemoteSourcePullResult:
"""
Mirror a remote source into the corpus.
        :param tag_resolver: Optional callback that maps a relative remote key to extra tags.
        :type tag_resolver: Callable[[str], list[str]] or None
        :return: Pull summary.
:rtype: RemoteSourcePullResult
:raises ValueError: If the corpus has no configured remote source.
"""
if self.config is None or self.config.source is None:
raise ValueError("Remote source is not configured for this corpus.")
source_config = self.config.source
source_name = self._resolve_remote_source_name(source_config)
storage_subdir = str(Path("imports") / "remote" / source_name)
ignore_spec = load_corpus_ignore_spec(self.root)
profile = resolve_source_profile(source_config.profile)
if source_config.kind != profile.kind:
raise ValueError(
"Remote source kind does not match source profile kind: "
f"{source_config.kind} vs {profile.kind}"
)
if source_config.kind == "s3":
source = S3RemoteSource(source_config, profile)
elif source_config.kind == "azure-blob":
source = AzureBlobRemoteSource(source_config, profile)
else:
raise ValueError(f"Unsupported remote source kind: {source_config.kind}")
result = RemoteSourcePullResult()
items = source.list_items()
result.listed = len(items)
remote_uris = set()
for item in items:
remote_uris.add(item.source_uri)
relative_key = self._relative_remote_key(item.key, prefix=source_config.prefix)
if not relative_key:
result.skipped += 1
continue
if ignore_spec.matches(relative_key):
result.skipped += 1
continue
existing_item = self._find_item_by_source_uri(item.source_uri)
if existing_item is not None and self._remote_item_unchanged(existing_item, item):
result.skipped += 1
continue
content, content_type = source.fetch_bytes(item)
relpath = self._raw_relpath(output_name=relative_key, storage_subdir=storage_subdir)
if existing_item is not None and existing_item.relpath != relpath:
self._delete_item_files(existing_item.relpath)
extra_tags = tag_resolver(relative_key) if tag_resolver is not None else []
catalog_item = self._write_remote_item(
data=content,
relpath=relpath,
source_uri=item.source_uri,
source_etag=item.etag,
source_last_modified=item.last_modified,
content_type=content_type or item.content_type,
item_id=existing_item.id if existing_item is not None else None,
created_at=existing_item.created_at if existing_item is not None else None,
extra_tags=extra_tags,
)
self._upsert_catalog_item(catalog_item)
if existing_item is None:
result.downloaded += 1
else:
result.updated += 1
result.pruned = self._prune_remote_items(
storage_subdir=storage_subdir, remote_uris=remote_uris
)
self.reindex()
return result
    def _resolve_remote_source_name(self, source_config) -> str:
        """
        Derive a filesystem-friendly name for the configured remote source.
        :param source_config: Remote source configuration.
        :type source_config: object
        :return: Sanitized source name.
        :rtype: str
        """
if source_config.name and source_config.name.strip():
return _sanitize_filename(source_config.name.strip())
if source_config.kind == "s3" and source_config.bucket:
return _sanitize_filename(source_config.bucket)
if source_config.kind == "azure-blob" and source_config.container:
return _sanitize_filename(source_config.container)
return "remote"
    def _relative_remote_key(self, key: str, *, prefix: Optional[str]) -> str:
        """
        Strip the configured prefix from a remote object key.
        :param key: Remote object key.
        :type key: str
        :param prefix: Optional key prefix to remove.
        :type prefix: str or None
        :return: Relative key without a leading slash.
        :rtype: str
        """
relative_key = key
if prefix and relative_key.startswith(prefix):
relative_key = relative_key[len(prefix) :]
return relative_key.lstrip("/")
    def _remote_item_unchanged(self, existing_item: CatalogItem, item) -> bool:
        """
        Return whether a remote item matches the recorded etag or last-modified value.
        :param existing_item: Catalog item recorded by a previous pull.
        :type existing_item: CatalogItem
        :param item: Remote listing entry to compare against.
        :type item: object
        :return: True when the remote content is unchanged.
        :rtype: bool
        """
metadata = existing_item.metadata or {}
biblicus_block = metadata.get("biblicus") if isinstance(metadata, dict) else None
if not isinstance(biblicus_block, dict):
biblicus_block = {}
existing_etag = biblicus_block.get("source_etag")
existing_last_modified = biblicus_block.get("source_last_modified")
if item.etag and existing_etag == item.etag:
return True
if not item.etag and item.last_modified and existing_last_modified == item.last_modified:
return True
return False
    def _write_remote_item(
        self,
        *,
        data: bytes,
        relpath: str,
        source_uri: str,
        source_etag: Optional[str],
        source_last_modified: Optional[str],
        content_type: Optional[str],
        item_id: Optional[str],
        created_at: Optional[str],
        extra_tags: Optional[Sequence[str]] = None,
    ) -> CatalogItem:
        """
        Write a remote payload to disk and build its catalog item.
        :param data: Raw payload bytes.
        :type data: bytes
        :param relpath: Corpus-relative destination path.
        :type relpath: str
        :param source_uri: Source uniform resource identifier for provenance.
        :type source_uri: str
        :param source_etag: Entity tag reported by the remote source.
        :type source_etag: str or None
        :param source_last_modified: Last-modified value reported by the remote source.
        :type source_last_modified: str or None
        :param content_type: Content type reported by the remote source.
        :type content_type: str or None
        :param item_id: Existing item identifier to reuse, if any.
        :type item_id: str or None
        :param created_at: Existing creation timestamp to preserve, if any.
        :type created_at: str or None
        :param extra_tags: Additional tags to merge into the item.
        :type extra_tags: Sequence[str] or None
        :return: Catalog item describing the stored payload.
        :rtype: CatalogItem
        :raises ValueError: If markdown content is not Unicode Transformation Format 8.
        """
output_path = self.root / relpath
output_path.parent.mkdir(parents=True, exist_ok=True)
normalized_type = None
if content_type:
normalized_type = content_type.split(";", 1)[0].strip()
media_type = normalized_type or _media_type_from_filename(output_path.name)
if output_path.suffix.lower() in {".md", ".markdown"}:
media_type = "text/markdown"
resolved_item_id = item_id or str(uuid.uuid4())
metadata: Dict[str, Any] = {}
title: Optional[str] = None
tags: List[str] = []
if media_type == "text/markdown":
try:
markdown_text = data.decode("utf-8")
except UnicodeDecodeError as decode_error:
raise ValueError(
f"Markdown must be Unicode Transformation Format 8: {output_path.name}"
) from decode_error
sidecar_path = _sidecar_path_for(output_path)
if sidecar_path.exists():
sidecar_path.unlink()
parsed_document = parse_front_matter(markdown_text)
frontmatter = dict(parsed_document.metadata)
frontmatter = _ensure_biblicus_block(
frontmatter, item_id=resolved_item_id, source_uri=source_uri
)
frontmatter = _update_biblicus_block(
frontmatter,
{
"source_etag": source_etag,
"source_last_modified": source_last_modified,
},
)
if extra_tags:
merged_tags = _merge_tags([], frontmatter.get("tags"))
merged_tags = _merge_tags(merged_tags, extra_tags)
frontmatter["tags"] = list(merged_tags)
rendered_document = render_front_matter(frontmatter, parsed_document.body)
data_to_write = rendered_document.encode("utf-8")
metadata = frontmatter
else:
data_to_write = data
sidecar: Dict[str, Any] = {}
sidecar["media_type"] = media_type
sidecar = _ensure_biblicus_block(
sidecar, item_id=resolved_item_id, source_uri=source_uri
)
sidecar = _update_biblicus_block(
sidecar,
{
"source_etag": source_etag,
"source_last_modified": source_last_modified,
},
)
if extra_tags:
merged_tags = _merge_tags([], sidecar.get("tags"))
merged_tags = _merge_tags(merged_tags, extra_tags)
sidecar["tags"] = list(merged_tags)
_write_sidecar(output_path, sidecar)
metadata = sidecar
title_value = metadata.get("title")
if isinstance(title_value, str) and title_value.strip():
title = title_value.strip()
tags = _merge_tags([], metadata.get("tags"))
sha256_digest = _sha256_bytes(data_to_write)
output_path.write_bytes(data_to_write)
return CatalogItem(
id=resolved_item_id,
relpath=relpath,
sha256=sha256_digest,
bytes=len(data_to_write),
media_type=media_type,
title=title,
tags=list(tags),
metadata=dict(metadata or {}),
created_at=created_at or utc_now_iso(),
source_uri=source_uri,
)
    def _delete_item_files(self, relpath: str) -> None:
        """
        Delete an item's content file and its sidecar, if present.
        :param relpath: Corpus-relative path of the content file.
        :type relpath: str
        :return: None.
        :rtype: None
        """
content_path = self.root / relpath
if content_path.exists():
content_path.unlink()
sidecar_path = _sidecar_path_for(content_path)
if sidecar_path.exists():
sidecar_path.unlink()
    def _prune_remote_items(self, *, storage_subdir: str, remote_uris: set[str]) -> int:
        """
        Remove mirrored items that no longer exist at the remote source.
        :param storage_subdir: Storage subdirectory that namespaces the remote mirror.
        :type storage_subdir: str
        :param remote_uris: Source uniform resource identifiers currently present remotely.
        :type remote_uris: set[str]
        :return: Number of pruned items.
        :rtype: int
        """
catalog = self._load_catalog()
prefix_path = self._raw_prefix_for_storage(storage_subdir)
prefix_text = prefix_path.as_posix().rstrip("/") + "/"
pruned = 0
removed_ids = []
for item_id, item in catalog.items.items():
if not item.relpath.startswith(prefix_text):
continue
if item.source_uri in remote_uris:
continue
self._delete_item_files(item.relpath)
removed_ids.append(item_id)
pruned += 1
if removed_ids:
for item_id in removed_ids:
catalog.items.pop(item_id, None)
catalog.order = [item_id for item_id in catalog.order if item_id not in removed_ids]
catalog.generated_at = utc_now_iso()
catalog.latest_snapshot_id = None
self._write_catalog(catalog)
return pruned
def reindex(self) -> Dict[str, int]:
"""
Rebuild/refresh the corpus catalog from the current on-disk corpus contents.
This is the core "mutable corpus with re-indexing" loop: edit raw files or sidecars,
then reindex to refresh the derived catalog.
:return: Reindex statistics.
:rtype: dict[str, int]
:raises ValueError: If a markdown file cannot be decoded as Unicode Transformation Format 8.
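
        Example (illustrative workflow; the path and counts are hypothetical):

        >>> corpus = Corpus.open("/path/to/corpus")  # doctest: +SKIP
        >>> corpus.reindex()  # doctest: +SKIP
        {'scanned': 3, 'skipped': 0, 'inserted': 1, 'updated': 2}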
"""
self._init_catalog()
existing_catalog = self._load_catalog()
stats = {"scanned": 0, "skipped": 0, "inserted": 0, "updated": 0}
if self.raw_dir == self.root:
content_files = [
content_path
for content_path in self.root.rglob("*")
if content_path.is_file()
and not content_path.name.endswith(SIDECAR_SUFFIX)
and not self._is_reserved_path(content_path)
]
else:
content_files = [
content_path
for content_path in self.raw_dir.rglob("*")
if content_path.is_file() and not content_path.name.endswith(SIDECAR_SUFFIX)
]
new_items: Dict[str, CatalogItem] = {}
for content_path in content_files:
stats["scanned"] += 1
relpath = str(content_path.relative_to(self.root))
data = content_path.read_bytes()
sha256 = _sha256_bytes(data)
media_type, _ = mimetypes.guess_type(content_path.name)
media_type = media_type or "application/octet-stream"
sidecar = _load_sidecar(content_path)
frontmatter: Dict[str, Any] = {}
if content_path.suffix.lower() in {".md", ".markdown"}:
try:
text = data.decode("utf-8")
except UnicodeDecodeError as decode_error:
raise ValueError(
f"Markdown file must be Unicode Transformation Format 8: {relpath}"
) from decode_error
parsed_document = parse_front_matter(text)
frontmatter = parsed_document.metadata
media_type = "text/markdown"
merged_metadata = _merge_metadata(frontmatter, sidecar)
if media_type != "text/markdown":
media_type_override = merged_metadata.get("media_type")
if isinstance(media_type_override, str) and media_type_override.strip():
media_type = media_type_override.strip()
item_id: Optional[str] = None
biblicus_block = merged_metadata.get("biblicus")
if isinstance(biblicus_block, dict):
biblicus_id = biblicus_block.get("id")
if isinstance(biblicus_id, str):
try:
item_id = str(uuid.UUID(biblicus_id))
except ValueError:
item_id = None
if item_id is None:
item_id = _parse_uuid_prefix(content_path.name)
if item_id is None:
stats["skipped"] += 1
continue
title: Optional[str] = None
title_value = merged_metadata.get("title")
if isinstance(title_value, str) and title_value.strip():
title = title_value.strip()
resolved_tags = _merge_tags([], merged_metadata.get("tags"))
source_uri: Optional[str] = None
if isinstance(biblicus_block, dict):
source_value = biblicus_block.get("source")
if isinstance(source_value, str) and source_value.strip():
source_uri = source_value.strip()
previous_item = existing_catalog.items.get(item_id)
created_at = previous_item.created_at if previous_item is not None else utc_now_iso()
source_uri = source_uri or (
previous_item.source_uri if previous_item is not None else None
)
if previous_item is None:
stats["inserted"] += 1
else:
stats["updated"] += 1
new_items[item_id] = CatalogItem(
id=item_id,
relpath=relpath,
sha256=sha256,
bytes=len(data),
media_type=media_type,
title=title,
tags=list(resolved_tags),
metadata=dict(merged_metadata or {}),
created_at=created_at,
source_uri=source_uri,
)
order = sorted(
new_items.keys(),
key=lambda item_id: (new_items[item_id].created_at, item_id),
reverse=True,
)
catalog = CorpusCatalog(
schema_version=SCHEMA_VERSION,
generated_at=utc_now_iso(),
corpus_uri=normalize_corpus_uri(self.root),
raw_dir=DEFAULT_RAW_DIR,
latest_snapshot_id=None,
items=new_items,
order=order,
)
self._write_catalog(catalog)
return stats
@property
def name(self) -> str:
"""
Return the corpus name (directory basename).
:return: Corpus name.
:rtype: str
"""
return self.root.name
def purge(self, *, confirm: str) -> None:
"""
Delete all ingested items and derived files, preserving corpus identity/config.
:param confirm: Confirmation string matching the corpus name.
:type confirm: str
:return: None.
:rtype: None
:raises ValueError: If the confirmation does not match.
"""
expected = self.name
if confirm != expected:
raise ValueError(
f"Confirmation mismatch: pass --confirm {expected!r} to purge this corpus"
)
if self.raw_dir == self.root:
for path in self.root.iterdir():
if path.name in self._reserved_dir_names():
continue
if path.is_dir():
shutil.rmtree(path)
else:
path.unlink()
elif self.raw_dir.exists():
shutil.rmtree(self.raw_dir)
self.raw_dir.mkdir(parents=True, exist_ok=True)
for path in self.meta_dir.iterdir():
if path.name == "config.json":
continue
if path.is_dir():
shutil.rmtree(path)
else:
path.unlink()
for derived_dir in [
self.extracted_dir,
self.graph_dir,
self.retrieval_dir,
self.analysis_dir,
]:
if derived_dir.exists():
shutil.rmtree(derived_dir)
self._init_catalog()
self._write_catalog(
CorpusCatalog(
schema_version=SCHEMA_VERSION,
generated_at=utc_now_iso(),
corpus_uri=normalize_corpus_uri(self.root),
raw_dir=DEFAULT_RAW_DIR,
latest_snapshot_id=None,
items={},
order=[],
)
)