"""Persisted BM25 corpus management.""" from __future__ import annotations import json import logging import shutil from dataclasses import dataclass from datetime import UTC, datetime from functools import cache from pathlib import Path from typing import TYPE_CHECKING import bm25s from sqlalchemy import func, select, union_all from python.orm.richie import EbookChapter, EbookChunk, EbookSource if TYPE_CHECKING: from sqlalchemy.orm import Session from python.ebook_search.config import EbookSearchConfig logger = logging.getLogger(__name__) MANIFEST_NAME = "manifest.json" REQUIRED_INDEX_FILES = frozenset( { "data.csc.index.npy", "indices.csc.index.npy", "indptr.csc.index.npy", "params.index.json", "vocab.index.json", "corpus.jsonl", } ) @dataclass(frozen=True) class BM25Manifest: """Metadata describing a persisted BM25 corpus.""" created_at: datetime db_updated_at: datetime | None chunk_count: int @dataclass(frozen=True) class BM25Corpus: """Loaded persisted BM25 corpus and retriever.""" retriever: object | None records: tuple[dict[str, object], ...] manifest: BM25Manifest class BM25CorpusUnavailableError(RuntimeError): """Raised when the persisted BM25 corpus cannot be loaded.""" def bm25_index_path(config: EbookSearchConfig) -> Path: """Return the configured BM25 index root path relative to the current working directory.""" path = Path(config.bm25_index_dir).expanduser() if path.is_absolute(): return path return Path.cwd() / path def get_current_bm25_index(index_path: Path) -> Path: """Return the live BM25 index directory.""" current_path = index_path / "current" if current_path.exists() or current_path.is_symlink(): return current_path return index_path def ensure_bm25_corpus(session: Session, config: EbookSearchConfig) -> None: """Create or refresh the persisted BM25 corpus when it is missing or stale.""" index_path = bm25_index_path(config) manifest = read_bm25_manifest(index_path) db_updated_at = corpus_last_updated_at(session) if not bm25_index_exists(index_path, manifest): logger.info("ebook_bm25_index_missing path=%s", index_path) refresh_bm25_corpus(session, config, db_updated_at=db_updated_at) return if db_updated_at is not None and manifest is not None and manifest.created_at < db_updated_at: logger.info( "ebook_bm25_index_stale path=%s created_at=%s db_updated_at=%s", index_path, manifest.created_at.isoformat(), db_updated_at.isoformat(), ) refresh_bm25_corpus(session, config, db_updated_at=db_updated_at) return logger.info( "ebook_bm25_index_current path=%s chunks=%s created_at=%s", index_path, manifest.chunk_count if manifest else 0, manifest.created_at.isoformat() if manifest else None, ) def refresh_bm25_corpus( session: Session, config: EbookSearchConfig, *, db_updated_at: datetime | None = None, ) -> BM25Manifest: """Rebuild and persist the BM25 corpus from the current database chunks.""" index_path = bm25_index_path(config) records, texts = fetch_bm25_corpus_records(session) manifest = BM25Manifest( created_at=datetime.now(tz=UTC), db_updated_at=db_updated_at if db_updated_at is not None else corpus_last_updated_at(session), chunk_count=len(records), ) write_bm25_corpus(index_path, records, texts, manifest) logger.info( "ebook_bm25_index_refreshed path=%s chunks=%s created_at=%s", index_path, manifest.chunk_count, manifest.created_at.isoformat(), ) return manifest @cache def load_bm25_corpus(config: EbookSearchConfig) -> BM25Corpus: """Load the BM25 corpus into memory once per process. Background refresh tasks clear this cache after rebuilding the on-disk corpus. """ index_path = bm25_index_path(config) active_index_path = get_current_bm25_index(index_path) logger.info("ebook_bm25_corpus_cache_load path=%s active_path=%s", index_path, active_index_path) manifest = read_bm25_manifest(index_path) if manifest is None or not bm25_index_exists(index_path, manifest): msg = f"BM25 corpus is not available: {index_path}" raise BM25CorpusUnavailableError(msg) if manifest.chunk_count == 0: return BM25Corpus(retriever=None, records=(), manifest=manifest) retriever = bm25s.BM25.load(active_index_path, load_corpus=True, mmap=True) records = tuple(dict(record) for record in retriever.corpus) return BM25Corpus(retriever=retriever, records=records, manifest=manifest) def score_bm25_corpus(query: str, corpus: BM25Corpus, *, limit: int) -> list[tuple[dict[str, object], float]]: """Score a query against a loaded BM25 corpus.""" if corpus.retriever is None or not corpus.records: return [] k = min(limit, len(corpus.records)) documents, scores = corpus.retriever.retrieve( bm25s.tokenize(query, show_progress=False), corpus=list(corpus.records), k=k, show_progress=False, ) results: list[tuple[dict[str, object], float]] = [] for document, score in zip(documents[0], scores[0], strict=True): score_value = float(score) if score_value <= 0: continue results.append((dict(document), score_value)) return results def fetch_bm25_corpus_records(session: Session) -> tuple[list[dict[str, object]], list[str]]: """Fetch persistable BM25 corpus records and their matching index texts from the database. search_text is only needed to build the index, so it is returned separately instead of being persisted into the corpus records, which would double the corpus size. """ statement = ( select( EbookChunk.id.label("chunk_id"), EbookChunk.text.label("text"), EbookSource.title.label("source_title"), EbookSource.author.label("source_author"), EbookChapter.title.label("chapter_title"), EbookChunk.page_label.label("page_label"), EbookChunk.search_text.label("bm25_text"), ) .select_from(EbookChunk) .join(EbookSource, EbookSource.id == EbookChunk.source_id) .outerjoin(EbookChapter, EbookChapter.id == EbookChunk.chapter_id) .order_by(EbookChunk.id) ) records: list[dict[str, object]] = [] texts: list[str] = [] for row in session.execute(statement).mappings(): record = dict(row) texts.append(str(record.pop("bm25_text"))) records.append(record) return records, texts def corpus_last_updated_at(session: Session) -> datetime | None: """Return the latest source/chapter/chunk update timestamp relevant to BM25 text.""" update_times = union_all( select(func.max(EbookSource.updated).label("updated")), select(func.max(EbookChapter.updated).label("updated")), select(func.max(EbookChunk.updated).label("updated")), ).subquery() return session.scalar(select(func.max(update_times.c.updated))) def write_bm25_corpus( index_path: Path, records: list[dict[str, object]], texts: list[str], manifest: BM25Manifest, ) -> None: """Write a BM25 corpus generation and publish it through the current symlink.""" index_path.mkdir(parents=True, exist_ok=True) generations_path = index_path / "generations" generations_path.mkdir(exist_ok=True) generation_path = next_bm25_generation_path(generations_path, manifest.created_at) current_path = index_path / "current" next_current_path = index_path / f".current.{generation_path.name}.tmp" try: generation_path.mkdir() # Empty corpora publish a manifest-only generation so startup succeeds before any chunks exist. if records: retriever = bm25s.BM25() retriever.index(bm25s.tokenize(texts, show_progress=False), show_progress=False) retriever.save(generation_path, corpus=records, show_progress=False) write_bm25_manifest(generation_path, manifest) next_current_path.unlink(missing_ok=True) next_current_path.symlink_to(generation_path, target_is_directory=True) next_current_path.replace(current_path) except Exception: next_current_path.unlink(missing_ok=True) shutil.rmtree(generation_path, ignore_errors=True) raise def read_bm25_manifest(index_path: Path) -> BM25Manifest | None: """Read the BM25 manifest if it exists and is valid.""" manifest_path = get_current_bm25_index(index_path) / MANIFEST_NAME if not manifest_path.exists(): return None body = json.loads(manifest_path.read_text(encoding="utf-8")) return BM25Manifest( created_at=datetime.fromisoformat(str(body["created_at"])), db_updated_at=datetime.fromisoformat(str(body["db_updated_at"])) if body.get("db_updated_at") else None, chunk_count=int(body["chunk_count"]), ) def write_bm25_manifest(index_path: Path, manifest: BM25Manifest) -> None: """Write the BM25 manifest to an index directory.""" body = { "created_at": manifest.created_at.isoformat(), "db_updated_at": manifest.db_updated_at.isoformat() if manifest.db_updated_at else None, "chunk_count": manifest.chunk_count, } (index_path / MANIFEST_NAME).write_text(json.dumps(body, indent=2, sort_keys=True), encoding="utf-8") def bm25_index_exists(index_path: Path, manifest: BM25Manifest | None) -> bool: """Return whether a usable persisted BM25 index exists.""" active_index_path = get_current_bm25_index(index_path) if manifest is None or not active_index_path.is_dir(): return False if manifest.chunk_count == 0: return True return all((active_index_path / file_name).exists() for file_name in REQUIRED_INDEX_FILES) def next_bm25_generation_path(generations_path: Path, created_at: datetime) -> Path: """Return an unused dated BM25 generation path.""" base_name = created_at.astimezone(UTC).strftime("%Y%m%dT%H%M%S.%fZ") generation_path = generations_path / base_name suffix = 1 while generation_path.exists(): generation_path = generations_path / f"{base_name}.{suffix}" suffix += 1 return generation_path