From be4b473a3ca77d4b3841ce11d98bc99ea47b8022 Mon Sep 17 00:00:00 2001 From: Richie Cahill Date: Tue, 21 Apr 2026 11:36:58 -0400 Subject: [PATCH] got alembic working --- ..._24-adding_2026_partitions_f99be864fe69.py | 2 +- ..._24-adding_failedingestion_2f43120e3ffc.py | 16 ++- ...6_03_24-adding_posts_table_f6c5475834a7.py | 2 +- ...25-adding_2023_partishions_605b1794838f.py | 2 +- .../2026_03_25-attach_partitions_to_posts.py | 15 ++- ...03_27-adding_congress_data_83bfc8af92d8.py | 116 +++++++++++++++--- ...ding_legislatorsocialmedia_5cd7eee3549d.py | 16 ++- database_cli.py | 8 +- pipelines/__init__.py | 1 + common.py => pipelines/common.py | 0 config.py => pipelines/config.py | 0 pipelines/ingest_congress.py | 2 +- pipelines/ingest_posts.py | 4 +- parallelize.py => pipelines/parallelize.py | 0 14 files changed, 144 insertions(+), 40 deletions(-) create mode 100644 pipelines/__init__.py rename common.py => pipelines/common.py (100%) rename config.py => pipelines/config.py (100%) rename parallelize.py => pipelines/parallelize.py (100%) diff --git a/alembic/data_science_dev/versions/2026_03_24-adding_2026_partitions_f99be864fe69.py b/alembic/data_science_dev/versions/2026_03_24-adding_2026_partitions_f99be864fe69.py index 8daeb72..36eb1d5 100644 --- a/alembic/data_science_dev/versions/2026_03_24-adding_2026_partitions_f99be864fe69.py +++ b/alembic/data_science_dev/versions/2026_03_24-adding_2026_partitions_f99be864fe69.py @@ -13,7 +13,7 @@ from typing import TYPE_CHECKING import sqlalchemy as sa from alembic import op -from python.orm import DataScienceDevBase +from pipelines.orm import DataScienceDevBase if TYPE_CHECKING: from collections.abc import Sequence diff --git a/alembic/data_science_dev/versions/2026_03_24-adding_failedingestion_2f43120e3ffc.py b/alembic/data_science_dev/versions/2026_03_24-adding_failedingestion_2f43120e3ffc.py index 507e5e7..a21a742 100644 --- a/alembic/data_science_dev/versions/2026_03_24-adding_failedingestion_2f43120e3ffc.py +++ b/alembic/data_science_dev/versions/2026_03_24-adding_failedingestion_2f43120e3ffc.py @@ -13,7 +13,7 @@ from typing import TYPE_CHECKING import sqlalchemy as sa from alembic import op -from python.orm import DataScienceDevBase +from pipelines.orm import DataScienceDevBase if TYPE_CHECKING: from collections.abc import Sequence @@ -35,8 +35,18 @@ def upgrade() -> None: sa.Column("raw_line", sa.Text(), nullable=False), sa.Column("error", sa.Text(), nullable=False), sa.Column("id", sa.Integer(), nullable=False), - sa.Column("created", sa.DateTime(timezone=True), server_default=sa.text("now()"), nullable=False), - sa.Column("updated", sa.DateTime(timezone=True), server_default=sa.text("now()"), nullable=False), + sa.Column( + "created", + sa.DateTime(timezone=True), + server_default=sa.text("now()"), + nullable=False, + ), + sa.Column( + "updated", + sa.DateTime(timezone=True), + server_default=sa.text("now()"), + nullable=False, + ), sa.PrimaryKeyConstraint("id", name=op.f("pk_failed_ingestion")), schema=schema, ) diff --git a/alembic/data_science_dev/versions/2026_03_24-adding_posts_table_f6c5475834a7.py b/alembic/data_science_dev/versions/2026_03_24-adding_posts_table_f6c5475834a7.py index 5e88e72..5008a66 100644 --- a/alembic/data_science_dev/versions/2026_03_24-adding_posts_table_f6c5475834a7.py +++ b/alembic/data_science_dev/versions/2026_03_24-adding_posts_table_f6c5475834a7.py @@ -13,7 +13,7 @@ from typing import TYPE_CHECKING import sqlalchemy as sa from alembic import op -from python.orm import DataScienceDevBase +from pipelines.orm import DataScienceDevBase if TYPE_CHECKING: from collections.abc import Sequence diff --git a/alembic/data_science_dev/versions/2026_03_25-adding_2023_partishions_605b1794838f.py b/alembic/data_science_dev/versions/2026_03_25-adding_2023_partishions_605b1794838f.py index 751f8c7..94f48aa 100644 --- a/alembic/data_science_dev/versions/2026_03_25-adding_2023_partishions_605b1794838f.py +++ b/alembic/data_science_dev/versions/2026_03_25-adding_2023_partishions_605b1794838f.py @@ -13,7 +13,7 @@ from typing import TYPE_CHECKING import sqlalchemy as sa from alembic import op -from python.orm import DataScienceDevBase +from pipelines.orm import DataScienceDevBase if TYPE_CHECKING: from collections.abc import Sequence diff --git a/alembic/data_science_dev/versions/2026_03_25-attach_partitions_to_posts.py b/alembic/data_science_dev/versions/2026_03_25-attach_partitions_to_posts.py index 5390566..159da3f 100644 --- a/alembic/data_science_dev/versions/2026_03_25-attach_partitions_to_posts.py +++ b/alembic/data_science_dev/versions/2026_03_25-attach_partitions_to_posts.py @@ -17,8 +17,8 @@ from typing import TYPE_CHECKING from alembic import op from sqlalchemy import text -from python.orm import DataScienceDevBase -from python.orm.data_science_dev.posts.partitions import ( +from pipelines.orm import DataScienceDevBase +from pipelines.orm.data_science_dev.posts.partitions import ( PARTITION_END_YEAR, PARTITION_START_YEAR, iso_weeks_in_year, @@ -46,7 +46,12 @@ ALREADY_ATTACHED_QUERY = text(""" def upgrade() -> None: """Attach all weekly partition tables to the posts parent table.""" connection = op.get_bind() - already_attached = {row[0] for row in connection.execute(ALREADY_ATTACHED_QUERY, {"parent": f"{schema}.posts"})} + already_attached = { + row[0] + for row in connection.execute( + ALREADY_ATTACHED_QUERY, {"parent": f"{schema}.posts"} + ) + } for year in range(PARTITION_START_YEAR, PARTITION_END_YEAR + 1): for week in range(1, iso_weeks_in_year(year) + 1): @@ -69,4 +74,6 @@ def downgrade() -> None: for year in range(PARTITION_START_YEAR, PARTITION_END_YEAR + 1): for week in range(1, iso_weeks_in_year(year) + 1): table_name = f"posts_{year}_{week:02d}" - op.execute(f"ALTER TABLE {schema}.posts DETACH PARTITION {schema}.{table_name}") + op.execute( + f"ALTER TABLE {schema}.posts DETACH PARTITION {schema}.{table_name}" + ) diff --git a/alembic/data_science_dev/versions/2026_03_27-adding_congress_data_83bfc8af92d8.py b/alembic/data_science_dev/versions/2026_03_27-adding_congress_data_83bfc8af92d8.py index 79295c5..88815a3 100644 --- a/alembic/data_science_dev/versions/2026_03_27-adding_congress_data_83bfc8af92d8.py +++ b/alembic/data_science_dev/versions/2026_03_27-adding_congress_data_83bfc8af92d8.py @@ -13,7 +13,7 @@ from typing import TYPE_CHECKING import sqlalchemy as sa from alembic import op -from python.orm import DataScienceDevBase +from pipelines.orm import DataScienceDevBase if TYPE_CHECKING: from collections.abc import Sequence @@ -43,13 +43,27 @@ def upgrade() -> None: sa.Column("sponsor_bioguide_id", sa.String(), nullable=True), sa.Column("subjects_top_term", sa.String(), nullable=True), sa.Column("id", sa.Integer(), nullable=False), - sa.Column("created", sa.DateTime(timezone=True), server_default=sa.text("now()"), nullable=False), - sa.Column("updated", sa.DateTime(timezone=True), server_default=sa.text("now()"), nullable=False), + sa.Column( + "created", + sa.DateTime(timezone=True), + server_default=sa.text("now()"), + nullable=False, + ), + sa.Column( + "updated", + sa.DateTime(timezone=True), + server_default=sa.text("now()"), + nullable=False, + ), sa.PrimaryKeyConstraint("id", name=op.f("pk_bill")), - sa.UniqueConstraint("congress", "bill_type", "number", name="uq_bill_congress_type_number"), + sa.UniqueConstraint( + "congress", "bill_type", "number", name="uq_bill_congress_type_number" + ), schema=schema, ) - op.create_index("ix_bill_congress", "bill", ["congress"], unique=False, schema=schema) + op.create_index( + "ix_bill_congress", "bill", ["congress"], unique=False, schema=schema + ) op.create_table( "legislator", sa.Column("bioguide_id", sa.Text(), nullable=False), @@ -69,12 +83,28 @@ def upgrade() -> None: sa.Column("current_district", sa.Integer(), nullable=True), sa.Column("current_chamber", sa.String(), nullable=True), sa.Column("id", sa.Integer(), nullable=False), - sa.Column("created", sa.DateTime(timezone=True), server_default=sa.text("now()"), nullable=False), - sa.Column("updated", sa.DateTime(timezone=True), server_default=sa.text("now()"), nullable=False), + sa.Column( + "created", + sa.DateTime(timezone=True), + server_default=sa.text("now()"), + nullable=False, + ), + sa.Column( + "updated", + sa.DateTime(timezone=True), + server_default=sa.text("now()"), + nullable=False, + ), sa.PrimaryKeyConstraint("id", name=op.f("pk_legislator")), schema=schema, ) - op.create_index(op.f("ix_legislator_bioguide_id"), "legislator", ["bioguide_id"], unique=True, schema=schema) + op.create_index( + op.f("ix_legislator_bioguide_id"), + "legislator", + ["bioguide_id"], + unique=True, + schema=schema, + ) op.create_table( "bill_text", sa.Column("bill_id", sa.Integer(), nullable=False), @@ -83,13 +113,28 @@ def upgrade() -> None: sa.Column("text_content", sa.String(), nullable=True), sa.Column("date", sa.Date(), nullable=True), sa.Column("id", sa.Integer(), nullable=False), - sa.Column("created", sa.DateTime(timezone=True), server_default=sa.text("now()"), nullable=False), - sa.Column("updated", sa.DateTime(timezone=True), server_default=sa.text("now()"), nullable=False), + sa.Column( + "created", + sa.DateTime(timezone=True), + server_default=sa.text("now()"), + nullable=False, + ), + sa.Column( + "updated", + sa.DateTime(timezone=True), + server_default=sa.text("now()"), + nullable=False, + ), sa.ForeignKeyConstraint( - ["bill_id"], [f"{schema}.bill.id"], name=op.f("fk_bill_text_bill_id_bill"), ondelete="CASCADE" + ["bill_id"], + [f"{schema}.bill.id"], + name=op.f("fk_bill_text_bill_id_bill"), + ondelete="CASCADE", ), sa.PrimaryKeyConstraint("id", name=op.f("pk_bill_text")), - sa.UniqueConstraint("bill_id", "version_code", name="uq_bill_text_bill_id_version_code"), + sa.UniqueConstraint( + "bill_id", "version_code", name="uq_bill_text_bill_id_version_code" + ), schema=schema, ) op.create_table( @@ -109,14 +154,38 @@ def upgrade() -> None: sa.Column("present_count", sa.Integer(), nullable=True), sa.Column("bill_id", sa.Integer(), nullable=True), sa.Column("id", sa.Integer(), nullable=False), - sa.Column("created", sa.DateTime(timezone=True), server_default=sa.text("now()"), nullable=False), - sa.Column("updated", sa.DateTime(timezone=True), server_default=sa.text("now()"), nullable=False), - sa.ForeignKeyConstraint(["bill_id"], [f"{schema}.bill.id"], name=op.f("fk_vote_bill_id_bill")), + sa.Column( + "created", + sa.DateTime(timezone=True), + server_default=sa.text("now()"), + nullable=False, + ), + sa.Column( + "updated", + sa.DateTime(timezone=True), + server_default=sa.text("now()"), + nullable=False, + ), + sa.ForeignKeyConstraint( + ["bill_id"], [f"{schema}.bill.id"], name=op.f("fk_vote_bill_id_bill") + ), sa.PrimaryKeyConstraint("id", name=op.f("pk_vote")), - sa.UniqueConstraint("congress", "chamber", "session", "number", name="uq_vote_congress_chamber_session_number"), + sa.UniqueConstraint( + "congress", + "chamber", + "session", + "number", + name="uq_vote_congress_chamber_session_number", + ), + schema=schema, + ) + op.create_index( + "ix_vote_congress_chamber", + "vote", + ["congress", "chamber"], + unique=False, schema=schema, ) - op.create_index("ix_vote_congress_chamber", "vote", ["congress", "chamber"], unique=False, schema=schema) op.create_index("ix_vote_date", "vote", ["vote_date"], unique=False, schema=schema) op.create_table( "vote_record", @@ -130,9 +199,14 @@ def upgrade() -> None: ondelete="CASCADE", ), sa.ForeignKeyConstraint( - ["vote_id"], [f"{schema}.vote.id"], name=op.f("fk_vote_record_vote_id_vote"), ondelete="CASCADE" + ["vote_id"], + [f"{schema}.vote.id"], + name=op.f("fk_vote_record_vote_id_vote"), + ondelete="CASCADE", + ), + sa.PrimaryKeyConstraint( + "vote_id", "legislator_id", name=op.f("pk_vote_record") ), - sa.PrimaryKeyConstraint("vote_id", "legislator_id", name=op.f("pk_vote_record")), schema=schema, ) # ### end Alembic commands ### @@ -146,7 +220,9 @@ def downgrade() -> None: op.drop_index("ix_vote_congress_chamber", table_name="vote", schema=schema) op.drop_table("vote", schema=schema) op.drop_table("bill_text", schema=schema) - op.drop_index(op.f("ix_legislator_bioguide_id"), table_name="legislator", schema=schema) + op.drop_index( + op.f("ix_legislator_bioguide_id"), table_name="legislator", schema=schema + ) op.drop_table("legislator", schema=schema) op.drop_index("ix_bill_congress", table_name="bill", schema=schema) op.drop_table("bill", schema=schema) diff --git a/alembic/data_science_dev/versions/2026_03_29-adding_legislatorsocialmedia_5cd7eee3549d.py b/alembic/data_science_dev/versions/2026_03_29-adding_legislatorsocialmedia_5cd7eee3549d.py index f7bc5b4..739ab38 100644 --- a/alembic/data_science_dev/versions/2026_03_29-adding_legislatorsocialmedia_5cd7eee3549d.py +++ b/alembic/data_science_dev/versions/2026_03_29-adding_legislatorsocialmedia_5cd7eee3549d.py @@ -13,7 +13,7 @@ from typing import TYPE_CHECKING import sqlalchemy as sa from alembic import op -from python.orm import DataScienceDevBase +from pipelines.orm import DataScienceDevBase if TYPE_CHECKING: from collections.abc import Sequence @@ -38,8 +38,18 @@ def upgrade() -> None: sa.Column("url", sa.String(), nullable=True), sa.Column("source", sa.String(), nullable=False), sa.Column("id", sa.Integer(), nullable=False), - sa.Column("created", sa.DateTime(timezone=True), server_default=sa.text("now()"), nullable=False), - sa.Column("updated", sa.DateTime(timezone=True), server_default=sa.text("now()"), nullable=False), + sa.Column( + "created", + sa.DateTime(timezone=True), + server_default=sa.text("now()"), + nullable=False, + ), + sa.Column( + "updated", + sa.DateTime(timezone=True), + server_default=sa.text("now()"), + nullable=False, + ), sa.ForeignKeyConstraint( ["legislator_id"], [f"{schema}.legislator.id"], diff --git a/database_cli.py b/database_cli.py index 0c6129d..f22d7b9 100644 --- a/database_cli.py +++ b/database_cli.py @@ -34,7 +34,7 @@ class DatabaseConfig: base_module: str base_class_name: str models_module: str - script_location: str = "python/alembic" + script_location: str = "alembic" file_template: str = "%%(year)d_%%(month).2d_%%(day).2d-%%(slug)s_%%(rev)s" def get_base(self) -> type[DeclarativeBase]: @@ -77,10 +77,10 @@ class DatabaseConfig: DATABASES: dict[str, DatabaseConfig] = { "data_science_dev": DatabaseConfig( env_prefix="DATA_SCIENCE_DEV", - version_location="python/alembic/data_science_dev/versions", - base_module="python.orm.data_science_dev.base", + version_location="alembic/data_science_dev/versions", + base_module="pipelines.orm.data_science_dev.base", base_class_name="DataScienceDevBase", - models_module="python.orm.data_science_dev.models", + models_module="pipelines.orm.data_science_dev.models", ), } diff --git a/pipelines/__init__.py b/pipelines/__init__.py new file mode 100644 index 0000000..14e8999 --- /dev/null +++ b/pipelines/__init__.py @@ -0,0 +1 @@ +"""Init.""" diff --git a/common.py b/pipelines/common.py similarity index 100% rename from common.py rename to pipelines/common.py diff --git a/config.py b/pipelines/config.py similarity index 100% rename from config.py rename to pipelines/config.py diff --git a/pipelines/ingest_congress.py b/pipelines/ingest_congress.py index 9a6b3c6..e278ef3 100644 --- a/pipelines/ingest_congress.py +++ b/pipelines/ingest_congress.py @@ -21,7 +21,7 @@ import yaml from sqlalchemy import select from sqlalchemy.orm import Session -from pipelines.common import configure_logger +from pipelines.pipelines.common import configure_logger from pipelines.orm.common import get_postgres_engine from pipelines.orm.data_science_dev.congress import ( Bill, diff --git a/pipelines/ingest_posts.py b/pipelines/ingest_posts.py index 95470a7..fc3a602 100644 --- a/pipelines/ingest_posts.py +++ b/pipelines/ingest_posts.py @@ -17,9 +17,9 @@ import orjson import psycopg import typer -from pipelines.common import configure_logger +from pipelines.pipelines.common import configure_logger from pipelines.orm.common import get_connection_info -from pipelines.parallelize import parallelize_process +from pipelines.pipelines.parallelize import parallelize_process if TYPE_CHECKING: from collections.abc import Iterator diff --git a/parallelize.py b/pipelines/parallelize.py similarity index 100% rename from parallelize.py rename to pipelines/parallelize.py