diff --git a/python/api/routers/views.py b/python/api/routers/views.py index 2c058b3..dc37f83 100644 --- a/python/api/routers/views.py +++ b/python/api/routers/views.py @@ -1,12 +1,13 @@ """HTMX server-rendered view router.""" from pathlib import Path +from typing import Annotated, Any from fastapi import APIRouter, Form, HTTPException, Request from fastapi.responses import HTMLResponse, RedirectResponse from fastapi.templating import Jinja2Templates from sqlalchemy import select -from sqlalchemy.orm import selectinload +from sqlalchemy.orm import Session, selectinload from python.api.dependencies import DbSession from python.orm.richie.contact import Contact, ContactRelationship, Need, RelationshipType @@ -16,11 +17,39 @@ templates = Jinja2Templates(directory=TEMPLATES_DIR) router = APIRouter(tags=["views"]) -FAMILIAL_TYPES = {"parent", "child", "sibling", "grandparent", "grandchild", "aunt_uncle", "niece_nephew", "cousin", "in_law"} +FAMILIAL_TYPES = { + "parent", + "child", + "sibling", + "grandparent", + "grandchild", + "aunt_uncle", + "niece_nephew", + "cousin", + "in_law", +} FRIEND_TYPES = {"best_friend", "close_friend", "friend", "acquaintance", "neighbor"} PARTNER_TYPES = {"spouse", "partner"} PROFESSIONAL_TYPES = {"mentor", "mentee", "business_partner", "colleague", "manager", "direct_report", "client"} +CONTACT_STRING_FIELDS = ( + "name", + "legal_name", + "suffix", + "gender", + "current_job", + "timezone", + "profile_pic", + "bio", + "goals", + "social_structure_style", + "safe_conversation_starters", + "topics_to_avoid", + "ssn", +) + +CONTACT_INT_FIELDS = ("age", "self_sufficiency_score") + def _group_relationships(relationships: list[ContactRelationship]) -> dict[str, list[ContactRelationship]]: """Group relationships by category.""" @@ -45,7 +74,7 @@ def _group_relationships(relationships: list[ContactRelationship]) -> dict[str, return groups -def _build_contact_name_map(database: DbSession, contact: Contact) -> dict[int, str]: +def _build_contact_name_map(database: Session, contact: Contact) -> dict[int, str]: """Build a mapping of contact IDs to names for relationship display.""" related_ids = {rel.related_contact_id for rel in contact.related_to} related_ids |= {rel.contact_id for rel in contact.related_from} @@ -54,9 +83,7 @@ def _build_contact_name_map(database: DbSession, contact: Contact) -> dict[int, if not related_ids: return {} - related_contacts = list( - database.scalars(select(Contact).where(Contact.id.in_(related_ids))).all() - ) + related_contacts = list(database.scalars(select(Contact).where(Contact.id.in_(related_ids))).all()) return {related.id: related.name for related in related_contacts} @@ -65,41 +92,34 @@ def _get_relationship_type_display() -> dict[str, str]: return {rel_type.value: rel_type.display_name for rel_type in RelationshipType} -def _parse_contact_form( - name: str, - legal_name: str, - suffix: str, - age: str, - gender: str, - current_job: str, - timezone: str, - profile_pic: str, - bio: str, - goals: str, - social_structure_style: str, - self_sufficiency_score: str, - safe_conversation_starters: str, - topics_to_avoid: str, - ssn: str, -) -> dict: - """Parse form fields into a dict for contact creation/update.""" - return { - "name": name, - "legal_name": legal_name or None, - "suffix": suffix or None, - "age": int(age) if age else None, - "gender": gender or None, - "current_job": current_job or None, - "timezone": timezone or None, - "profile_pic": profile_pic or None, - "bio": bio or None, - "goals": goals or None, - "social_structure_style": social_structure_style or None, - "self_sufficiency_score": int(self_sufficiency_score) if self_sufficiency_score else None, - "safe_conversation_starters": safe_conversation_starters or None, - "topics_to_avoid": topics_to_avoid or None, - "ssn": ssn or None, - } +async def _parse_contact_form(request: Request) -> dict[str, Any]: + """Parse contact form data from a multipart/form request.""" + form_data = await request.form() + result: dict[str, Any] = {} + + for field in CONTACT_STRING_FIELDS: + value = form_data.get(field, "") + result[field] = str(value) if value else None + + for field in CONTACT_INT_FIELDS: + value = form_data.get(field, "") + result[field] = int(value) if value else None + + result["need_ids"] = [int(value) for value in form_data.getlist("need_ids")] + return result + + +def _save_contact_from_form(database: Session, contact: Contact, form_result: dict[str, Any]) -> None: + """Apply parsed form data to a Contact and save associated needs.""" + need_ids = form_result.pop("need_ids") + + for key, value in form_result.items(): + setattr(contact, key, value) + + if need_ids: + contact.needs = list(database.scalars(select(Need).where(Need.id.in_(need_ids))).all()) + else: + contact.needs = [] @router.get("/", response_class=HTMLResponse) @@ -107,51 +127,22 @@ def _parse_contact_form( def contact_list_page(request: Request, database: DbSession) -> HTMLResponse: """Render the contacts list page.""" contacts = list(database.scalars(select(Contact)).all()) - return templates.TemplateResponse( - request, "contact_list.html", {"contacts": contacts} - ) + return templates.TemplateResponse(request, "contact_list.html", {"contacts": contacts}) @router.get("/contacts/new", response_class=HTMLResponse) def new_contact_page(request: Request, database: DbSession) -> HTMLResponse: """Render the new contact form page.""" all_needs = list(database.scalars(select(Need)).all()) - return templates.TemplateResponse( - request, "contact_form.html", {"contact": None, "all_needs": all_needs} - ) + return templates.TemplateResponse(request, "contact_form.html", {"contact": None, "all_needs": all_needs}) @router.post("/htmx/contacts/new") -def create_contact_form( - database: DbSession, - name: str = Form(...), - legal_name: str = Form(""), - suffix: str = Form(""), - age: str = Form(""), - gender: str = Form(""), - current_job: str = Form(""), - timezone: str = Form(""), - profile_pic: str = Form(""), - bio: str = Form(""), - goals: str = Form(""), - social_structure_style: str = Form(""), - self_sufficiency_score: str = Form(""), - safe_conversation_starters: str = Form(""), - topics_to_avoid: str = Form(""), - ssn: str = Form(""), - need_ids: list[int] = Form([]), -) -> RedirectResponse: +async def create_contact_form(request: Request, database: DbSession) -> RedirectResponse: """Handle the create contact form submission.""" - contact_data = _parse_contact_form( - name, legal_name, suffix, age, gender, current_job, timezone, - profile_pic, bio, goals, social_structure_style, self_sufficiency_score, - safe_conversation_starters, topics_to_avoid, ssn, - ) - contact = Contact(**contact_data) - - if need_ids: - needs = list(database.scalars(select(Need).where(Need.id.in_(need_ids))).all()) - contact.needs = needs + form_result = await _parse_contact_form(request) + contact = Contact() + _save_contact_from_form(database, contact, form_result) database.add(contact) database.commit() @@ -197,57 +188,23 @@ def contact_detail_page(contact_id: int, request: Request, database: DbSession) @router.get("/contacts/{contact_id}/edit", response_class=HTMLResponse) def edit_contact_page(contact_id: int, request: Request, database: DbSession) -> HTMLResponse: """Render the edit contact form page.""" - contact = database.scalar( - select(Contact) - .where(Contact.id == contact_id) - .options(selectinload(Contact.needs)) - ) + contact = database.scalar(select(Contact).where(Contact.id == contact_id).options(selectinload(Contact.needs))) if not contact: raise HTTPException(status_code=404, detail="Contact not found") all_needs = list(database.scalars(select(Need)).all()) - return templates.TemplateResponse( - request, "contact_form.html", {"contact": contact, "all_needs": all_needs} - ) + return templates.TemplateResponse(request, "contact_form.html", {"contact": contact, "all_needs": all_needs}) @router.post("/htmx/contacts/{contact_id}/edit") -def update_contact_form( - contact_id: int, - database: DbSession, - name: str = Form(...), - legal_name: str = Form(""), - suffix: str = Form(""), - age: str = Form(""), - gender: str = Form(""), - current_job: str = Form(""), - timezone: str = Form(""), - profile_pic: str = Form(""), - bio: str = Form(""), - goals: str = Form(""), - social_structure_style: str = Form(""), - self_sufficiency_score: str = Form(""), - safe_conversation_starters: str = Form(""), - topics_to_avoid: str = Form(""), - ssn: str = Form(""), - need_ids: list[int] = Form([]), -) -> RedirectResponse: +async def update_contact_form(contact_id: int, request: Request, database: DbSession) -> RedirectResponse: """Handle the edit contact form submission.""" contact = database.get(Contact, contact_id) if not contact: raise HTTPException(status_code=404, detail="Contact not found") - contact_data = _parse_contact_form( - name, legal_name, suffix, age, gender, current_job, timezone, - profile_pic, bio, goals, social_structure_style, self_sufficiency_score, - safe_conversation_starters, topics_to_avoid, ssn, - ) - - for key, value in contact_data.items(): - setattr(contact, key, value) - - needs = list(database.scalars(select(Need).where(Need.id.in_(need_ids))).all()) if need_ids else [] - contact.needs = needs + form_result = await _parse_contact_form(request) + _save_contact_from_form(database, contact, form_result) database.commit() return RedirectResponse(url=f"/contacts/{contact_id}", status_code=303) @@ -258,14 +215,10 @@ def add_need_to_contact_htmx( contact_id: int, request: Request, database: DbSession, - need_id: int = Form(...), + need_id: Annotated[int, Form()], ) -> HTMLResponse: """Add a need to a contact and return updated manage-needs partial.""" - contact = database.scalar( - select(Contact) - .where(Contact.id == contact_id) - .options(selectinload(Contact.needs)) - ) + contact = database.scalar(select(Contact).where(Contact.id == contact_id).options(selectinload(Contact.needs))) if not contact: raise HTTPException(status_code=404, detail="Contact not found") @@ -278,9 +231,7 @@ def add_need_to_contact_htmx( database.commit() database.refresh(contact) - return templates.TemplateResponse( - request, "partials/manage_needs.html", {"contact": contact} - ) + return templates.TemplateResponse(request, "partials/manage_needs.html", {"contact": contact}) @router.post("/htmx/contacts/{contact_id}/add-relationship", response_class=HTMLResponse) @@ -288,15 +239,11 @@ def add_relationship_htmx( contact_id: int, request: Request, database: DbSession, - related_contact_id: int = Form(...), - relationship_type: str = Form(...), + related_contact_id: Annotated[int, Form()], + relationship_type: Annotated[str, Form()], ) -> HTMLResponse: """Add a relationship and return updated manage-relationships partial.""" - contact = database.scalar( - select(Contact) - .where(Contact.id == contact_id) - .options(selectinload(Contact.related_to)) - ) + contact = database.scalar(select(Contact).where(Contact.id == contact_id).options(selectinload(Contact.related_to))) if not contact: raise HTTPException(status_code=404, detail="Contact not found") @@ -319,7 +266,8 @@ def add_relationship_htmx( contact_names = _build_contact_name_map(database, contact) return templates.TemplateResponse( - request, "partials/manage_relationships.html", + request, + "partials/manage_relationships.html", {"contact": contact, "contact_names": contact_names}, ) @@ -329,7 +277,7 @@ def update_relationship_weight_htmx( contact_id: int, related_contact_id: int, database: DbSession, - closeness_weight: int = Form(...), + closeness_weight: Annotated[int, Form()], ) -> HTMLResponse: """Update a relationship's closeness weight from HTMX range input.""" relationship = database.scalar( @@ -350,8 +298,8 @@ def update_relationship_weight_htmx( def create_need_htmx( request: Request, database: DbSession, - name: str = Form(...), - description: str = Form(""), + name: Annotated[str, Form()], + description: Annotated[str, Form()] = "", ) -> HTMLResponse: """Create a need via form data and return updated needs list.""" need = Need(name=name, description=description or None)