moved containers dir and created docker_files dir

This commit is contained in:
2026-04-28 22:36:13 -04:00
parent 3056c19f69
commit 09f7f0187f
8 changed files with 286 additions and 27 deletions
+1
View File
@@ -0,0 +1 @@
"""Prompt benchmarking system for evaluating LLMs via vLLM."""
+235
View File
@@ -0,0 +1,235 @@
"""Docker container lifecycle management for BERTopic jobs on Jeeves."""
from __future__ import annotations
import logging
import os
import subprocess
from pathlib import Path
from typing import Annotated, Literal
import typer
logger = logging.getLogger(__name__)
JOBMode = Literal["train", "infer"]
IMAGE_NAME = "bert-topic:latest"
REPO_DIR = Path(__file__).resolve().parents[3]
DEFAULT_CACHE_ROOT = Path("/zfs/storage/main/ds_thing/models/bert_topic")
DEFAULT_POSTGRES_SOCKET_DIR = Path("/run/postgresql")
DB_ENV_VARS = (
"DATA_SCIENCE_DEV_DB",
"DATA_SCIENCE_DEV_HOST",
"DATA_SCIENCE_DEV_PORT",
"DATA_SCIENCE_DEV_USER",
"DATA_SCIENCE_DEV_PASSWORD",
)
app = typer.Typer(help="BERTopic container management.")
def _container_name(mode: JOBMode) -> str:
"""Return the Docker container name for the selected BERTopic job."""
return f"bert-topic-{mode}"
def _module_name(mode: JOBMode) -> str:
"""Return the Python module to run inside the container."""
return f"pipelines.bert_topic.{mode}"
def _env_args(*, use_postgres_socket: bool) -> list[str]:
"""Pass through database environment variables from the host shell."""
required = [
"DATA_SCIENCE_DEV_DB",
"DATA_SCIENCE_DEV_PORT",
"DATA_SCIENCE_DEV_USER",
]
if not use_postgres_socket:
required.append("DATA_SCIENCE_DEV_HOST")
missing = [name for name in required if not os.getenv(name)]
if missing:
message = "Missing required database environment variables: " + ", ".join(
missing
)
raise RuntimeError(message)
args: list[str] = []
if use_postgres_socket:
args.extend(["-e", f"DATA_SCIENCE_DEV_HOST={DEFAULT_POSTGRES_SOCKET_DIR}"])
for name in DB_ENV_VARS:
if use_postgres_socket and name == "DATA_SCIENCE_DEV_HOST":
continue
if os.getenv(name):
args.extend(["-e", name])
return args
def build_image() -> None:
"""Build the BERTopic Docker image."""
dockerfile = REPO_DIR / "pipelines/containers/docker_files/Dockerfile.bert_topic"
logger.info("Building BERTopic image: %s", IMAGE_NAME)
result = subprocess.run(
[
"docker",
"build",
"--network",
"host",
"-f",
str(dockerfile),
"-t",
IMAGE_NAME,
str(REPO_DIR),
],
capture_output=True,
text=True,
check=False,
)
if result.returncode != 0:
message = (
"Failed to build BERTopic image. "
f"docker build stderr:\n{result.stderr.strip()}"
)
raise RuntimeError(message)
logger.info("Image built: %s", IMAGE_NAME)
def stop_job(*, mode: JOBMode) -> None:
"""Stop and remove the BERTopic container for the selected mode."""
container_name = _container_name(mode)
logger.info("Stopping BERTopic container: %s", container_name)
subprocess.run(["docker", "stop", container_name], capture_output=True, check=False)
subprocess.run(
["docker", "rm", "-f", container_name], capture_output=True, check=False
)
def start_job(
*,
mode: JOBMode,
cache_root: Path = DEFAULT_CACHE_ROOT,
postgres_socket_dir: Path = DEFAULT_POSTGRES_SOCKET_DIR,
detach: bool = False,
) -> None:
"""Run BERTopic training or inference in Docker on Jeeves."""
cache_root = cache_root.resolve()
cache_root.mkdir(parents=True, exist_ok=True)
postgres_socket_dir = postgres_socket_dir.resolve()
stop_job(mode=mode)
use_postgres_socket = postgres_socket_dir.exists()
command = [
"docker",
"run",
"--name",
_container_name(mode),
"--ipc=host",
"-v",
f"{cache_root}:/cache",
*_env_args(use_postgres_socket=use_postgres_socket),
IMAGE_NAME,
_module_name(mode),
]
if use_postgres_socket:
command[7:7] = ["-v", f"{postgres_socket_dir}:{DEFAULT_POSTGRES_SOCKET_DIR}"]
if detach:
command.insert(2, "-d")
logger.info("Starting BERTopic %s container", mode)
logger.info(" Cache root: %s", cache_root)
if use_postgres_socket:
logger.info(" Postgres socket: %s", postgres_socket_dir)
result = subprocess.run(command, text=True, capture_output=detach, check=False)
if result.returncode != 0:
detail = (
result.stderr.strip() if result.stderr else f"exit code {result.returncode}"
)
raise RuntimeError(f"BERTopic container failed to start: {detail}")
if detach:
logger.info("Container started: %s", result.stdout.strip()[:12])
else:
logger.info("BERTopic %s run complete", mode)
def logs_job(*, mode: JOBMode) -> str | None:
"""Return recent logs from the BERTopic container, or None if absent."""
result = subprocess.run(
["docker", "logs", "--tail", "100", _container_name(mode)],
capture_output=True,
text=True,
check=False,
)
if result.returncode != 0:
return None
return result.stdout + result.stderr
@app.command()
def build(
log_level: Annotated[str, typer.Option(help="Log level")] = "INFO",
) -> None:
"""Build the BERTopic Docker image."""
logging.basicConfig(
level=log_level,
format="%(asctime)s %(levelname)s %(name)s: %(message)s",
)
build_image()
@app.command("run")
def run_job_command(
mode: Annotated[JOBMode, typer.Option(help="Which BERTopic job to run")] = "train",
cache_root: Annotated[
Path, typer.Option(help="Host path mounted to /cache for model and HF cache")
] = DEFAULT_CACHE_ROOT,
postgres_socket_dir: Annotated[
Path, typer.Option(help="Host Postgres socket directory to mount into the container")
] = DEFAULT_POSTGRES_SOCKET_DIR,
detach: Annotated[
bool, typer.Option(help="Start the container in the background")
] = False,
log_level: Annotated[str, typer.Option(help="Log level")] = "INFO",
) -> None:
"""Run BERTopic training or inference inside Docker."""
logging.basicConfig(
level=log_level,
format="%(asctime)s %(levelname)s %(name)s: %(message)s",
)
start_job(
mode=mode,
cache_root=cache_root,
postgres_socket_dir=postgres_socket_dir,
detach=detach,
)
@app.command("stop")
def stop_job_command(
mode: Annotated[
JOBMode, typer.Option(help="Which BERTopic container to stop")
] = "train",
) -> None:
"""Stop and remove the BERTopic container."""
stop_job(mode=mode)
@app.command("logs")
def logs_job_command(
mode: Annotated[
JOBMode, typer.Option(help="Which BERTopic container logs to show")
] = "train",
) -> None:
"""Show recent logs from the BERTopic container."""
output = logs_job(mode=mode)
if output is None:
typer.echo(f"No BERTopic container found for mode={mode}.")
raise typer.Exit(code=1)
typer.echo(output)
def cli() -> None:
"""Typer entry point."""
app()
if __name__ == "__main__":
cli()
@@ -0,0 +1,38 @@
FROM python:3.12-bookworm
ENV DEBIAN_FRONTEND=noninteractive
ENV PYTHONUNBUFFERED=1
ENV PIP_NO_CACHE_DIR=1
RUN apt-get update && apt-get install -y \
build-essential \
gcc \
g++ \
git \
libgomp1 \
libpq-dev \
&& rm -rf /var/lib/apt/lists/*
WORKDIR /app
COPY pipelines ./pipelines
RUN python -m pip install --upgrade pip setuptools wheel && \
python -m pip install \
torch \
--index-url https://download.pytorch.org/whl/cpu && \
python -m pip install \
typer \
sqlalchemy \
bertopic \
sentence-transformers \
scikit-learn \
pandas \
numpy \
"psycopg[binary]"
ENV HF_HOME=/cache/huggingface
ENV TRANSFORMERS_CACHE=/cache/huggingface
ENTRYPOINT ["python", "-m"]
CMD ["pipelines.bert_topic.train"]
@@ -0,0 +1,11 @@
FROM ghcr.io/unslothai/unsloth:latest
RUN pip install --no-cache-dir typer
WORKDIR /workspace
COPY python/prompt_bench/finetune.py python/prompt_bench/finetune.py
COPY config/prompts/summarization_prompts.toml config/prompts/summarization_prompts.toml
COPY python/prompt_bench/__init__.py python/prompt_bench/__init__.py
COPY python/__init__.py python/__init__.py
ENTRYPOINT ["python", "-m", "pipelines.prompt_bench.finetune"]
+179
View File
@@ -0,0 +1,179 @@
"""Docker container lifecycle management for Unsloth fine-tuning."""
from __future__ import annotations
import logging
import subprocess
from pathlib import Path
from typing import Annotated
import typer
from pipelines.pipelines.containers.lib import check_gpu_free
logger = logging.getLogger(__name__)
CONTAINER_NAME = "bill-finetune"
FINETUNE_IMAGE = "bill-finetune:latest"
REPO_DIR = Path(__file__).resolve().parents[4]
DEFAULT_HF_CACHE = Path("/zfs/models/hf")
def build_image() -> None:
"""Build the fine-tuning Docker image."""
logger.info("Building fine-tuning image: %s", FINETUNE_IMAGE)
result = subprocess.run(
[
"docker",
"build",
"-f",
str(REPO_DIR / "pipelines/containers/docker_files/Dockerfile.finetune"),
"-t",
FINETUNE_IMAGE,
".",
],
text=True,
check=False,
)
if result.returncode != 0:
message = "Failed to build fine-tuning image"
raise RuntimeError(message)
logger.info("Image built: %s", FINETUNE_IMAGE)
def start_finetune(
*,
dataset_path: Path,
output_dir: Path,
hf_cache: Path = DEFAULT_HF_CACHE,
) -> None:
"""Run the fine-tuning container.
Args:
dataset_path: Host path to the fine-tuning JSONL dataset.
output_dir: Host path where the trained model will be saved.
hf_cache: Host path to HuggingFace model cache (bind-mounted to avoid re-downloading).
validation_split: Fraction of data held out for validation.
"""
dataset_path = dataset_path.resolve()
output_dir = output_dir.resolve()
if not dataset_path.is_file():
message = f"Dataset not found: {dataset_path}"
raise FileNotFoundError(message)
output_dir.mkdir(parents=True, exist_ok=True)
stop_finetune()
hf_cache = hf_cache.resolve()
hf_cache.mkdir(parents=True, exist_ok=True)
command = [
"docker",
"run",
"--name",
CONTAINER_NAME,
"--device=nvidia.com/gpu=all",
"--ipc=host",
"-v",
f"{hf_cache}:/root/.cache/huggingface",
"-v",
f"{output_dir}:/workspace/output/qwen-bill-summarizer",
"-v",
f"{dataset_path}:/workspace/dataset.jsonl:ro",
FINETUNE_IMAGE,
"--dataset",
"/workspace/dataset.jsonl",
"--output-dir",
"/workspace/output/qwen-bill-summarizer",
]
logger.info("Starting fine-tuning container")
logger.info(" Dataset: %s", dataset_path)
logger.info(" Output: %s", output_dir)
result = subprocess.run(command, text=True, check=False)
if result.returncode != 0:
message = f"Fine-tuning container exited with code {result.returncode}"
raise RuntimeError(message)
logger.info("Fine-tuning complete. Model saved to %s", output_dir)
def stop_finetune() -> None:
"""Stop and remove the fine-tuning container."""
logger.info("Stopping fine-tuning container")
subprocess.run(["docker", "stop", CONTAINER_NAME], capture_output=True, check=False)
subprocess.run(
["docker", "rm", "-f", CONTAINER_NAME], capture_output=True, check=False
)
def logs_finetune() -> str | None:
"""Return recent logs from the fine-tuning container, or None if not running."""
result = subprocess.run(
["docker", "logs", "--tail", "50", CONTAINER_NAME],
capture_output=True,
text=True,
check=False,
)
if result.returncode != 0:
return None
return result.stdout + result.stderr
app = typer.Typer(help="Fine-tuning container management.")
@app.command()
def build() -> None:
"""Build the fine-tuning Docker image."""
build_image()
@app.command()
def run(
dataset: Annotated[Path, typer.Option(help="Fine-tuning JSONL")] = REPO_DIR
/ "data/finetune_dataset.jsonl",
output_dir: Annotated[
Path, typer.Option(help="Where to save the trained model")
] = REPO_DIR / "data/output/qwen-bill-summarizer",
hf_cache: Annotated[
Path, typer.Option(help="Host path to HuggingFace model cache")
] = DEFAULT_HF_CACHE,
log_level: Annotated[str, typer.Option(help="Log level")] = "INFO",
) -> None:
"""Run fine-tuning inside a Docker container."""
logging.basicConfig(
level=log_level, format="%(asctime)s %(levelname)s %(name)s: %(message)s"
)
check_gpu_free()
start_finetune(
dataset_path=dataset,
output_dir=output_dir,
hf_cache=hf_cache,
)
@app.command()
def stop() -> None:
"""Stop and remove the fine-tuning container."""
stop_finetune()
@app.command()
def logs() -> None:
"""Show recent logs from the fine-tuning container."""
output = logs_finetune()
if output is None:
typer.echo("No running fine-tuning container found.")
raise typer.Exit(code=1)
typer.echo(output)
def cli() -> None:
"""Typer entry point."""
app()
if __name__ == "__main__":
cli()
+23
View File
@@ -0,0 +1,23 @@
from __future__ import annotations
import logging
import subprocess
logger = logging.getLogger(__name__)
def check_gpu_free() -> None:
"""Warn if GPU-heavy processes (e.g. Ollama) are running."""
result = subprocess.run(
["nvidia-smi", "--query-compute-apps=pid,process_name", "--format=csv,noheader"],
capture_output=True,
text=True,
check=False,
)
if result.returncode != 0:
logger.warning("Could not query GPU processes: %s", result.stderr.strip())
return
processes = result.stdout.strip()
if processes:
logger.warning("GPU processes detected:\n%s", processes)
logger.warning("Consider stopping Ollama (sudo systemctl stop ollama) before benchmarking")
+70
View File
@@ -0,0 +1,70 @@
"""Docker container lifecycle management for vLLM."""
from __future__ import annotations
import logging
import subprocess
logger = logging.getLogger(__name__)
CONTAINER_NAME = "vllm-bench"
VLLM_IMAGE = "vllm/vllm-openai:v0.19.0"
def start_vllm(
*,
model: str,
port: int,
model_dir: str,
gpu_memory_utilization: float,
) -> None:
"""Start a vLLM container serving the given model.
Args:
model: HuggingFace model directory name (relative to model_dir).
port: Host port to bind.
model_dir: Host path containing HuggingFace model directories.
gpu_memory_utilization: Fraction of GPU memory to use (0-1).
"""
command = [
"docker",
"run",
"-d",
"--name",
CONTAINER_NAME,
"--device=nvidia.com/gpu=all",
"--ipc=host",
"-v",
f"{model_dir}:/models",
"-p",
f"{port}:8000",
VLLM_IMAGE,
"--model",
f"/models/{model}",
"--served-model-name",
model,
"--gpu-memory-utilization",
str(gpu_memory_utilization),
"--max-model-len",
"4096",
]
logger.info("Starting vLLM container with model: %s", model)
stop_vllm()
result = subprocess.run(command, capture_output=True, text=True, check=False)
if result.returncode != 0:
msg = f"Failed to start vLLM container: {result.stderr.strip()}"
raise RuntimeError(msg)
logger.info("vLLM container started: %s", result.stdout.strip()[:12])
def stop_vllm() -> None:
"""Stop and remove the vLLM benchmark container."""
logger.info("Stopping vLLM container")
subprocess.run(["docker", "stop", CONTAINER_NAME], capture_output=True, check=False)
subprocess.run(["docker", "rm", "-f", CONTAINER_NAME], capture_output=True, check=False)
subprocess.run(
["docker", "network", "disconnect", "-f", "bridge", CONTAINER_NAME],
capture_output=True,
check=False,
)
logger.info("vLLM container stopped and removed")