"""Convert Audible AAX downloads into Audiobookshelf-friendly M4B files.""" from __future__ import annotations import json import logging import shutil import subprocess from concurrent.futures import ThreadPoolExecutor from dataclasses import asdict, dataclass from os import getenv from pathlib import Path # noqa: TC003 This is required for the typer CLI from typing import TYPE_CHECKING, Annotated, Any from uuid import uuid7 import typer from python.common import configure_logger from python.orm.common import get_postgres_engine from python.tools.audiobook.metadata_agent import ( AgentConfig, StandardBookMetadata, standard_book_metadata, write_agent_log, ) if TYPE_CHECKING: from sqlalchemy.engine import Engine logger = logging.getLogger(__name__) SENSITIVE_COMMAND_ARGUMENTS = {"-activation_bytes"} @dataclass(frozen=True) class ConversionConfig: """Runtime settings for one conversion command.""" resolved_output: Path ollama_api_key: str agent_config: AgentConfig engine: Engine activation_bytes: str | None dry_run: bool overwrite: bool work_directory_name: str = ".audible_convert" dry_run_directory_name: str = "dry-run" temp_directory_name: str = "tmp" log_directory_name: str = "logs" review_directory_name: str = "review" @dataclass(frozen=True) class ConcurrentConversionResult: """Result from running ffmpeg and metadata resolution together.""" metadata: StandardBookMetadata | None conversion_error: Exception | None metadata_error: Exception | None class CommandExecutionError(RuntimeError): """Command failed without exposing sensitive arguments.""" def __init__(self, arguments: list[str], returncode: int) -> None: """Create a redacted command failure.""" self.arguments = tuple(arguments) self.returncode = returncode command = " ".join(redact_command_arguments(arguments)) super().__init__(f"Command failed with exit code {returncode}: {command}") def main( input_directory: Annotated[Path, typer.Argument(help="Directory audible-cli downloads AAX files into.")], output_directory: Annotated[Path, typer.Argument(help="Audiobook output directory.")], *, dry_run: Annotated[ bool, typer.Option("--dry-run", help="Print planned output files and write marker files without converting."), ] = False, overwrite: Annotated[bool, typer.Option("--overwrite", help="Overwrite existing M4B files.")] = False, ) -> None: """Convert AAX files from a download directory into M4B files.""" configure_logger() resolved_input = input_directory.resolve(strict=True) resolved_output = output_directory.resolve() if not dry_run: resolved_output.mkdir(parents=True, exist_ok=True) ollama_api_key = getenv("OLLAMA_API_KEY") if not ollama_api_key: msg = "OLLAMA_API_KEY is required for audiobook metadata resolution" raise RuntimeError(msg) config = ConversionConfig( resolved_output=resolved_output, ollama_api_key=ollama_api_key, agent_config=AgentConfig(), engine=get_postgres_engine(name="RICHIE"), activation_bytes=getenv("AUDIBLE_ACTIVATION_BYTES"), dry_run=dry_run, overwrite=overwrite, ) aax_files = sorted(resolved_input.glob("*.aax")) if not aax_files: logger.info("No AAX files found in %s", resolved_input) return for aax_file in aax_files: logger.info("Converting %s", aax_file) convert_aax_file_with_agent(aax_file, config) def run_command(arguments: list[str], *, capture: bool = False) -> subprocess.CompletedProcess[str]: """Run a command and return the completed process. Args: arguments: Command and arguments to run. capture: Whether to capture stdout and stderr. Returns: The completed process. """ logger.debug("%s", " ".join(redact_command_arguments(arguments))) try: return subprocess.run(arguments, check=True, capture_output=capture, text=True) except subprocess.CalledProcessError as error: raise CommandExecutionError(arguments, error.returncode) from error def redact_command_arguments(arguments: list[str]) -> list[str]: """Return command arguments with sensitive values redacted.""" redacted = [] redact_next = False for argument in arguments: if redact_next: redacted.append("") redact_next = False continue redacted.append(argument) redact_next = argument in SENSITIVE_COMMAND_ARGUMENTS return redacted def read_metadata(aax_file: Path) -> dict[str, str]: """Read ffprobe format tags from an AAX file. Args: aax_file: AAX file to inspect. Returns: Lower-cased metadata tag names mapped to their values. """ completed = run_command( [ "ffprobe", "-v", "quiet", "-print_format", "json", "-show_format", str(aax_file), ], capture=True, ) ffprobe_data: dict[str, Any] = json.loads(completed.stdout) tags = ffprobe_data.get("format", {}).get("tags", {}) return {str(key).lower(): str(value) for key, value in tags.items()} def output_stem(metadata: StandardBookMetadata) -> str: """Build the output stem for a book. Args: metadata: Book metadata. Returns: Output stem in author-series_01-title form. """ return f"{metadata.author}-{metadata.series}_{metadata.series_index:02}-{metadata.title}" def metadata_output_path(output_directory: Path, metadata: StandardBookMetadata) -> Path: """Build the final M4B path from resolved metadata.""" stem = output_stem(metadata) return output_directory / stem / f"{stem}.m4b" def convert_aax_file( aax_file: Path, destination: Path, activation_bytes: str | None, *, overwrite: bool, ) -> None: """Convert an AAX file into an M4B file. Args: aax_file: Source AAX file. destination: Destination M4B file. activation_bytes: Optional Audible activation bytes for ffmpeg. overwrite: Whether to overwrite an existing M4B. """ if destination.exists() and not overwrite: logger.info("Skipping existing file %s", destination) return destination.parent.mkdir(parents=True, exist_ok=True) arguments = ["ffmpeg", "-hide_banner", "-y" if overwrite else "-n"] if activation_bytes: arguments.extend(["-activation_bytes", activation_bytes]) arguments.extend(["-i", str(aax_file), "-map_metadata", "0", "-c", "copy", str(destination)]) run_command(arguments) def write_review_file( *, destination: Path | None, ffprobe_metadata: dict[str, str], log_file: Path, metadata: StandardBookMetadata | None, reason: str, review_file: Path, source: Path, temp_file: Path | None, ) -> None: """Write a manual review file for an unresolved conversion.""" review_file.parent.mkdir(parents=True, exist_ok=True) payload = { "destination": str(destination) if destination else None, "ffprobe_metadata": ffprobe_metadata, "metadata": asdict(metadata) if metadata else None, "reason": reason, "source": str(source), "temp_file": str(temp_file) if temp_file else None, } review_file.write_text(json.dumps(payload, indent=2, sort_keys=True), encoding="utf-8") write_agent_log(log_file, "review_written", path=str(review_file), reason=reason) def cleanup_temp_output(temp_file: Path) -> None: """Remove a run's temporary output directory.""" shutil.rmtree(temp_file.parent, ignore_errors=True) def dry_run_aax_file_with_agent( aax_file: Path, ffprobe_metadata: dict[str, str], engine: Engine, config: ConversionConfig, log_file: Path, review_file: Path, ) -> None: """Resolve and print the planned output path without converting.""" metadata = standard_book_metadata( aax_file.name, ffprobe_metadata, engine, log_file, config.ollama_api_key, config.agent_config, ) destination = None if metadata.needs_review else metadata_output_path(config.resolved_output, metadata) if metadata.needs_review: write_review_file( destination=destination, ffprobe_metadata=ffprobe_metadata, log_file=log_file, metadata=metadata, reason="metadata_needs_review", review_file=review_file, source=aax_file, temp_file=None, ) typer.echo(f"{aax_file} -> REVIEW {review_file}") else: stem = output_stem(metadata) dry_run_file = ( config.resolved_output / config.work_directory_name / config.dry_run_directory_name / stem / f"{stem}.m4b" ) dry_run_file.parent.mkdir(parents=True, exist_ok=True) dry_run_file.write_text(f"{destination}\n", encoding="utf-8") write_agent_log( log_file, "dry_run_file_written", destination=str(destination), path=str(dry_run_file), ) typer.echo(f"{aax_file} -> {destination}") def convert_temp_file_and_resolve_metadata( aax_file: Path, temp_file: Path, ffprobe_metadata: dict[str, str], config: ConversionConfig, log_file: Path, ) -> ConcurrentConversionResult: """Run ffmpeg and metadata resolution in parallel.""" conversion_error: Exception | None = None metadata_error: Exception | None = None metadata: StandardBookMetadata | None = None with ThreadPoolExecutor(max_workers=2) as executor: conversion_future = executor.submit( convert_aax_file, aax_file, temp_file, config.activation_bytes, overwrite=True, ) metadata_future = executor.submit( standard_book_metadata, aax_file.name, ffprobe_metadata, config.engine, log_file, config.ollama_api_key, config.agent_config, ) conversion_error = conversion_future.exception() if conversion_error is None: conversion_future.result() metadata_error = metadata_future.exception() if metadata_error is None: metadata = metadata_future.result() return ConcurrentConversionResult( metadata=metadata, conversion_error=conversion_error, metadata_error=metadata_error, ) def convert_aax_file_with_agent(aax_file: Path, config: ConversionConfig) -> None: """Convert one AAX file using the metadata agent for the final path.""" run_id = uuid7().hex log_file = config.resolved_output / config.work_directory_name / config.log_directory_name / f"{run_id}.jsonl" review_file = config.resolved_output / config.work_directory_name / config.review_directory_name / f"{run_id}.json" write_agent_log(log_file, "conversion_start", source=str(aax_file), dry_run=config.dry_run) try: ffprobe_metadata = read_metadata(aax_file) except Exception as error: logger.exception("ffprobe failed") write_review_file( destination=None, ffprobe_metadata={}, log_file=log_file, metadata=None, reason=f"ffprobe_failed: {error}", review_file=review_file, source=aax_file, temp_file=None, ) return if config.dry_run: dry_run_aax_file_with_agent( aax_file, ffprobe_metadata, config.engine, config, log_file, review_file, ) return temp_file = ( config.resolved_output / config.work_directory_name / config.temp_directory_name / run_id / "converted.m4b" ) temp_file.parent.mkdir(parents=True, exist_ok=True) result = convert_temp_file_and_resolve_metadata(aax_file, temp_file, ffprobe_metadata, config, log_file) if result.conversion_error: reason = f"ffmpeg_failed: {result.conversion_error}" write_review_file( destination=None, ffprobe_metadata=ffprobe_metadata, log_file=log_file, metadata=result.metadata, reason=reason, review_file=review_file, source=aax_file, temp_file=temp_file if temp_file.exists() else None, ) return if result.metadata_error: write_review_file( destination=None, ffprobe_metadata=ffprobe_metadata, log_file=log_file, metadata=None, reason=f"metadata_failed: {result.metadata_error}", review_file=review_file, source=aax_file, temp_file=temp_file, ) return if result.metadata is None or result.metadata.needs_review: write_review_file( destination=None, ffprobe_metadata=ffprobe_metadata, log_file=log_file, metadata=result.metadata, reason="metadata_needs_review", review_file=review_file, source=aax_file, temp_file=temp_file, ) return destination = metadata_output_path(config.resolved_output, result.metadata) if destination.exists() and not config.overwrite: write_agent_log(log_file, "destination_exists", destination=str(destination)) cleanup_temp_output(temp_file) return destination.parent.mkdir(parents=True, exist_ok=True) try: temp_file.replace(destination) except Exception as error: # noqa: BLE001 write_review_file( destination=destination, ffprobe_metadata=ffprobe_metadata, log_file=log_file, metadata=result.metadata, reason=f"rename_failed: {error}", review_file=review_file, source=aax_file, temp_file=temp_file if temp_file.exists() else None, ) else: cleanup_temp_output(temp_file) write_agent_log(log_file, "conversion_complete", destination=str(destination)) if __name__ == "__main__": typer.run(main)