Files
2026-04-28 22:50:53 -04:00

671 lines
19 KiB
Python

"""Congress database queries for the web dashboard."""
from __future__ import annotations
from dataclasses import dataclass
from datetime import date
from typing import Literal
from sqlalchemy import ColumnElement, Select, case, desc, false, func, or_, select, true
from sqlalchemy.orm import Session
from pipelines.orm.data_science_dev.congress import (
BillTopic,
Legislator,
LegislatorScore,
Vote,
)
from pipelines.web.scoring import normalize_issues
Chamber = Literal["house", "senate", "all"]
STATE_ALIASES = {
"alabama": "AL",
"alaska": "AK",
"arizona": "AZ",
"arkansas": "AR",
"california": "CA",
"colorado": "CO",
"connecticut": "CT",
"delaware": "DE",
"florida": "FL",
"georgia": "GA",
"hawaii": "HI",
"idaho": "ID",
"illinois": "IL",
"indiana": "IN",
"iowa": "IA",
"kansas": "KS",
"kentucky": "KY",
"louisiana": "LA",
"maine": "ME",
"maryland": "MD",
"massachusetts": "MA",
"michigan": "MI",
"minnesota": "MN",
"mississippi": "MS",
"missouri": "MO",
"montana": "MT",
"nebraska": "NE",
"nevada": "NV",
"new hampshire": "NH",
"new jersey": "NJ",
"new mexico": "NM",
"new york": "NY",
"north carolina": "NC",
"north dakota": "ND",
"ohio": "OH",
"oklahoma": "OK",
"oregon": "OR",
"pennsylvania": "PA",
"rhode island": "RI",
"south carolina": "SC",
"south dakota": "SD",
"tennessee": "TN",
"texas": "TX",
"utah": "UT",
"vermont": "VT",
"virginia": "VA",
"washington": "WA",
"west virginia": "WV",
"wisconsin": "WI",
"wyoming": "WY",
"district of columbia": "DC",
}
@dataclass(frozen=True)
class RankingRow:
"""A legislator support score row."""
legislator_id: int
display_name: str
party: str | None
state: str | None
chamber: str | None
score: float | None
supportive: int
opposed: int
@property
def total(self) -> int:
return self.supportive + self.opposed
@dataclass(frozen=True)
class RankingResult:
"""Supportive and opposed ranking lists."""
supportive: list[RankingRow]
opposed: list[RankingRow]
@dataclass(frozen=True)
class TimePoint:
"""One yearly chart point."""
year: int
score: float
@dataclass(frozen=True)
class ChartSeries:
"""One legislator score-history series."""
legislator_id: int
label: str
party: str | None
state: str | None
points: list[TimePoint]
@dataclass(frozen=True)
class TopicScore:
"""Average score for one topic."""
topic: str
score: float
count: int
@dataclass(frozen=True)
class LegislatorOption:
"""Compact legislator metadata for search and comparison controls."""
legislator_id: int
display_name: str
party: str | None
state: str | None
chamber: str | None
@dataclass(frozen=True)
class LegislatorProfile:
"""Legislator metadata plus issue score summary."""
legislator: LegislatorOption
overall_score: float | None
serving_since: int | None
top_topics: list[TopicScore]
bottom_topics: list[TopicScore]
@dataclass(frozen=True)
class RadarSeries:
"""One legislator polygon for the compare radar chart."""
legislator: LegislatorOption
average_score: float | None
scores_by_topic: dict[str, float]
def latest_congress(session: Session) -> int | None:
"""Return the latest congress number in the vote table."""
return session.scalar(select(func.max(Vote.congress)))
def latest_vote_date(session: Session, congress: int | None = None) -> date | None:
"""Return the most recent vote date, optionally scoped to a congress."""
stmt = select(func.max(Vote.vote_date))
if congress is not None:
stmt = stmt.where(Vote.congress == congress)
return session.scalar(stmt)
def latest_score_year(session: Session) -> int | None:
"""Return the latest year in the precomputed legislator score table."""
return session.scalar(select(func.max(LegislatorScore.year)))
def has_scores(session: Session) -> bool:
"""Return True when the database has at least one precomputed score."""
return session.scalar(select(LegislatorScore.id).limit(1)) is not None
def issue_suggestions(
session: Session,
*,
congress: int | None,
limit: int = 12,
) -> list[str]:
"""Return common precomputed score topics for issue filter suggestions."""
stmt = (
select(LegislatorScore.topic, func.count(LegislatorScore.id).label("score_count"))
.where(LegislatorScore.topic != "")
.group_by(LegislatorScore.topic)
.order_by(desc("score_count"), LegislatorScore.topic)
.limit(limit)
)
suggestions = [row[0] for row in session.execute(stmt).all()]
if suggestions:
return suggestions
fallback = (
select(BillTopic.topic, func.count(BillTopic.id).label("topic_count"))
.where(BillTopic.topic != "")
.group_by(BillTopic.topic)
.order_by(desc("topic_count"), BillTopic.topic)
.limit(limit)
)
return [row[0] for row in session.execute(fallback).all()]
def ranking_query(
*,
issues: list[str],
chamber: Chamber,
congress: int,
) -> Select:
"""Build the aggregate ranking query from precomputed scores."""
average_score = func.avg(LegislatorScore.score).label("score")
supportive = func.sum(case((LegislatorScore.score >= 50, 1), else_=0)).label(
"supportive"
)
opposed = func.sum(case((LegislatorScore.score < 50, 1), else_=0)).label("opposed")
stmt = (
select(
Legislator.id,
Legislator.official_full_name,
Legislator.last_name,
Legislator.current_party,
Legislator.current_state,
Legislator.current_chamber,
average_score,
supportive,
opposed,
)
.join(LegislatorScore, LegislatorScore.legislator_id == Legislator.id)
.where(_score_topic_match_condition(issues))
.group_by(
Legislator.id,
Legislator.official_full_name,
Legislator.last_name,
Legislator.current_party,
Legislator.current_state,
Legislator.current_chamber,
)
)
if chamber != "all":
stmt = stmt.where(Legislator.current_chamber == _db_chamber(chamber))
return stmt
def get_rankings(
session: Session,
*,
issues: list[str],
chamber: Chamber,
congress: int,
limit: int = 10,
) -> RankingResult:
"""Return top supportive and opposed legislators from precomputed scores."""
rows = [
_ranking_row(row)
for row in session.execute(
ranking_query(issues=issues, chamber=chamber, congress=congress)
)
]
scored = [row for row in rows if row.score is not None]
supportive = sorted(
scored, key=lambda row: (-float(row.score), -row.total, row.display_name)
)[:limit]
opposed = sorted(
scored, key=lambda row: (float(row.score), -row.total, row.display_name)
)[:limit]
return RankingResult(supportive=supportive, opposed=opposed)
def get_score_history(
session: Session,
*,
issues: list[str],
chamber: Chamber,
congress: int,
legislator_ids: list[int],
) -> list[ChartSeries]:
"""Return yearly score history from precomputed scores."""
if not legislator_ids:
return []
average_score = func.avg(LegislatorScore.score).label("score")
stmt = (
select(
Legislator.id,
Legislator.official_full_name,
Legislator.last_name,
Legislator.current_party,
Legislator.current_state,
LegislatorScore.year,
average_score,
)
.join(LegislatorScore, LegislatorScore.legislator_id == Legislator.id)
.where(
Legislator.id.in_(legislator_ids),
_score_topic_match_condition(issues),
)
.group_by(
Legislator.id,
Legislator.official_full_name,
Legislator.last_name,
Legislator.current_party,
Legislator.current_state,
LegislatorScore.year,
)
.order_by(Legislator.id, LegislatorScore.year)
)
if chamber != "all":
stmt = stmt.where(Legislator.current_chamber == _db_chamber(chamber))
by_legislator: dict[int, ChartSeries] = {}
for row in session.execute(stmt):
if row.score is None:
continue
series = by_legislator.setdefault(
row.id,
ChartSeries(
legislator_id=row.id,
label=_display_name(row.official_full_name, row.last_name),
party=row.current_party,
state=row.current_state,
points=[],
),
)
series.points.append(TimePoint(year=int(row.year), score=float(row.score)))
return list(by_legislator.values())
def _ranking_row(row: object) -> RankingRow:
return RankingRow(
legislator_id=row.id,
display_name=_display_name(row.official_full_name, row.last_name),
party=row.current_party,
state=row.current_state,
chamber=row.current_chamber,
score=float(row.score) if row.score is not None else None,
supportive=row.supportive or 0,
opposed=row.opposed or 0,
)
def _score_topic_match_condition(
issues: list[str] | tuple[str, ...],
) -> ColumnElement[bool]:
normalized = normalize_issues(list(issues))
if not normalized:
return false()
return or_(*(LegislatorScore.topic.ilike(f"%{issue}%") for issue in normalized))
def search_legislators(
session: Session,
*,
query: str | None = None,
limit: int = 12,
offset: int = 0,
) -> list[LegislatorOption]:
"""Search ingested legislators, preferring those with computed scores."""
return [
_legislator_option(row)
for row in session.execute(
legislator_search_query(query=query, limit=limit, offset=offset)
)
]
def count_legislators(session: Session, *, query: str | None = None) -> int:
"""Return the total number of legislators matching a search query."""
return int(
session.scalar(
select(func.count(Legislator.id)).where(_legislator_search_condition(query))
)
or 0
)
def get_legislator_options(
session: Session, legislator_ids: list[int]
) -> list[LegislatorOption]:
"""Return legislator options in the same order as the selected IDs."""
options = {
option.legislator_id: option
for option in (
_get_legislator_option(session, legislator_id)
for legislator_id in legislator_ids
)
if option is not None
}
return [
options[legislator_id]
for legislator_id in legislator_ids
if legislator_id in options
]
def legislator_search_query(
*,
query: str | None = None,
limit: int = 12,
offset: int = 0,
) -> Select:
"""Build the legislator search query used by profile and compare controls."""
score_count = func.count(LegislatorScore.id).label("score_count")
stmt = (
select(
Legislator.id,
Legislator.official_full_name,
Legislator.last_name,
Legislator.current_party,
Legislator.current_state,
Legislator.current_chamber,
score_count,
)
.outerjoin(LegislatorScore, LegislatorScore.legislator_id == Legislator.id)
.group_by(
Legislator.id,
Legislator.official_full_name,
Legislator.first_name,
Legislator.last_name,
Legislator.current_party,
Legislator.current_state,
Legislator.current_chamber,
Legislator.bioguide_id,
)
.order_by(desc("score_count"), Legislator.last_name, Legislator.first_name)
.limit(limit)
.offset(offset)
)
return stmt.where(_legislator_search_condition(query))
def _legislator_search_condition(query: str | None) -> ColumnElement[bool]:
cleaned_query = query.strip() if query else ""
if not cleaned_query:
return true()
pattern = f"%{cleaned_query}%"
state_alias = _state_alias(cleaned_query)
conditions: list[ColumnElement[bool]] = [
Legislator.official_full_name.ilike(pattern),
Legislator.first_name.ilike(pattern),
Legislator.last_name.ilike(pattern),
Legislator.current_state.ilike(pattern),
Legislator.bioguide_id.ilike(pattern),
]
if state_alias is not None:
conditions.append(Legislator.current_state == state_alias)
return or_(*conditions)
def _state_alias(query: str) -> str | None:
normalized = " ".join(query.lower().replace(".", "").split())
if len(normalized) == 2 and normalized.isalpha():
return normalized.upper()
return STATE_ALIASES.get(normalized)
def get_legislator_profile(
session: Session,
*,
legislator_id: int | None = None,
query: str | None = None,
) -> LegislatorProfile | None:
"""Return the selected legislator profile and top/bottom topic scores."""
selected = _get_legislator_option(session, legislator_id)
cleaned_query = query.strip() if query else ""
if selected is None and cleaned_query:
matches = search_legislators(session, query=query, limit=1)
selected = matches[0] if matches else None
if selected is None:
return None
topic_scores = get_legislator_topic_scores(
session, legislator_id=selected.legislator_id
)
top_topics = sorted(topic_scores, key=lambda item: (-item.score, item.topic))[:3]
bottom_topics = sorted(topic_scores, key=lambda item: (item.score, item.topic))[:3]
overall_score = session.scalar(
select(func.avg(LegislatorScore.score)).where(
LegislatorScore.legislator_id == selected.legislator_id
)
)
serving_since = session.scalar(
select(func.min(LegislatorScore.year)).where(
LegislatorScore.legislator_id == selected.legislator_id
)
)
return LegislatorProfile(
legislator=selected,
overall_score=float(overall_score) if overall_score is not None else None,
serving_since=int(serving_since) if serving_since is not None else None,
top_topics=top_topics,
bottom_topics=bottom_topics,
)
def get_legislator_topic_scores(
session: Session,
*,
legislator_id: int,
) -> list[TopicScore]:
"""Return all average topic scores for one legislator."""
rows = session.execute(
select(
LegislatorScore.topic,
func.avg(LegislatorScore.score).label("score"),
func.count(LegislatorScore.id).label("count"),
)
.where(LegislatorScore.legislator_id == legislator_id)
.group_by(LegislatorScore.topic)
.order_by(LegislatorScore.topic)
)
return [
TopicScore(topic=row.topic, score=float(row.score), count=row.count)
for row in rows
if row.score is not None
]
def get_single_legislator_history(
session: Session,
*,
legislator_id: int,
topic: str,
) -> list[ChartSeries]:
"""Return score history for one legislator/topic pair."""
option = _get_legislator_option(session, legislator_id)
if option is None:
return []
rows = session.execute(
select(
LegislatorScore.year,
func.avg(LegislatorScore.score).label("score"),
)
.where(
LegislatorScore.legislator_id == legislator_id,
LegislatorScore.topic == topic,
)
.group_by(LegislatorScore.year)
.order_by(LegislatorScore.year)
)
points = [
TimePoint(year=int(row.year), score=float(row.score))
for row in rows
if row.score is not None
]
return [
ChartSeries(
legislator_id=option.legislator_id,
label=option.display_name,
party=option.party,
state=option.state,
points=points,
)
]
def get_compare_defaults(session: Session) -> tuple[list[int], list[str]]:
"""Return default compare legislators and topics."""
legislators = search_legislators(session, limit=3)
topics = issue_suggestions(session, congress=None, limit=6)
return [item.legislator_id for item in legislators], topics
def get_compare_radar_series(
session: Session,
*,
legislator_ids: list[int],
topics: list[str],
) -> list[RadarSeries]:
"""Return radar chart scores for selected legislators and topics."""
if not legislator_ids:
return []
options = {
option.legislator_id: option
for option in (
_get_legislator_option(session, legislator_id)
for legislator_id in legislator_ids
)
if option is not None
}
if not options:
return []
scores: dict[int, dict[str, float]] = {
legislator_id: {} for legislator_id in options
}
if topics:
rows = session.execute(
select(
LegislatorScore.legislator_id,
LegislatorScore.topic,
func.avg(LegislatorScore.score).label("score"),
)
.where(
LegislatorScore.legislator_id.in_(list(options)),
LegislatorScore.topic.in_(topics),
)
.group_by(LegislatorScore.legislator_id, LegislatorScore.topic)
)
for row in rows:
scores[row.legislator_id][row.topic] = float(row.score)
series: list[RadarSeries] = []
for legislator_id in legislator_ids:
option = options.get(legislator_id)
if option is None:
continue
topic_scores = scores.get(legislator_id, {})
values = list(topic_scores.values())
series.append(
RadarSeries(
legislator=option,
average_score=sum(values) / len(values) if values else None,
scores_by_topic=topic_scores,
)
)
return series
def _display_name(official_full_name: str | None, last_name: str | None) -> str:
if official_full_name:
parts = official_full_name.split()
if len(parts) > 1:
return f"{parts[-1]}, {' '.join(parts[:-1])}"
return official_full_name
return last_name or "Unknown"
def _legislator_option(row: object) -> LegislatorOption:
return LegislatorOption(
legislator_id=row.id,
display_name=_display_name(row.official_full_name, row.last_name),
party=row.current_party,
state=row.current_state,
chamber=row.current_chamber,
)
def _get_legislator_option(
session: Session, legislator_id: int | None
) -> LegislatorOption | None:
if legislator_id is None:
return None
row = session.execute(
select(
Legislator.id,
Legislator.official_full_name,
Legislator.last_name,
Legislator.current_party,
Legislator.current_state,
Legislator.current_chamber,
).where(Legislator.id == legislator_id)
).first()
return _legislator_option(row) if row is not None else None
def _db_chamber(chamber: Chamber) -> str:
return {"house": "House", "senate": "Senate", "all": "all"}[chamber]