added orm code
This commit is contained in:
7
pipelines/orm/__init__.py
Normal file
7
pipelines/orm/__init__.py
Normal file
@@ -0,0 +1,7 @@
|
||||
"""ORM package exports."""
|
||||
|
||||
from pipelines.orm.data_science_dev.base import DataScienceDevBase
|
||||
|
||||
__all__ = [
|
||||
"DataScienceDevBase",
|
||||
]
|
||||
51
pipelines/orm/common.py
Normal file
51
pipelines/orm/common.py
Normal file
@@ -0,0 +1,51 @@
|
||||
"""Shared ORM definitions."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from os import getenv
|
||||
from typing import cast
|
||||
|
||||
from sqlalchemy import create_engine
|
||||
from sqlalchemy.engine import URL, Engine
|
||||
|
||||
NAMING_CONVENTION = {
|
||||
"ix": "ix_%(table_name)s_%(column_0_name)s",
|
||||
"uq": "uq_%(table_name)s_%(column_0_name)s",
|
||||
"ck": "ck_%(table_name)s_%(constraint_name)s",
|
||||
"fk": "fk_%(table_name)s_%(column_0_name)s_%(referred_table_name)s",
|
||||
"pk": "pk_%(table_name)s",
|
||||
}
|
||||
|
||||
|
||||
def get_connection_info(name: str) -> tuple[str, str, str, str, str | None]:
|
||||
"""Get connection info from environment variables."""
|
||||
database = getenv(f"{name}_DB")
|
||||
host = getenv(f"{name}_HOST")
|
||||
port = getenv(f"{name}_PORT")
|
||||
username = getenv(f"{name}_USER")
|
||||
password = getenv(f"{name}_PASSWORD")
|
||||
|
||||
if None in (database, host, port, username):
|
||||
error = f"Missing environment variables for Postgres connection.\n{database=}\n{host=}\n{port=}\n{username=}\n"
|
||||
raise ValueError(error)
|
||||
return cast("tuple[str, str, str, str, str | None]", (database, host, port, username, password))
|
||||
|
||||
|
||||
def get_postgres_engine(*, name: str = "POSTGRES", pool_pre_ping: bool = True) -> Engine:
|
||||
"""Create a SQLAlchemy engine from environment variables."""
|
||||
database, host, port, username, password = get_connection_info(name)
|
||||
|
||||
url = URL.create(
|
||||
drivername="postgresql+psycopg",
|
||||
username=username,
|
||||
password=password,
|
||||
host=host,
|
||||
port=int(port),
|
||||
database=database,
|
||||
)
|
||||
|
||||
return create_engine(
|
||||
url=url,
|
||||
pool_pre_ping=pool_pre_ping,
|
||||
pool_recycle=1800,
|
||||
)
|
||||
15
pipelines/orm/data_science_dev/__init__.py
Normal file
15
pipelines/orm/data_science_dev/__init__.py
Normal file
@@ -0,0 +1,15 @@
|
||||
"""Data science dev database ORM exports."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from pipelines.orm.data_science_dev.base import (
|
||||
DataScienceDevBase,
|
||||
DataScienceDevTableBase,
|
||||
DataScienceDevTableBaseBig,
|
||||
)
|
||||
|
||||
__all__ = [
|
||||
"DataScienceDevBase",
|
||||
"DataScienceDevTableBase",
|
||||
"DataScienceDevTableBaseBig",
|
||||
]
|
||||
52
pipelines/orm/data_science_dev/base.py
Normal file
52
pipelines/orm/data_science_dev/base.py
Normal file
@@ -0,0 +1,52 @@
|
||||
"""Data science dev database ORM base."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import datetime
|
||||
|
||||
from sqlalchemy import BigInteger, DateTime, MetaData, func
|
||||
from sqlalchemy.ext.declarative import AbstractConcreteBase
|
||||
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
|
||||
|
||||
from pipelines.orm.common import NAMING_CONVENTION
|
||||
|
||||
|
||||
class DataScienceDevBase(DeclarativeBase):
|
||||
"""Base class for data_science_dev database ORM models."""
|
||||
|
||||
schema_name = "main"
|
||||
|
||||
metadata = MetaData(
|
||||
schema=schema_name,
|
||||
naming_convention=NAMING_CONVENTION,
|
||||
)
|
||||
|
||||
|
||||
class _TableMixin:
|
||||
"""Shared timestamp columns for all table bases."""
|
||||
|
||||
created: Mapped[datetime] = mapped_column(
|
||||
DateTime(timezone=True),
|
||||
server_default=func.now(),
|
||||
)
|
||||
updated: Mapped[datetime] = mapped_column(
|
||||
DateTime(timezone=True),
|
||||
server_default=func.now(),
|
||||
onupdate=func.now(),
|
||||
)
|
||||
|
||||
|
||||
class DataScienceDevTableBase(_TableMixin, AbstractConcreteBase, DataScienceDevBase):
|
||||
"""Table with Integer primary key."""
|
||||
|
||||
__abstract__ = True
|
||||
|
||||
id: Mapped[int] = mapped_column(primary_key=True)
|
||||
|
||||
|
||||
class DataScienceDevTableBaseBig(_TableMixin, AbstractConcreteBase, DataScienceDevBase):
|
||||
"""Table with BigInteger primary key."""
|
||||
|
||||
__abstract__ = True
|
||||
|
||||
id: Mapped[int] = mapped_column(BigInteger, primary_key=True)
|
||||
17
pipelines/orm/data_science_dev/congress/__init__.py
Normal file
17
pipelines/orm/data_science_dev/congress/__init__.py
Normal file
@@ -0,0 +1,17 @@
|
||||
"""init."""
|
||||
|
||||
from pipelines.orm.data_science_dev.congress.bill import Bill, BillText
|
||||
from pipelines.orm.data_science_dev.congress.legislator import (
|
||||
Legislator,
|
||||
LegislatorSocialMedia,
|
||||
)
|
||||
from pipelines.orm.data_science_dev.congress.vote import Vote, VoteRecord
|
||||
|
||||
__all__ = [
|
||||
"Bill",
|
||||
"BillText",
|
||||
"Legislator",
|
||||
"LegislatorSocialMedia",
|
||||
"Vote",
|
||||
"VoteRecord",
|
||||
]
|
||||
72
pipelines/orm/data_science_dev/congress/bill.py
Normal file
72
pipelines/orm/data_science_dev/congress/bill.py
Normal file
@@ -0,0 +1,72 @@
|
||||
"""Bill model - legislation introduced in Congress."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import date
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from sqlalchemy import ForeignKey, Index, UniqueConstraint
|
||||
from sqlalchemy.orm import Mapped, mapped_column, relationship
|
||||
|
||||
from pipelines.orm.data_science_dev.base import DataScienceDevTableBase
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from pipelines.orm.data_science_dev.congress.vote import Vote
|
||||
|
||||
|
||||
class Bill(DataScienceDevTableBase):
|
||||
"""Legislation with congress number, type, titles, status, and sponsor."""
|
||||
|
||||
__tablename__ = "bill"
|
||||
|
||||
congress: Mapped[int]
|
||||
bill_type: Mapped[str]
|
||||
number: Mapped[int]
|
||||
|
||||
title: Mapped[str | None]
|
||||
title_short: Mapped[str | None]
|
||||
official_title: Mapped[str | None]
|
||||
|
||||
status: Mapped[str | None]
|
||||
status_at: Mapped[date | None]
|
||||
|
||||
sponsor_bioguide_id: Mapped[str | None]
|
||||
|
||||
subjects_top_term: Mapped[str | None]
|
||||
|
||||
votes: Mapped[list[Vote]] = relationship(
|
||||
"Vote",
|
||||
back_populates="bill",
|
||||
)
|
||||
bill_texts: Mapped[list[BillText]] = relationship(
|
||||
"BillText",
|
||||
back_populates="bill",
|
||||
cascade="all, delete-orphan",
|
||||
)
|
||||
|
||||
__table_args__ = (
|
||||
UniqueConstraint(
|
||||
"congress", "bill_type", "number", name="uq_bill_congress_type_number"
|
||||
),
|
||||
Index("ix_bill_congress", "congress"),
|
||||
)
|
||||
|
||||
|
||||
class BillText(DataScienceDevTableBase):
|
||||
"""Stores different text versions of a bill (introduced, enrolled, etc.)."""
|
||||
|
||||
__tablename__ = "bill_text"
|
||||
|
||||
bill_id: Mapped[int] = mapped_column(ForeignKey("main.bill.id", ondelete="CASCADE"))
|
||||
version_code: Mapped[str]
|
||||
version_name: Mapped[str | None]
|
||||
text_content: Mapped[str | None]
|
||||
date: Mapped[date | None]
|
||||
|
||||
bill: Mapped[Bill] = relationship("Bill", back_populates="bill_texts")
|
||||
|
||||
__table_args__ = (
|
||||
UniqueConstraint(
|
||||
"bill_id", "version_code", name="uq_bill_text_bill_id_version_code"
|
||||
),
|
||||
)
|
||||
68
pipelines/orm/data_science_dev/congress/legislator.py
Normal file
68
pipelines/orm/data_science_dev/congress/legislator.py
Normal file
@@ -0,0 +1,68 @@
|
||||
"""Legislator model - members of Congress."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import date
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from sqlalchemy import ForeignKey, Text
|
||||
from sqlalchemy.orm import Mapped, mapped_column, relationship
|
||||
|
||||
from pipelines.orm.data_science_dev.base import DataScienceDevTableBase
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from pipelines.orm.data_science_dev.congress.vote import VoteRecord
|
||||
|
||||
|
||||
class Legislator(DataScienceDevTableBase):
|
||||
"""Members of Congress with identification and current term info."""
|
||||
|
||||
__tablename__ = "legislator"
|
||||
|
||||
bioguide_id: Mapped[str] = mapped_column(Text, unique=True, index=True)
|
||||
|
||||
thomas_id: Mapped[str | None]
|
||||
lis_id: Mapped[str | None]
|
||||
govtrack_id: Mapped[int | None]
|
||||
opensecrets_id: Mapped[str | None]
|
||||
fec_ids: Mapped[str | None]
|
||||
|
||||
first_name: Mapped[str]
|
||||
last_name: Mapped[str]
|
||||
official_full_name: Mapped[str | None]
|
||||
nickname: Mapped[str | None]
|
||||
|
||||
birthday: Mapped[date | None]
|
||||
gender: Mapped[str | None]
|
||||
|
||||
current_party: Mapped[str | None]
|
||||
current_state: Mapped[str | None]
|
||||
current_district: Mapped[int | None]
|
||||
current_chamber: Mapped[str | None]
|
||||
|
||||
social_media_accounts: Mapped[list[LegislatorSocialMedia]] = relationship(
|
||||
"LegislatorSocialMedia",
|
||||
back_populates="legislator",
|
||||
cascade="all, delete-orphan",
|
||||
)
|
||||
vote_records: Mapped[list[VoteRecord]] = relationship(
|
||||
"VoteRecord",
|
||||
back_populates="legislator",
|
||||
cascade="all, delete-orphan",
|
||||
)
|
||||
|
||||
|
||||
class LegislatorSocialMedia(DataScienceDevTableBase):
|
||||
"""Social media account linked to a legislator."""
|
||||
|
||||
__tablename__ = "legislator_social_media"
|
||||
|
||||
legislator_id: Mapped[int] = mapped_column(ForeignKey("main.legislator.id"))
|
||||
platform: Mapped[str]
|
||||
account_name: Mapped[str]
|
||||
url: Mapped[str | None]
|
||||
source: Mapped[str]
|
||||
|
||||
legislator: Mapped[Legislator] = relationship(
|
||||
back_populates="social_media_accounts"
|
||||
)
|
||||
84
pipelines/orm/data_science_dev/congress/vote.py
Normal file
84
pipelines/orm/data_science_dev/congress/vote.py
Normal file
@@ -0,0 +1,84 @@
|
||||
"""Vote model - roll call votes in Congress."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import date
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from sqlalchemy import ForeignKey, Index, UniqueConstraint
|
||||
from sqlalchemy.orm import Mapped, mapped_column, relationship
|
||||
|
||||
from pipelines.orm.data_science_dev.base import (
|
||||
DataScienceDevBase,
|
||||
DataScienceDevTableBase,
|
||||
)
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from pipelines.orm.data_science_dev.congress.bill import Bill
|
||||
from pipelines.orm.data_science_dev.congress.legislator import Legislator
|
||||
from pipelines.orm.data_science_dev.congress.vote import Vote
|
||||
|
||||
|
||||
class VoteRecord(DataScienceDevBase):
|
||||
"""Links a vote to a legislator with their position (Yea, Nay, etc.)."""
|
||||
|
||||
__tablename__ = "vote_record"
|
||||
|
||||
vote_id: Mapped[int] = mapped_column(
|
||||
ForeignKey("main.vote.id", ondelete="CASCADE"),
|
||||
primary_key=True,
|
||||
)
|
||||
legislator_id: Mapped[int] = mapped_column(
|
||||
ForeignKey("main.legislator.id", ondelete="CASCADE"),
|
||||
primary_key=True,
|
||||
)
|
||||
position: Mapped[str]
|
||||
|
||||
vote: Mapped[Vote] = relationship("Vote", back_populates="vote_records")
|
||||
legislator: Mapped[Legislator] = relationship(
|
||||
"Legislator", back_populates="vote_records"
|
||||
)
|
||||
|
||||
|
||||
class Vote(DataScienceDevTableBase):
|
||||
"""Roll call votes with counts and optional bill linkage."""
|
||||
|
||||
__tablename__ = "vote"
|
||||
|
||||
congress: Mapped[int]
|
||||
chamber: Mapped[str]
|
||||
session: Mapped[int]
|
||||
number: Mapped[int]
|
||||
|
||||
vote_type: Mapped[str | None]
|
||||
question: Mapped[str | None]
|
||||
result: Mapped[str | None]
|
||||
result_text: Mapped[str | None]
|
||||
|
||||
vote_date: Mapped[date]
|
||||
|
||||
yea_count: Mapped[int | None]
|
||||
nay_count: Mapped[int | None]
|
||||
not_voting_count: Mapped[int | None]
|
||||
present_count: Mapped[int | None]
|
||||
|
||||
bill_id: Mapped[int | None] = mapped_column(ForeignKey("main.bill.id"))
|
||||
|
||||
bill: Mapped[Bill | None] = relationship("Bill", back_populates="votes")
|
||||
vote_records: Mapped[list[VoteRecord]] = relationship(
|
||||
"VoteRecord",
|
||||
back_populates="vote",
|
||||
cascade="all, delete-orphan",
|
||||
)
|
||||
|
||||
__table_args__ = (
|
||||
UniqueConstraint(
|
||||
"congress",
|
||||
"chamber",
|
||||
"session",
|
||||
"number",
|
||||
name="uq_vote_congress_chamber_session_number",
|
||||
),
|
||||
Index("ix_vote_date", "vote_date"),
|
||||
Index("ix_vote_congress_chamber", "congress", "chamber"),
|
||||
)
|
||||
16
pipelines/orm/data_science_dev/models.py
Normal file
16
pipelines/orm/data_science_dev/models.py
Normal file
@@ -0,0 +1,16 @@
|
||||
"""Data science dev database ORM models."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from pipelines.orm.data_science_dev.congress import Bill, BillText, Legislator, Vote, VoteRecord
|
||||
from pipelines.orm.data_science_dev.posts import partitions # noqa: F401 — registers partition classes in metadata
|
||||
from pipelines.orm.data_science_dev.posts.tables import Posts
|
||||
|
||||
__all__ = [
|
||||
"Bill",
|
||||
"BillText",
|
||||
"Legislator",
|
||||
"Posts",
|
||||
"Vote",
|
||||
"VoteRecord",
|
||||
]
|
||||
11
pipelines/orm/data_science_dev/posts/__init__.py
Normal file
11
pipelines/orm/data_science_dev/posts/__init__.py
Normal file
@@ -0,0 +1,11 @@
|
||||
"""Posts module — weekly-partitioned posts table and partition ORM models."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from pipelines.orm.data_science_dev.posts.failed_ingestion import FailedIngestion
|
||||
from pipelines.orm.data_science_dev.posts.tables import Posts
|
||||
|
||||
__all__ = [
|
||||
"FailedIngestion",
|
||||
"Posts",
|
||||
]
|
||||
33
pipelines/orm/data_science_dev/posts/columns.py
Normal file
33
pipelines/orm/data_science_dev/posts/columns.py
Normal file
@@ -0,0 +1,33 @@
|
||||
"""Shared column definitions for the posts partitioned table family."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import datetime
|
||||
|
||||
from sqlalchemy import BigInteger, SmallInteger, Text
|
||||
from sqlalchemy.orm import Mapped, mapped_column
|
||||
|
||||
|
||||
class PostsColumns:
|
||||
"""Mixin providing all posts columns. Used by both the parent table and partitions."""
|
||||
|
||||
post_id: Mapped[int] = mapped_column(BigInteger, primary_key=True)
|
||||
user_id: Mapped[int] = mapped_column(BigInteger)
|
||||
instance: Mapped[str]
|
||||
date: Mapped[datetime] = mapped_column(primary_key=True)
|
||||
text: Mapped[str] = mapped_column(Text)
|
||||
langs: Mapped[str | None]
|
||||
like_count: Mapped[int]
|
||||
reply_count: Mapped[int]
|
||||
repost_count: Mapped[int]
|
||||
reply_to: Mapped[int | None] = mapped_column(BigInteger)
|
||||
replied_author: Mapped[int | None] = mapped_column(BigInteger)
|
||||
thread_root: Mapped[int | None] = mapped_column(BigInteger)
|
||||
thread_root_author: Mapped[int | None] = mapped_column(BigInteger)
|
||||
repost_from: Mapped[int | None] = mapped_column(BigInteger)
|
||||
reposted_author: Mapped[int | None] = mapped_column(BigInteger)
|
||||
quotes: Mapped[int | None] = mapped_column(BigInteger)
|
||||
quoted_author: Mapped[int | None] = mapped_column(BigInteger)
|
||||
labels: Mapped[str | None]
|
||||
sent_label: Mapped[int | None] = mapped_column(SmallInteger)
|
||||
sent_score: Mapped[float | None]
|
||||
17
pipelines/orm/data_science_dev/posts/failed_ingestion.py
Normal file
17
pipelines/orm/data_science_dev/posts/failed_ingestion.py
Normal file
@@ -0,0 +1,17 @@
|
||||
"""Table for storing JSONL lines that failed during post ingestion."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from sqlalchemy import Text
|
||||
from sqlalchemy.orm import Mapped, mapped_column
|
||||
|
||||
from pipelines.orm.data_science_dev.base import DataScienceDevTableBase
|
||||
|
||||
|
||||
class FailedIngestion(DataScienceDevTableBase):
|
||||
"""Stores raw JSONL lines and their error messages when ingestion fails."""
|
||||
|
||||
__tablename__ = "failed_ingestion"
|
||||
|
||||
raw_line: Mapped[str] = mapped_column(Text)
|
||||
error: Mapped[str] = mapped_column(Text)
|
||||
71
pipelines/orm/data_science_dev/posts/partitions.py
Normal file
71
pipelines/orm/data_science_dev/posts/partitions.py
Normal file
@@ -0,0 +1,71 @@
|
||||
"""Dynamically generated ORM classes for each weekly partition of the posts table.
|
||||
|
||||
Each class maps to a PostgreSQL partition table (e.g. posts_2024_01).
|
||||
These are real ORM models tracked by Alembic autogenerate.
|
||||
|
||||
Uses ISO week numbering (datetime.isocalendar().week). ISO years can have
|
||||
52 or 53 weeks, and week boundaries are always Monday to Monday.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import sys
|
||||
from datetime import UTC, datetime
|
||||
|
||||
from pipelines.orm.data_science_dev.base import DataScienceDevBase
|
||||
from pipelines.orm.data_science_dev.posts.columns import PostsColumns
|
||||
|
||||
PARTITION_START_YEAR = 2023
|
||||
PARTITION_END_YEAR = 2026
|
||||
|
||||
_current_module = sys.modules[__name__]
|
||||
|
||||
|
||||
def iso_weeks_in_year(year: int) -> int:
|
||||
"""Return the number of ISO weeks in a given year (52 or 53)."""
|
||||
dec_28 = datetime(year, 12, 28, tzinfo=UTC)
|
||||
return dec_28.isocalendar().week
|
||||
|
||||
|
||||
def week_bounds(year: int, week: int) -> tuple[datetime, datetime]:
|
||||
"""Return (start, end) datetimes for an ISO week.
|
||||
|
||||
Start = Monday 00:00:00 UTC of the given ISO week.
|
||||
End = Monday 00:00:00 UTC of the following ISO week.
|
||||
"""
|
||||
start = datetime.fromisocalendar(year, week, 1).replace(tzinfo=UTC)
|
||||
if week < iso_weeks_in_year(year):
|
||||
end = datetime.fromisocalendar(year, week + 1, 1).replace(tzinfo=UTC)
|
||||
else:
|
||||
end = datetime.fromisocalendar(year + 1, 1, 1).replace(tzinfo=UTC)
|
||||
return start, end
|
||||
|
||||
|
||||
def _build_partition_classes() -> dict[str, type]:
|
||||
"""Generate one ORM class per ISO week partition."""
|
||||
classes: dict[str, type] = {}
|
||||
|
||||
for year in range(PARTITION_START_YEAR, PARTITION_END_YEAR + 1):
|
||||
for week in range(1, iso_weeks_in_year(year) + 1):
|
||||
class_name = f"PostsWeek{year}W{week:02d}"
|
||||
table_name = f"posts_{year}_{week:02d}"
|
||||
|
||||
partition_class = type(
|
||||
class_name,
|
||||
(PostsColumns, DataScienceDevBase),
|
||||
{
|
||||
"__tablename__": table_name,
|
||||
"__table_args__": ({"implicit_returning": False},),
|
||||
},
|
||||
)
|
||||
|
||||
classes[class_name] = partition_class
|
||||
|
||||
return classes
|
||||
|
||||
|
||||
# Generate all partition classes and register them on this module
|
||||
_partition_classes = _build_partition_classes()
|
||||
for _name, _cls in _partition_classes.items():
|
||||
setattr(_current_module, _name, _cls)
|
||||
__all__ = list(_partition_classes.keys())
|
||||
13
pipelines/orm/data_science_dev/posts/tables.py
Normal file
13
pipelines/orm/data_science_dev/posts/tables.py
Normal file
@@ -0,0 +1,13 @@
|
||||
"""Posts parent table with PostgreSQL weekly range partitioning on date column."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from pipelines.orm.data_science_dev.base import DataScienceDevBase
|
||||
from pipelines.orm.data_science_dev.posts.columns import PostsColumns
|
||||
|
||||
|
||||
class Posts(PostsColumns, DataScienceDevBase):
|
||||
"""Parent partitioned table for posts, partitioned by week on `date`."""
|
||||
|
||||
__tablename__ = "posts"
|
||||
__table_args__ = ({"postgresql_partition_by": "RANGE (date)"},)
|
||||
Reference in New Issue
Block a user