"""Import audiobook catalog authors and series from CSV files.""" from __future__ import annotations import csv import logging from pathlib import Path # noqa: TC003 This is required for the typer CLI from typing import Annotated import typer from sqlalchemy import select from sqlalchemy.orm import Session from python.common import configure_logger from python.orm.common import get_postgres_engine from python.orm.richie import AudiobookAuthor, AudiobookSeries logger = logging.getLogger(__name__) AUTHOR_NAME_COLUMN = "author_name" ID_COLUMN = "id" NAME_COLUMN = "name" class CatalogImportError(ValueError): """CSV catalog import failed validation.""" def main( authors_csv: Annotated[Path, typer.Argument(help="CSV with name and optional id.")], series_csv: Annotated[Path, typer.Argument(help="CSV with name, author_name, and optional id.")], ) -> None: """Upsert audiobook authors and series from CSV files.""" configure_logger() try: engine = get_postgres_engine(name="RICHIE") with Session(engine) as session: author_count = upsert_authors_from_csv(session, authors_csv) series_count = upsert_series_from_csv(session, series_csv) session.commit() except CatalogImportError as error: typer.echo(str(error), err=True) raise typer.Exit(code=1) from error logger.info("Upserted %s authors and %s series", author_count, series_count) def upsert_authors_from_csv(session: Session, authors_csv: Path) -> int: """Upsert authors from a CSV file.""" count = 0 for row_number, row in csv_rows(authors_csv): name = required_csv_value(row, authors_csv, row_number, NAME_COLUMN) upsert_author(session, name, csv_id(row, authors_csv, row_number)) count += 1 return count def upsert_series_from_csv(session: Session, series_csv: Path) -> int: """Upsert series from a CSV file.""" count = 0 for row_number, row in csv_rows(series_csv): series_name = required_csv_value(row, series_csv, row_number, NAME_COLUMN) author_name = required_csv_value(row, series_csv, row_number, AUTHOR_NAME_COLUMN) author = find_author_by_name(session, author_name) if author is None: msg = f"{series_csv}:{row_number}: author not found: {author_name}" raise CatalogImportError(msg) upsert_series(session, series_name, author, csv_id(row, series_csv, row_number)) count += 1 return count def upsert_author(session: Session, name: str, author_id: int | None) -> AudiobookAuthor: """Upsert one author by id or exact name.""" if author_id is not None: author = session.get(AudiobookAuthor, author_id) if author is None: author = AudiobookAuthor(id=author_id, name=name) session.add(author) else: author.name = name session.flush() return author author = find_author_by_name(session, name) if author is None: author = AudiobookAuthor(name=name) session.add(author) session.flush() return author def upsert_series( session: Session, name: str, author: AudiobookAuthor, series_id: int | None, ) -> AudiobookSeries: """Upsert one series by id or exact author/name match.""" if series_id is not None: series = session.get(AudiobookSeries, series_id) if series is None: series = AudiobookSeries(id=series_id, name=name, author=author) session.add(series) else: series.name = name series.author = author session.flush() return series series = find_series_by_name_and_author(session, name, author.id) if series is None: series = AudiobookSeries(name=name, author=author) session.add(series) session.flush() return series def find_author_by_name(session: Session, name: str) -> AudiobookAuthor | None: """Find one author by exact name.""" return session.scalar(select(AudiobookAuthor).where(AudiobookAuthor.name == name)) def find_series_by_name_and_author( session: Session, name: str, author_id: int, ) -> AudiobookSeries | None: """Find one series by exact name and author.""" return session.scalar( select(AudiobookSeries).where( AudiobookSeries.name == name, AudiobookSeries.author_id == author_id, ), ) def csv_rows(csv_path: Path) -> list[tuple[int, dict[str, str | None]]]: """Read a CSV file as numbered rows.""" with csv_path.open(newline="", encoding="utf-8") as file: reader = csv.DictReader(file) if reader.fieldnames is None: msg = f"{csv_path}: missing CSV header" raise CatalogImportError(msg) return [(row_number, row) for row_number, row in enumerate(reader, start=2)] def required_csv_value( row: dict[str, str | None], csv_path: Path, row_number: int, column: str, ) -> str: """Read a required CSV value.""" value = row.get(column) if value and value.strip(): return value.strip() msg = f"{csv_path}:{row_number}: missing required column value: {column}" raise CatalogImportError(msg) def csv_id(row: dict[str, str | None], csv_path: Path, row_number: int) -> int | None: """Read an optional id field from a CSV row.""" value = row.get(ID_COLUMN) if value is None or not value.strip(): return None try: return int(value) except ValueError as error: msg = f"{csv_path}:{row_number}: id must be an integer: {value}" raise CatalogImportError(msg) from error return None if __name__ == "__main__": typer.run(main)