19 Commits

Author SHA1 Message Date
Richie d3fe6dba56 allowing multiple summaries per bill text 2026-05-08 18:30:07 -04:00
Richie de9e59b5f4 Merge pull request 'added bert_topic train.py and infer.py' (#3) from feature/added-bert_topic-train.py-and-infer.py into main
Reviewed-on: #3
2026-05-02 20:59:08 -04:00
Richie 2034a760c9 adding missig @dataclass to config.py 2026-05-02 20:58:33 -04:00
Richie 45bdd7b629 added bert_topic train.py and infer.py 2026-04-28 23:07:41 -04:00
Richie b5f2df6ae5 Merge pull request 'feature/finishing-migration-of-work' (#9) from feature/finishing-migration-of-work into main
Reviewed-on: #9
2026-04-28 23:05:56 -04:00
Richie 21448eb515 updated __init__.py 2026-04-28 23:02:31 -04:00
Richie 28993213af fixed pyproject.toml 2026-04-28 23:02:18 -04:00
Richie d4c587362d remoed old prompts 2026-04-28 23:01:54 -04:00
Richie d0e865ffbd added congress_vote_context.py ingest_congress.py ingest_posts.py to jobs dir 2026-04-28 23:01:38 -04:00
Richie 297d9ce89b Merge pull request 'adding website' (#8) from feature/adding-website into main
Reviewed-on: #8
2026-04-28 22:51:27 -04:00
Richie 72eb2d8c3d adding website 2026-04-28 22:50:53 -04:00
Richie e75c077e16 Merge pull request 'featre/cleaning-up-tools' (#7) from featre/cleaning-up-tools into main
Reviewed-on: #7
2026-04-28 22:49:25 -04:00
Richie 37fb68ac7e fixed prompt_bench.py 2026-04-28 22:49:25 -04:00
Richie e8bafbd589 converting tools to get_config_dir/path 2026-04-28 22:49:25 -04:00
Richie caff8724af Merge pull request 'adding calculate_legislator_scores.py summarize_bills.py and extract_bill_topics.py' (#6) from feature/making-jobs-dir into main
Reviewed-on: #6
2026-04-28 22:45:53 -04:00
Richie e1beffef12 adding calculate_legislator_scores.py summarize_bills.py and extract_bill_topics.py 2026-04-28 22:45:53 -04:00
Richie 2facb82bd4 Merge pull request 'moving config.py and adding OpenAIConfig' (#5) from feature/moving-config.py-and-adding-OpenAIConfig into main
Reviewed-on: #5
2026-04-28 22:43:26 -04:00
Richie 8d5a6e202b moving config.py and adding OpenAIConfig 2026-04-28 22:43:02 -04:00
Richie f32c895561 Merge pull request 'moved containers dir and created docker_files dir' (#4) from feature/creating-docker_files-dir into main
Reviewed-on: #4
2026-04-28 22:37:23 -04:00
40 changed files with 7654 additions and 151 deletions
@@ -0,0 +1,211 @@
"""move bill text summaries into a child table.
Revision ID: 4b2e1c9d8f70
Revises: b9360b0b0c22
Create Date: 2026-05-03 00:00:00.000000
"""
from __future__ import annotations
from typing import TYPE_CHECKING
import sqlalchemy as sa
from alembic import op
from pipelines.orm import DataScienceDevBase
if TYPE_CHECKING:
from collections.abc import Sequence
# revision identifiers, used by Alembic.
revision: str = "4b2e1c9d8f70"
down_revision: str | None = "b9360b0b0c22"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
schema = DataScienceDevBase.schema_name
def upgrade() -> None:
    """Move summaries from ``bill_text`` columns into a ``bill_text_summary`` child table.

    Steps, in order:

    1. Create ``bill_text_summary`` (one row per summary) and its two indexes.
    2. Add the nullable ``bill_text.primary_summary_id`` pointer and its foreign key.
    3. Copy every non-blank legacy ``bill_text.summary`` with its model/prompt
       metadata into the new table.
    4. Drop the four legacy summary columns from ``bill_text``.

    Note: this migration does not populate ``primary_summary_id``; it stays NULL
    for all rows. Legacy summaries that are NULL or whitespace-only are not
    copied and are lost when the columns are dropped.
    """
    op.create_table(
        "bill_text_summary",
        sa.Column("bill_text_id", sa.Integer(), nullable=False),
        sa.Column("summary", sa.String(), nullable=False),
        sa.Column("summarization_model", sa.String(), nullable=True),
        sa.Column("summarization_user_prompt_version", sa.String(), nullable=True),
        sa.Column("summarization_system_prompt_version", sa.String(), nullable=True),
        sa.Column("id", sa.Integer(), nullable=False),
        sa.Column(
            "created",
            sa.DateTime(timezone=True),
            server_default=sa.text("now()"),
            nullable=False,
        ),
        sa.Column(
            "updated",
            sa.DateTime(timezone=True),
            server_default=sa.text("now()"),
            nullable=False,
        ),
        # Deleting a bill_text row removes all of its summaries.
        sa.ForeignKeyConstraint(
            ["bill_text_id"],
            [f"{schema}.bill_text.id"],
            name=op.f("fk_bill_text_summary_bill_text_id_bill_text"),
            ondelete="CASCADE",
        ),
        sa.PrimaryKeyConstraint("id", name=op.f("pk_bill_text_summary")),
        schema=schema,
    )
    op.create_index(
        "ix_bill_text_summary_bill_text_id",
        "bill_text_summary",
        ["bill_text_id"],
        unique=False,
        schema=schema,
    )
    # Composite index on (bill_text_id, created); downgrade() orders summaries
    # by created within each bill_text_id.
    op.create_index(
        "ix_bill_text_summary_bill_text_id_created",
        "bill_text_summary",
        ["bill_text_id", "created"],
        unique=False,
        schema=schema,
    )
    op.add_column(
        "bill_text",
        sa.Column("primary_summary_id", sa.Integer(), nullable=True),
        schema=schema,
    )
    # Deleting the referenced summary clears the pointer instead of deleting
    # the bill_text row.
    op.create_foreign_key(
        op.f("fk_bill_text_primary_summary_id_bill_text_summary"),
        "bill_text",
        "bill_text_summary",
        ["primary_summary_id"],
        ["id"],
        source_schema=schema,
        referent_schema=schema,
        ondelete="SET NULL",
    )
    # Backfill: both created and updated of the new row take the legacy row's
    # updated timestamp, falling back to its created timestamp, then now().
    op.execute(
        sa.text(
            f"""
INSERT INTO {schema}.bill_text_summary (
bill_text_id,
summary,
summarization_model,
summarization_user_prompt_version,
summarization_system_prompt_version,
created,
updated
)
SELECT
bill_text.id,
bill_text.summary,
bill_text.summarization_model,
bill_text.summarization_user_prompt_version,
bill_text.summarization_system_prompt_version,
COALESCE(bill_text.updated, bill_text.created, now()),
COALESCE(bill_text.updated, bill_text.created, now())
FROM {schema}.bill_text
WHERE bill_text.summary IS NOT NULL
AND btrim(bill_text.summary) <> ''
"""
        )
    )
    # The legacy columns are only dropped after the copy above has run.
    op.drop_column("bill_text", "summary", schema=schema)
    op.drop_column("bill_text", "summarization_model", schema=schema)
    op.drop_column("bill_text", "summarization_user_prompt_version", schema=schema)
    op.drop_column("bill_text", "summarization_system_prompt_version", schema=schema)
def downgrade() -> None:
    """Fold ``bill_text_summary`` back into single summary columns on ``bill_text``.

    Steps, in order:

    1. Re-add the four legacy summary columns to ``bill_text``.
    2. For each bill_text row pick ONE source summary: the row referenced by
       ``primary_summary_id`` when set, otherwise the most recent summary
       (``created DESC, id DESC``). Copy all four values from that single row.
    3. Drop the ``primary_summary_id`` foreign key and column, the two
       indexes, and the ``bill_text_summary`` table.

    All four restored values come from the same summary row. Coalescing each
    column independently could pair the primary summary's text with the model
    or prompt version of a different (latest) summary whenever the primary row
    has NULL metadata. Any additional summaries per bill_text are discarded.
    """
    op.add_column(
        "bill_text",
        sa.Column("summarization_system_prompt_version", sa.String(), nullable=True),
        schema=schema,
    )
    op.add_column(
        "bill_text",
        sa.Column("summarization_user_prompt_version", sa.String(), nullable=True),
        schema=schema,
    )
    op.add_column(
        "bill_text",
        sa.Column("summarization_model", sa.String(), nullable=True),
        schema=schema,
    )
    op.add_column(
        "bill_text",
        sa.Column("summary", sa.String(), nullable=True),
        schema=schema,
    )
    # bill_text rows with no summary at all have no matching ``src`` row and are
    # left untouched; their freshly added columns are already NULL.
    op.execute(
        sa.text(
            f"""
            WITH ranked AS (
                SELECT
                    bts.*,
                    row_number() OVER (
                        PARTITION BY bts.bill_text_id
                        ORDER BY bts.created DESC, bts.id DESC
                    ) AS rn
                FROM {schema}.bill_text_summary AS bts
            ),
            chosen AS (
                SELECT
                    bill_text.id AS bill_text_id,
                    src.summary AS summary,
                    src.summarization_model AS summarization_model,
                    src.summarization_user_prompt_version
                        AS summarization_user_prompt_version,
                    src.summarization_system_prompt_version
                        AS summarization_system_prompt_version
                FROM {schema}.bill_text
                LEFT JOIN ranked AS ls
                    ON ls.bill_text_id = bill_text.id
                    AND ls.rn = 1
                JOIN {schema}.bill_text_summary AS src
                    ON src.id = COALESCE(bill_text.primary_summary_id, ls.id)
            )
            UPDATE {schema}.bill_text
            SET
                summary = chosen.summary,
                summarization_model = chosen.summarization_model,
                summarization_user_prompt_version = chosen.summarization_user_prompt_version,
                summarization_system_prompt_version = chosen.summarization_system_prompt_version
            FROM chosen
            WHERE chosen.bill_text_id = bill_text.id
            """
        )
    )
    # The pointer column and its FK must go before the table it references.
    op.drop_constraint(
        op.f("fk_bill_text_primary_summary_id_bill_text_summary"),
        "bill_text",
        schema=schema,
        type_="foreignkey",
    )
    op.drop_column("bill_text", "primary_summary_id", schema=schema)
    op.drop_index(
        "ix_bill_text_summary_bill_text_id_created",
        table_name="bill_text_summary",
        schema=schema,
    )
    op.drop_index(
        "ix_bill_text_summary_bill_text_id",
        table_name="bill_text_summary",
        schema=schema,
    )
    op.drop_table("bill_text_summary", schema=schema)
-89
View File
@@ -1,89 +0,0 @@
from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
import tomllib
@dataclass
class LoraConfig:
    """LoRA adapter hyperparameters.

    Instances are built with ``LoraConfig(**raw["lora"])`` from the finetune
    TOML section, so the TOML keys must match these field names exactly.
    """

    # NOTE(review): field meanings below are inferred from the names only.
    rank: int  # presumably the adapter rank
    alpha: int  # presumably the LoRA scaling factor
    dropout: float  # presumably the adapter dropout probability
    targets: list[str]  # presumably the module names the adapter is applied to
@dataclass
class TrainingConfig:
    """Training loop hyperparameters.

    Instances are built with ``TrainingConfig(**raw["training"])`` from the
    finetune TOML section, so the TOML keys must match these field names
    exactly; a missing or extra key raises ``TypeError`` at construction.
    """

    learning_rate: float
    epochs: int
    batch_size: int
    gradient_accumulation: int
    max_seq_length: int
    warmup_ratio: float
    weight_decay: float
    logging_steps: int
    save_steps: int
@dataclass
class FinetuneConfig:
    """Top-level finetune configuration: base model plus LoRA and training settings."""

    base_model: str
    lora: LoraConfig
    training: TrainingConfig

    @classmethod
    def from_toml(cls, config_path: Path) -> FinetuneConfig:
        """Build the config from the ``[finetune]`` table of a TOML file.

        Args:
            config_path: Path of the TOML file to read.

        Returns:
            A ``FinetuneConfig`` whose ``lora`` and ``training`` members are
            built from the ``lora`` and ``training`` sub-tables.

        Raises:
            KeyError: If ``finetune``, ``base_model``, ``lora`` or ``training``
                is missing.
        """
        document = tomllib.loads(config_path.read_text())
        section = document["finetune"]
        # Read in the same order as the keyword arguments so the first missing
        # key reported is ``base_model``, then ``lora``, then ``training``.
        base_model = section["base_model"]
        lora = LoraConfig(**section["lora"])
        training = TrainingConfig(**section["training"])
        return cls(base_model=base_model, lora=lora, training=training)
class BenchmarkConfig:
"""Top-level benchmark configuration loaded from TOML."""
models: list[str]
model_dir: str
port: int
gpu_memory_utilization: float
temperature: float
timeout: int
concurrency: int
vllm_startup_timeout: int
@classmethod
def from_toml(cls, config_path: Path) -> BenchmarkConfig:
"""Load benchmark config from a TOML file."""
raw = tomllib.loads(config_path.read_text())["bench"]
return cls(**raw)
def get_config_dir() -> Path:
    """Return the ``config`` directory located three levels above this file."""
    module_file = Path(__file__).resolve()
    base_dir = module_file.parent.parent.parent
    return base_dir / "config"
def default_config_path() -> Path:
    """Return the path of ``config.toml`` inside the config directory."""
    config_dir = get_config_dir()
    return config_dir / "config.toml"
def get_finetune_config(config_path: Path | None = None) -> FinetuneConfig:
    """Load the finetune config, using the default config file when no path is given."""
    resolved_path = default_config_path() if config_path is None else config_path
    return FinetuneConfig.from_toml(resolved_path)
def get_benchmark_config(config_path: Path | None = None) -> BenchmarkConfig:
    """Load the benchmark config, using the default config file when no path is given."""
    resolved_path = default_config_path() if config_path is None else config_path
    return BenchmarkConfig.from_toml(resolved_path)
+1 -1
View File
@@ -1 +1 @@
"""Prompt benchmarking system for evaluating LLMs via vLLM."""
"""Init."""
+116
View File
@@ -0,0 +1,116 @@
"""Nornsight — BERTopic POC Inference Script.
Loads the trained model and labels a small batch of posts,
writing results to main.post_topic for inspection.
POC: processes a single batch of 1k posts to validate the pipeline end-to-end.
"""
from __future__ import annotations
import logging
import time
from collections import Counter
from pathlib import Path
from bertopic import BERTopic
from sqlalchemy import Engine, func, insert, select
from sqlalchemy.orm import Session
from pipelines.config import BertTopicInferConfig, get_bertopic_infer_config
from pipelines.orm.common import get_postgres_engine
from pipelines.orm.data_science_dev.posts import PostTopic, Posts
from pipelines.orm.data_science_dev.posts.lang_filters import ENGLISH_LANGS
from pipelines.pipelines.common import configure_logger
logger = logging.getLogger(__name__)
def main() -> None:
    """Configure logging, load the inference config and label one batch of posts."""
    configure_logger()
    run_inference(get_bertopic_infer_config())
    completion_note = (
        "POC inference complete. Check main.post_topic in DBeaver to inspect results."
    )
    logger.info(completion_note)
def run_inference(config: BertTopicInferConfig) -> None:
    """Label a batch of posts with a saved BERTopic model and store the result.

    Loads the model from ``config.model_save_path``, fetches posts via
    ``get_post_ids_and_test``, runs ``transform`` on their text and bulk-inserts
    one ``PostTopic`` row per post, tagged with ``config.model_version``.

    When no posts are fetched the function returns early, so neither the
    transform nor the bulk insert is called with an empty batch.

    Args:
        config: Inference settings (model location, version, batch limits).
    """
    model_save_path = Path(config.model_save_path)
    logger.info(f"Loading BERTopic model from {model_save_path}")
    topic_model = BERTopic.load(str(model_save_path))
    topic_info = topic_model.get_topic_info()
    # Topic id -> human-readable topic name, as reported by the model.
    label_map: dict[int, str] = dict(zip(topic_info["Topic"], topic_info["Name"]))
    logger.info(f"Model loaded with {len(label_map)} topics")

    engine = get_postgres_engine(name="DATA_SCIENCE_DEV")
    post_ids, texts = get_post_ids_and_test(engine, config)
    logger.info(f"Fetched {len(texts)} posts")
    if not texts:
        logger.warning("No posts to label; skipping transform and insert")
        return

    logger.info("Running BERTopic transform")
    start = time.perf_counter()
    topics, _probabilities = topic_model.transform(texts)
    elapsed = time.perf_counter() - start
    logger.info(f"Transform complete in {elapsed:.1f}s")

    # Write results to main.post_topic
    records = [
        {
            "post_id": pid,
            "topic_id": int(topic_id),
            # Topic ids missing from the model's topic table get a placeholder.
            "topic_label": label_map.get(int(topic_id), "unknown"),
            "model_version": config.model_version,
        }
        for pid, topic_id in zip(post_ids, topics)
    ]
    with Session(engine) as session:
        session.execute(insert(PostTopic), records)
        session.commit()
    count_topics(records)
    logger.info(f"Wrote {len(records)} topic labels to main.post_topic")
def get_post_ids_and_test(
    engine: Engine,
    config: BertTopicInferConfig,
) -> tuple[list[int], list[str]]:
    """Fetch the ids and stripped text of posts eligible for inference.

    A post is eligible when its text is not NULL, its ``langs`` value is in
    ``ENGLISH_LANGS`` and its text is longer than ``config.min_text_length``.
    Only ``post_id`` and ``text`` are selected, so full ORM rows are not loaded.

    Note: the query has no ordering, offset or exclusion of training rows. The
    batch is whatever the database returns first and may overlap posts used
    for training.

    Args:
        engine: Engine connected to the database that holds the posts table.
        config: Inference settings; a ``poc_batch_size`` of 0 or less disables
            the row limit.

    Returns:
        ``(post_ids, texts)`` as parallel lists; two empty lists when no post
        matches. Never ``None``.
    """
    stmt = select(Posts.post_id, Posts.text).where(
        Posts.text.is_not(None),
        Posts.langs.in_(ENGLISH_LANGS),
        func.length(Posts.text) > config.min_text_length,
    )
    if config.poc_batch_size > 0:
        stmt = stmt.limit(config.poc_batch_size)

    logger.info(f"Fetching {config.poc_batch_size} posts for inference")
    with Session(engine) as session:
        rows = session.execute(stmt).all()

    if not rows:
        logger.warning("No posts were selected for inference")
        return [], []

    post_ids = [row.post_id for row in rows]
    texts = [row.text.strip() for row in rows]
    return post_ids, texts
def count_topics(records: list[dict]) -> None:
    """Log the ten most frequent ``topic_label`` values found in ``records``.

    Records without a ``topic_label`` key are counted under ``"unknown"``.
    """
    labels = [record.get("topic_label", "unknown") for record in records]
    tally = Counter(labels)
    logger.info("Topic distribution in this batch:")
    for topic_label, occurrences in tally.most_common(10):
        logger.info(" %s: %d", topic_label, occurrences)
if __name__ == "__main__":
main()
+119
View File
@@ -0,0 +1,119 @@
"""Nornsight — BERTopic POC Training Script.
Pulls a small stratified sample (~11.5k posts) from main.posts,
trains BERTopic with MiniBatchKMeans on Jeeves, and saves the model locally.
POC sample rate: random() < 0.00005 (~0.005% of 230M = ~11.5k posts)
Full training rate will be: random() < 0.005 (~1.08M posts)
"""
from __future__ import annotations
import logging
import time
from pathlib import Path
from bertopic import BERTopic
from sklearn.cluster import MiniBatchKMeans
from sqlalchemy import func, select
from sqlalchemy.orm import Session
from pipelines.config import BertTopicTrainConfig, get_bertopic_train_config
from pipelines.orm.common import get_postgres_engine
from pipelines.orm.data_science_dev.posts import Posts
from pipelines.orm.data_science_dev.posts.lang_filters import ENGLISH_LANGS
from pipelines.pipelines.common import configure_logger
logger = logging.getLogger(__name__)
def main() -> None:
    """Load the training config, sample documents and train the BERTopic model.

    Logs a warning and does nothing else when the sample is empty.
    """
    configure_logger()
    config = get_bertopic_train_config()
    docs = load_sample(config)
    if docs:
        train(docs, config)
        logger.info(f"Done. Model saved as version {config.model_version}")
        logger.info("Next: run infer.py to label a sample of posts in the database")
    else:
        logger.warning("No training documents were selected")
def load_sample(config: BertTopicTrainConfig) -> list[str]:
    """Pull a random sample of post texts and return cleaned, unique documents.

    A row is sampled when its text is not NULL, its ``langs`` value is in
    ``ENGLISH_LANGS``, its text is longer than ``config.min_text_length`` and
    ``random() < config.sample_rate``.

    Cleaning strips surrounding whitespace, drops texts that become empty and
    removes duplicates while keeping first-seen order. A plain ``set`` would
    make the document order depend on string hashing.

    Args:
        config: Training settings (sample rate and minimum text length).

    Returns:
        The cleaned documents; empty when nothing was sampled.
    """
    logger.info("Connecting to PostgreSQL via SQLAlchemy")
    engine = get_postgres_engine(name="DATA_SCIENCE_DEV")
    logger.info(f"Pulling sample from main.posts (sample_rate={config.sample_rate})")
    start = time.perf_counter()
    with Session(engine) as session:
        texts = session.scalars(
            select(Posts.text).where(
                Posts.text.is_not(None),
                Posts.langs.in_(ENGLISH_LANGS),
                func.length(Posts.text) > config.min_text_length,
                func.random() < config.sample_rate,
            )
        ).all()
    elapsed = time.perf_counter() - start
    logger.info(f"Fetched {len(texts)} rows in {elapsed:.1f}s")

    # Basic cleaning — strip whitespace, drop empties, order-preserving dedup.
    stripped = (text.strip() for text in texts)
    docs = list(dict.fromkeys(doc for doc in stripped if doc))
    logger.info(f"After cleaning and dedup: {len(docs)} posts")
    return docs
def train(docs: list[str], config: BertTopicTrainConfig) -> None:
    """Fit a BERTopic model on ``docs`` and save it under ``config.model_save_path``.

    MiniBatchKMeans with ``config.n_topics`` clusters is passed to BERTopic as
    its clustering model. After fitting, the topic table is logged and the
    model is saved with safetensors serialization, including the c-TF-IDF data
    and the embedding model.

    Args:
        docs: Documents to train on.
        config: Training settings (topic count and save location).
    """
    logger.info(
        f"Initialising BERTopic with MiniBatchKMeans (n_topics={config.n_topics})"
    )
    kmeans = MiniBatchKMeans(
        n_clusters=config.n_topics,
        random_state=42,
        batch_size=1024,
        n_init=3,
        verbose=1,
    )
    model = BERTopic(
        hdbscan_model=kmeans,
        language="english",
        calculate_probabilities=False,  # saves memory
        verbose=True,
    )

    logger.info(f"Starting fit_transform on {len(docs)} posts (CPU)")
    started_at = time.perf_counter()
    model.fit_transform(docs)
    elapsed = time.perf_counter() - started_at
    logger.info(f"Training complete in {elapsed:.1f}s ({elapsed / 60:.1f} min)")

    # Log topic summary for quick inspection
    summary_table = model.get_topic_info()
    logger.info(f"Topics found: {len(summary_table)}")
    logger.info(f"\n{summary_table.to_string()}")

    target_dir = Path(config.model_save_path)
    target_dir.mkdir(parents=True, exist_ok=True)
    logger.info(f"Saving model to {target_dir}")
    model.save(
        str(target_dir),
        serialization="safetensors",
        save_ctfidf=True,
        save_embedding_model=True,
    )
    logger.info("Model saved")
if __name__ == "__main__":
main()
+186
View File
@@ -0,0 +1,186 @@
from __future__ import annotations
from dataclasses import dataclass
from os import getenv
from datetime import date
from pathlib import Path
import tomllib
@dataclass
class LoraConfig:
    """LoRA adapter hyperparameters.

    Instances are built with ``LoraConfig(**raw["lora"])`` from the finetune
    TOML section, so the TOML keys must match these field names exactly.
    """

    # NOTE(review): field meanings below are inferred from the names only.
    rank: int  # presumably the adapter rank
    alpha: int  # presumably the LoRA scaling factor
    dropout: float  # presumably the adapter dropout probability
    targets: list[str]  # presumably the module names the adapter is applied to
@dataclass
class TrainingConfig:
    """Training loop hyperparameters.

    Instances are built with ``TrainingConfig(**raw["training"])`` from the
    finetune TOML section, so the TOML keys must match these field names
    exactly; a missing or extra key raises ``TypeError`` at construction.
    """

    learning_rate: float
    epochs: int
    batch_size: int
    gradient_accumulation: int
    max_seq_length: int
    warmup_ratio: float
    weight_decay: float
    logging_steps: int
    save_steps: int
@dataclass
class FinetuneConfig:
    """Top-level finetune configuration: base model plus LoRA and training settings."""

    base_model: str
    lora: LoraConfig
    training: TrainingConfig

    @classmethod
    def from_toml(cls, config_path: Path) -> FinetuneConfig:
        """Build the config from the ``[finetune]`` table of a TOML file.

        Args:
            config_path: Path of the TOML file to read.

        Returns:
            A ``FinetuneConfig`` whose ``lora`` and ``training`` members are
            built from the ``lora`` and ``training`` sub-tables.

        Raises:
            KeyError: If ``finetune``, ``base_model``, ``lora`` or ``training``
                is missing.
        """
        document = tomllib.loads(config_path.read_text())
        section = document["finetune"]
        # Read in the same order as the keyword arguments so the first missing
        # key reported is ``base_model``, then ``lora``, then ``training``.
        base_model = section["base_model"]
        lora = LoraConfig(**section["lora"])
        training = TrainingConfig(**section["training"])
        return cls(base_model=base_model, lora=lora, training=training)
@dataclass
class BenchmarkConfig:
"""Top-level benchmark configuration loaded from TOML."""
models: list[str]
model_dir: str
port: int
gpu_memory_utilization: float
temperature: float
timeout: int
concurrency: int
vllm_startup_timeout: int
@classmethod
def from_toml(cls, config_path: Path) -> BenchmarkConfig:
"""Load benchmark config from a TOML file."""
raw = tomllib.loads(config_path.read_text())["bench"]
return cls(**raw)
@dataclass
class OpenAIConfig:
"""OpenAI API configuration."""
api_key: str
openai_project_id: str
openai_chat_completions_url: str
model: str
timeout_seconds: int
@classmethod
def from_toml(cls, config_path: Path) -> OpenAIConfig:
"""Load OpenAI config from a TOML file."""
raw = tomllib.loads(config_path.read_text()).get("openai", {})
api_key = getenv("CLOSEDAI_TOKEN")
if not api_key:
message = "CLOSEDAI_TOKEN is required"
raise KeyError(message)
return cls(
api_key=api_key,
openai_project_id=raw.get(
"openai_project_id", "proj_fQBPEXFgnS87Fk6wZwploFwE"
),
openai_chat_completions_url=raw.get(
"openai_chat_completions_url",
"https://api.openai.com/v1/chat/completions",
),
model=raw.get("model", "gpt-5.4-mini"),
timeout_seconds=raw.get("timeout_seconds", 60),
)
@dataclass
class BertTopicTrainConfig:
"""BERTopic training configuration loaded from TOML."""
sample_rate: float
min_text_length: int
n_topics: int
model_save_path: str
model_version: str | None = None
@classmethod
def from_toml(cls, config_path: Path) -> BertTopicTrainConfig:
"""Load BERTopic training config from a TOML file."""
raw = tomllib.loads(config_path.read_text())["bertopic"]["train"]
today = date.today().isoformat()
if raw.get("model_version") is None:
raw["model_version"] = (
f"{today}-{raw['sample_rate']}-{raw['min_text_length']}-{raw['n_topics']}"
)
return cls(**raw)
@dataclass
class BertTopicInferConfig:
"""BERTopic inference configuration loaded from TOML."""
min_text_length: int
poc_batch_size: int
model_version: str
model_save_path: str
@classmethod
def from_toml(cls, config_path: Path) -> BertTopicInferConfig:
"""Load BERTopic inference config from a TOML file."""
raw = tomllib.loads(config_path.read_text())["bertopic"]["infer"]
return cls(**raw)
def get_config_dir() -> Path:
    """Return the ``config`` directory located three levels above this file."""
    module_file = Path(__file__).resolve()
    base_dir = module_file.parents[2]
    return base_dir / "config"
def default_config_path() -> Path:
    """Return the path of ``config.toml`` inside the config directory."""
    config_dir = get_config_dir()
    return config_dir / "config.toml"
def get_openai_config(config_path: Path | None = None) -> OpenAIConfig:
    """Load the OpenAI config, using the default config file when no path is given."""
    resolved_path = default_config_path() if config_path is None else config_path
    return OpenAIConfig.from_toml(resolved_path)
def get_finetune_config(config_path: Path | None = None) -> FinetuneConfig:
    """Load the finetune config, using the default config file when no path is given."""
    resolved_path = default_config_path() if config_path is None else config_path
    return FinetuneConfig.from_toml(resolved_path)
def get_benchmark_config(config_path: Path | None = None) -> BenchmarkConfig:
    """Load the benchmark config, using the default config file when no path is given."""
    resolved_path = default_config_path() if config_path is None else config_path
    return BenchmarkConfig.from_toml(resolved_path)
def get_bertopic_train_config(
    config_path: Path | None = None,
) -> BertTopicTrainConfig:
    """Load the BERTopic training config, using the default config file when no path is given."""
    resolved_path = default_config_path() if config_path is None else config_path
    return BertTopicTrainConfig.from_toml(resolved_path)
def get_bertopic_infer_config(
    config_path: Path | None = None,
) -> BertTopicInferConfig:
    """Load the BERTopic inference config, using the default config file when no path is given."""
    resolved_path = default_config_path() if config_path is None else config_path
    return BertTopicInferConfig.from_toml(resolved_path)
@@ -23,7 +23,7 @@ from sqlalchemy import (
)
from sqlalchemy.orm import Session
from pipelines.congress_vote_context import create_score_run, finalize_score_run
from pipelines.jobs.congress_vote_context import create_score_run, finalize_score_run
from pipelines.orm.common import get_postgres_engine
from pipelines.orm.data_science_dev.congress import (
BillTopic,
@@ -39,7 +39,7 @@ from pipelines.orm.data_science_dev.congress import (
VoteRelationship,
VoteRecord,
)
from pipelines.pipelines.jobs.extract_bill_topics import normalize_topic_label
from pipelines.jobs.extract_bill_topics import normalize_topic_label
from pipelines.web.scoring import (
OPPOSE_POSITIONS,
SUPPORT_POSITIONS,
File diff suppressed because it is too large Load Diff
+26 -12
View File
@@ -19,6 +19,7 @@ from pipelines.orm.common import get_postgres_engine
from pipelines.orm.data_science_dev.congress import (
Bill,
BillText,
BillTextSummary,
BillTopic,
BillTopicPosition,
SubjectType,
@@ -72,11 +73,19 @@ class ExtractedBillTopic:
def _select_bill_text_for_topic_extraction(bill: Bill) -> BillText | None:
"""Pick one summarized bill_text row from the already-loaded relationship."""
for bill_text in bill.bill_texts:
if bill_text.summary and bill_text.summary.strip():
summary_row = bill_text.default_summary()
if summary_row and summary_row.summary.strip():
return bill_text
return None
def _bill_text_has_summary_clause() -> ColumnElement[bool]:
    """Build an EXISTS clause matching bill texts that have at least one summary row.

    The subquery compares ``BillTextSummary.bill_text_id`` with ``BillText.id``.
    It only checks that a row exists, not that its summary text is non-blank.
    """
    summary_ids = select(BillTextSummary.id).where(
        BillTextSummary.bill_text_id == BillText.id
    )
    return exists(summary_ids)
def normalize_topic_label(value: str) -> str:
"""Normalize a topic label for storage, comparison, and de-duping."""
normalized = value.strip().strip("\"'")
@@ -323,11 +332,7 @@ def create_select_bills_for_topic_extraction(
limit: int | None = None,
) -> Select[tuple[Bill]]:
"""Select bill rows that have summarized bill_text rows for topic extraction."""
has_summary = (BillText.summary.is_not(None), BillText.summary != "")
summarized_text_filters: list[ColumnElement[bool]] = [
BillText.bill_id == Bill.id,
*has_summary,
]
summarized_text_filters: list[ColumnElement[bool]] = [_bill_text_has_summary_clause()]
if with_votes_only:
summarized_text_filters.append(
exists(
@@ -347,11 +352,17 @@ def create_select_bills_for_topic_extraction(
)
)
)
summarized_text_exists = exists(select(BillText.id).where(*summarized_text_filters))
summarized_text_exists = exists(
select(BillText.id).where(BillText.bill_id == Bill.id, *summarized_text_filters)
)
bill_text_loader = selectinload(Bill.bill_texts.and_(*summarized_text_filters))
stmt = (
select(Bill)
.where(summarized_text_exists)
.options(selectinload(Bill.bill_texts.and_(*summarized_text_filters[1:])))
.options(
bill_text_loader.selectinload(BillText.summaries),
bill_text_loader.selectinload(BillText.primary_summary),
)
.order_by(Bill.id)
)
if congress is not None:
@@ -363,7 +374,7 @@ def create_select_bills_for_topic_extraction(
select(BillText.id).where(
BillText.bill_id == Bill.id,
BillText.id.in_(bill_text_ids),
*summarized_text_filters[1:],
*summarized_text_filters,
)
)
stmt = stmt.where(selected_text_exists)
@@ -416,8 +427,7 @@ def collect_topic_extraction_diagnostics(
)
)
has_summary = (BillText.summary.is_not(None), BillText.summary != "")
summary_filters = [*bill_text_filters, *has_summary]
summary_filters = [*bill_text_filters, _bill_text_has_summary_clause()]
bills_with_summaries = session.scalar(
select(func.count(func.distinct(Bill.id)))
@@ -607,7 +617,11 @@ def main(
if bill_text is None:
logger.warning("Skipping bill id=%s: no usable summary", bill.id)
continue
summary = bill_text.summary.strip()
summary_row = bill_text.default_summary()
if summary_row is None:
logger.warning("Skipping bill id=%s: no default summary", bill.id)
continue
summary = summary_row.summary.strip()
try:
extracted_topics = extract_topics_for_bill_text(
File diff suppressed because it is too large Load Diff
+281
View File
@@ -0,0 +1,281 @@
"""Ingestion pipeline for loading JSONL post files into the weekly-partitioned posts table.
Usage:
ingest-posts /path/to/files/
ingest-posts /path/to/single_file.jsonl
ingest-posts /data/dir/ --workers 4 --batch-size 5000
"""
from __future__ import annotations
import logging
from datetime import UTC, datetime
from pathlib import Path # noqa: TC003 this is needed for typer
from typing import TYPE_CHECKING, Annotated
import orjson
import psycopg
import typer
from pipelines.pipelines.common import configure_logger
from pipelines.orm.common import get_connection_info
from pipelines.pipelines.parallelize import parallelize_process
if TYPE_CHECKING:
from collections.abc import Iterator
logger = logging.getLogger(__name__)
app = typer.Typer(help="Ingest JSONL post files into the partitioned posts table.")
@app.command()
def main(
    path: Annotated[
        Path,
        typer.Argument(help="Directory containing JSONL files, or a single JSONL file"),
    ],
    batch_size: Annotated[int, typer.Option(help="Rows per INSERT batch")] = 10000,
    workers: Annotated[
        int, typer.Option(help="Parallel workers for multi-file ingestion")
    ] = 4,
    pattern: Annotated[
        str, typer.Option(help="Glob pattern for JSONL files")
    ] = "*.jsonl",
) -> None:
    """Ingest JSONL post files into the weekly-partitioned posts table.

    A file path is ingested directly; a directory is ingested file by file
    using ``workers`` parallel workers. Any other path prints an error and
    exits with status 1.
    """
    configure_logger(level="INFO")
    logger.info("starting ingest-posts")
    logger.info(
        "path=%s batch_size=%d workers=%d pattern=%s",
        path,
        batch_size,
        workers,
        pattern,
    )

    single_file = path.is_file()
    if not single_file and not path.is_dir():
        typer.echo(f"Path does not exist: {path}", err=True)
        raise typer.Exit(code=1)

    if single_file:
        ingest_file(path, batch_size=batch_size)
    else:
        ingest_directory(
            path, batch_size=batch_size, max_workers=workers, pattern=pattern
        )
    logger.info("ingest-posts done")
def ingest_directory(
    directory: Path,
    *,
    batch_size: int,
    max_workers: int,
    pattern: str = "*.jsonl",
) -> None:
    """Ingest every file in ``directory`` matching ``pattern`` using parallel workers.

    Matches are processed in sorted order of their paths; one ``ingest_file``
    call is scheduled per match. Logs a warning and returns when nothing
    matches.
    """
    matches = sorted(directory.glob(pattern))
    if matches:
        logger.info("Found %d JSONL files to ingest", len(matches))
        jobs = [{"path": match, "batch_size": batch_size} for match in matches]
        parallelize_process(ingest_file, jobs, max_workers=max_workers)
        return
    logger.warning("No JSONL files found in %s", directory)
# Schema holding the posts and failed_ingestion tables written by this module.
SCHEMA = "main"
# Column list shared by the COPY into the staging table and the
# INSERT ... SELECT into posts. ingest_batch reads each of these keys from a
# row dict with .get(), so a key absent from the row is written as NULL.
COLUMNS = (
    "post_id",
    "user_id",
    "instance",
    "date",
    "text",
    "langs",
    "like_count",
    "reply_count",
    "repost_count",
    "reply_to",
    "replied_author",
    "thread_root",
    "thread_root_author",
    "repost_from",
    "reposted_author",
    "quotes",
    "quoted_author",
    "labels",
    "sent_label",
    "sent_score",
)
# Moves staged rows into posts; rows whose (post_id, date) already exists are
# skipped rather than updated.
INSERT_FROM_STAGING = f"""
INSERT INTO {SCHEMA}.posts ({", ".join(COLUMNS)})
SELECT {", ".join(COLUMNS)} FROM pg_temp.staging
ON CONFLICT (post_id, date) DO NOTHING
"""  # noqa: S608
# Records one rejected input line or row together with the error text.
FAILED_INSERT = f"""
INSERT INTO {SCHEMA}.failed_ingestion (raw_line, error)
VALUES (%(raw_line)s, %(error)s)
"""  # noqa: S608
def get_psycopg_connection() -> psycopg.Connection:
    """Open a raw psycopg3 connection to the ``DATA_SCIENCE_DEV`` database.

    Connection details come from ``get_connection_info``. Autocommit is off,
    so callers must commit or roll back explicitly.
    """
    connection_info = get_connection_info("DATA_SCIENCE_DEV")
    database, host, port, username, password = connection_info
    connect_kwargs = {
        "dbname": database,
        "host": host,
        "port": int(port),
        "user": username,
        "password": password,
        "autocommit": False,
    }
    return psycopg.connect(**connect_kwargs)
def ingest_file(path: Path, *, batch_size: int) -> None:
    """Ingest a single JSONL file into the posts table.

    Batches are streamed from ``read_jsonl_batches`` and written with
    ``ingest_batch``. Lines that could not be parsed are collected while
    reading and stored in ``failed_ingestion`` once all batches are done.

    Progress is logged roughly every 100,000 rows. The logged row count is the
    number of rows actually handed to ``ingest_batch``; batches can exceed
    ``batch_size`` when one line expands into several objects, so
    ``index * batch_size`` would under-report.

    Args:
        path: JSONL file to ingest.
        batch_size: Target number of rows per batch.

    Raises:
        Exception: Any failure is logged with the file path and re-raised.
    """
    log_trigger = max(100_000 // batch_size, 1)
    failed_lines: list[dict] = []
    rows_seen = 0
    try:
        with get_psycopg_connection() as connection:
            for index, batch in enumerate(
                read_jsonl_batches(path, batch_size, failed_lines), 1
            ):
                ingest_batch(connection, batch)
                rows_seen += len(batch)
                if index % log_trigger == 0:
                    logger.info(
                        "Ingested %d batches (%d rows) from %s",
                        index,
                        rows_seen,
                        path,
                    )
            if failed_lines:
                logger.warning(
                    "Recording %d malformed lines from %s", len(failed_lines), path.name
                )
                with connection.cursor() as cursor:
                    cursor.executemany(FAILED_INSERT, failed_lines)
                connection.commit()
    except Exception:
        logger.exception("Failed to ingest file: %s", path)
        raise
def ingest_batch(connection: psycopg.Connection, batch: list[dict]) -> None:
    """COPY batch into a temp staging table, then INSERT ... ON CONFLICT into posts.

    On any failure the transaction is rolled back and the batch is split in
    half and retried recursively. A batch of one row that still fails is
    written to ``failed_ingestion`` instead, so a single bad row does not
    block the rest of the batch.

    Args:
        connection: Open psycopg connection; this function commits or rolls
            back on it.
        batch: Row dicts keyed by the names in ``COLUMNS``; an empty batch is
            a no-op.
    """
    if not batch:
        return
    try:
        with connection.cursor() as cursor:
            # The staging table is created once per connection and reused.
            cursor.execute(f"""
CREATE TEMP TABLE IF NOT EXISTS staging
(LIKE {SCHEMA}.posts INCLUDING DEFAULTS)
ON COMMIT DELETE ROWS
""")
            cursor.execute("TRUNCATE pg_temp.staging")
            with cursor.copy(
                f"COPY pg_temp.staging ({', '.join(COLUMNS)}) FROM STDIN"
            ) as copy:
                for row in batch:
                    # Keys absent from the row are written as None.
                    copy.write_row(tuple(row.get(column) for column in COLUMNS))
            cursor.execute(INSERT_FROM_STAGING)
        connection.commit()
    except Exception as error:
        # Undo the failed attempt before retrying on the same connection.
        connection.rollback()
        if len(batch) == 1:
            logger.exception("Skipping bad row post_id=%s", batch[0].get("post_id"))
            with connection.cursor() as cursor:
                cursor.execute(
                    FAILED_INSERT,
                    {
                        # default=str stringifies values orjson cannot encode.
                        "raw_line": orjson.dumps(batch[0], default=str).decode(),
                        "error": str(error),
                    },
                )
            connection.commit()
            return
        # Bisect: retry each half on its own to isolate the offending row(s).
        midpoint = len(batch) // 2
        ingest_batch(connection, batch[:midpoint])
        ingest_batch(connection, batch[midpoint:])
def read_jsonl_batches(
    file_path: Path, batch_size: int, failed_lines: list[dict]
) -> Iterator[list[dict]]:
    """Stream a JSONL file and yield lists of transformed rows.

    Blank lines are skipped. A batch is yielded as soon as it holds at least
    ``batch_size`` rows, so it can be larger when one line expands into several
    rows; a final partial batch is yielded at end of file. Lines that fail to
    parse are appended to ``failed_lines`` by ``parse_line``.
    """
    pending: list[dict] = []
    with file_path.open("r", encoding="utf-8") as handle:
        for raw_line in handle:
            stripped = raw_line.strip()
            if stripped:
                pending.extend(parse_line(stripped, file_path, failed_lines))
                if len(pending) >= batch_size:
                    yield pending
                    pending = []
    if pending:
        yield pending
def parse_line(line: str, file_path: Path, failed_lines: list[dict]) -> Iterator[dict]:
    """Parse a JSONL line, handling concatenated JSON objects.

    Yields one transformed row for a well-formed line. If the line is not
    valid JSON but contains ``}{``, it is split at those boundaries and each
    fragment is parsed on its own. Anything that cannot be parsed or
    transformed is logged and appended to ``failed_lines`` instead of being
    raised.

    Fragment handling catches every ``Exception``, matching the main path. A
    fragment that decodes but fails in ``transform_row`` (for example a
    ``TypeError`` from a non-integer ``date``) is then recorded as a failed
    line rather than aborting ingestion of the whole file.

    Args:
        line: One stripped, non-empty line of the file.
        file_path: Source file, used in log messages only.
        failed_lines: Collector that receives ``{"raw_line", "error"}`` dicts.
    """
    try:
        yield transform_row(orjson.loads(line))
    except orjson.JSONDecodeError:
        if "}{" not in line:
            logger.warning(
                "Skipping malformed line in %s: %s", file_path.name, line[:120]
            )
            failed_lines.append({"raw_line": line, "error": "malformed JSON"})
            return
        # Several objects were written back to back on one line; re-split them.
        fragments = line.replace("}{", "}\n{").split("\n")
        for fragment in fragments:
            try:
                yield transform_row(orjson.loads(fragment))
            except Exception as error:
                logger.warning(
                    "Skipping malformed fragment in %s: %s",
                    file_path.name,
                    fragment[:120],
                )
                failed_lines.append({"raw_line": fragment, "error": str(error)})
    except Exception as error:
        logger.exception("Skipping bad row in %s: %s", file_path.name, line[:120])
        failed_lines.append({"raw_line": line, "error": str(error)})
def transform_row(raw: dict) -> dict:
    """Normalize a raw JSONL row in place and return the same dict.

    ``date`` is converted with ``parse_date``, a non-null ``langs`` value is
    serialized with ``orjson.dumps``, and NUL characters are removed from a
    non-null ``text``.

    Raises:
        KeyError: If the row has no ``date`` key.
    """
    raw["date"] = parse_date(raw["date"])

    langs = raw.get("langs")
    if langs is not None:
        raw["langs"] = orjson.dumps(langs)

    text = raw.get("text")
    if text is not None:
        raw["text"] = text.replace("\x00", "")

    return raw
def parse_date(raw_date: int) -> datetime:
"""Parse compact YYYYMMDDHHmm integer into a naive datetime (input is UTC by spec)."""
return datetime(
raw_date // 100000000,
(raw_date // 1000000) % 100,
(raw_date // 10000) % 100,
(raw_date // 100) % 100,
raw_date % 100,
tzinfo=UTC,
)
if __name__ == "__main__":
app()
+23 -9
View File
@@ -9,7 +9,7 @@ from typing import Annotated, Any
import httpx
import typer
from sqlalchemy import Select, exists, or_, select
from sqlalchemy import Select, exists, select
from sqlalchemy.orm import Session, selectinload
from tiktoken import get_encoding
@@ -20,6 +20,7 @@ from pipelines.orm.common import get_postgres_engine
from pipelines.orm.data_science_dev.congress import (
Bill,
BillText,
BillTextSummary,
SubjectType,
VoteClassification,
VoteRelationship,
@@ -112,7 +113,7 @@ def summarize_bill_text(
model: str,
bill_text: BillText,
summarization_prompts: dict[str, str],
) -> str:
) -> str | None:
"""Generate and return a summary for one bill_text row."""
messages, user_prompt_tokens = build_bill_summary_messages(
bill_text=bill_text,
@@ -136,15 +137,21 @@ def summarize_bill_text(
def store_bill_summary_result(
*,
session: Session,
bill_text: BillText,
summary: str,
model: str,
) -> None:
) -> BillTextSummary:
"""Store a generated summary and the prompt/model metadata that produced it."""
bill_text.summary = summary
bill_text.summarization_model = model
bill_text.summarization_system_prompt_version = "v1.2"
bill_text.summarization_user_prompt_version = "v1"
summary_row = BillTextSummary(
bill_text=bill_text,
summary=summary,
summarization_model=model,
summarization_system_prompt_version="v1.2",
summarization_user_prompt_version="v1",
)
session.add(summary_row)
return summary_row
def create_select_bill_texts_for_summarization(
@@ -154,7 +161,7 @@ def create_select_bill_texts_for_summarization(
with_votes_only: bool = False,
force: bool = False,
limit: int | None = None,
) -> Select:
) -> Select[tuple[BillText]]:
"""Select bill_text rows that have source text and need summaries."""
stmt = (
select(BillText)
@@ -189,7 +196,13 @@ def create_select_bill_texts_for_summarization(
)
)
if not force:
stmt = stmt.where(or_(BillText.summary.is_(None), BillText.summary == ""))
stmt = stmt.where(
~exists(
select(BillTextSummary.id).where(
BillTextSummary.bill_text_id == BillText.id
)
)
)
if limit is not None:
stmt = stmt.limit(limit)
return stmt
@@ -287,6 +300,7 @@ def main(
logger.warning("Skipping bill_text id=%s", bill_text.id)
continue
store_bill_summary_result(
session=session,
bill_text=bill_text,
summary=summary,
model=model,
@@ -6,6 +6,7 @@ from pipelines.orm.data_science_dev.congress.bill import (
BillActionRecordedVote,
BillRelation,
BillText,
BillTextSummary,
BillTopic,
BillTopicPosition,
)
@@ -54,6 +55,7 @@ __all__ = [
"BillActionRecordedVote",
"BillRelation",
"BillText",
"BillTextSummary",
"BillTopic",
"BillTopicPosition",
"ClassificationMethod",
@@ -105,13 +105,12 @@ class BillText(DataScienceDevTableBase):
)
bill_id: Mapped[int] = mapped_column(ForeignKey("main.bill.id", ondelete="CASCADE"))
primary_summary_id: Mapped[int | None] = mapped_column(
ForeignKey("main.bill_text_summary.id", ondelete="SET NULL")
)
version_code: Mapped[str]
version_name: Mapped[str | None]
text_content: Mapped[str | None]
summary: Mapped[str | None]
summarization_model: Mapped[str | None]
summarization_user_prompt_version: Mapped[str | None]
summarization_system_prompt_version: Mapped[str | None]
date: Mapped[date | None]
source_datetime_raw: Mapped[str | None]
text_url_xml: Mapped[str | None]
@@ -122,6 +121,57 @@ class BillText(DataScienceDevTableBase):
)
bill: Mapped[Bill] = relationship("Bill", back_populates="bill_texts")
summaries: Mapped[list[BillTextSummary]] = relationship(
"BillTextSummary",
back_populates="bill_text",
cascade="all, delete-orphan",
foreign_keys="BillTextSummary.bill_text_id",
order_by=lambda: (
BillTextSummary.created.desc(),
BillTextSummary.id.desc(),
),
)
primary_summary: Mapped[BillTextSummary | None] = relationship(
"BillTextSummary",
foreign_keys=[primary_summary_id],
post_update=True,
)
def latest_summary(self) -> BillTextSummary | None:
    """Return the first entry of ``summaries``, or ``None`` when it is empty.

    The ``summaries`` relationship is ordered by ``created`` and then ``id``,
    both descending, so the first entry is the newest summary row.
    """
    if not self.summaries:
        return None
    return self.summaries[0]
def default_summary(self) -> BillTextSummary | None:
    """Return ``primary_summary`` when it is set, else ``latest_summary()``."""
    primary = self.primary_summary
    if primary:
        return primary
    return self.latest_summary()
class BillTextSummary(DataScienceDevTableBase):
    """Stores one generated summary for a bill text version.

    Several rows may point at the same ``bill_text`` row. Each row keeps the
    summary text together with the model name and prompt versions recorded
    for it.
    """

    __tablename__ = "bill_text_summary"
    __table_args__ = (
        # Lookup of all summaries for one bill text.
        Index("ix_bill_text_summary_bill_text_id", "bill_text_id"),
        # Composite index on (bill_text_id, created).
        # NOTE(review): `created` is not declared here; presumably inherited
        # from DataScienceDevTableBase. Confirm.
        Index(
            "ix_bill_text_summary_bill_text_id_created",
            "bill_text_id",
            "created",
        ),
    )

    # Owning bill_text row; the foreign key is declared ON DELETE CASCADE.
    bill_text_id: Mapped[int] = mapped_column(
        ForeignKey("main.bill_text.id", ondelete="CASCADE")
    )
    # The generated summary text (required).
    summary: Mapped[str]
    # Optional provenance for the summary.
    summarization_model: Mapped[str | None]
    summarization_user_prompt_version: Mapped[str | None]
    summarization_system_prompt_version: Mapped[str | None]

    # Pinned to `bill_text_id` via foreign_keys; the counterpart on BillText
    # is the `summaries` relationship.
    bill_text: Mapped[BillText] = relationship(
        "BillText",
        back_populates="summaries",
        foreign_keys=[bill_text_id],
    )
class BillAction(DataScienceDevTableBase):
+2
View File
@@ -11,6 +11,7 @@ from pipelines.orm.data_science_dev.congress import (
BillActionRecordedVote,
BillRelation,
BillText,
BillTextSummary,
BillTopic,
BillTopicPosition,
ClassificationMethod,
@@ -51,6 +52,7 @@ __all__ = [
"BillActionRecordedVote",
"BillRelation",
"BillText",
"BillTextSummary",
"BillTopic",
"BillTopicPosition",
"ClassificationMethod",
-34
View File
@@ -1,34 +0,0 @@
SUMMARIZATION_SYSTEM_PROMPT = """You are a legislative analyst extracting policy substance from Congressional bill text.
Your job is to compress a bill into a dense, neutral structured summary that captures every distinct policy action — including secondary effects that might be buried in subsections.
EXTRACTION RULES:
- IGNORE: whereas clauses, congressional findings that are purely political statements, recitals, preambles, citations of existing law by number alone, and procedural boilerplate.
- FOCUS ON: operative verbs — what the bill SHALL do, PROHIBIT, REQUIRE, AUTHORIZE, AMEND, APPROPRIATE, or ESTABLISH.
- SURFACE ALL THREADS: If the bill touches multiple policy areas, list each thread separately. Do not collapse them.
- BE CONCRETE: Name the affected population, the mechanism, and the direction (expands/restricts/maintains).
- STAY NEUTRAL: No political framing. Describe what the text does, not what its sponsors claim it does.
OUTPUT FORMAT — plain structured text, not JSON:
OPERATIVE ACTIONS:
[Numbered list of what the bill actually does, one action per line, max 20 words each]
AFFECTED POPULATIONS:
[Who gains something, who loses something, or whose behavior is regulated]
MECHANISMS:
[How it works: new funding, mandate, prohibition, amendment to existing statute, grant program, study commission, etc.]
POLICY THREADS:
[List each distinct policy domain this bill touches, even minor ones. Use plain language, not domain codes.]
SYMBOLIC/PROCEDURAL ONLY:
[Yes or No — is this bill primarily a resolution, designation, or awareness declaration with no operative effect?]
LENGTH TARGET: 150-250 words total. Be ruthless about cutting. Density over completeness."""
SUMMARIZATION_USER_TEMPLATE = """Summarize the following Congressional bill according to your instructions.
BILL TEXT:
{text_content}"""
+1
View File
@@ -0,0 +1 @@
"""FastAPI HTMX front end for the legislative database."""
+31
View File
@@ -0,0 +1,31 @@
"""Database access for the FastAPI web app."""
from __future__ import annotations
from collections.abc import Iterator
from contextlib import contextmanager
from functools import lru_cache
from sqlalchemy.engine import Engine
from sqlalchemy.orm import Session
from pipelines.orm.common import get_postgres_engine
@lru_cache(maxsize=1)
def get_engine() -> Engine:
    """Create the DATA_SCIENCE_DEV engine on first call and reuse it afterwards.

    ``lru_cache`` keeps the single result, so later calls return the same
    engine object.
    """
    engine = get_postgres_engine(name="DATA_SCIENCE_DEV")
    return engine
def validate_database_connection() -> None:
    """Open and immediately close one connection to the configured database.

    Any error raised while connecting propagates to the caller, which makes
    startup fail fast when the database is unavailable.
    """
    connection = get_engine().connect()
    connection.close()
@contextmanager
def session_scope() -> Iterator[Session]:
    """Yield a SQLAlchemy session bound to the shared engine, then close it.

    No commit is issued here; the scope is meant for read-only request
    handling.
    """
    session = Session(get_engine())
    try:
        yield session
    finally:
        session.close()
+589
View File
@@ -0,0 +1,589 @@
"""FastAPI app for the HTMX legislative dashboard."""
from __future__ import annotations
from contextlib import asynccontextmanager
from dataclasses import dataclass
import hashlib
import hmac
from os import getenv
from pathlib import Path
import secrets
from typing import Any
from urllib.parse import parse_qs
from fastapi import Depends, FastAPI, HTTPException, Request, Response, status
from fastapi.responses import HTMLResponse, PlainTextResponse, RedirectResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from pipelines.web import repository
from pipelines.web.db import session_scope, validate_database_connection
from pipelines.web.repository import Chamber, RankingResult
from pipelines.web.scoring import normalize_issues
from pipelines.web.svg import render_compare_radar_svg, render_score_history_svg
# Template and static asset directories are resolved relative to this module.
BASE_DIR = Path(__file__).resolve().parent
TEMPLATES_DIR = BASE_DIR / "templates"
STATIC_DIR = BASE_DIR / "static"
# Shared Jinja2 environment used by every HTML route below.
templates = Jinja2Templates(directory=TEMPLATES_DIR)
# Admin credentials and the cookie-signing secret.
# Each value can be overridden through the environment. An unset or empty
# variable falls back to the local-development default, so existing setups
# keep working. The defaults are public in source control and must not be
# relied on for any deployed instance.
ADMIN_USERNAME = getenv("NORNSIGHT_ADMIN_USERNAME") or "admin"
ADMIN_PASSWORD = getenv("NORNSIGHT_ADMIN_PASSWORD") or "admin"
# Name of the cookie that carries the signed admin session token.
SESSION_COOKIE = "nornsight_admin"
SESSION_SECRET = (
    getenv("NORNSIGHT_SESSION_SECRET") or "nornsight-local-dev-session-secret"
)
@asynccontextmanager
async def lifespan(_: FastAPI):
    """Check database connectivity at startup, except under pytest.

    The check is skipped when the ``PYTEST_CURRENT_TEST`` environment
    variable is present. Nothing runs on shutdown.
    """
    running_under_pytest = getenv("PYTEST_CURRENT_TEST") is not None
    if not running_under_pytest:
        validate_database_connection()
    yield
# The ASGI application; `lifespan` runs the startup database check.
app = FastAPI(title="Nornsight Legislative Dashboard", lifespan=lifespan)
# Serve files from STATIC_DIR under the /static URL prefix.
app.mount("/static", StaticFiles(directory=STATIC_DIR), name="static")
@dataclass(frozen=True)
class DashboardState:
    """Dashboard query-string state."""

    # Selected issue names, after normalize_issues().
    issues: list[str]
    # One of "house", "senate" or "all".
    chamber: Chamber
    # Congress number from the query string; None when absent or not an integer.
    congress: int | None
    # Legislator IDs chosen for comparison; non-integer values are dropped.
    compare: list[int]
@app.get("/healthz", response_class=PlainTextResponse)
def healthz() -> str:
    """Return a simple liveness response.

    Requires no authentication and does not touch the database.
    """
    return "ok"
@app.get("/login", response_class=HTMLResponse)
def login(request: Request) -> Response:
    """Show the login form, or skip it for an already signed-in admin.

    The ``next`` query parameter is sanitized and used as the redirect target
    when the request already carries a valid session cookie; otherwise it is
    passed to the template.
    """
    next_path = _safe_next_path(request.query_params.get("next"))
    if _authenticated_user(request) is None:
        context = {
            "error": "",
            "is_authenticated": False,
            "show_primary_nav": False,
            "next_path": next_path,
            "username": "",
        }
        return templates.TemplateResponse(request, "login.html", context)
    return RedirectResponse(next_path, status_code=status.HTTP_303_SEE_OTHER)
@app.post("/login", response_class=HTMLResponse)
async def login_submit(request: Request) -> Response:
    """Check the submitted credentials and start an admin session.

    The urlencoded request body is read for ``username``, ``password`` and an
    optional ``next`` path; when the form has no ``next`` field the ``next``
    query parameter is used instead.

    Returns:
        On success, a 303 redirect to the sanitized ``next`` path with the
        signed session cookie set. Otherwise the login page re-rendered with
        an error message and status 401.
    """
    # Undecodable bytes are replaced rather than raised on, so a malformed
    # body ends up as a failed login instead of a server error.
    form = parse_qs((await request.body()).decode(errors="replace"))
    username = form.get("username", [""])[0]
    password = form.get("password", [""])[0]
    next_path = _safe_next_path(form.get("next", [request.query_params.get("next")])[0])

    # compare_digest() raises TypeError for str arguments that contain
    # non-ASCII characters, so the comparison is done on UTF-8 bytes. Both
    # comparisons always run, whichever of them fails.
    username_ok = secrets.compare_digest(username.encode(), ADMIN_USERNAME.encode())
    password_ok = secrets.compare_digest(password.encode(), ADMIN_PASSWORD.encode())
    if not (username_ok and password_ok):
        return templates.TemplateResponse(
            request,
            "login.html",
            {
                "error": "Invalid username or password.",
                "is_authenticated": False,
                "show_primary_nav": False,
                "next_path": next_path,
                "username": username,
            },
            status_code=status.HTTP_401_UNAUTHORIZED,
        )

    response = RedirectResponse(next_path, status_code=status.HTTP_303_SEE_OTHER)
    response.set_cookie(
        SESSION_COOKIE,
        _sign_session(username),
        httponly=True,
        samesite="lax",
    )
    return response
@app.get("/logout")
def logout() -> Response:
    """Clear the local admin session.

    Deletes the session cookie and redirects (303) to the login page.
    """
    response = RedirectResponse("/login", status_code=status.HTTP_303_SEE_OTHER)
    response.delete_cookie(SESSION_COOKIE)
    return response
def require_admin(request: Request) -> str:
    """Return the signed-in admin's username or redirect to the login page.

    Used as a FastAPI dependency on protected routes.

    Raises:
        HTTPException: Status 303 with a ``Location`` header pointing at the
            login page. The current path and query string are carried in its
            ``next`` parameter.
    """
    username = _authenticated_user(request)
    if username is None:
        query = request.url.query
        next_path = f"{request.url.path}?{query}" if query else request.url.path
        login_url = request.url_for("login").include_query_params(next=next_path)
        raise HTTPException(
            status_code=status.HTTP_303_SEE_OTHER,
            headers={"Location": str(login_url)},
        )
    return username
def _authenticated_user(request: Request) -> str | None:
    """Return the admin username if the request carries a valid session cookie.

    The cookie value has the form ``username:signature``. It is accepted only
    when the username equals ``ADMIN_USERNAME`` and the signature matches the
    HMAC computed by ``_session_signature``.

    Returns:
        The username, or ``None`` when the cookie is missing, malformed, names
        another user, or carries a wrong signature.
    """
    token = request.cookies.get(SESSION_COOKIE)
    if token is None:
        return None

    username, separator, signature = token.partition(":")
    if not separator or username != ADMIN_USERNAME:
        return None

    expected = _session_signature(username)
    # compare_digest() raises TypeError for str arguments that contain
    # non-ASCII characters, and the cookie value is attacker-controlled, so
    # the comparison is done on bytes. Characters that cannot be encoded are
    # replaced and therefore never match the hex digest.
    if secrets.compare_digest(
        signature.encode("utf-8", "replace"), expected.encode("utf-8")
    ):
        return username
    return None
def _sign_session(username: str) -> str:
    """Return the session cookie value ``username:signature`` for *username*."""
    signature = _session_signature(username)
    return ":".join((username, signature))
def _session_signature(username: str) -> str:
    """Return the hex HMAC-SHA256 of *username* keyed with ``SESSION_SECRET``."""
    mac = hmac.new(SESSION_SECRET.encode(), digestmod=hashlib.sha256)
    mac.update(username.encode())
    return mac.hexdigest()
def _safe_next_path(value: str | None) -> str:
if value and value.startswith("/") and not value.startswith("//"):
return value
return "/"
@app.get("/", response_class=HTMLResponse)
def dashboard(request: Request, _: str = Depends(require_admin)) -> Response:
    """Render the dashboard: the full page, or only its body for HTMX requests.

    A request whose ``hx-request`` header equals ``"true"`` receives the
    dashboard partial; every other request receives the full page.
    """
    context = _dashboard_context(request)
    is_htmx = request.headers.get("hx-request") == "true"
    template_name = "partials/_dashboard.html" if is_htmx else "dashboard.html"
    return templates.TemplateResponse(request, template_name, context)
@app.get("/partials/dashboard", response_class=HTMLResponse)
def dashboard_partial(request: Request, _: str = Depends(require_admin)) -> Response:
    """Render the dashboard body partial for the current query-string filters."""
    return templates.TemplateResponse(
        request,
        "partials/_dashboard.html",
        _dashboard_context(request),
    )
@app.get("/partials/issues", response_class=HTMLResponse)
def issues_partial(request: Request, _: str = Depends(require_admin)) -> Response:
    """Render the issue-filter partial from the full dashboard context."""
    return templates.TemplateResponse(
        request,
        "partials/_issue_filters.html",
        _dashboard_context(request),
    )
@app.get("/partials/rankings", response_class=HTMLResponse)
def rankings_partial(request: Request, _: str = Depends(require_admin)) -> Response:
    """Render the ranking-panels partial from the full dashboard context."""
    return templates.TemplateResponse(
        request,
        "partials/_rankings.html",
        _dashboard_context(request),
    )
@app.get("/partials/chart", response_class=HTMLResponse)
def chart_partial(request: Request, _: str = Depends(require_admin)) -> Response:
    """Render the SVG chart-panel partial from the full dashboard context."""
    return templates.TemplateResponse(
        request,
        "partials/_chart.html",
        _dashboard_context(request),
    )
@app.get("/legislators", response_class=HTMLResponse)
def legislators(request: Request, _: str = Depends(require_admin)) -> Response:
    """Render the legislator search and profile page."""
    return templates.TemplateResponse(
        request,
        "legislators.html",
        _legislators_context(request),
    )
@app.get("/partials/legislator-suggestions", response_class=HTMLResponse)
def legislator_suggestions_partial(
    request: Request, _: str = Depends(require_admin)
) -> Response:
    """Render typeahead suggestions for the legislator search box.

    The trimmed ``q`` parameter must be at least two characters long. Shorter
    input renders an empty suggestion list without querying the database.
    """
    query = request.query_params.get("q", "").strip()
    long_enough = len(query) >= 2

    matches: Any = []
    if long_enough:
        with session_scope() as session:
            matches = repository.search_legislators(session, query=query, limit=8)

    context: dict[str, Any] = {
        "q": query if long_enough else "",
        "matches": matches,
        "build_legislator_url": _build_legislator_url,
    }
    return templates.TemplateResponse(
        request, "partials/_legislator_suggestions.html", context
    )
@app.get("/compare", response_class=HTMLResponse)
def compare(request: Request, _: str = Depends(require_admin)) -> Response:
    """Render the page that compares legislators on a radar chart."""
    return templates.TemplateResponse(
        request,
        "compare.html",
        _compare_context(request),
    )
def _dashboard_context(request: Request) -> dict[str, Any]:
    """Build the template context shared by the dashboard page and its partials.

    The context starts from empty defaults and is filled in step by step. The
    function returns early, with ``empty_message`` set, when no scores are
    loaded, when no issues are selected, or when the ranking query yields no
    legislators. Score history and the chart are computed only when rankings
    exist.
    """
    state = _parse_state(request)
    # Defaults that let every template render even on an early return.
    base_context: dict[str, Any] = {
        "state": state,
        "issues": state.issues,
        "selected_issue_label": " + ".join(state.issues) if state.issues else "",
        "chamber": state.chamber,
        "congress": state.congress,
        "latest_score_year": None,
        "last_updated": None,
        "suggestions": [],
        "rankings": RankingResult(supportive=[], opposed=[]),
        "compare": [],
        "chart_svg": render_score_history_svg([]),
        "chart_series": [],
        "has_votes": False,
        "has_scores": False,
        "empty_message": "",
        # URL helpers exposed to the templates.
        "build_url": _build_url,
        "toggle_compare": _toggle_compare,
    }
    with session_scope() as session:
        # Use the newest congress in the vote table when the query string
        # does not name one.
        congress = state.congress or repository.latest_congress(session)
        base_context["congress"] = congress
        base_context["has_scores"] = repository.has_scores(session)
        base_context["latest_score_year"] = repository.latest_score_year(session)
        base_context["last_updated"] = repository.latest_vote_date(session, congress)
        base_context["suggestions"] = repository.issue_suggestions(
            session, congress=congress
        )
        if not base_context["has_scores"]:
            base_context["empty_message"] = (
                "No legislator scores are loaded yet. Run the score calculator first."
            )
            return base_context
        if congress is None:
            # No congress could be determined; show a placeholder label.
            # The queries below still receive `congress=None`.
            base_context["congress"] = "Computed"
        if not state.issues:
            base_context["empty_message"] = (
                "Choose one or more issue areas to calculate roll-call support scores."
            )
            return base_context
        rankings = repository.get_rankings(
            session,
            issues=state.issues,
            chamber=state.chamber,
            congress=congress,
        )
        base_context["rankings"] = rankings
        # Without an explicit selection, compare the two most supportive
        # legislators.
        compare = state.compare or [row.legislator_id for row in rankings.supportive[:2]]
        base_context["compare"] = compare
        if not rankings.supportive and not rankings.opposed:
            base_context["empty_message"] = "No matching roll-call votes."
            return base_context
        history = repository.get_score_history(
            session,
            issues=state.issues,
            chamber=state.chamber,
            congress=congress,
            legislator_ids=compare,
        )
        base_context["chart_series"] = history
        base_context["chart_svg"] = render_score_history_svg(history)
    return base_context
def _parse_state(request: Request) -> DashboardState:
    """Read the dashboard filters from the request's query string.

    ``chamber`` is lower-cased and falls back to ``"senate"`` when missing or
    unrecognized. ``congress`` and each ``compare`` value are parsed as
    integers, with unparsable ``compare`` values dropped. ``issues`` is passed
    through ``normalize_issues``.
    """
    params = request.query_params

    chamber = params.get("chamber", "senate").lower()
    if chamber not in {"house", "senate", "all"}:
        chamber = "senate"

    congress = _parse_int(params.get("congress"))

    compare: list[int] = []
    for raw in params.getlist("compare"):
        legislator_id = _parse_int(raw)
        if legislator_id is not None:
            compare.append(legislator_id)

    return DashboardState(
        issues=normalize_issues(params.getlist("issues")),
        chamber=chamber,  # type: ignore[arg-type]
        congress=congress,
        compare=compare,
    )
def _legislators_context(request: Request) -> dict[str, Any]:
    """Build the template context for the legislator search/profile page.

    Query parameters read: ``q`` (search text), ``legislator_id`` (profile to
    show), ``topic`` (topic whose history is charted), ``per_page`` and
    ``page`` (search pagination). Search results are fetched only when ``q``
    is non-empty. The history chart is filled in only when a profile was
    found and a topic is selected or can be defaulted.
    """
    query = request.query_params.get("q", "").strip()
    legislator_id = _parse_int(request.query_params.get("legislator_id"))
    selected_topic = request.query_params.get("topic", "").strip()
    per_page = _parse_per_page(request.query_params.get("per_page"))
    # Missing, unparsable or non-positive page numbers become page 1.
    page = max(_parse_int(request.query_params.get("page")) or 1, 1)
    base_context: dict[str, Any] = {
        "q": query,
        "profile": None,
        "matches": [],
        "result_count": 0,
        "page": page,
        "per_page": per_page,
        "per_page_options": [10, 25, 50],
        "total_pages": 1,
        "previous_page": None,
        "next_page": None,
        "selected_topic": selected_topic,
        "history_svg": render_score_history_svg([]),
        "history_series": [],
        # URL helpers exposed to the template.
        "build_legislator_url": _build_legislator_url,
        "build_legislator_search_url": _build_legislator_search_url,
    }
    with session_scope() as session:
        result_count = repository.count_legislators(session, query=query) if query else 0
        # Ceiling division, with at least one page even for zero results.
        total_pages = max((result_count + per_page - 1) // per_page, 1)
        if page > total_pages:
            # Clamp a page number past the end to the last page.
            page = total_pages
            base_context["page"] = page
        matches = (
            repository.search_legislators(
                session,
                query=query,
                limit=per_page,
                offset=(page - 1) * per_page,
            )
            if query
            else []
        )
        # The profile is looked up by ID only; the search text is not used.
        profile = repository.get_legislator_profile(
            session, legislator_id=legislator_id, query=None
        )
        base_context["profile"] = profile
        base_context["matches"] = matches
        base_context["result_count"] = result_count
        base_context["total_pages"] = total_pages
        base_context["previous_page"] = page - 1 if page > 1 else None
        base_context["next_page"] = page + 1 if page < total_pages else None
        if profile is None:
            return base_context
        if not selected_topic:
            # Default topic: the first bottom topic, else the first top topic.
            if profile.bottom_topics:
                selected_topic = profile.bottom_topics[0].topic
            elif profile.top_topics:
                selected_topic = profile.top_topics[0].topic
            base_context["selected_topic"] = selected_topic
        if selected_topic:
            history = repository.get_single_legislator_history(
                session,
                legislator_id=profile.legislator.legislator_id,
                topic=selected_topic,
            )
            base_context["history_series"] = history
            base_context["history_svg"] = render_score_history_svg(history)
    return base_context
def _compare_context(request: Request) -> dict[str, Any]:
    """Build the template context for the legislator radar comparison page.

    Legislators come from the ``legislator_id`` parameters (or ``compare`` if
    none are given), capped at four. Topics come from ``topic`` (or
    ``issues``), capped at eight. Missing selections are replaced by defaults
    from ``repository.get_compare_defaults``.
    """
    selected_legislators = _parse_int_list(
        request.query_params.getlist("legislator_id")
        or request.query_params.getlist("compare")
    )[:4]
    topics = normalize_issues(
        request.query_params.getlist("topic") or request.query_params.getlist("issues")
    )[:8]
    query = request.query_params.get("q", "").strip()
    base_context: dict[str, Any] = {
        "selected_legislators": selected_legislators,
        "selected_legislator_options": [],
        "topics": topics,
        "q": query,
        "series": [],
        "radar_svg": render_compare_radar_svg([], []),
        "legislator_options": [],
        "topic_options": [],
        # URL helper exposed to the template.
        "build_compare_url": _build_compare_url,
    }
    with session_scope() as session:
        default_legislators, default_topics = repository.get_compare_defaults(session)
        # Default legislators apply only when nothing is selected and no
        # search is in progress.
        if not selected_legislators and not query:
            selected_legislators = default_legislators[:3]
        if not topics:
            topics = default_topics[:6]
        selected_legislator_options = repository.get_legislator_options(
            session, selected_legislators
        )
        series = repository.get_compare_radar_series(
            session, legislator_ids=selected_legislators, topics=topics
        )
        base_context.update(
            {
                "selected_legislators": selected_legislators,
                "selected_legislator_options": selected_legislator_options,
                "topics": topics,
                "q": query,
                "series": series,
                "radar_svg": render_compare_radar_svg(topics, series),
                # An empty search text is passed as None.
                "legislator_options": repository.search_legislators(
                    session, query=query or None, limit=12
                ),
                "topic_options": repository.issue_suggestions(
                    session, congress=None, limit=12
                ),
            }
        )
    return base_context
def _parse_int(value: str | None) -> int | None:
if value is None or value == "":
return None
try:
return int(value)
except ValueError:
return None
def _parse_int_list(values: list[str]) -> list[int]:
    """Parse *values* as integers, dropping unparsable entries and duplicates.

    The first occurrence of each integer is kept, in input order.
    """
    unique: dict[int, None] = {}
    for raw in values:
        number = _parse_int(raw)
        if number is None:
            continue
        # dict keys keep insertion order, so first occurrences win.
        unique.setdefault(number, None)
    return list(unique)
def _parse_per_page(value: str | None) -> int:
    """Return the requested page size if it is 10, 25 or 50; otherwise 10."""
    requested = _parse_int(value)
    if requested in {10, 25, 50}:
        return requested
    return 10
def _build_url(
request: Request,
*,
issues: list[str] | None = None,
chamber: str | None = None,
congress: int | None = None,
compare: list[int] | None = None,
) -> str:
params: list[tuple[str, str]] = []
chosen_issues = (
issues
if issues is not None
else normalize_issues(request.query_params.getlist("issues"))
)
chosen_chamber = (
chamber
if chamber is not None
else request.query_params.get("chamber", "senate")
)
chosen_congress = (
congress
if congress is not None
else _parse_int(request.query_params.get("congress"))
)
chosen_compare = (
compare
if compare is not None
else [
value
for value in (
_parse_int(raw) for raw in request.query_params.getlist("compare")
)
if value is not None
]
)
for issue in chosen_issues:
params.append(("issues", issue))
params.append(("chamber", chosen_chamber))
if chosen_congress is not None:
params.append(("congress", str(chosen_congress)))
for legislator_id in chosen_compare:
params.append(("compare", str(legislator_id)))
if not params:
return "/"
from urllib.parse import urlencode
return f"/?{urlencode(params, doseq=True)}"
def _toggle_compare(compare: list[int], legislator_id: int) -> list[int]:
"""Return compare IDs with the legislator added or removed."""
if legislator_id in compare:
return [value for value in compare if value != legislator_id]
return [*compare, legislator_id]
def _build_legislator_url(
*,
legislator_id: int | None = None,
q: str | None = None,
topic: str | None = None,
per_page: int | None = None,
) -> str:
from urllib.parse import urlencode
params: list[tuple[str, str]] = []
if legislator_id is not None:
params.append(("legislator_id", str(legislator_id)))
if q:
params.append(("q", q))
if topic:
params.append(("topic", topic))
if per_page in {10, 25, 50} and per_page != 10:
params.append(("per_page", str(per_page)))
return f"/legislators?{urlencode(params)}" if params else "/legislators"
def _build_legislator_search_url(
*,
q: str,
per_page: int,
page: int = 1,
) -> str:
from urllib.parse import urlencode
params: list[tuple[str, str]] = []
if q:
params.append(("q", q))
params.append(("per_page", str(per_page)))
if page > 1:
params.append(("page", str(page)))
return f"/legislators?{urlencode(params)}" if params else "/legislators"
def _build_compare_url(
*,
legislator_ids: list[int],
topics: list[str],
q: str | None = None,
) -> str:
from urllib.parse import urlencode
params: list[tuple[str, str]] = []
for legislator_id in legislator_ids[:4]:
params.append(("legislator_id", str(legislator_id)))
for topic in topics[:8]:
params.append(("topic", topic))
if q:
params.append(("q", q))
return f"/compare?{urlencode(params, doseq=True)}" if params else "/compare"
+670
View File
@@ -0,0 +1,670 @@
"""Congress database queries for the web dashboard."""
from __future__ import annotations
from dataclasses import dataclass
from datetime import date
from typing import Literal
from sqlalchemy import ColumnElement, Select, case, desc, false, func, or_, select, true
from sqlalchemy.orm import Session
from pipelines.orm.data_science_dev.congress import (
BillTopic,
Legislator,
LegislatorScore,
Vote,
)
from pipelines.web.scoring import normalize_issues
Chamber = Literal["house", "senate", "all"]
STATE_ALIASES = {
"alabama": "AL",
"alaska": "AK",
"arizona": "AZ",
"arkansas": "AR",
"california": "CA",
"colorado": "CO",
"connecticut": "CT",
"delaware": "DE",
"florida": "FL",
"georgia": "GA",
"hawaii": "HI",
"idaho": "ID",
"illinois": "IL",
"indiana": "IN",
"iowa": "IA",
"kansas": "KS",
"kentucky": "KY",
"louisiana": "LA",
"maine": "ME",
"maryland": "MD",
"massachusetts": "MA",
"michigan": "MI",
"minnesota": "MN",
"mississippi": "MS",
"missouri": "MO",
"montana": "MT",
"nebraska": "NE",
"nevada": "NV",
"new hampshire": "NH",
"new jersey": "NJ",
"new mexico": "NM",
"new york": "NY",
"north carolina": "NC",
"north dakota": "ND",
"ohio": "OH",
"oklahoma": "OK",
"oregon": "OR",
"pennsylvania": "PA",
"rhode island": "RI",
"south carolina": "SC",
"south dakota": "SD",
"tennessee": "TN",
"texas": "TX",
"utah": "UT",
"vermont": "VT",
"virginia": "VA",
"washington": "WA",
"west virginia": "WV",
"wisconsin": "WI",
"wyoming": "WY",
"district of columbia": "DC",
}
@dataclass(frozen=True)
class RankingRow:
    """A legislator support score row."""

    legislator_id: int
    display_name: str
    party: str | None
    state: str | None
    chamber: str | None
    # Average of the legislator's matching score rows; None when unavailable.
    score: float | None
    # Number of matching score rows with a score of 50 or above.
    supportive: int
    # Number of matching score rows with a score below 50.
    opposed: int

    @property
    def total(self) -> int:
        """Return the number of score rows counted as supportive or opposed."""
        return self.supportive + self.opposed
@dataclass(frozen=True)
class RankingResult:
    """Supportive and opposed ranking lists."""

    # Rows ordered from the highest average score downwards.
    supportive: list[RankingRow]
    # Rows ordered from the lowest average score upwards.
    opposed: list[RankingRow]
@dataclass(frozen=True)
class TimePoint:
    """One yearly chart point."""

    # Calendar year of the score.
    year: int
    # Score value plotted for that year.
    score: float
@dataclass(frozen=True)
class ChartSeries:
    """One legislator score-history series."""

    legislator_id: int
    # Display name used to label the series.
    label: str
    party: str | None
    state: str | None
    # Yearly points. The dataclass is frozen, but this list is still
    # appended to while a series is being built.
    points: list[TimePoint]
@dataclass(frozen=True)
class TopicScore:
    """Average score for one topic."""

    topic: str
    score: float
    # NOTE(review): presumably the number of score rows behind the average;
    # confirm against the query that builds this.
    count: int
@dataclass(frozen=True)
class LegislatorOption:
    """Compact legislator metadata for search and comparison controls."""

    legislator_id: int
    display_name: str
    # Party, state and chamber may each be unknown (None).
    party: str | None
    state: str | None
    chamber: str | None
@dataclass(frozen=True)
class LegislatorProfile:
    """Legislator metadata plus issue score summary."""

    legislator: LegislatorOption
    # None when no overall score is available.
    overall_score: float | None
    # NOTE(review): presumably the first year the legislator served; confirm.
    serving_since: int | None
    # NOTE(review): presumably the highest- and lowest-scoring topics;
    # ordering is decided by the query that builds the profile.
    top_topics: list[TopicScore]
    bottom_topics: list[TopicScore]
@dataclass(frozen=True)
class RadarSeries:
    """One legislator polygon for the compare radar chart."""

    legislator: LegislatorOption
    # None when no average is available.
    average_score: float | None
    # Score per topic name.
    scores_by_topic: dict[str, float]
def latest_congress(session: Session) -> int | None:
    """Return the highest congress number in the vote table, or ``None`` if empty."""
    newest_congress = select(func.max(Vote.congress))
    return session.scalar(newest_congress)
def latest_vote_date(session: Session, congress: int | None = None) -> date | None:
    """Return the newest vote date, limited to *congress* when one is given."""
    newest_vote = func.max(Vote.vote_date)
    if congress is None:
        return session.scalar(select(newest_vote))
    return session.scalar(select(newest_vote).where(Vote.congress == congress))
def latest_score_year(session: Session) -> int | None:
    """Return the highest year in the legislator score table, or ``None`` if empty."""
    newest_year = select(func.max(LegislatorScore.year))
    return session.scalar(newest_year)
def has_scores(session: Session) -> bool:
    """Report whether the legislator score table holds at least one row."""
    any_score_id = session.scalar(select(LegislatorScore.id).limit(1))
    return any_score_id is not None
def issue_suggestions(
    session: Session,
    *,
    congress: int | None,
    limit: int = 12,
) -> list[str]:
    """Return up to *limit* topic names to suggest as issue filters.

    Topics come from the precomputed score table, most frequent first and
    then alphabetically. If that yields nothing, bill topics are used
    instead, ranked the same way.

    ``congress`` is accepted but not used by either query.
    """
    score_usage = func.count(LegislatorScore.id).label("score_count")
    by_score_usage = (
        select(LegislatorScore.topic, score_usage)
        .where(LegislatorScore.topic != "")
        .group_by(LegislatorScore.topic)
        .order_by(desc("score_count"), LegislatorScore.topic)
        .limit(limit)
    )
    scored_topics = [topic for topic, _count in session.execute(by_score_usage)]
    if scored_topics:
        return scored_topics

    bill_usage = func.count(BillTopic.id).label("topic_count")
    by_bill_usage = (
        select(BillTopic.topic, bill_usage)
        .where(BillTopic.topic != "")
        .group_by(BillTopic.topic)
        .order_by(desc("topic_count"), BillTopic.topic)
        .limit(limit)
    )
    return [topic for topic, _count in session.execute(by_bill_usage)]
def ranking_query(
    *,
    issues: list[str],
    chamber: Chamber,
    congress: int,
) -> Select:
    """Build the per-legislator aggregate over matching precomputed scores.

    Each result row carries the legislator's identity columns, the average
    score, and the counts of score rows at or above 50 (``supportive``) and
    below 50 (``opposed``). Rows are restricted to topics matching *issues*
    and, unless *chamber* is ``"all"``, to the legislator's current chamber.

    ``congress`` is accepted but not used by the query.
    """
    legislator_columns = (
        Legislator.id,
        Legislator.official_full_name,
        Legislator.last_name,
        Legislator.current_party,
        Legislator.current_state,
        Legislator.current_chamber,
    )
    is_supportive = LegislatorScore.score >= 50
    is_opposed = LegislatorScore.score < 50
    aggregates = (
        func.avg(LegislatorScore.score).label("score"),
        func.sum(case((is_supportive, 1), else_=0)).label("supportive"),
        func.sum(case((is_opposed, 1), else_=0)).label("opposed"),
    )

    stmt = (
        select(*legislator_columns, *aggregates)
        .join(LegislatorScore, LegislatorScore.legislator_id == Legislator.id)
        .where(_score_topic_match_condition(issues))
        .group_by(*legislator_columns)
    )
    if chamber == "all":
        return stmt
    return stmt.where(Legislator.current_chamber == _db_chamber(chamber))
def get_rankings(
    session: Session,
    *,
    issues: list[str],
    chamber: Chamber,
    congress: int,
    limit: int = 10,
) -> RankingResult:
    """Return the *limit* most supportive and most opposed legislators.

    Legislators without an average score are ignored. Ties on score are
    broken by the larger number of counted score rows, then by display name.
    """
    stmt = ranking_query(issues=issues, chamber=chamber, congress=congress)
    scored: list[RankingRow] = []
    for raw in session.execute(stmt):
        ranking = _ranking_row(raw)
        if ranking.score is not None:
            scored.append(ranking)

    def highest_first(row: RankingRow) -> tuple[float, int, str]:
        return (-float(row.score), -row.total, row.display_name)

    def lowest_first(row: RankingRow) -> tuple[float, int, str]:
        return (float(row.score), -row.total, row.display_name)

    return RankingResult(
        supportive=sorted(scored, key=highest_first)[:limit],
        opposed=sorted(scored, key=lowest_first)[:limit],
    )
def get_score_history(
    session: Session,
    *,
    issues: list[str],
    chamber: Chamber,
    congress: int,
    legislator_ids: list[int],
) -> list[ChartSeries]:
    """Return one yearly average-score series per requested legislator.

    Scores are averaged per legislator and year over topics matching
    *issues*. Legislators with no matching scores produce no series. An empty
    *legislator_ids* returns an empty list without querying.

    ``congress`` is accepted but not used by the query.
    """
    if not legislator_ids:
        return []

    grouping_columns = (
        Legislator.id,
        Legislator.official_full_name,
        Legislator.last_name,
        Legislator.current_party,
        Legislator.current_state,
        LegislatorScore.year,
    )
    stmt = (
        select(*grouping_columns, func.avg(LegislatorScore.score).label("score"))
        .join(LegislatorScore, LegislatorScore.legislator_id == Legislator.id)
        .where(
            Legislator.id.in_(legislator_ids),
            _score_topic_match_condition(issues),
        )
        .group_by(*grouping_columns)
        .order_by(Legislator.id, LegislatorScore.year)
    )
    if chamber != "all":
        stmt = stmt.where(Legislator.current_chamber == _db_chamber(chamber))

    series_by_id: dict[int, ChartSeries] = {}
    for row in session.execute(stmt):
        if row.score is None:
            continue
        series = series_by_id.get(row.id)
        if series is None:
            series = ChartSeries(
                legislator_id=row.id,
                label=_display_name(row.official_full_name, row.last_name),
                party=row.current_party,
                state=row.current_state,
                points=[],
            )
            series_by_id[row.id] = series
        series.points.append(TimePoint(year=int(row.year), score=float(row.score)))
    return list(series_by_id.values())
def _ranking_row(row: object) -> RankingRow:
    """Convert one ranking query result row into a ``RankingRow``.

    A missing score stays ``None``; missing supportive/opposed counts
    become ``0``.
    """
    raw_score = row.score
    name = _display_name(row.official_full_name, row.last_name)
    return RankingRow(
        legislator_id=row.id,
        display_name=name,
        party=row.current_party,
        state=row.current_state,
        chamber=row.current_chamber,
        score=None if raw_score is None else float(raw_score),
        supportive=row.supportive or 0,
        opposed=row.opposed or 0,
    )
def _score_topic_match_condition(
    issues: list[str] | tuple[str, ...],
) -> ColumnElement[bool]:
    """Build a condition matching score topics that contain any given issue.

    Issues are normalized with ``normalize_issues``. When nothing remains the
    condition is ``false()``, so no rows match. Otherwise the result is an OR
    of case-insensitive substring matches against ``LegislatorScore.topic``.
    """
    normalized = normalize_issues(list(issues))
    if not normalized:
        return false()

    def _substring_pattern(text: str) -> str:
        # Escape LIKE metacharacters so issue text such as "%" or "_" is
        # matched literally instead of acting as a wildcard.
        escaped = text.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
        return f"%{escaped}%"

    return or_(
        *(
            LegislatorScore.topic.ilike(_substring_pattern(issue), escape="\\")
            for issue in normalized
        )
    )
def search_legislators(
    session: Session,
    *,
    query: str | None = None,
    limit: int = 12,
    offset: int = 0,
) -> list[LegislatorOption]:
    """Run the legislator search query and return the matches as options.

    Ordering, paging and filtering all come from ``legislator_search_query``.
    """
    statement = legislator_search_query(query=query, limit=limit, offset=offset)
    found: list[LegislatorOption] = []
    for record in session.execute(statement):
        found.append(_legislator_option(record))
    return found
def count_legislators(session: Session, *, query: str | None = None) -> int:
    """Return how many legislators match ``query``.

    Uses the same condition as the search query; a missing scalar result is
    reported as ``0``.
    """
    condition = _legislator_search_condition(query)
    statement = select(func.count(Legislator.id)).where(condition)
    matched = session.scalar(statement)
    return int(matched or 0)
def get_legislator_options(
    session: Session, legislator_ids: list[int]
) -> list[LegislatorOption]:
    """Look up each selected ID and return the options in selection order.

    IDs with no matching legislator are silently left out. Each ID is looked
    up individually through ``_get_legislator_option``.
    """
    found: dict[int, LegislatorOption] = {}
    for requested_id in legislator_ids:
        option = _get_legislator_option(session, requested_id)
        if option is not None:
            found[option.legislator_id] = option

    ordered: list[LegislatorOption] = []
    for requested_id in legislator_ids:
        if requested_id in found:
            ordered.append(found[requested_id])
    return ordered
def legislator_search_query(
    *,
    query: str | None = None,
    limit: int = 12,
    offset: int = 0,
) -> Select:
    """Build the paged legislator search statement.

    Legislators are outer-joined to their score rows, so legislators without
    scores are still returned. Results are ordered by number of score rows
    (descending), then last name, then first name.
    """
    score_count = func.count(LegislatorScore.id).label("score_count")
    selected_columns = (
        Legislator.id,
        Legislator.official_full_name,
        Legislator.last_name,
        Legislator.current_party,
        Legislator.current_state,
        Legislator.current_chamber,
    )
    grouping_columns = (
        Legislator.id,
        Legislator.official_full_name,
        Legislator.first_name,
        Legislator.last_name,
        Legislator.current_party,
        Legislator.current_state,
        Legislator.current_chamber,
        Legislator.bioguide_id,
    )
    return (
        select(*selected_columns, score_count)
        .outerjoin(LegislatorScore, LegislatorScore.legislator_id == Legislator.id)
        .where(_legislator_search_condition(query))
        .group_by(*grouping_columns)
        .order_by(desc("score_count"), Legislator.last_name, Legislator.first_name)
        .limit(limit)
        .offset(offset)
    )
def _legislator_search_condition(query: str | None) -> ColumnElement[bool]:
    """Build the WHERE condition for a free-text legislator search.

    A blank or missing query matches everything (``true()``). Otherwise the
    trimmed text is matched case-insensitively as a substring of the full
    name, first name, last name, state and bioguide ID. When the text
    resolves to a state code via ``_state_alias``, an exact state match is
    added as a further alternative.
    """
    cleaned_query = query.strip() if query else ""
    if not cleaned_query:
        return true()

    # The text comes straight from a search box: escape LIKE metacharacters
    # so "%" or "_" are matched literally rather than matching every row.
    escaped = (
        cleaned_query.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
    )
    pattern = f"%{escaped}%"
    searchable_columns = (
        Legislator.official_full_name,
        Legislator.first_name,
        Legislator.last_name,
        Legislator.current_state,
        Legislator.bioguide_id,
    )
    conditions: list[ColumnElement[bool]] = [
        column.ilike(pattern, escape="\\") for column in searchable_columns
    ]

    state_alias = _state_alias(cleaned_query)
    if state_alias is not None:
        conditions.append(Legislator.current_state == state_alias)
    return or_(*conditions)
def _state_alias(query: str) -> str | None:
    """Map search text to an upper-case state code, if it looks like a state.

    The text is lower-cased, periods are removed and whitespace is collapsed.
    Any two-letter alphabetic result is returned upper-cased as-is; anything
    else is looked up in ``STATE_ALIASES`` (``None`` when absent).
    """
    without_periods = query.lower().replace(".", "")
    collapsed = " ".join(without_periods.split())
    looks_like_code = len(collapsed) == 2 and collapsed.isalpha()
    if looks_like_code:
        return collapsed.upper()
    return STATE_ALIASES.get(collapsed)
def get_legislator_profile(
    session: Session,
    *,
    legislator_id: int | None = None,
    query: str | None = None,
) -> LegislatorProfile | None:
    """Return the profile for a legislator chosen by ID or by search text.

    The ID lookup wins. When it finds nothing and ``query`` is non-blank, the
    first search result is used instead. Returns ``None`` when no legislator
    can be selected.

    The profile carries the three highest and three lowest average topic
    scores (ties broken by topic name), the average over all of the
    legislator's score rows, and ``serving_since``, which is the earliest
    year present in the legislator's score rows.
    """
    selected = _get_legislator_option(session, legislator_id)
    has_search_text = bool(query.strip() if query else "")
    if selected is None and has_search_text:
        first_match = search_legislators(session, query=query, limit=1)
        if first_match:
            selected = first_match[0]
    if selected is None:
        return None

    chosen_id = selected.legislator_id
    topic_scores = get_legislator_topic_scores(session, legislator_id=chosen_id)
    highest = sorted(topic_scores, key=lambda entry: (-entry.score, entry.topic))
    lowest = sorted(topic_scores, key=lambda entry: (entry.score, entry.topic))

    belongs_to_selected = LegislatorScore.legislator_id == chosen_id
    overall_score = session.scalar(
        select(func.avg(LegislatorScore.score)).where(belongs_to_selected)
    )
    serving_since = session.scalar(
        select(func.min(LegislatorScore.year)).where(belongs_to_selected)
    )

    return LegislatorProfile(
        legislator=selected,
        overall_score=None if overall_score is None else float(overall_score),
        serving_since=None if serving_since is None else int(serving_since),
        top_topics=highest[:3],
        bottom_topics=lowest[:3],
    )
def get_legislator_topic_scores(
    session: Session,
    *,
    legislator_id: int,
) -> list[TopicScore]:
    """Return the average score per topic for one legislator.

    Results are ordered by topic. Topics whose average is NULL are omitted.
    Each ``TopicScore.count`` is the number of score rows behind the average.
    """
    # The aggregate is deliberately NOT labelled "count": result rows are
    # sequences with their own ``count()`` method, so ``row.count`` would
    # resolve to that method instead of the column value.
    rows = session.execute(
        select(
            LegislatorScore.topic,
            func.avg(LegislatorScore.score).label("score"),
            func.count(LegislatorScore.id).label("score_rows"),
        )
        .where(LegislatorScore.legislator_id == legislator_id)
        .group_by(LegislatorScore.topic)
        .order_by(LegislatorScore.topic)
    )
    return [
        TopicScore(topic=row.topic, score=float(row.score), count=row.score_rows)
        for row in rows
        if row.score is not None
    ]
def get_single_legislator_history(
    session: Session,
    *,
    legislator_id: int,
    topic: str,
) -> list[ChartSeries]:
    """Return a single-series yearly score history for one legislator/topic.

    The topic must match exactly. Years whose average is NULL are skipped.
    An unknown legislator yields an empty list; a known legislator always
    yields exactly one series, possibly with no points.
    """
    option = _get_legislator_option(session, legislator_id)
    if option is None:
        return []

    yearly_average = func.avg(LegislatorScore.score).label("score")
    statement = (
        select(LegislatorScore.year, yearly_average)
        .where(
            LegislatorScore.legislator_id == legislator_id,
            LegislatorScore.topic == topic,
        )
        .group_by(LegislatorScore.year)
        .order_by(LegislatorScore.year)
    )

    points: list[TimePoint] = []
    for record in session.execute(statement):
        if record.score is not None:
            points.append(TimePoint(year=int(record.year), score=float(record.score)))

    only_series = ChartSeries(
        legislator_id=option.legislator_id,
        label=option.display_name,
        party=option.party,
        state=option.state,
        points=points,
    )
    return [only_series]
def get_compare_defaults(session: Session) -> tuple[list[int], list[str]]:
    """Return the default legislator IDs and topics for the compare view.

    The IDs come from the first three unfiltered search results; the topics
    are up to six issue suggestions requested with ``congress=None``.
    """
    default_legislators = search_legislators(session, limit=3)
    default_topics = issue_suggestions(session, congress=None, limit=6)
    default_ids = [option.legislator_id for option in default_legislators]
    return default_ids, default_topics
def get_compare_radar_series(
    session: Session,
    *,
    legislator_ids: list[int],
    topics: list[str],
) -> list[RadarSeries]:
    """Return radar chart scores for selected legislators and topics.

    One ``RadarSeries`` is produced per requested ID that resolves to a
    legislator, in request order. ``scores_by_topic`` holds the average score
    for each exactly-matching topic that has data. ``average_score`` is the
    mean of those per-topic averages, or ``None`` when there are none.
    """
    if not legislator_ids:
        return []

    options = {
        option.legislator_id: option
        for option in (
            _get_legislator_option(session, legislator_id)
            for legislator_id in legislator_ids
        )
        if option is not None
    }
    if not options:
        return []

    scores: dict[int, dict[str, float]] = {
        legislator_id: {} for legislator_id in options
    }
    if topics:
        rows = session.execute(
            select(
                LegislatorScore.legislator_id,
                LegislatorScore.topic,
                func.avg(LegislatorScore.score).label("score"),
            )
            .where(
                LegislatorScore.legislator_id.in_(list(options)),
                LegislatorScore.topic.in_(topics),
            )
            .group_by(LegislatorScore.legislator_id, LegislatorScore.topic)
        )
        for row in rows:
            # The average is NULL when every underlying score is NULL. Treat
            # that as "no data", like the other score queries in this module,
            # instead of failing on float(None).
            if row.score is None:
                continue
            scores[row.legislator_id][row.topic] = float(row.score)

    series: list[RadarSeries] = []
    for legislator_id in legislator_ids:
        option = options.get(legislator_id)
        if option is None:
            continue
        topic_scores = scores.get(legislator_id, {})
        values = list(topic_scores.values())
        series.append(
            RadarSeries(
                legislator=option,
                average_score=sum(values) / len(values) if values else None,
                scores_by_topic=topic_scores,
            )
        )
    return series
def _display_name(official_full_name: str | None, last_name: str | None) -> str:
    """Format a legislator name as ``"Surname, Given names"`` for display.

    The last whitespace-separated word of ``official_full_name`` is treated
    as the surname. Trailing generational suffixes (Jr., Sr., II, III, IV)
    are not mistaken for the surname; they are kept after the given names,
    e.g. ``"Donald S. Beyer Jr."`` becomes ``"Beyer, Donald S. Jr."``.

    A one-word name is returned unchanged. Without a full name the result is
    ``last_name``, or ``"Unknown"`` when that is missing too.
    """
    if not official_full_name:
        return last_name or "Unknown"

    generational_suffixes = {"jr", "sr", "ii", "iii", "iv"}
    parts = official_full_name.split()
    suffixes: list[str] = []
    while len(parts) > 1 and parts[-1].strip(",.").casefold() in generational_suffixes:
        suffixes.insert(0, parts.pop())

    if len(parts) <= 1:
        return official_full_name

    surname = parts[-1]
    if suffixes:
        # "Casey, Jr." leaves a dangling comma on the surname once the
        # suffix has been split off.
        surname = surname.rstrip(",")
    given = " ".join(parts[:-1] + suffixes)
    return f"{surname}, {given}"
def _legislator_option(row: object) -> LegislatorOption:
    """Convert one legislator result row into a ``LegislatorOption``."""
    formatted_name = _display_name(row.official_full_name, row.last_name)
    option = LegislatorOption(
        legislator_id=row.id,
        display_name=formatted_name,
        party=row.current_party,
        state=row.current_state,
        chamber=row.current_chamber,
    )
    return option
def _get_legislator_option(
    session: Session, legislator_id: int | None
) -> LegislatorOption | None:
    """Fetch one legislator by primary key as a ``LegislatorOption``.

    Returns ``None`` without querying when ``legislator_id`` is ``None``, and
    also when no legislator has that ID.
    """
    if legislator_id is None:
        return None

    statement = select(
        Legislator.id,
        Legislator.official_full_name,
        Legislator.last_name,
        Legislator.current_party,
        Legislator.current_state,
        Legislator.current_chamber,
    ).where(Legislator.id == legislator_id)
    record = session.execute(statement).first()
    if record is None:
        return None
    return _legislator_option(record)
def _db_chamber(chamber: Chamber) -> str:
    """Translate a chamber filter value into the spelling stored on legislators.

    Raises ``KeyError`` for any value other than "house", "senate" or "all".
    """
    stored_spelling = {
        "house": "House",
        "senate": "Senate",
        "all": "all",
    }
    return stored_spelling[chamber]
+100
View File
@@ -0,0 +1,100 @@
"""Issue matching and voting score helpers."""
from __future__ import annotations
from dataclasses import dataclass
from sqlalchemy import ColumnElement, false, func, or_
from sqlalchemy.sql.elements import BinaryExpression
from pipelines.orm.data_science_dev.congress import Bill, BillTopicPosition, Vote
SUPPORT_POSITIONS = frozenset({"yea", "aye", "yes"})
OPPOSE_POSITIONS = frozenset({"nay", "no"})
@dataclass(frozen=True)
class ScoreCounts:
    """Immutable pair of supportive and opposed vote tallies."""

    # Votes counted as support.
    supportive: int
    # Votes counted as opposition.
    opposed: int

    @property
    def total(self) -> int:
        """Return the number of scored votes (supportive plus opposed)."""
        scored_votes = self.supportive + self.opposed
        return scored_votes
def normalize_position(position: str | None) -> str | None:
    """Classify a raw roll-call position as "support", "oppose" or ``None``.

    Matching ignores case and surrounding whitespace. Anything that is in
    neither ``SUPPORT_POSITIONS`` nor ``OPPOSE_POSITIONS``, including a
    missing position, yields ``None``.
    """
    if position is None:
        return None
    cleaned = position.strip().lower()
    buckets = (
        ("support", SUPPORT_POSITIONS),
        ("oppose", OPPOSE_POSITIONS),
    )
    for bucket, members in buckets:
        if cleaned in members:
            return bucket
    return None
def score_vote_position(
    position: str | None,
    support_position: BillTopicPosition | str,
) -> str | None:
    """Translate a raw vote into support/opposition for a bill's topic.

    Votes that ``normalize_position`` cannot classify yield ``None``. When
    the bill's stance on the topic is ``BillTopicPosition.FOR`` the vote
    keeps its direction; for any other stance the direction is flipped.
    """
    vote = normalize_position(position)
    if vote is None:
        return None
    if BillTopicPosition(support_position) is BillTopicPosition.FOR:
        return vote
    return "oppose" if vote == "support" else "support"
def calculate_score(counts: ScoreCounts) -> int | None:
    """Return the share of supportive votes as a whole number from 0 to 100.

    Returns ``None`` when ``counts`` contains no scored votes. The percentage
    is rounded with the built-in ``round``.
    """
    scored_votes = counts.total
    if scored_votes == 0:
        return None
    percentage = 100 * counts.supportive / scored_votes
    return round(percentage)
def normalize_issues(issues: list[str] | tuple[str, ...]) -> list[str]:
    """Return the trimmed, non-blank issues with case-insensitive duplicates removed.

    The first spelling of each issue is kept, in its original position.
    """
    # Dicts preserve insertion order, so keying on the case-folded text keeps
    # the first spelling of each issue in first-seen order.
    first_spelling: dict[str, str] = {}
    for raw in issues:
        trimmed = raw.strip()
        if trimmed:
            first_spelling.setdefault(trimmed.casefold(), trimmed)
    return list(first_spelling.values())
def issue_match_condition(issues: list[str] | tuple[str, ...]) -> ColumnElement[bool]:
    """Build the SQLAlchemy condition for issue text matching.

    Issues are normalized with ``normalize_issues``. When nothing remains the
    condition is ``false()``. Otherwise a row matches when any issue occurs,
    case-insensitively, as a substring of any of the bill subject/title
    fields or the vote question/result text.
    """
    normalized = normalize_issues(list(issues))
    if not normalized:
        return false()

    fields: tuple[ColumnElement[str | None], ...] = (
        Bill.subjects_top_term,
        Bill.title,
        Bill.title_short,
        Bill.official_title,
        Vote.question,
        Vote.result_text,
    )
    terms: list[BinaryExpression[bool]] = []
    for issue in normalized:
        # Escape LIKE metacharacters so issue text containing "%" or "_" is
        # matched literally instead of acting as a wildcard.
        escaped = issue.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
        pattern = f"%{escaped}%"
        terms.extend(field.ilike(pattern, escape="\\") for field in fields)
    return or_(*terms)
def normalized_position_expression(column: ColumnElement[str]) -> ColumnElement[str | None]:
    """Return a SQL expression that trims and then lower-cases ``column``."""
    trimmed = func.trim(column)
    return func.lower(trimmed)
File diff suppressed because it is too large Load Diff
File diff suppressed because one or more lines are too long
+231
View File
@@ -0,0 +1,231 @@
"""Inline SVG rendering helpers."""
from __future__ import annotations
from html import escape
from math import cos, pi, sin
from pipelines.web.repository import ChartSeries, RadarSeries
# Line colour, stroke dash pattern (None = solid) and point marker shape for
# each plotted series. Given as (color, dasharray, marker) rows and expanded
# into one dict per style.
SERIES_STYLES = tuple(
    {"color": color, "dasharray": dasharray, "marker": marker}
    for color, dasharray, marker in (
        ("#009e73", None, "circle"),
        ("#0072b2", "10 6", "square"),
        ("#e69f00", "4 5", "diamond"),
        ("#cc79a7", "14 5 3 5", "triangle"),
    )
)
def render_score_history_svg(series: list[ChartSeries]) -> str:
    """Render a responsive inline SVG score history chart.

    Every series with at least one point is drawn as a line with a marker
    per point and its final score printed next to the last point. Styles
    cycle through ``SERIES_STYLES`` by series position. The x axis spans the
    earliest to the latest year found across all series; the y axis is a
    fixed 0-100 scale. When no series has any points, a placeholder SVG with
    an explanatory message is returned instead.
    """
    width = 880
    height = 300
    margin_left = 70
    margin_right = 28
    margin_top = 24
    margin_bottom = 48
    plot_width = width - margin_left - margin_right
    plot_height = height - margin_top - margin_bottom

    all_years = sorted({point.year for item in series for point in item.points})
    if not all_years:
        return _empty_svg(width, height, "No score history for this selection")

    min_year = min(all_years)
    max_year = max(all_years)
    # A single-year chart would otherwise divide by zero in x_for.
    year_span = max(max_year - min_year, 1)

    def x_for(year: int) -> float:
        return margin_left + ((year - min_year) / year_span) * plot_width

    def y_for(score: float) -> float:
        return margin_top + ((100 - score) / 100) * plot_height

    parts: list[str] = [
        f'<svg viewBox="0 0 {width} {height}" role="img" aria-label="Score history chart" class="score-chart">',
        '<rect width="100%" height="100%" fill="transparent" />',
    ]

    # Horizontal grid lines with their score labels.
    for score in (0, 25, 50, 75, 100):
        y = y_for(score)
        parts.append(
            f'<line x1="{margin_left}" y1="{y:.2f}" x2="{width - margin_right}" y2="{y:.2f}" class="chart-grid" />'
        )
        parts.append(
            f'<text x="{margin_left - 16}" y="{y + 4:.2f}" text-anchor="end" class="chart-axis-label">{score}</text>'
        )

    # Vertical year lines with their labels.
    for year in _tick_years(all_years):
        x = x_for(year)
        parts.append(
            f'<line x1="{x:.2f}" y1="{margin_top}" x2="{x:.2f}" y2="{height - margin_bottom}" class="chart-year-line" />'
        )
        parts.append(
            f'<text x="{x:.2f}" y="{height - 18}" text-anchor="middle" class="chart-axis-label">{year}</text>'
        )

    # Bottom and left axes.
    parts.append(
        f'<line x1="{margin_left}" y1="{height - margin_bottom}" x2="{width - margin_right}" y2="{height - margin_bottom}" class="chart-axis" />'
    )
    parts.append(
        f'<line x1="{margin_left}" y1="{margin_top}" x2="{margin_left}" y2="{height - margin_bottom}" class="chart-axis" />'
    )

    for index, item in enumerate(series):
        points = sorted(item.points, key=lambda point: point.year)
        if not points:
            continue
        style = SERIES_STYLES[index % len(SERIES_STYLES)]
        color = style["color"]
        path = " ".join(
            f"{'M' if point_index == 0 else 'L'} {x_for(point.year):.2f} {y_for(point.score):.2f}"
            for point_index, point in enumerate(points)
        )
        dash_attr = (
            f' stroke-dasharray="{style["dasharray"]}"'
            if style["dasharray"]
            else ""
        )
        parts.append(
            f'<path d="{path}" fill="none" stroke="{color}" stroke-width="3.5" stroke-linecap="round" stroke-linejoin="round"{dash_attr}>'
            f"<title>{escape(item.label)}</title></path>"
        )
        for point in points:
            parts.append(
                _point_marker(
                    marker=style["marker"],
                    x=x_for(point.year),
                    y=y_for(point.score),
                    color=color,
                    # Pass the raw label: _point_marker escapes its label
                    # itself, so escaping here as well would double-encode
                    # characters such as the apostrophe in "O'Rourke".
                    label=f"{item.label}: {point.score:.0f} in {point.year}",
                )
            )
        last = points[-1]
        parts.append(
            f'<text x="{x_for(last.year) - 10:.2f}" y="{y_for(last.score) + 4:.2f}" text-anchor="end" class="chart-series-label" fill="{color}">'
            f"{last.score:.0f}</text>"
        )

    parts.append("</svg>")
    return "".join(parts)
def _empty_svg(width: int, height: int, message: str) -> str:
    """Return a placeholder SVG showing ``message`` centred in the view box.

    The HTML-escaped message is used both as the visible text and as the
    accessible label.
    """
    safe_message = escape(message)
    centre_x = width / 2
    centre_y = height / 2
    fragments = [
        f'<svg viewBox="0 0 {width} {height}" role="img" aria-label="{safe_message}" class="score-chart">',
        '<rect width="100%" height="100%" fill="transparent" />',
        f'<text x="{centre_x}" y="{centre_y}" text-anchor="middle" class="chart-empty">{safe_message}</text>',
        "</svg>",
    ]
    return "".join(fragments)
def _tick_years(years: list[int]) -> list[int]:
    """Return the sorted years to label on the x axis.

    Always includes the first and last entries of ``years`` (which is read as
    sorted ascending), plus every multiple of five between them.
    """
    first, last = years[0], years[-1]
    aligned_start = first - (first % 5)
    ticks = {first, last}
    # aligned_start may fall before the first year; the range itself already
    # stops at ``last``, so only the lower bound needs filtering.
    ticks.update(year for year in range(aligned_start, last + 1, 5) if year >= first)
    return sorted(ticks)
def render_compare_radar_svg(topics: list[str], series: list[RadarSeries]) -> str:
    """Render a server-side radar chart for legislator comparison.

    Each topic is one axis, starting at 12 o'clock and proceeding clockwise.
    Each series is one polygon coloured from ``SERIES_STYLES`` by position.
    A topic with no score for a legislator is plotted at the midpoint (50).
    With fewer than three topics or no series, a placeholder SVG is returned.
    """
    width = 720
    height = 560
    center_x = 285
    center_y = 280
    radius = 200
    if len(topics) < 3 or not series:
        return _empty_svg(width, height, "Choose at least 3 axes and 1 legislator")
    axis_count = len(topics)

    def point_for(index: int, score: float, *, clamp: bool = True) -> tuple[float, float]:
        # Data values are clamped to the 0-100 ring range. Labels pass
        # clamp=False so they can sit outside the outer ring.
        angle = -pi / 2 + (2 * pi * index / axis_count)
        value = max(0, min(score, 100)) if clamp else score
        distance = radius * value / 100
        return center_x + cos(angle) * distance, center_y + sin(angle) * distance

    def polygon_points(scores: list[float]) -> str:
        # One "x,y" vertex per axis, each projected exactly once.
        vertices = (point_for(index, score) for index, score in enumerate(scores))
        return " ".join(f"{x:.2f},{y:.2f}" for x, y in vertices)

    parts: list[str] = [
        f'<svg viewBox="0 0 {width} {height}" role="img" aria-label="Compare legislators radar chart" class="radar-chart">',
        '<rect width="100%" height="100%" fill="transparent" />',
    ]
    for ring in (25, 50, 75, 100):
        parts.append(
            f'<polygon points="{polygon_points([ring] * axis_count)}" class="radar-ring" />'
        )

    for index, topic in enumerate(topics):
        outer_x, outer_y = point_for(index, 100)
        # Place the label at 113% of the radius, just outside the outer ring.
        # This must bypass clamping, or it collapses onto the ring vertex.
        label_x, label_y = point_for(index, 113, clamp=False)
        parts.append(
            f'<line x1="{center_x}" y1="{center_y}" x2="{outer_x:.2f}" y2="{outer_y:.2f}" class="radar-axis" />'
        )
        anchor = "middle"
        if label_x < center_x - 24:
            anchor = "end"
        elif label_x > center_x + 24:
            anchor = "start"
        parts.append(
            f'<text x="{label_x:.2f}" y="{label_y:.2f}" text-anchor="{anchor}" class="radar-label">{escape(topic)}</text>'
        )

    for index, item in enumerate(series):
        color = SERIES_STYLES[index % len(SERIES_STYLES)]["color"]
        points = polygon_points(
            [item.scores_by_topic.get(topic, 50.0) for topic in topics]
        )
        label = escape(item.legislator.display_name)
        parts.append(
            f'<polygon points="{points}" fill="{color}" fill-opacity="0.14" stroke="{color}" stroke-width="3" class="radar-series">'
            f"<title>{label}</title></polygon>"
        )

    parts.append("</svg>")
    return "".join(parts)
def _point_marker(*, marker: str, x: float, y: float, color: str, label: str) -> str:
    """Return the SVG element for one data point centred on ``(x, y)``.

    ``marker`` selects a "square", "diamond" or "triangle"; any other value
    draws a circle. ``label`` is HTML-escaped and embedded as the element's
    ``<title>``.
    """
    title = f"<title>{escape(label)}</title>"

    if marker == "square":
        half_side = 4.25
        left = x - half_side
        top = y - half_side
        return (
            f'<rect x="{left:.2f}" y="{top:.2f}" width="8.5" height="8.5" '
            f'fill="{color}" rx="1.5" ry="1.5">{title}</rect>'
        )

    if marker == "diamond":
        vertices = [
            (x, y - 5.2),
            (x + 5.2, y),
            (x, y + 5.2),
            (x - 5.2, y),
        ]
    elif marker == "triangle":
        vertices = [
            (x, y - 5.5),
            (x + 5.5, y + 4.5),
            (x - 5.5, y + 4.5),
        ]
    else:
        return f'<circle cx="{x:.2f}" cy="{y:.2f}" r="4.5" fill="{color}">{title}</circle>'

    outline = " ".join(f"{vx:.2f},{vy:.2f}" for vx, vy in vertices)
    return f'<polygon points="{outline}" fill="{color}">{title}</polygon>'
+40
View File
@@ -0,0 +1,40 @@
<!doctype html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>{% block title %}Nornsight{% endblock %}</title>
<link rel="stylesheet" href="{{ url_for('static', path='styles.css') }}">
<script src="{{ url_for('static', path='vendor/htmx.min.js') }}" defer></script>
</head>
<body>
<header class="topbar">
<a class="brand" href="/">
<span class="brand-mark">N</span>
<span>Nornsight</span>
</a>
{% if show_primary_nav|default(true) %}
<nav class="primary-nav" aria-label="Primary">
<a href="/">Issues</a>
<a href="/legislators">Legislators</a>
<a href="/compare">Compare</a>
</nav>
{% endif %}
<nav class="account-nav" aria-label="Account">
<a href="#" aria-disabled="true">Help</a>
{% if is_authenticated|default(true) %}
<details class="account-menu">
<summary>My account</summary>
<div class="account-menu-panel">
<a href="#" aria-disabled="true">Account settings</a>
<a class="sign-out" href="/logout">Sign out</a>
</div>
</details>
{% else %}
<a class="sign-in" href="/login">Sign in</a>
{% endif %}
</nav>
</header>
{% block body %}{% endblock %}
</body>
</html>
+87
View File
@@ -0,0 +1,87 @@
{% extends "base.html" %}
{% block title %}Compare Legislators{% endblock %}
{% block body %}
<main class="shell">
<section class="page-heading stacked-heading">
<div>
<h1>Compare legislators</h1>
<p>Up to 4 legislators · up to 8 issue axes · each polygon = one legislator's full issue profile</p>
</div>
</section>
<section class="compare-controls">
<form class="wide-search compare-search" action="/compare" method="get">
<label class="sr-only" for="compare-legislator-search">Search legislators</label>
{% for legislator_id in selected_legislators %}
<input type="hidden" name="legislator_id" value="{{ legislator_id }}">
{% endfor %}
{% for topic in topics %}
<input type="hidden" name="topic" value="{{ topic }}">
{% endfor %}
<input
id="compare-legislator-search"
type="search"
name="q"
value="{{ q }}"
placeholder="Search legislators to add"
autocomplete="off">
<button type="submit">Search</button>
</form>
<h2>Legislators ({{ selected_legislator_options|length }} / 4)</h2>
<div class="result-chips">
{% for legislator in selected_legislator_options %}
{% set without = selected_legislators | reject('equalto', legislator.legislator_id) | list %}
<a href="{{ build_compare_url(legislator_ids=without, topics=topics, q=q) }}"><span class="legend-dot dot-{{ loop.index0 }}"></span>{{ legislator.display_name }}{% if legislator.state %} — {{ legislator.state }}{% endif %} ×</a>
{% endfor %}
{% if selected_legislator_options|length < 4 %}
{% for option in legislator_options %}
{% if option.legislator_id not in selected_legislators %}
<a class="dashed-chip" href="{{ build_compare_url(legislator_ids=selected_legislators + [option.legislator_id], topics=topics, q=q) }}">+ {{ option.display_name }}</a>
{% endif %}
{% endfor %}
{% endif %}
</div>
<h2>Issue axes ({{ topics|length }} / 8)</h2>
<div class="axis-chips">
{% for topic in topics %}
{% set without_topic = topics[:loop.index0] + topics[loop.index:] %}
<a href="{{ build_compare_url(legislator_ids=selected_legislators, topics=without_topic, q=q) }}">{{ topic }} ×</a>
{% endfor %}
{% if topics|length < 8 %}
{% for topic in topic_options %}
{% if topic not in topics %}
<a href="{{ build_compare_url(legislator_ids=selected_legislators, topics=topics + [topic], q=q) }}">{{ topic }}</a>
{% endif %}
{% endfor %}
{% endif %}
</div>
</section>
<section class="compare-card">
<div class="radar-frame">{{ radar_svg | safe }}</div>
<aside class="compare-legend">
<h2>Legend</h2>
{% for item in series %}
<div class="legend-row">
<span class="legend-line line-{{ loop.index0 }}"></span>
<div>
<strong>{{ item.legislator.display_name }}</strong>
<small>{{ item.legislator.state or "US" }} · {{ item.legislator.party or "—" }} · avg {{ item.average_score|round(0) if item.average_score is not none else "—" }}</small>
</div>
</div>
{% endfor %}
<p>Outer ring = 100% support. Each axis is scored independently against full roll-call record.</p>
<p><em>Max 4 legislators · max 8 axes</em></p>
</aside>
</section>
</main>
<footer class="footer">
<span>Actual record, not rhetoric</span>
<span>Source: congressional roll-call votes</span>
<span>Not affiliated with any political party or organization</span>
</footer>
{% endblock %}
+30
View File
@@ -0,0 +1,30 @@
{% extends "base.html" %}
{% block title %}Legislative Accountability{% endblock %}
{% block body %}
<main class="shell">
<section class="page-heading">
<div>
<h1>Legislative accountability</h1>
<p>US legislative accountability · precomputed legislator topic scores{% if latest_score_year %} through {{ latest_score_year }}{% endif %}</p>
</div>
<div class="heading-actions">
<a href="#" aria-disabled="true">Methodology</a>
<a href="#" aria-disabled="true">Data sources</a>
<span>Last updated: {{ last_updated.strftime("%b %Y") if last_updated else "Unavailable" }}</span>
</div>
</section>
<div class="notice">Choose one or more score topics, then select lawmakers to compare computed records over time.</div>
<div id="dashboard-body">
{% include "partials/_dashboard.html" %}
</div>
</main>
<footer class="footer">
<span>Actual record, not rhetoric</span>
<span>Source: congressional roll-call votes</span>
<span>Not affiliated with any political party or organization</span>
</footer>
{% endblock %}
+148
View File
@@ -0,0 +1,148 @@
{% extends "base.html" %}
{% block title %}Legislator Profiles{% endblock %}
{% block body %}
<main class="shell">
<section class="page-heading stacked-heading">
<div>
<h1>Legislator profiles</h1>
<p>Full issue taxonomy · search any current legislator</p>
</div>
</section>
<form class="wide-search legislator-search-form" action="/legislators" method="get">
<label class="sr-only" for="legislator-search">Search legislators</label>
<input
id="legislator-search"
type="search"
name="q"
value="{{ q }}"
placeholder="Search by name or state"
autocomplete="off"
hx-get="/partials/legislator-suggestions"
hx-trigger="input changed delay:200ms, search"
hx-target="#legislator-suggestions"
hx-swap="innerHTML">
<label class="sr-only" for="legislator-page-size">Results per page</label>
<select id="legislator-page-size" name="per_page" aria-label="Results per page">
{% for option in per_page_options %}
<option value="{{ option }}" {{ "selected" if option == per_page else "" }}>{{ option }}</option>
{% endfor %}
</select>
<button type="submit">Search</button>
</form>
<div id="legislator-suggestions" aria-live="polite"></div>
{% if q %}
<section class="phonebook-results" aria-label="Matching legislators">
<header>
<h2>Matching legislators</h2>
<span>{{ result_count }} result{{ "" if result_count == 1 else "s" }}</span>
</header>
{% if matches %}
<ol class="phonebook-list" start="{{ ((page - 1) * per_page) + 1 }}">
{% for option in matches %}
<li>
<a href="{{ build_legislator_url(legislator_id=option.legislator_id, q=q, per_page=per_page) }}">
<span class="phonebook-name">{{ option.display_name }}</span>
<span class="phonebook-meta">
{{ option.state or "US" }}{% if option.party %} · {{ option.party }}{% endif %}{% if option.chamber %} · {{ option.chamber }}{% endif %}
</span>
</a>
</li>
{% endfor %}
</ol>
<nav class="pagination" aria-label="Legislator results pages">
{% if previous_page %}
<a href="{{ build_legislator_search_url(q=q, per_page=per_page, page=previous_page) }}">Previous</a>
{% else %}
<span>Previous</span>
{% endif %}
<strong>Page {{ page }} of {{ total_pages }}</strong>
{% if next_page %}
<a href="{{ build_legislator_search_url(q=q, per_page=per_page, page=next_page) }}">Next</a>
{% else %}
<span>Next</span>
{% endif %}
</nav>
{% else %}
<p class="empty-state">No legislators matched this search.</p>
{% endif %}
</section>
{% endif %}
{% if profile %}
<section class="profile-card">
<header class="profile-header">
<div class="profile-identity">
<span class="avatar">{{ profile.legislator.display_name.split(',')[0][:1] }}{{ profile.legislator.display_name.split(',')[-1].strip()[:1] }}</span>
<div>
<h2>{{ profile.legislator.display_name }} <span class="party-pill">{{ profile.legislator.chamber or "LEG" }}</span></h2>
<p>{{ profile.legislator.state or "US" }} · {{ profile.legislator.party or "Unaffiliated" }}{% if profile.serving_since %} · Serving since {{ profile.serving_since }}{% endif %}</p>
</div>
</div>
<div class="overall-score">
<span>Overall avg</span>
<strong>{{ profile.overall_score|round(0) if profile.overall_score is not none else "—" }}</strong>
<small>/ 100</small>
</div>
</header>
{% if profile.top_topics or profile.bottom_topics %}
<div class="topic-panels">
<article>
<h3>Most important issues for</h3>
{% for item in profile.top_topics %}
<a class="topic-row" href="{{ build_legislator_url(legislator_id=profile.legislator.legislator_id, topic=item.topic) }}">
<strong class="score positive">{{ item.score|round(0) }}</strong>
<span>{{ item.topic }}</span>
<i style="width: {{ item.score }}%"></i>
</a>
{% endfor %}
</article>
<article>
<h3 class="opposed-heading">Most important issues against</h3>
{% for item in profile.bottom_topics %}
<a class="topic-row {{ 'active' if item.topic == selected_topic else '' }}" href="{{ build_legislator_url(legislator_id=profile.legislator.legislator_id, topic=item.topic) }}">
<strong class="score negative">{{ item.score|round(0) }}</strong>
<span>{{ item.topic }}</span>
<i style="width: {{ item.score }}%"></i>
</a>
{% endfor %}
</article>
</div>
<section class="profile-history">
<h3>{{ selected_topic or "Topic" }} — score history</h3>
<div class="chart-frame">{{ history_svg | safe }}</div>
{% if history_series %}
<div class="chart-legend compact" aria-label="Chart legend">
{% for item in history_series %}
<div class="chart-legend-row">
<span class="chart-legend-line line-0"></span>
<span class="chart-legend-marker marker-0"></span>
<div class="chart-legend-copy">
<span class="chart-legend-label">{{ item.label }}</span>
<span class="chart-legend-meta">
{% if item.party %}{{ item.party }}{% endif %}{% if item.party and item.state %} · {% endif %}{% if item.state %}{{ item.state }}{% endif %}
</span>
</div>
</div>
{% endfor %}
</div>
{% endif %}
</section>
{% else %}
<p class="empty-state">No issue scores are available for this legislator yet.</p>
{% endif %}
</section>
{% endif %}
</main>
<footer class="footer">
<span>Actual record, not rhetoric</span>
<span>Source: congressional roll-call votes</span>
<span>Not affiliated with any political party or organization</span>
</footer>
{% endblock %}
+45
View File
@@ -0,0 +1,45 @@
{% extends "base.html" %}
{% block title %}Sign in | Nornsight{% endblock %}
{% block body %}
<main class="login-shell">
<section class="login-panel" aria-labelledby="login-title">
<div class="login-copy">
<p class="eyebrow">Admin access</p>
<h1 id="login-title">Sign in to Nornsight</h1>
<p>Use the dashboard account to review rankings, profiles, and legislator comparisons.</p>
</div>
<form class="login-form" action="/login" method="post">
<input type="hidden" name="next" value="{{ next_path }}">
{% if error %}
<p class="form-error" role="alert">{{ error }}</p>
{% endif %}
<label for="username">Username</label>
<input
id="username"
name="username"
type="text"
autocomplete="username"
value="{{ username }}"
required
autofocus
>
<label for="password">Password</label>
<input
id="password"
name="password"
type="password"
autocomplete="current-password"
required
>
<button type="submit">Sign in</button>
</form>
</section>
</main>
{% endblock %}
@@ -0,0 +1,30 @@
<section class="chart-card">
<header>
<h2>Score history{% if selected_issue_label %} — {{ selected_issue_label }}{% endif %}</h2>
<a href="{{ build_url(request, compare=[]) }}"
hx-get="/partials/dashboard{{ build_url(request, compare=[])|replace('/', '', 1) }}"
hx-target="#dashboard-body"
hx-push-url="{{ build_url(request, compare=[]) }}">Clear comparison</a>
</header>
<div class="chart-frame">
{{ chart_svg | safe }}
</div>
{% if chart_series %}
<div class="chart-legend" aria-label="Chart legend">
{% for item in chart_series %}
{% set style_index = loop.index0 % 4 %}
<div class="chart-legend-row">
<span class="chart-legend-line line-{{ style_index }}"></span>
<span class="chart-legend-marker marker-{{ style_index }}"></span>
<div class="chart-legend-copy">
<span class="chart-legend-label">{{ item.label }}</span>
<span class="chart-legend-meta">
{% if item.party %}{{ item.party }}{% endif %}{% if item.party and item.state %} · {% endif %}{% if item.state %}{{ item.state }}{% endif %}
</span>
</div>
</div>
{% endfor %}
</div>
{% endif %}
<p class="score-note">Scores reflect averaged precomputed topic rows per year. Sparse years are omitted rather than plotted as zero.</p>
</section>
@@ -0,0 +1,25 @@
<section class="controls-grid">
{% include "partials/_issue_filters.html" %}
<div class="chamber-card">
<a class="segment {{ 'active' if chamber == 'house' else '' }}"
href="{{ build_url(request, chamber='house') }}"
hx-get="/partials/dashboard{{ build_url(request, chamber='house')|replace('/', '', 1) }}"
hx-target="#dashboard-body"
hx-push-url="{{ build_url(request, chamber='house') }}">House</a>
<a class="segment {{ 'active' if chamber == 'senate' else '' }}"
href="{{ build_url(request, chamber='senate') }}"
hx-get="/partials/dashboard{{ build_url(request, chamber='senate')|replace('/', '', 1) }}"
hx-target="#dashboard-body"
hx-push-url="{{ build_url(request, chamber='senate') }}">Senate</a>
<a class="segment {{ 'active' if chamber == 'all' else '' }}"
href="{{ build_url(request, chamber='all') }}"
hx-get="/partials/dashboard{{ build_url(request, chamber='all')|replace('/', '', 1) }}"
hx-target="#dashboard-body"
hx-push-url="{{ build_url(request, chamber='all') }}">All</a>
</div>
</section>
<p class="score-note">Support score: 0-100 precomputed from bill topic stance and roll-call votes. Higher means more aligned with the topic.</p>
{% include "partials/_rankings.html" %}
{% include "partials/_chart.html" %}
@@ -0,0 +1,46 @@
{#
  Issue filter card: a GET form listing the selected issues as removable chips
  plus a free-text search box, followed by optional suggestion links.

  Context read here: request, chamber, congress, compare, issues, suggestions, build_url.
#}
<section class="filter-card">
<h2>Issue filters</h2>
{# NOTE(review): this form requests "/" via hx-get while the links below request "/partials/dashboard…";
   confirm that "/" returns a fragment suitable for #dashboard-body on HTMX requests. #}
<form class="issue-form"
method="get"
action="/"
hx-get="/"
hx-target="#dashboard-body"
hx-push-url="true">
{# Hidden inputs resubmit the current chamber, congress (when set) and compare selections with the form. #}
<input type="hidden" name="chamber" value="{{ chamber }}">
{% if congress %}
<input type="hidden" name="congress" value="{{ congress }}">
{% endif %}
{% for legislator_id in compare %}
<input type="hidden" name="compare" value="{{ legislator_id }}">
{% endfor %}
{# Each selected issue renders a chip and a hidden "issues" input; the chip's × link
   rebuilds the URL with the issues list minus the entry at the current loop position. #}
{% for issue in issues %}
<span class="chip">
{{ issue }}
<a href="{{ build_url(request, issues=issues[:loop.index0] + issues[loop.index:]) }}"
hx-get="/partials/dashboard{{ build_url(request, issues=issues[:loop.index0] + issues[loop.index:])|replace('/', '', 1) }}"
hx-target="#dashboard-body"
hx-push-url="{{ build_url(request, issues=issues[:loop.index0] + issues[loop.index:]) }}"
aria-label="Remove {{ issue }}">×</a>
</span>
<input type="hidden" name="issues" value="{{ issue }}">
{% endfor %}
{# The search box shares the "issues" field name with the hidden inputs above. #}
<label class="search-box">
<span class="sr-only">Search issue areas</span>
<input type="search" name="issues" placeholder="Search issue areas" autocomplete="off">
</label>
<button type="submit">Apply</button>
</form>
{# Suggestion links: only suggestions not already selected are shown; each link appends its suggestion to the issues list. #}
{% if suggestions %}
<div class="suggestions" aria-label="Issue suggestions">
{% for suggestion in suggestions %}
{% if suggestion not in issues %}
<a href="{{ build_url(request, issues=issues + [suggestion]) }}"
hx-get="/partials/dashboard{{ build_url(request, issues=issues + [suggestion])|replace('/', '', 1) }}"
hx-target="#dashboard-body"
hx-push-url="{{ build_url(request, issues=issues + [suggestion]) }}">{{ suggestion }}</a>
{% endif %}
{% endfor %}
</div>
{% endif %}
</section>
@@ -0,0 +1,11 @@
{#
  Search suggestions partial: one link per entry in `matches`, or a "No matches"
  message when there are no matches but a query `q` was given. Renders nothing
  when both are empty.
#}
{% if matches %}
<div class="result-chips" aria-label="Search suggestions">
{% for option in matches %}
{# Each chip links to the legislator's page; the state is appended after " · " when present. #}
<a href="{{ build_legislator_url(legislator_id=option.legislator_id) }}">
{{ option.display_name }}{% if option.state %} · {{ option.state }}{% endif %}
</a>
{% endfor %}
</div>
{% elif q %}
<p class="suggestion-empty">No matches</p>
{% endif %}
@@ -0,0 +1,61 @@
{#
  Rankings partial: two cards ("Most supportive" / "Most opposed") that share
  one layout, rendered through the ranking_card macro below.

  Context read here: rankings, compare, request, empty_message, build_url, toggle_compare.
#}

{#
  Render one ranking card.

  title       -- heading text of the card
  badge       -- short label shown beside the heading
  rows        -- ranking rows; each is read for legislator_id, score,
                 display_name, state, party and total
  score_class -- extra CSS class placed on the score ("positive" / "negative")

  When rows is empty the card shows empty_message instead of a list.
#}
{% macro ranking_card(title, badge, rows, score_class) %}
<article class="ranking-card">
  <header>
    <h2>{{ title }}</h2>
    <span>{{ badge }}</span>
  </header>
  {% if rows %}
  <ol class="ranking-list">
    {% for row in rows %}
    {# Clicking a row toggles that legislator in the compare selection. #}
    {% set next_compare = toggle_compare(compare, row.legislator_id) %}
    {# Build the toggle URL once and reuse it for href, hx-get and hx-push-url. #}
    {% set toggle_url = build_url(request, compare=next_compare) %}
    <li class="{{ 'selected' if row.legislator_id in compare else '' }}">
      {# hx-get drops the first "/" of the URL before appending it to /partials/dashboard
         (assumes build_url returns a path starting with "/" — confirm against its implementation). #}
      <a href="{{ toggle_url }}"
         hx-get="/partials/dashboard{{ toggle_url|replace('/', '', 1) }}"
         hx-target="#dashboard-body"
         hx-push-url="{{ toggle_url }}">
        <span class="rank">{{ loop.index }}</span>
        <strong class="score {{ score_class }}">{{ row.score|round(1) }}</strong>
        <span class="member">
          <strong>{{ row.display_name }}</strong>
          {# State falls back to "US"; party is shown as its first character only. #}
          <small>{{ row.state or "US" }}{% if row.party %} · {{ row.party[:1] }}{% endif %}</small>
        </span>
        <span class="votes">{{ row.total }} rows</span>
      </a>
    </li>
    {% endfor %}
  </ol>
  {% else %}
  <p class="empty-state">{{ empty_message }}</p>
  {% endif %}
</article>
{% endmacro %}

<section class="rankings-grid">
  {{ ranking_card("Most supportive", "Top 10", rankings.supportive, "positive") }}
  {{ ranking_card("Most opposed", "Bottom 10", rankings.opposed, "negative") }}
</section>
+15
View File
@@ -0,0 +1,15 @@
{% extends "base.html" %}
{#
  Setup-required page: shown with an `error` value, which is printed verbatim
  inside the <pre> block below the heading.
  NOTE(review): DATA_SCIENCE_DEV is presumably an environment variable or
  connection setting read by the app — confirm where it is consumed.
#}
{% block title %}Database Setup Required{% endblock %}
{% block body %}
<main class="shell">
<section class="page-heading stacked-heading">
<div>
<h1>Database setup required</h1>
<p>Configure DATA_SCIENCE_DEV before opening the dashboard.</p>
</div>
</section>
<pre class="setup-error">{{ error }}</pre>
</main>
{% endblock %}
View File
+22
View File
@@ -0,0 +1,22 @@
# Project metadata for the pipeline tools and the legislative dashboard.
[project]
name = "ds-testing-pipelines"
version = "0.1.0"
description = "Data science pipeline tools and legislative dashboard."
requires-python = ">=3.12"
# Runtime dependencies; all are unpinned.
dependencies = [
"fastapi",
"httpx",
"uvicorn[standard]",
"jinja2",
"sqlalchemy",
"psycopg",
]
# Optional "test" extra: adds pytest on top of the runtime dependencies.
[project.optional-dependencies]
test = [
"pytest",
]
# pytest collects from tests/ and puts the project root on sys.path.
[tool.pytest.ini_options]
testpaths = ["tests"]
pythonpath = ["."]
+36
View File
@@ -0,0 +1,36 @@
from pipelines.orm.data_science_dev.congress import BillText, BillTextSummary
def test_default_summary_prefers_primary_summary() -> None:
    """default_summary() returns the primary summary when one is set."""
    pinned = BillTextSummary(id=1, bill_text_id=10, summary="primary")
    newest = BillTextSummary(id=2, bill_text_id=10, summary="latest")
    text = BillText(
        id=10,
        bill_id=5,
        version_code="ih",
        summaries=[newest],
        primary_summary=pinned,
    )

    chosen = text.default_summary()

    assert chosen is pinned
def test_default_summary_falls_back_to_latest_summary() -> None:
    """Without a primary summary, both accessors return the first listed summary."""
    newest = BillTextSummary(id=2, bill_text_id=10, summary="latest")
    older = BillTextSummary(id=1, bill_text_id=10, summary="older")
    text = BillText(
        id=10,
        bill_id=5,
        version_code="ih",
        summaries=[newest, older],
    )

    from_latest = text.latest_summary()
    assert from_latest is newest

    from_default = text.default_summary()
    assert from_default is newest
def test_default_summary_is_none_without_summaries() -> None:
    """A bill text built without summaries yields None from both accessors."""
    bare_text = BillText(id=10, bill_id=5, version_code="ih")

    from_latest = bare_text.latest_summary()
    assert from_latest is None

    from_default = bare_text.default_summary()
    assert from_default is None
+71
View File
@@ -0,0 +1,71 @@
from sqlalchemy.dialects import postgresql
from pipelines.jobs.extract_bill_topics import (
_select_bill_text_for_topic_extraction,
create_select_bills_for_topic_extraction,
)
from pipelines.orm.data_science_dev.congress import Bill, BillText, BillTextSummary
def _compile_sql(statement: object) -> str:
    """Return *statement* rendered as PostgreSQL text.

    The statement is compiled against the PostgreSQL dialect with
    ``literal_binds`` enabled, and the compiled object is converted to ``str``.
    """
    compiled = statement.compile(
        dialect=postgresql.dialect(),
        compile_kwargs={"literal_binds": True},
    )
    return str(compiled)
def test_select_bill_text_for_topic_extraction_uses_primary_summary() -> None:
    """The bill's only text is selected and its default summary is the primary one."""
    pinned = BillTextSummary(id=1, bill_text_id=10, summary="primary")
    newest = BillTextSummary(id=2, bill_text_id=10, summary="newest")
    only_text = BillText(
        id=10,
        bill_id=5,
        version_code="ih",
        summaries=[newest],
        primary_summary=pinned,
    )
    parent_bill = Bill(
        id=5,
        congress=119,
        bill_type="hr",
        number=1,
        bill_texts=[only_text],
    )

    picked = _select_bill_text_for_topic_extraction(parent_bill)

    assert picked is only_text
    chosen_summary = picked.default_summary()
    assert chosen_summary is pinned
def test_select_bill_text_for_topic_extraction_uses_latest_summary_without_primary() -> None:
    """With no primary summary, the selected text defaults to the first listed summary."""
    newest = BillTextSummary(id=2, bill_text_id=10, summary="newest")
    older = BillTextSummary(id=1, bill_text_id=10, summary="older")
    only_text = BillText(
        id=10,
        bill_id=5,
        version_code="ih",
        summaries=[newest, older],
    )
    parent_bill = Bill(
        id=5,
        congress=119,
        bill_type="hr",
        number=1,
        bill_texts=[only_text],
    )

    picked = _select_bill_text_for_topic_extraction(parent_bill)

    assert picked is only_text
    chosen_summary = picked.default_summary()
    assert chosen_summary is newest
def test_create_select_bills_for_topic_extraction_uses_summary_exists_subquery() -> None:
    """The rendered query mentions bill_text_summary and EXISTS, not bill_text.summary."""
    statement = create_select_bills_for_topic_extraction()
    rendered = _compile_sql(statement)

    assert "bill_text_summary" in rendered
    assert "EXISTS" in rendered
    assert "bill_text.summary" not in rendered
+58
View File
@@ -0,0 +1,58 @@
from sqlalchemy.dialects import postgresql
from pipelines.jobs.summarize_bills import (
create_select_bill_texts_for_summarization,
store_bill_summary_result,
)
from pipelines.orm.data_science_dev.congress import BillText, BillTextSummary
class FakeSession:
    """Minimal session stand-in that records every object passed to add()."""

    def __init__(self) -> None:
        """Start with an empty record of added objects."""
        self.added: list[object] = []

    def add(self, value: object) -> None:
        """Append *value* to the ``added`` list, preserving call order."""
        self.added += [value]
def _compile_sql(statement: object) -> str:
    """Return *statement* rendered as PostgreSQL text.

    The statement is compiled against the PostgreSQL dialect with
    ``literal_binds`` enabled, and the compiled object is converted to ``str``.
    """
    render_options = {"literal_binds": True}
    compiled = statement.compile(
        dialect=postgresql.dialect(),
        compile_kwargs=render_options,
    )
    return str(compiled)
def test_store_bill_summary_result_creates_summary_row() -> None:
    """Storing a result adds one BillTextSummary carrying the text, model and prompt versions."""
    recorder = FakeSession()
    source_text = BillText(id=10, bill_id=5, version_code="ih")

    stored = store_bill_summary_result(
        session=recorder,
        bill_text=source_text,
        summary="A summary",
        model="gpt-5.4-mini",
    )

    # The returned row is the only object handed to the session.
    assert recorder.added == [stored]
    assert isinstance(stored, BillTextSummary)
    assert stored.bill_text is source_text

    expected_fields = {
        "summary": "A summary",
        "summarization_model": "gpt-5.4-mini",
        "summarization_system_prompt_version": "v1.2",
        "summarization_user_prompt_version": "v1",
    }
    for field_name, expected_value in expected_fields.items():
        assert getattr(stored, field_name) == expected_value
def test_create_select_bill_texts_for_summarization_excludes_existing_summaries() -> None:
    """Without force, the rendered query carries a negated EXISTS on bill_text_summary."""
    statement = create_select_bill_texts_for_summarization(force=False)
    rendered = _compile_sql(statement)

    assert "bill_text_summary" in rendered
    # Accept either spelling of the negated EXISTS clause.
    negated_forms = ("NOT (EXISTS", "NOT EXISTS")
    assert any(form in rendered for form in negated_forms)
    assert "bill_text.summary" not in rendered
def test_create_select_bill_texts_for_summarization_force_skips_summary_filter() -> None:
    """With force=True, the rendered query does not mention bill_text_summary."""
    statement = create_select_bill_texts_for_summarization(force=True)
    rendered = _compile_sql(statement)

    assert "bill_text_summary" not in rendered