built workflow
treefmt / nix fmt (pull_request) Successful in 6s
pytest / pytest (pull_request) Failing after 27s
build_systems / build-bob (pull_request) Successful in 47s
build_systems / build-brain (pull_request) Successful in 47s
build_systems / build-leviathan (pull_request) Successful in 52s
build_systems / build-rhapsody-in-green (pull_request) Successful in 58s
build_systems / build-jeeves (pull_request) Successful in 2m28s
treefmt / nix fmt (pull_request) Successful in 6s
pytest / pytest (pull_request) Failing after 27s
build_systems / build-bob (pull_request) Successful in 47s
build_systems / build-brain (pull_request) Successful in 47s
build_systems / build-leviathan (pull_request) Successful in 52s
build_systems / build-rhapsody-in-green (pull_request) Successful in 58s
build_systems / build-jeeves (pull_request) Successful in 2m28s
This commit is contained in:
@@ -0,0 +1 @@
|
||||
"""Audiobook tools."""
|
||||
@@ -0,0 +1,428 @@
|
||||
"""Convert Audible AAX downloads into Audiobookshelf-friendly M4B files."""
|
||||
|
||||
from __future__ import annotations

import json
import logging
import shutil
import subprocess
from concurrent.futures import ThreadPoolExecutor
from dataclasses import asdict, dataclass, field
from os import getenv
from pathlib import Path  # noqa: TC003 This is required for the typer CLI
from typing import TYPE_CHECKING, Annotated, Any
from uuid import uuid7

import typer

from python.common import configure_logger
from python.orm.common import get_postgres_engine
from python.tools.audiobook.metadata_agent import (
    AgentConfig,
    StandardBookMetadata,
    standard_book_metadata,
    write_agent_log,
)
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from sqlalchemy.engine import Engine
|
||||
|
||||
logger = logging.getLogger(__name__)

# Command-line flags whose *following* argument is a secret and must be masked
# before a command is logged or put into an error message.
# Frozen so the redaction list cannot be mutated by accident at runtime.
SENSITIVE_COMMAND_ARGUMENTS = frozenset({"-activation_bytes"})
|
||||
|
||||
|
||||
@dataclass(frozen=True)
class ConversionConfig:
    """Runtime settings for one conversion command.

    The two credential fields are excluded from the generated ``repr`` so that
    printing or logging a config cannot leak them, in line with the redaction
    this module applies to ffmpeg command lines.
    """

    # Resolved output directory; final books and the work directory live under it.
    resolved_output: Path
    # API key passed to the metadata resolver; secret, so kept out of repr.
    ollama_api_key: str = field(repr=False)
    # Settings object forwarded unchanged to the metadata resolver.
    agent_config: AgentConfig
    # Database engine forwarded to the metadata resolver.
    engine: Engine
    # Value for ffmpeg's -activation_bytes flag, or None to omit the flag; secret, kept out of repr.
    activation_bytes: str | None = field(repr=False)
    # When true, planned output paths are printed and nothing is converted.
    dry_run: bool
    # When true, an existing destination M4B is replaced instead of skipped.
    overwrite: bool
    # Work directory created under resolved_output.
    work_directory_name: str = ".audible_convert"
    # Subdirectory of the work directory holding per-run temporary output.
    temp_directory_name: str = "tmp"
    # Subdirectory of the work directory holding per-run JSONL logs.
    log_directory_name: str = "logs"
    # Subdirectory of the work directory holding per-run review files.
    review_directory_name: str = "review"
|
||||
|
||||
|
||||
@dataclass(frozen=True)
class ConcurrentConversionResult:
    """Outcome of the ffmpeg conversion and the metadata lookup that ran alongside it."""

    # Resolved book metadata, or None when the lookup did not produce any.
    metadata: StandardBookMetadata | None
    # Exception raised by the ffmpeg conversion task, or None on success.
    conversion_error: Exception | None
    # Exception raised by the metadata task, or None on success.
    metadata_error: Exception | None
|
||||
|
||||
|
||||
class CommandExecutionError(RuntimeError):
    """Raised when an external command exits non-zero; its message has secrets masked."""

    def __init__(self, arguments: list[str], returncode: int) -> None:
        """Record the failed command and build the redacted message.

        Args:
            arguments: Full command line that was executed.
            returncode: Exit status reported by the process.

        """
        safe_command_line = " ".join(redact_command_arguments(arguments))
        super().__init__(f"Command failed with exit code {returncode}: {safe_command_line}")
        # Only the message is masked; the original arguments remain on the instance.
        self.arguments = tuple(arguments)
        self.returncode = returncode
|
||||
|
||||
|
||||
def main(
    input_directory: Annotated[Path, typer.Argument(help="Directory audible-cli downloads AAX files into.")],
    output_directory: Annotated[Path, typer.Argument(help="Audiobook output directory.")],
    *,
    dry_run: Annotated[bool, typer.Option("--dry-run", help="Print planned output files without converting.")] = False,
    overwrite: Annotated[bool, typer.Option("--overwrite", help="Overwrite existing M4B files.")] = False,
) -> None:
    """Convert AAX files from a download directory into M4B files."""
    configure_logger()

    # The input directory must already exist; the output directory is only created for real runs.
    source_directory = input_directory.resolve(strict=True)
    target_directory = output_directory.resolve()
    if not dry_run:
        target_directory.mkdir(parents=True, exist_ok=True)

    if not (api_key := getenv("OLLAMA_API_KEY")):
        msg = "OLLAMA_API_KEY is required for audiobook metadata resolution"
        raise RuntimeError(msg)

    config = ConversionConfig(
        resolved_output=target_directory,
        ollama_api_key=api_key,
        agent_config=AgentConfig(),
        engine=get_postgres_engine(name="RICHIE"),
        activation_bytes=getenv("AUDIBLE_ACTIVATION_BYTES"),
        dry_run=dry_run,
        overwrite=overwrite,
    )

    # Only files directly inside the input directory are considered, in sorted order.
    pending_files = sorted(source_directory.glob("*.aax"))
    if not pending_files:
        logger.info("No AAX files found in %s", source_directory)
        return

    for pending_file in pending_files:
        logger.info("Converting %s", pending_file)
        convert_aax_file_with_agent(pending_file, config)
|
||||
|
||||
|
||||
def run_command(arguments: list[str], *, capture: bool = False) -> subprocess.CompletedProcess[str]:
    """Run a command and return the completed process.

    Args:
        arguments: Command and arguments to run.
        capture: Whether to capture stdout and stderr.

    Returns:
        The completed process.

    Raises:
        CommandExecutionError: If the command exits with a non-zero status.

    """
    logger.debug("%s", " ".join(redact_command_arguments(arguments)))
    try:
        return subprocess.run(arguments, check=True, capture_output=capture, text=True)
    except subprocess.CalledProcessError as error:
        # CalledProcessError's message embeds the full, unredacted command line.
        # Chaining it as the cause would print that line (including any value
        # following a sensitive flag) in every traceback, so suppress the chain.
        raise CommandExecutionError(arguments, error.returncode) from None
|
||||
|
||||
|
||||
def redact_command_arguments(arguments: list[str]) -> list[str]:
    """Copy a command line, masking the value that follows each sensitive flag."""
    sanitized: list[str] = []
    mask_this_one = False
    for item in arguments:
        if mask_this_one:
            # The previous item was a sensitive flag, so this one is its value.
            sanitized.append("<redacted>")
            mask_this_one = False
        else:
            sanitized.append(item)
            mask_this_one = item in SENSITIVE_COMMAND_ARGUMENTS
    return sanitized
|
||||
|
||||
|
||||
def read_metadata(aax_file: Path) -> dict[str, str]:
    """Collect the format-level tags ffprobe reports for an AAX file.

    Args:
        aax_file: AAX file to inspect.

    Returns:
        Tag names (lower-cased) mapped to their values as strings; empty when
        ffprobe reports no format tags.

    """
    probe_command = ["ffprobe", "-v", "quiet", "-print_format", "json", "-show_format", str(aax_file)]
    probe = run_command(probe_command, capture=True)

    parsed: dict[str, Any] = json.loads(probe.stdout)
    format_section = parsed.get("format", {})
    raw_tags = format_section.get("tags", {})

    normalized: dict[str, str] = {}
    for name, value in raw_tags.items():
        normalized[str(name).lower()] = str(value)
    return normalized
|
||||
|
||||
|
||||
def output_stem(metadata: StandardBookMetadata) -> str:
    """Build the output stem for a book.

    The stem is used as both a directory name and a file name, so it must stay
    a single path component. Path separators and NUL bytes in any metadata
    field are therefore replaced with underscores.

    Args:
        metadata: Book metadata.

    Returns:
        Output stem in author-series_01-title form.

    """
    stem = f"{metadata.author}-{metadata.series}_{metadata.series_index:02}-{metadata.title}"
    # NOTE(review): the metadata resolver may already normalise these fields - confirm.
    # This guard keeps titles such as "11/22/63" from creating nested directories
    # and keeps ".." segments from escaping the output directory either way.
    for forbidden in ("/", "\\", "\0"):
        stem = stem.replace(forbidden, "_")
    return stem
|
||||
|
||||
|
||||
def metadata_output_path(output_directory: Path, metadata: StandardBookMetadata) -> Path:
    """Return the final M4B location: a per-book directory holding a same-named file."""
    book_name = output_stem(metadata)
    book_directory = output_directory / book_name
    return book_directory / f"{book_name}.m4b"
|
||||
|
||||
|
||||
def convert_aax_file(
    aax_file: Path,
    destination: Path,
    activation_bytes: str | None,
    *,
    overwrite: bool,
) -> None:
    """Remux an AAX file into an M4B file with ffmpeg (stream copy, metadata kept).

    Args:
        aax_file: Source AAX file.
        destination: Destination M4B file.
        activation_bytes: Optional Audible activation bytes for ffmpeg.
        overwrite: Whether to overwrite an existing M4B.

    """
    already_present = destination.exists()
    if already_present and not overwrite:
        logger.info("Skipping existing file %s", destination)
        return

    destination.parent.mkdir(parents=True, exist_ok=True)

    command = ["ffmpeg", "-hide_banner"]
    # -y lets ffmpeg replace the output file; -n makes it refuse instead.
    command.append("-y" if overwrite else "-n")
    if activation_bytes:
        command += ["-activation_bytes", activation_bytes]
    command += ["-i", str(aax_file), "-map_metadata", "0", "-c", "copy", str(destination)]
    run_command(command)
|
||||
|
||||
|
||||
def write_review_file(
    *,
    destination: Path | None,
    ffprobe_metadata: dict[str, str],
    log_file: Path,
    metadata: StandardBookMetadata | None,
    reason: str,
    review_file: Path,
    source: Path,
    temp_file: Path | None,
) -> None:
    """Dump everything known about an unresolved conversion to a JSON review file.

    The review file's parent directory is created if needed, and a
    ``review_written`` event is appended to the run log afterwards.
    """
    review_file.parent.mkdir(parents=True, exist_ok=True)

    destination_text = str(destination) if destination else None
    metadata_fields = asdict(metadata) if metadata else None
    temp_file_text = str(temp_file) if temp_file else None

    record = {
        "source": str(source),
        "reason": reason,
        "ffprobe_metadata": ffprobe_metadata,
        "metadata": metadata_fields,
        "destination": destination_text,
        "temp_file": temp_file_text,
    }
    # sort_keys makes the on-disk key order independent of the dict literal above.
    serialized = json.dumps(record, indent=2, sort_keys=True)
    review_file.write_text(serialized, encoding="utf-8")
    write_agent_log(log_file, "review_written", path=str(review_file), reason=reason)
|
||||
|
||||
|
||||
def cleanup_temp_output(temp_file: Path) -> None:
    """Delete the directory containing ``temp_file``, ignoring any removal errors."""
    run_directory = temp_file.parent
    shutil.rmtree(run_directory, ignore_errors=True)
|
||||
|
||||
|
||||
def dry_run_aax_file_with_agent(
    aax_file: Path,
    ffprobe_metadata: dict[str, str],
    engine: Engine,
    config: ConversionConfig,
    log_file: Path,
    review_file: Path,
) -> None:
    """Resolve metadata and report where the book would go, without running ffmpeg.

    Prints ``<source> -> <destination>`` when the metadata is usable; otherwise
    writes a review file and prints ``<source> -> REVIEW <review file>``.
    """
    resolved = standard_book_metadata(
        aax_file.name,
        ffprobe_metadata,
        engine,
        log_file,
        config.ollama_api_key,
        config.agent_config,
    )

    if not resolved.needs_review:
        planned_path = metadata_output_path(config.resolved_output, resolved)
        typer.echo(f"{aax_file} -> {planned_path}")
        return

    # No destination is computed for metadata that still needs a human decision.
    write_review_file(
        destination=None,
        ffprobe_metadata=ffprobe_metadata,
        log_file=log_file,
        metadata=resolved,
        reason="metadata_needs_review",
        review_file=review_file,
        source=aax_file,
        temp_file=None,
    )
    typer.echo(f"{aax_file} -> REVIEW {review_file}")
|
||||
|
||||
|
||||
def convert_temp_file_and_resolve_metadata(
    aax_file: Path,
    temp_file: Path,
    ffprobe_metadata: dict[str, str],
    config: ConversionConfig,
    log_file: Path,
) -> ConcurrentConversionResult:
    """Convert to the temp file while metadata is resolved, then collect both outcomes.

    An exception raised by either task is returned on the result rather than
    re-raised here.
    """
    with ThreadPoolExecutor(max_workers=2) as pool:
        # The temp file belongs to this run alone, so ffmpeg may always overwrite it.
        ffmpeg_task = pool.submit(
            convert_aax_file,
            aax_file,
            temp_file,
            config.activation_bytes,
            overwrite=True,
        )
        agent_task = pool.submit(
            standard_book_metadata,
            aax_file.name,
            ffprobe_metadata,
            config.engine,
            log_file,
            config.ollama_api_key,
            config.agent_config,
        )

        # exception() waits for the task to finish and returns what it raised, if anything.
        ffmpeg_failure: Exception | None = ffmpeg_task.exception()
        agent_failure: Exception | None = agent_task.exception()
        resolved: StandardBookMetadata | None = None if agent_failure is not None else agent_task.result()

    return ConcurrentConversionResult(
        metadata=resolved,
        conversion_error=ffmpeg_failure,
        metadata_error=agent_failure,
    )
|
||||
|
||||
|
||||
def convert_aax_file_with_agent(aax_file: Path, config: ConversionConfig) -> None:
    """Convert a single AAX file and move it to the path derived from resolved metadata.

    Each run gets its own id, which names the run's log file, review file and
    temporary directory under the work directory. The handled failure paths
    (ffprobe, ffmpeg, metadata resolution, metadata needing review, final
    rename) write a review file and return.
    """
    run_id = uuid7().hex
    work_directory = config.resolved_output / config.work_directory_name
    log_file = work_directory / config.log_directory_name / f"{run_id}.jsonl"
    review_file = work_directory / config.review_directory_name / f"{run_id}.json"

    def request_review(
        reason: str,
        *,
        tags: dict[str, str],
        destination: Path | None = None,
        metadata: StandardBookMetadata | None = None,
        temp_file: Path | None = None,
    ) -> None:
        """Write this run's review file with the per-run arguments filled in."""
        write_review_file(
            destination=destination,
            ffprobe_metadata=tags,
            log_file=log_file,
            metadata=metadata,
            reason=reason,
            review_file=review_file,
            source=aax_file,
            temp_file=temp_file,
        )

    write_agent_log(log_file, "conversion_start", source=str(aax_file), dry_run=config.dry_run)

    try:
        ffprobe_metadata = read_metadata(aax_file)
    except Exception as error:
        logger.exception("ffprobe failed")
        request_review(f"ffprobe_failed: {error}", tags={})
        return

    if config.dry_run:
        dry_run_aax_file_with_agent(aax_file, ffprobe_metadata, config.engine, config, log_file, review_file)
        return

    temp_file = work_directory / config.temp_directory_name / run_id / "converted.m4b"
    temp_file.parent.mkdir(parents=True, exist_ok=True)

    result = convert_temp_file_and_resolve_metadata(aax_file, temp_file, ffprobe_metadata, config, log_file)

    # An ffmpeg failure takes precedence over a metadata failure when both happened.
    if result.conversion_error:
        request_review(
            f"ffmpeg_failed: {result.conversion_error}",
            tags=ffprobe_metadata,
            metadata=result.metadata,
            temp_file=temp_file if temp_file.exists() else None,
        )
        return

    if result.metadata_error:
        request_review(f"metadata_failed: {result.metadata_error}", tags=ffprobe_metadata, temp_file=temp_file)
        return

    resolved = result.metadata
    if resolved is None or resolved.needs_review:
        # The converted temp file is left in place and referenced from the review file.
        request_review("metadata_needs_review", tags=ffprobe_metadata, metadata=resolved, temp_file=temp_file)
        return

    destination = metadata_output_path(config.resolved_output, resolved)
    if destination.exists() and not config.overwrite:
        write_agent_log(log_file, "destination_exists", destination=str(destination))
        cleanup_temp_output(temp_file)
        return

    destination.parent.mkdir(parents=True, exist_ok=True)
    try:
        temp_file.replace(destination)
    except Exception as error:  # noqa: BLE001
        request_review(
            f"rename_failed: {error}",
            tags=ffprobe_metadata,
            destination=destination,
            metadata=resolved,
            temp_file=temp_file if temp_file.exists() else None,
        )
        return

    cleanup_temp_output(temp_file)
    write_agent_log(log_file, "conversion_complete", destination=str(destination))
|
||||
|
||||
|
||||
# Script entry point: hand main to typer, which builds the CLI from its annotated signature.
if __name__ == "__main__":
    typer.run(main)
|
||||
File diff suppressed because it is too large
Load Diff
Reference in New Issue
Block a user