From b8a21a601329343bf0d2de70c3adb1c08a1ef70c Mon Sep 17 00:00:00 2001 From: Richie Cahill Date: Sat, 6 Jun 2026 15:20:57 -0400 Subject: [PATCH] built workflow --- python/tools/audiobook/__init__.py | 1 + python/tools/audiobook/audible_convert.py | 428 ++++++++ python/tools/audiobook/metadata_agent.py | 1080 +++++++++++++++++++++ tests/test_audible_convert.py | 969 ++++++++++++++++++ tests/test_audiobook_catalog.py | 126 +++ tests/test_gitea_flake_lock.py | 86 ++ 6 files changed, 2690 insertions(+) create mode 100644 python/tools/audiobook/__init__.py create mode 100644 python/tools/audiobook/audible_convert.py create mode 100644 python/tools/audiobook/metadata_agent.py create mode 100644 tests/test_audible_convert.py create mode 100644 tests/test_audiobook_catalog.py create mode 100644 tests/test_gitea_flake_lock.py diff --git a/python/tools/audiobook/__init__.py b/python/tools/audiobook/__init__.py new file mode 100644 index 0000000..9dcd287 --- /dev/null +++ b/python/tools/audiobook/__init__.py @@ -0,0 +1 @@ +"""Audiobook tools.""" diff --git a/python/tools/audiobook/audible_convert.py b/python/tools/audiobook/audible_convert.py new file mode 100644 index 0000000..0c77346 --- /dev/null +++ b/python/tools/audiobook/audible_convert.py @@ -0,0 +1,428 @@ +"""Convert Audible AAX downloads into Audiobookshelf-friendly M4B files.""" + +from __future__ import annotations + +import json +import logging +import shutil +import subprocess +from concurrent.futures import ThreadPoolExecutor +from dataclasses import asdict, dataclass +from os import getenv +from pathlib import Path # noqa: TC003 This is required for the typer CLI +from typing import TYPE_CHECKING, Annotated, Any +from uuid import uuid7 + +import typer + +from python.common import configure_logger +from python.orm.common import get_postgres_engine +from python.tools.audiobook.metadata_agent import ( + AgentConfig, + StandardBookMetadata, + standard_book_metadata, + write_agent_log, +) + +if TYPE_CHECKING: + 
@dataclass(frozen=True)
class ConversionConfig:
    """Runtime settings for one conversion command.

    The instance is immutable and is handed to every per-file conversion.
    Its ``repr`` masks the two credential fields so that rendering the
    object (debug logging, local-variable dumps in tracebacks) does not
    reveal them; the attributes themselves still hold the real values.
    """

    # Root output directory; final books and the work directory live under it.
    resolved_output: Path
    # API key forwarded to the metadata agent (secret).
    ollama_api_key: str
    # Settings for the metadata agent.
    agent_config: AgentConfig
    # Database engine handed to the metadata resolver.
    engine: Engine
    # Value passed to ffmpeg's ``-activation_bytes`` option, if any (secret).
    activation_bytes: str | None
    # When true, only the planned output path is resolved and printed.
    dry_run: bool
    # When true, an existing destination M4B may be replaced.
    overwrite: bool
    # Names of the work directory and of its sub-directories.
    work_directory_name: str = ".audible_convert"
    temp_directory_name: str = "tmp"
    log_directory_name: str = "logs"
    review_directory_name: str = "review"

    def __repr__(self) -> str:
        """Return a dataclass-style repr with secret values masked.

        ``@dataclass`` keeps a ``__repr__`` defined in the class body, so this
        replaces the generated one without touching the field definitions.
        """
        secret_fields = ("ollama_api_key", "activation_bytes")
        rendered = []
        for name in self.__dataclass_fields__:
            value = getattr(self, name)
            # A missing secret stays visible as None so "not configured" is debuggable.
            masked = name in secret_fields and value is not None
            rendered.append(f"{name}={'<hidden>' if masked else repr(value)}")
        return f"{type(self).__name__}({', '.join(rendered)})"
def run_command(arguments: list[str], *, capture: bool = False) -> subprocess.CompletedProcess[str]:
    """Run a command and return the completed process.

    Args:
        arguments: Command and arguments to run.
        capture: Whether to capture stdout and stderr.

    Returns:
        The completed process.

    Raises:
        CommandExecutionError: If the command exits with a non-zero status.
            The original ``CalledProcessError`` remains available as
            ``__cause__`` with its command line redacted.
    """
    redacted = redact_command_arguments(arguments)
    logger.debug("%s", " ".join(redacted))
    try:
        return subprocess.run(arguments, check=True, capture_output=capture, text=True)
    except subprocess.CalledProcessError as error:
        # The chained cause is printed with every traceback, and CalledProcessError
        # renders its ``cmd``. Scrub the copy it holds so the secret that the
        # redacted CommandExecutionError message hides does not leak through the cause.
        error.cmd = redacted
        error.args = (error.returncode, redacted)
        raise CommandExecutionError(arguments, error.returncode) from error
def output_stem(metadata: StandardBookMetadata) -> str:
    """Return the name stem shared by a book's directory and file.

    Args:
        metadata: Resolved book metadata; its author, series, series index
            and title are read.

    Returns:
        ``author-series_NN-title``, where ``NN`` is the series index
        zero-padded to at least two digits.
    """
    stem = f"{metadata.author}-{metadata.series}_{metadata.series_index:02}-{metadata.title}"
    return stem
def cleanup_temp_output(temp_file: Path) -> None:
    """Delete the directory that contains ``temp_file``, with everything in it.

    Args:
        temp_file: Temporary output file; its parent directory is removed.
            Removal errors, including a directory that no longer exists,
            are ignored.
    """
    run_directory = temp_file.parent
    shutil.rmtree(run_directory, ignore_errors=True)
def convert_temp_file_and_resolve_metadata(
    aax_file: Path,
    temp_file: Path,
    ffprobe_metadata: dict[str, str],
    config: ConversionConfig,
    log_file: Path,
) -> ConcurrentConversionResult:
    """Convert to a temporary file while the metadata agent runs.

    Both jobs run on a two-worker thread pool. Neither job's exception is
    raised to the caller; each is returned in the result instead.

    Args:
        aax_file: Source AAX file.
        temp_file: Temporary M4B path written by ffmpeg (always overwritten).
        ffprobe_metadata: Tags previously read from the source file.
        config: Conversion settings (activation bytes, engine, agent settings).
        log_file: Audit log passed to the metadata resolver.

    Returns:
        The resolved metadata (``None`` if resolution failed) together with
        the exception raised by each job, if any.
    """
    with ThreadPoolExecutor(max_workers=2) as pool:
        ffmpeg_job = pool.submit(
            convert_aax_file,
            aax_file,
            temp_file,
            config.activation_bytes,
            overwrite=True,
        )
        agent_job = pool.submit(
            standard_book_metadata,
            aax_file.name,
            ffprobe_metadata,
            config.engine,
            log_file,
            config.ollama_api_key,
            config.agent_config,
        )

        # exception() blocks until the job has finished; the conversion is awaited first.
        ffmpeg_failure: Exception | None = ffmpeg_job.exception()
        if ffmpeg_failure is None:
            ffmpeg_job.result()

        agent_failure: Exception | None = agent_job.exception()
        resolved: StandardBookMetadata | None = agent_job.result() if agent_failure is None else None

    return ConcurrentConversionResult(
        metadata=resolved,
        conversion_error=ffmpeg_failure,
        metadata_error=agent_failure,
    )
except Exception as error: + logger.exception("ffprobe failed") + write_review_file( + destination=None, + ffprobe_metadata={}, + log_file=log_file, + metadata=None, + reason=f"ffprobe_failed: {error}", + review_file=review_file, + source=aax_file, + temp_file=None, + ) + return + + if config.dry_run: + dry_run_aax_file_with_agent( + aax_file, + ffprobe_metadata, + config.engine, + config, + log_file, + review_file, + ) + return + + temp_file = ( + config.resolved_output / config.work_directory_name / config.temp_directory_name / run_id / "converted.m4b" + ) + temp_file.parent.mkdir(parents=True, exist_ok=True) + + result = convert_temp_file_and_resolve_metadata(aax_file, temp_file, ffprobe_metadata, config, log_file) + + if result.conversion_error: + reason = f"ffmpeg_failed: {result.conversion_error}" + write_review_file( + destination=None, + ffprobe_metadata=ffprobe_metadata, + log_file=log_file, + metadata=result.metadata, + reason=reason, + review_file=review_file, + source=aax_file, + temp_file=temp_file if temp_file.exists() else None, + ) + return + + if result.metadata_error: + write_review_file( + destination=None, + ffprobe_metadata=ffprobe_metadata, + log_file=log_file, + metadata=None, + reason=f"metadata_failed: {result.metadata_error}", + review_file=review_file, + source=aax_file, + temp_file=temp_file, + ) + return + + if result.metadata is None or result.metadata.needs_review: + write_review_file( + destination=None, + ffprobe_metadata=ffprobe_metadata, + log_file=log_file, + metadata=result.metadata, + reason="metadata_needs_review", + review_file=review_file, + source=aax_file, + temp_file=temp_file, + ) + return + + destination = metadata_output_path(config.resolved_output, result.metadata) + if destination.exists() and not config.overwrite: + write_agent_log(log_file, "destination_exists", destination=str(destination)) + cleanup_temp_output(temp_file) + return + + destination.parent.mkdir(parents=True, exist_ok=True) + try: + 
class MetadataResolutionError(ValueError):
    """Raised when audiobook metadata fails validation during resolution."""
def standard_book_metadata(
    aax_file_name: str,
    aax_metadata_from_ffprobe: dict[str, str],
    engine: Engine,
    log_path: Path,
    ollama_api_key: str,
    config: AgentConfig,
) -> StandardBookMetadata:
    """Resolve canonical metadata for one AAX file in a single database session.

    Args:
        aax_file_name: File name of the source AAX file.
        aax_metadata_from_ffprobe: Tags read from the file by ffprobe.
        engine: Engine the catalog session is opened on.
        log_path: Audit log shared by the agent and the catalog tools.
        ollama_api_key: API key for the model endpoint.
        config: Agent settings.

    Returns:
        The metadata produced by the agent. If it is flagged for review the
        session is rolled back; otherwise rows created during the run but not
        referenced by the result are pruned and the session is committed.
    """
    with Session(engine) as session:
        catalog = CatalogToolRegistry(session, log_path, config)
        resolver = AudiobookMetadataAgent(
            registry=catalog,
            log_path=log_path,
            ollama_api_key=ollama_api_key,
            config=config,
        )
        resolved = resolver.run(aax_file_name, aax_metadata_from_ffprobe)
        if not resolved.needs_review:
            catalog.prune_unused_created_rows(resolved)
            session.commit()
        else:
            session.rollback()
        return resolved
def tool_schemas(self) -> list[dict[str, object]]:
    """Return the Ollama function-tool schemas for the enabled tools.

    Returns:
        One schema per catalog tool whose name appears in
        ``self._config.tool_names``, in the fixed order search_authors,
        search_series, search_books, ensure_author, ensure_series, ensure_book.
    """

    # Each helper returns a fresh dict so no schema shares mutable state with another.
    def text() -> dict[str, object]:
        return {"type": "string"}

    def integer() -> dict[str, object]:
        return {"type": "integer"}

    def optional_integer() -> dict[str, object]:
        return {"type": ["integer", "null"]}

    def function_tool(
        name: str,
        description: str,
        properties: dict[str, object],
        required: list[str],
    ) -> dict[str, object]:
        return {
            "type": "function",
            "function": {
                "name": name,
                "description": description,
                "parameters": {
                    "type": "object",
                    "properties": properties,
                    "required": required,
                },
            },
        }

    catalog = [
        function_tool(
            "search_authors",
            "Search canonical audiobook authors by slug or noisy source text.",
            {"query": text()},
            ["query"],
        ),
        function_tool(
            "search_series",
            "Search canonical audiobook series by slug or noisy source text.",
            {"query": text(), "author_id": optional_integer()},
            ["query"],
        ),
        function_tool(
            "search_books",
            "Search canonical audiobook titles with optional author and series filters.",
            {"query": text(), "author_id": optional_integer(), "series_id": optional_integer()},
            ["query"],
        ),
        function_tool(
            "ensure_author",
            "Normalize an author name to a catalog slug, then return or create that author.",
            {"name": text()},
            ["name"],
        ),
        function_tool(
            "ensure_series",
            "Normalize a series name to a catalog slug, then return or create it for an author.",
            {"name": text(), "author_id": integer()},
            ["name", "author_id"],
        ),
        function_tool(
            "ensure_book",
            "Normalize a title to a book slug, then return or create it for an author/series.",
            {
                "title": text(),
                "author_id": integer(),
                "series_id": optional_integer(),
                "series_index": integer(),
            },
            ["title", "author_id", "series_id", "series_index"],
        ),
    ]
    enabled = set(self._config.tool_names)
    return [entry for entry in catalog if entry["function"]["name"] in enabled]
def prune_unused_created_rows(self, metadata: StandardBookMetadata) -> None:
    """Delete rows created in this run that the final metadata does not reference.

    Books are removed first, then series that have no books, then authors
    that have neither books nor series. The session is flushed after the
    book pass and again after the series pass.

    Args:
        metadata: Final metadata; its book, series and author ids are kept.
    """
    kept_book_ids = set() if metadata.book_id is None else {metadata.book_id}
    for created_id in self.created_book_ids - kept_book_ids:
        orphan_book = self.get_book(created_id)
        if orphan_book:
            self._session.delete(orphan_book)
    self._session.flush()

    kept_series_ids = set() if metadata.series_id is None else {metadata.series_id}
    for created_id in self.created_series_ids - kept_series_ids:
        candidate_series = self.get_series(created_id)
        if candidate_series and not candidate_series.books:
            self._session.delete(candidate_series)
    self._session.flush()

    for created_id in self.created_author_ids - {metadata.author_id}:
        candidate_author = self.get_author(created_id)
        if candidate_author and not candidate_author.books and not candidate_author.series:
            self._session.delete(candidate_author)
def run_search_series(self, arguments: dict[str, object]) -> list[dict[str, object]]:
    """Search series by name terms, optionally limited to one author.

    Args:
        arguments: Tool arguments; ``query`` is required, ``author_id`` is optional.

    Returns:
        One dict per matching series (id, name, author_id, author name),
        ordered by series name and capped at ``max_tool_results``. The ids of
        returned series and their authors are recorded as seen.
    """
    search_text = required_string(arguments, "query")
    owner_id = optional_int(arguments.get("author_id"), "author_id")

    statement = select(AudiobookSeries).order_by(AudiobookSeries.name).limit(self._config.max_tool_results)
    terms = query_terms(search_text)
    if terms:
        # A series matches when its name contains any one of the terms.
        name_matches = [AudiobookSeries.name.ilike(f"%{term}%") for term in terms]
        statement = statement.where(or_(*name_matches))
    if owner_id is not None:
        statement = statement.where(AudiobookSeries.author_id == owner_id)

    matches = self._session.scalars(statement).all()
    self.seen_series_ids.update(row.id for row in matches)
    self.seen_author_ids.update(row.author_id for row in matches)

    payload: list[dict[str, object]] = []
    for row in matches:
        payload.append(
            {
                "id": row.id,
                "name": row.name,
                "author_id": row.author_id,
                "author": row.author.name,
            }
        )
    return payload
def run_ensure_author(self, arguments: dict[str, object]) -> list[dict[str, object]]:
    """Return the author for a normalized name, creating the row if missing.

    Args:
        arguments: Tool arguments; ``name`` is required.

    Returns:
        A single-item list with the author's id and name, plus ``action``
        set to ``"created"`` or ``"existing"``. The author id is recorded as
        seen, and as created when a new row was inserted.
    """
    slug = normalize_catalog_slug(required_string(arguments, "name"))
    validate_catalog_slug(slug, "author")

    lookup = select(AudiobookAuthor).where(AudiobookAuthor.name == slug)
    author = self._session.scalar(lookup)
    created = author is None
    if created:
        author = AudiobookAuthor(name=slug)
        self._session.add(author)
        # Flush so the new row has an id before it is recorded and returned.
        self._session.flush()
        self.created_author_ids.add(author.id)

    self.seen_author_ids.add(author.id)
    action = "created" if created else "existing"
    return [{"id": author.id, "name": author.name, "action": action}]
def ensure_book(
    self,
    title: str,
    author_id: int,
    series_id: int | None,
    series_index: int,
) -> EnsuredBook:
    """Return the book matching a title, author and series, creating it if absent.

    Args:
        title: Raw title; it is normalized to a title slug and validated.
        author_id: Id of an existing author.
        series_id: Id of an existing series owned by that author, or ``None``
            for a standalone book.
        series_index: ``0`` for standalone books, a positive number otherwise.

    Returns:
        The book row together with ``"created"`` or ``"existing"``.

    Raises:
        MetadataResolutionError: If the series does not belong to the author
            or the series index does not fit the standalone/series rule.
            The slug validation and the author/series lookups may also raise.
    """
    slug = normalize_title_slug(title)
    validate_title_slug(slug)
    author = self.required_author(author_id)

    series = None if series_id is None else self.required_series(series_id)
    if series is None:
        if series_index != 0:
            msg = "standalone books must use series_index 0"
            raise MetadataResolutionError(msg)
    elif series.author_id != author.id:
        msg = f"series_id {series_id} does not belong to author_id {author_id}"
        raise MetadataResolutionError(msg)
    elif series_index <= 0:
        msg = "series books must use a positive series_index"
        raise MetadataResolutionError(msg)

    # Standalone books are matched on a NULL series, series books on the series id.
    series_filter = Audiobook.series_id.is_(None) if series is None else Audiobook.series_id == series.id
    lookup = select(Audiobook).where(
        Audiobook.title == slug,
        Audiobook.author_id == author.id,
    )
    lookup = lookup.where(series_filter)

    book = self._session.scalar(lookup)
    action = "existing"
    if book is None:
        book = Audiobook(title=slug, author=author, series=series, series_index=series_index)
        self._session.add(book)
        self._session.flush()
        self.created_book_ids.add(book.id)
        action = "created"

    self.seen_book_ids.add(book.id)
    self.seen_author_ids.add(author.id)
    if book.series_id is not None:
        self.seen_series_ids.add(book.series_id)
    return EnsuredBook(book=book, action=action)
def book_result(self, book: Audiobook, action: str) -> dict[str, object]:
    """Describe a book row as the flat mapping handed back to the model.

    Args:
        book: Row to describe; its ``author`` (and ``series``, when set) are
            read for their names.
        action: Label stored under "action", passed through unchanged.

    Returns:
        A mapping with id, title, author, series and action entries. A book
        without a series reports the configured standalone series name.
    """
    if book.series:
        series_name = book.series.name
    else:
        # Standalone rows have no series object to take a name from.
        series_name = self._config.standalone_series

    payload: dict[str, object] = {}
    payload["id"] = book.id
    payload["title"] = book.title
    payload["author_id"] = book.author_id
    payload["author"] = book.author.name
    payload["series_id"] = book.series_id
    payload["series"] = series_name
    payload["series_index"] = book.series_index
    payload["action"] = action
    return payload
def run_step(
    self,
    messages: list[dict[str, object]],
    turn: int,
    invalid_final_count: int,
) -> AgentStepResult:
    """Ask the model for one reply and decide what the agent loop does next.

    Args:
        messages: Conversation so far; handed to ``chat`` and to the handlers.
        turn: Turn number forwarded to ``chat``.
        invalid_final_count: Number of rejected final answers so far.

    Returns:
        The handler's result for a tool-call or final-answer reply, or a
        terminal review result when the reply is malformed.
    """

    def stop_for_review(reason: str) -> AgentStepResult:
        # Terminal outcome: the retry counter is passed through untouched.
        return AgentStepResult(
            metadata=review_metadata(reason, self._config),
            invalid_final_count=invalid_final_count,
            should_continue=False,
        )

    reply = self.chat(messages, turn).get("message")
    if not isinstance(reply, dict):
        return stop_for_review("Ollama response did not include a message")

    try:
        requested_calls = parse_tool_calls(reply)
    except (json.JSONDecodeError, MetadataResolutionError) as error:
        return stop_for_review(str(error))

    # A reply without tool calls is treated as the model's final answer.
    if not requested_calls:
        return self.handle_final_message(messages, reply, invalid_final_count)
    return self.handle_tool_calls(messages, reply, requested_calls, invalid_final_count)
def handle_final_message(
    self,
    messages: list[dict[str, object]],
    message: dict[str, object],
    invalid_final_count: int,
) -> AgentStepResult:
    """Validate a final model message or request one retry.

    Args:
        messages: Conversation so far. When the answer is rejected, the
            assistant message is appended here before the retry prompt.
        message: Assistant message that carried no tool calls.
        invalid_final_count: Number of rejected final answers so far.

    Returns:
        A terminal result with validated metadata, a terminal review result
        when the content is not a string, or whatever ``handle_invalid_final``
        decides for an answer that fails parsing or validation.
    """
    content = message.get("content")
    if not isinstance(content, str):
        return AgentStepResult(
            metadata=review_metadata("Ollama final response did not include string content", self._config),
            invalid_final_count=invalid_final_count,
            should_continue=False,
        )

    try:
        resolved = self.validate_final(parse_final_json_content(content))
    except (json.JSONDecodeError, MetadataResolutionError) as error:
        # Keep the rejected answer in the transcript. The retry prompt refers
        # to "your previous final answer", and the chat request carries no
        # state beyond `messages`, so without this the model never sees what
        # it is being asked to correct. Tool-call turns already record the
        # assistant message the same way.
        messages.append(message)
        return self.handle_invalid_final(messages, error, invalid_final_count)

    write_agent_log(self._log_path, "final_metadata", metadata=resolved)
    return AgentStepResult(metadata=resolved, invalid_final_count=invalid_final_count, should_continue=False)
def force_final_response(self, messages: list[dict[str, object]]) -> StandardBookMetadata:
    """Ask for a tool-free final answer once the normal turn budget is spent.

    Args:
        messages: Conversation so far; the forced-final prompt is appended.

    Returns:
        Validated metadata, or review metadata when the reply is missing,
        has non-string content, or fails parsing or validation.
    """
    prompt = forced_final_prompt()
    messages.append({"role": "user", "content": prompt})
    write_agent_log(self._log_path, "forced_final_request", reason="max_turns")

    # This request is numbered one past the configured turn limit.
    extra_turn = self._config.max_agent_turns + 1
    reply = self.chat(messages, extra_turn, tools_enabled=False).get("message")
    if not isinstance(reply, dict):
        return review_metadata("Ollama forced final response did not include a message", self._config)

    text = reply.get("content")
    if not isinstance(text, str):
        return review_metadata("Ollama forced final response did not include string content", self._config)

    try:
        metadata = self.validate_final(parse_final_json_content(text))
    except (json.JSONDecodeError, MetadataResolutionError) as error:
        # No retry here: an invalid forced answer goes straight to review.
        return review_metadata(f"Ollama forced final response was invalid: {error}", self._config)
    else:
        write_agent_log(self._log_path, "final_metadata", metadata=metadata)
        return metadata
def validate_final(self, raw_metadata: object) -> StandardBookMetadata:
    """Validate final model metadata against catalog rows.

    Args:
        raw_metadata: Parsed JSON value from the model's final answer.

    Returns:
        Metadata built from catalog rows. ``needs_review`` is set when the
        reported confidence is below the configured minimum.

    Raises:
        MetadataResolutionError: If any field is malformed, refers to an id
            the tools did not return, or breaks the author/series/book
            ownership rules.
    """
    fields = parse_final_metadata_fields(raw_metadata)
    fields = replace(fields, title=normalize_title_slug(fields.title))
    author = self.validate_author(fields.author_id)
    validate_title_slug(fields.title)

    if fields.book_id is None:
        # With no book_id, resolve_book_fields goes through ensure_book, which
        # may insert a row and marks the book's series id as seen. Check the
        # model-supplied series first so that an id the tools never returned
        # is rejected, and so nothing is written for an answer that fails.
        self.validate_series(fields.author_id, fields.series_id, fields.series_index)

    book_fields = self.resolve_book_fields(fields)
    # Validate again against the resolved row: for an existing book the
    # series fields come from the catalog, not from the model.
    series = self.validate_series(fields.author_id, book_fields.series_id, book_fields.series_index)

    return StandardBookMetadata(
        author_id=fields.author_id,
        author=author.name,
        book_id=book_fields.book_id,
        title=book_fields.title,
        series_id=book_fields.series_id,
        series=series,
        series_index=book_fields.series_index,
        confidence=fields.confidence,
        needs_review=fields.confidence < self._config.min_confidence,
        evidence=fields.evidence,
    )
def resolve_book_fields(self, fields: FinalMetadataFields) -> ResolvedBookFields:
    """Turn the model's book reference into fields taken from a catalog row.

    Args:
        fields: Parsed final-answer fields.

    Returns:
        Book id, title, series id and series index read from the row that was
        ensured (no ``book_id`` given) or looked up (``book_id`` given).

    Raises:
        MetadataResolutionError: If a supplied ``book_id`` was not seen, does
            not exist, or belongs to another author. Errors from
            ``ensure_book`` propagate unchanged.
    """
    registry = self._registry
    requested_id = fields.book_id

    if requested_id is None:
        # No id supplied: find or create the row from the other fields.
        book = registry.ensure_book(
            fields.title,
            fields.author_id,
            fields.series_id,
            fields.series_index,
        ).book
        resolved_id = book.id
    else:
        if requested_id not in registry.seen_book_ids:
            msg = f"book_id {fields.book_id} was not returned by search_books"
            raise MetadataResolutionError(msg)
        book = registry.get_book(requested_id)
        if book is None:
            msg = f"book_id {fields.book_id} does not exist"
            raise MetadataResolutionError(msg)
        if book.author_id != fields.author_id:
            msg = f"book_id {fields.book_id} does not belong to author_id {fields.author_id}"
            raise MetadataResolutionError(msg)
        resolved_id = requested_id

    return ResolvedBookFields(
        book_id=resolved_id,
        title=book.title,
        series_id=book.series_id,
        series_index=book.series_index,
    )
{series_id} does not exist" + raise MetadataResolutionError(msg) + if series.author_id != author_id: + msg = f"series_id {series_id} does not belong to author_id {author_id}" + raise MetadataResolutionError(msg) + if series_index <= 0: + msg = "series books must use a positive series_index" + raise MetadataResolutionError(msg) + validate_catalog_slug(series.name, "series") + return series.name + + +def write_agent_log(log_path: Path, event: str, **fields: object) -> None: + """Append one JSONL audit event.""" + log_path.parent.mkdir(parents=True, exist_ok=True) + record = { + "created": utcnow().isoformat(), + "event": event, + **{key: json_log_value(value) for key, value in fields.items()}, + } + with log_path.open("a", encoding="utf-8") as file: + file.write(json.dumps(record, sort_keys=True)) + file.write("\n") + + +def json_log_value(value: object) -> object: + """Return a JSON-serializable value for audit logs.""" + if is_dataclass(value) and not isinstance(value, type): + return json_log_value(asdict(value)) + if isinstance(value, dict): + return {str(key): json_log_value(item) for key, item in value.items()} + if isinstance(value, list | tuple): + return [json_log_value(item) for item in value] + if isinstance(value, set): + return [json_log_value(item) for item in sorted(value, key=str)] + if isinstance(value, PathLike): + return str(value) + return value + + +def system_prompt() -> str: + """Return the stable system prompt.""" + return """You standardize Audible audiobook metadata against a private catalog. + +Rules: +- You must use the provided tools before returning final metadata. +- Only use author_id, series_id, or book_id values returned by tools. +- Return final metadata as JSON only. Do not wrap it in Markdown. +- The final JSON object must contain author_id, book_id, title, series_id, series_index, confidence, and evidence. +- title must be a canonical title slug using lower-case words separated by hyphens. 
+- Use series_id null and series_index 0 for standalone books. +- If you use a series_id, series_index must be an integer greater than or equal to 1. +- Do not create publisher collections or author collections as series unless the book metadata clearly gives a + numbered series. +- Series belong to authors. Use a series_id only when it belongs to the selected author_id. +- Always search for the author before creating one. If no exact author slug exists, call ensure_author. +- Always search for a series with author_id before creating one. If no exact series slug exists, call ensure_series. +- Always search for a book before creating one. If no exact title slug exists, call ensure_book. +- If a tool returns an error, correct your tool arguments or final metadata before continuing. +- confidence must be a number from 0 to 1. +- evidence must be a short list of strings explaining which filename, tags, and catalog rows support the answer.""" + + +def forced_final_prompt() -> str: + """Return the no-tools finalization prompt.""" + return ( + "Stop calling tools. Return final metadata as JSON only using the tool results already provided. " + "If search_books returned no matching rows but author and series are known, use book_id null and resolve " + "the title slug from the AAX filename and ffprobe tags. The validator will create the missing book. " + "Use only author_id and series_id values returned by earlier tool results." 
def parse_tool_arguments(raw_arguments: object) -> dict[str, object]:
    """Normalize the arguments of one tool call into a string-keyed dict.

    Args:
        raw_arguments: Either a mapping, or a JSON string encoding one. An
            empty string stands for "no arguments".

    Returns:
        A new dict whose keys have been converted to ``str``.

    Raises:
        json.JSONDecodeError: If a non-empty string is not valid JSON.
        MetadataResolutionError: If the value is not an object, or is a JSON
            string that does not decode to one.
    """
    candidate = raw_arguments
    if isinstance(candidate, str):
        # Decode string-encoded arguments before the shared object check.
        candidate = json.loads(candidate) if candidate else {}

    if not isinstance(candidate, dict):
        msg = "tool arguments must be an object"
        raise MetadataResolutionError(msg)
    return {str(name): item for name, item in candidate.items()}
def normalize_title_slug(value: str) -> str:
    """Reduce a noisy book title to a lower-case, hyphen-separated slug.

    The title is case-folded, every run of characters outside ``a-z0-9`` acts
    as a separator, and the remaining runs are joined with single hyphens.
    A title with no such characters yields an empty string.
    """
    folded = value.strip().casefold()
    # Collecting the alphanumeric runs and joining them gives the same result
    # as replacing separator runs and trimming the hyphens at the edges.
    words = re.findall(r"[a-z0-9]+", folded)
    return "-".join(words)
reviewed manually.""" + return StandardBookMetadata( + author_id=0, + author="unknown_author", + book_id=None, + title="unknown-title", + series_id=None, + series=config.standalone_series, + series_index=0, + confidence=0, + needs_review=True, + evidence=[reason], + ) + + +def query_terms(query: str) -> tuple[str, ...]: + """Return text variants useful for matching noisy audiobook metadata.""" + normalized = query.strip().casefold() + underscore_slug = normalize_catalog_slug(normalized) + hyphen_slug = normalize_title_slug(normalized) + return tuple(dict.fromkeys(term for term in (normalized, underscore_slug, hyphen_slug) if term)) + + +def required_string(data: dict[str, object], key: str) -> str: + """Read a required string field.""" + value = data.get(key) + if not isinstance(value, str) or not value.strip(): + msg = f"{key} must be a non-empty string" + raise MetadataResolutionError(msg) + return value.strip() + + +def required_int(data: dict[str, object], key: str) -> int: + """Read a required integer field.""" + value = data.get(key) + if isinstance(value, bool) or not isinstance(value, int): + msg = f"{key} must be an integer" + raise MetadataResolutionError(msg) + return value + + +def optional_int(value: object, key: str) -> int | None: + """Read an optional integer field.""" + if value is None: + return None + if isinstance(value, bool) or not isinstance(value, int): + msg = f"{key} must be an integer or null" + raise MetadataResolutionError(msg) + return value + + +def required_float(data: dict[str, object], key: str) -> float: + """Read a required float field.""" + value = data.get(key) + if isinstance(value, bool) or not isinstance(value, int | float): + msg = f"{key} must be a number" + raise MetadataResolutionError(msg) + confidence = float(value) + if confidence < 0 or confidence > 1: + msg = f"{key} must be between 0 and 1" + raise MetadataResolutionError(msg) + return confidence + + +def required_string_list(data: dict[str, object], key: str) -> 
class FakeOllamaResponse:
    """Stand-in for the response object the agent gets back from ``httpx.post``."""

    def __init__(self, payload):
        """Store the value that ``json()`` will return."""
        self._payload = payload

    def raise_for_status(self):
        """Do nothing: this fake always represents a successful request."""

    def json(self):
        """Return the stored payload unchanged."""
        body = self._payload
        return body
def tool_response(name, arguments):
    """Build a fake chat payload whose assistant message requests one tool call."""
    call = {"function": {"name": name, "arguments": arguments}}
    assistant_message = {
        "role": "assistant",
        "content": "",
        "tool_calls": [call],
    }
    return {"message": assistant_message}
def test_run_command_redacts_activation_bytes_in_logs_and_errors(monkeypatch, caplog) -> None:
    """A failing command must not leak the activation bytes into logs or errors."""
    arguments = ["ffmpeg", "-activation_bytes", "secret-token", "-i", "book.aax"]

    def fake_run(arguments, *, check, capture_output, text):
        assert check is True
        assert capture_output is False
        assert text is True
        raise subprocess.CalledProcessError(1, arguments)

    monkeypatch.setattr(audible_convert.subprocess, "run", fake_run)
    caplog.set_level("DEBUG", audible_convert.__name__)

    with pytest.raises(audible_convert.CommandExecutionError) as error:
        audible_convert.run_command(arguments)

    # Take the expected redacted form from the production helper. The previous
    # positive assertions tested for "" and so could never fail.
    redacted = list(audible_convert.redact_command_arguments(arguments))

    assert "secret-token" not in redacted
    assert "secret-token" not in caplog.text
    assert "secret-token" not in str(error.value)
    # CommandExecutionError builds its message from the space-joined redacted
    # arguments.
    assert " ".join(redacted) in str(error.value)
    # NOTE(review): assumes run_command logs the redacted command in some
    # form (joined or as a list); the exact log format is not pinned here.
    assert all(token in caplog.text for token in redacted)
def test_standard_book_metadata_uses_agent_config(tmp_path, monkeypatch, audiobook_engine) -> None:
    """Every AgentConfig field set here must affect the requests or the result."""
    config = metadata_agent.AgentConfig(
        model="custom-model",
        ollama_chat_url="https://ollama.example.test/api/chat",
        http_timeout_seconds=12,
        max_agent_turns=1,
        min_confidence=0.5,
        tool_names=("search_authors",),
    )
    calls = install_fake_ollama(
        monkeypatch,
        [
            tool_response("search_authors", {"query": "Glynn Stewart"}),
            final_response(
                {
                    "author_id": 1,
                    "book_id": None,
                    "title": "standalone-book",
                    "series_id": None,
                    "series_index": 0,
                    "confidence": 0.5,
                    "evidence": ["custom config"],
                },
            ),
        ],
    )

    metadata = standard_book_metadata(
        "Standalone Book.aax",
        {"title": "Standalone Book", "artist": "Glynn Stewart"},
        audiobook_engine,
        tmp_path / "agent.jsonl",
        "test-key",
        config=config,
    )

    first_request_url = calls[0][0][0]
    first_request_options = calls[0][1]
    tool_names = [
        tool_schema["function"]["name"]
        for tool_schema in first_request_options["json"]["tools"]
    ]
    assert first_request_url == "https://ollama.example.test/api/chat"
    assert first_request_options["timeout"] == 12
    assert first_request_options["json"]["model"] == "custom-model"
    assert tool_names == ["search_authors"]

    # max_agent_turns=1: the single turn is spent on the tool call, so the
    # answer has to come from the forced-final request, which is sent with
    # tools disabled and therefore has no "tools" entry in its payload.
    assert len(calls) == 2
    assert "tools" not in calls[1][1]["json"]

    # A confidence equal to min_confidence is not below it.
    assert metadata.needs_review is False
    assert metadata.series == "standalone"
def test_standard_book_metadata_recovers_from_tool_validation_error(
    tmp_path,
    monkeypatch,
    audiobook_engine,
) -> None:
    """A non-fatal tool error is reported to the model, which can still finish.

    The scripted model wrongly files the book under a collection series with
    series_index 0, is told that is invalid, and then answers with a
    standalone book.
    """
    install_fake_ollama(
        monkeypatch,
        [
            tool_response("search_authors", {"query": "Cormac McCarthy"}),
            tool_response("ensure_author", {"name": "Cormac McCarthy"}),
            # NOTE(review): ids 5 below presumably follow from the fixture's
            # highest seeded author/series id being 4 — confirm against the ORM.
            tool_response("ensure_series", {"name": "The Cormac McCarthy Collection", "author_id": 5}),
            # Invalid on purpose: a book in a series must have a positive index.
            tool_response(
                "ensure_book",
                {
                    "title": "The Road",
                    "author_id": 5,
                    "series_id": 5,
                    "series_index": 0,
                },
            ),
            # The title is sent un-normalized; the result is expected to hold
            # the slug form.
            final_response(
                {
                    "author_id": 5,
                    "book_id": None,
                    "title": "The Road",
                    "series_id": None,
                    "series_index": 0,
                    "confidence": 0.9,
                    "evidence": ["tool error showed this should be standalone"],
                },
            ),
        ],
    )
    log_file = tmp_path / "agent.jsonl"

    metadata = standard_book_metadata(
        "The Road.aax",
        {"title": "The Road", "artist": "Cormac McCarthy"},
        audiobook_engine,
        log_file,
        "test-key",
        config=metadata_agent.AgentConfig(),
    )

    assert metadata == StandardBookMetadata(
        author_id=5,
        author="cormac_mccarthy",
        book_id=1,
        title="the-road",
        series_id=None,
        series="standalone",
        series_index=0,
        confidence=0.9,
        needs_review=False,
        evidence=["tool error showed this should be standalone"],
    )
    # The rejected ensure_book call must have been logged with its reason.
    assert "series books must use a positive series_index" in log_file.read_text(encoding="utf-8")
    with Session(audiobook_engine) as session:
        # NOTE(review): the series ensured mid-conversation is expected to be
        # gone afterwards; the cleanup that removes it is not visible in this
        # test — confirm where unused created rows are discarded.
        assert session.get(AudiobookSeries, 5) is None
        book = session.get(Audiobook, 1)
        assert book.title == "the-road"
        assert book.series_id is None
monkeypatch, + audiobook_engine, +) -> None: + install_fake_ollama( + monkeypatch, + [ + tool_response("search_authors", {"query": "Glynn Stewart"}), + tool_response("search_series", {"query": "expeditionary_force"}), + final_response( + { + "author_id": 1, + "book_id": None, + "title": "expeditionary-force", + "series_id": 3, + "series_index": 1, + "confidence": 0.99, + "evidence": ["wrong author"], + }, + ), + final_response( + { + "author_id": 1, + "book_id": None, + "title": "expeditionary-force", + "series_id": 3, + "series_index": 1, + "confidence": 0.99, + "evidence": ["wrong author"], + }, + ), + ], + ) + + metadata = standard_book_metadata( + "Book.aax", + {"title": "Book"}, + audiobook_engine, + tmp_path / "agent.jsonl", + "test-key", + config=metadata_agent.AgentConfig(), + ) + + assert metadata.needs_review is True + assert "series_id 3 does not belong to author_id 1" in metadata.evidence[0] + + +def test_standard_book_metadata_forces_final_after_empty_book_searches( + tmp_path, + monkeypatch, + audiobook_engine, +) -> None: + config = metadata_agent.AgentConfig(max_agent_turns=5) + install_fake_ollama( + monkeypatch, + [ + tool_response("search_authors", {"query": "Dennis E. Taylor"}), + tool_response("search_series", {"query": "Bobiverse", "author_id": 4}), + tool_response("search_books", {"query": "We Are Legion We Are Bob", "author_id": 4, "series_id": 4}), + tool_response("search_books", {"query": "we are legion", "author_id": 4}), + tool_response("search_books", {"query": "We Are Legion"}), + final_response( + { + "author_id": 4, + "book_id": None, + "title": "we-are-legion-we-are-bob", + "series_id": 4, + "series_index": 1, + "confidence": 0.95, + "evidence": ["author and series tool results; title from ffprobe tags"], + }, + ), + ], + ) + + metadata = standard_book_metadata( + "We_Are_Legion_(We_Are_Bob)_Bobiverse_Book_1-LC_128_44100_stereo.aax", + { + "album": "We Are Legion (We Are Bob): Bobiverse, Book 1", + "artist": "Dennis E. 
Taylor", + "title": "We Are Legion (We Are Bob): Bobiverse, Book 1", + }, + audiobook_engine, + tmp_path / "agent.jsonl", + "test-key", + config=config, + ) + + assert metadata == StandardBookMetadata( + author_id=4, + author="dennis_e_taylor", + book_id=1, + title="we-are-legion-we-are-bob", + series_id=4, + series="bobiverse", + series_index=1, + confidence=0.95, + needs_review=False, + evidence=["author and series tool results; title from ffprobe tags"], + ) + assert '"tools_enabled": false' in (tmp_path / "agent.jsonl").read_text(encoding="utf-8") + + +def test_standard_book_metadata_can_create_missing_catalog_rows( + tmp_path, + monkeypatch, + audiobook_engine, +) -> None: + install_fake_ollama( + monkeypatch, + [ + tool_response("search_authors", {"query": "Martha Wells"}), + tool_response("ensure_author", {"name": "martha_wells"}), + tool_response("search_series", {"query": "Murderbot Diaries", "author_id": 5}), + tool_response("ensure_series", {"name": "murderbot_diaries", "author_id": 5}), + tool_response("search_books", {"query": "All Systems Red", "author_id": 5, "series_id": 5}), + final_response( + { + "author_id": 5, + "book_id": None, + "title": "all-systems-red", + "series_id": 5, + "series_index": 1, + "confidence": 0.96, + "evidence": ["created missing author and series; title from tags"], + }, + ), + ], + ) + + metadata = standard_book_metadata( + "All Systems Red.aax", + {"title": "All Systems Red", "artist": "Martha Wells"}, + audiobook_engine, + tmp_path / "agent.jsonl", + "test-key", + config=metadata_agent.AgentConfig(), + ) + + assert metadata == StandardBookMetadata( + author_id=5, + author="martha_wells", + book_id=1, + title="all-systems-red", + series_id=5, + series="murderbot_diaries", + series_index=1, + confidence=0.96, + needs_review=False, + evidence=["created missing author and series; title from tags"], + ) + with Session(audiobook_engine) as session: + author = session.get(AudiobookAuthor, 5) + series = 
session.get(AudiobookSeries, 5) + book = session.get(Audiobook, 1) + assert author.name == "martha_wells" + assert series.name == "murderbot_diaries" + assert series.author_id == author.id + assert book.title == "all-systems-red" + assert book.author_id == author.id + assert book.series_id == series.id + + +def test_standard_book_metadata_normalizes_noisy_created_catalog_rows( + tmp_path, + monkeypatch, + audiobook_engine, +) -> None: + install_fake_ollama( + monkeypatch, + [ + tool_response("search_authors", {"query": "Charles Lamb"}), + tool_response("ensure_author", {"name": "charles-lamb"}), + tool_response("search_series", {"query": "AL:ICE Series", "author_id": 5}), + tool_response("ensure_series", {"name": "AL:ICE Series", "author_id": 5}), + tool_response("search_books", {"query": "AL:ICE Space War", "author_id": 5, "series_id": 5}), + final_response( + { + "author_id": 5, + "book_id": None, + "title": "AL:ICE Space War", + "series_id": 5, + "series_index": 4, + "confidence": 0.95, + "evidence": ["created normalized author and series; title from tags"], + }, + ), + ], + ) + + metadata = standard_book_metadata( + "ALICE_Space_War_ALICE_Series_Book_4-LC_64_22050_stereo.aax", + { + "album": "AL:ICE Space War: AL:ICE Series, Book 4", + "artist": "Charles Lamb", + "title": "AL:ICE Space War: AL:ICE Series, Book 4", + }, + audiobook_engine, + tmp_path / "agent.jsonl", + "test-key", + config=metadata_agent.AgentConfig(), + ) + + assert metadata == StandardBookMetadata( + author_id=5, + author="charles_lamb", + book_id=1, + title="al-ice-space-war", + series_id=5, + series="al_ice_series", + series_index=4, + confidence=0.95, + needs_review=False, + evidence=["created normalized author and series; title from tags"], + ) + with Session(audiobook_engine) as session: + author = session.get(AudiobookAuthor, 5) + series = session.get(AudiobookSeries, 5) + book = session.get(Audiobook, 1) + assert author.name == "charles_lamb" + assert series.name == "al_ice_series" + 
assert series.author_id == author.id + assert book.title == "al-ice-space-war" + assert book.author_id == author.id + assert book.series_id == series.id + + +def test_convert_aax_file_with_agent_success_renames_temp_output(tmp_path, monkeypatch) -> None: + source = tmp_path / "book.aax" + output_directory = tmp_path / "audiobooks" + source.touch() + monkeypatch.setattr(audible_convert, "read_metadata", lambda _: {"title": "Starship Mage"}) + monkeypatch.setattr( + audible_convert, + "standard_book_metadata", + lambda *_, **__: StandardBookMetadata( + author_id=1, + author="glynn_stewart", + book_id=None, + title="starship-mage", + series_id=1, + series="starships_mage", + series_index=1, + confidence=0.95, + needs_review=False, + evidence=["test"], + ), + ) + + def fake_convert(_source, destination, _activation_bytes, *, overwrite): + assert overwrite is True + destination.parent.mkdir(parents=True, exist_ok=True) + destination.write_text("converted", encoding="utf-8") + + monkeypatch.setattr(audible_convert, "convert_aax_file", fake_convert) + + audible_convert.convert_aax_file_with_agent( + source, + conversion_config(output_directory), + ) + + expected = output_directory / "glynn_stewart-starships_mage_01-starship-mage" + destination = expected / "glynn_stewart-starships_mage_01-starship-mage.m4b" + assert destination.read_text(encoding="utf-8") == "converted" + assert not list((output_directory / ".audible_convert" / "tmp").glob("*/converted.m4b")) + + +def test_ffprobe_failure_writes_review_without_converting(tmp_path, monkeypatch) -> None: + source = tmp_path / "book.aax" + output_directory = tmp_path / "audiobooks" + source.touch() + calls = [] + + def fake_read_metadata(_source): + raise FakeFfprobeError + + def fake_convert(*args, **kwargs): + calls.append((args, kwargs)) + + monkeypatch.setattr(audible_convert, "read_metadata", fake_read_metadata) + monkeypatch.setattr(audible_convert, "convert_aax_file", fake_convert) + + 
audible_convert.convert_aax_file_with_agent(source, conversion_config(output_directory)) + + review_files = list((output_directory / ".audible_convert" / "review").glob("*.json")) + assert calls == [] + assert len(review_files) == 1 + review = json.loads(review_files[0].read_text(encoding="utf-8")) + assert review["ffprobe_metadata"] == {} + assert review["reason"] == "ffprobe_failed: bad ffprobe" + assert review["temp_file"] is None + + +def test_low_confidence_metadata_keeps_temp_output_for_review(tmp_path, monkeypatch) -> None: + source = tmp_path / "book.aax" + output_directory = tmp_path / "audiobooks" + source.touch() + monkeypatch.setattr(audible_convert, "read_metadata", lambda _: {"title": "Unknown"}) + monkeypatch.setattr( + audible_convert, + "standard_book_metadata", + lambda *_, **__: StandardBookMetadata( + author_id=0, + author="unknown_author", + book_id=None, + title="unknown-title", + series_id=None, + series="standalone", + series_index=0, + confidence=0.25, + needs_review=True, + evidence=["unclear"], + ), + ) + + def fake_convert(_source, destination, _activation_bytes, *, overwrite): + assert overwrite is True + destination.parent.mkdir(parents=True, exist_ok=True) + destination.write_text("converted", encoding="utf-8") + + monkeypatch.setattr(audible_convert, "convert_aax_file", fake_convert) + + audible_convert.convert_aax_file_with_agent( + source, + conversion_config(output_directory), + ) + + temp_files = list((output_directory / ".audible_convert" / "tmp").glob("*/converted.m4b")) + review_files = list((output_directory / ".audible_convert" / "review").glob("*.json")) + assert len(temp_files) == 1 + assert temp_files[0].read_text(encoding="utf-8") == "converted" + assert len(review_files) == 1 + + +def test_existing_destination_skips_rename_and_removes_temp(tmp_path, monkeypatch) -> None: + source = tmp_path / "book.aax" + output_directory = tmp_path / "audiobooks" + source.touch() + final_file = ( + output_directory + / 
"glynn_stewart-starships_mage_01-starship-mage" + / "glynn_stewart-starships_mage_01-starship-mage.m4b" + ) + final_file.parent.mkdir(parents=True) + final_file.write_text("existing", encoding="utf-8") + monkeypatch.setattr(audible_convert, "read_metadata", lambda _: {"title": "Starship Mage"}) + monkeypatch.setattr( + audible_convert, + "standard_book_metadata", + lambda *_, **__: StandardBookMetadata( + author_id=1, + author="glynn_stewart", + book_id=None, + title="starship-mage", + series_id=1, + series="starships_mage", + series_index=1, + confidence=0.95, + needs_review=False, + evidence=["test"], + ), + ) + + def fake_convert(_source, destination, _activation_bytes, *, overwrite): + assert overwrite is True + destination.parent.mkdir(parents=True, exist_ok=True) + destination.write_text("converted", encoding="utf-8") + + monkeypatch.setattr(audible_convert, "convert_aax_file", fake_convert) + + audible_convert.convert_aax_file_with_agent( + source, + conversion_config(output_directory), + ) + + assert final_file.read_text(encoding="utf-8") == "existing" + assert not list((output_directory / ".audible_convert" / "tmp").glob("*/converted.m4b")) + + +def test_richie_exports_audiobook_models() -> None: + from python.orm.richie import Audiobook # noqa: PLC0415 + + assert Audiobook.__tablename__ == "audiobook" + + +def test_main_dry_run_prints_outputs_without_converting(tmp_path, monkeypatch, capsys) -> None: + input_directory = tmp_path / "raw" + output_directory = tmp_path / "audiobooks" + input_directory.mkdir() + source = input_directory / "book.aax" + source.touch() + monkeypatch.setenv("OLLAMA_API_KEY", "test-key") + monkeypatch.setattr( + audible_convert, + "read_metadata", + lambda _: { + "artist": "Charles Lamb", + "title": "Alice: Alice Series #1", + }, + ) + calls = [] + + def fake_convert(*args, **kwargs): + calls.append((args, kwargs)) + + monkeypatch.setattr(audible_convert, "convert_aax_file", fake_convert) + monkeypatch.setattr( + audible_convert, 
+ "standard_book_metadata", + lambda *_, **__: StandardBookMetadata( + author_id=1, + author="charles_lamb", + book_id=None, + title="alice", + series_id=1, + series="alice", + series_index=1, + confidence=0.95, + needs_review=False, + evidence=["test"], + ), + ) + + def fake_get_postgres_engine(*, name): + assert name == "RICHIE" + return create_engine("sqlite+pysqlite:///:memory:") + + monkeypatch.setattr(audible_convert, "get_postgres_engine", fake_get_postgres_engine) + + audible_convert.main(input_directory, output_directory, dry_run=True) + + assert calls == [] + assert capsys.readouterr().out == ( + f"{source} -> " + f"{output_directory / 'charles_lamb-alice_01-alice' / 'charles_lamb-alice_01-alice.m4b'}\n" + ) + assert (output_directory / ".audible_convert" / "logs").is_dir() + + +def test_main_reads_activation_bytes_from_env(tmp_path, monkeypatch) -> None: + input_directory = tmp_path / "raw" + output_directory = tmp_path / "audiobooks" + input_directory.mkdir() + source = input_directory / "book.aax" + source.touch() + configs = [] + + def fake_convert(_source, config): + configs.append(config) + + def fake_get_postgres_engine(*, name): + assert name == "RICHIE" + return sqlite_engine() + + monkeypatch.setenv("OLLAMA_API_KEY", "test-key") + monkeypatch.setenv("AUDIBLE_ACTIVATION_BYTES", "activation-secret") + monkeypatch.setattr(audible_convert, "get_postgres_engine", fake_get_postgres_engine) + monkeypatch.setattr(audible_convert, "convert_aax_file_with_agent", fake_convert) + + audible_convert.main(input_directory, output_directory) + + assert configs == [ + audible_convert.ConversionConfig( + resolved_output=output_directory, + ollama_api_key="test-key", + agent_config=configs[0].agent_config, + engine=configs[0].engine, + activation_bytes="activation-secret", + dry_run=False, + overwrite=False, + ), + ] diff --git a/tests/test_audiobook_catalog.py b/tests/test_audiobook_catalog.py new file mode 100644 index 0000000..0ba4630 --- /dev/null +++ 
b/tests/test_audiobook_catalog.py @@ -0,0 +1,126 @@ +"""test_audiobook_catalog.""" + +from __future__ import annotations + +import pytest +from sqlalchemy import create_engine, select +from sqlalchemy.orm import sessionmaker + +from python.orm.richie import AudiobookAuthor, AudiobookSeries, RichieBase +from python.tools.audiobook import catalog + + +@pytest.fixture +def audiobook_session(): + engine = create_engine("sqlite+pysqlite:///:memory:", future=True) + RichieBase.metadata.create_all(engine) + with sessionmaker(bind=engine, expire_on_commit=False, future=True)() as session: + yield session + engine.dispose() + + +def test_upsert_catalog_csv_inserts_and_updates_authors_and_series(tmp_path, audiobook_session) -> None: + audiobook_session.add_all( + [ + AudiobookAuthor(id=10, name="old_author"), + AudiobookAuthor(id=11, name="craig_alanson"), + ], + ) + audiobook_session.commit() + authors_csv = tmp_path / "authors.csv" + series_csv = tmp_path / "series.csv" + authors_csv.write_text( + "name,id\n" + "glynn_stewart,\n" + "craig_alanson,\n" + "updated_author,10\n", + encoding="utf-8", + ) + series_csv.write_text( + "name,author_name,id\n" + "starships_mage,glynn_stewart,\n" + "expeditionary_force,craig_alanson,\n", + encoding="utf-8", + ) + + author_count = catalog.upsert_authors_from_csv(audiobook_session, authors_csv) + series_count = catalog.upsert_series_from_csv(audiobook_session, series_csv) + audiobook_session.commit() + + authors = audiobook_session.scalars(select(AudiobookAuthor).order_by(AudiobookAuthor.id)).all() + series = audiobook_session.scalars(select(AudiobookSeries).order_by(AudiobookSeries.name)).all() + assert author_count == 3 + assert series_count == 2 + assert [(author.id, author.name) for author in authors] == [ + (10, "updated_author"), + (11, "craig_alanson"), + (12, "glynn_stewart"), + ] + assert [(row.name, row.author.name) for row in series] == [ + ("expeditionary_force", "craig_alanson"), + ("starships_mage", "glynn_stewart"), + ] + 
+ +def test_upsert_series_csv_updates_series_by_id(tmp_path, audiobook_session) -> None: + author = AudiobookAuthor(id=1, name="glynn_stewart") + audiobook_session.add_all( + [ + author, + AudiobookSeries(id=7, name="old_series", author=author), + ], + ) + audiobook_session.commit() + series_csv = tmp_path / "series.csv" + series_csv.write_text( + "name,author_name,id\n" + "starships_mage,glynn_stewart,7\n", + encoding="utf-8", + ) + + count = catalog.upsert_series_from_csv(audiobook_session, series_csv) + audiobook_session.commit() + + series = audiobook_session.get(AudiobookSeries, 7) + assert count == 1 + assert series.name == "starships_mage" + assert series.author.name == "glynn_stewart" + + +def test_upsert_csv_allows_missing_id_column(tmp_path, audiobook_session) -> None: + authors_csv = tmp_path / "authors.csv" + series_csv = tmp_path / "series.csv" + authors_csv.write_text( + "name\n" + "glynn_stewart\n", + encoding="utf-8", + ) + series_csv.write_text( + "name,author_name\n" + "starships_mage,glynn_stewart\n", + encoding="utf-8", + ) + + author_count = catalog.upsert_authors_from_csv(audiobook_session, authors_csv) + series_count = catalog.upsert_series_from_csv(audiobook_session, series_csv) + audiobook_session.commit() + + series = audiobook_session.scalar(select(AudiobookSeries)) + assert author_count == 1 + assert series_count == 1 + assert series.name == "starships_mage" + assert series.author.name == "glynn_stewart" + + +def test_upsert_series_csv_rejects_unknown_author(tmp_path, audiobook_session) -> None: + series_csv = tmp_path / "series.csv" + series_csv.write_text( + "name,author_name,id\n" + "starships_mage,glynn_stewart,\n", + encoding="utf-8", + ) + + with pytest.raises(catalog.CatalogImportError) as error: + catalog.upsert_series_from_csv(audiobook_session, series_csv) + + assert "author not found: glynn_stewart" in str(error.value) diff --git a/tests/test_gitea_flake_lock.py b/tests/test_gitea_flake_lock.py new file mode 100644 index 
0000000..10e32e6 --- /dev/null +++ b/tests/test_gitea_flake_lock.py @@ -0,0 +1,86 @@ +"""Tests for Gitea flake.lock automation.""" + +from __future__ import annotations + +from python.gitea import PullRequest +from python.gitea_flake_lock import ensure_flake_lock_pull_request, find_flake_lock_pull_request + + +def _pull_request(number=1, head_branch="automation/update-flake-lock"): + return PullRequest( + number=number, + title="Update flake.lock", + html_url=f"https://gitea.example.test/pulls/{number}", + labels=(), + head_branch=head_branch, + base_branch="main", + ) + + +class FakeGiteaClient: + def __init__(self, pull_requests=None): + self.pull_requests = pull_requests or [] + self.list_calls = [] + self.create_calls = [] + + def list_open_pull_requests(self, **kwargs): + self.list_calls.append(kwargs) + return self.pull_requests + + def create_pull_request(self, **kwargs): + self.create_calls.append(kwargs) + return _pull_request() + + +def test_ensure_flake_lock_pull_request_finds_by_branch(): + pull_request = _pull_request() + client = FakeGiteaClient([pull_request]) + + result = ensure_flake_lock_pull_request( + client, + owner="Richie", + repo="dotfiles", + branch="automation/update-flake-lock", + base="main", + ) + + assert result == pull_request + assert client.list_calls == [ + {"owner": "Richie", "repo": "dotfiles", "head": "automation/update-flake-lock"}, + ] + assert client.create_calls == [] + + +def test_ensure_flake_lock_pull_request_creates_without_labels(): + client = FakeGiteaClient() + + ensure_flake_lock_pull_request( + client, + owner="Richie", + repo="dotfiles", + branch="automation/update-flake-lock", + base="main", + ) + + assert client.create_calls == [ + { + "owner": "Richie", + "repo": "dotfiles", + "title": "Update flake.lock", + "body": "Automated flake.lock update.", + "head": "automation/update-flake-lock", + "base": "main", + }, + ] + + +def test_find_flake_lock_pull_request_finds_by_branch(): + pull_request = _pull_request() + 
client = FakeGiteaClient([pull_request]) + + result = find_flake_lock_pull_request(client, owner="Richie", repo="dotfiles") + + assert result == pull_request + assert client.list_calls == [ + {"owner": "Richie", "repo": "dotfiles", "head": "automation/update-flake-lock"}, + ]