5 Commits

35 changed files with 9875 additions and 54 deletions
@@ -0,0 +1,60 @@
"""adding FailedIngestion.
Revision ID: 2f43120e3ffc
Revises: f99be864fe69
Create Date: 2026-03-24 23:46:17.277897
"""
from __future__ import annotations
from typing import TYPE_CHECKING
import sqlalchemy as sa
from alembic import op
from pipelines.orm import DataScienceDevBase
if TYPE_CHECKING:
from collections.abc import Sequence
# revision identifiers, used by Alembic.
revision: str = "2f43120e3ffc"
down_revision: str | None = "f99be864fe69"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
schema = DataScienceDevBase.schema_name
def upgrade() -> None:
"""Upgrade."""
# ### commands auto generated by Alembic - please adjust! ###
op.create_table(
"failed_ingestion",
sa.Column("raw_line", sa.Text(), nullable=False),
sa.Column("error", sa.Text(), nullable=False),
sa.Column("id", sa.Integer(), nullable=False),
sa.Column(
"created",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.Column(
"updated",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_failed_ingestion")),
schema=schema,
)
# ### end Alembic commands ###
def downgrade() -> None:
"""Downgrade."""
# ### commands auto generated by Alembic - please adjust! ###
op.drop_table("failed_ingestion", schema=schema)
# ### end Alembic commands ###
File diff suppressed because it is too large Load Diff
@@ -0,0 +1,79 @@
"""Attach all partition tables to the posts parent table.
Alembic autogenerate creates partition tables as standalone tables but does not
emit the ALTER TABLE ... ATTACH PARTITION statements needed for PostgreSQL to
route inserts to the correct partition.
Revision ID: a1b2c3d4e5f6
Revises: 605b1794838f
Create Date: 2026-03-25 10:00:00.000000
"""
from __future__ import annotations
from typing import TYPE_CHECKING
from alembic import op
from sqlalchemy import text
from pipelines.orm import DataScienceDevBase
from pipelines.orm.data_science_dev.posts.partitions import (
PARTITION_END_YEAR,
PARTITION_START_YEAR,
iso_weeks_in_year,
week_bounds,
)
if TYPE_CHECKING:
from collections.abc import Sequence
# revision identifiers, used by Alembic.
revision: str = "a1b2c3d4e5f6"
down_revision: str | None = "605b1794838f"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
schema = DataScienceDevBase.schema_name
ALREADY_ATTACHED_QUERY = text("""
SELECT inhrelid::regclass::text
FROM pg_inherits
WHERE inhparent = :parent::regclass
""")
def upgrade() -> None:
"""Attach all weekly partition tables to the posts parent table."""
connection = op.get_bind()
already_attached = {
row[0]
for row in connection.execute(
ALREADY_ATTACHED_QUERY, {"parent": f"{schema}.posts"}
)
}
for year in range(PARTITION_START_YEAR, PARTITION_END_YEAR + 1):
for week in range(1, iso_weeks_in_year(year) + 1):
table_name = f"posts_{year}_{week:02d}"
qualified_name = f"{schema}.{table_name}"
if qualified_name in already_attached:
continue
start, end = week_bounds(year, week)
start_str = start.strftime("%Y-%m-%d %H:%M:%S")
end_str = end.strftime("%Y-%m-%d %H:%M:%S")
op.execute(
f"ALTER TABLE {schema}.posts "
f"ATTACH PARTITION {qualified_name} "
f"FOR VALUES FROM ('{start_str}') TO ('{end_str}')"
)
def downgrade() -> None:
"""Detach all weekly partition tables from the posts parent table."""
for year in range(PARTITION_START_YEAR, PARTITION_END_YEAR + 1):
for week in range(1, iso_weeks_in_year(year) + 1):
table_name = f"posts_{year}_{week:02d}"
op.execute(
f"ALTER TABLE {schema}.posts DETACH PARTITION {schema}.{table_name}"
)
@@ -0,0 +1,229 @@
"""adding congress data.
Revision ID: 83bfc8af92d8
Revises: a1b2c3d4e5f6
Create Date: 2026-03-27 10:43:02.324510
"""
from __future__ import annotations
from typing import TYPE_CHECKING
import sqlalchemy as sa
from alembic import op
from pipelines.orm import DataScienceDevBase
if TYPE_CHECKING:
from collections.abc import Sequence
# revision identifiers, used by Alembic.
revision: str = "83bfc8af92d8"
down_revision: str | None = "a1b2c3d4e5f6"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
schema = DataScienceDevBase.schema_name
def upgrade() -> None:
"""Upgrade."""
# ### commands auto generated by Alembic - please adjust! ###
op.create_table(
"bill",
sa.Column("congress", sa.Integer(), nullable=False),
sa.Column("bill_type", sa.String(), nullable=False),
sa.Column("number", sa.Integer(), nullable=False),
sa.Column("title", sa.String(), nullable=True),
sa.Column("title_short", sa.String(), nullable=True),
sa.Column("official_title", sa.String(), nullable=True),
sa.Column("status", sa.String(), nullable=True),
sa.Column("status_at", sa.Date(), nullable=True),
sa.Column("sponsor_bioguide_id", sa.String(), nullable=True),
sa.Column("subjects_top_term", sa.String(), nullable=True),
sa.Column("id", sa.Integer(), nullable=False),
sa.Column(
"created",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.Column(
"updated",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_bill")),
sa.UniqueConstraint(
"congress", "bill_type", "number", name="uq_bill_congress_type_number"
),
schema=schema,
)
op.create_index(
"ix_bill_congress", "bill", ["congress"], unique=False, schema=schema
)
op.create_table(
"legislator",
sa.Column("bioguide_id", sa.Text(), nullable=False),
sa.Column("thomas_id", sa.String(), nullable=True),
sa.Column("lis_id", sa.String(), nullable=True),
sa.Column("govtrack_id", sa.Integer(), nullable=True),
sa.Column("opensecrets_id", sa.String(), nullable=True),
sa.Column("fec_ids", sa.String(), nullable=True),
sa.Column("first_name", sa.String(), nullable=False),
sa.Column("last_name", sa.String(), nullable=False),
sa.Column("official_full_name", sa.String(), nullable=True),
sa.Column("nickname", sa.String(), nullable=True),
sa.Column("birthday", sa.Date(), nullable=True),
sa.Column("gender", sa.String(), nullable=True),
sa.Column("current_party", sa.String(), nullable=True),
sa.Column("current_state", sa.String(), nullable=True),
sa.Column("current_district", sa.Integer(), nullable=True),
sa.Column("current_chamber", sa.String(), nullable=True),
sa.Column("id", sa.Integer(), nullable=False),
sa.Column(
"created",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.Column(
"updated",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_legislator")),
schema=schema,
)
op.create_index(
op.f("ix_legislator_bioguide_id"),
"legislator",
["bioguide_id"],
unique=True,
schema=schema,
)
op.create_table(
"bill_text",
sa.Column("bill_id", sa.Integer(), nullable=False),
sa.Column("version_code", sa.String(), nullable=False),
sa.Column("version_name", sa.String(), nullable=True),
sa.Column("text_content", sa.String(), nullable=True),
sa.Column("date", sa.Date(), nullable=True),
sa.Column("id", sa.Integer(), nullable=False),
sa.Column(
"created",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.Column(
"updated",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.ForeignKeyConstraint(
["bill_id"],
[f"{schema}.bill.id"],
name=op.f("fk_bill_text_bill_id_bill"),
ondelete="CASCADE",
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_bill_text")),
sa.UniqueConstraint(
"bill_id", "version_code", name="uq_bill_text_bill_id_version_code"
),
schema=schema,
)
op.create_table(
"vote",
sa.Column("congress", sa.Integer(), nullable=False),
sa.Column("chamber", sa.String(), nullable=False),
sa.Column("session", sa.Integer(), nullable=False),
sa.Column("number", sa.Integer(), nullable=False),
sa.Column("vote_type", sa.String(), nullable=True),
sa.Column("question", sa.String(), nullable=True),
sa.Column("result", sa.String(), nullable=True),
sa.Column("result_text", sa.String(), nullable=True),
sa.Column("vote_date", sa.Date(), nullable=False),
sa.Column("yea_count", sa.Integer(), nullable=True),
sa.Column("nay_count", sa.Integer(), nullable=True),
sa.Column("not_voting_count", sa.Integer(), nullable=True),
sa.Column("present_count", sa.Integer(), nullable=True),
sa.Column("bill_id", sa.Integer(), nullable=True),
sa.Column("id", sa.Integer(), nullable=False),
sa.Column(
"created",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.Column(
"updated",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.ForeignKeyConstraint(
["bill_id"], [f"{schema}.bill.id"], name=op.f("fk_vote_bill_id_bill")
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_vote")),
sa.UniqueConstraint(
"congress",
"chamber",
"session",
"number",
name="uq_vote_congress_chamber_session_number",
),
schema=schema,
)
op.create_index(
"ix_vote_congress_chamber",
"vote",
["congress", "chamber"],
unique=False,
schema=schema,
)
op.create_index("ix_vote_date", "vote", ["vote_date"], unique=False, schema=schema)
op.create_table(
"vote_record",
sa.Column("vote_id", sa.Integer(), nullable=False),
sa.Column("legislator_id", sa.Integer(), nullable=False),
sa.Column("position", sa.String(), nullable=False),
sa.ForeignKeyConstraint(
["legislator_id"],
[f"{schema}.legislator.id"],
name=op.f("fk_vote_record_legislator_id_legislator"),
ondelete="CASCADE",
),
sa.ForeignKeyConstraint(
["vote_id"],
[f"{schema}.vote.id"],
name=op.f("fk_vote_record_vote_id_vote"),
ondelete="CASCADE",
),
sa.PrimaryKeyConstraint(
"vote_id", "legislator_id", name=op.f("pk_vote_record")
),
schema=schema,
)
# ### end Alembic commands ###
def downgrade() -> None:
"""Downgrade."""
# ### commands auto generated by Alembic - please adjust! ###
op.drop_table("vote_record", schema=schema)
op.drop_index("ix_vote_date", table_name="vote", schema=schema)
op.drop_index("ix_vote_congress_chamber", table_name="vote", schema=schema)
op.drop_table("vote", schema=schema)
op.drop_table("bill_text", schema=schema)
op.drop_index(
op.f("ix_legislator_bioguide_id"), table_name="legislator", schema=schema
)
op.drop_table("legislator", schema=schema)
op.drop_index("ix_bill_congress", table_name="bill", schema=schema)
op.drop_table("bill", schema=schema)
# ### end Alembic commands ###
@@ -0,0 +1,68 @@
"""adding LegislatorSocialMedia.
Revision ID: 5cd7eee3549d
Revises: 83bfc8af92d8
Create Date: 2026-03-29 11:53:44.224799
"""
from __future__ import annotations
from typing import TYPE_CHECKING
import sqlalchemy as sa
from alembic import op
from pipelines.orm import DataScienceDevBase
if TYPE_CHECKING:
from collections.abc import Sequence
# revision identifiers, used by Alembic.
revision: str = "5cd7eee3549d"
down_revision: str | None = "83bfc8af92d8"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
schema = DataScienceDevBase.schema_name
def upgrade() -> None:
"""Upgrade."""
# ### commands auto generated by Alembic - please adjust! ###
op.create_table(
"legislator_social_media",
sa.Column("legislator_id", sa.Integer(), nullable=False),
sa.Column("platform", sa.String(), nullable=False),
sa.Column("account_name", sa.String(), nullable=False),
sa.Column("url", sa.String(), nullable=True),
sa.Column("source", sa.String(), nullable=False),
sa.Column("id", sa.Integer(), nullable=False),
sa.Column(
"created",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.Column(
"updated",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.ForeignKeyConstraint(
["legislator_id"],
[f"{schema}.legislator.id"],
name=op.f("fk_legislator_social_media_legislator_id_legislator"),
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_legislator_social_media")),
schema=schema,
)
# ### end Alembic commands ###
def downgrade() -> None:
"""Downgrade."""
# ### commands auto generated by Alembic - please adjust! ###
op.drop_table("legislator_social_media", schema=schema)
# ### end Alembic commands ###
@@ -0,0 +1,245 @@
"""adding LegislatorScore and BillTopic.
Revision ID: ef4bc5411176
Revises: 5cd7eee3549d
Create Date: 2026-04-21 11:35:18.977213
"""
from __future__ import annotations
from typing import TYPE_CHECKING
import sqlalchemy as sa
from alembic import op
from pipelines.orm import DataScienceDevBase
if TYPE_CHECKING:
from collections.abc import Sequence
# revision identifiers, used by Alembic.
revision: str = "ef4bc5411176"
down_revision: str | None = "5cd7eee3549d"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
schema = DataScienceDevBase.schema_name
def upgrade() -> None:
"""Upgrade."""
# ### commands auto generated by Alembic - please adjust! ###
op.create_table(
"bill_topic",
sa.Column("bill_id", sa.Integer(), nullable=False),
sa.Column("topic", sa.String(), nullable=False),
sa.Column(
"support_position",
sa.Enum("for", "against", name="bill_topic_position", native_enum=False),
nullable=False,
),
sa.Column("id", sa.Integer(), nullable=False),
sa.Column(
"created",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.Column(
"updated",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.ForeignKeyConstraint(
["bill_id"],
[f"{schema}.bill.id"],
name=op.f("fk_bill_topic_bill_id_bill"),
ondelete="CASCADE",
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_bill_topic")),
sa.UniqueConstraint(
"bill_id",
"topic",
"support_position",
name="uq_bill_topic_bill_id_topic_support_position",
),
schema=schema,
)
op.create_index(
"ix_bill_topic_topic", "bill_topic", ["topic"], unique=False, schema=schema
)
op.create_table(
"legislator_score",
sa.Column("legislator_id", sa.Integer(), nullable=False),
sa.Column("year", sa.Integer(), nullable=False),
sa.Column("topic", sa.String(), nullable=False),
sa.Column("score", sa.Float(), nullable=False),
sa.Column("id", sa.Integer(), nullable=False),
sa.Column(
"created",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.Column(
"updated",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.ForeignKeyConstraint(
["legislator_id"],
[f"{schema}.legislator.id"],
name=op.f("fk_legislator_score_legislator_id_legislator"),
ondelete="CASCADE",
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_legislator_score")),
sa.UniqueConstraint(
"legislator_id",
"year",
"topic",
name="uq_legislator_score_legislator_id_year_topic",
),
schema=schema,
)
op.create_index(
op.f("ix_legislator_score_legislator_id"),
"legislator_score",
["legislator_id"],
unique=False,
schema=schema,
)
op.create_index(
"ix_legislator_score_year_topic",
"legislator_score",
["year", "topic"],
unique=False,
schema=schema,
)
op.create_table(
"legislator_bill_score",
sa.Column("bill_id", sa.Integer(), nullable=False),
sa.Column("bill_topic_id", sa.Integer(), nullable=False),
sa.Column("legislator_id", sa.Integer(), nullable=False),
sa.Column("year", sa.Integer(), nullable=False),
sa.Column("topic", sa.String(), nullable=False),
sa.Column("score", sa.Float(), nullable=False),
sa.Column("id", sa.Integer(), nullable=False),
sa.Column(
"created",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.Column(
"updated",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.ForeignKeyConstraint(
["bill_id"],
[f"{schema}.bill.id"],
name=op.f("fk_legislator_bill_score_bill_id_bill"),
ondelete="CASCADE",
),
sa.ForeignKeyConstraint(
["bill_topic_id"],
[f"{schema}.bill_topic.id"],
name=op.f("fk_legislator_bill_score_bill_topic_id_bill_topic"),
ondelete="CASCADE",
),
sa.ForeignKeyConstraint(
["legislator_id"],
[f"{schema}.legislator.id"],
name=op.f("fk_legislator_bill_score_legislator_id_legislator"),
ondelete="CASCADE",
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_legislator_bill_score")),
sa.UniqueConstraint(
"bill_topic_id",
"legislator_id",
"year",
name="uq_legislator_bill_score_bill_topic_id_legislator_id_year",
),
schema=schema,
)
op.create_index(
op.f("ix_legislator_bill_score_bill_id"),
"legislator_bill_score",
["bill_id"],
unique=False,
schema=schema,
)
op.create_index(
op.f("ix_legislator_bill_score_bill_topic_id"),
"legislator_bill_score",
["bill_topic_id"],
unique=False,
schema=schema,
)
op.create_index(
op.f("ix_legislator_bill_score_legislator_id"),
"legislator_bill_score",
["legislator_id"],
unique=False,
schema=schema,
)
op.create_index(
"ix_legislator_bill_score_year_topic",
"legislator_bill_score",
["year", "topic"],
unique=False,
schema=schema,
)
op.add_column(
"bill",
sa.Column("score_processed_at", sa.DateTime(timezone=True), nullable=True),
schema=schema,
)
op.add_column(
"bill_text", sa.Column("summary", sa.String(), nullable=True), schema=schema
)
# ### end Alembic commands ###
def downgrade() -> None:
"""Downgrade."""
# ### commands auto generated by Alembic - please adjust! ###
op.drop_column("bill_text", "summary", schema=schema)
op.drop_column("bill", "score_processed_at", schema=schema)
op.drop_index(
"ix_legislator_bill_score_year_topic",
table_name="legislator_bill_score",
schema=schema,
)
op.drop_index(
op.f("ix_legislator_bill_score_legislator_id"),
table_name="legislator_bill_score",
schema=schema,
)
op.drop_index(
op.f("ix_legislator_bill_score_bill_topic_id"),
table_name="legislator_bill_score",
schema=schema,
)
op.drop_index(
op.f("ix_legislator_bill_score_bill_id"),
table_name="legislator_bill_score",
schema=schema,
)
op.drop_table("legislator_bill_score", schema=schema)
op.drop_index(
"ix_legislator_score_year_topic", table_name="legislator_score", schema=schema
)
op.drop_index(
op.f("ix_legislator_score_legislator_id"),
table_name="legislator_score",
schema=schema,
)
op.drop_table("legislator_score", schema=schema)
op.drop_index("ix_bill_topic_topic", table_name="bill_topic", schema=schema)
op.drop_table("bill_topic", schema=schema)
# ### end Alembic commands ###
@@ -0,0 +1,146 @@
"""removed LegislatorBillScore.
Revision ID: b63ed11d6775
Revises: 7d15f9b7c8a2
Create Date: 2026-04-21 22:46:48.058542
"""
from __future__ import annotations
from typing import TYPE_CHECKING
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql
from alembic import op
from pipelines.orm import DataScienceDevBase
if TYPE_CHECKING:
from collections.abc import Sequence
# revision identifiers, used by Alembic.
revision: str = "b63ed11d6775"
down_revision: str | None = "7d15f9b7c8a2"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
schema = DataScienceDevBase.schema_name
def upgrade() -> None:
"""Upgrade."""
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index(
op.f("ix_legislator_bill_score_bill_id"),
table_name="legislator_bill_score",
schema=schema,
)
op.drop_index(
op.f("ix_legislator_bill_score_bill_topic_id"),
table_name="legislator_bill_score",
schema=schema,
)
op.drop_index(
op.f("ix_legislator_bill_score_legislator_id"),
table_name="legislator_bill_score",
schema=schema,
)
op.drop_index(
op.f("ix_legislator_bill_score_year_topic"),
table_name="legislator_bill_score",
schema=schema,
)
op.drop_table("legislator_bill_score", schema=schema)
# ### end Alembic commands ###
def downgrade() -> None:
"""Downgrade."""
# ### commands auto generated by Alembic - please adjust! ###
op.create_table(
"legislator_bill_score",
sa.Column("bill_id", sa.INTEGER(), autoincrement=False, nullable=False),
sa.Column("bill_topic_id", sa.INTEGER(), autoincrement=False, nullable=False),
sa.Column("legislator_id", sa.INTEGER(), autoincrement=False, nullable=False),
sa.Column("year", sa.INTEGER(), autoincrement=False, nullable=False),
sa.Column("topic", sa.VARCHAR(), autoincrement=False, nullable=False),
sa.Column(
"score",
sa.DOUBLE_PRECISION(precision=53),
autoincrement=False,
nullable=False,
),
sa.Column("id", sa.INTEGER(), autoincrement=True, nullable=False),
sa.Column(
"created",
postgresql.TIMESTAMP(timezone=True),
server_default=sa.text("now()"),
autoincrement=False,
nullable=False,
),
sa.Column(
"updated",
postgresql.TIMESTAMP(timezone=True),
server_default=sa.text("now()"),
autoincrement=False,
nullable=False,
),
sa.ForeignKeyConstraint(
["bill_id"],
[f"{schema}.bill.id"],
name=op.f("fk_legislator_bill_score_bill_id_bill"),
ondelete="CASCADE",
),
sa.ForeignKeyConstraint(
["bill_topic_id"],
[f"{schema}.bill_topic.id"],
name=op.f("fk_legislator_bill_score_bill_topic_id_bill_topic"),
ondelete="CASCADE",
),
sa.ForeignKeyConstraint(
["legislator_id"],
[f"{schema}.legislator.id"],
name=op.f("fk_legislator_bill_score_legislator_id_legislator"),
ondelete="CASCADE",
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_legislator_bill_score")),
sa.UniqueConstraint(
"bill_topic_id",
"legislator_id",
"year",
name=op.f("uq_legislator_bill_score_bill_topic_id_legislator_id_year"),
postgresql_include=[],
postgresql_nulls_not_distinct=False,
),
schema=schema,
)
op.create_index(
op.f("ix_legislator_bill_score_year_topic"),
"legislator_bill_score",
["year", "topic"],
unique=False,
schema=schema,
)
op.create_index(
op.f("ix_legislator_bill_score_legislator_id"),
"legislator_bill_score",
["legislator_id"],
unique=False,
schema=schema,
)
op.create_index(
op.f("ix_legislator_bill_score_bill_topic_id"),
"legislator_bill_score",
["bill_topic_id"],
unique=False,
schema=schema,
)
op.create_index(
op.f("ix_legislator_bill_score_bill_id"),
"legislator_bill_score",
["bill_id"],
unique=False,
schema=schema,
)
# ### end Alembic commands ###
@@ -0,0 +1,54 @@
"""add bill_text summarization metadata.
Revision ID: 7d15f9b7c8a2
Revises: ef4bc5411176
Create Date: 2026-04-22 00:00:00.000000
"""
from __future__ import annotations
from typing import TYPE_CHECKING
import sqlalchemy as sa
from alembic import op
from pipelines.orm import DataScienceDevBase
if TYPE_CHECKING:
from collections.abc import Sequence
# revision identifiers, used by Alembic.
revision: str = "7d15f9b7c8a2"
down_revision: str | None = "ef4bc5411176"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
schema = DataScienceDevBase.schema_name
def upgrade() -> None:
"""Upgrade."""
op.add_column(
"bill_text",
sa.Column("summarization_model", sa.String(), nullable=True),
schema=schema,
)
op.add_column(
"bill_text",
sa.Column("summarization_user_prompt_version", sa.String(), nullable=True),
schema=schema,
)
op.add_column(
"bill_text",
sa.Column("summarization_system_prompt_version", sa.String(), nullable=True),
schema=schema,
)
def downgrade() -> None:
"""Downgrade."""
op.drop_column(
"bill_text", "summarization_system_prompt_version", schema=schema
)
op.drop_column("bill_text", "summarization_user_prompt_version", schema=schema)
op.drop_column("bill_text", "summarization_model", schema=schema)
@@ -0,0 +1,98 @@
"""adding LegislatorScoreFake.
Revision ID: 06f833813bd7
Revises: b63ed11d6775
Create Date: 2026-04-22 18:41:07.484609
"""
from __future__ import annotations
from typing import TYPE_CHECKING
import sqlalchemy as sa
from alembic import op
from pipelines.orm import DataScienceDevBase
if TYPE_CHECKING:
from collections.abc import Sequence
# revision identifiers, used by Alembic.
revision: str = "06f833813bd7"
down_revision: str | None = "b63ed11d6775"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
schema = DataScienceDevBase.schema_name
def upgrade() -> None:
"""Upgrade."""
# ### commands auto generated by Alembic - please adjust! ###
op.create_table(
"legislator_score_fake",
sa.Column("legislator_id", sa.Integer(), nullable=False),
sa.Column("year", sa.Integer(), nullable=False),
sa.Column("topic", sa.String(), nullable=False),
sa.Column("score", sa.Float(), nullable=False),
sa.Column("id", sa.Integer(), nullable=False),
sa.Column(
"created",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.Column(
"updated",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.ForeignKeyConstraint(
["legislator_id"],
[f"{schema}.legislator.id"],
name=op.f("fk_legislator_score_fake_legislator_id_legislator"),
ondelete="CASCADE",
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_legislator_score_fake")),
sa.UniqueConstraint(
"legislator_id",
"year",
"topic",
name="uq_legislator_score_fake_legislator_id_year_topic",
),
schema=schema,
)
op.create_index(
op.f("ix_legislator_score_fake_legislator_id"),
"legislator_score_fake",
["legislator_id"],
unique=False,
schema=schema,
)
op.create_index(
"ix_legislator_score_fake_year_topic",
"legislator_score_fake",
["year", "topic"],
unique=False,
schema=schema,
)
# ### end Alembic commands ###
def downgrade() -> None:
"""Downgrade."""
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index(
"ix_legislator_score_fake_year_topic",
table_name="legislator_score_fake",
schema=schema,
)
op.drop_index(
op.f("ix_legislator_score_fake_legislator_id"),
table_name="legislator_score_fake",
schema=schema,
)
op.drop_table("legislator_score_fake", schema=schema)
# ### end Alembic commands ###
@@ -0,0 +1,64 @@
"""add vote.bill_text_id linkage.
Revision ID: 9c7d4a2e1b10
Revises: 06f833813bd7
Create Date: 2026-04-23 00:00:00.000000
"""
from __future__ import annotations
from typing import TYPE_CHECKING
import sqlalchemy as sa
from alembic import op
from pipelines.orm import DataScienceDevBase
if TYPE_CHECKING:
from collections.abc import Sequence
# revision identifiers, used by Alembic.
revision: str = "9c7d4a2e1b10"
down_revision: str | None = "06f833813bd7"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
schema = DataScienceDevBase.schema_name
def upgrade() -> None:
"""Upgrade."""
op.add_column(
"vote",
sa.Column("bill_text_id", sa.Integer(), nullable=True),
schema=schema,
)
op.create_index(
"ix_vote_bill_text_id",
"vote",
["bill_text_id"],
unique=False,
schema=schema,
)
op.create_foreign_key(
"fk_vote_bill_text_id_bill_text",
"vote",
"bill_text",
["bill_text_id"],
["id"],
source_schema=schema,
referent_schema=schema,
)
def downgrade() -> None:
"""Downgrade."""
op.drop_constraint(
"fk_vote_bill_text_id_bill_text",
"vote",
schema=schema,
type_="foreignkey",
)
op.drop_index("ix_vote_bill_text_id", table_name="vote", schema=schema)
op.drop_column("vote", "bill_text_id", schema=schema)
@@ -0,0 +1,844 @@
"""canonical vote context v3.
Revision ID: 1f8c0e7a9d21
Revises: 9c7d4a2e1b10
Create Date: 2026-04-25 00:00:00.000000
"""
from __future__ import annotations
from typing import TYPE_CHECKING
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql
from alembic import op
from pipelines.orm import DataScienceDevBase
if TYPE_CHECKING:
from collections.abc import Sequence
revision: str = "1f8c0e7a9d21"
down_revision: str | None = "9c7d4a2e1b10"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
schema = DataScienceDevBase.schema_name
def upgrade() -> None:
"""Upgrade."""
op.create_table(
"ingest_run",
sa.Column("started_at", sa.DateTime(timezone=True), nullable=True),
sa.Column("completed_at", sa.DateTime(timezone=True), nullable=True),
sa.Column("git_sha", sa.String(), nullable=True),
sa.Column("classifier_version", sa.String(), nullable=True),
sa.Column("source_snapshot_label", sa.String(), nullable=True),
sa.Column("status", sa.String(), nullable=False),
sa.Column("id", sa.Integer(), nullable=False),
sa.Column(
"created",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.Column(
"updated",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_ingest_run")),
schema=schema,
)
op.create_table(
"source_artifact",
sa.Column("source_kind", sa.String(), nullable=False),
sa.Column("congress", sa.Integer(), nullable=False),
sa.Column("chamber", sa.String(), nullable=True),
sa.Column("local_path", sa.String(), nullable=False),
sa.Column("source_url", sa.String(), nullable=True),
sa.Column("sha256", sa.String(), nullable=False),
sa.Column("byte_size", sa.Integer(), nullable=False),
sa.Column("modified_at", sa.DateTime(timezone=True), nullable=True),
sa.Column("ingested_at", sa.DateTime(timezone=True), nullable=True),
sa.Column("ingest_run_id", sa.Integer(), nullable=True),
sa.Column("id", sa.Integer(), nullable=False),
sa.Column(
"created",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.Column(
"updated",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.ForeignKeyConstraint(
["ingest_run_id"],
[f"{schema}.ingest_run.id"],
name=op.f("fk_source_artifact_ingest_run_id_ingest_run"),
ondelete="SET NULL",
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_source_artifact")),
schema=schema,
)
op.create_index(
"ix_source_artifact_source_kind",
"source_artifact",
["source_kind"],
unique=False,
schema=schema,
)
op.create_index(
"ix_source_artifact_congress",
"source_artifact",
["congress"],
unique=False,
schema=schema,
)
op.create_table(
"score_run",
sa.Column("ingest_run_id", sa.Integer(), nullable=True),
sa.Column("classifier_version", sa.String(), nullable=True),
sa.Column("scoring_version", sa.String(), nullable=True),
sa.Column("included_vote_count", sa.Integer(), nullable=False),
sa.Column("excluded_vote_count", sa.Integer(), nullable=False),
sa.Column("started_at", sa.DateTime(timezone=True), nullable=True),
sa.Column("completed_at", sa.DateTime(timezone=True), nullable=True),
sa.Column("id", sa.Integer(), nullable=False),
sa.Column(
"created",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.Column(
"updated",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.ForeignKeyConstraint(
["ingest_run_id"],
[f"{schema}.ingest_run.id"],
name=op.f("fk_score_run_ingest_run_id_ingest_run"),
ondelete="SET NULL",
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_score_run")),
schema=schema,
)
op.add_column(
"legislator_score",
sa.Column("score_run_id", sa.Integer(), nullable=True),
schema=schema,
)
op.create_index(
op.f("ix_legislator_score_score_run_id"),
"legislator_score",
["score_run_id"],
unique=False,
schema=schema,
)
op.create_foreign_key(
op.f("fk_legislator_score_score_run_id_score_run"),
"legislator_score",
"score_run",
["score_run_id"],
["id"],
source_schema=schema,
referent_schema=schema,
ondelete="CASCADE",
)
op.add_column(
"bill_text",
sa.Column("source_datetime_raw", sa.String(), nullable=True),
schema=schema,
)
op.add_column(
"bill_text", sa.Column("text_url_xml", sa.String(), nullable=True), schema=schema
)
op.add_column(
"bill_text", sa.Column("text_url_pdf", sa.String(), nullable=True), schema=schema
)
op.add_column(
"bill_text",
sa.Column("text_url_html", sa.String(), nullable=True),
schema=schema,
)
op.add_column(
"bill_text",
sa.Column("source_artifact_id", sa.Integer(), nullable=True),
schema=schema,
)
op.create_foreign_key(
op.f("fk_bill_text_source_artifact_id_source_artifact"),
"bill_text",
"source_artifact",
["source_artifact_id"],
["id"],
source_schema=schema,
referent_schema=schema,
ondelete="SET NULL",
)
op.create_table(
"bill_action",
sa.Column("bill_id", sa.Integer(), nullable=False),
sa.Column("sequence", sa.Integer(), nullable=False),
sa.Column("action_date", sa.Date(), nullable=False),
sa.Column("action_time", sa.String(), nullable=True),
sa.Column("action_text", sa.String(), nullable=False),
sa.Column("action_type", sa.String(), nullable=True),
sa.Column("action_code", sa.String(), nullable=True),
sa.Column("source_system_code", sa.String(), nullable=True),
sa.Column("source_system_name", sa.String(), nullable=True),
sa.Column("source_artifact_id", sa.Integer(), nullable=True),
sa.Column("id", sa.Integer(), nullable=False),
sa.Column(
"created",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.Column(
"updated",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.ForeignKeyConstraint(
["bill_id"],
[f"{schema}.bill.id"],
name=op.f("fk_bill_action_bill_id_bill"),
ondelete="CASCADE",
),
sa.ForeignKeyConstraint(
["source_artifact_id"],
[f"{schema}.source_artifact.id"],
name=op.f("fk_bill_action_source_artifact_id_source_artifact"),
ondelete="SET NULL",
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_bill_action")),
sa.UniqueConstraint("bill_id", "sequence", name="uq_bill_action_bill_id_sequence"),
schema=schema,
)
op.create_table(
"bill_action_recorded_vote",
sa.Column("bill_action_id", sa.Integer(), nullable=False),
sa.Column("congress", sa.Integer(), nullable=False),
sa.Column("chamber", sa.String(), nullable=False),
sa.Column("session_number", sa.Integer(), nullable=False),
sa.Column("roll_number", sa.Integer(), nullable=False),
sa.Column("vote_datetime", sa.DateTime(timezone=True), nullable=True),
sa.Column("vote_url", sa.String(), nullable=True),
sa.Column("id", sa.Integer(), nullable=False),
sa.Column(
"created",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.Column(
"updated",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.ForeignKeyConstraint(
["bill_action_id"],
[f"{schema}.bill_action.id"],
name=op.f("fk_bill_action_recorded_vote_bill_action_id_bill_action"),
ondelete="CASCADE",
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_bill_action_recorded_vote")),
sa.UniqueConstraint(
"bill_action_id",
"congress",
"chamber",
"session_number",
"roll_number",
name="uq_bill_action_recorded_vote_match_key",
),
schema=schema,
)
op.create_table(
"bill_relation",
sa.Column("bill_id", sa.Integer(), nullable=False),
sa.Column("related_bill_id", sa.Integer(), nullable=False),
sa.Column("relationship_type", sa.String(), nullable=False),
sa.Column("identified_by", sa.String(), nullable=True),
sa.Column("latest_action_date", sa.Date(), nullable=True),
sa.Column("latest_action_text", sa.String(), nullable=True),
sa.Column("id", sa.Integer(), nullable=False),
sa.Column(
"created",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.Column(
"updated",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.ForeignKeyConstraint(
["bill_id"],
[f"{schema}.bill.id"],
name=op.f("fk_bill_relation_bill_id_bill"),
ondelete="CASCADE",
),
sa.ForeignKeyConstraint(
["related_bill_id"],
[f"{schema}.bill.id"],
name=op.f("fk_bill_relation_related_bill_id_bill"),
ondelete="CASCADE",
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_bill_relation")),
schema=schema,
)
op.create_index(
"ix_bill_relation_bill_id",
"bill_relation",
["bill_id"],
unique=False,
schema=schema,
)
op.create_index(
"ix_bill_relation_related_bill_id",
"bill_relation",
["related_bill_id"],
unique=False,
schema=schema,
)
op.create_table(
"amendment",
sa.Column("congress", sa.Integer(), nullable=False),
sa.Column("amendment_type", sa.String(), nullable=False),
sa.Column("number", sa.Integer(), nullable=False),
sa.Column("chamber", sa.String(), nullable=False),
sa.Column("description", sa.String(), nullable=True),
sa.Column("purpose", sa.String(), nullable=True),
sa.Column("amended_bill_id", sa.Integer(), nullable=True),
sa.Column("amended_amendment_id", sa.Integer(), nullable=True),
sa.Column("source_path", sa.String(), nullable=True),
sa.Column("source_artifact_id", sa.Integer(), nullable=True),
sa.Column("id", sa.Integer(), nullable=False),
sa.Column(
"created",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.Column(
"updated",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.ForeignKeyConstraint(
["amended_amendment_id"],
[f"{schema}.amendment.id"],
name=op.f("fk_amendment_amended_amendment_id_amendment"),
ondelete="SET NULL",
),
sa.ForeignKeyConstraint(
["amended_bill_id"],
[f"{schema}.bill.id"],
name=op.f("fk_amendment_amended_bill_id_bill"),
ondelete="SET NULL",
),
sa.ForeignKeyConstraint(
["source_artifact_id"],
[f"{schema}.source_artifact.id"],
name=op.f("fk_amendment_source_artifact_id_source_artifact"),
ondelete="SET NULL",
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_amendment")),
sa.UniqueConstraint(
"congress",
"amendment_type",
"number",
name="uq_amendment_congress_type_number",
),
schema=schema,
)
op.create_table(
"amendment_action",
sa.Column("amendment_id", sa.Integer(), nullable=False),
sa.Column("sequence", sa.Integer(), nullable=False),
sa.Column("action_date", sa.Date(), nullable=False),
sa.Column("action_time", sa.String(), nullable=True),
sa.Column("action_text", sa.String(), nullable=False),
sa.Column("action_type", sa.String(), nullable=True),
sa.Column("action_code", sa.String(), nullable=True),
sa.Column("source_system_code", sa.String(), nullable=True),
sa.Column("source_system_name", sa.String(), nullable=True),
sa.Column("source_artifact_id", sa.Integer(), nullable=True),
sa.Column("id", sa.Integer(), nullable=False),
sa.Column(
"created",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.Column(
"updated",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.ForeignKeyConstraint(
["amendment_id"],
[f"{schema}.amendment.id"],
name=op.f("fk_amendment_action_amendment_id_amendment"),
ondelete="CASCADE",
),
sa.ForeignKeyConstraint(
["source_artifact_id"],
[f"{schema}.source_artifact.id"],
name=op.f("fk_amendment_action_source_artifact_id_source_artifact"),
ondelete="SET NULL",
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_amendment_action")),
sa.UniqueConstraint(
"amendment_id",
"sequence",
name="uq_amendment_action_amendment_id_sequence",
),
schema=schema,
)
op.create_table(
"amendment_action_recorded_vote",
sa.Column("amendment_action_id", sa.Integer(), nullable=False),
sa.Column("congress", sa.Integer(), nullable=False),
sa.Column("chamber", sa.String(), nullable=False),
sa.Column("session_number", sa.Integer(), nullable=False),
sa.Column("roll_number", sa.Integer(), nullable=False),
sa.Column("vote_datetime", sa.DateTime(timezone=True), nullable=True),
sa.Column("vote_url", sa.String(), nullable=True),
sa.Column("id", sa.Integer(), nullable=False),
sa.Column(
"created",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.Column(
"updated",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.ForeignKeyConstraint(
["amendment_action_id"],
[f"{schema}.amendment_action.id"],
name=op.f(
"fk_amendment_action_recorded_vote_amendment_action_id_amendment_action"
),
ondelete="CASCADE",
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_amendment_action_recorded_vote")),
sa.UniqueConstraint(
"amendment_action_id",
"congress",
"chamber",
"session_number",
"roll_number",
name="uq_amendment_action_recorded_vote_match_key",
),
schema=schema,
)
op.drop_constraint(
"uq_vote_congress_chamber_session_number",
"vote",
schema=schema,
type_="unique",
)
op.alter_column("vote", "session", new_column_name="session_year", schema=schema)
op.alter_column("vote", "number", new_column_name="roll_number", schema=schema)
op.add_column("vote", sa.Column("session_number", sa.Integer(), nullable=True), schema=schema)
op.add_column(
"vote",
sa.Column("vote_datetime", sa.DateTime(timezone=True), nullable=True),
schema=schema,
)
op.add_column(
"vote", sa.Column("raw_vote_source_url", sa.String(), nullable=True), schema=schema
)
op.add_column(
"vote",
sa.Column("raw_bill_ref", postgresql.JSONB(astext_type=sa.Text()), nullable=True),
schema=schema,
)
op.add_column(
"vote",
sa.Column(
"raw_amendment_ref",
postgresql.JSONB(astext_type=sa.Text()),
nullable=True,
),
schema=schema,
)
op.add_column(
"vote",
sa.Column(
"raw_nomination_ref",
postgresql.JSONB(astext_type=sa.Text()),
nullable=True,
),
schema=schema,
)
op.add_column(
"vote",
sa.Column(
"raw_treaty_ref",
postgresql.JSONB(astext_type=sa.Text()),
nullable=True,
),
schema=schema,
)
op.add_column(
"vote",
sa.Column("raw_vote_source_artifact_id", sa.Integer(), nullable=True),
schema=schema,
)
op.create_foreign_key(
op.f("fk_vote_raw_vote_source_artifact_id_source_artifact"),
"vote",
"source_artifact",
["raw_vote_source_artifact_id"],
["id"],
source_schema=schema,
referent_schema=schema,
ondelete="SET NULL",
)
op.execute(
sa.text(
f"""
UPDATE {schema}.vote
SET session_number = session_year - (((congress - 1) * 2) + 1789) + 1
"""
)
)
op.alter_column("vote", "session_number", nullable=False, schema=schema)
op.create_unique_constraint(
"uq_vote_congress_chamber_session_number_roll_number",
"vote",
["congress", "chamber", "session_number", "roll_number"],
schema=schema,
)
op.drop_constraint(
op.f("fk_vote_bill_id_bill"),
"vote",
schema=schema,
type_="foreignkey",
)
op.drop_constraint(
"fk_vote_bill_text_id_bill_text",
"vote",
schema=schema,
type_="foreignkey",
)
op.drop_index("ix_vote_bill_text_id", table_name="vote", schema=schema)
op.drop_column("vote", "bill_id", schema=schema)
op.drop_column("vote", "bill_text_id", schema=schema)
op.create_table(
"vote_action_match",
sa.Column("vote_id", sa.Integer(), nullable=False),
sa.Column("action_scope", sa.String(), nullable=False),
sa.Column("bill_action_id", sa.Integer(), nullable=True),
sa.Column("amendment_action_id", sa.Integer(), nullable=True),
sa.Column("is_selected", sa.Boolean(), nullable=False),
sa.Column("match_method", sa.String(), nullable=False),
sa.Column("match_reason", sa.String(), nullable=True),
sa.Column("match_confidence", sa.String(), nullable=False),
sa.Column(
"created_at",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.Column("id", sa.Integer(), nullable=False),
sa.Column(
"created",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.Column(
"updated",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.ForeignKeyConstraint(
["amendment_action_id"],
[f"{schema}.amendment_action.id"],
name=op.f("fk_vote_action_match_amendment_action_id_amendment_action"),
ondelete="CASCADE",
),
sa.ForeignKeyConstraint(
["bill_action_id"],
[f"{schema}.bill_action.id"],
name=op.f("fk_vote_action_match_bill_action_id_bill_action"),
ondelete="CASCADE",
),
sa.ForeignKeyConstraint(
["vote_id"],
[f"{schema}.vote.id"],
name=op.f("fk_vote_action_match_vote_id_vote"),
ondelete="CASCADE",
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_vote_action_match")),
schema=schema,
)
op.create_index(
"ix_vote_action_match_vote_id",
"vote_action_match",
["vote_id"],
unique=False,
schema=schema,
)
op.create_index(
"uq_vote_action_match_selected_vote_id",
"vote_action_match",
["vote_id"],
unique=True,
schema=schema,
postgresql_where=sa.text("is_selected"),
)
op.create_table(
"vote_classification",
sa.Column("vote_id", sa.Integer(), nullable=False),
sa.Column("subject_type", sa.String(), nullable=False),
sa.Column("measure_type", sa.String(), nullable=True),
sa.Column("measure_subtype", sa.String(), nullable=True),
sa.Column("measure_function", sa.String(), nullable=True),
sa.Column("vote_relationship", sa.String(), nullable=False),
sa.Column("is_legislation_related", sa.Boolean(), nullable=False),
sa.Column("is_direct_vote_on_legislative_text", sa.Boolean(), nullable=False),
sa.Column("is_substantive_policy_vote", sa.Boolean(), nullable=False),
sa.Column("is_lawmaking_vehicle", sa.Boolean(), nullable=False),
sa.Column("is_special_rule", sa.Boolean(), nullable=False),
sa.Column("classification_method", sa.String(), nullable=False),
sa.Column("classification_confidence_reason", sa.String(), nullable=True),
sa.Column("confidence", sa.String(), nullable=False),
sa.Column("classified_at", sa.DateTime(timezone=True), nullable=True),
sa.Column("classification_version", sa.String(), nullable=False),
sa.Column("id", sa.Integer(), nullable=False),
sa.Column(
"created",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.Column(
"updated",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.ForeignKeyConstraint(
["vote_id"],
[f"{schema}.vote.id"],
name=op.f("fk_vote_classification_vote_id_vote"),
ondelete="CASCADE",
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_vote_classification")),
sa.UniqueConstraint("vote_id", name=op.f("uq_vote_classification_vote_id")),
schema=schema,
)
op.create_index(
"ix_vote_classification_subject_type",
"vote_classification",
["subject_type"],
unique=False,
schema=schema,
)
op.create_table(
"vote_measure_link",
sa.Column("vote_id", sa.Integer(), nullable=False),
sa.Column("measure_id", sa.Integer(), nullable=False),
sa.Column("role", sa.String(), nullable=False),
sa.Column("source", sa.String(), nullable=False),
sa.Column("confidence", sa.String(), nullable=False),
sa.Column("notes", sa.String(), nullable=True),
sa.Column("id", sa.Integer(), nullable=False),
sa.Column(
"created",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.Column(
"updated",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.ForeignKeyConstraint(
["measure_id"],
[f"{schema}.bill.id"],
name=op.f("fk_vote_measure_link_measure_id_bill"),
ondelete="CASCADE",
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_vote_measure_link")),
schema=schema,
)
op.create_index(
"ix_vote_measure_link_vote_id",
"vote_measure_link",
["vote_id"],
unique=False,
schema=schema,
)
op.create_foreign_key(
op.f("fk_vote_measure_link_vote_id_vote"),
"vote_measure_link",
"vote",
["vote_id"],
["id"],
source_schema=schema,
referent_schema=schema,
ondelete="CASCADE",
)
op.create_table(
"vote_text_target",
sa.Column("vote_id", sa.Integer(), nullable=False),
sa.Column("text_target_type", sa.String(), nullable=False),
sa.Column("voted_text_version_id", sa.Integer(), nullable=True),
sa.Column("resulting_text_version_id", sa.Integer(), nullable=True),
sa.Column("related_amendment_id", sa.Integer(), nullable=True),
sa.Column("text_target_basis", sa.String(), nullable=False),
sa.Column("text_resolution_method", sa.String(), nullable=False),
sa.Column("text_resolution_confidence_reason", sa.String(), nullable=True),
sa.Column("confidence", sa.String(), nullable=False),
sa.Column("notes", sa.String(), nullable=True),
sa.Column("id", sa.Integer(), nullable=False),
sa.Column(
"created",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.Column(
"updated",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.ForeignKeyConstraint(
["related_amendment_id"],
[f"{schema}.amendment.id"],
name=op.f("fk_vote_text_target_related_amendment_id_amendment"),
ondelete="SET NULL",
),
sa.ForeignKeyConstraint(
["resulting_text_version_id"],
[f"{schema}.bill_text.id"],
name=op.f("fk_vote_text_target_resulting_text_version_id_bill_text"),
ondelete="SET NULL",
),
sa.ForeignKeyConstraint(
["vote_id"],
[f"{schema}.vote.id"],
name=op.f("fk_vote_text_target_vote_id_vote"),
ondelete="CASCADE",
),
sa.ForeignKeyConstraint(
["voted_text_version_id"],
[f"{schema}.bill_text.id"],
name=op.f("fk_vote_text_target_voted_text_version_id_bill_text"),
ondelete="SET NULL",
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_vote_text_target")),
sa.UniqueConstraint("vote_id", name=op.f("uq_vote_text_target_vote_id")),
schema=schema,
)
op.create_table(
"vote_position_meaning",
sa.Column("vote_id", sa.Integer(), nullable=False),
sa.Column("yea_effect", sa.String(), nullable=False),
sa.Column("nay_effect", sa.String(), nullable=False),
sa.Column("present_effect", sa.String(), nullable=False),
sa.Column("polarity_confidence", sa.String(), nullable=False),
sa.Column("polarity_method", sa.String(), nullable=False),
sa.Column("notes", sa.String(), nullable=True),
sa.Column("id", sa.Integer(), nullable=False),
sa.Column(
"created",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.Column(
"updated",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.ForeignKeyConstraint(
["vote_id"],
[f"{schema}.vote.id"],
name=op.f("fk_vote_position_meaning_vote_id_vote"),
ondelete="CASCADE",
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_vote_position_meaning")),
sa.UniqueConstraint("vote_id", name=op.f("uq_vote_position_meaning_vote_id")),
schema=schema,
)
op.create_table(
"vote_context_audit",
sa.Column("vote_id", sa.Integer(), nullable=False),
sa.Column("step", sa.String(), nullable=False),
sa.Column("message", sa.String(), nullable=False),
sa.Column("severity", sa.String(), nullable=False),
sa.Column("source_path", sa.String(), nullable=True),
sa.Column(
"created_at",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.Column("id", sa.Integer(), nullable=False),
sa.Column(
"created",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.Column(
"updated",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.ForeignKeyConstraint(
["vote_id"],
[f"{schema}.vote.id"],
name=op.f("fk_vote_context_audit_vote_id_vote"),
ondelete="CASCADE",
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_vote_context_audit")),
schema=schema,
)
op.create_index(
"ix_vote_context_audit_vote_id",
"vote_context_audit",
["vote_id"],
unique=False,
schema=schema,
)
def downgrade() -> None:
"""Downgrade."""
raise NotImplementedError("Downgrade is not supported for canonical vote context v3.")
@@ -0,0 +1,203 @@
"""add supporting indexes for congress vote context and scoring.
Revision ID: a7b91c4e2d30
Revises: 1f8c0e7a9d21
Create Date: 2026-04-26 00:00:00.000000
"""
from __future__ import annotations
from typing import TYPE_CHECKING
import sqlalchemy as sa
from alembic import op
from pipelines.orm import DataScienceDevBase
if TYPE_CHECKING:
from collections.abc import Sequence
revision: str = "a7b91c4e2d30"
down_revision: str | None = "1f8c0e7a9d21"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
schema = DataScienceDevBase.schema_name
def _dedupe_source_artifacts() -> None:
op.execute(
sa.text(
f"""
CREATE TEMP TABLE tmp_source_artifact_dups AS
WITH ranked AS (
SELECT
id,
first_value(id) OVER (
PARTITION BY ingest_run_id, local_path, sha256
ORDER BY id
) AS keep_id,
row_number() OVER (
PARTITION BY ingest_run_id, local_path, sha256
ORDER BY id
) AS rn
FROM {schema}.source_artifact
WHERE ingest_run_id IS NOT NULL
)
SELECT id, keep_id
FROM ranked
WHERE rn > 1
"""
)
)
for table_name, column_name in (
("bill_text", "source_artifact_id"),
("bill_action", "source_artifact_id"),
("amendment", "source_artifact_id"),
("amendment_action", "source_artifact_id"),
("vote", "raw_vote_source_artifact_id"),
):
op.execute(
sa.text(
f"""
UPDATE {schema}.{table_name} AS target
SET {column_name} = d.keep_id
FROM tmp_source_artifact_dups AS d
WHERE target.{column_name} = d.id
"""
)
)
op.execute(
sa.text(
f"""
DELETE FROM {schema}.source_artifact AS source_artifact
USING tmp_source_artifact_dups AS d
WHERE source_artifact.id = d.id
"""
)
)
op.execute(sa.text("DROP TABLE tmp_source_artifact_dups"))
def upgrade() -> None:
"""Upgrade."""
_dedupe_source_artifacts()
op.create_index(
"uq_source_artifact_ingest_identity",
"source_artifact",
["ingest_run_id", "local_path", "sha256"],
unique=True,
schema=schema,
)
op.create_index(
"ix_bill_action_recorded_vote_match_tuple",
"bill_action_recorded_vote",
["congress", "chamber", "session_number", "roll_number"],
unique=False,
schema=schema,
)
op.create_index(
"ix_amendment_action_recorded_vote_match_tuple",
"amendment_action_recorded_vote",
["congress", "chamber", "session_number", "roll_number"],
unique=False,
schema=schema,
)
op.create_index(
"ix_vote_classification_eligible_vote_id",
"vote_classification",
["vote_id"],
unique=False,
schema=schema,
postgresql_where=sa.text(
"subject_type = 'measure' "
"AND vote_relationship = 'direct_text_vote' "
"AND is_direct_vote_on_legislative_text "
"AND is_substantive_policy_vote "
"AND NOT is_special_rule"
),
)
op.create_index(
"ix_vote_measure_link_vote_id_role",
"vote_measure_link",
["vote_id", "role"],
unique=False,
schema=schema,
)
op.create_index(
"ix_vote_measure_link_measure_id_role",
"vote_measure_link",
["measure_id", "role"],
unique=False,
schema=schema,
)
op.create_index(
"ix_vote_text_target_voted_text_version_id",
"vote_text_target",
["voted_text_version_id"],
unique=False,
schema=schema,
postgresql_where=sa.text("voted_text_version_id IS NOT NULL"),
)
op.create_index(
"ix_vote_context_audit_severity_vote_id",
"vote_context_audit",
["severity", "vote_id"],
unique=False,
schema=schema,
)
op.create_index(
"ix_legislator_current_chamber",
"legislator",
["current_chamber"],
unique=False,
schema=schema,
)
def downgrade() -> None:
"""Downgrade."""
op.drop_index("ix_legislator_current_chamber", table_name="legislator", schema=schema)
op.drop_index(
"ix_vote_context_audit_severity_vote_id",
table_name="vote_context_audit",
schema=schema,
)
op.drop_index(
"ix_vote_text_target_voted_text_version_id",
table_name="vote_text_target",
schema=schema,
)
op.drop_index(
"ix_vote_measure_link_measure_id_role",
table_name="vote_measure_link",
schema=schema,
)
op.drop_index(
"ix_vote_measure_link_vote_id_role",
table_name="vote_measure_link",
schema=schema,
)
op.drop_index(
"ix_vote_classification_eligible_vote_id",
table_name="vote_classification",
schema=schema,
)
op.drop_index(
"ix_amendment_action_recorded_vote_match_tuple",
table_name="amendment_action_recorded_vote",
schema=schema,
)
op.drop_index(
"ix_bill_action_recorded_vote_match_tuple",
table_name="bill_action_recorded_vote",
schema=schema,
)
op.drop_index(
"uq_source_artifact_ingest_identity",
table_name="source_artifact",
schema=schema,
)
@@ -0,0 +1,66 @@
"""adding PostTopic.
Revision ID: 032e26bbfcb5
Revises: a7b91c4e2d30
Create Date: 2026-04-26 14:34:35.688341
"""
from __future__ import annotations
from typing import TYPE_CHECKING
import sqlalchemy as sa
from alembic import op
from pipelines.orm import DataScienceDevBase
if TYPE_CHECKING:
from collections.abc import Sequence
# revision identifiers, used by Alembic.
revision: str = "032e26bbfcb5"
down_revision: str | None = "a7b91c4e2d30"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
schema = DataScienceDevBase.schema_name
def upgrade() -> None:
"""Upgrade."""
# ### commands auto generated by Alembic - please adjust! ###
op.create_table(
"post_topic",
sa.Column("post_id", sa.BigInteger(), nullable=False),
sa.Column("topic_id", sa.SmallInteger(), nullable=False),
sa.Column("topic_label", sa.String(), nullable=True),
sa.Column("model_version", sa.String(), nullable=True),
sa.Column("id", sa.Integer(), nullable=False),
sa.Column(
"created",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.Column(
"updated",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_post_topic")),
schema=schema,
)
op.create_index(
"ix_post_topic_post_id", "post_topic", ["post_id"], unique=False, schema=schema
)
# ### end Alembic commands ###
def downgrade() -> None:
"""Downgrade."""
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index("ix_post_topic_post_id", table_name="post_topic", schema=schema)
op.drop_table("post_topic", schema=schema)
# ### end Alembic commands ###
@@ -0,0 +1,35 @@
"""adding PG Vector.
Revision ID: b9360b0b0c22
Revises: 032e26bbfcb5
Create Date: 2026-04-26 14:35:08.770128
"""
from __future__ import annotations
from typing import TYPE_CHECKING
from alembic import op
from pipelines.orm import DataScienceDevBase
if TYPE_CHECKING:
from collections.abc import Sequence
# revision identifiers, used by Alembic.
revision: str = "b9360b0b0c22"
down_revision: str | None = "032e26bbfcb5"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
schema = DataScienceDevBase.schema_name
def upgrade() -> None:
"""Upgrade."""
op.execute("CREATE EXTENSION IF NOT EXISTS vector")
def downgrade() -> None:
"""Downgrade."""
op.execute("DROP EXTENSION IF EXISTS vector")
+138
View File
@@ -0,0 +1,138 @@
"""Alembic."""
from __future__ import annotations
import logging
import sys
from pathlib import Path
from typing import TYPE_CHECKING, Any, Literal
from alembic import context
from alembic.script import write_hooks
from sqlalchemy.schema import CreateSchema
from pipelines.common import bash_wrapper
from pipelines.orm.common import get_postgres_engine
if TYPE_CHECKING:
from collections.abc import MutableMapping
from sqlalchemy.orm import DeclarativeBase
config = context.config
base_class: type[DeclarativeBase] = config.attributes.get("base")
if base_class is None:
error = "No base class provided. Use the database CLI to run alembic commands."
raise RuntimeError(error)
target_metadata = base_class.metadata
logging.basicConfig(
level="DEBUG",
datefmt="%Y-%m-%dT%H:%M:%S%z",
format="%(asctime)s %(levelname)s %(filename)s:%(lineno)d - %(message)s",
handlers=[logging.StreamHandler(sys.stdout)],
)
@write_hooks.register("dynamic_schema")
def dynamic_schema(filename: str, _options: dict[Any, Any]) -> None:
"""Dynamic schema."""
original_file = Path(filename).read_text()
schema_name = base_class.schema_name
dynamic_schema_file_part1 = original_file.replace(
f"schema='{schema_name}'", "schema=schema"
)
dynamic_schema_file = dynamic_schema_file_part1.replace(
f"'{schema_name}.", "f'{schema}."
)
Path(filename).write_text(dynamic_schema_file)
@write_hooks.register("import_postgresql")
def import_postgresql(filename: str, _options: dict[Any, Any]) -> None:
"""Add postgresql dialect import when postgresql types are used."""
content = Path(filename).read_text()
if (
"postgresql." in content
and "from sqlalchemy.dialects import postgresql" not in content
):
content = content.replace(
"import sqlalchemy as sa\n",
"import sqlalchemy as sa\nfrom sqlalchemy.dialects import postgresql\n",
)
Path(filename).write_text(content)
@write_hooks.register("ruff")
def ruff_check_and_format(filename: str, _options: dict[Any, Any]) -> None:
"""Docstring for ruff_check_and_format."""
bash_wrapper(f"ruff check --fix {filename}")
bash_wrapper(f"ruff format {filename}")
def include_name(
name: str | None,
type_: Literal[
"schema",
"table",
"column",
"index",
"unique_constraint",
"foreign_key_constraint",
],
_parent_names: MutableMapping[
Literal["schema_name", "table_name", "schema_qualified_table_name"], str | None
],
) -> bool:
"""Filter tables to be included in the migration.
Args:
name (str): The name of the table.
type_ (str): The type of the table.
_parent_names (MutableMapping): The names of the parent tables.
Returns:
bool: True if the table should be included, False otherwise.
"""
if type_ == "schema":
# allows a database with multiple schemas to have separate alembic revisions
return name == target_metadata.schema
return True
def run_migrations_online() -> None:
"""Run migrations in 'online' mode.
In this scenario we need to create an Engine
and associate a connection with the context.
"""
env_prefix = config.attributes.get("env_prefix", "POSTGRES")
connectable = get_postgres_engine(name=env_prefix)
with connectable.connect() as connection:
schema = base_class.schema_name
if not connectable.dialect.has_schema(connection, schema):
answer = input(f"Schema {schema!r} does not exist. Create it? [y/N] ")
if answer.lower() != "y":
error = f"Schema {schema!r} does not exist. Exiting."
raise SystemExit(error)
connection.execute(CreateSchema(schema))
connection.commit()
context.configure(
connection=connection,
target_metadata=target_metadata,
include_schemas=True,
version_table_schema=schema,
include_name=include_name,
)
with context.begin_transaction():
context.run_migrations()
connection.commit()
run_migrations_online()
+36
View File
@@ -0,0 +1,36 @@
"""${message}.
Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}
"""
from __future__ import annotations
from typing import TYPE_CHECKING
import sqlalchemy as sa
from alembic import op
from pipelines.orm import ${config.attributes["base"].__name__}
if TYPE_CHECKING:
from collections.abc import Sequence
# revision identifiers, used by Alembic.
revision: str = ${repr(up_revision)}
down_revision: str | None = ${repr(down_revision)}
branch_labels: str | Sequence[str] | None = ${repr(branch_labels)}
depends_on: str | Sequence[str] | None = ${repr(depends_on)}
schema=${config.attributes["base"].__name__}.schema_name
def upgrade() -> None:
"""Upgrade."""
${upgrades if upgrades else "pass"}
def downgrade() -> None:
"""Downgrade."""
${downgrades if downgrades else "pass"}
+123
View File
@@ -0,0 +1,123 @@
"""CLI wrapper around alembic for multi-database support.
Usage:
database <db_name> <command> [args...]
Examples:
database van_inventory upgrade head
database van_inventory downgrade head-1
database van_inventory revision --autogenerate -m "add meals table"
database van_inventory check
database richie check
database richie upgrade head
"""
from __future__ import annotations
from dataclasses import dataclass
from importlib import import_module
from typing import TYPE_CHECKING, Annotated
import typer
from alembic.config import CommandLine, Config
if TYPE_CHECKING:
from sqlalchemy.orm import DeclarativeBase
@dataclass(frozen=True)
class DatabaseConfig:
"""Configuration for a database."""
env_prefix: str
version_location: str
base_module: str
base_class_name: str
models_module: str
script_location: str = "alembic"
file_template: str = "%%(year)d_%%(month).2d_%%(day).2d-%%(slug)s_%%(rev)s"
def get_base(self) -> type[DeclarativeBase]:
"""Import and return the Base class."""
module = import_module(self.base_module)
return getattr(module, self.base_class_name)
def import_models(self) -> None:
"""Import ORM models so alembic autogenerate can detect them."""
import_module(self.models_module)
def alembic_config(self) -> Config:
"""Build an alembic Config for this database."""
# Runtime import needed — Config is in TYPE_CHECKING for the return type annotation
from alembic.config import Config as AlembicConfig # noqa: PLC0415
cfg = AlembicConfig()
cfg.set_main_option("script_location", self.script_location)
cfg.set_main_option("file_template", self.file_template)
cfg.set_main_option("prepend_sys_path", ".")
cfg.set_main_option("version_path_separator", "os")
cfg.set_main_option("version_locations", self.version_location)
cfg.set_main_option("revision_environment", "true")
cfg.set_section_option(
"post_write_hooks", "hooks", "dynamic_schema,import_postgresql,ruff"
)
cfg.set_section_option(
"post_write_hooks", "dynamic_schema.type", "dynamic_schema"
)
cfg.set_section_option(
"post_write_hooks", "import_postgresql.type", "import_postgresql"
)
cfg.set_section_option("post_write_hooks", "ruff.type", "ruff")
cfg.attributes["base"] = self.get_base()
cfg.attributes["env_prefix"] = self.env_prefix
self.import_models()
return cfg
DATABASES: dict[str, DatabaseConfig] = {
"data_science_dev": DatabaseConfig(
env_prefix="DATA_SCIENCE_DEV",
version_location="alembic/data_science_dev/versions",
base_module="pipelines.orm.data_science_dev.base",
base_class_name="DataScienceDevBase",
models_module="pipelines.orm.data_science_dev.models",
),
}
app = typer.Typer(help="Multi-database alembic wrapper.")
@app.command(
context_settings={"allow_extra_args": True, "ignore_unknown_options": True},
)
def main(
ctx: typer.Context,
db_name: Annotated[
str, typer.Argument(help=f"Database name. Options: {', '.join(DATABASES)}")
],
command: Annotated[
str,
typer.Argument(
help="Alembic command (upgrade, downgrade, revision, check, etc.)"
),
],
) -> None:
"""Run an alembic command against the specified database."""
db_config = DATABASES.get(db_name)
if not db_config:
typer.echo(
f"Unknown database: {db_name!r}. Available: {', '.join(DATABASES)}",
err=True,
)
raise typer.Exit(code=1)
alembic_cfg = db_config.alembic_config()
cmd_line = CommandLine()
options = cmd_line.parser.parse_args([command, *ctx.args])
cmd_line.run_cmd(alembic_cfg, options)
if __name__ == "__main__":
app()
+116
View File
@@ -0,0 +1,116 @@
"""Nornsight — BERTopic POC Inference Script.
Loads the trained model and labels a small batch of posts,
writing results to main.post_topic for inspection.
POC: processes a single batch of 1k posts to validate the pipeline end-to-end.
"""
from __future__ import annotations
import logging
import time
from collections import Counter
from pathlib import Path
from bertopic import BERTopic
from sqlalchemy import Engine, func, insert, select
from sqlalchemy.orm import Session
from pipelines.config import BertTopicInferConfig, get_bertopic_infer_config
from pipelines.orm.common import get_postgres_engine
from pipelines.orm.data_science_dev.posts import PostTopic, Posts
from pipelines.orm.data_science_dev.posts.lang_filters import ENGLISH_LANGS
from pipelines.pipelines.common import configure_logger
logger = logging.getLogger(__name__)
def main() -> None:
"""Run BERTopic inference against a sample of posts."""
configure_logger()
config = get_bertopic_infer_config()
run_inference(config)
logger.info(
"POC inference complete. Check main.post_topic in DBeaver to inspect results."
)
def run_inference(config: BertTopicInferConfig) -> None:
model_save_path = Path(config.model_save_path)
logger.info(f"Loading BERTopic model from {model_save_path}")
topic_model = BERTopic.load(str(model_save_path))
topic_info = topic_model.get_topic_info()
label_map: dict[int, str] = dict(zip(topic_info["Topic"], topic_info["Name"]))
logger.info(f"Model loaded with {len(label_map)} topics")
engine = get_postgres_engine(name="DATA_SCIENCE_DEV")
post_ids, texts = get_post_ids_and_test(engine, config)
logger.info(f"Fetched {len(texts)} posts")
logger.info("Running BERTopic transform")
start = time.perf_counter()
topics, _probabilities = topic_model.transform(texts)
elapsed = time.perf_counter() - start
logger.info(f"Transform complete in {elapsed:.1f}s")
# Write results to main.post_topic
records = [
{
"post_id": pid,
"topic_id": int(topic_id),
"topic_label": label_map.get(int(topic_id), "unknown"),
"model_version": config.model_version,
}
for pid, topic_id in zip(post_ids, topics)
]
with Session(engine) as session:
session.execute(insert(PostTopic), records)
session.commit()
count_topics(records)
logger.info(f"Wrote {len(records)} topic labels to main.post_topic")
def get_post_ids_and_test(
engine: Engine,
config: BertTopicInferConfig,
) -> None | tuple[list[int], list[str]]:
with Session(engine) as session:
logger.info(f"Fetching {config.poc_batch_size} posts for inference")
# Pull a fresh batch for inference — distinct from training sample
# using a fixed seed offset so we're not re-labeling training posts
stmt = select(Posts).where(
Posts.text.is_not(None),
Posts.langs.in_(ENGLISH_LANGS),
func.length(Posts.text) > config.min_text_length,
)
if config.poc_batch_size > 0:
stmt = stmt.limit(config.poc_batch_size)
posts = session.scalars(stmt).all()
if not posts:
logger.warning("No posts were selected for inference")
return [], []
post_ids = [post.post_id for post in posts]
texts = [post.text.strip() for post in posts]
return post_ids, texts
def count_topics(records: list[dict]) -> None:
topic_counts = Counter(record.get("topic_label", "unknown") for record in records)
logger.info("Topic distribution in this batch:")
for label, count in topic_counts.most_common(10):
logger.info(" %s: %d", label, count)
if __name__ == "__main__":
main()
+119
View File
@@ -0,0 +1,119 @@
"""Nornsight — BERTopic POC Training Script.
Pulls a small stratified sample (~11.5k posts) from main.posts,
trains BERTopic with MiniBatchKMeans on Jeeves, and saves the model locally.
POC sample rate: random() < 0.00005 (~0.005% of 230M = ~11.5k posts)
Full training rate will be: random() < 0.005 (~1.08M posts)
"""
from __future__ import annotations
import logging
import time
from pathlib import Path
from bertopic import BERTopic
from sklearn.cluster import MiniBatchKMeans
from sqlalchemy import func, select
from sqlalchemy.orm import Session
from pipelines.config import BertTopicTrainConfig, get_bertopic_train_config
from pipelines.orm.common import get_postgres_engine
from pipelines.orm.data_science_dev.posts import Posts
from pipelines.orm.data_science_dev.posts.lang_filters import ENGLISH_LANGS
from pipelines.pipelines.common import configure_logger
logger = logging.getLogger(__name__)
def main() -> None:
"""Train and persist the BERTopic model."""
configure_logger()
config = get_bertopic_train_config()
docs = load_sample(config)
if not docs:
logger.warning("No training documents were selected")
return
train(docs, config)
logger.info(f"Done. Model saved as version {config.model_version}")
logger.info("Next: run infer.py to label a sample of posts in the database")
def load_sample(config: BertTopicTrainConfig) -> list[str]:
logger.info("Connecting to PostgreSQL via SQLAlchemy")
engine = get_postgres_engine(name="DATA_SCIENCE_DEV")
logger.info(f"Pulling sample from main.posts (sample_rate={config.sample_rate})")
start = time.perf_counter()
with Session(engine) as session:
texts = session.scalars(
select(Posts.text).where(
Posts.text.is_not(None),
Posts.langs.in_(ENGLISH_LANGS),
func.length(Posts.text) > config.min_text_length,
func.random() < config.sample_rate,
)
).all()
elapsed = time.perf_counter() - start
logger.info(f"Fetched {len(texts)} rows in {elapsed:.1f}s")
# Basic cleaning — strip whitespace and deduplicate
docs = list({text.strip() for text in texts})
logger.info(f"After cleaning and dedup: {len(docs)} posts")
return docs
def train(docs: list[str], config: BertTopicTrainConfig) -> None:
logger.info(
f"Initialising BERTopic with MiniBatchKMeans (n_topics={config.n_topics})"
)
cluster_model = MiniBatchKMeans(
n_clusters=config.n_topics,
random_state=42,
batch_size=1024,
n_init=3,
verbose=1,
)
topic_model = BERTopic(
hdbscan_model=cluster_model,
language="english",
calculate_probabilities=False, # saves memory
verbose=True,
)
logger.info(f"Starting fit_transform on {len(docs)} posts (CPU)")
start = time.perf_counter()
topic_model.fit_transform(docs)
elapsed = time.perf_counter() - start
logger.info(f"Training complete in {elapsed:.1f}s ({elapsed / 60:.1f} min)")
# Log topic summary for quick inspection
topic_info = topic_model.get_topic_info()
logger.info(f"Topics found: {len(topic_info)}")
logger.info(f"\n{topic_info.to_string()}")
model_save_path = Path(config.model_save_path)
model_save_path.mkdir(parents=True, exist_ok=True)
logger.info(f"Saving model to {model_save_path}")
topic_model.save(
str(model_save_path),
serialization="safetensors",
save_ctfidf=True,
save_embedding_model=True,
)
logger.info("Model saved")
if __name__ == "__main__":
main()
+72
View File
@@ -0,0 +1,72 @@
"""common."""
from __future__ import annotations
import logging
import sys
from datetime import UTC, datetime
from os import getenv
from subprocess import PIPE, Popen
from apprise import Apprise
logger = logging.getLogger(__name__)
def configure_logger(level: str = "INFO") -> None:
"""Configure the logger.
Args:
level (str, optional): The logging level. Defaults to "INFO".
"""
logging.basicConfig(
level=level,
datefmt="%Y-%m-%dT%H:%M:%S%z",
format="%(asctime)s %(levelname)s %(filename)s:%(lineno)d - %(message)s",
handlers=[logging.StreamHandler(sys.stdout)],
)
def bash_wrapper(command: str) -> tuple[str, int]:
"""Execute a bash command and capture the output.
Args:
command (str): The bash command to be executed.
Returns:
Tuple[str, int]: A tuple containing the output of the command (stdout) as a string,
the error output (stderr) as a string (optional), and the return code as an integer.
"""
# This is a acceptable risk
process = Popen(command.split(), stdout=PIPE, stderr=PIPE)
output, error = process.communicate()
if error:
logger.error(f"{error=}")
return error.decode(), process.returncode
return output.decode(), process.returncode
def signal_alert(body: str, title: str = "") -> None:
"""Send a signal alert.
Args:
body (str): The body of the alert.
title (str, optional): The title of the alert. Defaults to "".
"""
apprise_client = Apprise()
from_phone = getenv("SIGNAL_ALERT_FROM_PHONE")
to_phone = getenv("SIGNAL_ALERT_TO_PHONE")
if not from_phone or not to_phone:
logger.info("SIGNAL_ALERT_FROM_PHONE or SIGNAL_ALERT_TO_PHONE not set")
return
apprise_client.add(f"signal://localhost:8989/{from_phone}/{to_phone}")
apprise_client.notify(title=title, body=body)
def utcnow() -> datetime:
"""Get the current UTC time."""
return datetime.now(tz=UTC)
+57
View File
@@ -1,6 +1,7 @@
from __future__ import annotations
from dataclasses import dataclass
from datetime import date
from pathlib import Path
import tomllib
@@ -68,10 +69,50 @@ class BenchmarkConfig:
return cls(**raw)
@dataclass
class BertTopicTrainConfig:
"""BERTopic training configuration loaded from TOML."""
sample_rate: float
min_text_length: int
n_topics: int
model_save_path: str
model_version: str | None = None
@classmethod
def from_toml(cls, config_path: Path) -> BertTopicTrainConfig:
"""Load BERTopic training config from a TOML file."""
raw = tomllib.loads(config_path.read_text())["bertopic"]["train"]
today = date.today().isoformat()
if raw.get("model_version") is None:
raw["model_version"] = (
f"{today}-{raw['sample_rate']}-{raw['min_text_length']}-{raw['n_topics']}"
)
return cls(**raw)
@dataclass
class BertTopicInferConfig:
"""BERTopic inference configuration loaded from TOML."""
min_text_length: int
poc_batch_size: int
model_version: str
model_save_path: str
@classmethod
def from_toml(cls, config_path: Path) -> BertTopicInferConfig:
"""Load BERTopic inference config from a TOML file."""
raw = tomllib.loads(config_path.read_text())["bertopic"]["infer"]
return cls(**raw)
def get_config_dir() -> Path:
"""Get the path to the config file."""
return Path(__file__).resolve().parent.parent.parent / "config"
def default_config_path() -> Path:
"""Get the path to the config file."""
return get_config_dir() / "config.toml"
@@ -87,3 +128,19 @@ def get_benchmark_config(config_path: Path | None = None) -> BenchmarkConfig:
if config_path is None:
config_path = default_config_path()
return BenchmarkConfig.from_toml(config_path)
def get_bertopic_train_config(
config_path: Path | None = None,
) -> BertTopicTrainConfig:
if config_path is None:
config_path = default_config_path()
return BertTopicTrainConfig.from_toml(config_path)
def get_bertopic_infer_config(
config_path: Path | None = None,
) -> BertTopicInferConfig:
if config_path is None:
config_path = default_config_path()
return BertTopicInferConfig.from_toml(config_path)
+14 -3
View File
@@ -17,6 +17,10 @@ NAMING_CONVENTION = {
}
class DatabaseSetupError(RuntimeError):
"""Raised when database configuration is missing or invalid."""
def get_connection_info(name: str) -> tuple[str, str, str, str, str | None]:
"""Get connection info from environment variables."""
database = getenv(f"{name}_DB")
@@ -27,11 +31,18 @@ def get_connection_info(name: str) -> tuple[str, str, str, str, str | None]:
if None in (database, host, port, username):
error = f"Missing environment variables for Postgres connection.\n{database=}\n{host=}\n{port=}\n{username=}\n"
raise ValueError(error)
return cast("tuple[str, str, str, str, str | None]", (database, host, port, username, password))
raise DatabaseSetupError(error)
return cast(
"tuple[str, str, str, str, str | None]",
(database, host, port, username, password),
)
def get_postgres_engine(*, name: str = "POSTGRES", pool_pre_ping: bool = True) -> Engine:
def get_postgres_engine(
*,
name: str = "POSTGRES",
pool_pre_ping: bool = True,
) -> Engine:
"""Create a SQLAlchemy engine from environment variables."""
database, host, port, username, password = get_connection_info(name)
@@ -1,17 +1,86 @@
"""init."""
"""Congress ORM models."""
from pipelines.orm.data_science_dev.congress.bill import Bill, BillText
from pipelines.orm.data_science_dev.congress.bill import (
Bill,
BillAction,
BillActionRecordedVote,
BillRelation,
BillText,
BillTopic,
BillTopicPosition,
)
from pipelines.orm.data_science_dev.congress.amendment import (
Amendment,
AmendmentAction,
AmendmentActionRecordedVote,
)
from pipelines.orm.data_science_dev.congress.context import (
ClassificationMethod,
ConfidenceLevel,
IngestRun,
MeasureFunction,
MeasureSubtype,
ScoreRun,
SourceArtifact,
SubjectType,
TextResolutionMethod,
TextTargetBasis,
TextTargetType,
VoteActionMatch,
VoteActionScope,
VoteClassification,
VoteContextAudit,
VoteEffect,
VoteMeasureLink,
VoteMeasureRole,
VotePositionMeaning,
VoteRelationship,
VoteTextTarget,
)
from pipelines.orm.data_science_dev.congress.legislator import (
Legislator,
LegislatorScore,
LegislatorSocialMedia,
LegislatorScoreFake,
)
from pipelines.orm.data_science_dev.congress.vote import Vote, VoteRecord
__all__ = [
"Amendment",
"AmendmentAction",
"AmendmentActionRecordedVote",
"Bill",
"BillAction",
"BillActionRecordedVote",
"BillRelation",
"BillText",
"BillTopic",
"BillTopicPosition",
"ClassificationMethod",
"ConfidenceLevel",
"IngestRun",
"Legislator",
"LegislatorScore",
"LegislatorScoreFake",
"LegislatorSocialMedia",
"MeasureFunction",
"MeasureSubtype",
"ScoreRun",
"SourceArtifact",
"SubjectType",
"TextResolutionMethod",
"TextTargetBasis",
"TextTargetType",
"Vote",
"VoteActionMatch",
"VoteActionScope",
"VoteClassification",
"VoteContextAudit",
"VoteEffect",
"VoteMeasureLink",
"VoteMeasureRole",
"VotePositionMeaning",
"VoteRelationship",
"VoteRecord",
"VoteTextTarget",
]
@@ -0,0 +1,127 @@
"""Amendment models and official action context."""
from __future__ import annotations
from datetime import date, datetime
from sqlalchemy import DateTime, ForeignKey, Index, UniqueConstraint
from sqlalchemy.orm import Mapped, mapped_column, relationship
from pipelines.orm.data_science_dev.base import DataScienceDevTableBase
class Amendment(DataScienceDevTableBase):
"""Congressional amendment linked to a bill or to another amendment."""
__tablename__ = "amendment"
__table_args__ = (
UniqueConstraint(
"congress",
"amendment_type",
"number",
name="uq_amendment_congress_type_number",
),
)
congress: Mapped[int]
amendment_type: Mapped[str]
number: Mapped[int]
chamber: Mapped[str]
description: Mapped[str | None]
purpose: Mapped[str | None]
amended_bill_id: Mapped[int | None] = mapped_column(
ForeignKey("main.bill.id", ondelete="SET NULL")
)
amended_amendment_id: Mapped[int | None] = mapped_column(
ForeignKey("main.amendment.id", ondelete="SET NULL")
)
source_path: Mapped[str | None]
source_artifact_id: Mapped[int | None] = mapped_column(
ForeignKey("main.source_artifact.id", ondelete="SET NULL")
)
actions: Mapped[list[AmendmentAction]] = relationship(
"AmendmentAction",
back_populates="amendment",
cascade="all, delete-orphan",
)
amended_amendment: Mapped[Amendment | None] = relationship(
"Amendment",
remote_side="Amendment.id",
)
class AmendmentAction(DataScienceDevTableBase):
"""Official action row for an amendment."""
__tablename__ = "amendment_action"
__table_args__ = (
UniqueConstraint(
"amendment_id",
"sequence",
name="uq_amendment_action_amendment_id_sequence",
),
)
amendment_id: Mapped[int] = mapped_column(
ForeignKey("main.amendment.id", ondelete="CASCADE")
)
sequence: Mapped[int]
action_date: Mapped[date]
action_time: Mapped[str | None]
action_text: Mapped[str]
action_type: Mapped[str | None]
action_code: Mapped[str | None]
source_system_code: Mapped[str | None]
source_system_name: Mapped[str | None]
source_artifact_id: Mapped[int | None] = mapped_column(
ForeignKey("main.source_artifact.id", ondelete="SET NULL")
)
amendment: Mapped[Amendment] = relationship(
"Amendment",
back_populates="actions",
)
recorded_votes: Mapped[list[AmendmentActionRecordedVote]] = relationship(
"AmendmentActionRecordedVote",
back_populates="amendment_action",
cascade="all, delete-orphan",
)
class AmendmentActionRecordedVote(DataScienceDevTableBase):
"""Recorded vote nested under one official amendment action."""
__tablename__ = "amendment_action_recorded_vote"
__table_args__ = (
UniqueConstraint(
"amendment_action_id",
"congress",
"chamber",
"session_number",
"roll_number",
name="uq_amendment_action_recorded_vote_match_key",
),
Index(
"ix_amendment_action_recorded_vote_match_tuple",
"congress",
"chamber",
"session_number",
"roll_number",
),
)
amendment_action_id: Mapped[int] = mapped_column(
ForeignKey("main.amendment_action.id", ondelete="CASCADE")
)
congress: Mapped[int]
chamber: Mapped[str]
session_number: Mapped[int]
roll_number: Mapped[int]
vote_datetime: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
vote_url: Mapped[str | None]
amendment_action: Mapped[AmendmentAction] = relationship(
"AmendmentAction",
back_populates="recorded_votes",
)
+195 -23
View File
@@ -1,23 +1,48 @@
"""Bill model - legislation introduced in Congress."""
"""Bill models for legislation, official actions, text versions, and topic tags."""
from __future__ import annotations
from datetime import date
from datetime import date, datetime
from enum import StrEnum
from typing import TYPE_CHECKING
from sqlalchemy import ForeignKey, Index, UniqueConstraint
from sqlalchemy import DateTime, Enum, ForeignKey, Index, UniqueConstraint
from sqlalchemy.orm import Mapped, mapped_column, relationship
from pipelines.orm.data_science_dev.base import DataScienceDevTableBase
if TYPE_CHECKING:
from pipelines.orm.data_science_dev.congress.vote import Vote
from pipelines.orm.data_science_dev.congress.context import VoteMeasureLink
class BillTopicPosition(StrEnum):
"""Whether a yes vote on a bill is for or against a topic."""
FOR = "for"
AGAINST = "against"
def _enum_column(enum_cls: type[StrEnum], *, name: str) -> Enum:
"""Build a portable SQLAlchemy enum column for StrEnum values."""
return Enum(
enum_cls,
values_callable=lambda enum_type: [member.value for member in enum_type],
native_enum=False,
name=name,
)
class Bill(DataScienceDevTableBase):
"""Legislation with congress number, type, titles, status, and sponsor."""
__tablename__ = "bill"
__table_args__ = (
UniqueConstraint(
"congress", "bill_type", "number", name="uq_bill_congress_type_number"
),
Index("ix_bill_congress", "congress"),
)
congress: Mapped[int]
bill_type: Mapped[str]
@@ -33,22 +58,39 @@ class Bill(DataScienceDevTableBase):
sponsor_bioguide_id: Mapped[str | None]
subjects_top_term: Mapped[str | None]
score_processed_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
votes: Mapped[list[Vote]] = relationship(
"Vote",
back_populates="bill",
)
bill_texts: Mapped[list[BillText]] = relationship(
"BillText",
back_populates="bill",
cascade="all, delete-orphan",
)
__table_args__ = (
UniqueConstraint(
"congress", "bill_type", "number", name="uq_bill_congress_type_number"
),
Index("ix_bill_congress", "congress"),
topics: Mapped[list[BillTopic]] = relationship(
"BillTopic",
back_populates="bill",
cascade="all, delete-orphan",
)
bill_actions: Mapped[list[BillAction]] = relationship(
"BillAction",
back_populates="bill",
cascade="all, delete-orphan",
)
outgoing_bill_relations: Mapped[list[BillRelation]] = relationship(
"BillRelation",
foreign_keys="BillRelation.bill_id",
back_populates="bill",
cascade="all, delete-orphan",
)
incoming_bill_relations: Mapped[list[BillRelation]] = relationship(
"BillRelation",
foreign_keys="BillRelation.related_bill_id",
back_populates="related_bill",
cascade="all, delete-orphan",
)
vote_measure_links: Mapped[list[VoteMeasureLink]] = relationship(
"VoteMeasureLink",
back_populates="measure",
cascade="all, delete-orphan",
)
@@ -56,17 +98,147 @@ class BillText(DataScienceDevTableBase):
"""Stores different text versions of a bill (introduced, enrolled, etc.)."""
__tablename__ = "bill_text"
bill_id: Mapped[int] = mapped_column(ForeignKey("main.bill.id", ondelete="CASCADE"))
version_code: Mapped[str]
version_name: Mapped[str | None]
text_content: Mapped[str | None]
date: Mapped[date | None]
bill: Mapped[Bill] = relationship("Bill", back_populates="bill_texts")
__table_args__ = (
UniqueConstraint(
"bill_id", "version_code", name="uq_bill_text_bill_id_version_code"
),
)
bill_id: Mapped[int] = mapped_column(ForeignKey("main.bill.id", ondelete="CASCADE"))
version_code: Mapped[str]
version_name: Mapped[str | None]
text_content: Mapped[str | None]
summary: Mapped[str | None]
summarization_model: Mapped[str | None]
summarization_user_prompt_version: Mapped[str | None]
summarization_system_prompt_version: Mapped[str | None]
date: Mapped[date | None]
source_datetime_raw: Mapped[str | None]
text_url_xml: Mapped[str | None]
text_url_pdf: Mapped[str | None]
text_url_html: Mapped[str | None]
source_artifact_id: Mapped[int | None] = mapped_column(
ForeignKey("main.source_artifact.id", ondelete="SET NULL")
)
bill: Mapped[Bill] = relationship("Bill", back_populates="bill_texts")
class BillAction(DataScienceDevTableBase):
"""Official action row from Bill Status XML."""
__tablename__ = "bill_action"
__table_args__ = (
UniqueConstraint("bill_id", "sequence", name="uq_bill_action_bill_id_sequence"),
)
bill_id: Mapped[int] = mapped_column(ForeignKey("main.bill.id", ondelete="CASCADE"))
sequence: Mapped[int]
action_date: Mapped[date]
action_time: Mapped[str | None]
action_text: Mapped[str]
action_type: Mapped[str | None]
action_code: Mapped[str | None]
source_system_code: Mapped[str | None]
source_system_name: Mapped[str | None]
source_artifact_id: Mapped[int | None] = mapped_column(
ForeignKey("main.source_artifact.id", ondelete="SET NULL")
)
bill: Mapped[Bill] = relationship("Bill", back_populates="bill_actions")
recorded_votes: Mapped[list[BillActionRecordedVote]] = relationship(
"BillActionRecordedVote",
back_populates="bill_action",
cascade="all, delete-orphan",
)
class BillActionRecordedVote(DataScienceDevTableBase):
"""Recorded vote nested under one official bill action."""
__tablename__ = "bill_action_recorded_vote"
__table_args__ = (
UniqueConstraint(
"bill_action_id",
"congress",
"chamber",
"session_number",
"roll_number",
name="uq_bill_action_recorded_vote_match_key",
),
Index(
"ix_bill_action_recorded_vote_match_tuple",
"congress",
"chamber",
"session_number",
"roll_number",
),
)
bill_action_id: Mapped[int] = mapped_column(
ForeignKey("main.bill_action.id", ondelete="CASCADE")
)
congress: Mapped[int]
chamber: Mapped[str]
session_number: Mapped[int]
roll_number: Mapped[int]
vote_datetime: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
vote_url: Mapped[str | None]
bill_action: Mapped[BillAction] = relationship(
"BillAction",
back_populates="recorded_votes",
)
class BillRelation(DataScienceDevTableBase):
"""Relationship between one bill/resolution and another."""
__tablename__ = "bill_relation"
__table_args__ = (
Index("ix_bill_relation_bill_id", "bill_id"),
Index("ix_bill_relation_related_bill_id", "related_bill_id"),
)
bill_id: Mapped[int] = mapped_column(ForeignKey("main.bill.id", ondelete="CASCADE"))
related_bill_id: Mapped[int] = mapped_column(
ForeignKey("main.bill.id", ondelete="CASCADE")
)
relationship_type: Mapped[str]
identified_by: Mapped[str | None]
latest_action_date: Mapped[date | None]
latest_action_text: Mapped[str | None]
bill: Mapped[Bill] = relationship(
"Bill",
foreign_keys=[bill_id],
back_populates="outgoing_bill_relations",
)
related_bill: Mapped[Bill] = relationship(
"Bill",
foreign_keys=[related_bill_id],
back_populates="incoming_bill_relations",
)
class BillTopic(DataScienceDevTableBase):
"""One bill stance on one topic used to score roll-call votes."""
__tablename__ = "bill_topic"
__table_args__ = (
UniqueConstraint(
"bill_id",
"topic",
"support_position",
name="uq_bill_topic_bill_id_topic_support_position",
),
Index("ix_bill_topic_topic", "topic"),
)
bill_id: Mapped[int] = mapped_column(ForeignKey("main.bill.id", ondelete="CASCADE"))
topic: Mapped[str]
support_position: Mapped[BillTopicPosition] = mapped_column(
_enum_column(BillTopicPosition, name="bill_topic_position")
)
bill: Mapped[Bill] = relationship("Bill", back_populates="topics")
@@ -0,0 +1,462 @@
"""Canonical vote context, artifact tracking, and run metadata models."""
from __future__ import annotations
from datetime import datetime
from enum import StrEnum
from typing import TYPE_CHECKING
from sqlalchemy import DateTime, Enum, ForeignKey, Index, func, text
from sqlalchemy.orm import Mapped, mapped_column, relationship
from pipelines.orm.data_science_dev.base import DataScienceDevTableBase
if TYPE_CHECKING:
from pipelines.orm.data_science_dev.congress.amendment import Amendment, AmendmentAction
from pipelines.orm.data_science_dev.congress.bill import Bill, BillAction, BillText
from pipelines.orm.data_science_dev.congress.legislator import LegislatorScore
from pipelines.orm.data_science_dev.congress.vote import Vote
def _enum_column(enum_cls: type[StrEnum], *, name: str) -> Enum:
"""Build a portable SQLAlchemy enum column for StrEnum values."""
return Enum(
enum_cls,
values_callable=lambda enum_type: [member.value for member in enum_type],
native_enum=False,
name=name,
)
class ConfidenceLevel(StrEnum):
"""Low/medium/high confidence buckets."""
HIGH = "high"
MEDIUM = "medium"
LOW = "low"
class VoteActionScope(StrEnum):
"""Whether a matched action came from bill or amendment context."""
BILL = "bill"
AMENDMENT = "amendment"
class SubjectType(StrEnum):
"""The direct legal/procedural subject of the vote."""
MEASURE = "measure"
AMENDMENT = "amendment"
NOMINATION = "nomination"
TREATY = "treaty"
QUORUM = "quorum"
CHAMBER_ADMIN = "chamber_admin"
UNKNOWN = "unknown"
class MeasureSubtype(StrEnum):
"""Formal congressional measure subtype."""
BILL = "bill"
JOINT_RESOLUTION = "joint_resolution"
CONCURRENT_RESOLUTION = "concurrent_resolution"
SIMPLE_RESOLUTION = "simple_resolution"
class MeasureFunction(StrEnum):
"""Semantic function of a measure beyond its formal subtype."""
SUBSTANTIVE_MEASURE = "substantive_measure"
SPECIAL_RULE = "special_rule"
BUDGET_RESOLUTION = "budget_resolution"
CHAMBER_INTERNAL = "chamber_internal"
COMMEMORATIVE_OR_SENSE_OF = "commemorative_or_sense_of"
UNKNOWN = "unknown"
class VoteRelationship(StrEnum):
"""The vote's relationship to the direct subject and its text."""
DIRECT_TEXT_VOTE = "direct_text_vote"
AMENDMENT_TEXT_VOTE = "amendment_text_vote"
PROCEDURAL_RELATED_TO_MEASURE = "procedural_related_to_measure"
PROCEDURAL_RELATED_TO_AMENDMENT = "procedural_related_to_amendment"
NON_LEGISLATIVE = "non_legislative"
UNKNOWN = "unknown"
class ClassificationMethod(StrEnum):
"""How the final classification was derived."""
RECORDED_VOTE_ACTION_EXACT = "recorded_vote_action_exact"
RECORDED_VOTE_ACTION_DUPLICATE_SOURCE_DEDUPED = (
"recorded_vote_action_duplicate_source_deduped"
)
VOTE_XML_ONLY = "vote_xml_only"
QUESTION_TEXT_ONLY = "question_text_only"
MANUAL_REVIEW = "manual_review"
class VoteMeasureRole(StrEnum):
"""How one measure relates to one classified vote."""
VOTED_ON = "voted_on"
RULE_FOR = "rule_for"
UNDERLYING_BILL = "underlying_bill"
PROCEDURAL_TARGET = "procedural_target"
AMENDS = "amends"
AMENDED_BY = "amended_by"
CONFERENCE_REPORT_FOR = "conference_report_for"
RELATED_ONLY = "related_only"
class TextTargetType(StrEnum):
"""Which kind of legislative text was the object of a vote."""
BILL_TEXT = "bill_text"
RESOLUTION_TEXT = "resolution_text"
AMENDMENT_TEXT = "amendment_text"
NONE = "none"
UNKNOWN = "unknown"
class TextTargetBasis(StrEnum):
"""How the text target should be interpreted."""
EXACT_ACTION_TEXT_VERSION = "exact_action_text_version"
RESULTING_ENGROSSED_VERSION = "resulting_engrossed_version"
RECEIVED_PRIOR_CHAMBER_VERSION = "received_prior_chamber_version"
AMENDMENT_TEXT = "amendment_text"
RULE_RESOLUTION_TEXT = "rule_resolution_text"
NO_TEXT_TARGET = "no_text_target"
UNKNOWN = "unknown"
class TextResolutionMethod(StrEnum):
"""How the official text target was resolved."""
TEXT_EXACT_ACTION_DATE_AND_CODE = "text_exact_action_date_and_code"
TEXT_EXACT_ACTION_DATE_WRONG_CODE = "text_exact_action_date_wrong_code"
TEXT_PRIOR_VERSION_CODE_MATCH = "text_prior_version_code_match"
TEXT_RECEIVED_PRIOR_CHAMBER_VERSION = "text_received_prior_chamber_version"
TEXT_RESULTING_ENROLLED_ONLY = "text_resulting_enrolled_only"
AMENDMENT_TEXT_UNMODELED_PHASE1 = "amendment_text_unmodeled_phase1"
NO_TEXT_TARGET = "no_text_target"
UNKNOWN = "unknown"
class VoteEffect(StrEnum):
"""Meaning of one member position relative to the target text/procedure."""
SUPPORTS_TEXT = "supports_text"
OPPOSES_TEXT = "opposes_text"
ADVANCES_PROCEDURE = "advances_procedure"
BLOCKS_PROCEDURE = "blocks_procedure"
UNKNOWN = "unknown"
class IngestRun(DataScienceDevTableBase):
"""One full ingestion or context rebuild run."""
__tablename__ = "ingest_run"
started_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
completed_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
git_sha: Mapped[str | None]
classifier_version: Mapped[str | None]
source_snapshot_label: Mapped[str | None]
status: Mapped[str]
source_artifacts: Mapped[list[SourceArtifact]] = relationship(
"SourceArtifact",
back_populates="ingest_run",
cascade="all, delete-orphan",
)
score_runs: Mapped[list[ScoreRun]] = relationship(
"ScoreRun",
back_populates="ingest_run",
cascade="all, delete-orphan",
)
class SourceArtifact(DataScienceDevTableBase):
"""Local artifact manifest entry for reproducibility."""
__tablename__ = "source_artifact"
__table_args__ = (
Index("ix_source_artifact_source_kind", "source_kind"),
Index("ix_source_artifact_congress", "congress"),
Index(
"uq_source_artifact_ingest_identity",
"ingest_run_id",
"local_path",
"sha256",
unique=True,
),
)
source_kind: Mapped[str]
congress: Mapped[int]
chamber: Mapped[str | None]
local_path: Mapped[str]
source_url: Mapped[str | None]
sha256: Mapped[str]
byte_size: Mapped[int]
modified_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
ingested_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
ingest_run_id: Mapped[int | None] = mapped_column(
ForeignKey("main.ingest_run.id", ondelete="SET NULL")
)
ingest_run: Mapped[IngestRun | None] = relationship(
"IngestRun",
back_populates="source_artifacts",
)
class ScoreRun(DataScienceDevTableBase):
"""One full score recomputation tied to one ingest snapshot."""
__tablename__ = "score_run"
ingest_run_id: Mapped[int | None] = mapped_column(
ForeignKey("main.ingest_run.id", ondelete="SET NULL")
)
classifier_version: Mapped[str | None]
scoring_version: Mapped[str | None]
included_vote_count: Mapped[int]
excluded_vote_count: Mapped[int]
started_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
completed_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
ingest_run: Mapped[IngestRun | None] = relationship(
"IngestRun",
back_populates="score_runs",
)
scores: Mapped[list[LegislatorScore]] = relationship(
"LegislatorScore",
back_populates="score_run",
cascade="all, delete-orphan",
)
class VoteActionMatch(DataScienceDevTableBase):
"""A candidate or selected official action match for one raw vote."""
__tablename__ = "vote_action_match"
__table_args__ = (
Index("ix_vote_action_match_vote_id", "vote_id"),
Index(
"uq_vote_action_match_selected_vote_id",
"vote_id",
unique=True,
postgresql_where=text("is_selected"),
),
)
vote_id: Mapped[int] = mapped_column(ForeignKey("main.vote.id", ondelete="CASCADE"))
action_scope: Mapped[VoteActionScope] = mapped_column(
_enum_column(VoteActionScope, name="vote_action_scope")
)
bill_action_id: Mapped[int | None] = mapped_column(
ForeignKey("main.bill_action.id", ondelete="CASCADE")
)
amendment_action_id: Mapped[int | None] = mapped_column(
ForeignKey("main.amendment_action.id", ondelete="CASCADE")
)
is_selected: Mapped[bool]
match_method: Mapped[str]
match_reason: Mapped[str | None]
match_confidence: Mapped[ConfidenceLevel] = mapped_column(
_enum_column(ConfidenceLevel, name="vote_action_match_confidence")
)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
server_default=func.now(),
)
vote: Mapped[Vote] = relationship("Vote", back_populates="action_matches")
bill_action: Mapped[BillAction | None] = relationship("BillAction")
amendment_action: Mapped[AmendmentAction | None] = relationship("AmendmentAction")
class VoteClassification(DataScienceDevTableBase):
"""Normalized classification for what a vote was legally/procedurally on."""
__tablename__ = "vote_classification"
__table_args__ = (
Index("ix_vote_classification_subject_type", "subject_type"),
Index(
"ix_vote_classification_eligible_vote_id",
"vote_id",
postgresql_where=text(
"subject_type = 'measure' "
"AND vote_relationship = 'direct_text_vote' "
"AND is_direct_vote_on_legislative_text "
"AND is_substantive_policy_vote "
"AND NOT is_special_rule"
),
),
)
vote_id: Mapped[int] = mapped_column(
ForeignKey("main.vote.id", ondelete="CASCADE"),
unique=True,
)
subject_type: Mapped[SubjectType] = mapped_column(
_enum_column(SubjectType, name="vote_subject_type")
)
measure_type: Mapped[str | None]
measure_subtype: Mapped[MeasureSubtype | None] = mapped_column(
_enum_column(MeasureSubtype, name="vote_measure_subtype")
)
measure_function: Mapped[MeasureFunction | None] = mapped_column(
_enum_column(MeasureFunction, name="vote_measure_function")
)
vote_relationship: Mapped[VoteRelationship] = mapped_column(
_enum_column(VoteRelationship, name="vote_relationship")
)
is_legislation_related: Mapped[bool]
is_direct_vote_on_legislative_text: Mapped[bool]
is_substantive_policy_vote: Mapped[bool]
is_lawmaking_vehicle: Mapped[bool]
is_special_rule: Mapped[bool]
classification_method: Mapped[ClassificationMethod] = mapped_column(
_enum_column(ClassificationMethod, name="vote_classification_method")
)
classification_confidence_reason: Mapped[str | None]
confidence: Mapped[ConfidenceLevel] = mapped_column(
_enum_column(ConfidenceLevel, name="vote_classification_confidence")
)
classified_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
classification_version: Mapped[str]
vote: Mapped[Vote] = relationship("Vote", back_populates="classification")
class VoteMeasureLink(DataScienceDevTableBase):
"""Relationship between a classified vote and one bill/resolution measure."""
__tablename__ = "vote_measure_link"
__table_args__ = (
Index("ix_vote_measure_link_vote_id", "vote_id"),
Index("ix_vote_measure_link_vote_id_role", "vote_id", "role"),
Index("ix_vote_measure_link_measure_id_role", "measure_id", "role"),
)
vote_id: Mapped[int] = mapped_column(ForeignKey("main.vote.id", ondelete="CASCADE"))
measure_id: Mapped[int] = mapped_column(ForeignKey("main.bill.id", ondelete="CASCADE"))
role: Mapped[VoteMeasureRole] = mapped_column(
_enum_column(VoteMeasureRole, name="vote_measure_role")
)
source: Mapped[str]
confidence: Mapped[ConfidenceLevel] = mapped_column(
_enum_column(ConfidenceLevel, name="vote_measure_link_confidence")
)
notes: Mapped[str | None]
vote: Mapped[Vote] = relationship("Vote", back_populates="vote_measure_links")
measure: Mapped[Bill] = relationship("Bill", back_populates="vote_measure_links")
class VoteTextTarget(DataScienceDevTableBase):
"""Official text target, if any, resolved for one classified vote."""
__tablename__ = "vote_text_target"
__table_args__ = (
Index(
"ix_vote_text_target_voted_text_version_id",
"voted_text_version_id",
postgresql_where=text("voted_text_version_id IS NOT NULL"),
),
)
vote_id: Mapped[int] = mapped_column(
ForeignKey("main.vote.id", ondelete="CASCADE"),
unique=True,
)
text_target_type: Mapped[TextTargetType] = mapped_column(
_enum_column(TextTargetType, name="vote_text_target_type")
)
voted_text_version_id: Mapped[int | None] = mapped_column(
ForeignKey("main.bill_text.id", ondelete="SET NULL")
)
resulting_text_version_id: Mapped[int | None] = mapped_column(
ForeignKey("main.bill_text.id", ondelete="SET NULL")
)
related_amendment_id: Mapped[int | None] = mapped_column(
ForeignKey("main.amendment.id", ondelete="SET NULL")
)
text_target_basis: Mapped[TextTargetBasis] = mapped_column(
_enum_column(TextTargetBasis, name="vote_text_target_basis")
)
text_resolution_method: Mapped[TextResolutionMethod] = mapped_column(
_enum_column(TextResolutionMethod, name="vote_text_resolution_method")
)
text_resolution_confidence_reason: Mapped[str | None]
confidence: Mapped[ConfidenceLevel] = mapped_column(
_enum_column(ConfidenceLevel, name="vote_text_target_confidence")
)
notes: Mapped[str | None]
vote: Mapped[Vote] = relationship("Vote", back_populates="text_target")
voted_text_version: Mapped[BillText | None] = relationship(
"BillText",
foreign_keys=[voted_text_version_id],
)
resulting_text_version: Mapped[BillText | None] = relationship(
"BillText",
foreign_keys=[resulting_text_version_id],
)
related_amendment: Mapped[Amendment | None] = relationship("Amendment")
class VotePositionMeaning(DataScienceDevTableBase):
"""Meaning of Yea/Nay/Present positions for one classified vote."""
__tablename__ = "vote_position_meaning"
vote_id: Mapped[int] = mapped_column(
ForeignKey("main.vote.id", ondelete="CASCADE"),
unique=True,
)
yea_effect: Mapped[VoteEffect] = mapped_column(
_enum_column(VoteEffect, name="vote_yea_effect")
)
nay_effect: Mapped[VoteEffect] = mapped_column(
_enum_column(VoteEffect, name="vote_nay_effect")
)
present_effect: Mapped[VoteEffect] = mapped_column(
_enum_column(VoteEffect, name="vote_present_effect")
)
polarity_confidence: Mapped[ConfidenceLevel] = mapped_column(
_enum_column(ConfidenceLevel, name="vote_polarity_confidence")
)
polarity_method: Mapped[str]
notes: Mapped[str | None]
vote: Mapped[Vote] = relationship("Vote", back_populates="position_meaning")
class VoteContextAudit(DataScienceDevTableBase):
"""Audit/event row for ambiguous or noteworthy vote-context decisions."""
__tablename__ = "vote_context_audit"
__table_args__ = (
Index("ix_vote_context_audit_vote_id", "vote_id"),
Index("ix_vote_context_audit_severity_vote_id", "severity", "vote_id"),
)
vote_id: Mapped[int] = mapped_column(ForeignKey("main.vote.id", ondelete="CASCADE"))
step: Mapped[str]
message: Mapped[str]
severity: Mapped[str]
source_path: Mapped[str | None]
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
server_default=func.now(),
)
vote: Mapped[Vote] = relationship("Vote", back_populates="context_audit_rows")
@@ -5,12 +5,13 @@ from __future__ import annotations
from datetime import date
from typing import TYPE_CHECKING
from sqlalchemy import ForeignKey, Text
from sqlalchemy import ForeignKey, Index, Text, UniqueConstraint
from sqlalchemy.orm import Mapped, mapped_column, relationship
from pipelines.orm.data_science_dev.base import DataScienceDevTableBase
if TYPE_CHECKING:
from pipelines.orm.data_science_dev.congress.context import ScoreRun
from pipelines.orm.data_science_dev.congress.vote import VoteRecord
@@ -18,6 +19,7 @@ class Legislator(DataScienceDevTableBase):
"""Members of Congress with identification and current term info."""
__tablename__ = "legislator"
__table_args__ = (Index("ix_legislator_current_chamber", "current_chamber"),)
bioguide_id: Mapped[str] = mapped_column(Text, unique=True, index=True)
@@ -50,6 +52,11 @@ class Legislator(DataScienceDevTableBase):
back_populates="legislator",
cascade="all, delete-orphan",
)
scores: Mapped[list[LegislatorScore]] = relationship(
"LegislatorScore",
back_populates="legislator",
cascade="all, delete-orphan",
)
class LegislatorSocialMedia(DataScienceDevTableBase):
@@ -66,3 +73,59 @@ class LegislatorSocialMedia(DataScienceDevTableBase):
legislator: Mapped[Legislator] = relationship(
back_populates="social_media_accounts"
)
class LegislatorScore(DataScienceDevTableBase):
"""Computed topic score for a legislator in one calendar year."""
__tablename__ = "legislator_score"
__table_args__ = (
UniqueConstraint(
"legislator_id",
"year",
"topic",
name="uq_legislator_score_legislator_id_year_topic",
),
Index("ix_legislator_score_year_topic", "year", "topic"),
)
legislator_id: Mapped[int] = mapped_column(
ForeignKey("main.legislator.id", ondelete="CASCADE"),
index=True,
)
score_run_id: Mapped[int | None] = mapped_column(
ForeignKey("main.score_run.id", ondelete="CASCADE"),
index=True,
)
year: Mapped[int]
topic: Mapped[str]
score: Mapped[float]
legislator: Mapped[Legislator] = relationship(back_populates="scores")
score_run: Mapped[ScoreRun | None] = relationship(
"ScoreRun",
back_populates="scores",
)
class LegislatorScoreFake(DataScienceDevTableBase):
"""Computed topic score for a legislator in one calendar year."""
__tablename__ = "legislator_score_fake"
__table_args__ = (
UniqueConstraint(
"legislator_id",
"year",
"topic",
name="uq_legislator_score_fake_legislator_id_year_topic",
),
Index("ix_legislator_score_fake_year_topic", "year", "topic"),
)
legislator_id: Mapped[int] = mapped_column(
ForeignKey("main.legislator.id", ondelete="CASCADE"),
index=True,
)
year: Mapped[int]
topic: Mapped[str]
score: Mapped[float]
+68 -21
View File
@@ -1,11 +1,12 @@
"""Vote model - roll call votes in Congress."""
"""Vote models for raw roll-call data and member positions."""
from __future__ import annotations
from datetime import date
from datetime import date, datetime
from typing import TYPE_CHECKING
from sqlalchemy import ForeignKey, Index, UniqueConstraint
from sqlalchemy import DateTime, ForeignKey, Index, UniqueConstraint
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import Mapped, mapped_column, relationship
from pipelines.orm.data_science_dev.base import (
@@ -14,9 +15,15 @@ from pipelines.orm.data_science_dev.base import (
)
if TYPE_CHECKING:
from pipelines.orm.data_science_dev.congress.bill import Bill
from pipelines.orm.data_science_dev.congress.context import (
VoteActionMatch,
VoteClassification,
VoteContextAudit,
VoteMeasureLink,
VotePositionMeaning,
VoteTextTarget,
)
from pipelines.orm.data_science_dev.congress.legislator import Legislator
from pipelines.orm.data_science_dev.congress.vote import Vote
class VoteRecord(DataScienceDevBase):
@@ -41,14 +48,26 @@ class VoteRecord(DataScienceDevBase):
class Vote(DataScienceDevTableBase):
"""Roll call votes with counts and optional bill linkage."""
"""Raw roll call vote facts from House or Senate vote sources."""
__tablename__ = "vote"
__table_args__ = (
UniqueConstraint(
"congress",
"chamber",
"session_number",
"roll_number",
name="uq_vote_congress_chamber_session_number_roll_number",
),
Index("ix_vote_date", "vote_date"),
Index("ix_vote_congress_chamber", "congress", "chamber"),
)
congress: Mapped[int]
chamber: Mapped[str]
session: Mapped[int]
number: Mapped[int]
session_year: Mapped[int]
session_number: Mapped[int]
roll_number: Mapped[int]
vote_type: Mapped[str | None]
question: Mapped[str | None]
@@ -56,29 +75,57 @@ class Vote(DataScienceDevTableBase):
result_text: Mapped[str | None]
vote_date: Mapped[date]
vote_datetime: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
raw_vote_source_url: Mapped[str | None]
yea_count: Mapped[int | None]
nay_count: Mapped[int | None]
not_voting_count: Mapped[int | None]
present_count: Mapped[int | None]
bill_id: Mapped[int | None] = mapped_column(ForeignKey("main.bill.id"))
raw_bill_ref: Mapped[dict | None] = mapped_column(JSONB)
raw_amendment_ref: Mapped[dict | None] = mapped_column(JSONB)
raw_nomination_ref: Mapped[dict | None] = mapped_column(JSONB)
raw_treaty_ref: Mapped[dict | None] = mapped_column(JSONB)
raw_vote_source_artifact_id: Mapped[int | None] = mapped_column(
ForeignKey("main.source_artifact.id", ondelete="SET NULL")
)
bill: Mapped[Bill | None] = relationship("Bill", back_populates="votes")
vote_records: Mapped[list[VoteRecord]] = relationship(
"VoteRecord",
back_populates="vote",
cascade="all, delete-orphan",
)
__table_args__ = (
UniqueConstraint(
"congress",
"chamber",
"session",
"number",
name="uq_vote_congress_chamber_session_number",
),
Index("ix_vote_date", "vote_date"),
Index("ix_vote_congress_chamber", "congress", "chamber"),
action_matches: Mapped[list[VoteActionMatch]] = relationship(
"VoteActionMatch",
back_populates="vote",
cascade="all, delete-orphan",
)
classification: Mapped[VoteClassification | None] = relationship(
"VoteClassification",
back_populates="vote",
cascade="all, delete-orphan",
uselist=False,
)
vote_measure_links: Mapped[list[VoteMeasureLink]] = relationship(
"VoteMeasureLink",
back_populates="vote",
cascade="all, delete-orphan",
)
text_target: Mapped[VoteTextTarget | None] = relationship(
"VoteTextTarget",
back_populates="vote",
cascade="all, delete-orphan",
uselist=False,
)
position_meaning: Mapped[VotePositionMeaning | None] = relationship(
"VotePositionMeaning",
back_populates="vote",
cascade="all, delete-orphan",
uselist=False,
)
context_audit_rows: Mapped[list[VoteContextAudit]] = relationship(
"VoteContextAudit",
back_populates="vote",
cascade="all, delete-orphan",
)
+67 -1
View File
@@ -2,15 +2,81 @@
from __future__ import annotations
from pipelines.orm.data_science_dev.congress import Bill, BillText, Legislator, Vote, VoteRecord
from pipelines.orm.data_science_dev.congress import (
Amendment,
AmendmentAction,
AmendmentActionRecordedVote,
Bill,
BillAction,
BillActionRecordedVote,
BillRelation,
BillText,
BillTopic,
BillTopicPosition,
ClassificationMethod,
ConfidenceLevel,
IngestRun,
Legislator,
LegislatorScore,
MeasureFunction,
MeasureSubtype,
ScoreRun,
SourceArtifact,
SubjectType,
TextResolutionMethod,
TextTargetBasis,
TextTargetType,
Vote,
VoteActionMatch,
VoteActionScope,
VoteClassification,
VoteContextAudit,
VoteEffect,
VoteMeasureLink,
VoteMeasureRole,
VotePositionMeaning,
VoteRelationship,
VoteRecord,
VoteTextTarget,
)
from pipelines.orm.data_science_dev.posts import partitions # noqa: F401 — registers partition classes in metadata
from pipelines.orm.data_science_dev.posts.tables import Posts
__all__ = [
"Amendment",
"AmendmentAction",
"AmendmentActionRecordedVote",
"Bill",
"BillAction",
"BillActionRecordedVote",
"BillRelation",
"BillText",
"BillTopic",
"BillTopicPosition",
"ClassificationMethod",
"ConfidenceLevel",
"IngestRun",
"Legislator",
"LegislatorScore",
"MeasureFunction",
"MeasureSubtype",
"Posts",
"ScoreRun",
"SourceArtifact",
"SubjectType",
"TextResolutionMethod",
"TextTargetBasis",
"TextTargetType",
"Vote",
"VoteActionMatch",
"VoteActionScope",
"VoteClassification",
"VoteContextAudit",
"VoteEffect",
"VoteMeasureLink",
"VoteMeasureRole",
"VotePositionMeaning",
"VoteRelationship",
"VoteRecord",
"VoteTextTarget",
]
@@ -3,9 +3,10 @@
from __future__ import annotations
from pipelines.orm.data_science_dev.posts.failed_ingestion import FailedIngestion
from pipelines.orm.data_science_dev.posts.tables import Posts
from pipelines.orm.data_science_dev.posts.tables import Posts, PostTopic
__all__ = [
"FailedIngestion",
"Posts",
"PostTopic",
]
@@ -0,0 +1,195 @@
"""Shared language filter constants for post sampling queries."""
from __future__ import annotations
ENGLISH_LANGS = (
'["", "", ""]',
'[""]',
"[]",
'["", "eng"]',
'["eng", "", ""]',
'["eng", ""]',
'["eng"]',
'["eng", "aar"]',
'["eng", "abk", "afr"]',
'["eng", "afr"]',
'["eng", "afr", "abk"]',
'["eng", "afr", "anp"]',
'["eng", "afr", "ber"]',
'["eng", "afr", "dan"]',
'["eng", "afr", "deu"]',
'["eng", "afr", "est"]',
'["eng", "afr", "fra"]',
'["eng", "afr", "ind"]',
'["eng", "afr", "lat"]',
'["eng", "afr", "nld"]',
'["eng", "afr", "nor"]',
'["eng", "afr", "pol"]',
'["eng", "afr", "por"]',
'["eng", "afr", "ron"]',
'["eng", "afr", "slk"]',
'["eng", "afr", "spa"]',
'["eng", "afr", "tgl"]',
'["eng", "afr", "tuk"]',
'["eng", "afr", "tur"]',
'["eng", "afr", "ukr"]',
'["eng", "afr", "vol"]',
'["eng", "agq"]',
'["eng", "ain"]',
'["eng", "ain", "amh"]',
'["eng", "ain", "jpn"]',
'["eng", "aka"]',
'["eng", "amh"]',
'["eng", "amh", "afr"]',
'["eng", "amh", "ara"]',
'["eng", "amh", "fra"]',
'["eng", "anp"]',
'["eng", "anp", "hye"]',
'["eng", "anp", "sqi"]',
'["eng", "", "ara"]',
'["eng", "ara", ""]',
'["eng", "ara"]',
'["eng", "ara", "afr"]',
'["eng", "ara", "anp"]',
'["eng", "ara", "ars"]',
'["eng", "ara", "bul"]',
'["eng", "ara", "cat"]',
'["eng", "ara", "deu"]',
'["eng", "ara", "ell"]',
'["eng", "ara", "fas"]',
'["eng", "ara", "fra"]',
'["eng", "ara", "heb"]',
'["eng", "ara", "hin"]',
'["eng", "ara", "ind"]',
'["eng", "ara", "ita"]',
'["eng", "ara", "jpn"]',
'["eng", "ara", "kas"]',
'["eng", "ara", "kor"]',
'["eng", "ara", "nob"]',
'["eng", "ara", "nor"]',
'["eng", "ara", "rus"]',
'["eng", "ara", "spa"]',
'["eng", "ara", "swe"]',
'["eng", "ara", "tam"]',
'["eng", "ara", "tur"]',
'["eng", "ara", "urd"]',
'["eng", "ara", "zho"]',
'["eng", "arg"]',
'["eng", "arg", "amh"]',
'["eng", "arg", "aze"]',
'["eng", "ars"]',
'["eng", "ars", "ara"]',
'["eng", "asm"]',
'["eng", "ava", "sqi"]',
'["eng", "ave"]',
'["eng", "aze"]',
'["eng", "aze", "deu"]',
'["eng", "aze", "hye"]',
'["eng", "aze", "ita"]',
'["eng", "aze", "rus"]',
'["eng", "bam", ""]',
'["eng", "bel"]',
'["eng", "bel", "rus"]',
'["eng", "ben"]',
'["eng", "ben", "deu"]',
'["eng", "ben", "fra"]',
'["eng", "ben", "hin"]',
'["eng", "ben", "mya"]',
'["eng", "ber"]',
'["eng", "ber", "afr"]',
'["eng", "ber", "deu"]',
'["eng", "ber", "est"]',
'["eng", "ber", "hun"]',
'["eng", "ber", "isl"]',
'["eng", "ber", "jpn"]',
'["eng", "ber", "lat"]',
'["eng", "ber", "nor"]',
'["eng", "ber", "pol"]',
'["eng", "ber", "por"]',
'["eng", "ber", "ron"]',
'["eng", "ber", "run"]',
'["eng", "ber", "slk"]',
'["eng", "ber", "spa"]',
'["eng", "ber", "tgl"]',
'["eng", "ber", "tlh"]',
'["eng", "ber", "tuk"]',
'["eng", "bod"]',
'["eng", "bod", "nep"]',
'["eng", "bos", "hrv"]',
'["eng", "bos", "srp"]',
'["eng", "bul"]',
'["eng", "bul", "deu"]',
'["eng", "bul", "fra"]',
'["eng", "bul", "jpn"]',
'["eng", "bul", "mkd"]',
'["eng", "bul", "mri"]',
'["eng", "bul", "nld"]',
'["eng", "bul", "rus"]',
'["eng", "bul", "srp"]',
'["eng", "cat"]',
'["eng", "cat", "fra"]',
'["eng", "cat", "ind"]',
'["eng", "cat", "isl"]',
'["eng", "cat", "jpn"]',
'["eng", "cat", "nld"]',
'["eng", "cat", "spa"]',
'["eng", "ces"]',
'["eng", "ces", "deu"]',
'["eng", "ces", "ell"]',
'["eng", "ces", "haw"]',
'["eng", "ces", "ind"]',
'["eng", "ces", "ita"]',
'["eng", "ces", "jpn"]',
'["eng", "ces", "por"]',
'["eng", "ces", "rus"]',
'["eng", "ces", "slk"]',
'["eng", "ces", "spa"]',
'["eng", "ces", "tuk"]',
'["eng", "cha"]',
'["eng", "chr"]',
'["eng", "chr", "ara"]',
'["eng", "chr", "deu"]',
'["eng", "chr", "ell"]',
'["eng", "chr", "fil"]',
'["eng", "chr", "isl"]',
'["eng", "chr", "kor"]',
'["eng", "chr", "rus"]',
'["eng", "chr", "spa"]',
'["eng", "chr", "zho"]',
'["eng", "chu", "oci"]',
'["eng", "cor"]',
'["eng", "", "cos"]',
'["eng", "cos"]',
'["eng", "cym"]',
'["eng", "cym", "deu"]',
'["eng", "cym", "fra"]',
'["eng", "cym", "jpn"]',
'["eng", "cym", "spa"]',
'["eng", "cym", "zho"]',
'["eng", "dan"]',
'["eng", "dan", "ber"]',
'["eng", "dan", "deu"]',
'["eng", "dan", "ell"]',
'["eng", "dan", "est"]',
'["eng", "dan", "fas"]',
'["eng", "dan", "fin"]',
'["eng", "dan", "fra"]',
'["eng", "dan", "gle"]',
'["eng", "dan", "hun"]',
'["eng", "dan", "isl"]',
'["eng", "dan", "ita"]',
'["eng", "dan", "jpn"]',
'["eng", "dan", "lat"]',
'["eng", "dan", "nld"]',
'["eng", "dan", "nob"]',
'["eng", "dan", "nor"]',
'["eng", "dan", "por"]',
'["eng", "dan", "rus"]',
'["eng", "dan", "slk"]',
'["eng", "dan", "spa"]',
'["eng", "dan", "swe"]',
'["eng", "dan", "tuk"]',
'["eng", "dan", "zho"]',
'["eng", "deu", ""]',
'["eng", "deu"]',
)
+25 -2
View File
@@ -1,13 +1,36 @@
"""Posts parent table with PostgreSQL weekly range partitioning on date column."""
"""Posts parent table and PostTopic table for the data_science_dev database."""
from __future__ import annotations
from pipelines.orm.data_science_dev.base import DataScienceDevBase
from pipelines.orm.data_science_dev.base import (
DataScienceDevBase,
DataScienceDevTableBase,
)
from pipelines.orm.data_science_dev.posts.columns import PostsColumns
from sqlalchemy import BigInteger, Index, SmallInteger
from sqlalchemy.orm import Mapped, mapped_column
class Posts(PostsColumns, DataScienceDevBase):
"""Parent partitioned table for posts, partitioned by week on `date`."""
__tablename__ = "posts"
__table_args__ = ({"postgresql_partition_by": "RANGE (date)"},)
class PostTopic(DataScienceDevTableBase):
"""Stores BERTopic topic assignments for posts.
post_id references main.posts but without a FK constraint
since posts is a partitioned table.
"""
__tablename__ = "post_topic"
__table_args__ = (Index("ix_post_topic_post_id", "post_id"),)
post_id: Mapped[int] = mapped_column(BigInteger)
topic_id: Mapped[int] = mapped_column(SmallInteger)
topic_label: Mapped[str | None]
model_version: Mapped[str | None]
+155
View File
@@ -0,0 +1,155 @@
"""Thing."""
from __future__ import annotations
import logging
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from dataclasses import dataclass
from multiprocessing import cpu_count
from typing import TYPE_CHECKING, Any, Literal, TypeVar
if TYPE_CHECKING:
from collections.abc import Callable, Mapping, Sequence
logger = logging.getLogger(__name__)
R = TypeVar("R")
modes = Literal["normal", "early_error"]
@dataclass
class ExecutorResults[R]:
"""Dataclass to store the results and exceptions of the parallel execution."""
results: list[R]
exceptions: list[BaseException]
def __repr__(self) -> str:
"""Return a string representation of the object."""
return f"results={self.results} exceptions={self.exceptions}"
def _parallelize_base[R](
executor_type: type[ThreadPoolExecutor | ProcessPoolExecutor],
func: Callable[..., R],
kwargs_list: Sequence[Mapping[str, Any]],
max_workers: int | None,
progress_tracker: int | None,
mode: modes,
) -> ExecutorResults:
total_work = len(kwargs_list)
with executor_type(max_workers=max_workers) as executor:
futures = [executor.submit(func, **kwarg) for kwarg in kwargs_list]
results = []
exceptions = []
for index, future in enumerate(futures, 1):
if exception := future.exception():
logger.error(f"{future} raised {exception.__class__.__name__}")
exceptions.append(exception)
if mode == "early_error":
executor.shutdown(wait=False)
raise exception
continue
results.append(future.result())
if progress_tracker and index % progress_tracker == 0:
logger.info(f"Progress: {index}/{total_work}")
return ExecutorResults(results, exceptions)
def parallelize_thread[R](
func: Callable[..., R],
kwargs_list: Sequence[Mapping[str, Any]],
max_workers: int | None = None,
progress_tracker: int | None = None,
mode: modes = "normal",
) -> ExecutorResults:
"""Generic function to run a function with multiple arguments in threads.
Args:
func (Callable[..., R]): Function to run in threads.
kwargs_list (Sequence[Mapping[str, Any]]): List of dictionaries with the arguments for the function.
max_workers (int, optional): Number of workers to use. Defaults to 8.
progress_tracker (int, optional): Number of tasks to complete before logging progress.
mode (modes, optional): Mode to use. Defaults to "normal".
Returns:
tuple[list[R], list[Exception]]: List with the results and a list with the exceptions.
"""
return _parallelize_base(
executor_type=ThreadPoolExecutor,
func=func,
kwargs_list=kwargs_list,
max_workers=max_workers,
progress_tracker=progress_tracker,
mode=mode,
)
def parallelize_process[R](
func: Callable[..., R],
kwargs_list: Sequence[Mapping[str, Any]],
max_workers: int | None = None,
progress_tracker: int | None = None,
mode: modes = "normal",
) -> ExecutorResults:
"""Generic function to run a function with multiple arguments in process.
Args:
func (Callable[..., R]): Function to run in process.
kwargs_list (Sequence[Mapping[str, Any]]): List of dictionaries with the arguments for the function.
max_workers (int, optional): Number of workers to use. Defaults to 4.
progress_tracker (int, optional): Number of tasks to complete before logging progress.
mode (modes, optional): Mode to use. Defaults to "normal".
Returns:
tuple[list[R], list[Exception]]: List with the results and a list with the exceptions.
"""
if max_workers and max_workers > cpu_count():
error = f"max_workers must be less than or equal to {cpu_count()}"
raise RuntimeError(error)
return process_executor_unchecked(
func=func,
kwargs_list=kwargs_list,
max_workers=max_workers,
progress_tracker=progress_tracker,
mode=mode,
)
def process_executor_unchecked[R](
func: Callable[..., R],
kwargs_list: Sequence[Mapping[str, Any]],
max_workers: int | None,
progress_tracker: int | None,
mode: modes = "normal",
) -> ExecutorResults:
"""Generic function to run a function with multiple arguments in parallel.
Note: this function does not check if the number of workers is greater than the number of CPUs.
This can cause the system to become unresponsive.
Args:
func (Callable[..., R]): Function to run in parallel.
kwargs_list (Sequence[Mapping[str, Any]]): List of dictionaries with the arguments for the function.
max_workers (int, optional): Number of workers to use. Defaults to 8.
progress_tracker (int, optional): Number of tasks to complete before logging progress.
mode (modes, optional): Mode to use. Defaults to "normal".
Returns:
tuple[list[R], list[Exception]]: List with the results and a list with the exceptions.
"""
return _parallelize_base(
executor_type=ProcessPoolExecutor,
func=func,
kwargs_list=kwargs_list,
max_workers=max_workers,
progress_tracker=progress_tracker,
mode=mode,
)