177 lines
5.6 KiB
Python
177 lines
5.6 KiB
Python
"""Import audiobook catalog authors and series from CSV files."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import csv
|
|
import logging
|
|
from pathlib import Path # noqa: TC003 This is required for the typer CLI
|
|
from typing import Annotated
|
|
|
|
import typer
|
|
from sqlalchemy import select
|
|
from sqlalchemy.orm import Session
|
|
|
|
from python.common import configure_logger
|
|
from python.orm.common import get_postgres_engine
|
|
from python.orm.richie import AudiobookAuthor, AudiobookSeries
|
|
|
|
# Module-scoped logger; main() reports the final upsert counts through it.
logger = logging.getLogger(__name__)

# Header names looked up in the CSV files.
AUTHOR_NAME_COLUMN = "author_name"  # series CSV: name of the owning author
ID_COLUMN = "id"  # both CSVs: optional explicit primary key
NAME_COLUMN = "name"  # both CSVs: author name / series name
|
|
|
|
|
|
class CatalogImportError(ValueError):
    """Raised when a catalog CSV cannot be imported because its content is invalid.

    The message is intended for direct display to the user and, where a
    specific row is at fault, is prefixed with ``<path>:<row number>``.
    """
|
|
|
|
|
|
def main(
    authors_csv: Annotated[Path, typer.Argument(help="CSV with name and optional id.")],
    series_csv: Annotated[Path, typer.Argument(help="CSV with name, author_name, and optional id.")],
) -> None:
    """Upsert audiobook authors and series from CSV files."""
    # NOTE: the docstring above doubles as the CLI help text, so it is kept short.
    configure_logger()
    try:
        with Session(get_postgres_engine(name="RICHIE")) as session:
            # Authors go first so that series rows can resolve their author by name.
            author_count = upsert_authors_from_csv(session, authors_csv)
            series_count = upsert_series_from_csv(session, series_csv)
            # Single commit: nothing is committed unless both files were processed.
            session.commit()
    except CatalogImportError as error:
        # Validation failures become a message on stderr plus exit status 1.
        typer.echo(str(error), err=True)
        raise typer.Exit(code=1) from error
    else:
        logger.info("Upserted %s authors and %s series", author_count, series_count)
|
|
|
|
|
|
def upsert_authors_from_csv(session: Session, authors_csv: Path) -> int:
    """Upsert every author listed in ``authors_csv``.

    Each row must provide a non-blank ``name``; an ``id`` value is optional.

    Returns:
        The number of CSV rows that were processed.

    Raises:
        CatalogImportError: If the file has no header, a row lacks a name, or
            an id is not an integer.
    """
    numbered_rows = csv_rows(authors_csv)
    for line_no, record in numbered_rows:
        author_name = required_csv_value(record, authors_csv, line_no, NAME_COLUMN)
        author_id = csv_id(record, authors_csv, line_no)
        upsert_author(session, author_name, author_id)
    # Every row either upserts or raises, so the row count equals the upsert count.
    return len(numbered_rows)
|
|
|
|
|
|
def upsert_series_from_csv(session: Session, series_csv: Path) -> int:
    """Upsert every series listed in ``series_csv``.

    Each row must provide non-blank ``name`` and ``author_name`` values; the
    named author has to be findable through ``session`` already. An ``id``
    value is optional.

    Returns:
        The number of CSV rows that were processed.

    Raises:
        CatalogImportError: If the file has no header, a required value is
            missing, the author cannot be found, or an id is not an integer.
    """
    numbered_rows = csv_rows(series_csv)
    for line_no, record in numbered_rows:
        title = required_csv_value(record, series_csv, line_no, NAME_COLUMN)
        writer_name = required_csv_value(record, series_csv, line_no, AUTHOR_NAME_COLUMN)

        owner = find_author_by_name(session, writer_name)
        if owner is None:
            msg = f"{series_csv}:{line_no}: author not found: {writer_name}"
            raise CatalogImportError(msg)

        # The id is parsed only after the author lookup succeeded.
        series_id = csv_id(record, series_csv, line_no)
        upsert_series(session, title, owner, series_id)
    # Every row either upserts or raises, so the row count equals the upsert count.
    return len(numbered_rows)
|
|
|
|
|
|
def upsert_author(session: Session, name: str, author_id: int | None) -> AudiobookAuthor:
    """Create or update a single author and flush the session.

    With an ``author_id`` the author is looked up by primary key: an existing
    row is renamed to ``name``, a missing one is created with that id.
    Without an id the author is looked up by exact name and created only when
    no match exists.

    Returns:
        The existing or newly added author instance.
    """
    if author_id is None:
        # No explicit key: fall back to matching on the exact name.
        author = find_author_by_name(session, name)
        if author is None:
            author = AudiobookAuthor(name=name)
            session.add(author)
    else:
        author = session.get(AudiobookAuthor, author_id)
        if author is not None:
            author.name = name
        else:
            author = AudiobookAuthor(id=author_id, name=name)
            session.add(author)

    session.flush()
    return author
|
|
|
|
|
|
def upsert_series(
    session: Session,
    name: str,
    author: AudiobookAuthor,
    series_id: int | None,
) -> AudiobookSeries:
    """Create or update a single series and flush the session.

    With a ``series_id`` the series is looked up by primary key: an existing
    row gets ``name`` and ``author`` reassigned, a missing one is created with
    that id. Without an id the series is looked up by exact name within the
    given author and created only when no match exists.

    Returns:
        The existing or newly added series instance.
    """
    if series_id is None:
        # No explicit key: match on (name, author id) instead.
        series = find_series_by_name_and_author(session, name, author.id)
        if series is None:
            series = AudiobookSeries(name=name, author=author)
            session.add(series)
    else:
        series = session.get(AudiobookSeries, series_id)
        if series is not None:
            series.name = name
            series.author = author
        else:
            series = AudiobookSeries(id=series_id, name=name, author=author)
            session.add(series)

    session.flush()
    return series
|
|
|
|
|
|
def find_author_by_name(session: Session, name: str) -> AudiobookAuthor | None:
    """Look up an author whose name equals ``name`` exactly.

    Returns:
        The result of ``session.scalar`` for the lookup: an author, or
        ``None`` when nothing matches.
    """
    lookup = select(AudiobookAuthor).where(AudiobookAuthor.name == name)
    return session.scalar(lookup)
|
|
|
|
|
|
def find_series_by_name_and_author(
    session: Session,
    name: str,
    author_id: int,
) -> AudiobookSeries | None:
    """Look up a series by exact name, restricted to one author id.

    Returns:
        The result of ``session.scalar`` for the lookup: a series, or
        ``None`` when nothing matches.
    """
    lookup = select(AudiobookSeries).where(
        AudiobookSeries.name == name,
        AudiobookSeries.author_id == author_id,
    )
    return session.scalar(lookup)
|
|
|
|
|
|
def csv_rows(csv_path: Path) -> list[tuple[int, dict[str, str | None]]]:
    """Read a CSV file as numbered rows.

    The first record is treated as the header. Data rows are numbered from 2
    so that, for a file with one record per line and no blank lines, the
    number matches the line shown by an editor.

    Args:
        csv_path: Location of the UTF-8 encoded CSV file (a leading byte-order
            mark is accepted and ignored).

    Returns:
        ``(row_number, row)`` pairs in file order, where ``row`` maps header
        names to cell values.

    Raises:
        CatalogImportError: If the file has no header record at all.
    """
    # "utf-8-sig" decodes plain UTF-8 unchanged but drops a leading BOM, which
    # spreadsheet exports often add. With plain "utf-8" the BOM would become
    # part of the first header name ("\ufeffname") and every row would then
    # look like it is missing that column.
    with csv_path.open(newline="", encoding="utf-8-sig") as file:
        reader = csv.DictReader(file)
        if reader.fieldnames is None:
            msg = f"{csv_path}: missing CSV header"
            raise CatalogImportError(msg)
        # Materialise inside the ``with`` block: the reader is lazy and the
        # file is closed as soon as the block exits.
        return list(enumerate(reader, start=2))
|
|
|
|
|
|
def required_csv_value(
    row: dict[str, str | None],
    csv_path: Path,
    row_number: int,
    column: str,
) -> str:
    """Return the stripped value of a mandatory column.

    Args:
        row: One CSV row keyed by header name.
        csv_path: File the row came from; used only in the error message.
        row_number: Row number within that file; used only in the error message.
        column: Header name of the value to fetch.

    Returns:
        The cell value with surrounding whitespace removed.

    Raises:
        CatalogImportError: If the column is absent, ``None``, empty, or
            contains only whitespace.
    """
    raw = row.get(column)
    cleaned = raw.strip() if raw else ""
    if not cleaned:
        msg = f"{csv_path}:{row_number}: missing required column value: {column}"
        raise CatalogImportError(msg)
    return cleaned
|
|
|
|
|
|
def csv_id(row: dict[str, str | None], csv_path: Path, row_number: int) -> int | None:
    """Read an optional id field from a CSV row.

    Args:
        row: One CSV row keyed by header name.
        csv_path: File the row came from; used only in the error message.
        row_number: Row number within that file; used only in the error message.

    Returns:
        The id parsed as an integer, or ``None`` when the id column is absent,
        ``None``, empty, or contains only whitespace.

    Raises:
        CatalogImportError: If a non-blank id cannot be parsed by ``int``.
    """
    value = row.get(ID_COLUMN)
    if value is None or not value.strip():
        return None
    try:
        # int() tolerates surrounding whitespace, so no explicit strip is needed.
        return int(value)
    except ValueError as error:
        msg = f"{csv_path}:{row_number}: id must be an integer: {value}"
        raise CatalogImportError(msg) from error
|
|
|
|
|
|
# Script entry point: hand main() to Typer so its parameters become CLI arguments.
if __name__ == "__main__":
    typer.run(main)
|