1 Commits

Author SHA1 Message Date
Richie 09f7f0187f moved containers dir and created docker_files dir 2026-04-28 22:36:13 -04:00
11 changed files with 286 additions and 319 deletions
-57
View File
@@ -1,7 +1,6 @@
from __future__ import annotations
from dataclasses import dataclass
from datetime import date
from pathlib import Path
import tomllib
@@ -69,50 +68,10 @@ class BenchmarkConfig:
return cls(**raw)
@dataclass
class BertTopicTrainConfig:
"""BERTopic training configuration loaded from TOML."""
sample_rate: float
min_text_length: int
n_topics: int
model_save_path: str
model_version: str | None = None
@classmethod
def from_toml(cls, config_path: Path) -> BertTopicTrainConfig:
"""Load BERTopic training config from a TOML file."""
raw = tomllib.loads(config_path.read_text())["bertopic"]["train"]
today = date.today().isoformat()
if raw.get("model_version") is None:
raw["model_version"] = (
f"{today}-{raw['sample_rate']}-{raw['min_text_length']}-{raw['n_topics']}"
)
return cls(**raw)
@dataclass
class BertTopicInferConfig:
"""BERTopic inference configuration loaded from TOML."""
min_text_length: int
poc_batch_size: int
model_version: str
model_save_path: str
@classmethod
def from_toml(cls, config_path: Path) -> BertTopicInferConfig:
"""Load BERTopic inference config from a TOML file."""
raw = tomllib.loads(config_path.read_text())["bertopic"]["infer"]
return cls(**raw)
def get_config_dir() -> Path:
"""Get the path to the config file."""
return Path(__file__).resolve().parent.parent.parent / "config"
def default_config_path() -> Path:
"""Get the path to the config file."""
return get_config_dir() / "config.toml"
@@ -128,19 +87,3 @@ def get_benchmark_config(config_path: Path | None = None) -> BenchmarkConfig:
if config_path is None:
config_path = default_config_path()
return BenchmarkConfig.from_toml(config_path)
def get_bertopic_train_config(
config_path: Path | None = None,
) -> BertTopicTrainConfig:
if config_path is None:
config_path = default_config_path()
return BertTopicTrainConfig.from_toml(config_path)
def get_bertopic_infer_config(
config_path: Path | None = None,
) -> BertTopicInferConfig:
if config_path is None:
config_path = default_config_path()
return BertTopicInferConfig.from_toml(config_path)
-116
View File
@@ -1,116 +0,0 @@
"""Nornsight — BERTopic POC Inference Script.
Loads the trained model and labels a small batch of posts,
writing results to main.post_topic for inspection.
POC: processes a single batch of 1k posts to validate the pipeline end-to-end.
"""
from __future__ import annotations
import logging
import time
from collections import Counter
from pathlib import Path
from bertopic import BERTopic
from sqlalchemy import Engine, func, insert, select
from sqlalchemy.orm import Session
from pipelines.config import BertTopicInferConfig, get_bertopic_infer_config
from pipelines.orm.common import get_postgres_engine
from pipelines.orm.data_science_dev.posts import PostTopic, Posts
from pipelines.orm.data_science_dev.posts.lang_filters import ENGLISH_LANGS
from pipelines.pipelines.common import configure_logger
logger = logging.getLogger(__name__)
def main() -> None:
"""Run BERTopic inference against a sample of posts."""
configure_logger()
config = get_bertopic_infer_config()
run_inference(config)
logger.info(
"POC inference complete. Check main.post_topic in DBeaver to inspect results."
)
def run_inference(config: BertTopicInferConfig) -> None:
model_save_path = Path(config.model_save_path)
logger.info(f"Loading BERTopic model from {model_save_path}")
topic_model = BERTopic.load(str(model_save_path))
topic_info = topic_model.get_topic_info()
label_map: dict[int, str] = dict(zip(topic_info["Topic"], topic_info["Name"]))
logger.info(f"Model loaded with {len(label_map)} topics")
engine = get_postgres_engine(name="DATA_SCIENCE_DEV")
post_ids, texts = get_post_ids_and_test(engine, config)
logger.info(f"Fetched {len(texts)} posts")
logger.info("Running BERTopic transform")
start = time.perf_counter()
topics, _probabilities = topic_model.transform(texts)
elapsed = time.perf_counter() - start
logger.info(f"Transform complete in {elapsed:.1f}s")
# Write results to main.post_topic
records = [
{
"post_id": pid,
"topic_id": int(topic_id),
"topic_label": label_map.get(int(topic_id), "unknown"),
"model_version": config.model_version,
}
for pid, topic_id in zip(post_ids, topics)
]
with Session(engine) as session:
session.execute(insert(PostTopic), records)
session.commit()
count_topics(records)
logger.info(f"Wrote {len(records)} topic labels to main.post_topic")
def get_post_ids_and_test(
engine: Engine,
config: BertTopicInferConfig,
) -> None | tuple[list[int], list[str]]:
with Session(engine) as session:
logger.info(f"Fetching {config.poc_batch_size} posts for inference")
# Pull a fresh batch for inference — distinct from training sample
# using a fixed seed offset so we're not re-labeling training posts
stmt = select(Posts).where(
Posts.text.is_not(None),
Posts.langs.in_(ENGLISH_LANGS),
func.length(Posts.text) > config.min_text_length,
)
if config.poc_batch_size > 0:
stmt = stmt.limit(config.poc_batch_size)
posts = session.scalars(stmt).all()
if not posts:
logger.warning("No posts were selected for inference")
return [], []
post_ids = [post.post_id for post in posts]
texts = [post.text.strip() for post in posts]
return post_ids, texts
def count_topics(records: list[dict]) -> None:
topic_counts = Counter(record.get("topic_label", "unknown") for record in records)
logger.info("Topic distribution in this batch:")
for label, count in topic_counts.most_common(10):
logger.info(" %s: %d", label, count)
if __name__ == "__main__":
main()
-119
View File
@@ -1,119 +0,0 @@
"""Nornsight — BERTopic POC Training Script.
Pulls a small stratified sample (~11.5k posts) from main.posts,
trains BERTopic with MiniBatchKMeans on Jeeves, and saves the model locally.
POC sample rate: random() < 0.00005 (~0.005% of 230M = ~11.5k posts)
Full training rate will be: random() < 0.005 (~1.08M posts)
"""
from __future__ import annotations
import logging
import time
from pathlib import Path
from bertopic import BERTopic
from sklearn.cluster import MiniBatchKMeans
from sqlalchemy import func, select
from sqlalchemy.orm import Session
from pipelines.config import BertTopicTrainConfig, get_bertopic_train_config
from pipelines.orm.common import get_postgres_engine
from pipelines.orm.data_science_dev.posts import Posts
from pipelines.orm.data_science_dev.posts.lang_filters import ENGLISH_LANGS
from pipelines.pipelines.common import configure_logger
logger = logging.getLogger(__name__)
def main() -> None:
"""Train and persist the BERTopic model."""
configure_logger()
config = get_bertopic_train_config()
docs = load_sample(config)
if not docs:
logger.warning("No training documents were selected")
return
train(docs, config)
logger.info(f"Done. Model saved as version {config.model_version}")
logger.info("Next: run infer.py to label a sample of posts in the database")
def load_sample(config: BertTopicTrainConfig) -> list[str]:
logger.info("Connecting to PostgreSQL via SQLAlchemy")
engine = get_postgres_engine(name="DATA_SCIENCE_DEV")
logger.info(f"Pulling sample from main.posts (sample_rate={config.sample_rate})")
start = time.perf_counter()
with Session(engine) as session:
texts = session.scalars(
select(Posts.text).where(
Posts.text.is_not(None),
Posts.langs.in_(ENGLISH_LANGS),
func.length(Posts.text) > config.min_text_length,
func.random() < config.sample_rate,
)
).all()
elapsed = time.perf_counter() - start
logger.info(f"Fetched {len(texts)} rows in {elapsed:.1f}s")
# Basic cleaning — strip whitespace and deduplicate
docs = list({text.strip() for text in texts})
logger.info(f"After cleaning and dedup: {len(docs)} posts")
return docs
def train(docs: list[str], config: BertTopicTrainConfig) -> None:
logger.info(
f"Initialising BERTopic with MiniBatchKMeans (n_topics={config.n_topics})"
)
cluster_model = MiniBatchKMeans(
n_clusters=config.n_topics,
random_state=42,
batch_size=1024,
n_init=3,
verbose=1,
)
topic_model = BERTopic(
hdbscan_model=cluster_model,
language="english",
calculate_probabilities=False, # saves memory
verbose=True,
)
logger.info(f"Starting fit_transform on {len(docs)} posts (CPU)")
start = time.perf_counter()
topic_model.fit_transform(docs)
elapsed = time.perf_counter() - start
logger.info(f"Training complete in {elapsed:.1f}s ({elapsed / 60:.1f} min)")
# Log topic summary for quick inspection
topic_info = topic_model.get_topic_info()
logger.info(f"Topics found: {len(topic_info)}")
logger.info(f"\n{topic_info.to_string()}")
model_save_path = Path(config.model_save_path)
model_save_path.mkdir(parents=True, exist_ok=True)
logger.info(f"Saving model to {model_save_path}")
topic_model.save(
str(model_save_path),
serialization="safetensors",
save_ctfidf=True,
save_embedding_model=True,
)
logger.info("Model saved")
if __name__ == "__main__":
main()
+235
View File
@@ -0,0 +1,235 @@
"""Docker container lifecycle management for BERTopic jobs on Jeeves."""
from __future__ import annotations
import logging
import os
import subprocess
from pathlib import Path
from typing import Annotated, Literal
import typer
logger = logging.getLogger(__name__)
JOBMode = Literal["train", "infer"]
IMAGE_NAME = "bert-topic:latest"
REPO_DIR = Path(__file__).resolve().parents[3]
DEFAULT_CACHE_ROOT = Path("/zfs/storage/main/ds_thing/models/bert_topic")
DEFAULT_POSTGRES_SOCKET_DIR = Path("/run/postgresql")
DB_ENV_VARS = (
"DATA_SCIENCE_DEV_DB",
"DATA_SCIENCE_DEV_HOST",
"DATA_SCIENCE_DEV_PORT",
"DATA_SCIENCE_DEV_USER",
"DATA_SCIENCE_DEV_PASSWORD",
)
app = typer.Typer(help="BERTopic container management.")
def _container_name(mode: JOBMode) -> str:
"""Return the Docker container name for the selected BERTopic job."""
return f"bert-topic-{mode}"
def _module_name(mode: JOBMode) -> str:
"""Return the Python module to run inside the container."""
return f"pipelines.bert_topic.{mode}"
def _env_args(*, use_postgres_socket: bool) -> list[str]:
"""Pass through database environment variables from the host shell."""
required = [
"DATA_SCIENCE_DEV_DB",
"DATA_SCIENCE_DEV_PORT",
"DATA_SCIENCE_DEV_USER",
]
if not use_postgres_socket:
required.append("DATA_SCIENCE_DEV_HOST")
missing = [name for name in required if not os.getenv(name)]
if missing:
message = "Missing required database environment variables: " + ", ".join(
missing
)
raise RuntimeError(message)
args: list[str] = []
if use_postgres_socket:
args.extend(["-e", f"DATA_SCIENCE_DEV_HOST={DEFAULT_POSTGRES_SOCKET_DIR}"])
for name in DB_ENV_VARS:
if use_postgres_socket and name == "DATA_SCIENCE_DEV_HOST":
continue
if os.getenv(name):
args.extend(["-e", name])
return args
def build_image() -> None:
"""Build the BERTopic Docker image."""
dockerfile = REPO_DIR / "pipelines/containers/docker_files/Dockerfile.bert_topic"
logger.info("Building BERTopic image: %s", IMAGE_NAME)
result = subprocess.run(
[
"docker",
"build",
"--network",
"host",
"-f",
str(dockerfile),
"-t",
IMAGE_NAME,
str(REPO_DIR),
],
capture_output=True,
text=True,
check=False,
)
if result.returncode != 0:
message = (
"Failed to build BERTopic image. "
f"docker build stderr:\n{result.stderr.strip()}"
)
raise RuntimeError(message)
logger.info("Image built: %s", IMAGE_NAME)
def stop_job(*, mode: JOBMode) -> None:
"""Stop and remove the BERTopic container for the selected mode."""
container_name = _container_name(mode)
logger.info("Stopping BERTopic container: %s", container_name)
subprocess.run(["docker", "stop", container_name], capture_output=True, check=False)
subprocess.run(
["docker", "rm", "-f", container_name], capture_output=True, check=False
)
def start_job(
*,
mode: JOBMode,
cache_root: Path = DEFAULT_CACHE_ROOT,
postgres_socket_dir: Path = DEFAULT_POSTGRES_SOCKET_DIR,
detach: bool = False,
) -> None:
"""Run BERTopic training or inference in Docker on Jeeves."""
cache_root = cache_root.resolve()
cache_root.mkdir(parents=True, exist_ok=True)
postgres_socket_dir = postgres_socket_dir.resolve()
stop_job(mode=mode)
use_postgres_socket = postgres_socket_dir.exists()
command = [
"docker",
"run",
"--name",
_container_name(mode),
"--ipc=host",
"-v",
f"{cache_root}:/cache",
*_env_args(use_postgres_socket=use_postgres_socket),
IMAGE_NAME,
_module_name(mode),
]
if use_postgres_socket:
command[7:7] = ["-v", f"{postgres_socket_dir}:{DEFAULT_POSTGRES_SOCKET_DIR}"]
if detach:
command.insert(2, "-d")
logger.info("Starting BERTopic %s container", mode)
logger.info(" Cache root: %s", cache_root)
if use_postgres_socket:
logger.info(" Postgres socket: %s", postgres_socket_dir)
result = subprocess.run(command, text=True, capture_output=detach, check=False)
if result.returncode != 0:
detail = (
result.stderr.strip() if result.stderr else f"exit code {result.returncode}"
)
raise RuntimeError(f"BERTopic container failed to start: {detail}")
if detach:
logger.info("Container started: %s", result.stdout.strip()[:12])
else:
logger.info("BERTopic %s run complete", mode)
def logs_job(*, mode: JOBMode) -> str | None:
"""Return recent logs from the BERTopic container, or None if absent."""
result = subprocess.run(
["docker", "logs", "--tail", "100", _container_name(mode)],
capture_output=True,
text=True,
check=False,
)
if result.returncode != 0:
return None
return result.stdout + result.stderr
@app.command()
def build(
log_level: Annotated[str, typer.Option(help="Log level")] = "INFO",
) -> None:
"""Build the BERTopic Docker image."""
logging.basicConfig(
level=log_level,
format="%(asctime)s %(levelname)s %(name)s: %(message)s",
)
build_image()
@app.command("run")
def run_job_command(
mode: Annotated[JOBMode, typer.Option(help="Which BERTopic job to run")] = "train",
cache_root: Annotated[
Path, typer.Option(help="Host path mounted to /cache for model and HF cache")
] = DEFAULT_CACHE_ROOT,
postgres_socket_dir: Annotated[
Path, typer.Option(help="Host Postgres socket directory to mount into the container")
] = DEFAULT_POSTGRES_SOCKET_DIR,
detach: Annotated[
bool, typer.Option(help="Start the container in the background")
] = False,
log_level: Annotated[str, typer.Option(help="Log level")] = "INFO",
) -> None:
"""Run BERTopic training or inference inside Docker."""
logging.basicConfig(
level=log_level,
format="%(asctime)s %(levelname)s %(name)s: %(message)s",
)
start_job(
mode=mode,
cache_root=cache_root,
postgres_socket_dir=postgres_socket_dir,
detach=detach,
)
@app.command("stop")
def stop_job_command(
mode: Annotated[
JOBMode, typer.Option(help="Which BERTopic container to stop")
] = "train",
) -> None:
"""Stop and remove the BERTopic container."""
stop_job(mode=mode)
@app.command("logs")
def logs_job_command(
mode: Annotated[
JOBMode, typer.Option(help="Which BERTopic container logs to show")
] = "train",
) -> None:
"""Show recent logs from the BERTopic container."""
output = logs_job(mode=mode)
if output is None:
typer.echo(f"No BERTopic container found for mode={mode}.")
raise typer.Exit(code=1)
typer.echo(output)
def cli() -> None:
"""Typer entry point."""
app()
if __name__ == "__main__":
cli()
@@ -0,0 +1,38 @@
FROM python:3.12-bookworm
ENV DEBIAN_FRONTEND=noninteractive
ENV PYTHONUNBUFFERED=1
ENV PIP_NO_CACHE_DIR=1
RUN apt-get update && apt-get install -y \
build-essential \
gcc \
g++ \
git \
libgomp1 \
libpq-dev \
&& rm -rf /var/lib/apt/lists/*
WORKDIR /app
COPY pipelines ./pipelines
RUN python -m pip install --upgrade pip setuptools wheel && \
python -m pip install \
torch \
--index-url https://download.pytorch.org/whl/cpu && \
python -m pip install \
typer \
sqlalchemy \
bertopic \
sentence-transformers \
scikit-learn \
pandas \
numpy \
"psycopg[binary]"
ENV HF_HOME=/cache/huggingface
ENV TRANSFORMERS_CACHE=/cache/huggingface
ENTRYPOINT ["python", "-m"]
CMD ["pipelines.bert_topic.train"]
@@ -0,0 +1,11 @@
FROM ghcr.io/unslothai/unsloth:latest
RUN pip install --no-cache-dir typer
WORKDIR /workspace
COPY python/prompt_bench/finetune.py python/prompt_bench/finetune.py
COPY config/prompts/summarization_prompts.toml config/prompts/summarization_prompts.toml
COPY python/prompt_bench/__init__.py python/prompt_bench/__init__.py
COPY python/__init__.py python/__init__.py
ENTRYPOINT ["python", "-m", "pipelines.prompt_bench.finetune"]
@@ -9,7 +9,7 @@ from typing import Annotated
import typer
from pipelines.tools.containers.lib import check_gpu_free
from pipelines.pipelines.containers.lib import check_gpu_free
logger = logging.getLogger(__name__)
@@ -27,7 +27,7 @@ def build_image() -> None:
"docker",
"build",
"-f",
str(REPO_DIR / "python/prompt_bench/Dockerfile.finetune"),
str(REPO_DIR / "pipelines/containers/docker_files/Dockerfile.finetune"),
"-t",
FINETUNE_IMAGE,
".",
-25
View File
@@ -1,25 +0,0 @@
# Unsloth fine-tuning container for Qwen 3.5 4B on RTX 3090.
#
# Build:
# docker build -f python/prompt_bench/Dockerfile.finetune -t bill-finetune .
#
# Run:
# docker run --rm --device=nvidia.com/gpu=all --ipc=host \
# -v $(pwd)/output:/workspace/output \
# -v $(pwd)/output/finetune_dataset.jsonl:/workspace/dataset.jsonl:ro \
# -v /zfs/models/hf:/models \
# bill-finetune \
# --dataset /workspace/dataset.jsonl \
# --output-dir /workspace/output/qwen-bill-summarizer
FROM ghcr.io/unslothai/unsloth:latest
RUN pip install --no-cache-dir typer
WORKDIR /workspace
COPY python/prompt_bench/finetune.py python/prompt_bench/finetune.py
COPY config/prompts/summarization_prompts.toml config/prompts/summarization_prompts.toml
COPY python/prompt_bench/__init__.py python/prompt_bench/__init__.py
COPY python/__init__.py python/__init__.py
ENTRYPOINT ["python", "-m", "pipelines.prompt_bench.finetune"]