built ingest

This commit is contained in:
2026-06-12 03:06:16 -04:00
parent d740b25b2c
commit c5293b0dcf
2 changed files with 285 additions and 0 deletions
+190
View File
@@ -0,0 +1,190 @@
"""EPUB ingestion into Richie DB."""
from __future__ import annotations
import hashlib
import logging
from dataclasses import dataclass
from datetime import UTC, datetime
from pathlib import Path
from typing import TYPE_CHECKING
import tiktoken
from sqlalchemy import or_, select
from python.ebook_search.epub_parse import parse_epub
from python.orm.richie import EbookChapter, EbookChunk, EbookSource
logger = logging.getLogger(__name__)
DEFAULT_CHUNK_TOKENS = 700
DEFAULT_CHUNK_OVERLAP = 100
if TYPE_CHECKING:
from sqlalchemy.orm import Session
from python.ebook_search.config import EbookSearchConfig
from python.ebook_search.epub_parse import ParsedChapter
@dataclass(frozen=True)
class TextChunk:
"""A token-bounded chunk of text."""
text: str
token_start: int
token_count: int
def chunk_text(
text: str,
*,
chunk_tokens: int = DEFAULT_CHUNK_TOKENS,
overlap_tokens: int = DEFAULT_CHUNK_OVERLAP,
) -> list[TextChunk]:
"""Split text into overlapping token chunks."""
if chunk_tokens <= 0:
msg = "chunk_tokens must be positive"
raise ValueError(msg)
if overlap_tokens < 0 or overlap_tokens >= chunk_tokens:
msg = "overlap_tokens must be non-negative and smaller than chunk_tokens"
raise ValueError(msg)
encoding = tiktoken.get_encoding("cl100k_base")
tokens = encoding.encode(text)
if not tokens:
return []
chunks: list[TextChunk] = []
step = chunk_tokens - overlap_tokens
for start in range(0, len(tokens), step):
chunk = tokens[start : start + chunk_tokens]
if not chunk:
continue
chunks.append(
TextChunk(
text=encoding.decode(chunk).strip(),
token_start=start,
token_count=len(chunk),
)
)
if start + chunk_tokens >= len(tokens):
break
return [chunk for chunk in chunks if chunk.text]
def ingest_configured_paths(session: Session, config: EbookSearchConfig) -> int:
"""Ingest every EPUB found under configured library paths."""
count = 0
for library_path in config.library_paths:
path = Path(library_path).expanduser()
logger.info("ebook_ingest_path_start path=%s", path)
if path.is_file() and path.suffix.lower() == ".epub":
count += int(ingest_file(session, path))
elif path.is_dir():
for epub_path in sorted(path.rglob("*.epub")):
count += int(ingest_file(session, epub_path))
else:
logger.warning("ebook_ingest_path_missing path=%s", path)
logger.info("ebook_ingest_paths_complete changed_files=%s configured_paths=%s", count, len(config.library_paths))
return count
def ingest_file(session: Session, path: Path) -> bool:
"""Ingest one EPUB file. Return True when the database changed."""
resolved_path = path.expanduser().resolve()
logger.info("ebook_ingest_file_start path=%s", resolved_path)
file_hash = sha256_file(resolved_path)
existing = find_existing_source(session, resolved_path, file_hash)
if existing is not None and existing.file_sha256 == file_hash:
stat = resolved_path.stat()
existing.file_path = str(resolved_path)
existing.file_mtime = datetime.fromtimestamp(stat.st_mtime, tz=UTC)
existing.file_size = stat.st_size
session.flush()
logger.info("ebook_ingest_file_unchanged source_id=%s path=%s", existing.id, resolved_path)
return False
if existing is not None:
logger.info("ebook_ingest_file_replacing source_id=%s path=%s", existing.id, resolved_path)
session.delete(existing)
session.flush()
stat = resolved_path.stat()
parsed = parse_epub(resolved_path)
source = EbookSource(
title=parsed.title,
author=parsed.author,
language=parsed.language,
publisher=parsed.publisher,
identifier=parsed.identifier,
file_path=str(resolved_path),
file_sha256=file_hash,
file_mtime=datetime.fromtimestamp(stat.st_mtime, tz=UTC),
file_size=stat.st_size,
)
session.add(source)
session.flush()
chunk_index = 0
for spine_index, parsed_chapter in enumerate(parsed.chapters):
chapter = EbookChapter(
source_id=source.id,
spine_index=spine_index,
title=parsed_chapter.title,
href=parsed_chapter.href,
)
session.add(chapter)
session.flush()
chunk_index = add_chapter_chunks(session, source, chapter, parsed_chapter, chunk_index)
session.flush()
logger.info(
"ebook_ingest_file_complete source_id=%s path=%s chapters=%s chunks=%s",
source.id,
resolved_path,
len(parsed.chapters),
chunk_index,
)
return True
def find_existing_source(session: Session, path: Path, file_hash: str) -> EbookSource | None:
"""Find an existing source by canonical path or file hash."""
return session.scalar(
select(EbookSource).where(or_(EbookSource.file_path == str(path), EbookSource.file_sha256 == file_hash))
)
def add_chapter_chunks(
session: Session,
source: EbookSource,
chapter: EbookChapter,
parsed_chapter: ParsedChapter,
chunk_index: int,
) -> int:
"""Add chunk rows for one parsed chapter and return the next chunk index."""
page_label = parsed_chapter.page_labels[0] if parsed_chapter.page_labels else None
for text_chunk in chunk_text(parsed_chapter.text):
session.add(
EbookChunk(
source_id=source.id,
chapter_id=chapter.id,
chunk_index=chunk_index,
text=text_chunk.text,
token_start=text_chunk.token_start,
token_count=text_chunk.token_count,
page_label=page_label,
content_sha256=hashlib.sha256(text_chunk.text.encode()).hexdigest(),
search_text=f"{source.title} {source.author or ''} {chapter.title or ''} {text_chunk.text}",
)
)
chunk_index += 1
return chunk_index
def sha256_file(path: Path) -> str:
"""Calculate the SHA-256 digest for a file."""
digest = hashlib.sha256()
with path.open("rb") as file:
for block in iter(lambda: file.read(1024 * 1024), b""):
digest.update(block)
return digest.hexdigest()