5 Commits

36 changed files with 2805 additions and 1311 deletions
+1 -1
View File
@@ -110,7 +110,7 @@ ipython_config.py
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
# pdm recommends including project-wide configuration in pdm.toml, but excluding .pdm-pipelines.
# pdm recommends including project-wide configuration in pdm.toml, but excluding .pdm-python.
# https://pdm-project.org/en/latest/usage/project/#working-with-version-control
# pdm.lock
# pdm.toml
@@ -0,0 +1,98 @@
"""adding LegislatorScoreFake.
Revision ID: 06f833813bd7
Revises: b63ed11d6775
Create Date: 2026-04-22 18:41:07.484609
"""
from __future__ import annotations
from typing import TYPE_CHECKING
import sqlalchemy as sa
from alembic import op
from pipelines.orm import DataScienceDevBase
if TYPE_CHECKING:
from collections.abc import Sequence
# revision identifiers, used by Alembic.
revision: str = "06f833813bd7"
down_revision: str | None = "b63ed11d6775"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
schema = DataScienceDevBase.schema_name
def upgrade() -> None:
"""Upgrade."""
# ### commands auto generated by Alembic - please adjust! ###
op.create_table(
"legislator_score_fake",
sa.Column("legislator_id", sa.Integer(), nullable=False),
sa.Column("year", sa.Integer(), nullable=False),
sa.Column("topic", sa.String(), nullable=False),
sa.Column("score", sa.Float(), nullable=False),
sa.Column("id", sa.Integer(), nullable=False),
sa.Column(
"created",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.Column(
"updated",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.ForeignKeyConstraint(
["legislator_id"],
[f"{schema}.legislator.id"],
name=op.f("fk_legislator_score_fake_legislator_id_legislator"),
ondelete="CASCADE",
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_legislator_score_fake")),
sa.UniqueConstraint(
"legislator_id",
"year",
"topic",
name="uq_legislator_score_fake_legislator_id_year_topic",
),
schema=schema,
)
op.create_index(
op.f("ix_legislator_score_fake_legislator_id"),
"legislator_score_fake",
["legislator_id"],
unique=False,
schema=schema,
)
op.create_index(
"ix_legislator_score_fake_year_topic",
"legislator_score_fake",
["year", "topic"],
unique=False,
schema=schema,
)
# ### end Alembic commands ###
def downgrade() -> None:
"""Downgrade."""
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index(
"ix_legislator_score_fake_year_topic",
table_name="legislator_score_fake",
schema=schema,
)
op.drop_index(
op.f("ix_legislator_score_fake_legislator_id"),
table_name="legislator_score_fake",
schema=schema,
)
op.drop_table("legislator_score_fake", schema=schema)
# ### end Alembic commands ###
@@ -0,0 +1,64 @@
"""add vote.bill_text_id linkage.
Revision ID: 9c7d4a2e1b10
Revises: 06f833813bd7
Create Date: 2026-04-23 00:00:00.000000
"""
from __future__ import annotations
from typing import TYPE_CHECKING
import sqlalchemy as sa
from alembic import op
from pipelines.orm import DataScienceDevBase
if TYPE_CHECKING:
from collections.abc import Sequence
# revision identifiers, used by Alembic.
revision: str = "9c7d4a2e1b10"
down_revision: str | None = "06f833813bd7"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
schema = DataScienceDevBase.schema_name
def upgrade() -> None:
"""Upgrade."""
op.add_column(
"vote",
sa.Column("bill_text_id", sa.Integer(), nullable=True),
schema=schema,
)
op.create_index(
"ix_vote_bill_text_id",
"vote",
["bill_text_id"],
unique=False,
schema=schema,
)
op.create_foreign_key(
"fk_vote_bill_text_id_bill_text",
"vote",
"bill_text",
["bill_text_id"],
["id"],
source_schema=schema,
referent_schema=schema,
)
def downgrade() -> None:
"""Downgrade."""
op.drop_constraint(
"fk_vote_bill_text_id_bill_text",
"vote",
schema=schema,
type_="foreignkey",
)
op.drop_index("ix_vote_bill_text_id", table_name="vote", schema=schema)
op.drop_column("vote", "bill_text_id", schema=schema)
@@ -0,0 +1,844 @@
"""canonical vote context v3.
Revision ID: 1f8c0e7a9d21
Revises: 9c7d4a2e1b10
Create Date: 2026-04-25 00:00:00.000000
"""
from __future__ import annotations
from typing import TYPE_CHECKING
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql
from alembic import op
from pipelines.orm import DataScienceDevBase
if TYPE_CHECKING:
from collections.abc import Sequence
revision: str = "1f8c0e7a9d21"
down_revision: str | None = "9c7d4a2e1b10"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
schema = DataScienceDevBase.schema_name
def upgrade() -> None:
"""Upgrade."""
op.create_table(
"ingest_run",
sa.Column("started_at", sa.DateTime(timezone=True), nullable=True),
sa.Column("completed_at", sa.DateTime(timezone=True), nullable=True),
sa.Column("git_sha", sa.String(), nullable=True),
sa.Column("classifier_version", sa.String(), nullable=True),
sa.Column("source_snapshot_label", sa.String(), nullable=True),
sa.Column("status", sa.String(), nullable=False),
sa.Column("id", sa.Integer(), nullable=False),
sa.Column(
"created",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.Column(
"updated",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_ingest_run")),
schema=schema,
)
op.create_table(
"source_artifact",
sa.Column("source_kind", sa.String(), nullable=False),
sa.Column("congress", sa.Integer(), nullable=False),
sa.Column("chamber", sa.String(), nullable=True),
sa.Column("local_path", sa.String(), nullable=False),
sa.Column("source_url", sa.String(), nullable=True),
sa.Column("sha256", sa.String(), nullable=False),
sa.Column("byte_size", sa.Integer(), nullable=False),
sa.Column("modified_at", sa.DateTime(timezone=True), nullable=True),
sa.Column("ingested_at", sa.DateTime(timezone=True), nullable=True),
sa.Column("ingest_run_id", sa.Integer(), nullable=True),
sa.Column("id", sa.Integer(), nullable=False),
sa.Column(
"created",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.Column(
"updated",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.ForeignKeyConstraint(
["ingest_run_id"],
[f"{schema}.ingest_run.id"],
name=op.f("fk_source_artifact_ingest_run_id_ingest_run"),
ondelete="SET NULL",
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_source_artifact")),
schema=schema,
)
op.create_index(
"ix_source_artifact_source_kind",
"source_artifact",
["source_kind"],
unique=False,
schema=schema,
)
op.create_index(
"ix_source_artifact_congress",
"source_artifact",
["congress"],
unique=False,
schema=schema,
)
op.create_table(
"score_run",
sa.Column("ingest_run_id", sa.Integer(), nullable=True),
sa.Column("classifier_version", sa.String(), nullable=True),
sa.Column("scoring_version", sa.String(), nullable=True),
sa.Column("included_vote_count", sa.Integer(), nullable=False),
sa.Column("excluded_vote_count", sa.Integer(), nullable=False),
sa.Column("started_at", sa.DateTime(timezone=True), nullable=True),
sa.Column("completed_at", sa.DateTime(timezone=True), nullable=True),
sa.Column("id", sa.Integer(), nullable=False),
sa.Column(
"created",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.Column(
"updated",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.ForeignKeyConstraint(
["ingest_run_id"],
[f"{schema}.ingest_run.id"],
name=op.f("fk_score_run_ingest_run_id_ingest_run"),
ondelete="SET NULL",
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_score_run")),
schema=schema,
)
op.add_column(
"legislator_score",
sa.Column("score_run_id", sa.Integer(), nullable=True),
schema=schema,
)
op.create_index(
op.f("ix_legislator_score_score_run_id"),
"legislator_score",
["score_run_id"],
unique=False,
schema=schema,
)
op.create_foreign_key(
op.f("fk_legislator_score_score_run_id_score_run"),
"legislator_score",
"score_run",
["score_run_id"],
["id"],
source_schema=schema,
referent_schema=schema,
ondelete="CASCADE",
)
op.add_column(
"bill_text",
sa.Column("source_datetime_raw", sa.String(), nullable=True),
schema=schema,
)
op.add_column(
"bill_text", sa.Column("text_url_xml", sa.String(), nullable=True), schema=schema
)
op.add_column(
"bill_text", sa.Column("text_url_pdf", sa.String(), nullable=True), schema=schema
)
op.add_column(
"bill_text",
sa.Column("text_url_html", sa.String(), nullable=True),
schema=schema,
)
op.add_column(
"bill_text",
sa.Column("source_artifact_id", sa.Integer(), nullable=True),
schema=schema,
)
op.create_foreign_key(
op.f("fk_bill_text_source_artifact_id_source_artifact"),
"bill_text",
"source_artifact",
["source_artifact_id"],
["id"],
source_schema=schema,
referent_schema=schema,
ondelete="SET NULL",
)
op.create_table(
"bill_action",
sa.Column("bill_id", sa.Integer(), nullable=False),
sa.Column("sequence", sa.Integer(), nullable=False),
sa.Column("action_date", sa.Date(), nullable=False),
sa.Column("action_time", sa.String(), nullable=True),
sa.Column("action_text", sa.String(), nullable=False),
sa.Column("action_type", sa.String(), nullable=True),
sa.Column("action_code", sa.String(), nullable=True),
sa.Column("source_system_code", sa.String(), nullable=True),
sa.Column("source_system_name", sa.String(), nullable=True),
sa.Column("source_artifact_id", sa.Integer(), nullable=True),
sa.Column("id", sa.Integer(), nullable=False),
sa.Column(
"created",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.Column(
"updated",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.ForeignKeyConstraint(
["bill_id"],
[f"{schema}.bill.id"],
name=op.f("fk_bill_action_bill_id_bill"),
ondelete="CASCADE",
),
sa.ForeignKeyConstraint(
["source_artifact_id"],
[f"{schema}.source_artifact.id"],
name=op.f("fk_bill_action_source_artifact_id_source_artifact"),
ondelete="SET NULL",
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_bill_action")),
sa.UniqueConstraint("bill_id", "sequence", name="uq_bill_action_bill_id_sequence"),
schema=schema,
)
op.create_table(
"bill_action_recorded_vote",
sa.Column("bill_action_id", sa.Integer(), nullable=False),
sa.Column("congress", sa.Integer(), nullable=False),
sa.Column("chamber", sa.String(), nullable=False),
sa.Column("session_number", sa.Integer(), nullable=False),
sa.Column("roll_number", sa.Integer(), nullable=False),
sa.Column("vote_datetime", sa.DateTime(timezone=True), nullable=True),
sa.Column("vote_url", sa.String(), nullable=True),
sa.Column("id", sa.Integer(), nullable=False),
sa.Column(
"created",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.Column(
"updated",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.ForeignKeyConstraint(
["bill_action_id"],
[f"{schema}.bill_action.id"],
name=op.f("fk_bill_action_recorded_vote_bill_action_id_bill_action"),
ondelete="CASCADE",
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_bill_action_recorded_vote")),
sa.UniqueConstraint(
"bill_action_id",
"congress",
"chamber",
"session_number",
"roll_number",
name="uq_bill_action_recorded_vote_match_key",
),
schema=schema,
)
op.create_table(
"bill_relation",
sa.Column("bill_id", sa.Integer(), nullable=False),
sa.Column("related_bill_id", sa.Integer(), nullable=False),
sa.Column("relationship_type", sa.String(), nullable=False),
sa.Column("identified_by", sa.String(), nullable=True),
sa.Column("latest_action_date", sa.Date(), nullable=True),
sa.Column("latest_action_text", sa.String(), nullable=True),
sa.Column("id", sa.Integer(), nullable=False),
sa.Column(
"created",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.Column(
"updated",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.ForeignKeyConstraint(
["bill_id"],
[f"{schema}.bill.id"],
name=op.f("fk_bill_relation_bill_id_bill"),
ondelete="CASCADE",
),
sa.ForeignKeyConstraint(
["related_bill_id"],
[f"{schema}.bill.id"],
name=op.f("fk_bill_relation_related_bill_id_bill"),
ondelete="CASCADE",
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_bill_relation")),
schema=schema,
)
op.create_index(
"ix_bill_relation_bill_id",
"bill_relation",
["bill_id"],
unique=False,
schema=schema,
)
op.create_index(
"ix_bill_relation_related_bill_id",
"bill_relation",
["related_bill_id"],
unique=False,
schema=schema,
)
op.create_table(
"amendment",
sa.Column("congress", sa.Integer(), nullable=False),
sa.Column("amendment_type", sa.String(), nullable=False),
sa.Column("number", sa.Integer(), nullable=False),
sa.Column("chamber", sa.String(), nullable=False),
sa.Column("description", sa.String(), nullable=True),
sa.Column("purpose", sa.String(), nullable=True),
sa.Column("amended_bill_id", sa.Integer(), nullable=True),
sa.Column("amended_amendment_id", sa.Integer(), nullable=True),
sa.Column("source_path", sa.String(), nullable=True),
sa.Column("source_artifact_id", sa.Integer(), nullable=True),
sa.Column("id", sa.Integer(), nullable=False),
sa.Column(
"created",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.Column(
"updated",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.ForeignKeyConstraint(
["amended_amendment_id"],
[f"{schema}.amendment.id"],
name=op.f("fk_amendment_amended_amendment_id_amendment"),
ondelete="SET NULL",
),
sa.ForeignKeyConstraint(
["amended_bill_id"],
[f"{schema}.bill.id"],
name=op.f("fk_amendment_amended_bill_id_bill"),
ondelete="SET NULL",
),
sa.ForeignKeyConstraint(
["source_artifact_id"],
[f"{schema}.source_artifact.id"],
name=op.f("fk_amendment_source_artifact_id_source_artifact"),
ondelete="SET NULL",
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_amendment")),
sa.UniqueConstraint(
"congress",
"amendment_type",
"number",
name="uq_amendment_congress_type_number",
),
schema=schema,
)
op.create_table(
"amendment_action",
sa.Column("amendment_id", sa.Integer(), nullable=False),
sa.Column("sequence", sa.Integer(), nullable=False),
sa.Column("action_date", sa.Date(), nullable=False),
sa.Column("action_time", sa.String(), nullable=True),
sa.Column("action_text", sa.String(), nullable=False),
sa.Column("action_type", sa.String(), nullable=True),
sa.Column("action_code", sa.String(), nullable=True),
sa.Column("source_system_code", sa.String(), nullable=True),
sa.Column("source_system_name", sa.String(), nullable=True),
sa.Column("source_artifact_id", sa.Integer(), nullable=True),
sa.Column("id", sa.Integer(), nullable=False),
sa.Column(
"created",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.Column(
"updated",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.ForeignKeyConstraint(
["amendment_id"],
[f"{schema}.amendment.id"],
name=op.f("fk_amendment_action_amendment_id_amendment"),
ondelete="CASCADE",
),
sa.ForeignKeyConstraint(
["source_artifact_id"],
[f"{schema}.source_artifact.id"],
name=op.f("fk_amendment_action_source_artifact_id_source_artifact"),
ondelete="SET NULL",
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_amendment_action")),
sa.UniqueConstraint(
"amendment_id",
"sequence",
name="uq_amendment_action_amendment_id_sequence",
),
schema=schema,
)
op.create_table(
"amendment_action_recorded_vote",
sa.Column("amendment_action_id", sa.Integer(), nullable=False),
sa.Column("congress", sa.Integer(), nullable=False),
sa.Column("chamber", sa.String(), nullable=False),
sa.Column("session_number", sa.Integer(), nullable=False),
sa.Column("roll_number", sa.Integer(), nullable=False),
sa.Column("vote_datetime", sa.DateTime(timezone=True), nullable=True),
sa.Column("vote_url", sa.String(), nullable=True),
sa.Column("id", sa.Integer(), nullable=False),
sa.Column(
"created",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.Column(
"updated",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.ForeignKeyConstraint(
["amendment_action_id"],
[f"{schema}.amendment_action.id"],
name=op.f(
"fk_amendment_action_recorded_vote_amendment_action_id_amendment_action"
),
ondelete="CASCADE",
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_amendment_action_recorded_vote")),
sa.UniqueConstraint(
"amendment_action_id",
"congress",
"chamber",
"session_number",
"roll_number",
name="uq_amendment_action_recorded_vote_match_key",
),
schema=schema,
)
op.drop_constraint(
"uq_vote_congress_chamber_session_number",
"vote",
schema=schema,
type_="unique",
)
op.alter_column("vote", "session", new_column_name="session_year", schema=schema)
op.alter_column("vote", "number", new_column_name="roll_number", schema=schema)
op.add_column("vote", sa.Column("session_number", sa.Integer(), nullable=True), schema=schema)
op.add_column(
"vote",
sa.Column("vote_datetime", sa.DateTime(timezone=True), nullable=True),
schema=schema,
)
op.add_column(
"vote", sa.Column("raw_vote_source_url", sa.String(), nullable=True), schema=schema
)
op.add_column(
"vote",
sa.Column("raw_bill_ref", postgresql.JSONB(astext_type=sa.Text()), nullable=True),
schema=schema,
)
op.add_column(
"vote",
sa.Column(
"raw_amendment_ref",
postgresql.JSONB(astext_type=sa.Text()),
nullable=True,
),
schema=schema,
)
op.add_column(
"vote",
sa.Column(
"raw_nomination_ref",
postgresql.JSONB(astext_type=sa.Text()),
nullable=True,
),
schema=schema,
)
op.add_column(
"vote",
sa.Column(
"raw_treaty_ref",
postgresql.JSONB(astext_type=sa.Text()),
nullable=True,
),
schema=schema,
)
op.add_column(
"vote",
sa.Column("raw_vote_source_artifact_id", sa.Integer(), nullable=True),
schema=schema,
)
op.create_foreign_key(
op.f("fk_vote_raw_vote_source_artifact_id_source_artifact"),
"vote",
"source_artifact",
["raw_vote_source_artifact_id"],
["id"],
source_schema=schema,
referent_schema=schema,
ondelete="SET NULL",
)
op.execute(
sa.text(
f"""
UPDATE {schema}.vote
SET session_number = session_year - (((congress - 1) * 2) + 1789) + 1
"""
)
)
op.alter_column("vote", "session_number", nullable=False, schema=schema)
op.create_unique_constraint(
"uq_vote_congress_chamber_session_number_roll_number",
"vote",
["congress", "chamber", "session_number", "roll_number"],
schema=schema,
)
op.drop_constraint(
op.f("fk_vote_bill_id_bill"),
"vote",
schema=schema,
type_="foreignkey",
)
op.drop_constraint(
"fk_vote_bill_text_id_bill_text",
"vote",
schema=schema,
type_="foreignkey",
)
op.drop_index("ix_vote_bill_text_id", table_name="vote", schema=schema)
op.drop_column("vote", "bill_id", schema=schema)
op.drop_column("vote", "bill_text_id", schema=schema)
op.create_table(
"vote_action_match",
sa.Column("vote_id", sa.Integer(), nullable=False),
sa.Column("action_scope", sa.String(), nullable=False),
sa.Column("bill_action_id", sa.Integer(), nullable=True),
sa.Column("amendment_action_id", sa.Integer(), nullable=True),
sa.Column("is_selected", sa.Boolean(), nullable=False),
sa.Column("match_method", sa.String(), nullable=False),
sa.Column("match_reason", sa.String(), nullable=True),
sa.Column("match_confidence", sa.String(), nullable=False),
sa.Column(
"created_at",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.Column("id", sa.Integer(), nullable=False),
sa.Column(
"created",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.Column(
"updated",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.ForeignKeyConstraint(
["amendment_action_id"],
[f"{schema}.amendment_action.id"],
name=op.f("fk_vote_action_match_amendment_action_id_amendment_action"),
ondelete="CASCADE",
),
sa.ForeignKeyConstraint(
["bill_action_id"],
[f"{schema}.bill_action.id"],
name=op.f("fk_vote_action_match_bill_action_id_bill_action"),
ondelete="CASCADE",
),
sa.ForeignKeyConstraint(
["vote_id"],
[f"{schema}.vote.id"],
name=op.f("fk_vote_action_match_vote_id_vote"),
ondelete="CASCADE",
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_vote_action_match")),
schema=schema,
)
op.create_index(
"ix_vote_action_match_vote_id",
"vote_action_match",
["vote_id"],
unique=False,
schema=schema,
)
op.create_index(
"uq_vote_action_match_selected_vote_id",
"vote_action_match",
["vote_id"],
unique=True,
schema=schema,
postgresql_where=sa.text("is_selected"),
)
op.create_table(
"vote_classification",
sa.Column("vote_id", sa.Integer(), nullable=False),
sa.Column("subject_type", sa.String(), nullable=False),
sa.Column("measure_type", sa.String(), nullable=True),
sa.Column("measure_subtype", sa.String(), nullable=True),
sa.Column("measure_function", sa.String(), nullable=True),
sa.Column("vote_relationship", sa.String(), nullable=False),
sa.Column("is_legislation_related", sa.Boolean(), nullable=False),
sa.Column("is_direct_vote_on_legislative_text", sa.Boolean(), nullable=False),
sa.Column("is_substantive_policy_vote", sa.Boolean(), nullable=False),
sa.Column("is_lawmaking_vehicle", sa.Boolean(), nullable=False),
sa.Column("is_special_rule", sa.Boolean(), nullable=False),
sa.Column("classification_method", sa.String(), nullable=False),
sa.Column("classification_confidence_reason", sa.String(), nullable=True),
sa.Column("confidence", sa.String(), nullable=False),
sa.Column("classified_at", sa.DateTime(timezone=True), nullable=True),
sa.Column("classification_version", sa.String(), nullable=False),
sa.Column("id", sa.Integer(), nullable=False),
sa.Column(
"created",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.Column(
"updated",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.ForeignKeyConstraint(
["vote_id"],
[f"{schema}.vote.id"],
name=op.f("fk_vote_classification_vote_id_vote"),
ondelete="CASCADE",
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_vote_classification")),
sa.UniqueConstraint("vote_id", name=op.f("uq_vote_classification_vote_id")),
schema=schema,
)
op.create_index(
"ix_vote_classification_subject_type",
"vote_classification",
["subject_type"],
unique=False,
schema=schema,
)
op.create_table(
"vote_measure_link",
sa.Column("vote_id", sa.Integer(), nullable=False),
sa.Column("measure_id", sa.Integer(), nullable=False),
sa.Column("role", sa.String(), nullable=False),
sa.Column("source", sa.String(), nullable=False),
sa.Column("confidence", sa.String(), nullable=False),
sa.Column("notes", sa.String(), nullable=True),
sa.Column("id", sa.Integer(), nullable=False),
sa.Column(
"created",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.Column(
"updated",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.ForeignKeyConstraint(
["measure_id"],
[f"{schema}.bill.id"],
name=op.f("fk_vote_measure_link_measure_id_bill"),
ondelete="CASCADE",
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_vote_measure_link")),
schema=schema,
)
op.create_index(
"ix_vote_measure_link_vote_id",
"vote_measure_link",
["vote_id"],
unique=False,
schema=schema,
)
op.create_foreign_key(
op.f("fk_vote_measure_link_vote_id_vote"),
"vote_measure_link",
"vote",
["vote_id"],
["id"],
source_schema=schema,
referent_schema=schema,
ondelete="CASCADE",
)
op.create_table(
"vote_text_target",
sa.Column("vote_id", sa.Integer(), nullable=False),
sa.Column("text_target_type", sa.String(), nullable=False),
sa.Column("voted_text_version_id", sa.Integer(), nullable=True),
sa.Column("resulting_text_version_id", sa.Integer(), nullable=True),
sa.Column("related_amendment_id", sa.Integer(), nullable=True),
sa.Column("text_target_basis", sa.String(), nullable=False),
sa.Column("text_resolution_method", sa.String(), nullable=False),
sa.Column("text_resolution_confidence_reason", sa.String(), nullable=True),
sa.Column("confidence", sa.String(), nullable=False),
sa.Column("notes", sa.String(), nullable=True),
sa.Column("id", sa.Integer(), nullable=False),
sa.Column(
"created",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.Column(
"updated",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.ForeignKeyConstraint(
["related_amendment_id"],
[f"{schema}.amendment.id"],
name=op.f("fk_vote_text_target_related_amendment_id_amendment"),
ondelete="SET NULL",
),
sa.ForeignKeyConstraint(
["resulting_text_version_id"],
[f"{schema}.bill_text.id"],
name=op.f("fk_vote_text_target_resulting_text_version_id_bill_text"),
ondelete="SET NULL",
),
sa.ForeignKeyConstraint(
["vote_id"],
[f"{schema}.vote.id"],
name=op.f("fk_vote_text_target_vote_id_vote"),
ondelete="CASCADE",
),
sa.ForeignKeyConstraint(
["voted_text_version_id"],
[f"{schema}.bill_text.id"],
name=op.f("fk_vote_text_target_voted_text_version_id_bill_text"),
ondelete="SET NULL",
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_vote_text_target")),
sa.UniqueConstraint("vote_id", name=op.f("uq_vote_text_target_vote_id")),
schema=schema,
)
op.create_table(
"vote_position_meaning",
sa.Column("vote_id", sa.Integer(), nullable=False),
sa.Column("yea_effect", sa.String(), nullable=False),
sa.Column("nay_effect", sa.String(), nullable=False),
sa.Column("present_effect", sa.String(), nullable=False),
sa.Column("polarity_confidence", sa.String(), nullable=False),
sa.Column("polarity_method", sa.String(), nullable=False),
sa.Column("notes", sa.String(), nullable=True),
sa.Column("id", sa.Integer(), nullable=False),
sa.Column(
"created",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.Column(
"updated",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.ForeignKeyConstraint(
["vote_id"],
[f"{schema}.vote.id"],
name=op.f("fk_vote_position_meaning_vote_id_vote"),
ondelete="CASCADE",
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_vote_position_meaning")),
sa.UniqueConstraint("vote_id", name=op.f("uq_vote_position_meaning_vote_id")),
schema=schema,
)
op.create_table(
"vote_context_audit",
sa.Column("vote_id", sa.Integer(), nullable=False),
sa.Column("step", sa.String(), nullable=False),
sa.Column("message", sa.String(), nullable=False),
sa.Column("severity", sa.String(), nullable=False),
sa.Column("source_path", sa.String(), nullable=True),
sa.Column(
"created_at",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.Column("id", sa.Integer(), nullable=False),
sa.Column(
"created",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.Column(
"updated",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.ForeignKeyConstraint(
["vote_id"],
[f"{schema}.vote.id"],
name=op.f("fk_vote_context_audit_vote_id_vote"),
ondelete="CASCADE",
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_vote_context_audit")),
schema=schema,
)
op.create_index(
"ix_vote_context_audit_vote_id",
"vote_context_audit",
["vote_id"],
unique=False,
schema=schema,
)
def downgrade() -> None:
"""Downgrade."""
raise NotImplementedError("Downgrade is not supported for canonical vote context v3.")
@@ -0,0 +1,203 @@
"""add supporting indexes for congress vote context and scoring.
Revision ID: a7b91c4e2d30
Revises: 1f8c0e7a9d21
Create Date: 2026-04-26 00:00:00.000000
"""
from __future__ import annotations
from typing import TYPE_CHECKING
import sqlalchemy as sa
from alembic import op
from pipelines.orm import DataScienceDevBase
if TYPE_CHECKING:
from collections.abc import Sequence
revision: str = "a7b91c4e2d30"
down_revision: str | None = "1f8c0e7a9d21"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
schema = DataScienceDevBase.schema_name
def _dedupe_source_artifacts() -> None:
op.execute(
sa.text(
f"""
CREATE TEMP TABLE tmp_source_artifact_dups AS
WITH ranked AS (
SELECT
id,
first_value(id) OVER (
PARTITION BY ingest_run_id, local_path, sha256
ORDER BY id
) AS keep_id,
row_number() OVER (
PARTITION BY ingest_run_id, local_path, sha256
ORDER BY id
) AS rn
FROM {schema}.source_artifact
WHERE ingest_run_id IS NOT NULL
)
SELECT id, keep_id
FROM ranked
WHERE rn > 1
"""
)
)
for table_name, column_name in (
("bill_text", "source_artifact_id"),
("bill_action", "source_artifact_id"),
("amendment", "source_artifact_id"),
("amendment_action", "source_artifact_id"),
("vote", "raw_vote_source_artifact_id"),
):
op.execute(
sa.text(
f"""
UPDATE {schema}.{table_name} AS target
SET {column_name} = d.keep_id
FROM tmp_source_artifact_dups AS d
WHERE target.{column_name} = d.id
"""
)
)
op.execute(
sa.text(
f"""
DELETE FROM {schema}.source_artifact AS source_artifact
USING tmp_source_artifact_dups AS d
WHERE source_artifact.id = d.id
"""
)
)
op.execute(sa.text("DROP TABLE tmp_source_artifact_dups"))
def upgrade() -> None:
"""Upgrade."""
_dedupe_source_artifacts()
op.create_index(
"uq_source_artifact_ingest_identity",
"source_artifact",
["ingest_run_id", "local_path", "sha256"],
unique=True,
schema=schema,
)
op.create_index(
"ix_bill_action_recorded_vote_match_tuple",
"bill_action_recorded_vote",
["congress", "chamber", "session_number", "roll_number"],
unique=False,
schema=schema,
)
op.create_index(
"ix_amendment_action_recorded_vote_match_tuple",
"amendment_action_recorded_vote",
["congress", "chamber", "session_number", "roll_number"],
unique=False,
schema=schema,
)
op.create_index(
"ix_vote_classification_eligible_vote_id",
"vote_classification",
["vote_id"],
unique=False,
schema=schema,
postgresql_where=sa.text(
"subject_type = 'measure' "
"AND vote_relationship = 'direct_text_vote' "
"AND is_direct_vote_on_legislative_text "
"AND is_substantive_policy_vote "
"AND NOT is_special_rule"
),
)
op.create_index(
"ix_vote_measure_link_vote_id_role",
"vote_measure_link",
["vote_id", "role"],
unique=False,
schema=schema,
)
op.create_index(
"ix_vote_measure_link_measure_id_role",
"vote_measure_link",
["measure_id", "role"],
unique=False,
schema=schema,
)
op.create_index(
"ix_vote_text_target_voted_text_version_id",
"vote_text_target",
["voted_text_version_id"],
unique=False,
schema=schema,
postgresql_where=sa.text("voted_text_version_id IS NOT NULL"),
)
op.create_index(
"ix_vote_context_audit_severity_vote_id",
"vote_context_audit",
["severity", "vote_id"],
unique=False,
schema=schema,
)
op.create_index(
"ix_legislator_current_chamber",
"legislator",
["current_chamber"],
unique=False,
schema=schema,
)
def downgrade() -> None:
"""Downgrade."""
op.drop_index("ix_legislator_current_chamber", table_name="legislator", schema=schema)
op.drop_index(
"ix_vote_context_audit_severity_vote_id",
table_name="vote_context_audit",
schema=schema,
)
op.drop_index(
"ix_vote_text_target_voted_text_version_id",
table_name="vote_text_target",
schema=schema,
)
op.drop_index(
"ix_vote_measure_link_measure_id_role",
table_name="vote_measure_link",
schema=schema,
)
op.drop_index(
"ix_vote_measure_link_vote_id_role",
table_name="vote_measure_link",
schema=schema,
)
op.drop_index(
"ix_vote_classification_eligible_vote_id",
table_name="vote_classification",
schema=schema,
)
op.drop_index(
"ix_amendment_action_recorded_vote_match_tuple",
table_name="amendment_action_recorded_vote",
schema=schema,
)
op.drop_index(
"ix_bill_action_recorded_vote_match_tuple",
table_name="bill_action_recorded_vote",
schema=schema,
)
op.drop_index(
"uq_source_artifact_ingest_identity",
table_name="source_artifact",
schema=schema,
)
@@ -0,0 +1,66 @@
"""adding PostTopic.
Revision ID: 032e26bbfcb5
Revises: a7b91c4e2d30
Create Date: 2026-04-26 14:34:35.688341
"""
from __future__ import annotations
from typing import TYPE_CHECKING
import sqlalchemy as sa
from alembic import op
from pipelines.orm import DataScienceDevBase
if TYPE_CHECKING:
from collections.abc import Sequence
# revision identifiers, used by Alembic.
revision: str = "032e26bbfcb5"
down_revision: str | None = "a7b91c4e2d30"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
schema = DataScienceDevBase.schema_name
def upgrade() -> None:
"""Upgrade."""
# ### commands auto generated by Alembic - please adjust! ###
op.create_table(
"post_topic",
sa.Column("post_id", sa.BigInteger(), nullable=False),
sa.Column("topic_id", sa.SmallInteger(), nullable=False),
sa.Column("topic_label", sa.String(), nullable=True),
sa.Column("model_version", sa.String(), nullable=True),
sa.Column("id", sa.Integer(), nullable=False),
sa.Column(
"created",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.Column(
"updated",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_post_topic")),
schema=schema,
)
op.create_index(
"ix_post_topic_post_id", "post_topic", ["post_id"], unique=False, schema=schema
)
# ### end Alembic commands ###
def downgrade() -> None:
"""Downgrade."""
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index("ix_post_topic_post_id", table_name="post_topic", schema=schema)
op.drop_table("post_topic", schema=schema)
# ### end Alembic commands ###
@@ -0,0 +1,35 @@
"""adding PG Vector.
Revision ID: b9360b0b0c22
Revises: 032e26bbfcb5
Create Date: 2026-04-26 14:35:08.770128
"""
from __future__ import annotations
from typing import TYPE_CHECKING
from alembic import op
from pipelines.orm import DataScienceDevBase
if TYPE_CHECKING:
from collections.abc import Sequence
# revision identifiers, used by Alembic.
revision: str = "b9360b0b0c22"
down_revision: str | None = "032e26bbfcb5"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
schema = DataScienceDevBase.schema_name
def upgrade() -> None:
"""Upgrade."""
op.execute("CREATE EXTENSION IF NOT EXISTS vector")
def downgrade() -> None:
"""Downgrade."""
op.execute("DROP EXTENSION IF EXISTS vector")
+2 -3
View File
@@ -69,9 +69,8 @@ class BenchmarkConfig:
def get_config_dir() -> Path:
"""Get the path to the config directory."""
return Path(__file__).resolve().parents[2] / "config"
"""Get the path to the config file."""
return Path(__file__).resolve().parent.parent.parent / "config"
def default_config_path() -> Path:
"""Get the path to the config file."""
-1
View File
@@ -1 +0,0 @@
"""Init."""
+235
View File
@@ -0,0 +1,235 @@
"""Docker container lifecycle management for BERTopic jobs on Jeeves."""
from __future__ import annotations
import logging
import os
import subprocess
from pathlib import Path
from typing import Annotated, Literal
import typer
logger = logging.getLogger(__name__)
JOBMode = Literal["train", "infer"]
IMAGE_NAME = "bert-topic:latest"
REPO_DIR = Path(__file__).resolve().parents[3]
DEFAULT_CACHE_ROOT = Path("/zfs/storage/main/ds_thing/models/bert_topic")
DEFAULT_POSTGRES_SOCKET_DIR = Path("/run/postgresql")
DB_ENV_VARS = (
"DATA_SCIENCE_DEV_DB",
"DATA_SCIENCE_DEV_HOST",
"DATA_SCIENCE_DEV_PORT",
"DATA_SCIENCE_DEV_USER",
"DATA_SCIENCE_DEV_PASSWORD",
)
app = typer.Typer(help="BERTopic container management.")
def _container_name(mode: JOBMode) -> str:
"""Return the Docker container name for the selected BERTopic job."""
return f"bert-topic-{mode}"
def _module_name(mode: JOBMode) -> str:
"""Return the Python module to run inside the container."""
return f"pipelines.bert_topic.{mode}"
def _env_args(*, use_postgres_socket: bool) -> list[str]:
"""Pass through database environment variables from the host shell."""
required = [
"DATA_SCIENCE_DEV_DB",
"DATA_SCIENCE_DEV_PORT",
"DATA_SCIENCE_DEV_USER",
]
if not use_postgres_socket:
required.append("DATA_SCIENCE_DEV_HOST")
missing = [name for name in required if not os.getenv(name)]
if missing:
message = "Missing required database environment variables: " + ", ".join(
missing
)
raise RuntimeError(message)
args: list[str] = []
if use_postgres_socket:
args.extend(["-e", f"DATA_SCIENCE_DEV_HOST={DEFAULT_POSTGRES_SOCKET_DIR}"])
for name in DB_ENV_VARS:
if use_postgres_socket and name == "DATA_SCIENCE_DEV_HOST":
continue
if os.getenv(name):
args.extend(["-e", name])
return args
def build_image() -> None:
"""Build the BERTopic Docker image."""
dockerfile = REPO_DIR / "pipelines/containers/docker_files/Dockerfile.bert_topic"
logger.info("Building BERTopic image: %s", IMAGE_NAME)
result = subprocess.run(
[
"docker",
"build",
"--network",
"host",
"-f",
str(dockerfile),
"-t",
IMAGE_NAME,
str(REPO_DIR),
],
capture_output=True,
text=True,
check=False,
)
if result.returncode != 0:
message = (
"Failed to build BERTopic image. "
f"docker build stderr:\n{result.stderr.strip()}"
)
raise RuntimeError(message)
logger.info("Image built: %s", IMAGE_NAME)
def stop_job(*, mode: JOBMode) -> None:
"""Stop and remove the BERTopic container for the selected mode."""
container_name = _container_name(mode)
logger.info("Stopping BERTopic container: %s", container_name)
subprocess.run(["docker", "stop", container_name], capture_output=True, check=False)
subprocess.run(
["docker", "rm", "-f", container_name], capture_output=True, check=False
)
def start_job(
*,
mode: JOBMode,
cache_root: Path = DEFAULT_CACHE_ROOT,
postgres_socket_dir: Path = DEFAULT_POSTGRES_SOCKET_DIR,
detach: bool = False,
) -> None:
"""Run BERTopic training or inference in Docker on Jeeves."""
cache_root = cache_root.resolve()
cache_root.mkdir(parents=True, exist_ok=True)
postgres_socket_dir = postgres_socket_dir.resolve()
stop_job(mode=mode)
use_postgres_socket = postgres_socket_dir.exists()
command = [
"docker",
"run",
"--name",
_container_name(mode),
"--ipc=host",
"-v",
f"{cache_root}:/cache",
*_env_args(use_postgres_socket=use_postgres_socket),
IMAGE_NAME,
_module_name(mode),
]
if use_postgres_socket:
command[7:7] = ["-v", f"{postgres_socket_dir}:{DEFAULT_POSTGRES_SOCKET_DIR}"]
if detach:
command.insert(2, "-d")
logger.info("Starting BERTopic %s container", mode)
logger.info(" Cache root: %s", cache_root)
if use_postgres_socket:
logger.info(" Postgres socket: %s", postgres_socket_dir)
result = subprocess.run(command, text=True, capture_output=detach, check=False)
if result.returncode != 0:
detail = (
result.stderr.strip() if result.stderr else f"exit code {result.returncode}"
)
raise RuntimeError(f"BERTopic container failed to start: {detail}")
if detach:
logger.info("Container started: %s", result.stdout.strip()[:12])
else:
logger.info("BERTopic %s run complete", mode)
def logs_job(*, mode: JOBMode) -> str | None:
"""Return recent logs from the BERTopic container, or None if absent."""
result = subprocess.run(
["docker", "logs", "--tail", "100", _container_name(mode)],
capture_output=True,
text=True,
check=False,
)
if result.returncode != 0:
return None
return result.stdout + result.stderr
@app.command()
def build(
log_level: Annotated[str, typer.Option(help="Log level")] = "INFO",
) -> None:
"""Build the BERTopic Docker image."""
logging.basicConfig(
level=log_level,
format="%(asctime)s %(levelname)s %(name)s: %(message)s",
)
build_image()
@app.command("run")
def run_job_command(
mode: Annotated[JOBMode, typer.Option(help="Which BERTopic job to run")] = "train",
cache_root: Annotated[
Path, typer.Option(help="Host path mounted to /cache for model and HF cache")
] = DEFAULT_CACHE_ROOT,
postgres_socket_dir: Annotated[
Path, typer.Option(help="Host Postgres socket directory to mount into the container")
] = DEFAULT_POSTGRES_SOCKET_DIR,
detach: Annotated[
bool, typer.Option(help="Start the container in the background")
] = False,
log_level: Annotated[str, typer.Option(help="Log level")] = "INFO",
) -> None:
"""Run BERTopic training or inference inside Docker."""
logging.basicConfig(
level=log_level,
format="%(asctime)s %(levelname)s %(name)s: %(message)s",
)
start_job(
mode=mode,
cache_root=cache_root,
postgres_socket_dir=postgres_socket_dir,
detach=detach,
)
@app.command("stop")
def stop_job_command(
mode: Annotated[
JOBMode, typer.Option(help="Which BERTopic container to stop")
] = "train",
) -> None:
"""Stop and remove the BERTopic container."""
stop_job(mode=mode)
@app.command("logs")
def logs_job_command(
mode: Annotated[
JOBMode, typer.Option(help="Which BERTopic container logs to show")
] = "train",
) -> None:
"""Show recent logs from the BERTopic container."""
output = logs_job(mode=mode)
if output is None:
typer.echo(f"No BERTopic container found for mode={mode}.")
raise typer.Exit(code=1)
typer.echo(output)
def cli() -> None:
"""Typer entry point."""
app()
if __name__ == "__main__":
cli()
@@ -0,0 +1,38 @@
FROM python:3.12-bookworm
ENV DEBIAN_FRONTEND=noninteractive
ENV PYTHONUNBUFFERED=1
ENV PIP_NO_CACHE_DIR=1
RUN apt-get update && apt-get install -y \
build-essential \
gcc \
g++ \
git \
libgomp1 \
libpq-dev \
&& rm -rf /var/lib/apt/lists/*
WORKDIR /app
COPY pipelines ./pipelines
RUN python -m pip install --upgrade pip setuptools wheel && \
python -m pip install \
torch \
--index-url https://download.pytorch.org/whl/cpu && \
python -m pip install \
typer \
sqlalchemy \
bertopic \
sentence-transformers \
scikit-learn \
pandas \
numpy \
"psycopg[binary]"
ENV HF_HOME=/cache/huggingface
ENV TRANSFORMERS_CACHE=/cache/huggingface
ENTRYPOINT ["python", "-m"]
CMD ["pipelines.bert_topic.train"]
@@ -0,0 +1,11 @@
FROM ghcr.io/unslothai/unsloth:latest
RUN pip install --no-cache-dir typer
WORKDIR /workspace
COPY python/prompt_bench/finetune.py python/prompt_bench/finetune.py
COPY config/prompts/summarization_prompts.toml config/prompts/summarization_prompts.toml
COPY python/prompt_bench/__init__.py python/prompt_bench/__init__.py
COPY python/__init__.py python/__init__.py
ENTRYPOINT ["python", "-m", "pipelines.prompt_bench.finetune"]
@@ -9,7 +9,7 @@ from typing import Annotated
import typer
from pipelines.tools.containers.lib import check_gpu_free
from pipelines.pipelines.containers.lib import check_gpu_free
logger = logging.getLogger(__name__)
@@ -27,7 +27,7 @@ def build_image() -> None:
"docker",
"build",
"-f",
str(REPO_DIR / "python/prompt_bench/Dockerfile.finetune"),
str(REPO_DIR / "pipelines/containers/docker_files/Dockerfile.finetune"),
"-t",
FINETUNE_IMAGE,
".",
-671
View File
@@ -1,671 +0,0 @@
"""Ingestion pipeline for loading congress data from unitedstates/congress JSON files.
Loads legislators, bills, votes, vote records, and bill text into the data_science_dev database.
Expects the parent directory to contain congress-tracker/ and congress-legislators/ as siblings.
Usage:
ingest-congress /path/to/parent/
ingest-congress /path/to/parent/ --congress 118
ingest-congress /path/to/parent/ --congress 118 --only bills
"""
from __future__ import annotations
import logging
from pathlib import Path # noqa: TC003 needed at runtime for typer CLI argument
from typing import TYPE_CHECKING, Annotated
import orjson
import typer
import yaml
from sqlalchemy import select
from sqlalchemy.orm import Session
from pipelines.pipelines.common import configure_logger
from pipelines.orm.common import get_postgres_engine
from pipelines.orm.data_science_dev.congress import (
Bill,
BillText,
Legislator,
LegislatorSocialMedia,
Vote,
VoteRecord,
)
if TYPE_CHECKING:
from collections.abc import Iterator
from sqlalchemy.engine import Engine
logger = logging.getLogger(__name__)
BATCH_SIZE = 10_000
app = typer.Typer(help="Ingest unitedstates/congress data into data_science_dev.")
@app.command()
def main(
parent_dir: Annotated[
Path,
typer.Argument(
help="Parent directory containing congress-tracker/ and congress-legislators/"
),
],
congress: Annotated[
int | None, typer.Option(help="Only ingest a specific congress number")
] = None,
only: Annotated[
str | None,
typer.Option(
help="Only run a specific step: legislators, social-media, bills, votes, bill-text"
),
] = None,
) -> None:
"""Ingest congress data from unitedstates/congress JSON files."""
configure_logger(level="INFO")
data_dir = parent_dir / "congress-tracker/congress/data/"
legislators_dir = parent_dir / "congress-legislators"
if not data_dir.is_dir():
typer.echo(f"Expected congress-tracker/ directory: {data_dir}", err=True)
raise typer.Exit(code=1)
if not legislators_dir.is_dir():
typer.echo(
f"Expected congress-legislators/ directory: {legislators_dir}", err=True
)
raise typer.Exit(code=1)
engine = get_postgres_engine(name="DATA_SCIENCE_DEV")
congress_dirs = _resolve_congress_dirs(data_dir, congress)
if not congress_dirs:
typer.echo("No congress directories found.", err=True)
raise typer.Exit(code=1)
logger.info("Found %d congress directories to process", len(congress_dirs))
steps: dict[str, tuple] = {
"legislators": (ingest_legislators, (engine, legislators_dir)),
"legislators-social-media": (ingest_social_media, (engine, legislators_dir)),
"bills": (ingest_bills, (engine, congress_dirs)),
"votes": (ingest_votes, (engine, congress_dirs)),
"bill-text": (ingest_bill_text, (engine, congress_dirs)),
}
if only:
if only not in steps:
typer.echo(
f"Unknown step: {only}. Choose from: {', '.join(steps)}", err=True
)
raise typer.Exit(code=1)
steps = {only: steps[only]}
for step_name, (step_func, step_args) in steps.items():
logger.info("=== Starting step: %s ===", step_name)
step_func(*step_args)
logger.info("=== Finished step: %s ===", step_name)
logger.info("ingest-congress done")
def _resolve_congress_dirs(data_dir: Path, congress: int | None) -> list[Path]:
"""Find congress number directories under data_dir."""
if congress is not None:
target = data_dir / str(congress)
return [target] if target.is_dir() else []
return sorted(
path for path in data_dir.iterdir() if path.is_dir() and path.name.isdigit()
)
def _flush_batch(session: Session, batch: list[object], label: str) -> int:
"""Add a batch of ORM objects to the session and commit. Returns count added."""
if not batch:
return 0
session.add_all(batch)
session.commit()
count = len(batch)
logger.info("Committed %d %s", count, label)
batch.clear()
return count
# ---------------------------------------------------------------------------
# Legislators — loaded from congress-legislators YAML files
# ---------------------------------------------------------------------------
def ingest_legislators(engine: Engine, legislators_dir: Path) -> None:
"""Load legislators from congress-legislators YAML files."""
legislators_data = _load_legislators_yaml(legislators_dir)
logger.info("Loaded %d legislators from YAML files", len(legislators_data))
with Session(engine) as session:
existing_legislators = {
legislator.bioguide_id: legislator
for legislator in session.scalars(select(Legislator)).all()
}
logger.info("Found %d existing legislators in DB", len(existing_legislators))
total_inserted = 0
total_updated = 0
for entry in legislators_data:
bioguide_id = entry.get("id", {}).get("bioguide")
if not bioguide_id:
continue
fields = _parse_legislator(entry)
if existing := existing_legislators.get(bioguide_id):
changed = False
for field, value in fields.items():
if value is not None and getattr(existing, field) != value:
setattr(existing, field, value)
changed = True
if changed:
total_updated += 1
else:
session.add(Legislator(bioguide_id=bioguide_id, **fields))
total_inserted += 1
session.commit()
logger.info(
"Inserted %d new legislators, updated %d existing",
total_inserted,
total_updated,
)
def _load_legislators_yaml(legislators_dir: Path) -> list[dict]:
"""Load and combine legislators-current.yaml and legislators-historical.yaml."""
legislators: list[dict] = []
for filename in ("legislators-current.yaml", "legislators-historical.yaml"):
path = legislators_dir / filename
if not path.exists():
logger.warning("Legislators file not found: %s", path)
continue
with path.open() as file:
data = yaml.safe_load(file)
if isinstance(data, list):
legislators.extend(data)
return legislators
def _parse_legislator(entry: dict) -> dict:
"""Extract Legislator fields from a congress-legislators YAML entry."""
ids = entry.get("id", {})
name = entry.get("name", {})
bio = entry.get("bio", {})
terms = entry.get("terms", [])
latest_term = terms[-1] if terms else {}
fec_ids = ids.get("fec")
fec_ids_joined = ",".join(fec_ids) if isinstance(fec_ids, list) else fec_ids
chamber = latest_term.get("type")
chamber_normalized = {"rep": "House", "sen": "Senate"}.get(chamber, chamber)
return {
"thomas_id": ids.get("thomas"),
"lis_id": ids.get("lis"),
"govtrack_id": ids.get("govtrack"),
"opensecrets_id": ids.get("opensecrets"),
"fec_ids": fec_ids_joined,
"first_name": name.get("first"),
"last_name": name.get("last"),
"official_full_name": name.get("official_full"),
"nickname": name.get("nickname"),
"birthday": bio.get("birthday"),
"gender": bio.get("gender"),
"current_party": latest_term.get("party"),
"current_state": latest_term.get("state"),
"current_district": latest_term.get("district"),
"current_chamber": chamber_normalized,
}
# ---------------------------------------------------------------------------
# Social Media — loaded from legislators-social-media.yaml
# ---------------------------------------------------------------------------
SOCIAL_MEDIA_PLATFORMS = {
"twitter": "https://twitter.com/{account}",
"facebook": "https://facebook.com/{account}",
"youtube": "https://youtube.com/{account}",
"instagram": "https://instagram.com/{account}",
"mastodon": None,
}
def ingest_social_media(engine: Engine, legislators_dir: Path) -> None:
"""Load social media accounts from legislators-social-media.yaml."""
social_media_path = legislators_dir / "legislators-social-media.yaml"
if not social_media_path.exists():
logger.warning("Social media file not found: %s", social_media_path)
return
with social_media_path.open() as file:
social_media_data = yaml.safe_load(file)
if not isinstance(social_media_data, list):
logger.warning("Unexpected format in %s", social_media_path)
return
logger.info(
"Loaded %d entries from legislators-social-media.yaml", len(social_media_data)
)
with Session(engine) as session:
legislator_map = _build_legislator_map(session)
existing_accounts = {
(account.legislator_id, account.platform)
for account in session.scalars(select(LegislatorSocialMedia)).all()
}
logger.info(
"Found %d existing social media accounts in DB", len(existing_accounts)
)
total_inserted = 0
total_updated = 0
for entry in social_media_data:
bioguide_id = entry.get("id", {}).get("bioguide")
if not bioguide_id:
continue
legislator_id = legislator_map.get(bioguide_id)
if legislator_id is None:
continue
social = entry.get("social", {})
for platform, url_template in SOCIAL_MEDIA_PLATFORMS.items():
account_name = social.get(platform)
if not account_name:
continue
url = (
url_template.format(account=account_name) if url_template else None
)
if (legislator_id, platform) in existing_accounts:
total_updated += 1
else:
session.add(
LegislatorSocialMedia(
legislator_id=legislator_id,
platform=platform,
account_name=str(account_name),
url=url,
source="https://github.com/unitedstates/congress-legislators",
)
)
existing_accounts.add((legislator_id, platform))
total_inserted += 1
session.commit()
logger.info(
"Inserted %d new social media accounts, updated %d existing",
total_inserted,
total_updated,
)
def _iter_voters(position_group: object) -> Iterator[dict]:
"""Yield voter dicts from a vote position group (handles list, single dict, or string)."""
if isinstance(position_group, dict):
yield position_group
elif isinstance(position_group, list):
for voter in position_group:
if isinstance(voter, dict):
yield voter
# ---------------------------------------------------------------------------
# Bills
# ---------------------------------------------------------------------------
def ingest_bills(engine: Engine, congress_dirs: list[Path]) -> None:
"""Load bill data.json files."""
with Session(engine) as session:
existing_bills = {
(bill.congress, bill.bill_type, bill.number)
for bill in session.scalars(select(Bill)).all()
}
logger.info("Found %d existing bills in DB", len(existing_bills))
total_inserted = 0
batch: list[Bill] = []
for congress_dir in congress_dirs:
bills_dir = congress_dir / "bills"
if not bills_dir.is_dir():
continue
logger.info("Scanning bills from %s", congress_dir.name)
for bill_file in bills_dir.rglob("data.json"):
data = _read_json(bill_file)
if data is None:
continue
bill = _parse_bill(data, existing_bills)
if bill is not None:
batch.append(bill)
if len(batch) >= BATCH_SIZE:
total_inserted += _flush_batch(session, batch, "bills")
total_inserted += _flush_batch(session, batch, "bills")
logger.info("Inserted %d new bills total", total_inserted)
def _parse_bill(data: dict, existing_bills: set[tuple[int, str, int]]) -> Bill | None:
"""Parse a bill data.json dict into a Bill ORM object, skipping existing."""
raw_congress = data.get("congress")
bill_type = data.get("bill_type")
raw_number = data.get("number")
if raw_congress is None or bill_type is None or raw_number is None:
return None
congress = int(raw_congress)
number = int(raw_number)
if (congress, bill_type, number) in existing_bills:
return None
sponsor_bioguide = None
sponsor = data.get("sponsor")
if sponsor:
sponsor_bioguide = sponsor.get("bioguide_id")
return Bill(
congress=congress,
bill_type=bill_type,
number=number,
title=data.get("short_title") or data.get("official_title"),
title_short=data.get("short_title"),
official_title=data.get("official_title"),
status=data.get("status"),
status_at=data.get("status_at"),
sponsor_bioguide_id=sponsor_bioguide,
subjects_top_term=data.get("subjects_top_term"),
)
# ---------------------------------------------------------------------------
# Votes (and vote records)
# ---------------------------------------------------------------------------
def ingest_votes(engine: Engine, congress_dirs: list[Path]) -> None:
"""Load vote data.json files with their vote records."""
with Session(engine) as session:
legislator_map = _build_legislator_map(session)
logger.info("Loaded %d legislators into lookup map", len(legislator_map))
bill_map = _build_bill_map(session)
logger.info("Loaded %d bills into lookup map", len(bill_map))
existing_votes = {
(vote.congress, vote.chamber, vote.session, vote.number)
for vote in session.scalars(select(Vote)).all()
}
logger.info("Found %d existing votes in DB", len(existing_votes))
total_inserted = 0
batch: list[Vote] = []
for congress_dir in congress_dirs:
votes_dir = congress_dir / "votes"
if not votes_dir.is_dir():
continue
logger.info("Scanning votes from %s", congress_dir.name)
for vote_file in votes_dir.rglob("data.json"):
data = _read_json(vote_file)
if data is None:
continue
vote = _parse_vote(data, legislator_map, bill_map, existing_votes)
if vote is not None:
batch.append(vote)
if len(batch) >= BATCH_SIZE:
total_inserted += _flush_batch(session, batch, "votes")
total_inserted += _flush_batch(session, batch, "votes")
logger.info("Inserted %d new votes total", total_inserted)
def _build_legislator_map(session: Session) -> dict[str, int]:
"""Build a mapping of bioguide_id -> legislator.id."""
return {
legislator.bioguide_id: legislator.id
for legislator in session.scalars(select(Legislator)).all()
}
def _build_bill_map(session: Session) -> dict[tuple[int, str, int], int]:
"""Build a mapping of (congress, bill_type, number) -> bill.id."""
return {
(bill.congress, bill.bill_type, bill.number): bill.id
for bill in session.scalars(select(Bill)).all()
}
def _parse_vote(
data: dict,
legislator_map: dict[str, int],
bill_map: dict[tuple[int, str, int], int],
existing_votes: set[tuple[int, str, int, int]],
) -> Vote | None:
"""Parse a vote data.json dict into a Vote ORM object with records."""
raw_congress = data.get("congress")
chamber = data.get("chamber")
raw_number = data.get("number")
vote_date = data.get("date")
if (
raw_congress is None
or chamber is None
or raw_number is None
or vote_date is None
):
return None
raw_session = data.get("session")
if raw_session is None:
return None
congress = int(raw_congress)
number = int(raw_number)
session_number = int(raw_session)
# Normalize chamber from "h"/"s" to "House"/"Senate"
chamber_normalized = {"h": "House", "s": "Senate"}.get(chamber, chamber)
if (congress, chamber_normalized, session_number, number) in existing_votes:
return None
# Resolve linked bill
bill_id = None
bill_ref = data.get("bill")
if bill_ref:
bill_key = (
int(bill_ref.get("congress", congress)),
bill_ref.get("type"),
int(bill_ref.get("number", 0)),
)
bill_id = bill_map.get(bill_key)
raw_votes = data.get("votes", {})
vote_counts = _count_votes(raw_votes)
vote_records = _build_vote_records(raw_votes, legislator_map)
return Vote(
congress=congress,
chamber=chamber_normalized,
session=session_number,
number=number,
vote_type=data.get("type"),
question=data.get("question"),
result=data.get("result"),
result_text=data.get("result_text"),
vote_date=vote_date[:10] if isinstance(vote_date, str) else vote_date,
bill_id=bill_id,
vote_records=vote_records,
**vote_counts,
)
def _count_votes(raw_votes: dict) -> dict[str, int]:
"""Count voters per position category, correctly handling dict and list formats."""
yea_count = 0
nay_count = 0
not_voting_count = 0
present_count = 0
for position, position_group in raw_votes.items():
voter_count = sum(1 for _ in _iter_voters(position_group))
if position in ("Yea", "Aye"):
yea_count += voter_count
elif position in ("Nay", "No"):
nay_count += voter_count
elif position == "Not Voting":
not_voting_count += voter_count
elif position == "Present":
present_count += voter_count
return {
"yea_count": yea_count,
"nay_count": nay_count,
"not_voting_count": not_voting_count,
"present_count": present_count,
}
def _build_vote_records(
raw_votes: dict, legislator_map: dict[str, int]
) -> list[VoteRecord]:
"""Build VoteRecord objects from raw vote data."""
records: list[VoteRecord] = []
for position, position_group in raw_votes.items():
for voter in _iter_voters(position_group):
bioguide_id = voter.get("id")
if not bioguide_id:
continue
legislator_id = legislator_map.get(bioguide_id)
if legislator_id is None:
continue
records.append(
VoteRecord(
legislator_id=legislator_id,
position=position,
)
)
return records
# ---------------------------------------------------------------------------
# Bill Text
# ---------------------------------------------------------------------------
def ingest_bill_text(engine: Engine, congress_dirs: list[Path]) -> None:
"""Load bill text from text-versions directories."""
with Session(engine) as session:
bill_map = _build_bill_map(session)
logger.info("Loaded %d bills into lookup map", len(bill_map))
existing_bill_texts = {
(bill_text.bill_id, bill_text.version_code)
for bill_text in session.scalars(select(BillText)).all()
}
logger.info(
"Found %d existing bill text versions in DB", len(existing_bill_texts)
)
total_inserted = 0
batch: list[BillText] = []
for congress_dir in congress_dirs:
logger.info("Scanning bill texts from %s", congress_dir.name)
for bill_text in _iter_bill_texts(
congress_dir, bill_map, existing_bill_texts
):
batch.append(bill_text)
if len(batch) >= BATCH_SIZE:
total_inserted += _flush_batch(session, batch, "bill texts")
total_inserted += _flush_batch(session, batch, "bill texts")
logger.info("Inserted %d new bill text versions total", total_inserted)
def _iter_bill_texts(
congress_dir: Path,
bill_map: dict[tuple[int, str, int], int],
existing_bill_texts: set[tuple[int, str]],
) -> Iterator[BillText]:
"""Yield BillText objects for a single congress directory, skipping existing."""
bills_dir = congress_dir / "bills"
if not bills_dir.is_dir():
return
for bill_dir in bills_dir.rglob("text-versions"):
if not bill_dir.is_dir():
continue
bill_key = _bill_key_from_dir(bill_dir.parent, congress_dir)
if bill_key is None:
continue
bill_id = bill_map.get(bill_key)
if bill_id is None:
continue
for version_dir in sorted(bill_dir.iterdir()):
if not version_dir.is_dir():
continue
if (bill_id, version_dir.name) in existing_bill_texts:
continue
text_content = _read_bill_text(version_dir)
version_data = _read_json(version_dir / "data.json")
yield BillText(
bill_id=bill_id,
version_code=version_dir.name,
version_name=version_data.get("version_name") if version_data else None,
date=version_data.get("issued_on") if version_data else None,
text_content=text_content,
)
def _bill_key_from_dir(
bill_dir: Path, congress_dir: Path
) -> tuple[int, str, int] | None:
"""Extract (congress, bill_type, number) from directory structure."""
congress = int(congress_dir.name)
bill_type = bill_dir.parent.name
name = bill_dir.name
# Directory name is like "hr3590" — strip the type prefix to get the number
number_str = name[len(bill_type) :]
if not number_str.isdigit():
return None
return (congress, bill_type, int(number_str))
def _read_bill_text(version_dir: Path) -> str | None:
"""Read bill text from a version directory, preferring .txt over .xml."""
for extension in ("txt", "htm", "html", "xml"):
candidates = list(version_dir.glob(f"document.{extension}"))
if not candidates:
candidates = list(version_dir.glob(f"*.{extension}"))
if candidates:
try:
return candidates[0].read_text(encoding="utf-8")
except Exception:
logger.exception("Failed to read %s", candidates[0])
return None
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _read_json(path: Path) -> dict | None:
"""Read and parse a JSON file, returning None on failure."""
try:
return orjson.loads(path.read_bytes())
except FileNotFoundError:
return None
except Exception:
logger.exception("Failed to parse %s", path)
return None
if __name__ == "__main__":
app()
-281
View File
@@ -1,281 +0,0 @@
"""Ingestion pipeline for loading JSONL post files into the weekly-partitioned posts table.
Usage:
ingest-posts /path/to/files/
ingest-posts /path/to/single_file.jsonl
ingest-posts /data/dir/ --workers 4 --batch-size 5000
"""
from __future__ import annotations
import logging
from datetime import UTC, datetime
from pathlib import Path # noqa: TC003 this is needed for typer
from typing import TYPE_CHECKING, Annotated
import orjson
import psycopg
import typer
from pipelines.pipelines.common import configure_logger
from pipelines.orm.common import get_connection_info
from pipelines.pipelines.parallelize import parallelize_process
if TYPE_CHECKING:
from collections.abc import Iterator
logger = logging.getLogger(__name__)
app = typer.Typer(help="Ingest JSONL post files into the partitioned posts table.")
@app.command()
def main(
path: Annotated[
Path,
typer.Argument(help="Directory containing JSONL files, or a single JSONL file"),
],
batch_size: Annotated[int, typer.Option(help="Rows per INSERT batch")] = 10000,
workers: Annotated[
int, typer.Option(help="Parallel workers for multi-file ingestion")
] = 4,
pattern: Annotated[
str, typer.Option(help="Glob pattern for JSONL files")
] = "*.jsonl",
) -> None:
"""Ingest JSONL post files into the weekly-partitioned posts table."""
configure_logger(level="INFO")
logger.info("starting ingest-posts")
logger.info(
"path=%s batch_size=%d workers=%d pattern=%s",
path,
batch_size,
workers,
pattern,
)
if path.is_file():
ingest_file(path, batch_size=batch_size)
elif path.is_dir():
ingest_directory(
path, batch_size=batch_size, max_workers=workers, pattern=pattern
)
else:
typer.echo(f"Path does not exist: {path}", err=True)
raise typer.Exit(code=1)
logger.info("ingest-posts done")
def ingest_directory(
directory: Path,
*,
batch_size: int,
max_workers: int,
pattern: str = "*.jsonl",
) -> None:
"""Ingest all JSONL files in a directory using parallel workers."""
files = sorted(directory.glob(pattern))
if not files:
logger.warning("No JSONL files found in %s", directory)
return
logger.info("Found %d JSONL files to ingest", len(files))
kwargs_list = [{"path": fp, "batch_size": batch_size} for fp in files]
parallelize_process(ingest_file, kwargs_list, max_workers=max_workers)
SCHEMA = "main"
COLUMNS = (
"post_id",
"user_id",
"instance",
"date",
"text",
"langs",
"like_count",
"reply_count",
"repost_count",
"reply_to",
"replied_author",
"thread_root",
"thread_root_author",
"repost_from",
"reposted_author",
"quotes",
"quoted_author",
"labels",
"sent_label",
"sent_score",
)
INSERT_FROM_STAGING = f"""
INSERT INTO {SCHEMA}.posts ({", ".join(COLUMNS)})
SELECT {", ".join(COLUMNS)} FROM pg_temp.staging
ON CONFLICT (post_id, date) DO NOTHING
""" # noqa: S608
FAILED_INSERT = f"""
INSERT INTO {SCHEMA}.failed_ingestion (raw_line, error)
VALUES (%(raw_line)s, %(error)s)
""" # noqa: S608
def get_psycopg_connection() -> psycopg.Connection:
"""Create a raw psycopg3 connection from environment variables."""
database, host, port, username, password = get_connection_info("DATA_SCIENCE_DEV")
return psycopg.connect(
dbname=database,
host=host,
port=int(port),
user=username,
password=password,
autocommit=False,
)
def ingest_file(path: Path, *, batch_size: int) -> None:
"""Ingest a single JSONL file into the posts table."""
log_trigger = max(100_000 // batch_size, 1)
failed_lines: list[dict] = []
try:
with get_psycopg_connection() as connection:
for index, batch in enumerate(
read_jsonl_batches(path, batch_size, failed_lines), 1
):
ingest_batch(connection, batch)
if index % log_trigger == 0:
logger.info(
"Ingested %d batches (%d rows) from %s",
index,
index * batch_size,
path,
)
if failed_lines:
logger.warning(
"Recording %d malformed lines from %s", len(failed_lines), path.name
)
with connection.cursor() as cursor:
cursor.executemany(FAILED_INSERT, failed_lines)
connection.commit()
except Exception:
logger.exception("Failed to ingest file: %s", path)
raise
def ingest_batch(connection: psycopg.Connection, batch: list[dict]) -> None:
"""COPY batch into a temp staging table, then INSERT ... ON CONFLICT into posts."""
if not batch:
return
try:
with connection.cursor() as cursor:
cursor.execute(f"""
CREATE TEMP TABLE IF NOT EXISTS staging
(LIKE {SCHEMA}.posts INCLUDING DEFAULTS)
ON COMMIT DELETE ROWS
""")
cursor.execute("TRUNCATE pg_temp.staging")
with cursor.copy(
f"COPY pg_temp.staging ({', '.join(COLUMNS)}) FROM STDIN"
) as copy:
for row in batch:
copy.write_row(tuple(row.get(column) for column in COLUMNS))
cursor.execute(INSERT_FROM_STAGING)
connection.commit()
except Exception as error:
connection.rollback()
if len(batch) == 1:
logger.exception("Skipping bad row post_id=%s", batch[0].get("post_id"))
with connection.cursor() as cursor:
cursor.execute(
FAILED_INSERT,
{
"raw_line": orjson.dumps(batch[0], default=str).decode(),
"error": str(error),
},
)
connection.commit()
return
midpoint = len(batch) // 2
ingest_batch(connection, batch[:midpoint])
ingest_batch(connection, batch[midpoint:])
def read_jsonl_batches(
file_path: Path, batch_size: int, failed_lines: list[dict]
) -> Iterator[list[dict]]:
"""Stream a JSONL file and yield batches of transformed rows."""
batch: list[dict] = []
with file_path.open("r", encoding="utf-8") as handle:
for raw_line in handle:
line = raw_line.strip()
if not line:
continue
batch.extend(parse_line(line, file_path, failed_lines))
if len(batch) >= batch_size:
yield batch
batch = []
if batch:
yield batch
def parse_line(line: str, file_path: Path, failed_lines: list[dict]) -> Iterator[dict]:
"""Parse a JSONL line, handling concatenated JSON objects."""
try:
yield transform_row(orjson.loads(line))
except orjson.JSONDecodeError:
if "}{" not in line:
logger.warning(
"Skipping malformed line in %s: %s", file_path.name, line[:120]
)
failed_lines.append({"raw_line": line, "error": "malformed JSON"})
return
fragments = line.replace("}{", "}\n{").split("\n")
for fragment in fragments:
try:
yield transform_row(orjson.loads(fragment))
except (orjson.JSONDecodeError, KeyError, ValueError) as error:
logger.warning(
"Skipping malformed fragment in %s: %s",
file_path.name,
fragment[:120],
)
failed_lines.append({"raw_line": fragment, "error": str(error)})
except Exception as error:
logger.exception("Skipping bad row in %s: %s", file_path.name, line[:120])
failed_lines.append({"raw_line": line, "error": str(error)})
def transform_row(raw: dict) -> dict:
"""Transform a raw JSONL row into a dict matching the Posts table columns."""
raw["date"] = parse_date(raw["date"])
if raw.get("langs") is not None:
raw["langs"] = orjson.dumps(raw["langs"])
if raw.get("text") is not None:
raw["text"] = raw["text"].replace("\x00", "")
return raw
def parse_date(raw_date: int) -> datetime:
"""Parse compact YYYYMMDDHHmm integer into a naive datetime (input is UTC by spec)."""
return datetime(
raw_date // 100000000,
(raw_date // 1000000) % 100,
(raw_date // 10000) % 100,
(raw_date // 100) % 100,
raw_date % 100,
tzinfo=UTC,
)
if __name__ == "__main__":
app()
@@ -2,25 +2,85 @@
from pipelines.orm.data_science_dev.congress.bill import (
Bill,
BillAction,
BillActionRecordedVote,
BillRelation,
BillText,
BillTopic,
BillTopicPosition,
)
from pipelines.orm.data_science_dev.congress.amendment import (
Amendment,
AmendmentAction,
AmendmentActionRecordedVote,
)
from pipelines.orm.data_science_dev.congress.context import (
ClassificationMethod,
ConfidenceLevel,
IngestRun,
MeasureFunction,
MeasureSubtype,
ScoreRun,
SourceArtifact,
SubjectType,
TextResolutionMethod,
TextTargetBasis,
TextTargetType,
VoteActionMatch,
VoteActionScope,
VoteClassification,
VoteContextAudit,
VoteEffect,
VoteMeasureLink,
VoteMeasureRole,
VotePositionMeaning,
VoteRelationship,
VoteTextTarget,
)
from pipelines.orm.data_science_dev.congress.legislator import (
Legislator,
LegislatorScore,
LegislatorSocialMedia,
LegislatorScoreFake,
)
from pipelines.orm.data_science_dev.congress.vote import Vote, VoteRecord
__all__ = [
"Amendment",
"AmendmentAction",
"AmendmentActionRecordedVote",
"Bill",
"BillAction",
"BillActionRecordedVote",
"BillRelation",
"BillText",
"BillTopic",
"BillTopicPosition",
"ClassificationMethod",
"ConfidenceLevel",
"IngestRun",
"Legislator",
"LegislatorScore",
"LegislatorScoreFake",
"LegislatorSocialMedia",
"MeasureFunction",
"MeasureSubtype",
"ScoreRun",
"SourceArtifact",
"SubjectType",
"TextResolutionMethod",
"TextTargetBasis",
"TextTargetType",
"Vote",
"VoteActionMatch",
"VoteActionScope",
"VoteClassification",
"VoteContextAudit",
"VoteEffect",
"VoteMeasureLink",
"VoteMeasureRole",
"VotePositionMeaning",
"VoteRelationship",
"VoteRecord",
"VoteTextTarget",
]
@@ -0,0 +1,127 @@
"""Amendment models and official action context."""
from __future__ import annotations
from datetime import date, datetime
from sqlalchemy import DateTime, ForeignKey, Index, UniqueConstraint
from sqlalchemy.orm import Mapped, mapped_column, relationship
from pipelines.orm.data_science_dev.base import DataScienceDevTableBase
class Amendment(DataScienceDevTableBase):
"""Congressional amendment linked to a bill or to another amendment."""
__tablename__ = "amendment"
__table_args__ = (
UniqueConstraint(
"congress",
"amendment_type",
"number",
name="uq_amendment_congress_type_number",
),
)
congress: Mapped[int]
amendment_type: Mapped[str]
number: Mapped[int]
chamber: Mapped[str]
description: Mapped[str | None]
purpose: Mapped[str | None]
amended_bill_id: Mapped[int | None] = mapped_column(
ForeignKey("main.bill.id", ondelete="SET NULL")
)
amended_amendment_id: Mapped[int | None] = mapped_column(
ForeignKey("main.amendment.id", ondelete="SET NULL")
)
source_path: Mapped[str | None]
source_artifact_id: Mapped[int | None] = mapped_column(
ForeignKey("main.source_artifact.id", ondelete="SET NULL")
)
actions: Mapped[list[AmendmentAction]] = relationship(
"AmendmentAction",
back_populates="amendment",
cascade="all, delete-orphan",
)
amended_amendment: Mapped[Amendment | None] = relationship(
"Amendment",
remote_side="Amendment.id",
)
class AmendmentAction(DataScienceDevTableBase):
"""Official action row for an amendment."""
__tablename__ = "amendment_action"
__table_args__ = (
UniqueConstraint(
"amendment_id",
"sequence",
name="uq_amendment_action_amendment_id_sequence",
),
)
amendment_id: Mapped[int] = mapped_column(
ForeignKey("main.amendment.id", ondelete="CASCADE")
)
sequence: Mapped[int]
action_date: Mapped[date]
action_time: Mapped[str | None]
action_text: Mapped[str]
action_type: Mapped[str | None]
action_code: Mapped[str | None]
source_system_code: Mapped[str | None]
source_system_name: Mapped[str | None]
source_artifact_id: Mapped[int | None] = mapped_column(
ForeignKey("main.source_artifact.id", ondelete="SET NULL")
)
amendment: Mapped[Amendment] = relationship(
"Amendment",
back_populates="actions",
)
recorded_votes: Mapped[list[AmendmentActionRecordedVote]] = relationship(
"AmendmentActionRecordedVote",
back_populates="amendment_action",
cascade="all, delete-orphan",
)
class AmendmentActionRecordedVote(DataScienceDevTableBase):
"""Recorded vote nested under one official amendment action."""
__tablename__ = "amendment_action_recorded_vote"
__table_args__ = (
UniqueConstraint(
"amendment_action_id",
"congress",
"chamber",
"session_number",
"roll_number",
name="uq_amendment_action_recorded_vote_match_key",
),
Index(
"ix_amendment_action_recorded_vote_match_tuple",
"congress",
"chamber",
"session_number",
"roll_number",
),
)
amendment_action_id: Mapped[int] = mapped_column(
ForeignKey("main.amendment_action.id", ondelete="CASCADE")
)
congress: Mapped[int]
chamber: Mapped[str]
session_number: Mapped[int]
roll_number: Mapped[int]
vote_datetime: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
vote_url: Mapped[str | None]
amendment_action: Mapped[AmendmentAction] = relationship(
"AmendmentAction",
back_populates="recorded_votes",
)
+140 -13
View File
@@ -1,4 +1,4 @@
"""Bill model - legislation introduced in Congress."""
"""Bill models for legislation, official actions, text versions, and topic tags."""
from __future__ import annotations
@@ -12,7 +12,7 @@ from sqlalchemy.orm import Mapped, mapped_column, relationship
from pipelines.orm.data_science_dev.base import DataScienceDevTableBase
if TYPE_CHECKING:
from pipelines.orm.data_science_dev.congress.vote import Vote
from pipelines.orm.data_science_dev.congress.context import VoteMeasureLink
class BillTopicPosition(StrEnum):
@@ -22,6 +22,17 @@ class BillTopicPosition(StrEnum):
AGAINST = "against"
def _enum_column(enum_cls: type[StrEnum], *, name: str) -> Enum:
"""Build a portable SQLAlchemy enum column for StrEnum values."""
return Enum(
enum_cls,
values_callable=lambda enum_type: [member.value for member in enum_type],
native_enum=False,
name=name,
)
class Bill(DataScienceDevTableBase):
"""Legislation with congress number, type, titles, status, and sponsor."""
@@ -49,10 +60,6 @@ class Bill(DataScienceDevTableBase):
subjects_top_term: Mapped[str | None]
score_processed_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
votes: Mapped[list[Vote]] = relationship(
"Vote",
back_populates="bill",
)
bill_texts: Mapped[list[BillText]] = relationship(
"BillText",
back_populates="bill",
@@ -63,6 +70,28 @@ class Bill(DataScienceDevTableBase):
back_populates="bill",
cascade="all, delete-orphan",
)
bill_actions: Mapped[list[BillAction]] = relationship(
"BillAction",
back_populates="bill",
cascade="all, delete-orphan",
)
outgoing_bill_relations: Mapped[list[BillRelation]] = relationship(
"BillRelation",
foreign_keys="BillRelation.bill_id",
back_populates="bill",
cascade="all, delete-orphan",
)
incoming_bill_relations: Mapped[list[BillRelation]] = relationship(
"BillRelation",
foreign_keys="BillRelation.related_bill_id",
back_populates="related_bill",
cascade="all, delete-orphan",
)
vote_measure_links: Mapped[list[VoteMeasureLink]] = relationship(
"VoteMeasureLink",
back_populates="measure",
cascade="all, delete-orphan",
)
class BillText(DataScienceDevTableBase):
@@ -84,10 +113,113 @@ class BillText(DataScienceDevTableBase):
summarization_user_prompt_version: Mapped[str | None]
summarization_system_prompt_version: Mapped[str | None]
date: Mapped[date | None]
source_datetime_raw: Mapped[str | None]
text_url_xml: Mapped[str | None]
text_url_pdf: Mapped[str | None]
text_url_html: Mapped[str | None]
source_artifact_id: Mapped[int | None] = mapped_column(
ForeignKey("main.source_artifact.id", ondelete="SET NULL")
)
bill: Mapped[Bill] = relationship("Bill", back_populates="bill_texts")
# suport multipu summary prer bill
class BillAction(DataScienceDevTableBase):
"""Official action row from Bill Status XML."""
__tablename__ = "bill_action"
__table_args__ = (
UniqueConstraint("bill_id", "sequence", name="uq_bill_action_bill_id_sequence"),
)
bill_id: Mapped[int] = mapped_column(ForeignKey("main.bill.id", ondelete="CASCADE"))
sequence: Mapped[int]
action_date: Mapped[date]
action_time: Mapped[str | None]
action_text: Mapped[str]
action_type: Mapped[str | None]
action_code: Mapped[str | None]
source_system_code: Mapped[str | None]
source_system_name: Mapped[str | None]
source_artifact_id: Mapped[int | None] = mapped_column(
ForeignKey("main.source_artifact.id", ondelete="SET NULL")
)
bill: Mapped[Bill] = relationship("Bill", back_populates="bill_actions")
recorded_votes: Mapped[list[BillActionRecordedVote]] = relationship(
"BillActionRecordedVote",
back_populates="bill_action",
cascade="all, delete-orphan",
)
class BillActionRecordedVote(DataScienceDevTableBase):
"""Recorded vote nested under one official bill action."""
__tablename__ = "bill_action_recorded_vote"
__table_args__ = (
UniqueConstraint(
"bill_action_id",
"congress",
"chamber",
"session_number",
"roll_number",
name="uq_bill_action_recorded_vote_match_key",
),
Index(
"ix_bill_action_recorded_vote_match_tuple",
"congress",
"chamber",
"session_number",
"roll_number",
),
)
bill_action_id: Mapped[int] = mapped_column(
ForeignKey("main.bill_action.id", ondelete="CASCADE")
)
congress: Mapped[int]
chamber: Mapped[str]
session_number: Mapped[int]
roll_number: Mapped[int]
vote_datetime: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
vote_url: Mapped[str | None]
bill_action: Mapped[BillAction] = relationship(
"BillAction",
back_populates="recorded_votes",
)
class BillRelation(DataScienceDevTableBase):
"""Relationship between one bill/resolution and another."""
__tablename__ = "bill_relation"
__table_args__ = (
Index("ix_bill_relation_bill_id", "bill_id"),
Index("ix_bill_relation_related_bill_id", "related_bill_id"),
)
bill_id: Mapped[int] = mapped_column(ForeignKey("main.bill.id", ondelete="CASCADE"))
related_bill_id: Mapped[int] = mapped_column(
ForeignKey("main.bill.id", ondelete="CASCADE")
)
relationship_type: Mapped[str]
identified_by: Mapped[str | None]
latest_action_date: Mapped[date | None]
latest_action_text: Mapped[str | None]
bill: Mapped[Bill] = relationship(
"Bill",
foreign_keys=[bill_id],
back_populates="outgoing_bill_relations",
)
related_bill: Mapped[Bill] = relationship(
"Bill",
foreign_keys=[related_bill_id],
back_populates="incoming_bill_relations",
)
class BillTopic(DataScienceDevTableBase):
"""One bill stance on one topic used to score roll-call votes."""
@@ -106,12 +238,7 @@ class BillTopic(DataScienceDevTableBase):
bill_id: Mapped[int] = mapped_column(ForeignKey("main.bill.id", ondelete="CASCADE"))
topic: Mapped[str]
support_position: Mapped[BillTopicPosition] = mapped_column(
Enum(
BillTopicPosition,
values_callable=lambda enum_cls: [member.value for member in enum_cls],
native_enum=False,
name="bill_topic_position",
)
_enum_column(BillTopicPosition, name="bill_topic_position")
)
bill: Mapped[Bill] = relationship("Bill", back_populates="topics")
@@ -0,0 +1,462 @@
"""Canonical vote context, artifact tracking, and run metadata models."""
from __future__ import annotations
from datetime import datetime
from enum import StrEnum
from typing import TYPE_CHECKING
from sqlalchemy import DateTime, Enum, ForeignKey, Index, func, text
from sqlalchemy.orm import Mapped, mapped_column, relationship
from pipelines.orm.data_science_dev.base import DataScienceDevTableBase
if TYPE_CHECKING:
from pipelines.orm.data_science_dev.congress.amendment import Amendment, AmendmentAction
from pipelines.orm.data_science_dev.congress.bill import Bill, BillAction, BillText
from pipelines.orm.data_science_dev.congress.legislator import LegislatorScore
from pipelines.orm.data_science_dev.congress.vote import Vote
def _enum_column(enum_cls: type[StrEnum], *, name: str) -> Enum:
"""Build a portable SQLAlchemy enum column for StrEnum values."""
return Enum(
enum_cls,
values_callable=lambda enum_type: [member.value for member in enum_type],
native_enum=False,
name=name,
)
class ConfidenceLevel(StrEnum):
"""Low/medium/high confidence buckets."""
HIGH = "high"
MEDIUM = "medium"
LOW = "low"
class VoteActionScope(StrEnum):
"""Whether a matched action came from bill or amendment context."""
BILL = "bill"
AMENDMENT = "amendment"
class SubjectType(StrEnum):
"""The direct legal/procedural subject of the vote."""
MEASURE = "measure"
AMENDMENT = "amendment"
NOMINATION = "nomination"
TREATY = "treaty"
QUORUM = "quorum"
CHAMBER_ADMIN = "chamber_admin"
UNKNOWN = "unknown"
class MeasureSubtype(StrEnum):
"""Formal congressional measure subtype."""
BILL = "bill"
JOINT_RESOLUTION = "joint_resolution"
CONCURRENT_RESOLUTION = "concurrent_resolution"
SIMPLE_RESOLUTION = "simple_resolution"
class MeasureFunction(StrEnum):
"""Semantic function of a measure beyond its formal subtype."""
SUBSTANTIVE_MEASURE = "substantive_measure"
SPECIAL_RULE = "special_rule"
BUDGET_RESOLUTION = "budget_resolution"
CHAMBER_INTERNAL = "chamber_internal"
COMMEMORATIVE_OR_SENSE_OF = "commemorative_or_sense_of"
UNKNOWN = "unknown"
class VoteRelationship(StrEnum):
"""The vote's relationship to the direct subject and its text."""
DIRECT_TEXT_VOTE = "direct_text_vote"
AMENDMENT_TEXT_VOTE = "amendment_text_vote"
PROCEDURAL_RELATED_TO_MEASURE = "procedural_related_to_measure"
PROCEDURAL_RELATED_TO_AMENDMENT = "procedural_related_to_amendment"
NON_LEGISLATIVE = "non_legislative"
UNKNOWN = "unknown"
class ClassificationMethod(StrEnum):
"""How the final classification was derived."""
RECORDED_VOTE_ACTION_EXACT = "recorded_vote_action_exact"
RECORDED_VOTE_ACTION_DUPLICATE_SOURCE_DEDUPED = (
"recorded_vote_action_duplicate_source_deduped"
)
VOTE_XML_ONLY = "vote_xml_only"
QUESTION_TEXT_ONLY = "question_text_only"
MANUAL_REVIEW = "manual_review"
class VoteMeasureRole(StrEnum):
"""How one measure relates to one classified vote."""
VOTED_ON = "voted_on"
RULE_FOR = "rule_for"
UNDERLYING_BILL = "underlying_bill"
PROCEDURAL_TARGET = "procedural_target"
AMENDS = "amends"
AMENDED_BY = "amended_by"
CONFERENCE_REPORT_FOR = "conference_report_for"
RELATED_ONLY = "related_only"
class TextTargetType(StrEnum):
"""Which kind of legislative text was the object of a vote."""
BILL_TEXT = "bill_text"
RESOLUTION_TEXT = "resolution_text"
AMENDMENT_TEXT = "amendment_text"
NONE = "none"
UNKNOWN = "unknown"
class TextTargetBasis(StrEnum):
"""How the text target should be interpreted."""
EXACT_ACTION_TEXT_VERSION = "exact_action_text_version"
RESULTING_ENGROSSED_VERSION = "resulting_engrossed_version"
RECEIVED_PRIOR_CHAMBER_VERSION = "received_prior_chamber_version"
AMENDMENT_TEXT = "amendment_text"
RULE_RESOLUTION_TEXT = "rule_resolution_text"
NO_TEXT_TARGET = "no_text_target"
UNKNOWN = "unknown"
class TextResolutionMethod(StrEnum):
"""How the official text target was resolved."""
TEXT_EXACT_ACTION_DATE_AND_CODE = "text_exact_action_date_and_code"
TEXT_EXACT_ACTION_DATE_WRONG_CODE = "text_exact_action_date_wrong_code"
TEXT_PRIOR_VERSION_CODE_MATCH = "text_prior_version_code_match"
TEXT_RECEIVED_PRIOR_CHAMBER_VERSION = "text_received_prior_chamber_version"
TEXT_RESULTING_ENROLLED_ONLY = "text_resulting_enrolled_only"
AMENDMENT_TEXT_UNMODELED_PHASE1 = "amendment_text_unmodeled_phase1"
NO_TEXT_TARGET = "no_text_target"
UNKNOWN = "unknown"
class VoteEffect(StrEnum):
"""Meaning of one member position relative to the target text/procedure."""
SUPPORTS_TEXT = "supports_text"
OPPOSES_TEXT = "opposes_text"
ADVANCES_PROCEDURE = "advances_procedure"
BLOCKS_PROCEDURE = "blocks_procedure"
UNKNOWN = "unknown"
class IngestRun(DataScienceDevTableBase):
"""One full ingestion or context rebuild run."""
__tablename__ = "ingest_run"
started_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
completed_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
git_sha: Mapped[str | None]
classifier_version: Mapped[str | None]
source_snapshot_label: Mapped[str | None]
status: Mapped[str]
source_artifacts: Mapped[list[SourceArtifact]] = relationship(
"SourceArtifact",
back_populates="ingest_run",
cascade="all, delete-orphan",
)
score_runs: Mapped[list[ScoreRun]] = relationship(
"ScoreRun",
back_populates="ingest_run",
cascade="all, delete-orphan",
)
class SourceArtifact(DataScienceDevTableBase):
"""Local artifact manifest entry for reproducibility."""
__tablename__ = "source_artifact"
__table_args__ = (
Index("ix_source_artifact_source_kind", "source_kind"),
Index("ix_source_artifact_congress", "congress"),
Index(
"uq_source_artifact_ingest_identity",
"ingest_run_id",
"local_path",
"sha256",
unique=True,
),
)
source_kind: Mapped[str]
congress: Mapped[int]
chamber: Mapped[str | None]
local_path: Mapped[str]
source_url: Mapped[str | None]
sha256: Mapped[str]
byte_size: Mapped[int]
modified_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
ingested_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
ingest_run_id: Mapped[int | None] = mapped_column(
ForeignKey("main.ingest_run.id", ondelete="SET NULL")
)
ingest_run: Mapped[IngestRun | None] = relationship(
"IngestRun",
back_populates="source_artifacts",
)
class ScoreRun(DataScienceDevTableBase):
"""One full score recomputation tied to one ingest snapshot."""
__tablename__ = "score_run"
ingest_run_id: Mapped[int | None] = mapped_column(
ForeignKey("main.ingest_run.id", ondelete="SET NULL")
)
classifier_version: Mapped[str | None]
scoring_version: Mapped[str | None]
included_vote_count: Mapped[int]
excluded_vote_count: Mapped[int]
started_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
completed_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
ingest_run: Mapped[IngestRun | None] = relationship(
"IngestRun",
back_populates="score_runs",
)
scores: Mapped[list[LegislatorScore]] = relationship(
"LegislatorScore",
back_populates="score_run",
cascade="all, delete-orphan",
)
class VoteActionMatch(DataScienceDevTableBase):
"""A candidate or selected official action match for one raw vote."""
__tablename__ = "vote_action_match"
__table_args__ = (
Index("ix_vote_action_match_vote_id", "vote_id"),
Index(
"uq_vote_action_match_selected_vote_id",
"vote_id",
unique=True,
postgresql_where=text("is_selected"),
),
)
vote_id: Mapped[int] = mapped_column(ForeignKey("main.vote.id", ondelete="CASCADE"))
action_scope: Mapped[VoteActionScope] = mapped_column(
_enum_column(VoteActionScope, name="vote_action_scope")
)
bill_action_id: Mapped[int | None] = mapped_column(
ForeignKey("main.bill_action.id", ondelete="CASCADE")
)
amendment_action_id: Mapped[int | None] = mapped_column(
ForeignKey("main.amendment_action.id", ondelete="CASCADE")
)
is_selected: Mapped[bool]
match_method: Mapped[str]
match_reason: Mapped[str | None]
match_confidence: Mapped[ConfidenceLevel] = mapped_column(
_enum_column(ConfidenceLevel, name="vote_action_match_confidence")
)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
server_default=func.now(),
)
vote: Mapped[Vote] = relationship("Vote", back_populates="action_matches")
bill_action: Mapped[BillAction | None] = relationship("BillAction")
amendment_action: Mapped[AmendmentAction | None] = relationship("AmendmentAction")
class VoteClassification(DataScienceDevTableBase):
"""Normalized classification for what a vote was legally/procedurally on."""
__tablename__ = "vote_classification"
__table_args__ = (
Index("ix_vote_classification_subject_type", "subject_type"),
Index(
"ix_vote_classification_eligible_vote_id",
"vote_id",
postgresql_where=text(
"subject_type = 'measure' "
"AND vote_relationship = 'direct_text_vote' "
"AND is_direct_vote_on_legislative_text "
"AND is_substantive_policy_vote "
"AND NOT is_special_rule"
),
),
)
vote_id: Mapped[int] = mapped_column(
ForeignKey("main.vote.id", ondelete="CASCADE"),
unique=True,
)
subject_type: Mapped[SubjectType] = mapped_column(
_enum_column(SubjectType, name="vote_subject_type")
)
measure_type: Mapped[str | None]
measure_subtype: Mapped[MeasureSubtype | None] = mapped_column(
_enum_column(MeasureSubtype, name="vote_measure_subtype")
)
measure_function: Mapped[MeasureFunction | None] = mapped_column(
_enum_column(MeasureFunction, name="vote_measure_function")
)
vote_relationship: Mapped[VoteRelationship] = mapped_column(
_enum_column(VoteRelationship, name="vote_relationship")
)
is_legislation_related: Mapped[bool]
is_direct_vote_on_legislative_text: Mapped[bool]
is_substantive_policy_vote: Mapped[bool]
is_lawmaking_vehicle: Mapped[bool]
is_special_rule: Mapped[bool]
classification_method: Mapped[ClassificationMethod] = mapped_column(
_enum_column(ClassificationMethod, name="vote_classification_method")
)
classification_confidence_reason: Mapped[str | None]
confidence: Mapped[ConfidenceLevel] = mapped_column(
_enum_column(ConfidenceLevel, name="vote_classification_confidence")
)
classified_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
classification_version: Mapped[str]
vote: Mapped[Vote] = relationship("Vote", back_populates="classification")
class VoteMeasureLink(DataScienceDevTableBase):
"""Relationship between a classified vote and one bill/resolution measure."""
__tablename__ = "vote_measure_link"
__table_args__ = (
Index("ix_vote_measure_link_vote_id", "vote_id"),
Index("ix_vote_measure_link_vote_id_role", "vote_id", "role"),
Index("ix_vote_measure_link_measure_id_role", "measure_id", "role"),
)
vote_id: Mapped[int] = mapped_column(ForeignKey("main.vote.id", ondelete="CASCADE"))
measure_id: Mapped[int] = mapped_column(ForeignKey("main.bill.id", ondelete="CASCADE"))
role: Mapped[VoteMeasureRole] = mapped_column(
_enum_column(VoteMeasureRole, name="vote_measure_role")
)
source: Mapped[str]
confidence: Mapped[ConfidenceLevel] = mapped_column(
_enum_column(ConfidenceLevel, name="vote_measure_link_confidence")
)
notes: Mapped[str | None]
vote: Mapped[Vote] = relationship("Vote", back_populates="vote_measure_links")
measure: Mapped[Bill] = relationship("Bill", back_populates="vote_measure_links")
class VoteTextTarget(DataScienceDevTableBase):
"""Official text target, if any, resolved for one classified vote."""
__tablename__ = "vote_text_target"
__table_args__ = (
Index(
"ix_vote_text_target_voted_text_version_id",
"voted_text_version_id",
postgresql_where=text("voted_text_version_id IS NOT NULL"),
),
)
vote_id: Mapped[int] = mapped_column(
ForeignKey("main.vote.id", ondelete="CASCADE"),
unique=True,
)
text_target_type: Mapped[TextTargetType] = mapped_column(
_enum_column(TextTargetType, name="vote_text_target_type")
)
voted_text_version_id: Mapped[int | None] = mapped_column(
ForeignKey("main.bill_text.id", ondelete="SET NULL")
)
resulting_text_version_id: Mapped[int | None] = mapped_column(
ForeignKey("main.bill_text.id", ondelete="SET NULL")
)
related_amendment_id: Mapped[int | None] = mapped_column(
ForeignKey("main.amendment.id", ondelete="SET NULL")
)
text_target_basis: Mapped[TextTargetBasis] = mapped_column(
_enum_column(TextTargetBasis, name="vote_text_target_basis")
)
text_resolution_method: Mapped[TextResolutionMethod] = mapped_column(
_enum_column(TextResolutionMethod, name="vote_text_resolution_method")
)
text_resolution_confidence_reason: Mapped[str | None]
confidence: Mapped[ConfidenceLevel] = mapped_column(
_enum_column(ConfidenceLevel, name="vote_text_target_confidence")
)
notes: Mapped[str | None]
vote: Mapped[Vote] = relationship("Vote", back_populates="text_target")
voted_text_version: Mapped[BillText | None] = relationship(
"BillText",
foreign_keys=[voted_text_version_id],
)
resulting_text_version: Mapped[BillText | None] = relationship(
"BillText",
foreign_keys=[resulting_text_version_id],
)
related_amendment: Mapped[Amendment | None] = relationship("Amendment")
class VotePositionMeaning(DataScienceDevTableBase):
"""Meaning of Yea/Nay/Present positions for one classified vote."""
__tablename__ = "vote_position_meaning"
vote_id: Mapped[int] = mapped_column(
ForeignKey("main.vote.id", ondelete="CASCADE"),
unique=True,
)
yea_effect: Mapped[VoteEffect] = mapped_column(
_enum_column(VoteEffect, name="vote_yea_effect")
)
nay_effect: Mapped[VoteEffect] = mapped_column(
_enum_column(VoteEffect, name="vote_nay_effect")
)
present_effect: Mapped[VoteEffect] = mapped_column(
_enum_column(VoteEffect, name="vote_present_effect")
)
polarity_confidence: Mapped[ConfidenceLevel] = mapped_column(
_enum_column(ConfidenceLevel, name="vote_polarity_confidence")
)
polarity_method: Mapped[str]
notes: Mapped[str | None]
vote: Mapped[Vote] = relationship("Vote", back_populates="position_meaning")
class VoteContextAudit(DataScienceDevTableBase):
"""Audit/event row for ambiguous or noteworthy vote-context decisions."""
__tablename__ = "vote_context_audit"
__table_args__ = (
Index("ix_vote_context_audit_vote_id", "vote_id"),
Index("ix_vote_context_audit_severity_vote_id", "severity", "vote_id"),
)
vote_id: Mapped[int] = mapped_column(ForeignKey("main.vote.id", ondelete="CASCADE"))
step: Mapped[str]
message: Mapped[str]
severity: Mapped[str]
source_path: Mapped[str | None]
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
server_default=func.now(),
)
vote: Mapped[Vote] = relationship("Vote", back_populates="context_audit_rows")
@@ -11,6 +11,7 @@ from sqlalchemy.orm import Mapped, mapped_column, relationship
from pipelines.orm.data_science_dev.base import DataScienceDevTableBase
if TYPE_CHECKING:
from pipelines.orm.data_science_dev.congress.context import ScoreRun
from pipelines.orm.data_science_dev.congress.vote import VoteRecord
@@ -18,6 +19,7 @@ class Legislator(DataScienceDevTableBase):
"""Members of Congress with identification and current term info."""
__tablename__ = "legislator"
__table_args__ = (Index("ix_legislator_current_chamber", "current_chamber"),)
bioguide_id: Mapped[str] = mapped_column(Text, unique=True, index=True)
@@ -91,8 +93,39 @@ class LegislatorScore(DataScienceDevTableBase):
ForeignKey("main.legislator.id", ondelete="CASCADE"),
index=True,
)
score_run_id: Mapped[int | None] = mapped_column(
ForeignKey("main.score_run.id", ondelete="CASCADE"),
index=True,
)
year: Mapped[int]
topic: Mapped[str]
score: Mapped[float]
legislator: Mapped[Legislator] = relationship(back_populates="scores")
score_run: Mapped[ScoreRun | None] = relationship(
"ScoreRun",
back_populates="scores",
)
class LegislatorScoreFake(DataScienceDevTableBase):
"""Computed topic score for a legislator in one calendar year."""
__tablename__ = "legislator_score_fake"
__table_args__ = (
UniqueConstraint(
"legislator_id",
"year",
"topic",
name="uq_legislator_score_fake_legislator_id_year_topic",
),
Index("ix_legislator_score_fake_year_topic", "year", "topic"),
)
legislator_id: Mapped[int] = mapped_column(
ForeignKey("main.legislator.id", ondelete="CASCADE"),
index=True,
)
year: Mapped[int]
topic: Mapped[str]
score: Mapped[float]
+61 -14
View File
@@ -1,11 +1,12 @@
"""Vote model - roll call votes in Congress."""
"""Vote models for raw roll-call data and member positions."""
from __future__ import annotations
from datetime import date
from datetime import date, datetime
from typing import TYPE_CHECKING
from sqlalchemy import ForeignKey, Index, UniqueConstraint
from sqlalchemy import DateTime, ForeignKey, Index, UniqueConstraint
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import Mapped, mapped_column, relationship
from pipelines.orm.data_science_dev.base import (
@@ -14,9 +15,15 @@ from pipelines.orm.data_science_dev.base import (
)
if TYPE_CHECKING:
from pipelines.orm.data_science_dev.congress.bill import Bill
from pipelines.orm.data_science_dev.congress.context import (
VoteActionMatch,
VoteClassification,
VoteContextAudit,
VoteMeasureLink,
VotePositionMeaning,
VoteTextTarget,
)
from pipelines.orm.data_science_dev.congress.legislator import Legislator
from pipelines.orm.data_science_dev.congress.vote import Vote
class VoteRecord(DataScienceDevBase):
@@ -41,16 +48,16 @@ class VoteRecord(DataScienceDevBase):
class Vote(DataScienceDevTableBase):
"""Roll call votes with counts and optional bill linkage."""
"""Raw roll call vote facts from House or Senate vote sources."""
__tablename__ = "vote"
__table_args__ = (
UniqueConstraint(
"congress",
"chamber",
"session",
"number",
name="uq_vote_congress_chamber_session_number",
"session_number",
"roll_number",
name="uq_vote_congress_chamber_session_number_roll_number",
),
Index("ix_vote_date", "vote_date"),
Index("ix_vote_congress_chamber", "congress", "chamber"),
@@ -58,8 +65,9 @@ class Vote(DataScienceDevTableBase):
congress: Mapped[int]
chamber: Mapped[str]
session: Mapped[int]
number: Mapped[int]
session_year: Mapped[int]
session_number: Mapped[int]
roll_number: Mapped[int]
vote_type: Mapped[str | None]
question: Mapped[str | None]
@@ -67,18 +75,57 @@ class Vote(DataScienceDevTableBase):
result_text: Mapped[str | None]
vote_date: Mapped[date]
vote_datetime: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
raw_vote_source_url: Mapped[str | None]
yea_count: Mapped[int | None]
nay_count: Mapped[int | None]
not_voting_count: Mapped[int | None]
present_count: Mapped[int | None]
bill_id: Mapped[int | None] = mapped_column(ForeignKey("main.bill.id"))
raw_bill_ref: Mapped[dict | None] = mapped_column(JSONB)
raw_amendment_ref: Mapped[dict | None] = mapped_column(JSONB)
raw_nomination_ref: Mapped[dict | None] = mapped_column(JSONB)
raw_treaty_ref: Mapped[dict | None] = mapped_column(JSONB)
raw_vote_source_artifact_id: Mapped[int | None] = mapped_column(
ForeignKey("main.source_artifact.id", ondelete="SET NULL")
)
bill: Mapped[Bill | None] = relationship("Bill", back_populates="votes")
vote_records: Mapped[list[VoteRecord]] = relationship(
"VoteRecord",
back_populates="vote",
cascade="all, delete-orphan",
)
action_matches: Mapped[list[VoteActionMatch]] = relationship(
"VoteActionMatch",
back_populates="vote",
cascade="all, delete-orphan",
)
classification: Mapped[VoteClassification | None] = relationship(
"VoteClassification",
back_populates="vote",
cascade="all, delete-orphan",
uselist=False,
)
vote_measure_links: Mapped[list[VoteMeasureLink]] = relationship(
"VoteMeasureLink",
back_populates="vote",
cascade="all, delete-orphan",
)
text_target: Mapped[VoteTextTarget | None] = relationship(
"VoteTextTarget",
back_populates="vote",
cascade="all, delete-orphan",
uselist=False,
)
position_meaning: Mapped[VotePositionMeaning | None] = relationship(
"VotePositionMeaning",
back_populates="vote",
cascade="all, delete-orphan",
uselist=False,
)
context_audit_rows: Mapped[list[VoteContextAudit]] = relationship(
"VoteContextAudit",
back_populates="vote",
cascade="all, delete-orphan",
)
+54
View File
@@ -3,26 +3,80 @@
from __future__ import annotations
from pipelines.orm.data_science_dev.congress import (
Amendment,
AmendmentAction,
AmendmentActionRecordedVote,
Bill,
BillAction,
BillActionRecordedVote,
BillRelation,
BillText,
BillTopic,
BillTopicPosition,
ClassificationMethod,
ConfidenceLevel,
IngestRun,
Legislator,
LegislatorScore,
MeasureFunction,
MeasureSubtype,
ScoreRun,
SourceArtifact,
SubjectType,
TextResolutionMethod,
TextTargetBasis,
TextTargetType,
Vote,
VoteActionMatch,
VoteActionScope,
VoteClassification,
VoteContextAudit,
VoteEffect,
VoteMeasureLink,
VoteMeasureRole,
VotePositionMeaning,
VoteRelationship,
VoteRecord,
VoteTextTarget,
)
from pipelines.orm.data_science_dev.posts import partitions # noqa: F401 — registers partition classes in metadata
from pipelines.orm.data_science_dev.posts.tables import Posts
__all__ = [
"Amendment",
"AmendmentAction",
"AmendmentActionRecordedVote",
"Bill",
"BillAction",
"BillActionRecordedVote",
"BillRelation",
"BillText",
"BillTopic",
"BillTopicPosition",
"ClassificationMethod",
"ConfidenceLevel",
"IngestRun",
"Legislator",
"LegislatorScore",
"MeasureFunction",
"MeasureSubtype",
"Posts",
"ScoreRun",
"SourceArtifact",
"SubjectType",
"TextResolutionMethod",
"TextTargetBasis",
"TextTargetType",
"Vote",
"VoteActionMatch",
"VoteActionScope",
"VoteClassification",
"VoteContextAudit",
"VoteEffect",
"VoteMeasureLink",
"VoteMeasureRole",
"VotePositionMeaning",
"VoteRelationship",
"VoteRecord",
"VoteTextTarget",
]
@@ -3,9 +3,10 @@
from __future__ import annotations
from pipelines.orm.data_science_dev.posts.failed_ingestion import FailedIngestion
from pipelines.orm.data_science_dev.posts.tables import Posts
from pipelines.orm.data_science_dev.posts.tables import Posts, PostTopic
__all__ = [
"FailedIngestion",
"Posts",
"PostTopic",
]
@@ -0,0 +1,195 @@
"""Shared language filter constants for post sampling queries."""
from __future__ import annotations
ENGLISH_LANGS = (
'["", "", ""]',
'[""]',
"[]",
'["", "eng"]',
'["eng", "", ""]',
'["eng", ""]',
'["eng"]',
'["eng", "aar"]',
'["eng", "abk", "afr"]',
'["eng", "afr"]',
'["eng", "afr", "abk"]',
'["eng", "afr", "anp"]',
'["eng", "afr", "ber"]',
'["eng", "afr", "dan"]',
'["eng", "afr", "deu"]',
'["eng", "afr", "est"]',
'["eng", "afr", "fra"]',
'["eng", "afr", "ind"]',
'["eng", "afr", "lat"]',
'["eng", "afr", "nld"]',
'["eng", "afr", "nor"]',
'["eng", "afr", "pol"]',
'["eng", "afr", "por"]',
'["eng", "afr", "ron"]',
'["eng", "afr", "slk"]',
'["eng", "afr", "spa"]',
'["eng", "afr", "tgl"]',
'["eng", "afr", "tuk"]',
'["eng", "afr", "tur"]',
'["eng", "afr", "ukr"]',
'["eng", "afr", "vol"]',
'["eng", "agq"]',
'["eng", "ain"]',
'["eng", "ain", "amh"]',
'["eng", "ain", "jpn"]',
'["eng", "aka"]',
'["eng", "amh"]',
'["eng", "amh", "afr"]',
'["eng", "amh", "ara"]',
'["eng", "amh", "fra"]',
'["eng", "anp"]',
'["eng", "anp", "hye"]',
'["eng", "anp", "sqi"]',
'["eng", "", "ara"]',
'["eng", "ara", ""]',
'["eng", "ara"]',
'["eng", "ara", "afr"]',
'["eng", "ara", "anp"]',
'["eng", "ara", "ars"]',
'["eng", "ara", "bul"]',
'["eng", "ara", "cat"]',
'["eng", "ara", "deu"]',
'["eng", "ara", "ell"]',
'["eng", "ara", "fas"]',
'["eng", "ara", "fra"]',
'["eng", "ara", "heb"]',
'["eng", "ara", "hin"]',
'["eng", "ara", "ind"]',
'["eng", "ara", "ita"]',
'["eng", "ara", "jpn"]',
'["eng", "ara", "kas"]',
'["eng", "ara", "kor"]',
'["eng", "ara", "nob"]',
'["eng", "ara", "nor"]',
'["eng", "ara", "rus"]',
'["eng", "ara", "spa"]',
'["eng", "ara", "swe"]',
'["eng", "ara", "tam"]',
'["eng", "ara", "tur"]',
'["eng", "ara", "urd"]',
'["eng", "ara", "zho"]',
'["eng", "arg"]',
'["eng", "arg", "amh"]',
'["eng", "arg", "aze"]',
'["eng", "ars"]',
'["eng", "ars", "ara"]',
'["eng", "asm"]',
'["eng", "ava", "sqi"]',
'["eng", "ave"]',
'["eng", "aze"]',
'["eng", "aze", "deu"]',
'["eng", "aze", "hye"]',
'["eng", "aze", "ita"]',
'["eng", "aze", "rus"]',
'["eng", "bam", ""]',
'["eng", "bel"]',
'["eng", "bel", "rus"]',
'["eng", "ben"]',
'["eng", "ben", "deu"]',
'["eng", "ben", "fra"]',
'["eng", "ben", "hin"]',
'["eng", "ben", "mya"]',
'["eng", "ber"]',
'["eng", "ber", "afr"]',
'["eng", "ber", "deu"]',
'["eng", "ber", "est"]',
'["eng", "ber", "hun"]',
'["eng", "ber", "isl"]',
'["eng", "ber", "jpn"]',
'["eng", "ber", "lat"]',
'["eng", "ber", "nor"]',
'["eng", "ber", "pol"]',
'["eng", "ber", "por"]',
'["eng", "ber", "ron"]',
'["eng", "ber", "run"]',
'["eng", "ber", "slk"]',
'["eng", "ber", "spa"]',
'["eng", "ber", "tgl"]',
'["eng", "ber", "tlh"]',
'["eng", "ber", "tuk"]',
'["eng", "bod"]',
'["eng", "bod", "nep"]',
'["eng", "bos", "hrv"]',
'["eng", "bos", "srp"]',
'["eng", "bul"]',
'["eng", "bul", "deu"]',
'["eng", "bul", "fra"]',
'["eng", "bul", "jpn"]',
'["eng", "bul", "mkd"]',
'["eng", "bul", "mri"]',
'["eng", "bul", "nld"]',
'["eng", "bul", "rus"]',
'["eng", "bul", "srp"]',
'["eng", "cat"]',
'["eng", "cat", "fra"]',
'["eng", "cat", "ind"]',
'["eng", "cat", "isl"]',
'["eng", "cat", "jpn"]',
'["eng", "cat", "nld"]',
'["eng", "cat", "spa"]',
'["eng", "ces"]',
'["eng", "ces", "deu"]',
'["eng", "ces", "ell"]',
'["eng", "ces", "haw"]',
'["eng", "ces", "ind"]',
'["eng", "ces", "ita"]',
'["eng", "ces", "jpn"]',
'["eng", "ces", "por"]',
'["eng", "ces", "rus"]',
'["eng", "ces", "slk"]',
'["eng", "ces", "spa"]',
'["eng", "ces", "tuk"]',
'["eng", "cha"]',
'["eng", "chr"]',
'["eng", "chr", "ara"]',
'["eng", "chr", "deu"]',
'["eng", "chr", "ell"]',
'["eng", "chr", "fil"]',
'["eng", "chr", "isl"]',
'["eng", "chr", "kor"]',
'["eng", "chr", "rus"]',
'["eng", "chr", "spa"]',
'["eng", "chr", "zho"]',
'["eng", "chu", "oci"]',
'["eng", "cor"]',
'["eng", "", "cos"]',
'["eng", "cos"]',
'["eng", "cym"]',
'["eng", "cym", "deu"]',
'["eng", "cym", "fra"]',
'["eng", "cym", "jpn"]',
'["eng", "cym", "spa"]',
'["eng", "cym", "zho"]',
'["eng", "dan"]',
'["eng", "dan", "ber"]',
'["eng", "dan", "deu"]',
'["eng", "dan", "ell"]',
'["eng", "dan", "est"]',
'["eng", "dan", "fas"]',
'["eng", "dan", "fin"]',
'["eng", "dan", "fra"]',
'["eng", "dan", "gle"]',
'["eng", "dan", "hun"]',
'["eng", "dan", "isl"]',
'["eng", "dan", "ita"]',
'["eng", "dan", "jpn"]',
'["eng", "dan", "lat"]',
'["eng", "dan", "nld"]',
'["eng", "dan", "nob"]',
'["eng", "dan", "nor"]',
'["eng", "dan", "por"]',
'["eng", "dan", "rus"]',
'["eng", "dan", "slk"]',
'["eng", "dan", "spa"]',
'["eng", "dan", "swe"]',
'["eng", "dan", "tuk"]',
'["eng", "dan", "zho"]',
'["eng", "deu", ""]',
'["eng", "deu"]',
)
+25 -2
View File
@@ -1,13 +1,36 @@
"""Posts parent table with PostgreSQL weekly range partitioning on date column."""
"""Posts parent table and PostTopic table for the data_science_dev database."""
from __future__ import annotations
from pipelines.orm.data_science_dev.base import DataScienceDevBase
from pipelines.orm.data_science_dev.base import (
DataScienceDevBase,
DataScienceDevTableBase,
)
from pipelines.orm.data_science_dev.posts.columns import PostsColumns
from sqlalchemy import BigInteger, Index, SmallInteger
from sqlalchemy.orm import Mapped, mapped_column
class Posts(PostsColumns, DataScienceDevBase):
"""Parent partitioned table for posts, partitioned by week on `date`."""
__tablename__ = "posts"
__table_args__ = ({"postgresql_partition_by": "RANGE (date)"},)
class PostTopic(DataScienceDevTableBase):
"""Stores BERTopic topic assignments for posts.
post_id references main.posts but without a FK constraint
since posts is a partitioned table.
"""
__tablename__ = "post_topic"
__table_args__ = (Index("ix_post_topic_post_id", "post_id"),)
post_id: Mapped[int] = mapped_column(BigInteger)
topic_id: Mapped[int] = mapped_column(SmallInteger)
topic_label: Mapped[str | None]
model_version: Mapped[str | None]
-25
View File
@@ -1,25 +0,0 @@
# Unsloth fine-tuning container for Qwen 3.5 4B on RTX 3090.
#
# Build:
# docker build -f python/prompt_bench/Dockerfile.finetune -t bill-finetune .
#
# Run:
# docker run --rm --device=nvidia.com/gpu=all --ipc=host \
# -v $(pwd)/output:/workspace/output \
# -v $(pwd)/output/finetune_dataset.jsonl:/workspace/dataset.jsonl:ro \
# -v /zfs/models/hf:/models \
# bill-finetune \
# --dataset /workspace/dataset.jsonl \
# --output-dir /workspace/output/qwen-bill-summarizer
FROM ghcr.io/unslothai/unsloth:latest
RUN pip install --no-cache-dir typer
WORKDIR /workspace
COPY python/prompt_bench/finetune.py python/prompt_bench/finetune.py
COPY config/prompts/summarization_prompts.toml config/prompts/summarization_prompts.toml
COPY python/prompt_bench/__init__.py python/prompt_bench/__init__.py
COPY python/__init__.py python/__init__.py
ENTRYPOINT ["python", "-m", "pipelines.prompt_bench.finetune"]
+6 -2
View File
@@ -23,10 +23,14 @@ import httpx
import typer
from tiktoken import Encoding, get_encoding
from pipelines.config import get_config_dir
from pipelines.tools.bill_token_compression import compress_bill_text
_PROMPTS_PATH = get_config_dir() / "prompts" / "summarization_prompts.toml"
_PROMPTS_PATH = (
Path(__file__).resolve().parents[2]
/ "config"
/ "prompts"
/ "summarization_prompts.toml"
)
_PROMPTS = tomllib.loads(_PROMPTS_PATH.read_text())["summarization"]
SUMMARIZATION_SYSTEM_PROMPT: str = _PROMPTS["system_prompt"]
SUMMARIZATION_USER_TEMPLATE: str = _PROMPTS["user_template"]
+6 -2
View File
@@ -24,10 +24,14 @@ from typing import Annotated
import httpx
import typer
from pipelines.config import get_config_dir
from pipelines.tools.bill_token_compression import compress_bill_text
_PROMPTS_PATH = get_config_dir() / "prompts" / "summarization_prompts.toml"
_PROMPTS_PATH = (
Path(__file__).resolve().parents[2]
/ "config"
/ "prompts"
/ "summarization_prompts.toml"
)
_PROMPTS = tomllib.loads(_PROMPTS_PATH.read_text())["summarization"]
SUMMARIZATION_SYSTEM_PROMPT: str = _PROMPTS["system_prompt"]
SUMMARIZATION_USER_TEMPLATE: str = _PROMPTS["user_template"]
+1 -3
View File
@@ -25,8 +25,6 @@ from datasets import Dataset
from transformers import TrainingArguments
from trl import SFTTrainer
from pipelines.config import default_config_path
logger = logging.getLogger(__name__)
@@ -125,7 +123,7 @@ def main(
config_path: Annotated[
Path,
typer.Option("--config", help="TOML config file"),
] = default_config_path(),
] = Path(__file__).parent / "config.toml",
save_gguf: Annotated[
bool, typer.Option("--save-gguf/--no-save-gguf", help="Also save GGUF")
] = False,
+34
View File
@@ -0,0 +1,34 @@
SUMMARIZATION_SYSTEM_PROMPT = """You are a legislative analyst extracting policy substance from Congressional bill text.
Your job is to compress a bill into a dense, neutral structured summary that captures every distinct policy action — including secondary effects that might be buried in subsections.
EXTRACTION RULES:
- IGNORE: whereas clauses, congressional findings that are purely political statements, recitals, preambles, citations of existing law by number alone, and procedural boilerplate.
- FOCUS ON: operative verbs — what the bill SHALL do, PROHIBIT, REQUIRE, AUTHORIZE, AMEND, APPROPRIATE, or ESTABLISH.
- SURFACE ALL THREADS: If the bill touches multiple policy areas, list each thread separately. Do not collapse them.
- BE CONCRETE: Name the affected population, the mechanism, and the direction (expands/restricts/maintains).
- STAY NEUTRAL: No political framing. Describe what the text does, not what its sponsors claim it does.
OUTPUT FORMAT — plain structured text, not JSON:
OPERATIVE ACTIONS:
[Numbered list of what the bill actually does, one action per line, max 20 words each]
AFFECTED POPULATIONS:
[Who gains something, who loses something, or whose behavior is regulated]
MECHANISMS:
[How it works: new funding, mandate, prohibition, amendment to existing statute, grant program, study commission, etc.]
POLICY THREADS:
[List each distinct policy domain this bill touches, even minor ones. Use plain language, not domain codes.]
SYMBOLIC/PROCEDURAL ONLY:
[Yes or No — is this bill primarily a resolution, designation, or awareness declaration with no operative effect?]
LENGTH TARGET: 150-250 words total. Be ruthless about cutting. Density over completeness."""
SUMMARIZATION_USER_TEMPLATE = """Summarize the following Congressional bill according to your instructions.
BILL TEXT:
{text_content}"""
-268
View File
@@ -1,268 +0,0 @@
"""Summarize bill_text rows with GPT-5 and store results in the database."""
from __future__ import annotations
import logging
import tomllib
from os import getenv
from pathlib import Path
from typing import Annotated, Any
import httpx
import typer
from sqlalchemy import Select, or_, select
from sqlalchemy.orm import Session, selectinload
from pipelines.config import get_config_dir
from pipelines.orm.common import get_postgres_engine
from pipelines.orm.data_science_dev.congress import Bill, BillText
from pipelines.tools.bill_token_compression import compress_bill_text
logger = logging.getLogger(__name__)
OPENAI_PROJECT_ID = "proj_fQBPEXFgnS87Fk6wZwploFwE"
def _find_prompts_path() -> Path:
return get_config_dir() / "prompts" / "summarization_prompts.toml"
def load_summarization_prompts(
section: str = "summarization",
) -> dict[str, str]:
return tomllib.loads(_find_prompts_path().read_text())[section]
class BillSummaryError(RuntimeError):
"""Raised when a bill summary request or response is invalid."""
def call_openai_summary(
*,
model: str,
messages: list[dict[str, str]],
) -> str:
"""Call GPT and return the assistant message content."""
api_key = getenv("CLOSEDAI_TOKEN")
if not api_key:
msg = "CLOSEDAI_TOKEN is required"
raise BillSummaryError(msg)
response = httpx.post(
"https://api.openai.com/v1/chat/completions",
headers={
"Authorization": f"Bearer {api_key}",
"OpenAI-Project": OPENAI_PROJECT_ID,
"Content-Type": "application/json",
},
json={
"model": model,
"messages": messages,
},
timeout=60,
)
response.raise_for_status()
return extract_message_content(response.json())
def build_bill_summary_messages(
*,
bill_text: BillText,
) -> list[dict[str, str]]:
"""Build the GPT prompt messages for one bill text row."""
if not bill_text.text_content:
msg = f"bill_text id={bill_text.id} has no text_content"
raise BillSummaryError(msg)
bill = bill_text.bill
compressed_text = compress_bill_text(bill_text.text_content)
if not compressed_text:
msg = f"bill_text id={bill_text.id} has no summarizable text_content"
raise BillSummaryError(msg)
metadata = "\n".join(
(
f"Congress: {bill.congress}",
f"Bill: {bill.bill_type} {bill.number}",
f"Title: {bill.title_short or bill.title or bill.official_title or ''}",
f"Top subject term: {bill.subjects_top_term or ''}",
f"Text version: {bill_text.version_code}"
+ (f" ({bill_text.version_name})" if bill_text.version_name else ""),
)
)
summarization_prompts = load_summarization_prompts()
user_prompt = "\n\n".join(
(
"BILL METADATA:",
metadata,
summarization_prompts["user_template"].format(text_content=compressed_text),
)
)
return [
{"role": "system", "content": summarization_prompts["system_prompt"]},
{
"role": "user",
"content": user_prompt,
},
]
def summarize_bill_text(
*,
model: str,
bill_text: BillText,
) -> str:
"""Generate and return a summary for one bill_text row."""
messages = build_bill_summary_messages(bill_text=bill_text)
summary = call_openai_summary(
model=model,
messages=messages,
).strip()
if not summary:
msg = f"Model returned an empty summary for bill_text id={bill_text.id}"
raise BillSummaryError(msg)
return summary
def store_bill_summary_result(
*,
bill_text: BillText,
summary: str,
model: str,
) -> None:
"""Store a generated summary and the prompt/model metadata that produced it."""
bill_text.summary = summary
bill_text.summarization_model = model
bill_text.summarization_system_prompt_version = "v1"
bill_text.summarization_user_prompt_version = "v1"
def create_select_bill_texts_for_summarization(
congress: int | None = None,
bill_ids: list[int] | None = None,
bill_text_ids: list[int] | None = None,
force: bool = False,
limit: int | None = None,
) -> Select:
"""Select bill_text rows that have source text and need summaries."""
stmt = (
select(BillText)
.join(Bill, Bill.id == BillText.bill_id)
.where(BillText.text_content.is_not(None), BillText.text_content != "")
.options(selectinload(BillText.bill))
.order_by(BillText.id)
)
if congress is not None:
stmt = stmt.where(Bill.congress == congress)
if bill_ids:
stmt = stmt.where(BillText.bill_id.in_(bill_ids))
if bill_text_ids:
stmt = stmt.where(BillText.id.in_(bill_text_ids))
if not force:
stmt = stmt.where(or_(BillText.summary.is_(None), BillText.summary == ""))
if limit is not None:
stmt = stmt.limit(limit)
return stmt
def extract_message_content(data: dict[str, Any]) -> str:
"""Extract message content from a chat-completions response body."""
choices = data.get("choices")
if not isinstance(choices, list) or not choices:
msg = "Chat completion response did not contain choices"
raise BillSummaryError(msg)
first = choices[0]
if not isinstance(first, dict):
msg = "Chat completion choice must be an object"
raise BillSummaryError(msg)
message = first.get("message")
if isinstance(message, dict) and isinstance(message.get("content"), str):
return message["content"]
if isinstance(first.get("text"), str):
return first["text"]
msg = "Chat completion response did not contain message content"
raise BillSummaryError(msg)
def main(
model: Annotated[str, typer.Option(help="OpenAI model id.")] = "gpt-5.4-mini",
congress: Annotated[
int | None, typer.Option(help="Only process one Congress.")
] = None,
bill_ids: Annotated[
list[int] | None,
typer.Option(
"--bill-id",
help="Only process one internal bill.id. Repeat for multiple bills.",
),
] = None,
bill_text_ids: Annotated[
list[int] | None,
typer.Option(
"--bill-text-id",
help="Only process one internal bill_text.id. Repeat for multiple rows.",
),
] = None,
limit: Annotated[int | None, typer.Option(help="Maximum rows to process.")] = None,
force: Annotated[
bool,
typer.Option(help="Regenerate summaries for rows that already have a summary."),
] = False,
dry_run: Annotated[
bool, typer.Option(help="Print summaries without writing them to the database.")
] = False,
log_level: Annotated[str, typer.Option(help="Log level.")] = "INFO",
) -> None:
"""CLI entrypoint for generating and storing bill summaries."""
logging.basicConfig(
level=log_level,
format="%(asctime)s %(levelname)s %(name)s: %(message)s",
)
if not getenv("CLOSEDAI_TOKEN"):
message = "CLOSEDAI_TOKEN is required"
raise typer.BadParameter(message)
engine = get_postgres_engine(name="DATA_SCIENCE_DEV")
with Session(engine) as session:
stmt = create_select_bill_texts_for_summarization(
congress=congress,
bill_ids=bill_ids,
bill_text_ids=bill_text_ids,
force=force,
limit=limit,
)
bill_texts = session.scalars(stmt).all()
logger.info("Selected %d bill_text rows for summarization", len(bill_texts))
written = 0
for index, bill_text in enumerate(bill_texts, 1):
summary = summarize_bill_text(
model=model,
bill_text=bill_text,
)
store_bill_summary_result(
bill_text=bill_text,
summary=summary,
model=model,
)
if index % 100 == 0:
session.commit()
written += 1
session.commit()
logger.info("Stored summary for bill_text id=%s", bill_text.id)
logger.info("Done: stored %d summaries", written)
def cli() -> None:
"""Typer entry point."""
typer.run(main)
if __name__ == "__main__":
cli()
View File
-22
View File
@@ -1,22 +0,0 @@
[project]
name = "ds-testing-pipelines"
version = "0.1.0"
description = "Data science pipeline tools and legislative dashboard."
requires-python = ">=3.12"
dependencies = [
"fastapi",
"httpx",
"uvicorn[standard]",
"jinja2",
"sqlalchemy",
"psycopg",
]
[project.optional-dependencies]
test = [
"pytest",
]
[tool.pytest.ini_options]
testpaths = ["tests"]
pythonpath = ["."]