8 Commits

11 changed files with 3666 additions and 37 deletions
+1 -1
View File
@@ -1 +1 @@
"""Prompt benchmarking system for evaluating LLMs via vLLM."""
"""Init."""
+116
View File
@@ -0,0 +1,116 @@
"""Nornsight — BERTopic POC Inference Script.
Loads the trained model and labels a small batch of posts,
writing results to main.post_topic for inspection.
POC: processes a single batch of 1k posts to validate the pipeline end-to-end.
"""
from __future__ import annotations
import logging
import time
from collections import Counter
from pathlib import Path
from bertopic import BERTopic
from sqlalchemy import Engine, func, insert, select
from sqlalchemy.orm import Session
from pipelines.config import BertTopicInferConfig, get_bertopic_infer_config
from pipelines.orm.common import get_postgres_engine
from pipelines.orm.data_science_dev.posts import PostTopic, Posts
from pipelines.orm.data_science_dev.posts.lang_filters import ENGLISH_LANGS
from pipelines.pipelines.common import configure_logger
logger = logging.getLogger(__name__)
def main() -> None:
"""Run BERTopic inference against a sample of posts."""
configure_logger()
config = get_bertopic_infer_config()
run_inference(config)
logger.info(
"POC inference complete. Check main.post_topic in DBeaver to inspect results."
)
def run_inference(config: BertTopicInferConfig) -> None:
model_save_path = Path(config.model_save_path)
logger.info(f"Loading BERTopic model from {model_save_path}")
topic_model = BERTopic.load(str(model_save_path))
topic_info = topic_model.get_topic_info()
label_map: dict[int, str] = dict(zip(topic_info["Topic"], topic_info["Name"]))
logger.info(f"Model loaded with {len(label_map)} topics")
engine = get_postgres_engine(name="DATA_SCIENCE_DEV")
post_ids, texts = get_post_ids_and_test(engine, config)
logger.info(f"Fetched {len(texts)} posts")
logger.info("Running BERTopic transform")
start = time.perf_counter()
topics, _probabilities = topic_model.transform(texts)
elapsed = time.perf_counter() - start
logger.info(f"Transform complete in {elapsed:.1f}s")
# Write results to main.post_topic
records = [
{
"post_id": pid,
"topic_id": int(topic_id),
"topic_label": label_map.get(int(topic_id), "unknown"),
"model_version": config.model_version,
}
for pid, topic_id in zip(post_ids, topics)
]
with Session(engine) as session:
session.execute(insert(PostTopic), records)
session.commit()
count_topics(records)
logger.info(f"Wrote {len(records)} topic labels to main.post_topic")
def get_post_ids_and_test(
engine: Engine,
config: BertTopicInferConfig,
) -> None | tuple[list[int], list[str]]:
with Session(engine) as session:
logger.info(f"Fetching {config.poc_batch_size} posts for inference")
# Pull a fresh batch for inference — distinct from training sample
# using a fixed seed offset so we're not re-labeling training posts
stmt = select(Posts).where(
Posts.text.is_not(None),
Posts.langs.in_(ENGLISH_LANGS),
func.length(Posts.text) > config.min_text_length,
)
if config.poc_batch_size > 0:
stmt = stmt.limit(config.poc_batch_size)
posts = session.scalars(stmt).all()
if not posts:
logger.warning("No posts were selected for inference")
return [], []
post_ids = [post.post_id for post in posts]
texts = [post.text.strip() for post in posts]
return post_ids, texts
def count_topics(records: list[dict]) -> None:
topic_counts = Counter(record.get("topic_label", "unknown") for record in records)
logger.info("Topic distribution in this batch:")
for label, count in topic_counts.most_common(10):
logger.info(" %s: %d", label, count)
if __name__ == "__main__":
main()
+119
View File
@@ -0,0 +1,119 @@
"""Nornsight — BERTopic POC Training Script.
Pulls a small stratified sample (~11.5k posts) from main.posts,
trains BERTopic with MiniBatchKMeans on Jeeves, and saves the model locally.
POC sample rate: random() < 0.00005 (~0.005% of 230M = ~11.5k posts)
Full training rate will be: random() < 0.005 (~1.08M posts)
"""
from __future__ import annotations
import logging
import time
from pathlib import Path
from bertopic import BERTopic
from sklearn.cluster import MiniBatchKMeans
from sqlalchemy import func, select
from sqlalchemy.orm import Session
from pipelines.config import BertTopicTrainConfig, get_bertopic_train_config
from pipelines.orm.common import get_postgres_engine
from pipelines.orm.data_science_dev.posts import Posts
from pipelines.orm.data_science_dev.posts.lang_filters import ENGLISH_LANGS
from pipelines.pipelines.common import configure_logger
logger = logging.getLogger(__name__)
def main() -> None:
"""Train and persist the BERTopic model."""
configure_logger()
config = get_bertopic_train_config()
docs = load_sample(config)
if not docs:
logger.warning("No training documents were selected")
return
train(docs, config)
logger.info(f"Done. Model saved as version {config.model_version}")
logger.info("Next: run infer.py to label a sample of posts in the database")
def load_sample(config: BertTopicTrainConfig) -> list[str]:
logger.info("Connecting to PostgreSQL via SQLAlchemy")
engine = get_postgres_engine(name="DATA_SCIENCE_DEV")
logger.info(f"Pulling sample from main.posts (sample_rate={config.sample_rate})")
start = time.perf_counter()
with Session(engine) as session:
texts = session.scalars(
select(Posts.text).where(
Posts.text.is_not(None),
Posts.langs.in_(ENGLISH_LANGS),
func.length(Posts.text) > config.min_text_length,
func.random() < config.sample_rate,
)
).all()
elapsed = time.perf_counter() - start
logger.info(f"Fetched {len(texts)} rows in {elapsed:.1f}s")
# Basic cleaning — strip whitespace and deduplicate
docs = list({text.strip() for text in texts})
logger.info(f"After cleaning and dedup: {len(docs)} posts")
return docs
def train(docs: list[str], config: BertTopicTrainConfig) -> None:
logger.info(
f"Initialising BERTopic with MiniBatchKMeans (n_topics={config.n_topics})"
)
cluster_model = MiniBatchKMeans(
n_clusters=config.n_topics,
random_state=42,
batch_size=1024,
n_init=3,
verbose=1,
)
topic_model = BERTopic(
hdbscan_model=cluster_model,
language="english",
calculate_probabilities=False, # saves memory
verbose=True,
)
logger.info(f"Starting fit_transform on {len(docs)} posts (CPU)")
start = time.perf_counter()
topic_model.fit_transform(docs)
elapsed = time.perf_counter() - start
logger.info(f"Training complete in {elapsed:.1f}s ({elapsed / 60:.1f} min)")
# Log topic summary for quick inspection
topic_info = topic_model.get_topic_info()
logger.info(f"Topics found: {len(topic_info)}")
logger.info(f"\n{topic_info.to_string()}")
model_save_path = Path(config.model_save_path)
model_save_path.mkdir(parents=True, exist_ok=True)
logger.info(f"Saving model to {model_save_path}")
topic_model.save(
str(model_save_path),
serialization="safetensors",
save_ctfidf=True,
save_embedding_model=True,
)
logger.info("Model saved")
if __name__ == "__main__":
main()
+57
View File
@@ -2,6 +2,7 @@ from __future__ import annotations
from dataclasses import dataclass
from os import getenv
from datetime import date
from pathlib import Path
import tomllib
@@ -50,6 +51,7 @@ class FinetuneConfig:
)
@dataclass
class BenchmarkConfig:
"""Top-level benchmark configuration loaded from TOML."""
@@ -101,6 +103,45 @@ class OpenAIConfig:
)
@dataclass
class BertTopicTrainConfig:
"""BERTopic training configuration loaded from TOML."""
sample_rate: float
min_text_length: int
n_topics: int
model_save_path: str
model_version: str | None = None
@classmethod
def from_toml(cls, config_path: Path) -> BertTopicTrainConfig:
"""Load BERTopic training config from a TOML file."""
raw = tomllib.loads(config_path.read_text())["bertopic"]["train"]
today = date.today().isoformat()
if raw.get("model_version") is None:
raw["model_version"] = (
f"{today}-{raw['sample_rate']}-{raw['min_text_length']}-{raw['n_topics']}"
)
return cls(**raw)
@dataclass
class BertTopicInferConfig:
"""BERTopic inference configuration loaded from TOML."""
min_text_length: int
poc_batch_size: int
model_version: str
model_save_path: str
@classmethod
def from_toml(cls, config_path: Path) -> BertTopicInferConfig:
"""Load BERTopic inference config from a TOML file."""
raw = tomllib.loads(config_path.read_text())["bertopic"]["infer"]
return cls(**raw)
def get_config_dir() -> Path:
"""Get the path to the config directory."""
return Path(__file__).resolve().parents[2] / "config"
@@ -127,3 +168,19 @@ def get_benchmark_config(config_path: Path | None = None) -> BenchmarkConfig:
if config_path is None:
config_path = default_config_path()
return BenchmarkConfig.from_toml(config_path)
def get_bertopic_train_config(
config_path: Path | None = None,
) -> BertTopicTrainConfig:
if config_path is None:
config_path = default_config_path()
return BertTopicTrainConfig.from_toml(config_path)
def get_bertopic_infer_config(
config_path: Path | None = None,
) -> BertTopicInferConfig:
if config_path is None:
config_path = default_config_path()
return BertTopicInferConfig.from_toml(config_path)
@@ -23,7 +23,7 @@ from sqlalchemy import (
)
from sqlalchemy.orm import Session
from pipelines.congress_vote_context import create_score_run, finalize_score_run
from pipelines.jobs.congress_vote_context import create_score_run, finalize_score_run
from pipelines.orm.common import get_postgres_engine
from pipelines.orm.data_science_dev.congress import (
BillTopic,
@@ -39,7 +39,7 @@ from pipelines.orm.data_science_dev.congress import (
VoteRelationship,
VoteRecord,
)
from pipelines.pipelines.jobs.extract_bill_topics import normalize_topic_label
from pipelines.jobs.extract_bill_topics import normalize_topic_label
from pipelines.web.scoring import (
OPPOSE_POSITIONS,
SUPPORT_POSITIONS,
File diff suppressed because it is too large Load Diff
File diff suppressed because it is too large Load Diff
+281
View File
@@ -0,0 +1,281 @@
"""Ingestion pipeline for loading JSONL post files into the weekly-partitioned posts table.
Usage:
ingest-posts /path/to/files/
ingest-posts /path/to/single_file.jsonl
ingest-posts /data/dir/ --workers 4 --batch-size 5000
"""
from __future__ import annotations
import logging
from datetime import UTC, datetime
from pathlib import Path # noqa: TC003 this is needed for typer
from typing import TYPE_CHECKING, Annotated
import orjson
import psycopg
import typer
from pipelines.pipelines.common import configure_logger
from pipelines.orm.common import get_connection_info
from pipelines.pipelines.parallelize import parallelize_process
if TYPE_CHECKING:
from collections.abc import Iterator
logger = logging.getLogger(__name__)
app = typer.Typer(help="Ingest JSONL post files into the partitioned posts table.")
@app.command()
def main(
path: Annotated[
Path,
typer.Argument(help="Directory containing JSONL files, or a single JSONL file"),
],
batch_size: Annotated[int, typer.Option(help="Rows per INSERT batch")] = 10000,
workers: Annotated[
int, typer.Option(help="Parallel workers for multi-file ingestion")
] = 4,
pattern: Annotated[
str, typer.Option(help="Glob pattern for JSONL files")
] = "*.jsonl",
) -> None:
"""Ingest JSONL post files into the weekly-partitioned posts table."""
configure_logger(level="INFO")
logger.info("starting ingest-posts")
logger.info(
"path=%s batch_size=%d workers=%d pattern=%s",
path,
batch_size,
workers,
pattern,
)
if path.is_file():
ingest_file(path, batch_size=batch_size)
elif path.is_dir():
ingest_directory(
path, batch_size=batch_size, max_workers=workers, pattern=pattern
)
else:
typer.echo(f"Path does not exist: {path}", err=True)
raise typer.Exit(code=1)
logger.info("ingest-posts done")
def ingest_directory(
directory: Path,
*,
batch_size: int,
max_workers: int,
pattern: str = "*.jsonl",
) -> None:
"""Ingest all JSONL files in a directory using parallel workers."""
files = sorted(directory.glob(pattern))
if not files:
logger.warning("No JSONL files found in %s", directory)
return
logger.info("Found %d JSONL files to ingest", len(files))
kwargs_list = [{"path": fp, "batch_size": batch_size} for fp in files]
parallelize_process(ingest_file, kwargs_list, max_workers=max_workers)
SCHEMA = "main"
COLUMNS = (
"post_id",
"user_id",
"instance",
"date",
"text",
"langs",
"like_count",
"reply_count",
"repost_count",
"reply_to",
"replied_author",
"thread_root",
"thread_root_author",
"repost_from",
"reposted_author",
"quotes",
"quoted_author",
"labels",
"sent_label",
"sent_score",
)
INSERT_FROM_STAGING = f"""
INSERT INTO {SCHEMA}.posts ({", ".join(COLUMNS)})
SELECT {", ".join(COLUMNS)} FROM pg_temp.staging
ON CONFLICT (post_id, date) DO NOTHING
""" # noqa: S608
FAILED_INSERT = f"""
INSERT INTO {SCHEMA}.failed_ingestion (raw_line, error)
VALUES (%(raw_line)s, %(error)s)
""" # noqa: S608
def get_psycopg_connection() -> psycopg.Connection:
"""Create a raw psycopg3 connection from environment variables."""
database, host, port, username, password = get_connection_info("DATA_SCIENCE_DEV")
return psycopg.connect(
dbname=database,
host=host,
port=int(port),
user=username,
password=password,
autocommit=False,
)
def ingest_file(path: Path, *, batch_size: int) -> None:
"""Ingest a single JSONL file into the posts table."""
log_trigger = max(100_000 // batch_size, 1)
failed_lines: list[dict] = []
try:
with get_psycopg_connection() as connection:
for index, batch in enumerate(
read_jsonl_batches(path, batch_size, failed_lines), 1
):
ingest_batch(connection, batch)
if index % log_trigger == 0:
logger.info(
"Ingested %d batches (%d rows) from %s",
index,
index * batch_size,
path,
)
if failed_lines:
logger.warning(
"Recording %d malformed lines from %s", len(failed_lines), path.name
)
with connection.cursor() as cursor:
cursor.executemany(FAILED_INSERT, failed_lines)
connection.commit()
except Exception:
logger.exception("Failed to ingest file: %s", path)
raise
def ingest_batch(connection: psycopg.Connection, batch: list[dict]) -> None:
"""COPY batch into a temp staging table, then INSERT ... ON CONFLICT into posts."""
if not batch:
return
try:
with connection.cursor() as cursor:
cursor.execute(f"""
CREATE TEMP TABLE IF NOT EXISTS staging
(LIKE {SCHEMA}.posts INCLUDING DEFAULTS)
ON COMMIT DELETE ROWS
""")
cursor.execute("TRUNCATE pg_temp.staging")
with cursor.copy(
f"COPY pg_temp.staging ({', '.join(COLUMNS)}) FROM STDIN"
) as copy:
for row in batch:
copy.write_row(tuple(row.get(column) for column in COLUMNS))
cursor.execute(INSERT_FROM_STAGING)
connection.commit()
except Exception as error:
connection.rollback()
if len(batch) == 1:
logger.exception("Skipping bad row post_id=%s", batch[0].get("post_id"))
with connection.cursor() as cursor:
cursor.execute(
FAILED_INSERT,
{
"raw_line": orjson.dumps(batch[0], default=str).decode(),
"error": str(error),
},
)
connection.commit()
return
midpoint = len(batch) // 2
ingest_batch(connection, batch[:midpoint])
ingest_batch(connection, batch[midpoint:])
def read_jsonl_batches(
file_path: Path, batch_size: int, failed_lines: list[dict]
) -> Iterator[list[dict]]:
"""Stream a JSONL file and yield batches of transformed rows."""
batch: list[dict] = []
with file_path.open("r", encoding="utf-8") as handle:
for raw_line in handle:
line = raw_line.strip()
if not line:
continue
batch.extend(parse_line(line, file_path, failed_lines))
if len(batch) >= batch_size:
yield batch
batch = []
if batch:
yield batch
def parse_line(line: str, file_path: Path, failed_lines: list[dict]) -> Iterator[dict]:
"""Parse a JSONL line, handling concatenated JSON objects."""
try:
yield transform_row(orjson.loads(line))
except orjson.JSONDecodeError:
if "}{" not in line:
logger.warning(
"Skipping malformed line in %s: %s", file_path.name, line[:120]
)
failed_lines.append({"raw_line": line, "error": "malformed JSON"})
return
fragments = line.replace("}{", "}\n{").split("\n")
for fragment in fragments:
try:
yield transform_row(orjson.loads(fragment))
except (orjson.JSONDecodeError, KeyError, ValueError) as error:
logger.warning(
"Skipping malformed fragment in %s: %s",
file_path.name,
fragment[:120],
)
failed_lines.append({"raw_line": fragment, "error": str(error)})
except Exception as error:
logger.exception("Skipping bad row in %s: %s", file_path.name, line[:120])
failed_lines.append({"raw_line": line, "error": str(error)})
def transform_row(raw: dict) -> dict:
"""Transform a raw JSONL row into a dict matching the Posts table columns."""
raw["date"] = parse_date(raw["date"])
if raw.get("langs") is not None:
raw["langs"] = orjson.dumps(raw["langs"])
if raw.get("text") is not None:
raw["text"] = raw["text"].replace("\x00", "")
return raw
def parse_date(raw_date: int) -> datetime:
"""Parse compact YYYYMMDDHHmm integer into a naive datetime (input is UTC by spec)."""
return datetime(
raw_date // 100000000,
(raw_date // 1000000) % 100,
(raw_date // 10000) % 100,
(raw_date // 100) % 100,
raw_date % 100,
tzinfo=UTC,
)
if __name__ == "__main__":
app()
-34
View File
@@ -1,34 +0,0 @@
SUMMARIZATION_SYSTEM_PROMPT = """You are a legislative analyst extracting policy substance from Congressional bill text.
Your job is to compress a bill into a dense, neutral structured summary that captures every distinct policy action including secondary effects that might be buried in subsections.
EXTRACTION RULES:
- IGNORE: whereas clauses, congressional findings that are purely political statements, recitals, preambles, citations of existing law by number alone, and procedural boilerplate.
- FOCUS ON: operative verbs what the bill SHALL do, PROHIBIT, REQUIRE, AUTHORIZE, AMEND, APPROPRIATE, or ESTABLISH.
- SURFACE ALL THREADS: If the bill touches multiple policy areas, list each thread separately. Do not collapse them.
- BE CONCRETE: Name the affected population, the mechanism, and the direction (expands/restricts/maintains).
- STAY NEUTRAL: No political framing. Describe what the text does, not what its sponsors claim it does.
OUTPUT FORMAT plain structured text, not JSON:
OPERATIVE ACTIONS:
[Numbered list of what the bill actually does, one action per line, max 20 words each]
AFFECTED POPULATIONS:
[Who gains something, who loses something, or whose behavior is regulated]
MECHANISMS:
[How it works: new funding, mandate, prohibition, amendment to existing statute, grant program, study commission, etc.]
POLICY THREADS:
[List each distinct policy domain this bill touches, even minor ones. Use plain language, not domain codes.]
SYMBOLIC/PROCEDURAL ONLY:
[Yes or No is this bill primarily a resolution, designation, or awareness declaration with no operative effect?]
LENGTH TARGET: 150-250 words total. Be ruthless about cutting. Density over completeness."""
SUMMARIZATION_USER_TEMPLATE = """Summarize the following Congressional bill according to your instructions.
BILL TEXT:
{text_content}"""
View File
+22
View File
@@ -0,0 +1,22 @@
[project]
name = "ds-testing-pipelines"
version = "0.1.0"
description = "Data science pipeline tools and legislative dashboard."
requires-python = ">=3.12"
dependencies = [
"fastapi",
"httpx",
"uvicorn[standard]",
"jinja2",
"sqlalchemy",
"psycopg",
]
[project.optional-dependencies]
test = [
"pytest",
]
[tool.pytest.ini_options]
testpaths = ["tests"]
pythonpath = ["."]