Files
weave/pipelines/web/main.py
2026-04-28 22:50:53 -04:00

590 lines
20 KiB
Python

"""FastAPI app for the HTMX legislative dashboard."""
from __future__ import annotations
from contextlib import asynccontextmanager
from dataclasses import dataclass
import hashlib
import hmac
from os import getenv
from pathlib import Path
import secrets
from typing import Any
from urllib.parse import parse_qs
from fastapi import Depends, FastAPI, HTTPException, Request, Response, status
from fastapi.responses import HTMLResponse, PlainTextResponse, RedirectResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from pipelines.web import repository
from pipelines.web.db import session_scope, validate_database_connection
from pipelines.web.repository import Chamber, RankingResult
from pipelines.web.scoring import normalize_issues
from pipelines.web.svg import render_compare_radar_svg, render_score_history_svg
# Filesystem locations are resolved relative to this module so the app does
# not depend on the process working directory.
BASE_DIR = Path(__file__).resolve().parent
TEMPLATES_DIR = BASE_DIR / "templates"
STATIC_DIR = BASE_DIR / "static"
templates = Jinja2Templates(directory=TEMPLATES_DIR)

# Single hard-coded admin account name; session tokens embed this value.
ADMIN_USERNAME = "admin"
# The literal fallbacks below are local-development defaults only. Any real
# deployment should override them through the environment so the password
# and the cookie-signing key are not the values committed to source control.
ADMIN_PASSWORD = getenv("NORNSIGHT_ADMIN_PASSWORD") or "admin"
# Name of the cookie that carries the signed admin session token.
SESSION_COOKIE = "nornsight_admin"
# HMAC key used to sign and verify the session cookie.
SESSION_SECRET = (
    getenv("NORNSIGHT_SESSION_SECRET") or "nornsight-local-dev-session-secret"
)
@asynccontextmanager
async def lifespan(_: FastAPI):
    """Check database connectivity once before the app starts serving.

    The check is skipped whenever the ``PYTEST_CURRENT_TEST`` environment
    variable is present, so the app can start without a reachable database
    in that case. Nothing is done on shutdown.
    """
    skip_validation = getenv("PYTEST_CURRENT_TEST") is not None
    if not skip_validation:
        validate_database_connection()
    yield
# Application instance; ``lifespan`` runs the startup database check.
app = FastAPI(title="Nornsight Legislative Dashboard", lifespan=lifespan)
# Serve the files under STATIC_DIR at the ``/static`` URL prefix.
app.mount("/static", StaticFiles(directory=STATIC_DIR), name="static")
@dataclass(frozen=True)
class DashboardState:
    """Dashboard query-string state.

    Immutable snapshot of the dashboard filters as parsed from the request
    query string by ``_parse_state``.
    """

    # Issue names from the repeated ``issues`` parameter, after
    # ``normalize_issues`` has been applied.
    issues: list[str]
    # One of "house", "senate" or "all"; unknown values fall back to "senate".
    chamber: Chamber
    # Congress number from ``congress``, or None when absent or not an integer.
    congress: int | None
    # Integer IDs from the repeated ``compare`` parameter; non-integer
    # entries are dropped.
    compare: list[int]
@app.get("/healthz", response_class=PlainTextResponse)
def healthz() -> str:
    """Return a simple liveness response.

    The handler takes no admin dependency and performs no database access;
    it always answers with the plain-text body ``ok``.
    """
    return "ok"
@app.get("/login", response_class=HTMLResponse)
def login(request: Request) -> Response:
    """Show the login form, or bounce a signed-in admin straight through.

    The ``next`` query parameter is sanitised with ``_safe_next_path`` and is
    used both as the redirect target for an already authenticated user and as
    the value passed to the form template.
    """
    destination = _safe_next_path(request.query_params.get("next"))

    already_signed_in = _authenticated_user(request) is not None
    if already_signed_in:
        return RedirectResponse(destination, status_code=status.HTTP_303_SEE_OTHER)

    # Blank form: no error message and no pre-filled username.
    page_context = {
        "error": "",
        "is_authenticated": False,
        "show_primary_nav": False,
        "next_path": destination,
        "username": "",
    }
    return templates.TemplateResponse(request, "login.html", page_context)
@app.post("/login", response_class=HTMLResponse)
async def login_submit(request: Request) -> Response:
    """Authenticate the hard-coded admin user and set a session cookie.

    The URL-encoded request body is parsed by hand for the ``username``,
    ``password`` and ``next`` fields. ``next`` falls back to the query-string
    value and is always passed through ``_safe_next_path``.

    Returns:
        A 303 redirect to the sanitised ``next`` path with the signed session
        cookie set on success, or the login page re-rendered with an error
        message and status 401 on failure.
    """
    # Decode leniently: a body that is not valid UTF-8 must end up as a
    # failed login, not as an unhandled UnicodeDecodeError.
    raw_body = (await request.body()).decode("utf-8", errors="replace")
    form = parse_qs(raw_body)
    username = form.get("username", [""])[0]
    password = form.get("password", [""])[0]
    next_path = _safe_next_path(form.get("next", [request.query_params.get("next")])[0])

    # compare_digest raises TypeError for str arguments containing non-ASCII
    # characters, so compare the UTF-8 bytes instead. Both checks are always
    # evaluated before the result is combined.
    username_ok = secrets.compare_digest(
        username.encode("utf-8"), ADMIN_USERNAME.encode("utf-8")
    )
    password_ok = secrets.compare_digest(
        password.encode("utf-8"), ADMIN_PASSWORD.encode("utf-8")
    )
    if not (username_ok and password_ok):
        return templates.TemplateResponse(
            request,
            "login.html",
            {
                "error": "Invalid username or password.",
                "is_authenticated": False,
                "show_primary_nav": False,
                "next_path": next_path,
                "username": username,
            },
            status_code=status.HTTP_401_UNAUTHORIZED,
        )

    response = RedirectResponse(next_path, status_code=status.HTTP_303_SEE_OTHER)
    response.set_cookie(
        SESSION_COOKIE,
        _sign_session(username),
        httponly=True,
        samesite="lax",
    )
    return response
@app.get("/logout")
def logout() -> Response:
    """Drop the admin session cookie and send the browser to the login page."""
    redirect = RedirectResponse("/login", status_code=status.HTTP_303_SEE_OTHER)
    redirect.delete_cookie(SESSION_COOKIE)
    return redirect
def require_admin(request: Request) -> str:
    """Return the signed-in admin's username or redirect to the login page.

    Raises:
        HTTPException: status 303 with a ``Location`` header pointing at the
            login route; the current path (plus query string, if any) is
            passed along as the ``next`` parameter.
    """
    current_user = _authenticated_user(request)
    if current_user is None:
        path = request.url.path
        query_string = request.url.query
        return_to = f"{path}?{query_string}" if query_string else path
        target = request.url_for("login").include_query_params(next=return_to)
        raise HTTPException(
            status_code=status.HTTP_303_SEE_OTHER,
            headers={"Location": str(target)},
        )
    return current_user
def _authenticated_user(request: Request) -> str | None:
    """Return the username carried by a valid session cookie, else None.

    The cookie value has the form ``<username>:<signature>``. It is accepted
    only when the username equals ``ADMIN_USERNAME`` and the signature matches
    the one computed by ``_session_signature``.
    """
    token = request.cookies.get(SESSION_COOKIE)
    if token is None:
        return None

    username, separator, signature = token.partition(":")
    if not separator or username != ADMIN_USERNAME:
        return None

    expected = _session_signature(username)
    # compare_digest raises TypeError for str arguments containing non-ASCII
    # characters; the cookie is attacker-controlled, so compare bytes to turn
    # a malformed signature into a plain rejection.
    if secrets.compare_digest(signature.encode("utf-8"), expected.encode("utf-8")):
        return username
    return None
def _sign_session(username: str) -> str:
    """Build the session cookie value ``<username>:<signature>``."""
    signature = _session_signature(username)
    return ":".join((username, signature))
def _session_signature(username: str) -> str:
    """Return the hex HMAC-SHA256 of *username* keyed with SESSION_SECRET."""
    mac = hmac.new(
        key=SESSION_SECRET.encode(),
        msg=username.encode(),
        digestmod=hashlib.sha256,
    )
    return mac.hexdigest()
def _safe_next_path(value: str | None) -> str:
if value and value.startswith("/") and not value.startswith("//"):
return value
return "/"
@app.get("/", response_class=HTMLResponse)
def dashboard(request: Request, _: str = Depends(require_admin)) -> Response:
    """Serve the dashboard: the full page, or only its body for HTMX requests.

    A request whose ``hx-request`` header equals ``"true"`` receives the
    ``partials/_dashboard.html`` fragment; every other request receives
    ``dashboard.html``. Both use the same context.
    """
    context = _dashboard_context(request)
    wants_fragment = request.headers.get("hx-request") == "true"
    template_name = "partials/_dashboard.html" if wants_fragment else "dashboard.html"
    return templates.TemplateResponse(request, template_name, context)
@app.get("/partials/dashboard", response_class=HTMLResponse)
def dashboard_partial(request: Request, _: str = Depends(require_admin)) -> Response:
    """Return the dashboard body fragment for the current filter state."""
    return templates.TemplateResponse(
        request,
        "partials/_dashboard.html",
        _dashboard_context(request),
    )
@app.get("/partials/issues", response_class=HTMLResponse)
def issues_partial(request: Request, _: str = Depends(require_admin)) -> Response:
    """Return the issue-filter fragment for the current filter state."""
    return templates.TemplateResponse(
        request,
        "partials/_issue_filters.html",
        _dashboard_context(request),
    )
@app.get("/partials/rankings", response_class=HTMLResponse)
def rankings_partial(request: Request, _: str = Depends(require_admin)) -> Response:
    """Return the ranking-panels fragment for the current filter state."""
    return templates.TemplateResponse(
        request,
        "partials/_rankings.html",
        _dashboard_context(request),
    )
@app.get("/partials/chart", response_class=HTMLResponse)
def chart_partial(request: Request, _: str = Depends(require_admin)) -> Response:
    """Return the SVG chart-panel fragment for the current filter state."""
    return templates.TemplateResponse(
        request,
        "partials/_chart.html",
        _dashboard_context(request),
    )
@app.get("/legislators", response_class=HTMLResponse)
def legislators(request: Request, _: str = Depends(require_admin)) -> Response:
    """Serve the legislator search and profile page."""
    return templates.TemplateResponse(
        request,
        "legislators.html",
        _legislators_context(request),
    )
@app.get("/partials/legislator-suggestions", response_class=HTMLResponse)
def legislator_suggestions_partial(
    request: Request, _: str = Depends(require_admin)
) -> Response:
    """Return typeahead suggestions for the ``q`` query parameter.

    The repository is only queried (for at most 8 rows) when the stripped
    search text has two or more characters; shorter input renders the
    fragment with an empty ``q`` and no matches.
    """
    term = request.query_params.get("q", "").strip()
    long_enough = len(term) >= 2

    found: Any = []
    if long_enough:
        with session_scope() as session:
            found = repository.search_legislators(session, query=term, limit=8)

    context: dict[str, Any] = {
        "q": term if long_enough else "",
        "matches": found,
        "build_legislator_url": _build_legislator_url,
    }
    return templates.TemplateResponse(
        request, "partials/_legislator_suggestions.html", context
    )
@app.get("/compare", response_class=HTMLResponse)
def compare(request: Request, _: str = Depends(require_admin)) -> Response:
    """Serve the radar-chart page comparing legislators across topics."""
    return templates.TemplateResponse(
        request,
        "compare.html",
        _compare_context(request),
    )
def _dashboard_context(request: Request) -> dict[str, Any]:
    """Build the template context used by the dashboard page and its partials.

    The context starts from empty defaults and is filled in progressively
    inside one database session. The function returns early, with an
    ``empty_message`` set, when no scores are loaded, when no issues are
    selected, or when the ranking query yields no rows.
    """
    state = _parse_state(request)
    # Defaults that let every template render even on an early return.
    base_context: dict[str, Any] = {
        "state": state,
        "issues": state.issues,
        "selected_issue_label": " + ".join(state.issues) if state.issues else "",
        "chamber": state.chamber,
        "congress": state.congress,
        "latest_score_year": None,
        "last_updated": None,
        "suggestions": [],
        "rankings": RankingResult(supportive=[], opposed=[]),
        "compare": [],
        "chart_svg": render_score_history_svg([]),
        "chart_series": [],
        "has_votes": False,
        "has_scores": False,
        "empty_message": "",
        "build_url": _build_url,
        "toggle_compare": _toggle_compare,
    }
    with session_scope() as session:
        # Fall back to the repository's latest congress when the query string
        # does not name one.
        congress = state.congress or repository.latest_congress(session)
        base_context["congress"] = congress
        base_context["has_scores"] = repository.has_scores(session)
        base_context["latest_score_year"] = repository.latest_score_year(session)
        base_context["last_updated"] = repository.latest_vote_date(session, congress)
        base_context["suggestions"] = repository.issue_suggestions(
            session, congress=congress
        )
        if not base_context["has_scores"]:
            base_context["empty_message"] = (
                "No legislator scores are loaded yet. Run the score calculator first."
            )
            return base_context
        if congress is None:
            # Display label only; the local ``congress`` stays None for the
            # repository calls below.
            base_context["congress"] = "Computed"
        if not state.issues:
            base_context["empty_message"] = (
                "Choose one or more issue areas to calculate roll-call support scores."
            )
            return base_context
        rankings = repository.get_rankings(
            session,
            issues=state.issues,
            chamber=state.chamber,
            congress=congress,
        )
        base_context["rankings"] = rankings
        # Without an explicit selection, compare the first two supportive rows.
        compare = state.compare or [row.legislator_id for row in rankings.supportive[:2]]
        base_context["compare"] = compare
        if not rankings.supportive and not rankings.opposed:
            base_context["empty_message"] = "No matching roll-call votes."
            return base_context
        history = repository.get_score_history(
            session,
            issues=state.issues,
            chamber=state.chamber,
            congress=congress,
            legislator_ids=compare,
        )
        base_context["chart_series"] = history
        base_context["chart_svg"] = render_score_history_svg(history)
        return base_context
def _parse_state(request: Request) -> DashboardState:
    """Read the dashboard filters from the request query string.

    ``chamber`` is lower-cased and replaced by ``"senate"`` unless it is one
    of ``house``, ``senate`` or ``all``. ``congress`` and every ``compare``
    entry go through ``_parse_int``; ``compare`` entries that do not parse
    are dropped.
    """
    params = request.query_params

    requested_chamber = params.get("chamber", "senate").lower()
    if requested_chamber in {"house", "senate", "all"}:
        chamber = requested_chamber
    else:
        chamber = "senate"

    congress = _parse_int(params.get("congress"))

    compare_ids: list[int] = []
    for raw in params.getlist("compare"):
        parsed = _parse_int(raw)
        if parsed is not None:
            compare_ids.append(parsed)

    return DashboardState(
        issues=normalize_issues(params.getlist("issues")),
        chamber=chamber,  # type: ignore[arg-type]
        congress=congress,
        compare=compare_ids,
    )
def _legislators_context(request: Request) -> dict[str, Any]:
    """Build the template context for the legislator search/profile page.

    Reads ``q``, ``legislator_id``, ``topic``, ``per_page`` and ``page`` from
    the query string. Search results are paginated only when ``q`` is
    non-empty. When a profile is found, a score history for one topic is
    loaded and rendered as SVG.
    """
    query = request.query_params.get("q", "").strip()
    legislator_id = _parse_int(request.query_params.get("legislator_id"))
    selected_topic = request.query_params.get("topic", "").strip()
    per_page = _parse_per_page(request.query_params.get("per_page"))
    # A missing, unparsable or non-positive page number becomes page 1.
    page = max(_parse_int(request.query_params.get("page")) or 1, 1)
    base_context: dict[str, Any] = {
        "q": query,
        "profile": None,
        "matches": [],
        "result_count": 0,
        "page": page,
        "per_page": per_page,
        "per_page_options": [10, 25, 50],
        "total_pages": 1,
        "previous_page": None,
        "next_page": None,
        "selected_topic": selected_topic,
        "history_svg": render_score_history_svg([]),
        "history_series": [],
        "build_legislator_url": _build_legislator_url,
        "build_legislator_search_url": _build_legislator_search_url,
    }
    with session_scope() as session:
        result_count = repository.count_legislators(session, query=query) if query else 0
        # Ceiling division, with a minimum of one page even for zero results.
        total_pages = max((result_count + per_page - 1) // per_page, 1)
        # Clamp a page number past the end to the last page.
        if page > total_pages:
            page = total_pages
            base_context["page"] = page
        matches = (
            repository.search_legislators(
                session,
                query=query,
                limit=per_page,
                offset=(page - 1) * per_page,
            )
            if query
            else []
        )
        # The profile lookup passes only the ID; the search text is not
        # forwarded (query=None).
        profile = repository.get_legislator_profile(
            session, legislator_id=legislator_id, query=None
        )
        base_context["profile"] = profile
        base_context["matches"] = matches
        base_context["result_count"] = result_count
        base_context["total_pages"] = total_pages
        base_context["previous_page"] = page - 1 if page > 1 else None
        base_context["next_page"] = page + 1 if page < total_pages else None
        if profile is None:
            return base_context
        # With no topic requested, default to the first bottom topic, then
        # the first top topic.
        if not selected_topic:
            if profile.bottom_topics:
                selected_topic = profile.bottom_topics[0].topic
            elif profile.top_topics:
                selected_topic = profile.top_topics[0].topic
        base_context["selected_topic"] = selected_topic
        if selected_topic:
            history = repository.get_single_legislator_history(
                session,
                legislator_id=profile.legislator.legislator_id,
                topic=selected_topic,
            )
            base_context["history_series"] = history
            base_context["history_svg"] = render_score_history_svg(history)
        return base_context
def _compare_context(request: Request) -> dict[str, Any]:
    """Build the template context for the radar comparison page.

    Legislator IDs come from ``legislator_id`` (or ``compare`` as a fallback)
    and are capped at four; topics come from ``topic`` (or ``issues``) and are
    capped at eight. Repository defaults fill in legislators only when neither
    IDs nor search text were supplied, and fill in topics whenever none were
    supplied.
    """
    params = request.query_params
    raw_ids = params.getlist("legislator_id") or params.getlist("compare")
    legislator_ids = _parse_int_list(raw_ids)[:4]
    raw_topics = params.getlist("topic") or params.getlist("issues")
    topics = normalize_issues(raw_topics)[:8]
    search_text = params.get("q", "").strip()

    context: dict[str, Any] = {
        "selected_legislators": legislator_ids,
        "selected_legislator_options": [],
        "topics": topics,
        "q": search_text,
        "series": [],
        "radar_svg": render_compare_radar_svg([], []),
        "legislator_options": [],
        "topic_options": [],
        "build_compare_url": _build_compare_url,
    }

    with session_scope() as session:
        fallback_ids, fallback_topics = repository.get_compare_defaults(session)
        if not (legislator_ids or search_text):
            legislator_ids = fallback_ids[:3]
        if not topics:
            topics = fallback_topics[:6]

        # Repository calls are kept in a fixed order; each result overwrites
        # the matching placeholder in the context.
        chosen_options = repository.get_legislator_options(session, legislator_ids)
        radar_series = repository.get_compare_radar_series(
            session, legislator_ids=legislator_ids, topics=topics
        )
        context["selected_legislators"] = legislator_ids
        context["selected_legislator_options"] = chosen_options
        context["topics"] = topics
        context["q"] = search_text
        context["series"] = radar_series
        context["radar_svg"] = render_compare_radar_svg(topics, radar_series)
        context["legislator_options"] = repository.search_legislators(
            session, query=search_text or None, limit=12
        )
        context["topic_options"] = repository.issue_suggestions(
            session, congress=None, limit=12
        )
        return context
def _parse_int(value: str | None) -> int | None:
if value is None or value == "":
return None
try:
return int(value)
except ValueError:
return None
def _parse_int_list(values: list[str]) -> list[int]:
    """Parse *values* to integers, dropping invalid entries and duplicates.

    The first occurrence of each integer is kept and the input order is
    preserved.
    """
    candidates = (_parse_int(raw) for raw in values)
    # dict keys keep insertion order, giving an ordered de-duplication.
    unique = dict.fromkeys(number for number in candidates if number is not None)
    return list(unique)
def _parse_per_page(value: str | None) -> int:
    """Return the requested page size if it is 10, 25 or 50; otherwise 10."""
    allowed_sizes = (10, 25, 50)
    requested = _parse_int(value)
    if requested in allowed_sizes:
        return requested
    return 10
def _build_url(
request: Request,
*,
issues: list[str] | None = None,
chamber: str | None = None,
congress: int | None = None,
compare: list[int] | None = None,
) -> str:
params: list[tuple[str, str]] = []
chosen_issues = (
issues
if issues is not None
else normalize_issues(request.query_params.getlist("issues"))
)
chosen_chamber = (
chamber
if chamber is not None
else request.query_params.get("chamber", "senate")
)
chosen_congress = (
congress
if congress is not None
else _parse_int(request.query_params.get("congress"))
)
chosen_compare = (
compare
if compare is not None
else [
value
for value in (
_parse_int(raw) for raw in request.query_params.getlist("compare")
)
if value is not None
]
)
for issue in chosen_issues:
params.append(("issues", issue))
params.append(("chamber", chosen_chamber))
if chosen_congress is not None:
params.append(("congress", str(chosen_congress)))
for legislator_id in chosen_compare:
params.append(("compare", str(legislator_id)))
if not params:
return "/"
from urllib.parse import urlencode
return f"/?{urlencode(params, doseq=True)}"
def _toggle_compare(compare: list[int], legislator_id: int) -> list[int]:
"""Return compare IDs with the legislator added or removed."""
if legislator_id in compare:
return [value for value in compare if value != legislator_id]
return [*compare, legislator_id]
def _build_legislator_url(
*,
legislator_id: int | None = None,
q: str | None = None,
topic: str | None = None,
per_page: int | None = None,
) -> str:
from urllib.parse import urlencode
params: list[tuple[str, str]] = []
if legislator_id is not None:
params.append(("legislator_id", str(legislator_id)))
if q:
params.append(("q", q))
if topic:
params.append(("topic", topic))
if per_page in {10, 25, 50} and per_page != 10:
params.append(("per_page", str(per_page)))
return f"/legislators?{urlencode(params)}" if params else "/legislators"
def _build_legislator_search_url(
*,
q: str,
per_page: int,
page: int = 1,
) -> str:
from urllib.parse import urlencode
params: list[tuple[str, str]] = []
if q:
params.append(("q", q))
params.append(("per_page", str(per_page)))
if page > 1:
params.append(("page", str(page)))
return f"/legislators?{urlencode(params)}" if params else "/legislators"
def _build_compare_url(
*,
legislator_ids: list[int],
topics: list[str],
q: str | None = None,
) -> str:
from urllib.parse import urlencode
params: list[tuple[str, str]] = []
for legislator_id in legislator_ids[:4]:
params.append(("legislator_id", str(legislator_id)))
for topic in topics[:8]:
params.append(("topic", topic))
if q:
params.append(("q", q))
return f"/compare?{urlencode(params, doseq=True)}" if params else "/compare"