Compare commits
4 Commits
main
...
cf8c91635a
| Author | SHA1 | Date | |
|---|---|---|---|
| cf8c91635a | |||
| 79162b65d4 | |||
| 67a95657a2 | |||
| a31b83b9c9 |
+93
@@ -0,0 +1,93 @@
|
||||
"""adding audiobook libreary metadata.
|
||||
|
||||
Revision ID: d7864d1ffc17
|
||||
Revises: c8a794340928
|
||||
Create Date: 2026-06-03 20:24:09.200837
|
||||
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
import sqlalchemy as sa
|
||||
from alembic import op
|
||||
|
||||
from python.orm import RichieBase
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from collections.abc import Sequence
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision: str = "d7864d1ffc17"
|
||||
down_revision: str | None = "c8a794340928"
|
||||
branch_labels: str | Sequence[str] | None = None
|
||||
depends_on: str | Sequence[str] | None = None
|
||||
|
||||
schema = RichieBase.schema_name
|
||||
|
||||
|
||||
def upgrade() -> None:
|
||||
"""Upgrade."""
|
||||
# ### commands auto generated by Alembic - please adjust! ###
|
||||
op.create_table(
|
||||
"audiobook_author",
|
||||
sa.Column("name", sa.String(), nullable=False),
|
||||
sa.Column("id", sa.Integer(), nullable=False),
|
||||
sa.Column("created", sa.DateTime(timezone=True), server_default=sa.text("now()"), nullable=False),
|
||||
sa.Column("updated", sa.DateTime(timezone=True), server_default=sa.text("now()"), nullable=False),
|
||||
sa.PrimaryKeyConstraint("id", name=op.f("pk_audiobook_author")),
|
||||
sa.UniqueConstraint("name", name=op.f("uq_audiobook_author_name")),
|
||||
schema=schema,
|
||||
)
|
||||
op.create_table(
|
||||
"audiobook_series",
|
||||
sa.Column("name", sa.String(), nullable=False),
|
||||
sa.Column("author_id", sa.Integer(), nullable=False),
|
||||
sa.Column("id", sa.Integer(), nullable=False),
|
||||
sa.Column("created", sa.DateTime(timezone=True), server_default=sa.text("now()"), nullable=False),
|
||||
sa.Column("updated", sa.DateTime(timezone=True), server_default=sa.text("now()"), nullable=False),
|
||||
sa.ForeignKeyConstraint(
|
||||
["author_id"],
|
||||
[f"{schema}.audiobook_author.id"],
|
||||
name=op.f("fk_audiobook_series_author_id_audiobook_author"),
|
||||
ondelete="CASCADE",
|
||||
),
|
||||
sa.PrimaryKeyConstraint("id", name=op.f("pk_audiobook_series")),
|
||||
sa.UniqueConstraint("author_id", "name", name=op.f("uq_audiobook_series_author_id")),
|
||||
schema=schema,
|
||||
)
|
||||
op.create_table(
|
||||
"audiobook",
|
||||
sa.Column("title", sa.String(), nullable=False),
|
||||
sa.Column("author_id", sa.Integer(), nullable=False),
|
||||
sa.Column("series_id", sa.Integer(), nullable=True),
|
||||
sa.Column("series_index", sa.Integer(), nullable=False),
|
||||
sa.Column("id", sa.Integer(), nullable=False),
|
||||
sa.Column("created", sa.DateTime(timezone=True), server_default=sa.text("now()"), nullable=False),
|
||||
sa.Column("updated", sa.DateTime(timezone=True), server_default=sa.text("now()"), nullable=False),
|
||||
sa.ForeignKeyConstraint(
|
||||
["author_id"],
|
||||
[f"{schema}.audiobook_author.id"],
|
||||
name=op.f("fk_audiobook_author_id_audiobook_author"),
|
||||
ondelete="CASCADE",
|
||||
),
|
||||
sa.ForeignKeyConstraint(
|
||||
["series_id"],
|
||||
[f"{schema}.audiobook_series.id"],
|
||||
name=op.f("fk_audiobook_series_id_audiobook_series"),
|
||||
ondelete="SET NULL",
|
||||
),
|
||||
sa.PrimaryKeyConstraint("id", name=op.f("pk_audiobook")),
|
||||
schema=schema,
|
||||
)
|
||||
# ### end Alembic commands ###
|
||||
|
||||
|
||||
def downgrade() -> None:
|
||||
"""Downgrade."""
|
||||
# ### commands auto generated by Alembic - please adjust! ###
|
||||
op.drop_table("audiobook", schema=schema)
|
||||
op.drop_table("audiobook_series", schema=schema)
|
||||
op.drop_table("audiobook_author", schema=schema)
|
||||
# ### end Alembic commands ###
|
||||
@@ -2,6 +2,7 @@
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from python.orm.richie.audiobook import Audiobook, AudiobookAuthor, AudiobookSeries
|
||||
from python.orm.richie.base import RichieBase, TableBase, TableBaseBig, TableBaseSmall
|
||||
from python.orm.richie.contact import (
|
||||
Contact,
|
||||
@@ -12,6 +13,9 @@ from python.orm.richie.contact import (
|
||||
)
|
||||
|
||||
__all__ = [
|
||||
"Audiobook",
|
||||
"AudiobookAuthor",
|
||||
"AudiobookSeries",
|
||||
"Contact",
|
||||
"ContactNeed",
|
||||
"ContactRelationship",
|
||||
|
||||
@@ -0,0 +1,46 @@
|
||||
"""Audiobook catalog models."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from sqlalchemy import ForeignKey, String, UniqueConstraint
|
||||
from sqlalchemy.orm import Mapped, mapped_column, relationship
|
||||
|
||||
from python.orm.richie.base import TableBase
|
||||
|
||||
|
||||
class AudiobookAuthor(TableBase):
|
||||
"""Canonical audiobook author."""
|
||||
|
||||
__tablename__ = "audiobook_author"
|
||||
|
||||
name: Mapped[str] = mapped_column(String, unique=True)
|
||||
|
||||
books: Mapped[list[Audiobook]] = relationship("Audiobook", back_populates="author")
|
||||
series: Mapped[list[AudiobookSeries]] = relationship("AudiobookSeries", back_populates="author")
|
||||
|
||||
|
||||
class AudiobookSeries(TableBase):
|
||||
"""Canonical audiobook series."""
|
||||
|
||||
__tablename__ = "audiobook_series"
|
||||
__table_args__ = (UniqueConstraint("author_id", "name"),)
|
||||
|
||||
name: Mapped[str] = mapped_column(String)
|
||||
author_id: Mapped[int] = mapped_column(ForeignKey("main.audiobook_author.id", ondelete="CASCADE"))
|
||||
|
||||
author: Mapped[AudiobookAuthor] = relationship("AudiobookAuthor", back_populates="series")
|
||||
books: Mapped[list[Audiobook]] = relationship("Audiobook", back_populates="series")
|
||||
|
||||
|
||||
class Audiobook(TableBase):
|
||||
"""Canonical audiobook title."""
|
||||
|
||||
__tablename__ = "audiobook"
|
||||
|
||||
title: Mapped[str] = mapped_column(String)
|
||||
author_id: Mapped[int] = mapped_column(ForeignKey("main.audiobook_author.id", ondelete="CASCADE"))
|
||||
series_id: Mapped[int | None] = mapped_column(ForeignKey("main.audiobook_series.id", ondelete="SET NULL"))
|
||||
series_index: Mapped[int] = mapped_column(default=0)
|
||||
|
||||
author: Mapped[AudiobookAuthor] = relationship("AudiobookAuthor", back_populates="books")
|
||||
series: Mapped[AudiobookSeries | None] = relationship("AudiobookSeries", back_populates="books")
|
||||
@@ -0,0 +1 @@
|
||||
"""Audiobook tools."""
|
||||
@@ -0,0 +1,428 @@
|
||||
"""Convert Audible AAX downloads into Audiobookshelf-friendly M4B files."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import logging
|
||||
import shutil
|
||||
import subprocess
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from dataclasses import asdict, dataclass
|
||||
from os import getenv
|
||||
from pathlib import Path # noqa: TC003 This is required for the typer CLI
|
||||
from typing import TYPE_CHECKING, Annotated, Any
|
||||
from uuid import uuid7
|
||||
|
||||
import typer
|
||||
|
||||
from python.common import configure_logger
|
||||
from python.orm.common import get_postgres_engine
|
||||
from python.tools.audiobook.metadata_agent import (
|
||||
AgentConfig,
|
||||
StandardBookMetadata,
|
||||
standard_book_metadata,
|
||||
write_agent_log,
|
||||
)
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from sqlalchemy.engine import Engine
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
SENSITIVE_COMMAND_ARGUMENTS = {"-activation_bytes"}
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class ConversionConfig:
|
||||
"""Runtime settings for one conversion command."""
|
||||
|
||||
resolved_output: Path
|
||||
ollama_api_key: str
|
||||
agent_config: AgentConfig
|
||||
engine: Engine
|
||||
activation_bytes: str | None
|
||||
dry_run: bool
|
||||
overwrite: bool
|
||||
work_directory_name: str = ".audible_convert"
|
||||
temp_directory_name: str = "tmp"
|
||||
log_directory_name: str = "logs"
|
||||
review_directory_name: str = "review"
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class ConcurrentConversionResult:
|
||||
"""Result from running ffmpeg and metadata resolution together."""
|
||||
|
||||
metadata: StandardBookMetadata | None
|
||||
conversion_error: Exception | None
|
||||
metadata_error: Exception | None
|
||||
|
||||
|
||||
class CommandExecutionError(RuntimeError):
|
||||
"""Command failed without exposing sensitive arguments."""
|
||||
|
||||
def __init__(self, arguments: list[str], returncode: int) -> None:
|
||||
"""Create a redacted command failure."""
|
||||
self.arguments = tuple(arguments)
|
||||
self.returncode = returncode
|
||||
command = " ".join(redact_command_arguments(arguments))
|
||||
super().__init__(f"Command failed with exit code {returncode}: {command}")
|
||||
|
||||
|
||||
def main(
|
||||
input_directory: Annotated[Path, typer.Argument(help="Directory audible-cli downloads AAX files into.")],
|
||||
output_directory: Annotated[Path, typer.Argument(help="Audiobook output directory.")],
|
||||
*,
|
||||
dry_run: Annotated[bool, typer.Option("--dry-run", help="Print planned output files without converting.")] = False,
|
||||
overwrite: Annotated[bool, typer.Option("--overwrite", help="Overwrite existing M4B files.")] = False,
|
||||
) -> None:
|
||||
"""Convert AAX files from a download directory into M4B files."""
|
||||
configure_logger()
|
||||
resolved_input = input_directory.resolve(strict=True)
|
||||
resolved_output = output_directory.resolve()
|
||||
if not dry_run:
|
||||
resolved_output.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
ollama_api_key = getenv("OLLAMA_API_KEY")
|
||||
if not ollama_api_key:
|
||||
msg = "OLLAMA_API_KEY is required for audiobook metadata resolution"
|
||||
raise RuntimeError(msg)
|
||||
|
||||
config = ConversionConfig(
|
||||
resolved_output=resolved_output,
|
||||
ollama_api_key=ollama_api_key,
|
||||
agent_config=AgentConfig(),
|
||||
engine=get_postgres_engine(name="RICHIE"),
|
||||
activation_bytes=getenv("AUDIBLE_ACTIVATION_BYTES"),
|
||||
dry_run=dry_run,
|
||||
overwrite=overwrite,
|
||||
)
|
||||
|
||||
aax_files = sorted(resolved_input.glob("*.aax"))
|
||||
if not aax_files:
|
||||
logger.info("No AAX files found in %s", resolved_input)
|
||||
return
|
||||
for aax_file in aax_files:
|
||||
logger.info("Converting %s", aax_file)
|
||||
convert_aax_file_with_agent(aax_file, config)
|
||||
|
||||
|
||||
def run_command(arguments: list[str], *, capture: bool = False) -> subprocess.CompletedProcess[str]:
|
||||
"""Run a command and return the completed process.
|
||||
|
||||
Args:
|
||||
arguments: Command and arguments to run.
|
||||
capture: Whether to capture stdout and stderr.
|
||||
|
||||
Returns:
|
||||
The completed process.
|
||||
"""
|
||||
logger.debug("%s", " ".join(redact_command_arguments(arguments)))
|
||||
try:
|
||||
return subprocess.run(arguments, check=True, capture_output=capture, text=True)
|
||||
except subprocess.CalledProcessError as error:
|
||||
raise CommandExecutionError(arguments, error.returncode) from error
|
||||
|
||||
|
||||
def redact_command_arguments(arguments: list[str]) -> list[str]:
|
||||
"""Return command arguments with sensitive values redacted."""
|
||||
redacted = []
|
||||
redact_next = False
|
||||
for argument in arguments:
|
||||
if redact_next:
|
||||
redacted.append("<redacted>")
|
||||
redact_next = False
|
||||
continue
|
||||
|
||||
redacted.append(argument)
|
||||
redact_next = argument in SENSITIVE_COMMAND_ARGUMENTS
|
||||
return redacted
|
||||
|
||||
|
||||
def read_metadata(aax_file: Path) -> dict[str, str]:
|
||||
"""Read ffprobe format tags from an AAX file.
|
||||
|
||||
Args:
|
||||
aax_file: AAX file to inspect.
|
||||
|
||||
Returns:
|
||||
Lower-cased metadata tag names mapped to their values.
|
||||
"""
|
||||
completed = run_command(
|
||||
[
|
||||
"ffprobe",
|
||||
"-v",
|
||||
"quiet",
|
||||
"-print_format",
|
||||
"json",
|
||||
"-show_format",
|
||||
str(aax_file),
|
||||
],
|
||||
capture=True,
|
||||
)
|
||||
ffprobe_data: dict[str, Any] = json.loads(completed.stdout)
|
||||
tags = ffprobe_data.get("format", {}).get("tags", {})
|
||||
return {str(key).lower(): str(value) for key, value in tags.items()}
|
||||
|
||||
|
||||
def output_stem(metadata: StandardBookMetadata) -> str:
|
||||
"""Build the output stem for a book.
|
||||
|
||||
Args:
|
||||
metadata: Book metadata.
|
||||
|
||||
Returns:
|
||||
Output stem in author-series_01-title form.
|
||||
"""
|
||||
return f"{metadata.author}-{metadata.series}_{metadata.series_index:02}-{metadata.title}"
|
||||
|
||||
|
||||
def metadata_output_path(output_directory: Path, metadata: StandardBookMetadata) -> Path:
|
||||
"""Build the final M4B path from resolved metadata."""
|
||||
stem = output_stem(metadata)
|
||||
return output_directory / stem / f"{stem}.m4b"
|
||||
|
||||
|
||||
def convert_aax_file(
|
||||
aax_file: Path,
|
||||
destination: Path,
|
||||
activation_bytes: str | None,
|
||||
*,
|
||||
overwrite: bool,
|
||||
) -> None:
|
||||
"""Convert an AAX file into an M4B file.
|
||||
|
||||
Args:
|
||||
aax_file: Source AAX file.
|
||||
destination: Destination M4B file.
|
||||
activation_bytes: Optional Audible activation bytes for ffmpeg.
|
||||
overwrite: Whether to overwrite an existing M4B.
|
||||
"""
|
||||
if destination.exists() and not overwrite:
|
||||
logger.info("Skipping existing file %s", destination)
|
||||
return
|
||||
|
||||
destination.parent.mkdir(parents=True, exist_ok=True)
|
||||
arguments = ["ffmpeg", "-hide_banner", "-y" if overwrite else "-n"]
|
||||
if activation_bytes:
|
||||
arguments.extend(["-activation_bytes", activation_bytes])
|
||||
arguments.extend(["-i", str(aax_file), "-map_metadata", "0", "-c", "copy", str(destination)])
|
||||
run_command(arguments)
|
||||
|
||||
|
||||
def write_review_file(
|
||||
*,
|
||||
destination: Path | None,
|
||||
ffprobe_metadata: dict[str, str],
|
||||
log_file: Path,
|
||||
metadata: StandardBookMetadata | None,
|
||||
reason: str,
|
||||
review_file: Path,
|
||||
source: Path,
|
||||
temp_file: Path | None,
|
||||
) -> None:
|
||||
"""Write a manual review file for an unresolved conversion."""
|
||||
review_file.parent.mkdir(parents=True, exist_ok=True)
|
||||
payload = {
|
||||
"destination": str(destination) if destination else None,
|
||||
"ffprobe_metadata": ffprobe_metadata,
|
||||
"metadata": asdict(metadata) if metadata else None,
|
||||
"reason": reason,
|
||||
"source": str(source),
|
||||
"temp_file": str(temp_file) if temp_file else None,
|
||||
}
|
||||
review_file.write_text(json.dumps(payload, indent=2, sort_keys=True), encoding="utf-8")
|
||||
write_agent_log(log_file, "review_written", path=str(review_file), reason=reason)
|
||||
|
||||
|
||||
def cleanup_temp_output(temp_file: Path) -> None:
|
||||
"""Remove a run's temporary output directory."""
|
||||
shutil.rmtree(temp_file.parent, ignore_errors=True)
|
||||
|
||||
|
||||
def dry_run_aax_file_with_agent(
|
||||
aax_file: Path,
|
||||
ffprobe_metadata: dict[str, str],
|
||||
engine: Engine,
|
||||
config: ConversionConfig,
|
||||
log_file: Path,
|
||||
review_file: Path,
|
||||
) -> None:
|
||||
"""Resolve and print the planned output path without converting."""
|
||||
metadata = standard_book_metadata(
|
||||
aax_file.name,
|
||||
ffprobe_metadata,
|
||||
engine,
|
||||
log_file,
|
||||
config.ollama_api_key,
|
||||
config.agent_config,
|
||||
)
|
||||
destination = None if metadata.needs_review else metadata_output_path(config.resolved_output, metadata)
|
||||
if metadata.needs_review:
|
||||
write_review_file(
|
||||
destination=destination,
|
||||
ffprobe_metadata=ffprobe_metadata,
|
||||
log_file=log_file,
|
||||
metadata=metadata,
|
||||
reason="metadata_needs_review",
|
||||
review_file=review_file,
|
||||
source=aax_file,
|
||||
temp_file=None,
|
||||
)
|
||||
typer.echo(f"{aax_file} -> REVIEW {review_file}")
|
||||
else:
|
||||
typer.echo(f"{aax_file} -> {destination}")
|
||||
|
||||
|
||||
def convert_temp_file_and_resolve_metadata(
|
||||
aax_file: Path,
|
||||
temp_file: Path,
|
||||
ffprobe_metadata: dict[str, str],
|
||||
config: ConversionConfig,
|
||||
log_file: Path,
|
||||
) -> ConcurrentConversionResult:
|
||||
"""Run ffmpeg and metadata resolution in parallel."""
|
||||
conversion_error: Exception | None = None
|
||||
metadata_error: Exception | None = None
|
||||
metadata: StandardBookMetadata | None = None
|
||||
|
||||
with ThreadPoolExecutor(max_workers=2) as executor:
|
||||
conversion_future = executor.submit(
|
||||
convert_aax_file,
|
||||
aax_file,
|
||||
temp_file,
|
||||
config.activation_bytes,
|
||||
overwrite=True,
|
||||
)
|
||||
metadata_future = executor.submit(
|
||||
standard_book_metadata,
|
||||
aax_file.name,
|
||||
ffprobe_metadata,
|
||||
config.engine,
|
||||
log_file,
|
||||
config.ollama_api_key,
|
||||
config.agent_config,
|
||||
)
|
||||
|
||||
conversion_error = conversion_future.exception()
|
||||
if conversion_error is None:
|
||||
conversion_future.result()
|
||||
|
||||
metadata_error = metadata_future.exception()
|
||||
if metadata_error is None:
|
||||
metadata = metadata_future.result()
|
||||
|
||||
return ConcurrentConversionResult(
|
||||
metadata=metadata,
|
||||
conversion_error=conversion_error,
|
||||
metadata_error=metadata_error,
|
||||
)
|
||||
|
||||
|
||||
def convert_aax_file_with_agent(aax_file: Path, config: ConversionConfig) -> None:
|
||||
"""Convert one AAX file using the metadata agent for the final path."""
|
||||
run_id = uuid7().hex
|
||||
log_file = config.resolved_output / config.work_directory_name / config.log_directory_name / f"{run_id}.jsonl"
|
||||
review_file = config.resolved_output / config.work_directory_name / config.review_directory_name / f"{run_id}.json"
|
||||
write_agent_log(log_file, "conversion_start", source=str(aax_file), dry_run=config.dry_run)
|
||||
try:
|
||||
ffprobe_metadata = read_metadata(aax_file)
|
||||
except Exception as error:
|
||||
logger.exception("ffprobe failed")
|
||||
write_review_file(
|
||||
destination=None,
|
||||
ffprobe_metadata={},
|
||||
log_file=log_file,
|
||||
metadata=None,
|
||||
reason=f"ffprobe_failed: {error}",
|
||||
review_file=review_file,
|
||||
source=aax_file,
|
||||
temp_file=None,
|
||||
)
|
||||
return
|
||||
|
||||
if config.dry_run:
|
||||
dry_run_aax_file_with_agent(
|
||||
aax_file,
|
||||
ffprobe_metadata,
|
||||
config.engine,
|
||||
config,
|
||||
log_file,
|
||||
review_file,
|
||||
)
|
||||
return
|
||||
|
||||
temp_file = (
|
||||
config.resolved_output / config.work_directory_name / config.temp_directory_name / run_id / "converted.m4b"
|
||||
)
|
||||
temp_file.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
result = convert_temp_file_and_resolve_metadata(aax_file, temp_file, ffprobe_metadata, config, log_file)
|
||||
|
||||
if result.conversion_error:
|
||||
reason = f"ffmpeg_failed: {result.conversion_error}"
|
||||
write_review_file(
|
||||
destination=None,
|
||||
ffprobe_metadata=ffprobe_metadata,
|
||||
log_file=log_file,
|
||||
metadata=result.metadata,
|
||||
reason=reason,
|
||||
review_file=review_file,
|
||||
source=aax_file,
|
||||
temp_file=temp_file if temp_file.exists() else None,
|
||||
)
|
||||
return
|
||||
|
||||
if result.metadata_error:
|
||||
write_review_file(
|
||||
destination=None,
|
||||
ffprobe_metadata=ffprobe_metadata,
|
||||
log_file=log_file,
|
||||
metadata=None,
|
||||
reason=f"metadata_failed: {result.metadata_error}",
|
||||
review_file=review_file,
|
||||
source=aax_file,
|
||||
temp_file=temp_file,
|
||||
)
|
||||
return
|
||||
|
||||
if result.metadata is None or result.metadata.needs_review:
|
||||
write_review_file(
|
||||
destination=None,
|
||||
ffprobe_metadata=ffprobe_metadata,
|
||||
log_file=log_file,
|
||||
metadata=result.metadata,
|
||||
reason="metadata_needs_review",
|
||||
review_file=review_file,
|
||||
source=aax_file,
|
||||
temp_file=temp_file,
|
||||
)
|
||||
return
|
||||
|
||||
destination = metadata_output_path(config.resolved_output, result.metadata)
|
||||
if destination.exists() and not config.overwrite:
|
||||
write_agent_log(log_file, "destination_exists", destination=str(destination))
|
||||
cleanup_temp_output(temp_file)
|
||||
return
|
||||
|
||||
destination.parent.mkdir(parents=True, exist_ok=True)
|
||||
try:
|
||||
temp_file.replace(destination)
|
||||
except Exception as error: # noqa: BLE001
|
||||
write_review_file(
|
||||
destination=destination,
|
||||
ffprobe_metadata=ffprobe_metadata,
|
||||
log_file=log_file,
|
||||
metadata=result.metadata,
|
||||
reason=f"rename_failed: {error}",
|
||||
review_file=review_file,
|
||||
source=aax_file,
|
||||
temp_file=temp_file if temp_file.exists() else None,
|
||||
)
|
||||
else:
|
||||
cleanup_temp_output(temp_file)
|
||||
write_agent_log(log_file, "conversion_complete", destination=str(destination))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
typer.run(main)
|
||||
@@ -0,0 +1,176 @@
|
||||
"""Import audiobook catalog authors and series from CSV files."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import csv
|
||||
import logging
|
||||
from pathlib import Path # noqa: TC003 This is required for the typer CLI
|
||||
from typing import Annotated
|
||||
|
||||
import typer
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from python.common import configure_logger
|
||||
from python.orm.common import get_postgres_engine
|
||||
from python.orm.richie import AudiobookAuthor, AudiobookSeries
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
AUTHOR_NAME_COLUMN = "author_name"
|
||||
ID_COLUMN = "id"
|
||||
NAME_COLUMN = "name"
|
||||
|
||||
|
||||
class CatalogImportError(ValueError):
|
||||
"""CSV catalog import failed validation."""
|
||||
|
||||
|
||||
def main(
|
||||
authors_csv: Annotated[Path, typer.Argument(help="CSV with name and optional id.")],
|
||||
series_csv: Annotated[Path, typer.Argument(help="CSV with name, author_name, and optional id.")],
|
||||
) -> None:
|
||||
"""Upsert audiobook authors and series from CSV files."""
|
||||
configure_logger()
|
||||
try:
|
||||
engine = get_postgres_engine(name="RICHIE")
|
||||
with Session(engine) as session:
|
||||
author_count = upsert_authors_from_csv(session, authors_csv)
|
||||
series_count = upsert_series_from_csv(session, series_csv)
|
||||
session.commit()
|
||||
except CatalogImportError as error:
|
||||
typer.echo(str(error), err=True)
|
||||
raise typer.Exit(code=1) from error
|
||||
|
||||
logger.info("Upserted %s authors and %s series", author_count, series_count)
|
||||
|
||||
|
||||
def upsert_authors_from_csv(session: Session, authors_csv: Path) -> int:
|
||||
"""Upsert authors from a CSV file."""
|
||||
count = 0
|
||||
for row_number, row in csv_rows(authors_csv):
|
||||
name = required_csv_value(row, authors_csv, row_number, NAME_COLUMN)
|
||||
upsert_author(session, name, csv_id(row, authors_csv, row_number))
|
||||
count += 1
|
||||
return count
|
||||
|
||||
|
||||
def upsert_series_from_csv(session: Session, series_csv: Path) -> int:
|
||||
"""Upsert series from a CSV file."""
|
||||
count = 0
|
||||
for row_number, row in csv_rows(series_csv):
|
||||
series_name = required_csv_value(row, series_csv, row_number, NAME_COLUMN)
|
||||
author_name = required_csv_value(row, series_csv, row_number, AUTHOR_NAME_COLUMN)
|
||||
author = find_author_by_name(session, author_name)
|
||||
if author is None:
|
||||
msg = f"{series_csv}:{row_number}: author not found: {author_name}"
|
||||
raise CatalogImportError(msg)
|
||||
upsert_series(session, series_name, author, csv_id(row, series_csv, row_number))
|
||||
count += 1
|
||||
return count
|
||||
|
||||
|
||||
def upsert_author(session: Session, name: str, author_id: int | None) -> AudiobookAuthor:
|
||||
"""Upsert one author by id or exact name."""
|
||||
if author_id is not None:
|
||||
author = session.get(AudiobookAuthor, author_id)
|
||||
if author is None:
|
||||
author = AudiobookAuthor(id=author_id, name=name)
|
||||
session.add(author)
|
||||
else:
|
||||
author.name = name
|
||||
session.flush()
|
||||
return author
|
||||
|
||||
author = find_author_by_name(session, name)
|
||||
if author is None:
|
||||
author = AudiobookAuthor(name=name)
|
||||
session.add(author)
|
||||
session.flush()
|
||||
return author
|
||||
|
||||
|
||||
def upsert_series(
|
||||
session: Session,
|
||||
name: str,
|
||||
author: AudiobookAuthor,
|
||||
series_id: int | None,
|
||||
) -> AudiobookSeries:
|
||||
"""Upsert one series by id or exact author/name match."""
|
||||
if series_id is not None:
|
||||
series = session.get(AudiobookSeries, series_id)
|
||||
if series is None:
|
||||
series = AudiobookSeries(id=series_id, name=name, author=author)
|
||||
session.add(series)
|
||||
else:
|
||||
series.name = name
|
||||
series.author = author
|
||||
session.flush()
|
||||
return series
|
||||
|
||||
series = find_series_by_name_and_author(session, name, author.id)
|
||||
if series is None:
|
||||
series = AudiobookSeries(name=name, author=author)
|
||||
session.add(series)
|
||||
session.flush()
|
||||
return series
|
||||
|
||||
|
||||
def find_author_by_name(session: Session, name: str) -> AudiobookAuthor | None:
|
||||
"""Find one author by exact name."""
|
||||
return session.scalar(select(AudiobookAuthor).where(AudiobookAuthor.name == name))
|
||||
|
||||
|
||||
def find_series_by_name_and_author(
|
||||
session: Session,
|
||||
name: str,
|
||||
author_id: int,
|
||||
) -> AudiobookSeries | None:
|
||||
"""Find one series by exact name and author."""
|
||||
return session.scalar(
|
||||
select(AudiobookSeries).where(
|
||||
AudiobookSeries.name == name,
|
||||
AudiobookSeries.author_id == author_id,
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
def csv_rows(csv_path: Path) -> list[tuple[int, dict[str, str | None]]]:
|
||||
"""Read a CSV file as numbered rows."""
|
||||
with csv_path.open(newline="", encoding="utf-8") as file:
|
||||
reader = csv.DictReader(file)
|
||||
if reader.fieldnames is None:
|
||||
msg = f"{csv_path}: missing CSV header"
|
||||
raise CatalogImportError(msg)
|
||||
return [(row_number, row) for row_number, row in enumerate(reader, start=2)]
|
||||
|
||||
|
||||
def required_csv_value(
|
||||
row: dict[str, str | None],
|
||||
csv_path: Path,
|
||||
row_number: int,
|
||||
column: str,
|
||||
) -> str:
|
||||
"""Read a required CSV value."""
|
||||
value = row.get(column)
|
||||
if value and value.strip():
|
||||
return value.strip()
|
||||
msg = f"{csv_path}:{row_number}: missing required column value: {column}"
|
||||
raise CatalogImportError(msg)
|
||||
|
||||
|
||||
def csv_id(row: dict[str, str | None], csv_path: Path, row_number: int) -> int | None:
|
||||
"""Read an optional id field from a CSV row."""
|
||||
value = row.get(ID_COLUMN)
|
||||
if value is None or not value.strip():
|
||||
return None
|
||||
try:
|
||||
return int(value)
|
||||
except ValueError as error:
|
||||
msg = f"{csv_path}:{row_number}: id must be an integer: {value}"
|
||||
raise CatalogImportError(msg) from error
|
||||
return None
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
typer.run(main)
|
||||
@@ -0,0 +1,565 @@
|
||||
"""LLM tool calling support for audiobook metadata resolution."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import re
|
||||
import time
|
||||
from collections.abc import Callable
|
||||
from dataclasses import dataclass
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from sqlalchemy import or_, select
|
||||
|
||||
from python.orm.richie import Audiobook, AudiobookAuthor, AudiobookSeries
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from pathlib import Path
|
||||
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from python.tools.audiobook.metadata_agent import AgentConfig
|
||||
|
||||
CATALOG_SLUG_PATTERN = re.compile(r"^[a-z0-9]+(?:_[a-z0-9]+)*$")
|
||||
TITLE_SLUG_PATTERN = re.compile(r"^[a-z0-9]+(?:-[a-z0-9]+)*$")
|
||||
|
||||
LogWriter = Callable[..., None]
|
||||
|
||||
|
||||
class MetadataResolutionError(ValueError):
|
||||
"""Metadata resolution failed validation."""
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class EnsuredBook:
|
||||
"""Book row plus whether it was created."""
|
||||
|
||||
book: Audiobook
|
||||
action: str
|
||||
|
||||
|
||||
class CatalogToolRegistry:
|
||||
"""Controlled catalog tools exposed to the metadata model."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
session: Session,
|
||||
log_path: Path,
|
||||
config: AgentConfig,
|
||||
write_log: LogWriter,
|
||||
) -> None:
|
||||
"""Create a registry bound to one database session and audit log."""
|
||||
self.session = session
|
||||
self.log_path = log_path
|
||||
self.config = config
|
||||
self.write_log = write_log
|
||||
self.seen_author_ids: set[int] = set()
|
||||
self.seen_series_ids: set[int] = set()
|
||||
self.seen_book_ids: set[int] = set()
|
||||
self.created_author_ids: set[int] = set()
|
||||
self.created_series_ids: set[int] = set()
|
||||
self.created_book_ids: set[int] = set()
|
||||
|
||||
def tool_schemas(self) -> list[dict[str, object]]:
|
||||
"""Return Ollama tool schemas."""
|
||||
schemas = [
|
||||
{
|
||||
"type": "function",
|
||||
"function": {
|
||||
"name": "search_authors",
|
||||
"description": "Search canonical audiobook authors by slug or noisy source text.",
|
||||
"parameters": {
|
||||
"type": "object",
|
||||
"properties": {"query": {"type": "string"}},
|
||||
"required": ["query"],
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
"type": "function",
|
||||
"function": {
|
||||
"name": "search_series",
|
||||
"description": "Search canonical audiobook series by slug or noisy source text.",
|
||||
"parameters": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"query": {"type": "string"},
|
||||
"author_id": {"type": ["integer", "null"]},
|
||||
},
|
||||
"required": ["query"],
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
"type": "function",
|
||||
"function": {
|
||||
"name": "search_books",
|
||||
"description": "Search canonical audiobook titles with optional author and series filters.",
|
||||
"parameters": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"query": {"type": "string"},
|
||||
"author_id": {"type": ["integer", "null"]},
|
||||
"series_id": {"type": ["integer", "null"]},
|
||||
},
|
||||
"required": ["query"],
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
"type": "function",
|
||||
"function": {
|
||||
"name": "ensure_author",
|
||||
"description": "Normalize an author name to a catalog slug, then return or create that author.",
|
||||
"parameters": {
|
||||
"type": "object",
|
||||
"properties": {"name": {"type": "string"}},
|
||||
"required": ["name"],
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
"type": "function",
|
||||
"function": {
|
||||
"name": "ensure_series",
|
||||
"description": "Normalize a series name to a catalog slug, then return or create it for an author.",
|
||||
"parameters": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"name": {"type": "string"},
|
||||
"author_id": {"type": "integer"},
|
||||
},
|
||||
"required": ["name", "author_id"],
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
"type": "function",
|
||||
"function": {
|
||||
"name": "ensure_book",
|
||||
"description": "Normalize a title to a book slug, then return or create it for an author/series.",
|
||||
"parameters": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"title": {"type": "string"},
|
||||
"author_id": {"type": "integer"},
|
||||
"series_id": {"type": ["integer", "null"]},
|
||||
"series_index": {"type": "integer"},
|
||||
},
|
||||
"required": ["title", "author_id", "series_id", "series_index"],
|
||||
},
|
||||
},
|
||||
},
|
||||
]
|
||||
enabled_tool_names = set(self.config.tool_names)
|
||||
return [schema for schema in schemas if schema["function"]["name"] in enabled_tool_names]
|
||||
|
||||
def run(self, name: str, arguments: dict[str, object]) -> list[dict[str, object]]:
|
||||
"""Run one catalog tool and audit the call."""
|
||||
handlers = {
|
||||
"search_authors": self.run_search_authors,
|
||||
"search_series": self.run_search_series,
|
||||
"search_books": self.run_search_books,
|
||||
"ensure_author": self.run_ensure_author,
|
||||
"ensure_series": self.run_ensure_series,
|
||||
"ensure_book": self.run_ensure_book,
|
||||
}
|
||||
handler = handlers.get(name)
|
||||
if handler is None:
|
||||
self.write_log(self.log_path, "tool_error", tool=name, arguments=arguments, error="unknown_tool")
|
||||
msg = f"Unknown audiobook metadata tool: {name}"
|
||||
raise MetadataResolutionError(msg)
|
||||
if name not in self.config.tool_names:
|
||||
self.write_log(self.log_path, "tool_error", tool=name, arguments=arguments, error="tool_not_enabled")
|
||||
msg = f"Audiobook metadata tool is not enabled: {name}"
|
||||
raise MetadataResolutionError(msg)
|
||||
|
||||
started = time.perf_counter()
|
||||
self.write_log(self.log_path, "tool_call", tool=name, arguments=arguments)
|
||||
result = handler(arguments)
|
||||
duration_ms = round((time.perf_counter() - started) * 1000, 3)
|
||||
self.write_log(
|
||||
self.log_path,
|
||||
"tool_result",
|
||||
tool=name,
|
||||
duration_ms=duration_ms,
|
||||
result_count=len(result),
|
||||
preview=result[:3],
|
||||
)
|
||||
return result
|
||||
|
||||
def get_author(self, author_id: int) -> AudiobookAuthor | None:
|
||||
"""Return an author by id."""
|
||||
return self.session.get(AudiobookAuthor, author_id)
|
||||
|
||||
def get_book(self, book_id: int) -> Audiobook | None:
|
||||
"""Return a book by id."""
|
||||
return self.session.get(Audiobook, book_id)
|
||||
|
||||
def get_series(self, series_id: int) -> AudiobookSeries | None:
|
||||
"""Return a series by id."""
|
||||
return self.session.get(AudiobookSeries, series_id)
|
||||
|
||||
def prune_unused_created_rows(self, *, author_id: int, book_id: int | None, series_id: int | None) -> None:
|
||||
"""Remove catalog rows created during this run but not used by final metadata."""
|
||||
used_book_ids = {book_id} if book_id is not None else set()
|
||||
for created_book_id in self.created_book_ids - used_book_ids:
|
||||
if book := self.get_book(created_book_id):
|
||||
self.session.delete(book)
|
||||
|
||||
self.session.flush()
|
||||
used_series_ids = {series_id} if series_id is not None else set()
|
||||
for created_series_id in self.created_series_ids - used_series_ids:
|
||||
series = self.get_series(created_series_id)
|
||||
if series and not series.books:
|
||||
self.session.delete(series)
|
||||
|
||||
self.session.flush()
|
||||
for created_author_id in self.created_author_ids - {author_id}:
|
||||
author = self.get_author(created_author_id)
|
||||
if author and not author.books and not author.series:
|
||||
self.session.delete(author)
|
||||
|
||||
def run_search_authors(self, arguments: dict[str, object]) -> list[dict[str, object]]:
|
||||
"""Search authors from tool arguments and remember returned ids."""
|
||||
query = required_string(arguments, "query")
|
||||
statement = select(AudiobookAuthor).order_by(AudiobookAuthor.name).limit(self.config.max_tool_results)
|
||||
if terms := query_terms(query):
|
||||
statement = statement.where(or_(*(AudiobookAuthor.name.ilike(f"%{term}%") for term in terms)))
|
||||
|
||||
authors = self.session.scalars(statement).all()
|
||||
self.seen_author_ids.update(author.id for author in authors)
|
||||
return [{"id": author.id, "name": author.name} for author in authors]
|
||||
|
||||
def run_search_series(self, arguments: dict[str, object]) -> list[dict[str, object]]:
|
||||
"""Search series from tool arguments and remember returned ids."""
|
||||
query = required_string(arguments, "query")
|
||||
author_id = optional_int(arguments.get("author_id"), "author_id")
|
||||
statement = select(AudiobookSeries).order_by(AudiobookSeries.name).limit(self.config.max_tool_results)
|
||||
if terms := query_terms(query):
|
||||
statement = statement.where(or_(*(AudiobookSeries.name.ilike(f"%{term}%") for term in terms)))
|
||||
if author_id is not None:
|
||||
statement = statement.where(AudiobookSeries.author_id == author_id)
|
||||
|
||||
series_rows = self.session.scalars(statement).all()
|
||||
self.seen_series_ids.update(series.id for series in series_rows)
|
||||
self.seen_author_ids.update(series.author_id for series in series_rows)
|
||||
return [
|
||||
{
|
||||
"id": series.id,
|
||||
"name": series.name,
|
||||
"author_id": series.author_id,
|
||||
"author": series.author.name,
|
||||
}
|
||||
for series in series_rows
|
||||
]
|
||||
|
||||
def run_search_books(self, arguments: dict[str, object]) -> list[dict[str, object]]:
|
||||
"""Search books from tool arguments and remember returned ids."""
|
||||
query = required_string(arguments, "query")
|
||||
author_id = optional_int(arguments.get("author_id"), "author_id")
|
||||
series_id = optional_int(arguments.get("series_id"), "series_id")
|
||||
statement = select(Audiobook).order_by(Audiobook.title).limit(self.config.max_tool_results)
|
||||
if terms := query_terms(query):
|
||||
statement = statement.where(or_(*(Audiobook.title.ilike(f"%{term}%") for term in terms)))
|
||||
if author_id is not None:
|
||||
statement = statement.where(Audiobook.author_id == author_id)
|
||||
if series_id is not None:
|
||||
statement = statement.where(Audiobook.series_id == series_id)
|
||||
|
||||
books = self.session.scalars(statement).all()
|
||||
self.seen_book_ids.update(book.id for book in books)
|
||||
self.seen_author_ids.update(book.author_id for book in books)
|
||||
self.seen_series_ids.update(book.series_id for book in books if book.series_id is not None)
|
||||
return [
|
||||
{
|
||||
"id": book.id,
|
||||
"title": book.title,
|
||||
"author_id": book.author_id,
|
||||
"author": book.author.name,
|
||||
"series_id": book.series_id,
|
||||
"series": book.series.name if book.series else self.config.standalone_series,
|
||||
"series_index": book.series_index,
|
||||
}
|
||||
for book in books
|
||||
]
|
||||
|
||||
def run_ensure_author(self, arguments: dict[str, object]) -> list[dict[str, object]]:
|
||||
"""Ensure an author from tool arguments and return a tool result."""
|
||||
name = normalize_catalog_slug(required_string(arguments, "name"))
|
||||
validate_catalog_slug(name, "author")
|
||||
author = self.session.scalar(select(AudiobookAuthor).where(AudiobookAuthor.name == name))
|
||||
action = "existing"
|
||||
if author is None:
|
||||
author = AudiobookAuthor(name=name)
|
||||
self.session.add(author)
|
||||
self.session.flush()
|
||||
self.created_author_ids.add(author.id)
|
||||
action = "created"
|
||||
|
||||
self.seen_author_ids.add(author.id)
|
||||
return [{"id": author.id, "name": author.name, "action": action}]
|
||||
|
||||
def run_ensure_series(self, arguments: dict[str, object]) -> list[dict[str, object]]:
|
||||
"""Ensure a series from tool arguments and return a tool result."""
|
||||
name = normalize_catalog_slug(required_string(arguments, "name"))
|
||||
author_id = required_int(arguments, "author_id")
|
||||
validate_catalog_slug(name, "series")
|
||||
author = self.required_author(author_id)
|
||||
series = self.session.scalar(
|
||||
select(AudiobookSeries).where(
|
||||
AudiobookSeries.name == name,
|
||||
AudiobookSeries.author_id == author.id,
|
||||
),
|
||||
)
|
||||
action = "existing"
|
||||
if series is None:
|
||||
series = AudiobookSeries(name=name, author=author)
|
||||
self.session.add(series)
|
||||
self.session.flush()
|
||||
self.created_series_ids.add(series.id)
|
||||
action = "created"
|
||||
|
||||
self.seen_author_ids.add(author.id)
|
||||
self.seen_series_ids.add(series.id)
|
||||
return [self.series_result(series, action)]
|
||||
|
||||
def run_ensure_book(self, arguments: dict[str, object]) -> list[dict[str, object]]:
|
||||
"""Ensure a book from tool arguments and return a tool result."""
|
||||
title = required_string(arguments, "title")
|
||||
author_id = required_int(arguments, "author_id")
|
||||
series_id = optional_int(arguments.get("series_id"), "series_id")
|
||||
series_index = required_int(arguments, "series_index")
|
||||
ensured = self.ensure_book(title, author_id, series_id, series_index)
|
||||
return [self.book_result(ensured.book, ensured.action)]
|
||||
|
||||
def ensure_book(
|
||||
self,
|
||||
title: str,
|
||||
author_id: int,
|
||||
series_id: int | None,
|
||||
series_index: int,
|
||||
) -> EnsuredBook:
|
||||
"""Return an existing book row, or create it after validating ownership."""
|
||||
title = normalize_title_slug(title)
|
||||
validate_title_slug(title)
|
||||
author = self.required_author(author_id)
|
||||
series = None
|
||||
if series_id is None:
|
||||
if series_index != 0:
|
||||
msg = "standalone books must use series_index 0"
|
||||
raise MetadataResolutionError(msg)
|
||||
else:
|
||||
series = self.required_series(series_id)
|
||||
if series.author_id != author.id:
|
||||
msg = f"series_id {series_id} does not belong to author_id {author_id}"
|
||||
raise MetadataResolutionError(msg)
|
||||
if series_index <= 0:
|
||||
msg = "series books must use a positive series_index"
|
||||
raise MetadataResolutionError(msg)
|
||||
|
||||
statement = select(Audiobook).where(
|
||||
Audiobook.title == title,
|
||||
Audiobook.author_id == author.id,
|
||||
)
|
||||
if series is None:
|
||||
statement = statement.where(Audiobook.series_id.is_(None))
|
||||
else:
|
||||
statement = statement.where(Audiobook.series_id == series.id)
|
||||
book = self.session.scalar(statement)
|
||||
if book is None:
|
||||
book = Audiobook(title=title, author=author, series=series, series_index=series_index)
|
||||
self.session.add(book)
|
||||
self.session.flush()
|
||||
self.created_book_ids.add(book.id)
|
||||
action = "created"
|
||||
else:
|
||||
action = "existing"
|
||||
|
||||
self.seen_book_ids.add(book.id)
|
||||
self.seen_author_ids.add(author.id)
|
||||
if book.series_id is not None:
|
||||
self.seen_series_ids.add(book.series_id)
|
||||
return EnsuredBook(book=book, action=action)
|
||||
|
||||
def required_author(self, author_id: int) -> AudiobookAuthor:
|
||||
"""Return an author or fail metadata resolution."""
|
||||
author = self.get_author(author_id)
|
||||
if author is None:
|
||||
msg = f"author_id {author_id} does not exist"
|
||||
raise MetadataResolutionError(msg)
|
||||
return author
|
||||
|
||||
def required_series(self, series_id: int) -> AudiobookSeries:
|
||||
"""Return a series or fail metadata resolution."""
|
||||
series = self.get_series(series_id)
|
||||
if series is None:
|
||||
msg = f"series_id {series_id} does not exist"
|
||||
raise MetadataResolutionError(msg)
|
||||
return series
|
||||
|
||||
def series_result(self, series: AudiobookSeries, action: str) -> dict[str, object]:
|
||||
"""Build a normalized series tool result."""
|
||||
return {
|
||||
"id": series.id,
|
||||
"name": series.name,
|
||||
"author_id": series.author_id,
|
||||
"author": series.author.name,
|
||||
"action": action,
|
||||
}
|
||||
|
||||
def book_result(self, book: Audiobook, action: str) -> dict[str, object]:
|
||||
"""Build a normalized book tool result."""
|
||||
return {
|
||||
"id": book.id,
|
||||
"title": book.title,
|
||||
"author_id": book.author_id,
|
||||
"author": book.author.name,
|
||||
"series_id": book.series_id,
|
||||
"series": book.series.name if book.series else self.config.standalone_series,
|
||||
"series_index": book.series_index,
|
||||
"action": action,
|
||||
}
|
||||
|
||||
|
||||
def run_tool_calls(
|
||||
messages: list[dict[str, object]],
|
||||
message: dict[str, object],
|
||||
tool_calls: list[tuple[str, dict[str, object]]],
|
||||
registry: CatalogToolRegistry,
|
||||
log_path: Path,
|
||||
write_log: LogWriter,
|
||||
) -> str | None:
|
||||
"""Run tool calls, append tool messages, and return fatal error text when stopped."""
|
||||
messages.append(message)
|
||||
for tool_name, arguments in tool_calls:
|
||||
try:
|
||||
tool_result = registry.run(tool_name, arguments)
|
||||
except MetadataResolutionError as error:
|
||||
if is_fatal_tool_error(error):
|
||||
return str(error)
|
||||
write_log(log_path, "tool_error", tool=tool_name, arguments=arguments, error=str(error))
|
||||
messages.append(
|
||||
{
|
||||
"role": "tool",
|
||||
"tool_name": tool_name,
|
||||
"content": json.dumps({"error": str(error)}, sort_keys=True),
|
||||
},
|
||||
)
|
||||
continue
|
||||
messages.append(
|
||||
{
|
||||
"role": "tool",
|
||||
"tool_name": tool_name,
|
||||
"content": json.dumps(tool_result, sort_keys=True),
|
||||
},
|
||||
)
|
||||
return None
|
||||
|
||||
|
||||
def parse_tool_calls(message: dict[str, object]) -> list[tuple[str, dict[str, object]]]:
|
||||
"""Parse Ollama tool calls from a response message."""
|
||||
raw_tool_calls = message.get("tool_calls") or []
|
||||
if not isinstance(raw_tool_calls, list):
|
||||
msg = "tool_calls must be a list"
|
||||
raise MetadataResolutionError(msg)
|
||||
|
||||
tool_calls = []
|
||||
for raw_call in raw_tool_calls:
|
||||
if not isinstance(raw_call, dict):
|
||||
msg = "tool call must be an object"
|
||||
raise MetadataResolutionError(msg)
|
||||
function = raw_call.get("function")
|
||||
if not isinstance(function, dict):
|
||||
msg = "tool call is missing function"
|
||||
raise MetadataResolutionError(msg)
|
||||
name = function.get("name")
|
||||
if not isinstance(name, str) or not name:
|
||||
msg = "tool call is missing function name"
|
||||
raise MetadataResolutionError(msg)
|
||||
arguments = parse_tool_arguments(function.get("arguments", {}))
|
||||
tool_calls.append((name, arguments))
|
||||
return tool_calls
|
||||
|
||||
|
||||
def parse_tool_arguments(raw_arguments: object) -> dict[str, object]:
|
||||
"""Parse tool call arguments returned by Ollama."""
|
||||
if isinstance(raw_arguments, dict):
|
||||
return {str(key): value for key, value in raw_arguments.items()}
|
||||
if isinstance(raw_arguments, str):
|
||||
parsed = json.loads(raw_arguments) if raw_arguments else {}
|
||||
if isinstance(parsed, dict):
|
||||
return {str(key): value for key, value in parsed.items()}
|
||||
msg = "tool arguments must be an object"
|
||||
raise MetadataResolutionError(msg)
|
||||
|
||||
|
||||
def validate_title_slug(title: str) -> None:
|
||||
"""Validate a canonical book title slug."""
|
||||
if not TITLE_SLUG_PATTERN.fullmatch(title):
|
||||
msg = f"title slug is invalid: {title}"
|
||||
raise MetadataResolutionError(msg)
|
||||
|
||||
|
||||
def validate_catalog_slug(value: str, label: str) -> None:
|
||||
"""Validate a canonical catalog slug."""
|
||||
if not CATALOG_SLUG_PATTERN.fullmatch(value):
|
||||
msg = f"{label} slug is invalid: {value}"
|
||||
raise MetadataResolutionError(msg)
|
||||
|
||||
|
||||
def normalize_catalog_slug(value: str) -> str:
|
||||
"""Normalize noisy catalog names into lower snake-case slugs."""
|
||||
return re.sub(r"[^a-z0-9]+", "_", value.strip().casefold()).strip("_")
|
||||
|
||||
|
||||
def normalize_title_slug(value: str) -> str:
|
||||
"""Normalize noisy book titles into lower kebab-case slugs."""
|
||||
return re.sub(r"[^a-z0-9]+", "-", value.strip().casefold()).strip("-")
|
||||
|
||||
|
||||
def is_fatal_tool_error(error: MetadataResolutionError) -> bool:
|
||||
"""Return whether a tool error should stop the agent immediately."""
|
||||
message = str(error)
|
||||
return message.startswith(
|
||||
(
|
||||
"Unknown audiobook metadata tool",
|
||||
"Audiobook metadata tool is not enabled",
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
def query_terms(query: str) -> tuple[str, ...]:
|
||||
"""Return text variants useful for matching noisy audiobook metadata."""
|
||||
normalized = query.strip().casefold()
|
||||
underscore_slug = normalize_catalog_slug(normalized)
|
||||
hyphen_slug = normalize_title_slug(normalized)
|
||||
return tuple(dict.fromkeys(term for term in (normalized, underscore_slug, hyphen_slug) if term))
|
||||
|
||||
|
||||
def required_string(data: dict[str, object], key: str) -> str:
|
||||
"""Read a required string field."""
|
||||
value = data.get(key)
|
||||
if not isinstance(value, str) or not value.strip():
|
||||
msg = f"{key} must be a non-empty string"
|
||||
raise MetadataResolutionError(msg)
|
||||
return value.strip()
|
||||
|
||||
|
||||
def required_int(data: dict[str, object], key: str) -> int:
|
||||
"""Read a required integer field."""
|
||||
value = data.get(key)
|
||||
if isinstance(value, bool) or not isinstance(value, int):
|
||||
msg = f"{key} must be an integer"
|
||||
raise MetadataResolutionError(msg)
|
||||
return value
|
||||
|
||||
|
||||
def optional_int(value: object, key: str) -> int | None:
|
||||
"""Read an optional integer field."""
|
||||
if value is None:
|
||||
return None
|
||||
if isinstance(value, bool) or not isinstance(value, int):
|
||||
msg = f"{key} must be an integer or null"
|
||||
raise MetadataResolutionError(msg)
|
||||
return value
|
||||
@@ -0,0 +1,566 @@
|
||||
"""Resolve audiobook metadata with a controlled Ollama tool loop."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import re
|
||||
from dataclasses import asdict, dataclass, is_dataclass, replace
|
||||
from os import PathLike
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
import httpx
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from python.common import utcnow
|
||||
from python.tools.audiobook.llm_tool_calling import (
|
||||
CatalogToolRegistry,
|
||||
MetadataResolutionError,
|
||||
normalize_title_slug,
|
||||
optional_int,
|
||||
parse_tool_calls,
|
||||
required_int,
|
||||
required_string,
|
||||
run_tool_calls,
|
||||
validate_catalog_slug,
|
||||
validate_title_slug,
|
||||
)
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from pathlib import Path
|
||||
|
||||
from sqlalchemy.engine import Engine
|
||||
|
||||
from python.orm.richie import AudiobookAuthor
|
||||
|
||||
FENCED_JSON_PATTERN = re.compile(r"^```(?:json)?\s*(?P<json>.*?)\s*```$", re.IGNORECASE | re.DOTALL)
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class AgentConfig:
|
||||
"""Runtime settings for the audiobook metadata agent."""
|
||||
|
||||
model: str = "deepseek-v4-flash:cloud"
|
||||
ollama_chat_url: str = "https://ollama.com/api/chat"
|
||||
http_timeout_seconds: int = 300
|
||||
max_agent_turns: int = 8
|
||||
max_tool_results: int = 10
|
||||
min_confidence: float = 0.85
|
||||
invalid_final_retries: int = 1
|
||||
standalone_series: str = "standalone"
|
||||
tool_names: tuple[str, ...] = (
|
||||
"search_authors",
|
||||
"search_series",
|
||||
"search_books",
|
||||
"ensure_author",
|
||||
"ensure_series",
|
||||
"ensure_book",
|
||||
)
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class StandardBookMetadata:
|
||||
"""Canonical metadata for the final audiobook path."""
|
||||
|
||||
author_id: int
|
||||
author: str
|
||||
book_id: int | None
|
||||
title: str
|
||||
series_id: int | None
|
||||
series: str
|
||||
series_index: int
|
||||
confidence: float
|
||||
needs_review: bool
|
||||
evidence: list[str]
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class FinalMetadataFields:
|
||||
"""Raw model fields after schema validation."""
|
||||
|
||||
author_id: int
|
||||
book_id: int | None
|
||||
title: str
|
||||
series_id: int | None
|
||||
series_index: int
|
||||
confidence: float
|
||||
evidence: list[str]
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class ResolvedBookFields:
|
||||
"""Book fields after optional catalog book resolution."""
|
||||
|
||||
book_id: int | None
|
||||
title: str
|
||||
series_id: int | None
|
||||
series_index: int
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class AgentStepResult:
|
||||
"""Outcome from one model response."""
|
||||
|
||||
metadata: StandardBookMetadata | None
|
||||
invalid_final_count: int
|
||||
should_continue: bool
|
||||
|
||||
|
||||
def standard_book_metadata(
|
||||
aax_file_name: str,
|
||||
aax_metadata_from_ffprobe: dict[str, str],
|
||||
engine: Engine,
|
||||
log_path: Path,
|
||||
ollama_api_key: str,
|
||||
config: AgentConfig,
|
||||
) -> StandardBookMetadata:
|
||||
"""Resolve canonical audiobook metadata with the configured Ollama Cloud model."""
|
||||
with Session(engine) as session:
|
||||
registry = CatalogToolRegistry(session, log_path, config, write_agent_log)
|
||||
agent = AudiobookMetadataAgent(
|
||||
registry=registry, log_path=log_path, ollama_api_key=ollama_api_key, config=config
|
||||
)
|
||||
metadata = agent.run(aax_file_name, aax_metadata_from_ffprobe)
|
||||
if metadata.needs_review:
|
||||
session.rollback()
|
||||
else:
|
||||
registry.prune_unused_created_rows(
|
||||
author_id=metadata.author_id,
|
||||
book_id=metadata.book_id,
|
||||
series_id=metadata.series_id,
|
||||
)
|
||||
session.commit()
|
||||
return metadata
|
||||
|
||||
|
||||
class AudiobookMetadataAgent:
|
||||
"""Ollama-backed metadata resolver with a fixed local tool registry."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
registry: CatalogToolRegistry,
|
||||
log_path: Path,
|
||||
ollama_api_key: str,
|
||||
config: AgentConfig,
|
||||
) -> None:
|
||||
"""Create an Ollama metadata agent."""
|
||||
self._registry = registry
|
||||
self._log_path = log_path
|
||||
self._ollama_api_key = ollama_api_key
|
||||
self._config = config
|
||||
|
||||
def run(self, aax_file_name: str, aax_metadata_from_ffprobe: dict[str, str]) -> StandardBookMetadata:
|
||||
"""Resolve metadata for one AAX file."""
|
||||
messages = [
|
||||
{"role": "system", "content": system_prompt()},
|
||||
{"role": "user", "content": user_prompt(aax_file_name, aax_metadata_from_ffprobe)},
|
||||
]
|
||||
invalid_final_count = 0
|
||||
result: StandardBookMetadata | None = None
|
||||
|
||||
for turn in range(1, self._config.max_agent_turns + 1):
|
||||
step = self.run_step(messages, turn, invalid_final_count)
|
||||
invalid_final_count = step.invalid_final_count
|
||||
if step.should_continue:
|
||||
continue
|
||||
result = step.metadata
|
||||
break
|
||||
|
||||
if result is None:
|
||||
return self.force_final_response(messages)
|
||||
return result
|
||||
|
||||
def run_step(
|
||||
self,
|
||||
messages: list[dict[str, object]],
|
||||
turn: int,
|
||||
invalid_final_count: int,
|
||||
) -> AgentStepResult:
|
||||
"""Run one model turn and return the next agent-loop action."""
|
||||
data = self.chat(messages, turn)
|
||||
message = data.get("message")
|
||||
if not isinstance(message, dict):
|
||||
return AgentStepResult(
|
||||
metadata=review_metadata("Ollama response did not include a message", self._config),
|
||||
invalid_final_count=invalid_final_count,
|
||||
should_continue=False,
|
||||
)
|
||||
|
||||
try:
|
||||
tool_calls = parse_tool_calls(message)
|
||||
except (json.JSONDecodeError, MetadataResolutionError) as error:
|
||||
return AgentStepResult(
|
||||
metadata=review_metadata(str(error), self._config),
|
||||
invalid_final_count=invalid_final_count,
|
||||
should_continue=False,
|
||||
)
|
||||
if tool_calls:
|
||||
fatal_error = run_tool_calls(messages, message, tool_calls, self._registry, self._log_path, write_agent_log)
|
||||
if fatal_error is not None:
|
||||
return AgentStepResult(
|
||||
metadata=review_metadata(fatal_error, self._config),
|
||||
invalid_final_count=invalid_final_count,
|
||||
should_continue=False,
|
||||
)
|
||||
return AgentStepResult(metadata=None, invalid_final_count=invalid_final_count, should_continue=True)
|
||||
return self.handle_final_message(messages, message, invalid_final_count)
|
||||
|
||||
def handle_final_message(
|
||||
self,
|
||||
messages: list[dict[str, object]],
|
||||
message: dict[str, object],
|
||||
invalid_final_count: int,
|
||||
) -> AgentStepResult:
|
||||
"""Validate a final model message or request one retry."""
|
||||
content = message.get("content")
|
||||
if not isinstance(content, str):
|
||||
return AgentStepResult(
|
||||
metadata=review_metadata("Ollama final response did not include string content", self._config),
|
||||
invalid_final_count=invalid_final_count,
|
||||
should_continue=False,
|
||||
)
|
||||
|
||||
try:
|
||||
resolved = self.validate_final(parse_final_json_content(content))
|
||||
except (json.JSONDecodeError, MetadataResolutionError) as error:
|
||||
return self.handle_invalid_final(messages, error, invalid_final_count)
|
||||
|
||||
write_agent_log(self._log_path, "final_metadata", metadata=resolved)
|
||||
return AgentStepResult(metadata=resolved, invalid_final_count=invalid_final_count, should_continue=False)
|
||||
|
||||
def handle_invalid_final(
|
||||
self,
|
||||
messages: list[dict[str, object]],
|
||||
error: json.JSONDecodeError | MetadataResolutionError,
|
||||
invalid_final_count: int,
|
||||
) -> AgentStepResult:
|
||||
"""Log invalid final JSON and either retry or return review metadata."""
|
||||
invalid_final_count += 1
|
||||
write_agent_log(
|
||||
self._log_path,
|
||||
"final_validation_error",
|
||||
error=str(error),
|
||||
invalid_final_count=invalid_final_count,
|
||||
)
|
||||
if invalid_final_count > self._config.invalid_final_retries:
|
||||
return AgentStepResult(
|
||||
metadata=review_metadata(str(error), self._config),
|
||||
invalid_final_count=invalid_final_count,
|
||||
should_continue=False,
|
||||
)
|
||||
messages.append(
|
||||
{
|
||||
"role": "user",
|
||||
"content": (
|
||||
"Your previous final answer was invalid. Return only valid JSON matching the required "
|
||||
f"schema. Validation error: {error}"
|
||||
),
|
||||
},
|
||||
)
|
||||
return AgentStepResult(metadata=None, invalid_final_count=invalid_final_count, should_continue=True)
|
||||
|
||||
def force_final_response(self, messages: list[dict[str, object]]) -> StandardBookMetadata:
|
||||
"""Request a no-tool final answer after the normal turn limit."""
|
||||
messages.append({"role": "user", "content": forced_final_prompt()})
|
||||
write_agent_log(self._log_path, "forced_final_request", reason="max_turns")
|
||||
data = self.chat(messages, self._config.max_agent_turns + 1, tools_enabled=False)
|
||||
message = data.get("message")
|
||||
if not isinstance(message, dict):
|
||||
return review_metadata("Ollama forced final response did not include a message", self._config)
|
||||
content = message.get("content")
|
||||
if not isinstance(content, str):
|
||||
return review_metadata("Ollama forced final response did not include string content", self._config)
|
||||
try:
|
||||
resolved = self.validate_final(parse_final_json_content(content))
|
||||
except (json.JSONDecodeError, MetadataResolutionError) as error:
|
||||
return review_metadata(f"Ollama forced final response was invalid: {error}", self._config)
|
||||
write_agent_log(self._log_path, "final_metadata", metadata=resolved)
|
||||
return resolved
|
||||
|
||||
def chat(self, messages: list[dict[str, object]], turn: int, *, tools_enabled: bool = True) -> dict[str, object]:
|
||||
"""Send one chat request to Ollama and log the request and response."""
|
||||
payload = {
|
||||
"model": self._config.model,
|
||||
"messages": messages,
|
||||
"stream": False,
|
||||
"options": {"temperature": 0},
|
||||
}
|
||||
tool_names = []
|
||||
if tools_enabled:
|
||||
payload["tools"] = self._registry.tool_schemas()
|
||||
tool_names = self._config.tool_names
|
||||
write_agent_log(
|
||||
self._log_path,
|
||||
"model_request",
|
||||
model=self._config.model,
|
||||
turn=turn,
|
||||
message_count=len(messages),
|
||||
tool_names=tool_names,
|
||||
tools_enabled=tools_enabled,
|
||||
)
|
||||
write_agent_log(
|
||||
self._log_path,
|
||||
"llm_messages_sent",
|
||||
model=self._config.model,
|
||||
turn=turn,
|
||||
messages=messages,
|
||||
tools_enabled=tools_enabled,
|
||||
)
|
||||
response = httpx.post(
|
||||
self._config.ollama_chat_url,
|
||||
headers={"Authorization": f"Bearer {self._ollama_api_key}"},
|
||||
json=payload,
|
||||
timeout=self._config.http_timeout_seconds,
|
||||
)
|
||||
response.raise_for_status()
|
||||
raw_data = response.json()
|
||||
if not isinstance(raw_data, dict):
|
||||
return {}
|
||||
data = {str(key): value for key, value in raw_data.items()}
|
||||
message = data.get("message", {})
|
||||
content = message.get("content") if isinstance(message, dict) else ""
|
||||
write_agent_log(
|
||||
self._log_path,
|
||||
"llm_message_received",
|
||||
model=self._config.model,
|
||||
turn=turn,
|
||||
message=message,
|
||||
)
|
||||
write_agent_log(
|
||||
self._log_path,
|
||||
"model_response",
|
||||
model=self._config.model,
|
||||
turn=turn,
|
||||
has_tool_calls=bool(isinstance(message, dict) and message.get("tool_calls")),
|
||||
content_chars=len(content) if isinstance(content, str) else 0,
|
||||
)
|
||||
return data
|
||||
|
||||
def validate_final(self, raw_metadata: object) -> StandardBookMetadata:
|
||||
"""Validate final model metadata against catalog rows."""
|
||||
fields = parse_final_metadata_fields(raw_metadata)
|
||||
fields = replace(fields, title=normalize_title_slug(fields.title))
|
||||
author = self.validate_author(fields.author_id)
|
||||
validate_title_slug(fields.title)
|
||||
book_fields = self.resolve_book_fields(fields)
|
||||
series = self.validate_series(fields.author_id, book_fields.series_id, book_fields.series_index)
|
||||
|
||||
return StandardBookMetadata(
|
||||
author_id=fields.author_id,
|
||||
author=author.name,
|
||||
book_id=book_fields.book_id,
|
||||
title=book_fields.title,
|
||||
series_id=book_fields.series_id,
|
||||
series=series,
|
||||
series_index=book_fields.series_index,
|
||||
confidence=fields.confidence,
|
||||
needs_review=fields.confidence < self._config.min_confidence,
|
||||
evidence=fields.evidence,
|
||||
)
|
||||
|
||||
def validate_author(self, author_id: int) -> AudiobookAuthor:
|
||||
"""Validate that an author id was seen and exists."""
|
||||
if author_id not in self._registry.seen_author_ids:
|
||||
msg = f"author_id {author_id} was not returned by search_authors"
|
||||
raise MetadataResolutionError(msg)
|
||||
author = self._registry.get_author(author_id)
|
||||
if author is None:
|
||||
msg = f"author_id {author_id} does not exist"
|
||||
raise MetadataResolutionError(msg)
|
||||
validate_catalog_slug(author.name, "author")
|
||||
return author
|
||||
|
||||
def resolve_book_fields(self, fields: FinalMetadataFields) -> ResolvedBookFields:
|
||||
"""Resolve final book fields from a seen book id or created book."""
|
||||
if fields.book_id is None:
|
||||
ensured = self._registry.ensure_book(
|
||||
fields.title,
|
||||
fields.author_id,
|
||||
fields.series_id,
|
||||
fields.series_index,
|
||||
)
|
||||
return ResolvedBookFields(
|
||||
book_id=ensured.book.id,
|
||||
title=ensured.book.title,
|
||||
series_id=ensured.book.series_id,
|
||||
series_index=ensured.book.series_index,
|
||||
)
|
||||
|
||||
if fields.book_id not in self._registry.seen_book_ids:
|
||||
msg = f"book_id {fields.book_id} was not returned by search_books"
|
||||
raise MetadataResolutionError(msg)
|
||||
book = self._registry.get_book(fields.book_id)
|
||||
if book is None:
|
||||
msg = f"book_id {fields.book_id} does not exist"
|
||||
raise MetadataResolutionError(msg)
|
||||
if book.author_id != fields.author_id:
|
||||
msg = f"book_id {fields.book_id} does not belong to author_id {fields.author_id}"
|
||||
raise MetadataResolutionError(msg)
|
||||
return ResolvedBookFields(
|
||||
book_id=fields.book_id,
|
||||
title=book.title,
|
||||
series_id=book.series_id,
|
||||
series_index=book.series_index,
|
||||
)
|
||||
|
||||
def validate_series(self, author_id: int, series_id: int | None, series_index: int) -> str:
|
||||
"""Validate final series fields and return the canonical series slug."""
|
||||
if series_id is None:
|
||||
if series_index != 0:
|
||||
msg = "standalone books must use series_index 0"
|
||||
raise MetadataResolutionError(msg)
|
||||
return self._config.standalone_series
|
||||
|
||||
if series_id not in self._registry.seen_series_ids:
|
||||
msg = f"series_id {series_id} was not returned by search_series"
|
||||
raise MetadataResolutionError(msg)
|
||||
series = self._registry.get_series(series_id)
|
||||
if series is None:
|
||||
msg = f"series_id {series_id} does not exist"
|
||||
raise MetadataResolutionError(msg)
|
||||
if series.author_id != author_id:
|
||||
msg = f"series_id {series_id} does not belong to author_id {author_id}"
|
||||
raise MetadataResolutionError(msg)
|
||||
if series_index <= 0:
|
||||
msg = "series books must use a positive series_index"
|
||||
raise MetadataResolutionError(msg)
|
||||
validate_catalog_slug(series.name, "series")
|
||||
return series.name
|
||||
|
||||
|
||||
def write_agent_log(log_path: Path, event: str, **fields: object) -> None:
|
||||
"""Append one JSONL audit event."""
|
||||
log_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
record = {
|
||||
"created": utcnow().isoformat(),
|
||||
"event": event,
|
||||
**{key: json_log_value(value) for key, value in fields.items()},
|
||||
}
|
||||
with log_path.open("a", encoding="utf-8") as file:
|
||||
file.write(json.dumps(record, sort_keys=True))
|
||||
file.write("\n")
|
||||
|
||||
|
||||
def json_log_value(value: object) -> object:
|
||||
"""Return a JSON-serializable value for audit logs."""
|
||||
if is_dataclass(value) and not isinstance(value, type):
|
||||
return json_log_value(asdict(value))
|
||||
if isinstance(value, dict):
|
||||
return {str(key): json_log_value(item) for key, item in value.items()}
|
||||
if isinstance(value, list | tuple):
|
||||
return [json_log_value(item) for item in value]
|
||||
if isinstance(value, set):
|
||||
return [json_log_value(item) for item in sorted(value, key=str)]
|
||||
if isinstance(value, PathLike):
|
||||
return str(value)
|
||||
return value
|
||||
|
||||
|
||||
def system_prompt() -> str:
|
||||
"""Return the stable system prompt."""
|
||||
return """You standardize Audible audiobook metadata against a private catalog.
|
||||
|
||||
Rules:
|
||||
- You must use the provided tools before returning final metadata.
|
||||
- Only use author_id, series_id, or book_id values returned by tools.
|
||||
- Return final metadata as JSON only. Do not wrap it in Markdown.
|
||||
- The final JSON object must contain author_id, book_id, title, series_id, series_index, confidence, and evidence.
|
||||
- title must be a canonical title slug using lower-case words separated by hyphens.
|
||||
- Use series_id null and series_index 0 for standalone books.
|
||||
- If you use a series_id, series_index must be an integer greater than or equal to 1.
|
||||
- Do not create publisher collections or author collections as series unless the book metadata clearly gives a
|
||||
numbered series.
|
||||
- Series belong to authors. Use a series_id only when it belongs to the selected author_id.
|
||||
- Always search for the author before creating one. If no exact author slug exists, call ensure_author.
|
||||
- Always search for a series with author_id before creating one. If no exact series slug exists, call ensure_series.
|
||||
- Always search for a book before creating one. If no exact title slug exists, call ensure_book.
|
||||
- If a tool returns an error, correct your tool arguments or final metadata before continuing.
|
||||
- confidence must be a number from 0 to 1.
|
||||
- evidence must be a short list of strings explaining which filename, tags, and catalog rows support the answer."""
|
||||
|
||||
|
||||
def forced_final_prompt() -> str:
|
||||
"""Return the no-tools finalization prompt."""
|
||||
return (
|
||||
"Stop calling tools. Return final metadata as JSON only using the tool results already provided. "
|
||||
"If search_books returned no matching rows but author and series are known, use book_id null and resolve "
|
||||
"the title slug from the AAX filename and ffprobe tags. The validator will create the missing book. "
|
||||
"Use only author_id and series_id values returned by earlier tool results."
|
||||
)
|
||||
|
||||
|
||||
def user_prompt(aax_file_name: str, metadata: dict[str, str]) -> str:
|
||||
"""Build the user prompt from source metadata."""
|
||||
return (
|
||||
"Resolve this Audible audiobook.\n\n"
|
||||
f"AAX file name: {aax_file_name}\n\n"
|
||||
"ffprobe format tags:\n"
|
||||
f"{json.dumps(metadata, indent=2, sort_keys=True)}"
|
||||
)
|
||||
|
||||
|
||||
def parse_final_json_content(content: str) -> object:
|
||||
"""Parse final model content, accepting bare or fenced JSON."""
|
||||
stripped = content.strip()
|
||||
if match := FENCED_JSON_PATTERN.fullmatch(stripped):
|
||||
stripped = match.group("json").strip()
|
||||
return json.loads(stripped)
|
||||
|
||||
|
||||
def parse_final_metadata_fields(raw_metadata: object) -> FinalMetadataFields:
|
||||
"""Parse the model's final JSON object into typed fields."""
|
||||
if not isinstance(raw_metadata, dict):
|
||||
msg = "Final metadata must be a JSON object"
|
||||
raise MetadataResolutionError(msg)
|
||||
data = {str(key): value for key, value in raw_metadata.items()}
|
||||
return FinalMetadataFields(
|
||||
author_id=required_int(data, "author_id"),
|
||||
book_id=optional_int(data.get("book_id"), "book_id"),
|
||||
title=required_string(data, "title"),
|
||||
series_id=optional_int(data.get("series_id"), "series_id"),
|
||||
series_index=required_int(data, "series_index"),
|
||||
confidence=required_float(data, "confidence"),
|
||||
evidence=required_string_list(data, "evidence"),
|
||||
)
|
||||
|
||||
|
||||
def review_metadata(reason: str, config: AgentConfig) -> StandardBookMetadata:
|
||||
"""Return a metadata result that must be reviewed manually."""
|
||||
return StandardBookMetadata(
|
||||
author_id=0,
|
||||
author="unknown_author",
|
||||
book_id=None,
|
||||
title="unknown-title",
|
||||
series_id=None,
|
||||
series=config.standalone_series,
|
||||
series_index=0,
|
||||
confidence=0,
|
||||
needs_review=True,
|
||||
evidence=[reason],
|
||||
)
|
||||
|
||||
|
||||
def required_float(data: dict[str, object], key: str) -> float:
|
||||
"""Read a required float field."""
|
||||
value = data.get(key)
|
||||
if isinstance(value, bool) or not isinstance(value, int | float):
|
||||
msg = f"{key} must be a number"
|
||||
raise MetadataResolutionError(msg)
|
||||
confidence = float(value)
|
||||
if confidence < 0 or confidence > 1:
|
||||
msg = f"{key} must be between 0 and 1"
|
||||
raise MetadataResolutionError(msg)
|
||||
return confidence
|
||||
|
||||
|
||||
def required_string_list(data: dict[str, object], key: str) -> list[str]:
|
||||
"""Read a required list of strings."""
|
||||
value = data.get(key)
|
||||
if not isinstance(value, list) or not value or not all(isinstance(item, str) for item in value):
|
||||
msg = f"{key} must be a non-empty list of strings"
|
||||
raise MetadataResolutionError(msg)
|
||||
strings = [item.strip() for item in value if item.strip()]
|
||||
if not strings:
|
||||
msg = f"{key} must include at least one non-empty string"
|
||||
raise MetadataResolutionError(msg)
|
||||
return strings
|
||||
@@ -0,0 +1,969 @@
|
||||
"""test_audible_convert."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import subprocess
|
||||
|
||||
import pytest
|
||||
from sqlalchemy import create_engine
|
||||
from sqlalchemy.orm import Session, sessionmaker
|
||||
|
||||
from python.orm.richie import Audiobook, AudiobookAuthor, AudiobookSeries, RichieBase
|
||||
from python.tools.audiobook import audible_convert, metadata_agent
|
||||
from python.tools.audiobook.metadata_agent import StandardBookMetadata, standard_book_metadata
|
||||
|
||||
|
||||
class FakeOllamaResponse:
|
||||
def __init__(self, payload):
|
||||
self._payload = payload
|
||||
|
||||
def raise_for_status(self):
|
||||
return None
|
||||
|
||||
def json(self):
|
||||
return self._payload
|
||||
|
||||
|
||||
class FakeFfprobeError(RuntimeError):
|
||||
def __str__(self):
|
||||
return "bad ffprobe"
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def audiobook_engine():
|
||||
engine = create_engine("sqlite+pysqlite:///:memory:", future=True)
|
||||
RichieBase.metadata.create_all(engine)
|
||||
with sessionmaker(bind=engine, expire_on_commit=False, future=True)() as session:
|
||||
session.add_all(
|
||||
[
|
||||
AudiobookAuthor(id=1, name="glynn_stewart"),
|
||||
AudiobookAuthor(id=2, name="craig_alanson"),
|
||||
AudiobookAuthor(id=4, name="dennis_e_taylor"),
|
||||
AudiobookSeries(id=1, name="starships_mage", author_id=1),
|
||||
AudiobookSeries(id=2, name="black_fleet_trilogy", author_id=1),
|
||||
AudiobookSeries(id=3, name="expeditionary_force", author_id=2),
|
||||
AudiobookSeries(id=4, name="bobiverse", author_id=4),
|
||||
],
|
||||
)
|
||||
session.commit()
|
||||
yield engine
|
||||
engine.dispose()
|
||||
|
||||
|
||||
def install_fake_ollama(monkeypatch, payloads):
|
||||
calls = []
|
||||
|
||||
def fake_post(*args, **kwargs):
|
||||
calls.append((args, kwargs))
|
||||
return FakeOllamaResponse(payloads.pop(0))
|
||||
|
||||
monkeypatch.setattr(metadata_agent.httpx, "post", fake_post)
|
||||
return calls
|
||||
|
||||
|
||||
def conversion_config(output_directory, *, dry_run=False, overwrite=False):
|
||||
return audible_convert.ConversionConfig(
|
||||
resolved_output=output_directory,
|
||||
ollama_api_key="test-key",
|
||||
agent_config=metadata_agent.AgentConfig(),
|
||||
engine=create_engine("sqlite+pysqlite:///:memory:"),
|
||||
activation_bytes=None,
|
||||
dry_run=dry_run,
|
||||
overwrite=overwrite,
|
||||
)
|
||||
|
||||
|
||||
def sqlite_engine():
|
||||
return create_engine("sqlite+pysqlite:///:memory:")
|
||||
|
||||
|
||||
def tool_response(name, arguments):
|
||||
return {
|
||||
"message": {
|
||||
"role": "assistant",
|
||||
"content": "",
|
||||
"tool_calls": [{"function": {"name": name, "arguments": arguments}}],
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
def final_response(metadata):
|
||||
return {"message": {"role": "assistant", "content": json.dumps(metadata)}}
|
||||
|
||||
|
||||
def fenced_final_response(metadata):
|
||||
return {"message": {"role": "assistant", "content": f"```json\n{json.dumps(metadata)}\n```"}}
|
||||
|
||||
|
||||
def test_output_stem_uses_catalog_slugs() -> None:
|
||||
metadata = StandardBookMetadata(
|
||||
author_id=1,
|
||||
author="glynn_stewart",
|
||||
book_id=None,
|
||||
title="title-slug",
|
||||
series_id=1,
|
||||
series="starships_mage",
|
||||
series_index=1,
|
||||
confidence=0.96,
|
||||
needs_review=False,
|
||||
evidence=["test"],
|
||||
)
|
||||
|
||||
assert audible_convert.output_stem(metadata) == "glynn_stewart-starships_mage_01-title-slug"
|
||||
|
||||
|
||||
def test_convert_aax_file_runs_ffmpeg(tmp_path, monkeypatch) -> None:
|
||||
"""test_convert_aax_file_runs_ffmpeg."""
|
||||
commands = []
|
||||
|
||||
def fake_run_command(arguments, *, capture=False):
|
||||
assert capture is False
|
||||
commands.append(arguments)
|
||||
return subprocess.CompletedProcess(arguments, 0, "", "")
|
||||
|
||||
source = tmp_path / "book.aax"
|
||||
destination = tmp_path / "book" / "book.m4b"
|
||||
monkeypatch.setattr(audible_convert, "run_command", fake_run_command)
|
||||
|
||||
audible_convert.convert_aax_file(source, destination, "abc123", overwrite=False)
|
||||
|
||||
assert commands == [
|
||||
[
|
||||
"ffmpeg",
|
||||
"-hide_banner",
|
||||
"-n",
|
||||
"-activation_bytes",
|
||||
"abc123",
|
||||
"-i",
|
||||
str(source),
|
||||
"-map_metadata",
|
||||
"0",
|
||||
"-c",
|
||||
"copy",
|
||||
str(destination),
|
||||
],
|
||||
]
|
||||
assert destination.parent.is_dir()
|
||||
|
||||
|
||||
def test_run_command_redacts_activation_bytes_in_logs_and_errors(monkeypatch, caplog) -> None:
|
||||
def fake_run(arguments, *, check, capture_output, text):
|
||||
assert check is True
|
||||
assert capture_output is False
|
||||
assert text is True
|
||||
raise subprocess.CalledProcessError(1, arguments)
|
||||
|
||||
monkeypatch.setattr(audible_convert.subprocess, "run", fake_run)
|
||||
caplog.set_level("DEBUG", audible_convert.__name__)
|
||||
|
||||
with pytest.raises(audible_convert.CommandExecutionError) as error:
|
||||
audible_convert.run_command(["ffmpeg", "-activation_bytes", "secret-token", "-i", "book.aax"])
|
||||
|
||||
assert "secret-token" not in caplog.text
|
||||
assert "secret-token" not in str(error.value)
|
||||
assert "<redacted>" in caplog.text
|
||||
assert "<redacted>" in str(error.value)
|
||||
|
||||
|
||||
def test_write_agent_log_serializes_metadata_as_json_object(tmp_path) -> None:
|
||||
metadata = StandardBookMetadata(
|
||||
author_id=1,
|
||||
author="glynn_stewart",
|
||||
book_id=None,
|
||||
title="starship-mage",
|
||||
series_id=1,
|
||||
series="starships_mage",
|
||||
series_index=1,
|
||||
confidence=0.95,
|
||||
needs_review=False,
|
||||
evidence=["test"],
|
||||
)
|
||||
log_file = tmp_path / "agent.jsonl"
|
||||
|
||||
metadata_agent.write_agent_log(log_file, "final_metadata", metadata=metadata, path=tmp_path)
|
||||
|
||||
record = json.loads(log_file.read_text(encoding="utf-8"))
|
||||
assert record["event"] == "final_metadata"
|
||||
assert record["metadata"]["author"] == "glynn_stewart"
|
||||
assert record["metadata"]["title"] == "starship-mage"
|
||||
assert record["path"] == str(tmp_path)
|
||||
|
||||
|
||||
def test_standard_book_metadata_accepts_valid_tool_output(tmp_path, monkeypatch, audiobook_engine) -> None:
|
||||
install_fake_ollama(
|
||||
monkeypatch,
|
||||
[
|
||||
tool_response("search_authors", {"query": "Glynn Stewart"}),
|
||||
tool_response("search_series", {"query": "starships_mage"}),
|
||||
final_response(
|
||||
{
|
||||
"author_id": 1,
|
||||
"book_id": None,
|
||||
"title": "starship-mage",
|
||||
"series_id": 1,
|
||||
"series_index": 1,
|
||||
"confidence": 0.95,
|
||||
"evidence": ["filename and catalog match"],
|
||||
},
|
||||
),
|
||||
],
|
||||
)
|
||||
|
||||
metadata = standard_book_metadata(
|
||||
"Starship Mage.aax",
|
||||
{"title": "Starship Mage", "artist": "Glynn Stewart"},
|
||||
audiobook_engine,
|
||||
tmp_path / "agent.jsonl",
|
||||
"test-key",
|
||||
config=metadata_agent.AgentConfig(),
|
||||
)
|
||||
|
||||
assert metadata == StandardBookMetadata(
|
||||
author_id=1,
|
||||
author="glynn_stewart",
|
||||
book_id=1,
|
||||
title="starship-mage",
|
||||
series_id=1,
|
||||
series="starships_mage",
|
||||
series_index=1,
|
||||
confidence=0.95,
|
||||
needs_review=False,
|
||||
evidence=["filename and catalog match"],
|
||||
)
|
||||
records = [
|
||||
json.loads(line)
|
||||
for line in (tmp_path / "agent.jsonl").read_text(encoding="utf-8").splitlines()
|
||||
]
|
||||
sent = [record for record in records if record["event"] == "llm_messages_sent"]
|
||||
received = [record for record in records if record["event"] == "llm_message_received"]
|
||||
assert sent[0]["messages"][0]["role"] == "system"
|
||||
assert "Starship Mage" in sent[0]["messages"][1]["content"]
|
||||
assert received[0]["message"]["tool_calls"][0]["function"]["name"] == "search_authors"
|
||||
with Session(audiobook_engine) as session:
|
||||
book = session.get(Audiobook, 1)
|
||||
assert book.title == "starship-mage"
|
||||
assert book.author.name == "glynn_stewart"
|
||||
|
||||
|
||||
def test_standard_book_metadata_uses_agent_config(tmp_path, monkeypatch, audiobook_engine) -> None:
|
||||
config = metadata_agent.AgentConfig(
|
||||
model="custom-model",
|
||||
ollama_chat_url="https://ollama.example.test/api/chat",
|
||||
http_timeout_seconds=12,
|
||||
max_agent_turns=1,
|
||||
min_confidence=0.5,
|
||||
tool_names=("search_authors",),
|
||||
)
|
||||
calls = install_fake_ollama(
|
||||
monkeypatch,
|
||||
[
|
||||
tool_response("search_authors", {"query": "Glynn Stewart"}),
|
||||
final_response(
|
||||
{
|
||||
"author_id": 1,
|
||||
"book_id": None,
|
||||
"title": "standalone-book",
|
||||
"series_id": None,
|
||||
"series_index": 0,
|
||||
"confidence": 0.5,
|
||||
"evidence": ["custom config"],
|
||||
},
|
||||
),
|
||||
],
|
||||
)
|
||||
|
||||
metadata = standard_book_metadata(
|
||||
"Standalone Book.aax",
|
||||
{"title": "Standalone Book", "artist": "Glynn Stewart"},
|
||||
audiobook_engine,
|
||||
tmp_path / "agent.jsonl",
|
||||
"test-key",
|
||||
config=config,
|
||||
)
|
||||
|
||||
first_request_url = calls[0][0][0]
|
||||
first_request_options = calls[0][1]
|
||||
tool_names = [
|
||||
tool_schema["function"]["name"]
|
||||
for tool_schema in first_request_options["json"]["tools"]
|
||||
]
|
||||
assert first_request_url == "https://ollama.example.test/api/chat"
|
||||
assert first_request_options["timeout"] == 12
|
||||
assert first_request_options["json"]["model"] == "custom-model"
|
||||
assert tool_names == ["search_authors"]
|
||||
assert metadata.needs_review is False
|
||||
assert metadata.series == "standalone"
|
||||
|
||||
|
||||
def test_standard_book_metadata_retries_invalid_json_then_needs_review(
|
||||
tmp_path,
|
||||
monkeypatch,
|
||||
audiobook_engine,
|
||||
) -> None:
|
||||
install_fake_ollama(
|
||||
monkeypatch,
|
||||
[
|
||||
tool_response("search_authors", {"query": "Glynn Stewart"}),
|
||||
tool_response("search_series", {"query": "Starship Mage"}),
|
||||
{"message": {"role": "assistant", "content": "{"}},
|
||||
{"message": {"role": "assistant", "content": "{"}},
|
||||
],
|
||||
)
|
||||
|
||||
metadata = standard_book_metadata(
|
||||
"Starship Mage.aax",
|
||||
{"title": "Starship Mage"},
|
||||
audiobook_engine,
|
||||
tmp_path / "agent.jsonl",
|
||||
"test-key",
|
||||
config=metadata_agent.AgentConfig(),
|
||||
)
|
||||
|
||||
assert metadata.needs_review is True
|
||||
assert metadata.confidence == 0
|
||||
|
||||
|
||||
def test_standard_book_metadata_accepts_fenced_final_json(
|
||||
tmp_path,
|
||||
monkeypatch,
|
||||
audiobook_engine,
|
||||
) -> None:
|
||||
install_fake_ollama(
|
||||
monkeypatch,
|
||||
[
|
||||
tool_response("search_authors", {"query": "Dennis E. Taylor"}),
|
||||
tool_response("search_series", {"query": "Bobiverse", "author_id": 4}),
|
||||
tool_response("search_books", {"query": "All These Worlds", "author_id": 4, "series_id": 4}),
|
||||
fenced_final_response(
|
||||
{
|
||||
"author_id": 4,
|
||||
"book_id": None,
|
||||
"title": "all-these-worlds",
|
||||
"series_id": 4,
|
||||
"series_index": 3,
|
||||
"confidence": 0.95,
|
||||
"evidence": ["fenced json from model"],
|
||||
},
|
||||
),
|
||||
],
|
||||
)
|
||||
|
||||
metadata = standard_book_metadata(
|
||||
"All These Worlds.aax",
|
||||
{"title": "All These Worlds: Bobiverse, Book 3", "artist": "Dennis E. Taylor"},
|
||||
audiobook_engine,
|
||||
tmp_path / "agent.jsonl",
|
||||
"test-key",
|
||||
config=metadata_agent.AgentConfig(),
|
||||
)
|
||||
|
||||
assert metadata.needs_review is False
|
||||
assert metadata.author == "dennis_e_taylor"
|
||||
assert metadata.series == "bobiverse"
|
||||
assert metadata.title == "all-these-worlds"
|
||||
|
||||
|
||||
def test_standard_book_metadata_recovers_from_tool_validation_error(
|
||||
tmp_path,
|
||||
monkeypatch,
|
||||
audiobook_engine,
|
||||
) -> None:
|
||||
install_fake_ollama(
|
||||
monkeypatch,
|
||||
[
|
||||
tool_response("search_authors", {"query": "Cormac McCarthy"}),
|
||||
tool_response("ensure_author", {"name": "Cormac McCarthy"}),
|
||||
tool_response("ensure_series", {"name": "The Cormac McCarthy Collection", "author_id": 5}),
|
||||
tool_response(
|
||||
"ensure_book",
|
||||
{
|
||||
"title": "The Road",
|
||||
"author_id": 5,
|
||||
"series_id": 5,
|
||||
"series_index": 0,
|
||||
},
|
||||
),
|
||||
final_response(
|
||||
{
|
||||
"author_id": 5,
|
||||
"book_id": None,
|
||||
"title": "The Road",
|
||||
"series_id": None,
|
||||
"series_index": 0,
|
||||
"confidence": 0.9,
|
||||
"evidence": ["tool error showed this should be standalone"],
|
||||
},
|
||||
),
|
||||
],
|
||||
)
|
||||
log_file = tmp_path / "agent.jsonl"
|
||||
|
||||
metadata = standard_book_metadata(
|
||||
"The Road.aax",
|
||||
{"title": "The Road", "artist": "Cormac McCarthy"},
|
||||
audiobook_engine,
|
||||
log_file,
|
||||
"test-key",
|
||||
config=metadata_agent.AgentConfig(),
|
||||
)
|
||||
|
||||
assert metadata == StandardBookMetadata(
|
||||
author_id=5,
|
||||
author="cormac_mccarthy",
|
||||
book_id=1,
|
||||
title="the-road",
|
||||
series_id=None,
|
||||
series="standalone",
|
||||
series_index=0,
|
||||
confidence=0.9,
|
||||
needs_review=False,
|
||||
evidence=["tool error showed this should be standalone"],
|
||||
)
|
||||
assert "series books must use a positive series_index" in log_file.read_text(encoding="utf-8")
|
||||
with Session(audiobook_engine) as session:
|
||||
assert session.get(AudiobookSeries, 5) is None
|
||||
book = session.get(Audiobook, 1)
|
||||
assert book.title == "the-road"
|
||||
assert book.series_id is None
|
||||
|
||||
|
||||
def test_standard_book_metadata_rejects_unknown_tool(tmp_path, monkeypatch, audiobook_engine) -> None:
|
||||
log_file = tmp_path / "agent.jsonl"
|
||||
install_fake_ollama(monkeypatch, [tool_response("drop_table", {})])
|
||||
|
||||
metadata = standard_book_metadata(
|
||||
"Book.aax",
|
||||
{"title": "Book"},
|
||||
audiobook_engine,
|
||||
log_file,
|
||||
"test-key",
|
||||
config=metadata_agent.AgentConfig(),
|
||||
)
|
||||
|
||||
assert metadata.needs_review is True
|
||||
assert "Unknown audiobook metadata tool" in metadata.evidence[0]
|
||||
assert "tool_error" in log_file.read_text(encoding="utf-8")
|
||||
|
||||
|
||||
def test_standard_book_metadata_rejects_ids_not_returned_by_tools(
|
||||
tmp_path,
|
||||
monkeypatch,
|
||||
audiobook_engine,
|
||||
) -> None:
|
||||
install_fake_ollama(
|
||||
monkeypatch,
|
||||
[
|
||||
tool_response("search_authors", {"query": "Glynn Stewart"}),
|
||||
tool_response("search_series", {"query": "Starship Mage"}),
|
||||
final_response(
|
||||
{
|
||||
"author_id": 2,
|
||||
"book_id": None,
|
||||
"title": "expeditionary-force",
|
||||
"series_id": 1,
|
||||
"series_index": 1,
|
||||
"confidence": 0.99,
|
||||
"evidence": ["bad id"],
|
||||
},
|
||||
),
|
||||
final_response(
|
||||
{
|
||||
"author_id": 2,
|
||||
"book_id": None,
|
||||
"title": "expeditionary-force",
|
||||
"series_id": 1,
|
||||
"series_index": 1,
|
||||
"confidence": 0.99,
|
||||
"evidence": ["bad id"],
|
||||
},
|
||||
),
|
||||
],
|
||||
)
|
||||
|
||||
metadata = standard_book_metadata(
|
||||
"Book.aax",
|
||||
{"title": "Book"},
|
||||
audiobook_engine,
|
||||
tmp_path / "agent.jsonl",
|
||||
"test-key",
|
||||
config=metadata_agent.AgentConfig(),
|
||||
)
|
||||
|
||||
assert metadata.needs_review is True
|
||||
assert "author_id 2 was not returned" in metadata.evidence[0]
|
||||
|
||||
|
||||
def test_standard_book_metadata_rejects_series_for_wrong_author(
|
||||
tmp_path,
|
||||
monkeypatch,
|
||||
audiobook_engine,
|
||||
) -> None:
|
||||
install_fake_ollama(
|
||||
monkeypatch,
|
||||
[
|
||||
tool_response("search_authors", {"query": "Glynn Stewart"}),
|
||||
tool_response("search_series", {"query": "expeditionary_force"}),
|
||||
final_response(
|
||||
{
|
||||
"author_id": 1,
|
||||
"book_id": None,
|
||||
"title": "expeditionary-force",
|
||||
"series_id": 3,
|
||||
"series_index": 1,
|
||||
"confidence": 0.99,
|
||||
"evidence": ["wrong author"],
|
||||
},
|
||||
),
|
||||
final_response(
|
||||
{
|
||||
"author_id": 1,
|
||||
"book_id": None,
|
||||
"title": "expeditionary-force",
|
||||
"series_id": 3,
|
||||
"series_index": 1,
|
||||
"confidence": 0.99,
|
||||
"evidence": ["wrong author"],
|
||||
},
|
||||
),
|
||||
],
|
||||
)
|
||||
|
||||
metadata = standard_book_metadata(
|
||||
"Book.aax",
|
||||
{"title": "Book"},
|
||||
audiobook_engine,
|
||||
tmp_path / "agent.jsonl",
|
||||
"test-key",
|
||||
config=metadata_agent.AgentConfig(),
|
||||
)
|
||||
|
||||
assert metadata.needs_review is True
|
||||
assert "series_id 3 does not belong to author_id 1" in metadata.evidence[0]
|
||||
|
||||
|
||||
def test_standard_book_metadata_forces_final_after_empty_book_searches(
|
||||
tmp_path,
|
||||
monkeypatch,
|
||||
audiobook_engine,
|
||||
) -> None:
|
||||
config = metadata_agent.AgentConfig(max_agent_turns=5)
|
||||
install_fake_ollama(
|
||||
monkeypatch,
|
||||
[
|
||||
tool_response("search_authors", {"query": "Dennis E. Taylor"}),
|
||||
tool_response("search_series", {"query": "Bobiverse", "author_id": 4}),
|
||||
tool_response("search_books", {"query": "We Are Legion We Are Bob", "author_id": 4, "series_id": 4}),
|
||||
tool_response("search_books", {"query": "we are legion", "author_id": 4}),
|
||||
tool_response("search_books", {"query": "We Are Legion"}),
|
||||
final_response(
|
||||
{
|
||||
"author_id": 4,
|
||||
"book_id": None,
|
||||
"title": "we-are-legion-we-are-bob",
|
||||
"series_id": 4,
|
||||
"series_index": 1,
|
||||
"confidence": 0.95,
|
||||
"evidence": ["author and series tool results; title from ffprobe tags"],
|
||||
},
|
||||
),
|
||||
],
|
||||
)
|
||||
|
||||
metadata = standard_book_metadata(
|
||||
"We_Are_Legion_(We_Are_Bob)_Bobiverse_Book_1-LC_128_44100_stereo.aax",
|
||||
{
|
||||
"album": "We Are Legion (We Are Bob): Bobiverse, Book 1",
|
||||
"artist": "Dennis E. Taylor",
|
||||
"title": "We Are Legion (We Are Bob): Bobiverse, Book 1",
|
||||
},
|
||||
audiobook_engine,
|
||||
tmp_path / "agent.jsonl",
|
||||
"test-key",
|
||||
config=config,
|
||||
)
|
||||
|
||||
assert metadata == StandardBookMetadata(
|
||||
author_id=4,
|
||||
author="dennis_e_taylor",
|
||||
book_id=1,
|
||||
title="we-are-legion-we-are-bob",
|
||||
series_id=4,
|
||||
series="bobiverse",
|
||||
series_index=1,
|
||||
confidence=0.95,
|
||||
needs_review=False,
|
||||
evidence=["author and series tool results; title from ffprobe tags"],
|
||||
)
|
||||
assert '"tools_enabled": false' in (tmp_path / "agent.jsonl").read_text(encoding="utf-8")
|
||||
|
||||
|
||||
def test_standard_book_metadata_can_create_missing_catalog_rows(
|
||||
tmp_path,
|
||||
monkeypatch,
|
||||
audiobook_engine,
|
||||
) -> None:
|
||||
install_fake_ollama(
|
||||
monkeypatch,
|
||||
[
|
||||
tool_response("search_authors", {"query": "Martha Wells"}),
|
||||
tool_response("ensure_author", {"name": "martha_wells"}),
|
||||
tool_response("search_series", {"query": "Murderbot Diaries", "author_id": 5}),
|
||||
tool_response("ensure_series", {"name": "murderbot_diaries", "author_id": 5}),
|
||||
tool_response("search_books", {"query": "All Systems Red", "author_id": 5, "series_id": 5}),
|
||||
final_response(
|
||||
{
|
||||
"author_id": 5,
|
||||
"book_id": None,
|
||||
"title": "all-systems-red",
|
||||
"series_id": 5,
|
||||
"series_index": 1,
|
||||
"confidence": 0.96,
|
||||
"evidence": ["created missing author and series; title from tags"],
|
||||
},
|
||||
),
|
||||
],
|
||||
)
|
||||
|
||||
metadata = standard_book_metadata(
|
||||
"All Systems Red.aax",
|
||||
{"title": "All Systems Red", "artist": "Martha Wells"},
|
||||
audiobook_engine,
|
||||
tmp_path / "agent.jsonl",
|
||||
"test-key",
|
||||
config=metadata_agent.AgentConfig(),
|
||||
)
|
||||
|
||||
assert metadata == StandardBookMetadata(
|
||||
author_id=5,
|
||||
author="martha_wells",
|
||||
book_id=1,
|
||||
title="all-systems-red",
|
||||
series_id=5,
|
||||
series="murderbot_diaries",
|
||||
series_index=1,
|
||||
confidence=0.96,
|
||||
needs_review=False,
|
||||
evidence=["created missing author and series; title from tags"],
|
||||
)
|
||||
with Session(audiobook_engine) as session:
|
||||
author = session.get(AudiobookAuthor, 5)
|
||||
series = session.get(AudiobookSeries, 5)
|
||||
book = session.get(Audiobook, 1)
|
||||
assert author.name == "martha_wells"
|
||||
assert series.name == "murderbot_diaries"
|
||||
assert series.author_id == author.id
|
||||
assert book.title == "all-systems-red"
|
||||
assert book.author_id == author.id
|
||||
assert book.series_id == series.id
|
||||
|
||||
|
||||
def test_standard_book_metadata_normalizes_noisy_created_catalog_rows(
|
||||
tmp_path,
|
||||
monkeypatch,
|
||||
audiobook_engine,
|
||||
) -> None:
|
||||
install_fake_ollama(
|
||||
monkeypatch,
|
||||
[
|
||||
tool_response("search_authors", {"query": "Charles Lamb"}),
|
||||
tool_response("ensure_author", {"name": "charles-lamb"}),
|
||||
tool_response("search_series", {"query": "AL:ICE Series", "author_id": 5}),
|
||||
tool_response("ensure_series", {"name": "AL:ICE Series", "author_id": 5}),
|
||||
tool_response("search_books", {"query": "AL:ICE Space War", "author_id": 5, "series_id": 5}),
|
||||
final_response(
|
||||
{
|
||||
"author_id": 5,
|
||||
"book_id": None,
|
||||
"title": "AL:ICE Space War",
|
||||
"series_id": 5,
|
||||
"series_index": 4,
|
||||
"confidence": 0.95,
|
||||
"evidence": ["created normalized author and series; title from tags"],
|
||||
},
|
||||
),
|
||||
],
|
||||
)
|
||||
|
||||
metadata = standard_book_metadata(
|
||||
"ALICE_Space_War_ALICE_Series_Book_4-LC_64_22050_stereo.aax",
|
||||
{
|
||||
"album": "AL:ICE Space War: AL:ICE Series, Book 4",
|
||||
"artist": "Charles Lamb",
|
||||
"title": "AL:ICE Space War: AL:ICE Series, Book 4",
|
||||
},
|
||||
audiobook_engine,
|
||||
tmp_path / "agent.jsonl",
|
||||
"test-key",
|
||||
config=metadata_agent.AgentConfig(),
|
||||
)
|
||||
|
||||
assert metadata == StandardBookMetadata(
|
||||
author_id=5,
|
||||
author="charles_lamb",
|
||||
book_id=1,
|
||||
title="al-ice-space-war",
|
||||
series_id=5,
|
||||
series="al_ice_series",
|
||||
series_index=4,
|
||||
confidence=0.95,
|
||||
needs_review=False,
|
||||
evidence=["created normalized author and series; title from tags"],
|
||||
)
|
||||
with Session(audiobook_engine) as session:
|
||||
author = session.get(AudiobookAuthor, 5)
|
||||
series = session.get(AudiobookSeries, 5)
|
||||
book = session.get(Audiobook, 1)
|
||||
assert author.name == "charles_lamb"
|
||||
assert series.name == "al_ice_series"
|
||||
assert series.author_id == author.id
|
||||
assert book.title == "al-ice-space-war"
|
||||
assert book.author_id == author.id
|
||||
assert book.series_id == series.id
|
||||
|
||||
|
||||
def test_convert_aax_file_with_agent_success_renames_temp_output(tmp_path, monkeypatch) -> None:
|
||||
source = tmp_path / "book.aax"
|
||||
output_directory = tmp_path / "audiobooks"
|
||||
source.touch()
|
||||
monkeypatch.setattr(audible_convert, "read_metadata", lambda _: {"title": "Starship Mage"})
|
||||
monkeypatch.setattr(
|
||||
audible_convert,
|
||||
"standard_book_metadata",
|
||||
lambda *_, **__: StandardBookMetadata(
|
||||
author_id=1,
|
||||
author="glynn_stewart",
|
||||
book_id=None,
|
||||
title="starship-mage",
|
||||
series_id=1,
|
||||
series="starships_mage",
|
||||
series_index=1,
|
||||
confidence=0.95,
|
||||
needs_review=False,
|
||||
evidence=["test"],
|
||||
),
|
||||
)
|
||||
|
||||
def fake_convert(_source, destination, _activation_bytes, *, overwrite):
|
||||
assert overwrite is True
|
||||
destination.parent.mkdir(parents=True, exist_ok=True)
|
||||
destination.write_text("converted", encoding="utf-8")
|
||||
|
||||
monkeypatch.setattr(audible_convert, "convert_aax_file", fake_convert)
|
||||
|
||||
audible_convert.convert_aax_file_with_agent(
|
||||
source,
|
||||
conversion_config(output_directory),
|
||||
)
|
||||
|
||||
expected = output_directory / "glynn_stewart-starships_mage_01-starship-mage"
|
||||
destination = expected / "glynn_stewart-starships_mage_01-starship-mage.m4b"
|
||||
assert destination.read_text(encoding="utf-8") == "converted"
|
||||
assert not list((output_directory / ".audible_convert" / "tmp").glob("*/converted.m4b"))
|
||||
|
||||
|
||||
def test_ffprobe_failure_writes_review_without_converting(tmp_path, monkeypatch) -> None:
|
||||
source = tmp_path / "book.aax"
|
||||
output_directory = tmp_path / "audiobooks"
|
||||
source.touch()
|
||||
calls = []
|
||||
|
||||
def fake_read_metadata(_source):
|
||||
raise FakeFfprobeError
|
||||
|
||||
def fake_convert(*args, **kwargs):
|
||||
calls.append((args, kwargs))
|
||||
|
||||
monkeypatch.setattr(audible_convert, "read_metadata", fake_read_metadata)
|
||||
monkeypatch.setattr(audible_convert, "convert_aax_file", fake_convert)
|
||||
|
||||
audible_convert.convert_aax_file_with_agent(source, conversion_config(output_directory))
|
||||
|
||||
review_files = list((output_directory / ".audible_convert" / "review").glob("*.json"))
|
||||
assert calls == []
|
||||
assert len(review_files) == 1
|
||||
review = json.loads(review_files[0].read_text(encoding="utf-8"))
|
||||
assert review["ffprobe_metadata"] == {}
|
||||
assert review["reason"] == "ffprobe_failed: bad ffprobe"
|
||||
assert review["temp_file"] is None
|
||||
|
||||
|
||||
def test_low_confidence_metadata_keeps_temp_output_for_review(tmp_path, monkeypatch) -> None:
|
||||
source = tmp_path / "book.aax"
|
||||
output_directory = tmp_path / "audiobooks"
|
||||
source.touch()
|
||||
monkeypatch.setattr(audible_convert, "read_metadata", lambda _: {"title": "Unknown"})
|
||||
monkeypatch.setattr(
|
||||
audible_convert,
|
||||
"standard_book_metadata",
|
||||
lambda *_, **__: StandardBookMetadata(
|
||||
author_id=0,
|
||||
author="unknown_author",
|
||||
book_id=None,
|
||||
title="unknown-title",
|
||||
series_id=None,
|
||||
series="standalone",
|
||||
series_index=0,
|
||||
confidence=0.25,
|
||||
needs_review=True,
|
||||
evidence=["unclear"],
|
||||
),
|
||||
)
|
||||
|
||||
def fake_convert(_source, destination, _activation_bytes, *, overwrite):
|
||||
assert overwrite is True
|
||||
destination.parent.mkdir(parents=True, exist_ok=True)
|
||||
destination.write_text("converted", encoding="utf-8")
|
||||
|
||||
monkeypatch.setattr(audible_convert, "convert_aax_file", fake_convert)
|
||||
|
||||
audible_convert.convert_aax_file_with_agent(
|
||||
source,
|
||||
conversion_config(output_directory),
|
||||
)
|
||||
|
||||
temp_files = list((output_directory / ".audible_convert" / "tmp").glob("*/converted.m4b"))
|
||||
review_files = list((output_directory / ".audible_convert" / "review").glob("*.json"))
|
||||
assert len(temp_files) == 1
|
||||
assert temp_files[0].read_text(encoding="utf-8") == "converted"
|
||||
assert len(review_files) == 1
|
||||
|
||||
|
||||
def test_existing_destination_skips_rename_and_removes_temp(tmp_path, monkeypatch) -> None:
|
||||
source = tmp_path / "book.aax"
|
||||
output_directory = tmp_path / "audiobooks"
|
||||
source.touch()
|
||||
final_file = (
|
||||
output_directory
|
||||
/ "glynn_stewart-starships_mage_01-starship-mage"
|
||||
/ "glynn_stewart-starships_mage_01-starship-mage.m4b"
|
||||
)
|
||||
final_file.parent.mkdir(parents=True)
|
||||
final_file.write_text("existing", encoding="utf-8")
|
||||
monkeypatch.setattr(audible_convert, "read_metadata", lambda _: {"title": "Starship Mage"})
|
||||
monkeypatch.setattr(
|
||||
audible_convert,
|
||||
"standard_book_metadata",
|
||||
lambda *_, **__: StandardBookMetadata(
|
||||
author_id=1,
|
||||
author="glynn_stewart",
|
||||
book_id=None,
|
||||
title="starship-mage",
|
||||
series_id=1,
|
||||
series="starships_mage",
|
||||
series_index=1,
|
||||
confidence=0.95,
|
||||
needs_review=False,
|
||||
evidence=["test"],
|
||||
),
|
||||
)
|
||||
|
||||
def fake_convert(_source, destination, _activation_bytes, *, overwrite):
|
||||
assert overwrite is True
|
||||
destination.parent.mkdir(parents=True, exist_ok=True)
|
||||
destination.write_text("converted", encoding="utf-8")
|
||||
|
||||
monkeypatch.setattr(audible_convert, "convert_aax_file", fake_convert)
|
||||
|
||||
audible_convert.convert_aax_file_with_agent(
|
||||
source,
|
||||
conversion_config(output_directory),
|
||||
)
|
||||
|
||||
assert final_file.read_text(encoding="utf-8") == "existing"
|
||||
assert not list((output_directory / ".audible_convert" / "tmp").glob("*/converted.m4b"))
|
||||
|
||||
|
||||
def test_richie_exports_audiobook_models() -> None:
|
||||
from python.orm.richie import Audiobook # noqa: PLC0415
|
||||
|
||||
assert Audiobook.__tablename__ == "audiobook"
|
||||
|
||||
|
||||
def test_main_dry_run_prints_outputs_without_converting(tmp_path, monkeypatch, capsys) -> None:
|
||||
input_directory = tmp_path / "raw"
|
||||
output_directory = tmp_path / "audiobooks"
|
||||
input_directory.mkdir()
|
||||
source = input_directory / "book.aax"
|
||||
source.touch()
|
||||
monkeypatch.setenv("OLLAMA_API_KEY", "test-key")
|
||||
monkeypatch.setattr(
|
||||
audible_convert,
|
||||
"read_metadata",
|
||||
lambda _: {
|
||||
"artist": "Charles Lamb",
|
||||
"title": "Alice: Alice Series #1",
|
||||
},
|
||||
)
|
||||
calls = []
|
||||
|
||||
def fake_convert(*args, **kwargs):
|
||||
calls.append((args, kwargs))
|
||||
|
||||
monkeypatch.setattr(audible_convert, "convert_aax_file", fake_convert)
|
||||
monkeypatch.setattr(
|
||||
audible_convert,
|
||||
"standard_book_metadata",
|
||||
lambda *_, **__: StandardBookMetadata(
|
||||
author_id=1,
|
||||
author="charles_lamb",
|
||||
book_id=None,
|
||||
title="alice",
|
||||
series_id=1,
|
||||
series="alice",
|
||||
series_index=1,
|
||||
confidence=0.95,
|
||||
needs_review=False,
|
||||
evidence=["test"],
|
||||
),
|
||||
)
|
||||
|
||||
def fake_get_postgres_engine(*, name):
|
||||
assert name == "RICHIE"
|
||||
return create_engine("sqlite+pysqlite:///:memory:")
|
||||
|
||||
monkeypatch.setattr(audible_convert, "get_postgres_engine", fake_get_postgres_engine)
|
||||
|
||||
audible_convert.main(input_directory, output_directory, dry_run=True)
|
||||
|
||||
assert calls == []
|
||||
assert capsys.readouterr().out == (
|
||||
f"{source} -> "
|
||||
f"{output_directory / 'charles_lamb-alice_01-alice' / 'charles_lamb-alice_01-alice.m4b'}\n"
|
||||
)
|
||||
assert (output_directory / ".audible_convert" / "logs").is_dir()
|
||||
|
||||
|
||||
def test_main_reads_activation_bytes_from_env(tmp_path, monkeypatch) -> None:
|
||||
input_directory = tmp_path / "raw"
|
||||
output_directory = tmp_path / "audiobooks"
|
||||
input_directory.mkdir()
|
||||
source = input_directory / "book.aax"
|
||||
source.touch()
|
||||
configs = []
|
||||
|
||||
def fake_convert(_source, config):
|
||||
configs.append(config)
|
||||
|
||||
def fake_get_postgres_engine(*, name):
|
||||
assert name == "RICHIE"
|
||||
return sqlite_engine()
|
||||
|
||||
monkeypatch.setenv("OLLAMA_API_KEY", "test-key")
|
||||
monkeypatch.setenv("AUDIBLE_ACTIVATION_BYTES", "activation-secret")
|
||||
monkeypatch.setattr(audible_convert, "get_postgres_engine", fake_get_postgres_engine)
|
||||
monkeypatch.setattr(audible_convert, "convert_aax_file_with_agent", fake_convert)
|
||||
|
||||
audible_convert.main(input_directory, output_directory)
|
||||
|
||||
assert configs == [
|
||||
audible_convert.ConversionConfig(
|
||||
resolved_output=output_directory,
|
||||
ollama_api_key="test-key",
|
||||
agent_config=configs[0].agent_config,
|
||||
engine=configs[0].engine,
|
||||
activation_bytes="activation-secret",
|
||||
dry_run=False,
|
||||
overwrite=False,
|
||||
),
|
||||
]
|
||||
@@ -0,0 +1,126 @@
|
||||
"""test_audiobook_catalog."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import pytest
|
||||
from sqlalchemy import create_engine, select
|
||||
from sqlalchemy.orm import sessionmaker
|
||||
|
||||
from python.orm.richie import AudiobookAuthor, AudiobookSeries, RichieBase
|
||||
from python.tools.audiobook import catalog
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def audiobook_session():
|
||||
engine = create_engine("sqlite+pysqlite:///:memory:", future=True)
|
||||
RichieBase.metadata.create_all(engine)
|
||||
with sessionmaker(bind=engine, expire_on_commit=False, future=True)() as session:
|
||||
yield session
|
||||
engine.dispose()
|
||||
|
||||
|
||||
def test_upsert_catalog_csv_inserts_and_updates_authors_and_series(tmp_path, audiobook_session) -> None:
|
||||
audiobook_session.add_all(
|
||||
[
|
||||
AudiobookAuthor(id=10, name="old_author"),
|
||||
AudiobookAuthor(id=11, name="craig_alanson"),
|
||||
],
|
||||
)
|
||||
audiobook_session.commit()
|
||||
authors_csv = tmp_path / "authors.csv"
|
||||
series_csv = tmp_path / "series.csv"
|
||||
authors_csv.write_text(
|
||||
"name,id\n"
|
||||
"glynn_stewart,\n"
|
||||
"craig_alanson,\n"
|
||||
"updated_author,10\n",
|
||||
encoding="utf-8",
|
||||
)
|
||||
series_csv.write_text(
|
||||
"name,author_name,id\n"
|
||||
"starships_mage,glynn_stewart,\n"
|
||||
"expeditionary_force,craig_alanson,\n",
|
||||
encoding="utf-8",
|
||||
)
|
||||
|
||||
author_count = catalog.upsert_authors_from_csv(audiobook_session, authors_csv)
|
||||
series_count = catalog.upsert_series_from_csv(audiobook_session, series_csv)
|
||||
audiobook_session.commit()
|
||||
|
||||
authors = audiobook_session.scalars(select(AudiobookAuthor).order_by(AudiobookAuthor.id)).all()
|
||||
series = audiobook_session.scalars(select(AudiobookSeries).order_by(AudiobookSeries.name)).all()
|
||||
assert author_count == 3
|
||||
assert series_count == 2
|
||||
assert [(author.id, author.name) for author in authors] == [
|
||||
(10, "updated_author"),
|
||||
(11, "craig_alanson"),
|
||||
(12, "glynn_stewart"),
|
||||
]
|
||||
assert [(row.name, row.author.name) for row in series] == [
|
||||
("expeditionary_force", "craig_alanson"),
|
||||
("starships_mage", "glynn_stewart"),
|
||||
]
|
||||
|
||||
|
||||
def test_upsert_series_csv_updates_series_by_id(tmp_path, audiobook_session) -> None:
|
||||
author = AudiobookAuthor(id=1, name="glynn_stewart")
|
||||
audiobook_session.add_all(
|
||||
[
|
||||
author,
|
||||
AudiobookSeries(id=7, name="old_series", author=author),
|
||||
],
|
||||
)
|
||||
audiobook_session.commit()
|
||||
series_csv = tmp_path / "series.csv"
|
||||
series_csv.write_text(
|
||||
"name,author_name,id\n"
|
||||
"starships_mage,glynn_stewart,7\n",
|
||||
encoding="utf-8",
|
||||
)
|
||||
|
||||
count = catalog.upsert_series_from_csv(audiobook_session, series_csv)
|
||||
audiobook_session.commit()
|
||||
|
||||
series = audiobook_session.get(AudiobookSeries, 7)
|
||||
assert count == 1
|
||||
assert series.name == "starships_mage"
|
||||
assert series.author.name == "glynn_stewart"
|
||||
|
||||
|
||||
def test_upsert_csv_allows_missing_id_column(tmp_path, audiobook_session) -> None:
|
||||
authors_csv = tmp_path / "authors.csv"
|
||||
series_csv = tmp_path / "series.csv"
|
||||
authors_csv.write_text(
|
||||
"name\n"
|
||||
"glynn_stewart\n",
|
||||
encoding="utf-8",
|
||||
)
|
||||
series_csv.write_text(
|
||||
"name,author_name\n"
|
||||
"starships_mage,glynn_stewart\n",
|
||||
encoding="utf-8",
|
||||
)
|
||||
|
||||
author_count = catalog.upsert_authors_from_csv(audiobook_session, authors_csv)
|
||||
series_count = catalog.upsert_series_from_csv(audiobook_session, series_csv)
|
||||
audiobook_session.commit()
|
||||
|
||||
series = audiobook_session.scalar(select(AudiobookSeries))
|
||||
assert author_count == 1
|
||||
assert series_count == 1
|
||||
assert series.name == "starships_mage"
|
||||
assert series.author.name == "glynn_stewart"
|
||||
|
||||
|
||||
def test_upsert_series_csv_rejects_unknown_author(tmp_path, audiobook_session) -> None:
|
||||
series_csv = tmp_path / "series.csv"
|
||||
series_csv.write_text(
|
||||
"name,author_name,id\n"
|
||||
"starships_mage,glynn_stewart,\n",
|
||||
encoding="utf-8",
|
||||
)
|
||||
|
||||
with pytest.raises(catalog.CatalogImportError) as error:
|
||||
catalog.upsert_series_from_csv(audiobook_session, series_csv)
|
||||
|
||||
assert "author not found: glynn_stewart" in str(error.value)
|
||||
Reference in New Issue
Block a user