"""Tests for EPUB search core helpers.""" from __future__ import annotations import logging from datetime import UTC, datetime from os import environ from pathlib import Path from types import ModuleType from typing import TYPE_CHECKING import pytest from sqlalchemy import create_engine, select from sqlalchemy.orm import sessionmaker from python.ebook_search.answer import answer_query from python.ebook_search.bm25_corpus import ( BM25Corpus, BM25CorpusUnavailableError, BM25Manifest, ensure_bm25_corpus, fetch_bm25_corpus_records, load_bm25_corpus, read_bm25_manifest, score_bm25_corpus, write_bm25_corpus, ) from python.ebook_search.config import EbookSearchConfig, RerankConfig, load_config, normalize_embedding_model from python.ebook_search.embeddings import MODEL_DIMENSIONS, ensure_embedding_models from python.ebook_search.ingest import chunk_text, find_existing_source from python.ebook_search.search import ( SearchResponse, SearchResult, bm25_candidates, reciprocal_rank_fusion, retrieval_query_from_text, ) from python.ebook_search.timing import RuntimeStep from python.orm.richie import ( EbookChapter, EbookChunk, EbookChunkEmbedding1024, EbookEmbeddingModel, EbookSource, RichieBase, ) if TYPE_CHECKING: from pytest_mock import MockerFixture def test_chunk_text_uses_overlap() -> None: chunks = chunk_text(" ".join(str(index) for index in range(100)), chunk_tokens=20, overlap_tokens=5) assert len(chunks) > 1 assert chunks[0].token_start == 0 assert chunks[1].token_start == 15 assert all(chunk.token_count <= 20 for chunk in chunks) def test_reciprocal_rank_fusion_combines_vector_and_bm25_rankings() -> None: vector_results = [ SearchResult(chunk_id=1, text="a", source_title="A", score=0.9, vector_score=0.9), SearchResult(chunk_id=2, text="b", source_title="B", score=0.8, vector_score=0.8), ] lexical_results = [ SearchResult(chunk_id=2, text="b", source_title="B", score=4.2, bm25_score=4.2), SearchResult(chunk_id=3, text="c", source_title="C", score=2.1, bm25_score=2.1), ] fused = reciprocal_rank_fusion(vector_results, lexical_results, rank_constant=60) assert [result.chunk_id for result in fused] == [2, 1, 3] assert fused[0].rank_source == "Hybrid" assert fused[0].vector_score == 0.8 assert fused[0].bm25_score == 4.2 assert fused[0].fused_score == fused[0].score def test_find_existing_source_matches_path_or_hash() -> None: engine = create_engine("sqlite+pysqlite:///:memory:", future=True) RichieBase.metadata.create_all(engine) with sessionmaker(bind=engine, expire_on_commit=False, future=True)() as session: source = EbookSource( title="Book", author=None, language=None, publisher=None, identifier=None, file_path="/old/book.epub", file_sha256="a" * 64, file_mtime=datetime.now(tz=UTC), file_size=10, ) session.add(source) session.commit() assert find_existing_source(session, Path("/old/book.epub"), "b" * 64) == source assert find_existing_source(session, Path("/new/book.epub"), "a" * 64) == source def test_bm25_corpus_uses_existing_search_text_without_duplicate_metadata() -> None: engine = create_engine("sqlite+pysqlite:///:memory:", future=True) RichieBase.metadata.create_all(engine) with sessionmaker(bind=engine, expire_on_commit=False, future=True)() as session: source = EbookSource( title="Book", author="Author", language=None, publisher=None, identifier=None, file_path="/book.epub", file_sha256="a" * 64, file_mtime=datetime.now(tz=UTC), file_size=10, ) session.add(source) session.flush() chapter = EbookChapter(source_id=source.id, spine_index=0, title="Chapter", href=None) session.add(chapter) session.flush() session.add( EbookChunk( id=1, source_id=source.id, chapter_id=chapter.id, chunk_index=0, text="content", token_start=0, token_count=1, page_label=None, content_sha256="b" * 64, search_text="Book Author Chapter content", ) ) session.commit() records, texts = fetch_bm25_corpus_records(session) assert texts == ["Book Author Chapter content"] assert records[0]["chunk_id"] == 1 assert "bm25_text" not in records[0] def test_reciprocal_rank_fusion_marks_hybrid_source() -> None: vector_results = [SearchResult(chunk_id=1, text="a", source_title="A")] lexical_results = [SearchResult(chunk_id=2, text="b", source_title="B")] fused = reciprocal_rank_fusion(vector_results, lexical_results, rank_constant=60) assert {result.rank_source for result in fused} == {"Hybrid"} def test_search_response_sums_runtime_steps() -> None: response = SearchResponse( query="query", results=[], rank_label="Hybrid", timings=( RuntimeStep(name="A", duration_ms=1.25), RuntimeStep(name="B", duration_ms=2.75), RuntimeStep(name="Parallel detail", duration_ms=10.0, counts_toward_total=False), ), ) assert response.total_runtime_ms == 4.0 def test_retrieval_query_keeps_entity_and_series_terms() -> None: assert retrieval_query_from_text("what does Damien Montgomery stand for in starship mage") == ( "damien montgomery stand starship mage" ) def test_bm25_candidates_scores_whole_corpus(mocker: MockerFixture) -> None: record = { "chunk_id": 2, "text": "high", "source_title": "B", "source_author": None, "chapter_title": None, "page_label": None, "bm25_text": "high", } manifest = BM25Manifest(created_at=datetime.now(tz=UTC), db_updated_at=None, chunk_count=1) corpus = BM25Corpus(retriever=object(), records=(record,), manifest=manifest) captured: dict[str, object] = {} def fake_score_bm25_corpus(query, saved_corpus, *, limit): captured["query"] = query captured["corpus"] = saved_corpus captured["limit"] = limit return [(record, 1.5)] mocker.patch("python.ebook_search.search.load_bm25_corpus", side_effect=lambda _config: corpus) mocker.patch("python.ebook_search.search.score_bm25_corpus", side_effect=fake_score_bm25_corpus) config = EbookSearchConfig(rerank=RerankConfig(enabled=False)) results = bm25_candidates("high", config) assert captured["query"] == "high" assert captured["corpus"] == corpus assert captured["limit"] == 120 assert [result.chunk_id for result in results] == [2] assert [result.bm25_score for result in results] == [1.5] def test_bm25_candidates_returns_empty_when_corpus_is_unavailable(mocker: MockerFixture, caplog) -> None: def fake_load_bm25_corpus(_config): raise BM25CorpusUnavailableError mocker.patch("python.ebook_search.search.load_bm25_corpus", side_effect=fake_load_bm25_corpus) config = EbookSearchConfig(rerank=RerankConfig(enabled=False)) with caplog.at_level(logging.WARNING): results = bm25_candidates("high", config) assert results == [] assert "ebook_bm25_index_unavailable_skipping" in caplog.text def test_write_bm25_corpus_publishes_dated_generation(tmp_path) -> None: index_path = tmp_path / "bm25" index_path.mkdir() generations_path = index_path / "generations" generations_path.mkdir() old_generation = generations_path / "20260101T000000.000000Z" old_generation.mkdir() (old_generation / "sentinel").write_text("old", encoding="utf-8") (index_path / "current").symlink_to(Path("generations") / old_generation.name, target_is_directory=True) manifest = BM25Manifest( created_at=datetime(2026, 6, 12, 1, 2, 3, 456789, tzinfo=UTC), db_updated_at=None, chunk_count=0, ) write_bm25_corpus(index_path, [], [], manifest) current_path = index_path / "current" assert current_path.is_symlink() assert current_path.readlink() == generations_path / "20260612T010203.456789Z" assert old_generation.is_dir() assert (old_generation / "sentinel").read_text(encoding="utf-8") == "old" assert (generations_path / "20260612T010203.456789Z").is_dir() assert read_bm25_manifest(index_path) == manifest def test_write_bm25_corpus_keeps_current_generation_when_publish_fails(mocker: MockerFixture, tmp_path) -> None: index_path = tmp_path / "bm25" index_path.mkdir() generations_path = index_path / "generations" generations_path.mkdir() old_generation = generations_path / "20260101T000000.000000Z" old_generation.mkdir() (old_generation / "sentinel").write_text("old", encoding="utf-8") current_path = index_path / "current" current_path.symlink_to(Path("generations") / old_generation.name, target_is_directory=True) original_replace = Path.replace def fail_current_replace(self, target): if self.parent == index_path and self.name.startswith(".current.") and target == current_path: msg = "current publish failed" raise OSError(msg) return original_replace(self, target) mocker.patch.object(Path, "replace", fail_current_replace) manifest = BM25Manifest( created_at=datetime(2026, 6, 12, 1, 2, 3, 456789, tzinfo=UTC), db_updated_at=None, chunk_count=0, ) with pytest.raises(OSError, match="current publish failed"): write_bm25_corpus(index_path, [], [], manifest) assert current_path.readlink() == Path("generations") / old_generation.name assert (old_generation / "sentinel").read_text(encoding="utf-8") == "old" assert not (generations_path / "20260612T010203.456789Z").exists() def test_load_bm25_corpus_uses_current_generation(tmp_path) -> None: load_bm25_corpus.cache_clear() index_path = tmp_path / "bm25" manifest = BM25Manifest( created_at=datetime(2026, 6, 12, 1, 2, 3, 456789, tzinfo=UTC), db_updated_at=None, chunk_count=1, ) record = { "chunk_id": 2, "text": "cached", "source_title": "B", "source_author": None, "chapter_title": None, "page_label": None, } write_bm25_corpus(index_path, [record], ["cached phrase"], manifest) config = EbookSearchConfig(rerank=RerankConfig(enabled=False), bm25_index_dir=str(index_path)) try: corpus = load_bm25_corpus(config) finally: load_bm25_corpus.cache_clear() assert corpus.manifest == manifest assert corpus.records[0]["chunk_id"] == 2 assert score_bm25_corpus("cached", corpus, limit=10) def test_load_bm25_corpus_caches_disk_load(mocker: MockerFixture, tmp_path) -> None: load_bm25_corpus.cache_clear() manifest = BM25Manifest(created_at=datetime.now(tz=UTC), db_updated_at=None, chunk_count=1) record = { "chunk_id": 2, "text": "cached", "source_title": "B", "source_author": None, "chapter_title": None, "page_label": None, "bm25_text": "cached", } load_count = 0 class FakeRetriever: """Fake persisted BM25 retriever.""" corpus = (record,) class FakeBM25: """Fake BM25 class with observable load count.""" @staticmethod def load(index_path, *, load_corpus, mmap): nonlocal load_count load_count += 1 assert index_path == tmp_path assert load_corpus is True assert mmap is True return FakeRetriever() fake_bm25s = ModuleType("bm25s") fake_bm25s.BM25 = FakeBM25 mocker.patch("python.ebook_search.bm25_corpus.read_bm25_manifest", side_effect=lambda _path: manifest) mocker.patch("python.ebook_search.bm25_corpus.bm25_index_exists", side_effect=lambda _path, _manifest: True) mocker.patch("python.ebook_search.bm25_corpus.bm25s", fake_bm25s) config = EbookSearchConfig(rerank=RerankConfig(enabled=False), bm25_index_dir=str(tmp_path)) try: first = load_bm25_corpus(config) second = load_bm25_corpus(config) finally: load_bm25_corpus.cache_clear() assert first is second assert first is not None assert first.records == (record,) assert load_count == 1 def test_load_bm25_corpus_raises_when_index_is_missing(mocker: MockerFixture, tmp_path) -> None: load_bm25_corpus.cache_clear() mocker.patch("python.ebook_search.bm25_corpus.read_bm25_manifest", side_effect=lambda _path: None) mocker.patch("python.ebook_search.bm25_corpus.bm25_index_exists", side_effect=lambda _path, _manifest: False) config = EbookSearchConfig(rerank=RerankConfig(enabled=False), bm25_index_dir=str(tmp_path)) try: with pytest.raises(BM25CorpusUnavailableError, match="BM25 corpus is not available"): load_bm25_corpus(config) finally: load_bm25_corpus.cache_clear() def test_ensure_bm25_corpus_refreshes_missing_index(mocker: MockerFixture) -> None: refreshed: list[object] = [] db_updated_at = datetime.now(tz=UTC) mocker.patch("python.ebook_search.bm25_corpus.read_bm25_manifest", side_effect=lambda _path: None) mocker.patch("python.ebook_search.bm25_corpus.bm25_index_exists", side_effect=lambda _path, _manifest: False) mocker.patch("python.ebook_search.bm25_corpus.corpus_last_updated_at", side_effect=lambda _session: db_updated_at) mocker.patch( "python.ebook_search.bm25_corpus.refresh_bm25_corpus", side_effect=lambda session, config, *, db_updated_at: refreshed.append((session, config, db_updated_at)), ) config = EbookSearchConfig(rerank=RerankConfig(enabled=False)) session = object() ensure_bm25_corpus(session, config) assert refreshed == [(session, config, db_updated_at)] def test_ensure_bm25_corpus_refreshes_stale_index(mocker: MockerFixture) -> None: refreshed: list[object] = [] created_at = datetime(2026, 1, 1, tzinfo=UTC) db_updated_at = datetime(2026, 1, 2, tzinfo=UTC) manifest = BM25Manifest(created_at=created_at, db_updated_at=created_at, chunk_count=10) mocker.patch("python.ebook_search.bm25_corpus.read_bm25_manifest", side_effect=lambda _path: manifest) mocker.patch("python.ebook_search.bm25_corpus.bm25_index_exists", side_effect=lambda _path, _manifest: True) mocker.patch("python.ebook_search.bm25_corpus.corpus_last_updated_at", side_effect=lambda _session: db_updated_at) mocker.patch( "python.ebook_search.bm25_corpus.refresh_bm25_corpus", side_effect=lambda session, config, *, db_updated_at: refreshed.append((session, config, db_updated_at)), ) config = EbookSearchConfig(rerank=RerankConfig(enabled=False)) session = object() ensure_bm25_corpus(session, config) assert refreshed == [(session, config, db_updated_at)] def test_supported_embedding_models_match_service_names() -> None: assert MODEL_DIMENSIONS == { "qwen3-embedding-0.6b": 1024, "qwen3-embedding-4b": 2560, "qwen3-embedding-8b": 4096, } def test_ensure_embedding_models_registers_service_names() -> None: engine = create_engine("sqlite+pysqlite:///:memory:", future=True) RichieBase.metadata.create_all(engine) with sessionmaker(bind=engine, expire_on_commit=False, future=True)() as session: ensure_embedding_models(session) session.commit() models = list(session.scalars(select(EbookEmbeddingModel).order_by(EbookEmbeddingModel.name))) assert [(model.name, model.dimension) for model in models] == [ ("qwen3-embedding-0.6b", 1024), ("qwen3-embedding-4b", 2560), ("qwen3-embedding-8b", 4096), ] def test_1024_embedding_table_has_cosine_hnsw_index() -> None: indexes = {index.name: index for index in EbookChunkEmbedding1024.__table__.indexes} index = indexes["ix_ebook_chunk_embedding_1024_embedding_cosine"] assert [column.name for column in index.columns] == ["embedding"] assert index.dialect_options["postgresql"]["using"] == "hnsw" assert index.dialect_options["postgresql"]["ops"] == {"embedding": "vector_cosine_ops"} def test_embedding_model_aliases_normalize_to_provider_names(mocker: MockerFixture) -> None: mocker.patch.dict(environ, {}, clear=False) assert normalize_embedding_model() == "qwen3-embedding-0.6b" environ["EBOOK_SEARCH_EMBEDDING_MODEL"] = "qwen3-embedding-0.6b" assert normalize_embedding_model() == "qwen3-embedding-0.6b" environ["EBOOK_SEARCH_EMBEDDING_MODEL"] = "Qwen3-Embedding-0.6B" assert normalize_embedding_model() == "qwen3-embedding-0.6b" environ["EBOOK_SEARCH_EMBEDDING_MODEL"] = "Qwen/Qwen3-Embedding-4B" assert normalize_embedding_model() == "qwen3-embedding-4b" environ["EBOOK_SEARCH_EMBEDDING_MODEL"] = "qwen3-embedding:8b" assert normalize_embedding_model() == "qwen3-embedding-8b" environ["EBOOK_SEARCH_EMBEDDING_MODEL"] = "qwen3-embedding-8b" assert normalize_embedding_model() == "qwen3-embedding-8b" def test_answer_generation_is_enabled_by_default(mocker: MockerFixture) -> None: mocker.patch.dict(environ, {}, clear=False) environ.pop("EBOOK_SEARCH_ANSWER_ENABLED", None) config = load_config() assert config.answer_enabled is True def test_chat_defaults_use_ollama_cloud(mocker: MockerFixture) -> None: mocker.patch.dict(environ, {}, clear=False) environ.pop("EBOOK_SEARCH_VLLM_BASE_URL", None) environ.pop("EBOOK_SEARCH_CHAT_MODEL", None) config = load_config() assert config.vllm_base_url == "https://ollama.com/v1" assert config.chat_model == "deepseek-v4-flash" def test_chat_api_key_falls_back_to_ollama_api_key(mocker: MockerFixture) -> None: mocker.patch.dict(environ, {"OLLAMA_API_KEY": "ollama-key"}, clear=False) environ.pop("EBOOK_SEARCH_VLLM_API_KEY", None) config = load_config() assert config.vllm_api_key == "ollama-key" def test_answer_query_does_not_call_model_when_disabled() -> None: config = load_config().model_copy(update={"answer_enabled": False}) result = SearchResult(chunk_id=1, text="source text", source_title="Book") answer = answer_query("question", [result], config) assert "Answer generation is disabled" in answer