diff --git a/python/data_science/ingest_congress.py b/python/data_science/ingest_congress.py index b7e7868..281b858 100644 --- a/python/data_science/ingest_congress.py +++ b/python/data_science/ingest_congress.py @@ -1,11 +1,12 @@ """Ingestion pipeline for loading congress data from unitedstates/congress JSON files. Loads legislators, bills, votes, vote records, and bill text into the data_science_dev database. +Expects the parent directory to contain congress-tracker/ and congress-legislators/ as siblings. Usage: - ingest-congress /path/to/congress/data/ - ingest-congress /path/to/congress/data/ --congress 118 - ingest-congress /path/to/congress/data/ --congress 118 --only bills + ingest-congress /path/to/parent/ + ingest-congress /path/to/parent/ --congress 118 + ingest-congress /path/to/parent/ --congress 118 --only bills """ from __future__ import annotations @@ -16,12 +17,13 @@ from typing import TYPE_CHECKING, Annotated import orjson import typer +import yaml from sqlalchemy import select from sqlalchemy.orm import Session from python.common import configure_logger from python.orm.common import get_postgres_engine -from python.orm.data_science_dev.congress import Bill, BillText, Legislator, Vote, VoteRecord +from python.orm.data_science_dev.congress import Bill, BillText, Legislator, LegislatorSocialMedia, Vote, VoteRecord if TYPE_CHECKING: from collections.abc import Iterator @@ -37,18 +39,28 @@ app = typer.Typer(help="Ingest unitedstates/congress data into data_science_dev. 
@app.command() def main( - data_dir: Annotated[Path, typer.Argument(help="Path to the congress data/ directory")], + parent_dir: Annotated[ + Path, + typer.Argument(help="Parent directory containing congress-tracker/ and congress-legislators/"), + ], congress: Annotated[int | None, typer.Option(help="Only ingest a specific congress number")] = None, only: Annotated[ str | None, - typer.Option(help="Only run a specific step: legislators, bills, votes, bill-text"), + typer.Option(help="Only run a specific step: legislators, legislators-social-media, bills, votes, bill-text"), ] = None, ) -> None: """Ingest congress data from unitedstates/congress JSON files.""" configure_logger(level="INFO") + data_dir = parent_dir / "congress-tracker/congress/data/" + legislators_dir = parent_dir / "congress-legislators" + if not data_dir.is_dir(): - typer.echo(f"Data directory does not exist: {data_dir}", err=True) + typer.echo(f"Expected congress-tracker/ directory: {data_dir}", err=True) + raise typer.Exit(code=1) + + if not legislators_dir.is_dir(): + typer.echo(f"Expected congress-legislators/ directory: {legislators_dir}", err=True) raise typer.Exit(code=1) engine = get_postgres_engine(name="DATA_SCIENCE_DEV") @@ -60,11 +72,12 @@ def main( logger.info("Found %d congress directories to process", len(congress_dirs)) - steps = { - "legislators": ingest_legislators, - "bills": ingest_bills, - "votes": ingest_votes, - "bill-text": ingest_bill_text, + steps: dict[str, tuple] = { + "legislators": (ingest_legislators, (engine, legislators_dir)), + "legislators-social-media": (ingest_social_media, (engine, legislators_dir)), + "bills": (ingest_bills, (engine, congress_dirs)), + "votes": (ingest_votes, (engine, congress_dirs)), + "bill-text": (ingest_bill_text, (engine, congress_dirs)), } if only: @@ -73,9 +86,9 @@ def main( raise typer.Exit(code=1) steps = {only: steps[only]} - for step_name, (step_func, step_args) in steps.items(): + logger.info("=== 
Starting step: %s ===", step_name) - step_func(engine, congress_dirs) + step_func(*step_args) logger.info("=== Finished step: %s ===", step_name) logger.info("ingest-congress done") @@ -102,33 +115,166 @@ def _flush_batch(session: Session, batch: list[object], label: str) -> int: # --------------------------------------------------------------------------- -# Legislators — extracted from vote JSON files (voter records include bioguide_id, name, party, state) +# Legislators — loaded from congress-legislators YAML files # --------------------------------------------------------------------------- -def ingest_legislators(engine: Engine, congress_dirs: list[Path]) -> None: - """Extract unique legislators from vote data and insert them.""" +def ingest_legislators(engine: Engine, legislators_dir: Path) -> None: + """Load legislators from congress-legislators YAML files.""" + legislators_data = _load_legislators_yaml(legislators_dir) + logger.info("Loaded %d legislators from YAML files", len(legislators_data)) + with Session(engine) as session: - seen_bioguide_ids = set(session.scalars(select(Legislator.bioguide_id)).all()) - logger.info("Found %d existing legislators in DB", len(seen_bioguide_ids)) + existing_legislators = { + legislator.bioguide_id: legislator for legislator in session.scalars(select(Legislator)).all() + } + logger.info("Found %d existing legislators in DB", len(existing_legislators)) total_inserted = 0 - batch: list[Legislator] = [] - for congress_dir in congress_dirs: - votes_dir = congress_dir / "votes" - if not votes_dir.is_dir(): + total_updated = 0 + for entry in legislators_data: + bioguide_id = entry.get("id", {}).get("bioguide") + if not bioguide_id: continue - logger.info("Scanning legislators from %s", congress_dir.name) - for vote_file in votes_dir.rglob("data.json"): - data = _read_json(vote_file) - if data is None: - continue - _extract_legislators(data, seen_bioguide_ids, batch) - if len(batch) >= BATCH_SIZE: - total_inserted += 
_flush_batch(session, batch, "legislators") - total_inserted += _flush_batch(session, batch, "legislators") - logger.info("Inserted %d new legislators total", total_inserted) + fields = _parse_legislator(entry) + if existing := existing_legislators.get(bioguide_id): + changed = False + for field, value in fields.items(): + if value is not None and getattr(existing, field) != value: + setattr(existing, field, value) + changed = True + if changed: + total_updated += 1 + else: + session.add(Legislator(bioguide_id=bioguide_id, **fields)) + total_inserted += 1 + + session.commit() + logger.info("Inserted %d new legislators, updated %d existing", total_inserted, total_updated) + + +def _load_legislators_yaml(legislators_dir: Path) -> list[dict]: + """Load and combine legislators-current.yaml and legislators-historical.yaml.""" + legislators: list[dict] = [] + for filename in ("legislators-current.yaml", "legislators-historical.yaml"): + path = legislators_dir / filename + if not path.exists(): + logger.warning("Legislators file not found: %s", path) + continue + with path.open() as file: + data = yaml.safe_load(file) + if isinstance(data, list): + legislators.extend(data) + return legislators + + +def _parse_legislator(entry: dict) -> dict: + """Extract Legislator fields from a congress-legislators YAML entry.""" + ids = entry.get("id", {}) + name = entry.get("name", {}) + bio = entry.get("bio", {}) + terms = entry.get("terms", []) + latest_term = terms[-1] if terms else {} + + fec_ids = ids.get("fec") + fec_ids_joined = ",".join(fec_ids) if isinstance(fec_ids, list) else fec_ids + + chamber = latest_term.get("type") + chamber_normalized = {"rep": "House", "sen": "Senate"}.get(chamber, chamber) + + return { + "thomas_id": ids.get("thomas"), + "lis_id": ids.get("lis"), + "govtrack_id": ids.get("govtrack"), + "opensecrets_id": ids.get("opensecrets"), + "fec_ids": fec_ids_joined, + "first_name": name.get("first"), + "last_name": name.get("last"), + "official_full_name": 
name.get("official_full"), + "nickname": name.get("nickname"), + "birthday": bio.get("birthday"), + "gender": bio.get("gender"), + "current_party": latest_term.get("party"), + "current_state": latest_term.get("state"), + "current_district": latest_term.get("district"), + "current_chamber": chamber_normalized, + } + + +# --------------------------------------------------------------------------- +# Social Media — loaded from legislators-social-media.yaml +# --------------------------------------------------------------------------- + +SOCIAL_MEDIA_PLATFORMS = { + "twitter": "https://twitter.com/{account}", + "facebook": "https://facebook.com/{account}", + "youtube": "https://youtube.com/{account}", + "instagram": "https://instagram.com/{account}", + "mastodon": None, +} + + +def ingest_social_media(engine: Engine, legislators_dir: Path) -> None: + """Load social media accounts from legislators-social-media.yaml.""" + social_media_path = legislators_dir / "legislators-social-media.yaml" + if not social_media_path.exists(): + logger.warning("Social media file not found: %s", social_media_path) + return + + with social_media_path.open() as file: + social_media_data = yaml.safe_load(file) + + if not isinstance(social_media_data, list): + logger.warning("Unexpected format in %s", social_media_path) + return + + logger.info("Loaded %d entries from legislators-social-media.yaml", len(social_media_data)) + + with Session(engine) as session: + legislator_map = _build_legislator_map(session) + existing_accounts = { + (account.legislator_id, account.platform) + for account in session.scalars(select(LegislatorSocialMedia)).all() + } + logger.info("Found %d existing social media accounts in DB", len(existing_accounts)) + + total_inserted = 0 + total_updated = 0 + for entry in social_media_data: + bioguide_id = entry.get("id", {}).get("bioguide") + if not bioguide_id: + continue + + legislator_id = legislator_map.get(bioguide_id) + if legislator_id is None: + continue + + social 
= entry.get("social", {}) + for platform, url_template in SOCIAL_MEDIA_PLATFORMS.items(): + account_name = social.get(platform) + if not account_name: + continue + + url = url_template.format(account=account_name) if url_template else None + + if (legislator_id, platform) in existing_accounts: + total_updated += 1 + else: + session.add( + LegislatorSocialMedia( + legislator_id=legislator_id, + platform=platform, + account_name=str(account_name), + url=url, + source="https://github.com/unitedstates/congress-legislators", + ) + ) + existing_accounts.add((legislator_id, platform)) + total_inserted += 1 + + session.commit() + logger.info("Inserted %d new social media accounts, updated %d existing", total_inserted, total_updated) def _iter_voters(position_group: object) -> Iterator[dict]: @@ -141,37 +287,6 @@ def _iter_voters(position_group: object) -> Iterator[dict]: yield voter -def _extract_legislators(data: dict, seen_bioguide_ids: set[str], batch: list[Legislator]) -> None: - """Extract unique legislators from a single vote data.json.""" - for position_group in data.get("votes", {}).values(): - for voter in _iter_voters(position_group): - bioguide_id = voter.get("id") - if not bioguide_id or bioguide_id in seen_bioguide_ids: - continue - seen_bioguide_ids.add(bioguide_id) - first_name, last_name = _split_name(voter.get("display_name", "")) - batch.append( - Legislator( - bioguide_id=bioguide_id, - first_name=first_name, - last_name=last_name, - current_party=voter.get("party"), - current_state=voter.get("state"), - ) - ) - - -def _split_name(display_name: str) -> tuple[str, str]: - """Split 'Last, First' or 'Name' into (first, last).""" - if "," in display_name: - parts = display_name.split(",", 1) - return parts[1].strip(), parts[0].strip() - parts = display_name.rsplit(" ", 1) - if len(parts) > 1: - return parts[0].strip(), parts[1].strip() - return display_name, "" - - # --------------------------------------------------------------------------- # Bills # 
---------------------------------------------------------------------------