236 lines
7.0 KiB
Python
236 lines
7.0 KiB
Python
"""Docker container lifecycle management for BERTopic jobs on Jeeves."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import os
|
|
import subprocess
|
|
from pathlib import Path
|
|
from typing import Annotated, Literal
|
|
|
|
import typer
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
JOBMode = Literal["train", "infer"]
|
|
IMAGE_NAME = "bert-topic:latest"
|
|
REPO_DIR = Path(__file__).resolve().parents[3]
|
|
DEFAULT_CACHE_ROOT = Path("/zfs/storage/main/ds_thing/models/bert_topic")
|
|
DEFAULT_POSTGRES_SOCKET_DIR = Path("/run/postgresql")
|
|
DB_ENV_VARS = (
|
|
"DATA_SCIENCE_DEV_DB",
|
|
"DATA_SCIENCE_DEV_HOST",
|
|
"DATA_SCIENCE_DEV_PORT",
|
|
"DATA_SCIENCE_DEV_USER",
|
|
"DATA_SCIENCE_DEV_PASSWORD",
|
|
)
|
|
|
|
app = typer.Typer(help="BERTopic container management.")
|
|
|
|
|
|
def _container_name(mode: JOBMode) -> str:
|
|
"""Return the Docker container name for the selected BERTopic job."""
|
|
return f"bert-topic-{mode}"
|
|
|
|
|
|
def _module_name(mode: JOBMode) -> str:
|
|
"""Return the Python module to run inside the container."""
|
|
return f"pipelines.bert_topic.{mode}"
|
|
|
|
|
|
def _env_args(*, use_postgres_socket: bool) -> list[str]:
|
|
"""Pass through database environment variables from the host shell."""
|
|
required = [
|
|
"DATA_SCIENCE_DEV_DB",
|
|
"DATA_SCIENCE_DEV_PORT",
|
|
"DATA_SCIENCE_DEV_USER",
|
|
]
|
|
if not use_postgres_socket:
|
|
required.append("DATA_SCIENCE_DEV_HOST")
|
|
missing = [name for name in required if not os.getenv(name)]
|
|
if missing:
|
|
message = "Missing required database environment variables: " + ", ".join(
|
|
missing
|
|
)
|
|
raise RuntimeError(message)
|
|
args: list[str] = []
|
|
if use_postgres_socket:
|
|
args.extend(["-e", f"DATA_SCIENCE_DEV_HOST={DEFAULT_POSTGRES_SOCKET_DIR}"])
|
|
for name in DB_ENV_VARS:
|
|
if use_postgres_socket and name == "DATA_SCIENCE_DEV_HOST":
|
|
continue
|
|
if os.getenv(name):
|
|
args.extend(["-e", name])
|
|
return args
|
|
|
|
|
|
def build_image() -> None:
|
|
"""Build the BERTopic Docker image."""
|
|
dockerfile = REPO_DIR / "pipelines/containers/docker_files/Dockerfile.bert_topic"
|
|
logger.info("Building BERTopic image: %s", IMAGE_NAME)
|
|
result = subprocess.run(
|
|
[
|
|
"docker",
|
|
"build",
|
|
"--network",
|
|
"host",
|
|
"-f",
|
|
str(dockerfile),
|
|
"-t",
|
|
IMAGE_NAME,
|
|
str(REPO_DIR),
|
|
],
|
|
capture_output=True,
|
|
text=True,
|
|
check=False,
|
|
)
|
|
if result.returncode != 0:
|
|
message = (
|
|
"Failed to build BERTopic image. "
|
|
f"docker build stderr:\n{result.stderr.strip()}"
|
|
)
|
|
raise RuntimeError(message)
|
|
logger.info("Image built: %s", IMAGE_NAME)
|
|
|
|
|
|
def stop_job(*, mode: JOBMode) -> None:
|
|
"""Stop and remove the BERTopic container for the selected mode."""
|
|
container_name = _container_name(mode)
|
|
logger.info("Stopping BERTopic container: %s", container_name)
|
|
subprocess.run(["docker", "stop", container_name], capture_output=True, check=False)
|
|
subprocess.run(
|
|
["docker", "rm", "-f", container_name], capture_output=True, check=False
|
|
)
|
|
|
|
|
|
def start_job(
|
|
*,
|
|
mode: JOBMode,
|
|
cache_root: Path = DEFAULT_CACHE_ROOT,
|
|
postgres_socket_dir: Path = DEFAULT_POSTGRES_SOCKET_DIR,
|
|
detach: bool = False,
|
|
) -> None:
|
|
"""Run BERTopic training or inference in Docker on Jeeves."""
|
|
cache_root = cache_root.resolve()
|
|
cache_root.mkdir(parents=True, exist_ok=True)
|
|
postgres_socket_dir = postgres_socket_dir.resolve()
|
|
stop_job(mode=mode)
|
|
use_postgres_socket = postgres_socket_dir.exists()
|
|
|
|
command = [
|
|
"docker",
|
|
"run",
|
|
"--name",
|
|
_container_name(mode),
|
|
"--ipc=host",
|
|
"-v",
|
|
f"{cache_root}:/cache",
|
|
*_env_args(use_postgres_socket=use_postgres_socket),
|
|
IMAGE_NAME,
|
|
_module_name(mode),
|
|
]
|
|
if use_postgres_socket:
|
|
command[7:7] = ["-v", f"{postgres_socket_dir}:{DEFAULT_POSTGRES_SOCKET_DIR}"]
|
|
if detach:
|
|
command.insert(2, "-d")
|
|
|
|
logger.info("Starting BERTopic %s container", mode)
|
|
logger.info(" Cache root: %s", cache_root)
|
|
if use_postgres_socket:
|
|
logger.info(" Postgres socket: %s", postgres_socket_dir)
|
|
result = subprocess.run(command, text=True, capture_output=detach, check=False)
|
|
if result.returncode != 0:
|
|
detail = (
|
|
result.stderr.strip() if result.stderr else f"exit code {result.returncode}"
|
|
)
|
|
raise RuntimeError(f"BERTopic container failed to start: {detail}")
|
|
if detach:
|
|
logger.info("Container started: %s", result.stdout.strip()[:12])
|
|
else:
|
|
logger.info("BERTopic %s run complete", mode)
|
|
|
|
|
|
def logs_job(*, mode: JOBMode) -> str | None:
|
|
"""Return recent logs from the BERTopic container, or None if absent."""
|
|
result = subprocess.run(
|
|
["docker", "logs", "--tail", "100", _container_name(mode)],
|
|
capture_output=True,
|
|
text=True,
|
|
check=False,
|
|
)
|
|
if result.returncode != 0:
|
|
return None
|
|
return result.stdout + result.stderr
|
|
|
|
|
|
@app.command()
|
|
def build(
|
|
log_level: Annotated[str, typer.Option(help="Log level")] = "INFO",
|
|
) -> None:
|
|
"""Build the BERTopic Docker image."""
|
|
logging.basicConfig(
|
|
level=log_level,
|
|
format="%(asctime)s %(levelname)s %(name)s: %(message)s",
|
|
)
|
|
build_image()
|
|
|
|
|
|
@app.command("run")
|
|
def run_job_command(
|
|
mode: Annotated[JOBMode, typer.Option(help="Which BERTopic job to run")] = "train",
|
|
cache_root: Annotated[
|
|
Path, typer.Option(help="Host path mounted to /cache for model and HF cache")
|
|
] = DEFAULT_CACHE_ROOT,
|
|
postgres_socket_dir: Annotated[
|
|
Path, typer.Option(help="Host Postgres socket directory to mount into the container")
|
|
] = DEFAULT_POSTGRES_SOCKET_DIR,
|
|
detach: Annotated[
|
|
bool, typer.Option(help="Start the container in the background")
|
|
] = False,
|
|
log_level: Annotated[str, typer.Option(help="Log level")] = "INFO",
|
|
) -> None:
|
|
"""Run BERTopic training or inference inside Docker."""
|
|
logging.basicConfig(
|
|
level=log_level,
|
|
format="%(asctime)s %(levelname)s %(name)s: %(message)s",
|
|
)
|
|
start_job(
|
|
mode=mode,
|
|
cache_root=cache_root,
|
|
postgres_socket_dir=postgres_socket_dir,
|
|
detach=detach,
|
|
)
|
|
|
|
|
|
@app.command("stop")
|
|
def stop_job_command(
|
|
mode: Annotated[
|
|
JOBMode, typer.Option(help="Which BERTopic container to stop")
|
|
] = "train",
|
|
) -> None:
|
|
"""Stop and remove the BERTopic container."""
|
|
stop_job(mode=mode)
|
|
|
|
|
|
@app.command("logs")
|
|
def logs_job_command(
|
|
mode: Annotated[
|
|
JOBMode, typer.Option(help="Which BERTopic container logs to show")
|
|
] = "train",
|
|
) -> None:
|
|
"""Show recent logs from the BERTopic container."""
|
|
output = logs_job(mode=mode)
|
|
if output is None:
|
|
typer.echo(f"No BERTopic container found for mode={mode}.")
|
|
raise typer.Exit(code=1)
|
|
typer.echo(output)
|
|
|
|
|
|
def cli() -> None:
|
|
"""Typer entry point."""
|
|
app()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
cli()
|