4d2a017f2e
treefmt / nix fmt (pull_request) Failing after 5s
pytest / pytest (pull_request) Successful in 26s
build_systems / build-bob (pull_request) Successful in 45s
build_systems / build-leviathan (pull_request) Successful in 55s
build_systems / build-rhapsody-in-green (pull_request) Successful in 56s
build_systems / build-brain (pull_request) Successful in 47s
build_systems / build-jeeves (pull_request) Successful in 2m36s
472 lines
15 KiB
Python
472 lines
15 KiB
Python
"""Convert Audible AAX downloads into Audiobookshelf-friendly M4B files."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
import re
|
|
import shutil
|
|
import subprocess
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
from dataclasses import asdict, dataclass
|
|
from os import getenv
|
|
from pathlib import Path # noqa: TC003 This is required for the typer CLI
|
|
from typing import TYPE_CHECKING, Annotated, Any
|
|
from uuid import uuid7
|
|
|
|
import typer
|
|
|
|
from python.common import configure_logger
|
|
from python.orm.common import get_postgres_engine
|
|
from python.tools.audiobook.metadata_agent import (
|
|
AgentConfig,
|
|
StandardBookMetadata,
|
|
standard_book_metadata,
|
|
write_agent_log,
|
|
)
|
|
|
|
if TYPE_CHECKING:
|
|
from sqlalchemy.engine import Engine
|
|
|
|
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Command-line flags whose *following* argument is replaced with "<redacted>"
# by redact_command_arguments before a command line is logged or reported.
SENSITIVE_COMMAND_ARGUMENTS = {"-activation_bytes"}
# Matches "book-<start>-<end>" or "books-<start>-<end>" where the marker sits at
# the start of the string or after a hyphen, and is followed by a hyphen or the
# end of the string. Both numbers are positive integers without leading zeros.
BOOK_RANGE_PATTERN = re.compile(r"(?:^|-)books?-(?P<start>[1-9]\d*)-(?P<end>[1-9]\d*)(?:-|$)")
|
|
|
|
|
|
@dataclass(frozen=True)
class ConversionConfig:
    """Runtime settings for one conversion command."""

    # Absolute output root; final books and the work directory live beneath it.
    resolved_output: Path
    # API key passed to standard_book_metadata for metadata resolution.
    ollama_api_key: str
    # Settings object passed to standard_book_metadata alongside the API key.
    agent_config: AgentConfig
    # Database engine passed to standard_book_metadata.
    engine: Engine
    # Value passed to ffmpeg's -activation_bytes flag; the flag is omitted when falsy.
    activation_bytes: str | None
    # When true, only the planned destination is resolved and recorded; ffmpeg is not run.
    dry_run: bool
    # When false, an already existing destination M4B is left untouched.
    overwrite: bool
    # Name of the work directory created under resolved_output.
    work_directory_name: str = ".audible_convert"
    # Sub-directory of the work directory holding dry-run marker files.
    dry_run_directory_name: str = "dry-run"
    # Sub-directory of the work directory holding per-run temporary conversions.
    temp_directory_name: str = "tmp"
    # Sub-directory of the work directory holding per-run "<run_id>.jsonl" logs.
    log_directory_name: str = "logs"
    # Sub-directory of the work directory holding per-run "<run_id>.json" review files.
    review_directory_name: str = "review"
|
|
|
|
|
|
@dataclass(frozen=True)
class ConcurrentConversionResult:
    """Result from running ffmpeg and metadata resolution together."""

    # Resolved metadata; None when metadata resolution raised.
    metadata: StandardBookMetadata | None
    # Exception raised by the ffmpeg conversion task, or None when it succeeded.
    conversion_error: Exception | None
    # Exception raised by the metadata resolution task, or None when it succeeded.
    metadata_error: Exception | None
|
|
|
|
|
|
class CommandExecutionError(RuntimeError):
    """Failure of an external command, reported with sensitive values masked.

    Attributes:
        arguments: The original command line as a tuple (not redacted).
        returncode: Exit status reported by the command.
    """

    def __init__(self, arguments: list[str], returncode: int) -> None:
        """Store the failure details and build a redacted error message.

        Args:
            arguments: Command and arguments that were executed.
            returncode: Exit status of the failed command.
        """
        self.arguments, self.returncode = tuple(arguments), returncode
        # Only the human-readable message is redacted; the attributes keep the raw values.
        safe_command_line = " ".join(redact_command_arguments(arguments))
        message = f"Command failed with exit code {returncode}: {safe_command_line}"
        super().__init__(message)
|
|
|
|
|
|
def main(
    input_directory: Annotated[Path, typer.Argument(help="Directory audible-cli downloads AAX files into.")],
    output_directory: Annotated[Path, typer.Argument(help="Audiobook output directory.")],
    *,
    dry_run: Annotated[
        bool,
        typer.Option("--dry-run", help="Print planned output files and write marker files without converting."),
    ] = False,
    overwrite: Annotated[bool, typer.Option("--overwrite", help="Overwrite existing M4B files.")] = False,
) -> None:
    """Convert AAX files from a download directory into M4B files."""
    configure_logger()

    # The input directory must already exist; the output directory is only
    # created up front for real (non dry-run) conversions.
    source_root = input_directory.resolve(strict=True)
    output_root = output_directory.resolve()
    if not dry_run:
        output_root.mkdir(parents=True, exist_ok=True)

    api_key = getenv("OLLAMA_API_KEY")
    if not api_key:
        msg = "OLLAMA_API_KEY is required for audiobook metadata resolution"
        raise RuntimeError(msg)

    config = ConversionConfig(
        resolved_output=output_root,
        ollama_api_key=api_key,
        agent_config=AgentConfig(),
        engine=get_postgres_engine(name="RICHIE"),
        activation_bytes=getenv("AUDIBLE_ACTIVATION_BYTES"),
        dry_run=dry_run,
        overwrite=overwrite,
    )

    # Only direct children of the input directory are considered, in sorted order.
    pending_sources = sorted(source_root.glob("*.aax"))
    if pending_sources:
        for source in pending_sources:
            logger.info("Converting %s", source)
            convert_aax_file_with_agent(source, config)
    else:
        logger.info("No AAX files found in %s", source_root)
|
|
|
|
|
|
def run_command(arguments: list[str], *, capture: bool = False) -> subprocess.CompletedProcess[str]:
    """Run a command and return the completed process.

    The command line is logged at debug level with sensitive values redacted.

    Args:
        arguments: Command and arguments to run.
        capture: Whether to capture stdout and stderr.

    Returns:
        The completed process.

    Raises:
        CommandExecutionError: If the command exits with a non-zero status.
    """
    logger.debug("%s", " ".join(redact_command_arguments(arguments)))
    try:
        return subprocess.run(arguments, check=True, capture_output=capture, text=True)
    except subprocess.CalledProcessError as error:
        # CalledProcessError's message embeds the raw command line, so chaining it
        # as the cause would print the unredacted sensitive values in any traceback.
        # `from None` keeps it out of the reported exception chain.
        raise CommandExecutionError(arguments, error.returncode) from None
|
|
|
|
|
|
def redact_command_arguments(arguments: list[str]) -> list[str]:
    """Copy a command line, masking the value that follows each sensitive flag.

    Args:
        arguments: Command and arguments to sanitize.

    Returns:
        A new list where the argument after every sensitive flag is replaced by
        ``<redacted>``. A masked value is never itself treated as a flag.
    """
    exhausted = object()
    sanitized: list[str] = []
    remaining = iter(arguments)
    for current in remaining:
        sanitized.append(current)
        if current not in SENSITIVE_COMMAND_ARGUMENTS:
            continue
        # Consume the flag's value (if any) so it is masked and not inspected again.
        if next(remaining, exhausted) is not exhausted:
            sanitized.append("<redacted>")
    return sanitized
|
|
|
|
|
|
def read_metadata(aax_file: Path) -> dict[str, str]:
    """Collect the container-level tags ffprobe reports for an AAX file.

    Args:
        aax_file: AAX file to inspect.

    Returns:
        Tag names (lower-cased) mapped to their values as strings. Empty when
        ffprobe reports no format tags.
    """
    probe_command = ["ffprobe", "-v", "quiet", "-print_format", "json", "-show_format", str(aax_file)]
    probe = run_command(probe_command, capture=True)

    parsed: dict[str, Any] = json.loads(probe.stdout)
    format_section = parsed.get("format", {})
    raw_tags = format_section.get("tags", {})

    normalized: dict[str, str] = {}
    for name, value in raw_tags.items():
        normalized[str(name).lower()] = str(value)
    return normalized
|
|
|
|
|
|
def output_stem(metadata: StandardBookMetadata) -> str:
    """Compose the shared directory/file stem for a book.

    Args:
        metadata: Book metadata.

    Returns:
        A stem shaped like ``<author>-<series>_<index>-<title>``.
    """
    position = series_index_slug(metadata.series_index, metadata.title)
    series_part = f"{metadata.series}_{position}"
    return f"{metadata.author}-{series_part}-{metadata.title}"
|
|
|
|
|
|
def series_index_slug(series_index: float, title: str = "") -> str:
    """Return a filename-safe series index.

    Args:
        series_index: Position of the book within its series.
        title: Book title, searched for an omnibus book range when the index
            is a whole number.

    Returns:
        A zero-padded slug: ``01`` for a whole index, a range such as ``01-03``
        when the title names one starting at the index, or the real fraction
        (``01.5``, ``02.25``) for a fractional index.
    """
    index = float(series_index)
    if not index.is_integer():
        # Range slugs only exist for whole-number indexes (title_series_range_slug
        # returns None otherwise), so fractional indexes are handled up front.
        # Fixed-point formatting avoids exponent notation and float noise.
        whole, _, fraction = f"{index:.6f}".partition(".")
        fraction = fraction.rstrip("0")
        padded = f"{int(whole):02}"
        # Rounding can collapse a near-integer value (e.g. 1.0000001) to a whole number.
        return f"{padded}.{fraction}" if fraction else padded

    if title_range := title_series_range_slug(series_index, title):
        return title_range
    return f"{int(index):02}"
|
|
|
|
|
|
def title_series_range_slug(series_index: float, title: str) -> str | None:
|
|
"""Return a series range slug found in an omnibus title."""
|
|
index = float(series_index)
|
|
if not index.is_integer():
|
|
return None
|
|
first_index = int(index)
|
|
for match in BOOK_RANGE_PATTERN.finditer(title):
|
|
start = int(match.group("start"))
|
|
end = int(match.group("end"))
|
|
if start == first_index and end > start:
|
|
return f"{start:02}-{end:02}"
|
|
return None
|
|
|
|
|
|
def metadata_output_path(output_directory: Path, metadata: StandardBookMetadata) -> Path:
    """Return where the finished M4B belongs for the given metadata.

    Args:
        output_directory: Root directory for converted audiobooks.
        metadata: Resolved book metadata.

    Returns:
        ``<output_directory>/<stem>/<stem>.m4b``, so each book has its own folder.
    """
    book_name = output_stem(metadata)
    book_directory = output_directory / book_name
    return book_directory / f"{book_name}.m4b"
|
|
|
|
|
|
def convert_aax_file(
    aax_file: Path,
    destination: Path,
    activation_bytes: str | None,
    *,
    overwrite: bool,
) -> None:
    """Remux an AAX file into an M4B file with ffmpeg (stream copy, no re-encode).

    Args:
        aax_file: Source AAX file.
        destination: Destination M4B file.
        activation_bytes: Optional Audible activation bytes for ffmpeg.
        overwrite: Whether to overwrite an existing M4B.
    """
    already_present = destination.exists()
    if already_present and not overwrite:
        logger.info("Skipping existing file %s", destination)
        return

    destination.parent.mkdir(parents=True, exist_ok=True)

    overwrite_flag = "-y" if overwrite else "-n"
    # The decryption flag is only passed when activation bytes were provided.
    decryption = ["-activation_bytes", activation_bytes] if activation_bytes else []
    command = [
        "ffmpeg",
        "-hide_banner",
        overwrite_flag,
        *decryption,
        "-i",
        str(aax_file),
        "-map_metadata",
        "0",
        "-c",
        "copy",
        str(destination),
    ]
    run_command(command)
|
|
|
|
|
|
def write_review_file(
    *,
    destination: Path | None,
    ffprobe_metadata: dict[str, str],
    log_file: Path,
    metadata: StandardBookMetadata | None,
    reason: str,
    review_file: Path,
    source: Path,
    temp_file: Path | None,
) -> None:
    """Persist a JSON report describing a conversion that needs manual attention.

    Args:
        destination: Planned final path, if one was determined.
        ffprobe_metadata: Tags read from the source file.
        log_file: Agent log that receives a ``review_written`` entry.
        metadata: Resolved metadata, if any.
        reason: Why the conversion needs review.
        review_file: Path of the JSON report to write.
        source: Source AAX file.
        temp_file: Converted temporary file left on disk, if any.
    """
    review_file.parent.mkdir(parents=True, exist_ok=True)

    destination_text = str(destination) if destination else None
    metadata_fields = asdict(metadata) if metadata else None
    temp_text = str(temp_file) if temp_file else None

    report = json.dumps(
        {
            "source": str(source),
            "reason": reason,
            "destination": destination_text,
            "temp_file": temp_text,
            "metadata": metadata_fields,
            "ffprobe_metadata": ffprobe_metadata,
        },
        indent=2,
        sort_keys=True,  # keys are emitted alphabetically regardless of the order above
    )
    review_file.write_text(report, encoding="utf-8")
    write_agent_log(log_file, "review_written", path=str(review_file), reason=reason)
|
|
|
|
|
|
def cleanup_temp_output(temp_file: Path) -> None:
    """Delete the directory that holds a run's temporary output file.

    Args:
        temp_file: Temporary output file; its parent directory is removed
            together with everything inside it. Errors are ignored.
    """
    run_directory = temp_file.parent
    shutil.rmtree(run_directory, ignore_errors=True)
|
|
|
|
|
|
def dry_run_aax_file_with_agent(
    aax_file: Path,
    ffprobe_metadata: dict[str, str],
    engine: Engine,
    config: ConversionConfig,
    log_file: Path,
    review_file: Path,
) -> None:
    """Resolve metadata and report the planned destination without running ffmpeg.

    Args:
        aax_file: Source AAX file.
        ffprobe_metadata: Tags read from the source file.
        engine: Database engine handed to the metadata agent.
        config: Conversion settings.
        log_file: Agent log for this run.
        review_file: Review report path used when metadata needs review.
    """
    resolved = standard_book_metadata(
        aax_file.name,
        ffprobe_metadata,
        engine,
        log_file,
        config.ollama_api_key,
        config.agent_config,
    )

    if resolved.needs_review:
        write_review_file(
            destination=None,
            ffprobe_metadata=ffprobe_metadata,
            log_file=log_file,
            metadata=resolved,
            reason="metadata_needs_review",
            review_file=review_file,
            source=aax_file,
            temp_file=None,
        )
        typer.echo(f"{aax_file} -> REVIEW {review_file}")
        return

    planned_path = metadata_output_path(config.resolved_output, resolved)
    book_name = output_stem(resolved)

    # The marker mirrors the final layout inside the work directory and holds
    # the planned destination as its only line.
    marker_directory = (
        config.resolved_output / config.work_directory_name / config.dry_run_directory_name / book_name
    )
    marker_file = marker_directory / f"{book_name}.m4b"
    marker_directory.mkdir(parents=True, exist_ok=True)
    marker_file.write_text(f"{planned_path}\n", encoding="utf-8")

    write_agent_log(
        log_file,
        "dry_run_file_written",
        destination=str(planned_path),
        path=str(marker_file),
    )
    typer.echo(f"{aax_file} -> {planned_path}")
|
|
|
|
|
|
def convert_temp_file_and_resolve_metadata(
    aax_file: Path,
    temp_file: Path,
    ffprobe_metadata: dict[str, str],
    config: ConversionConfig,
    log_file: Path,
) -> ConcurrentConversionResult:
    """Convert to a temporary file while the metadata agent runs alongside.

    Failures of either task are captured in the result instead of being raised.

    Args:
        aax_file: Source AAX file.
        temp_file: Temporary M4B path ffmpeg writes to (always overwritten).
        ffprobe_metadata: Tags read from the source file.
        config: Conversion settings.
        log_file: Agent log for this run.

    Returns:
        The resolved metadata (None if resolution failed) plus any exception
        raised by either task.
    """
    with ThreadPoolExecutor(max_workers=2) as pool:
        ffmpeg_job = pool.submit(
            convert_aax_file,
            aax_file,
            temp_file,
            config.activation_bytes,
            overwrite=True,
        )
        agent_job = pool.submit(
            standard_book_metadata,
            aax_file.name,
            ffprobe_metadata,
            config.engine,
            log_file,
            config.ollama_api_key,
            config.agent_config,
        )

        # Future.exception() blocks until the task is finished.
        ffmpeg_failure = ffmpeg_job.exception()
        agent_failure = agent_job.exception()

    resolved = agent_job.result() if agent_failure is None else None
    return ConcurrentConversionResult(
        metadata=resolved,
        conversion_error=ffmpeg_failure,
        metadata_error=agent_failure,
    )
|
|
|
|
|
|
def convert_aax_file_with_agent(aax_file: Path, config: ConversionConfig) -> None:
    """Convert one AAX file, placing the result where the metadata agent decides.

    Each call gets its own run id, which names the run's log file, review file
    and temporary directory. Any step that fails, or metadata flagged for
    review, produces a review file instead of a final M4B.

    Args:
        aax_file: Source AAX file.
        config: Conversion settings.
    """
    run_id = uuid7().hex
    work_root = config.resolved_output / config.work_directory_name
    log_file = work_root / config.log_directory_name / f"{run_id}.jsonl"
    review_file = work_root / config.review_directory_name / f"{run_id}.json"

    def request_review(
        reason: str,
        *,
        tags: dict[str, str],
        metadata: StandardBookMetadata | None = None,
        destination: Path | None = None,
        leftover: Path | None = None,
    ) -> None:
        """Write this run's review file with the given reason and details."""
        write_review_file(
            destination=destination,
            ffprobe_metadata=tags,
            log_file=log_file,
            metadata=metadata,
            reason=reason,
            review_file=review_file,
            source=aax_file,
            temp_file=leftover,
        )

    write_agent_log(log_file, "conversion_start", source=str(aax_file), dry_run=config.dry_run)

    try:
        ffprobe_metadata = read_metadata(aax_file)
    except Exception as error:
        logger.exception("ffprobe failed")
        request_review(f"ffprobe_failed: {error}", tags={})
        return

    if config.dry_run:
        dry_run_aax_file_with_agent(aax_file, ffprobe_metadata, config.engine, config, log_file, review_file)
        return

    temp_file = work_root / config.temp_directory_name / run_id / "converted.m4b"
    temp_file.parent.mkdir(parents=True, exist_ok=True)

    outcome = convert_temp_file_and_resolve_metadata(aax_file, temp_file, ffprobe_metadata, config, log_file)

    if outcome.conversion_error:
        # A failed ffmpeg run may or may not have left a partial file behind.
        request_review(
            f"ffmpeg_failed: {outcome.conversion_error}",
            tags=ffprobe_metadata,
            metadata=outcome.metadata,
            leftover=temp_file if temp_file.exists() else None,
        )
        return

    if outcome.metadata_error:
        request_review(
            f"metadata_failed: {outcome.metadata_error}",
            tags=ffprobe_metadata,
            leftover=temp_file,
        )
        return

    book = outcome.metadata
    if book is None or book.needs_review:
        request_review(
            "metadata_needs_review",
            tags=ffprobe_metadata,
            metadata=book,
            leftover=temp_file,
        )
        return

    destination = metadata_output_path(config.resolved_output, book)
    if destination.exists() and not config.overwrite:
        write_agent_log(log_file, "destination_exists", destination=str(destination))
        cleanup_temp_output(temp_file)
        return

    destination.parent.mkdir(parents=True, exist_ok=True)
    try:
        temp_file.replace(destination)
    except Exception as error:  # noqa: BLE001
        request_review(
            f"rename_failed: {error}",
            tags=ffprobe_metadata,
            metadata=book,
            destination=destination,
            leftover=temp_file if temp_file.exists() else None,
        )
        return

    cleanup_temp_output(temp_file)
    write_agent_log(log_file, "conversion_complete", destination=str(destination))
|
|
|
|
|
|
# When executed as a script, expose main() as a Typer command-line application.
if __name__ == "__main__":
    typer.run(main)
|