12 Commits

Author SHA1 Message Date
Richie 9a4f71f471 updated dependencies 2026-04-21 22:57:54 -04:00
Richie c8adf6914e removed LegislatorBillScore 2026-04-21 22:51:29 -04:00
Richie 87a7f5312f converted default_config_path and get_config_dir 2026-04-21 22:51:10 -04:00
Richie 51d6240690 removed LegislatorBillScore 2026-04-21 22:48:13 -04:00
Richie 1426b797e5 added summarize_bills.py 2026-04-21 22:44:52 -04:00
Richie 4b768049c0 added summarization metadata to the DB 2026-04-21 21:42:13 -04:00
Richie 674edafe94 created scoring tables and basic logic 2026-04-21 11:44:53 -04:00
Richie be4b473a3c got alembic working 2026-04-21 11:36:58 -04:00
Richie e5ba089479 adding database_cli.py and alembic 2026-04-21 11:08:00 -04:00
Richie 0c99a63347 added DatabaseSetupError 2026-04-21 10:54:30 -04:00
Richie 07cd231609 adding common.py code and parallelize.py 2026-04-21 10:09:01 -04:00
Richie c8b61fc3c0 adding ingest_congress.py and ingest_posts.py 2026-04-21 09:59:26 -04:00
38 changed files with 1073 additions and 4173 deletions
+1 -1
View File
@@ -110,7 +110,7 @@ ipython_config.py
# pdm # pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control. # Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
# pdm recommends including project-wide configuration in pdm.toml, but excluding .pdm-python. # pdm recommends including project-wide configuration in pdm.toml, but excluding .pdm-pipelines.
# https://pdm-project.org/en/latest/usage/project/#working-with-version-control # https://pdm-project.org/en/latest/usage/project/#working-with-version-control
# pdm.lock # pdm.lock
# pdm.toml # pdm.toml
@@ -1,98 +0,0 @@
"""adding LegislatorScoreFake.
Revision ID: 06f833813bd7
Revises: b63ed11d6775
Create Date: 2026-04-22 18:41:07.484609
"""
from __future__ import annotations
from typing import TYPE_CHECKING
import sqlalchemy as sa
from alembic import op
from pipelines.orm import DataScienceDevBase
if TYPE_CHECKING:
from collections.abc import Sequence
# revision identifiers, used by Alembic.
revision: str = "06f833813bd7"
down_revision: str | None = "b63ed11d6775"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
schema = DataScienceDevBase.schema_name
def upgrade() -> None:
"""Upgrade."""
# ### commands auto generated by Alembic - please adjust! ###
op.create_table(
"legislator_score_fake",
sa.Column("legislator_id", sa.Integer(), nullable=False),
sa.Column("year", sa.Integer(), nullable=False),
sa.Column("topic", sa.String(), nullable=False),
sa.Column("score", sa.Float(), nullable=False),
sa.Column("id", sa.Integer(), nullable=False),
sa.Column(
"created",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.Column(
"updated",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.ForeignKeyConstraint(
["legislator_id"],
[f"{schema}.legislator.id"],
name=op.f("fk_legislator_score_fake_legislator_id_legislator"),
ondelete="CASCADE",
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_legislator_score_fake")),
sa.UniqueConstraint(
"legislator_id",
"year",
"topic",
name="uq_legislator_score_fake_legislator_id_year_topic",
),
schema=schema,
)
op.create_index(
op.f("ix_legislator_score_fake_legislator_id"),
"legislator_score_fake",
["legislator_id"],
unique=False,
schema=schema,
)
op.create_index(
"ix_legislator_score_fake_year_topic",
"legislator_score_fake",
["year", "topic"],
unique=False,
schema=schema,
)
# ### end Alembic commands ###
def downgrade() -> None:
"""Downgrade."""
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index(
"ix_legislator_score_fake_year_topic",
table_name="legislator_score_fake",
schema=schema,
)
op.drop_index(
op.f("ix_legislator_score_fake_legislator_id"),
table_name="legislator_score_fake",
schema=schema,
)
op.drop_table("legislator_score_fake", schema=schema)
# ### end Alembic commands ###
@@ -1,64 +0,0 @@
"""add vote.bill_text_id linkage.
Revision ID: 9c7d4a2e1b10
Revises: 06f833813bd7
Create Date: 2026-04-23 00:00:00.000000
"""
from __future__ import annotations
from typing import TYPE_CHECKING
import sqlalchemy as sa
from alembic import op
from pipelines.orm import DataScienceDevBase
if TYPE_CHECKING:
from collections.abc import Sequence
# revision identifiers, used by Alembic.
revision: str = "9c7d4a2e1b10"
down_revision: str | None = "06f833813bd7"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
schema = DataScienceDevBase.schema_name
def upgrade() -> None:
"""Upgrade."""
op.add_column(
"vote",
sa.Column("bill_text_id", sa.Integer(), nullable=True),
schema=schema,
)
op.create_index(
"ix_vote_bill_text_id",
"vote",
["bill_text_id"],
unique=False,
schema=schema,
)
op.create_foreign_key(
"fk_vote_bill_text_id_bill_text",
"vote",
"bill_text",
["bill_text_id"],
["id"],
source_schema=schema,
referent_schema=schema,
)
def downgrade() -> None:
"""Downgrade."""
op.drop_constraint(
"fk_vote_bill_text_id_bill_text",
"vote",
schema=schema,
type_="foreignkey",
)
op.drop_index("ix_vote_bill_text_id", table_name="vote", schema=schema)
op.drop_column("vote", "bill_text_id", schema=schema)
@@ -1,844 +0,0 @@
"""canonical vote context v3.
Revision ID: 1f8c0e7a9d21
Revises: 9c7d4a2e1b10
Create Date: 2026-04-25 00:00:00.000000
"""
from __future__ import annotations
from typing import TYPE_CHECKING
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql
from alembic import op
from pipelines.orm import DataScienceDevBase
if TYPE_CHECKING:
from collections.abc import Sequence
revision: str = "1f8c0e7a9d21"
down_revision: str | None = "9c7d4a2e1b10"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
schema = DataScienceDevBase.schema_name
def upgrade() -> None:
"""Upgrade."""
op.create_table(
"ingest_run",
sa.Column("started_at", sa.DateTime(timezone=True), nullable=True),
sa.Column("completed_at", sa.DateTime(timezone=True), nullable=True),
sa.Column("git_sha", sa.String(), nullable=True),
sa.Column("classifier_version", sa.String(), nullable=True),
sa.Column("source_snapshot_label", sa.String(), nullable=True),
sa.Column("status", sa.String(), nullable=False),
sa.Column("id", sa.Integer(), nullable=False),
sa.Column(
"created",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.Column(
"updated",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_ingest_run")),
schema=schema,
)
op.create_table(
"source_artifact",
sa.Column("source_kind", sa.String(), nullable=False),
sa.Column("congress", sa.Integer(), nullable=False),
sa.Column("chamber", sa.String(), nullable=True),
sa.Column("local_path", sa.String(), nullable=False),
sa.Column("source_url", sa.String(), nullable=True),
sa.Column("sha256", sa.String(), nullable=False),
sa.Column("byte_size", sa.Integer(), nullable=False),
sa.Column("modified_at", sa.DateTime(timezone=True), nullable=True),
sa.Column("ingested_at", sa.DateTime(timezone=True), nullable=True),
sa.Column("ingest_run_id", sa.Integer(), nullable=True),
sa.Column("id", sa.Integer(), nullable=False),
sa.Column(
"created",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.Column(
"updated",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.ForeignKeyConstraint(
["ingest_run_id"],
[f"{schema}.ingest_run.id"],
name=op.f("fk_source_artifact_ingest_run_id_ingest_run"),
ondelete="SET NULL",
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_source_artifact")),
schema=schema,
)
op.create_index(
"ix_source_artifact_source_kind",
"source_artifact",
["source_kind"],
unique=False,
schema=schema,
)
op.create_index(
"ix_source_artifact_congress",
"source_artifact",
["congress"],
unique=False,
schema=schema,
)
op.create_table(
"score_run",
sa.Column("ingest_run_id", sa.Integer(), nullable=True),
sa.Column("classifier_version", sa.String(), nullable=True),
sa.Column("scoring_version", sa.String(), nullable=True),
sa.Column("included_vote_count", sa.Integer(), nullable=False),
sa.Column("excluded_vote_count", sa.Integer(), nullable=False),
sa.Column("started_at", sa.DateTime(timezone=True), nullable=True),
sa.Column("completed_at", sa.DateTime(timezone=True), nullable=True),
sa.Column("id", sa.Integer(), nullable=False),
sa.Column(
"created",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.Column(
"updated",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.ForeignKeyConstraint(
["ingest_run_id"],
[f"{schema}.ingest_run.id"],
name=op.f("fk_score_run_ingest_run_id_ingest_run"),
ondelete="SET NULL",
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_score_run")),
schema=schema,
)
op.add_column(
"legislator_score",
sa.Column("score_run_id", sa.Integer(), nullable=True),
schema=schema,
)
op.create_index(
op.f("ix_legislator_score_score_run_id"),
"legislator_score",
["score_run_id"],
unique=False,
schema=schema,
)
op.create_foreign_key(
op.f("fk_legislator_score_score_run_id_score_run"),
"legislator_score",
"score_run",
["score_run_id"],
["id"],
source_schema=schema,
referent_schema=schema,
ondelete="CASCADE",
)
op.add_column(
"bill_text",
sa.Column("source_datetime_raw", sa.String(), nullable=True),
schema=schema,
)
op.add_column(
"bill_text", sa.Column("text_url_xml", sa.String(), nullable=True), schema=schema
)
op.add_column(
"bill_text", sa.Column("text_url_pdf", sa.String(), nullable=True), schema=schema
)
op.add_column(
"bill_text",
sa.Column("text_url_html", sa.String(), nullable=True),
schema=schema,
)
op.add_column(
"bill_text",
sa.Column("source_artifact_id", sa.Integer(), nullable=True),
schema=schema,
)
op.create_foreign_key(
op.f("fk_bill_text_source_artifact_id_source_artifact"),
"bill_text",
"source_artifact",
["source_artifact_id"],
["id"],
source_schema=schema,
referent_schema=schema,
ondelete="SET NULL",
)
op.create_table(
"bill_action",
sa.Column("bill_id", sa.Integer(), nullable=False),
sa.Column("sequence", sa.Integer(), nullable=False),
sa.Column("action_date", sa.Date(), nullable=False),
sa.Column("action_time", sa.String(), nullable=True),
sa.Column("action_text", sa.String(), nullable=False),
sa.Column("action_type", sa.String(), nullable=True),
sa.Column("action_code", sa.String(), nullable=True),
sa.Column("source_system_code", sa.String(), nullable=True),
sa.Column("source_system_name", sa.String(), nullable=True),
sa.Column("source_artifact_id", sa.Integer(), nullable=True),
sa.Column("id", sa.Integer(), nullable=False),
sa.Column(
"created",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.Column(
"updated",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.ForeignKeyConstraint(
["bill_id"],
[f"{schema}.bill.id"],
name=op.f("fk_bill_action_bill_id_bill"),
ondelete="CASCADE",
),
sa.ForeignKeyConstraint(
["source_artifact_id"],
[f"{schema}.source_artifact.id"],
name=op.f("fk_bill_action_source_artifact_id_source_artifact"),
ondelete="SET NULL",
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_bill_action")),
sa.UniqueConstraint("bill_id", "sequence", name="uq_bill_action_bill_id_sequence"),
schema=schema,
)
op.create_table(
"bill_action_recorded_vote",
sa.Column("bill_action_id", sa.Integer(), nullable=False),
sa.Column("congress", sa.Integer(), nullable=False),
sa.Column("chamber", sa.String(), nullable=False),
sa.Column("session_number", sa.Integer(), nullable=False),
sa.Column("roll_number", sa.Integer(), nullable=False),
sa.Column("vote_datetime", sa.DateTime(timezone=True), nullable=True),
sa.Column("vote_url", sa.String(), nullable=True),
sa.Column("id", sa.Integer(), nullable=False),
sa.Column(
"created",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.Column(
"updated",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.ForeignKeyConstraint(
["bill_action_id"],
[f"{schema}.bill_action.id"],
name=op.f("fk_bill_action_recorded_vote_bill_action_id_bill_action"),
ondelete="CASCADE",
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_bill_action_recorded_vote")),
sa.UniqueConstraint(
"bill_action_id",
"congress",
"chamber",
"session_number",
"roll_number",
name="uq_bill_action_recorded_vote_match_key",
),
schema=schema,
)
op.create_table(
"bill_relation",
sa.Column("bill_id", sa.Integer(), nullable=False),
sa.Column("related_bill_id", sa.Integer(), nullable=False),
sa.Column("relationship_type", sa.String(), nullable=False),
sa.Column("identified_by", sa.String(), nullable=True),
sa.Column("latest_action_date", sa.Date(), nullable=True),
sa.Column("latest_action_text", sa.String(), nullable=True),
sa.Column("id", sa.Integer(), nullable=False),
sa.Column(
"created",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.Column(
"updated",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.ForeignKeyConstraint(
["bill_id"],
[f"{schema}.bill.id"],
name=op.f("fk_bill_relation_bill_id_bill"),
ondelete="CASCADE",
),
sa.ForeignKeyConstraint(
["related_bill_id"],
[f"{schema}.bill.id"],
name=op.f("fk_bill_relation_related_bill_id_bill"),
ondelete="CASCADE",
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_bill_relation")),
schema=schema,
)
op.create_index(
"ix_bill_relation_bill_id",
"bill_relation",
["bill_id"],
unique=False,
schema=schema,
)
op.create_index(
"ix_bill_relation_related_bill_id",
"bill_relation",
["related_bill_id"],
unique=False,
schema=schema,
)
op.create_table(
"amendment",
sa.Column("congress", sa.Integer(), nullable=False),
sa.Column("amendment_type", sa.String(), nullable=False),
sa.Column("number", sa.Integer(), nullable=False),
sa.Column("chamber", sa.String(), nullable=False),
sa.Column("description", sa.String(), nullable=True),
sa.Column("purpose", sa.String(), nullable=True),
sa.Column("amended_bill_id", sa.Integer(), nullable=True),
sa.Column("amended_amendment_id", sa.Integer(), nullable=True),
sa.Column("source_path", sa.String(), nullable=True),
sa.Column("source_artifact_id", sa.Integer(), nullable=True),
sa.Column("id", sa.Integer(), nullable=False),
sa.Column(
"created",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.Column(
"updated",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.ForeignKeyConstraint(
["amended_amendment_id"],
[f"{schema}.amendment.id"],
name=op.f("fk_amendment_amended_amendment_id_amendment"),
ondelete="SET NULL",
),
sa.ForeignKeyConstraint(
["amended_bill_id"],
[f"{schema}.bill.id"],
name=op.f("fk_amendment_amended_bill_id_bill"),
ondelete="SET NULL",
),
sa.ForeignKeyConstraint(
["source_artifact_id"],
[f"{schema}.source_artifact.id"],
name=op.f("fk_amendment_source_artifact_id_source_artifact"),
ondelete="SET NULL",
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_amendment")),
sa.UniqueConstraint(
"congress",
"amendment_type",
"number",
name="uq_amendment_congress_type_number",
),
schema=schema,
)
op.create_table(
"amendment_action",
sa.Column("amendment_id", sa.Integer(), nullable=False),
sa.Column("sequence", sa.Integer(), nullable=False),
sa.Column("action_date", sa.Date(), nullable=False),
sa.Column("action_time", sa.String(), nullable=True),
sa.Column("action_text", sa.String(), nullable=False),
sa.Column("action_type", sa.String(), nullable=True),
sa.Column("action_code", sa.String(), nullable=True),
sa.Column("source_system_code", sa.String(), nullable=True),
sa.Column("source_system_name", sa.String(), nullable=True),
sa.Column("source_artifact_id", sa.Integer(), nullable=True),
sa.Column("id", sa.Integer(), nullable=False),
sa.Column(
"created",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.Column(
"updated",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.ForeignKeyConstraint(
["amendment_id"],
[f"{schema}.amendment.id"],
name=op.f("fk_amendment_action_amendment_id_amendment"),
ondelete="CASCADE",
),
sa.ForeignKeyConstraint(
["source_artifact_id"],
[f"{schema}.source_artifact.id"],
name=op.f("fk_amendment_action_source_artifact_id_source_artifact"),
ondelete="SET NULL",
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_amendment_action")),
sa.UniqueConstraint(
"amendment_id",
"sequence",
name="uq_amendment_action_amendment_id_sequence",
),
schema=schema,
)
op.create_table(
"amendment_action_recorded_vote",
sa.Column("amendment_action_id", sa.Integer(), nullable=False),
sa.Column("congress", sa.Integer(), nullable=False),
sa.Column("chamber", sa.String(), nullable=False),
sa.Column("session_number", sa.Integer(), nullable=False),
sa.Column("roll_number", sa.Integer(), nullable=False),
sa.Column("vote_datetime", sa.DateTime(timezone=True), nullable=True),
sa.Column("vote_url", sa.String(), nullable=True),
sa.Column("id", sa.Integer(), nullable=False),
sa.Column(
"created",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.Column(
"updated",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.ForeignKeyConstraint(
["amendment_action_id"],
[f"{schema}.amendment_action.id"],
name=op.f(
"fk_amendment_action_recorded_vote_amendment_action_id_amendment_action"
),
ondelete="CASCADE",
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_amendment_action_recorded_vote")),
sa.UniqueConstraint(
"amendment_action_id",
"congress",
"chamber",
"session_number",
"roll_number",
name="uq_amendment_action_recorded_vote_match_key",
),
schema=schema,
)
op.drop_constraint(
"uq_vote_congress_chamber_session_number",
"vote",
schema=schema,
type_="unique",
)
op.alter_column("vote", "session", new_column_name="session_year", schema=schema)
op.alter_column("vote", "number", new_column_name="roll_number", schema=schema)
op.add_column("vote", sa.Column("session_number", sa.Integer(), nullable=True), schema=schema)
op.add_column(
"vote",
sa.Column("vote_datetime", sa.DateTime(timezone=True), nullable=True),
schema=schema,
)
op.add_column(
"vote", sa.Column("raw_vote_source_url", sa.String(), nullable=True), schema=schema
)
op.add_column(
"vote",
sa.Column("raw_bill_ref", postgresql.JSONB(astext_type=sa.Text()), nullable=True),
schema=schema,
)
op.add_column(
"vote",
sa.Column(
"raw_amendment_ref",
postgresql.JSONB(astext_type=sa.Text()),
nullable=True,
),
schema=schema,
)
op.add_column(
"vote",
sa.Column(
"raw_nomination_ref",
postgresql.JSONB(astext_type=sa.Text()),
nullable=True,
),
schema=schema,
)
op.add_column(
"vote",
sa.Column(
"raw_treaty_ref",
postgresql.JSONB(astext_type=sa.Text()),
nullable=True,
),
schema=schema,
)
op.add_column(
"vote",
sa.Column("raw_vote_source_artifact_id", sa.Integer(), nullable=True),
schema=schema,
)
op.create_foreign_key(
op.f("fk_vote_raw_vote_source_artifact_id_source_artifact"),
"vote",
"source_artifact",
["raw_vote_source_artifact_id"],
["id"],
source_schema=schema,
referent_schema=schema,
ondelete="SET NULL",
)
op.execute(
sa.text(
f"""
UPDATE {schema}.vote
SET session_number = session_year - (((congress - 1) * 2) + 1789) + 1
"""
)
)
op.alter_column("vote", "session_number", nullable=False, schema=schema)
op.create_unique_constraint(
"uq_vote_congress_chamber_session_number_roll_number",
"vote",
["congress", "chamber", "session_number", "roll_number"],
schema=schema,
)
op.drop_constraint(
op.f("fk_vote_bill_id_bill"),
"vote",
schema=schema,
type_="foreignkey",
)
op.drop_constraint(
"fk_vote_bill_text_id_bill_text",
"vote",
schema=schema,
type_="foreignkey",
)
op.drop_index("ix_vote_bill_text_id", table_name="vote", schema=schema)
op.drop_column("vote", "bill_id", schema=schema)
op.drop_column("vote", "bill_text_id", schema=schema)
op.create_table(
"vote_action_match",
sa.Column("vote_id", sa.Integer(), nullable=False),
sa.Column("action_scope", sa.String(), nullable=False),
sa.Column("bill_action_id", sa.Integer(), nullable=True),
sa.Column("amendment_action_id", sa.Integer(), nullable=True),
sa.Column("is_selected", sa.Boolean(), nullable=False),
sa.Column("match_method", sa.String(), nullable=False),
sa.Column("match_reason", sa.String(), nullable=True),
sa.Column("match_confidence", sa.String(), nullable=False),
sa.Column(
"created_at",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.Column("id", sa.Integer(), nullable=False),
sa.Column(
"created",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.Column(
"updated",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.ForeignKeyConstraint(
["amendment_action_id"],
[f"{schema}.amendment_action.id"],
name=op.f("fk_vote_action_match_amendment_action_id_amendment_action"),
ondelete="CASCADE",
),
sa.ForeignKeyConstraint(
["bill_action_id"],
[f"{schema}.bill_action.id"],
name=op.f("fk_vote_action_match_bill_action_id_bill_action"),
ondelete="CASCADE",
),
sa.ForeignKeyConstraint(
["vote_id"],
[f"{schema}.vote.id"],
name=op.f("fk_vote_action_match_vote_id_vote"),
ondelete="CASCADE",
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_vote_action_match")),
schema=schema,
)
op.create_index(
"ix_vote_action_match_vote_id",
"vote_action_match",
["vote_id"],
unique=False,
schema=schema,
)
op.create_index(
"uq_vote_action_match_selected_vote_id",
"vote_action_match",
["vote_id"],
unique=True,
schema=schema,
postgresql_where=sa.text("is_selected"),
)
op.create_table(
"vote_classification",
sa.Column("vote_id", sa.Integer(), nullable=False),
sa.Column("subject_type", sa.String(), nullable=False),
sa.Column("measure_type", sa.String(), nullable=True),
sa.Column("measure_subtype", sa.String(), nullable=True),
sa.Column("measure_function", sa.String(), nullable=True),
sa.Column("vote_relationship", sa.String(), nullable=False),
sa.Column("is_legislation_related", sa.Boolean(), nullable=False),
sa.Column("is_direct_vote_on_legislative_text", sa.Boolean(), nullable=False),
sa.Column("is_substantive_policy_vote", sa.Boolean(), nullable=False),
sa.Column("is_lawmaking_vehicle", sa.Boolean(), nullable=False),
sa.Column("is_special_rule", sa.Boolean(), nullable=False),
sa.Column("classification_method", sa.String(), nullable=False),
sa.Column("classification_confidence_reason", sa.String(), nullable=True),
sa.Column("confidence", sa.String(), nullable=False),
sa.Column("classified_at", sa.DateTime(timezone=True), nullable=True),
sa.Column("classification_version", sa.String(), nullable=False),
sa.Column("id", sa.Integer(), nullable=False),
sa.Column(
"created",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.Column(
"updated",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.ForeignKeyConstraint(
["vote_id"],
[f"{schema}.vote.id"],
name=op.f("fk_vote_classification_vote_id_vote"),
ondelete="CASCADE",
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_vote_classification")),
sa.UniqueConstraint("vote_id", name=op.f("uq_vote_classification_vote_id")),
schema=schema,
)
op.create_index(
"ix_vote_classification_subject_type",
"vote_classification",
["subject_type"],
unique=False,
schema=schema,
)
op.create_table(
"vote_measure_link",
sa.Column("vote_id", sa.Integer(), nullable=False),
sa.Column("measure_id", sa.Integer(), nullable=False),
sa.Column("role", sa.String(), nullable=False),
sa.Column("source", sa.String(), nullable=False),
sa.Column("confidence", sa.String(), nullable=False),
sa.Column("notes", sa.String(), nullable=True),
sa.Column("id", sa.Integer(), nullable=False),
sa.Column(
"created",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.Column(
"updated",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.ForeignKeyConstraint(
["measure_id"],
[f"{schema}.bill.id"],
name=op.f("fk_vote_measure_link_measure_id_bill"),
ondelete="CASCADE",
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_vote_measure_link")),
schema=schema,
)
op.create_index(
"ix_vote_measure_link_vote_id",
"vote_measure_link",
["vote_id"],
unique=False,
schema=schema,
)
op.create_foreign_key(
op.f("fk_vote_measure_link_vote_id_vote"),
"vote_measure_link",
"vote",
["vote_id"],
["id"],
source_schema=schema,
referent_schema=schema,
ondelete="CASCADE",
)
op.create_table(
"vote_text_target",
sa.Column("vote_id", sa.Integer(), nullable=False),
sa.Column("text_target_type", sa.String(), nullable=False),
sa.Column("voted_text_version_id", sa.Integer(), nullable=True),
sa.Column("resulting_text_version_id", sa.Integer(), nullable=True),
sa.Column("related_amendment_id", sa.Integer(), nullable=True),
sa.Column("text_target_basis", sa.String(), nullable=False),
sa.Column("text_resolution_method", sa.String(), nullable=False),
sa.Column("text_resolution_confidence_reason", sa.String(), nullable=True),
sa.Column("confidence", sa.String(), nullable=False),
sa.Column("notes", sa.String(), nullable=True),
sa.Column("id", sa.Integer(), nullable=False),
sa.Column(
"created",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.Column(
"updated",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.ForeignKeyConstraint(
["related_amendment_id"],
[f"{schema}.amendment.id"],
name=op.f("fk_vote_text_target_related_amendment_id_amendment"),
ondelete="SET NULL",
),
sa.ForeignKeyConstraint(
["resulting_text_version_id"],
[f"{schema}.bill_text.id"],
name=op.f("fk_vote_text_target_resulting_text_version_id_bill_text"),
ondelete="SET NULL",
),
sa.ForeignKeyConstraint(
["vote_id"],
[f"{schema}.vote.id"],
name=op.f("fk_vote_text_target_vote_id_vote"),
ondelete="CASCADE",
),
sa.ForeignKeyConstraint(
["voted_text_version_id"],
[f"{schema}.bill_text.id"],
name=op.f("fk_vote_text_target_voted_text_version_id_bill_text"),
ondelete="SET NULL",
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_vote_text_target")),
sa.UniqueConstraint("vote_id", name=op.f("uq_vote_text_target_vote_id")),
schema=schema,
)
op.create_table(
"vote_position_meaning",
sa.Column("vote_id", sa.Integer(), nullable=False),
sa.Column("yea_effect", sa.String(), nullable=False),
sa.Column("nay_effect", sa.String(), nullable=False),
sa.Column("present_effect", sa.String(), nullable=False),
sa.Column("polarity_confidence", sa.String(), nullable=False),
sa.Column("polarity_method", sa.String(), nullable=False),
sa.Column("notes", sa.String(), nullable=True),
sa.Column("id", sa.Integer(), nullable=False),
sa.Column(
"created",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.Column(
"updated",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.ForeignKeyConstraint(
["vote_id"],
[f"{schema}.vote.id"],
name=op.f("fk_vote_position_meaning_vote_id_vote"),
ondelete="CASCADE",
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_vote_position_meaning")),
sa.UniqueConstraint("vote_id", name=op.f("uq_vote_position_meaning_vote_id")),
schema=schema,
)
op.create_table(
"vote_context_audit",
sa.Column("vote_id", sa.Integer(), nullable=False),
sa.Column("step", sa.String(), nullable=False),
sa.Column("message", sa.String(), nullable=False),
sa.Column("severity", sa.String(), nullable=False),
sa.Column("source_path", sa.String(), nullable=True),
sa.Column(
"created_at",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.Column("id", sa.Integer(), nullable=False),
sa.Column(
"created",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.Column(
"updated",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.ForeignKeyConstraint(
["vote_id"],
[f"{schema}.vote.id"],
name=op.f("fk_vote_context_audit_vote_id_vote"),
ondelete="CASCADE",
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_vote_context_audit")),
schema=schema,
)
op.create_index(
"ix_vote_context_audit_vote_id",
"vote_context_audit",
["vote_id"],
unique=False,
schema=schema,
)
def downgrade() -> None:
"""Downgrade."""
raise NotImplementedError("Downgrade is not supported for canonical vote context v3.")
@@ -1,203 +0,0 @@
"""add supporting indexes for congress vote context and scoring.
Revision ID: a7b91c4e2d30
Revises: 1f8c0e7a9d21
Create Date: 2026-04-26 00:00:00.000000
"""
from __future__ import annotations
from typing import TYPE_CHECKING
import sqlalchemy as sa
from alembic import op
from pipelines.orm import DataScienceDevBase
if TYPE_CHECKING:
from collections.abc import Sequence
revision: str = "a7b91c4e2d30"
down_revision: str | None = "1f8c0e7a9d21"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
schema = DataScienceDevBase.schema_name
def _dedupe_source_artifacts() -> None:
op.execute(
sa.text(
f"""
CREATE TEMP TABLE tmp_source_artifact_dups AS
WITH ranked AS (
SELECT
id,
first_value(id) OVER (
PARTITION BY ingest_run_id, local_path, sha256
ORDER BY id
) AS keep_id,
row_number() OVER (
PARTITION BY ingest_run_id, local_path, sha256
ORDER BY id
) AS rn
FROM {schema}.source_artifact
WHERE ingest_run_id IS NOT NULL
)
SELECT id, keep_id
FROM ranked
WHERE rn > 1
"""
)
)
for table_name, column_name in (
("bill_text", "source_artifact_id"),
("bill_action", "source_artifact_id"),
("amendment", "source_artifact_id"),
("amendment_action", "source_artifact_id"),
("vote", "raw_vote_source_artifact_id"),
):
op.execute(
sa.text(
f"""
UPDATE {schema}.{table_name} AS target
SET {column_name} = d.keep_id
FROM tmp_source_artifact_dups AS d
WHERE target.{column_name} = d.id
"""
)
)
op.execute(
sa.text(
f"""
DELETE FROM {schema}.source_artifact AS source_artifact
USING tmp_source_artifact_dups AS d
WHERE source_artifact.id = d.id
"""
)
)
op.execute(sa.text("DROP TABLE tmp_source_artifact_dups"))
def upgrade() -> None:
"""Upgrade."""
_dedupe_source_artifacts()
op.create_index(
"uq_source_artifact_ingest_identity",
"source_artifact",
["ingest_run_id", "local_path", "sha256"],
unique=True,
schema=schema,
)
op.create_index(
"ix_bill_action_recorded_vote_match_tuple",
"bill_action_recorded_vote",
["congress", "chamber", "session_number", "roll_number"],
unique=False,
schema=schema,
)
op.create_index(
"ix_amendment_action_recorded_vote_match_tuple",
"amendment_action_recorded_vote",
["congress", "chamber", "session_number", "roll_number"],
unique=False,
schema=schema,
)
op.create_index(
"ix_vote_classification_eligible_vote_id",
"vote_classification",
["vote_id"],
unique=False,
schema=schema,
postgresql_where=sa.text(
"subject_type = 'measure' "
"AND vote_relationship = 'direct_text_vote' "
"AND is_direct_vote_on_legislative_text "
"AND is_substantive_policy_vote "
"AND NOT is_special_rule"
),
)
op.create_index(
"ix_vote_measure_link_vote_id_role",
"vote_measure_link",
["vote_id", "role"],
unique=False,
schema=schema,
)
op.create_index(
"ix_vote_measure_link_measure_id_role",
"vote_measure_link",
["measure_id", "role"],
unique=False,
schema=schema,
)
op.create_index(
"ix_vote_text_target_voted_text_version_id",
"vote_text_target",
["voted_text_version_id"],
unique=False,
schema=schema,
postgresql_where=sa.text("voted_text_version_id IS NOT NULL"),
)
op.create_index(
"ix_vote_context_audit_severity_vote_id",
"vote_context_audit",
["severity", "vote_id"],
unique=False,
schema=schema,
)
op.create_index(
"ix_legislator_current_chamber",
"legislator",
["current_chamber"],
unique=False,
schema=schema,
)
def downgrade() -> None:
"""Downgrade."""
op.drop_index("ix_legislator_current_chamber", table_name="legislator", schema=schema)
op.drop_index(
"ix_vote_context_audit_severity_vote_id",
table_name="vote_context_audit",
schema=schema,
)
op.drop_index(
"ix_vote_text_target_voted_text_version_id",
table_name="vote_text_target",
schema=schema,
)
op.drop_index(
"ix_vote_measure_link_measure_id_role",
table_name="vote_measure_link",
schema=schema,
)
op.drop_index(
"ix_vote_measure_link_vote_id_role",
table_name="vote_measure_link",
schema=schema,
)
op.drop_index(
"ix_vote_classification_eligible_vote_id",
table_name="vote_classification",
schema=schema,
)
op.drop_index(
"ix_amendment_action_recorded_vote_match_tuple",
table_name="amendment_action_recorded_vote",
schema=schema,
)
op.drop_index(
"ix_bill_action_recorded_vote_match_tuple",
table_name="bill_action_recorded_vote",
schema=schema,
)
op.drop_index(
"uq_source_artifact_ingest_identity",
table_name="source_artifact",
schema=schema,
)
@@ -1,66 +0,0 @@
"""adding PostTopic.
Revision ID: 032e26bbfcb5
Revises: a7b91c4e2d30
Create Date: 2026-04-26 14:34:35.688341
"""
from __future__ import annotations
from typing import TYPE_CHECKING
import sqlalchemy as sa
from alembic import op
from pipelines.orm import DataScienceDevBase
if TYPE_CHECKING:
from collections.abc import Sequence
# revision identifiers, used by Alembic.
revision: str = "032e26bbfcb5"
down_revision: str | None = "a7b91c4e2d30"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
schema = DataScienceDevBase.schema_name
def upgrade() -> None:
"""Upgrade."""
# ### commands auto generated by Alembic - please adjust! ###
op.create_table(
"post_topic",
sa.Column("post_id", sa.BigInteger(), nullable=False),
sa.Column("topic_id", sa.SmallInteger(), nullable=False),
sa.Column("topic_label", sa.String(), nullable=True),
sa.Column("model_version", sa.String(), nullable=True),
sa.Column("id", sa.Integer(), nullable=False),
sa.Column(
"created",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.Column(
"updated",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_post_topic")),
schema=schema,
)
op.create_index(
"ix_post_topic_post_id", "post_topic", ["post_id"], unique=False, schema=schema
)
# ### end Alembic commands ###
def downgrade() -> None:
"""Downgrade."""
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index("ix_post_topic_post_id", table_name="post_topic", schema=schema)
op.drop_table("post_topic", schema=schema)
# ### end Alembic commands ###
@@ -1,35 +0,0 @@
"""adding PG Vector.
Revision ID: b9360b0b0c22
Revises: 032e26bbfcb5
Create Date: 2026-04-26 14:35:08.770128
"""
from __future__ import annotations
from typing import TYPE_CHECKING
from alembic import op
from pipelines.orm import DataScienceDevBase
if TYPE_CHECKING:
from collections.abc import Sequence
# revision identifiers, used by Alembic.
revision: str = "b9360b0b0c22"
down_revision: str | None = "032e26bbfcb5"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
schema = DataScienceDevBase.schema_name
def upgrade() -> None:
"""Upgrade."""
op.execute("CREATE EXTENSION IF NOT EXISTS vector")
def downgrade() -> None:
"""Downgrade."""
op.execute("DROP EXTENSION IF EXISTS vector")
+1 -1
View File
@@ -1 +1 @@
"""Prompt benchmarking system for evaluating LLMs via vLLM.""" """Init."""
-39
View File
@@ -1,7 +1,6 @@
from __future__ import annotations from __future__ import annotations
from dataclasses import dataclass from dataclasses import dataclass
from os import getenv
from pathlib import Path from pathlib import Path
import tomllib import tomllib
@@ -69,38 +68,6 @@ class BenchmarkConfig:
return cls(**raw) return cls(**raw)
@dataclass
class OpenAIConfig:
"""OpenAI API configuration."""
api_key: str
openai_project_id: str
openai_chat_completions_url: str
model: str
timeout_seconds: int
@classmethod
def from_toml(cls, config_path: Path) -> OpenAIConfig:
"""Load OpenAI config from a TOML file."""
raw = tomllib.loads(config_path.read_text()).get("openai", {})
api_key = getenv("CLOSEDAI_TOKEN")
if not api_key:
message = "CLOSEDAI_TOKEN is required"
raise KeyError(message)
return cls(
api_key=api_key,
openai_project_id=raw.get(
"openai_project_id", "proj_fQBPEXFgnS87Fk6wZwploFwE"
),
openai_chat_completions_url=raw.get(
"openai_chat_completions_url",
"https://api.openai.com/v1/chat/completions",
),
model=raw.get("model", "gpt-5.4-mini"),
timeout_seconds=raw.get("timeout_seconds", 60),
)
def get_config_dir() -> Path: def get_config_dir() -> Path:
"""Get the path to the config directory.""" """Get the path to the config directory."""
return Path(__file__).resolve().parents[2] / "config" return Path(__file__).resolve().parents[2] / "config"
@@ -111,12 +78,6 @@ def default_config_path() -> Path:
return get_config_dir() / "config.toml" return get_config_dir() / "config.toml"
def get_openai_config(config_path: Path | None = None) -> OpenAIConfig:
if config_path is None:
config_path = default_config_path()
return OpenAIConfig.from_toml(config_path)
def get_finetune_config(config_path: Path | None = None) -> FinetuneConfig: def get_finetune_config(config_path: Path | None = None) -> FinetuneConfig:
if config_path is None: if config_path is None:
config_path = default_config_path() config_path = default_config_path()
-235
View File
@@ -1,235 +0,0 @@
"""Docker container lifecycle management for BERTopic jobs on Jeeves."""
from __future__ import annotations
import logging
import os
import subprocess
from pathlib import Path
from typing import Annotated, Literal
import typer
logger = logging.getLogger(__name__)
JOBMode = Literal["train", "infer"]
IMAGE_NAME = "bert-topic:latest"
REPO_DIR = Path(__file__).resolve().parents[3]
DEFAULT_CACHE_ROOT = Path("/zfs/storage/main/ds_thing/models/bert_topic")
DEFAULT_POSTGRES_SOCKET_DIR = Path("/run/postgresql")
DB_ENV_VARS = (
"DATA_SCIENCE_DEV_DB",
"DATA_SCIENCE_DEV_HOST",
"DATA_SCIENCE_DEV_PORT",
"DATA_SCIENCE_DEV_USER",
"DATA_SCIENCE_DEV_PASSWORD",
)
app = typer.Typer(help="BERTopic container management.")
def _container_name(mode: JOBMode) -> str:
"""Return the Docker container name for the selected BERTopic job."""
return f"bert-topic-{mode}"
def _module_name(mode: JOBMode) -> str:
"""Return the Python module to run inside the container."""
return f"pipelines.bert_topic.{mode}"
def _env_args(*, use_postgres_socket: bool) -> list[str]:
"""Pass through database environment variables from the host shell."""
required = [
"DATA_SCIENCE_DEV_DB",
"DATA_SCIENCE_DEV_PORT",
"DATA_SCIENCE_DEV_USER",
]
if not use_postgres_socket:
required.append("DATA_SCIENCE_DEV_HOST")
missing = [name for name in required if not os.getenv(name)]
if missing:
message = "Missing required database environment variables: " + ", ".join(
missing
)
raise RuntimeError(message)
args: list[str] = []
if use_postgres_socket:
args.extend(["-e", f"DATA_SCIENCE_DEV_HOST={DEFAULT_POSTGRES_SOCKET_DIR}"])
for name in DB_ENV_VARS:
if use_postgres_socket and name == "DATA_SCIENCE_DEV_HOST":
continue
if os.getenv(name):
args.extend(["-e", name])
return args
def build_image() -> None:
"""Build the BERTopic Docker image."""
dockerfile = REPO_DIR / "pipelines/containers/docker_files/Dockerfile.bert_topic"
logger.info("Building BERTopic image: %s", IMAGE_NAME)
result = subprocess.run(
[
"docker",
"build",
"--network",
"host",
"-f",
str(dockerfile),
"-t",
IMAGE_NAME,
str(REPO_DIR),
],
capture_output=True,
text=True,
check=False,
)
if result.returncode != 0:
message = (
"Failed to build BERTopic image. "
f"docker build stderr:\n{result.stderr.strip()}"
)
raise RuntimeError(message)
logger.info("Image built: %s", IMAGE_NAME)
def stop_job(*, mode: JOBMode) -> None:
"""Stop and remove the BERTopic container for the selected mode."""
container_name = _container_name(mode)
logger.info("Stopping BERTopic container: %s", container_name)
subprocess.run(["docker", "stop", container_name], capture_output=True, check=False)
subprocess.run(
["docker", "rm", "-f", container_name], capture_output=True, check=False
)
def start_job(
*,
mode: JOBMode,
cache_root: Path = DEFAULT_CACHE_ROOT,
postgres_socket_dir: Path = DEFAULT_POSTGRES_SOCKET_DIR,
detach: bool = False,
) -> None:
"""Run BERTopic training or inference in Docker on Jeeves."""
cache_root = cache_root.resolve()
cache_root.mkdir(parents=True, exist_ok=True)
postgres_socket_dir = postgres_socket_dir.resolve()
stop_job(mode=mode)
use_postgres_socket = postgres_socket_dir.exists()
command = [
"docker",
"run",
"--name",
_container_name(mode),
"--ipc=host",
"-v",
f"{cache_root}:/cache",
*_env_args(use_postgres_socket=use_postgres_socket),
IMAGE_NAME,
_module_name(mode),
]
if use_postgres_socket:
command[7:7] = ["-v", f"{postgres_socket_dir}:{DEFAULT_POSTGRES_SOCKET_DIR}"]
if detach:
command.insert(2, "-d")
logger.info("Starting BERTopic %s container", mode)
logger.info(" Cache root: %s", cache_root)
if use_postgres_socket:
logger.info(" Postgres socket: %s", postgres_socket_dir)
result = subprocess.run(command, text=True, capture_output=detach, check=False)
if result.returncode != 0:
detail = (
result.stderr.strip() if result.stderr else f"exit code {result.returncode}"
)
raise RuntimeError(f"BERTopic container failed to start: {detail}")
if detach:
logger.info("Container started: %s", result.stdout.strip()[:12])
else:
logger.info("BERTopic %s run complete", mode)
def logs_job(*, mode: JOBMode) -> str | None:
"""Return recent logs from the BERTopic container, or None if absent."""
result = subprocess.run(
["docker", "logs", "--tail", "100", _container_name(mode)],
capture_output=True,
text=True,
check=False,
)
if result.returncode != 0:
return None
return result.stdout + result.stderr
@app.command()
def build(
log_level: Annotated[str, typer.Option(help="Log level")] = "INFO",
) -> None:
"""Build the BERTopic Docker image."""
logging.basicConfig(
level=log_level,
format="%(asctime)s %(levelname)s %(name)s: %(message)s",
)
build_image()
@app.command("run")
def run_job_command(
mode: Annotated[JOBMode, typer.Option(help="Which BERTopic job to run")] = "train",
cache_root: Annotated[
Path, typer.Option(help="Host path mounted to /cache for model and HF cache")
] = DEFAULT_CACHE_ROOT,
postgres_socket_dir: Annotated[
Path, typer.Option(help="Host Postgres socket directory to mount into the container")
] = DEFAULT_POSTGRES_SOCKET_DIR,
detach: Annotated[
bool, typer.Option(help="Start the container in the background")
] = False,
log_level: Annotated[str, typer.Option(help="Log level")] = "INFO",
) -> None:
"""Run BERTopic training or inference inside Docker."""
logging.basicConfig(
level=log_level,
format="%(asctime)s %(levelname)s %(name)s: %(message)s",
)
start_job(
mode=mode,
cache_root=cache_root,
postgres_socket_dir=postgres_socket_dir,
detach=detach,
)
@app.command("stop")
def stop_job_command(
mode: Annotated[
JOBMode, typer.Option(help="Which BERTopic container to stop")
] = "train",
) -> None:
"""Stop and remove the BERTopic container."""
stop_job(mode=mode)
@app.command("logs")
def logs_job_command(
mode: Annotated[
JOBMode, typer.Option(help="Which BERTopic container logs to show")
] = "train",
) -> None:
"""Show recent logs from the BERTopic container."""
output = logs_job(mode=mode)
if output is None:
typer.echo(f"No BERTopic container found for mode={mode}.")
raise typer.Exit(code=1)
typer.echo(output)
def cli() -> None:
"""Typer entry point."""
app()
if __name__ == "__main__":
cli()
@@ -1,38 +0,0 @@
FROM python:3.12-bookworm
ENV DEBIAN_FRONTEND=noninteractive
ENV PYTHONUNBUFFERED=1
ENV PIP_NO_CACHE_DIR=1
RUN apt-get update && apt-get install -y \
build-essential \
gcc \
g++ \
git \
libgomp1 \
libpq-dev \
&& rm -rf /var/lib/apt/lists/*
WORKDIR /app
COPY pipelines ./pipelines
RUN python -m pip install --upgrade pip setuptools wheel && \
python -m pip install \
torch \
--index-url https://download.pytorch.org/whl/cpu && \
python -m pip install \
typer \
sqlalchemy \
bertopic \
sentence-transformers \
scikit-learn \
pandas \
numpy \
"psycopg[binary]"
ENV HF_HOME=/cache/huggingface
ENV TRANSFORMERS_CACHE=/cache/huggingface
ENTRYPOINT ["python", "-m"]
CMD ["pipelines.bert_topic.train"]
@@ -1,11 +0,0 @@
FROM ghcr.io/unslothai/unsloth:latest
RUN pip install --no-cache-dir typer
WORKDIR /workspace
COPY python/prompt_bench/finetune.py python/prompt_bench/finetune.py
COPY config/prompts/summarization_prompts.toml config/prompts/summarization_prompts.toml
COPY python/prompt_bench/__init__.py python/prompt_bench/__init__.py
COPY python/__init__.py python/__init__.py
ENTRYPOINT ["python", "-m", "pipelines.prompt_bench.finetune"]
+671
View File
@@ -0,0 +1,671 @@
"""Ingestion pipeline for loading congress data from unitedstates/congress JSON files.
Loads legislators, bills, votes, vote records, and bill text into the data_science_dev database.
Expects the parent directory to contain congress-tracker/ and congress-legislators/ as siblings.
Usage:
ingest-congress /path/to/parent/
ingest-congress /path/to/parent/ --congress 118
ingest-congress /path/to/parent/ --congress 118 --only bills
"""
from __future__ import annotations
import logging
from pathlib import Path # noqa: TC003 needed at runtime for typer CLI argument
from typing import TYPE_CHECKING, Annotated
import orjson
import typer
import yaml
from sqlalchemy import select
from sqlalchemy.orm import Session
from pipelines.pipelines.common import configure_logger
from pipelines.orm.common import get_postgres_engine
from pipelines.orm.data_science_dev.congress import (
Bill,
BillText,
Legislator,
LegislatorSocialMedia,
Vote,
VoteRecord,
)
if TYPE_CHECKING:
from collections.abc import Iterator
from sqlalchemy.engine import Engine
logger = logging.getLogger(__name__)
BATCH_SIZE = 10_000
app = typer.Typer(help="Ingest unitedstates/congress data into data_science_dev.")
@app.command()
def main(
parent_dir: Annotated[
Path,
typer.Argument(
help="Parent directory containing congress-tracker/ and congress-legislators/"
),
],
congress: Annotated[
int | None, typer.Option(help="Only ingest a specific congress number")
] = None,
only: Annotated[
str | None,
typer.Option(
help="Only run a specific step: legislators, social-media, bills, votes, bill-text"
),
] = None,
) -> None:
"""Ingest congress data from unitedstates/congress JSON files."""
configure_logger(level="INFO")
data_dir = parent_dir / "congress-tracker/congress/data/"
legislators_dir = parent_dir / "congress-legislators"
if not data_dir.is_dir():
typer.echo(f"Expected congress-tracker/ directory: {data_dir}", err=True)
raise typer.Exit(code=1)
if not legislators_dir.is_dir():
typer.echo(
f"Expected congress-legislators/ directory: {legislators_dir}", err=True
)
raise typer.Exit(code=1)
engine = get_postgres_engine(name="DATA_SCIENCE_DEV")
congress_dirs = _resolve_congress_dirs(data_dir, congress)
if not congress_dirs:
typer.echo("No congress directories found.", err=True)
raise typer.Exit(code=1)
logger.info("Found %d congress directories to process", len(congress_dirs))
steps: dict[str, tuple] = {
"legislators": (ingest_legislators, (engine, legislators_dir)),
"legislators-social-media": (ingest_social_media, (engine, legislators_dir)),
"bills": (ingest_bills, (engine, congress_dirs)),
"votes": (ingest_votes, (engine, congress_dirs)),
"bill-text": (ingest_bill_text, (engine, congress_dirs)),
}
if only:
if only not in steps:
typer.echo(
f"Unknown step: {only}. Choose from: {', '.join(steps)}", err=True
)
raise typer.Exit(code=1)
steps = {only: steps[only]}
for step_name, (step_func, step_args) in steps.items():
logger.info("=== Starting step: %s ===", step_name)
step_func(*step_args)
logger.info("=== Finished step: %s ===", step_name)
logger.info("ingest-congress done")
def _resolve_congress_dirs(data_dir: Path, congress: int | None) -> list[Path]:
"""Find congress number directories under data_dir."""
if congress is not None:
target = data_dir / str(congress)
return [target] if target.is_dir() else []
return sorted(
path for path in data_dir.iterdir() if path.is_dir() and path.name.isdigit()
)
def _flush_batch(session: Session, batch: list[object], label: str) -> int:
"""Add a batch of ORM objects to the session and commit. Returns count added."""
if not batch:
return 0
session.add_all(batch)
session.commit()
count = len(batch)
logger.info("Committed %d %s", count, label)
batch.clear()
return count
# ---------------------------------------------------------------------------
# Legislators — loaded from congress-legislators YAML files
# ---------------------------------------------------------------------------
def ingest_legislators(engine: Engine, legislators_dir: Path) -> None:
"""Load legislators from congress-legislators YAML files."""
legislators_data = _load_legislators_yaml(legislators_dir)
logger.info("Loaded %d legislators from YAML files", len(legislators_data))
with Session(engine) as session:
existing_legislators = {
legislator.bioguide_id: legislator
for legislator in session.scalars(select(Legislator)).all()
}
logger.info("Found %d existing legislators in DB", len(existing_legislators))
total_inserted = 0
total_updated = 0
for entry in legislators_data:
bioguide_id = entry.get("id", {}).get("bioguide")
if not bioguide_id:
continue
fields = _parse_legislator(entry)
if existing := existing_legislators.get(bioguide_id):
changed = False
for field, value in fields.items():
if value is not None and getattr(existing, field) != value:
setattr(existing, field, value)
changed = True
if changed:
total_updated += 1
else:
session.add(Legislator(bioguide_id=bioguide_id, **fields))
total_inserted += 1
session.commit()
logger.info(
"Inserted %d new legislators, updated %d existing",
total_inserted,
total_updated,
)
def _load_legislators_yaml(legislators_dir: Path) -> list[dict]:
"""Load and combine legislators-current.yaml and legislators-historical.yaml."""
legislators: list[dict] = []
for filename in ("legislators-current.yaml", "legislators-historical.yaml"):
path = legislators_dir / filename
if not path.exists():
logger.warning("Legislators file not found: %s", path)
continue
with path.open() as file:
data = yaml.safe_load(file)
if isinstance(data, list):
legislators.extend(data)
return legislators
def _parse_legislator(entry: dict) -> dict:
"""Extract Legislator fields from a congress-legislators YAML entry."""
ids = entry.get("id", {})
name = entry.get("name", {})
bio = entry.get("bio", {})
terms = entry.get("terms", [])
latest_term = terms[-1] if terms else {}
fec_ids = ids.get("fec")
fec_ids_joined = ",".join(fec_ids) if isinstance(fec_ids, list) else fec_ids
chamber = latest_term.get("type")
chamber_normalized = {"rep": "House", "sen": "Senate"}.get(chamber, chamber)
return {
"thomas_id": ids.get("thomas"),
"lis_id": ids.get("lis"),
"govtrack_id": ids.get("govtrack"),
"opensecrets_id": ids.get("opensecrets"),
"fec_ids": fec_ids_joined,
"first_name": name.get("first"),
"last_name": name.get("last"),
"official_full_name": name.get("official_full"),
"nickname": name.get("nickname"),
"birthday": bio.get("birthday"),
"gender": bio.get("gender"),
"current_party": latest_term.get("party"),
"current_state": latest_term.get("state"),
"current_district": latest_term.get("district"),
"current_chamber": chamber_normalized,
}
# ---------------------------------------------------------------------------
# Social Media — loaded from legislators-social-media.yaml
# ---------------------------------------------------------------------------
SOCIAL_MEDIA_PLATFORMS = {
"twitter": "https://twitter.com/{account}",
"facebook": "https://facebook.com/{account}",
"youtube": "https://youtube.com/{account}",
"instagram": "https://instagram.com/{account}",
"mastodon": None,
}
def ingest_social_media(engine: Engine, legislators_dir: Path) -> None:
"""Load social media accounts from legislators-social-media.yaml."""
social_media_path = legislators_dir / "legislators-social-media.yaml"
if not social_media_path.exists():
logger.warning("Social media file not found: %s", social_media_path)
return
with social_media_path.open() as file:
social_media_data = yaml.safe_load(file)
if not isinstance(social_media_data, list):
logger.warning("Unexpected format in %s", social_media_path)
return
logger.info(
"Loaded %d entries from legislators-social-media.yaml", len(social_media_data)
)
with Session(engine) as session:
legislator_map = _build_legislator_map(session)
existing_accounts = {
(account.legislator_id, account.platform)
for account in session.scalars(select(LegislatorSocialMedia)).all()
}
logger.info(
"Found %d existing social media accounts in DB", len(existing_accounts)
)
total_inserted = 0
total_updated = 0
for entry in social_media_data:
bioguide_id = entry.get("id", {}).get("bioguide")
if not bioguide_id:
continue
legislator_id = legislator_map.get(bioguide_id)
if legislator_id is None:
continue
social = entry.get("social", {})
for platform, url_template in SOCIAL_MEDIA_PLATFORMS.items():
account_name = social.get(platform)
if not account_name:
continue
url = (
url_template.format(account=account_name) if url_template else None
)
if (legislator_id, platform) in existing_accounts:
total_updated += 1
else:
session.add(
LegislatorSocialMedia(
legislator_id=legislator_id,
platform=platform,
account_name=str(account_name),
url=url,
source="https://github.com/unitedstates/congress-legislators",
)
)
existing_accounts.add((legislator_id, platform))
total_inserted += 1
session.commit()
logger.info(
"Inserted %d new social media accounts, updated %d existing",
total_inserted,
total_updated,
)
def _iter_voters(position_group: object) -> Iterator[dict]:
"""Yield voter dicts from a vote position group (handles list, single dict, or string)."""
if isinstance(position_group, dict):
yield position_group
elif isinstance(position_group, list):
for voter in position_group:
if isinstance(voter, dict):
yield voter
# ---------------------------------------------------------------------------
# Bills
# ---------------------------------------------------------------------------
def ingest_bills(engine: Engine, congress_dirs: list[Path]) -> None:
"""Load bill data.json files."""
with Session(engine) as session:
existing_bills = {
(bill.congress, bill.bill_type, bill.number)
for bill in session.scalars(select(Bill)).all()
}
logger.info("Found %d existing bills in DB", len(existing_bills))
total_inserted = 0
batch: list[Bill] = []
for congress_dir in congress_dirs:
bills_dir = congress_dir / "bills"
if not bills_dir.is_dir():
continue
logger.info("Scanning bills from %s", congress_dir.name)
for bill_file in bills_dir.rglob("data.json"):
data = _read_json(bill_file)
if data is None:
continue
bill = _parse_bill(data, existing_bills)
if bill is not None:
batch.append(bill)
if len(batch) >= BATCH_SIZE:
total_inserted += _flush_batch(session, batch, "bills")
total_inserted += _flush_batch(session, batch, "bills")
logger.info("Inserted %d new bills total", total_inserted)
def _parse_bill(data: dict, existing_bills: set[tuple[int, str, int]]) -> Bill | None:
"""Parse a bill data.json dict into a Bill ORM object, skipping existing."""
raw_congress = data.get("congress")
bill_type = data.get("bill_type")
raw_number = data.get("number")
if raw_congress is None or bill_type is None or raw_number is None:
return None
congress = int(raw_congress)
number = int(raw_number)
if (congress, bill_type, number) in existing_bills:
return None
sponsor_bioguide = None
sponsor = data.get("sponsor")
if sponsor:
sponsor_bioguide = sponsor.get("bioguide_id")
return Bill(
congress=congress,
bill_type=bill_type,
number=number,
title=data.get("short_title") or data.get("official_title"),
title_short=data.get("short_title"),
official_title=data.get("official_title"),
status=data.get("status"),
status_at=data.get("status_at"),
sponsor_bioguide_id=sponsor_bioguide,
subjects_top_term=data.get("subjects_top_term"),
)
# ---------------------------------------------------------------------------
# Votes (and vote records)
# ---------------------------------------------------------------------------
def ingest_votes(engine: Engine, congress_dirs: list[Path]) -> None:
"""Load vote data.json files with their vote records."""
with Session(engine) as session:
legislator_map = _build_legislator_map(session)
logger.info("Loaded %d legislators into lookup map", len(legislator_map))
bill_map = _build_bill_map(session)
logger.info("Loaded %d bills into lookup map", len(bill_map))
existing_votes = {
(vote.congress, vote.chamber, vote.session, vote.number)
for vote in session.scalars(select(Vote)).all()
}
logger.info("Found %d existing votes in DB", len(existing_votes))
total_inserted = 0
batch: list[Vote] = []
for congress_dir in congress_dirs:
votes_dir = congress_dir / "votes"
if not votes_dir.is_dir():
continue
logger.info("Scanning votes from %s", congress_dir.name)
for vote_file in votes_dir.rglob("data.json"):
data = _read_json(vote_file)
if data is None:
continue
vote = _parse_vote(data, legislator_map, bill_map, existing_votes)
if vote is not None:
batch.append(vote)
if len(batch) >= BATCH_SIZE:
total_inserted += _flush_batch(session, batch, "votes")
total_inserted += _flush_batch(session, batch, "votes")
logger.info("Inserted %d new votes total", total_inserted)
def _build_legislator_map(session: Session) -> dict[str, int]:
"""Build a mapping of bioguide_id -> legislator.id."""
return {
legislator.bioguide_id: legislator.id
for legislator in session.scalars(select(Legislator)).all()
}
def _build_bill_map(session: Session) -> dict[tuple[int, str, int], int]:
"""Build a mapping of (congress, bill_type, number) -> bill.id."""
return {
(bill.congress, bill.bill_type, bill.number): bill.id
for bill in session.scalars(select(Bill)).all()
}
def _parse_vote(
data: dict,
legislator_map: dict[str, int],
bill_map: dict[tuple[int, str, int], int],
existing_votes: set[tuple[int, str, int, int]],
) -> Vote | None:
"""Parse a vote data.json dict into a Vote ORM object with records."""
raw_congress = data.get("congress")
chamber = data.get("chamber")
raw_number = data.get("number")
vote_date = data.get("date")
if (
raw_congress is None
or chamber is None
or raw_number is None
or vote_date is None
):
return None
raw_session = data.get("session")
if raw_session is None:
return None
congress = int(raw_congress)
number = int(raw_number)
session_number = int(raw_session)
# Normalize chamber from "h"/"s" to "House"/"Senate"
chamber_normalized = {"h": "House", "s": "Senate"}.get(chamber, chamber)
if (congress, chamber_normalized, session_number, number) in existing_votes:
return None
# Resolve linked bill
bill_id = None
bill_ref = data.get("bill")
if bill_ref:
bill_key = (
int(bill_ref.get("congress", congress)),
bill_ref.get("type"),
int(bill_ref.get("number", 0)),
)
bill_id = bill_map.get(bill_key)
raw_votes = data.get("votes", {})
vote_counts = _count_votes(raw_votes)
vote_records = _build_vote_records(raw_votes, legislator_map)
return Vote(
congress=congress,
chamber=chamber_normalized,
session=session_number,
number=number,
vote_type=data.get("type"),
question=data.get("question"),
result=data.get("result"),
result_text=data.get("result_text"),
vote_date=vote_date[:10] if isinstance(vote_date, str) else vote_date,
bill_id=bill_id,
vote_records=vote_records,
**vote_counts,
)
def _count_votes(raw_votes: dict) -> dict[str, int]:
"""Count voters per position category, correctly handling dict and list formats."""
yea_count = 0
nay_count = 0
not_voting_count = 0
present_count = 0
for position, position_group in raw_votes.items():
voter_count = sum(1 for _ in _iter_voters(position_group))
if position in ("Yea", "Aye"):
yea_count += voter_count
elif position in ("Nay", "No"):
nay_count += voter_count
elif position == "Not Voting":
not_voting_count += voter_count
elif position == "Present":
present_count += voter_count
return {
"yea_count": yea_count,
"nay_count": nay_count,
"not_voting_count": not_voting_count,
"present_count": present_count,
}
def _build_vote_records(
raw_votes: dict, legislator_map: dict[str, int]
) -> list[VoteRecord]:
"""Build VoteRecord objects from raw vote data."""
records: list[VoteRecord] = []
for position, position_group in raw_votes.items():
for voter in _iter_voters(position_group):
bioguide_id = voter.get("id")
if not bioguide_id:
continue
legislator_id = legislator_map.get(bioguide_id)
if legislator_id is None:
continue
records.append(
VoteRecord(
legislator_id=legislator_id,
position=position,
)
)
return records
# ---------------------------------------------------------------------------
# Bill Text
# ---------------------------------------------------------------------------
def ingest_bill_text(engine: Engine, congress_dirs: list[Path]) -> None:
"""Load bill text from text-versions directories."""
with Session(engine) as session:
bill_map = _build_bill_map(session)
logger.info("Loaded %d bills into lookup map", len(bill_map))
existing_bill_texts = {
(bill_text.bill_id, bill_text.version_code)
for bill_text in session.scalars(select(BillText)).all()
}
logger.info(
"Found %d existing bill text versions in DB", len(existing_bill_texts)
)
total_inserted = 0
batch: list[BillText] = []
for congress_dir in congress_dirs:
logger.info("Scanning bill texts from %s", congress_dir.name)
for bill_text in _iter_bill_texts(
congress_dir, bill_map, existing_bill_texts
):
batch.append(bill_text)
if len(batch) >= BATCH_SIZE:
total_inserted += _flush_batch(session, batch, "bill texts")
total_inserted += _flush_batch(session, batch, "bill texts")
logger.info("Inserted %d new bill text versions total", total_inserted)
def _iter_bill_texts(
congress_dir: Path,
bill_map: dict[tuple[int, str, int], int],
existing_bill_texts: set[tuple[int, str]],
) -> Iterator[BillText]:
"""Yield BillText objects for a single congress directory, skipping existing."""
bills_dir = congress_dir / "bills"
if not bills_dir.is_dir():
return
for bill_dir in bills_dir.rglob("text-versions"):
if not bill_dir.is_dir():
continue
bill_key = _bill_key_from_dir(bill_dir.parent, congress_dir)
if bill_key is None:
continue
bill_id = bill_map.get(bill_key)
if bill_id is None:
continue
for version_dir in sorted(bill_dir.iterdir()):
if not version_dir.is_dir():
continue
if (bill_id, version_dir.name) in existing_bill_texts:
continue
text_content = _read_bill_text(version_dir)
version_data = _read_json(version_dir / "data.json")
yield BillText(
bill_id=bill_id,
version_code=version_dir.name,
version_name=version_data.get("version_name") if version_data else None,
date=version_data.get("issued_on") if version_data else None,
text_content=text_content,
)
def _bill_key_from_dir(
bill_dir: Path, congress_dir: Path
) -> tuple[int, str, int] | None:
"""Extract (congress, bill_type, number) from directory structure."""
congress = int(congress_dir.name)
bill_type = bill_dir.parent.name
name = bill_dir.name
# Directory name is like "hr3590" — strip the type prefix to get the number
number_str = name[len(bill_type) :]
if not number_str.isdigit():
return None
return (congress, bill_type, int(number_str))
def _read_bill_text(version_dir: Path) -> str | None:
"""Read bill text from a version directory, preferring .txt over .xml."""
for extension in ("txt", "htm", "html", "xml"):
candidates = list(version_dir.glob(f"document.{extension}"))
if not candidates:
candidates = list(version_dir.glob(f"*.{extension}"))
if candidates:
try:
return candidates[0].read_text(encoding="utf-8")
except Exception:
logger.exception("Failed to read %s", candidates[0])
return None
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _read_json(path: Path) -> dict | None:
"""Read and parse a JSON file, returning None on failure."""
try:
return orjson.loads(path.read_bytes())
except FileNotFoundError:
return None
except Exception:
logger.exception("Failed to parse %s", path)
return None
if __name__ == "__main__":
app()
+281
View File
@@ -0,0 +1,281 @@
"""Ingestion pipeline for loading JSONL post files into the weekly-partitioned posts table.
Usage:
ingest-posts /path/to/files/
ingest-posts /path/to/single_file.jsonl
ingest-posts /data/dir/ --workers 4 --batch-size 5000
"""
from __future__ import annotations
import logging
from datetime import UTC, datetime
from pathlib import Path # noqa: TC003 this is needed for typer
from typing import TYPE_CHECKING, Annotated
import orjson
import psycopg
import typer
from pipelines.pipelines.common import configure_logger
from pipelines.orm.common import get_connection_info
from pipelines.pipelines.parallelize import parallelize_process
if TYPE_CHECKING:
from collections.abc import Iterator
logger = logging.getLogger(__name__)
app = typer.Typer(help="Ingest JSONL post files into the partitioned posts table.")
@app.command()
def main(
path: Annotated[
Path,
typer.Argument(help="Directory containing JSONL files, or a single JSONL file"),
],
batch_size: Annotated[int, typer.Option(help="Rows per INSERT batch")] = 10000,
workers: Annotated[
int, typer.Option(help="Parallel workers for multi-file ingestion")
] = 4,
pattern: Annotated[
str, typer.Option(help="Glob pattern for JSONL files")
] = "*.jsonl",
) -> None:
"""Ingest JSONL post files into the weekly-partitioned posts table."""
configure_logger(level="INFO")
logger.info("starting ingest-posts")
logger.info(
"path=%s batch_size=%d workers=%d pattern=%s",
path,
batch_size,
workers,
pattern,
)
if path.is_file():
ingest_file(path, batch_size=batch_size)
elif path.is_dir():
ingest_directory(
path, batch_size=batch_size, max_workers=workers, pattern=pattern
)
else:
typer.echo(f"Path does not exist: {path}", err=True)
raise typer.Exit(code=1)
logger.info("ingest-posts done")
def ingest_directory(
directory: Path,
*,
batch_size: int,
max_workers: int,
pattern: str = "*.jsonl",
) -> None:
"""Ingest all JSONL files in a directory using parallel workers."""
files = sorted(directory.glob(pattern))
if not files:
logger.warning("No JSONL files found in %s", directory)
return
logger.info("Found %d JSONL files to ingest", len(files))
kwargs_list = [{"path": fp, "batch_size": batch_size} for fp in files]
parallelize_process(ingest_file, kwargs_list, max_workers=max_workers)
SCHEMA = "main"
COLUMNS = (
"post_id",
"user_id",
"instance",
"date",
"text",
"langs",
"like_count",
"reply_count",
"repost_count",
"reply_to",
"replied_author",
"thread_root",
"thread_root_author",
"repost_from",
"reposted_author",
"quotes",
"quoted_author",
"labels",
"sent_label",
"sent_score",
)
INSERT_FROM_STAGING = f"""
INSERT INTO {SCHEMA}.posts ({", ".join(COLUMNS)})
SELECT {", ".join(COLUMNS)} FROM pg_temp.staging
ON CONFLICT (post_id, date) DO NOTHING
""" # noqa: S608
FAILED_INSERT = f"""
INSERT INTO {SCHEMA}.failed_ingestion (raw_line, error)
VALUES (%(raw_line)s, %(error)s)
""" # noqa: S608
def get_psycopg_connection() -> psycopg.Connection:
"""Create a raw psycopg3 connection from environment variables."""
database, host, port, username, password = get_connection_info("DATA_SCIENCE_DEV")
return psycopg.connect(
dbname=database,
host=host,
port=int(port),
user=username,
password=password,
autocommit=False,
)
def ingest_file(path: Path, *, batch_size: int) -> None:
"""Ingest a single JSONL file into the posts table."""
log_trigger = max(100_000 // batch_size, 1)
failed_lines: list[dict] = []
try:
with get_psycopg_connection() as connection:
for index, batch in enumerate(
read_jsonl_batches(path, batch_size, failed_lines), 1
):
ingest_batch(connection, batch)
if index % log_trigger == 0:
logger.info(
"Ingested %d batches (%d rows) from %s",
index,
index * batch_size,
path,
)
if failed_lines:
logger.warning(
"Recording %d malformed lines from %s", len(failed_lines), path.name
)
with connection.cursor() as cursor:
cursor.executemany(FAILED_INSERT, failed_lines)
connection.commit()
except Exception:
logger.exception("Failed to ingest file: %s", path)
raise
def ingest_batch(connection: psycopg.Connection, batch: list[dict]) -> None:
"""COPY batch into a temp staging table, then INSERT ... ON CONFLICT into posts."""
if not batch:
return
try:
with connection.cursor() as cursor:
cursor.execute(f"""
CREATE TEMP TABLE IF NOT EXISTS staging
(LIKE {SCHEMA}.posts INCLUDING DEFAULTS)
ON COMMIT DELETE ROWS
""")
cursor.execute("TRUNCATE pg_temp.staging")
with cursor.copy(
f"COPY pg_temp.staging ({', '.join(COLUMNS)}) FROM STDIN"
) as copy:
for row in batch:
copy.write_row(tuple(row.get(column) for column in COLUMNS))
cursor.execute(INSERT_FROM_STAGING)
connection.commit()
except Exception as error:
connection.rollback()
if len(batch) == 1:
logger.exception("Skipping bad row post_id=%s", batch[0].get("post_id"))
with connection.cursor() as cursor:
cursor.execute(
FAILED_INSERT,
{
"raw_line": orjson.dumps(batch[0], default=str).decode(),
"error": str(error),
},
)
connection.commit()
return
midpoint = len(batch) // 2
ingest_batch(connection, batch[:midpoint])
ingest_batch(connection, batch[midpoint:])
def read_jsonl_batches(
file_path: Path, batch_size: int, failed_lines: list[dict]
) -> Iterator[list[dict]]:
"""Stream a JSONL file and yield batches of transformed rows."""
batch: list[dict] = []
with file_path.open("r", encoding="utf-8") as handle:
for raw_line in handle:
line = raw_line.strip()
if not line:
continue
batch.extend(parse_line(line, file_path, failed_lines))
if len(batch) >= batch_size:
yield batch
batch = []
if batch:
yield batch
def parse_line(line: str, file_path: Path, failed_lines: list[dict]) -> Iterator[dict]:
"""Parse a JSONL line, handling concatenated JSON objects."""
try:
yield transform_row(orjson.loads(line))
except orjson.JSONDecodeError:
if "}{" not in line:
logger.warning(
"Skipping malformed line in %s: %s", file_path.name, line[:120]
)
failed_lines.append({"raw_line": line, "error": "malformed JSON"})
return
fragments = line.replace("}{", "}\n{").split("\n")
for fragment in fragments:
try:
yield transform_row(orjson.loads(fragment))
except (orjson.JSONDecodeError, KeyError, ValueError) as error:
logger.warning(
"Skipping malformed fragment in %s: %s",
file_path.name,
fragment[:120],
)
failed_lines.append({"raw_line": fragment, "error": str(error)})
except Exception as error:
logger.exception("Skipping bad row in %s: %s", file_path.name, line[:120])
failed_lines.append({"raw_line": line, "error": str(error)})
def transform_row(raw: dict) -> dict:
"""Transform a raw JSONL row into a dict matching the Posts table columns."""
raw["date"] = parse_date(raw["date"])
if raw.get("langs") is not None:
raw["langs"] = orjson.dumps(raw["langs"])
if raw.get("text") is not None:
raw["text"] = raw["text"].replace("\x00", "")
return raw
def parse_date(raw_date: int) -> datetime:
"""Parse compact YYYYMMDDHHmm integer into a naive datetime (input is UTC by spec)."""
return datetime(
raw_date // 100000000,
(raw_date // 1000000) % 100,
(raw_date // 10000) % 100,
(raw_date // 100) % 100,
raw_date % 100,
tzinfo=UTC,
)
if __name__ == "__main__":
app()
@@ -1,574 +0,0 @@
"""Calculate legislator topic scores from bill topics and roll-call votes."""
from __future__ import annotations
import logging
from dataclasses import dataclass
from typing import Annotated, Sequence
import typer
from sqlalchemy import (
ColumnElement,
Integer,
Select,
and_,
case,
cast,
delete,
extract,
func,
or_,
select,
tuple_,
)
from sqlalchemy.orm import Session
from pipelines.congress_vote_context import create_score_run, finalize_score_run
from pipelines.orm.common import get_postgres_engine
from pipelines.orm.data_science_dev.congress import (
BillTopic,
BillTopicPosition,
LegislatorScore,
SubjectType,
Vote,
VoteClassification,
VoteEffect,
VoteMeasureLink,
VoteMeasureRole,
VotePositionMeaning,
VoteRelationship,
VoteRecord,
)
from pipelines.pipelines.jobs.extract_bill_topics import normalize_topic_label
from pipelines.web.scoring import (
OPPOSE_POSITIONS,
SUPPORT_POSITIONS,
normalized_position_expression,
)
logger = logging.getLogger(__name__)
DELETE_BATCH_SIZE = 5_000
@dataclass(frozen=True)
class ScoreDiagnostics:
"""Counts for the input stages required to calculate legislator scores."""
bill_topic_rows: int
linked_vote_rows: int
vote_record_rows: int
topic_vote_links: int
scorable_vote_records: int
@dataclass(frozen=True)
class LegislatorScoreInput:
"""One aggregated score ready to store in legislator_score."""
legislator_id: int
year: int
topic: str
score: float
supportive: int
opposed: int
def create_legislator_score_query(
*,
congress: int | None = None,
bill_ids: Sequence[int] | None = None,
topics: Sequence[str] | None = None,
) -> Select:
"""Build the aggregate score query from extracted bill topics and vote records."""
normalized_vote = normalized_position_expression(VoteRecord.position)
supportive_vote = _supportive_vote_expression(normalized_vote)
opposed_vote = _opposed_vote_expression(normalized_vote)
supportive_count = func.sum(supportive_vote)
opposed_count = func.sum(opposed_vote)
total_count = supportive_count + opposed_count
vote_year = cast(extract("year", Vote.vote_date), Integer)
score = (100.0 * supportive_count / func.nullif(total_count, 0)).label("score")
stmt = (
select(
VoteRecord.legislator_id.label("legislator_id"),
vote_year.label("year"),
BillTopic.topic.label("topic"),
score,
supportive_count.label("supportive"),
opposed_count.label("opposed"),
total_count.label("total"),
)
.select_from(VoteRecord)
.join(Vote, Vote.id == VoteRecord.vote_id)
.join(VoteClassification, VoteClassification.vote_id == Vote.id)
.join(VotePositionMeaning, VotePositionMeaning.vote_id == Vote.id)
.join(
VoteMeasureLink,
and_(
VoteMeasureLink.vote_id == Vote.id,
VoteMeasureLink.role == VoteMeasureRole.VOTED_ON,
),
)
.join(BillTopic, BillTopic.bill_id == VoteMeasureLink.measure_id)
.where(
*_eligible_vote_filters(),
_is_scorable_position(normalized_vote),
)
.group_by(VoteRecord.legislator_id, vote_year, BillTopic.topic)
.having(total_count > 0)
.order_by(VoteRecord.legislator_id, vote_year, BillTopic.topic)
)
if congress is not None:
stmt = stmt.where(Vote.congress == congress)
if bill_ids:
stmt = stmt.where(VoteMeasureLink.measure_id.in_(list(bill_ids)))
normalized_topics = _normalize_topics(topics)
if normalized_topics:
stmt = stmt.where(BillTopic.topic.in_(normalized_topics))
return stmt
def collect_legislator_scores(
session: Session,
*,
congress: int | None = None,
bill_ids: Sequence[int] | None = None,
topics: Sequence[str] | None = None,
) -> list[LegislatorScoreInput]:
"""Run the aggregate query and return score rows."""
rows = session.execute(
create_legislator_score_query(
congress=congress,
bill_ids=bill_ids,
topics=topics,
)
)
return [
LegislatorScoreInput(
legislator_id=int(row.legislator_id),
year=int(row.year),
topic=str(row.topic),
score=float(row.score),
supportive=int(row.supportive),
opposed=int(row.opposed),
)
for row in rows
if row.score is not None
]
def collect_score_diagnostics(
session: Session,
*,
congress: int | None = None,
bill_ids: Sequence[int] | None = None,
topics: Sequence[str] | None = None,
) -> ScoreDiagnostics:
"""Count score pipeline inputs for explaining empty score runs."""
normalized_topics = _normalize_topics(topics)
vote_filters = _vote_scope_filters(congress=congress, bill_ids=bill_ids)
topic_filters = _topic_scope_filters(bill_ids=bill_ids, topics=normalized_topics)
normalized_vote = normalized_position_expression(VoteRecord.position)
eligible_vote_filters = _eligible_vote_filters()
bill_topic_rows = session.scalar(
select(func.count(BillTopic.id)).where(*topic_filters)
)
linked_vote_rows = session.scalar(
select(func.count(func.distinct(Vote.id)))
.select_from(Vote)
.join(VoteClassification, VoteClassification.vote_id == Vote.id)
.join(
VoteMeasureLink,
and_(
VoteMeasureLink.vote_id == Vote.id,
VoteMeasureLink.role == VoteMeasureRole.VOTED_ON,
),
)
.where(*vote_filters, *eligible_vote_filters)
)
vote_record_rows = session.scalar(
select(func.count())
.select_from(VoteRecord)
.join(Vote, Vote.id == VoteRecord.vote_id)
.join(VoteClassification, VoteClassification.vote_id == Vote.id)
.where(*vote_filters, *eligible_vote_filters)
)
topic_vote_links = session.scalar(
select(func.count())
.select_from(VoteRecord)
.join(Vote, Vote.id == VoteRecord.vote_id)
.join(VoteClassification, VoteClassification.vote_id == Vote.id)
.join(VotePositionMeaning, VotePositionMeaning.vote_id == Vote.id)
.join(
VoteMeasureLink,
and_(
VoteMeasureLink.vote_id == Vote.id,
VoteMeasureLink.role == VoteMeasureRole.VOTED_ON,
),
)
.join(BillTopic, BillTopic.bill_id == VoteMeasureLink.measure_id)
.where(*vote_filters, *topic_filters, *eligible_vote_filters)
)
scorable_vote_records = session.scalar(
select(func.count())
.select_from(VoteRecord)
.join(Vote, Vote.id == VoteRecord.vote_id)
.join(VoteClassification, VoteClassification.vote_id == Vote.id)
.join(VotePositionMeaning, VotePositionMeaning.vote_id == Vote.id)
.join(
VoteMeasureLink,
and_(
VoteMeasureLink.vote_id == Vote.id,
VoteMeasureLink.role == VoteMeasureRole.VOTED_ON,
),
)
.join(BillTopic, BillTopic.bill_id == VoteMeasureLink.measure_id)
.where(
*vote_filters,
*topic_filters,
*eligible_vote_filters,
_is_scorable_position(normalized_vote),
)
)
return ScoreDiagnostics(
bill_topic_rows=bill_topic_rows or 0,
linked_vote_rows=linked_vote_rows or 0,
vote_record_rows=vote_record_rows or 0,
topic_vote_links=topic_vote_links or 0,
scorable_vote_records=scorable_vote_records or 0,
)
def store_legislator_scores(
session: Session,
rows: Sequence[LegislatorScoreInput],
*,
score_run_id: int | None,
replace_all: bool = False,
) -> int:
"""Replace matching score rows and insert the newly calculated scores."""
if replace_all:
session.execute(delete(LegislatorScore))
elif rows:
keys = [
(row.legislator_id, row.year, row.topic)
for row in rows
]
for key_batch in _batched(keys, DELETE_BATCH_SIZE):
session.execute(
delete(LegislatorScore).where(
tuple_(
LegislatorScore.legislator_id,
LegislatorScore.year,
LegislatorScore.topic,
).in_(key_batch)
)
)
session.add_all(
[
LegislatorScore(
legislator_id=row.legislator_id,
year=row.year,
topic=row.topic,
score=row.score,
score_run_id=score_run_id,
)
for row in rows
]
)
return len(rows)
def _supportive_vote_expression(
normalized_vote: ColumnElement[str | None],
) -> ColumnElement[int]:
supports_text = _position_matches_effect(normalized_vote, VoteEffect.SUPPORTS_TEXT)
opposes_text = _position_matches_effect(normalized_vote, VoteEffect.OPPOSES_TEXT)
return case(
(
and_(
BillTopic.support_position == BillTopicPosition.FOR,
supports_text,
),
1,
),
(
and_(
BillTopic.support_position == BillTopicPosition.AGAINST,
opposes_text,
),
1,
),
else_=0,
)
def _opposed_vote_expression(
normalized_vote: ColumnElement[str | None],
) -> ColumnElement[int]:
supports_text = _position_matches_effect(normalized_vote, VoteEffect.SUPPORTS_TEXT)
opposes_text = _position_matches_effect(normalized_vote, VoteEffect.OPPOSES_TEXT)
return case(
(
and_(
BillTopic.support_position == BillTopicPosition.FOR,
opposes_text,
),
1,
),
(
and_(
BillTopic.support_position == BillTopicPosition.AGAINST,
supports_text,
),
1,
),
else_=0,
)
def _position_matches_effect(
normalized_vote: ColumnElement[str | None],
effect: VoteEffect,
) -> ColumnElement[bool]:
return or_(
and_(
normalized_vote.in_(sorted(SUPPORT_POSITIONS)),
VotePositionMeaning.yea_effect == effect,
),
and_(
normalized_vote.in_(sorted(OPPOSE_POSITIONS)),
VotePositionMeaning.nay_effect == effect,
),
and_(
normalized_vote == "present",
VotePositionMeaning.present_effect == effect,
),
)
def _is_scorable_position(normalized_vote: ColumnElement[str | None]) -> ColumnElement[bool]:
return or_(
_position_matches_effect(normalized_vote, VoteEffect.SUPPORTS_TEXT),
_position_matches_effect(normalized_vote, VoteEffect.OPPOSES_TEXT),
)
def _normalize_topics(topics: Sequence[str] | None) -> list[str]:
normalized: list[str] = []
seen: set[str] = set()
for topic in topics or []:
value = normalize_topic_label(topic)
if value and value not in seen:
normalized.append(value)
seen.add(value)
return normalized
def _batched[T](items: Sequence[T], batch_size: int) -> list[Sequence[T]]:
return [
items[index : index + batch_size]
for index in range(0, len(items), batch_size)
]
def _vote_scope_filters(
*,
congress: int | None,
bill_ids: Sequence[int] | None,
) -> list[ColumnElement[bool]]:
filters: list[ColumnElement[bool]] = []
if congress is not None:
filters.append(Vote.congress == congress)
if bill_ids:
filters.append(VoteMeasureLink.measure_id.in_(list(bill_ids)))
return filters
def _topic_scope_filters(
*,
bill_ids: Sequence[int] | None,
topics: Sequence[str],
) -> list[ColumnElement[bool]]:
filters: list[ColumnElement[bool]] = []
if bill_ids:
filters.append(BillTopic.bill_id.in_(list(bill_ids)))
if topics:
filters.append(BillTopic.topic.in_(list(topics)))
return filters
def _has_score_scope(
*,
congress: int | None,
bill_ids: Sequence[int] | None,
topics: Sequence[str] | None,
) -> bool:
return congress is not None or bool(bill_ids) or bool(topics)
def _eligible_vote_filters() -> list[ColumnElement[bool]]:
return [
VoteClassification.subject_type == SubjectType.MEASURE,
VoteClassification.vote_relationship == VoteRelationship.DIRECT_TEXT_VOTE,
VoteClassification.is_direct_vote_on_legislative_text.is_(True),
VoteClassification.is_substantive_policy_vote.is_(True),
VoteClassification.is_special_rule.is_(False),
]
def main(
congress: Annotated[
int | None,
typer.Option(help="Only score votes from one Congress."),
] = None,
bill_ids: Annotated[
list[int] | None,
typer.Option(
"--bill-id",
help="Only score votes linked to one internal bill.id. Repeatable.",
),
] = None,
topics: Annotated[
list[str] | None,
typer.Option("--topic", help="Only score one normalized topic. Repeatable."),
] = None,
replace_all: Annotated[
bool,
typer.Option(
help="Delete every existing legislator score before inserting. "
"Unfiltered runs do this automatically."
),
] = False,
dry_run: Annotated[
bool,
typer.Option(help="Calculate scores without writing to the database."),
] = False,
log_level: Annotated[str, typer.Option(help="Log level.")] = "INFO",
diagnose: Annotated[
bool,
typer.Option(help="Log input-stage counts even when rows are calculated."),
] = False,
) -> None:
"""CLI entrypoint for calculating and storing legislator topic scores."""
logging.basicConfig(
level=log_level,
format="%(asctime)s %(levelname)s %(name)s: %(message)s",
)
engine = get_postgres_engine(name="DATA_SCIENCE_DEV")
with Session(engine) as session:
rows = collect_legislator_scores(
session,
congress=congress,
bill_ids=bill_ids,
topics=topics,
)
logger.info("Calculated %d legislator topic score rows", len(rows))
if diagnose or not rows:
diagnostics = collect_score_diagnostics(
session,
congress=congress,
bill_ids=bill_ids,
topics=topics,
)
_log_diagnostics(diagnostics)
if dry_run:
session.rollback()
return
score_run = create_score_run(session)
should_replace_all = replace_all or not _has_score_scope(
congress=congress,
bill_ids=bill_ids,
topics=topics,
)
written = store_legislator_scores(
session,
rows,
score_run_id=score_run.id,
replace_all=should_replace_all,
)
included_vote_count = session.scalar(
select(func.count(func.distinct(Vote.id)))
.select_from(VoteRecord)
.join(Vote, Vote.id == VoteRecord.vote_id)
.join(VoteClassification, VoteClassification.vote_id == Vote.id)
.join(VotePositionMeaning, VotePositionMeaning.vote_id == Vote.id)
.join(
VoteMeasureLink,
and_(
VoteMeasureLink.vote_id == Vote.id,
VoteMeasureLink.role == VoteMeasureRole.VOTED_ON,
),
)
.join(BillTopic, BillTopic.bill_id == VoteMeasureLink.measure_id)
.where(
*_vote_scope_filters(congress=congress, bill_ids=bill_ids),
*_topic_scope_filters(bill_ids=bill_ids, topics=_normalize_topics(topics)),
*_eligible_vote_filters(),
_is_scorable_position(normalized_position_expression(VoteRecord.position)),
)
) or 0
total_scoped_votes = session.scalar(
select(func.count(func.distinct(Vote.id)))
.select_from(Vote)
.join(VoteClassification, VoteClassification.vote_id == Vote.id)
.join(
VoteMeasureLink,
and_(
VoteMeasureLink.vote_id == Vote.id,
VoteMeasureLink.role == VoteMeasureRole.VOTED_ON,
),
)
.where(*_vote_scope_filters(congress=congress, bill_ids=bill_ids))
) or 0
finalize_score_run(
session,
score_run=score_run,
included_vote_count=included_vote_count,
excluded_vote_count=max(total_scoped_votes - included_vote_count, 0),
)
session.commit()
logger.info("Stored %d legislator topic score rows", written)
def _log_diagnostics(diagnostics: ScoreDiagnostics) -> None:
logger.info(
"Score input diagnostics: bill_topic_rows=%d linked_vote_rows=%d "
"vote_record_rows=%d topic_vote_links=%d scorable_vote_records=%d",
diagnostics.bill_topic_rows,
diagnostics.linked_vote_rows,
diagnostics.vote_record_rows,
diagnostics.topic_vote_links,
diagnostics.scorable_vote_records,
)
if diagnostics.bill_topic_rows == 0:
logger.warning(
"No extracted bill topics matched the score scope. Run "
"pipelines.tools.extract_bill_topics after bill summarization."
)
elif diagnostics.linked_vote_rows == 0:
logger.warning("No direct substantive text votes matched the score scope.")
elif diagnostics.vote_record_rows == 0:
logger.warning("No individual vote records matched the score scope.")
elif diagnostics.topic_vote_links == 0:
logger.warning(
"Bill topics exist, but none are attached to bills that have eligible scored votes."
)
elif diagnostics.scorable_vote_records == 0:
logger.warning(
"Topic-vote links exist, but no joined vote records had Yea/Aye/Yes/Nay/No positions."
)
if __name__ == "__main__":
typer.run(main)
-682
View File
@@ -1,682 +0,0 @@
"""Extract bill topics from bill text using a configurable topic catalog."""
from __future__ import annotations
import json
import logging
import re
from dataclasses import dataclass
from pathlib import Path
from typing import Annotated, Any, Sequence
import httpx
import typer
from sqlalchemy import ColumnElement, Select, delete, exists, func, select
from sqlalchemy.orm import Session, selectinload
from pipelines.config import OpenAIConfig, get_config_dir, get_openai_config
from pipelines.orm.common import get_postgres_engine
from pipelines.orm.data_science_dev.congress import (
Bill,
BillText,
BillTopic,
BillTopicPosition,
SubjectType,
VoteClassification,
VoteRelationship,
VoteTextTarget,
)
logger = logging.getLogger(__name__)
OPENAI_PROJECT_ID = "proj_fQBPEXFgnS87Fk6wZwploFwE"
OPENAI_CHAT_COMPLETIONS_URL = "https://api.openai.com/v1/chat/completions"
REQUEST_TIMEOUT_SECONDS = 60
DEFAULT_TOPICS_PATH = get_config_dir() / "congressional_issues_comprehensive.json"
class TopicExtractionError(RuntimeError):
"""Raised when a topic extraction request or response is invalid."""
@dataclass(frozen=True)
class TopicCatalog:
"""Loaded topic catalog with categories for prompting and flat candidates."""
topics_by_category: dict[str, list[str]]
candidate_topics: list[str]
@dataclass(frozen=True)
class TopicExtractionDiagnostics:
"""Counts for the bill summary inputs needed by topic extraction."""
bill_rows: int
bill_text_rows: int
summarized_bill_text_rows: int
bills_with_summaries: int
bill_topic_rows: int
selected_bills: int
@dataclass(frozen=True)
class ExtractedBillTopic:
"""One extracted bill topic and yes-vote stance."""
topic: str
support_position: BillTopicPosition
confidence: float | None = None
evidence: str | None = None
def _select_bill_text_for_topic_extraction(bill: Bill) -> BillText | None:
"""Pick one summarized bill_text row from the already-loaded relationship."""
for bill_text in bill.bill_texts:
if bill_text.summary and bill_text.summary.strip():
return bill_text
return None
def normalize_topic_label(value: str) -> str:
"""Normalize a topic label for storage, comparison, and de-duping."""
normalized = value.strip().strip("\"'")
normalized = normalized.strip().rstrip(".").strip()
return re.sub(r"\s+", " ", normalized).lower()
def load_topic_catalog(path: Path | None = None) -> TopicCatalog:
"""Load, validate, normalize, and flatten the bill topic catalog."""
topics_path = path or DEFAULT_TOPICS_PATH
try:
raw = json.loads(topics_path.read_text())
except FileNotFoundError as exc:
msg = f"Topic catalog not found: {topics_path}"
raise TopicExtractionError(msg) from exc
except json.JSONDecodeError as exc:
msg = f"Topic catalog is not valid JSON: {topics_path}: {exc}"
raise TopicExtractionError(msg) from exc
if not isinstance(raw, dict):
msg = "Topic catalog root must be an object mapping category names to lists"
raise TopicExtractionError(msg)
topics_by_category: dict[str, list[str]] = {}
candidate_topics: list[str] = []
seen_topics: set[str] = set()
for category, topics in raw.items():
if not isinstance(category, str) or not category.strip():
msg = "Topic catalog category names must be non-empty strings"
raise TopicExtractionError(msg)
if not isinstance(topics, list):
msg = f"Topic catalog category {category!r} must contain a list"
raise TopicExtractionError(msg)
normalized_topics: list[str] = []
for topic in topics:
if not isinstance(topic, str):
msg = f"Topic catalog category {category!r} contains a non-string topic"
raise TopicExtractionError(msg)
normalized_topic = normalize_topic_label(topic)
if not normalized_topic:
msg = f"Topic catalog category {category!r} contains a blank topic"
raise TopicExtractionError(msg)
if normalized_topic in seen_topics:
continue
seen_topics.add(normalized_topic)
normalized_topics.append(normalized_topic)
candidate_topics.append(normalized_topic)
topics_by_category[category.strip()] = normalized_topics
return TopicCatalog(
topics_by_category=topics_by_category,
candidate_topics=candidate_topics,
)
def build_topic_extraction_messages(
*,
bill: Bill,
bill_text: str,
candidate_topics: Sequence[str],
) -> list[dict[str, str]]:
"""Build GPT messages for extracting a bill's scored topics."""
normalized_candidates = [normalize_topic_label(topic) for topic in candidate_topics]
candidate_list = "\n".join(f"- {topic}" for topic in normalized_candidates)
metadata = "\n".join(
(
f"Congress: {bill.congress}",
f"Bill: {bill.bill_type} {bill.number}",
f"Title: {bill.title_short or bill.title or bill.official_title or ''}",
f"Top subject term: {bill.subjects_top_term or ''}",
)
)
system_prompt = (
"You extract policy topics from U.S. congressional bills.\n"
'For each selected topic, decide whether a Yes/Yea vote on the bill is "for" or "against" that topic.\n'
'Use "support_position": "for" when a Yes/Yea vote advances or supports the topic.\n'
'Use "support_position": "against" when a Yes/Yea vote restricts, repeals, blocks, or opposes the topic.\n'
"Select only topics from the provided candidate topic list.\n"
"Omit topics that are not materially addressed by the bill.\n"
"Return strict JSON only, with this shape:\n"
'{"topics":[{"topic":"candidate topic","support_position":"for","confidence":0.0,"evidence":"short reason"}]}'
)
user_prompt = "\n\n".join(
(
"BILL METADATA:",
metadata,
"CANDIDATE TOPICS:",
candidate_list,
"BILL TEXT:",
bill_text,
)
)
return [
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt},
]
def call_openai_topic_extraction(
*,
openai_config: OpenAIConfig,
messages: list[dict[str, str]],
) -> str:
"""Call GPT and return the assistant message content."""
response = httpx.post(
openai_config.openai_chat_completions_url,
headers={
"Authorization": f"Bearer {openai_config.api_key}",
"OpenAI-Project": openai_config.openai_project_id,
"Content-Type": "application/json",
},
json={
"model": "gpt-5.4-mini",
"messages": messages,
},
timeout=openai_config.timeout_seconds,
)
response.raise_for_status()
return extract_message_content(response.json())
def extract_message_content(data: dict[str, Any]) -> str:
"""Extract message content from a chat-completions response body."""
choices = data.get("choices")
if not isinstance(choices, list) or not choices:
msg = "Chat completion response did not contain choices"
raise TopicExtractionError(msg)
first = choices[0]
if not isinstance(first, dict):
msg = "Chat completion choice must be an object"
raise TopicExtractionError(msg)
message = first.get("message")
if isinstance(message, dict) and isinstance(message.get("content"), str):
return message["content"]
if isinstance(first.get("text"), str):
return first["text"]
msg = "Chat completion response did not contain message content"
raise TopicExtractionError(msg)
def parse_topic_extraction_response(response_text: str) -> list[ExtractedBillTopic]:
"""Parse, normalize, validate, and de-dupe a topic extraction response."""
payload = _load_json_response(response_text)
topics = payload.get("topics")
if not isinstance(topics, list):
msg = "Topic extraction response must contain a topics list"
raise TopicExtractionError(msg)
deduped: dict[tuple[str, BillTopicPosition], ExtractedBillTopic] = {}
for item in topics:
if not isinstance(item, dict):
msg = "Topic extraction response topics must be objects"
raise TopicExtractionError(msg)
raw_topic = _extract_topic_label(item)
topic = normalize_topic_label(raw_topic)
if not topic:
msg = "Topic extraction response topic must not be blank"
raise TopicExtractionError(msg)
raw_position = item.get("support_position")
try:
support_position = BillTopicPosition(raw_position)
except ValueError as exc:
msg = f"Invalid support_position: {raw_position!r}"
raise TopicExtractionError(msg) from exc
confidence = _parse_confidence(item.get("confidence"))
evidence = item.get("evidence")
if evidence is not None and not isinstance(evidence, str):
evidence = str(evidence)
extracted = ExtractedBillTopic(
topic=topic,
support_position=support_position,
confidence=confidence,
evidence=evidence,
)
key = (topic, support_position)
existing = deduped.get(key)
if existing is None or _confidence_rank(extracted) > _confidence_rank(existing):
deduped[key] = extracted
return list(deduped.values())
def extract_topics_for_bill_text(
*,
openai_config: OpenAIConfig,
bill: Bill,
text: str,
candidate_topics: Sequence[str],
) -> list[ExtractedBillTopic]:
"""Extract accepted catalog topics for a bill text string."""
normalized_candidates = {normalize_topic_label(topic) for topic in candidate_topics}
messages = build_topic_extraction_messages(
bill=bill,
bill_text=text,
candidate_topics=sorted(normalized_candidates),
)
response_text = call_openai_topic_extraction(
openai_config=openai_config,
messages=messages,
)
extracted_topics = parse_topic_extraction_response(response_text)
return [topic for topic in extracted_topics if topic.topic in normalized_candidates]
def store_bill_topic_result(
*,
session: Session,
bill: Bill,
topics: Sequence[ExtractedBillTopic],
replace_existing: bool = True,
) -> None:
"""Store extracted topics for one bill."""
if replace_existing:
session.execute(delete(BillTopic).where(BillTopic.bill_id == bill.id))
for topic in topics:
session.add(
BillTopic(
bill_id=bill.id,
topic=normalize_topic_label(topic.topic),
support_position=topic.support_position,
)
)
def create_select_bills_for_topic_extraction(
congress: int | None = None,
bill_ids: list[int] | None = None,
bill_text_ids: list[int] | None = None,
with_votes_only: bool = False,
force: bool = False,
limit: int | None = None,
) -> Select[tuple[Bill]]:
"""Select bill rows that have summarized bill_text rows for topic extraction."""
has_summary = (BillText.summary.is_not(None), BillText.summary != "")
summarized_text_filters: list[ColumnElement[bool]] = [
BillText.bill_id == Bill.id,
*has_summary,
]
if with_votes_only:
summarized_text_filters.append(
exists(
select(VoteTextTarget.vote_id)
.join(
VoteClassification,
VoteClassification.vote_id == VoteTextTarget.vote_id,
)
.where(
VoteTextTarget.voted_text_version_id == BillText.id,
VoteClassification.subject_type == SubjectType.MEASURE,
VoteClassification.vote_relationship
== VoteRelationship.DIRECT_TEXT_VOTE,
VoteClassification.is_direct_vote_on_legislative_text.is_(True),
VoteClassification.is_substantive_policy_vote.is_(True),
VoteClassification.is_special_rule.is_(False),
)
)
)
summarized_text_exists = exists(select(BillText.id).where(*summarized_text_filters))
stmt = (
select(Bill)
.where(summarized_text_exists)
.options(selectinload(Bill.bill_texts.and_(*summarized_text_filters[1:])))
.order_by(Bill.id)
)
if congress is not None:
stmt = stmt.where(Bill.congress == congress)
if bill_ids:
stmt = stmt.where(Bill.id.in_(bill_ids))
if bill_text_ids:
selected_text_exists = exists(
select(BillText.id).where(
BillText.bill_id == Bill.id,
BillText.id.in_(bill_text_ids),
*summarized_text_filters[1:],
)
)
stmt = stmt.where(selected_text_exists)
if not force:
stmt = stmt.where(
~exists(select(BillTopic.id).where(BillTopic.bill_id == Bill.id))
)
if limit is not None:
stmt = stmt.limit(limit)
return stmt
def collect_topic_extraction_diagnostics(
session: Session,
*,
congress: int | None = None,
bill_ids: list[int] | None = None,
bill_text_ids: list[int] | None = None,
with_votes_only: bool = False,
force: bool = False,
limit: int | None = None,
) -> TopicExtractionDiagnostics:
"""Count topic extraction inputs for explaining empty selections."""
bill_filters = []
bill_text_filters: list[ColumnElement[bool]] = []
if congress is not None:
bill_filters.append(Bill.congress == congress)
if bill_ids:
bill_filters.append(Bill.id.in_(bill_ids))
bill_text_filters.append(BillText.bill_id.in_(bill_ids))
if bill_text_ids:
bill_text_filters.append(BillText.id.in_(bill_text_ids))
if with_votes_only:
bill_text_filters.append(
exists(
select(VoteTextTarget.vote_id)
.join(
VoteClassification,
VoteClassification.vote_id == VoteTextTarget.vote_id,
)
.where(
VoteTextTarget.voted_text_version_id == BillText.id,
VoteClassification.subject_type == SubjectType.MEASURE,
VoteClassification.vote_relationship
== VoteRelationship.DIRECT_TEXT_VOTE,
VoteClassification.is_direct_vote_on_legislative_text.is_(True),
VoteClassification.is_substantive_policy_vote.is_(True),
VoteClassification.is_special_rule.is_(False),
)
)
)
has_summary = (BillText.summary.is_not(None), BillText.summary != "")
summary_filters = [*bill_text_filters, *has_summary]
bills_with_summaries = session.scalar(
select(func.count(func.distinct(Bill.id)))
.select_from(Bill)
.join(BillText, BillText.bill_id == Bill.id)
.where(*bill_filters, *summary_filters)
)
selected_bills = session.scalar(
select(func.count()).select_from(
create_select_bills_for_topic_extraction(
congress=congress,
bill_ids=bill_ids,
bill_text_ids=bill_text_ids,
with_votes_only=with_votes_only,
force=force,
limit=limit,
).subquery()
)
)
return TopicExtractionDiagnostics(
bill_rows=session.scalar(select(func.count(Bill.id)).where(*bill_filters)) or 0,
bill_text_rows=_count_bill_texts(
session,
bill_filters=bill_filters,
bill_text_filters=bill_text_filters,
),
summarized_bill_text_rows=_count_bill_texts(
session,
bill_filters=bill_filters,
bill_text_filters=summary_filters,
),
bills_with_summaries=bills_with_summaries or 0,
bill_topic_rows=session.scalar(select(func.count(BillTopic.id))) or 0,
selected_bills=selected_bills or 0,
)
def _load_json_response(response_text: str) -> dict[str, Any]:
text = response_text.strip()
fenced = re.fullmatch(r"```(?:json)?\s*(.*?)\s*```", text, flags=re.DOTALL)
if fenced:
text = fenced.group(1).strip()
try:
payload = json.loads(text)
except json.JSONDecodeError as exc:
msg = f"Topic extraction response is not valid JSON: {exc}"
raise TopicExtractionError(msg) from exc
if not isinstance(payload, dict):
msg = "Topic extraction response must be a JSON object"
raise TopicExtractionError(msg)
return payload
def _parse_confidence(raw: Any) -> float | None:
if raw is None:
return None
try:
return float(raw)
except (TypeError, ValueError) as exc:
msg = f"Invalid confidence: {raw!r}"
raise TopicExtractionError(msg) from exc
def _confidence_rank(topic: ExtractedBillTopic) -> tuple[int, float]:
if topic.confidence is None:
return (0, 0.0)
return (1, topic.confidence)
def _extract_topic_label(item: dict[str, Any]) -> str:
raw_topic = item.get("topic")
if isinstance(raw_topic, str):
return raw_topic
if isinstance(raw_topic, dict):
for key in ("topic", "label", "name", "title"):
value = raw_topic.get(key)
if isinstance(value, str):
return value
msg = "Topic extraction response topic must be a string"
raise TopicExtractionError(msg)
def _count_bill_texts(
session: Session,
*,
bill_filters: Sequence[ColumnElement[bool]],
bill_text_filters: Sequence[ColumnElement[bool]],
) -> int:
stmt = select(func.count(BillText.id))
if bill_filters:
stmt = stmt.join(Bill, Bill.id == BillText.bill_id).where(*bill_filters)
return session.scalar(stmt.where(*bill_text_filters)) or 0
def main(
topics_path: Annotated[
Path, typer.Option(help="Path to congressional issue topic JSON.")
] = DEFAULT_TOPICS_PATH,
congress: Annotated[
int | None, typer.Option(help="Only process one Congress.")
] = None,
bill_ids: Annotated[
list[int] | None,
typer.Option(
"--bill-id",
help="Only process one internal bill.id. Repeat for multiple bills.",
),
] = None,
bill_text_ids: Annotated[
list[int] | None,
typer.Option(
"--bill-text-id",
help="Only process one internal bill_text.id. Repeat for multiple rows.",
),
] = None,
with_votes_only: Annotated[
bool,
typer.Option(
"--with-votes-only",
help="Only process summarized bill_text rows linked to at least one vote.",
),
] = True,
limit: Annotated[int | None, typer.Option(help="Maximum rows to process.")] = None,
force: Annotated[
bool,
typer.Option(help="Regenerate topics for bills that already have topics."),
] = False,
dry_run: Annotated[
bool,
typer.Option(help="Select bills and print diagnostics without calling OpenAI."),
] = False,
diagnose: Annotated[
bool,
typer.Option(help="Log input-stage counts before processing."),
] = False,
log_level: Annotated[str, typer.Option(help="Log level.")] = "INFO",
) -> None:
"""CLI entrypoint for generating and storing bill topics."""
logging.basicConfig(
level=log_level,
format="%(asctime)s %(levelname)s %(name)s: %(message)s",
)
topic_catalog = load_topic_catalog(topics_path)
logger.info(
"Loaded %d candidate topics from %s",
len(topic_catalog.candidate_topics),
topics_path,
)
engine = get_postgres_engine(name="DATA_SCIENCE_DEV")
with Session(engine) as session:
if diagnose or dry_run:
diagnostics = collect_topic_extraction_diagnostics(
session,
congress=congress,
bill_ids=bill_ids,
bill_text_ids=bill_text_ids,
with_votes_only=with_votes_only,
force=force,
limit=limit,
)
_log_topic_extraction_diagnostics(diagnostics)
if dry_run:
return
openai_config = get_openai_config()
stmt = create_select_bills_for_topic_extraction(
congress=congress,
bill_ids=bill_ids,
bill_text_ids=bill_text_ids,
with_votes_only=with_votes_only,
force=force,
limit=limit,
)
bills = session.scalars(stmt).all()
logger.info("Selected %d bills for topic extraction", len(bills))
written = 0
failed = 0
for index, bill in enumerate(bills, 1):
bill_text = _select_bill_text_for_topic_extraction(bill)
if bill_text is None:
logger.warning("Skipping bill id=%s: no usable summary", bill.id)
continue
summary = bill_text.summary.strip()
try:
extracted_topics = extract_topics_for_bill_text(
openai_config=openai_config,
bill=bill,
text=summary,
candidate_topics=topic_catalog.candidate_topics,
)
except (httpx.HTTPError, TopicExtractionError):
failed += 1
logger.exception(
"Skipping bill id=%s after topic extraction failure", bill.id
)
continue
store_bill_topic_result(
session=session,
bill=bill,
topics=extracted_topics,
replace_existing=True,
)
written += 1
if index % 100 == 0:
session.commit()
logger.info(
"Stored %d topics for bill id=%s",
len(extracted_topics),
bill.id,
)
session.commit()
logger.info(
"Done: stored topic results for %d bills; failed %d bills",
written,
failed,
)
def _log_topic_extraction_diagnostics(
diagnostics: TopicExtractionDiagnostics,
) -> None:
logger.info(
"Topic extraction diagnostics: bill_rows=%d bill_text_rows=%d "
"summarized_bill_text_rows=%d bills_with_summaries=%d "
"bill_topic_rows=%d selected_bills=%d",
diagnostics.bill_rows,
diagnostics.bill_text_rows,
diagnostics.summarized_bill_text_rows,
diagnostics.bills_with_summaries,
diagnostics.bill_topic_rows,
diagnostics.selected_bills,
)
if diagnostics.bill_rows == 0:
logger.warning("No bills matched the topic extraction scope.")
elif diagnostics.bill_text_rows == 0:
logger.warning("No bill_text rows matched the topic extraction scope.")
elif diagnostics.summarized_bill_text_rows == 0:
logger.warning(
"No summarized bill_text rows matched the topic extraction scope. "
"Run pipelines.tools.summarize_bills first."
)
elif diagnostics.selected_bills == 0 and diagnostics.bill_topic_rows > 0:
logger.warning(
"No bills selected because matching bills already have topics. "
"Use --force to regenerate them."
)
elif diagnostics.selected_bills == 0:
logger.warning("No bills selected for topic extraction.")
if __name__ == "__main__":
typer.run(main)
@@ -2,85 +2,25 @@
from pipelines.orm.data_science_dev.congress.bill import ( from pipelines.orm.data_science_dev.congress.bill import (
Bill, Bill,
BillAction,
BillActionRecordedVote,
BillRelation,
BillText, BillText,
BillTopic, BillTopic,
BillTopicPosition, BillTopicPosition,
) )
from pipelines.orm.data_science_dev.congress.amendment import (
Amendment,
AmendmentAction,
AmendmentActionRecordedVote,
)
from pipelines.orm.data_science_dev.congress.context import (
ClassificationMethod,
ConfidenceLevel,
IngestRun,
MeasureFunction,
MeasureSubtype,
ScoreRun,
SourceArtifact,
SubjectType,
TextResolutionMethod,
TextTargetBasis,
TextTargetType,
VoteActionMatch,
VoteActionScope,
VoteClassification,
VoteContextAudit,
VoteEffect,
VoteMeasureLink,
VoteMeasureRole,
VotePositionMeaning,
VoteRelationship,
VoteTextTarget,
)
from pipelines.orm.data_science_dev.congress.legislator import ( from pipelines.orm.data_science_dev.congress.legislator import (
Legislator, Legislator,
LegislatorScore, LegislatorScore,
LegislatorSocialMedia, LegislatorSocialMedia,
LegislatorScoreFake,
) )
from pipelines.orm.data_science_dev.congress.vote import Vote, VoteRecord from pipelines.orm.data_science_dev.congress.vote import Vote, VoteRecord
__all__ = [ __all__ = [
"Amendment",
"AmendmentAction",
"AmendmentActionRecordedVote",
"Bill", "Bill",
"BillAction",
"BillActionRecordedVote",
"BillRelation",
"BillText", "BillText",
"BillTopic", "BillTopic",
"BillTopicPosition", "BillTopicPosition",
"ClassificationMethod",
"ConfidenceLevel",
"IngestRun",
"Legislator", "Legislator",
"LegislatorScore", "LegislatorScore",
"LegislatorScoreFake",
"LegislatorSocialMedia", "LegislatorSocialMedia",
"MeasureFunction",
"MeasureSubtype",
"ScoreRun",
"SourceArtifact",
"SubjectType",
"TextResolutionMethod",
"TextTargetBasis",
"TextTargetType",
"Vote", "Vote",
"VoteActionMatch",
"VoteActionScope",
"VoteClassification",
"VoteContextAudit",
"VoteEffect",
"VoteMeasureLink",
"VoteMeasureRole",
"VotePositionMeaning",
"VoteRelationship",
"VoteRecord", "VoteRecord",
"VoteTextTarget",
] ]
@@ -1,127 +0,0 @@
"""Amendment models and official action context."""
from __future__ import annotations
from datetime import date, datetime
from sqlalchemy import DateTime, ForeignKey, Index, UniqueConstraint
from sqlalchemy.orm import Mapped, mapped_column, relationship
from pipelines.orm.data_science_dev.base import DataScienceDevTableBase
class Amendment(DataScienceDevTableBase):
"""Congressional amendment linked to a bill or to another amendment."""
__tablename__ = "amendment"
__table_args__ = (
UniqueConstraint(
"congress",
"amendment_type",
"number",
name="uq_amendment_congress_type_number",
),
)
congress: Mapped[int]
amendment_type: Mapped[str]
number: Mapped[int]
chamber: Mapped[str]
description: Mapped[str | None]
purpose: Mapped[str | None]
amended_bill_id: Mapped[int | None] = mapped_column(
ForeignKey("main.bill.id", ondelete="SET NULL")
)
amended_amendment_id: Mapped[int | None] = mapped_column(
ForeignKey("main.amendment.id", ondelete="SET NULL")
)
source_path: Mapped[str | None]
source_artifact_id: Mapped[int | None] = mapped_column(
ForeignKey("main.source_artifact.id", ondelete="SET NULL")
)
actions: Mapped[list[AmendmentAction]] = relationship(
"AmendmentAction",
back_populates="amendment",
cascade="all, delete-orphan",
)
amended_amendment: Mapped[Amendment | None] = relationship(
"Amendment",
remote_side="Amendment.id",
)
class AmendmentAction(DataScienceDevTableBase):
"""Official action row for an amendment."""
__tablename__ = "amendment_action"
__table_args__ = (
UniqueConstraint(
"amendment_id",
"sequence",
name="uq_amendment_action_amendment_id_sequence",
),
)
amendment_id: Mapped[int] = mapped_column(
ForeignKey("main.amendment.id", ondelete="CASCADE")
)
sequence: Mapped[int]
action_date: Mapped[date]
action_time: Mapped[str | None]
action_text: Mapped[str]
action_type: Mapped[str | None]
action_code: Mapped[str | None]
source_system_code: Mapped[str | None]
source_system_name: Mapped[str | None]
source_artifact_id: Mapped[int | None] = mapped_column(
ForeignKey("main.source_artifact.id", ondelete="SET NULL")
)
amendment: Mapped[Amendment] = relationship(
"Amendment",
back_populates="actions",
)
recorded_votes: Mapped[list[AmendmentActionRecordedVote]] = relationship(
"AmendmentActionRecordedVote",
back_populates="amendment_action",
cascade="all, delete-orphan",
)
class AmendmentActionRecordedVote(DataScienceDevTableBase):
"""Recorded vote nested under one official amendment action."""
__tablename__ = "amendment_action_recorded_vote"
__table_args__ = (
UniqueConstraint(
"amendment_action_id",
"congress",
"chamber",
"session_number",
"roll_number",
name="uq_amendment_action_recorded_vote_match_key",
),
Index(
"ix_amendment_action_recorded_vote_match_tuple",
"congress",
"chamber",
"session_number",
"roll_number",
),
)
amendment_action_id: Mapped[int] = mapped_column(
ForeignKey("main.amendment_action.id", ondelete="CASCADE")
)
congress: Mapped[int]
chamber: Mapped[str]
session_number: Mapped[int]
roll_number: Mapped[int]
vote_datetime: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
vote_url: Mapped[str | None]
amendment_action: Mapped[AmendmentAction] = relationship(
"AmendmentAction",
back_populates="recorded_votes",
)
+13 -140
View File
@@ -1,4 +1,4 @@
"""Bill models for legislation, official actions, text versions, and topic tags.""" """Bill model - legislation introduced in Congress."""
from __future__ import annotations from __future__ import annotations
@@ -12,7 +12,7 @@ from sqlalchemy.orm import Mapped, mapped_column, relationship
from pipelines.orm.data_science_dev.base import DataScienceDevTableBase from pipelines.orm.data_science_dev.base import DataScienceDevTableBase
if TYPE_CHECKING: if TYPE_CHECKING:
from pipelines.orm.data_science_dev.congress.context import VoteMeasureLink from pipelines.orm.data_science_dev.congress.vote import Vote
class BillTopicPosition(StrEnum): class BillTopicPosition(StrEnum):
@@ -22,17 +22,6 @@ class BillTopicPosition(StrEnum):
AGAINST = "against" AGAINST = "against"
def _enum_column(enum_cls: type[StrEnum], *, name: str) -> Enum:
"""Build a portable SQLAlchemy enum column for StrEnum values."""
return Enum(
enum_cls,
values_callable=lambda enum_type: [member.value for member in enum_type],
native_enum=False,
name=name,
)
class Bill(DataScienceDevTableBase): class Bill(DataScienceDevTableBase):
"""Legislation with congress number, type, titles, status, and sponsor.""" """Legislation with congress number, type, titles, status, and sponsor."""
@@ -60,6 +49,10 @@ class Bill(DataScienceDevTableBase):
subjects_top_term: Mapped[str | None] subjects_top_term: Mapped[str | None]
score_processed_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True)) score_processed_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
votes: Mapped[list[Vote]] = relationship(
"Vote",
back_populates="bill",
)
bill_texts: Mapped[list[BillText]] = relationship( bill_texts: Mapped[list[BillText]] = relationship(
"BillText", "BillText",
back_populates="bill", back_populates="bill",
@@ -70,28 +63,6 @@ class Bill(DataScienceDevTableBase):
back_populates="bill", back_populates="bill",
cascade="all, delete-orphan", cascade="all, delete-orphan",
) )
bill_actions: Mapped[list[BillAction]] = relationship(
"BillAction",
back_populates="bill",
cascade="all, delete-orphan",
)
outgoing_bill_relations: Mapped[list[BillRelation]] = relationship(
"BillRelation",
foreign_keys="BillRelation.bill_id",
back_populates="bill",
cascade="all, delete-orphan",
)
incoming_bill_relations: Mapped[list[BillRelation]] = relationship(
"BillRelation",
foreign_keys="BillRelation.related_bill_id",
back_populates="related_bill",
cascade="all, delete-orphan",
)
vote_measure_links: Mapped[list[VoteMeasureLink]] = relationship(
"VoteMeasureLink",
back_populates="measure",
cascade="all, delete-orphan",
)
class BillText(DataScienceDevTableBase): class BillText(DataScienceDevTableBase):
@@ -113,113 +84,10 @@ class BillText(DataScienceDevTableBase):
summarization_user_prompt_version: Mapped[str | None] summarization_user_prompt_version: Mapped[str | None]
summarization_system_prompt_version: Mapped[str | None] summarization_system_prompt_version: Mapped[str | None]
date: Mapped[date | None] date: Mapped[date | None]
source_datetime_raw: Mapped[str | None]
text_url_xml: Mapped[str | None]
text_url_pdf: Mapped[str | None]
text_url_html: Mapped[str | None]
source_artifact_id: Mapped[int | None] = mapped_column(
ForeignKey("main.source_artifact.id", ondelete="SET NULL")
)
bill: Mapped[Bill] = relationship("Bill", back_populates="bill_texts") bill: Mapped[Bill] = relationship("Bill", back_populates="bill_texts")
# suport multipu summary prer bill
class BillAction(DataScienceDevTableBase):
"""Official action row from Bill Status XML."""
__tablename__ = "bill_action"
__table_args__ = (
UniqueConstraint("bill_id", "sequence", name="uq_bill_action_bill_id_sequence"),
)
bill_id: Mapped[int] = mapped_column(ForeignKey("main.bill.id", ondelete="CASCADE"))
sequence: Mapped[int]
action_date: Mapped[date]
action_time: Mapped[str | None]
action_text: Mapped[str]
action_type: Mapped[str | None]
action_code: Mapped[str | None]
source_system_code: Mapped[str | None]
source_system_name: Mapped[str | None]
source_artifact_id: Mapped[int | None] = mapped_column(
ForeignKey("main.source_artifact.id", ondelete="SET NULL")
)
bill: Mapped[Bill] = relationship("Bill", back_populates="bill_actions")
recorded_votes: Mapped[list[BillActionRecordedVote]] = relationship(
"BillActionRecordedVote",
back_populates="bill_action",
cascade="all, delete-orphan",
)
class BillActionRecordedVote(DataScienceDevTableBase):
"""Recorded vote nested under one official bill action."""
__tablename__ = "bill_action_recorded_vote"
__table_args__ = (
UniqueConstraint(
"bill_action_id",
"congress",
"chamber",
"session_number",
"roll_number",
name="uq_bill_action_recorded_vote_match_key",
),
Index(
"ix_bill_action_recorded_vote_match_tuple",
"congress",
"chamber",
"session_number",
"roll_number",
),
)
bill_action_id: Mapped[int] = mapped_column(
ForeignKey("main.bill_action.id", ondelete="CASCADE")
)
congress: Mapped[int]
chamber: Mapped[str]
session_number: Mapped[int]
roll_number: Mapped[int]
vote_datetime: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
vote_url: Mapped[str | None]
bill_action: Mapped[BillAction] = relationship(
"BillAction",
back_populates="recorded_votes",
)
class BillRelation(DataScienceDevTableBase):
"""Relationship between one bill/resolution and another."""
__tablename__ = "bill_relation"
__table_args__ = (
Index("ix_bill_relation_bill_id", "bill_id"),
Index("ix_bill_relation_related_bill_id", "related_bill_id"),
)
bill_id: Mapped[int] = mapped_column(ForeignKey("main.bill.id", ondelete="CASCADE"))
related_bill_id: Mapped[int] = mapped_column(
ForeignKey("main.bill.id", ondelete="CASCADE")
)
relationship_type: Mapped[str]
identified_by: Mapped[str | None]
latest_action_date: Mapped[date | None]
latest_action_text: Mapped[str | None]
bill: Mapped[Bill] = relationship(
"Bill",
foreign_keys=[bill_id],
back_populates="outgoing_bill_relations",
)
related_bill: Mapped[Bill] = relationship(
"Bill",
foreign_keys=[related_bill_id],
back_populates="incoming_bill_relations",
)
class BillTopic(DataScienceDevTableBase): class BillTopic(DataScienceDevTableBase):
"""One bill stance on one topic used to score roll-call votes.""" """One bill stance on one topic used to score roll-call votes."""
@@ -238,7 +106,12 @@ class BillTopic(DataScienceDevTableBase):
bill_id: Mapped[int] = mapped_column(ForeignKey("main.bill.id", ondelete="CASCADE")) bill_id: Mapped[int] = mapped_column(ForeignKey("main.bill.id", ondelete="CASCADE"))
topic: Mapped[str] topic: Mapped[str]
support_position: Mapped[BillTopicPosition] = mapped_column( support_position: Mapped[BillTopicPosition] = mapped_column(
_enum_column(BillTopicPosition, name="bill_topic_position") Enum(
BillTopicPosition,
values_callable=lambda enum_cls: [member.value for member in enum_cls],
native_enum=False,
name="bill_topic_position",
)
) )
bill: Mapped[Bill] = relationship("Bill", back_populates="topics") bill: Mapped[Bill] = relationship("Bill", back_populates="topics")
@@ -1,462 +0,0 @@
"""Canonical vote context, artifact tracking, and run metadata models."""
from __future__ import annotations
from datetime import datetime
from enum import StrEnum
from typing import TYPE_CHECKING
from sqlalchemy import DateTime, Enum, ForeignKey, Index, func, text
from sqlalchemy.orm import Mapped, mapped_column, relationship
from pipelines.orm.data_science_dev.base import DataScienceDevTableBase
if TYPE_CHECKING:
from pipelines.orm.data_science_dev.congress.amendment import Amendment, AmendmentAction
from pipelines.orm.data_science_dev.congress.bill import Bill, BillAction, BillText
from pipelines.orm.data_science_dev.congress.legislator import LegislatorScore
from pipelines.orm.data_science_dev.congress.vote import Vote
def _enum_column(enum_cls: type[StrEnum], *, name: str) -> Enum:
"""Build a portable SQLAlchemy enum column for StrEnum values."""
return Enum(
enum_cls,
values_callable=lambda enum_type: [member.value for member in enum_type],
native_enum=False,
name=name,
)
class ConfidenceLevel(StrEnum):
"""Low/medium/high confidence buckets."""
HIGH = "high"
MEDIUM = "medium"
LOW = "low"
class VoteActionScope(StrEnum):
"""Whether a matched action came from bill or amendment context."""
BILL = "bill"
AMENDMENT = "amendment"
class SubjectType(StrEnum):
"""The direct legal/procedural subject of the vote."""
MEASURE = "measure"
AMENDMENT = "amendment"
NOMINATION = "nomination"
TREATY = "treaty"
QUORUM = "quorum"
CHAMBER_ADMIN = "chamber_admin"
UNKNOWN = "unknown"
class MeasureSubtype(StrEnum):
"""Formal congressional measure subtype."""
BILL = "bill"
JOINT_RESOLUTION = "joint_resolution"
CONCURRENT_RESOLUTION = "concurrent_resolution"
SIMPLE_RESOLUTION = "simple_resolution"
class MeasureFunction(StrEnum):
"""Semantic function of a measure beyond its formal subtype."""
SUBSTANTIVE_MEASURE = "substantive_measure"
SPECIAL_RULE = "special_rule"
BUDGET_RESOLUTION = "budget_resolution"
CHAMBER_INTERNAL = "chamber_internal"
COMMEMORATIVE_OR_SENSE_OF = "commemorative_or_sense_of"
UNKNOWN = "unknown"
class VoteRelationship(StrEnum):
"""The vote's relationship to the direct subject and its text."""
DIRECT_TEXT_VOTE = "direct_text_vote"
AMENDMENT_TEXT_VOTE = "amendment_text_vote"
PROCEDURAL_RELATED_TO_MEASURE = "procedural_related_to_measure"
PROCEDURAL_RELATED_TO_AMENDMENT = "procedural_related_to_amendment"
NON_LEGISLATIVE = "non_legislative"
UNKNOWN = "unknown"
class ClassificationMethod(StrEnum):
"""How the final classification was derived."""
RECORDED_VOTE_ACTION_EXACT = "recorded_vote_action_exact"
RECORDED_VOTE_ACTION_DUPLICATE_SOURCE_DEDUPED = (
"recorded_vote_action_duplicate_source_deduped"
)
VOTE_XML_ONLY = "vote_xml_only"
QUESTION_TEXT_ONLY = "question_text_only"
MANUAL_REVIEW = "manual_review"
class VoteMeasureRole(StrEnum):
"""How one measure relates to one classified vote."""
VOTED_ON = "voted_on"
RULE_FOR = "rule_for"
UNDERLYING_BILL = "underlying_bill"
PROCEDURAL_TARGET = "procedural_target"
AMENDS = "amends"
AMENDED_BY = "amended_by"
CONFERENCE_REPORT_FOR = "conference_report_for"
RELATED_ONLY = "related_only"
class TextTargetType(StrEnum):
"""Which kind of legislative text was the object of a vote."""
BILL_TEXT = "bill_text"
RESOLUTION_TEXT = "resolution_text"
AMENDMENT_TEXT = "amendment_text"
NONE = "none"
UNKNOWN = "unknown"
class TextTargetBasis(StrEnum):
"""How the text target should be interpreted."""
EXACT_ACTION_TEXT_VERSION = "exact_action_text_version"
RESULTING_ENGROSSED_VERSION = "resulting_engrossed_version"
RECEIVED_PRIOR_CHAMBER_VERSION = "received_prior_chamber_version"
AMENDMENT_TEXT = "amendment_text"
RULE_RESOLUTION_TEXT = "rule_resolution_text"
NO_TEXT_TARGET = "no_text_target"
UNKNOWN = "unknown"
class TextResolutionMethod(StrEnum):
"""How the official text target was resolved."""
TEXT_EXACT_ACTION_DATE_AND_CODE = "text_exact_action_date_and_code"
TEXT_EXACT_ACTION_DATE_WRONG_CODE = "text_exact_action_date_wrong_code"
TEXT_PRIOR_VERSION_CODE_MATCH = "text_prior_version_code_match"
TEXT_RECEIVED_PRIOR_CHAMBER_VERSION = "text_received_prior_chamber_version"
TEXT_RESULTING_ENROLLED_ONLY = "text_resulting_enrolled_only"
AMENDMENT_TEXT_UNMODELED_PHASE1 = "amendment_text_unmodeled_phase1"
NO_TEXT_TARGET = "no_text_target"
UNKNOWN = "unknown"
class VoteEffect(StrEnum):
"""Meaning of one member position relative to the target text/procedure."""
SUPPORTS_TEXT = "supports_text"
OPPOSES_TEXT = "opposes_text"
ADVANCES_PROCEDURE = "advances_procedure"
BLOCKS_PROCEDURE = "blocks_procedure"
UNKNOWN = "unknown"
class IngestRun(DataScienceDevTableBase):
"""One full ingestion or context rebuild run."""
__tablename__ = "ingest_run"
started_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
completed_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
git_sha: Mapped[str | None]
classifier_version: Mapped[str | None]
source_snapshot_label: Mapped[str | None]
status: Mapped[str]
source_artifacts: Mapped[list[SourceArtifact]] = relationship(
"SourceArtifact",
back_populates="ingest_run",
cascade="all, delete-orphan",
)
score_runs: Mapped[list[ScoreRun]] = relationship(
"ScoreRun",
back_populates="ingest_run",
cascade="all, delete-orphan",
)
class SourceArtifact(DataScienceDevTableBase):
"""Local artifact manifest entry for reproducibility."""
__tablename__ = "source_artifact"
__table_args__ = (
Index("ix_source_artifact_source_kind", "source_kind"),
Index("ix_source_artifact_congress", "congress"),
Index(
"uq_source_artifact_ingest_identity",
"ingest_run_id",
"local_path",
"sha256",
unique=True,
),
)
source_kind: Mapped[str]
congress: Mapped[int]
chamber: Mapped[str | None]
local_path: Mapped[str]
source_url: Mapped[str | None]
sha256: Mapped[str]
byte_size: Mapped[int]
modified_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
ingested_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
ingest_run_id: Mapped[int | None] = mapped_column(
ForeignKey("main.ingest_run.id", ondelete="SET NULL")
)
ingest_run: Mapped[IngestRun | None] = relationship(
"IngestRun",
back_populates="source_artifacts",
)
class ScoreRun(DataScienceDevTableBase):
"""One full score recomputation tied to one ingest snapshot."""
__tablename__ = "score_run"
ingest_run_id: Mapped[int | None] = mapped_column(
ForeignKey("main.ingest_run.id", ondelete="SET NULL")
)
classifier_version: Mapped[str | None]
scoring_version: Mapped[str | None]
included_vote_count: Mapped[int]
excluded_vote_count: Mapped[int]
started_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
completed_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
ingest_run: Mapped[IngestRun | None] = relationship(
"IngestRun",
back_populates="score_runs",
)
scores: Mapped[list[LegislatorScore]] = relationship(
"LegislatorScore",
back_populates="score_run",
cascade="all, delete-orphan",
)
class VoteActionMatch(DataScienceDevTableBase):
"""A candidate or selected official action match for one raw vote."""
__tablename__ = "vote_action_match"
__table_args__ = (
Index("ix_vote_action_match_vote_id", "vote_id"),
Index(
"uq_vote_action_match_selected_vote_id",
"vote_id",
unique=True,
postgresql_where=text("is_selected"),
),
)
vote_id: Mapped[int] = mapped_column(ForeignKey("main.vote.id", ondelete="CASCADE"))
action_scope: Mapped[VoteActionScope] = mapped_column(
_enum_column(VoteActionScope, name="vote_action_scope")
)
bill_action_id: Mapped[int | None] = mapped_column(
ForeignKey("main.bill_action.id", ondelete="CASCADE")
)
amendment_action_id: Mapped[int | None] = mapped_column(
ForeignKey("main.amendment_action.id", ondelete="CASCADE")
)
is_selected: Mapped[bool]
match_method: Mapped[str]
match_reason: Mapped[str | None]
match_confidence: Mapped[ConfidenceLevel] = mapped_column(
_enum_column(ConfidenceLevel, name="vote_action_match_confidence")
)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
server_default=func.now(),
)
vote: Mapped[Vote] = relationship("Vote", back_populates="action_matches")
bill_action: Mapped[BillAction | None] = relationship("BillAction")
amendment_action: Mapped[AmendmentAction | None] = relationship("AmendmentAction")
class VoteClassification(DataScienceDevTableBase):
"""Normalized classification for what a vote was legally/procedurally on."""
__tablename__ = "vote_classification"
__table_args__ = (
Index("ix_vote_classification_subject_type", "subject_type"),
Index(
"ix_vote_classification_eligible_vote_id",
"vote_id",
postgresql_where=text(
"subject_type = 'measure' "
"AND vote_relationship = 'direct_text_vote' "
"AND is_direct_vote_on_legislative_text "
"AND is_substantive_policy_vote "
"AND NOT is_special_rule"
),
),
)
vote_id: Mapped[int] = mapped_column(
ForeignKey("main.vote.id", ondelete="CASCADE"),
unique=True,
)
subject_type: Mapped[SubjectType] = mapped_column(
_enum_column(SubjectType, name="vote_subject_type")
)
measure_type: Mapped[str | None]
measure_subtype: Mapped[MeasureSubtype | None] = mapped_column(
_enum_column(MeasureSubtype, name="vote_measure_subtype")
)
measure_function: Mapped[MeasureFunction | None] = mapped_column(
_enum_column(MeasureFunction, name="vote_measure_function")
)
vote_relationship: Mapped[VoteRelationship] = mapped_column(
_enum_column(VoteRelationship, name="vote_relationship")
)
is_legislation_related: Mapped[bool]
is_direct_vote_on_legislative_text: Mapped[bool]
is_substantive_policy_vote: Mapped[bool]
is_lawmaking_vehicle: Mapped[bool]
is_special_rule: Mapped[bool]
classification_method: Mapped[ClassificationMethod] = mapped_column(
_enum_column(ClassificationMethod, name="vote_classification_method")
)
classification_confidence_reason: Mapped[str | None]
confidence: Mapped[ConfidenceLevel] = mapped_column(
_enum_column(ConfidenceLevel, name="vote_classification_confidence")
)
classified_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
classification_version: Mapped[str]
vote: Mapped[Vote] = relationship("Vote", back_populates="classification")
class VoteMeasureLink(DataScienceDevTableBase):
"""Relationship between a classified vote and one bill/resolution measure."""
__tablename__ = "vote_measure_link"
__table_args__ = (
Index("ix_vote_measure_link_vote_id", "vote_id"),
Index("ix_vote_measure_link_vote_id_role", "vote_id", "role"),
Index("ix_vote_measure_link_measure_id_role", "measure_id", "role"),
)
vote_id: Mapped[int] = mapped_column(ForeignKey("main.vote.id", ondelete="CASCADE"))
measure_id: Mapped[int] = mapped_column(ForeignKey("main.bill.id", ondelete="CASCADE"))
role: Mapped[VoteMeasureRole] = mapped_column(
_enum_column(VoteMeasureRole, name="vote_measure_role")
)
source: Mapped[str]
confidence: Mapped[ConfidenceLevel] = mapped_column(
_enum_column(ConfidenceLevel, name="vote_measure_link_confidence")
)
notes: Mapped[str | None]
vote: Mapped[Vote] = relationship("Vote", back_populates="vote_measure_links")
measure: Mapped[Bill] = relationship("Bill", back_populates="vote_measure_links")
class VoteTextTarget(DataScienceDevTableBase):
"""Official text target, if any, resolved for one classified vote."""
__tablename__ = "vote_text_target"
__table_args__ = (
Index(
"ix_vote_text_target_voted_text_version_id",
"voted_text_version_id",
postgresql_where=text("voted_text_version_id IS NOT NULL"),
),
)
vote_id: Mapped[int] = mapped_column(
ForeignKey("main.vote.id", ondelete="CASCADE"),
unique=True,
)
text_target_type: Mapped[TextTargetType] = mapped_column(
_enum_column(TextTargetType, name="vote_text_target_type")
)
voted_text_version_id: Mapped[int | None] = mapped_column(
ForeignKey("main.bill_text.id", ondelete="SET NULL")
)
resulting_text_version_id: Mapped[int | None] = mapped_column(
ForeignKey("main.bill_text.id", ondelete="SET NULL")
)
related_amendment_id: Mapped[int | None] = mapped_column(
ForeignKey("main.amendment.id", ondelete="SET NULL")
)
text_target_basis: Mapped[TextTargetBasis] = mapped_column(
_enum_column(TextTargetBasis, name="vote_text_target_basis")
)
text_resolution_method: Mapped[TextResolutionMethod] = mapped_column(
_enum_column(TextResolutionMethod, name="vote_text_resolution_method")
)
text_resolution_confidence_reason: Mapped[str | None]
confidence: Mapped[ConfidenceLevel] = mapped_column(
_enum_column(ConfidenceLevel, name="vote_text_target_confidence")
)
notes: Mapped[str | None]
vote: Mapped[Vote] = relationship("Vote", back_populates="text_target")
voted_text_version: Mapped[BillText | None] = relationship(
"BillText",
foreign_keys=[voted_text_version_id],
)
resulting_text_version: Mapped[BillText | None] = relationship(
"BillText",
foreign_keys=[resulting_text_version_id],
)
related_amendment: Mapped[Amendment | None] = relationship("Amendment")
class VotePositionMeaning(DataScienceDevTableBase):
"""Meaning of Yea/Nay/Present positions for one classified vote."""
__tablename__ = "vote_position_meaning"
vote_id: Mapped[int] = mapped_column(
ForeignKey("main.vote.id", ondelete="CASCADE"),
unique=True,
)
yea_effect: Mapped[VoteEffect] = mapped_column(
_enum_column(VoteEffect, name="vote_yea_effect")
)
nay_effect: Mapped[VoteEffect] = mapped_column(
_enum_column(VoteEffect, name="vote_nay_effect")
)
present_effect: Mapped[VoteEffect] = mapped_column(
_enum_column(VoteEffect, name="vote_present_effect")
)
polarity_confidence: Mapped[ConfidenceLevel] = mapped_column(
_enum_column(ConfidenceLevel, name="vote_polarity_confidence")
)
polarity_method: Mapped[str]
notes: Mapped[str | None]
vote: Mapped[Vote] = relationship("Vote", back_populates="position_meaning")
class VoteContextAudit(DataScienceDevTableBase):
"""Audit/event row for ambiguous or noteworthy vote-context decisions."""
__tablename__ = "vote_context_audit"
__table_args__ = (
Index("ix_vote_context_audit_vote_id", "vote_id"),
Index("ix_vote_context_audit_severity_vote_id", "severity", "vote_id"),
)
vote_id: Mapped[int] = mapped_column(ForeignKey("main.vote.id", ondelete="CASCADE"))
step: Mapped[str]
message: Mapped[str]
severity: Mapped[str]
source_path: Mapped[str | None]
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
server_default=func.now(),
)
vote: Mapped[Vote] = relationship("Vote", back_populates="context_audit_rows")
@@ -11,7 +11,6 @@ from sqlalchemy.orm import Mapped, mapped_column, relationship
from pipelines.orm.data_science_dev.base import DataScienceDevTableBase from pipelines.orm.data_science_dev.base import DataScienceDevTableBase
if TYPE_CHECKING: if TYPE_CHECKING:
from pipelines.orm.data_science_dev.congress.context import ScoreRun
from pipelines.orm.data_science_dev.congress.vote import VoteRecord from pipelines.orm.data_science_dev.congress.vote import VoteRecord
@@ -19,7 +18,6 @@ class Legislator(DataScienceDevTableBase):
"""Members of Congress with identification and current term info.""" """Members of Congress with identification and current term info."""
__tablename__ = "legislator" __tablename__ = "legislator"
__table_args__ = (Index("ix_legislator_current_chamber", "current_chamber"),)
bioguide_id: Mapped[str] = mapped_column(Text, unique=True, index=True) bioguide_id: Mapped[str] = mapped_column(Text, unique=True, index=True)
@@ -93,39 +91,8 @@ class LegislatorScore(DataScienceDevTableBase):
ForeignKey("main.legislator.id", ondelete="CASCADE"), ForeignKey("main.legislator.id", ondelete="CASCADE"),
index=True, index=True,
) )
score_run_id: Mapped[int | None] = mapped_column(
ForeignKey("main.score_run.id", ondelete="CASCADE"),
index=True,
)
year: Mapped[int] year: Mapped[int]
topic: Mapped[str] topic: Mapped[str]
score: Mapped[float] score: Mapped[float]
legislator: Mapped[Legislator] = relationship(back_populates="scores") legislator: Mapped[Legislator] = relationship(back_populates="scores")
score_run: Mapped[ScoreRun | None] = relationship(
"ScoreRun",
back_populates="scores",
)
class LegislatorScoreFake(DataScienceDevTableBase):
"""Computed topic score for a legislator in one calendar year."""
__tablename__ = "legislator_score_fake"
__table_args__ = (
UniqueConstraint(
"legislator_id",
"year",
"topic",
name="uq_legislator_score_fake_legislator_id_year_topic",
),
Index("ix_legislator_score_fake_year_topic", "year", "topic"),
)
legislator_id: Mapped[int] = mapped_column(
ForeignKey("main.legislator.id", ondelete="CASCADE"),
index=True,
)
year: Mapped[int]
topic: Mapped[str]
score: Mapped[float]
+14 -61
View File
@@ -1,12 +1,11 @@
"""Vote models for raw roll-call data and member positions.""" """Vote model - roll call votes in Congress."""
from __future__ import annotations from __future__ import annotations
from datetime import date, datetime from datetime import date
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
from sqlalchemy import DateTime, ForeignKey, Index, UniqueConstraint from sqlalchemy import ForeignKey, Index, UniqueConstraint
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import Mapped, mapped_column, relationship from sqlalchemy.orm import Mapped, mapped_column, relationship
from pipelines.orm.data_science_dev.base import ( from pipelines.orm.data_science_dev.base import (
@@ -15,15 +14,9 @@ from pipelines.orm.data_science_dev.base import (
) )
if TYPE_CHECKING: if TYPE_CHECKING:
from pipelines.orm.data_science_dev.congress.context import ( from pipelines.orm.data_science_dev.congress.bill import Bill
VoteActionMatch,
VoteClassification,
VoteContextAudit,
VoteMeasureLink,
VotePositionMeaning,
VoteTextTarget,
)
from pipelines.orm.data_science_dev.congress.legislator import Legislator from pipelines.orm.data_science_dev.congress.legislator import Legislator
from pipelines.orm.data_science_dev.congress.vote import Vote
class VoteRecord(DataScienceDevBase): class VoteRecord(DataScienceDevBase):
@@ -48,16 +41,16 @@ class VoteRecord(DataScienceDevBase):
class Vote(DataScienceDevTableBase): class Vote(DataScienceDevTableBase):
"""Raw roll call vote facts from House or Senate vote sources.""" """Roll call votes with counts and optional bill linkage."""
__tablename__ = "vote" __tablename__ = "vote"
__table_args__ = ( __table_args__ = (
UniqueConstraint( UniqueConstraint(
"congress", "congress",
"chamber", "chamber",
"session_number", "session",
"roll_number", "number",
name="uq_vote_congress_chamber_session_number_roll_number", name="uq_vote_congress_chamber_session_number",
), ),
Index("ix_vote_date", "vote_date"), Index("ix_vote_date", "vote_date"),
Index("ix_vote_congress_chamber", "congress", "chamber"), Index("ix_vote_congress_chamber", "congress", "chamber"),
@@ -65,9 +58,8 @@ class Vote(DataScienceDevTableBase):
congress: Mapped[int] congress: Mapped[int]
chamber: Mapped[str] chamber: Mapped[str]
session_year: Mapped[int] session: Mapped[int]
session_number: Mapped[int] number: Mapped[int]
roll_number: Mapped[int]
vote_type: Mapped[str | None] vote_type: Mapped[str | None]
question: Mapped[str | None] question: Mapped[str | None]
@@ -75,57 +67,18 @@ class Vote(DataScienceDevTableBase):
result_text: Mapped[str | None] result_text: Mapped[str | None]
vote_date: Mapped[date] vote_date: Mapped[date]
vote_datetime: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
raw_vote_source_url: Mapped[str | None]
yea_count: Mapped[int | None] yea_count: Mapped[int | None]
nay_count: Mapped[int | None] nay_count: Mapped[int | None]
not_voting_count: Mapped[int | None] not_voting_count: Mapped[int | None]
present_count: Mapped[int | None] present_count: Mapped[int | None]
raw_bill_ref: Mapped[dict | None] = mapped_column(JSONB) bill_id: Mapped[int | None] = mapped_column(ForeignKey("main.bill.id"))
raw_amendment_ref: Mapped[dict | None] = mapped_column(JSONB)
raw_nomination_ref: Mapped[dict | None] = mapped_column(JSONB)
raw_treaty_ref: Mapped[dict | None] = mapped_column(JSONB)
raw_vote_source_artifact_id: Mapped[int | None] = mapped_column(
ForeignKey("main.source_artifact.id", ondelete="SET NULL")
)
bill: Mapped[Bill | None] = relationship("Bill", back_populates="votes")
vote_records: Mapped[list[VoteRecord]] = relationship( vote_records: Mapped[list[VoteRecord]] = relationship(
"VoteRecord", "VoteRecord",
back_populates="vote", back_populates="vote",
cascade="all, delete-orphan", cascade="all, delete-orphan",
) )
action_matches: Mapped[list[VoteActionMatch]] = relationship(
"VoteActionMatch",
back_populates="vote",
cascade="all, delete-orphan",
)
classification: Mapped[VoteClassification | None] = relationship(
"VoteClassification",
back_populates="vote",
cascade="all, delete-orphan",
uselist=False,
)
vote_measure_links: Mapped[list[VoteMeasureLink]] = relationship(
"VoteMeasureLink",
back_populates="vote",
cascade="all, delete-orphan",
)
text_target: Mapped[VoteTextTarget | None] = relationship(
"VoteTextTarget",
back_populates="vote",
cascade="all, delete-orphan",
uselist=False,
)
position_meaning: Mapped[VotePositionMeaning | None] = relationship(
"VotePositionMeaning",
back_populates="vote",
cascade="all, delete-orphan",
uselist=False,
)
context_audit_rows: Mapped[list[VoteContextAudit]] = relationship(
"VoteContextAudit",
back_populates="vote",
cascade="all, delete-orphan",
)
-54
View File
@@ -3,80 +3,26 @@
from __future__ import annotations from __future__ import annotations
from pipelines.orm.data_science_dev.congress import ( from pipelines.orm.data_science_dev.congress import (
Amendment,
AmendmentAction,
AmendmentActionRecordedVote,
Bill, Bill,
BillAction,
BillActionRecordedVote,
BillRelation,
BillText, BillText,
BillTopic, BillTopic,
BillTopicPosition, BillTopicPosition,
ClassificationMethod,
ConfidenceLevel,
IngestRun,
Legislator, Legislator,
LegislatorScore, LegislatorScore,
MeasureFunction,
MeasureSubtype,
ScoreRun,
SourceArtifact,
SubjectType,
TextResolutionMethod,
TextTargetBasis,
TextTargetType,
Vote, Vote,
VoteActionMatch,
VoteActionScope,
VoteClassification,
VoteContextAudit,
VoteEffect,
VoteMeasureLink,
VoteMeasureRole,
VotePositionMeaning,
VoteRelationship,
VoteRecord, VoteRecord,
VoteTextTarget,
) )
from pipelines.orm.data_science_dev.posts import partitions # noqa: F401 — registers partition classes in metadata from pipelines.orm.data_science_dev.posts import partitions # noqa: F401 — registers partition classes in metadata
from pipelines.orm.data_science_dev.posts.tables import Posts from pipelines.orm.data_science_dev.posts.tables import Posts
__all__ = [ __all__ = [
"Amendment",
"AmendmentAction",
"AmendmentActionRecordedVote",
"Bill", "Bill",
"BillAction",
"BillActionRecordedVote",
"BillRelation",
"BillText", "BillText",
"BillTopic", "BillTopic",
"BillTopicPosition", "BillTopicPosition",
"ClassificationMethod",
"ConfidenceLevel",
"IngestRun",
"Legislator", "Legislator",
"LegislatorScore", "LegislatorScore",
"MeasureFunction",
"MeasureSubtype",
"Posts", "Posts",
"ScoreRun",
"SourceArtifact",
"SubjectType",
"TextResolutionMethod",
"TextTargetBasis",
"TextTargetType",
"Vote", "Vote",
"VoteActionMatch",
"VoteActionScope",
"VoteClassification",
"VoteContextAudit",
"VoteEffect",
"VoteMeasureLink",
"VoteMeasureRole",
"VotePositionMeaning",
"VoteRelationship",
"VoteRecord", "VoteRecord",
"VoteTextTarget",
] ]
@@ -3,10 +3,9 @@
from __future__ import annotations from __future__ import annotations
from pipelines.orm.data_science_dev.posts.failed_ingestion import FailedIngestion from pipelines.orm.data_science_dev.posts.failed_ingestion import FailedIngestion
from pipelines.orm.data_science_dev.posts.tables import Posts, PostTopic from pipelines.orm.data_science_dev.posts.tables import Posts
__all__ = [ __all__ = [
"FailedIngestion", "FailedIngestion",
"Posts", "Posts",
"PostTopic",
] ]
@@ -1,195 +0,0 @@
"""Shared language filter constants for post sampling queries."""
from __future__ import annotations
ENGLISH_LANGS = (
'["", "", ""]',
'[""]',
"[]",
'["", "eng"]',
'["eng", "", ""]',
'["eng", ""]',
'["eng"]',
'["eng", "aar"]',
'["eng", "abk", "afr"]',
'["eng", "afr"]',
'["eng", "afr", "abk"]',
'["eng", "afr", "anp"]',
'["eng", "afr", "ber"]',
'["eng", "afr", "dan"]',
'["eng", "afr", "deu"]',
'["eng", "afr", "est"]',
'["eng", "afr", "fra"]',
'["eng", "afr", "ind"]',
'["eng", "afr", "lat"]',
'["eng", "afr", "nld"]',
'["eng", "afr", "nor"]',
'["eng", "afr", "pol"]',
'["eng", "afr", "por"]',
'["eng", "afr", "ron"]',
'["eng", "afr", "slk"]',
'["eng", "afr", "spa"]',
'["eng", "afr", "tgl"]',
'["eng", "afr", "tuk"]',
'["eng", "afr", "tur"]',
'["eng", "afr", "ukr"]',
'["eng", "afr", "vol"]',
'["eng", "agq"]',
'["eng", "ain"]',
'["eng", "ain", "amh"]',
'["eng", "ain", "jpn"]',
'["eng", "aka"]',
'["eng", "amh"]',
'["eng", "amh", "afr"]',
'["eng", "amh", "ara"]',
'["eng", "amh", "fra"]',
'["eng", "anp"]',
'["eng", "anp", "hye"]',
'["eng", "anp", "sqi"]',
'["eng", "", "ara"]',
'["eng", "ara", ""]',
'["eng", "ara"]',
'["eng", "ara", "afr"]',
'["eng", "ara", "anp"]',
'["eng", "ara", "ars"]',
'["eng", "ara", "bul"]',
'["eng", "ara", "cat"]',
'["eng", "ara", "deu"]',
'["eng", "ara", "ell"]',
'["eng", "ara", "fas"]',
'["eng", "ara", "fra"]',
'["eng", "ara", "heb"]',
'["eng", "ara", "hin"]',
'["eng", "ara", "ind"]',
'["eng", "ara", "ita"]',
'["eng", "ara", "jpn"]',
'["eng", "ara", "kas"]',
'["eng", "ara", "kor"]',
'["eng", "ara", "nob"]',
'["eng", "ara", "nor"]',
'["eng", "ara", "rus"]',
'["eng", "ara", "spa"]',
'["eng", "ara", "swe"]',
'["eng", "ara", "tam"]',
'["eng", "ara", "tur"]',
'["eng", "ara", "urd"]',
'["eng", "ara", "zho"]',
'["eng", "arg"]',
'["eng", "arg", "amh"]',
'["eng", "arg", "aze"]',
'["eng", "ars"]',
'["eng", "ars", "ara"]',
'["eng", "asm"]',
'["eng", "ava", "sqi"]',
'["eng", "ave"]',
'["eng", "aze"]',
'["eng", "aze", "deu"]',
'["eng", "aze", "hye"]',
'["eng", "aze", "ita"]',
'["eng", "aze", "rus"]',
'["eng", "bam", ""]',
'["eng", "bel"]',
'["eng", "bel", "rus"]',
'["eng", "ben"]',
'["eng", "ben", "deu"]',
'["eng", "ben", "fra"]',
'["eng", "ben", "hin"]',
'["eng", "ben", "mya"]',
'["eng", "ber"]',
'["eng", "ber", "afr"]',
'["eng", "ber", "deu"]',
'["eng", "ber", "est"]',
'["eng", "ber", "hun"]',
'["eng", "ber", "isl"]',
'["eng", "ber", "jpn"]',
'["eng", "ber", "lat"]',
'["eng", "ber", "nor"]',
'["eng", "ber", "pol"]',
'["eng", "ber", "por"]',
'["eng", "ber", "ron"]',
'["eng", "ber", "run"]',
'["eng", "ber", "slk"]',
'["eng", "ber", "spa"]',
'["eng", "ber", "tgl"]',
'["eng", "ber", "tlh"]',
'["eng", "ber", "tuk"]',
'["eng", "bod"]',
'["eng", "bod", "nep"]',
'["eng", "bos", "hrv"]',
'["eng", "bos", "srp"]',
'["eng", "bul"]',
'["eng", "bul", "deu"]',
'["eng", "bul", "fra"]',
'["eng", "bul", "jpn"]',
'["eng", "bul", "mkd"]',
'["eng", "bul", "mri"]',
'["eng", "bul", "nld"]',
'["eng", "bul", "rus"]',
'["eng", "bul", "srp"]',
'["eng", "cat"]',
'["eng", "cat", "fra"]',
'["eng", "cat", "ind"]',
'["eng", "cat", "isl"]',
'["eng", "cat", "jpn"]',
'["eng", "cat", "nld"]',
'["eng", "cat", "spa"]',
'["eng", "ces"]',
'["eng", "ces", "deu"]',
'["eng", "ces", "ell"]',
'["eng", "ces", "haw"]',
'["eng", "ces", "ind"]',
'["eng", "ces", "ita"]',
'["eng", "ces", "jpn"]',
'["eng", "ces", "por"]',
'["eng", "ces", "rus"]',
'["eng", "ces", "slk"]',
'["eng", "ces", "spa"]',
'["eng", "ces", "tuk"]',
'["eng", "cha"]',
'["eng", "chr"]',
'["eng", "chr", "ara"]',
'["eng", "chr", "deu"]',
'["eng", "chr", "ell"]',
'["eng", "chr", "fil"]',
'["eng", "chr", "isl"]',
'["eng", "chr", "kor"]',
'["eng", "chr", "rus"]',
'["eng", "chr", "spa"]',
'["eng", "chr", "zho"]',
'["eng", "chu", "oci"]',
'["eng", "cor"]',
'["eng", "", "cos"]',
'["eng", "cos"]',
'["eng", "cym"]',
'["eng", "cym", "deu"]',
'["eng", "cym", "fra"]',
'["eng", "cym", "jpn"]',
'["eng", "cym", "spa"]',
'["eng", "cym", "zho"]',
'["eng", "dan"]',
'["eng", "dan", "ber"]',
'["eng", "dan", "deu"]',
'["eng", "dan", "ell"]',
'["eng", "dan", "est"]',
'["eng", "dan", "fas"]',
'["eng", "dan", "fin"]',
'["eng", "dan", "fra"]',
'["eng", "dan", "gle"]',
'["eng", "dan", "hun"]',
'["eng", "dan", "isl"]',
'["eng", "dan", "ita"]',
'["eng", "dan", "jpn"]',
'["eng", "dan", "lat"]',
'["eng", "dan", "nld"]',
'["eng", "dan", "nob"]',
'["eng", "dan", "nor"]',
'["eng", "dan", "por"]',
'["eng", "dan", "rus"]',
'["eng", "dan", "slk"]',
'["eng", "dan", "spa"]',
'["eng", "dan", "swe"]',
'["eng", "dan", "tuk"]',
'["eng", "dan", "zho"]',
'["eng", "deu", ""]',
'["eng", "deu"]',
)
+2 -25
View File
@@ -1,36 +1,13 @@
"""Posts parent table and PostTopic table for the data_science_dev database.""" """Posts parent table with PostgreSQL weekly range partitioning on date column."""
from __future__ import annotations from __future__ import annotations
from pipelines.orm.data_science_dev.base import ( from pipelines.orm.data_science_dev.base import DataScienceDevBase
DataScienceDevBase,
DataScienceDevTableBase,
)
from pipelines.orm.data_science_dev.posts.columns import PostsColumns from pipelines.orm.data_science_dev.posts.columns import PostsColumns
from sqlalchemy import BigInteger, Index, SmallInteger
from sqlalchemy.orm import Mapped, mapped_column
class Posts(PostsColumns, DataScienceDevBase): class Posts(PostsColumns, DataScienceDevBase):
"""Parent partitioned table for posts, partitioned by week on `date`.""" """Parent partitioned table for posts, partitioned by week on `date`."""
__tablename__ = "posts" __tablename__ = "posts"
__table_args__ = ({"postgresql_partition_by": "RANGE (date)"},) __table_args__ = ({"postgresql_partition_by": "RANGE (date)"},)
class PostTopic(DataScienceDevTableBase):
"""Stores BERTopic topic assignments for posts.
post_id references main.posts but without a FK constraint
since posts is a partitioned table.
"""
__tablename__ = "post_topic"
__table_args__ = (Index("ix_post_topic_post_id", "post_id"),)
post_id: Mapped[int] = mapped_column(BigInteger)
topic_id: Mapped[int] = mapped_column(SmallInteger)
topic_label: Mapped[str | None]
model_version: Mapped[str | None]
+25
View File
@@ -0,0 +1,25 @@
# Unsloth fine-tuning container for Qwen 3.5 4B on RTX 3090.
#
# Build:
# docker build -f python/prompt_bench/Dockerfile.finetune -t bill-finetune .
#
# Run:
# docker run --rm --device=nvidia.com/gpu=all --ipc=host \
# -v $(pwd)/output:/workspace/output \
# -v $(pwd)/output/finetune_dataset.jsonl:/workspace/dataset.jsonl:ro \
# -v /zfs/models/hf:/models \
# bill-finetune \
# --dataset /workspace/dataset.jsonl \
# --output-dir /workspace/output/qwen-bill-summarizer
FROM ghcr.io/unslothai/unsloth:latest
RUN pip install --no-cache-dir typer
WORKDIR /workspace
COPY python/prompt_bench/finetune.py python/prompt_bench/finetune.py
COPY config/prompts/summarization_prompts.toml config/prompts/summarization_prompts.toml
COPY python/prompt_bench/__init__.py python/prompt_bench/__init__.py
COPY python/__init__.py python/__init__.py
ENTRYPOINT ["python", "-m", "pipelines.prompt_bench.finetune"]
+2 -6
View File
@@ -23,14 +23,10 @@ import httpx
import typer import typer
from tiktoken import Encoding, get_encoding from tiktoken import Encoding, get_encoding
from pipelines.config import get_config_dir
from pipelines.tools.bill_token_compression import compress_bill_text from pipelines.tools.bill_token_compression import compress_bill_text
_PROMPTS_PATH = ( _PROMPTS_PATH = get_config_dir() / "prompts" / "summarization_prompts.toml"
Path(__file__).resolve().parents[2]
/ "config"
/ "prompts"
/ "summarization_prompts.toml"
)
_PROMPTS = tomllib.loads(_PROMPTS_PATH.read_text())["summarization"] _PROMPTS = tomllib.loads(_PROMPTS_PATH.read_text())["summarization"]
SUMMARIZATION_SYSTEM_PROMPT: str = _PROMPTS["system_prompt"] SUMMARIZATION_SYSTEM_PROMPT: str = _PROMPTS["system_prompt"]
SUMMARIZATION_USER_TEMPLATE: str = _PROMPTS["user_template"] SUMMARIZATION_USER_TEMPLATE: str = _PROMPTS["user_template"]
+2 -6
View File
@@ -24,14 +24,10 @@ from typing import Annotated
import httpx import httpx
import typer import typer
from pipelines.config import get_config_dir
from pipelines.tools.bill_token_compression import compress_bill_text from pipelines.tools.bill_token_compression import compress_bill_text
_PROMPTS_PATH = ( _PROMPTS_PATH = get_config_dir() / "prompts" / "summarization_prompts.toml"
Path(__file__).resolve().parents[2]
/ "config"
/ "prompts"
/ "summarization_prompts.toml"
)
_PROMPTS = tomllib.loads(_PROMPTS_PATH.read_text())["summarization"] _PROMPTS = tomllib.loads(_PROMPTS_PATH.read_text())["summarization"]
SUMMARIZATION_SYSTEM_PROMPT: str = _PROMPTS["system_prompt"] SUMMARIZATION_SYSTEM_PROMPT: str = _PROMPTS["system_prompt"]
SUMMARIZATION_USER_TEMPLATE: str = _PROMPTS["user_template"] SUMMARIZATION_USER_TEMPLATE: str = _PROMPTS["user_template"]
@@ -9,7 +9,7 @@ from typing import Annotated
import typer import typer
from pipelines.pipelines.containers.lib import check_gpu_free from pipelines.tools.containers.lib import check_gpu_free
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -27,7 +27,7 @@ def build_image() -> None:
"docker", "docker",
"build", "build",
"-f", "-f",
str(REPO_DIR / "pipelines/containers/docker_files/Dockerfile.finetune"), str(REPO_DIR / "python/prompt_bench/Dockerfile.finetune"),
"-t", "-t",
FINETUNE_IMAGE, FINETUNE_IMAGE,
".", ".",
+3 -1
View File
@@ -25,6 +25,8 @@ from datasets import Dataset
from transformers import TrainingArguments from transformers import TrainingArguments
from trl import SFTTrainer from trl import SFTTrainer
from pipelines.config import default_config_path
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -123,7 +125,7 @@ def main(
config_path: Annotated[ config_path: Annotated[
Path, Path,
typer.Option("--config", help="TOML config file"), typer.Option("--config", help="TOML config file"),
] = Path(__file__).parent / "config.toml", ] = default_config_path(),
save_gguf: Annotated[ save_gguf: Annotated[
bool, typer.Option("--save-gguf/--no-save-gguf", help="Also save GGUF") bool, typer.Option("--save-gguf/--no-save-gguf", help="Also save GGUF")
] = False, ] = False,
-34
View File
@@ -1,34 +0,0 @@
SUMMARIZATION_SYSTEM_PROMPT = """You are a legislative analyst extracting policy substance from Congressional bill text.
Your job is to compress a bill into a dense, neutral structured summary that captures every distinct policy action including secondary effects that might be buried in subsections.
EXTRACTION RULES:
- IGNORE: whereas clauses, congressional findings that are purely political statements, recitals, preambles, citations of existing law by number alone, and procedural boilerplate.
- FOCUS ON: operative verbs what the bill SHALL do, PROHIBIT, REQUIRE, AUTHORIZE, AMEND, APPROPRIATE, or ESTABLISH.
- SURFACE ALL THREADS: If the bill touches multiple policy areas, list each thread separately. Do not collapse them.
- BE CONCRETE: Name the affected population, the mechanism, and the direction (expands/restricts/maintains).
- STAY NEUTRAL: No political framing. Describe what the text does, not what its sponsors claim it does.
OUTPUT FORMAT plain structured text, not JSON:
OPERATIVE ACTIONS:
[Numbered list of what the bill actually does, one action per line, max 20 words each]
AFFECTED POPULATIONS:
[Who gains something, who loses something, or whose behavior is regulated]
MECHANISMS:
[How it works: new funding, mandate, prohibition, amendment to existing statute, grant program, study commission, etc.]
POLICY THREADS:
[List each distinct policy domain this bill touches, even minor ones. Use plain language, not domain codes.]
SYMBOLIC/PROCEDURAL ONLY:
[Yes or No is this bill primarily a resolution, designation, or awareness declaration with no operative effect?]
LENGTH TARGET: 150-250 words total. Be ruthless about cutting. Density over completeness."""
SUMMARIZATION_USER_TEMPLATE = """Summarize the following Congressional bill according to your instructions.
BILL TEXT:
{text_content}"""
@@ -5,41 +5,33 @@ from __future__ import annotations
import logging import logging
import tomllib import tomllib
from os import getenv from os import getenv
from pathlib import Path
from typing import Annotated, Any from typing import Annotated, Any
import httpx import httpx
import typer import typer
from sqlalchemy import Select, exists, or_, select from sqlalchemy import Select, or_, select
from sqlalchemy.orm import Session, selectinload from sqlalchemy.orm import Session, selectinload
from tiktoken import get_encoding
from pipelines.config import get_config_dir from pipelines.config import get_config_dir
from pipelines.orm.common import get_postgres_engine from pipelines.orm.common import get_postgres_engine
from pipelines.orm.data_science_dev.congress import ( from pipelines.orm.data_science_dev.congress import Bill, BillText
Bill,
BillText,
SubjectType,
VoteClassification,
VoteRelationship,
VoteTextTarget,
)
from pipelines.tools.bill_token_compression import compress_bill_text from pipelines.tools.bill_token_compression import compress_bill_text
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
OPENAI_CHAT_COMPLETIONS_URL = "https://api.openai.com/v1/chat/completions"
OPENAI_PROJECT_ID = "proj_fQBPEXFgnS87Fk6wZwploFwE" OPENAI_PROJECT_ID = "proj_fQBPEXFgnS87Fk6wZwploFwE"
REQUEST_TIMEOUT_SECONDS = 60
def _find_prompts_path() -> Path:
return get_config_dir() / "prompts" / "summarization_prompts.toml"
def load_summarization_prompts( def load_summarization_prompts(
section: str = "summarization", section: str = "summarization",
) -> dict[str, str]: ) -> dict[str, str]:
summarization_prompts = get_config_dir() / "prompts" / "summarization_prompts.toml" return tomllib.loads(_find_prompts_path().read_text())[section]
return tomllib.loads(summarization_prompts.read_text())[section]
class BillSummaryError(RuntimeError): class BillSummaryError(RuntimeError):
@@ -58,7 +50,7 @@ def call_openai_summary(
raise BillSummaryError(msg) raise BillSummaryError(msg)
response = httpx.post( response = httpx.post(
OPENAI_CHAT_COMPLETIONS_URL, "https://api.openai.com/v1/chat/completions",
headers={ headers={
"Authorization": f"Bearer {api_key}", "Authorization": f"Bearer {api_key}",
"OpenAI-Project": OPENAI_PROJECT_ID, "OpenAI-Project": OPENAI_PROJECT_ID,
@@ -68,9 +60,8 @@ def call_openai_summary(
"model": model, "model": model,
"messages": messages, "messages": messages,
}, },
timeout=REQUEST_TIMEOUT_SECONDS, timeout=60,
) )
logger.info(f"{response.text=}")
response.raise_for_status() response.raise_for_status()
return extract_message_content(response.json()) return extract_message_content(response.json())
@@ -78,52 +69,53 @@ def call_openai_summary(
def build_bill_summary_messages( def build_bill_summary_messages(
*, *,
bill_text: BillText, bill_text: BillText,
summarization_prompts: dict[str, str],
) -> list[dict[str, str]]: ) -> list[dict[str, str]]:
"""Build the GPT prompt messages plus compressed text and user prompt.""" """Build the GPT prompt messages for one bill text row."""
if not bill_text.text_content: if not bill_text.text_content:
msg = f"bill_text id={bill_text.id} has no text_content" msg = f"bill_text id={bill_text.id} has no text_content"
raise BillSummaryError(msg) raise BillSummaryError(msg)
bill = bill_text.bill
compressed_text = compress_bill_text(bill_text.text_content) compressed_text = compress_bill_text(bill_text.text_content)
if not compressed_text: if not compressed_text:
msg = f"bill_text id={bill_text.id} has no summarizable text_content" msg = f"bill_text id={bill_text.id} has no summarizable text_content"
raise BillSummaryError(msg) raise BillSummaryError(msg)
user_prompt = summarization_prompts["user_template"].format( metadata = "\n".join(
text_content=compressed_text (
f"Congress: {bill.congress}",
f"Bill: {bill.bill_type} {bill.number}",
f"Title: {bill.title_short or bill.title or bill.official_title or ''}",
f"Top subject term: {bill.subjects_top_term or ''}",
f"Text version: {bill_text.version_code}"
+ (f" ({bill_text.version_name})" if bill_text.version_name else ""),
)
)
summarization_prompts = load_summarization_prompts()
user_prompt = "\n\n".join(
(
"BILL METADATA:",
metadata,
summarization_prompts["user_template"].format(text_content=compressed_text),
)
) )
user_prompt_tokens = len(get_encoding("o200k_base").encode(user_prompt)) return [
logger.info(f"{user_prompt_tokens=}")
messages = [
{"role": "system", "content": summarization_prompts["system_prompt"]}, {"role": "system", "content": summarization_prompts["system_prompt"]},
{ {
"role": "user", "role": "user",
"content": user_prompt, "content": user_prompt,
}, },
] ]
return messages, user_prompt_tokens
def summarize_bill_text( def summarize_bill_text(
*, *,
model: str, model: str,
bill_text: BillText, bill_text: BillText,
summarization_prompts: dict[str, str],
) -> str: ) -> str:
"""Generate and return a summary for one bill_text row.""" """Generate and return a summary for one bill_text row."""
messages, user_prompt_tokens = build_bill_summary_messages( messages = build_bill_summary_messages(bill_text=bill_text)
bill_text=bill_text,
summarization_prompts=summarization_prompts,
)
# This may only be for gpt-5.4 mini I need to read the docs
if user_prompt_tokens > 272000:
msg = f"Compressed bill_text id={bill_text.id} is too long for summarization ({user_prompt_tokens} tokens)"
logger.warning(msg)
return None
summary = call_openai_summary( summary = call_openai_summary(
model=model, model=model,
messages=messages, messages=messages,
@@ -143,7 +135,7 @@ def store_bill_summary_result(
"""Store a generated summary and the prompt/model metadata that produced it.""" """Store a generated summary and the prompt/model metadata that produced it."""
bill_text.summary = summary bill_text.summary = summary
bill_text.summarization_model = model bill_text.summarization_model = model
bill_text.summarization_system_prompt_version = "v1.2" bill_text.summarization_system_prompt_version = "v1"
bill_text.summarization_user_prompt_version = "v1" bill_text.summarization_user_prompt_version = "v1"
@@ -151,7 +143,6 @@ def create_select_bill_texts_for_summarization(
congress: int | None = None, congress: int | None = None,
bill_ids: list[int] | None = None, bill_ids: list[int] | None = None,
bill_text_ids: list[int] | None = None, bill_text_ids: list[int] | None = None,
with_votes_only: bool = False,
force: bool = False, force: bool = False,
limit: int | None = None, limit: int | None = None,
) -> Select: ) -> Select:
@@ -169,25 +160,6 @@ def create_select_bill_texts_for_summarization(
stmt = stmt.where(BillText.bill_id.in_(bill_ids)) stmt = stmt.where(BillText.bill_id.in_(bill_ids))
if bill_text_ids: if bill_text_ids:
stmt = stmt.where(BillText.id.in_(bill_text_ids)) stmt = stmt.where(BillText.id.in_(bill_text_ids))
if with_votes_only:
stmt = stmt.where(
exists(
select(VoteTextTarget.vote_id)
.join(
VoteClassification,
VoteClassification.vote_id == VoteTextTarget.vote_id,
)
.where(
VoteTextTarget.voted_text_version_id == BillText.id,
VoteClassification.subject_type == SubjectType.MEASURE,
VoteClassification.vote_relationship
== VoteRelationship.DIRECT_TEXT_VOTE,
VoteClassification.is_direct_vote_on_legislative_text.is_(True),
VoteClassification.is_substantive_policy_vote.is_(True),
VoteClassification.is_special_rule.is_(False),
)
)
)
if not force: if not force:
stmt = stmt.where(or_(BillText.summary.is_(None), BillText.summary == "")) stmt = stmt.where(or_(BillText.summary.is_(None), BillText.summary == ""))
if limit is not None: if limit is not None:
@@ -236,13 +208,6 @@ def main(
help="Only process one internal bill_text.id. Repeat for multiple rows.", help="Only process one internal bill_text.id. Repeat for multiple rows.",
), ),
] = None, ] = None,
with_votes_only: Annotated[
bool,
typer.Option(
"--with-votes-only",
help="Only process bill_text rows linked to at least one vote.",
),
] = False,
limit: Annotated[int | None, typer.Option(help="Maximum rows to process.")] = None, limit: Annotated[int | None, typer.Option(help="Maximum rows to process.")] = None,
force: Annotated[ force: Annotated[
bool, bool,
@@ -262,14 +227,12 @@ def main(
message = "CLOSEDAI_TOKEN is required" message = "CLOSEDAI_TOKEN is required"
raise typer.BadParameter(message) raise typer.BadParameter(message)
summarization_prompts = load_summarization_prompts()
engine = get_postgres_engine(name="DATA_SCIENCE_DEV") engine = get_postgres_engine(name="DATA_SCIENCE_DEV")
with Session(engine) as session: with Session(engine) as session:
stmt = create_select_bill_texts_for_summarization( stmt = create_select_bill_texts_for_summarization(
congress=congress, congress=congress,
bill_ids=bill_ids, bill_ids=bill_ids,
bill_text_ids=bill_text_ids, bill_text_ids=bill_text_ids,
with_votes_only=with_votes_only,
force=force, force=force,
limit=limit, limit=limit,
) )
@@ -281,11 +244,7 @@ def main(
summary = summarize_bill_text( summary = summarize_bill_text(
model=model, model=model,
bill_text=bill_text, bill_text=bill_text,
summarization_prompts=summarization_prompts,
) )
if summary is None:
logger.warning("Skipping bill_text id=%s", bill_text.id)
continue
store_bill_summary_result( store_bill_summary_result(
bill_text=bill_text, bill_text=bill_text,
summary=summary, summary=summary,
View File
+22
View File
@@ -0,0 +1,22 @@
[project]
name = "ds-testing-pipelines"
version = "0.1.0"
description = "Data science pipeline tools and legislative dashboard."
requires-python = ">=3.12"
dependencies = [
"fastapi",
"httpx",
"uvicorn[standard]",
"jinja2",
"sqlalchemy",
"psycopg",
]
[project.optional-dependencies]
test = [
"pytest",
]
[tool.pytest.ini_options]
testpaths = ["tests"]
pythonpath = ["."]