Compare commits

..

41 Commits

Author SHA1 Message Date
Richie fcb69cc68b adding words to spell check
build_systems / build-jeeves (pull_request) Waiting to run
build_systems / build-bob (pull_request) Waiting to run
build_systems / build-brain (pull_request) Waiting to run
build_systems / build-leviathan (pull_request) Waiting to run
build_systems / build-rhapsody-in-green (pull_request) Waiting to run
pytest / pytest (pull_request) Waiting to run
treefmt / nix fmt (pull_request) Waiting to run
2026-06-18 12:55:46 -04:00
Richie 6bc30115d9 fix(ebook-search): code clean up to impove reliablty and readabilty 2026-06-18 12:45:56 -04:00
Richie 6ae1ff1f5c feat(ebook-search): add load-test CLI for the search service
Add a Typer CLI script that drives POST /search on a running server at a
configurable concurrency and reports latency percentiles (p50/p90/p95/p99),
throughput, and HTTP status distribution. Queries are drawn from the shared
eval JSONL set so load testing and evaluation exercise the same questions.
2026-06-18 12:39:55 -04:00
Richie dbc6b5b53b test(ebook-search): organize tests under dedicated package
Move ebook search tests into tests/ebook_search and standardize mocking on pytest-mock.
2026-06-16 21:47:40 -04:00
Richie a9daa60c17 moved TC001 expetop out of pyproject.toml 2026-06-16 18:31:40 -04:00
Richie 241a42b20d updated front end
treefmt / nix fmt (pull_request) Failing after 6s
pytest / pytest (pull_request) Failing after 52s
build_systems / build-brain (pull_request) Successful in 1m11s
build_systems / build-bob (pull_request) Successful in 1m13s
build_systems / build-leviathan (pull_request) Successful in 1m46s
build_systems / build-rhapsody-in-green (pull_request) Successful in 1m48s
build_systems / build-jeeves (pull_request) Successful in 3m46s
2026-06-16 00:00:13 -04:00
Richie 4640ebf8ce converted addmin.py and page.py to DbSession and AppConfig 2026-06-15 22:01:42 -04:00
Richie 0c9583c1cc moved config = load_config() to lifespan 2026-06-15 21:59:51 -04:00
Richie f71ae7d2c6 added guardrails.py to constrain responses and added validation to config.py 2026-06-15 21:57:38 -04:00
Richie 2e68c83021 added health endpoints 2026-06-15 21:21:01 -04:00
Richie b126987b63 added AppConfig and AppEngine 2026-06-15 21:08:55 -04:00
Richie 68b3a38b81 converting to pydantic-settings 2026-06-14 21:29:45 -04:00
Richie a5d7c3be4f fixed fomat issue
treefmt / nix fmt (pull_request) Successful in 5s
pytest / pytest (pull_request) Successful in 28s
build_systems / build-brain (pull_request) Successful in 46s
build_systems / build-bob (pull_request) Successful in 47s
build_systems / build-leviathan (pull_request) Successful in 53s
build_systems / build-rhapsody-in-green (pull_request) Successful in 59s
build_systems / build-jeeves (pull_request) Successful in 2m32s
2026-06-14 15:42:05 -04:00
Richie 2995a75748 fixed test
treefmt / nix fmt (pull_request) Failing after 5s
pytest / pytest (pull_request) Successful in 28s
build_systems / build-bob (pull_request) Successful in 48s
build_systems / build-brain (pull_request) Successful in 53s
build_systems / build-leviathan (pull_request) Successful in 54s
build_systems / build-rhapsody-in-green (pull_request) Successful in 1m1s
build_systems / build-jeeves (pull_request) Successful in 2m43s
2026-06-14 15:41:09 -04:00
Richie dce1838163 opning ports for testing 2026-06-14 15:41:09 -04:00
Richie 121eb979a4 added a index for the VEctor DB 2026-06-14 15:41:09 -04:00
Richie c88315e9b6 improved BM25 write 2026-06-14 15:41:09 -04:00
Richie 50795ab7fc added ZstdMiddleware to ebook_search 2026-06-14 15:41:09 -04:00
Richie e9a80a0308 added vector_engine to fix name postgres name space issue 2026-06-14 15:41:09 -04:00
Richie eb76edb740 reworked ebook_search routers 2026-06-14 15:41:09 -04:00
Richie c53afb3c70 made fastapi tools 2026-06-14 15:41:09 -04:00
Richie 8301db39e5 added proper cache invalidation to load_bm25_corpus 2026-06-14 15:40:31 -04:00
Richie 45a9e90524 updated tests 2026-06-14 15:40:31 -04:00
Richie dd67de3993 improved reranking weights 2026-06-14 15:40:31 -04:00
Richie 6e3635ca01 fixed duplicat enrichment 2026-06-14 15:40:31 -04:00
Richie 11cbe31152 improved queary for vector search 2026-06-14 15:40:31 -04:00
Richie 5544dfa61c add .ebook_search_bm25 to gitignore 2026-06-14 15:40:31 -04:00
Richie 911df63513 updated python 2026-06-14 15:40:31 -04:00
Richie 07eb170b34 setup tests 2026-06-14 15:40:04 -04:00
Richie 6715bbf0a5 build api and frountend 2026-06-14 15:40:04 -04:00
Richie 26ff1f0fd3 added answer.py and config 2026-06-14 15:40:04 -04:00
Richie 666ea97754 added __init__ 2026-06-14 15:40:04 -04:00
Richie e01c687625 made llm_interface.py 2026-06-14 15:40:04 -04:00
Richie 82a367a2b6 added rerank 2026-06-14 15:40:04 -04:00
Richie ad1834c537 built ingest 2026-06-14 15:40:04 -04:00
Richie aed1e14d95 built rag search setup 2026-06-14 15:40:04 -04:00
Richie 0a2d4c08cb set up embedding system 2026-06-14 15:40:04 -04:00
Richie db98bd3559 built BM25 search foundation 2026-06-14 15:40:04 -04:00
Richie cdded5da12 added ebook embedding to orm 2026-06-14 15:40:04 -04:00
Richie d022251a58 removed hedgedoc 2026-06-14 15:40:04 -04:00
Richie e1ef4de6a3 adding embedding Models to jeeves 2026-06-14 15:40:04 -04:00
67 changed files with 4987 additions and 42 deletions
+2 -1
View File
@@ -171,4 +171,5 @@ frontend/dist/
frontend/node_modules/
# data from testing llms
data/*
data/*
.ebook_search_bm25
+1
View File
@@ -242,6 +242,7 @@
"referer",
"REFERERS",
"relatime",
"rerank",
"Rhosts",
"ripgrep",
"roboto",
Generated
+18 -18
View File
@@ -8,11 +8,11 @@
},
"locked": {
"dir": "pkgs/firefox-addons",
"lastModified": 1781928171,
"narHash": "sha256-2IIxdVe7afJ8HyTeR/MD9Qw5iIJ68o1iRYkiXn7LDag=",
"lastModified": 1781150628,
"narHash": "sha256-b4mp8l3qWuSCyYYo9HSngDtcB3PpecYiOXjULrjwwlw=",
"owner": "rycee",
"repo": "nur-expressions",
"rev": "1777920f3688105a47b95b229dcefc85ae1bb42e",
"rev": "753319310f4673a2dabbfab87482187b40bf9bac",
"type": "gitlab"
},
"original": {
@@ -29,11 +29,11 @@
]
},
"locked": {
"lastModified": 1781989573,
"narHash": "sha256-npfH7Zv7t1akX/ArqCNro4zU4ViPlghLaPnbEfHbCxk=",
"lastModified": 1781189114,
"narHash": "sha256-5inaamLgUMWy+MOBE9ChF9QAF1o/74LFuHkI0W/9rqc=",
"owner": "nix-community",
"repo": "home-manager",
"rev": "78e7d8b13ecd7f5256a5c11ce216876164099d9f",
"rev": "486595d2cf49cfcd649b58a284fa11ac0e34da22",
"type": "github"
},
"original": {
@@ -47,11 +47,11 @@
"nixpkgs": "nixpkgs"
},
"locked": {
"lastModified": 1781622756,
"narHash": "sha256-JrPh4M6S7aPsEE9tOENuZrxC6o2szSLlK+t4+nLke9s=",
"lastModified": 1781168557,
"narHash": "sha256-LOnLQ2tpYF9gqIDDr3+j3DbpJJr/QCH6zPRT2GzEUOE=",
"owner": "nixos",
"repo": "nixos-hardware",
"rev": "08018c72174a4df5657f8d94178ac69fb9c243e5",
"rev": "6358ff76821101c178e3ab4919a62799bfe3652e",
"type": "github"
},
"original": {
@@ -76,11 +76,11 @@
},
"nixpkgs-master": {
"locked": {
"lastModified": 1782009160,
"narHash": "sha256-BRto9JC5S8uaapULr/n+tLbZf896K4kbtVZ0PnFra0U=",
"lastModified": 1781229721,
"narHash": "sha256-ORvqDbb/LYxiJljGIejapjkc/kJbVote2N1WSb9W45I=",
"owner": "nixos",
"repo": "nixpkgs",
"rev": "a7ef34422d8e3cd1f9ffaec949735adb983d4ffc",
"rev": "173d0ad7a974f8543a9ab01d2271b2e290341b33",
"type": "github"
},
"original": {
@@ -108,11 +108,11 @@
},
"nixpkgs_2": {
"locked": {
"lastModified": 1781577229,
"narHash": "sha256-lrp67w8AulE9Ks53n27I45ADSzbOCn4H+CNW1Ck8B+8=",
"lastModified": 1781074563,
"narHash": "sha256-md8WlXOlfnIeHeOScMTTHFyf2d6iaTwPl2apR5EQ3P4=",
"owner": "nixos",
"repo": "nixpkgs",
"rev": "567a49d1913ce81ac6e9582e3553dd90a955875f",
"rev": "9ae611a455b90cf061d8f332b977e387bda8e1ca",
"type": "github"
},
"original": {
@@ -141,11 +141,11 @@
]
},
"locked": {
"lastModified": 1781943681,
"narHash": "sha256-NFHmA7H47adqiyp+0iEOyZOQhmigDqA/NBAlf4imB6U=",
"lastModified": 1780547341,
"narHash": "sha256-Gq8KNx5A7hBB3uGJaj6eQfLDIz5YdLu92gqBcvHvoUo=",
"owner": "Mic92",
"repo": "sops-nix",
"rev": "420f8d2e9882911f65cfac15cc706f639ba96cca",
"rev": "9ed65852b6257fbeae4355bc24ecfea307ca759a",
"type": "github"
},
"original": {
+29 -1
View File
@@ -17,18 +17,45 @@
python-env = final: _prev: {
my_python = final.python314.withPackages (
ps: with ps; [
ps:
let
bm25s = ps.buildPythonPackage rec {
pname = "bm25s";
version = "0.3.9";
pyproject = true;
src = final.fetchPypi {
inherit pname version;
hash = "sha256-iVxnnZUrfeg1XttfPhpiCh4vKU0dQrkZvwghzOLi9Zc=";
};
build-system = [ ps.setuptools ];
dependencies = with ps; [
numpy
scipy
];
pythonImportsCheck = [ "bm25s" ];
};
in
with ps;
[
alembic
apprise
apscheduler
beautifulsoup4
ebooklib
fastapi
fastapi-cli
httpx
mypy
numpy
orjson
pgvector
polars
psycopg
pydantic
pydantic-settings
pyfakefs
pytest
pytest-cov
@@ -38,6 +65,7 @@
ruff
scalene
sqlalchemy
bm25s
tenacity
textual
tiktoken
+1
View File
@@ -18,6 +18,7 @@ dependencies = [
"polars",
"psycopg[binary]",
"pydantic",
"pydantic-settings",
"python-multipart",
"sqlalchemy",
"tenacity",
@@ -0,0 +1,200 @@
"""add ebook search tables.
Revision ID: 2db132cace1a
Revises: b3c60cc5beb5
Create Date: 2026-06-10 22:10:54.379159
"""
from __future__ import annotations
from typing import TYPE_CHECKING
import pgvector
import sqlalchemy as sa
from alembic import op
from python.orm import RichieBase
if TYPE_CHECKING:
from collections.abc import Sequence
# revision identifiers, used by Alembic.
revision: str = "2db132cace1a"
down_revision: str | None = "b3c60cc5beb5"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
schema = RichieBase.schema_name
def upgrade() -> None:
"""Upgrade."""
# ### commands auto generated by Alembic - please adjust! ###
op.create_table(
"ebook_embedding_model",
sa.Column("name", sa.String(), nullable=False),
sa.Column("dimension", sa.Integer(), nullable=False),
sa.Column("is_default", sa.Boolean(), nullable=False),
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("created", sa.DateTime(timezone=True), server_default=sa.text("now()"), nullable=False),
sa.Column("updated", sa.DateTime(timezone=True), server_default=sa.text("now()"), nullable=False),
sa.PrimaryKeyConstraint("id", name=op.f("pk_ebook_embedding_model")),
sa.UniqueConstraint("name", name=op.f("uq_ebook_embedding_model_name")),
schema=schema,
)
op.create_table(
"ebook_source",
sa.Column("title", sa.String(), nullable=False),
sa.Column("author", sa.String(), nullable=True),
sa.Column("language", sa.String(), nullable=True),
sa.Column("publisher", sa.String(), nullable=True),
sa.Column("identifier", sa.String(), nullable=True),
sa.Column("file_path", sa.String(), nullable=False),
sa.Column("file_sha256", sa.String(length=64), nullable=False),
sa.Column("file_mtime", sa.DateTime(timezone=True), nullable=False),
sa.Column("file_size", sa.BigInteger(), nullable=False),
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("created", sa.DateTime(timezone=True), server_default=sa.text("now()"), nullable=False),
sa.Column("updated", sa.DateTime(timezone=True), server_default=sa.text("now()"), nullable=False),
sa.PrimaryKeyConstraint("id", name=op.f("pk_ebook_source")),
sa.UniqueConstraint("file_path", name=op.f("uq_ebook_source_file_path")),
sa.UniqueConstraint("file_sha256", name=op.f("uq_ebook_source_file_sha256")),
schema=schema,
)
op.create_table(
"ebook_chapter",
sa.Column("source_id", sa.Integer(), nullable=False),
sa.Column("spine_index", sa.Integer(), nullable=False),
sa.Column("title", sa.String(), nullable=True),
sa.Column("href", sa.String(), nullable=True),
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("created", sa.DateTime(timezone=True), server_default=sa.text("now()"), nullable=False),
sa.Column("updated", sa.DateTime(timezone=True), server_default=sa.text("now()"), nullable=False),
sa.ForeignKeyConstraint(
["source_id"],
[f"{schema}.ebook_source.id"],
name=op.f("fk_ebook_chapter_source_id_ebook_source"),
ondelete="CASCADE",
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_ebook_chapter")),
sa.UniqueConstraint("source_id", "spine_index", name=op.f("uq_ebook_chapter_source_id")),
schema=schema,
)
op.create_table(
"ebook_chunk",
sa.Column("source_id", sa.Integer(), nullable=False),
sa.Column("chapter_id", sa.Integer(), nullable=True),
sa.Column("chunk_index", sa.Integer(), nullable=False),
sa.Column("text", sa.String(), nullable=False),
sa.Column("token_start", sa.Integer(), nullable=False),
sa.Column("token_count", sa.Integer(), nullable=False),
sa.Column("page_label", sa.String(), nullable=True),
sa.Column("content_sha256", sa.String(length=64), nullable=False),
sa.Column("search_text", sa.String(), nullable=False),
sa.Column("id", sa.BigInteger(), nullable=False),
sa.Column("created", sa.DateTime(timezone=True), server_default=sa.text("now()"), nullable=False),
sa.Column("updated", sa.DateTime(timezone=True), server_default=sa.text("now()"), nullable=False),
sa.ForeignKeyConstraint(
["chapter_id"],
[f"{schema}.ebook_chapter.id"],
name=op.f("fk_ebook_chunk_chapter_id_ebook_chapter"),
ondelete="SET NULL",
),
sa.ForeignKeyConstraint(
["source_id"],
[f"{schema}.ebook_source.id"],
name=op.f("fk_ebook_chunk_source_id_ebook_source"),
ondelete="CASCADE",
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_ebook_chunk")),
sa.UniqueConstraint("source_id", "chunk_index", name="uq_ebook_chunk_source_id_chunk_index"),
sa.UniqueConstraint("source_id", "content_sha256", name="uq_ebook_chunk_source_id_content_sha256"),
schema=schema,
)
op.create_table(
"ebook_chunk_embedding_1024",
sa.Column("chunk_id", sa.BigInteger(), nullable=False),
sa.Column("model_id", sa.Integer(), nullable=False),
sa.Column("embedding", pgvector.sqlalchemy.vector.VECTOR(dim=1024), nullable=False),
sa.Column("id", sa.BigInteger(), nullable=False),
sa.Column("created", sa.DateTime(timezone=True), server_default=sa.text("now()"), nullable=False),
sa.Column("updated", sa.DateTime(timezone=True), server_default=sa.text("now()"), nullable=False),
sa.ForeignKeyConstraint(
["chunk_id"],
[f"{schema}.ebook_chunk.id"],
name=op.f("fk_ebook_chunk_embedding_1024_chunk_id_ebook_chunk"),
ondelete="CASCADE",
),
sa.ForeignKeyConstraint(
["model_id"],
[f"{schema}.ebook_embedding_model.id"],
name=op.f("fk_ebook_chunk_embedding_1024_model_id_ebook_embedding_model"),
ondelete="CASCADE",
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_ebook_chunk_embedding_1024")),
sa.UniqueConstraint("chunk_id", "model_id", name=op.f("uq_ebook_chunk_embedding_1024_chunk_id")),
schema=schema,
)
op.create_table(
"ebook_chunk_embedding_2560",
sa.Column("chunk_id", sa.BigInteger(), nullable=False),
sa.Column("model_id", sa.Integer(), nullable=False),
sa.Column("embedding", pgvector.sqlalchemy.vector.VECTOR(dim=2560), nullable=False),
sa.Column("id", sa.BigInteger(), nullable=False),
sa.Column("created", sa.DateTime(timezone=True), server_default=sa.text("now()"), nullable=False),
sa.Column("updated", sa.DateTime(timezone=True), server_default=sa.text("now()"), nullable=False),
sa.ForeignKeyConstraint(
["chunk_id"],
[f"{schema}.ebook_chunk.id"],
name=op.f("fk_ebook_chunk_embedding_2560_chunk_id_ebook_chunk"),
ondelete="CASCADE",
),
sa.ForeignKeyConstraint(
["model_id"],
[f"{schema}.ebook_embedding_model.id"],
name=op.f("fk_ebook_chunk_embedding_2560_model_id_ebook_embedding_model"),
ondelete="CASCADE",
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_ebook_chunk_embedding_2560")),
sa.UniqueConstraint("chunk_id", "model_id", name=op.f("uq_ebook_chunk_embedding_2560_chunk_id")),
schema=schema,
)
op.create_table(
"ebook_chunk_embedding_4096",
sa.Column("chunk_id", sa.BigInteger(), nullable=False),
sa.Column("model_id", sa.Integer(), nullable=False),
sa.Column("embedding", pgvector.sqlalchemy.vector.VECTOR(dim=4096), nullable=False),
sa.Column("id", sa.BigInteger(), nullable=False),
sa.Column("created", sa.DateTime(timezone=True), server_default=sa.text("now()"), nullable=False),
sa.Column("updated", sa.DateTime(timezone=True), server_default=sa.text("now()"), nullable=False),
sa.ForeignKeyConstraint(
["chunk_id"],
[f"{schema}.ebook_chunk.id"],
name=op.f("fk_ebook_chunk_embedding_4096_chunk_id_ebook_chunk"),
ondelete="CASCADE",
),
sa.ForeignKeyConstraint(
["model_id"],
[f"{schema}.ebook_embedding_model.id"],
name=op.f("fk_ebook_chunk_embedding_4096_model_id_ebook_embedding_model"),
ondelete="CASCADE",
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_ebook_chunk_embedding_4096")),
sa.UniqueConstraint("chunk_id", "model_id", name=op.f("uq_ebook_chunk_embedding_4096_chunk_id")),
schema=schema,
)
# ### end Alembic commands ###
def downgrade() -> None:
"""Downgrade."""
# ### commands auto generated by Alembic - please adjust! ###
op.drop_table("ebook_chunk_embedding_4096", schema=schema)
op.drop_table("ebook_chunk_embedding_2560", schema=schema)
op.drop_table("ebook_chunk_embedding_1024", schema=schema)
op.drop_table("ebook_chunk", schema=schema)
op.drop_table("ebook_chapter", schema=schema)
op.drop_table("ebook_source", schema=schema)
op.drop_table("ebook_embedding_model", schema=schema)
# ### end Alembic commands ###
@@ -0,0 +1,54 @@
"""add 1024 ebook embedding cosine index.
Revision ID: c460105682d2
Revises: 2db132cace1a
Create Date: 2026-06-13 19:53:45.680289
"""
from __future__ import annotations
from typing import TYPE_CHECKING
from alembic import op
from python.orm import RichieBase
if TYPE_CHECKING:
from collections.abc import Sequence
# revision identifiers, used by Alembic.
revision: str = "c460105682d2"
down_revision: str | None = "2db132cace1a"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
schema = RichieBase.schema_name
def upgrade() -> None:
"""Upgrade."""
# ### commands auto generated by Alembic - please adjust! ###
op.create_index(
"ix_ebook_chunk_embedding_1024_embedding_cosine",
"ebook_chunk_embedding_1024",
["embedding"],
unique=False,
schema=schema,
postgresql_using="hnsw",
postgresql_ops={"embedding": "vector_cosine_ops"},
)
# ### end Alembic commands ###
def downgrade() -> None:
"""Downgrade."""
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index(
"ix_ebook_chunk_embedding_1024_embedding_cosine",
table_name="ebook_chunk_embedding_1024",
schema=schema,
postgresql_using="hnsw",
postgresql_ops={"embedding": "vector_cosine_ops"},
)
# ### end Alembic commands ###
+1 -1
View File
@@ -10,9 +10,9 @@ import typer
import uvicorn
from fastapi import FastAPI
from python.api.middleware import ZstdMiddleware
from python.api.routers import contact_router, views_router
from python.common import configure_logger
from python.fastapi_tools import ZstdMiddleware
from python.orm.common import get_postgres_engine
if TYPE_CHECKING:
+1 -1
View File
@@ -9,7 +9,7 @@ from pydantic import BaseModel
from sqlalchemy import select
from sqlalchemy.orm import selectinload
from python.api.dependencies import DbSession # noqa: TC001 this is a FastAPI needed at runtime
from python.fastapi_tools.db import DbSession # noqa: TC001 this is a FastAPI needed at runtime
from python.orm.richie.contact import Contact, ContactRelationship, Need, RelationshipType
TEMPLATES_DIR = Path(__file__).parent.parent / "templates"
+1 -1
View File
@@ -9,7 +9,7 @@ from fastapi.templating import Jinja2Templates
from sqlalchemy import select
from sqlalchemy.orm import Session, selectinload
from python.api.dependencies import DbSession # noqa: TC001 this is a FastAPI needed at runtime
from python.fastapi_tools.db import DbSession # noqa: TC001 this is a FastAPI needed at runtime
from python.orm.richie.contact import Contact, ContactRelationship, Need, RelationshipType
TEMPLATES_DIR = Path(__file__).parent.parent / "templates"
+1
View File
@@ -0,0 +1 @@
"""EPUB search package."""
+57
View File
@@ -0,0 +1,57 @@
"""Grounded answer generation."""
from __future__ import annotations
import logging
from typing import TYPE_CHECKING
from python.ebook_search.llm_interface import request_chat_completion
if TYPE_CHECKING:
from python.ebook_search.config import EbookSearchConfig
from python.ebook_search.search import SearchResult
logger = logging.getLogger(__name__)
def answer_query(query: str, results: list[SearchResult], config: EbookSearchConfig) -> str:
"""Answer a question using only retrieved chunks."""
if not config.answer_enabled:
logger.info("ebook_answer_skipped_disabled")
return "Answer generation is disabled. Source chunks are shown below."
if not results:
logger.info("ebook_answer_skipped_no_results")
return "No relevant sources were found."
logger.info(
"ebook_answer_request_start base_url=%s model=%s sources=%s query_length=%s",
config.vllm_base_url,
config.chat_model,
len(results),
len(query),
)
context = "\n\n".join(
f"[{index}] {result.source_title}{' - ' + result.chapter_title if result.chapter_title else ''}\n{result.text}"
for index, result in enumerate(results, start=1)
)
content = request_chat_completion(
config,
[
{
"role": "system",
"content": (
"Answer only from the provided context. Cite sources with bracketed numbers like [1]. "
"If the context is insufficient, say so."
),
},
{"role": "user", "content": f"Question:\n{query}\n\nContext:\n{context}"},
],
)
logger.info(
"ebook_answer_request_complete model=%s answer_length=%s",
config.chat_model,
len(content),
)
return content or "The model returned an empty answer."
+1
View File
@@ -0,0 +1 @@
"""Web and external API adapters for EPUB search."""
+60
View File
@@ -0,0 +1,60 @@
"""Background BM25 refresh tasks for the web app."""
from __future__ import annotations
import logging
from threading import Timer
from typing import TYPE_CHECKING
from sqlalchemy.orm import Session
from python.ebook_search.bm25_corpus import load_bm25_corpus, refresh_bm25_corpus
if TYPE_CHECKING:
from fastapi import FastAPI
from sqlalchemy.engine import Engine
from python.ebook_search.config import EbookSearchConfig
logger = logging.getLogger(__name__)
def schedule_bm25_refresh(app: FastAPI) -> None:
"""Schedule a delayed BM25 corpus refresh, replacing any pending refresh."""
existing_timer = getattr(app.state, "bm25_refresh_timer", None)
if existing_timer is not None:
existing_timer.cancel()
timer = Timer(app.state.config.bm25_refresh_delay_seconds, refresh_bm25_for_app, args=(app,))
timer.daemon = True
timer.start()
app.state.bm25_refresh_timer = timer
logger.info(
"ebook_bm25_refresh_scheduled delay_seconds=%s",
app.state.config.bm25_refresh_delay_seconds,
)
def cancel_bm25_refresh(app: FastAPI) -> None:
"""Cancel any pending BM25 corpus refresh."""
existing_timer = getattr(app.state, "bm25_refresh_timer", None)
if existing_timer is not None:
existing_timer.cancel()
app.state.bm25_refresh_timer = None
logger.info("ebook_bm25_refresh_cancelled")
def refresh_bm25_for_app(app: FastAPI) -> None:
"""Refresh the BM25 corpus using the app engine and config."""
try:
refresh_bm25_for_engine(app.state.engine, app.state.config)
except Exception:
logger.exception("ebook_bm25_refresh_failed")
def refresh_bm25_for_engine(engine: Engine, config: EbookSearchConfig) -> None:
"""Refresh the BM25 corpus using a SQLAlchemy engine."""
with Session(engine) as session:
refresh_bm25_corpus(session, config)
load_bm25_corpus.cache_clear()
logger.info("ebook_bm25_corpus_cache_cleared_after_refresh")
+24
View File
@@ -0,0 +1,24 @@
"""FastAPI dependencies for the EPUB search app."""
from __future__ import annotations
from typing import Annotated
from fastapi import Depends, Request
from sqlalchemy.engine import Engine
from python.ebook_search.config import EbookSearchConfig
def get_config(request: Request) -> EbookSearchConfig:
"""Get the loaded search config from app state."""
return request.app.state.config
def get_engine(request: Request) -> Engine:
"""Get the database engine from app state."""
return request.app.state.engine
AppConfig = Annotated[EbookSearchConfig, Depends(get_config)]
AppEngine = Annotated[Engine, Depends(get_engine)]
+86
View File
@@ -0,0 +1,86 @@
"""FastAPI HTMX app for EPUB search."""
from __future__ import annotations
import logging
from contextlib import asynccontextmanager
from typing import TYPE_CHECKING, Annotated
import typer
import uvicorn
from fastapi import FastAPI
from fastapi.staticfiles import StaticFiles
from sqlalchemy.orm import Session
from python.common import configure_logger
from python.ebook_search.api.bm25_tasks import cancel_bm25_refresh
from python.ebook_search.api.routes import admin_router, health_router, page_router, search_router
from python.ebook_search.api.web import STATIC_DIR
from python.ebook_search.bm25_corpus import ensure_bm25_corpus
from python.ebook_search.config import load_config
from python.fastapi_tools import ZstdMiddleware
from python.orm.common import get_postgres_engine
if TYPE_CHECKING:
from collections.abc import AsyncIterator
logger = logging.getLogger(__name__)
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncIterator[None]:
"""Manage application startup and shutdown resources."""
logger.info("ebook_search_startup")
config = load_config()
app.state.config = config
logger.info(
"ebook_search_config_loaded top_k=%s embedding_model=%s embedding_base_url=%s vllm_base_url=%s "
"rerank_enabled=%s answer_enabled=%s library_paths=%s",
config.top_k,
config.embedding_model,
config.embedding_base_url,
config.vllm_base_url,
config.rerank.enabled,
config.answer_enabled,
len(config.library_paths),
)
if not config.library_paths:
logger.warning("ebook_search_no_library_paths_configured")
app.state.engine = get_postgres_engine(name="RICHIE", vector_engine=True)
with Session(app.state.engine) as session:
ensure_bm25_corpus(session, config)
try:
yield
finally:
logger.info("ebook_search_shutdown")
cancel_bm25_refresh(app)
app.state.engine.dispose()
def create_app() -> FastAPI:
"""Create the EPUB search web app."""
app = FastAPI(title="EPUB Search", lifespan=lifespan)
app.add_middleware(ZstdMiddleware)
app.mount("/static", StaticFiles(directory=STATIC_DIR), name="static")
app.include_router(admin_router)
app.include_router(health_router)
app.include_router(page_router)
app.include_router(search_router)
return app
def serve(
host: Annotated[str, typer.Option("--host", "-h", help="Host to bind to")] = "127.0.0.1",
port: Annotated[int, typer.Option("--port", "-p", help="Port to bind to")] = 8070,
log_level: Annotated[str, typer.Option("--log-level", "-l", help="Log level")] = "INFO",
) -> None:
"""Start the EPUB search server."""
configure_logger(log_level)
uvicorn.run(create_app(), host=host, port=port)
if __name__ == "__main__":
typer.run(serve)
@@ -0,0 +1,13 @@
"""EPUB search web route modules."""
from python.ebook_search.api.routes.admin import router as admin_router
from python.ebook_search.api.routes.health import router as health_router
from python.ebook_search.api.routes.page import router as page_router
from python.ebook_search.api.routes.search import router as search_router
__all__ = [
"admin_router",
"health_router",
"page_router",
"search_router",
]
+103
View File
@@ -0,0 +1,103 @@
"""Admin routes for the EPUB search web UI."""
from __future__ import annotations
import logging
from fastapi import APIRouter, Request
from fastapi.responses import HTMLResponse
from python.ebook_search.api.bm25_tasks import schedule_bm25_refresh
from python.ebook_search.api.dependencies import (
AppConfig, # noqa: TC001 FastAPI resolves this annotated dependency at runtime
)
from python.ebook_search.api.web import templates
from python.ebook_search.embeddings import embed_missing_chunks, embedding_model_stats
from python.ebook_search.ingest import ingest_configured_paths
from python.fastapi_tools import DbSession # noqa: TC001 FastAPI resolves this annotated dependency at runtime
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/admin")
@router.get("", response_class=HTMLResponse)
def admin(request: Request, config: AppConfig, session: DbSession) -> HTMLResponse:
"""Render the admin page."""
stats = embedding_model_stats(session)
logger.info("ebook_admin_page_loaded models=%s", len(stats))
return templates.TemplateResponse(request, "admin.html", {"config": config, "stats": stats})
@router.post("/scan", response_class=HTMLResponse)
def scan_library(request: Request, config: AppConfig, session: DbSession) -> HTMLResponse:
"""Scan configured library paths for EPUB changes."""
try:
count = ingest_configured_paths(session, config)
session.commit()
except Exception as error:
logger.exception("ebook_admin_scan_failed")
return templates.TemplateResponse(request, "partials/error.html", {"message": str(error)}, status_code=500)
logger.info("ebook_admin_scan_complete changed_files=%s", count)
if count > 0:
schedule_bm25_refresh(request.app)
return templates.TemplateResponse(request, "partials/admin_status.html", {"message": f"Indexed {count} EPUBs"})
@router.post("/embed-missing", response_class=HTMLResponse)
def embed_missing(request: Request, config: AppConfig, session: DbSession) -> HTMLResponse:
"""Embed chunks missing vectors for the configured model."""
try:
count = embed_missing_chunks(session, config)
session.commit()
except Exception as error:
logger.exception("ebook_admin_embed_missing_failed")
return templates.TemplateResponse(request, "partials/error.html", {"message": str(error)}, status_code=500)
logger.info("ebook_admin_embed_missing_complete chunks=%s", count)
return templates.TemplateResponse(
request,
"partials/admin_status.html",
{"message": f"Embedded {count} chunks"},
)
@router.post("/embed-all", response_class=HTMLResponse)
def embed_all(request: Request, config: AppConfig, session: DbSession) -> HTMLResponse:
"""Embed all chunks missing vectors in fixed-size batches."""
total = 0
batches = 0
try:
while True:
count = embed_missing_chunks(session, config)
if count == 0:
break
session.commit()
total += count
batches += 1
logger.info(
"ebook_admin_embed_all_batch_complete batch=%s chunks=%s total_chunks=%s",
batches,
count,
total,
)
except Exception as error:
logger.exception(
"ebook_admin_embed_all_failed batches=%s chunks=%s",
batches,
total,
)
return templates.TemplateResponse(
request,
"partials/error.html",
{"message": f"Embed all failed after {total} chunks in {batches} batches: {error}"},
status_code=500,
)
logger.info("ebook_admin_embed_all_complete batches=%s chunks=%s", batches, total)
return templates.TemplateResponse(
request,
"partials/admin_status.html",
{"message": f"Embedded {total} chunks in {batches} batches of {config.embedding_batch_size}"},
)
+97
View File
@@ -0,0 +1,97 @@
"""Liveness and readiness routes for the EPUB search service."""
from __future__ import annotations
import logging
from http import HTTPStatus
from typing import TYPE_CHECKING
from fastapi import APIRouter
from fastapi.responses import JSONResponse
from sqlalchemy import literal, select
from sqlalchemy.exc import SQLAlchemyError
from python.ebook_search.api.dependencies import (
AppConfig, # noqa: TC001 FastAPI resolves this annotated dependency at runtime
)
from python.ebook_search.bm25_corpus import bm25_index_exists, bm25_index_path, read_bm25_manifest
from python.ebook_search.llm_interface import check_chat_endpoint, check_embedding_endpoint
from python.fastapi_tools import DbSession # noqa: TC001 FastAPI resolves this annotated dependency at runtime
if TYPE_CHECKING:
from sqlalchemy.orm import Session
from python.ebook_search.config import EbookSearchConfig
logger = logging.getLogger(__name__)
router = APIRouter()
@router.get("/health")
def health() -> dict[str, str]:
"""Liveness probe that returns ok without touching dependencies."""
return {"status": "ok"}
@router.get("/ready")
def ready(config: AppConfig, session: DbSession) -> JSONResponse:
"""Readiness probe reporting database, embedding endpoint, and BM25 index status."""
database_ok = check_database(session)
embedding_ok = check_embedding_endpoint(config)
chat_status = chat_endpoint_status(config)
bm25_status = check_bm25_status(config)
checks = {
"database": "ok" if database_ok else "fail",
"embedding": "ok" if embedding_ok else "fail",
"chat": chat_status,
"bm25": bm25_status,
}
if not database_ok:
status = "unavailable"
status_code = HTTPStatus.SERVICE_UNAVAILABLE
elif not embedding_ok or chat_status == "fail" or bm25_status == "missing":
status = "degraded"
status_code = HTTPStatus.OK
else:
status = "ready"
status_code = HTTPStatus.OK
logger.info(
"ebook_ready_check status=%s database=%s embedding=%s chat=%s bm25=%s",
status,
database_ok,
embedding_ok,
chat_status,
bm25_status,
)
return JSONResponse(content={"status": status, "checks": checks}, status_code=status_code)
def chat_endpoint_status(config: EbookSearchConfig) -> str:
"""Return the answering chat endpoint status, or disabled when answers are off."""
if not config.answer_enabled:
return "disabled"
return "ok" if check_chat_endpoint(config) else "fail"
def check_database(session: Session) -> bool:
"""Return whether the database answers a trivial query."""
try:
session.execute(select(literal(1)))
except SQLAlchemyError as error:
logger.warning("ebook_ready_database_unavailable error=%s", error)
return False
return True
def check_bm25_status(config: EbookSearchConfig) -> str:
"""Return the persisted BM25 index status without loading it into memory."""
index_path = bm25_index_path(config)
manifest = read_bm25_manifest(index_path)
if manifest is None or not bm25_index_exists(index_path, manifest):
return "missing"
if manifest.chunk_count == 0:
return "empty"
return "ok"
+58
View File
@@ -0,0 +1,58 @@
"""Page routes for the EPUB search web UI."""
from __future__ import annotations
import logging
from fastapi import APIRouter, Request
from fastapi.responses import HTMLResponse
from sqlalchemy import select
from python.ebook_search.api.dependencies import (
AppConfig, # noqa: TC001 FastAPI resolves this annotated dependency at runtime
)
from python.ebook_search.api.web import templates
from python.fastapi_tools import DbSession # noqa: TC001 FastAPI resolves this annotated dependency at runtime
from python.orm.richie import EbookSource
logger = logging.getLogger(__name__)
router = APIRouter()
@router.get("/", response_class=HTMLResponse)
def index(request: Request, config: AppConfig) -> HTMLResponse:
"""Render the search page."""
return templates.TemplateResponse(request, "search.html", {"config": config})
@router.get("/books", response_class=HTMLResponse)
def books(request: Request, session: DbSession) -> HTMLResponse:
"""Render the indexed books page."""
sources = list(session.scalars(select(EbookSource).order_by(EbookSource.title)).all())
logger.info("ebook_books_page_loaded count=%s", len(sources))
return templates.TemplateResponse(request, "books.html", {"sources": sources})
@router.get("/books/{source_id}", response_class=HTMLResponse)
def book_detail(source_id: int, request: Request, session: DbSession) -> HTMLResponse:
"""Render details for one indexed book."""
source = session.get(EbookSource, source_id)
if source is not None:
chapter_count = len(source.chapters)
chunk_count = len(source.chunks)
else:
chapter_count = 0
chunk_count = 0
logger.info(
"ebook_book_detail_loaded source_id=%s found=%s chapters=%s chunks=%s",
source_id,
source is not None,
chapter_count,
chunk_count,
)
return templates.TemplateResponse(
request,
"book_detail.html",
{"chapter_count": chapter_count, "chunk_count": chunk_count, "source": source},
)
+116
View File
@@ -0,0 +1,116 @@
"""Search routes for the EPUB search web UI."""
from __future__ import annotations
import logging
from dataclasses import replace
from time import perf_counter
from typing import TYPE_CHECKING, Annotated
from fastapi import APIRouter, Form, Request
from fastapi.responses import HTMLResponse
from python.ebook_search.answer import answer_query
from python.ebook_search.api.dependencies import ( # noqa: TC001 FastAPI resolves these annotated dependencies at runtime
AppConfig,
AppEngine,
)
from python.ebook_search.api.web import templates
from python.ebook_search.guardrails import (
CitationReport,
is_confident,
retrieval_confidence,
validate_citations,
)
from python.ebook_search.search import SearchResponse, search_ebooks
from python.ebook_search.timing import runtime_step_from_start
if TYPE_CHECKING:
from python.ebook_search.config import EbookSearchConfig
logger = logging.getLogger(__name__)
router = APIRouter()
def build_answer(
query: str,
response: SearchResponse,
config: EbookSearchConfig,
) -> tuple[str, bool, CitationReport | None]:
"""Generate the answer for a search, returning ``(answer, low_confidence, citation_report)``."""
if not config.answer_enabled:
logger.info("ebook_answer_skipped_disabled")
return "Answer generation is disabled. Source chunks are shown below.", False, None
if not is_confident(response.results, config):
logger.info(
"ebook_answer_low_confidence confidence=%.4f threshold=%.4f",
retrieval_confidence(response.results),
config.min_retrieval_confidence,
)
answer = (
"Retrieval confidence is low for this query, so answer generation was skipped. "
"Source chunks are shown below."
)
return answer, True, None
try:
answer = answer_query(query, response.results, config)
except RuntimeError as error:
logger.warning("ebook_answer_request_failed_falling_back error=%s", error)
return "Answer generation failed. Source chunks are still shown below.", False, None
citation_report = None
if config.validate_citations_enabled and response.results:
citation_report = validate_citations(answer, len(response.results))
if citation_report.invalid or not citation_report.grounded:
logger.warning(
"ebook_answer_citation_issue invalid=%s grounded=%s",
citation_report.invalid,
citation_report.grounded,
)
return answer, False, citation_report
@router.post("/search", response_class=HTMLResponse)
def search(
request: Request,
config: AppConfig,
engine: AppEngine,
query: Annotated[str, Form()],
rerank: Annotated[str | None, Form()] = None,
) -> HTMLResponse:
"""Run a search and render HTMX results."""
try:
response = search_ebooks(engine, query, config, rerank=rerank == "true")
except Exception as error:
logger.exception("ebook_search_request_failed")
return templates.TemplateResponse(request, "partials/error.html", {"message": str(error)}, status_code=500)
answer_start = perf_counter()
answer, low_confidence, citation_report = build_answer(query, response, config)
answer_step_name = "Answer generation" if config.answer_enabled else "Answer skipped"
response = replace(
response,
timings=(*response.timings, runtime_step_from_start(answer_step_name, answer_start)),
)
for step in response.timings:
logger.info("ebook_search_timing step=%r runtime_ms=%.1f", step.name, step.duration_ms)
logger.info(
"ebook_search_request_complete results=%s rank_label=%s runtime_ms=%.1f",
len(response.results),
response.rank_label,
response.total_runtime_ms,
)
return templates.TemplateResponse(
request,
"partials/results.html",
{
"answer": answer,
"response": response,
"low_confidence": low_confidence,
"citation_report": citation_report,
},
)
+149
View File
@@ -0,0 +1,149 @@
body {
margin: 0;
background: #f7f7f4;
color: #202124;
font-family: system-ui, -apple-system, BlinkMacSystemFont, "Segoe UI", sans-serif;
}
main {
max-width: 960px;
margin: 0 auto;
padding: 24px;
}
nav {
display: flex;
gap: 12px;
align-items: center;
margin-bottom: 20px;
}
nav form {
margin: 0;
}
.actions {
display: flex;
flex-wrap: wrap;
gap: 12px;
margin-bottom: 24px;
}
textarea {
display: block;
width: 100%;
margin: 8px 0 12px;
}
button {
padding: 8px 14px;
}
.check {
display: inline-flex;
gap: 8px;
align-items: center;
margin-right: 12px;
}
.rank-label {
margin-top: 24px;
font-weight: 700;
}
.results {
padding-left: 24px;
}
.meta,
.scores,
.status {
color: #626a73;
}
.scores {
display: flex;
flex-wrap: wrap;
gap: 8px;
margin: 12px 0;
}
.scores div {
display: inline-flex;
gap: 4px;
align-items: baseline;
}
.scores dt {
font-weight: 700;
}
.scores dd {
margin: 0;
}
.runtime {
margin-top: 16px;
}
.timing-chart {
display: grid;
gap: 8px;
padding: 0;
list-style: none;
}
.timing-chart li {
display: grid;
grid-template-columns: minmax(150px, 1fr) minmax(160px, 2fr) auto auto;
gap: 8px;
align-items: center;
}
.timing-bar {
height: 10px;
overflow: hidden;
background: #e5e5df;
}
.timing-bar span {
display: block;
height: 100%;
background: #3767c8;
}
.timing-value,
.timing-remaining {
color: #626a73;
font-variant-numeric: tabular-nums;
}
table {
width: 100%;
border-collapse: collapse;
}
th,
td {
padding: 8px;
border-bottom: 1px solid #d8d8d2;
text-align: left;
}
th {
font-weight: 700;
}
.error {
color: #9f1d20;
font-weight: 700;
}
.notice {
margin: 8px 0;
padding: 8px 12px;
border-left: 4px solid #c8881d;
background: #fcf3e2;
color: #6b4a06;
font-weight: 600;
}
@@ -0,0 +1,57 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>EPUB Admin</title>
<script src="https://unpkg.com/htmx.org@2.0.4"></script>
<link rel="stylesheet" href="/static/style.css">
</head>
<body>
<main>
<nav>
<a href="/">Search</a>
<a href="/books">Books</a>
<a href="/admin">Admin</a>
</nav>
<h1>Admin</h1>
<section id="admin-status"></section>
<section class="actions">
<form hx-post="/admin/scan" hx-target="#admin-status" hx-swap="innerHTML">
<button type="submit">Scan</button>
</form>
<form hx-post="/admin/embed-missing" hx-target="#admin-status" hx-swap="innerHTML">
<button type="submit">Embed</button>
</form>
<form hx-post="/admin/embed-all" hx-target="#admin-status" hx-swap="innerHTML">
<button type="submit">Embed all</button>
</form>
</section>
<section>
<h2>Embeddings</h2>
<table>
<thead>
<tr>
<th>Model</th>
<th>Dimensions</th>
<th>Embedded</th>
<th>Missing</th>
<th>Total chunks</th>
</tr>
</thead>
<tbody>
{% for item in stats %}
<tr>
<td>{{ item.model_name }}</td>
<td>{{ item.dimension }}</td>
<td>{{ item.embedded_chunks }}</td>
<td>{{ item.missing_chunks }}</td>
<td>{{ item.total_chunks }}</td>
</tr>
{% endfor %}
</tbody>
</table>
</section>
</main>
</body>
</html>
@@ -0,0 +1,32 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>{% if source %}{{ source.title }}{% else %}Book not found{% endif %}</title>
<link rel="stylesheet" href="/static/style.css">
</head>
<body>
<main>
<nav>
<a href="/">Search</a>
<a href="/books">Books</a>
<a href="/admin">Admin</a>
</nav>
{% if source %}
<h1>{{ source.title }}</h1>
<p class="meta">{{ source.author or "Unknown author" }}</p>
<dl>
<dt>File</dt>
<dd>{{ source.file_path }}</dd>
<dt>Chapters</dt>
<dd>{{ chapter_count }}</dd>
<dt>Chunks</dt>
<dd>{{ chunk_count }}</dd>
</dl>
{% else %}
<h1>Book not found</h1>
{% endif %}
</main>
</body>
</html>
@@ -0,0 +1,31 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>EPUB Books</title>
<link rel="stylesheet" href="/static/style.css">
</head>
<body>
<main>
<nav>
<a href="/">Search</a>
<a href="/books">Books</a>
<a href="/admin">Admin</a>
</nav>
<h1>Books</h1>
{% if sources %}
<ol class="results">
{% for source in sources %}
<li>
<h2><a href="/books/{{ source.id }}">{{ source.title }}</a></h2>
<p class="meta">{{ source.author or "Unknown author" }}</p>
</li>
{% endfor %}
</ol>
{% else %}
<p>No EPUBs indexed.</p>
{% endif %}
</main>
</body>
</html>
@@ -0,0 +1 @@
<p class="status">{{ message }}</p>
@@ -0,0 +1 @@
<p class="error">{{ message }}</p>
@@ -0,0 +1,84 @@
<div class="rank-label">{{ response.rank_label }}</div>
{% if response.timings %}
<section class="runtime">
<h2>Runtime</h2>
<p class="meta">Total {{ "%.1f"|format(response.total_runtime_ms) }} ms</p>
<ol class="timing-chart">
{% set total = response.total_runtime_ms %}
{% set ns = namespace(remaining=total) %}
{% for step in response.timings %}
{% set width = (step.duration_ms / total * 100) if total else 0 %}
{% if step.counts_toward_total %}
{% set ns.remaining = ns.remaining - step.duration_ms %}
{% endif %}
<li>
<span class="timing-label">{{ step.name }}</span>
<span class="timing-bar"><span style="width: {{ "%.2f"|format(width) }}%"></span></span>
<span class="timing-value">{{ "%.1f"|format(step.duration_ms) }} ms</span>
<span class="timing-remaining">{{ "%.1f"|format([ns.remaining, 0]|max) }} ms left</span>
</li>
{% endfor %}
</ol>
</section>
{% endif %}
<section class="answer">
<h2>Answer</h2>
{% if low_confidence|default(false) %}
<p class="notice">Low retrieval confidence — answer generation was skipped.</p>
{% endif %}
{% set report = citation_report|default(none) %}
{% if report is not none and not report.grounded %}
<p class="notice">Unverified — no source citations were found in this answer.</p>
{% endif %}
{% if report is not none and report.invalid %}
<p class="notice">Invalid citations: {{ report.invalid|join(", ") }} (no matching source).</p>
{% endif %}
<p>{{ answer }}</p>
</section>
{% if response.results %}
<ol class="results">
{% for result in response.results %}
<li>
<h2>{{ result.source_title }}</h2>
<p class="meta">
{% if result.source_author %}{{ result.source_author }}{% endif %}
{% if result.chapter_title %} · {{ result.chapter_title }}{% endif %}
{% if result.page_label %} · page {{ result.page_label }}{% endif %}
</p>
<p>{{ result.text }}</p>
<dl class="scores">
<div>
<dt>final</dt>
<dd>{{ "%.3f"|format(result.score) }}</dd>
</div>
{% if result.rerank_score is not none %}
<div>
<dt>rerank</dt>
<dd>{{ "%.3f"|format(result.rerank_score) }}</dd>
</div>
{% endif %}
{% if result.vector_score is not none %}
<div>
<dt>vector cosine</dt>
<dd>{{ "%.3f"|format(result.vector_score) }}</dd>
</div>
{% endif %}
{% if result.bm25_score is not none %}
<div>
<dt>BM25</dt>
<dd>{{ "%.6f"|format(result.bm25_score) }}</dd>
</div>
{% endif %}
{% if result.fused_score is not none %}
<div>
<dt>RRF</dt>
<dd>{{ "%.3f"|format(result.fused_score) }}</dd>
</div>
{% endif %}
</dl>
</li>
{% endfor %}
</ol>
{% else %}
<p>No results.</p>
{% endif %}
@@ -0,0 +1,30 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>EPUB Search</title>
<script src="https://unpkg.com/htmx.org@2.0.4"></script>
<link rel="stylesheet" href="/static/style.css">
</head>
<body>
<main>
<nav>
<a href="/">Search</a>
<a href="/books">Books</a>
<a href="/admin">Admin</a>
</nav>
<h1>EPUB Search</h1>
<form hx-post="/search" hx-target="#results" hx-swap="innerHTML">
<label for="query">Search</label>
<textarea id="query" name="query" rows="4" required></textarea>
<label class="check">
<input type="checkbox" name="rerank" value="true" {% if config.rerank.enabled %}checked{% endif %}>
Rerank
</label>
<button type="submit">Search</button>
</form>
<section id="results"></section>
</main>
</body>
</html>
+13
View File
@@ -0,0 +1,13 @@
"""Shared web UI resources for EPUB search."""
from __future__ import annotations
from pathlib import Path
from fastapi.templating import Jinja2Templates
PACKAGE_DIR = Path(__file__).resolve().parent
TEMPLATE_DIR = PACKAGE_DIR / "templates"
STATIC_DIR = PACKAGE_DIR / "static"
templates = Jinja2Templates(directory=TEMPLATE_DIR)
+281
View File
@@ -0,0 +1,281 @@
"""Persisted BM25 corpus management."""
from __future__ import annotations
import json
import logging
import shutil
from dataclasses import dataclass
from datetime import UTC, datetime
from functools import cache
from pathlib import Path
from typing import TYPE_CHECKING
import bm25s
from sqlalchemy import func, select, union_all
from python.orm.richie import EbookChapter, EbookChunk, EbookSource
if TYPE_CHECKING:
from sqlalchemy.orm import Session
from python.ebook_search.config import EbookSearchConfig
logger = logging.getLogger(__name__)
MANIFEST_NAME = "manifest.json"
REQUIRED_INDEX_FILES = frozenset(
{
"data.csc.index.npy",
"indices.csc.index.npy",
"indptr.csc.index.npy",
"params.index.json",
"vocab.index.json",
"corpus.jsonl",
}
)
@dataclass(frozen=True)
class BM25Manifest:
"""Metadata describing a persisted BM25 corpus."""
created_at: datetime
db_updated_at: datetime | None
chunk_count: int
@dataclass(frozen=True)
class BM25Corpus:
"""Loaded persisted BM25 corpus and retriever."""
retriever: object | None
records: tuple[dict[str, object], ...]
manifest: BM25Manifest
class BM25CorpusUnavailableError(RuntimeError):
"""Raised when the persisted BM25 corpus cannot be loaded."""
def bm25_index_path(config: EbookSearchConfig) -> Path:
"""Return the configured BM25 index root path relative to the current working directory."""
path = Path(config.bm25_index_dir).expanduser()
if path.is_absolute():
return path
return Path.cwd() / path
def get_current_bm25_index(index_path: Path) -> Path:
"""Return the live BM25 index directory."""
current_path = index_path / "current"
if current_path.exists() or current_path.is_symlink():
return current_path
return index_path
def ensure_bm25_corpus(session: Session, config: EbookSearchConfig) -> None:
"""Create or refresh the persisted BM25 corpus when it is missing or stale."""
index_path = bm25_index_path(config)
manifest = read_bm25_manifest(index_path)
db_updated_at = corpus_last_updated_at(session)
if not bm25_index_exists(index_path, manifest):
logger.info("ebook_bm25_index_missing path=%s", index_path)
refresh_bm25_corpus(session, config, db_updated_at=db_updated_at)
return
if db_updated_at is not None and manifest is not None and manifest.created_at < db_updated_at:
logger.info(
"ebook_bm25_index_stale path=%s created_at=%s db_updated_at=%s",
index_path,
manifest.created_at.isoformat(),
db_updated_at.isoformat(),
)
refresh_bm25_corpus(session, config, db_updated_at=db_updated_at)
return
logger.info(
"ebook_bm25_index_current path=%s chunks=%s created_at=%s",
index_path,
manifest.chunk_count if manifest else 0,
manifest.created_at.isoformat() if manifest else None,
)
def refresh_bm25_corpus(
session: Session,
config: EbookSearchConfig,
*,
db_updated_at: datetime | None = None,
) -> BM25Manifest:
"""Rebuild and persist the BM25 corpus from the current database chunks."""
index_path = bm25_index_path(config)
records, texts = fetch_bm25_corpus_records(session)
manifest = BM25Manifest(
created_at=datetime.now(tz=UTC),
db_updated_at=db_updated_at if db_updated_at is not None else corpus_last_updated_at(session),
chunk_count=len(records),
)
write_bm25_corpus(index_path, records, texts, manifest)
logger.info(
"ebook_bm25_index_refreshed path=%s chunks=%s created_at=%s",
index_path,
manifest.chunk_count,
manifest.created_at.isoformat(),
)
return manifest
@cache
def load_bm25_corpus(config: EbookSearchConfig) -> BM25Corpus:
"""Load the BM25 corpus into memory once per process.
Background refresh tasks clear this cache after rebuilding the on-disk corpus.
"""
index_path = bm25_index_path(config)
active_index_path = get_current_bm25_index(index_path)
logger.info("ebook_bm25_corpus_cache_load path=%s active_path=%s", index_path, active_index_path)
manifest = read_bm25_manifest(index_path)
if manifest is None or not bm25_index_exists(index_path, manifest):
msg = f"BM25 corpus is not available: {index_path}"
raise BM25CorpusUnavailableError(msg)
if manifest.chunk_count == 0:
return BM25Corpus(retriever=None, records=(), manifest=manifest)
retriever = bm25s.BM25.load(active_index_path, load_corpus=True, mmap=True)
records = tuple(dict(record) for record in retriever.corpus)
return BM25Corpus(retriever=retriever, records=records, manifest=manifest)
def score_bm25_corpus(query: str, corpus: BM25Corpus, *, limit: int) -> list[tuple[dict[str, object], float]]:
"""Score a query against a loaded BM25 corpus."""
if corpus.retriever is None or not corpus.records:
return []
k = min(limit, len(corpus.records))
documents, scores = corpus.retriever.retrieve(
bm25s.tokenize(query, show_progress=False),
corpus=list(corpus.records),
k=k,
show_progress=False,
)
results: list[tuple[dict[str, object], float]] = []
for document, score in zip(documents[0], scores[0], strict=True):
score_value = float(score)
if score_value <= 0:
continue
results.append((dict(document), score_value))
return results
def fetch_bm25_corpus_records(session: Session) -> tuple[list[dict[str, object]], list[str]]:
"""Fetch persistable BM25 corpus records and their matching index texts from the database.
search_text is only needed to build the index, so it is returned separately instead of
being persisted into the corpus records, which would double the corpus size.
"""
statement = (
select(
EbookChunk.id.label("chunk_id"),
EbookChunk.text.label("text"),
EbookSource.title.label("source_title"),
EbookSource.author.label("source_author"),
EbookChapter.title.label("chapter_title"),
EbookChunk.page_label.label("page_label"),
EbookChunk.search_text.label("bm25_text"),
)
.select_from(EbookChunk)
.join(EbookSource, EbookSource.id == EbookChunk.source_id)
.outerjoin(EbookChapter, EbookChapter.id == EbookChunk.chapter_id)
.order_by(EbookChunk.id)
)
records: list[dict[str, object]] = []
texts: list[str] = []
for row in session.execute(statement).mappings():
record = dict(row)
texts.append(str(record.pop("bm25_text")))
records.append(record)
return records, texts
def corpus_last_updated_at(session: Session) -> datetime | None:
"""Return the latest source/chapter/chunk update timestamp relevant to BM25 text."""
update_times = union_all(
select(func.max(EbookSource.updated).label("updated")),
select(func.max(EbookChapter.updated).label("updated")),
select(func.max(EbookChunk.updated).label("updated")),
).subquery()
return session.scalar(select(func.max(update_times.c.updated)))
def write_bm25_corpus(
index_path: Path,
records: list[dict[str, object]],
texts: list[str],
manifest: BM25Manifest,
) -> None:
"""Write a BM25 corpus generation and publish it through the current symlink."""
index_path.mkdir(parents=True, exist_ok=True)
generations_path = index_path / "generations"
generations_path.mkdir(exist_ok=True)
generation_path = next_bm25_generation_path(generations_path, manifest.created_at)
current_path = index_path / "current"
next_current_path = index_path / f".current.{generation_path.name}.tmp"
try:
generation_path.mkdir()
# Empty corpora publish a manifest-only generation so startup succeeds before any chunks exist.
if records:
retriever = bm25s.BM25()
retriever.index(bm25s.tokenize(texts, show_progress=False), show_progress=False)
retriever.save(generation_path, corpus=records, show_progress=False)
write_bm25_manifest(generation_path, manifest)
next_current_path.unlink(missing_ok=True)
next_current_path.symlink_to(generation_path, target_is_directory=True)
next_current_path.replace(current_path)
except Exception:
next_current_path.unlink(missing_ok=True)
shutil.rmtree(generation_path, ignore_errors=True)
raise
def read_bm25_manifest(index_path: Path) -> BM25Manifest | None:
"""Read the BM25 manifest if it exists and is valid."""
manifest_path = get_current_bm25_index(index_path) / MANIFEST_NAME
if not manifest_path.exists():
return None
body = json.loads(manifest_path.read_text(encoding="utf-8"))
return BM25Manifest(
created_at=datetime.fromisoformat(str(body["created_at"])),
db_updated_at=datetime.fromisoformat(str(body["db_updated_at"])) if body.get("db_updated_at") else None,
chunk_count=int(body["chunk_count"]),
)
def write_bm25_manifest(index_path: Path, manifest: BM25Manifest) -> None:
"""Write the BM25 manifest to an index directory."""
body = {
"created_at": manifest.created_at.isoformat(),
"db_updated_at": manifest.db_updated_at.isoformat() if manifest.db_updated_at else None,
"chunk_count": manifest.chunk_count,
}
(index_path / MANIFEST_NAME).write_text(json.dumps(body, indent=2, sort_keys=True), encoding="utf-8")
def bm25_index_exists(index_path: Path, manifest: BM25Manifest | None) -> bool:
"""Return whether a usable persisted BM25 index exists."""
active_index_path = get_current_bm25_index(index_path)
if manifest is None or not active_index_path.is_dir():
return False
if manifest.chunk_count == 0:
return True
return all((active_index_path / file_name).exists() for file_name in REQUIRED_INDEX_FILES)
def next_bm25_generation_path(generations_path: Path, created_at: datetime) -> Path:
"""Return an unused dated BM25 generation path."""
base_name = created_at.astimezone(UTC).strftime("%Y%m%dT%H%M%S.%fZ")
generation_path = generations_path / base_name
suffix = 1
while generation_path.exists():
generation_path = generations_path / f"{base_name}.{suffix}"
suffix += 1
return generation_path
+126
View File
@@ -0,0 +1,126 @@
"""Configuration for the EPUB search app."""
from __future__ import annotations
from os import getenv
from typing import Annotated, Self
from pydantic import AliasChoices, Field, field_validator, model_validator
from pydantic_settings import BaseSettings, NoDecode, SettingsConfigDict
def normalize_embedding_alias(model: str) -> str:
"""Normalize a supported embedding alias to its provider model name."""
aliases = {
"Qwen3-Embedding-0.6B": "qwen3-embedding-0.6b",
"Qwen3-Embedding-4B": "qwen3-embedding-4b",
"Qwen3-Embedding-8B": "qwen3-embedding-8b",
"Qwen/Qwen3-Embedding-0.6B": "qwen3-embedding-0.6b",
"Qwen/Qwen3-Embedding-4B": "qwen3-embedding-4b",
"Qwen/Qwen3-Embedding-8B": "qwen3-embedding-8b",
"qwen3-embedding:0.6b": "qwen3-embedding-0.6b",
"qwen3-embedding:4b": "qwen3-embedding-4b",
"qwen3-embedding:8b": "qwen3-embedding-8b",
"qwen3-embedding-0.6b": "qwen3-embedding-0.6b",
"qwen3-embedding-4b": "qwen3-embedding-4b",
"qwen3-embedding-8b": "qwen3-embedding-8b",
}
standard_model = aliases.get(model)
if standard_model is None:
error = f"Embedding model {model} is not supported. Supported models are {aliases.keys()}"
raise ValueError(error)
return standard_model
def normalize_embedding_model(default: str = "qwen3-embedding-0.6b") -> str:
"""Normalize the configured embedding alias to its provider model name."""
return normalize_embedding_alias(getenv("EBOOK_SEARCH_EMBEDDING_MODEL", default))
class RerankConfig(BaseSettings):
"""vLLM reranker settings."""
model_config = SettingsConfigDict(env_prefix="EBOOK_SEARCH_RERANK_", frozen=True, protected_namespaces=())
enabled: bool = True
base_url: str = "http://192.168.90.25:8001"
model: str = "qwen3-reranker-06b"
candidates: int = 24
timeout_seconds: float = 30.0
score_weight: float = 0.7
hybrid_weight: float = 0.3
class EbookSearchConfig(BaseSettings):
"""Runtime settings for EPUB search."""
model_config = SettingsConfigDict(
env_prefix="EBOOK_SEARCH_",
frozen=True,
populate_by_name=True,
protected_namespaces=(),
)
rerank: RerankConfig = Field(default_factory=RerankConfig)
top_k: int = 12
library_paths: Annotated[tuple[str, ...], NoDecode] = ()
chunk_tokens: int = 700
chunk_overlap: int = 100
vllm_base_url: str = "https://ollama.com/v1"
vllm_api_key: str = Field(
default="not-needed",
validation_alias=AliasChoices("EBOOK_SEARCH_VLLM_API_KEY", "OLLAMA_API_KEY"),
)
chat_model: str = "deepseek-v4-flash"
answer_enabled: bool = True
embedding_base_url: str = "http://192.168.90.25:8000/v1"
embedding_api_key: str = "not-needed"
embedding_model: str = "qwen3-embedding-0.6b"
embedding_batch_size: int = 32
embedding_timeout_seconds: float = 60.0
chat_timeout_seconds: float = 60.0
vector_candidate_multiplier: int = 4
bm25_candidate_limit: int = 120
rrf_rank_constant: int = 60
min_retrieval_confidence: float = 0.0
validate_citations_enabled: bool = True
bm25_index_dir: str = ".ebook_search_bm25"
bm25_refresh_delay_seconds: int = 60
@field_validator("library_paths", mode="before")
@classmethod
def split_library_paths(cls, value: object) -> object:
"""Split a colon-separated library path string into a tuple of paths."""
if isinstance(value, str):
return tuple(path for path in value.split(":") if path)
return value
@field_validator("embedding_model")
@classmethod
def normalize_embedding(cls, value: str) -> str:
"""Normalize the configured embedding alias to its provider model name."""
return normalize_embedding_alias(value)
@model_validator(mode="after")
def validate_runtime_consistency(self) -> Self:
"""Reject configurations that cannot serve the features they enable."""
if not self.embedding_base_url.strip():
msg = "embedding_base_url must be set"
raise ValueError(msg)
if self.answer_enabled and (not self.vllm_base_url.strip() or not self.chat_model.strip()):
msg = "answer_enabled requires vllm_base_url and chat_model to be set"
raise ValueError(msg)
if self.rerank.enabled and not self.rerank.base_url.strip():
msg = "rerank.enabled requires rerank.base_url to be set"
raise ValueError(msg)
return self
def load_rerank_config() -> RerankConfig:
"""Load reranker config from environment variables."""
return RerankConfig()
def load_config() -> EbookSearchConfig:
"""Load EPUB search config from environment variables."""
return EbookSearchConfig()
+170
View File
@@ -0,0 +1,170 @@
"""Embedding model helpers."""
from __future__ import annotations
import logging
from dataclasses import dataclass
from typing import TYPE_CHECKING
from sqlalchemy import func, select
from sqlalchemy.dialects.postgresql import insert
from python.ebook_search.llm_interface import request_embeddings
from python.orm.richie import (
EbookChunk,
EbookChunkEmbedding1024,
EbookChunkEmbedding2560,
EbookChunkEmbedding4096,
EbookEmbeddingModel,
)
logger = logging.getLogger(__name__)
if TYPE_CHECKING:
from collections.abc import Sequence
from sqlalchemy.orm import Session
from python.ebook_search.config import EbookSearchConfig
MODEL_DIMENSIONS = {
"qwen3-embedding-0.6b": 1024,
"qwen3-embedding-4b": 2560,
"qwen3-embedding-8b": 4096,
}
def get_embedding_table(
dimension: int,
) -> type[EbookChunkEmbedding1024 | EbookChunkEmbedding2560 | EbookChunkEmbedding4096]:
"""Return the embedding table mapped to an embedding dimension."""
embedding_tables = {
1024: EbookChunkEmbedding1024,
2560: EbookChunkEmbedding2560,
4096: EbookChunkEmbedding4096,
}
table = embedding_tables.get(dimension)
if not table:
msg = f"Embedding dimension {dimension} is not supported"
raise ValueError(msg)
return table
@dataclass(frozen=True)
class EmbeddingModelStats:
"""Embedding coverage for one model."""
model_name: str
dimension: int
embedded_chunks: int
total_chunks: int
@property
def missing_chunks(self) -> int:
"""Return chunks missing this embedding model."""
return max(self.total_chunks - self.embedded_chunks, 0)
def embed_texts(texts: Sequence[str], config: EbookSearchConfig) -> list[list[float]]:
"""Embed text with the configured vLLM embedding model."""
logger.info(
"ebook_embed_request_start base_url=%s model=%s count=%s",
config.embedding_base_url,
config.embedding_model,
len(texts),
)
vectors = request_embeddings(texts, config)
expected_dimension = MODEL_DIMENSIONS[config.embedding_model]
for vector in vectors:
if len(vector) != expected_dimension:
msg = f"Expected {expected_dimension} dimensions, got {len(vector)}"
raise ValueError(msg)
logger.info(
"ebook_embed_request_complete model=%s count=%s dimension=%s",
config.embedding_model,
len(vectors),
expected_dimension,
)
return vectors
def embed_query(query: str, config: EbookSearchConfig) -> list[float]:
"""Embed a search query with the Qwen retrieval instruction."""
instructed_query = f"Instruct: Retrieve relevant passages for the query.\nQuery: {query}"
return embed_texts([instructed_query], config)[0]
def ensure_embedding_models(session: Session) -> None:
"""Ensure supported embedding model rows exist."""
for name, dimension in MODEL_DIMENSIONS.items():
existing = session.scalar(select(EbookEmbeddingModel).where(EbookEmbeddingModel.name == name))
if existing is None:
session.add(EbookEmbeddingModel(name=name, dimension=dimension, is_default=name == "qwen3-embedding-0.6b"))
logger.info("ebook_embedding_model_created model=%s dimension=%s", name, dimension)
session.flush()
def embedding_model_stats(session: Session) -> list[EmbeddingModelStats]:
"""Return embedding coverage counts for every supported model."""
total_chunks = session.scalar(select(func.count(EbookChunk.id))) or 0
models = {
model.name: model
for model in session.scalars(
select(EbookEmbeddingModel)
.where(EbookEmbeddingModel.name.in_(MODEL_DIMENSIONS))
.order_by(EbookEmbeddingModel.name)
)
}
stats: list[EmbeddingModelStats] = []
for model_name, dimension in MODEL_DIMENSIONS.items():
model = models.get(model_name)
embedded_chunks = 0
if model is not None:
table = get_embedding_table(dimension)
embedded_chunks = session.scalar(select(func.count(table.id)).where(table.model_id == model.id)) or 0
stats.append(
EmbeddingModelStats(
model_name=model_name,
dimension=dimension,
embedded_chunks=embedded_chunks,
total_chunks=total_chunks,
)
)
return stats
def embed_missing_chunks(session: Session, config: EbookSearchConfig) -> int:
"""Embed chunks missing embeddings for the configured model."""
ensure_embedding_models(session)
model = session.scalar(select(EbookEmbeddingModel).where(EbookEmbeddingModel.name == config.embedding_model))
if model is None:
supported_models = ", ".join(MODEL_DIMENSIONS)
msg = f"Unknown embedding model: {config.embedding_model}. Supported models: {supported_models}"
raise ValueError(msg)
table = get_embedding_table(model.dimension)
chunks = list(
session.scalars(
select(EbookChunk)
.outerjoin(table, (table.chunk_id == EbookChunk.id) & (table.model_id == model.id))
.where(table.id.is_(None))
.order_by(EbookChunk.id)
.limit(config.embedding_batch_size)
)
)
if not chunks:
logger.info("ebook_embed_missing_none model=%s", config.embedding_model)
return 0
logger.info("ebook_embed_missing_batch_start model=%s count=%s", config.embedding_model, len(chunks))
vectors = embed_texts([chunk.text for chunk in chunks], config)
rows = [
{"chunk_id": chunk.id, "model_id": model.id, "embedding": vector}
for chunk, vector in zip(chunks, vectors, strict=True)
]
statement = insert(table).values(rows).on_conflict_do_nothing(index_elements=["chunk_id", "model_id"])
session.execute(statement)
session.flush()
logger.info("ebook_embed_missing_batch_complete model=%s count=%s", config.embedding_model, len(rows))
return len(rows)
+95
View File
@@ -0,0 +1,95 @@
"""EPUB parsing helpers."""
from __future__ import annotations
import re
from dataclasses import dataclass
from typing import TYPE_CHECKING
from bs4 import BeautifulSoup
from ebooklib import ITEM_DOCUMENT, epub
if TYPE_CHECKING:
from pathlib import Path
WHITESPACE_RE = re.compile(r"\s+")
@dataclass(frozen=True)
class ParsedChapter:
"""Text extracted from one EPUB spine document."""
title: str | None
href: str | None
text: str
page_labels: tuple[str, ...]
@dataclass(frozen=True)
class ParsedEpub:
"""Parsed EPUB metadata and text."""
title: str
author: str | None
language: str | None
publisher: str | None
identifier: str | None
chapters: tuple[ParsedChapter, ...]
def parse_epub(path: Path) -> ParsedEpub:
"""Parse EPUB metadata and spine text."""
book = epub.read_epub(path)
chapters = []
for item in book.get_items_of_type(ITEM_DOCUMENT):
soup = BeautifulSoup(item.get_content(), "html.parser")
title = chapter_title(soup)
page_labels = tuple(extract_page_labels(soup))
text = clean_text(soup.get_text(" "))
if text:
chapters.append(ParsedChapter(title=title, href=item.get_name(), text=text, page_labels=page_labels))
return ParsedEpub(
title=metadata_value(book, "title") or path.stem,
author=metadata_value(book, "creator"),
language=metadata_value(book, "language"),
publisher=metadata_value(book, "publisher"),
identifier=metadata_value(book, "identifier"),
chapters=tuple(chapters),
)
def metadata_value(book: epub.EpubBook, name: str) -> str | None:
"""Return the first non-empty Dublin Core metadata value for a name."""
values = book.get_metadata("DC", name)
if not values:
return None
value = values[0][0]
return str(value).strip() or None
def chapter_title(soup: BeautifulSoup) -> str | None:
"""Extract the best available title from an EPUB document soup."""
heading = soup.find(["h1", "h2", "h3"])
if heading is None:
title = soup.find("title")
if title is None:
return None
return clean_text(title.get_text(" ")) or None
return clean_text(heading.get_text(" ")) or None
def extract_page_labels(soup: BeautifulSoup) -> list[str]:
"""Extract EPUB page-break labels from a document soup."""
labels: list[str] = []
for tag in soup.find_all(attrs={"epub:type": "pagebreak"}):
label = tag.get("title") or tag.get("aria-label") or tag.get_text(" ")
clean = clean_text(str(label))
if clean:
labels.append(clean)
return labels
def clean_text(text: str) -> str:
"""Normalize whitespace in extracted EPUB text."""
return WHITESPACE_RE.sub(" ", text).strip()
+1
View File
@@ -0,0 +1 @@
"""Offline evaluation tooling for the ebook search pipeline."""
@@ -0,0 +1,71 @@
{"query": "Who is Damien Montgomery and how does he become a Jump Mage?", "answer": null, "answerable": true, "relevant_sources": ["Starship's Mage"]}
{"query": "What is a Rune Wright and why is Damien so rare?", "answer": null, "answerable": true, "relevant_sources": ["Starship's Mage"]}
{"query": "How does jump magic let starships travel faster than light?", "answer": null, "answerable": true, "relevant_sources": ["Starship's Mage"]}
{"query": "What is the role of the Mage-King of Mars in the Protectorate?", "answer": null, "answerable": true, "relevant_sources": ["Starship's Mage"]}
{"query": "What happened aboard the Blue Jay in the first Starship's Mage book?", "answer": null, "answerable": true, "relevant_sources": ["Starship's Mage"]}
{"query": "Who is Captain David Rice?", "answer": null, "answerable": true, "relevant_sources": ["Starship's Mage"]}
{"query": "How are amplifiers and simulacrums used to power a ship's jump?", "answer": null, "answerable": true, "relevant_sources": ["Starship's Mage"]}
{"query": "What duties does a Hand of the Mage-King carry out?", "answer": null, "answerable": true, "relevant_sources": ["Starship's Mage"]}
{"query": "Explain the structure of the Royal Martian Navy.", "answer": null, "answerable": true, "relevant_sources": ["Starship's Mage"]}
{"query": "How do mages carve runes to enchant a starship?", "answer": null, "answerable": true, "relevant_sources": ["Starship's Mage"]}
{"query": "What threat do the Legatan rebels pose to the Protectorate?", "answer": null, "answerable": true, "relevant_sources": ["Starship's Mage"]}
{"query": "How does Damien handle his first command?", "answer": null, "answerable": true, "relevant_sources": ["Starship's Mage"]}
{"query": "What is the significance of the simulacrum on a jump ship?", "answer": null, "answerable": true, "relevant_sources": ["Starship's Mage"]}
{"query": "Describe a mage duel in the Starship's Mage series.", "answer": null, "answerable": true, "relevant_sources": ["Starship's Mage"]}
{"query": "What moral conflicts does Damien face as a Hand of the Mage-King?", "answer": null, "answerable": true, "relevant_sources": ["Starship's Mage"]}
{"query": "How does the Protectorate keep peace among its member worlds?", "answer": null, "answerable": true, "relevant_sources": ["Starship's Mage"]}
{"query": "Who is the Keeper of Oaths and how does Damien work with them?", "answer": null, "answerable": true, "relevant_sources": ["Starship's Mage"]}
{"query": "What event is known as the Onset and how does it change the world?", "answer": null, "answerable": true, "relevant_sources": ["The Onset"]}
{"query": "Who is the main character at the start of the Onset?", "answer": null, "answerable": true, "relevant_sources": ["The Onset"]}
{"query": "How do survivors adapt after the Onset begins?", "answer": null, "answerable": true, "relevant_sources": ["The Onset"]}
{"query": "What new abilities emerge during the Onset?", "answer": null, "answerable": true, "relevant_sources": ["The Onset"]}
{"query": "Describe the primary antagonist in the Onset series.", "answer": null, "answerable": true, "relevant_sources": ["The Onset"]}
{"query": "How does society collapse and reorganize after the Onset?", "answer": null, "answerable": true, "relevant_sources": ["The Onset"]}
{"query": "What factions form in the aftermath of the Onset?", "answer": null, "answerable": true, "relevant_sources": ["The Onset"]}
{"query": "How does the protagonist gain power throughout the Onset?", "answer": null, "answerable": true, "relevant_sources": ["The Onset"]}
{"query": "What is the cause or origin of the Onset?", "answer": null, "answerable": true, "relevant_sources": ["The Onset"]}
{"query": "Describe an early survival challenge faced after the Onset.", "answer": null, "answerable": true, "relevant_sources": ["The Onset"]}
{"query": "How do the characters defend their stronghold during the Onset?", "answer": null, "answerable": true, "relevant_sources": ["The Onset"]}
{"query": "What relationships drive the protagonist's choices in the Onset?", "answer": null, "answerable": true, "relevant_sources": ["The Onset"]}
{"query": "How does the Onset escalate by the end of the first book?", "answer": null, "answerable": true, "relevant_sources": ["The Onset"]}
{"query": "What mysteries about the Onset remain unresolved?", "answer": null, "answerable": true, "relevant_sources": ["The Onset"]}
{"query": "How do the rules of the world change once the Onset takes hold?", "answer": null, "answerable": true, "relevant_sources": ["The Onset"]}
{"query": "What weapons or tactics work best against the threats of the Onset?", "answer": null, "answerable": true, "relevant_sources": ["The Onset"]}
{"query": "How does Bob Johansson become a von Neumann probe?", "answer": null, "answerable": true, "relevant_sources": ["We Are Legion (We Are Bob)"]}
{"query": "What is a replicant and why do Bob's copies have different personalities?", "answer": null, "answerable": true, "relevant_sources": ["We Are Legion (We Are Bob)"]}
{"query": "Who are Riker, Homer, and Bill among the Bob clones?", "answer": null, "answerable": true, "relevant_sources": ["We Are Legion (We Are Bob)"]}
{"query": "What is GUPPI and how does Bob use it?", "answer": null, "answerable": true, "relevant_sources": ["We Are Legion (We Are Bob)"]}
{"query": "Describe the threat posed by the Others.", "answer": null, "answerable": true, "relevant_sources": ["We Are Legion (We Are Bob)"]}
{"query": "How does Bob protect and uplift the Deltans?", "answer": null, "answerable": true, "relevant_sources": ["We Are Legion (We Are Bob)"]}
{"query": "Why do the replicants drift apart in personality over time?", "answer": null, "answerable": true, "relevant_sources": ["We Are Legion (We Are Bob)"]}
{"query": "What is the role of FAITH and the Brazilian Empire on Earth?", "answer": null, "answerable": true, "relevant_sources": ["We Are Legion (We Are Bob)"]}
{"query": "How does subspace communication work for the Bobs?", "answer": null, "answerable": true, "relevant_sources": ["We Are Legion (We Are Bob)"]}
{"query": "What happens to Bender after he goes missing?", "answer": null, "answerable": true, "relevant_sources": ["We Are Legion (We Are Bob)"]}
{"query": "How do the Bobs build self-replicating probes across the galaxy?", "answer": null, "answerable": true, "relevant_sources": ["We Are Legion (We Are Bob)"]}
{"query": "How does Bob evacuate humanity after Earth becomes uninhabitable?", "answer": null, "answerable": true, "relevant_sources": ["We Are Legion (We Are Bob)"]}
{"query": "Describe the conflict between different factions of Bobs.", "answer": null, "answerable": true, "relevant_sources": ["We Are Legion (We Are Bob)"]}
{"query": "What ethical dilemmas does Bob face when interfering with primitive species?", "answer": null, "answerable": true, "relevant_sources": ["We Are Legion (We Are Bob)"]}
{"query": "How does the original Bob differ from later generations of clones?", "answer": null, "answerable": true, "relevant_sources": ["We Are Legion (We Are Bob)"]}
{"query": "How do the Bobs defeat the Others' system-harvesting fleets?", "answer": null, "answerable": true, "relevant_sources": ["We Are Legion (We Are Bob)"]}
{"query": "What role does Howard play in the human colonies?", "answer": null, "answerable": true, "relevant_sources": ["We Are Legion (We Are Bob)"]}
// querys not it the dataset
{"query": "How does Frodo destroy the One Ring in The Lord of the Rings?", "answer": null, "answerable": false, "relevant_sources": []}
{"query": "Who killed Dumbledore in Harry Potter and the Half-Blood Prince?", "answer": null, "answerable": false, "relevant_sources": []}
{"query": "What house does Tyrion Lannister belong to in A Game of Thrones?", "answer": null, "answerable": false, "relevant_sources": []}
{"query": "How does Paul Atreides control the spice on Arrakis in Dune?", "answer": null, "answerable": false, "relevant_sources": []}
{"query": "What does the green light at the end of the dock mean in The Great Gatsby?", "answer": null, "answerable": false, "relevant_sources": []}
{"query": "Why does Hester Prynne wear a scarlet letter?", "answer": null, "answerable": false, "relevant_sources": []}
{"query": "What does the white whale represent in Moby-Dick?", "answer": null, "answerable": false, "relevant_sources": []}
{"query": "How does Elizabeth Bennet's view of Mr. Darcy change in Pride and Prejudice?", "answer": null, "answerable": false, "relevant_sources": []}
{"query": "What crime does Raskolnikov commit in Crime and Punishment?", "answer": null, "answerable": false, "relevant_sources": []}
{"query": "How does Katniss volunteer for the Hunger Games?", "answer": null, "answerable": false, "relevant_sources": []}
{"query": "What is Winston Smith's job in Nineteen Eighty-Four?", "answer": null, "answerable": false, "relevant_sources": []}
{"query": "Who is Atticus Finch defending in To Kill a Mockingbird?", "answer": null, "answerable": false, "relevant_sources": []}
{"query": "What is the capital of Australia?", "answer": null, "answerable": false, "relevant_sources": []}
{"query": "How do I bake a sourdough loaf from scratch?", "answer": null, "answerable": false, "relevant_sources": []}
{"query": "Explain how photosynthesis converts sunlight into energy.", "answer": null, "answerable": false, "relevant_sources": []}
{"query": "What were the main causes of World War I?", "answer": null, "answerable": false, "relevant_sources": []}
{"query": "How does compound interest work?", "answer": null, "answerable": false, "relevant_sources": []}
{"query": "How do I change a flat tire on a car?", "answer": null, "answerable": false, "relevant_sources": []}
{"query": "What is the boiling point of water at sea level?", "answer": null, "answerable": false, "relevant_sources": []}
{"query": "What is the recommended daily intake of vitamin D?", "answer": null, "answerable": false, "relevant_sources": []}
+47
View File
@@ -0,0 +1,47 @@
"""Shared query set loading for evaluation and load testing.
Each JSONL record has a ``query`` and an optional reference ``answer``. ``answerable``
marks whether the query should be answerable from the library (false for out-of-corpus
"garbage" queries used to test the refusal path). Relevance for retrieval metrics is
labeled at source (book) granularity in ``relevant_sources``; source titles must match
``ebook_source.title`` values for the indexed corpus.
"""
from __future__ import annotations
import json
from dataclasses import dataclass
from pathlib import Path
DEFAULT_QUERIES_PATH = Path(__file__).parent / "data" / "queries.jsonl"
@dataclass(frozen=True)
class GoldQuery:
"""One labeled query shared by the eval and load-test tools."""
query: str
answer: str | None
answerable: bool
relevant_sources: tuple[str, ...]
relevant_substrings: tuple[str, ...]
def load_gold_queries(path: Path = DEFAULT_QUERIES_PATH) -> list[GoldQuery]:
"""Load labeled queries from a JSONL file."""
queries: list[GoldQuery] = []
for line in path.read_text(encoding="utf-8").splitlines():
stripped = line.strip()
if not stripped:
continue
record = json.loads(stripped)
queries.append(
GoldQuery(
query=str(record["query"]),
answer=record.get("answer"),
answerable=bool(record.get("answerable", True)),
relevant_sources=tuple(record.get("relevant_sources", ())),
relevant_substrings=tuple(record.get("relevant_substrings", ())),
)
)
return queries
+57
View File
@@ -0,0 +1,57 @@
"""Serve-time output guardrails for retrieval confidence and answer citations."""
from __future__ import annotations
import re
from dataclasses import dataclass
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from python.ebook_search.config import EbookSearchConfig
from python.ebook_search.search import SearchResult
CITATION_RE = re.compile(r"\[(\d+)\]")
def retrieval_confidence(results: list[SearchResult]) -> float:
"""Return the strongest interpretable relevance signal of the top result.
Reciprocal-rank-fusion scores are rank-based and not comparable across queries,
so the rerank relevance score is preferred, then vector cosine similarity, then
the final score.
"""
if not results:
return 0.0
top = results[0]
if top.rerank_score is not None:
return top.rerank_score
if top.vector_score is not None:
return top.vector_score
return top.score
def is_confident(results: list[SearchResult], config: EbookSearchConfig) -> bool:
"""Return whether top-result confidence meets the configured threshold."""
return retrieval_confidence(results) >= config.min_retrieval_confidence
@dataclass(frozen=True)
class CitationReport:
"""Validation summary for bracketed citation markers in a generated answer."""
cited: tuple[int, ...]
invalid: tuple[int, ...]
grounded: bool
def validate_citations(answer: str, result_count: int) -> CitationReport:
"""Validate bracketed citation markers against the number of shown sources.
A marker is valid when it points to a returned source (``1..result_count``).
``grounded`` is true when the answer cites at least one valid source.
"""
markers = sorted({int(match.group(1)) for match in CITATION_RE.finditer(answer)})
valid = range(1, result_count + 1)
cited = tuple(marker for marker in markers if marker in valid)
invalid = tuple(marker for marker in markers if marker not in valid)
return CitationReport(cited=cited, invalid=invalid, grounded=bool(cited))
+195
View File
@@ -0,0 +1,195 @@
"""EPUB ingestion into Richie DB."""
from __future__ import annotations
import hashlib
import logging
from dataclasses import dataclass
from datetime import UTC, datetime
from pathlib import Path
from typing import TYPE_CHECKING
import tiktoken
from sqlalchemy import or_, select
from python.ebook_search.epub_parse import parse_epub
from python.orm.richie import EbookChapter, EbookChunk, EbookSource
logger = logging.getLogger(__name__)
DEFAULT_CHUNK_TOKENS = 700
DEFAULT_CHUNK_OVERLAP = 100
if TYPE_CHECKING:
from sqlalchemy.orm import Session
from python.ebook_search.config import EbookSearchConfig
from python.ebook_search.epub_parse import ParsedChapter
@dataclass(frozen=True)
class TextChunk:
"""A token-bounded chunk of text."""
text: str
token_start: int
token_count: int
def chunk_text(
text: str,
*,
chunk_tokens: int = DEFAULT_CHUNK_TOKENS,
overlap_tokens: int = DEFAULT_CHUNK_OVERLAP,
) -> list[TextChunk]:
"""Split text into overlapping token chunks."""
if chunk_tokens <= 0:
msg = "chunk_tokens must be positive"
raise ValueError(msg)
if overlap_tokens < 0 or overlap_tokens >= chunk_tokens:
msg = "overlap_tokens must be non-negative and smaller than chunk_tokens"
raise ValueError(msg)
encoding = tiktoken.get_encoding("cl100k_base")
tokens = encoding.encode(text)
if not tokens:
return []
chunks: list[TextChunk] = []
step = chunk_tokens - overlap_tokens
for start in range(0, len(tokens), step):
chunk = tokens[start : start + chunk_tokens]
if not chunk:
continue
chunks.append(
TextChunk(
text=encoding.decode(chunk).strip(),
token_start=start,
token_count=len(chunk),
)
)
if start + chunk_tokens >= len(tokens):
break
return [chunk for chunk in chunks if chunk.text]
def ingest_configured_paths(session: Session, config: EbookSearchConfig) -> int:
"""Ingest every EPUB found under configured library paths."""
count = 0
for library_path in config.library_paths:
path = Path(library_path).expanduser()
logger.info("ebook_ingest_path_start path=%s", path)
if path.is_file() and path.suffix.lower() == ".epub":
count += int(ingest_file(session, path, config))
elif path.is_dir():
for epub_path in sorted(path.rglob("*.epub")):
count += int(ingest_file(session, epub_path, config))
else:
logger.warning("ebook_ingest_path_missing path=%s", path)
logger.info("ebook_ingest_paths_complete changed_files=%s configured_paths=%s", count, len(config.library_paths))
return count
def ingest_file(session: Session, path: Path, config: EbookSearchConfig) -> bool:
"""Ingest one EPUB file. Return True when the database changed."""
resolved_path = path.expanduser().resolve()
logger.info("ebook_ingest_file_start path=%s", resolved_path)
file_hash = sha256_file(resolved_path)
existing = find_existing_source(session, resolved_path, file_hash)
if existing is not None and existing.file_sha256 == file_hash:
stat = resolved_path.stat()
existing.file_path = str(resolved_path)
existing.file_mtime = datetime.fromtimestamp(stat.st_mtime, tz=UTC)
existing.file_size = stat.st_size
session.flush()
logger.info("ebook_ingest_file_unchanged source_id=%s path=%s", existing.id, resolved_path)
return False
if existing is not None:
logger.info("ebook_ingest_file_replacing source_id=%s path=%s", existing.id, resolved_path)
session.delete(existing)
session.flush()
stat = resolved_path.stat()
parsed = parse_epub(resolved_path)
source = EbookSource(
title=parsed.title,
author=parsed.author,
language=parsed.language,
publisher=parsed.publisher,
identifier=parsed.identifier,
file_path=str(resolved_path),
file_sha256=file_hash,
file_mtime=datetime.fromtimestamp(stat.st_mtime, tz=UTC),
file_size=stat.st_size,
)
session.add(source)
session.flush()
chunk_index = 0
for spine_index, parsed_chapter in enumerate(parsed.chapters):
chapter = EbookChapter(
source_id=source.id,
spine_index=spine_index,
title=parsed_chapter.title,
href=parsed_chapter.href,
)
session.add(chapter)
session.flush()
chunk_index = add_chapter_chunks(session, source, chapter, parsed_chapter, chunk_index, config)
session.flush()
logger.info(
"ebook_ingest_file_complete source_id=%s path=%s chapters=%s chunks=%s",
source.id,
resolved_path,
len(parsed.chapters),
chunk_index,
)
return True
def find_existing_source(session: Session, path: Path, file_hash: str) -> EbookSource | None:
"""Find an existing source by canonical path or file hash."""
return session.scalar(
select(EbookSource).where(or_(EbookSource.file_path == str(path), EbookSource.file_sha256 == file_hash))
)
def add_chapter_chunks(
session: Session,
source: EbookSource,
chapter: EbookChapter,
parsed_chapter: ParsedChapter,
chunk_index: int,
config: EbookSearchConfig,
) -> int:
"""Add chunk rows for one parsed chapter and return the next chunk index."""
page_label = parsed_chapter.page_labels[0] if parsed_chapter.page_labels else None
for text_chunk in chunk_text(
parsed_chapter.text,
chunk_tokens=config.chunk_tokens,
overlap_tokens=config.chunk_overlap,
):
session.add(
EbookChunk(
source_id=source.id,
chapter_id=chapter.id,
chunk_index=chunk_index,
text=text_chunk.text,
token_start=text_chunk.token_start,
token_count=text_chunk.token_count,
page_label=page_label,
content_sha256=hashlib.sha256(text_chunk.text.encode()).hexdigest(),
search_text=f"{source.title} {source.author or ''} {chapter.title or ''} {text_chunk.text}",
)
)
chunk_index += 1
return chunk_index
def sha256_file(path: Path) -> str:
"""Calculate the SHA-256 digest for a file."""
digest = hashlib.sha256()
with path.open("rb") as file:
for block in iter(lambda: file.read(1024 * 1024), b""):
digest.update(block)
return digest.hexdigest()
+173
View File
@@ -0,0 +1,173 @@
"""LLM provider HTTP adapters."""
from __future__ import annotations
import logging
from typing import TYPE_CHECKING
import httpx
if TYPE_CHECKING:
from collections.abc import Sequence
from python.ebook_search.config import EbookSearchConfig, RerankConfig
logger = logging.getLogger(__name__)
def auth_headers(api_key: str) -> dict[str, str]:
"""Build authorization headers when an API key is configured."""
if api_key == "not-needed":
return {}
return {"Authorization": f"Bearer {api_key}"}
def request_embeddings(texts: Sequence[str], config: EbookSearchConfig) -> list[list[float]]:
"""Request embeddings from the configured OpenAI-compatible endpoint."""
try:
response = httpx.post(
f"{config.embedding_base_url.rstrip('/')}/embeddings",
headers=auth_headers(config.embedding_api_key),
json={"model": config.embedding_model, "input": list(texts)},
timeout=config.embedding_timeout_seconds,
)
response.raise_for_status()
return embedding_vectors_from_response(response.json())
except (httpx.HTTPError, ValueError, KeyError, TypeError) as error:
logger.exception(
"ebook_embed_request_failed base_url=%s model=%s count=%s",
config.embedding_base_url,
config.embedding_model,
len(texts),
)
msg = f"Embedding request failed. base_url={config.embedding_base_url} model={config.embedding_model}"
raise RuntimeError(msg) from error
def check_embedding_endpoint(config: EbookSearchConfig, *, timeout_seconds: float = 5.0) -> bool:
"""Return whether the configured embedding endpoint answers a model listing."""
try:
response = httpx.get(
f"{config.embedding_base_url.rstrip('/')}/models",
headers=auth_headers(config.embedding_api_key),
timeout=timeout_seconds,
)
response.raise_for_status()
except httpx.HTTPError as error:
logger.warning("ebook_embedding_endpoint_unreachable base_url=%s error=%s", config.embedding_base_url, error)
return False
return True
def check_chat_endpoint(config: EbookSearchConfig, *, timeout_seconds: float = 5.0) -> bool:
"""Return whether the configured chat (answering) endpoint answers a model listing."""
try:
response = httpx.get(
f"{config.vllm_base_url.rstrip('/')}/models",
headers=auth_headers(config.vllm_api_key),
timeout=timeout_seconds,
)
response.raise_for_status()
except httpx.HTTPError as error:
logger.warning("ebook_chat_endpoint_unreachable base_url=%s error=%s", config.vllm_base_url, error)
return False
return True
def embedding_vectors_from_response(body: object) -> list[list[float]]:
"""Extract embedding vectors from an OpenAI-compatible embedding response."""
if not isinstance(body, dict):
msg = "Embedding response is not an object"
raise TypeError(msg)
data = body["data"]
if not isinstance(data, list):
msg = "Embedding response data is not a list"
raise TypeError(msg)
vectors: list[list[float]] = []
for item in data:
if not isinstance(item, dict):
msg = "Embedding item is not an object"
raise TypeError(msg)
embedding = item["embedding"]
if not isinstance(embedding, list):
msg = "Embedding value is not a list"
raise TypeError(msg)
vectors.append([float(value) for value in embedding])
return vectors
def request_rerank(
query: str,
documents: Sequence[str],
config: RerankConfig,
) -> object | None:
"""Request rerank scores from the configured vLLM endpoint."""
payload = {
"model": config.model,
"query": query,
"documents": list(documents),
}
response = httpx.post(
f"{config.base_url.rstrip('/')}/rerank",
json=payload,
timeout=config.timeout_seconds,
)
response.raise_for_status()
try:
return response.json()
except ValueError:
logger.debug("ebook_rerank_response_invalid_json", extra={"response": response.text})
return None
def request_chat_completion(
config: EbookSearchConfig,
messages: Sequence[dict[str, str]],
) -> str:
"""Request a chat completion from the configured OpenAI-compatible endpoint."""
try:
response = httpx.post(
f"{config.vllm_base_url.rstrip('/')}/chat/completions",
headers=auth_headers(config.vllm_api_key),
json={
"model": config.chat_model,
"messages": list(messages),
"temperature": 0,
},
timeout=config.chat_timeout_seconds,
)
response.raise_for_status()
return chat_content_from_response(response.json())
except (httpx.HTTPError, ValueError, KeyError, TypeError) as error:
msg = f"Chat request failed. base_url={config.vllm_base_url} model={config.chat_model}"
raise RuntimeError(msg) from error
def chat_content_from_response(body: object) -> str:
"""Extract text content from an OpenAI-compatible chat response."""
if not isinstance(body, dict):
msg = "Chat response is not an object"
raise TypeError(msg)
choices = body["choices"]
if not isinstance(choices, list) or not choices:
msg = "Chat response has no choices"
raise ValueError(msg)
first = choices[0]
if not isinstance(first, dict):
msg = "Chat choice is not an object"
raise TypeError(msg)
message = first["message"]
if not isinstance(message, dict):
msg = "Chat message is not an object"
raise TypeError(msg)
content = message.get("content") or ""
if not isinstance(content, str):
msg = "Chat content is not text"
raise TypeError(msg)
return content
+218
View File
@@ -0,0 +1,218 @@
"""Load test for the EPUB search service.
Drives ``POST /search`` on a running server at a configurable concurrency and reports
latency percentiles, throughput, and HTTP status distribution. Queries are drawn from
the shared JSONL set (see ``eval/data/queries.jsonl``) that the eval also uses, so load
and evaluation exercise the same questions. Answer generation and reranking happen
server-side, so this exercises the full retrieval pipeline.
"""
from __future__ import annotations
import asyncio
import logging
import math
import random
import statistics
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Annotated
import httpx
import typer
from python.common import configure_logger
from python.ebook_search.eval.dataset import DEFAULT_QUERIES_PATH, load_gold_queries
logger = logging.getLogger(__name__)
@dataclass(frozen=True)
class RequestResult:
"""Outcome of a single search request."""
status_code: int
latency_ms: float
ok: bool
@dataclass(frozen=True)
class LoadSummary:
"""Aggregate results of a load test run."""
total: int
successes: int
failures: int
wall_seconds: float
throughput_rps: float
latency_p50_ms: float
latency_p90_ms: float
latency_p95_ms: float
latency_p99_ms: float
latency_mean_ms: float
latency_max_ms: float
status_counts: dict[int, int]
def load_queries(queries_file: str | None) -> list[str]:
"""Return the query strings from the shared JSONL set (or a custom JSONL file)."""
path = Path(queries_file) if queries_file else DEFAULT_QUERIES_PATH
queries = [gold.query for gold in load_gold_queries(path)]
if not queries:
msg = f"No queries found in {path}"
raise typer.BadParameter(msg)
return queries
def pick_query(queries: list[str]) -> str:
"""Return a uniformly random query from the pool (not a security context)."""
return random.choice(queries) # noqa: S311 load-test query sampling is not security-sensitive
def percentile(values_sorted: list[float], pct: float) -> float:
"""Return the linearly-interpolated percentile of a sorted list."""
if not values_sorted:
return 0.0
rank = (pct / 100) * (len(values_sorted) - 1)
low = math.floor(rank)
high = math.ceil(rank)
if low == high:
return values_sorted[low]
return values_sorted[low] + (values_sorted[high] - values_sorted[low]) * (rank - low)
def summarize(results: list[RequestResult], wall_seconds: float) -> LoadSummary:
"""Aggregate per-request results into a load summary."""
latencies = sorted(result.latency_ms for result in results)
successes = sum(1 for result in results if result.ok)
status_counts: dict[int, int] = {}
for result in results:
status_counts[result.status_code] = status_counts.get(result.status_code, 0) + 1
return LoadSummary(
total=len(results),
successes=successes,
failures=len(results) - successes,
wall_seconds=wall_seconds,
throughput_rps=len(results) / wall_seconds if wall_seconds > 0 else 0.0,
latency_p50_ms=percentile(latencies, 50),
latency_p90_ms=percentile(latencies, 90),
latency_p95_ms=percentile(latencies, 95),
latency_p99_ms=percentile(latencies, 99),
latency_mean_ms=statistics.fmean(latencies) if latencies else 0.0,
latency_max_ms=latencies[-1] if latencies else 0.0,
status_counts=status_counts,
)
async def send_search(client: httpx.AsyncClient, query: str, *, rerank: bool) -> RequestResult:
"""Send one search request and record its status and latency."""
data = {"query": query, "rerank": "true"} if rerank else {"query": query}
start = time.perf_counter()
try:
response = await client.post("/search", data=data)
except httpx.HTTPError as error:
logger.warning("ebook_loadtest_request_failed error=%s", error)
return RequestResult(status_code=0, latency_ms=(time.perf_counter() - start) * 1000, ok=False)
return RequestResult(
status_code=response.status_code,
latency_ms=(time.perf_counter() - start) * 1000,
ok=response.is_success,
)
async def worker(
client: httpx.AsyncClient,
queue: asyncio.Queue[str],
results: list[RequestResult],
*,
rerank: bool,
) -> None:
"""Pull queries off the queue and send requests until it is empty."""
while True:
try:
query = queue.get_nowait()
except asyncio.QueueEmpty:
return
results.append(await send_search(client, query, rerank=rerank))
async def run_load(
*,
base_url: str,
queries: list[str],
request_count: int,
concurrency: int,
rerank: bool,
warmup: int,
timeout_seconds: float,
) -> LoadSummary:
"""Run the load test and return its aggregate summary."""
limits = httpx.Limits(max_connections=concurrency, max_keepalive_connections=concurrency)
async with httpx.AsyncClient(base_url=base_url, timeout=timeout_seconds, limits=limits) as client:
for _ in range(warmup):
await send_search(client, pick_query(queries), rerank=rerank)
queue: asyncio.Queue[str] = asyncio.Queue()
for _ in range(request_count):
queue.put_nowait(pick_query(queries))
results: list[RequestResult] = []
start = time.perf_counter()
workers = [asyncio.create_task(worker(client, queue, results, rerank=rerank)) for _ in range(concurrency)]
await asyncio.gather(*workers)
wall_seconds = time.perf_counter() - start
return summarize(results, wall_seconds)
def print_summary(summary: LoadSummary) -> None:
"""Print the load summary to stdout."""
typer.echo(f"requests={summary.total} successes={summary.successes} failures={summary.failures}")
typer.echo(f"wall={summary.wall_seconds:.2f}s throughput={summary.throughput_rps:.1f} req/s")
typer.echo(
f"latency_ms p50={summary.latency_p50_ms:.1f} p90={summary.latency_p90_ms:.1f} "
f"p95={summary.latency_p95_ms:.1f} p99={summary.latency_p99_ms:.1f} "
f"mean={summary.latency_mean_ms:.1f} max={summary.latency_max_ms:.1f}"
)
status_summary = " ".join(f"{code}={count}" for code, count in sorted(summary.status_counts.items()))
typer.echo(f"status {status_summary}")
def main(
*,
base_url: Annotated[str, typer.Option(help="Base URL of the running service")] = "http://127.0.0.1:8070",
request_count: Annotated[int, typer.Option("--requests", help="Total requests to send")] = 200,
concurrency: Annotated[int, typer.Option(help="Concurrent in-flight requests")] = 10,
rerank: Annotated[bool, typer.Option(help="Request server-side reranking")] = False,
warmup: Annotated[int, typer.Option(help="Warmup requests, not measured")] = 5,
timeout_seconds: Annotated[float, typer.Option("--timeout", help="Per-request timeout seconds")] = 120.0,
queries_file: Annotated[str | None, typer.Option(help="Query JSONL file (defaults to the shared set)")] = None,
log_level: Annotated[str, typer.Option(help="Log level")] = "WARNING",
) -> None:
"""Load test the search endpoint and report latency and throughput."""
configure_logger(log_level)
queries = load_queries(queries_file)
logger.info(
"ebook_loadtest_start base_url=%s requests=%s concurrency=%s rerank=%s queries=%s",
base_url,
request_count,
concurrency,
rerank,
len(queries),
)
summary = asyncio.run(
run_load(
base_url=base_url,
queries=queries,
request_count=request_count,
concurrency=concurrency,
rerank=rerank,
warmup=warmup,
timeout_seconds=timeout_seconds,
)
)
print_summary(summary)
if __name__ == "__main__":
typer.run(main)
+132
View File
@@ -0,0 +1,132 @@
"""vLLM-backed optional reranking."""
from __future__ import annotations
import logging
from dataclasses import dataclass, replace
from typing import TYPE_CHECKING
from python.ebook_search.llm_interface import request_rerank
if TYPE_CHECKING:
from python.ebook_search.config import RerankConfig
from python.ebook_search.search import SearchResult
logger = logging.getLogger(__name__)
@dataclass(frozen=True)
class RerankResult:
"""A relevance score for one candidate chunk."""
chunk_id: int
score: float
def rerank_chunks(query: str, candidates: list[SearchResult], config: RerankConfig) -> list[SearchResult]:
"""Rerank candidates with a vLLM rerank endpoint."""
if not candidates:
return []
logger.info(
"ebook_rerank_request_start base_url=%s model=%s candidates=%s",
config.base_url,
config.model,
len(candidates),
)
scores = score_candidates(query, candidates, config)
results = sorted(
(
replace(
result,
score=final_rerank_score(result, scores[result.chunk_id].score, candidates, config),
rerank_score=scores[result.chunk_id].score,
)
for result in candidates
),
key=lambda result: result.score,
reverse=True,
)
logger.info(
"ebook_rerank_request_complete base_url=%s model=%s candidates=%s",
config.base_url,
config.model,
len(results),
)
return results
def score_candidates(
query: str,
candidates: list[SearchResult],
config: RerankConfig,
) -> dict[int, RerankResult]:
"""Score candidate chunks with the configured rerank API."""
body = request_rerank(query, [candidate.text for candidate in candidates], config)
if body is None:
return zero_rerank_scores(candidates)
scores = parse_vllm_scores(body, candidates)
for result in scores.values():
logger.debug("ebook_rerank_candidate_scored chunk_id=%s score=%s", result.chunk_id, result.score)
return scores
def parse_vllm_scores(body: object, candidates: list[SearchResult]) -> dict[int, RerankResult]:
"""Parse vLLM rerank scores into chunk-id keyed results."""
if not isinstance(body, dict):
logger.debug("ebook_rerank_response_not_object", extra={"response": body})
return zero_rerank_scores(candidates)
results = body.get("results") or body.get("data")
if not isinstance(results, list):
logger.debug("ebook_rerank_response_missing_results", extra={"response": body})
return zero_rerank_scores(candidates)
scores = zero_rerank_scores(candidates)
for item in results:
if not isinstance(item, dict):
continue
index = item.get("index")
score = item.get("relevance_score", item.get("score"))
if not isinstance(index, int) or index < 0 or index >= len(candidates):
continue
if not isinstance(score, int | float):
continue
chunk_id = candidates[index].chunk_id
scores[chunk_id] = RerankResult(chunk_id=chunk_id, score=clamp_score(float(score)))
return scores
def zero_rerank_scores(candidates: list[SearchResult]) -> dict[int, RerankResult]:
"""Return zero relevance scores for all candidate chunks."""
return {candidate.chunk_id: RerankResult(chunk_id=candidate.chunk_id, score=0.0) for candidate in candidates}
def clamp_score(score: float) -> float:
"""Clamp a rerank score into the supported 0.0 to 1.0 range."""
return min(max(score, 0.0), 1.0)
def final_rerank_score(
result: SearchResult,
rerank_score: float,
candidates: list[SearchResult],
config: RerankConfig,
) -> float:
"""Combine rerank relevance with normalized hybrid retrieval evidence."""
return (config.score_weight * rerank_score) + (config.hybrid_weight * normalized_hybrid_score(result, candidates))
def normalized_hybrid_score(result: SearchResult, candidates: list[SearchResult]) -> float:
"""Normalize a candidate hybrid score against the rerank candidate set."""
hybrid_scores = [
candidate.fused_score if candidate.fused_score is not None else candidate.score for candidate in candidates
]
low = min(hybrid_scores)
high = max(hybrid_scores)
if high == low:
return 1.0
score = result.fused_score if result.fused_score is not None else result.score
return (score - low) / (high - low)
+380
View File
@@ -0,0 +1,380 @@
"""Hybrid search orchestration."""
from __future__ import annotations
import logging
import re
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, replace
from typing import TYPE_CHECKING
from pgvector.sqlalchemy import Vector
from sqlalchemy import literal, select
from sqlalchemy.orm import Session
from python.ebook_search.bm25_corpus import (
BM25CorpusUnavailableError,
load_bm25_corpus,
score_bm25_corpus,
)
from python.ebook_search.embeddings import MODEL_DIMENSIONS, embed_query, get_embedding_table
from python.ebook_search.rerank import rerank_chunks
from python.ebook_search.timing import RuntimeStep, timed_result
from python.orm.richie import (
EbookChapter,
EbookChunk,
EbookEmbeddingModel,
EbookSource,
)
if TYPE_CHECKING:
from collections.abc import Mapping
from sqlalchemy.engine import Engine
from python.ebook_search.config import EbookSearchConfig
logger = logging.getLogger(__name__)
@dataclass(frozen=True)
class SearchResult:
"""One source chunk returned by search."""
chunk_id: int
text: str
source_title: str
score: float = 0.0
vector_score: float | None = None
bm25_score: float | None = None
fused_score: float | None = None
rerank_score: float | None = None
source_author: str | None = None
chapter_title: str | None = None
page_label: str | None = None
rank_source: str = "Hybrid"
@dataclass(frozen=True)
class SearchResponse:
"""Search output for the UI."""
query: str
results: list[SearchResult]
rank_label: str
timings: tuple[RuntimeStep, ...] = ()
@property
def total_runtime_ms(self) -> float:
"""Return total measured runtime for the response."""
return sum(step.duration_ms for step in self.timings if step.counts_toward_total)
@dataclass(frozen=True)
class RetrievalResponse:
"""Parallel retrieval output for vector and BM25 candidates."""
vector_results: list[SearchResult]
lexical_results: list[SearchResult]
timings: tuple[RuntimeStep, ...]
def search_ebooks(
engine: Engine,
query: str,
config: EbookSearchConfig,
*,
rerank: bool = False,
) -> SearchResponse:
"""Run hybrid vector/BM25 search and optional reranking."""
if not query.strip():
logger.info("ebook_search_empty_query")
return SearchResponse(query=query, results=[], rank_label="Hybrid")
logger.info("ebook_search_start query_length=%s rerank=%s", len(query), rerank)
timings: list[RuntimeStep] = []
retrieval, timing = timed_result(
"Hybrid retrieval",
parallel_retrieval,
engine,
query,
config,
)
timings.extend(retrieval.timings)
timings.append(timing)
fused, timing = timed_result(
"Reciprocal rank fusion",
reciprocal_rank_fusion,
retrieval.vector_results,
retrieval.lexical_results,
rank_constant=config.rrf_rank_constant,
)
timings.append(timing)
if config.rerank.enabled and rerank:
response, timing = timed_result("Rerank", apply_rerank, query, fused, config)
else:
response, timing = timed_result("Rerank skipped", skip_rerank, query, fused, config)
timings.append(timing)
response = replace(response, timings=tuple(timings))
logger.info(
"ebook_search_complete vector_candidates=%s lexical_candidates=%s "
"fused_candidates=%s returned=%s rank_label=%s runtime_ms=%.1f",
len(retrieval.vector_results),
len(retrieval.lexical_results),
len(fused),
len(response.results),
response.rank_label,
response.total_runtime_ms,
)
return response
def parallel_retrieval(
engine: Engine,
query: str,
config: EbookSearchConfig,
) -> RetrievalResponse:
"""Run vector and BM25 candidate retrieval concurrently with separate database sessions."""
with ThreadPoolExecutor(max_workers=2, thread_name_prefix="ebook-search") as executor:
vector_future = executor.submit(
timed_result,
"Embedding + vector search",
vector_candidates,
engine,
query,
config,
)
bm25_future = executor.submit(
timed_result,
"BM25 search",
bm25_candidates,
query,
config,
)
vector_results, vector_timing = vector_future.result()
lexical_results, lexical_timing = bm25_future.result()
logger.info(
"ebook_parallel_retrieval_complete vector_candidates=%s lexical_candidates=%s",
len(vector_results),
len(lexical_results),
)
return RetrievalResponse(
vector_results=vector_results,
lexical_results=lexical_results,
timings=(
replace(vector_timing, counts_toward_total=False),
replace(lexical_timing, counts_toward_total=False),
),
)
def skip_rerank(
query: str,
candidates: list[SearchResult],
config: EbookSearchConfig,
) -> SearchResponse:
"""Return fused hybrid results without reranking."""
logger.info("ebook_rerank_skipped candidates=%s", len(candidates))
return SearchResponse(query=query, results=candidates[: config.top_k], rank_label="Hybrid")
def apply_rerank(
query: str,
candidates: list[SearchResult],
config: EbookSearchConfig,
) -> SearchResponse:
"""Rerank already-fused hybrid candidates."""
reranked = rerank_chunks(query, candidates[: config.rerank.candidates], config.rerank)
logger.info(
"ebook_rerank_complete input_candidates=%s returned=%s",
min(len(candidates), config.rerank.candidates),
len(reranked),
)
return SearchResponse(
query=query,
results=[replace(result, rank_source="Hybrid + rerank") for result in reranked[: config.top_k]],
rank_label="Hybrid + rerank",
)
def vector_candidates(engine: Engine, query: str, config: EbookSearchConfig) -> list[SearchResult]:
"""Return pgvector cosine candidates for a natural-language query."""
with Session(engine) as session:
model = session.scalar(select(EbookEmbeddingModel).where(EbookEmbeddingModel.name == config.embedding_model))
if model is None:
msg = f"Embedding model is not registered: {config.embedding_model}"
raise ValueError(msg)
expected_dimension = MODEL_DIMENSIONS[config.embedding_model]
if model.dimension != expected_dimension:
msg = f"Model row dimension {model.dimension} does not match configured dimension {expected_dimension}"
raise ValueError(msg)
embedding = embed_query(query, config)
limit = max(config.rerank.candidates, config.top_k) * config.vector_candidate_multiplier
embedding_table = get_embedding_table(model.dimension)
embedding_param = literal(embedding, type_=Vector(model.dimension))
distance = embedding_table.embedding.op("<=>")(embedding_param)
score = (literal(1.0) - distance).label("score")
statement = (
select(
EbookChunk.id.label("chunk_id"),
EbookChunk.text.label("text"),
EbookSource.title.label("source_title"),
EbookSource.author.label("source_author"),
EbookChapter.title.label("chapter_title"),
EbookChunk.page_label.label("page_label"),
score,
)
.select_from(embedding_table)
.join(EbookChunk, EbookChunk.id == embedding_table.chunk_id)
.join(EbookSource, EbookSource.id == EbookChunk.source_id)
.outerjoin(EbookChapter, EbookChapter.id == EbookChunk.chapter_id)
.where(embedding_table.model_id == model.id)
.order_by(distance)
.limit(limit)
)
rows = session.execute(statement).mappings()
results = [search_result_from_row(row) for row in rows]
logger.info(
"ebook_vector_search_complete model=%s dimension=%s candidates=%s",
config.embedding_model,
model.dimension,
len(results),
)
return results
def bm25_candidates(query: str, config: EbookSearchConfig) -> list[SearchResult]:
"""Return BM25-ranked lexical candidates using the persisted corpus."""
try:
corpus = load_bm25_corpus(config)
except BM25CorpusUnavailableError as error:
logger.warning("ebook_bm25_index_unavailable_skipping error=%s", error)
return []
if not corpus.records:
logger.info("ebook_bm25_search_complete corpus=0 candidates=0")
return []
bm25_query = retrieval_query_from_text(query)
scored_records = score_bm25_corpus(bm25_query, corpus, limit=config.bm25_candidate_limit)
results = [
replace(search_result_from_row(record), score=score, vector_score=None, bm25_score=score)
for record, score in scored_records
]
max_score = results[0].bm25_score if results else 0.0
logger.info(
"ebook_bm25_search_complete corpus=%s candidates=%s max_score=%.6f",
len(corpus.records),
len(results),
max_score,
)
return results
def reciprocal_rank_fusion(
vector_results: list[SearchResult],
lexical_results: list[SearchResult],
rank_constant: int,
) -> list[SearchResult]:
"""Fuse vector and lexical rankings with Reciprocal Rank Fusion."""
by_chunk: dict[int, SearchResult] = {}
scores: defaultdict[int, float] = defaultdict(float)
vector_scores: dict[int, float] = {}
bm25_scores: dict[int, float] = {}
for rank, result in enumerate(vector_results, start=1):
by_chunk.setdefault(result.chunk_id, result)
vector_scores[result.chunk_id] = result.vector_score if result.vector_score is not None else result.score
scores[result.chunk_id] += 1 / (rank_constant + rank)
for rank, result in enumerate(lexical_results, start=1):
by_chunk.setdefault(result.chunk_id, result)
bm25_scores[result.chunk_id] = result.bm25_score if result.bm25_score is not None else result.score
scores[result.chunk_id] += 1 / (rank_constant + rank)
return sorted(
(
replace(
result,
score=scores[result.chunk_id],
vector_score=vector_scores.get(result.chunk_id),
bm25_score=bm25_scores.get(result.chunk_id),
fused_score=scores[result.chunk_id],
rank_source="Hybrid",
)
for result in by_chunk.values()
),
key=lambda result: result.score,
reverse=True,
)
def search_result_from_row(row: Mapping[str, object]) -> SearchResult:
"""Convert a database row mapping into a search result."""
return SearchResult(
chunk_id=int(row["chunk_id"]),
text=str(row["text"]),
source_title=str(row["source_title"]),
source_author=optional_str(row["source_author"]),
chapter_title=optional_str(row["chapter_title"]),
page_label=optional_str(row["page_label"]),
score=float(row["score"]) if "score" in row else 0.0,
vector_score=float(row["score"]) if "score" in row else None,
)
def optional_str(value: object) -> str | None:
"""Convert nullable database values to optional strings."""
if value is None:
return None
return str(value)
TOKEN_RE = re.compile(r"[A-Za-z0-9_]+")
def tokens(text_value: str) -> list[str]:
"""Extract tokens from a text value.
This is a simple approximation of the tokenization used by PostgreSQL's full-text search,
which is sufficient for BM25 candidate retrieval. It lowercases tokens and includes alphanumeric characters and
underscores.
"""
return [match.group(0).lower() for match in TOKEN_RE.finditer(text_value)]
QUERY_STOP_WORDS = {
"a",
"an",
"and",
"are",
"as",
"at",
"does",
"for",
"in",
"is",
"of",
"the",
"to",
"what",
"when",
"where",
"which",
"who",
"why",
}
def retrieval_query_from_text(query: str) -> str:
"""Remove generic question words while preserving entity and series terms."""
keywords = [token for token in tokens(query) if token not in QUERY_STOP_WORDS]
if not keywords:
return query
return " ".join(keywords)
+36
View File
@@ -0,0 +1,36 @@
"""Runtime timing helpers for EPUB search."""
from __future__ import annotations
from dataclasses import dataclass
from time import perf_counter
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from collections.abc import Callable
@dataclass(frozen=True)
class RuntimeStep:
"""Elapsed runtime for one named search step."""
name: str
duration_ms: float
counts_toward_total: bool = True
def runtime_step_from_start(name: str, start_seconds: float) -> RuntimeStep:
"""Create a runtime step from a prior perf_counter timestamp."""
return RuntimeStep(name=name, duration_ms=(perf_counter() - start_seconds) * 1000)
def timed_result[T, **P](
name: str,
operation: Callable[P, T],
*args: P.args,
**kwargs: P.kwargs,
) -> tuple[T, RuntimeStep]:
"""Run an operation and return its result plus elapsed runtime."""
start_seconds = perf_counter()
result = operation(*args, **kwargs)
return result, runtime_step_from_start(name, start_seconds)
+6
View File
@@ -0,0 +1,6 @@
"""Reusable FastAPI tools."""
from python.fastapi_tools.db import DbSession, get_db
from python.fastapi_tools.zstd_middleware import ZstdMiddleware
__all__ = ["DbSession", "ZstdMiddleware", "get_db"]
@@ -1,4 +1,4 @@
"""Middleware for the FastAPI application."""
"""Zstd response compression middleware."""
from compression import zstd
from typing import TYPE_CHECKING
+24 -2
View File
@@ -31,8 +31,24 @@ def get_connection_info(name: str) -> tuple[str, str, str, str, str | None]:
return cast("tuple[str, str, str, str, str | None]", (database, host, port, username, password))
def get_postgres_engine(*, name: str = "POSTGRES", pool_pre_ping: bool = True) -> Engine:
"""Create a SQLAlchemy engine from environment variables."""
def get_postgres_engine(
*,
name: str = "POSTGRES",
pool_pre_ping: bool = True,
vector_engine: bool = False,
) -> Engine:
"""Create a SQLAlchemy engine from environment variables.
Args:
name (str, optional): The name of the environment variable prefix. Defaults to "POSTGRES".
pool_pre_ping (bool, optional): Whether to ping the database before each connection. Defaults to True.
This fixes the issue of trying to use a conection that has timed out on the database side.
vector_engine (bool, optional): Whether to use the vector search schema. Defaults to False.
This updates the search path the incldued the vecore types and operators.
Returns:
Engine: The SQLAlchemy engine.
"""
database, host, port, username, password = get_connection_info(name)
url = URL.create(
@@ -44,8 +60,14 @@ def get_postgres_engine(*, name: str = "POSTGRES", pool_pre_ping: bool = True) -
database=database,
)
connect_args = {}
# There more better way to do this is with separate PG account and a dedicated vector schema for the vector types
if vector_engine:
connect_args["options"] = "-csearch_path=main,public"
return create_engine(
url=url,
pool_pre_ping=pool_pre_ping,
pool_recycle=1800,
connect_args=connect_args,
)
+16
View File
@@ -11,6 +11,15 @@ from python.orm.richie.contact import (
Need,
RelationshipType,
)
from python.orm.richie.ebook import (
EbookChapter,
EbookChunk,
EbookChunkEmbedding1024,
EbookChunkEmbedding2560,
EbookChunkEmbedding4096,
EbookEmbeddingModel,
EbookSource,
)
__all__ = [
"Audiobook",
@@ -19,6 +28,13 @@ __all__ = [
"Contact",
"ContactNeed",
"ContactRelationship",
"EbookChapter",
"EbookChunk",
"EbookChunkEmbedding1024",
"EbookChunkEmbedding2560",
"EbookChunkEmbedding4096",
"EbookEmbeddingModel",
"EbookSource",
"Need",
"RelationshipType",
"RichieBase",
+138
View File
@@ -0,0 +1,138 @@
"""EPUB search models."""
from __future__ import annotations
from datetime import datetime
from pgvector.sqlalchemy import Vector
from sqlalchemy import BigInteger, Boolean, DateTime, ForeignKey, Index, String, UniqueConstraint
from sqlalchemy.orm import Mapped, mapped_column, relationship
from python.orm.richie.base import TableBase, TableBaseBig
class EbookSource(TableBase):
"""One indexed EPUB file."""
__tablename__ = "ebook_source"
__table_args__ = (
UniqueConstraint("file_path"),
UniqueConstraint("file_sha256"),
)
title: Mapped[str]
author: Mapped[str | None]
language: Mapped[str | None]
publisher: Mapped[str | None]
identifier: Mapped[str | None]
file_path: Mapped[str]
file_sha256: Mapped[str] = mapped_column(String(64))
file_mtime: Mapped[datetime] = mapped_column(DateTime(timezone=True))
file_size: Mapped[int] = mapped_column(BigInteger)
chapters: Mapped[list[EbookChapter]] = relationship(
"EbookChapter",
back_populates="source",
cascade="all, delete-orphan",
passive_deletes=True,
)
chunks: Mapped[list[EbookChunk]] = relationship(
"EbookChunk",
back_populates="source",
cascade="all, delete-orphan",
passive_deletes=True,
)
class EbookChapter(TableBase):
"""A chapter or spine document inside an EPUB."""
__tablename__ = "ebook_chapter"
__table_args__ = (UniqueConstraint("source_id", "spine_index"),)
source_id: Mapped[int] = mapped_column(ForeignKey("main.ebook_source.id", ondelete="CASCADE"))
spine_index: Mapped[int]
title: Mapped[str | None]
href: Mapped[str | None]
source: Mapped[EbookSource] = relationship("EbookSource", back_populates="chapters")
chunks: Mapped[list[EbookChunk]] = relationship(
"EbookChunk",
back_populates="chapter",
cascade="all, delete-orphan",
passive_deletes=True,
)
class EbookChunk(TableBaseBig):
"""A searchable text chunk."""
__tablename__ = "ebook_chunk"
__table_args__ = (
UniqueConstraint("source_id", "chunk_index", name="uq_ebook_chunk_source_id_chunk_index"),
UniqueConstraint("source_id", "content_sha256", name="uq_ebook_chunk_source_id_content_sha256"),
)
source_id: Mapped[int] = mapped_column(ForeignKey("main.ebook_source.id", ondelete="CASCADE"))
chapter_id: Mapped[int | None] = mapped_column(ForeignKey("main.ebook_chapter.id", ondelete="SET NULL"))
chunk_index: Mapped[int]
text: Mapped[str]
token_start: Mapped[int]
token_count: Mapped[int]
page_label: Mapped[str | None]
content_sha256: Mapped[str] = mapped_column(String(64))
search_text: Mapped[str]
source: Mapped[EbookSource] = relationship("EbookSource", back_populates="chunks")
chapter: Mapped[EbookChapter | None] = relationship("EbookChapter", back_populates="chunks")
class EbookEmbeddingModel(TableBase):
"""A supported embedding model."""
__tablename__ = "ebook_embedding_model"
name: Mapped[str] = mapped_column(String, unique=True)
dimension: Mapped[int]
is_default: Mapped[bool] = mapped_column(Boolean, default=False)
class EbookChunkEmbedding1024(TableBaseBig):
"""1024-dimensional chunk embedding."""
__tablename__ = "ebook_chunk_embedding_1024"
__table_args__ = (
UniqueConstraint("chunk_id", "model_id"),
Index(
"ix_ebook_chunk_embedding_1024_embedding_cosine",
"embedding",
postgresql_using="hnsw",
postgresql_ops={"embedding": "vector_cosine_ops"},
),
)
chunk_id: Mapped[int] = mapped_column(ForeignKey("main.ebook_chunk.id", ondelete="CASCADE"))
model_id: Mapped[int] = mapped_column(ForeignKey("main.ebook_embedding_model.id", ondelete="CASCADE"))
embedding: Mapped[list[float]] = mapped_column(Vector(1024))
class EbookChunkEmbedding2560(TableBaseBig):
"""2560-dimensional chunk embedding."""
__tablename__ = "ebook_chunk_embedding_2560"
__table_args__ = (UniqueConstraint("chunk_id", "model_id"),)
chunk_id: Mapped[int] = mapped_column(ForeignKey("main.ebook_chunk.id", ondelete="CASCADE"))
model_id: Mapped[int] = mapped_column(ForeignKey("main.ebook_embedding_model.id", ondelete="CASCADE"))
embedding: Mapped[list[float]] = mapped_column(Vector(2560))
class EbookChunkEmbedding4096(TableBaseBig):
"""4096-dimensional chunk embedding."""
__tablename__ = "ebook_chunk_embedding_4096"
__table_args__ = (UniqueConstraint("chunk_id", "model_id"),)
chunk_id: Mapped[int] = mapped_column(ForeignKey("main.ebook_chunk.id", ondelete="CASCADE"))
model_id: Mapped[int] = mapped_column(ForeignKey("main.ebook_embedding_model.id", ondelete="CASCADE"))
embedding: Mapped[list[float]] = mapped_column(Vector(4096))
+2
View File
@@ -32,6 +32,8 @@
enable = true;
allowedTCPPorts = [
8000
8001
8002
];
};
networkmanager.enable = true;
+1 -1
View File
@@ -4,7 +4,7 @@
host = "0.0.0.0";
enable = true;
syncModels = true;
syncModels = false;
loadModels = [
"codellama:7b"
"deepscaler:1.5b"
+3
View File
@@ -17,6 +17,9 @@
allowedTCPPorts = [ ];
allowedUDPPorts = [ ];
};
allowedTCPPorts = [
8070
];
};
useNetworkd = true;
};
+4 -1
View File
@@ -6,7 +6,7 @@ in
user = "ollama";
enable = true;
host = "0.0.0.0";
syncModels = true;
syncModels = false;
loadModels = [
"codellama:7b"
"deepscaler:1.5b"
@@ -30,6 +30,9 @@ in
"ministral-3:14b"
"nemotron-3-nano:30b"
"qwen3-coder:30b"
"qwen3-embedding:0.6b"
"qwen3-embedding:4b"
"qwen3-embedding:8b"
"qwen3-vl:32b"
"qwen3:14b"
"qwen3.5:35b"
-11
View File
@@ -38,9 +38,6 @@ in
# signalbot
local signalbot signalbot trust
# hedgedoc
local hedgedoc hedgedoc trust
# math
local postgres math trust
host postgres math 127.0.0.1/32 trust
@@ -120,19 +117,11 @@ in
login = true;
};
}
{
name = "hedgedoc";
ensureDBOwnership = true;
ensureClauses = {
login = true;
};
}
];
ensureDatabases = [
"data_science_dev"
"hass"
"gitea"
"hedgedoc"
"math"
"n8n"
"richie"
+1
View File
@@ -0,0 +1 @@
"""Focused ebook search tests."""
+505
View File
@@ -0,0 +1,505 @@
"""Tests for EPUB search core helpers."""
from __future__ import annotations
import logging
from datetime import UTC, datetime
from os import environ
from pathlib import Path
from types import ModuleType
from typing import TYPE_CHECKING
import pytest
from sqlalchemy import create_engine, select
from sqlalchemy.orm import sessionmaker
from python.ebook_search.answer import answer_query
from python.ebook_search.bm25_corpus import (
BM25Corpus,
BM25CorpusUnavailableError,
BM25Manifest,
ensure_bm25_corpus,
fetch_bm25_corpus_records,
load_bm25_corpus,
read_bm25_manifest,
score_bm25_corpus,
write_bm25_corpus,
)
from python.ebook_search.config import EbookSearchConfig, RerankConfig, load_config, normalize_embedding_model
from python.ebook_search.embeddings import MODEL_DIMENSIONS, ensure_embedding_models
from python.ebook_search.ingest import chunk_text, find_existing_source
from python.ebook_search.search import (
SearchResponse,
SearchResult,
bm25_candidates,
reciprocal_rank_fusion,
retrieval_query_from_text,
)
from python.ebook_search.timing import RuntimeStep
from python.orm.richie import (
EbookChapter,
EbookChunk,
EbookChunkEmbedding1024,
EbookEmbeddingModel,
EbookSource,
RichieBase,
)
if TYPE_CHECKING:
from pytest_mock import MockerFixture
def test_chunk_text_uses_overlap() -> None:
chunks = chunk_text(" ".join(str(index) for index in range(100)), chunk_tokens=20, overlap_tokens=5)
assert len(chunks) > 1
assert chunks[0].token_start == 0
assert chunks[1].token_start == 15
assert all(chunk.token_count <= 20 for chunk in chunks)
def test_reciprocal_rank_fusion_combines_vector_and_bm25_rankings() -> None:
vector_results = [
SearchResult(chunk_id=1, text="a", source_title="A", score=0.9, vector_score=0.9),
SearchResult(chunk_id=2, text="b", source_title="B", score=0.8, vector_score=0.8),
]
lexical_results = [
SearchResult(chunk_id=2, text="b", source_title="B", score=4.2, bm25_score=4.2),
SearchResult(chunk_id=3, text="c", source_title="C", score=2.1, bm25_score=2.1),
]
fused = reciprocal_rank_fusion(vector_results, lexical_results)
assert [result.chunk_id for result in fused] == [2, 1, 3]
assert fused[0].rank_source == "Hybrid"
assert fused[0].vector_score == 0.8
assert fused[0].bm25_score == 4.2
assert fused[0].fused_score == fused[0].score
def test_find_existing_source_matches_path_or_hash() -> None:
engine = create_engine("sqlite+pysqlite:///:memory:", future=True)
RichieBase.metadata.create_all(engine)
with sessionmaker(bind=engine, expire_on_commit=False, future=True)() as session:
source = EbookSource(
title="Book",
author=None,
language=None,
publisher=None,
identifier=None,
file_path="/old/book.epub",
file_sha256="a" * 64,
file_mtime=datetime.now(tz=UTC),
file_size=10,
)
session.add(source)
session.commit()
assert find_existing_source(session, Path("/old/book.epub"), "b" * 64) == source
assert find_existing_source(session, Path("/new/book.epub"), "a" * 64) == source
def test_bm25_corpus_uses_existing_search_text_without_duplicate_metadata() -> None:
engine = create_engine("sqlite+pysqlite:///:memory:", future=True)
RichieBase.metadata.create_all(engine)
with sessionmaker(bind=engine, expire_on_commit=False, future=True)() as session:
source = EbookSource(
title="Book",
author="Author",
language=None,
publisher=None,
identifier=None,
file_path="/book.epub",
file_sha256="a" * 64,
file_mtime=datetime.now(tz=UTC),
file_size=10,
)
session.add(source)
session.flush()
chapter = EbookChapter(source_id=source.id, spine_index=0, title="Chapter", href=None)
session.add(chapter)
session.flush()
session.add(
EbookChunk(
id=1,
source_id=source.id,
chapter_id=chapter.id,
chunk_index=0,
text="content",
token_start=0,
token_count=1,
page_label=None,
content_sha256="b" * 64,
search_text="Book Author Chapter content",
)
)
session.commit()
records, texts = fetch_bm25_corpus_records(session)
assert texts == ["Book Author Chapter content"]
assert records[0]["chunk_id"] == 1
assert "bm25_text" not in records[0]
def test_reciprocal_rank_fusion_marks_hybrid_source() -> None:
vector_results = [SearchResult(chunk_id=1, text="a", source_title="A")]
lexical_results = [SearchResult(chunk_id=2, text="b", source_title="B")]
fused = reciprocal_rank_fusion(vector_results, lexical_results)
assert {result.rank_source for result in fused} == {"Hybrid"}
def test_search_response_sums_runtime_steps() -> None:
response = SearchResponse(
query="query",
results=[],
rank_label="Hybrid",
timings=(
RuntimeStep(name="A", duration_ms=1.25),
RuntimeStep(name="B", duration_ms=2.75),
RuntimeStep(name="Parallel detail", duration_ms=10.0, counts_toward_total=False),
),
)
assert response.total_runtime_ms == 4.0
def test_retrieval_query_keeps_entity_and_series_terms() -> None:
assert retrieval_query_from_text("what does Damien Montgomery stand for in starship mage") == (
"damien montgomery stand starship mage"
)
def test_bm25_candidates_scores_whole_corpus(mocker: MockerFixture) -> None:
record = {
"chunk_id": 2,
"text": "high",
"source_title": "B",
"source_author": None,
"chapter_title": None,
"page_label": None,
"bm25_text": "high",
}
manifest = BM25Manifest(created_at=datetime.now(tz=UTC), db_updated_at=None, chunk_count=1)
corpus = BM25Corpus(retriever=object(), records=(record,), manifest=manifest)
captured: dict[str, object] = {}
def fake_score_bm25_corpus(query, saved_corpus, *, limit):
captured["query"] = query
captured["corpus"] = saved_corpus
captured["limit"] = limit
return [(record, 1.5)]
mocker.patch("python.ebook_search.search.load_bm25_corpus", side_effect=lambda _config: corpus)
mocker.patch("python.ebook_search.search.score_bm25_corpus", side_effect=fake_score_bm25_corpus)
config = EbookSearchConfig(rerank=RerankConfig(enabled=False))
results = bm25_candidates("high", config)
assert captured["query"] == "high"
assert captured["corpus"] == corpus
assert captured["limit"] == 120
assert [result.chunk_id for result in results] == [2]
assert [result.bm25_score for result in results] == [1.5]
def test_bm25_candidates_returns_empty_when_corpus_is_unavailable(mocker: MockerFixture, caplog) -> None:
def fake_load_bm25_corpus(_config):
raise BM25CorpusUnavailableError
mocker.patch("python.ebook_search.search.load_bm25_corpus", side_effect=fake_load_bm25_corpus)
config = EbookSearchConfig(rerank=RerankConfig(enabled=False))
with caplog.at_level(logging.WARNING):
results = bm25_candidates("high", config)
assert results == []
assert "ebook_bm25_index_unavailable_skipping" in caplog.text
def test_write_bm25_corpus_publishes_dated_generation(tmp_path) -> None:
index_path = tmp_path / "bm25"
index_path.mkdir()
generations_path = index_path / "generations"
generations_path.mkdir()
old_generation = generations_path / "20260101T000000.000000Z"
old_generation.mkdir()
(old_generation / "sentinel").write_text("old", encoding="utf-8")
(index_path / "current").symlink_to(Path("generations") / old_generation.name, target_is_directory=True)
manifest = BM25Manifest(
created_at=datetime(2026, 6, 12, 1, 2, 3, 456789, tzinfo=UTC),
db_updated_at=None,
chunk_count=0,
)
write_bm25_corpus(index_path, [], [], manifest)
current_path = index_path / "current"
assert current_path.is_symlink()
assert current_path.readlink() == generations_path / "20260612T010203.456789Z"
assert old_generation.is_dir()
assert (old_generation / "sentinel").read_text(encoding="utf-8") == "old"
assert (generations_path / "20260612T010203.456789Z").is_dir()
assert read_bm25_manifest(index_path) == manifest
def test_write_bm25_corpus_keeps_current_generation_when_publish_fails(mocker: MockerFixture, tmp_path) -> None:
index_path = tmp_path / "bm25"
index_path.mkdir()
generations_path = index_path / "generations"
generations_path.mkdir()
old_generation = generations_path / "20260101T000000.000000Z"
old_generation.mkdir()
(old_generation / "sentinel").write_text("old", encoding="utf-8")
current_path = index_path / "current"
current_path.symlink_to(Path("generations") / old_generation.name, target_is_directory=True)
original_replace = Path.replace
def fail_current_replace(self, target):
if self.parent == index_path and self.name.startswith(".current.") and target == current_path:
msg = "current publish failed"
raise OSError(msg)
return original_replace(self, target)
mocker.patch.object(Path, "replace", fail_current_replace)
manifest = BM25Manifest(
created_at=datetime(2026, 6, 12, 1, 2, 3, 456789, tzinfo=UTC),
db_updated_at=None,
chunk_count=0,
)
with pytest.raises(OSError, match="current publish failed"):
write_bm25_corpus(index_path, [], [], manifest)
assert current_path.readlink() == Path("generations") / old_generation.name
assert (old_generation / "sentinel").read_text(encoding="utf-8") == "old"
assert not (generations_path / "20260612T010203.456789Z").exists()
def test_load_bm25_corpus_uses_current_generation(tmp_path) -> None:
load_bm25_corpus.cache_clear()
index_path = tmp_path / "bm25"
manifest = BM25Manifest(
created_at=datetime(2026, 6, 12, 1, 2, 3, 456789, tzinfo=UTC),
db_updated_at=None,
chunk_count=1,
)
record = {
"chunk_id": 2,
"text": "cached",
"source_title": "B",
"source_author": None,
"chapter_title": None,
"page_label": None,
}
write_bm25_corpus(index_path, [record], ["cached phrase"], manifest)
config = EbookSearchConfig(rerank=RerankConfig(enabled=False), bm25_index_dir=str(index_path))
try:
corpus = load_bm25_corpus(config)
finally:
load_bm25_corpus.cache_clear()
assert corpus.manifest == manifest
assert corpus.records[0]["chunk_id"] == 2
assert score_bm25_corpus("cached", corpus, limit=10)
def test_load_bm25_corpus_caches_disk_load(mocker: MockerFixture, tmp_path) -> None:
load_bm25_corpus.cache_clear()
manifest = BM25Manifest(created_at=datetime.now(tz=UTC), db_updated_at=None, chunk_count=1)
record = {
"chunk_id": 2,
"text": "cached",
"source_title": "B",
"source_author": None,
"chapter_title": None,
"page_label": None,
"bm25_text": "cached",
}
load_count = 0
class FakeRetriever:
"""Fake persisted BM25 retriever."""
corpus = (record,)
class FakeBM25:
"""Fake BM25 class with observable load count."""
@staticmethod
def load(index_path, *, load_corpus, mmap):
nonlocal load_count
load_count += 1
assert index_path == tmp_path
assert load_corpus is True
assert mmap is True
return FakeRetriever()
fake_bm25s = ModuleType("bm25s")
fake_bm25s.BM25 = FakeBM25
mocker.patch("python.ebook_search.bm25_corpus.read_bm25_manifest", side_effect=lambda _path: manifest)
mocker.patch("python.ebook_search.bm25_corpus.bm25_index_exists", side_effect=lambda _path, _manifest: True)
mocker.patch("python.ebook_search.bm25_corpus.bm25s", fake_bm25s)
config = EbookSearchConfig(rerank=RerankConfig(enabled=False), bm25_index_dir=str(tmp_path))
try:
first = load_bm25_corpus(config)
second = load_bm25_corpus(config)
finally:
load_bm25_corpus.cache_clear()
assert first is second
assert first is not None
assert first.records == (record,)
assert load_count == 1
def test_load_bm25_corpus_raises_when_index_is_missing(mocker: MockerFixture, tmp_path) -> None:
load_bm25_corpus.cache_clear()
mocker.patch("python.ebook_search.bm25_corpus.read_bm25_manifest", side_effect=lambda _path: None)
mocker.patch("python.ebook_search.bm25_corpus.bm25_index_exists", side_effect=lambda _path, _manifest: False)
config = EbookSearchConfig(rerank=RerankConfig(enabled=False), bm25_index_dir=str(tmp_path))
try:
with pytest.raises(BM25CorpusUnavailableError, match="BM25 corpus is not available"):
load_bm25_corpus(config)
finally:
load_bm25_corpus.cache_clear()
def test_ensure_bm25_corpus_refreshes_missing_index(mocker: MockerFixture) -> None:
refreshed: list[object] = []
db_updated_at = datetime.now(tz=UTC)
mocker.patch("python.ebook_search.bm25_corpus.read_bm25_manifest", side_effect=lambda _path: None)
mocker.patch("python.ebook_search.bm25_corpus.bm25_index_exists", side_effect=lambda _path, _manifest: False)
mocker.patch("python.ebook_search.bm25_corpus.corpus_last_updated_at", side_effect=lambda _session: db_updated_at)
mocker.patch(
"python.ebook_search.bm25_corpus.refresh_bm25_corpus",
side_effect=lambda session, config, *, db_updated_at: refreshed.append((session, config, db_updated_at)),
)
config = EbookSearchConfig(rerank=RerankConfig(enabled=False))
session = object()
ensure_bm25_corpus(session, config)
assert refreshed == [(session, config, db_updated_at)]
def test_ensure_bm25_corpus_refreshes_stale_index(mocker: MockerFixture) -> None:
refreshed: list[object] = []
created_at = datetime(2026, 1, 1, tzinfo=UTC)
db_updated_at = datetime(2026, 1, 2, tzinfo=UTC)
manifest = BM25Manifest(created_at=created_at, db_updated_at=created_at, chunk_count=10)
mocker.patch("python.ebook_search.bm25_corpus.read_bm25_manifest", side_effect=lambda _path: manifest)
mocker.patch("python.ebook_search.bm25_corpus.bm25_index_exists", side_effect=lambda _path, _manifest: True)
mocker.patch("python.ebook_search.bm25_corpus.corpus_last_updated_at", side_effect=lambda _session: db_updated_at)
mocker.patch(
"python.ebook_search.bm25_corpus.refresh_bm25_corpus",
side_effect=lambda session, config, *, db_updated_at: refreshed.append((session, config, db_updated_at)),
)
config = EbookSearchConfig(rerank=RerankConfig(enabled=False))
session = object()
ensure_bm25_corpus(session, config)
assert refreshed == [(session, config, db_updated_at)]
def test_supported_embedding_models_match_service_names() -> None:
assert MODEL_DIMENSIONS == {
"qwen3-embedding-0.6b": 1024,
"qwen3-embedding-4b": 2560,
"qwen3-embedding-8b": 4096,
}
def test_ensure_embedding_models_registers_service_names() -> None:
engine = create_engine("sqlite+pysqlite:///:memory:", future=True)
RichieBase.metadata.create_all(engine)
with sessionmaker(bind=engine, expire_on_commit=False, future=True)() as session:
ensure_embedding_models(session)
session.commit()
models = list(session.scalars(select(EbookEmbeddingModel).order_by(EbookEmbeddingModel.name)))
assert [(model.name, model.dimension) for model in models] == [
("qwen3-embedding-0.6b", 1024),
("qwen3-embedding-4b", 2560),
("qwen3-embedding-8b", 4096),
]
def test_1024_embedding_table_has_cosine_hnsw_index() -> None:
indexes = {index.name: index for index in EbookChunkEmbedding1024.__table__.indexes}
index = indexes["ix_ebook_chunk_embedding_1024_embedding_cosine"]
assert [column.name for column in index.columns] == ["embedding"]
assert index.dialect_options["postgresql"]["using"] == "hnsw"
assert index.dialect_options["postgresql"]["ops"] == {"embedding": "vector_cosine_ops"}
def test_embedding_model_aliases_normalize_to_provider_names(mocker: MockerFixture) -> None:
mocker.patch.dict(environ, {}, clear=False)
assert normalize_embedding_model() == "qwen3-embedding-0.6b"
environ["EBOOK_SEARCH_EMBEDDING_MODEL"] = "qwen3-embedding-0.6b"
assert normalize_embedding_model() == "qwen3-embedding-0.6b"
environ["EBOOK_SEARCH_EMBEDDING_MODEL"] = "Qwen3-Embedding-0.6B"
assert normalize_embedding_model() == "qwen3-embedding-0.6b"
environ["EBOOK_SEARCH_EMBEDDING_MODEL"] = "Qwen/Qwen3-Embedding-4B"
assert normalize_embedding_model() == "qwen3-embedding-4b"
environ["EBOOK_SEARCH_EMBEDDING_MODEL"] = "qwen3-embedding:8b"
assert normalize_embedding_model() == "qwen3-embedding-8b"
environ["EBOOK_SEARCH_EMBEDDING_MODEL"] = "qwen3-embedding-8b"
assert normalize_embedding_model() == "qwen3-embedding-8b"
def test_answer_generation_is_enabled_by_default(mocker: MockerFixture) -> None:
mocker.patch.dict(environ, {}, clear=False)
environ.pop("EBOOK_SEARCH_ANSWER_ENABLED", None)
config = load_config()
assert config.answer_enabled is True
def test_chat_defaults_use_ollama_cloud(mocker: MockerFixture) -> None:
mocker.patch.dict(environ, {}, clear=False)
environ.pop("EBOOK_SEARCH_VLLM_BASE_URL", None)
environ.pop("EBOOK_SEARCH_CHAT_MODEL", None)
config = load_config()
assert config.vllm_base_url == "https://ollama.com/v1"
assert config.chat_model == "deepseek-v4-flash"
def test_chat_api_key_falls_back_to_ollama_api_key(mocker: MockerFixture) -> None:
mocker.patch.dict(environ, {"OLLAMA_API_KEY": "ollama-key"}, clear=False)
environ.pop("EBOOK_SEARCH_VLLM_API_KEY", None)
config = load_config()
assert config.vllm_api_key == "ollama-key"
def test_answer_query_does_not_call_model_when_disabled() -> None:
config = load_config().model_copy(update={"answer_enabled": False})
result = SearchResult(chunk_id=1, text="source text", source_title="Book")
answer = answer_query("question", [result], config)
assert "Answer generation is disabled" in answer
+18
View File
@@ -0,0 +1,18 @@
"""Tests for the shared query/gold set loader."""
from __future__ import annotations
from python.ebook_search.eval.dataset import load_gold_queries
def test_default_query_set_counts() -> None:
queries = load_gold_queries()
answerable = [query for query in queries if query.answerable]
assert len(queries) == 70
assert len(answerable) == 50
assert len(queries) - len(answerable) == 20
assert all(query.query for query in queries)
# Answerable queries carry at least one source; garbage queries carry none.
assert all(query.relevant_sources for query in answerable)
assert all(not query.relevant_sources for query in queries if not query.answerable)
+147
View File
@@ -0,0 +1,147 @@
"""Tests for serve-time output guardrails."""
from __future__ import annotations
from typing import TYPE_CHECKING
from fastapi.testclient import TestClient
from sqlalchemy import create_engine
from python.ebook_search.api.main import create_app
from python.ebook_search.config import EbookSearchConfig, RerankConfig
from python.ebook_search.guardrails import is_confident, retrieval_confidence, validate_citations
from python.ebook_search.search import SearchResponse, SearchResult
if TYPE_CHECKING:
from pytest_mock import MockerFixture
def make_results(count, *, vector_score=0.8):
return [
SearchResult(
chunk_id=index,
text=f"source text {index}",
source_title="Book",
score=vector_score,
vector_score=vector_score,
)
for index in range(1, count + 1)
]
def test_validate_citations_partitions_markers() -> None:
report = validate_citations("Supported by [1] and [2].", result_count=3)
assert report.cited == (1, 2)
assert report.invalid == ()
assert report.grounded is True
def test_validate_citations_flags_out_of_range_marker() -> None:
report = validate_citations("As shown in [5].", result_count=2)
assert report.cited == ()
assert report.invalid == (5,)
assert report.grounded is False
def test_validate_citations_uncited_answer_is_not_grounded() -> None:
report = validate_citations("No citations at all.", result_count=2)
assert report.cited == ()
assert report.invalid == ()
assert report.grounded is False
def test_retrieval_confidence_prefers_rerank_then_vector() -> None:
assert retrieval_confidence([]) == 0.0
rerank_top = [SearchResult(chunk_id=1, text="t", source_title="B", rerank_score=0.7, vector_score=0.2)]
assert retrieval_confidence(rerank_top) == 0.7
vector_top = [SearchResult(chunk_id=1, text="t", source_title="B", vector_score=0.5)]
assert retrieval_confidence(vector_top) == 0.5
def test_is_confident_against_threshold() -> None:
config = EbookSearchConfig(rerank=RerankConfig(enabled=False), min_retrieval_confidence=0.5)
assert is_confident(make_results(1, vector_score=0.6), config) is True
assert is_confident(make_results(1, vector_score=0.4), config) is False
def patch_app_runtime(mocker: MockerFixture):
mocker.patch(
"python.ebook_search.api.main.get_postgres_engine",
side_effect=lambda **_kwargs: create_engine("sqlite+pysqlite:///:memory:", future=True),
)
mocker.patch("python.ebook_search.api.main.ensure_bm25_corpus", side_effect=lambda _session, _config: None)
def test_low_confidence_skips_answer_generation(mocker: MockerFixture) -> None:
called = False
def fake_search_ebooks(_engine, query, _config, *, rerank=False):
del rerank
return SearchResponse(query=query, rank_label="Hybrid", results=make_results(1, vector_score=0.05))
def fake_answer_query(_query, _results, _config):
nonlocal called
called = True
return "answer"
config = EbookSearchConfig(
rerank=RerankConfig(enabled=False),
answer_enabled=True,
min_retrieval_confidence=0.5,
)
mocker.patch("python.ebook_search.api.routes.search.search_ebooks", side_effect=fake_search_ebooks)
mocker.patch("python.ebook_search.api.routes.search.answer_query", side_effect=fake_answer_query)
mocker.patch("python.ebook_search.api.main.load_config", side_effect=lambda: config)
patch_app_runtime(mocker)
app = create_app()
with TestClient(app) as client:
response = client.post("/search", data={"query": "q"})
assert response.status_code == 200
assert called is False
assert "Low retrieval confidence" in response.text
def test_invalid_citation_is_flagged(mocker: MockerFixture) -> None:
def fake_search_ebooks(_engine, query, _config, *, rerank=False):
del rerank
return SearchResponse(query=query, rank_label="Hybrid", results=make_results(2, vector_score=0.9))
mocker.patch("python.ebook_search.api.routes.search.search_ebooks", side_effect=fake_search_ebooks)
mocker.patch(
"python.ebook_search.api.routes.search.answer_query",
side_effect=lambda _query, _results, _config: "Per the text [9].",
)
patch_app_runtime(mocker)
app = create_app()
app.state.config = EbookSearchConfig(rerank=RerankConfig(enabled=False), answer_enabled=True)
with TestClient(app) as client:
response = client.post("/search", data={"query": "q"})
assert response.status_code == 200
assert "Invalid citations" in response.text
assert "9" in response.text
def test_grounded_answer_has_no_warning_badge(mocker: MockerFixture) -> None:
def fake_search_ebooks(_engine, query, _config, *, rerank=False):
del rerank
return SearchResponse(query=query, rank_label="Hybrid", results=make_results(2, vector_score=0.9))
mocker.patch("python.ebook_search.api.routes.search.search_ebooks", side_effect=fake_search_ebooks)
mocker.patch(
"python.ebook_search.api.routes.search.answer_query",
side_effect=lambda _query, _results, _config: "Grounded in [1] and [2].",
)
patch_app_runtime(mocker)
app = create_app()
app.state.config = EbookSearchConfig(rerank=RerankConfig(enabled=False), answer_enabled=True)
with TestClient(app) as client:
response = client.post("/search", data={"query": "q"})
assert response.status_code == 200
assert "Unverified" not in response.text
assert "Invalid citations" not in response.text
+122
View File
@@ -0,0 +1,122 @@
"""Tests for EPUB search health and readiness routes."""
from __future__ import annotations
from typing import TYPE_CHECKING
from fastapi.testclient import TestClient
from sqlalchemy import create_engine
from python.ebook_search.api.main import create_app
from python.ebook_search.config import EbookSearchConfig, RerankConfig
HEALTH_MODULE = "python.ebook_search.api.routes.health"
if TYPE_CHECKING:
from pytest_mock import MockerFixture
def fake_get_postgres_engine(**_kwargs):
"""Return an in-memory engine for route tests."""
return create_engine("sqlite+pysqlite:///:memory:", future=True)
def patch_app_runtime(mocker: MockerFixture):
mocker.patch("python.ebook_search.api.main.get_postgres_engine", side_effect=fake_get_postgres_engine)
mocker.patch("python.ebook_search.api.main.ensure_bm25_corpus", side_effect=lambda _session, _config: None)
def patch_dependencies(mocker: MockerFixture, *, database=True, embedding=True, chat=True, bm25="ok"):
mocker.patch(f"{HEALTH_MODULE}.check_database", side_effect=lambda _session: database)
mocker.patch(f"{HEALTH_MODULE}.check_embedding_endpoint", side_effect=lambda _config: embedding)
mocker.patch(f"{HEALTH_MODULE}.check_chat_endpoint", side_effect=lambda _config: chat)
mocker.patch(f"{HEALTH_MODULE}.check_bm25_status", side_effect=lambda _config: bm25)
def build_client(mocker: MockerFixture, config=None):
resolved = config or EbookSearchConfig(rerank=RerankConfig(enabled=False))
mocker.patch("python.ebook_search.api.main.load_config", side_effect=lambda: resolved)
patch_app_runtime(mocker)
app = create_app()
return TestClient(app)
def test_health_returns_ok(mocker: MockerFixture) -> None:
with build_client(mocker) as client:
response = client.get("/health")
assert response.status_code == 200
assert response.json() == {"status": "ok"}
def test_ready_all_dependencies_ok(mocker: MockerFixture) -> None:
patch_dependencies(mocker)
with build_client(mocker) as client:
response = client.get("/ready")
assert response.status_code == 200
body = response.json()
assert body["status"] == "ready"
assert body["checks"] == {"database": "ok", "embedding": "ok", "chat": "ok", "bm25": "ok"}
def test_ready_embedding_down_is_degraded(mocker: MockerFixture) -> None:
patch_dependencies(mocker, embedding=False)
with build_client(mocker) as client:
response = client.get("/ready")
assert response.status_code == 200
body = response.json()
assert body["status"] == "degraded"
assert body["checks"]["embedding"] == "fail"
def test_ready_chat_down_is_degraded(mocker: MockerFixture) -> None:
patch_dependencies(mocker, chat=False)
with build_client(mocker) as client:
response = client.get("/ready")
assert response.status_code == 200
body = response.json()
assert body["status"] == "degraded"
assert body["checks"]["chat"] == "fail"
def test_ready_chat_disabled_when_answers_off(mocker: MockerFixture) -> None:
patch_dependencies(mocker)
config = EbookSearchConfig(rerank=RerankConfig(enabled=False), answer_enabled=False)
with build_client(mocker, config) as client:
response = client.get("/ready")
assert response.status_code == 200
body = response.json()
assert body["status"] == "ready"
assert body["checks"]["chat"] == "disabled"
def test_ready_database_down_is_unavailable(mocker: MockerFixture) -> None:
patch_dependencies(mocker, database=False)
with build_client(mocker) as client:
response = client.get("/ready")
assert response.status_code == 503
body = response.json()
assert body["status"] == "unavailable"
assert body["checks"]["database"] == "fail"
def test_ready_bm25_missing_is_degraded(mocker: MockerFixture) -> None:
patch_dependencies(mocker, bm25="missing")
with build_client(mocker) as client:
response = client.get("/ready")
assert response.status_code == 200
body = response.json()
assert body["status"] == "degraded"
assert body["checks"]["bm25"] == "missing"
+89
View File
@@ -0,0 +1,89 @@
"""Tests for EPUB search HTTP model adapters."""
from __future__ import annotations
from typing import TYPE_CHECKING
import httpx
import pytest
from python.ebook_search.answer import answer_query
from python.ebook_search.config import EbookSearchConfig, RerankConfig
from python.ebook_search.embeddings import embed_texts
from python.ebook_search.search import SearchResult
if TYPE_CHECKING:
from pytest_mock import MockerFixture
def test_answer_query_uses_httpx_chat_completions(mocker: MockerFixture) -> None:
captured: dict[str, object] = {}
def fake_post(url: str, **kwargs: object) -> httpx.Response:
captured["url"] = url
captured["kwargs"] = kwargs
return httpx.Response(
200,
json={"choices": [{"message": {"content": "grounded answer"}}]},
request=httpx.Request("POST", url),
)
mocker.patch.object(httpx, "post", side_effect=fake_post)
config = EbookSearchConfig(
rerank=RerankConfig(enabled=False),
vllm_base_url="https://ollama.com/v1",
vllm_api_key="secret",
chat_model="deepseek-v4-flash",
)
answer = answer_query("question", [SearchResult(chunk_id=1, text="source", source_title="Book")], config)
assert answer == "grounded answer"
assert captured["url"] == "https://ollama.com/v1/chat/completions"
kwargs = captured["kwargs"]
assert isinstance(kwargs, dict)
assert kwargs["headers"] == {"Authorization": "Bearer secret"}
payload = kwargs["json"]
assert isinstance(payload, dict)
assert payload["model"] == "deepseek-v4-flash"
def test_embed_texts_uses_httpx_embeddings(mocker: MockerFixture) -> None:
captured: dict[str, object] = {}
vector = [0.0] * 1024
def fake_post(url: str, **kwargs: object) -> httpx.Response:
captured["url"] = url
captured["kwargs"] = kwargs
return httpx.Response(
200,
json={"data": [{"embedding": vector}]},
request=httpx.Request("POST", url),
)
mocker.patch.object(httpx, "post", side_effect=fake_post)
config = EbookSearchConfig(
rerank=RerankConfig(enabled=False),
embedding_base_url="http://bob:8000/v1",
embedding_model="qwen3-embedding-0.6b",
)
embeddings = embed_texts(["hello"], config)
assert embeddings == [vector]
assert captured["url"] == "http://bob:8000/v1/embeddings"
kwargs = captured["kwargs"]
assert isinstance(kwargs, dict)
assert kwargs["headers"] == {}
assert kwargs["json"] == {"model": "qwen3-embedding-0.6b", "input": ["hello"]}
def test_embed_texts_rejects_bad_response_shape(mocker: MockerFixture) -> None:
def fake_post(url: str, **_kwargs: object) -> httpx.Response:
return httpx.Response(200, json={"data": [{}]}, request=httpx.Request("POST", url))
mocker.patch.object(httpx, "post", side_effect=fake_post)
config = EbookSearchConfig(rerank=RerankConfig(enabled=False))
with pytest.raises(RuntimeError, match="Embedding request failed"):
embed_texts(["hello"], config)
+79
View File
@@ -0,0 +1,79 @@
"""Tests for the load-test runner and its statistics helpers."""
from __future__ import annotations
import asyncio
from typing import TYPE_CHECKING
import pytest
from python.ebook_search.loadtest import RequestResult, load_queries, percentile, run_load, summarize
if TYPE_CHECKING:
from pytest_mock import MockerFixture
def test_load_queries_reads_shared_set() -> None:
queries = load_queries(None)
assert len(queries) == 70
assert all(isinstance(query, str) and query for query in queries)
def test_percentile_interpolates() -> None:
values = [10.0, 20.0, 30.0, 40.0]
assert percentile(values, 50) == pytest.approx(25.0)
assert percentile(values, 90) == pytest.approx(37.0)
assert percentile(values, 0) == 10.0
assert percentile(values, 100) == 40.0
assert percentile([], 95) == 0.0
def test_summarize_counts_and_throughput() -> None:
results = [
RequestResult(status_code=200, latency_ms=10.0, ok=True),
RequestResult(status_code=200, latency_ms=20.0, ok=True),
RequestResult(status_code=200, latency_ms=30.0, ok=True),
RequestResult(status_code=500, latency_ms=40.0, ok=False),
]
summary = summarize(results, wall_seconds=2.0)
assert summary.total == 4
assert summary.successes == 3
assert summary.failures == 1
assert summary.throughput_rps == pytest.approx(2.0)
assert summary.latency_max_ms == 40.0
assert summary.status_counts == {200: 3, 500: 1}
def test_summarize_handles_empty() -> None:
summary = summarize([], wall_seconds=0.0)
assert summary.total == 0
assert summary.throughput_rps == 0.0
assert summary.latency_p95_ms == 0.0
def test_run_load_aggregates_mocked_responses(mocker: MockerFixture) -> None:
response = mocker.Mock(status_code=200, is_success=True)
client = mocker.MagicMock()
client.__aenter__.return_value = client
client.post = mocker.AsyncMock(return_value=response)
mocker.patch("python.ebook_search.loadtest.httpx.AsyncClient", return_value=client)
summary = asyncio.run(
run_load(
base_url="http://test",
queries=["q1", "q2"],
request_count=4,
concurrency=2,
rerank=False,
warmup=1,
timeout_seconds=1.0,
)
)
assert summary.total == 4
assert summary.successes == 4
assert summary.failures == 0
assert summary.status_counts == {200: 4}
# 1 warmup request (not measured) plus 4 measured requests.
assert client.post.await_count == 5
+50
View File
@@ -0,0 +1,50 @@
"""Tests for the ebook search RAG pipeline orchestration."""
from __future__ import annotations
from threading import Event
from typing import TYPE_CHECKING
from sqlalchemy import create_engine
from python.ebook_search.config import EbookSearchConfig, RerankConfig
from python.ebook_search.search import SearchResult, search_ebooks
if TYPE_CHECKING:
from pytest_mock import MockerFixture
def test_search_ebooks_runs_vector_and_bm25_in_parallel(mocker: MockerFixture) -> None:
engine = create_engine("sqlite+pysqlite:///:memory:", future=True)
vector_started = Event()
bm25_started = Event()
received_engines: list[object] = []
def fake_vector_candidates(received_engine, query, _config):
"""Return vector candidates after confirming BM25 has started."""
received_engines.append(received_engine)
assert query == "what is parallel"
vector_started.set()
assert bm25_started.wait(timeout=2)
return [SearchResult(chunk_id=1, text="vector", source_title="Vector", vector_score=0.9)]
def fake_bm25_candidates(query, _config):
"""Return BM25 candidates after confirming vector search has started."""
assert query == "parallel"
bm25_started.set()
assert vector_started.wait(timeout=2)
return [SearchResult(chunk_id=2, text="bm25", source_title="BM25", bm25_score=2.0)]
mocker.patch("python.ebook_search.search.vector_candidates", side_effect=fake_vector_candidates)
mocker.patch("python.ebook_search.search.bm25_candidates", side_effect=fake_bm25_candidates)
config = EbookSearchConfig(rerank=RerankConfig(enabled=False))
response = search_ebooks(engine, "what is parallel", config)
timings = {step.name: step for step in response.timings}
assert [result.chunk_id for result in response.results] == [1, 2]
assert timings["Embedding + vector search"].counts_toward_total is False
assert timings["BM25 search"].counts_toward_total is False
assert timings["Hybrid retrieval"].counts_toward_total is True
assert timings["BM25 query preparation"].counts_toward_total is True
assert received_engines == [engine]
+157
View File
@@ -0,0 +1,157 @@
"""Tests for EPUB search reranking."""
from __future__ import annotations
from os import environ
from typing import TYPE_CHECKING
import httpx
import pytest
from python.ebook_search.config import EbookSearchConfig, RerankConfig, load_rerank_config
from python.ebook_search.rerank import rerank_chunks
from python.ebook_search.search import SearchResult, apply_rerank, skip_rerank
if TYPE_CHECKING:
from pytest_mock import MockerFixture
def candidates() -> list[SearchResult]:
return [
SearchResult(chunk_id=1, text="alpha", source_title="A", score=0.9),
SearchResult(chunk_id=2, text="beta", source_title="B", score=0.8),
SearchResult(chunk_id=3, text="gamma", source_title="C", score=0.7),
]
def rerank_response(payload: dict[str, object] | None = None, *, content: bytes | None = None) -> httpx.Response:
return httpx.Response(
200,
content=content,
json=payload,
request=httpx.Request("POST", "http://rerank.test/rerank"),
)
def test_config_defaults_keep_reranking_optional(mocker: MockerFixture) -> None:
mocker.patch.dict(environ, {}, clear=False)
environ.pop("EBOOK_SEARCH_RERANK_ENABLED", None)
environ.pop("EBOOK_SEARCH_RERANK_BASE_URL", None)
environ.pop("EBOOK_SEARCH_RERANK_MODEL", None)
environ.pop("EBOOK_SEARCH_RERANK_CANDIDATES", None)
environ.pop("EBOOK_SEARCH_RERANK_TIMEOUT_SECONDS", None)
config = load_rerank_config()
assert config.enabled is False
assert config.base_url == "http://192.168.90.25:8001"
assert config.model == "qwen3-reranker-06b"
assert config.candidates == 24
assert config.timeout_seconds == 30
def test_reranking_disabled_returns_original_fused_order() -> None:
config = EbookSearchConfig(rerank=RerankConfig(enabled=False), top_k=2)
response = skip_rerank("query", candidates(), config)
assert response.rank_label == "Hybrid"
assert [result.chunk_id for result in response.results] == [1, 2]
def test_reranking_enabled_reorders_candidates(mocker: MockerFixture) -> None:
def fake_post(_url: str, *, json: dict[str, object], timeout: float) -> httpx.Response:
assert timeout == 30
assert json == {
"model": "qwen3-reranker-06b",
"query": "query",
"documents": ["alpha", "beta", "gamma"],
}
return rerank_response(
{
"results": [
{"index": 0, "relevance_score": 0.1},
{"index": 1, "relevance_score": 0.9},
{"index": 2, "relevance_score": 0.4},
]
}
)
mocker.patch.object(httpx, "post", side_effect=fake_post)
results = rerank_chunks("query", candidates(), RerankConfig())
assert [result.chunk_id for result in results] == [2, 1, 3]
assert [round(result.score, 3) for result in results] == [0.78, 0.37, 0.28]
assert [result.rerank_score for result in results] == [0.9, 0.1, 0.4]
def test_reranking_cannot_ignore_hybrid_score(mocker: MockerFixture) -> None:
candidates = [
SearchResult(chunk_id=1, text="strong hybrid", source_title="A", score=1.0),
SearchResult(chunk_id=2, text="weak hybrid", source_title="B", score=0.1),
]
def fake_post(_url: str, **_kwargs: object) -> httpx.Response:
return rerank_response(
{
"results": [
{"index": 0, "relevance_score": 0.7},
{"index": 1, "relevance_score": 1.0},
]
}
)
mocker.patch.object(httpx, "post", side_effect=fake_post)
results = rerank_chunks("query", candidates, RerankConfig())
assert [result.chunk_id for result in results] == [1, 2]
assert results[0].score == pytest.approx(0.79)
assert results[1].score == 0.7
assert results[1].rerank_score == 1.0
def test_vllm_rerank_timeout_raises(mocker: MockerFixture) -> None:
def fake_rerank_chunks(
_query: str,
_candidates: list[SearchResult],
_config: RerankConfig,
) -> list[SearchResult]:
message = "timeout"
raise httpx.TimeoutException(message)
mocker.patch("python.ebook_search.search.rerank_chunks", side_effect=fake_rerank_chunks)
config = EbookSearchConfig(rerank=RerankConfig(enabled=True), top_k=2)
with pytest.raises(httpx.TimeoutException, match="timeout"):
apply_rerank("query", candidates(), config)
def test_malformed_vllm_rerank_json_does_not_crash_search(mocker: MockerFixture) -> None:
def fake_post(_url: str, **_kwargs: object) -> httpx.Response:
return rerank_response(content=b"not-json")
mocker.patch.object(httpx, "post", side_effect=fake_post)
results = rerank_chunks("query", candidates()[:1], RerankConfig())
assert results[0].score == 0.3
def test_vllm_rerank_scores_are_clamped(mocker: MockerFixture) -> None:
def fake_post(_url: str, **_kwargs: object) -> httpx.Response:
return rerank_response(
{
"results": [
{"index": 0, "relevance_score": -1},
{"index": 1, "relevance_score": 2},
]
}
)
mocker.patch.object(httpx, "post", side_effect=fake_post)
results = rerank_chunks("query", candidates()[:2], RerankConfig())
assert {result.chunk_id: result.rerank_score for result in results} == {1: 0.0, 2: 1.0}
+312
View File
@@ -0,0 +1,312 @@
"""Tests for EPUB search HTMX routes."""
from __future__ import annotations
from compression import zstd
from typing import TYPE_CHECKING
from fastapi.testclient import TestClient
from sqlalchemy import create_engine
from python.ebook_search.api.bm25_tasks import refresh_bm25_for_engine
from python.ebook_search.api.main import create_app
from python.ebook_search.config import EbookSearchConfig, RerankConfig
from python.ebook_search.embeddings import EmbeddingModelStats
from python.ebook_search.search import SearchResponse, SearchResult
from python.ebook_search.timing import RuntimeStep
if TYPE_CHECKING:
from pytest_mock import MockerFixture
def patch_app_runtime(mocker: MockerFixture):
"""Patch app startup dependencies used by UI route tests."""
mocker.patch("python.ebook_search.api.main.get_postgres_engine", side_effect=fake_get_postgres_engine)
mocker.patch("python.ebook_search.api.main.ensure_bm25_corpus", side_effect=lambda _session, _config: None)
def fake_get_postgres_engine(**_kwargs):
"""Return an in-memory engine for route tests."""
return create_engine("sqlite+pysqlite:///:memory:", future=True)
def test_search_page_uses_zstd_when_requested(mocker: MockerFixture) -> None:
patch_app_runtime(mocker)
app = create_app()
app.state.config = EbookSearchConfig(rerank=RerankConfig(enabled=False))
with TestClient(app) as client:
response = client.get("/", headers={"accept-encoding": "zstd"})
assert response.status_code == 200
assert response.headers["content-encoding"] == "zstd"
assert b"EPUB Search" in zstd.decompress(response.content)
def test_ui_form_passes_rerank_flag_to_search_handler(mocker: MockerFixture) -> None:
captured: dict[str, object] = {}
def fake_search_ebooks(_engine, query, config, *, rerank=False):
captured["query"] = query
captured["rerank"] = rerank
captured["config"] = config
return SearchResponse(query=query, results=[], rank_label="Hybrid + rerank")
mocker.patch("python.ebook_search.api.routes.search.search_ebooks", side_effect=fake_search_ebooks)
mocker.patch(
"python.ebook_search.api.routes.search.answer_query",
side_effect=lambda _query, _results, _config: "answer",
)
patch_app_runtime(mocker)
app = create_app()
app.state.config = EbookSearchConfig(rerank=RerankConfig(enabled=False), top_k=12, answer_enabled=True)
with TestClient(app) as client:
response = client.post("/search", data={"query": "where is the quote?", "rerank": "true"})
assert response.status_code == 200
assert "Hybrid + rerank" in response.text
assert captured["query"] == "where is the quote?"
assert captured["rerank"] is True
def test_ui_search_failure_returns_visible_error(mocker: MockerFixture) -> None:
def fake_search_ebooks(_engine, _query, _config, *, rerank=False):
del rerank
msg = "search exploded"
raise RuntimeError(msg)
mocker.patch("python.ebook_search.api.routes.search.search_ebooks", side_effect=fake_search_ebooks)
patch_app_runtime(mocker)
app = create_app()
app.state.config = EbookSearchConfig(rerank=RerankConfig(enabled=False), top_k=12)
with TestClient(app) as client:
response = client.post("/search", data={"query": "where is the quote?"})
assert response.status_code == 500
assert "search exploded" in response.text
def test_ui_answer_failure_still_returns_sources(mocker: MockerFixture) -> None:
def fake_search_ebooks(_engine, query, _config, *, rerank=False):
del rerank
return SearchResponse(query=query, results=[], rank_label="Hybrid")
def fake_answer_query(_query, _results, _config):
msg = "answer exploded"
raise RuntimeError(msg)
mocker.patch("python.ebook_search.api.routes.search.search_ebooks", side_effect=fake_search_ebooks)
mocker.patch("python.ebook_search.api.routes.search.answer_query", side_effect=fake_answer_query)
patch_app_runtime(mocker)
app = create_app()
app.state.config = EbookSearchConfig(rerank=RerankConfig(enabled=False), top_k=12, answer_enabled=True)
with TestClient(app) as client:
response = client.post("/search", data={"query": "where is the quote?"})
assert response.status_code == 200
assert "Answer generation failed" in response.text
def test_ui_skips_answer_when_disabled(mocker: MockerFixture) -> None:
called = False
def fake_search_ebooks(_engine, query, _config, *, rerank=False):
del rerank
return SearchResponse(query=query, results=[], rank_label="Hybrid")
def fake_answer_query(_query, _results, _config):
nonlocal called
called = True
return "answer"
config = EbookSearchConfig(rerank=RerankConfig(enabled=False), answer_enabled=False)
mocker.patch("python.ebook_search.api.routes.search.search_ebooks", side_effect=fake_search_ebooks)
mocker.patch("python.ebook_search.api.routes.search.answer_query", side_effect=fake_answer_query)
mocker.patch("python.ebook_search.api.main.load_config", side_effect=lambda: config)
patch_app_runtime(mocker)
app = create_app()
with TestClient(app) as client:
response = client.post("/search", data={"query": "where is the quote?"})
assert response.status_code == 200
assert called is False
assert "Answer generation is disabled" in response.text
def test_ui_shows_component_scores(mocker: MockerFixture) -> None:
def fake_search_ebooks(_engine, query, _config, *, rerank=False):
del rerank
return SearchResponse(
query=query,
rank_label="Hybrid + rerank",
results=[
SearchResult(
chunk_id=1,
text="source text",
source_title="Book",
score=0.9,
rerank_score=0.9,
vector_score=0.8,
bm25_score=2.5,
fused_score=0.03,
)
],
)
mocker.patch("python.ebook_search.api.routes.search.search_ebooks", side_effect=fake_search_ebooks)
mocker.patch(
"python.ebook_search.api.routes.search.answer_query",
side_effect=lambda _query, _results, _config: "answer",
)
patch_app_runtime(mocker)
app = create_app()
app.state.config = EbookSearchConfig(rerank=RerankConfig(enabled=False), answer_enabled=True)
with TestClient(app) as client:
response = client.post("/search", data={"query": "where is the quote?"})
assert response.status_code == 200
assert "rerank" in response.text
assert "vector cosine" in response.text
assert "BM25" in response.text
assert "RRF" in response.text
def test_ui_shows_search_runtime_chart(mocker: MockerFixture) -> None:
def fake_search_ebooks(_engine, query, _config, *, rerank=False):
del rerank
return SearchResponse(
query=query,
rank_label="Hybrid",
results=[],
timings=(
RuntimeStep(name="Embedding + vector search", duration_ms=12.5),
RuntimeStep(name="BM25 search", duration_ms=4.0),
),
)
mocker.patch("python.ebook_search.api.routes.search.search_ebooks", side_effect=fake_search_ebooks)
mocker.patch(
"python.ebook_search.api.routes.search.answer_query",
side_effect=lambda _query, _results, _config: "answer",
)
patch_app_runtime(mocker)
app = create_app()
app.state.config = EbookSearchConfig(rerank=RerankConfig(enabled=False), answer_enabled=True)
with TestClient(app) as client:
response = client.post("/search", data={"query": "where is the quote?"})
assert response.status_code == 200
assert "Runtime" in response.text
assert "Total" in response.text
assert "Embedding + vector search" in response.text
assert "BM25 search" in response.text
assert "Answer generation" in response.text
assert "ms left" in response.text
def test_ui_embed_all_batches_until_complete(mocker: MockerFixture) -> None:
counts = iter([32, 32, 5, 0])
batch_sizes: list[int] = []
def fake_embed_missing_chunks(_session, config):
batch_sizes.append(config.embedding_batch_size)
return next(counts)
mocker.patch("python.ebook_search.api.routes.admin.embed_missing_chunks", side_effect=fake_embed_missing_chunks)
patch_app_runtime(mocker)
app = create_app()
with TestClient(app) as client:
response = client.post("/admin/embed-all")
assert response.status_code == 200
assert "Embedded 69 chunks in 3 batches of 32" in response.text
assert batch_sizes == [32, 32, 32, 32]
def test_ui_scan_schedules_bm25_refresh_after_database_change(mocker: MockerFixture) -> None:
scheduled = False
def fake_ingest_configured_paths(_session, _config):
return 1
def fake_schedule_bm25_refresh(_app):
nonlocal scheduled
scheduled = True
mocker.patch(
"python.ebook_search.api.routes.admin.ingest_configured_paths",
side_effect=fake_ingest_configured_paths,
)
mocker.patch("python.ebook_search.api.routes.admin.schedule_bm25_refresh", side_effect=fake_schedule_bm25_refresh)
patch_app_runtime(mocker)
app = create_app()
with TestClient(app) as client:
response = client.post("/admin/scan")
assert response.status_code == 200
assert "Indexed 1 EPUBs" in response.text
assert scheduled is True
def test_bm25_refresh_clears_loaded_corpus_cache(mocker: MockerFixture) -> None:
refreshed: list[object] = []
cache_cleared = False
def fake_refresh_bm25_corpus(session, config):
refreshed.append((session, config))
def fake_cache_clear():
nonlocal cache_cleared
cache_cleared = True
mocker.patch("python.ebook_search.api.bm25_tasks.refresh_bm25_corpus", side_effect=fake_refresh_bm25_corpus)
mocker.patch("python.ebook_search.api.bm25_tasks.load_bm25_corpus.cache_clear", side_effect=fake_cache_clear)
engine = create_engine("sqlite+pysqlite:///:memory:", future=True)
config = EbookSearchConfig(rerank=RerankConfig(enabled=False))
refresh_bm25_for_engine(engine, config)
assert len(refreshed) == 1
assert refreshed[0][1] == config
assert cache_cleared is True
def test_admin_page_shows_embedding_counts_by_model(mocker: MockerFixture) -> None:
def fake_embedding_model_stats(_session):
return [
EmbeddingModelStats(
model_name="qwen3-embedding-0.6b",
dimension=1024,
embedded_chunks=40,
total_chunks=64,
),
EmbeddingModelStats(
model_name="qwen3-embedding-4b",
dimension=2560,
embedded_chunks=8,
total_chunks=64,
),
]
mocker.patch("python.ebook_search.api.routes.admin.embedding_model_stats", side_effect=fake_embedding_model_stats)
patch_app_runtime(mocker)
app = create_app()
with TestClient(app) as client:
response = client.get("/admin")
assert response.status_code == 200
assert "qwen3-embedding-0.6b" in response.text
assert "1024" in response.text
assert "40" in response.text
assert "24" in response.text
assert "qwen3-embedding-4b" in response.text
assert "2560" in response.text
+6 -3
View File
@@ -21,7 +21,7 @@ def test_validate_system(mocker: MockerFixture, fs: FakeFilesystem) -> None:
"""test_validate_system."""
fs.create_file(
"/mock_snapshot_config.toml",
contents='zpool = ["root_pool", "storage", "media"]\nservices = ["docker"]\n',
contents='zpools = ["root_pool", "storage", "media"]\nservices = ["docker"]\n',
)
mocker.patch(f"{VALIDATE_SYSTEM}.systemd_tests", return_value=None)
@@ -33,9 +33,10 @@ def test_validate_system_errors(mocker: MockerFixture, fs: FakeFilesystem) -> No
"""test_validate_system_errors."""
fs.create_file(
"/mock_snapshot_config.toml",
contents='zpool = ["root_pool", "storage", "media"]\nservices = ["docker"]\n',
contents='zpools = ["root_pool", "storage", "media"]\nservices = ["docker"]\n',
)
mocker.patch(f"{VALIDATE_SYSTEM}.signal_alert")
mocker.patch(f"{VALIDATE_SYSTEM}.systemd_tests", return_value=["systemd_tests error"])
mocker.patch(f"{VALIDATE_SYSTEM}.zpool_tests", return_value=["zpool_tests error"])
@@ -49,9 +50,11 @@ def test_validate_system_execution(mocker: MockerFixture, fs: FakeFilesystem) ->
"""test_validate_system_execution."""
fs.create_file(
"/mock_snapshot_config.toml",
contents='zpool = ["root_pool", "storage", "media"]\nservices = ["docker"]\n',
contents='zpools = ["root_pool", "storage", "media"]\nservices = ["docker"]\n',
)
mocker.patch(f"{VALIDATE_SYSTEM}.signal_alert")
mocker.patch(f"{VALIDATE_SYSTEM}.systemd_tests", return_value=None)
mocker.patch(f"{VALIDATE_SYSTEM}.zpool_tests", side_effect=RuntimeError("zpool_tests error"))
with pytest.raises(SystemExit) as exception_info:
@@ -80,8 +80,10 @@
"fastapi",
"Michal",
"Nornsight",
"pydantic",
"sandboxing",
"syncthing",
"vllm",
],
// nix