built rag search setup
This commit is contained in:
@@ -0,0 +1,371 @@
|
|||||||
|
"""Hybrid search orchestration."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import logging
|
||||||
|
import re
|
||||||
|
from concurrent.futures import ThreadPoolExecutor
|
||||||
|
from dataclasses import dataclass, replace
|
||||||
|
from typing import TYPE_CHECKING
|
||||||
|
|
||||||
|
from pgvector.sqlalchemy import Vector
|
||||||
|
from sqlalchemy import literal, select
|
||||||
|
from sqlalchemy.orm import Session
|
||||||
|
|
||||||
|
from python.ebook_search.bm25_corpus import (
|
||||||
|
load_bm25_corpus,
|
||||||
|
score_bm25_corpus,
|
||||||
|
)
|
||||||
|
from python.ebook_search.embeddings import MODEL_DIMENSIONS, embed_query, get_embedding_table
|
||||||
|
from python.ebook_search.rerank import rerank_chunks
|
||||||
|
from python.ebook_search.timing import RuntimeStep, timed_result
|
||||||
|
from python.orm.richie import (
|
||||||
|
EbookChapter,
|
||||||
|
EbookChunk,
|
||||||
|
EbookEmbeddingModel,
|
||||||
|
EbookSource,
|
||||||
|
)
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from collections.abc import Mapping
|
||||||
|
|
||||||
|
from sqlalchemy.engine import Engine
|
||||||
|
|
||||||
|
from python.ebook_search.config import EbookSearchConfig
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
BM25_CANDIDATE_LIMIT = 120
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass(frozen=True)
|
||||||
|
class SearchResult:
|
||||||
|
"""One source chunk returned by search."""
|
||||||
|
|
||||||
|
chunk_id: int
|
||||||
|
text: str
|
||||||
|
source_title: str
|
||||||
|
score: float = 0.0
|
||||||
|
vector_score: float | None = None
|
||||||
|
bm25_score: float | None = None
|
||||||
|
fused_score: float | None = None
|
||||||
|
rerank_score: float | None = None
|
||||||
|
source_author: str | None = None
|
||||||
|
chapter_title: str | None = None
|
||||||
|
page_label: str | None = None
|
||||||
|
rank_source: str = "Hybrid"
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass(frozen=True)
|
||||||
|
class SearchResponse:
|
||||||
|
"""Search output for the UI."""
|
||||||
|
|
||||||
|
query: str
|
||||||
|
results: list[SearchResult]
|
||||||
|
rank_label: str
|
||||||
|
timings: tuple[RuntimeStep, ...] = ()
|
||||||
|
|
||||||
|
@property
|
||||||
|
def total_runtime_ms(self) -> float:
|
||||||
|
"""Return total measured runtime for the response."""
|
||||||
|
return sum(step.duration_ms for step in self.timings if step.counts_toward_total)
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass(frozen=True)
|
||||||
|
class RetrievalResponse:
|
||||||
|
"""Parallel retrieval output for vector and BM25 candidates."""
|
||||||
|
|
||||||
|
vector_results: list[SearchResult]
|
||||||
|
lexical_results: list[SearchResult]
|
||||||
|
timings: tuple[RuntimeStep, ...]
|
||||||
|
|
||||||
|
|
||||||
|
def search_ebooks(
|
||||||
|
engine: Engine,
|
||||||
|
query: str,
|
||||||
|
config: EbookSearchConfig,
|
||||||
|
*,
|
||||||
|
rerank: bool = False,
|
||||||
|
) -> SearchResponse:
|
||||||
|
"""Run hybrid vector/BM25 search and optional reranking."""
|
||||||
|
if not query.strip():
|
||||||
|
logger.info("ebook_search_empty_query")
|
||||||
|
return SearchResponse(query=query, results=[], rank_label="Hybrid")
|
||||||
|
|
||||||
|
logger.info("ebook_search_start query_length=%s rerank=%s", len(query), rerank)
|
||||||
|
timings: list[RuntimeStep] = []
|
||||||
|
retrieval_query, timing = timed_result("Query preparation", retrieval_query_from_text, query)
|
||||||
|
timings.append(timing)
|
||||||
|
retrieval, timing = timed_result(
|
||||||
|
"Hybrid retrieval",
|
||||||
|
parallel_retrieval,
|
||||||
|
engine,
|
||||||
|
retrieval_query,
|
||||||
|
config,
|
||||||
|
)
|
||||||
|
timings.extend(retrieval.timings)
|
||||||
|
timings.append(timing)
|
||||||
|
fused, timing = timed_result(
|
||||||
|
"Reciprocal rank fusion",
|
||||||
|
reciprocal_rank_fusion,
|
||||||
|
retrieval.vector_results,
|
||||||
|
retrieval.lexical_results,
|
||||||
|
)
|
||||||
|
timings.append(timing)
|
||||||
|
if config.rerank.enabled and rerank:
|
||||||
|
response, timing = timed_result("Rerank", apply_rerank, query, fused, config)
|
||||||
|
else:
|
||||||
|
response, timing = timed_result("Rerank skipped", skip_rerank, query, fused, config)
|
||||||
|
timings.append(timing)
|
||||||
|
response = replace(response, timings=tuple(timings))
|
||||||
|
logger.info(
|
||||||
|
"ebook_search_complete vector_candidates=%s lexical_candidates=%s "
|
||||||
|
"fused_candidates=%s returned=%s rank_label=%s runtime_ms=%.1f",
|
||||||
|
len(retrieval.vector_results),
|
||||||
|
len(retrieval.lexical_results),
|
||||||
|
len(fused),
|
||||||
|
len(response.results),
|
||||||
|
response.rank_label,
|
||||||
|
response.total_runtime_ms,
|
||||||
|
)
|
||||||
|
return response
|
||||||
|
|
||||||
|
|
||||||
|
def parallel_retrieval(engine: Engine, query: str, config: EbookSearchConfig) -> RetrievalResponse:
|
||||||
|
"""Run vector and BM25 candidate retrieval concurrently with separate database sessions."""
|
||||||
|
with ThreadPoolExecutor(max_workers=2, thread_name_prefix="ebook-search") as executor:
|
||||||
|
vector_future = executor.submit(
|
||||||
|
timed_result,
|
||||||
|
"Embedding + vector search",
|
||||||
|
vector_candidates,
|
||||||
|
engine,
|
||||||
|
query,
|
||||||
|
config,
|
||||||
|
)
|
||||||
|
bm25_future = executor.submit(
|
||||||
|
timed_result,
|
||||||
|
"BM25 search",
|
||||||
|
bm25_candidates,
|
||||||
|
query,
|
||||||
|
config,
|
||||||
|
)
|
||||||
|
vector_results, vector_timing = vector_future.result()
|
||||||
|
lexical_results, lexical_timing = bm25_future.result()
|
||||||
|
|
||||||
|
logger.info(
|
||||||
|
"ebook_parallel_retrieval_complete vector_candidates=%s lexical_candidates=%s",
|
||||||
|
len(vector_results),
|
||||||
|
len(lexical_results),
|
||||||
|
)
|
||||||
|
return RetrievalResponse(
|
||||||
|
vector_results=vector_results,
|
||||||
|
lexical_results=lexical_results,
|
||||||
|
timings=(
|
||||||
|
replace(vector_timing, counts_toward_total=False),
|
||||||
|
replace(lexical_timing, counts_toward_total=False),
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def skip_rerank(
|
||||||
|
query: str,
|
||||||
|
candidates: list[SearchResult],
|
||||||
|
config: EbookSearchConfig,
|
||||||
|
) -> SearchResponse:
|
||||||
|
"""Return fused hybrid results without reranking."""
|
||||||
|
logger.info("ebook_rerank_skipped candidates=%s", len(candidates))
|
||||||
|
return SearchResponse(query=query, results=candidates[: config.top_k], rank_label="Hybrid")
|
||||||
|
|
||||||
|
|
||||||
|
def apply_rerank(
|
||||||
|
query: str,
|
||||||
|
candidates: list[SearchResult],
|
||||||
|
config: EbookSearchConfig,
|
||||||
|
) -> SearchResponse:
|
||||||
|
"""Rerank already-fused hybrid candidates."""
|
||||||
|
reranked = rerank_chunks(query, candidates[: config.rerank.candidates], config.rerank)
|
||||||
|
logger.info(
|
||||||
|
"ebook_rerank_complete input_candidates=%s returned=%s",
|
||||||
|
min(len(candidates), config.rerank.candidates),
|
||||||
|
len(reranked),
|
||||||
|
)
|
||||||
|
return SearchResponse(
|
||||||
|
query=query,
|
||||||
|
results=[replace(result, rank_source="Hybrid + rerank") for result in reranked[: config.top_k]],
|
||||||
|
rank_label="Hybrid + rerank",
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def vector_candidates(engine: Engine, query: str, config: EbookSearchConfig) -> list[SearchResult]:
|
||||||
|
"""Return pgvector cosine candidates for a normalized query."""
|
||||||
|
with Session(engine) as session:
|
||||||
|
model = session.scalar(select(EbookEmbeddingModel).where(EbookEmbeddingModel.name == config.embedding_model))
|
||||||
|
if model is None:
|
||||||
|
msg = f"Embedding model is not registered: {config.embedding_model}"
|
||||||
|
raise ValueError(msg)
|
||||||
|
|
||||||
|
expected_dimension = MODEL_DIMENSIONS[config.embedding_model]
|
||||||
|
if model.dimension != expected_dimension:
|
||||||
|
msg = f"Model row dimension {model.dimension} does not match configured dimension {expected_dimension}"
|
||||||
|
raise ValueError(msg)
|
||||||
|
|
||||||
|
embedding = embed_query(query, config)
|
||||||
|
limit = max(config.rerank.candidates, config.top_k) * 4
|
||||||
|
embedding_table = get_embedding_table(model.dimension)
|
||||||
|
|
||||||
|
embedding_param = literal(embedding, type_=Vector(model.dimension))
|
||||||
|
distance = embedding_table.embedding.op("<=>")(embedding_param)
|
||||||
|
score = (literal(1.0) - distance).label("score")
|
||||||
|
statement = (
|
||||||
|
select(
|
||||||
|
EbookChunk.id.label("chunk_id"),
|
||||||
|
EbookChunk.text.label("text"),
|
||||||
|
EbookSource.title.label("source_title"),
|
||||||
|
EbookSource.author.label("source_author"),
|
||||||
|
EbookChapter.title.label("chapter_title"),
|
||||||
|
EbookChunk.page_label.label("page_label"),
|
||||||
|
score,
|
||||||
|
)
|
||||||
|
.select_from(embedding_table)
|
||||||
|
.join(EbookChunk, EbookChunk.id == embedding_table.chunk_id)
|
||||||
|
.join(EbookSource, EbookSource.id == EbookChunk.source_id)
|
||||||
|
.outerjoin(EbookChapter, EbookChapter.id == EbookChunk.chapter_id)
|
||||||
|
.where(embedding_table.model_id == model.id)
|
||||||
|
.order_by(distance)
|
||||||
|
.limit(limit)
|
||||||
|
)
|
||||||
|
rows = session.execute(statement).mappings()
|
||||||
|
results = [search_result_from_row(row) for row in rows]
|
||||||
|
logger.info(
|
||||||
|
"ebook_vector_search_complete model=%s dimension=%s candidates=%s",
|
||||||
|
config.embedding_model,
|
||||||
|
model.dimension,
|
||||||
|
len(results),
|
||||||
|
)
|
||||||
|
return results
|
||||||
|
|
||||||
|
|
||||||
|
def bm25_candidates(query: str, config: EbookSearchConfig) -> list[SearchResult]:
|
||||||
|
"""Return BM25-ranked lexical candidates using the persisted corpus."""
|
||||||
|
corpus = load_bm25_corpus(config)
|
||||||
|
if not corpus.records:
|
||||||
|
logger.info("ebook_bm25_search_complete corpus=0 candidates=0")
|
||||||
|
return []
|
||||||
|
|
||||||
|
scored_records = score_bm25_corpus(query, corpus, limit=BM25_CANDIDATE_LIMIT)
|
||||||
|
results = [
|
||||||
|
replace(search_result_from_row(record), score=score, vector_score=None, bm25_score=score)
|
||||||
|
for record, score in scored_records
|
||||||
|
]
|
||||||
|
|
||||||
|
max_score = results[0].bm25_score if results else 0.0
|
||||||
|
logger.info(
|
||||||
|
"ebook_bm25_search_complete corpus=%s candidates=%s max_score=%.6f",
|
||||||
|
len(corpus.records),
|
||||||
|
len(results),
|
||||||
|
max_score,
|
||||||
|
)
|
||||||
|
return results
|
||||||
|
|
||||||
|
|
||||||
|
def reciprocal_rank_fusion(
|
||||||
|
vector_results: list[SearchResult],
|
||||||
|
lexical_results: list[SearchResult],
|
||||||
|
*,
|
||||||
|
rank_constant: int = 60,
|
||||||
|
) -> list[SearchResult]:
|
||||||
|
"""Fuse vector and lexical rankings with Reciprocal Rank Fusion."""
|
||||||
|
by_chunk: dict[int, SearchResult] = {}
|
||||||
|
scores: dict[int, float] = {}
|
||||||
|
vector_scores: dict[int, float] = {}
|
||||||
|
bm25_scores: dict[int, float] = {}
|
||||||
|
|
||||||
|
for rank, result in enumerate(vector_results, start=1):
|
||||||
|
by_chunk.setdefault(result.chunk_id, result)
|
||||||
|
vector_scores[result.chunk_id] = result.vector_score if result.vector_score is not None else result.score
|
||||||
|
scores[result.chunk_id] = scores.get(result.chunk_id, 0.0) + (1 / (rank_constant + rank))
|
||||||
|
|
||||||
|
for rank, result in enumerate(lexical_results, start=1):
|
||||||
|
by_chunk.setdefault(result.chunk_id, result)
|
||||||
|
bm25_scores[result.chunk_id] = result.bm25_score if result.bm25_score is not None else result.score
|
||||||
|
scores[result.chunk_id] = scores.get(result.chunk_id, 0.0) + (1 / (rank_constant + rank))
|
||||||
|
|
||||||
|
return sorted(
|
||||||
|
(
|
||||||
|
replace(
|
||||||
|
result,
|
||||||
|
score=scores[result.chunk_id],
|
||||||
|
vector_score=vector_scores.get(result.chunk_id),
|
||||||
|
bm25_score=bm25_scores.get(result.chunk_id),
|
||||||
|
fused_score=scores[result.chunk_id],
|
||||||
|
rank_source="Hybrid",
|
||||||
|
)
|
||||||
|
for result in by_chunk.values()
|
||||||
|
),
|
||||||
|
key=lambda result: result.score,
|
||||||
|
reverse=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def search_result_from_row(row: Mapping[str, object]) -> SearchResult:
|
||||||
|
"""Convert a database row mapping into a search result."""
|
||||||
|
return SearchResult(
|
||||||
|
chunk_id=int(row["chunk_id"]),
|
||||||
|
text=str(row["text"]),
|
||||||
|
source_title=str(row["source_title"]),
|
||||||
|
source_author=optional_str(row["source_author"]),
|
||||||
|
chapter_title=optional_str(row["chapter_title"]),
|
||||||
|
page_label=optional_str(row["page_label"]),
|
||||||
|
score=float(row["score"]) if "score" in row else 0.0,
|
||||||
|
vector_score=float(row["score"]) if "score" in row else None,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def optional_str(value: object) -> str | None:
|
||||||
|
"""Convert nullable database values to optional strings."""
|
||||||
|
if value is None:
|
||||||
|
return None
|
||||||
|
return str(value)
|
||||||
|
|
||||||
|
|
||||||
|
TOKEN_RE = re.compile(r"[A-Za-z0-9_]+")
|
||||||
|
|
||||||
|
|
||||||
|
def tokens(text_value: str) -> list[str]:
|
||||||
|
"""Extract tokens from a text value.
|
||||||
|
|
||||||
|
This is a simple approximation of the tokenization used by PostgreSQL's full-text search,
|
||||||
|
which is sufficient for BM25 candidate retrieval. It lowercases tokens and includes alphanumeric characters and
|
||||||
|
underscores.
|
||||||
|
"""
|
||||||
|
return [match.group(0).lower() for match in TOKEN_RE.finditer(text_value)]
|
||||||
|
|
||||||
|
|
||||||
|
QUERY_STOP_WORDS = {
|
||||||
|
"a",
|
||||||
|
"an",
|
||||||
|
"and",
|
||||||
|
"are",
|
||||||
|
"as",
|
||||||
|
"at",
|
||||||
|
"does",
|
||||||
|
"for",
|
||||||
|
"in",
|
||||||
|
"is",
|
||||||
|
"of",
|
||||||
|
"the",
|
||||||
|
"to",
|
||||||
|
"what",
|
||||||
|
"when",
|
||||||
|
"where",
|
||||||
|
"which",
|
||||||
|
"who",
|
||||||
|
"why",
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def retrieval_query_from_text(query: str) -> str:
|
||||||
|
"""Remove generic question words while preserving entity and series terms."""
|
||||||
|
keywords = [token for token in tokens(query) if token not in QUERY_STOP_WORDS]
|
||||||
|
if not keywords:
|
||||||
|
return query
|
||||||
|
return " ".join(keywords)
|
||||||
@@ -0,0 +1,36 @@
|
|||||||
|
"""Runtime timing helpers for EPUB search."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from dataclasses import dataclass
|
||||||
|
from time import perf_counter
|
||||||
|
from typing import TYPE_CHECKING
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from collections.abc import Callable
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass(frozen=True)
|
||||||
|
class RuntimeStep:
|
||||||
|
"""Elapsed runtime for one named search step."""
|
||||||
|
|
||||||
|
name: str
|
||||||
|
duration_ms: float
|
||||||
|
counts_toward_total: bool = True
|
||||||
|
|
||||||
|
|
||||||
|
def runtime_step_from_start(name: str, start_seconds: float) -> RuntimeStep:
|
||||||
|
"""Create a runtime step from a prior perf_counter timestamp."""
|
||||||
|
return RuntimeStep(name=name, duration_ms=(perf_counter() - start_seconds) * 1000)
|
||||||
|
|
||||||
|
|
||||||
|
def timed_result[T, **P](
|
||||||
|
name: str,
|
||||||
|
operation: Callable[P, T],
|
||||||
|
*args: P.args,
|
||||||
|
**kwargs: P.kwargs,
|
||||||
|
) -> tuple[T, RuntimeStep]:
|
||||||
|
"""Run an operation and return its result plus elapsed runtime."""
|
||||||
|
start_seconds = perf_counter()
|
||||||
|
result = operation(*args, **kwargs)
|
||||||
|
return result, runtime_step_from_start(name, start_seconds)
|
||||||
Reference in New Issue
Block a user