"""Import audiobook catalog authors and series from CSV files."""

from __future__ import annotations

import csv
import logging
from pathlib import Path  # noqa: TC003 This is required for the typer CLI
from typing import Annotated

import typer
from sqlalchemy import select
from sqlalchemy.orm import Session

from python.common import configure_logger
from python.orm.common import get_postgres_engine
from python.orm.richie import AudiobookAuthor, AudiobookSeries

logger = logging.getLogger(__name__)

# Header names looked up in the CSV files.
AUTHOR_NAME_COLUMN = "author_name"
ID_COLUMN = "id"
NAME_COLUMN = "name"


class CatalogImportError(ValueError):
    """CSV catalog import failed validation."""


def main(
    authors_csv: Annotated[Path, typer.Argument(help="CSV with name and optional id.")],
    series_csv: Annotated[Path, typer.Argument(help="CSV with name, author_name, and optional id.")],
) -> None:
    """Upsert audiobook authors and series from CSV files.

    Authors are imported before series so that series rows can reference
    authors introduced in the same run. The commit is only reached when both
    files import without a ``CatalogImportError``.

    Args:
        authors_csv: CSV with a ``name`` column and an optional ``id`` column.
        series_csv: CSV with ``name`` and ``author_name`` columns and an
            optional ``id`` column.

    Raises:
        typer.Exit: With exit code 1 after echoing the validation message to
            stderr when a ``CatalogImportError`` is raised.
    """
    configure_logger()
    try:
        engine = get_postgres_engine(name="RICHIE")
        with Session(engine) as session:
            author_count = upsert_authors_from_csv(session, authors_csv)
            series_count = upsert_series_from_csv(session, series_csv)
            session.commit()
    except CatalogImportError as error:
        typer.echo(str(error), err=True)
        raise typer.Exit(code=1) from error

    logger.info("Upserted %s authors and %s series", author_count, series_count)


def upsert_authors_from_csv(session: Session, authors_csv: Path) -> int:
    """Upsert authors from a CSV file.

    Args:
        session: Open session the upserts are flushed through.
        authors_csv: CSV with a ``name`` column and an optional ``id`` column.

    Returns:
        The number of CSV rows processed.

    Raises:
        CatalogImportError: If a row has no name or a non-integer id.
    """
    count = 0
    for row_number, row in csv_rows(authors_csv):
        name = required_csv_value(row, authors_csv, row_number, NAME_COLUMN)
        upsert_author(session, name, csv_id(row, authors_csv, row_number))
        count += 1
    return count


def upsert_series_from_csv(session: Session, series_csv: Path) -> int:
    """Upsert series from a CSV file.

    Args:
        session: Open session the upserts are flushed through.
        series_csv: CSV with ``name`` and ``author_name`` columns and an
            optional ``id`` column.

    Returns:
        The number of CSV rows processed.

    Raises:
        CatalogImportError: If a row lacks a required value, has a
            non-integer id, or names an author that cannot be found.
    """
    count = 0
    for row_number, row in csv_rows(series_csv):
        series_name = required_csv_value(row, series_csv, row_number, NAME_COLUMN)
        author_name = required_csv_value(row, series_csv, row_number, AUTHOR_NAME_COLUMN)
        author = find_author_by_name(session, author_name)
        if author is None:
            msg = f"{series_csv}:{row_number}: author not found: {author_name}"
            raise CatalogImportError(msg)
        upsert_series(session, series_name, author, csv_id(row, series_csv, row_number))
        count += 1
    return count


def upsert_author(session: Session, name: str, author_id: int | None) -> AudiobookAuthor:
    """Upsert one author by id or exact name.

    With an id, the author with that primary key is renamed to ``name``, or
    created with that id when absent. Without an id, an author with exactly
    this name is reused, or a new one is created.

    Args:
        session: Open session used for lookups and writes.
        name: Author name to store.
        author_id: Explicit primary key, or ``None`` to match by name.

    Returns:
        The existing or newly added author.
    """
    if author_id is not None:
        author = session.get(AudiobookAuthor, author_id)
        if author is None:
            author = AudiobookAuthor(id=author_id, name=name)
            session.add(author)
        else:
            author.name = name
        session.flush()
        return author

    author = find_author_by_name(session, name)
    if author is None:
        author = AudiobookAuthor(name=name)
        session.add(author)
        # Flush so the new author is written before later lookups use it.
        session.flush()
    return author


def upsert_series(
    session: Session,
    name: str,
    author: AudiobookAuthor,
    series_id: int | None,
) -> AudiobookSeries:
    """Upsert one series by id or exact author/name match.

    With an id, the series with that primary key gets ``name`` and ``author``
    assigned, or is created with that id when absent. Without an id, a series
    with exactly this name under this author is reused, or a new one is
    created.

    Args:
        session: Open session used for lookups and writes.
        name: Series name to store.
        author: Author the series belongs to.
        series_id: Explicit primary key, or ``None`` to match by name/author.

    Returns:
        The existing or newly added series.
    """
    if series_id is not None:
        series = session.get(AudiobookSeries, series_id)
        if series is None:
            series = AudiobookSeries(id=series_id, name=name, author=author)
            session.add(series)
        else:
            series.name = name
            series.author = author
        session.flush()
        return series

    series = find_series_by_name_and_author(session, name, author.id)
    if series is None:
        series = AudiobookSeries(name=name, author=author)
        session.add(series)
        session.flush()
    return series


def find_author_by_name(session: Session, name: str) -> AudiobookAuthor | None:
    """Find one author by exact name, or return ``None`` when there is no match."""
    return session.scalar(select(AudiobookAuthor).where(AudiobookAuthor.name == name))


def find_series_by_name_and_author(
    session: Session,
    name: str,
    author_id: int,
) -> AudiobookSeries | None:
    """Find one series by exact name and author id, or return ``None``."""
    return session.scalar(
        select(AudiobookSeries).where(
            AudiobookSeries.name == name,
            AudiobookSeries.author_id == author_id,
        ),
    )


def csv_rows(csv_path: Path) -> list[tuple[int, dict[str, str | None]]]:
    """Read a CSV file as numbered rows.

    Rows are numbered from 2, treating the header as row 1. The whole file is
    read before returning, so the file is closed by the time rows are used.

    Args:
        csv_path: UTF-8 encoded CSV file whose first row is the header.

    Returns:
        ``(row_number, row)`` pairs, one per data row yielded by
        ``csv.DictReader``.

    Raises:
        CatalogImportError: If the file has no header row.
    """
    with csv_path.open(newline="", encoding="utf-8") as file:
        reader = csv.DictReader(file)
        if reader.fieldnames is None:
            msg = f"{csv_path}: missing CSV header"
            raise CatalogImportError(msg)
        return list(enumerate(reader, start=2))


def required_csv_value(
    row: dict[str, str | None],
    csv_path: Path,
    row_number: int,
    column: str,
) -> str:
    """Read a required CSV value.

    Args:
        row: Parsed CSV row.
        csv_path: File the row came from; used in the error message.
        row_number: Row number within the file; used in the error message.
        column: Name of the column to read.

    Returns:
        The value with surrounding whitespace removed.

    Raises:
        CatalogImportError: If the column is absent, empty, or only whitespace.
    """
    value = (row.get(column) or "").strip()
    if value:
        return value
    msg = f"{csv_path}:{row_number}: missing required column value: {column}"
    raise CatalogImportError(msg)


def csv_id(row: dict[str, str | None], csv_path: Path, row_number: int) -> int | None:
    """Read an optional id field from a CSV row.

    Args:
        row: Parsed CSV row.
        csv_path: File the row came from; used in the error message.
        row_number: Row number within the file; used in the error message.

    Returns:
        The id as an integer, or ``None`` when the ``id`` column is absent,
        empty, or only whitespace.

    Raises:
        CatalogImportError: If the id is present but is not an integer.
    """
    value = row.get(ID_COLUMN)
    if value is None or not value.strip():
        return None
    try:
        return int(value)
    except ValueError as error:
        msg = f"{csv_path}:{row_number}: id must be an integer: {value}"
        raise CatalogImportError(msg) from error


if __name__ == "__main__":
    typer.run(main)