Files
dotfiles/python/tools/audiobook/audible_convert.py
T

472 lines
15 KiB
Python

"""Convert Audible AAX downloads into Audiobookshelf-friendly M4B files."""
from __future__ import annotations
import json
import logging
import re
import shutil
import subprocess
from concurrent.futures import ThreadPoolExecutor
from dataclasses import asdict, dataclass
from os import getenv
from pathlib import Path # noqa: TC003 This is required for the typer CLI
from typing import TYPE_CHECKING, Annotated, Any
from uuid import uuid7
import typer
from python.common import configure_logger
from python.orm.common import get_postgres_engine
from python.tools.audiobook.metadata_agent import (
AgentConfig,
StandardBookMetadata,
standard_book_metadata,
write_agent_log,
)
if TYPE_CHECKING:
from sqlalchemy.engine import Engine
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Command-line flags whose *following* argument is replaced with "<redacted>"
# by redact_command_arguments before a command is logged or put in an error message.
SENSITIVE_COMMAND_ARGUMENTS = {"-activation_bytes"}
# Matches "book-<start>-<end>" or "books-<start>-<end>" inside a hyphen-separated title.
# Both numbers must be positive integers without leading zeros, and the phrase must be
# delimited by a hyphen or the string boundary on each side.
BOOK_RANGE_PATTERN = re.compile(r"(?:^|-)books?-(?P<start>[1-9]\d*)-(?P<end>[1-9]\d*)(?:-|$)")
@dataclass(frozen=True)
class ConversionConfig:
    """Runtime settings for one conversion command."""

    # Resolved output directory; final M4B files and the work directory live under it.
    resolved_output: Path
    # API key forwarded to standard_book_metadata for metadata resolution.
    ollama_api_key: str
    # Agent settings forwarded to standard_book_metadata.
    agent_config: AgentConfig
    # Database engine forwarded to standard_book_metadata.
    engine: Engine
    # Optional value passed to ffmpeg after "-activation_bytes"; omitted when falsy.
    activation_bytes: str | None
    # When true, only the planned destination is resolved and a marker file is written.
    dry_run: bool
    # When true, an existing destination M4B is replaced instead of being kept.
    overwrite: bool
    # Name of the work directory created under resolved_output.
    work_directory_name: str = ".audible_convert"
    # Subdirectory of the work directory that receives dry-run marker files.
    dry_run_directory_name: str = "dry-run"
    # Subdirectory of the work directory that holds per-run temporary conversions.
    temp_directory_name: str = "tmp"
    # Subdirectory of the work directory that holds per-run agent logs.
    log_directory_name: str = "logs"
    # Subdirectory of the work directory that holds manual review files.
    review_directory_name: str = "review"
@dataclass(frozen=True)
class ConcurrentConversionResult:
    """Result from running ffmpeg and metadata resolution together."""

    # Resolved metadata, or None when metadata resolution raised.
    metadata: StandardBookMetadata | None
    # Exception raised by the ffmpeg conversion task, or None when it succeeded.
    conversion_error: Exception | None
    # Exception raised by the metadata resolution task, or None when it succeeded.
    metadata_error: Exception | None
class CommandExecutionError(RuntimeError):
    """Raised when a command exits non-zero; its message hides sensitive argument values."""

    def __init__(self, arguments: list[str], returncode: int) -> None:
        """Store the failed command and build the message from its redacted arguments.

        Args:
            arguments: Command and arguments that were run.
            returncode: Exit code reported for the command.
        """
        self.arguments = tuple(arguments)
        self.returncode = returncode
        # Only the redacted form of the command ever reaches the exception message.
        safe_command = " ".join(redact_command_arguments(arguments))
        message = f"Command failed with exit code {returncode}: {safe_command}"
        super().__init__(message)
def main(
    input_directory: Annotated[Path, typer.Argument(help="Directory audible-cli downloads AAX files into.")],
    output_directory: Annotated[Path, typer.Argument(help="Audiobook output directory.")],
    *,
    dry_run: Annotated[
        bool,
        typer.Option("--dry-run", help="Print planned output files and write marker files without converting."),
    ] = False,
    overwrite: Annotated[bool, typer.Option("--overwrite", help="Overwrite existing M4B files.")] = False,
) -> None:
    """Convert AAX files from a download directory into M4B files."""
    configure_logger()

    # The input directory must already exist; the output directory is only created for real runs.
    source_directory = input_directory.resolve(strict=True)
    target_directory = output_directory.resolve()
    if not dry_run:
        target_directory.mkdir(parents=True, exist_ok=True)

    # Metadata resolution cannot run without an API key, so fail before doing any work.
    api_key = getenv("OLLAMA_API_KEY")
    if not api_key:
        msg = "OLLAMA_API_KEY is required for audiobook metadata resolution"
        raise RuntimeError(msg)

    settings = ConversionConfig(
        resolved_output=target_directory,
        ollama_api_key=api_key,
        agent_config=AgentConfig(),
        engine=get_postgres_engine(name="RICHIE"),
        activation_bytes=getenv("AUDIBLE_ACTIVATION_BYTES"),
        dry_run=dry_run,
        overwrite=overwrite,
    )

    # Only AAX files directly inside the input directory are considered, in sorted order.
    pending_files = sorted(source_directory.glob("*.aax"))
    if not pending_files:
        logger.info("No AAX files found in %s", source_directory)
        return

    for pending_file in pending_files:
        logger.info("Converting %s", pending_file)
        convert_aax_file_with_agent(pending_file, settings)
def run_command(arguments: list[str], *, capture: bool = False) -> subprocess.CompletedProcess[str]:
    """Run a command and return the completed process.

    Args:
        arguments: Command and arguments to run.
        capture: Whether to capture stdout and stderr.

    Returns:
        The completed process.

    Raises:
        CommandExecutionError: The command exited with a non-zero status. The error
            carries only a redacted rendering of the command in its message.
    """
    logger.debug("%s", " ".join(redact_command_arguments(arguments)))
    try:
        return subprocess.run(arguments, check=True, capture_output=capture, text=True)
    except subprocess.CalledProcessError as error:
        # CalledProcessError renders the full, unredacted argument list in its message.
        # Chaining it as the cause would print sensitive values (e.g. activation bytes)
        # in any traceback, so the original exception is deliberately suppressed.
        raise CommandExecutionError(arguments, error.returncode) from None
def redact_command_arguments(arguments: list[str]) -> list[str]:
    """Return a copy of the arguments with the value after each sensitive flag hidden.

    Args:
        arguments: Command and arguments to sanitise.

    Returns:
        The arguments with every value that directly follows a flag listed in
        SENSITIVE_COMMAND_ARGUMENTS replaced by ``<redacted>``.
    """
    missing = object()
    safe_arguments = []
    remaining = iter(arguments)
    for current in remaining:
        safe_arguments.append(current)
        if current not in SENSITIVE_COMMAND_ARGUMENTS:
            continue
        # Consume the flag's value so it is never inspected or emitted; a trailing
        # flag with no value simply ends the list.
        if next(remaining, missing) is not missing:
            safe_arguments.append("<redacted>")
    return safe_arguments
def read_metadata(aax_file: Path) -> dict[str, str]:
    """Read ffprobe format tags from an AAX file.

    Args:
        aax_file: AAX file to inspect.

    Returns:
        Lower-cased metadata tag names mapped to their values; empty when ffprobe
        reports no format tags.
    """
    probe_command = ["ffprobe", "-v", "quiet", "-print_format", "json", "-show_format", str(aax_file)]
    probe = run_command(probe_command, capture=True)
    report: dict[str, Any] = json.loads(probe.stdout)
    # Either level may be absent from the ffprobe output; treat that as "no tags".
    format_section = report.get("format", {})
    raw_tags = format_section.get("tags", {})
    normalised = {}
    for tag_name, tag_value in raw_tags.items():
        normalised[str(tag_name).lower()] = str(tag_value)
    return normalised
def output_stem(metadata: StandardBookMetadata) -> str:
    """Build the output stem for a book.

    Args:
        metadata: Book metadata.

    Returns:
        Output stem in author-series_01-title form.
    """
    position = series_index_slug(metadata.series_index, metadata.title)
    parts = (metadata.author, metadata.series, position, metadata.title)
    return "{}-{}_{}-{}".format(*parts)
def series_index_slug(series_index: float, title: str = "") -> str:
    """Return a filename-safe series index.

    Args:
        series_index: Position of the book within its series.
        title: Book title; consulted for an omnibus range when the index is a whole number.

    Returns:
        An omnibus range slug (for example ``01-03``) when the title carries one that
        starts at this index, otherwise the zero-padded whole index (``01``), or for a
        fractional index the zero-padded whole part followed by its actual fractional
        digits (``01.5``, ``02.25``).
    """
    from decimal import Decimal  # local import keeps this block self-contained

    index = float(series_index)
    if not index.is_integer():
        # Omnibus ranges are only recognised for whole-number indices, so the title
        # lookup is skipped here (it would return None for a fractional index anyway).
        whole = int(index)
        # Decimal(repr(...)) gives the shortest exact digits without exponent notation,
        # so 1.25 becomes "25" rather than being collapsed to a hard-coded ".5".
        fraction = format(Decimal(repr(index)), "f").partition(".")[2]
        return f"{whole:02}.{fraction}"
    if title_range := title_series_range_slug(series_index, title):
        return title_range
    return f"{int(index):02}"
def title_series_range_slug(series_index: float, title: str) -> str | None:
    """Return a series range slug found in an omnibus title.

    Args:
        series_index: Position of the book within its series.
        title: Title to scan for a ``book(s)-<start>-<end>`` range.

    Returns:
        ``<start>-<end>`` with both numbers zero-padded to two digits for the first
        range that starts at ``series_index`` and ends after it, or None when the
        index is fractional or no such range exists.
    """
    numeric_index = float(series_index)
    if not numeric_index.is_integer():
        return None
    wanted_start = int(numeric_index)
    bounds = ((int(found["start"]), int(found["end"])) for found in BOOK_RANGE_PATTERN.finditer(title))
    return next(
        (f"{first:02}-{last:02}" for first, last in bounds if first == wanted_start and last > first),
        None,
    )
def metadata_output_path(output_directory: Path, metadata: StandardBookMetadata) -> Path:
    """Build the final M4B path from resolved metadata.

    Args:
        output_directory: Root directory for converted audiobooks.
        metadata: Resolved book metadata.

    Returns:
        ``<output_directory>/<stem>/<stem>.m4b`` where the stem comes from output_stem.
    """
    book_name = output_stem(metadata)
    book_directory = output_directory / book_name
    return book_directory / f"{book_name}.m4b"
def convert_aax_file(
    aax_file: Path,
    destination: Path,
    activation_bytes: str | None,
    *,
    overwrite: bool,
) -> None:
    """Convert an AAX file into an M4B file.

    Args:
        aax_file: Source AAX file.
        destination: Destination M4B file.
        activation_bytes: Optional Audible activation bytes for ffmpeg.
        overwrite: Whether to overwrite an existing M4B.
    """
    already_present = destination.exists()
    if already_present and not overwrite:
        logger.info("Skipping existing file %s", destination)
        return

    destination.parent.mkdir(parents=True, exist_ok=True)

    overwrite_flag = "-y" if overwrite else "-n"
    command = ["ffmpeg", "-hide_banner", overwrite_flag]
    if activation_bytes:
        # Added before "-i", so ffmpeg treats it as an option for the input file.
        command += ["-activation_bytes", activation_bytes]
    # Stream copy: the audio is remuxed, not re-encoded, and source metadata is kept.
    command += ["-i", str(aax_file), "-map_metadata", "0", "-c", "copy", str(destination)]
    run_command(command)
def write_review_file(
    *,
    destination: Path | None,
    ffprobe_metadata: dict[str, str],
    log_file: Path,
    metadata: StandardBookMetadata | None,
    reason: str,
    review_file: Path,
    source: Path,
    temp_file: Path | None,
) -> None:
    """Write a manual review file for an unresolved conversion.

    Args:
        destination: Planned final path, when one was determined.
        ffprobe_metadata: Tags read from the source file.
        log_file: Agent log that receives a ``review_written`` entry.
        metadata: Resolved metadata, when available.
        reason: Why the conversion needs manual review.
        review_file: JSON file to write.
        source: Source AAX file.
        temp_file: Temporary converted file left for inspection, when one exists.
    """
    review_file.parent.mkdir(parents=True, exist_ok=True)

    destination_text = str(destination) if destination else None
    metadata_fields = asdict(metadata) if metadata else None
    temp_text = str(temp_file) if temp_file else None
    report = dict(
        destination=destination_text,
        ffprobe_metadata=ffprobe_metadata,
        metadata=metadata_fields,
        reason=reason,
        source=str(source),
        temp_file=temp_text,
    )
    serialised = json.dumps(report, indent=2, sort_keys=True)
    review_file.write_text(serialised, encoding="utf-8")
    write_agent_log(log_file, "review_written", path=str(review_file), reason=reason)
def cleanup_temp_output(temp_file: Path) -> None:
    """Delete the directory that contains a run's temporary output.

    Args:
        temp_file: Temporary output file; its parent directory is removed with
            everything in it. Removal errors are ignored.
    """
    run_directory = temp_file.parent
    shutil.rmtree(run_directory, ignore_errors=True)
def dry_run_aax_file_with_agent(
    aax_file: Path,
    ffprobe_metadata: dict[str, str],
    engine: Engine,
    config: ConversionConfig,
    log_file: Path,
    review_file: Path,
) -> None:
    """Resolve and print the planned output path without converting.

    Args:
        aax_file: Source AAX file.
        ffprobe_metadata: Tags read from the source file.
        engine: Database engine forwarded to metadata resolution.
        config: Conversion settings.
        log_file: Agent log for this run.
        review_file: Review file written when the metadata needs manual review.
    """
    metadata = standard_book_metadata(
        aax_file.name,
        ffprobe_metadata,
        engine,
        log_file,
        config.ollama_api_key,
        config.agent_config,
    )

    if metadata.needs_review:
        # No destination is planned for books that still need a human decision.
        write_review_file(
            destination=None,
            ffprobe_metadata=ffprobe_metadata,
            log_file=log_file,
            metadata=metadata,
            reason="metadata_needs_review",
            review_file=review_file,
            source=aax_file,
            temp_file=None,
        )
        typer.echo(f"{aax_file} -> REVIEW {review_file}")
        return

    destination = metadata_output_path(config.resolved_output, metadata)
    stem = output_stem(metadata)
    # The marker mirrors the final layout inside the work directory and contains
    # only the path the real run would produce.
    marker_directory = config.resolved_output / config.work_directory_name / config.dry_run_directory_name / stem
    marker_file = marker_directory / f"{stem}.m4b"
    marker_file.parent.mkdir(parents=True, exist_ok=True)
    marker_file.write_text(f"{destination}\n", encoding="utf-8")
    write_agent_log(
        log_file,
        "dry_run_file_written",
        destination=str(destination),
        path=str(marker_file),
    )
    typer.echo(f"{aax_file} -> {destination}")
def convert_temp_file_and_resolve_metadata(
    aax_file: Path,
    temp_file: Path,
    ffprobe_metadata: dict[str, str],
    config: ConversionConfig,
    log_file: Path,
) -> ConcurrentConversionResult:
    """Run ffmpeg and metadata resolution in parallel.

    Args:
        aax_file: Source AAX file.
        temp_file: Temporary M4B file the conversion writes to.
        ffprobe_metadata: Tags read from the source file.
        config: Conversion settings.
        log_file: Agent log for this run.

    Returns:
        The resolved metadata (None when resolution raised) together with whatever
        exception each of the two tasks raised, if any.
    """
    with ThreadPoolExecutor(max_workers=2) as pool:
        ffmpeg_job = pool.submit(
            convert_aax_file,
            aax_file,
            temp_file,
            config.activation_bytes,
            overwrite=True,
        )
        agent_job = pool.submit(
            standard_book_metadata,
            aax_file.name,
            ffprobe_metadata,
            config.engine,
            log_file,
            config.ollama_api_key,
            config.agent_config,
        )
        # exception() blocks until the task finishes, so both tasks are complete
        # once these two calls return; failures are collected rather than raised.
        ffmpeg_failure = ffmpeg_job.exception()
        agent_failure = agent_job.exception()
        resolved = agent_job.result() if agent_failure is None else None

    return ConcurrentConversionResult(
        metadata=resolved,
        conversion_error=ffmpeg_failure,
        metadata_error=agent_failure,
    )
def convert_aax_file_with_agent(aax_file: Path, config: ConversionConfig) -> None:
    """Convert one AAX file using the metadata agent for the final path.

    The file is converted into a per-run temporary directory while metadata is
    resolved; the result is then moved to its final location. Any step that
    fails, or metadata that needs review, produces a review file instead.

    Args:
        aax_file: Source AAX file.
        config: Conversion settings.
    """
    run_id = uuid7().hex
    work_directory = config.resolved_output / config.work_directory_name
    log_file = work_directory / config.log_directory_name / f"{run_id}.jsonl"
    review_file = work_directory / config.review_directory_name / f"{run_id}.json"

    def send_to_review(
        reason: str,
        *,
        probed: dict[str, str],
        metadata: StandardBookMetadata | None = None,
        destination: Path | None = None,
        leftover: Path | None = None,
    ) -> None:
        """Write this run's review file with the shared log, review and source paths."""
        write_review_file(
            destination=destination,
            ffprobe_metadata=probed,
            log_file=log_file,
            metadata=metadata,
            reason=reason,
            review_file=review_file,
            source=aax_file,
            temp_file=leftover,
        )

    write_agent_log(log_file, "conversion_start", source=str(aax_file), dry_run=config.dry_run)

    try:
        ffprobe_metadata = read_metadata(aax_file)
    except Exception as error:
        logger.exception("ffprobe failed")
        send_to_review(f"ffprobe_failed: {error}", probed={})
        return

    if config.dry_run:
        dry_run_aax_file_with_agent(
            aax_file,
            ffprobe_metadata,
            config.engine,
            config,
            log_file,
            review_file,
        )
        return

    temp_file = work_directory / config.temp_directory_name / run_id / "converted.m4b"
    temp_file.parent.mkdir(parents=True, exist_ok=True)
    result = convert_temp_file_and_resolve_metadata(aax_file, temp_file, ffprobe_metadata, config, log_file)

    if result.conversion_error:
        # A failed conversion may still have left a partial file; report it only if present.
        reason = f"ffmpeg_failed: {result.conversion_error}"
        send_to_review(
            reason,
            probed=ffprobe_metadata,
            metadata=result.metadata,
            leftover=temp_file if temp_file.exists() else None,
        )
        return

    if result.metadata_error:
        # The converted file is kept in place so it can be filed manually.
        send_to_review(
            f"metadata_failed: {result.metadata_error}",
            probed=ffprobe_metadata,
            leftover=temp_file,
        )
        return

    if result.metadata is None or result.metadata.needs_review:
        send_to_review(
            "metadata_needs_review",
            probed=ffprobe_metadata,
            metadata=result.metadata,
            leftover=temp_file,
        )
        return

    destination = metadata_output_path(config.resolved_output, result.metadata)
    if destination.exists() and not config.overwrite:
        # Keep the existing book and discard the fresh conversion.
        write_agent_log(log_file, "destination_exists", destination=str(destination))
        cleanup_temp_output(temp_file)
        return

    destination.parent.mkdir(parents=True, exist_ok=True)
    try:
        temp_file.replace(destination)
    except Exception as error:  # noqa: BLE001
        send_to_review(
            f"rename_failed: {error}",
            probed=ffprobe_metadata,
            metadata=result.metadata,
            destination=destination,
            leftover=temp_file if temp_file.exists() else None,
        )
    else:
        cleanup_temp_output(temp_file)
        write_agent_log(log_file, "conversion_complete", destination=str(destination))
# Expose main as a Typer command-line program only when this file is run as a script.
if __name__ == "__main__":
    typer.run(main)