Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 2038a90b3c |
@@ -0,0 +1,116 @@
|
|||||||
|
"""Nornsight — BERTopic POC Inference Script.
|
||||||
|
|
||||||
|
Loads the trained model and labels a small batch of posts,
|
||||||
|
writing results to main.post_topic for inspection.
|
||||||
|
|
||||||
|
POC: processes a single batch of 1k posts to validate the pipeline end-to-end.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import logging
|
||||||
|
import time
|
||||||
|
from collections import Counter
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
from bertopic import BERTopic
|
||||||
|
from sqlalchemy import Engine, func, insert, select
|
||||||
|
from sqlalchemy.orm import Session
|
||||||
|
|
||||||
|
from pipelines.config import BertTopicInferConfig, get_bertopic_infer_config
|
||||||
|
from pipelines.orm.common import get_postgres_engine
|
||||||
|
from pipelines.orm.data_science_dev.posts import PostTopic, Posts
|
||||||
|
from pipelines.orm.data_science_dev.posts.lang_filters import ENGLISH_LANGS
|
||||||
|
from pipelines.pipelines.common import configure_logger
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
def main() -> None:
|
||||||
|
"""Run BERTopic inference against a sample of posts."""
|
||||||
|
configure_logger()
|
||||||
|
|
||||||
|
config = get_bertopic_infer_config()
|
||||||
|
run_inference(config)
|
||||||
|
logger.info(
|
||||||
|
"POC inference complete. Check main.post_topic in DBeaver to inspect results."
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def run_inference(config: BertTopicInferConfig) -> None:
|
||||||
|
model_save_path = Path(config.model_save_path)
|
||||||
|
|
||||||
|
logger.info(f"Loading BERTopic model from {model_save_path}")
|
||||||
|
topic_model = BERTopic.load(str(model_save_path))
|
||||||
|
|
||||||
|
topic_info = topic_model.get_topic_info()
|
||||||
|
label_map: dict[int, str] = dict(zip(topic_info["Topic"], topic_info["Name"]))
|
||||||
|
logger.info(f"Model loaded with {len(label_map)} topics")
|
||||||
|
|
||||||
|
engine = get_postgres_engine(name="DATA_SCIENCE_DEV")
|
||||||
|
|
||||||
|
post_ids, texts = get_post_ids_and_test(engine, config)
|
||||||
|
|
||||||
|
logger.info(f"Fetched {len(texts)} posts")
|
||||||
|
|
||||||
|
logger.info("Running BERTopic transform")
|
||||||
|
start = time.perf_counter()
|
||||||
|
topics, _probabilities = topic_model.transform(texts)
|
||||||
|
elapsed = time.perf_counter() - start
|
||||||
|
logger.info(f"Transform complete in {elapsed:.1f}s")
|
||||||
|
|
||||||
|
# Write results to main.post_topic
|
||||||
|
records = [
|
||||||
|
{
|
||||||
|
"post_id": pid,
|
||||||
|
"topic_id": int(topic_id),
|
||||||
|
"topic_label": label_map.get(int(topic_id), "unknown"),
|
||||||
|
"model_version": config.model_version,
|
||||||
|
}
|
||||||
|
for pid, topic_id in zip(post_ids, topics)
|
||||||
|
]
|
||||||
|
with Session(engine) as session:
|
||||||
|
session.execute(insert(PostTopic), records)
|
||||||
|
session.commit()
|
||||||
|
|
||||||
|
count_topics(records)
|
||||||
|
logger.info(f"Wrote {len(records)} topic labels to main.post_topic")
|
||||||
|
|
||||||
|
|
||||||
|
def get_post_ids_and_test(
|
||||||
|
engine: Engine,
|
||||||
|
config: BertTopicInferConfig,
|
||||||
|
) -> None | tuple[list[int], list[str]]:
|
||||||
|
with Session(engine) as session:
|
||||||
|
logger.info(f"Fetching {config.poc_batch_size} posts for inference")
|
||||||
|
# Pull a fresh batch for inference — distinct from training sample
|
||||||
|
# using a fixed seed offset so we're not re-labeling training posts
|
||||||
|
stmt = select(Posts).where(
|
||||||
|
Posts.text.is_not(None),
|
||||||
|
Posts.langs.in_(ENGLISH_LANGS),
|
||||||
|
func.length(Posts.text) > config.min_text_length,
|
||||||
|
)
|
||||||
|
if config.poc_batch_size > 0:
|
||||||
|
stmt = stmt.limit(config.poc_batch_size)
|
||||||
|
|
||||||
|
posts = session.scalars(stmt).all()
|
||||||
|
if not posts:
|
||||||
|
logger.warning("No posts were selected for inference")
|
||||||
|
return [], []
|
||||||
|
|
||||||
|
post_ids = [post.post_id for post in posts]
|
||||||
|
texts = [post.text.strip() for post in posts]
|
||||||
|
|
||||||
|
return post_ids, texts
|
||||||
|
|
||||||
|
|
||||||
|
def count_topics(records: list[dict]) -> None:
|
||||||
|
topic_counts = Counter(record.get("topic_label", "unknown") for record in records)
|
||||||
|
|
||||||
|
logger.info("Topic distribution in this batch:")
|
||||||
|
for label, count in topic_counts.most_common(10):
|
||||||
|
logger.info(" %s: %d", label, count)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
@@ -0,0 +1,119 @@
|
|||||||
|
"""Nornsight — BERTopic POC Training Script.
|
||||||
|
|
||||||
|
Pulls a small stratified sample (~11.5k posts) from main.posts,
|
||||||
|
trains BERTopic with MiniBatchKMeans on Jeeves, and saves the model locally.
|
||||||
|
|
||||||
|
POC sample rate: random() < 0.00005 (~0.005% of 230M = ~11.5k posts)
|
||||||
|
Full training rate will be: random() < 0.005 (~1.08M posts)
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import logging
|
||||||
|
import time
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
from bertopic import BERTopic
|
||||||
|
from sklearn.cluster import MiniBatchKMeans
|
||||||
|
from sqlalchemy import func, select
|
||||||
|
from sqlalchemy.orm import Session
|
||||||
|
|
||||||
|
from pipelines.config import BertTopicTrainConfig, get_bertopic_train_config
|
||||||
|
from pipelines.orm.common import get_postgres_engine
|
||||||
|
from pipelines.orm.data_science_dev.posts import Posts
|
||||||
|
from pipelines.orm.data_science_dev.posts.lang_filters import ENGLISH_LANGS
|
||||||
|
from pipelines.pipelines.common import configure_logger
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
def main() -> None:
|
||||||
|
"""Train and persist the BERTopic model."""
|
||||||
|
configure_logger()
|
||||||
|
|
||||||
|
config = get_bertopic_train_config()
|
||||||
|
docs = load_sample(config)
|
||||||
|
if not docs:
|
||||||
|
logger.warning("No training documents were selected")
|
||||||
|
return
|
||||||
|
|
||||||
|
train(docs, config)
|
||||||
|
logger.info(f"Done. Model saved as version {config.model_version}")
|
||||||
|
logger.info("Next: run infer.py to label a sample of posts in the database")
|
||||||
|
|
||||||
|
|
||||||
|
def load_sample(config: BertTopicTrainConfig) -> list[str]:
|
||||||
|
logger.info("Connecting to PostgreSQL via SQLAlchemy")
|
||||||
|
engine = get_postgres_engine(name="DATA_SCIENCE_DEV")
|
||||||
|
|
||||||
|
logger.info(f"Pulling sample from main.posts (sample_rate={config.sample_rate})")
|
||||||
|
start = time.perf_counter()
|
||||||
|
|
||||||
|
with Session(engine) as session:
|
||||||
|
texts = session.scalars(
|
||||||
|
select(Posts.text).where(
|
||||||
|
Posts.text.is_not(None),
|
||||||
|
Posts.langs.in_(ENGLISH_LANGS),
|
||||||
|
func.length(Posts.text) > config.min_text_length,
|
||||||
|
func.random() < config.sample_rate,
|
||||||
|
)
|
||||||
|
).all()
|
||||||
|
|
||||||
|
elapsed = time.perf_counter() - start
|
||||||
|
logger.info(f"Fetched {len(texts)} rows in {elapsed:.1f}s")
|
||||||
|
|
||||||
|
# Basic cleaning — strip whitespace and deduplicate
|
||||||
|
docs = list({text.strip() for text in texts})
|
||||||
|
logger.info(f"After cleaning and dedup: {len(docs)} posts")
|
||||||
|
|
||||||
|
return docs
|
||||||
|
|
||||||
|
|
||||||
|
def train(docs: list[str], config: BertTopicTrainConfig) -> None:
|
||||||
|
logger.info(
|
||||||
|
f"Initialising BERTopic with MiniBatchKMeans (n_topics={config.n_topics})"
|
||||||
|
)
|
||||||
|
|
||||||
|
cluster_model = MiniBatchKMeans(
|
||||||
|
n_clusters=config.n_topics,
|
||||||
|
random_state=42,
|
||||||
|
batch_size=1024,
|
||||||
|
n_init=3,
|
||||||
|
verbose=1,
|
||||||
|
)
|
||||||
|
|
||||||
|
topic_model = BERTopic(
|
||||||
|
hdbscan_model=cluster_model,
|
||||||
|
language="english",
|
||||||
|
calculate_probabilities=False, # saves memory
|
||||||
|
verbose=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
logger.info(f"Starting fit_transform on {len(docs)} posts (CPU)")
|
||||||
|
start = time.perf_counter()
|
||||||
|
|
||||||
|
topic_model.fit_transform(docs)
|
||||||
|
|
||||||
|
elapsed = time.perf_counter() - start
|
||||||
|
logger.info(f"Training complete in {elapsed:.1f}s ({elapsed / 60:.1f} min)")
|
||||||
|
|
||||||
|
# Log topic summary for quick inspection
|
||||||
|
topic_info = topic_model.get_topic_info()
|
||||||
|
logger.info(f"Topics found: {len(topic_info)}")
|
||||||
|
logger.info(f"\n{topic_info.to_string()}")
|
||||||
|
|
||||||
|
model_save_path = Path(config.model_save_path)
|
||||||
|
model_save_path.mkdir(parents=True, exist_ok=True)
|
||||||
|
logger.info(f"Saving model to {model_save_path}")
|
||||||
|
|
||||||
|
topic_model.save(
|
||||||
|
str(model_save_path),
|
||||||
|
serialization="safetensors",
|
||||||
|
save_ctfidf=True,
|
||||||
|
save_embedding_model=True,
|
||||||
|
)
|
||||||
|
logger.info("Model saved")
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
@@ -1,6 +1,7 @@
|
|||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
|
from datetime import date
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
import tomllib
|
import tomllib
|
||||||
|
|
||||||
@@ -68,10 +69,50 @@ class BenchmarkConfig:
|
|||||||
return cls(**raw)
|
return cls(**raw)
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class BertTopicTrainConfig:
|
||||||
|
"""BERTopic training configuration loaded from TOML."""
|
||||||
|
|
||||||
|
sample_rate: float
|
||||||
|
min_text_length: int
|
||||||
|
n_topics: int
|
||||||
|
model_save_path: str
|
||||||
|
model_version: str | None = None
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def from_toml(cls, config_path: Path) -> BertTopicTrainConfig:
|
||||||
|
"""Load BERTopic training config from a TOML file."""
|
||||||
|
raw = tomllib.loads(config_path.read_text())["bertopic"]["train"]
|
||||||
|
|
||||||
|
today = date.today().isoformat()
|
||||||
|
if raw.get("model_version") is None:
|
||||||
|
raw["model_version"] = (
|
||||||
|
f"{today}-{raw['sample_rate']}-{raw['min_text_length']}-{raw['n_topics']}"
|
||||||
|
)
|
||||||
|
return cls(**raw)
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class BertTopicInferConfig:
|
||||||
|
"""BERTopic inference configuration loaded from TOML."""
|
||||||
|
|
||||||
|
min_text_length: int
|
||||||
|
poc_batch_size: int
|
||||||
|
model_version: str
|
||||||
|
model_save_path: str
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def from_toml(cls, config_path: Path) -> BertTopicInferConfig:
|
||||||
|
"""Load BERTopic inference config from a TOML file."""
|
||||||
|
raw = tomllib.loads(config_path.read_text())["bertopic"]["infer"]
|
||||||
|
return cls(**raw)
|
||||||
|
|
||||||
|
|
||||||
def get_config_dir() -> Path:
|
def get_config_dir() -> Path:
|
||||||
"""Get the path to the config file."""
|
"""Get the path to the config file."""
|
||||||
return Path(__file__).resolve().parent.parent.parent / "config"
|
return Path(__file__).resolve().parent.parent.parent / "config"
|
||||||
|
|
||||||
|
|
||||||
def default_config_path() -> Path:
|
def default_config_path() -> Path:
|
||||||
"""Get the path to the config file."""
|
"""Get the path to the config file."""
|
||||||
return get_config_dir() / "config.toml"
|
return get_config_dir() / "config.toml"
|
||||||
@@ -87,3 +128,19 @@ def get_benchmark_config(config_path: Path | None = None) -> BenchmarkConfig:
|
|||||||
if config_path is None:
|
if config_path is None:
|
||||||
config_path = default_config_path()
|
config_path = default_config_path()
|
||||||
return BenchmarkConfig.from_toml(config_path)
|
return BenchmarkConfig.from_toml(config_path)
|
||||||
|
|
||||||
|
|
||||||
|
def get_bertopic_train_config(
|
||||||
|
config_path: Path | None = None,
|
||||||
|
) -> BertTopicTrainConfig:
|
||||||
|
if config_path is None:
|
||||||
|
config_path = default_config_path()
|
||||||
|
return BertTopicTrainConfig.from_toml(config_path)
|
||||||
|
|
||||||
|
|
||||||
|
def get_bertopic_infer_config(
|
||||||
|
config_path: Path | None = None,
|
||||||
|
) -> BertTopicInferConfig:
|
||||||
|
if config_path is None:
|
||||||
|
config_path = default_config_path()
|
||||||
|
return BertTopicInferConfig.from_toml(config_path)
|
||||||
@@ -1,235 +0,0 @@
|
|||||||
"""Docker container lifecycle management for BERTopic jobs on Jeeves."""
|
|
||||||
|
|
||||||
from __future__ import annotations
|
|
||||||
|
|
||||||
import logging
|
|
||||||
import os
|
|
||||||
import subprocess
|
|
||||||
from pathlib import Path
|
|
||||||
from typing import Annotated, Literal
|
|
||||||
|
|
||||||
import typer
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
JOBMode = Literal["train", "infer"]
|
|
||||||
IMAGE_NAME = "bert-topic:latest"
|
|
||||||
REPO_DIR = Path(__file__).resolve().parents[3]
|
|
||||||
DEFAULT_CACHE_ROOT = Path("/zfs/storage/main/ds_thing/models/bert_topic")
|
|
||||||
DEFAULT_POSTGRES_SOCKET_DIR = Path("/run/postgresql")
|
|
||||||
DB_ENV_VARS = (
|
|
||||||
"DATA_SCIENCE_DEV_DB",
|
|
||||||
"DATA_SCIENCE_DEV_HOST",
|
|
||||||
"DATA_SCIENCE_DEV_PORT",
|
|
||||||
"DATA_SCIENCE_DEV_USER",
|
|
||||||
"DATA_SCIENCE_DEV_PASSWORD",
|
|
||||||
)
|
|
||||||
|
|
||||||
app = typer.Typer(help="BERTopic container management.")
|
|
||||||
|
|
||||||
|
|
||||||
def _container_name(mode: JOBMode) -> str:
|
|
||||||
"""Return the Docker container name for the selected BERTopic job."""
|
|
||||||
return f"bert-topic-{mode}"
|
|
||||||
|
|
||||||
|
|
||||||
def _module_name(mode: JOBMode) -> str:
|
|
||||||
"""Return the Python module to run inside the container."""
|
|
||||||
return f"pipelines.bert_topic.{mode}"
|
|
||||||
|
|
||||||
|
|
||||||
def _env_args(*, use_postgres_socket: bool) -> list[str]:
|
|
||||||
"""Pass through database environment variables from the host shell."""
|
|
||||||
required = [
|
|
||||||
"DATA_SCIENCE_DEV_DB",
|
|
||||||
"DATA_SCIENCE_DEV_PORT",
|
|
||||||
"DATA_SCIENCE_DEV_USER",
|
|
||||||
]
|
|
||||||
if not use_postgres_socket:
|
|
||||||
required.append("DATA_SCIENCE_DEV_HOST")
|
|
||||||
missing = [name for name in required if not os.getenv(name)]
|
|
||||||
if missing:
|
|
||||||
message = "Missing required database environment variables: " + ", ".join(
|
|
||||||
missing
|
|
||||||
)
|
|
||||||
raise RuntimeError(message)
|
|
||||||
args: list[str] = []
|
|
||||||
if use_postgres_socket:
|
|
||||||
args.extend(["-e", f"DATA_SCIENCE_DEV_HOST={DEFAULT_POSTGRES_SOCKET_DIR}"])
|
|
||||||
for name in DB_ENV_VARS:
|
|
||||||
if use_postgres_socket and name == "DATA_SCIENCE_DEV_HOST":
|
|
||||||
continue
|
|
||||||
if os.getenv(name):
|
|
||||||
args.extend(["-e", name])
|
|
||||||
return args
|
|
||||||
|
|
||||||
|
|
||||||
def build_image() -> None:
|
|
||||||
"""Build the BERTopic Docker image."""
|
|
||||||
dockerfile = REPO_DIR / "pipelines/containers/docker_files/Dockerfile.bert_topic"
|
|
||||||
logger.info("Building BERTopic image: %s", IMAGE_NAME)
|
|
||||||
result = subprocess.run(
|
|
||||||
[
|
|
||||||
"docker",
|
|
||||||
"build",
|
|
||||||
"--network",
|
|
||||||
"host",
|
|
||||||
"-f",
|
|
||||||
str(dockerfile),
|
|
||||||
"-t",
|
|
||||||
IMAGE_NAME,
|
|
||||||
str(REPO_DIR),
|
|
||||||
],
|
|
||||||
capture_output=True,
|
|
||||||
text=True,
|
|
||||||
check=False,
|
|
||||||
)
|
|
||||||
if result.returncode != 0:
|
|
||||||
message = (
|
|
||||||
"Failed to build BERTopic image. "
|
|
||||||
f"docker build stderr:\n{result.stderr.strip()}"
|
|
||||||
)
|
|
||||||
raise RuntimeError(message)
|
|
||||||
logger.info("Image built: %s", IMAGE_NAME)
|
|
||||||
|
|
||||||
|
|
||||||
def stop_job(*, mode: JOBMode) -> None:
|
|
||||||
"""Stop and remove the BERTopic container for the selected mode."""
|
|
||||||
container_name = _container_name(mode)
|
|
||||||
logger.info("Stopping BERTopic container: %s", container_name)
|
|
||||||
subprocess.run(["docker", "stop", container_name], capture_output=True, check=False)
|
|
||||||
subprocess.run(
|
|
||||||
["docker", "rm", "-f", container_name], capture_output=True, check=False
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def start_job(
|
|
||||||
*,
|
|
||||||
mode: JOBMode,
|
|
||||||
cache_root: Path = DEFAULT_CACHE_ROOT,
|
|
||||||
postgres_socket_dir: Path = DEFAULT_POSTGRES_SOCKET_DIR,
|
|
||||||
detach: bool = False,
|
|
||||||
) -> None:
|
|
||||||
"""Run BERTopic training or inference in Docker on Jeeves."""
|
|
||||||
cache_root = cache_root.resolve()
|
|
||||||
cache_root.mkdir(parents=True, exist_ok=True)
|
|
||||||
postgres_socket_dir = postgres_socket_dir.resolve()
|
|
||||||
stop_job(mode=mode)
|
|
||||||
use_postgres_socket = postgres_socket_dir.exists()
|
|
||||||
|
|
||||||
command = [
|
|
||||||
"docker",
|
|
||||||
"run",
|
|
||||||
"--name",
|
|
||||||
_container_name(mode),
|
|
||||||
"--ipc=host",
|
|
||||||
"-v",
|
|
||||||
f"{cache_root}:/cache",
|
|
||||||
*_env_args(use_postgres_socket=use_postgres_socket),
|
|
||||||
IMAGE_NAME,
|
|
||||||
_module_name(mode),
|
|
||||||
]
|
|
||||||
if use_postgres_socket:
|
|
||||||
command[7:7] = ["-v", f"{postgres_socket_dir}:{DEFAULT_POSTGRES_SOCKET_DIR}"]
|
|
||||||
if detach:
|
|
||||||
command.insert(2, "-d")
|
|
||||||
|
|
||||||
logger.info("Starting BERTopic %s container", mode)
|
|
||||||
logger.info(" Cache root: %s", cache_root)
|
|
||||||
if use_postgres_socket:
|
|
||||||
logger.info(" Postgres socket: %s", postgres_socket_dir)
|
|
||||||
result = subprocess.run(command, text=True, capture_output=detach, check=False)
|
|
||||||
if result.returncode != 0:
|
|
||||||
detail = (
|
|
||||||
result.stderr.strip() if result.stderr else f"exit code {result.returncode}"
|
|
||||||
)
|
|
||||||
raise RuntimeError(f"BERTopic container failed to start: {detail}")
|
|
||||||
if detach:
|
|
||||||
logger.info("Container started: %s", result.stdout.strip()[:12])
|
|
||||||
else:
|
|
||||||
logger.info("BERTopic %s run complete", mode)
|
|
||||||
|
|
||||||
|
|
||||||
def logs_job(*, mode: JOBMode) -> str | None:
|
|
||||||
"""Return recent logs from the BERTopic container, or None if absent."""
|
|
||||||
result = subprocess.run(
|
|
||||||
["docker", "logs", "--tail", "100", _container_name(mode)],
|
|
||||||
capture_output=True,
|
|
||||||
text=True,
|
|
||||||
check=False,
|
|
||||||
)
|
|
||||||
if result.returncode != 0:
|
|
||||||
return None
|
|
||||||
return result.stdout + result.stderr
|
|
||||||
|
|
||||||
|
|
||||||
@app.command()
|
|
||||||
def build(
|
|
||||||
log_level: Annotated[str, typer.Option(help="Log level")] = "INFO",
|
|
||||||
) -> None:
|
|
||||||
"""Build the BERTopic Docker image."""
|
|
||||||
logging.basicConfig(
|
|
||||||
level=log_level,
|
|
||||||
format="%(asctime)s %(levelname)s %(name)s: %(message)s",
|
|
||||||
)
|
|
||||||
build_image()
|
|
||||||
|
|
||||||
|
|
||||||
@app.command("run")
|
|
||||||
def run_job_command(
|
|
||||||
mode: Annotated[JOBMode, typer.Option(help="Which BERTopic job to run")] = "train",
|
|
||||||
cache_root: Annotated[
|
|
||||||
Path, typer.Option(help="Host path mounted to /cache for model and HF cache")
|
|
||||||
] = DEFAULT_CACHE_ROOT,
|
|
||||||
postgres_socket_dir: Annotated[
|
|
||||||
Path, typer.Option(help="Host Postgres socket directory to mount into the container")
|
|
||||||
] = DEFAULT_POSTGRES_SOCKET_DIR,
|
|
||||||
detach: Annotated[
|
|
||||||
bool, typer.Option(help="Start the container in the background")
|
|
||||||
] = False,
|
|
||||||
log_level: Annotated[str, typer.Option(help="Log level")] = "INFO",
|
|
||||||
) -> None:
|
|
||||||
"""Run BERTopic training or inference inside Docker."""
|
|
||||||
logging.basicConfig(
|
|
||||||
level=log_level,
|
|
||||||
format="%(asctime)s %(levelname)s %(name)s: %(message)s",
|
|
||||||
)
|
|
||||||
start_job(
|
|
||||||
mode=mode,
|
|
||||||
cache_root=cache_root,
|
|
||||||
postgres_socket_dir=postgres_socket_dir,
|
|
||||||
detach=detach,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
@app.command("stop")
|
|
||||||
def stop_job_command(
|
|
||||||
mode: Annotated[
|
|
||||||
JOBMode, typer.Option(help="Which BERTopic container to stop")
|
|
||||||
] = "train",
|
|
||||||
) -> None:
|
|
||||||
"""Stop and remove the BERTopic container."""
|
|
||||||
stop_job(mode=mode)
|
|
||||||
|
|
||||||
|
|
||||||
@app.command("logs")
|
|
||||||
def logs_job_command(
|
|
||||||
mode: Annotated[
|
|
||||||
JOBMode, typer.Option(help="Which BERTopic container logs to show")
|
|
||||||
] = "train",
|
|
||||||
) -> None:
|
|
||||||
"""Show recent logs from the BERTopic container."""
|
|
||||||
output = logs_job(mode=mode)
|
|
||||||
if output is None:
|
|
||||||
typer.echo(f"No BERTopic container found for mode={mode}.")
|
|
||||||
raise typer.Exit(code=1)
|
|
||||||
typer.echo(output)
|
|
||||||
|
|
||||||
|
|
||||||
def cli() -> None:
|
|
||||||
"""Typer entry point."""
|
|
||||||
app()
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
cli()
|
|
||||||
@@ -1,38 +0,0 @@
|
|||||||
FROM python:3.12-bookworm
|
|
||||||
|
|
||||||
ENV DEBIAN_FRONTEND=noninteractive
|
|
||||||
ENV PYTHONUNBUFFERED=1
|
|
||||||
ENV PIP_NO_CACHE_DIR=1
|
|
||||||
|
|
||||||
RUN apt-get update && apt-get install -y \
|
|
||||||
build-essential \
|
|
||||||
gcc \
|
|
||||||
g++ \
|
|
||||||
git \
|
|
||||||
libgomp1 \
|
|
||||||
libpq-dev \
|
|
||||||
&& rm -rf /var/lib/apt/lists/*
|
|
||||||
|
|
||||||
WORKDIR /app
|
|
||||||
|
|
||||||
COPY pipelines ./pipelines
|
|
||||||
|
|
||||||
RUN python -m pip install --upgrade pip setuptools wheel && \
|
|
||||||
python -m pip install \
|
|
||||||
torch \
|
|
||||||
--index-url https://download.pytorch.org/whl/cpu && \
|
|
||||||
python -m pip install \
|
|
||||||
typer \
|
|
||||||
sqlalchemy \
|
|
||||||
bertopic \
|
|
||||||
sentence-transformers \
|
|
||||||
scikit-learn \
|
|
||||||
pandas \
|
|
||||||
numpy \
|
|
||||||
"psycopg[binary]"
|
|
||||||
|
|
||||||
ENV HF_HOME=/cache/huggingface
|
|
||||||
ENV TRANSFORMERS_CACHE=/cache/huggingface
|
|
||||||
|
|
||||||
ENTRYPOINT ["python", "-m"]
|
|
||||||
CMD ["pipelines.bert_topic.train"]
|
|
||||||
@@ -1,11 +0,0 @@
|
|||||||
FROM ghcr.io/unslothai/unsloth:latest
|
|
||||||
|
|
||||||
RUN pip install --no-cache-dir typer
|
|
||||||
|
|
||||||
WORKDIR /workspace
|
|
||||||
COPY python/prompt_bench/finetune.py python/prompt_bench/finetune.py
|
|
||||||
COPY config/prompts/summarization_prompts.toml config/prompts/summarization_prompts.toml
|
|
||||||
COPY python/prompt_bench/__init__.py python/prompt_bench/__init__.py
|
|
||||||
COPY python/__init__.py python/__init__.py
|
|
||||||
|
|
||||||
ENTRYPOINT ["python", "-m", "pipelines.prompt_bench.finetune"]
|
|
||||||
@@ -0,0 +1,25 @@
|
|||||||
|
# Unsloth fine-tuning container for Qwen 3.5 4B on RTX 3090.
|
||||||
|
#
|
||||||
|
# Build:
|
||||||
|
# docker build -f python/prompt_bench/Dockerfile.finetune -t bill-finetune .
|
||||||
|
#
|
||||||
|
# Run:
|
||||||
|
# docker run --rm --device=nvidia.com/gpu=all --ipc=host \
|
||||||
|
# -v $(pwd)/output:/workspace/output \
|
||||||
|
# -v $(pwd)/output/finetune_dataset.jsonl:/workspace/dataset.jsonl:ro \
|
||||||
|
# -v /zfs/models/hf:/models \
|
||||||
|
# bill-finetune \
|
||||||
|
# --dataset /workspace/dataset.jsonl \
|
||||||
|
# --output-dir /workspace/output/qwen-bill-summarizer
|
||||||
|
|
||||||
|
FROM ghcr.io/unslothai/unsloth:latest
|
||||||
|
|
||||||
|
RUN pip install --no-cache-dir typer
|
||||||
|
|
||||||
|
WORKDIR /workspace
|
||||||
|
COPY python/prompt_bench/finetune.py python/prompt_bench/finetune.py
|
||||||
|
COPY config/prompts/summarization_prompts.toml config/prompts/summarization_prompts.toml
|
||||||
|
COPY python/prompt_bench/__init__.py python/prompt_bench/__init__.py
|
||||||
|
COPY python/__init__.py python/__init__.py
|
||||||
|
|
||||||
|
ENTRYPOINT ["python", "-m", "pipelines.prompt_bench.finetune"]
|
||||||
@@ -9,7 +9,7 @@ from typing import Annotated
|
|||||||
|
|
||||||
import typer
|
import typer
|
||||||
|
|
||||||
from pipelines.pipelines.containers.lib import check_gpu_free
|
from pipelines.tools.containers.lib import check_gpu_free
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
@@ -27,7 +27,7 @@ def build_image() -> None:
|
|||||||
"docker",
|
"docker",
|
||||||
"build",
|
"build",
|
||||||
"-f",
|
"-f",
|
||||||
str(REPO_DIR / "pipelines/containers/docker_files/Dockerfile.finetune"),
|
str(REPO_DIR / "python/prompt_bench/Dockerfile.finetune"),
|
||||||
"-t",
|
"-t",
|
||||||
FINETUNE_IMAGE,
|
FINETUNE_IMAGE,
|
||||||
".",
|
".",
|
||||||
Reference in New Issue
Block a user