"""FastAPI app for the HTMX legislative dashboard.""" from __future__ import annotations from contextlib import asynccontextmanager from dataclasses import dataclass import logging from os import getenv from pathlib import Path from typing import Any from fastapi import Depends, FastAPI, HTTPException, Request, Response, status from fastapi.responses import HTMLResponse, PlainTextResponse, RedirectResponse from fastapi.staticfiles import StaticFiles from fastapi.templating import Jinja2Templates from pipelines.web import auth, repository from pipelines.web.db import session_scope, validate_database_connection from pipelines.web.repository import Chamber, RankingResult from pipelines.web.scoring import normalize_issues from pipelines.web.svg import render_compare_radar_svg, render_score_history_svg BASE_DIR = Path(__file__).resolve().parent TEMPLATES_DIR = BASE_DIR / "templates" STATIC_DIR = BASE_DIR / "static" templates = Jinja2Templates(directory=TEMPLATES_DIR) logger = logging.getLogger(__name__) @asynccontextmanager async def lifespan(_: FastAPI): """Validate database access when the CLI starts the web server.""" if getenv("PYTEST_CURRENT_TEST") is None: validate_database_connection() yield app = FastAPI(title="Nornsight Legislative Dashboard", lifespan=lifespan) app.mount("/static", StaticFiles(directory=STATIC_DIR), name="static") @dataclass(frozen=True) class DashboardState: """Dashboard query-string state.""" issues: list[str] chamber: Chamber congress: int | None compare: list[int] @app.get("/healthz", response_class=PlainTextResponse) def healthz() -> str: """Return a simple liveness response.""" return "ok" @app.get("/", response_class=HTMLResponse) def home(request: Request) -> Response: """Render the public home page.""" current_user = auth.get_current_session(request) return templates.TemplateResponse( request, "home.html", { **_auth_context(current_user), "auth_error": request.query_params.get("auth_error") == "1", }, ) @app.get("/login") def login(request: Request) -> Response: """Start the WorkOS hosted login flow.""" next_path = auth.safe_next_path(request.query_params.get("next")) current_user = auth.get_current_session(request) if current_user is not None: return RedirectResponse(next_path, status_code=status.HTTP_303_SEE_OTHER) return RedirectResponse( auth.build_authorization_url(next_path), status_code=status.HTTP_303_SEE_OTHER, ) @app.get("/callback") def callback(request: Request) -> Response: """Exchange the WorkOS code for a sealed session cookie.""" try: result = auth.exchange_code(request) except Exception: logger.exception("WorkOS callback exchange failed.") response = RedirectResponse("/?auth_error=1", status_code=status.HTTP_303_SEE_OTHER) _delete_auth_cookie(response) return response config = auth.get_auth_config() response = RedirectResponse(result.next_path, status_code=status.HTTP_303_SEE_OTHER) response.set_cookie( config.session_cookie_name, result.sealed_session, httponly=True, samesite="lax", secure=config.secure_cookies, ) return response @app.post("/logout") def logout(request: Request) -> Response: """End the WorkOS session and clear the local sealed session cookie.""" response = RedirectResponse(auth.get_logout_url(request), status_code=status.HTTP_303_SEE_OTHER) _delete_auth_cookie(response) return response def require_user(request: Request) -> auth.AuthSession: """Redirect unauthenticated users to the WorkOS sign-in flow.""" current_user = auth.get_current_session(request) if current_user is not None: return current_user next_path = request.url.path if request.url.query: next_path = f"{next_path}?{request.url.query}" login_url = request.url_for("login").include_query_params(next=next_path) raise HTTPException( status_code=status.HTTP_303_SEE_OTHER, headers={"Location": str(login_url)}, ) def require_admin(current_user: auth.AuthSession = Depends(require_user)) -> auth.AuthSession: """Restrict a route to WorkOS users with the admin role.""" if current_user.is_admin: return current_user raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin access required.") @app.get("/dashboard", response_class=HTMLResponse) def dashboard( request: Request, current_user: auth.AuthSession = Depends(require_user) ) -> Response: """Render the full dashboard page.""" context = {**_auth_context(current_user), **_dashboard_context(request)} if request.headers.get("hx-request") == "true": return templates.TemplateResponse(request, "partials/_dashboard.html", context) return templates.TemplateResponse(request, "dashboard.html", context) @app.get("/partials/dashboard", response_class=HTMLResponse) def dashboard_partial(request: Request, _: auth.AuthSession = Depends(require_user)) -> Response: """Render the filter-dependent dashboard body.""" context = _dashboard_context(request) return templates.TemplateResponse(request, "partials/_dashboard.html", context) @app.get("/partials/issues", response_class=HTMLResponse) def issues_partial(request: Request, _: auth.AuthSession = Depends(require_user)) -> Response: """Render only issue filters.""" context = _dashboard_context(request) return templates.TemplateResponse(request, "partials/_issue_filters.html", context) @app.get("/partials/rankings", response_class=HTMLResponse) def rankings_partial(request: Request, _: auth.AuthSession = Depends(require_user)) -> Response: """Render only ranking panels.""" context = _dashboard_context(request) return templates.TemplateResponse(request, "partials/_rankings.html", context) @app.get("/partials/chart", response_class=HTMLResponse) def chart_partial(request: Request, _: auth.AuthSession = Depends(require_user)) -> Response: """Render only the SVG chart panel.""" context = _dashboard_context(request) return templates.TemplateResponse(request, "partials/_chart.html", context) @app.get("/legislators", response_class=HTMLResponse) def legislators( request: Request, current_user: auth.AuthSession = Depends(require_user) ) -> Response: """Render the legislator profile/search page.""" context = {**_auth_context(current_user), **_legislators_context(request)} return templates.TemplateResponse(request, "legislators.html", context) @app.get("/partials/legislator-suggestions", response_class=HTMLResponse) def legislator_suggestions_partial( request: Request, _: auth.AuthSession = Depends(require_user) ) -> Response: """Render legislator search suggestions for the HTMX typeahead.""" query = request.query_params.get("q", "").strip() context: dict[str, Any] = { "q": query if len(query) >= 2 else "", "matches": [], "build_legislator_url": _build_legislator_url, } if len(query) >= 2: with session_scope() as session: context["matches"] = repository.search_legislators( session, query=query, limit=8 ) return templates.TemplateResponse( request, "partials/_legislator_suggestions.html", context ) @app.get("/compare", response_class=HTMLResponse) def compare( request: Request, current_user: auth.AuthSession = Depends(require_user) ) -> Response: """Render the legislator radar comparison page.""" context = {**_auth_context(current_user), **_compare_context(request)} return templates.TemplateResponse(request, "compare.html", context) @app.get("/admin", response_class=HTMLResponse) def admin_page( request: Request, current_user: auth.AuthSession = Depends(require_admin) ) -> Response: """Render the admin-only placeholder page.""" return templates.TemplateResponse( request, "admin.html", { **_auth_context(current_user), "organization_id": auth.get_auth_config().organization_id, }, ) def _dashboard_context(request: Request) -> dict[str, Any]: state = _parse_state(request) base_context: dict[str, Any] = { "state": state, "issues": state.issues, "selected_issue_label": " + ".join(state.issues) if state.issues else "", "chamber": state.chamber, "congress": state.congress, "latest_score_year": None, "last_updated": None, "suggestions": [], "rankings": RankingResult(supportive=[], opposed=[]), "compare": [], "chart_svg": render_score_history_svg([]), "chart_series": [], "has_votes": False, "has_scores": False, "empty_message": "", "build_url": _build_url, "build_dashboard_partial_url": _build_dashboard_partial_url, "toggle_compare": _toggle_compare, } with session_scope() as session: congress = state.congress or repository.latest_congress(session) base_context["congress"] = congress base_context["has_scores"] = repository.has_scores(session) base_context["latest_score_year"] = repository.latest_score_year(session) base_context["last_updated"] = repository.latest_vote_date(session, congress) base_context["suggestions"] = repository.issue_suggestions( session, congress=congress ) if not base_context["has_scores"]: base_context["empty_message"] = ( "No legislator scores are loaded yet. Run the score calculator first." ) return base_context if congress is None: base_context["congress"] = "Computed" if not state.issues: base_context["empty_message"] = ( "Choose one or more issue areas to calculate roll-call support scores." ) return base_context rankings = repository.get_rankings( session, issues=state.issues, chamber=state.chamber, congress=congress, ) base_context["rankings"] = rankings compare = state.compare or [row.legislator_id for row in rankings.supportive[:2]] base_context["compare"] = compare if not rankings.supportive and not rankings.opposed: base_context["empty_message"] = "No matching roll-call votes." return base_context history = repository.get_score_history( session, issues=state.issues, chamber=state.chamber, congress=congress, legislator_ids=compare, ) base_context["chart_series"] = history base_context["chart_svg"] = render_score_history_svg(history) return base_context def _parse_state(request: Request) -> DashboardState: query = request.query_params chamber = query.get("chamber", "senate").lower() if chamber not in {"house", "senate", "all"}: chamber = "senate" congress = _parse_int(query.get("congress")) compare = [ value for value in (_parse_int(raw) for raw in query.getlist("compare")) if value is not None ] return DashboardState( issues=normalize_issues(query.getlist("issues")), chamber=chamber, # type: ignore[arg-type] congress=congress, compare=compare, ) def _legislators_context(request: Request) -> dict[str, Any]: query = request.query_params.get("q", "").strip() legislator_id = _parse_int(request.query_params.get("legislator_id")) selected_topic = request.query_params.get("topic", "").strip() per_page = _parse_per_page(request.query_params.get("per_page")) page = max(_parse_int(request.query_params.get("page")) or 1, 1) base_context: dict[str, Any] = { "q": query, "profile": None, "matches": [], "result_count": 0, "page": page, "per_page": per_page, "per_page_options": [10, 25, 50], "total_pages": 1, "previous_page": None, "next_page": None, "selected_topic": selected_topic, "history_svg": render_score_history_svg([]), "history_series": [], "build_legislator_url": _build_legislator_url, "build_legislator_search_url": _build_legislator_search_url, } with session_scope() as session: result_count = repository.count_legislators(session, query=query) if query else 0 total_pages = max((result_count + per_page - 1) // per_page, 1) if page > total_pages: page = total_pages base_context["page"] = page matches = ( repository.search_legislators( session, query=query, limit=per_page, offset=(page - 1) * per_page, ) if query else [] ) profile = repository.get_legislator_profile( session, legislator_id=legislator_id, query=None ) base_context["profile"] = profile base_context["matches"] = matches base_context["result_count"] = result_count base_context["total_pages"] = total_pages base_context["previous_page"] = page - 1 if page > 1 else None base_context["next_page"] = page + 1 if page < total_pages else None if profile is None: return base_context if not selected_topic: if profile.bottom_topics: selected_topic = profile.bottom_topics[0].topic elif profile.top_topics: selected_topic = profile.top_topics[0].topic base_context["selected_topic"] = selected_topic if selected_topic: history = repository.get_single_legislator_history( session, legislator_id=profile.legislator.legislator_id, topic=selected_topic, ) base_context["history_series"] = history base_context["history_svg"] = render_score_history_svg(history) return base_context def _compare_context(request: Request) -> dict[str, Any]: selected_legislators = _parse_int_list( request.query_params.getlist("legislator_id") or request.query_params.getlist("compare") )[:4] topics = normalize_issues( request.query_params.getlist("topic") or request.query_params.getlist("issues") )[:8] query = request.query_params.get("q", "").strip() base_context: dict[str, Any] = { "selected_legislators": selected_legislators, "selected_legislator_options": [], "topics": topics, "q": query, "series": [], "radar_svg": render_compare_radar_svg([], []), "legislator_options": [], "topic_options": [], "build_compare_url": _build_compare_url, } with session_scope() as session: default_legislators, default_topics = repository.get_compare_defaults(session) if not selected_legislators and not query: selected_legislators = default_legislators[:3] if not topics: topics = default_topics[:6] selected_legislator_options = repository.get_legislator_options( session, selected_legislators ) series = repository.get_compare_radar_series( session, legislator_ids=selected_legislators, topics=topics ) base_context.update( { "selected_legislators": selected_legislators, "selected_legislator_options": selected_legislator_options, "topics": topics, "q": query, "series": series, "radar_svg": render_compare_radar_svg(topics, series), "legislator_options": repository.search_legislators( session, query=query or None, limit=12 ), "topic_options": repository.issue_suggestions( session, congress=None, limit=12 ), } ) return base_context def _parse_int(value: str | None) -> int | None: if value is None or value == "": return None try: return int(value) except ValueError: return None def _parse_int_list(values: list[str]) -> list[int]: parsed: list[int] = [] seen: set[int] = set() for value in values: integer = _parse_int(value) if integer is not None and integer not in seen: parsed.append(integer) seen.add(integer) return parsed def _parse_per_page(value: str | None) -> int: parsed = _parse_int(value) return parsed if parsed in {10, 25, 50} else 10 def _build_url( request: Request, *, issues: list[str] | None = None, chamber: str | None = None, congress: int | None = None, compare: list[int] | None = None, ) -> str: params: list[tuple[str, str]] = [] chosen_issues = ( issues if issues is not None else normalize_issues(request.query_params.getlist("issues")) ) chosen_chamber = ( chamber if chamber is not None else request.query_params.get("chamber", "senate") ) chosen_congress = ( congress if congress is not None else _parse_int(request.query_params.get("congress")) ) chosen_compare = ( compare if compare is not None else [ value for value in ( _parse_int(raw) for raw in request.query_params.getlist("compare") ) if value is not None ] ) for issue in chosen_issues: params.append(("issues", issue)) params.append(("chamber", chosen_chamber)) if chosen_congress is not None: params.append(("congress", str(chosen_congress))) for legislator_id in chosen_compare: params.append(("compare", str(legislator_id))) if not params: return "/dashboard" from urllib.parse import urlencode return f"/dashboard?{urlencode(params, doseq=True)}" def _build_dashboard_partial_url( request: Request, *, issues: list[str] | None = None, chamber: str | None = None, congress: int | None = None, compare: list[int] | None = None, ) -> str: """Return the HTMX endpoint matching the current dashboard query state.""" dashboard_url = _build_url( request, issues=issues, chamber=chamber, congress=congress, compare=compare, ) return dashboard_url.replace("/dashboard", "/partials/dashboard", 1) def _toggle_compare(compare: list[int], legislator_id: int) -> list[int]: """Return compare IDs with the legislator added or removed.""" if legislator_id in compare: return [value for value in compare if value != legislator_id] return [*compare, legislator_id] def _build_legislator_url( *, legislator_id: int | None = None, q: str | None = None, topic: str | None = None, per_page: int | None = None, ) -> str: from urllib.parse import urlencode params: list[tuple[str, str]] = [] if legislator_id is not None: params.append(("legislator_id", str(legislator_id))) if q: params.append(("q", q)) if topic: params.append(("topic", topic)) if per_page in {10, 25, 50} and per_page != 10: params.append(("per_page", str(per_page))) return f"/legislators?{urlencode(params)}" if params else "/legislators" def _build_legislator_search_url( *, q: str, per_page: int, page: int = 1, ) -> str: from urllib.parse import urlencode params: list[tuple[str, str]] = [] if q: params.append(("q", q)) params.append(("per_page", str(per_page))) if page > 1: params.append(("page", str(page))) return f"/legislators?{urlencode(params)}" if params else "/legislators" def _build_compare_url( *, legislator_ids: list[int], topics: list[str], q: str | None = None, ) -> str: from urllib.parse import urlencode params: list[tuple[str, str]] = [] for legislator_id in legislator_ids[:4]: params.append(("legislator_id", str(legislator_id))) for topic in topics[:8]: params.append(("topic", topic)) if q: params.append(("q", q)) return f"/compare?{urlencode(params, doseq=True)}" if params else "/compare" def _auth_context(current_user: auth.AuthSession | None) -> dict[str, Any]: """Shared template context for auth-aware navigation.""" return { "is_authenticated": current_user is not None, "is_admin": current_user.is_admin if current_user is not None else False, "current_user_name": current_user.display_name if current_user is not None else "", "current_user_email": current_user.email if current_user is not None else "", } def _delete_auth_cookie(response: Response) -> None: """Delete the sealed WorkOS session cookie.""" response.delete_cookie(getenv("WORKOS_SESSION_COOKIE_NAME", "workos_session"))