Source code for biblicus.graph.neo4j

"""
Neo4j graph storage helpers for Biblicus.
"""

from __future__ import annotations

import json
import os
import shutil
import subprocess
import time
from dataclasses import dataclass
from typing import List, Optional

from ..user_config import BiblicusUserConfig, load_user_config


@dataclass(frozen=True)
class Neo4jSettings:
    """
    Immutable bundle of Neo4j connection and container lifecycle options.

    Instances are frozen, so a resolved settings object cannot be modified
    after construction.

    :ivar uri: Neo4j connection URI.
    :ivar username: Neo4j username.
    :ivar password: Neo4j password.
    :ivar database: Optional Neo4j database name.
    :ivar auto_start: Whether to auto-start Neo4j via Docker.
    :ivar container_name: Docker container name used for auto-start.
    :ivar docker_image: Docker image used for auto-start.
    :ivar http_port: HTTP port for the Neo4j UI.
    :ivar bolt_port: Bolt port for Neo4j driver connections.
    """

    # Connection details.
    uri: str
    username: str
    password: str
    database: Optional[str]

    # Docker auto-start details.
    auto_start: bool
    container_name: str
    docker_image: str
    http_port: int
    bolt_port: int
def resolve_neo4j_settings(*, config: Optional[BiblicusUserConfig] = None) -> Neo4jSettings:
    """
    Build Neo4j settings from environment variables and user configuration.

    For string values a non-empty environment variable wins, then the ``neo4j``
    section of the user configuration, then a built-in default. The
    ``BIBLICUS_NEO4J_AUTO_START`` variable wins whenever it is set at all and is
    true only for ``1``, ``true``, ``yes`` or ``on`` (case-insensitive).

    :param config: Optional pre-loaded user configuration.
    :type config: BiblicusUserConfig or None
    :return: Resolved Neo4j settings.
    :rtype: Neo4jSettings
    """
    user_config = config or load_user_config()
    section = getattr(user_config, "neo4j", None)

    def pick(env_key, attribute, fallback=None):
        # Environment first, then the configuration section, then the fallback.
        env_value = os.environ.get(env_key)
        if env_value:
            return env_value
        if section:
            return getattr(section, attribute)
        return fallback

    uri = pick("NEO4J_URI", "uri")
    username = pick("NEO4J_USERNAME", "username")
    password = pick("NEO4J_PASSWORD", "password")
    database = pick("NEO4J_DATABASE", "database")

    raw_auto_start = os.environ.get("BIBLICUS_NEO4J_AUTO_START")
    if raw_auto_start is None:
        auto_start = section.auto_start if section else True
    else:
        auto_start = raw_auto_start.strip().lower() in {"1", "true", "yes", "on"}

    container_name = pick("BIBLICUS_NEO4J_CONTAINER_NAME", "container_name", "biblicus-neo4j")
    docker_image = pick("BIBLICUS_NEO4J_IMAGE", "docker_image", "neo4j:5")

    configured_http_port = section.http_port if section else 7474
    http_port = _resolve_int("BIBLICUS_NEO4J_HTTP_PORT", configured_http_port)
    configured_bolt_port = section.bolt_port if section else 7687
    bolt_port = _resolve_int("BIBLICUS_NEO4J_BOLT_PORT", configured_bolt_port)

    # Empty or missing connection values fall back to local defaults.
    return Neo4jSettings(
        uri=uri or f"bolt://localhost:{bolt_port}",
        username=username or "neo4j",
        password=password or "testpassword",
        database=database,
        auto_start=auto_start,
        container_name=container_name,
        docker_image=docker_image,
        http_port=http_port,
        bolt_port=bolt_port,
    )
def ensure_neo4j_running(settings: Neo4jSettings) -> None:
    """
    Start the Neo4j Docker container if auto-start is enabled and it is not running.

    Does nothing when ``settings.auto_start`` is false. Otherwise an existing
    container with the configured name is started, and a new one is created
    when no such container exists.

    :param settings: Resolved Neo4j settings.
    :type settings: Neo4jSettings
    :return: None.
    :rtype: None
    :raises ValueError: If Docker is unavailable or the container cannot be started.
    """
    if not settings.auto_start:
        return
    if shutil.which("docker") is None:
        raise ValueError("Neo4j auto-start requires Docker to be installed")

    container = settings.container_name
    if _container_running(container):
        return
    if not _container_exists(container):
        # No container with this name yet: create and launch a fresh one.
        _docker_run(settings)
        return
    _docker_start(container)
def create_neo4j_driver(settings: Neo4jSettings):
    """
    Create a Neo4j driver, waiting for availability when auto-start is enabled.

    The driver is closed again if Neo4j never becomes reachable, so a failed
    call does not leak the driver's resources.

    :param settings: Resolved Neo4j settings.
    :type settings: Neo4jSettings
    :return: Neo4j driver instance.
    :rtype: neo4j.Driver
    :raises ValueError: If the Neo4j driver dependency is missing. Errors raised
        while starting or waiting for Neo4j propagate unchanged.
    """
    try:
        from neo4j import GraphDatabase
    except ImportError as exc:
        raise ValueError(
            "Neo4j support requires an optional dependency. "
            'Install it with pip install "biblicus[neo4j]".'
        ) from exc

    ensure_neo4j_running(settings)
    driver = GraphDatabase.driver(
        settings.uri,
        auth=(settings.username, settings.password),
    )
    try:
        _wait_for_neo4j(driver, settings)
    except BaseException:
        # The caller never receives the driver on failure (including an
        # interrupt during the wait), so release it here before re-raising.
        driver.close()
        raise
    return driver
def _wait_for_neo4j(
    driver,
    settings: Neo4jSettings,
    timeout_seconds: float = 30,
    poll_interval_seconds: float = 0.5,
) -> None:
    """
    Poll Neo4j with a trivial query until it answers or the timeout elapses.

    At least one attempt is always made, even with a zero timeout.

    :param driver: Neo4j driver instance.
    :type driver: neo4j.Driver
    :param settings: Resolved Neo4j settings (only ``database`` is read).
    :type settings: Neo4jSettings
    :param timeout_seconds: Maximum time to keep retrying.
    :type timeout_seconds: float
    :param poll_interval_seconds: Pause between failed attempts.
    :type poll_interval_seconds: float
    :return: None.
    :rtype: None
    :raises ValueError: If no attempt succeeds before the timeout.
    """
    # A monotonic clock keeps the deadline immune to wall-clock adjustments.
    deadline = time.monotonic() + timeout_seconds
    last_error: Optional[Exception] = None
    while True:
        try:
            with driver.session(database=settings.database) as session:
                session.run("RETURN 1")
            return
        except Exception as exc:
            # Any failure here is treated as "not ready yet" and retried.
            last_error = exc
        if time.monotonic() >= deadline:
            break
        time.sleep(poll_interval_seconds)
    message = f"Neo4j did not become available within {timeout_seconds:g} seconds"
    if last_error is not None:
        message = f"{message}: {last_error}"
    raise ValueError(message) from last_error


def _resolve_int(env_key: str, default: int) -> int:
    """
    Read an integer from an environment variable, falling back to a default.

    An unset, empty or whitespace-only variable yields ``default``.

    :param env_key: Environment variable name.
    :type env_key: str
    :param default: Value returned when the variable is unset or blank.
    :type default: int
    :return: Parsed integer or the default.
    :rtype: int
    :raises ValueError: If the variable holds a non-integer value.
    """
    raw = os.environ.get(env_key)
    if raw is None or not raw.strip():
        return default
    try:
        return int(raw)
    except ValueError as exc:
        raise ValueError(f"{env_key} must be an integer") from exc


def _container_running(name: str) -> bool:
    """
    Report whether a running Docker container has exactly the given name.

    :param name: Docker container name.
    :type name: str
    :return: True when a running container with that exact name is listed.
    :rtype: bool
    :raises ValueError: If the Docker command fails.
    """
    output = _run_docker(
        ["ps", "--filter", f"name={name}", "--filter", "status=running", "--format", "{{.Names}}"]
    )
    # The listing is compared line by line so only an exact name counts.
    return name in output.splitlines()


def _container_exists(name: str) -> bool:
    """
    Report whether a Docker container (running or not) has exactly the given name.

    :param name: Docker container name.
    :type name: str
    :return: True when a container with that exact name is listed.
    :rtype: bool
    :raises ValueError: If the Docker command fails.
    """
    output = _run_docker(["ps", "-a", "--filter", f"name={name}", "--format", "{{.Names}}"])
    return name in output.splitlines()


def _docker_start(name: str) -> None:
    """
    Start an existing Docker container.

    :param name: Docker container name.
    :type name: str
    :return: None.
    :rtype: None
    :raises ValueError: If the Docker command fails.
    """
    _run_docker(["start", name])


def _docker_run(settings: Neo4jSettings) -> None:
    """
    Create and start a detached Neo4j container from the configured image.

    The configured host ports are mapped to container ports 7474 and 7687.

    :param settings: Resolved Neo4j settings.
    :type settings: Neo4jSettings
    :return: None.
    :rtype: None
    :raises ValueError: If the Docker command fails.
    """
    # NOTE(review): the credentials are passed as a command-line argument to
    # docker; confirm this is acceptable for the environments this runs in.
    args = [
        "run",
        "-d",
        "--name",
        settings.container_name,
        "-p",
        f"{settings.http_port}:7474",
        "-p",
        f"{settings.bolt_port}:7687",
        "-e",
        f"NEO4J_AUTH={settings.username}/{settings.password}",
        settings.docker_image,
    ]
    _run_docker(args)


def _run_docker(args: List[str]) -> str:
    """
    Run a Docker CLI command and return its standard output.

    :param args: Arguments passed after the ``docker`` executable.
    :type args: list[str]
    :return: Captured standard output.
    :rtype: str
    :raises ValueError: If Docker cannot be executed or exits with a non-zero status.
    """
    try:
        result = subprocess.run(
            ["docker", *args],
            check=False,
            capture_output=True,
            text=True,
        )
    except OSError as exc:
        # A missing or non-executable docker binary surfaces like any other
        # Docker failure instead of leaking an OSError to callers.
        raise ValueError(f"Unable to execute Docker: {exc}") from exc
    if result.returncode != 0:
        message = result.stderr.strip() or result.stdout.strip() or "Docker command failed"
        raise ValueError(message)
    return result.stdout
def write_graph_records(
    *,
    driver,
    settings: Neo4jSettings,
    corpus_id: str,
    graph_id: str,
    extraction_snapshot: str,
    item_id: str,
    nodes,
    edges,
) -> None:
    """
    Store graph nodes and edges in Neo4j for one corpus item.

    Node and edge properties are serialized to JSON text with sorted keys.
    Nodes are written first, then edges, each in its own write transaction.
    An empty collection skips its transaction.

    :param driver: Neo4j driver instance.
    :type driver: neo4j.Driver
    :param settings: Resolved Neo4j settings.
    :type settings: Neo4jSettings
    :param corpus_id: Corpus identifier.
    :type corpus_id: str
    :param graph_id: Graph identifier.
    :type graph_id: str
    :param extraction_snapshot: Extraction snapshot reference.
    :type extraction_snapshot: str
    :param item_id: Corpus item identifier.
    :type item_id: str
    :param nodes: Iterable of graph nodes.
    :type nodes: Iterable[biblicus.graph.models.GraphNode]
    :param edges: Iterable of graph edges.
    :type edges: Iterable[biblicus.graph.models.GraphEdge]
    :return: None.
    :rtype: None
    """
    node_rows = []
    for node in nodes:
        row = {
            "node_id": node.node_id,
            "node_type": node.node_type,
            "label": node.label,
            "properties_json": json.dumps(node.properties, sort_keys=True),
        }
        node_rows.append(row)

    edge_rows = []
    for edge in edges:
        row = {
            "edge_id": edge.edge_id,
            "src": edge.src,
            "dst": edge.dst,
            "edge_type": edge.edge_type,
            "weight": edge.weight,
            "properties_json": json.dumps(edge.properties, sort_keys=True),
        }
        edge_rows.append(row)

    # Identifiers shared by every node and edge written in this call.
    scope = (corpus_id, graph_id, extraction_snapshot, item_id)
    with driver.session(database=settings.database) as session:
        if node_rows:
            session.execute_write(_write_nodes, *scope, node_rows)
        if edge_rows:
            session.execute_write(_write_edges, *scope, edge_rows)
def _write_nodes(tx, corpus_id: str, graph_id: str, extraction_snapshot: str, item_id: str, nodes):
    """
    Upsert graph nodes inside a transaction.

    Each node is merged on its corpus, graph, snapshot, item and node
    identifiers, then its type, label and JSON properties are set.

    :param tx: Transaction object exposing ``run``.
    :param corpus_id: Corpus identifier.
    :type corpus_id: str
    :param graph_id: Graph identifier.
    :type graph_id: str
    :param extraction_snapshot: Extraction snapshot reference.
    :type extraction_snapshot: str
    :param item_id: Corpus item identifier.
    :type item_id: str
    :param nodes: Node payload mappings with ``node_id``, ``node_type``,
        ``label`` and ``properties_json`` keys.
    :return: None.
    :rtype: None
    """
    query = """
        UNWIND $nodes AS node
        MERGE (n:GraphNode {
            corpus_id: $corpus_id,
            graph_id: $graph_id,
            extraction_snapshot_id: $extraction_snapshot,
            item_id: $item_id,
            node_id: node.node_id
        })
        SET n.node_type = node.node_type,
            n.label = node.label,
            n.properties_json = node.properties_json
        """
    parameters = {
        "corpus_id": corpus_id,
        "graph_id": graph_id,
        "extraction_snapshot": extraction_snapshot,
        "item_id": item_id,
        "nodes": nodes,
    }
    tx.run(query, **parameters)


def _write_edges(
    tx, corpus_id: str, graph_id: str, extraction_snapshot: str, item_id: str, edges
):
    """
    Upsert graph edges inside a transaction.

    Both endpoints are matched as existing ``GraphNode`` entries in the same
    corpus, graph, snapshot and item. A ``RELATED`` relationship is then merged
    on the edge identifier and its type, weight and JSON properties are set.

    :param tx: Transaction object exposing ``run``.
    :param corpus_id: Corpus identifier.
    :type corpus_id: str
    :param graph_id: Graph identifier.
    :type graph_id: str
    :param extraction_snapshot: Extraction snapshot reference.
    :type extraction_snapshot: str
    :param item_id: Corpus item identifier.
    :type item_id: str
    :param edges: Edge payload mappings with ``edge_id``, ``src``, ``dst``,
        ``edge_type``, ``weight`` and ``properties_json`` keys.
    :return: None.
    :rtype: None
    """
    query = """
        UNWIND $edges AS edge
        MATCH (src:GraphNode {
            corpus_id: $corpus_id,
            graph_id: $graph_id,
            extraction_snapshot_id: $extraction_snapshot,
            item_id: $item_id,
            node_id: edge.src
        })
        MATCH (dst:GraphNode {
            corpus_id: $corpus_id,
            graph_id: $graph_id,
            extraction_snapshot_id: $extraction_snapshot,
            item_id: $item_id,
            node_id: edge.dst
        })
        MERGE (src)-[r:RELATED {
            corpus_id: $corpus_id,
            graph_id: $graph_id,
            extraction_snapshot_id: $extraction_snapshot,
            item_id: $item_id,
            edge_id: edge.edge_id
        }]->(dst)
        SET r.edge_type = edge.edge_type,
            r.weight = edge.weight,
            r.properties_json = edge.properties_json
        """
    parameters = {
        "corpus_id": corpus_id,
        "graph_id": graph_id,
        "extraction_snapshot": extraction_snapshot,
        "item_id": item_id,
        "edges": edges,
    }
    tx.run(query, **parameters)