"""FastAPI app for the HTMX legislative dashboard.""" from __future__ import annotations from contextlib import asynccontextmanager from dataclasses import dataclass import hashlib import hmac from os import getenv from pathlib import Path import secrets from typing import Any from urllib.parse import parse_qs from fastapi import Depends, FastAPI, HTTPException, Request, Response, status from fastapi.responses import HTMLResponse, PlainTextResponse, RedirectResponse from fastapi.staticfiles import StaticFiles from fastapi.templating import Jinja2Templates from pipelines.web import repository from pipelines.web.db import session_scope, validate_database_connection from pipelines.web.repository import Chamber, RankingResult from pipelines.web.scoring import normalize_issues from pipelines.web.svg import render_compare_radar_svg, render_score_history_svg BASE_DIR = Path(__file__).resolve().parent TEMPLATES_DIR = BASE_DIR / "templates" STATIC_DIR = BASE_DIR / "static" templates = Jinja2Templates(directory=TEMPLATES_DIR) ADMIN_USERNAME = "admin" ADMIN_PASSWORD = "admin" SESSION_COOKIE = "nornsight_admin" SESSION_SECRET = "nornsight-local-dev-session-secret" @asynccontextmanager async def lifespan(_: FastAPI): """Validate database access when the CLI starts the web server.""" if getenv("PYTEST_CURRENT_TEST") is None: validate_database_connection() yield app = FastAPI(title="Nornsight Legislative Dashboard", lifespan=lifespan) app.mount("/static", StaticFiles(directory=STATIC_DIR), name="static") @dataclass(frozen=True) class DashboardState: """Dashboard query-string state.""" issues: list[str] chamber: Chamber congress: int | None compare: list[int] @app.get("/healthz", response_class=PlainTextResponse) def healthz() -> str: """Return a simple liveness response.""" return "ok" @app.get("/login", response_class=HTMLResponse) def login(request: Request) -> Response: """Render the integrated login page.""" next_path = _safe_next_path(request.query_params.get("next")) if 
_authenticated_user(request) is not None: return RedirectResponse(next_path, status_code=status.HTTP_303_SEE_OTHER) return templates.TemplateResponse( request, "login.html", { "error": "", "is_authenticated": False, "show_primary_nav": False, "next_path": next_path, "username": "", }, ) @app.post("/login", response_class=HTMLResponse) async def login_submit(request: Request) -> Response: """Authenticate the hard-coded admin user and set a session cookie.""" form = parse_qs((await request.body()).decode()) username = form.get("username", [""])[0] password = form.get("password", [""])[0] next_path = _safe_next_path(form.get("next", [request.query_params.get("next")])[0]) username_ok = secrets.compare_digest(username, ADMIN_USERNAME) password_ok = secrets.compare_digest(password, ADMIN_PASSWORD) if not (username_ok and password_ok): return templates.TemplateResponse( request, "login.html", { "error": "Invalid username or password.", "is_authenticated": False, "show_primary_nav": False, "next_path": next_path, "username": username, }, status_code=status.HTTP_401_UNAUTHORIZED, ) response = RedirectResponse(next_path, status_code=status.HTTP_303_SEE_OTHER) response.set_cookie( SESSION_COOKIE, _sign_session(username), httponly=True, samesite="lax", ) return response @app.get("/logout") def logout() -> Response: """Clear the local admin session.""" response = RedirectResponse("/login", status_code=status.HTTP_303_SEE_OTHER) response.delete_cookie(SESSION_COOKIE) return response def require_admin(request: Request) -> str: """Redirect unauthenticated users to the in-site login page.""" username = _authenticated_user(request) if username is not None: return username next_path = request.url.path if request.url.query: next_path = f"{next_path}?{request.url.query}" login_url = request.url_for("login").include_query_params(next=next_path) raise HTTPException( status_code=status.HTTP_303_SEE_OTHER, headers={"Location": str(login_url)}, ) def _authenticated_user(request: Request) 
-> str | None: token = request.cookies.get(SESSION_COOKIE) if token is None: return None try: username, signature = token.split(":", 1) except ValueError: return None if username != ADMIN_USERNAME: return None expected = _session_signature(username) if secrets.compare_digest(signature, expected): return username return None def _sign_session(username: str) -> str: return f"{username}:{_session_signature(username)}" def _session_signature(username: str) -> str: return hmac.new( SESSION_SECRET.encode(), username.encode(), hashlib.sha256, ).hexdigest() def _safe_next_path(value: str | None) -> str: if value and value.startswith("/") and not value.startswith("//"): return value return "/" @app.get("/", response_class=HTMLResponse) def dashboard(request: Request, _: str = Depends(require_admin)) -> Response: """Render the full dashboard page.""" context = _dashboard_context(request) if request.headers.get("hx-request") == "true": return templates.TemplateResponse(request, "partials/_dashboard.html", context) return templates.TemplateResponse(request, "dashboard.html", context) @app.get("/partials/dashboard", response_class=HTMLResponse) def dashboard_partial(request: Request, _: str = Depends(require_admin)) -> Response: """Render the filter-dependent dashboard body.""" context = _dashboard_context(request) return templates.TemplateResponse(request, "partials/_dashboard.html", context) @app.get("/partials/issues", response_class=HTMLResponse) def issues_partial(request: Request, _: str = Depends(require_admin)) -> Response: """Render only issue filters.""" context = _dashboard_context(request) return templates.TemplateResponse(request, "partials/_issue_filters.html", context) @app.get("/partials/rankings", response_class=HTMLResponse) def rankings_partial(request: Request, _: str = Depends(require_admin)) -> Response: """Render only ranking panels.""" context = _dashboard_context(request) return templates.TemplateResponse(request, "partials/_rankings.html", context) 
# Page sizes offered by the legislator search, and the one used when the
# request does not carry a recognised ``per_page`` value.
_PER_PAGE_OPTIONS: tuple[int, ...] = (10, 25, 50)
_DEFAULT_PER_PAGE = 10

# Chamber filter values accepted from the query string.
_VALID_CHAMBERS = frozenset({"house", "senate", "all"})
_DEFAULT_CHAMBER = "senate"


@app.get("/partials/chart", response_class=HTMLResponse)
def chart_partial(request: Request, _: str = Depends(require_admin)) -> Response:
    """Render only the SVG chart panel."""
    context = _dashboard_context(request)
    return templates.TemplateResponse(request, "partials/_chart.html", context)


@app.get("/legislators", response_class=HTMLResponse)
def legislators(request: Request, _: str = Depends(require_admin)) -> Response:
    """Render the legislator profile/search page."""
    context = _legislators_context(request)
    return templates.TemplateResponse(request, "legislators.html", context)


@app.get("/partials/legislator-suggestions", response_class=HTMLResponse)
def legislator_suggestions_partial(
    request: Request, _: str = Depends(require_admin)
) -> Response:
    """Render legislator search suggestions for the HTMX typeahead.

    Queries shorter than two characters (after stripping) skip the database
    lookup and render with an empty ``q`` and no matches.
    """
    query = request.query_params.get("q", "").strip()
    long_enough = len(query) >= 2
    context: dict[str, Any] = {
        "q": query if long_enough else "",
        "matches": [],
        "build_legislator_url": _build_legislator_url,
    }
    if long_enough:
        with session_scope() as session:
            context["matches"] = repository.search_legislators(
                session, query=query, limit=8
            )
    return templates.TemplateResponse(
        request, "partials/_legislator_suggestions.html", context
    )


@app.get("/compare", response_class=HTMLResponse)
def compare(request: Request, _: str = Depends(require_admin)) -> Response:
    """Render the legislator radar comparison page."""
    context = _compare_context(request)
    return templates.TemplateResponse(request, "compare.html", context)


def _dashboard_context(request: Request) -> dict[str, Any]:
    """Build the template context shared by the dashboard page and partials.

    The context starts from empty defaults and is filled in progressively.
    The function returns early, with ``empty_message`` set, when no scores
    are loaded, when no issues are selected, or when the rankings come back
    empty.
    """
    state = _parse_state(request)
    base_context: dict[str, Any] = {
        "state": state,
        "issues": state.issues,
        "selected_issue_label": " + ".join(state.issues) if state.issues else "",
        "chamber": state.chamber,
        "congress": state.congress,
        "latest_score_year": None,
        "last_updated": None,
        "suggestions": [],
        "rankings": RankingResult(supportive=[], opposed=[]),
        "compare": [],
        "chart_svg": render_score_history_svg([]),
        "chart_series": [],
        # NOTE(review): "has_votes" is never updated below, so it is always
        # False when the template sees it - confirm whether that is intended.
        "has_votes": False,
        "has_scores": False,
        "empty_message": "",
        "build_url": _build_url,
        "toggle_compare": _toggle_compare,
    }

    with session_scope() as session:
        # Fall back to the repository's latest congress when none was requested.
        congress = state.congress or repository.latest_congress(session)
        base_context["congress"] = congress
        base_context["has_scores"] = repository.has_scores(session)
        base_context["latest_score_year"] = repository.latest_score_year(session)
        base_context["last_updated"] = repository.latest_vote_date(session, congress)
        base_context["suggestions"] = repository.issue_suggestions(
            session, congress=congress
        )

        if not base_context["has_scores"]:
            base_context["empty_message"] = (
                "No legislator scores are loaded yet. Run the score calculator first."
            )
            return base_context

        if congress is None:
            # Display label only; the repository calls below still get None.
            base_context["congress"] = "Computed"

        if not state.issues:
            base_context["empty_message"] = (
                "Choose one or more issue areas to calculate roll-call support scores."
            )
            return base_context

        rankings = repository.get_rankings(
            session,
            issues=state.issues,
            chamber=state.chamber,
            congress=congress,
        )
        base_context["rankings"] = rankings

        # With no explicit selection, chart the top two supportive legislators.
        compare = state.compare or [
            row.legislator_id for row in rankings.supportive[:2]
        ]
        base_context["compare"] = compare

        if not rankings.supportive and not rankings.opposed:
            base_context["empty_message"] = "No matching roll-call votes."
            return base_context

        history = repository.get_score_history(
            session,
            issues=state.issues,
            chamber=state.chamber,
            congress=congress,
            legislator_ids=compare,
        )
        base_context["chart_series"] = history
        base_context["chart_svg"] = render_score_history_svg(history)

    return base_context


def _normalize_chamber(raw: str | None) -> str:
    """Lower-case *raw* and fall back to the default for unknown values."""
    chamber = (raw if raw is not None else _DEFAULT_CHAMBER).lower()
    return chamber if chamber in _VALID_CHAMBERS else _DEFAULT_CHAMBER


def _parse_ints(values: list[str]) -> list[int]:
    """Parse *values* as ints, dropping unparsable entries, keeping duplicates."""
    return [
        parsed
        for parsed in (_parse_int(raw) for raw in values)
        if parsed is not None
    ]


def _parse_state(request: Request) -> DashboardState:
    """Read the dashboard filters from the request's query string."""
    query = request.query_params
    return DashboardState(
        issues=normalize_issues(query.getlist("issues")),
        chamber=_normalize_chamber(query.get("chamber")),  # type: ignore[arg-type]
        congress=_parse_int(query.get("congress")),
        compare=_parse_ints(query.getlist("compare")),
    )


def _legislators_context(request: Request) -> dict[str, Any]:
    """Build the template context for the legislator search/profile page.

    Covers the paginated search results for ``q``, the profile looked up by
    ``legislator_id``, and the score history for the selected topic.
    """
    params = request.query_params
    query = params.get("q", "").strip()
    legislator_id = _parse_int(params.get("legislator_id"))
    selected_topic = params.get("topic", "").strip()
    per_page = _parse_per_page(params.get("per_page"))
    # Missing, unparsable, zero and negative page numbers all become page 1.
    page = max(_parse_int(params.get("page")) or 1, 1)

    base_context: dict[str, Any] = {
        "q": query,
        "profile": None,
        "matches": [],
        "result_count": 0,
        "page": page,
        "per_page": per_page,
        "per_page_options": list(_PER_PAGE_OPTIONS),
        "total_pages": 1,
        "previous_page": None,
        "next_page": None,
        "selected_topic": selected_topic,
        "history_svg": render_score_history_svg([]),
        "history_series": [],
        "build_legislator_url": _build_legislator_url,
        "build_legislator_search_url": _build_legislator_search_url,
    }

    with session_scope() as session:
        result_count = (
            repository.count_legislators(session, query=query) if query else 0
        )
        # Ceiling division, with a minimum of one page even for zero results.
        total_pages = max((result_count + per_page - 1) // per_page, 1)
        if page > total_pages:
            # Clamp requests past the end to the last available page.
            page = total_pages
            base_context["page"] = page

        matches = (
            repository.search_legislators(
                session,
                query=query,
                limit=per_page,
                offset=(page - 1) * per_page,
            )
            if query
            else []
        )
        profile = repository.get_legislator_profile(
            session, legislator_id=legislator_id, query=None
        )

        base_context["profile"] = profile
        base_context["matches"] = matches
        base_context["result_count"] = result_count
        base_context["total_pages"] = total_pages
        base_context["previous_page"] = page - 1 if page > 1 else None
        base_context["next_page"] = page + 1 if page < total_pages else None

        if profile is None:
            return base_context

        if not selected_topic:
            # Default to the profile's first bottom topic, else its first top topic.
            if profile.bottom_topics:
                selected_topic = profile.bottom_topics[0].topic
            elif profile.top_topics:
                selected_topic = profile.top_topics[0].topic
        base_context["selected_topic"] = selected_topic

        if selected_topic:
            history = repository.get_single_legislator_history(
                session,
                legislator_id=profile.legislator.legislator_id,
                topic=selected_topic,
            )
            base_context["history_series"] = history
            base_context["history_svg"] = render_score_history_svg(history)

    return base_context


def _compare_context(request: Request) -> dict[str, Any]:
    """Build the template context for the radar comparison page.

    Legislators come from ``legislator_id`` (falling back to ``compare``) and
    are capped at four; topics come from ``topic`` (falling back to
    ``issues``) and are capped at eight. Repository defaults fill in when
    nothing is selected.
    """
    params = request.query_params
    selected_legislators = _parse_int_list(
        params.getlist("legislator_id") or params.getlist("compare")
    )[:4]
    topics = normalize_issues(params.getlist("topic") or params.getlist("issues"))[:8]
    query = params.get("q", "").strip()

    base_context: dict[str, Any] = {
        "selected_legislators": selected_legislators,
        "selected_legislator_options": [],
        "topics": topics,
        "q": query,
        "series": [],
        "radar_svg": render_compare_radar_svg([], []),
        "legislator_options": [],
        "topic_options": [],
        "build_compare_url": _build_compare_url,
    }

    with session_scope() as session:
        default_legislators, default_topics = repository.get_compare_defaults(session)
        # While a search is in progress, default legislators are not pre-selected.
        if not selected_legislators and not query:
            selected_legislators = default_legislators[:3]
        if not topics:
            topics = default_topics[:6]

        selected_legislator_options = repository.get_legislator_options(
            session, selected_legislators
        )
        series = repository.get_compare_radar_series(
            session, legislator_ids=selected_legislators, topics=topics
        )
        base_context.update(
            {
                "selected_legislators": selected_legislators,
                "selected_legislator_options": selected_legislator_options,
                "topics": topics,
                "q": query,
                "series": series,
                "radar_svg": render_compare_radar_svg(topics, series),
                "legislator_options": repository.search_legislators(
                    session, query=query or None, limit=12
                ),
                "topic_options": repository.issue_suggestions(
                    session, congress=None, limit=12
                ),
            }
        )

    return base_context


def _parse_int(value: str | None) -> int | None:
    """Return *value* parsed as an int, or ``None`` if missing or invalid."""
    if value is None or value == "":
        return None
    try:
        return int(value)
    except ValueError:
        return None


def _parse_int_list(values: list[str]) -> list[int]:
    """Parse *values* as ints, dropping invalid entries and later duplicates.

    First-seen order is preserved.
    """
    parsed: list[int] = []
    seen: set[int] = set()
    for value in values:
        integer = _parse_int(value)
        if integer is not None and integer not in seen:
            parsed.append(integer)
            seen.add(integer)
    return parsed


def _parse_per_page(value: str | None) -> int:
    """Return the requested page size if it is an offered option, else the default."""
    parsed = _parse_int(value)
    return parsed if parsed in _PER_PAGE_OPTIONS else _DEFAULT_PER_PAGE


def _build_url(
    request: Request,
    *,
    issues: list[str] | None = None,
    chamber: str | None = None,
    congress: int | None = None,
    compare: list[int] | None = None,
) -> str:
    """Build a dashboard URL, overriding selected filters.

    Any argument left as ``None`` is taken from the current request's query
    string instead. The chamber read from the request is normalized the same
    way as in ``_parse_state``, so generated links reflect the filter that is
    actually applied; an explicitly passed ``chamber`` is used as given.

    Returns:
        A ``/?...`` URL. A ``chamber`` parameter is always present, so the
        query string is never empty.
    """
    from urllib.parse import urlencode

    query = request.query_params
    chosen_issues = (
        issues if issues is not None else normalize_issues(query.getlist("issues"))
    )
    chosen_chamber = (
        chamber if chamber is not None else _normalize_chamber(query.get("chamber"))
    )
    chosen_congress = (
        congress if congress is not None else _parse_int(query.get("congress"))
    )
    chosen_compare = (
        compare if compare is not None else _parse_ints(query.getlist("compare"))
    )

    params: list[tuple[str, str]] = [("issues", issue) for issue in chosen_issues]
    params.append(("chamber", chosen_chamber))
    if chosen_congress is not None:
        params.append(("congress", str(chosen_congress)))
    params.extend(("compare", str(legislator_id)) for legislator_id in chosen_compare)
    return f"/?{urlencode(params, doseq=True)}"


def _toggle_compare(compare: list[int], legislator_id: int) -> list[int]:
    """Return compare IDs with the legislator added or removed."""
    if legislator_id in compare:
        return [value for value in compare if value != legislator_id]
    return [*compare, legislator_id]


def _build_legislator_url(
    *,
    legislator_id: int | None = None,
    q: str | None = None,
    topic: str | None = None,
    per_page: int | None = None,
) -> str:
    """Build a ``/legislators`` URL from the parts that are set.

    ``per_page`` is included only when it is an offered page size other than
    the default.
    """
    from urllib.parse import urlencode

    params: list[tuple[str, str]] = []
    if legislator_id is not None:
        params.append(("legislator_id", str(legislator_id)))
    if q:
        params.append(("q", q))
    if topic:
        params.append(("topic", topic))
    if per_page in _PER_PAGE_OPTIONS and per_page != _DEFAULT_PER_PAGE:
        params.append(("per_page", str(per_page)))
    return f"/legislators?{urlencode(params)}" if params else "/legislators"


def _build_legislator_search_url(
    *,
    q: str,
    per_page: int,
    page: int = 1,
) -> str:
    """Build a paginated ``/legislators`` search URL.

    ``per_page`` is always emitted; ``page`` only when it is greater than 1.
    """
    from urllib.parse import urlencode

    params: list[tuple[str, str]] = []
    if q:
        params.append(("q", q))
    params.append(("per_page", str(per_page)))
    if page > 1:
        params.append(("page", str(page)))
    return f"/legislators?{urlencode(params)}"


def _build_compare_url(
    *,
    legislator_ids: list[int],
    topics: list[str],
    q: str | None = None,
) -> str:
    """Build a ``/compare`` URL for up to four legislators and eight topics."""
    from urllib.parse import urlencode

    params: list[tuple[str, str]] = [
        ("legislator_id", str(legislator_id)) for legislator_id in legislator_ids[:4]
    ]
    params.extend(("topic", topic) for topic in topics[:8])
    if q:
        params.append(("q", q))
    return f"/compare?{urlencode(params, doseq=True)}" if params else "/compare"