From 2abd61d3b11c30b3941ff96b35803f0c151e592b Mon Sep 17 00:00:00 2001 From: Richie Cahill Date: Tue, 14 Apr 2026 18:17:35 -0400 Subject: [PATCH] added orm code --- pipelines/orm/__init__.py | 7 ++ pipelines/orm/common.py | 51 +++++++++++ pipelines/orm/data_science_dev/__init__.py | 15 ++++ pipelines/orm/data_science_dev/base.py | 52 ++++++++++++ .../orm/data_science_dev/congress/__init__.py | 17 ++++ .../orm/data_science_dev/congress/bill.py | 72 ++++++++++++++++ .../data_science_dev/congress/legislator.py | 68 +++++++++++++++ .../orm/data_science_dev/congress/vote.py | 84 +++++++++++++++++++ pipelines/orm/data_science_dev/models.py | 16 ++++ .../orm/data_science_dev/posts/__init__.py | 11 +++ .../orm/data_science_dev/posts/columns.py | 33 ++++++++ .../posts/failed_ingestion.py | 17 ++++ .../orm/data_science_dev/posts/partitions.py | 71 ++++++++++++++++ .../orm/data_science_dev/posts/tables.py | 13 +++ 14 files changed, 527 insertions(+) create mode 100644 pipelines/orm/__init__.py create mode 100644 pipelines/orm/common.py create mode 100644 pipelines/orm/data_science_dev/__init__.py create mode 100644 pipelines/orm/data_science_dev/base.py create mode 100644 pipelines/orm/data_science_dev/congress/__init__.py create mode 100644 pipelines/orm/data_science_dev/congress/bill.py create mode 100644 pipelines/orm/data_science_dev/congress/legislator.py create mode 100644 pipelines/orm/data_science_dev/congress/vote.py create mode 100644 pipelines/orm/data_science_dev/models.py create mode 100644 pipelines/orm/data_science_dev/posts/__init__.py create mode 100644 pipelines/orm/data_science_dev/posts/columns.py create mode 100644 pipelines/orm/data_science_dev/posts/failed_ingestion.py create mode 100644 pipelines/orm/data_science_dev/posts/partitions.py create mode 100644 pipelines/orm/data_science_dev/posts/tables.py diff --git a/pipelines/orm/__init__.py b/pipelines/orm/__init__.py new file mode 100644 index 0000000..a64f8eb --- /dev/null +++ 
"""Reconstruction of two patch files: pipelines/orm/__init__.py and pipelines/orm/common.py."""

from __future__ import annotations

# ==== pipelines/orm/__init__.py ==========================================
# "ORM package exports."

from pipelines.orm.data_science_dev.base import DataScienceDevBase

__all__ = [
    "DataScienceDevBase",
]

# ==== pipelines/orm/common.py ============================================
# "Shared ORM definitions."

from os import getenv
from typing import cast

from sqlalchemy import create_engine
from sqlalchemy.engine import URL, Engine

# Deterministic constraint/index naming so generated DDL (and Alembic
# autogenerate diffs) stay stable across runs.
NAMING_CONVENTION = {
    "ix": "ix_%(table_name)s_%(column_0_name)s",
    "uq": "uq_%(table_name)s_%(column_0_name)s",
    "ck": "ck_%(table_name)s_%(constraint_name)s",
    "fk": "fk_%(table_name)s_%(column_0_name)s_%(referred_table_name)s",
    "pk": "pk_%(table_name)s",
}


def get_connection_info(name: str) -> tuple[str, str, str, str, str | None]:
    """Read Postgres connection settings from ``{name}_*`` environment variables.

    Returns ``(database, host, port, username, password)``. The password may
    be ``None``; every other field is required and a ``ValueError`` (listing
    the observed values) is raised when any of them is unset.
    """
    database, host, port, username, password = (
        getenv(f"{name}_{suffix}")
        for suffix in ("DB", "HOST", "PORT", "USER", "PASSWORD")
    )

    if None in (database, host, port, username):
        error = f"Missing environment variables for Postgres connection.\n{database=}\n{host=}\n{port=}\n{username=}\n"
        raise ValueError(error)
    return cast("tuple[str, str, str, str, str | None]", (database, host, port, username, password))


def get_postgres_engine(*, name: str = "POSTGRES", pool_pre_ping: bool = True) -> Engine:
    """Create a SQLAlchemy engine from ``{name}_*`` environment variables.

    Keyword-only args:
        name: prefix of the environment variables to read (default "POSTGRES").
        pool_pre_ping: test connections on checkout to drop stale ones.
    """
    database, host, port, username, password = get_connection_info(name)

    # URL.create escapes the password and other parts correctly; never build
    # the DSN by string concatenation.
    connection_url = URL.create(
        drivername="postgresql+psycopg",
        username=username,
        password=password,
        host=host,
        port=int(port),
        database=database,
    )

    # pool_recycle=1800 re-opens connections older than 30 minutes, guarding
    # against server-side idle timeouts.
    return create_engine(
        url=connection_url,
        pool_pre_ping=pool_pre_ping,
        pool_recycle=1800,
    )
"""Reconstruction of the pipelines/orm/data_science_dev package files from the patch.

Each section below corresponds to one file; per-file imports are kept so the
sections can be split back out verbatim (the single ``__future__`` import is
hoisted here because it must be first in a module).
"""

from __future__ import annotations

# ==== pipelines/orm/data_science_dev/__init__.py =========================
# "Data science dev database ORM exports."

from pipelines.orm.data_science_dev.base import (
    DataScienceDevBase,
    DataScienceDevTableBase,
    DataScienceDevTableBaseBig,
)

__all__ = [
    "DataScienceDevBase",
    "DataScienceDevTableBase",
    "DataScienceDevTableBaseBig",
]

# ==== pipelines/orm/data_science_dev/base.py =============================
# "Data science dev database ORM base."

from datetime import datetime

from sqlalchemy import BigInteger, DateTime, MetaData, func
from sqlalchemy.ext.declarative import AbstractConcreteBase
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column


class DataScienceDevBase(DeclarativeBase):
    """Base class for data_science_dev database ORM models."""

    # Every table lives in this schema; ForeignKey targets elsewhere in the
    # package are therefore spelled "main.<table>.<column>".
    schema_name = "main"

    metadata = MetaData(
        schema=schema_name,
        naming_convention=NAMING_CONVENTION,
    )


class _TableMixin:
    """Shared timestamp columns for all table bases."""

    # Set by the database server at INSERT time.
    created: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        server_default=func.now(),
    )
    # Server default at INSERT; refreshed via onupdate on every ORM UPDATE.
    updated: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        server_default=func.now(),
        onupdate=func.now(),
    )


class DataScienceDevTableBase(_TableMixin, AbstractConcreteBase, DataScienceDevBase):
    """Table with Integer primary key."""

    __abstract__ = True

    id: Mapped[int] = mapped_column(primary_key=True)


class DataScienceDevTableBaseBig(_TableMixin, AbstractConcreteBase, DataScienceDevBase):
    """Table with BigInteger primary key."""

    __abstract__ = True

    id: Mapped[int] = mapped_column(BigInteger, primary_key=True)


# ==== pipelines/orm/data_science_dev/congress/__init__.py ================
# "init."

from pipelines.orm.data_science_dev.congress.bill import Bill, BillText
from pipelines.orm.data_science_dev.congress.legislator import (
    Legislator,
    LegislatorSocialMedia,
)
from pipelines.orm.data_science_dev.congress.vote import Vote, VoteRecord

# (per-module __all__; each file section defines its own)
__all__ = [
    "Bill",
    "BillText",
    "Legislator",
    "LegislatorSocialMedia",
    "Vote",
    "VoteRecord",
]

# ==== pipelines/orm/data_science_dev/congress/bill.py ====================
# "Bill model - legislation introduced in Congress."

from datetime import date
from typing import TYPE_CHECKING

from sqlalchemy import ForeignKey, Index, UniqueConstraint
from sqlalchemy.orm import relationship

if TYPE_CHECKING:
    from pipelines.orm.data_science_dev.congress.vote import Vote


class Bill(DataScienceDevTableBase):
    """Legislation with congress number, type, titles, status, and sponsor."""

    __tablename__ = "bill"

    # (congress, bill_type, number) uniquely identifies a bill — see
    # __table_args__ below.
    congress: Mapped[int]
    bill_type: Mapped[str]
    number: Mapped[int]

    title: Mapped[str | None]
    title_short: Mapped[str | None]
    official_title: Mapped[str | None]

    status: Mapped[str | None]
    status_at: Mapped[date | None]

    sponsor_bioguide_id: Mapped[str | None]

    subjects_top_term: Mapped[str | None]

    # Votes are not delete-orphaned: a Vote may exist without a bill.
    votes: Mapped[list[Vote]] = relationship(
        "Vote",
        back_populates="bill",
    )
    # Text versions die with the bill.
    bill_texts: Mapped[list[BillText]] = relationship(
        "BillText",
        back_populates="bill",
        cascade="all, delete-orphan",
    )

    __table_args__ = (
        UniqueConstraint(
            "congress", "bill_type", "number", name="uq_bill_congress_type_number"
        ),
        Index("ix_bill_congress", "congress"),
    )


class BillText(DataScienceDevTableBase):
    """Stores different text versions of a bill (introduced, enrolled, etc.)."""

    __tablename__ = "bill_text"

    bill_id: Mapped[int] = mapped_column(ForeignKey("main.bill.id", ondelete="CASCADE"))
    version_code: Mapped[str]
    version_name: Mapped[str | None]
    text_content: Mapped[str | None]
    date: Mapped[date | None]

    bill: Mapped[Bill] = relationship("Bill", back_populates="bill_texts")

    __table_args__ = (
        UniqueConstraint(
            "bill_id", "version_code", name="uq_bill_text_bill_id_version_code"
        ),
    )


# ==== pipelines/orm/data_science_dev/congress/legislator.py ==============
# "Legislator model - members of Congress."

from sqlalchemy import Text

if TYPE_CHECKING:
    from pipelines.orm.data_science_dev.congress.vote import VoteRecord


class Legislator(DataScienceDevTableBase):
    """Members of Congress with identification and current term info."""

    __tablename__ = "legislator"

    # Canonical congressional identifier; unique + indexed for lookups.
    bioguide_id: Mapped[str] = mapped_column(Text, unique=True, index=True)

    # Cross-reference IDs into other datasets (THOMAS, LIS, GovTrack, etc.).
    thomas_id: Mapped[str | None]
    lis_id: Mapped[str | None]
    govtrack_id: Mapped[int | None]
    opensecrets_id: Mapped[str | None]
    fec_ids: Mapped[str | None]

    first_name: Mapped[str]
    last_name: Mapped[str]
    official_full_name: Mapped[str | None]
    nickname: Mapped[str | None]

    birthday: Mapped[date | None]
    gender: Mapped[str | None]

    # "current_*" reflect the most recent term only.
    current_party: Mapped[str | None]
    current_state: Mapped[str | None]
    current_district: Mapped[int | None]
    current_chamber: Mapped[str | None]

    social_media_accounts: Mapped[list[LegislatorSocialMedia]] = relationship(
        "LegislatorSocialMedia",
        back_populates="legislator",
        cascade="all, delete-orphan",
    )
    vote_records: Mapped[list[VoteRecord]] = relationship(
        "VoteRecord",
        back_populates="legislator",
        cascade="all, delete-orphan",
    )


class LegislatorSocialMedia(DataScienceDevTableBase):
    """Social media account linked to a legislator."""

    __tablename__ = "legislator_social_media"

    legislator_id: Mapped[int] = mapped_column(ForeignKey("main.legislator.id"))
    platform: Mapped[str]
    account_name: Mapped[str]
    url: Mapped[str | None]
    source: Mapped[str]

    legislator: Mapped[Legislator] = relationship(
        back_populates="social_media_accounts"
    )


# ==== pipelines/orm/data_science_dev/congress/vote.py ====================
# "Vote model - roll call votes in Congress."

if TYPE_CHECKING:
    from pipelines.orm.data_science_dev.congress.bill import Bill
    from pipelines.orm.data_science_dev.congress.legislator import Legislator
    # FIX: the patch also imported Vote here from
    # pipelines.orm.data_science_dev.congress.vote — i.e. the module imported
    # itself. Vote is defined below in this same module and resolves as an
    # ordinary forward reference, so the self-import was redundant and a
    # circular-import trap; it has been removed.


class VoteRecord(DataScienceDevBase):
    """Links a vote to a legislator with their position (Yea, Nay, etc.)."""

    __tablename__ = "vote_record"

    # Composite primary key: exactly one row per (vote, legislator) pair.
    vote_id: Mapped[int] = mapped_column(
        ForeignKey("main.vote.id", ondelete="CASCADE"),
        primary_key=True,
    )
    legislator_id: Mapped[int] = mapped_column(
        ForeignKey("main.legislator.id", ondelete="CASCADE"),
        primary_key=True,
    )
    position: Mapped[str]

    vote: Mapped[Vote] = relationship("Vote", back_populates="vote_records")
    legislator: Mapped[Legislator] = relationship(
        "Legislator", back_populates="vote_records"
    )


class Vote(DataScienceDevTableBase):
    """Roll call votes with counts and optional bill linkage."""

    __tablename__ = "vote"

    # (congress, chamber, session, number) uniquely identifies a roll call —
    # see __table_args__ below.
    congress: Mapped[int]
    chamber: Mapped[str]
    session: Mapped[int]
    number: Mapped[int]

    vote_type: Mapped[str | None]
    question: Mapped[str | None]
    result: Mapped[str | None]
    result_text: Mapped[str | None]

    vote_date: Mapped[date]

    yea_count: Mapped[int | None]
    nay_count: Mapped[int | None]
    not_voting_count: Mapped[int | None]
    present_count: Mapped[int | None]

    # Nullable: procedural votes may not be tied to a bill.
    bill_id: Mapped[int | None] = mapped_column(ForeignKey("main.bill.id"))

    bill: Mapped[Bill | None] = relationship("Bill", back_populates="votes")
    vote_records: Mapped[list[VoteRecord]] = relationship(
        "VoteRecord",
        back_populates="vote",
        cascade="all, delete-orphan",
    )

    __table_args__ = (
        UniqueConstraint(
            "congress",
            "chamber",
            "session",
            "number",
            name="uq_vote_congress_chamber_session_number",
        ),
        Index("ix_vote_date", "vote_date"),
        Index("ix_vote_congress_chamber", "congress", "chamber"),
    )


# ==== pipelines/orm/data_science_dev/models.py ===========================
# "Data science dev database ORM models."

from pipelines.orm.data_science_dev.posts import partitions  # noqa: F401 — registers partition classes in metadata
from pipelines.orm.data_science_dev.posts.tables import Posts

__all__ = [
    "Bill",
    "BillText",
    "Legislator",
    "Posts",
    "Vote",
    "VoteRecord",
]

# ==== pipelines/orm/data_science_dev/posts/__init__.py ===================
# "Posts module — weekly-partitioned posts table and partition ORM models."

from pipelines.orm.data_science_dev.posts.failed_ingestion import FailedIngestion

__all__ = [
    "FailedIngestion",
    "Posts",
]

# ==== pipelines/orm/data_science_dev/posts/columns.py ====================
# "Shared column definitions for the posts partitioned table family."

from sqlalchemy import SmallInteger


class PostsColumns:
    """Mixin providing all posts columns. Used by both the parent table and partitions."""

    post_id: Mapped[int] = mapped_column(BigInteger, primary_key=True)
    user_id: Mapped[int] = mapped_column(BigInteger)
    instance: Mapped[str]
    # Part of the composite primary key together with post_id.
    date: Mapped[datetime] = mapped_column(primary_key=True)
    text: Mapped[str] = mapped_column(Text)
    langs: Mapped[str | None]
    like_count: Mapped[int]
    reply_count: Mapped[int]
    repost_count: Mapped[int]
    # IDs referencing other posts/authors; BigInteger to match post_id/user_id.
    reply_to: Mapped[int | None] = mapped_column(BigInteger)
    replied_author: Mapped[int | None] = mapped_column(BigInteger)
    thread_root: Mapped[int | None] = mapped_column(BigInteger)
    thread_root_author: Mapped[int | None] = mapped_column(BigInteger)
    repost_from: Mapped[int | None] = mapped_column(BigInteger)
    reposted_author: Mapped[int | None] = mapped_column(BigInteger)
    quotes: Mapped[int | None] = mapped_column(BigInteger)
    quoted_author: Mapped[int | None] = mapped_column(BigInteger)
    labels: Mapped[str | None]
    sent_label: Mapped[int | None] = mapped_column(SmallInteger)
    sent_score: Mapped[float | None]


# ==== pipelines/orm/data_science_dev/posts/failed_ingestion.py ===========
# "Table for storing JSONL lines that failed during post ingestion."


class FailedIngestion(DataScienceDevTableBase):
    """Stores raw JSONL lines and their error messages when ingestion fails."""

    __tablename__ = "failed_ingestion"

    raw_line: Mapped[str] = mapped_column(Text)
    error: Mapped[str] = mapped_column(Text)
"""Reconstruction of pipelines/orm/data_science_dev/posts/partitions.py and tables.py."""

from __future__ import annotations

# ==== pipelines/orm/data_science_dev/posts/partitions.py =================
# Dynamically generated ORM classes for each weekly partition of the posts
# table. Each class maps to a PostgreSQL partition table (e.g. posts_2024_01).
# These are real ORM models tracked by Alembic autogenerate.
#
# Uses ISO week numbering (datetime.isocalendar().week). ISO years can have
# 52 or 53 weeks, and week boundaries are always Monday to Monday.

import sys
from datetime import UTC, datetime

from pipelines.orm.data_science_dev.base import DataScienceDevBase
from pipelines.orm.data_science_dev.posts.columns import PostsColumns

# Inclusive year range for which partition classes are generated.
PARTITION_START_YEAR = 2023
PARTITION_END_YEAR = 2026

_current_module = sys.modules[__name__]


def iso_weeks_in_year(year: int) -> int:
    """Return the number of ISO weeks in a given year (52 or 53).

    December 28 always falls in the last ISO week of its year, so its ISO
    week number is exactly the year's week count.
    """
    dec_28 = datetime(year, 12, 28, tzinfo=UTC)
    return dec_28.isocalendar().week


def week_bounds(year: int, week: int) -> tuple[datetime, datetime]:
    """Return (start, end) datetimes for an ISO week.

    Start = Monday 00:00:00 UTC of the given ISO week.
    End = Monday 00:00:00 UTC of the following ISO week (exclusive bound);
    for the final week of a year this rolls over to week 1 of year + 1.
    """
    start = datetime.fromisocalendar(year, week, 1).replace(tzinfo=UTC)
    if week < iso_weeks_in_year(year):
        end = datetime.fromisocalendar(year, week + 1, 1).replace(tzinfo=UTC)
    else:
        end = datetime.fromisocalendar(year + 1, 1, 1).replace(tzinfo=UTC)
    return start, end


def _build_partition_classes() -> dict[str, type]:
    """Generate one ORM class per ISO week partition, keyed by class name."""
    classes: dict[str, type] = {}

    for year in range(PARTITION_START_YEAR, PARTITION_END_YEAR + 1):
        for week in range(1, iso_weeks_in_year(year) + 1):
            class_name = f"PostsWeek{year}W{week:02d}"
            table_name = f"posts_{year}_{week:02d}"

            # type() builds a declarative class exactly as a class statement
            # would; registering it on DataScienceDevBase puts the table in
            # the shared metadata.
            partition_class = type(
                class_name,
                (PostsColumns, DataScienceDevBase),
                {
                    "__tablename__": table_name,
                    "__table_args__": ({"implicit_returning": False},),
                },
            )

            classes[class_name] = partition_class

    return classes


# Generate all partition classes and register them on this module so that
# `from ...posts.partitions import PostsWeek2024W01` works.
_partition_classes = _build_partition_classes()
for _name, _cls in _partition_classes.items():
    setattr(_current_module, _name, _cls)
__all__ = list(_partition_classes.keys())
# FIX: the patch left the loop variables behind as module attributes;
# delete them so they do not leak into the module namespace.
del _name, _cls

# ==== pipelines/orm/data_science_dev/posts/tables.py =====================
# "Posts parent table with PostgreSQL weekly range partitioning on date column."


class Posts(PostsColumns, DataScienceDevBase):
    """Parent partitioned table for posts, partitioned by week on `date`."""

    __tablename__ = "posts"
    __table_args__ = ({"postgresql_partition_by": "RANGE (date)"},)