"""Docker container lifecycle management for BERTopic jobs on Jeeves.""" from __future__ import annotations import logging import os import subprocess from pathlib import Path from typing import Annotated, Literal import typer logger = logging.getLogger(__name__) JOBMode = Literal["train", "infer"] IMAGE_NAME = "bert-topic:latest" REPO_DIR = Path(__file__).resolve().parents[3] DEFAULT_CACHE_ROOT = Path("/zfs/storage/main/ds_thing/models/bert_topic") DEFAULT_POSTGRES_SOCKET_DIR = Path("/run/postgresql") DB_ENV_VARS = ( "DATA_SCIENCE_DEV_DB", "DATA_SCIENCE_DEV_HOST", "DATA_SCIENCE_DEV_PORT", "DATA_SCIENCE_DEV_USER", "DATA_SCIENCE_DEV_PASSWORD", ) app = typer.Typer(help="BERTopic container management.") def _container_name(mode: JOBMode) -> str: """Return the Docker container name for the selected BERTopic job.""" return f"bert-topic-{mode}" def _module_name(mode: JOBMode) -> str: """Return the Python module to run inside the container.""" return f"pipelines.bert_topic.{mode}" def _env_args(*, use_postgres_socket: bool) -> list[str]: """Pass through database environment variables from the host shell.""" required = [ "DATA_SCIENCE_DEV_DB", "DATA_SCIENCE_DEV_PORT", "DATA_SCIENCE_DEV_USER", ] if not use_postgres_socket: required.append("DATA_SCIENCE_DEV_HOST") missing = [name for name in required if not os.getenv(name)] if missing: message = "Missing required database environment variables: " + ", ".join( missing ) raise RuntimeError(message) args: list[str] = [] if use_postgres_socket: args.extend(["-e", f"DATA_SCIENCE_DEV_HOST={DEFAULT_POSTGRES_SOCKET_DIR}"]) for name in DB_ENV_VARS: if use_postgres_socket and name == "DATA_SCIENCE_DEV_HOST": continue if os.getenv(name): args.extend(["-e", name]) return args def build_image() -> None: """Build the BERTopic Docker image.""" dockerfile = REPO_DIR / "pipelines/containers/docker_files/Dockerfile.bert_topic" logger.info("Building BERTopic image: %s", IMAGE_NAME) result = subprocess.run( [ "docker", "build", "--network", "host", "-f", str(dockerfile), "-t", IMAGE_NAME, str(REPO_DIR), ], capture_output=True, text=True, check=False, ) if result.returncode != 0: message = ( "Failed to build BERTopic image. " f"docker build stderr:\n{result.stderr.strip()}" ) raise RuntimeError(message) logger.info("Image built: %s", IMAGE_NAME) def stop_job(*, mode: JOBMode) -> None: """Stop and remove the BERTopic container for the selected mode.""" container_name = _container_name(mode) logger.info("Stopping BERTopic container: %s", container_name) subprocess.run(["docker", "stop", container_name], capture_output=True, check=False) subprocess.run( ["docker", "rm", "-f", container_name], capture_output=True, check=False ) def start_job( *, mode: JOBMode, cache_root: Path = DEFAULT_CACHE_ROOT, postgres_socket_dir: Path = DEFAULT_POSTGRES_SOCKET_DIR, detach: bool = False, ) -> None: """Run BERTopic training or inference in Docker on Jeeves.""" cache_root = cache_root.resolve() cache_root.mkdir(parents=True, exist_ok=True) postgres_socket_dir = postgres_socket_dir.resolve() stop_job(mode=mode) use_postgres_socket = postgres_socket_dir.exists() command = [ "docker", "run", "--name", _container_name(mode), "--ipc=host", "-v", f"{cache_root}:/cache", *_env_args(use_postgres_socket=use_postgres_socket), IMAGE_NAME, _module_name(mode), ] if use_postgres_socket: command[7:7] = ["-v", f"{postgres_socket_dir}:{DEFAULT_POSTGRES_SOCKET_DIR}"] if detach: command.insert(2, "-d") logger.info("Starting BERTopic %s container", mode) logger.info(" Cache root: %s", cache_root) if use_postgres_socket: logger.info(" Postgres socket: %s", postgres_socket_dir) result = subprocess.run(command, text=True, capture_output=detach, check=False) if result.returncode != 0: detail = ( result.stderr.strip() if result.stderr else f"exit code {result.returncode}" ) raise RuntimeError(f"BERTopic container failed to start: {detail}") if detach: logger.info("Container started: %s", result.stdout.strip()[:12]) else: logger.info("BERTopic %s run complete", mode) def logs_job(*, mode: JOBMode) -> str | None: """Return recent logs from the BERTopic container, or None if absent.""" result = subprocess.run( ["docker", "logs", "--tail", "100", _container_name(mode)], capture_output=True, text=True, check=False, ) if result.returncode != 0: return None return result.stdout + result.stderr @app.command() def build( log_level: Annotated[str, typer.Option(help="Log level")] = "INFO", ) -> None: """Build the BERTopic Docker image.""" logging.basicConfig( level=log_level, format="%(asctime)s %(levelname)s %(name)s: %(message)s", ) build_image() @app.command("run") def run_job_command( mode: Annotated[JOBMode, typer.Option(help="Which BERTopic job to run")] = "train", cache_root: Annotated[ Path, typer.Option(help="Host path mounted to /cache for model and HF cache") ] = DEFAULT_CACHE_ROOT, postgres_socket_dir: Annotated[ Path, typer.Option(help="Host Postgres socket directory to mount into the container") ] = DEFAULT_POSTGRES_SOCKET_DIR, detach: Annotated[ bool, typer.Option(help="Start the container in the background") ] = False, log_level: Annotated[str, typer.Option(help="Log level")] = "INFO", ) -> None: """Run BERTopic training or inference inside Docker.""" logging.basicConfig( level=log_level, format="%(asctime)s %(levelname)s %(name)s: %(message)s", ) start_job( mode=mode, cache_root=cache_root, postgres_socket_dir=postgres_socket_dir, detach=detach, ) @app.command("stop") def stop_job_command( mode: Annotated[ JOBMode, typer.Option(help="Which BERTopic container to stop") ] = "train", ) -> None: """Stop and remove the BERTopic container.""" stop_job(mode=mode) @app.command("logs") def logs_job_command( mode: Annotated[ JOBMode, typer.Option(help="Which BERTopic container logs to show") ] = "train", ) -> None: """Show recent logs from the BERTopic container.""" output = logs_job(mode=mode) if output is None: typer.echo(f"No BERTopic container found for mode={mode}.") raise typer.Exit(code=1) typer.echo(output) def cli() -> None: """Typer entry point.""" app() if __name__ == "__main__": cli()