10 Commits

30 changed files with 4950 additions and 177 deletions
+16
View File
@@ -0,0 +1,16 @@
.git
.pytest_cache
.ruff_cache
__pycache__
*.pyc
*.pyo
*.pyd
.venv
venv
env
ENV
.env
dist
build
htmlcov
coverage.xml
+21
View File
@@ -0,0 +1,21 @@
# Postgres used by the FastAPI app
DATA_SCIENCE_DEV_DB=your_existing_database
DATA_SCIENCE_DEV_HOST=your_existing_postgres_host
DATA_SCIENCE_DEV_PORT=5432
DATA_SCIENCE_DEV_USER=your_existing_postgres_user
DATA_SCIENCE_DEV_PASSWORD=your_existing_postgres_password
# WorkOS AuthKit
WORKOS_API_KEY=sk_test_your_workos_api_key
WORKOS_CLIENT_ID=client_your_workos_client_id
WORKOS_COOKIE_PASSWORD=replace_with_a_long_random_secret_at_least_32_chars
WORKOS_ORGANIZATION_ID=org_your_workspace_org_id
WORKOS_REDIRECT_URI=http://localhost:8000/callback
WORKOS_LOGOUT_REDIRECT_URI=http://localhost:8000/
WORKOS_SESSION_COOKIE_NAME=workos_session
# Optional local port overrides for Docker Compose
WEB_PUBLISHED_PORT=8000
# Only used if you explicitly start the optional local Postgres profile
POSTGRES_PUBLISHED_PORT=5432
+25
View File
@@ -0,0 +1,25 @@
FROM python:3.12-slim
ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1
WORKDIR /app
RUN apt-get update \
&& apt-get install -y --no-install-recommends libpq5 \
&& rm -rf /var/lib/apt/lists/*
COPY pyproject.toml /app/pyproject.toml
COPY __init__.py /app/__init__.py
COPY alembic /app/alembic
COPY database_cli.py /app/database_cli.py
COPY pipelines /app/pipelines
COPY docker /app/docker
RUN pip install --no-cache-dir .
RUN chmod +x /app/docker/web-entrypoint.sh
EXPOSE 8000
CMD ["/app/docker/web-entrypoint.sh"]
+52
View File
@@ -0,0 +1,52 @@
services:
db:
image: postgres:16
profiles: ["localdb"]
restart: unless-stopped
environment:
POSTGRES_DB: ${DATA_SCIENCE_DEV_DB:-nornsight}
POSTGRES_USER: ${DATA_SCIENCE_DEV_USER:-nornsight}
POSTGRES_PASSWORD: ${DATA_SCIENCE_DEV_PASSWORD:-nornsight}
volumes:
- postgres_data:/var/lib/postgresql/data
healthcheck:
test:
[
"CMD-SHELL",
"pg_isready -U ${DATA_SCIENCE_DEV_USER:-nornsight} -d ${DATA_SCIENCE_DEV_DB:-nornsight}",
]
interval: 5s
timeout: 5s
retries: 20
start_period: 5s
ports:
- "${POSTGRES_PUBLISHED_PORT:-5432}:5432"
web:
build:
context: .
dockerfile: Dockerfile
restart: unless-stopped
dns:
- ${WEB_DNS_1:-1.1.1.1}
- ${WEB_DNS_2:-8.8.8.8}
environment:
DATA_SCIENCE_DEV_DB: ${DATA_SCIENCE_DEV_DB}
DATA_SCIENCE_DEV_HOST: ${DATA_SCIENCE_DEV_HOST}
DATA_SCIENCE_DEV_PORT: ${DATA_SCIENCE_DEV_PORT}
DATA_SCIENCE_DEV_USER: ${DATA_SCIENCE_DEV_USER}
DATA_SCIENCE_DEV_PASSWORD: ${DATA_SCIENCE_DEV_PASSWORD}
WORKOS_API_KEY: ${WORKOS_API_KEY}
WORKOS_CLIENT_ID: ${WORKOS_CLIENT_ID}
WORKOS_COOKIE_PASSWORD: ${WORKOS_COOKIE_PASSWORD}
WORKOS_ORGANIZATION_ID: ${WORKOS_ORGANIZATION_ID}
WORKOS_REDIRECT_URI: ${WORKOS_REDIRECT_URI:-http://localhost:8000/callback}
WORKOS_LOGOUT_REDIRECT_URI: ${WORKOS_LOGOUT_REDIRECT_URI:-http://localhost:8000/}
WORKOS_SESSION_COOKIE_NAME: ${WORKOS_SESSION_COOKIE_NAME:-workos_session}
UVICORN_HOST: 0.0.0.0
UVICORN_PORT: 8000
ports:
- "${WEB_PUBLISHED_PORT:-8000}:8000"
volumes:
postgres_data:
+33
View File
@@ -0,0 +1,33 @@
#!/usr/bin/env sh
set -eu
python - <<'PY'
import os
import time
import psycopg
db = os.environ["DATA_SCIENCE_DEV_DB"]
host = os.environ["DATA_SCIENCE_DEV_HOST"]
port = os.environ["DATA_SCIENCE_DEV_PORT"]
user = os.environ["DATA_SCIENCE_DEV_USER"]
password = os.environ.get("DATA_SCIENCE_DEV_PASSWORD", "")
dsn = f"dbname={db} host={host} port={port} user={user} password={password}"
for attempt in range(60):
try:
with psycopg.connect(dsn) as conn:
with conn.cursor() as cur:
cur.execute("CREATE SCHEMA IF NOT EXISTS main")
conn.commit()
break
except psycopg.OperationalError:
if attempt == 59:
raise
time.sleep(1)
PY
python /app/database_cli.py data_science_dev upgrade head
exec uvicorn pipelines.web.main:app --host "${UVICORN_HOST:-0.0.0.0}" --port "${UVICORN_PORT:-8000}"
+1 -1
View File
@@ -1 +1 @@
"""Prompt benchmarking system for evaluating LLMs via vLLM."""
"""Init."""
+116
View File
@@ -0,0 +1,116 @@
"""Nornsight — BERTopic POC Inference Script.
Loads the trained model and labels a small batch of posts,
writing results to main.post_topic for inspection.
POC: processes a single batch of 1k posts to validate the pipeline end-to-end.
"""
from __future__ import annotations
import logging
import time
from collections import Counter
from pathlib import Path
from bertopic import BERTopic
from sqlalchemy import Engine, func, insert, select
from sqlalchemy.orm import Session
from pipelines.config import BertTopicInferConfig, get_bertopic_infer_config
from pipelines.orm.common import get_postgres_engine
from pipelines.orm.data_science_dev.posts import PostTopic, Posts
from pipelines.orm.data_science_dev.posts.lang_filters import ENGLISH_LANGS
from pipelines.pipelines.common import configure_logger
logger = logging.getLogger(__name__)
def main() -> None:
"""Run BERTopic inference against a sample of posts."""
configure_logger()
config = get_bertopic_infer_config()
run_inference(config)
logger.info(
"POC inference complete. Check main.post_topic in DBeaver to inspect results."
)
def run_inference(config: BertTopicInferConfig) -> None:
model_save_path = Path(config.model_save_path)
logger.info(f"Loading BERTopic model from {model_save_path}")
topic_model = BERTopic.load(str(model_save_path))
topic_info = topic_model.get_topic_info()
label_map: dict[int, str] = dict(zip(topic_info["Topic"], topic_info["Name"]))
logger.info(f"Model loaded with {len(label_map)} topics")
engine = get_postgres_engine(name="DATA_SCIENCE_DEV")
post_ids, texts = get_post_ids_and_test(engine, config)
logger.info(f"Fetched {len(texts)} posts")
logger.info("Running BERTopic transform")
start = time.perf_counter()
topics, _probabilities = topic_model.transform(texts)
elapsed = time.perf_counter() - start
logger.info(f"Transform complete in {elapsed:.1f}s")
# Write results to main.post_topic
records = [
{
"post_id": pid,
"topic_id": int(topic_id),
"topic_label": label_map.get(int(topic_id), "unknown"),
"model_version": config.model_version,
}
for pid, topic_id in zip(post_ids, topics)
]
with Session(engine) as session:
session.execute(insert(PostTopic), records)
session.commit()
count_topics(records)
logger.info(f"Wrote {len(records)} topic labels to main.post_topic")
def get_post_ids_and_test(
engine: Engine,
config: BertTopicInferConfig,
) -> None | tuple[list[int], list[str]]:
with Session(engine) as session:
logger.info(f"Fetching {config.poc_batch_size} posts for inference")
# Pull a fresh batch for inference — distinct from training sample
# using a fixed seed offset so we're not re-labeling training posts
stmt = select(Posts).where(
Posts.text.is_not(None),
Posts.langs.in_(ENGLISH_LANGS),
func.length(Posts.text) > config.min_text_length,
)
if config.poc_batch_size > 0:
stmt = stmt.limit(config.poc_batch_size)
posts = session.scalars(stmt).all()
if not posts:
logger.warning("No posts were selected for inference")
return [], []
post_ids = [post.post_id for post in posts]
texts = [post.text.strip() for post in posts]
return post_ids, texts
def count_topics(records: list[dict]) -> None:
topic_counts = Counter(record.get("topic_label", "unknown") for record in records)
logger.info("Topic distribution in this batch:")
for label, count in topic_counts.most_common(10):
logger.info(" %s: %d", label, count)
if __name__ == "__main__":
main()
+119
View File
@@ -0,0 +1,119 @@
"""Nornsight — BERTopic POC Training Script.
Pulls a small stratified sample (~11.5k posts) from main.posts,
trains BERTopic with MiniBatchKMeans on Jeeves, and saves the model locally.
POC sample rate: random() < 0.00005 (~0.005% of 230M = ~11.5k posts)
Full training rate will be: random() < 0.005 (~1.08M posts)
"""
from __future__ import annotations
import logging
import time
from pathlib import Path
from bertopic import BERTopic
from sklearn.cluster import MiniBatchKMeans
from sqlalchemy import func, select
from sqlalchemy.orm import Session
from pipelines.config import BertTopicTrainConfig, get_bertopic_train_config
from pipelines.orm.common import get_postgres_engine
from pipelines.orm.data_science_dev.posts import Posts
from pipelines.orm.data_science_dev.posts.lang_filters import ENGLISH_LANGS
from pipelines.pipelines.common import configure_logger
logger = logging.getLogger(__name__)
def main() -> None:
"""Train and persist the BERTopic model."""
configure_logger()
config = get_bertopic_train_config()
docs = load_sample(config)
if not docs:
logger.warning("No training documents were selected")
return
train(docs, config)
logger.info(f"Done. Model saved as version {config.model_version}")
logger.info("Next: run infer.py to label a sample of posts in the database")
def load_sample(config: BertTopicTrainConfig) -> list[str]:
logger.info("Connecting to PostgreSQL via SQLAlchemy")
engine = get_postgres_engine(name="DATA_SCIENCE_DEV")
logger.info(f"Pulling sample from main.posts (sample_rate={config.sample_rate})")
start = time.perf_counter()
with Session(engine) as session:
texts = session.scalars(
select(Posts.text).where(
Posts.text.is_not(None),
Posts.langs.in_(ENGLISH_LANGS),
func.length(Posts.text) > config.min_text_length,
func.random() < config.sample_rate,
)
).all()
elapsed = time.perf_counter() - start
logger.info(f"Fetched {len(texts)} rows in {elapsed:.1f}s")
# Basic cleaning — strip whitespace and deduplicate
docs = list({text.strip() for text in texts})
logger.info(f"After cleaning and dedup: {len(docs)} posts")
return docs
def train(docs: list[str], config: BertTopicTrainConfig) -> None:
logger.info(
f"Initialising BERTopic with MiniBatchKMeans (n_topics={config.n_topics})"
)
cluster_model = MiniBatchKMeans(
n_clusters=config.n_topics,
random_state=42,
batch_size=1024,
n_init=3,
verbose=1,
)
topic_model = BERTopic(
hdbscan_model=cluster_model,
language="english",
calculate_probabilities=False, # saves memory
verbose=True,
)
logger.info(f"Starting fit_transform on {len(docs)} posts (CPU)")
start = time.perf_counter()
topic_model.fit_transform(docs)
elapsed = time.perf_counter() - start
logger.info(f"Training complete in {elapsed:.1f}s ({elapsed / 60:.1f} min)")
# Log topic summary for quick inspection
topic_info = topic_model.get_topic_info()
logger.info(f"Topics found: {len(topic_info)}")
logger.info(f"\n{topic_info.to_string()}")
model_save_path = Path(config.model_save_path)
model_save_path.mkdir(parents=True, exist_ok=True)
logger.info(f"Saving model to {model_save_path}")
topic_model.save(
str(model_save_path),
serialization="safetensors",
save_ctfidf=True,
save_embedding_model=True,
)
logger.info("Model saved")
if __name__ == "__main__":
main()
-21
View File
@@ -5,10 +5,8 @@ from __future__ import annotations
import logging
import sys
from datetime import UTC, datetime
from os import getenv
from subprocess import PIPE, Popen
from apprise import Apprise
logger = logging.getLogger(__name__)
@@ -47,25 +45,6 @@ def bash_wrapper(command: str) -> tuple[str, int]:
return output.decode(), process.returncode
def signal_alert(body: str, title: str = "") -> None:
"""Send a signal alert.
Args:
body (str): The body of the alert.
title (str, optional): The title of the alert. Defaults to "".
"""
apprise_client = Apprise()
from_phone = getenv("SIGNAL_ALERT_FROM_PHONE")
to_phone = getenv("SIGNAL_ALERT_TO_PHONE")
if not from_phone or not to_phone:
logger.info("SIGNAL_ALERT_FROM_PHONE or SIGNAL_ALERT_TO_PHONE not set")
return
apprise_client.add(f"signal://localhost:8989/{from_phone}/{to_phone}")
apprise_client.notify(title=title, body=body)
def utcnow() -> datetime:
"""Get the current UTC time."""
+57
View File
@@ -2,6 +2,7 @@ from __future__ import annotations
from dataclasses import dataclass
from os import getenv
from datetime import date
from pathlib import Path
import tomllib
@@ -50,6 +51,7 @@ class FinetuneConfig:
)
@dataclass
class BenchmarkConfig:
"""Top-level benchmark configuration loaded from TOML."""
@@ -101,6 +103,45 @@ class OpenAIConfig:
)
@dataclass
class BertTopicTrainConfig:
"""BERTopic training configuration loaded from TOML."""
sample_rate: float
min_text_length: int
n_topics: int
model_save_path: str
model_version: str | None = None
@classmethod
def from_toml(cls, config_path: Path) -> BertTopicTrainConfig:
"""Load BERTopic training config from a TOML file."""
raw = tomllib.loads(config_path.read_text())["bertopic"]["train"]
today = date.today().isoformat()
if raw.get("model_version") is None:
raw["model_version"] = (
f"{today}-{raw['sample_rate']}-{raw['min_text_length']}-{raw['n_topics']}"
)
return cls(**raw)
@dataclass
class BertTopicInferConfig:
"""BERTopic inference configuration loaded from TOML."""
min_text_length: int
poc_batch_size: int
model_version: str
model_save_path: str
@classmethod
def from_toml(cls, config_path: Path) -> BertTopicInferConfig:
"""Load BERTopic inference config from a TOML file."""
raw = tomllib.loads(config_path.read_text())["bertopic"]["infer"]
return cls(**raw)
def get_config_dir() -> Path:
"""Get the path to the config directory."""
return Path(__file__).resolve().parents[2] / "config"
@@ -127,3 +168,19 @@ def get_benchmark_config(config_path: Path | None = None) -> BenchmarkConfig:
if config_path is None:
config_path = default_config_path()
return BenchmarkConfig.from_toml(config_path)
def get_bertopic_train_config(
config_path: Path | None = None,
) -> BertTopicTrainConfig:
if config_path is None:
config_path = default_config_path()
return BertTopicTrainConfig.from_toml(config_path)
def get_bertopic_infer_config(
config_path: Path | None = None,
) -> BertTopicInferConfig:
if config_path is None:
config_path = default_config_path()
return BertTopicInferConfig.from_toml(config_path)
+197
View File
@@ -0,0 +1,197 @@
"""Docker container lifecycle management for the web app stack."""
from __future__ import annotations
import logging
import os
import subprocess
from pathlib import Path
from typing import Annotated, Literal
import typer
logger = logging.getLogger(__name__)
REPO_DIR = Path(__file__).resolve().parents[2]
COMPOSE_FILE = REPO_DIR / "docker-compose.yml"
EnvTarget = Literal["all", "web", "db"]
REQUIRED_WORKOS_ENV_VARS = (
"WORKOS_API_KEY",
"WORKOS_CLIENT_ID",
"WORKOS_COOKIE_PASSWORD",
"WORKOS_ORGANIZATION_ID",
)
app = typer.Typer(help="Web stack container management.")
def _compose_command(*args: str) -> list[str]:
"""Build a docker compose command for the repo-local stack."""
return ["docker", "compose", "-f", str(COMPOSE_FILE), *args]
def _run_compose(
*args: str,
capture_output: bool = False,
check: bool = True,
) -> subprocess.CompletedProcess[str]:
"""Run docker compose in the repository root."""
result = subprocess.run(
_compose_command(*args),
cwd=REPO_DIR,
text=True,
capture_output=capture_output,
check=False,
)
if check and result.returncode != 0:
detail = result.stderr.strip() if result.stderr else f"exit code {result.returncode}"
raise RuntimeError(f"docker compose {' '.join(args)} failed: {detail}")
return result
def _validate_workos_env() -> None:
"""Ensure the web app has the WorkOS env vars it needs before startup."""
missing = [name for name in REQUIRED_WORKOS_ENV_VARS if not os.getenv(name)]
if missing:
message = (
"Missing required WorkOS environment variables: "
+ ", ".join(missing)
+ ". Populate .env before running the web stack."
)
raise RuntimeError(message)
cookie_password = os.getenv("WORKOS_COOKIE_PASSWORD", "")
if len(cookie_password) < 32:
raise RuntimeError("WORKOS_COOKIE_PASSWORD must be at least 32 characters long.")
def build_stack() -> None:
"""Build the web app image."""
logger.info("Building web image from %s", COMPOSE_FILE)
_run_compose("build", "web", capture_output=False)
logger.info("Web image built")
def _validate_database_env() -> None:
"""Ensure the web app has the database env vars it needs before startup."""
required = (
"DATA_SCIENCE_DEV_DB",
"DATA_SCIENCE_DEV_HOST",
"DATA_SCIENCE_DEV_PORT",
"DATA_SCIENCE_DEV_USER",
)
missing = [name for name in required if not os.getenv(name)]
if missing:
message = (
"Missing required database environment variables: "
+ ", ".join(missing)
+ ". Populate .env before running the web stack."
)
raise RuntimeError(message)
def start_stack(
*, build: bool = False, detach: bool = False, with_local_db: bool = False
) -> None:
"""Start the web stack, using the existing DB by default."""
_validate_workos_env()
_validate_database_env()
command = ["up"]
if build:
command.append("--build")
if detach:
command.append("-d")
if with_local_db:
command.extend(["--profile", "localdb", "db", "web"])
else:
command.append("web")
logger.info(
"Starting web stack%s",
" with local Postgres" if with_local_db else " against existing Postgres",
)
_run_compose(*command, capture_output=False)
def stop_stack(*, drop_volumes: bool = False) -> None:
"""Stop and remove the web stack."""
logger.info("Stopping web stack")
command = ["down"]
if drop_volumes:
command.append("--volumes")
_run_compose(*command, capture_output=False)
def logs_stack(*, target: EnvTarget = "all", follow: bool = False, tail: int = 100) -> None:
"""Show docker compose logs for the web stack."""
command = ["logs", "--tail", str(tail)]
if follow:
command.append("--follow")
if target != "all":
command.append(target)
_run_compose(*command, capture_output=False)
@app.command()
def build(
log_level: Annotated[str, typer.Option(help="Log level")] = "INFO",
) -> None:
"""Build the web Docker image."""
logging.basicConfig(
level=log_level,
format="%(asctime)s %(levelname)s %(name)s: %(message)s",
)
build_stack()
@app.command()
def run(
build: Annotated[
bool, typer.Option(help="Rebuild the web image before starting the stack")
] = False,
detach: Annotated[
bool, typer.Option(help="Start the stack in the background")
] = False,
with_local_db: Annotated[
bool, typer.Option(help="Also start the optional local Postgres container")
] = False,
log_level: Annotated[str, typer.Option(help="Log level")] = "INFO",
) -> None:
"""Run the web + Postgres stack."""
logging.basicConfig(
level=log_level,
format="%(asctime)s %(levelname)s %(name)s: %(message)s",
)
start_stack(build=build, detach=detach, with_local_db=with_local_db)
@app.command()
def stop(
drop_volumes: Annotated[
bool, typer.Option(help="Also delete the Postgres volume")
] = False,
) -> None:
"""Stop and remove the web stack."""
stop_stack(drop_volumes=drop_volumes)
@app.command()
def logs(
target: Annotated[
EnvTarget, typer.Option(help="Which service logs to show")
] = "all",
follow: Annotated[
bool, typer.Option(help="Follow logs until interrupted")
] = False,
tail: Annotated[int, typer.Option(help="How many recent lines to show")] = 100,
) -> None:
"""Show recent logs from the web stack."""
logs_stack(target=target, follow=follow, tail=tail)
def cli() -> None:
"""Typer entry point."""
app()
if __name__ == "__main__":
cli()
@@ -23,7 +23,7 @@ from sqlalchemy import (
)
from sqlalchemy.orm import Session
from pipelines.congress_vote_context import create_score_run, finalize_score_run
from pipelines.jobs.congress_vote_context import create_score_run, finalize_score_run
from pipelines.orm.common import get_postgres_engine
from pipelines.orm.data_science_dev.congress import (
BillTopic,
@@ -39,7 +39,7 @@ from pipelines.orm.data_science_dev.congress import (
VoteRelationship,
VoteRecord,
)
from pipelines.pipelines.jobs.extract_bill_topics import normalize_topic_label
from pipelines.jobs.extract_bill_topics import normalize_topic_label
from pipelines.web.scoring import (
OPPOSE_POSITIONS,
SUPPORT_POSITIONS,
File diff suppressed because it is too large Load Diff
File diff suppressed because it is too large Load Diff
+281
View File
@@ -0,0 +1,281 @@
"""Ingestion pipeline for loading JSONL post files into the weekly-partitioned posts table.
Usage:
ingest-posts /path/to/files/
ingest-posts /path/to/single_file.jsonl
ingest-posts /data/dir/ --workers 4 --batch-size 5000
"""
from __future__ import annotations
import logging
from datetime import UTC, datetime
from pathlib import Path # noqa: TC003 this is needed for typer
from typing import TYPE_CHECKING, Annotated
import orjson
import psycopg
import typer
from pipelines.pipelines.common import configure_logger
from pipelines.orm.common import get_connection_info
from pipelines.pipelines.parallelize import parallelize_process
if TYPE_CHECKING:
from collections.abc import Iterator
logger = logging.getLogger(__name__)
app = typer.Typer(help="Ingest JSONL post files into the partitioned posts table.")
@app.command()
def main(
path: Annotated[
Path,
typer.Argument(help="Directory containing JSONL files, or a single JSONL file"),
],
batch_size: Annotated[int, typer.Option(help="Rows per INSERT batch")] = 10000,
workers: Annotated[
int, typer.Option(help="Parallel workers for multi-file ingestion")
] = 4,
pattern: Annotated[
str, typer.Option(help="Glob pattern for JSONL files")
] = "*.jsonl",
) -> None:
"""Ingest JSONL post files into the weekly-partitioned posts table."""
configure_logger(level="INFO")
logger.info("starting ingest-posts")
logger.info(
"path=%s batch_size=%d workers=%d pattern=%s",
path,
batch_size,
workers,
pattern,
)
if path.is_file():
ingest_file(path, batch_size=batch_size)
elif path.is_dir():
ingest_directory(
path, batch_size=batch_size, max_workers=workers, pattern=pattern
)
else:
typer.echo(f"Path does not exist: {path}", err=True)
raise typer.Exit(code=1)
logger.info("ingest-posts done")
def ingest_directory(
directory: Path,
*,
batch_size: int,
max_workers: int,
pattern: str = "*.jsonl",
) -> None:
"""Ingest all JSONL files in a directory using parallel workers."""
files = sorted(directory.glob(pattern))
if not files:
logger.warning("No JSONL files found in %s", directory)
return
logger.info("Found %d JSONL files to ingest", len(files))
kwargs_list = [{"path": fp, "batch_size": batch_size} for fp in files]
parallelize_process(ingest_file, kwargs_list, max_workers=max_workers)
SCHEMA = "main"
COLUMNS = (
"post_id",
"user_id",
"instance",
"date",
"text",
"langs",
"like_count",
"reply_count",
"repost_count",
"reply_to",
"replied_author",
"thread_root",
"thread_root_author",
"repost_from",
"reposted_author",
"quotes",
"quoted_author",
"labels",
"sent_label",
"sent_score",
)
INSERT_FROM_STAGING = f"""
INSERT INTO {SCHEMA}.posts ({", ".join(COLUMNS)})
SELECT {", ".join(COLUMNS)} FROM pg_temp.staging
ON CONFLICT (post_id, date) DO NOTHING
""" # noqa: S608
FAILED_INSERT = f"""
INSERT INTO {SCHEMA}.failed_ingestion (raw_line, error)
VALUES (%(raw_line)s, %(error)s)
""" # noqa: S608
def get_psycopg_connection() -> psycopg.Connection:
"""Create a raw psycopg3 connection from environment variables."""
database, host, port, username, password = get_connection_info("DATA_SCIENCE_DEV")
return psycopg.connect(
dbname=database,
host=host,
port=int(port),
user=username,
password=password,
autocommit=False,
)
def ingest_file(path: Path, *, batch_size: int) -> None:
"""Ingest a single JSONL file into the posts table."""
log_trigger = max(100_000 // batch_size, 1)
failed_lines: list[dict] = []
try:
with get_psycopg_connection() as connection:
for index, batch in enumerate(
read_jsonl_batches(path, batch_size, failed_lines), 1
):
ingest_batch(connection, batch)
if index % log_trigger == 0:
logger.info(
"Ingested %d batches (%d rows) from %s",
index,
index * batch_size,
path,
)
if failed_lines:
logger.warning(
"Recording %d malformed lines from %s", len(failed_lines), path.name
)
with connection.cursor() as cursor:
cursor.executemany(FAILED_INSERT, failed_lines)
connection.commit()
except Exception:
logger.exception("Failed to ingest file: %s", path)
raise
def ingest_batch(connection: psycopg.Connection, batch: list[dict]) -> None:
"""COPY batch into a temp staging table, then INSERT ... ON CONFLICT into posts."""
if not batch:
return
try:
with connection.cursor() as cursor:
cursor.execute(f"""
CREATE TEMP TABLE IF NOT EXISTS staging
(LIKE {SCHEMA}.posts INCLUDING DEFAULTS)
ON COMMIT DELETE ROWS
""")
cursor.execute("TRUNCATE pg_temp.staging")
with cursor.copy(
f"COPY pg_temp.staging ({', '.join(COLUMNS)}) FROM STDIN"
) as copy:
for row in batch:
copy.write_row(tuple(row.get(column) for column in COLUMNS))
cursor.execute(INSERT_FROM_STAGING)
connection.commit()
except Exception as error:
connection.rollback()
if len(batch) == 1:
logger.exception("Skipping bad row post_id=%s", batch[0].get("post_id"))
with connection.cursor() as cursor:
cursor.execute(
FAILED_INSERT,
{
"raw_line": orjson.dumps(batch[0], default=str).decode(),
"error": str(error),
},
)
connection.commit()
return
midpoint = len(batch) // 2
ingest_batch(connection, batch[:midpoint])
ingest_batch(connection, batch[midpoint:])
def read_jsonl_batches(
file_path: Path, batch_size: int, failed_lines: list[dict]
) -> Iterator[list[dict]]:
"""Stream a JSONL file and yield batches of transformed rows."""
batch: list[dict] = []
with file_path.open("r", encoding="utf-8") as handle:
for raw_line in handle:
line = raw_line.strip()
if not line:
continue
batch.extend(parse_line(line, file_path, failed_lines))
if len(batch) >= batch_size:
yield batch
batch = []
if batch:
yield batch
def parse_line(line: str, file_path: Path, failed_lines: list[dict]) -> Iterator[dict]:
"""Parse a JSONL line, handling concatenated JSON objects."""
try:
yield transform_row(orjson.loads(line))
except orjson.JSONDecodeError:
if "}{" not in line:
logger.warning(
"Skipping malformed line in %s: %s", file_path.name, line[:120]
)
failed_lines.append({"raw_line": line, "error": "malformed JSON"})
return
fragments = line.replace("}{", "}\n{").split("\n")
for fragment in fragments:
try:
yield transform_row(orjson.loads(fragment))
except (orjson.JSONDecodeError, KeyError, ValueError) as error:
logger.warning(
"Skipping malformed fragment in %s: %s",
file_path.name,
fragment[:120],
)
failed_lines.append({"raw_line": fragment, "error": str(error)})
except Exception as error:
logger.exception("Skipping bad row in %s: %s", file_path.name, line[:120])
failed_lines.append({"raw_line": line, "error": str(error)})
def transform_row(raw: dict) -> dict:
"""Transform a raw JSONL row into a dict matching the Posts table columns."""
raw["date"] = parse_date(raw["date"])
if raw.get("langs") is not None:
raw["langs"] = orjson.dumps(raw["langs"])
if raw.get("text") is not None:
raw["text"] = raw["text"].replace("\x00", "")
return raw
def parse_date(raw_date: int) -> datetime:
"""Parse compact YYYYMMDDHHmm integer into a naive datetime (input is UTC by spec)."""
return datetime(
raw_date // 100000000,
(raw_date // 1000000) % 100,
(raw_date // 10000) % 100,
(raw_date // 100) % 100,
raw_date % 100,
tzinfo=UTC,
)
if __name__ == "__main__":
app()
-34
View File
@@ -1,34 +0,0 @@
SUMMARIZATION_SYSTEM_PROMPT = """You are a legislative analyst extracting policy substance from Congressional bill text.
Your job is to compress a bill into a dense, neutral structured summary that captures every distinct policy action including secondary effects that might be buried in subsections.
EXTRACTION RULES:
- IGNORE: whereas clauses, congressional findings that are purely political statements, recitals, preambles, citations of existing law by number alone, and procedural boilerplate.
- FOCUS ON: operative verbs what the bill SHALL do, PROHIBIT, REQUIRE, AUTHORIZE, AMEND, APPROPRIATE, or ESTABLISH.
- SURFACE ALL THREADS: If the bill touches multiple policy areas, list each thread separately. Do not collapse them.
- BE CONCRETE: Name the affected population, the mechanism, and the direction (expands/restricts/maintains).
- STAY NEUTRAL: No political framing. Describe what the text does, not what its sponsors claim it does.
OUTPUT FORMAT plain structured text, not JSON:
OPERATIVE ACTIONS:
[Numbered list of what the bill actually does, one action per line, max 20 words each]
AFFECTED POPULATIONS:
[Who gains something, who loses something, or whose behavior is regulated]
MECHANISMS:
[How it works: new funding, mandate, prohibition, amendment to existing statute, grant program, study commission, etc.]
POLICY THREADS:
[List each distinct policy domain this bill touches, even minor ones. Use plain language, not domain codes.]
SYMBOLIC/PROCEDURAL ONLY:
[Yes or No is this bill primarily a resolution, designation, or awareness declaration with no operative effect?]
LENGTH TARGET: 150-250 words total. Be ruthless about cutting. Density over completeness."""
SUMMARIZATION_USER_TEMPLATE = """Summarize the following Congressional bill according to your instructions.
BILL TEXT:
{text_content}"""
+202
View File
@@ -0,0 +1,202 @@
"""WorkOS AuthKit helpers for the FastAPI web app."""
from __future__ import annotations
from dataclasses import dataclass
from functools import lru_cache
from os import getenv
from typing import Any
from fastapi import Request
from workos import WorkOSClient
from workos.session import seal_session_from_auth_response
@dataclass(frozen=True)
class AuthConfig:
"""Runtime configuration for WorkOS AuthKit."""
api_key: str
client_id: str
cookie_password: str
redirect_uri: str
logout_redirect_uri: str
session_cookie_name: str
organization_id: str
@property
def secure_cookies(self) -> bool:
return self.redirect_uri.startswith("https://")
@dataclass(frozen=True)
class AuthSession:
"""Normalized auth session passed through the app."""
user_id: str
email: str
first_name: str | None
last_name: str | None
role_slugs: set[str]
organization_id: str | None
raw_user: Any
raw_session: Any
@property
def display_name(self) -> str:
parts = [part for part in (self.first_name, self.last_name) if part]
return " ".join(parts) if parts else self.email
@property
def is_admin(self) -> bool:
return "admin" in self.role_slugs
@dataclass(frozen=True)
class CallbackResult:
"""Result of exchanging a WorkOS callback code."""
sealed_session: str
next_path: str
def safe_next_path(value: str | None, default: str = "/dashboard") -> str:
"""Allow only local relative redirect targets."""
if value and value.startswith("/") and not value.startswith("//"):
return value
return default
def build_authorization_url(next_path: str) -> str:
"""Build the WorkOS hosted login URL."""
config = get_auth_config()
return get_workos_client().user_management.get_authorization_url(
provider="authkit",
redirect_uri=config.redirect_uri,
state=safe_next_path(next_path),
organization_id=config.organization_id,
)
def exchange_code(request: Request) -> CallbackResult:
"""Exchange a WorkOS callback code for a sealed session cookie value."""
code = request.query_params.get("code")
if not code:
raise ValueError("Missing authentication code.")
config = get_auth_config()
auth_response = get_workos_client().user_management.authenticate_with_code(
code=code,
ip_address=_request_ip(request),
user_agent=request.headers.get("user-agent"),
)
sealed_session = seal_session_from_auth_response(
access_token=auth_response.access_token,
refresh_token=auth_response.refresh_token,
user=auth_response.user.to_dict(),
impersonator=auth_response.impersonator.to_dict()
if auth_response.impersonator is not None
else None,
cookie_password=config.cookie_password,
)
return CallbackResult(
sealed_session=sealed_session,
next_path=safe_next_path(request.query_params.get("state")),
)
def get_current_session(request: Request) -> AuthSession | None:
"""Load the current signed-in WorkOS session from the sealed cookie."""
cookie_name = getenv("WORKOS_SESSION_COOKIE_NAME", "workos_session")
sealed_session = request.cookies.get(cookie_name)
if not sealed_session:
return None
config = get_auth_config()
session = get_workos_client().user_management.load_sealed_session(
session_data=sealed_session,
cookie_password=config.cookie_password,
)
auth_response = session.authenticate()
if not getattr(auth_response, "authenticated", False):
return None
user = auth_response.user or {}
organization_id = getattr(auth_response, "organization_id", None)
if config.organization_id and organization_id != config.organization_id:
return None
role_slugs = set(getattr(auth_response, "roles", None) or [])
role = getattr(auth_response, "role", None)
if role:
role_slugs.add(role)
return AuthSession(
user_id=_user_field(user, "id") or "",
email=_user_field(user, "email") or "",
first_name=_user_field(user, "first_name"),
last_name=_user_field(user, "last_name"),
role_slugs=role_slugs,
organization_id=organization_id,
raw_user=user,
raw_session=auth_response,
)
def get_logout_url(request: Request) -> str:
"""Return the WorkOS logout URL for the current sealed session."""
config = get_auth_config()
sealed_session = request.cookies.get(config.session_cookie_name)
if not sealed_session:
return config.logout_redirect_uri
session = get_workos_client().user_management.load_sealed_session(
session_data=sealed_session,
cookie_password=config.cookie_password,
)
return session.get_logout_url(return_to=config.logout_redirect_uri)
@lru_cache(maxsize=1)
def get_auth_config() -> AuthConfig:
"""Load and validate WorkOS environment configuration."""
values = {
"WORKOS_API_KEY": getenv("WORKOS_API_KEY"),
"WORKOS_CLIENT_ID": getenv("WORKOS_CLIENT_ID"),
"WORKOS_COOKIE_PASSWORD": getenv("WORKOS_COOKIE_PASSWORD"),
"WORKOS_ORGANIZATION_ID": getenv("WORKOS_ORGANIZATION_ID"),
}
missing = [name for name, value in values.items() if not value]
if missing:
raise RuntimeError(
"Missing WorkOS configuration: " + ", ".join(sorted(missing))
)
return AuthConfig(
api_key=values["WORKOS_API_KEY"] or "",
client_id=values["WORKOS_CLIENT_ID"] or "",
cookie_password=values["WORKOS_COOKIE_PASSWORD"] or "",
redirect_uri=getenv("WORKOS_REDIRECT_URI", "http://localhost:8000/callback"),
logout_redirect_uri=getenv("WORKOS_LOGOUT_REDIRECT_URI", "http://localhost:8000/"),
session_cookie_name=getenv("WORKOS_SESSION_COOKIE_NAME", "workos_session"),
organization_id=values["WORKOS_ORGANIZATION_ID"] or "",
)
@lru_cache(maxsize=1)
def get_workos_client():
"""Create and cache the WorkOS SDK client."""
config = get_auth_config()
return WorkOSClient(api_key=config.api_key, client_id=config.client_id)
def _request_ip(request: Request) -> str | None:
if request.client is None:
return None
return request.client.host
def _user_field(user: Any, key: str) -> Any:
if isinstance(user, dict):
return user.get(key)
return getattr(user, key, None)
+125 -105
View File
@@ -4,20 +4,17 @@ from __future__ import annotations
from contextlib import asynccontextmanager
from dataclasses import dataclass
import hashlib
import hmac
import logging
from os import getenv
from pathlib import Path
import secrets
from typing import Any
from urllib.parse import parse_qs
from fastapi import Depends, FastAPI, HTTPException, Request, Response, status
from fastapi.responses import HTMLResponse, PlainTextResponse, RedirectResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from pipelines.web import repository
from pipelines.web import auth, repository
from pipelines.web.db import session_scope, validate_database_connection
from pipelines.web.repository import Chamber, RankingResult
from pipelines.web.scoring import normalize_issues
@@ -28,10 +25,7 @@ TEMPLATES_DIR = BASE_DIR / "templates"
STATIC_DIR = BASE_DIR / "static"
templates = Jinja2Templates(directory=TEMPLATES_DIR)
ADMIN_USERNAME = "admin"
ADMIN_PASSWORD = "admin"
SESSION_COOKIE = "nornsight_admin"
SESSION_SECRET = "nornsight-local-dev-session-secret"
logger = logging.getLogger(__name__)
@asynccontextmanager
@@ -62,72 +56,69 @@ def healthz() -> str:
return "ok"
@app.get("/login", response_class=HTMLResponse)
@app.get("/", response_class=HTMLResponse)
def home(request: Request) -> Response:
"""Render the public home page."""
current_user = auth.get_current_session(request)
return templates.TemplateResponse(
request,
"home.html",
{
**_auth_context(current_user),
"auth_error": request.query_params.get("auth_error") == "1",
},
)
@app.get("/login")
def login(request: Request) -> Response:
"""Render the integrated login page."""
next_path = _safe_next_path(request.query_params.get("next"))
if _authenticated_user(request) is not None:
"""Start the WorkOS hosted login flow."""
next_path = auth.safe_next_path(request.query_params.get("next"))
current_user = auth.get_current_session(request)
if current_user is not None:
return RedirectResponse(next_path, status_code=status.HTTP_303_SEE_OTHER)
return templates.TemplateResponse(
request,
"login.html",
{
"error": "",
"is_authenticated": False,
"show_primary_nav": False,
"next_path": next_path,
"username": "",
},
return RedirectResponse(
auth.build_authorization_url(next_path),
status_code=status.HTTP_303_SEE_OTHER,
)
@app.post("/login", response_class=HTMLResponse)
async def login_submit(request: Request) -> Response:
"""Authenticate the hard-coded admin user and set a session cookie."""
form = parse_qs((await request.body()).decode())
username = form.get("username", [""])[0]
password = form.get("password", [""])[0]
next_path = _safe_next_path(form.get("next", [request.query_params.get("next")])[0])
@app.get("/callback")
def callback(request: Request) -> Response:
"""Exchange the WorkOS code for a sealed session cookie."""
try:
result = auth.exchange_code(request)
except Exception:
logger.exception("WorkOS callback exchange failed.")
response = RedirectResponse("/?auth_error=1", status_code=status.HTTP_303_SEE_OTHER)
_delete_auth_cookie(response)
return response
username_ok = secrets.compare_digest(username, ADMIN_USERNAME)
password_ok = secrets.compare_digest(password, ADMIN_PASSWORD)
if not (username_ok and password_ok):
return templates.TemplateResponse(
request,
"login.html",
{
"error": "Invalid username or password.",
"is_authenticated": False,
"show_primary_nav": False,
"next_path": next_path,
"username": username,
},
status_code=status.HTTP_401_UNAUTHORIZED,
)
response = RedirectResponse(next_path, status_code=status.HTTP_303_SEE_OTHER)
config = auth.get_auth_config()
response = RedirectResponse(result.next_path, status_code=status.HTTP_303_SEE_OTHER)
response.set_cookie(
SESSION_COOKIE,
_sign_session(username),
config.session_cookie_name,
result.sealed_session,
httponly=True,
samesite="lax",
secure=config.secure_cookies,
)
return response
@app.get("/logout")
def logout() -> Response:
"""Clear the local admin session."""
response = RedirectResponse("/login", status_code=status.HTTP_303_SEE_OTHER)
response.delete_cookie(SESSION_COOKIE)
@app.post("/logout")
def logout(request: Request) -> Response:
"""End the WorkOS session and clear the local sealed session cookie."""
response = RedirectResponse(auth.get_logout_url(request), status_code=status.HTTP_303_SEE_OTHER)
_delete_auth_cookie(response)
return response
def require_admin(request: Request) -> str:
"""Redirect unauthenticated users to the in-site login page."""
username = _authenticated_user(request)
if username is not None:
return username
def require_user(request: Request) -> auth.AuthSession:
"""Redirect unauthenticated users to the WorkOS sign-in flow."""
current_user = auth.get_current_session(request)
if current_user is not None:
return current_user
next_path = request.url.path
if request.url.query:
next_path = f"{next_path}?{request.url.query}"
@@ -138,87 +129,64 @@ def require_admin(request: Request) -> str:
)
def _authenticated_user(request: Request) -> str | None:
token = request.cookies.get(SESSION_COOKIE)
if token is None:
return None
try:
username, signature = token.split(":", 1)
except ValueError:
return None
if username != ADMIN_USERNAME:
return None
expected = _session_signature(username)
if secrets.compare_digest(signature, expected):
return username
return None
def require_admin(current_user: auth.AuthSession = Depends(require_user)) -> auth.AuthSession:
"""Restrict a route to WorkOS users with the admin role."""
if current_user.is_admin:
return current_user
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin access required.")
def _sign_session(username: str) -> str:
return f"{username}:{_session_signature(username)}"
def _session_signature(username: str) -> str:
return hmac.new(
SESSION_SECRET.encode(),
username.encode(),
hashlib.sha256,
).hexdigest()
def _safe_next_path(value: str | None) -> str:
if value and value.startswith("/") and not value.startswith("//"):
return value
return "/"
@app.get("/", response_class=HTMLResponse)
def dashboard(request: Request, _: str = Depends(require_admin)) -> Response:
@app.get("/dashboard", response_class=HTMLResponse)
def dashboard(
request: Request, current_user: auth.AuthSession = Depends(require_user)
) -> Response:
"""Render the full dashboard page."""
context = _dashboard_context(request)
context = {**_auth_context(current_user), **_dashboard_context(request)}
if request.headers.get("hx-request") == "true":
return templates.TemplateResponse(request, "partials/_dashboard.html", context)
return templates.TemplateResponse(request, "dashboard.html", context)
@app.get("/partials/dashboard", response_class=HTMLResponse)
def dashboard_partial(request: Request, _: str = Depends(require_admin)) -> Response:
def dashboard_partial(request: Request, _: auth.AuthSession = Depends(require_user)) -> Response:
"""Render the filter-dependent dashboard body."""
context = _dashboard_context(request)
return templates.TemplateResponse(request, "partials/_dashboard.html", context)
@app.get("/partials/issues", response_class=HTMLResponse)
def issues_partial(request: Request, _: str = Depends(require_admin)) -> Response:
def issues_partial(request: Request, _: auth.AuthSession = Depends(require_user)) -> Response:
"""Render only issue filters."""
context = _dashboard_context(request)
return templates.TemplateResponse(request, "partials/_issue_filters.html", context)
@app.get("/partials/rankings", response_class=HTMLResponse)
def rankings_partial(request: Request, _: str = Depends(require_admin)) -> Response:
def rankings_partial(request: Request, _: auth.AuthSession = Depends(require_user)) -> Response:
"""Render only ranking panels."""
context = _dashboard_context(request)
return templates.TemplateResponse(request, "partials/_rankings.html", context)
@app.get("/partials/chart", response_class=HTMLResponse)
def chart_partial(request: Request, _: str = Depends(require_admin)) -> Response:
def chart_partial(request: Request, _: auth.AuthSession = Depends(require_user)) -> Response:
"""Render only the SVG chart panel."""
context = _dashboard_context(request)
return templates.TemplateResponse(request, "partials/_chart.html", context)
@app.get("/legislators", response_class=HTMLResponse)
def legislators(request: Request, _: str = Depends(require_admin)) -> Response:
def legislators(
request: Request, current_user: auth.AuthSession = Depends(require_user)
) -> Response:
"""Render the legislator profile/search page."""
context = _legislators_context(request)
context = {**_auth_context(current_user), **_legislators_context(request)}
return templates.TemplateResponse(request, "legislators.html", context)
@app.get("/partials/legislator-suggestions", response_class=HTMLResponse)
def legislator_suggestions_partial(
request: Request, _: str = Depends(require_admin)
request: Request, _: auth.AuthSession = Depends(require_user)
) -> Response:
"""Render legislator search suggestions for the HTMX typeahead."""
query = request.query_params.get("q", "").strip()
@@ -238,12 +206,29 @@ def legislator_suggestions_partial(
@app.get("/compare", response_class=HTMLResponse)
def compare(request: Request, _: str = Depends(require_admin)) -> Response:
def compare(
request: Request, current_user: auth.AuthSession = Depends(require_user)
) -> Response:
"""Render the legislator radar comparison page."""
context = _compare_context(request)
context = {**_auth_context(current_user), **_compare_context(request)}
return templates.TemplateResponse(request, "compare.html", context)
@app.get("/admin", response_class=HTMLResponse)
def admin_page(
request: Request, current_user: auth.AuthSession = Depends(require_admin)
) -> Response:
"""Render the admin-only placeholder page."""
return templates.TemplateResponse(
request,
"admin.html",
{
**_auth_context(current_user),
"organization_id": auth.get_auth_config().organization_id,
},
)
def _dashboard_context(request: Request) -> dict[str, Any]:
state = _parse_state(request)
base_context: dict[str, Any] = {
@@ -263,6 +248,7 @@ def _dashboard_context(request: Request) -> dict[str, Any]:
"has_scores": False,
"empty_message": "",
"build_url": _build_url,
"build_dashboard_partial_url": _build_dashboard_partial_url,
"toggle_compare": _toggle_compare,
}
with session_scope() as session:
@@ -520,10 +506,29 @@ def _build_url(
for legislator_id in chosen_compare:
params.append(("compare", str(legislator_id)))
if not params:
return "/"
return "/dashboard"
from urllib.parse import urlencode
return f"/?{urlencode(params, doseq=True)}"
return f"/dashboard?{urlencode(params, doseq=True)}"
def _build_dashboard_partial_url(
request: Request,
*,
issues: list[str] | None = None,
chamber: str | None = None,
congress: int | None = None,
compare: list[int] | None = None,
) -> str:
"""Return the HTMX endpoint matching the current dashboard query state."""
dashboard_url = _build_url(
request,
issues=issues,
chamber=chamber,
congress=congress,
compare=compare,
)
return dashboard_url.replace("/dashboard", "/partials/dashboard", 1)
def _toggle_compare(compare: list[int], legislator_id: int) -> list[int]:
@@ -587,3 +592,18 @@ def _build_compare_url(
if q:
params.append(("q", q))
return f"/compare?{urlencode(params, doseq=True)}" if params else "/compare"
def _auth_context(current_user: auth.AuthSession | None) -> dict[str, Any]:
"""Shared template context for auth-aware navigation."""
return {
"is_authenticated": current_user is not None,
"is_admin": current_user.is_admin if current_user is not None else False,
"current_user_name": current_user.display_name if current_user is not None else "",
"current_user_email": current_user.email if current_user is not None else "",
}
def _delete_auth_cookie(response: Response) -> None:
"""Delete the sealed WorkOS session cookie."""
response.delete_cookie(getenv("WORKOS_SESSION_COOKIE_NAME", "workos_session"))
+161 -1
View File
@@ -152,16 +152,35 @@ a {
text-align: left;
}
.account-menu-panel form {
margin: 0.2rem 0 0;
}
.account-email {
color: var(--muted);
display: block;
font-size: 0.84rem;
padding: 0.3rem 0.1rem 0.55rem;
}
.account-menu-panel .sign-out {
background: #0d5f53;
border-color: #16806f;
color: white;
}
.account-menu-panel .sign-out,
.account-nav .sign-in {
background: #0d5f53;
border-color: #16806f;
border-radius: 7px;
color: white;
cursor: pointer;
display: block;
font: inherit;
min-width: 100%;
padding: 0.55rem 0.8rem;
text-align: left;
}
.shell {
@@ -170,6 +189,140 @@ a {
padding: 1.25rem 1.5rem 2rem;
}
.home-shell {
padding-top: 2rem;
}
.hero-panel,
.home-grid,
.admin-meta,
.admin-actions {
display: grid;
}
.hero-panel {
gap: 1.4rem;
grid-template-columns: minmax(0, 1.5fr) minmax(18rem, 0.85fr);
}
.hero-panel,
.home-card,
.admin-card {
background: color-mix(in srgb, var(--panel) 88%, transparent);
border: 1px solid var(--line);
border-radius: 18px;
}
.hero-copy,
.hero-card,
.home-card,
.admin-card {
padding: 1.5rem 1.6rem;
}
.hero-copy h1 {
font-size: clamp(2.3rem, 4vw, 4.15rem);
letter-spacing: -0.04em;
line-height: 0.96;
margin: 0.2rem 0 0.9rem;
max-width: 12ch;
}
.hero-text {
color: #c0d3cd;
font-size: 1.02rem;
line-height: 1.65;
margin: 0;
max-width: 60ch;
}
.hero-actions,
.admin-actions {
gap: 0.75rem;
}
.hero-actions {
display: flex;
flex-wrap: wrap;
margin-top: 1.35rem;
}
.hero-primary,
.hero-secondary,
.admin-actions a {
border-radius: 999px;
font-weight: 760;
min-height: 2.8rem;
padding: 0.78rem 1.15rem;
}
.hero-primary {
background: linear-gradient(120deg, #0d5f53, #2fbd9f);
color: white;
}
.hero-secondary,
.admin-actions a {
border: 1px solid #1d554c;
color: #d0dfdb;
}
.hero-card h2,
.home-card h2,
.admin-card h2 {
margin-bottom: 0.65rem;
}
.hero-card ul {
color: #c0d3cd;
line-height: 1.6;
margin: 0;
padding-left: 1.1rem;
}
.home-grid {
gap: 1rem;
grid-template-columns: repeat(3, minmax(0, 1fr));
margin-top: 1.25rem;
}
.home-card p,
.admin-card p,
.admin-meta dt {
color: var(--muted);
}
.auth-notice {
margin-bottom: 1rem;
}
.admin-card {
margin-top: 1.25rem;
}
.admin-meta {
gap: 0.85rem;
grid-template-columns: repeat(auto-fit, minmax(14rem, 1fr));
margin: 1.2rem 0 1.1rem;
}
.admin-meta div {
background: rgba(12, 38, 33, 0.78);
border: 1px solid rgba(44, 123, 109, 0.28);
border-radius: 10px;
padding: 0.9rem 1rem;
}
.admin-meta dt {
font-size: 0.84rem;
margin-bottom: 0.35rem;
text-transform: uppercase;
}
.admin-meta dd {
margin: 0;
}
.login-shell {
align-items: center;
display: flex;
@@ -1122,7 +1275,9 @@ h2 {
.rankings-grid,
.topic-panels,
.compare-card,
.login-panel {
.login-panel,
.hero-panel,
.home-grid {
display: block;
}
@@ -1212,4 +1367,9 @@ h2 {
margin-bottom: 1rem;
padding: 0 0 1rem;
}
.hero-card,
.home-card + .home-card {
margin-top: 1rem;
}
}
+36
View File
@@ -0,0 +1,36 @@
{% extends "base.html" %}
{% block title %}Admin Settings{% endblock %}
{% block body %}
<main class="shell">
<section class="page-heading stacked-heading">
<div>
<h1>Admin settings</h1>
<p>Admin-only operational controls for the Nornsight workspace.</p>
</div>
</section>
<section class="admin-card">
<h2>WorkOS-managed access</h2>
<p>
Invitations, Google access, and role assignments are managed in the WorkOS dashboard.
This page confirms that app-level admin gating is active.
</p>
<dl class="admin-meta">
<div>
<dt>Workspace organization</dt>
<dd><code>{{ organization_id }}</code></dd>
</div>
<div>
<dt>Current administrator</dt>
<dd>{{ current_user_email }}</dd>
</div>
</dl>
<div class="admin-actions">
<a href="/dashboard">Return to dashboard</a>
<a href="https://dashboard.workos.com/" rel="noreferrer" target="_blank">Open WorkOS dashboard</a>
</div>
</section>
</main>
{% endblock %}
+15 -5
View File
@@ -15,23 +15,33 @@
</a>
{% if show_primary_nav|default(true) %}
<nav class="primary-nav" aria-label="Primary">
<a href="/">Issues</a>
{% if is_authenticated|default(false) %}
<a href="/dashboard">Dashboard</a>
<a href="/legislators">Legislators</a>
<a href="/compare">Compare</a>
{% if is_admin|default(false) %}
<a href="/admin">Admin</a>
{% endif %}
{% else %}
<a href="/">Overview</a>
{% endif %}
</nav>
{% endif %}
<nav class="account-nav" aria-label="Account">
<a href="#" aria-disabled="true">Help</a>
{% if is_authenticated|default(true) %}
{% if is_authenticated|default(false) %}
<details class="account-menu">
<summary>My account</summary>
<summary>{{ current_user_name or "My account" }}</summary>
<div class="account-menu-panel">
<span class="account-email">{{ current_user_email }}</span>
<a href="#" aria-disabled="true">Account settings</a>
<a class="sign-out" href="/logout">Sign out</a>
<form action="/logout" method="post">
<button class="sign-out" type="submit">Sign out</button>
</form>
</div>
</details>
{% else %}
<a class="sign-in" href="/login">Sign in</a>
<a class="sign-in" href="/login?next=/dashboard">Sign in</a>
{% endif %}
</nav>
</header>
+1
View File
@@ -10,6 +10,7 @@
<p>US legislative accountability · precomputed legislator topic scores{% if latest_score_year %} through {{ latest_score_year }}{% endif %}</p>
</div>
<div class="heading-actions">
<span>{{ current_user_email }}</span>
<a href="#" aria-disabled="true">Methodology</a>
<a href="#" aria-disabled="true">Data sources</a>
<span>Last updated: {{ last_updated.strftime("%b %Y") if last_updated else "Unavailable" }}</span>
+59
View File
@@ -0,0 +1,59 @@
{% extends "base.html" %}
{% block title %}Nornsight | Legislative Accountability{% endblock %}
{% block body %}
<main class="shell home-shell">
{% if auth_error %}
<div class="notice auth-notice">Authentication failed. Try signing in again.</div>
{% endif %}
<section class="hero-panel">
<div class="hero-copy">
<p class="eyebrow">Invite-only access</p>
<h1>Track legislative behavior with role-aware access and shared WorkOS sign-in.</h1>
<p class="hero-text">
Nornsight turns roll-call data into issue-level accountability views for your invited team.
Use the public home page as the front door, then move signed-in users into the dashboard,
legislator search, and comparison tools.
</p>
<div class="hero-actions">
{% if is_authenticated %}
<a class="hero-primary" href="/dashboard">Open dashboard</a>
{% if is_admin %}
<a class="hero-secondary" href="/admin">Admin settings</a>
{% endif %}
{% else %}
<a class="hero-primary" href="/login?next=/dashboard">Sign in</a>
<a class="hero-secondary" href="#access-model">How access works</a>
{% endif %}
</div>
</div>
<aside class="hero-card">
<h2>Launch access model</h2>
<ul>
<li>Public landing page at <code>/</code></li>
<li>Invite-only AuthKit login with Email + Password and Google</li>
<li><code>viewer</code> role for dashboard, legislators, and compare</li>
<li><code>admin</code> role for settings and account administration</li>
</ul>
</aside>
</section>
<section id="access-model" class="home-grid">
<article class="home-card">
<h2>For invited users</h2>
<p>View the dashboard, inspect legislator profiles, and compare issue scoring without sharing a local password.</p>
</article>
<article class="home-card">
<h2>For admins</h2>
<p>Manage invitations and role assignments in WorkOS while the app enforces role-based route access.</p>
</article>
<article class="home-card">
<h2>For rollout</h2>
<p>Authentication is centralized, sessions are sealed, and the old hard-coded admin login is removed.</p>
</article>
</section>
</main>
{% endblock %}
+1 -1
View File
@@ -2,7 +2,7 @@
<header>
<h2>Score history{% if selected_issue_label %} — {{ selected_issue_label }}{% endif %}</h2>
<a href="{{ build_url(request, compare=[]) }}"
hx-get="/partials/dashboard{{ build_url(request, compare=[])|replace('/', '', 1) }}"
hx-get="{{ build_dashboard_partial_url(request, compare=[]) }}"
hx-target="#dashboard-body"
hx-push-url="{{ build_url(request, compare=[]) }}">Clear comparison</a>
</header>
@@ -3,17 +3,17 @@
<div class="chamber-card">
<a class="segment {{ 'active' if chamber == 'house' else '' }}"
href="{{ build_url(request, chamber='house') }}"
hx-get="/partials/dashboard{{ build_url(request, chamber='house')|replace('/', '', 1) }}"
hx-get="{{ build_dashboard_partial_url(request, chamber='house') }}"
hx-target="#dashboard-body"
hx-push-url="{{ build_url(request, chamber='house') }}">House</a>
<a class="segment {{ 'active' if chamber == 'senate' else '' }}"
href="{{ build_url(request, chamber='senate') }}"
hx-get="/partials/dashboard{{ build_url(request, chamber='senate')|replace('/', '', 1) }}"
hx-get="{{ build_dashboard_partial_url(request, chamber='senate') }}"
hx-target="#dashboard-body"
hx-push-url="{{ build_url(request, chamber='senate') }}">Senate</a>
<a class="segment {{ 'active' if chamber == 'all' else '' }}"
href="{{ build_url(request, chamber='all') }}"
hx-get="/partials/dashboard{{ build_url(request, chamber='all')|replace('/', '', 1) }}"
hx-get="{{ build_dashboard_partial_url(request, chamber='all') }}"
hx-target="#dashboard-body"
hx-push-url="{{ build_url(request, chamber='all') }}">All</a>
</div>
@@ -2,10 +2,10 @@
<h2>Issue filters</h2>
<form class="issue-form"
method="get"
action="/"
hx-get="/"
action="/dashboard"
hx-get="/partials/dashboard"
hx-target="#dashboard-body"
hx-push-url="true">
hx-push-url="/dashboard">
<input type="hidden" name="chamber" value="{{ chamber }}">
{% if congress %}
<input type="hidden" name="congress" value="{{ congress }}">
@@ -17,7 +17,7 @@
<span class="chip">
{{ issue }}
<a href="{{ build_url(request, issues=issues[:loop.index0] + issues[loop.index:]) }}"
hx-get="/partials/dashboard{{ build_url(request, issues=issues[:loop.index0] + issues[loop.index:])|replace('/', '', 1) }}"
hx-get="{{ build_dashboard_partial_url(request, issues=issues[:loop.index0] + issues[loop.index:]) }}"
hx-target="#dashboard-body"
hx-push-url="{{ build_url(request, issues=issues[:loop.index0] + issues[loop.index:]) }}"
aria-label="Remove {{ issue }}">×</a>
@@ -36,7 +36,7 @@
{% for suggestion in suggestions %}
{% if suggestion not in issues %}
<a href="{{ build_url(request, issues=issues + [suggestion]) }}"
hx-get="/partials/dashboard{{ build_url(request, issues=issues + [suggestion])|replace('/', '', 1) }}"
hx-get="{{ build_dashboard_partial_url(request, issues=issues + [suggestion]) }}"
hx-target="#dashboard-body"
hx-push-url="{{ build_url(request, issues=issues + [suggestion]) }}">{{ suggestion }}</a>
{% endif %}
@@ -10,7 +10,7 @@
{% set next_compare = toggle_compare(compare, row.legislator_id) %}
<li class="{{ 'selected' if row.legislator_id in compare else '' }}">
<a href="{{ build_url(request, compare=next_compare) }}"
hx-get="/partials/dashboard{{ build_url(request, compare=next_compare)|replace('/', '', 1) }}"
hx-get="{{ build_dashboard_partial_url(request, compare=next_compare) }}"
hx-target="#dashboard-body"
hx-push-url="{{ build_url(request, compare=next_compare) }}">
<span class="rank">{{ loop.index }}</span>
@@ -40,7 +40,7 @@
{% set next_compare = toggle_compare(compare, row.legislator_id) %}
<li class="{{ 'selected' if row.legislator_id in compare else '' }}">
<a href="{{ build_url(request, compare=next_compare) }}"
hx-get="/partials/dashboard{{ build_url(request, compare=next_compare)|replace('/', '', 1) }}"
hx-get="{{ build_dashboard_partial_url(request, compare=next_compare) }}"
hx-target="#dashboard-body"
hx-push-url="{{ build_url(request, compare=next_compare) }}">
<span class="rank">{{ loop.index }}</span>
View File
+28
View File
@@ -0,0 +1,28 @@
[project]
name = "ds-testing-pipelines"
version = "0.1.0"
description = "Data science pipeline tools and legislative dashboard."
requires-python = ">=3.12"
dependencies = [
"alembic",
"fastapi",
"httpx",
"jinja2",
"psycopg",
"sqlalchemy",
"typer",
"uvicorn[standard]",
"workos",
]
[project.optional-dependencies]
test = [
"pytest",
]
[tool.pytest.ini_options]
testpaths = ["tests"]
pythonpath = ["."]
[tool.setuptools.packages.find]
include = ["pipelines*"]
+327
View File
@@ -0,0 +1,327 @@
from __future__ import annotations
from datetime import date
import pytest
from fastapi.testclient import TestClient
from pipelines.web import auth, main
from pipelines.web.repository import (
ChartSeries,
LegislatorOption,
RadarSeries,
RankingResult,
RankingRow,
TimePoint,
)
def test_healthz() -> None:
client = TestClient(main.app)
response = client.get("/healthz")
assert response.status_code == 200
assert response.text == "ok"
def test_public_home_page_renders() -> None:
client = TestClient(main.app)
response = client.get("/")
assert response.status_code == 200
assert "Invite-only access" in response.text
assert "Sign in" in response.text
def test_dashboard_redirects_to_login() -> None:
client = TestClient(main.app)
response = client.get("/dashboard?issues=Health", follow_redirects=False)
assert response.status_code == 303
assert response.headers["location"].endswith(
"/login?next=%2Fdashboard%3Fissues%3DHealth"
)
def test_other_protected_routes_redirect_when_unauthenticated() -> None:
client = TestClient(main.app)
for path in ["/legislators", "/compare", "/admin"]:
response = client.get(path, follow_redirects=False)
assert response.status_code == 303
assert response.headers["location"].endswith(f"/login?next={path.replace('/', '%2F', 1)}")
def test_login_redirects_to_workos(monkeypatch) -> None:
monkeypatch.setattr(main.auth, "get_current_session", lambda request: None)
monkeypatch.setattr(
main.auth,
"build_authorization_url",
lambda next_path: f"https://auth.example/login?state={next_path}",
)
client = TestClient(main.app)
response = client.get("/login?next=/compare", follow_redirects=False)
assert response.status_code == 303
assert response.headers["location"] == "https://auth.example/login?state=/compare"
def test_login_redirects_authenticated_user(monkeypatch) -> None:
monkeypatch.setattr(main.auth, "get_current_session", lambda request: _viewer_session())
client = TestClient(main.app)
response = client.get("/login?next=/compare", follow_redirects=False)
assert response.status_code == 303
assert response.headers["location"] == "/compare"
def test_callback_sets_session_cookie(monkeypatch) -> None:
monkeypatch.setattr(
main.auth,
"exchange_code",
lambda request: auth.CallbackResult(
sealed_session="sealed-session-value", next_path="/dashboard"
),
)
monkeypatch.setattr(main.auth, "get_auth_config", _fake_auth_config)
client = TestClient(main.app)
response = client.get("/callback?code=abc&state=/dashboard", follow_redirects=False)
assert response.status_code == 303
assert response.headers["location"] == "/dashboard"
assert "workos_session=sealed-session-value" in response.headers["set-cookie"]
def test_callback_failure_redirects_home_and_clears_cookie(monkeypatch) -> None:
def raise_exchange_error(request):
raise RuntimeError("bad code")
monkeypatch.setattr(main.auth, "exchange_code", raise_exchange_error)
client = TestClient(main.app)
response = client.get("/callback?code=bad", follow_redirects=False)
assert response.status_code == 303
assert response.headers["location"] == "/?auth_error=1"
assert "workos_session=" in response.headers["set-cookie"]
def test_logout_redirects_to_workos_and_clears_cookie(monkeypatch) -> None:
monkeypatch.setattr(
main.auth,
"get_logout_url",
lambda request: "https://auth.example/logout",
)
client = TestClient(main.app)
response = client.post("/logout", follow_redirects=False)
assert response.status_code == 303
assert response.headers["location"] == "https://auth.example/logout"
assert "workos_session=" in response.headers["set-cookie"]
def test_dashboard_route_renders_with_stubbed_repository(monkeypatch) -> None:
_patch_authenticated_dashboard(monkeypatch, current_user=_viewer_session())
client = TestClient(main.app)
response = client.get("/dashboard?issues=Health&chamber=senate")
assert response.status_code == 200
assert "Legislative accountability" in response.text
assert "Most supportive" in response.text
assert "viewer@nornsight.test" in response.text
assert "/admin" not in response.text
assert '/partials/dashboard?issues=Health&amp;chamber=house' in response.text
assert "/partials/dashboarddashboard?" not in response.text
def test_admin_route_forbids_viewer(monkeypatch) -> None:
monkeypatch.setattr(main.auth, "get_current_session", lambda request: _viewer_session())
client = TestClient(main.app)
response = client.get("/admin")
assert response.status_code == 403
assert response.json()["detail"] == "Admin access required."
def test_admin_route_renders_for_admin(monkeypatch) -> None:
monkeypatch.setattr(main.auth, "get_current_session", lambda request: _admin_session())
monkeypatch.setattr(main.auth, "get_auth_config", _fake_auth_config)
client = TestClient(main.app)
response = client.get("/admin")
assert response.status_code == 200
assert "Admin settings" in response.text
assert "admin@nornsight.test" in response.text
assert "org_test_123" in response.text
def test_compare_page_renders_for_authenticated_user(monkeypatch) -> None:
monkeypatch.setattr(main.auth, "get_current_session", lambda request: _viewer_session())
_patch_compare_page_data(monkeypatch)
client = TestClient(main.app)
response = client.get("/compare")
assert response.status_code == 200
assert "Compare legislators" in response.text
assert "Sanders, B." in response.text
def _viewer_session() -> auth.AuthSession:
return auth.AuthSession(
user_id="user_viewer",
email="viewer@nornsight.test",
first_name="Viewer",
last_name="User",
role_slugs={"viewer"},
organization_id="org_test_123",
raw_user=None,
raw_session=None,
)
def _admin_session() -> auth.AuthSession:
return auth.AuthSession(
user_id="user_admin",
email="admin@nornsight.test",
first_name="Admin",
last_name="User",
role_slugs={"admin", "viewer"},
organization_id="org_test_123",
raw_user=None,
raw_session=None,
)
def _fake_auth_config() -> auth.AuthConfig:
return auth.AuthConfig(
api_key="sk_test",
client_id="client_test",
cookie_password="x" * 32,
redirect_uri="http://localhost:8000/callback",
logout_redirect_uri="http://localhost:8000/",
session_cookie_name="workos_session",
organization_id="org_test_123",
)
def _patch_authenticated_dashboard(monkeypatch, *, current_user: auth.AuthSession) -> None:
monkeypatch.setattr(main.auth, "get_current_session", lambda request: current_user)
class DummySession:
pass
class DummyScope:
def __enter__(self):
return DummySession()
def __exit__(self, exc_type, exc, tb):
return False
rankings = RankingResult(
supportive=[
RankingRow(
legislator_id=1,
display_name="Sanders, B.",
party="I",
state="VT",
chamber="senate",
score=78.0,
supportive=7,
opposed=2,
)
],
opposed=[
RankingRow(
legislator_id=2,
display_name="Cruz, T.",
party="R",
state="TX",
chamber="senate",
score=22.0,
supportive=2,
opposed=7,
)
],
)
history = [
ChartSeries(
legislator_id=1,
label="Sanders, B.",
party="I",
state="VT",
points=[TimePoint(year=2024, score=74.0), TimePoint(year=2025, score=78.0)],
)
]
monkeypatch.setattr(main, "session_scope", lambda: DummyScope())
monkeypatch.setattr(main.repository, "latest_congress", lambda session: 119)
monkeypatch.setattr(main.repository, "has_scores", lambda session: True)
monkeypatch.setattr(main.repository, "latest_score_year", lambda session: 2026)
monkeypatch.setattr(
main.repository, "latest_vote_date", lambda session, congress: date(2026, 1, 15)
)
monkeypatch.setattr(
main.repository,
"issue_suggestions",
lambda session, congress=None, limit=12: ["Health", "Taxation"],
)
monkeypatch.setattr(
main.repository,
"get_rankings",
lambda session, *, issues, chamber, congress: rankings,
)
monkeypatch.setattr(
main.repository,
"get_score_history",
lambda session, *, issues, chamber, congress, legislator_ids: history,
)
def _patch_compare_page_data(monkeypatch) -> None:
class DummySession:
pass
class DummyScope:
def __enter__(self):
return DummySession()
def __exit__(self, exc_type, exc, tb):
return False
legislator = LegislatorOption(
legislator_id=1,
display_name="Sanders, B.",
party="I",
state="VT",
chamber="senate",
)
topics = ["Health", "Taxation", "Energy"]
series = [
RadarSeries(
legislator=legislator,
average_score=77.0,
scores_by_topic={"Health": 82.0, "Taxation": 71.0, "Energy": 78.0},
)
]
monkeypatch.setattr(main, "session_scope", lambda: DummyScope())
monkeypatch.setattr(
main.repository,
"get_compare_defaults",
lambda session: ([1], topics),
)
monkeypatch.setattr(
main.repository,
"get_legislator_options",
lambda session, selected_legislators: [legislator],
)
monkeypatch.setattr(
main.repository,
"get_compare_radar_series",
lambda session, *, legislator_ids, topics: series,
)
monkeypatch.setattr(
main.repository,
"search_legislators",
lambda session, query=None, limit=12: [legislator],
)
monkeypatch.setattr(
main.repository,
"issue_suggestions",
lambda session, congress=None, limit=12: topics,
)