# From e3266ac6949868856e89d41fc997edaad395035c Mon Sep 17 00:00:00 2001
# From: Richie Cahill
# Date: Fri, 12 Jun 2026 03:06:16 -0400
# Subject: [PATCH] built ingest
# ---
#  python/ebook_search/epub_parse.py |  95 +++++++++++++++
#  python/ebook_search/ingest.py     | 190 ++++++++++++++++++++++++++++++
#  2 files changed, 285 insertions(+)
#  create mode 100644 python/ebook_search/epub_parse.py
#  create mode 100644 python/ebook_search/ingest.py
#
# diff --git a/python/ebook_search/epub_parse.py b/python/ebook_search/epub_parse.py
# new file mode 100644
# index 0000000..919a096
# --- /dev/null
# +++ b/python/ebook_search/epub_parse.py
# @@ -0,0 +1,95 @@
"""EPUB parsing helpers."""

from __future__ import annotations

import re
from dataclasses import dataclass
from typing import TYPE_CHECKING

from bs4 import BeautifulSoup
from ebooklib import ITEM_DOCUMENT, epub

if TYPE_CHECKING:
    from pathlib import Path

# Matches any run of whitespace; used to collapse extracted text to single spaces.
WHITESPACE_RE = re.compile(r"\s+")


@dataclass(frozen=True)
class ParsedChapter:
    """Text extracted from one EPUB spine document.

    Attributes:
        title: Title found for the document, or None when none was found.
        href: Name of the document item inside the EPUB, or None.
        text: Whitespace-normalized text of the document.
        page_labels: Page-break labels found in the document, in document order.
    """

    title: str | None
    href: str | None
    text: str
    page_labels: tuple[str, ...]


@dataclass(frozen=True)
class ParsedEpub:
    """Parsed EPUB metadata and text.

    Attributes:
        title: Book title.
        author: Dublin Core ``creator`` value, if present.
        language: Dublin Core ``language`` value, if present.
        publisher: Dublin Core ``publisher`` value, if present.
        identifier: Dublin Core ``identifier`` value, if present.
        chapters: Parsed documents that contained text.
    """

    title: str
    author: str | None
    language: str | None
    publisher: str | None
    identifier: str | None
    chapters: tuple[ParsedChapter, ...]


def parse_epub(path: Path) -> ParsedEpub:
    """Parse EPUB metadata and document text in reading order.

    Args:
        path: Location of the EPUB file to read.

    Returns:
        A ParsedEpub with the Dublin Core metadata (the title falls back to the
        file stem) and one ParsedChapter per document that contains any text.
    """
    book = epub.read_epub(path)
    chapters: list[ParsedChapter] = []
    for item in _documents_in_reading_order(book):
        soup = BeautifulSoup(item.get_content(), "html.parser")
        text = clean_text(soup.get_text(" "))
        if not text:
            # Documents without readable text produce no chapter.
            continue
        chapters.append(
            ParsedChapter(
                title=chapter_title(soup),
                href=item.get_name(),
                text=text,
                page_labels=tuple(extract_page_labels(soup)),
            )
        )

    return ParsedEpub(
        title=metadata_value(book, "title") or path.stem,
        author=metadata_value(book, "creator"),
        language=metadata_value(book, "language"),
        publisher=metadata_value(book, "publisher"),
        identifier=metadata_value(book, "identifier"),
        chapters=tuple(chapters),
    )


def _documents_in_reading_order(book: epub.EpubBook) -> list[epub.EpubItem]:
    """Return the book's documents, spine entries first and in spine order.

    ``get_items_of_type`` alone yields manifest order, which is not guaranteed
    to be the reading order. Documents the spine does not reference are
    appended afterwards (in manifest order) so no text is dropped.
    """
    ordered: list[epub.EpubItem] = []
    seen: set[int] = set()
    for entry in book.spine:
        # A spine entry read from a file is an (idref, linear) tuple; tolerate a bare idref.
        idref = entry[0] if isinstance(entry, tuple) else entry
        item = book.get_item_with_id(idref)
        if item is None or item.get_type() != ITEM_DOCUMENT or id(item) in seen:
            continue
        seen.add(id(item))
        ordered.append(item)
    ordered.extend(item for item in book.get_items_of_type(ITEM_DOCUMENT) if id(item) not in seen)
    return ordered


def metadata_value(book: epub.EpubBook, name: str) -> str | None:
    """Return the first non-empty Dublin Core metadata value for a name.

    Each metadata entry is indexed at ``[0]`` for its value. Entries whose
    value is None or blank are skipped, so an empty element is never reported
    as the literal string "None".

    Args:
        book: Object exposing ``get_metadata(namespace, name)``.
        name: Dublin Core element name, e.g. "title" or "creator".

    Returns:
        The stripped value, or None when no entry carries any text.
    """
    for entry in book.get_metadata("DC", name) or ():
        value = entry[0]
        if value is None:
            continue
        stripped = str(value).strip()
        if stripped:
            return stripped
    return None


def chapter_title(soup: BeautifulSoup) -> str | None:
    """Extract the best available title from an EPUB document soup.

    The first ``h1``/``h2``/``h3`` heading wins. When there is no heading, or
    the heading has no text, the document ``<title>`` is used instead.

    Returns:
        The whitespace-normalized title, or None when neither yields text.
    """
    for candidate in (soup.find(["h1", "h2", "h3"]), soup.find("title")):
        if candidate is None:
            continue
        text = clean_text(candidate.get_text(" "))
        if text:
            return text
    return None


def extract_page_labels(soup: BeautifulSoup) -> list[str]:
    """Extract EPUB page-break labels from a document soup.

    For every element marked ``epub:type="pagebreak"`` the label is taken from
    its ``title`` attribute, then ``aria-label``, then its own text. Elements
    that yield no text are skipped.
    """
    labels: list[str] = []
    for tag in soup.find_all(attrs={"epub:type": "pagebreak"}):
        raw_label = tag.get("title") or tag.get("aria-label") or tag.get_text(" ")
        label = clean_text(str(raw_label))
        if label:
            labels.append(label)
    return labels


def clean_text(text: str) -> str:
    """Normalize whitespace in extracted EPUB text.

    Every run of whitespace becomes a single space and the ends are trimmed.
    """
    return WHITESPACE_RE.sub(" ", text).strip()


# diff --git a/python/ebook_search/ingest.py b/python/ebook_search/ingest.py
# new file mode
# 100644
# index 0000000..2b8e44a
# --- /dev/null
# +++ b/python/ebook_search/ingest.py
# @@ -0,0 +1,190 @@
"""EPUB ingestion into Richie DB."""

from __future__ import annotations

import hashlib
import logging
from dataclasses import dataclass
from datetime import UTC, datetime
from pathlib import Path
from typing import TYPE_CHECKING

import tiktoken
from sqlalchemy import or_, select

from python.ebook_search.epub_parse import parse_epub
from python.orm.richie import EbookChapter, EbookChunk, EbookSource

logger = logging.getLogger(__name__)
DEFAULT_CHUNK_TOKENS = 700
DEFAULT_CHUNK_OVERLAP = 100
EPUB_SUFFIX = ".epub"

if TYPE_CHECKING:
    from sqlalchemy.orm import Session

    from python.ebook_search.config import EbookSearchConfig
    from python.ebook_search.epub_parse import ParsedChapter


@dataclass(frozen=True)
class TextChunk:
    """A token-bounded chunk of text.

    Attributes:
        text: Decoded, stripped text of the chunk.
        token_start: Offset of the chunk's first token within the source text.
        token_count: Number of tokens in the chunk.
    """

    text: str
    token_start: int
    token_count: int


def chunk_text(
    text: str,
    *,
    chunk_tokens: int = DEFAULT_CHUNK_TOKENS,
    overlap_tokens: int = DEFAULT_CHUNK_OVERLAP,
) -> list[TextChunk]:
    """Split text into overlapping token chunks.

    Args:
        text: Text to split.
        chunk_tokens: Maximum number of tokens per chunk.
        overlap_tokens: Number of tokens shared between consecutive chunks.

    Returns:
        Chunks in order of appearance; chunks whose decoded text is blank are
        omitted. Empty input yields an empty list.

    Raises:
        ValueError: If ``chunk_tokens`` is not positive, or ``overlap_tokens``
            is negative or not smaller than ``chunk_tokens``.
    """
    if chunk_tokens <= 0:
        msg = "chunk_tokens must be positive"
        raise ValueError(msg)
    if overlap_tokens < 0 or overlap_tokens >= chunk_tokens:
        msg = "overlap_tokens must be non-negative and smaller than chunk_tokens"
        raise ValueError(msg)

    encoding = tiktoken.get_encoding("cl100k_base")
    # Book text is arbitrary: treat special-token literals such as "<|endoftext|>"
    # as ordinary text instead of letting encode() raise on them.
    tokens = encoding.encode(text, disallowed_special=())

    chunks: list[TextChunk] = []
    step = chunk_tokens - overlap_tokens
    for start in range(0, len(tokens), step):
        window = tokens[start : start + chunk_tokens]
        chunk_str = encoding.decode(window).strip()
        if chunk_str:
            chunks.append(TextChunk(text=chunk_str, token_start=start, token_count=len(window)))
        if start + chunk_tokens >= len(tokens):
            # This window reached the end; a further step would only repeat the tail.
            break
    return chunks


def _find_epub_files(directory: Path) -> list[Path]:
    """Return the regular files under a directory with an .epub suffix, sorted.

    The suffix test is case-insensitive, matching how a directly configured
    file is checked, and directories that merely end in ".epub" are ignored.
    """
    return sorted(
        candidate
        for candidate in directory.rglob("*")
        if candidate.suffix.lower() == EPUB_SUFFIX and candidate.is_file()
    )


def ingest_configured_paths(session: Session, config: EbookSearchConfig) -> int:
    """Ingest every EPUB found under configured library paths.

    Each entry of ``config.library_paths`` may be a single EPUB file or a
    directory that is searched recursively. Anything else is logged as missing.

    Returns:
        The number of files for which ``ingest_file`` returned True.
    """
    count = 0
    for library_path in config.library_paths:
        path = Path(library_path).expanduser()
        logger.info("ebook_ingest_path_start path=%s", path)
        if path.is_file() and path.suffix.lower() == EPUB_SUFFIX:
            count += int(ingest_file(session, path))
        elif path.is_dir():
            for epub_path in _find_epub_files(path):
                count += int(ingest_file(session, epub_path))
        else:
            logger.warning("ebook_ingest_path_missing path=%s", path)
    logger.info("ebook_ingest_paths_complete changed_files=%s configured_paths=%s", count, len(config.library_paths))
    return count


def ingest_file(session: Session, path: Path) -> bool:
    """Ingest one EPUB file.

    When a stored source has the same content hash, only its path, mtime and
    size are refreshed. Otherwise the file is parsed and stored as a new
    source, replacing any stored source found for the same path.

    Returns:
        True when a source was inserted or replaced; False when the content
        hash matched a stored source and only its file details were refreshed.
    """
    resolved_path = path.expanduser().resolve()
    logger.info("ebook_ingest_file_start path=%s", resolved_path)
    file_hash = sha256_file(resolved_path)
    stat = resolved_path.stat()
    file_mtime = datetime.fromtimestamp(stat.st_mtime, tz=UTC)

    existing = find_existing_source(session, resolved_path, file_hash)
    if existing is not None and existing.file_sha256 == file_hash:
        existing.file_path = str(resolved_path)
        existing.file_mtime = file_mtime
        existing.file_size = stat.st_size
        session.flush()
        logger.info("ebook_ingest_file_unchanged source_id=%s path=%s", existing.id, resolved_path)
        return False

    # Parse before touching the stored source: if the new file cannot be parsed,
    # no delete of the previous version is left pending in the session.
    parsed = parse_epub(resolved_path)

    if existing is not None:
        logger.info("ebook_ingest_file_replacing source_id=%s path=%s", existing.id, resolved_path)
        # NOTE(review): presumably chapters and chunks are removed via ORM cascade — confirm.
        session.delete(existing)
        session.flush()

    source = EbookSource(
        title=parsed.title,
        author=parsed.author,
        language=parsed.language,
        publisher=parsed.publisher,
        identifier=parsed.identifier,
        file_path=str(resolved_path),
        file_sha256=file_hash,
        file_mtime=file_mtime,
        file_size=stat.st_size,
    )
    session.add(source)
    session.flush()  # flush before reading source.id for the chapter rows

    chunk_index = 0
    for spine_index, parsed_chapter in enumerate(parsed.chapters):
        chapter = EbookChapter(
            source_id=source.id,
            spine_index=spine_index,
            title=parsed_chapter.title,
            href=parsed_chapter.href,
        )
        session.add(chapter)
        session.flush()  # flush before reading chapter.id for the chunk rows
        chunk_index = add_chapter_chunks(session, source, chapter, parsed_chapter, chunk_index)

    session.flush()
    logger.info(
        "ebook_ingest_file_complete source_id=%s path=%s chapters=%s chunks=%s",
        source.id,
        resolved_path,
        len(parsed.chapters),
        chunk_index,
    )
    return True


def find_existing_source(session: Session, path: Path, file_hash: str) -> EbookSource | None:
    """Find an existing source by canonical path or file hash.

    NOTE(review): if one row matches the path and a different row matches the
    hash, the query does not specify an order, so which row comes back is not
    defined here — confirm whether that situation can occur.
    """
    matches_path = EbookSource.file_path == str(path)
    matches_hash = EbookSource.file_sha256 == file_hash
    return session.scalar(select(EbookSource).where(or_(matches_path, matches_hash)))


def add_chapter_chunks(
    session: Session,
    source: EbookSource,
    chapter: EbookChapter,
    parsed_chapter: ParsedChapter,
    chunk_index: int,
) -> int:
    """Add chunk rows for one parsed chapter and return the next chunk index.

    Every chunk of the chapter gets the chapter's first page label (or None
    when the chapter has none).

    Args:
        session: Session the new rows are added to.
        source: Source row the chunks belong to.
        chapter: Chapter row the chunks belong to.
        parsed_chapter: Parsed text and page labels of the chapter.
        chunk_index: Index to assign to the first chunk of this chapter.

    Returns:
        The index following the last chunk that was added.
    """
    page_label = parsed_chapter.page_labels[0] if parsed_chapter.page_labels else None
    for text_chunk in chunk_text(parsed_chapter.text):
        session.add(
            EbookChunk(
                source_id=source.id,
                chapter_id=chapter.id,
                chunk_index=chunk_index,
                text=text_chunk.text,
                token_start=text_chunk.token_start,
                token_count=text_chunk.token_count,
                page_label=page_label,
                content_sha256=hashlib.sha256(text_chunk.text.encode()).hexdigest(),
                search_text=f"{source.title} {source.author or ''} {chapter.title or ''} {text_chunk.text}",
            )
        )
        chunk_index += 1
    return chunk_index


def sha256_file(path: Path) -> str:
    """Calculate the SHA-256 digest for a file.

    Returns:
        The hexadecimal digest of the file's bytes.
    """
    with path.open("rb") as file:
        return hashlib.file_digest(file, "sha256").hexdigest()