adding website
This commit is contained in:
@@ -0,0 +1,589 @@
|
||||
"""FastAPI app for the HTMX legislative dashboard."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from contextlib import asynccontextmanager
|
||||
from dataclasses import dataclass
|
||||
import hashlib
|
||||
import hmac
|
||||
from os import getenv
|
||||
from pathlib import Path
|
||||
import secrets
|
||||
from typing import Any
|
||||
from urllib.parse import parse_qs
|
||||
|
||||
from fastapi import Depends, FastAPI, HTTPException, Request, Response, status
|
||||
from fastapi.responses import HTMLResponse, PlainTextResponse, RedirectResponse
|
||||
from fastapi.staticfiles import StaticFiles
|
||||
from fastapi.templating import Jinja2Templates
|
||||
|
||||
from pipelines.web import repository
|
||||
from pipelines.web.db import session_scope, validate_database_connection
|
||||
from pipelines.web.repository import Chamber, RankingResult
|
||||
from pipelines.web.scoring import normalize_issues
|
||||
from pipelines.web.svg import render_compare_radar_svg, render_score_history_svg
|
||||
|
||||
BASE_DIR = Path(__file__).resolve().parent
|
||||
TEMPLATES_DIR = BASE_DIR / "templates"
|
||||
STATIC_DIR = BASE_DIR / "static"
|
||||
|
||||
templates = Jinja2Templates(directory=TEMPLATES_DIR)
|
||||
ADMIN_USERNAME = "admin"
|
||||
ADMIN_PASSWORD = "admin"
|
||||
SESSION_COOKIE = "nornsight_admin"
|
||||
SESSION_SECRET = "nornsight-local-dev-session-secret"
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
async def lifespan(_: FastAPI):
|
||||
"""Validate database access when the CLI starts the web server."""
|
||||
if getenv("PYTEST_CURRENT_TEST") is None:
|
||||
validate_database_connection()
|
||||
yield
|
||||
|
||||
|
||||
app = FastAPI(title="Nornsight Legislative Dashboard", lifespan=lifespan)
|
||||
app.mount("/static", StaticFiles(directory=STATIC_DIR), name="static")
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class DashboardState:
|
||||
"""Dashboard query-string state."""
|
||||
|
||||
issues: list[str]
|
||||
chamber: Chamber
|
||||
congress: int | None
|
||||
compare: list[int]
|
||||
|
||||
|
||||
@app.get("/healthz", response_class=PlainTextResponse)
|
||||
def healthz() -> str:
|
||||
"""Return a simple liveness response."""
|
||||
return "ok"
|
||||
|
||||
|
||||
@app.get("/login", response_class=HTMLResponse)
|
||||
def login(request: Request) -> Response:
|
||||
"""Render the integrated login page."""
|
||||
next_path = _safe_next_path(request.query_params.get("next"))
|
||||
if _authenticated_user(request) is not None:
|
||||
return RedirectResponse(next_path, status_code=status.HTTP_303_SEE_OTHER)
|
||||
return templates.TemplateResponse(
|
||||
request,
|
||||
"login.html",
|
||||
{
|
||||
"error": "",
|
||||
"is_authenticated": False,
|
||||
"show_primary_nav": False,
|
||||
"next_path": next_path,
|
||||
"username": "",
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
@app.post("/login", response_class=HTMLResponse)
|
||||
async def login_submit(request: Request) -> Response:
|
||||
"""Authenticate the hard-coded admin user and set a session cookie."""
|
||||
form = parse_qs((await request.body()).decode())
|
||||
username = form.get("username", [""])[0]
|
||||
password = form.get("password", [""])[0]
|
||||
next_path = _safe_next_path(form.get("next", [request.query_params.get("next")])[0])
|
||||
|
||||
username_ok = secrets.compare_digest(username, ADMIN_USERNAME)
|
||||
password_ok = secrets.compare_digest(password, ADMIN_PASSWORD)
|
||||
if not (username_ok and password_ok):
|
||||
return templates.TemplateResponse(
|
||||
request,
|
||||
"login.html",
|
||||
{
|
||||
"error": "Invalid username or password.",
|
||||
"is_authenticated": False,
|
||||
"show_primary_nav": False,
|
||||
"next_path": next_path,
|
||||
"username": username,
|
||||
},
|
||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||
)
|
||||
|
||||
response = RedirectResponse(next_path, status_code=status.HTTP_303_SEE_OTHER)
|
||||
response.set_cookie(
|
||||
SESSION_COOKIE,
|
||||
_sign_session(username),
|
||||
httponly=True,
|
||||
samesite="lax",
|
||||
)
|
||||
return response
|
||||
|
||||
|
||||
@app.get("/logout")
|
||||
def logout() -> Response:
|
||||
"""Clear the local admin session."""
|
||||
response = RedirectResponse("/login", status_code=status.HTTP_303_SEE_OTHER)
|
||||
response.delete_cookie(SESSION_COOKIE)
|
||||
return response
|
||||
|
||||
|
||||
def require_admin(request: Request) -> str:
|
||||
"""Redirect unauthenticated users to the in-site login page."""
|
||||
username = _authenticated_user(request)
|
||||
if username is not None:
|
||||
return username
|
||||
next_path = request.url.path
|
||||
if request.url.query:
|
||||
next_path = f"{next_path}?{request.url.query}"
|
||||
login_url = request.url_for("login").include_query_params(next=next_path)
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_303_SEE_OTHER,
|
||||
headers={"Location": str(login_url)},
|
||||
)
|
||||
|
||||
|
||||
def _authenticated_user(request: Request) -> str | None:
|
||||
token = request.cookies.get(SESSION_COOKIE)
|
||||
if token is None:
|
||||
return None
|
||||
try:
|
||||
username, signature = token.split(":", 1)
|
||||
except ValueError:
|
||||
return None
|
||||
if username != ADMIN_USERNAME:
|
||||
return None
|
||||
expected = _session_signature(username)
|
||||
if secrets.compare_digest(signature, expected):
|
||||
return username
|
||||
return None
|
||||
|
||||
|
||||
def _sign_session(username: str) -> str:
|
||||
return f"{username}:{_session_signature(username)}"
|
||||
|
||||
|
||||
def _session_signature(username: str) -> str:
|
||||
return hmac.new(
|
||||
SESSION_SECRET.encode(),
|
||||
username.encode(),
|
||||
hashlib.sha256,
|
||||
).hexdigest()
|
||||
|
||||
|
||||
def _safe_next_path(value: str | None) -> str:
|
||||
if value and value.startswith("/") and not value.startswith("//"):
|
||||
return value
|
||||
return "/"
|
||||
|
||||
|
||||
@app.get("/", response_class=HTMLResponse)
|
||||
def dashboard(request: Request, _: str = Depends(require_admin)) -> Response:
|
||||
"""Render the full dashboard page."""
|
||||
context = _dashboard_context(request)
|
||||
if request.headers.get("hx-request") == "true":
|
||||
return templates.TemplateResponse(request, "partials/_dashboard.html", context)
|
||||
return templates.TemplateResponse(request, "dashboard.html", context)
|
||||
|
||||
|
||||
@app.get("/partials/dashboard", response_class=HTMLResponse)
|
||||
def dashboard_partial(request: Request, _: str = Depends(require_admin)) -> Response:
|
||||
"""Render the filter-dependent dashboard body."""
|
||||
context = _dashboard_context(request)
|
||||
return templates.TemplateResponse(request, "partials/_dashboard.html", context)
|
||||
|
||||
|
||||
@app.get("/partials/issues", response_class=HTMLResponse)
|
||||
def issues_partial(request: Request, _: str = Depends(require_admin)) -> Response:
|
||||
"""Render only issue filters."""
|
||||
context = _dashboard_context(request)
|
||||
return templates.TemplateResponse(request, "partials/_issue_filters.html", context)
|
||||
|
||||
|
||||
@app.get("/partials/rankings", response_class=HTMLResponse)
|
||||
def rankings_partial(request: Request, _: str = Depends(require_admin)) -> Response:
|
||||
"""Render only ranking panels."""
|
||||
context = _dashboard_context(request)
|
||||
return templates.TemplateResponse(request, "partials/_rankings.html", context)
|
||||
|
||||
|
||||
@app.get("/partials/chart", response_class=HTMLResponse)
|
||||
def chart_partial(request: Request, _: str = Depends(require_admin)) -> Response:
|
||||
"""Render only the SVG chart panel."""
|
||||
context = _dashboard_context(request)
|
||||
return templates.TemplateResponse(request, "partials/_chart.html", context)
|
||||
|
||||
|
||||
@app.get("/legislators", response_class=HTMLResponse)
|
||||
def legislators(request: Request, _: str = Depends(require_admin)) -> Response:
|
||||
"""Render the legislator profile/search page."""
|
||||
context = _legislators_context(request)
|
||||
return templates.TemplateResponse(request, "legislators.html", context)
|
||||
|
||||
|
||||
@app.get("/partials/legislator-suggestions", response_class=HTMLResponse)
|
||||
def legislator_suggestions_partial(
|
||||
request: Request, _: str = Depends(require_admin)
|
||||
) -> Response:
|
||||
"""Render legislator search suggestions for the HTMX typeahead."""
|
||||
query = request.query_params.get("q", "").strip()
|
||||
context: dict[str, Any] = {
|
||||
"q": query if len(query) >= 2 else "",
|
||||
"matches": [],
|
||||
"build_legislator_url": _build_legislator_url,
|
||||
}
|
||||
if len(query) >= 2:
|
||||
with session_scope() as session:
|
||||
context["matches"] = repository.search_legislators(
|
||||
session, query=query, limit=8
|
||||
)
|
||||
return templates.TemplateResponse(
|
||||
request, "partials/_legislator_suggestions.html", context
|
||||
)
|
||||
|
||||
|
||||
@app.get("/compare", response_class=HTMLResponse)
|
||||
def compare(request: Request, _: str = Depends(require_admin)) -> Response:
|
||||
"""Render the legislator radar comparison page."""
|
||||
context = _compare_context(request)
|
||||
return templates.TemplateResponse(request, "compare.html", context)
|
||||
|
||||
|
||||
def _dashboard_context(request: Request) -> dict[str, Any]:
|
||||
state = _parse_state(request)
|
||||
base_context: dict[str, Any] = {
|
||||
"state": state,
|
||||
"issues": state.issues,
|
||||
"selected_issue_label": " + ".join(state.issues) if state.issues else "",
|
||||
"chamber": state.chamber,
|
||||
"congress": state.congress,
|
||||
"latest_score_year": None,
|
||||
"last_updated": None,
|
||||
"suggestions": [],
|
||||
"rankings": RankingResult(supportive=[], opposed=[]),
|
||||
"compare": [],
|
||||
"chart_svg": render_score_history_svg([]),
|
||||
"chart_series": [],
|
||||
"has_votes": False,
|
||||
"has_scores": False,
|
||||
"empty_message": "",
|
||||
"build_url": _build_url,
|
||||
"toggle_compare": _toggle_compare,
|
||||
}
|
||||
with session_scope() as session:
|
||||
congress = state.congress or repository.latest_congress(session)
|
||||
base_context["congress"] = congress
|
||||
base_context["has_scores"] = repository.has_scores(session)
|
||||
base_context["latest_score_year"] = repository.latest_score_year(session)
|
||||
base_context["last_updated"] = repository.latest_vote_date(session, congress)
|
||||
base_context["suggestions"] = repository.issue_suggestions(
|
||||
session, congress=congress
|
||||
)
|
||||
|
||||
if not base_context["has_scores"]:
|
||||
base_context["empty_message"] = (
|
||||
"No legislator scores are loaded yet. Run the score calculator first."
|
||||
)
|
||||
return base_context
|
||||
|
||||
if congress is None:
|
||||
base_context["congress"] = "Computed"
|
||||
|
||||
if not state.issues:
|
||||
base_context["empty_message"] = (
|
||||
"Choose one or more issue areas to calculate roll-call support scores."
|
||||
)
|
||||
return base_context
|
||||
|
||||
rankings = repository.get_rankings(
|
||||
session,
|
||||
issues=state.issues,
|
||||
chamber=state.chamber,
|
||||
congress=congress,
|
||||
)
|
||||
base_context["rankings"] = rankings
|
||||
compare = state.compare or [row.legislator_id for row in rankings.supportive[:2]]
|
||||
base_context["compare"] = compare
|
||||
if not rankings.supportive and not rankings.opposed:
|
||||
base_context["empty_message"] = "No matching roll-call votes."
|
||||
return base_context
|
||||
|
||||
history = repository.get_score_history(
|
||||
session,
|
||||
issues=state.issues,
|
||||
chamber=state.chamber,
|
||||
congress=congress,
|
||||
legislator_ids=compare,
|
||||
)
|
||||
base_context["chart_series"] = history
|
||||
base_context["chart_svg"] = render_score_history_svg(history)
|
||||
return base_context
|
||||
|
||||
|
||||
def _parse_state(request: Request) -> DashboardState:
|
||||
query = request.query_params
|
||||
chamber = query.get("chamber", "senate").lower()
|
||||
if chamber not in {"house", "senate", "all"}:
|
||||
chamber = "senate"
|
||||
congress = _parse_int(query.get("congress"))
|
||||
compare = [
|
||||
value
|
||||
for value in (_parse_int(raw) for raw in query.getlist("compare"))
|
||||
if value is not None
|
||||
]
|
||||
return DashboardState(
|
||||
issues=normalize_issues(query.getlist("issues")),
|
||||
chamber=chamber, # type: ignore[arg-type]
|
||||
congress=congress,
|
||||
compare=compare,
|
||||
)
|
||||
|
||||
|
||||
def _legislators_context(request: Request) -> dict[str, Any]:
|
||||
query = request.query_params.get("q", "").strip()
|
||||
legislator_id = _parse_int(request.query_params.get("legislator_id"))
|
||||
selected_topic = request.query_params.get("topic", "").strip()
|
||||
per_page = _parse_per_page(request.query_params.get("per_page"))
|
||||
page = max(_parse_int(request.query_params.get("page")) or 1, 1)
|
||||
base_context: dict[str, Any] = {
|
||||
"q": query,
|
||||
"profile": None,
|
||||
"matches": [],
|
||||
"result_count": 0,
|
||||
"page": page,
|
||||
"per_page": per_page,
|
||||
"per_page_options": [10, 25, 50],
|
||||
"total_pages": 1,
|
||||
"previous_page": None,
|
||||
"next_page": None,
|
||||
"selected_topic": selected_topic,
|
||||
"history_svg": render_score_history_svg([]),
|
||||
"history_series": [],
|
||||
"build_legislator_url": _build_legislator_url,
|
||||
"build_legislator_search_url": _build_legislator_search_url,
|
||||
}
|
||||
with session_scope() as session:
|
||||
result_count = repository.count_legislators(session, query=query) if query else 0
|
||||
total_pages = max((result_count + per_page - 1) // per_page, 1)
|
||||
if page > total_pages:
|
||||
page = total_pages
|
||||
base_context["page"] = page
|
||||
matches = (
|
||||
repository.search_legislators(
|
||||
session,
|
||||
query=query,
|
||||
limit=per_page,
|
||||
offset=(page - 1) * per_page,
|
||||
)
|
||||
if query
|
||||
else []
|
||||
)
|
||||
profile = repository.get_legislator_profile(
|
||||
session, legislator_id=legislator_id, query=None
|
||||
)
|
||||
base_context["profile"] = profile
|
||||
base_context["matches"] = matches
|
||||
base_context["result_count"] = result_count
|
||||
base_context["total_pages"] = total_pages
|
||||
base_context["previous_page"] = page - 1 if page > 1 else None
|
||||
base_context["next_page"] = page + 1 if page < total_pages else None
|
||||
if profile is None:
|
||||
return base_context
|
||||
if not selected_topic:
|
||||
if profile.bottom_topics:
|
||||
selected_topic = profile.bottom_topics[0].topic
|
||||
elif profile.top_topics:
|
||||
selected_topic = profile.top_topics[0].topic
|
||||
base_context["selected_topic"] = selected_topic
|
||||
if selected_topic:
|
||||
history = repository.get_single_legislator_history(
|
||||
session,
|
||||
legislator_id=profile.legislator.legislator_id,
|
||||
topic=selected_topic,
|
||||
)
|
||||
base_context["history_series"] = history
|
||||
base_context["history_svg"] = render_score_history_svg(history)
|
||||
return base_context
|
||||
|
||||
|
||||
def _compare_context(request: Request) -> dict[str, Any]:
|
||||
selected_legislators = _parse_int_list(
|
||||
request.query_params.getlist("legislator_id")
|
||||
or request.query_params.getlist("compare")
|
||||
)[:4]
|
||||
topics = normalize_issues(
|
||||
request.query_params.getlist("topic") or request.query_params.getlist("issues")
|
||||
)[:8]
|
||||
query = request.query_params.get("q", "").strip()
|
||||
base_context: dict[str, Any] = {
|
||||
"selected_legislators": selected_legislators,
|
||||
"selected_legislator_options": [],
|
||||
"topics": topics,
|
||||
"q": query,
|
||||
"series": [],
|
||||
"radar_svg": render_compare_radar_svg([], []),
|
||||
"legislator_options": [],
|
||||
"topic_options": [],
|
||||
"build_compare_url": _build_compare_url,
|
||||
}
|
||||
with session_scope() as session:
|
||||
default_legislators, default_topics = repository.get_compare_defaults(session)
|
||||
if not selected_legislators and not query:
|
||||
selected_legislators = default_legislators[:3]
|
||||
if not topics:
|
||||
topics = default_topics[:6]
|
||||
selected_legislator_options = repository.get_legislator_options(
|
||||
session, selected_legislators
|
||||
)
|
||||
series = repository.get_compare_radar_series(
|
||||
session, legislator_ids=selected_legislators, topics=topics
|
||||
)
|
||||
base_context.update(
|
||||
{
|
||||
"selected_legislators": selected_legislators,
|
||||
"selected_legislator_options": selected_legislator_options,
|
||||
"topics": topics,
|
||||
"q": query,
|
||||
"series": series,
|
||||
"radar_svg": render_compare_radar_svg(topics, series),
|
||||
"legislator_options": repository.search_legislators(
|
||||
session, query=query or None, limit=12
|
||||
),
|
||||
"topic_options": repository.issue_suggestions(
|
||||
session, congress=None, limit=12
|
||||
),
|
||||
}
|
||||
)
|
||||
return base_context
|
||||
|
||||
|
||||
def _parse_int(value: str | None) -> int | None:
|
||||
if value is None or value == "":
|
||||
return None
|
||||
try:
|
||||
return int(value)
|
||||
except ValueError:
|
||||
return None
|
||||
|
||||
|
||||
def _parse_int_list(values: list[str]) -> list[int]:
|
||||
parsed: list[int] = []
|
||||
seen: set[int] = set()
|
||||
for value in values:
|
||||
integer = _parse_int(value)
|
||||
if integer is not None and integer not in seen:
|
||||
parsed.append(integer)
|
||||
seen.add(integer)
|
||||
return parsed
|
||||
|
||||
|
||||
def _parse_per_page(value: str | None) -> int:
|
||||
parsed = _parse_int(value)
|
||||
return parsed if parsed in {10, 25, 50} else 10
|
||||
|
||||
|
||||
def _build_url(
|
||||
request: Request,
|
||||
*,
|
||||
issues: list[str] | None = None,
|
||||
chamber: str | None = None,
|
||||
congress: int | None = None,
|
||||
compare: list[int] | None = None,
|
||||
) -> str:
|
||||
params: list[tuple[str, str]] = []
|
||||
chosen_issues = (
|
||||
issues
|
||||
if issues is not None
|
||||
else normalize_issues(request.query_params.getlist("issues"))
|
||||
)
|
||||
chosen_chamber = (
|
||||
chamber
|
||||
if chamber is not None
|
||||
else request.query_params.get("chamber", "senate")
|
||||
)
|
||||
chosen_congress = (
|
||||
congress
|
||||
if congress is not None
|
||||
else _parse_int(request.query_params.get("congress"))
|
||||
)
|
||||
chosen_compare = (
|
||||
compare
|
||||
if compare is not None
|
||||
else [
|
||||
value
|
||||
for value in (
|
||||
_parse_int(raw) for raw in request.query_params.getlist("compare")
|
||||
)
|
||||
if value is not None
|
||||
]
|
||||
)
|
||||
for issue in chosen_issues:
|
||||
params.append(("issues", issue))
|
||||
params.append(("chamber", chosen_chamber))
|
||||
if chosen_congress is not None:
|
||||
params.append(("congress", str(chosen_congress)))
|
||||
for legislator_id in chosen_compare:
|
||||
params.append(("compare", str(legislator_id)))
|
||||
if not params:
|
||||
return "/"
|
||||
from urllib.parse import urlencode
|
||||
|
||||
return f"/?{urlencode(params, doseq=True)}"
|
||||
|
||||
|
||||
def _toggle_compare(compare: list[int], legislator_id: int) -> list[int]:
|
||||
"""Return compare IDs with the legislator added or removed."""
|
||||
if legislator_id in compare:
|
||||
return [value for value in compare if value != legislator_id]
|
||||
return [*compare, legislator_id]
|
||||
|
||||
|
||||
def _build_legislator_url(
|
||||
*,
|
||||
legislator_id: int | None = None,
|
||||
q: str | None = None,
|
||||
topic: str | None = None,
|
||||
per_page: int | None = None,
|
||||
) -> str:
|
||||
from urllib.parse import urlencode
|
||||
|
||||
params: list[tuple[str, str]] = []
|
||||
if legislator_id is not None:
|
||||
params.append(("legislator_id", str(legislator_id)))
|
||||
if q:
|
||||
params.append(("q", q))
|
||||
if topic:
|
||||
params.append(("topic", topic))
|
||||
if per_page in {10, 25, 50} and per_page != 10:
|
||||
params.append(("per_page", str(per_page)))
|
||||
return f"/legislators?{urlencode(params)}" if params else "/legislators"
|
||||
|
||||
|
||||
def _build_legislator_search_url(
|
||||
*,
|
||||
q: str,
|
||||
per_page: int,
|
||||
page: int = 1,
|
||||
) -> str:
|
||||
from urllib.parse import urlencode
|
||||
|
||||
params: list[tuple[str, str]] = []
|
||||
if q:
|
||||
params.append(("q", q))
|
||||
params.append(("per_page", str(per_page)))
|
||||
if page > 1:
|
||||
params.append(("page", str(page)))
|
||||
return f"/legislators?{urlencode(params)}" if params else "/legislators"
|
||||
|
||||
|
||||
def _build_compare_url(
|
||||
*,
|
||||
legislator_ids: list[int],
|
||||
topics: list[str],
|
||||
q: str | None = None,
|
||||
) -> str:
|
||||
from urllib.parse import urlencode
|
||||
|
||||
params: list[tuple[str, str]] = []
|
||||
for legislator_id in legislator_ids[:4]:
|
||||
params.append(("legislator_id", str(legislator_id)))
|
||||
for topic in topics[:8]:
|
||||
params.append(("topic", topic))
|
||||
if q:
|
||||
params.append(("q", q))
|
||||
return f"/compare?{urlencode(params, doseq=True)}" if params else "/compare"
|
||||
Reference in New Issue
Block a user