Compare commits

..

1 Commits

46 changed files with 336 additions and 7608 deletions

View File

@@ -308,7 +308,6 @@
"usernamehw", "usernamehw",
"userprefs", "userprefs",
"vaninventory", "vaninventory",
"vdev",
"vfat", "vfat",
"victron", "victron",
"virt", "virt",

View File

@@ -1 +1,26 @@
# dotfiles # dotfiles
<!-- LINE-COUNT-START -->
This repo has **20,055** lines of technical debt.
| File Type | Lines | Percentage |
|-----------|------:|-----------:|
| .py | 11,441 | 57.0% |
| .nix | 4,471 | 22.3% |
| .yaml | 1,121 | 5.6% |
| .html | 1,009 | 5.0% |
| .json | 555 | 2.8% |
| .yml | 479 | 2.4% |
| .toml | 290 | 1.4% |
| .css | 212 | 1.1% |
| .gitignore | 199 | 1.0% |
| .md | 75 | 0.4% |
| .cfg | 73 | 0.4% |
| .sh | 48 | 0.2% |
| .mako | 36 | 0.2% |
| .LICENSE | 21 | 0.1% |
| .conf | 17 | 0.1% |
| .Gemfile | 4 | 0.0% |
| .svg | 3 | 0.0% |
| .new | 1 | 0.0% |
<!-- LINE-COUNT-END -->

View File

@@ -25,7 +25,6 @@
fastapi-cli fastapi-cli
httpx httpx
mypy mypy
orjson
polars polars
psycopg psycopg
pydantic pydantic

View File

@@ -1,50 +0,0 @@
"""adding FailedIngestion.
Revision ID: 2f43120e3ffc
Revises: f99be864fe69
Create Date: 2026-03-24 23:46:17.277897
"""
from __future__ import annotations
from typing import TYPE_CHECKING
import sqlalchemy as sa
from alembic import op
from python.orm import DataScienceDevBase
if TYPE_CHECKING:
from collections.abc import Sequence
# revision identifiers, used by Alembic.
revision: str = "2f43120e3ffc"
down_revision: str | None = "f99be864fe69"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
schema = DataScienceDevBase.schema_name
def upgrade() -> None:
"""Upgrade."""
# ### commands auto generated by Alembic - please adjust! ###
op.create_table(
"failed_ingestion",
sa.Column("raw_line", sa.Text(), nullable=False),
sa.Column("error", sa.Text(), nullable=False),
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("created", sa.DateTime(timezone=True), server_default=sa.text("now()"), nullable=False),
sa.Column("updated", sa.DateTime(timezone=True), server_default=sa.text("now()"), nullable=False),
sa.PrimaryKeyConstraint("id", name=op.f("pk_failed_ingestion")),
schema=schema,
)
# ### end Alembic commands ###
def downgrade() -> None:
"""Downgrade."""
# ### commands auto generated by Alembic - please adjust! ###
op.drop_table("failed_ingestion", schema=schema)
# ### end Alembic commands ###

View File

@@ -1,72 +0,0 @@
"""Attach all partition tables to the posts parent table.
Alembic autogenerate creates partition tables as standalone tables but does not
emit the ALTER TABLE ... ATTACH PARTITION statements needed for PostgreSQL to
route inserts to the correct partition.
Revision ID: a1b2c3d4e5f6
Revises: 605b1794838f
Create Date: 2026-03-25 10:00:00.000000
"""
from __future__ import annotations
from typing import TYPE_CHECKING
from alembic import op
from sqlalchemy import text
from python.orm import DataScienceDevBase
from python.orm.data_science_dev.posts.partitions import (
PARTITION_END_YEAR,
PARTITION_START_YEAR,
iso_weeks_in_year,
week_bounds,
)
if TYPE_CHECKING:
from collections.abc import Sequence
# revision identifiers, used by Alembic.
revision: str = "a1b2c3d4e5f6"
down_revision: str | None = "605b1794838f"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
schema = DataScienceDevBase.schema_name
ALREADY_ATTACHED_QUERY = text("""
SELECT inhrelid::regclass::text
FROM pg_inherits
WHERE inhparent = :parent::regclass
""")
def upgrade() -> None:
"""Attach all weekly partition tables to the posts parent table."""
connection = op.get_bind()
already_attached = {row[0] for row in connection.execute(ALREADY_ATTACHED_QUERY, {"parent": f"{schema}.posts"})}
for year in range(PARTITION_START_YEAR, PARTITION_END_YEAR + 1):
for week in range(1, iso_weeks_in_year(year) + 1):
table_name = f"posts_{year}_{week:02d}"
qualified_name = f"{schema}.{table_name}"
if qualified_name in already_attached:
continue
start, end = week_bounds(year, week)
start_str = start.strftime("%Y-%m-%d %H:%M:%S")
end_str = end.strftime("%Y-%m-%d %H:%M:%S")
op.execute(
f"ALTER TABLE {schema}.posts "
f"ATTACH PARTITION {qualified_name} "
f"FOR VALUES FROM ('{start_str}') TO ('{end_str}')"
)
def downgrade() -> None:
"""Detach all weekly partition tables from the posts parent table."""
for year in range(PARTITION_START_YEAR, PARTITION_END_YEAR + 1):
for week in range(1, iso_weeks_in_year(year) + 1):
table_name = f"posts_{year}_{week:02d}"
op.execute(f"ALTER TABLE {schema}.posts DETACH PARTITION {schema}.{table_name}")

View File

@@ -1,153 +0,0 @@
"""adding congress data.
Revision ID: 83bfc8af92d8
Revises: a1b2c3d4e5f6
Create Date: 2026-03-27 10:43:02.324510
"""
from __future__ import annotations
from typing import TYPE_CHECKING
import sqlalchemy as sa
from alembic import op
from python.orm import DataScienceDevBase
if TYPE_CHECKING:
from collections.abc import Sequence
# revision identifiers, used by Alembic.
revision: str = "83bfc8af92d8"
down_revision: str | None = "a1b2c3d4e5f6"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
schema = DataScienceDevBase.schema_name
def upgrade() -> None:
"""Upgrade."""
# ### commands auto generated by Alembic - please adjust! ###
op.create_table(
"bill",
sa.Column("congress", sa.Integer(), nullable=False),
sa.Column("bill_type", sa.String(), nullable=False),
sa.Column("number", sa.Integer(), nullable=False),
sa.Column("title", sa.String(), nullable=True),
sa.Column("title_short", sa.String(), nullable=True),
sa.Column("official_title", sa.String(), nullable=True),
sa.Column("status", sa.String(), nullable=True),
sa.Column("status_at", sa.Date(), nullable=True),
sa.Column("sponsor_bioguide_id", sa.String(), nullable=True),
sa.Column("subjects_top_term", sa.String(), nullable=True),
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("created", sa.DateTime(timezone=True), server_default=sa.text("now()"), nullable=False),
sa.Column("updated", sa.DateTime(timezone=True), server_default=sa.text("now()"), nullable=False),
sa.PrimaryKeyConstraint("id", name=op.f("pk_bill")),
sa.UniqueConstraint("congress", "bill_type", "number", name="uq_bill_congress_type_number"),
schema=schema,
)
op.create_index("ix_bill_congress", "bill", ["congress"], unique=False, schema=schema)
op.create_table(
"legislator",
sa.Column("bioguide_id", sa.Text(), nullable=False),
sa.Column("thomas_id", sa.String(), nullable=True),
sa.Column("lis_id", sa.String(), nullable=True),
sa.Column("govtrack_id", sa.Integer(), nullable=True),
sa.Column("opensecrets_id", sa.String(), nullable=True),
sa.Column("fec_ids", sa.String(), nullable=True),
sa.Column("first_name", sa.String(), nullable=False),
sa.Column("last_name", sa.String(), nullable=False),
sa.Column("official_full_name", sa.String(), nullable=True),
sa.Column("nickname", sa.String(), nullable=True),
sa.Column("birthday", sa.Date(), nullable=True),
sa.Column("gender", sa.String(), nullable=True),
sa.Column("current_party", sa.String(), nullable=True),
sa.Column("current_state", sa.String(), nullable=True),
sa.Column("current_district", sa.Integer(), nullable=True),
sa.Column("current_chamber", sa.String(), nullable=True),
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("created", sa.DateTime(timezone=True), server_default=sa.text("now()"), nullable=False),
sa.Column("updated", sa.DateTime(timezone=True), server_default=sa.text("now()"), nullable=False),
sa.PrimaryKeyConstraint("id", name=op.f("pk_legislator")),
schema=schema,
)
op.create_index(op.f("ix_legislator_bioguide_id"), "legislator", ["bioguide_id"], unique=True, schema=schema)
op.create_table(
"bill_text",
sa.Column("bill_id", sa.Integer(), nullable=False),
sa.Column("version_code", sa.String(), nullable=False),
sa.Column("version_name", sa.String(), nullable=True),
sa.Column("text_content", sa.String(), nullable=True),
sa.Column("date", sa.Date(), nullable=True),
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("created", sa.DateTime(timezone=True), server_default=sa.text("now()"), nullable=False),
sa.Column("updated", sa.DateTime(timezone=True), server_default=sa.text("now()"), nullable=False),
sa.ForeignKeyConstraint(
["bill_id"], [f"{schema}.bill.id"], name=op.f("fk_bill_text_bill_id_bill"), ondelete="CASCADE"
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_bill_text")),
sa.UniqueConstraint("bill_id", "version_code", name="uq_bill_text_bill_id_version_code"),
schema=schema,
)
op.create_table(
"vote",
sa.Column("congress", sa.Integer(), nullable=False),
sa.Column("chamber", sa.String(), nullable=False),
sa.Column("session", sa.Integer(), nullable=False),
sa.Column("number", sa.Integer(), nullable=False),
sa.Column("vote_type", sa.String(), nullable=True),
sa.Column("question", sa.String(), nullable=True),
sa.Column("result", sa.String(), nullable=True),
sa.Column("result_text", sa.String(), nullable=True),
sa.Column("vote_date", sa.Date(), nullable=False),
sa.Column("yea_count", sa.Integer(), nullable=True),
sa.Column("nay_count", sa.Integer(), nullable=True),
sa.Column("not_voting_count", sa.Integer(), nullable=True),
sa.Column("present_count", sa.Integer(), nullable=True),
sa.Column("bill_id", sa.Integer(), nullable=True),
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("created", sa.DateTime(timezone=True), server_default=sa.text("now()"), nullable=False),
sa.Column("updated", sa.DateTime(timezone=True), server_default=sa.text("now()"), nullable=False),
sa.ForeignKeyConstraint(["bill_id"], [f"{schema}.bill.id"], name=op.f("fk_vote_bill_id_bill")),
sa.PrimaryKeyConstraint("id", name=op.f("pk_vote")),
sa.UniqueConstraint("congress", "chamber", "session", "number", name="uq_vote_congress_chamber_session_number"),
schema=schema,
)
op.create_index("ix_vote_congress_chamber", "vote", ["congress", "chamber"], unique=False, schema=schema)
op.create_index("ix_vote_date", "vote", ["vote_date"], unique=False, schema=schema)
op.create_table(
"vote_record",
sa.Column("vote_id", sa.Integer(), nullable=False),
sa.Column("legislator_id", sa.Integer(), nullable=False),
sa.Column("position", sa.String(), nullable=False),
sa.ForeignKeyConstraint(
["legislator_id"],
[f"{schema}.legislator.id"],
name=op.f("fk_vote_record_legislator_id_legislator"),
ondelete="CASCADE",
),
sa.ForeignKeyConstraint(
["vote_id"], [f"{schema}.vote.id"], name=op.f("fk_vote_record_vote_id_vote"), ondelete="CASCADE"
),
sa.PrimaryKeyConstraint("vote_id", "legislator_id", name=op.f("pk_vote_record")),
schema=schema,
)
# ### end Alembic commands ###
def downgrade() -> None:
"""Downgrade."""
# ### commands auto generated by Alembic - please adjust! ###
op.drop_table("vote_record", schema=schema)
op.drop_index("ix_vote_date", table_name="vote", schema=schema)
op.drop_index("ix_vote_congress_chamber", table_name="vote", schema=schema)
op.drop_table("vote", schema=schema)
op.drop_table("bill_text", schema=schema)
op.drop_index(op.f("ix_legislator_bioguide_id"), table_name="legislator", schema=schema)
op.drop_table("legislator", schema=schema)
op.drop_index("ix_bill_congress", table_name="bill", schema=schema)
op.drop_table("bill", schema=schema)
# ### end Alembic commands ###

View File

@@ -1,58 +0,0 @@
"""adding LegislatorSocialMedia.
Revision ID: 5cd7eee3549d
Revises: 83bfc8af92d8
Create Date: 2026-03-29 11:53:44.224799
"""
from __future__ import annotations
from typing import TYPE_CHECKING
import sqlalchemy as sa
from alembic import op
from python.orm import DataScienceDevBase
if TYPE_CHECKING:
from collections.abc import Sequence
# revision identifiers, used by Alembic.
revision: str = "5cd7eee3549d"
down_revision: str | None = "83bfc8af92d8"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
schema = DataScienceDevBase.schema_name
def upgrade() -> None:
"""Upgrade."""
# ### commands auto generated by Alembic - please adjust! ###
op.create_table(
"legislator_social_media",
sa.Column("legislator_id", sa.Integer(), nullable=False),
sa.Column("platform", sa.String(), nullable=False),
sa.Column("account_name", sa.String(), nullable=False),
sa.Column("url", sa.String(), nullable=True),
sa.Column("source", sa.String(), nullable=False),
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("created", sa.DateTime(timezone=True), server_default=sa.text("now()"), nullable=False),
sa.Column("updated", sa.DateTime(timezone=True), server_default=sa.text("now()"), nullable=False),
sa.ForeignKeyConstraint(
["legislator_id"],
[f"{schema}.legislator.id"],
name=op.f("fk_legislator_social_media_legislator_id_legislator"),
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_legislator_social_media")),
schema=schema,
)
# ### end Alembic commands ###
def downgrade() -> None:
"""Downgrade."""
# ### commands auto generated by Alembic - please adjust! ###
op.drop_table("legislator_social_media", schema=schema)
# ### end Alembic commands ###

View File

@@ -81,7 +81,6 @@ def include_name(
""" """
if type_ == "schema": if type_ == "schema":
# allows a database with multiple schemas to have separate alembic revisions
return name == target_metadata.schema return name == target_metadata.schema
return True return True

View File

@@ -1,187 +0,0 @@
"""removed ds table from richie DB.
Revision ID: c8a794340928
Revises: 6b275323f435
Create Date: 2026-03-29 15:29:23.643146
"""
from __future__ import annotations
from typing import TYPE_CHECKING
import sqlalchemy as sa
from alembic import op
from sqlalchemy.dialects import postgresql
from python.orm import RichieBase
if TYPE_CHECKING:
from collections.abc import Sequence
# revision identifiers, used by Alembic.
revision: str = "c8a794340928"
down_revision: str | None = "6b275323f435"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
schema = RichieBase.schema_name
def upgrade() -> None:
"""Upgrade."""
# ### commands auto generated by Alembic - please adjust! ###
op.drop_table("vote_record", schema=schema)
op.drop_index(op.f("ix_vote_congress_chamber"), table_name="vote", schema=schema)
op.drop_index(op.f("ix_vote_date"), table_name="vote", schema=schema)
op.drop_index(op.f("ix_legislator_bioguide_id"), table_name="legislator", schema=schema)
op.drop_table("legislator", schema=schema)
op.drop_table("vote", schema=schema)
op.drop_index(op.f("ix_bill_congress"), table_name="bill", schema=schema)
op.drop_table("bill", schema=schema)
# ### end Alembic commands ###
def downgrade() -> None:
"""Downgrade."""
# ### commands auto generated by Alembic - please adjust! ###
op.create_table(
"vote",
sa.Column("congress", sa.INTEGER(), autoincrement=False, nullable=False),
sa.Column("chamber", sa.VARCHAR(), autoincrement=False, nullable=False),
sa.Column("session", sa.INTEGER(), autoincrement=False, nullable=False),
sa.Column("number", sa.INTEGER(), autoincrement=False, nullable=False),
sa.Column("vote_type", sa.VARCHAR(), autoincrement=False, nullable=True),
sa.Column("question", sa.VARCHAR(), autoincrement=False, nullable=True),
sa.Column("result", sa.VARCHAR(), autoincrement=False, nullable=True),
sa.Column("result_text", sa.VARCHAR(), autoincrement=False, nullable=True),
sa.Column("vote_date", sa.DATE(), autoincrement=False, nullable=False),
sa.Column("yea_count", sa.INTEGER(), autoincrement=False, nullable=True),
sa.Column("nay_count", sa.INTEGER(), autoincrement=False, nullable=True),
sa.Column("not_voting_count", sa.INTEGER(), autoincrement=False, nullable=True),
sa.Column("present_count", sa.INTEGER(), autoincrement=False, nullable=True),
sa.Column("bill_id", sa.INTEGER(), autoincrement=False, nullable=True),
sa.Column("id", sa.INTEGER(), autoincrement=True, nullable=False),
sa.Column(
"created",
postgresql.TIMESTAMP(timezone=True),
server_default=sa.text("now()"),
autoincrement=False,
nullable=False,
),
sa.Column(
"updated",
postgresql.TIMESTAMP(timezone=True),
server_default=sa.text("now()"),
autoincrement=False,
nullable=False,
),
sa.ForeignKeyConstraint(["bill_id"], [f"{schema}.bill.id"], name=op.f("fk_vote_bill_id_bill")),
sa.PrimaryKeyConstraint("id", name=op.f("pk_vote")),
sa.UniqueConstraint(
"congress",
"chamber",
"session",
"number",
name=op.f("uq_vote_congress_chamber_session_number"),
postgresql_include=[],
postgresql_nulls_not_distinct=False,
),
schema=schema,
)
op.create_index(op.f("ix_vote_date"), "vote", ["vote_date"], unique=False, schema=schema)
op.create_index(op.f("ix_vote_congress_chamber"), "vote", ["congress", "chamber"], unique=False, schema=schema)
op.create_table(
"vote_record",
sa.Column("vote_id", sa.INTEGER(), autoincrement=False, nullable=False),
sa.Column("legislator_id", sa.INTEGER(), autoincrement=False, nullable=False),
sa.Column("position", sa.VARCHAR(), autoincrement=False, nullable=False),
sa.ForeignKeyConstraint(
["legislator_id"],
[f"{schema}.legislator.id"],
name=op.f("fk_vote_record_legislator_id_legislator"),
ondelete="CASCADE",
),
sa.ForeignKeyConstraint(
["vote_id"], [f"{schema}.vote.id"], name=op.f("fk_vote_record_vote_id_vote"), ondelete="CASCADE"
),
sa.PrimaryKeyConstraint("vote_id", "legislator_id", name=op.f("pk_vote_record")),
schema=schema,
)
op.create_table(
"legislator",
sa.Column("bioguide_id", sa.TEXT(), autoincrement=False, nullable=False),
sa.Column("thomas_id", sa.VARCHAR(), autoincrement=False, nullable=True),
sa.Column("lis_id", sa.VARCHAR(), autoincrement=False, nullable=True),
sa.Column("govtrack_id", sa.INTEGER(), autoincrement=False, nullable=True),
sa.Column("opensecrets_id", sa.VARCHAR(), autoincrement=False, nullable=True),
sa.Column("fec_ids", sa.VARCHAR(), autoincrement=False, nullable=True),
sa.Column("first_name", sa.VARCHAR(), autoincrement=False, nullable=False),
sa.Column("last_name", sa.VARCHAR(), autoincrement=False, nullable=False),
sa.Column("official_full_name", sa.VARCHAR(), autoincrement=False, nullable=True),
sa.Column("nickname", sa.VARCHAR(), autoincrement=False, nullable=True),
sa.Column("birthday", sa.DATE(), autoincrement=False, nullable=True),
sa.Column("gender", sa.VARCHAR(), autoincrement=False, nullable=True),
sa.Column("current_party", sa.VARCHAR(), autoincrement=False, nullable=True),
sa.Column("current_state", sa.VARCHAR(), autoincrement=False, nullable=True),
sa.Column("current_district", sa.INTEGER(), autoincrement=False, nullable=True),
sa.Column("current_chamber", sa.VARCHAR(), autoincrement=False, nullable=True),
sa.Column("id", sa.INTEGER(), autoincrement=True, nullable=False),
sa.Column(
"created",
postgresql.TIMESTAMP(timezone=True),
server_default=sa.text("now()"),
autoincrement=False,
nullable=False,
),
sa.Column(
"updated",
postgresql.TIMESTAMP(timezone=True),
server_default=sa.text("now()"),
autoincrement=False,
nullable=False,
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_legislator")),
schema=schema,
)
op.create_index(op.f("ix_legislator_bioguide_id"), "legislator", ["bioguide_id"], unique=True, schema=schema)
op.create_table(
"bill",
sa.Column("congress", sa.INTEGER(), autoincrement=False, nullable=False),
sa.Column("bill_type", sa.VARCHAR(), autoincrement=False, nullable=False),
sa.Column("number", sa.INTEGER(), autoincrement=False, nullable=False),
sa.Column("title", sa.VARCHAR(), autoincrement=False, nullable=True),
sa.Column("title_short", sa.VARCHAR(), autoincrement=False, nullable=True),
sa.Column("official_title", sa.VARCHAR(), autoincrement=False, nullable=True),
sa.Column("status", sa.VARCHAR(), autoincrement=False, nullable=True),
sa.Column("status_at", sa.DATE(), autoincrement=False, nullable=True),
sa.Column("sponsor_bioguide_id", sa.VARCHAR(), autoincrement=False, nullable=True),
sa.Column("subjects_top_term", sa.VARCHAR(), autoincrement=False, nullable=True),
sa.Column("id", sa.INTEGER(), autoincrement=True, nullable=False),
sa.Column(
"created",
postgresql.TIMESTAMP(timezone=True),
server_default=sa.text("now()"),
autoincrement=False,
nullable=False,
),
sa.Column(
"updated",
postgresql.TIMESTAMP(timezone=True),
server_default=sa.text("now()"),
autoincrement=False,
nullable=False,
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_bill")),
sa.UniqueConstraint(
"congress",
"bill_type",
"number",
name=op.f("uq_bill_congress_type_number"),
postgresql_include=[],
postgresql_nulls_not_distinct=False,
),
schema=schema,
)
op.create_index(op.f("ix_bill_congress"), "bill", ["congress"], unique=False, schema=schema)
# ### end Alembic commands ###

View File

@@ -1,3 +0,0 @@
"""Data science CLI tools."""
from __future__ import annotations

View File

@@ -1,613 +0,0 @@
"""Ingestion pipeline for loading congress data from unitedstates/congress JSON files.
Loads legislators, bills, votes, vote records, and bill text into the data_science_dev database.
Expects the parent directory to contain congress-tracker/ and congress-legislators/ as siblings.
Usage:
ingest-congress /path/to/parent/
ingest-congress /path/to/parent/ --congress 118
ingest-congress /path/to/parent/ --congress 118 --only bills
"""
from __future__ import annotations
import logging
from pathlib import Path # noqa: TC003 needed at runtime for typer CLI argument
from typing import TYPE_CHECKING, Annotated
import orjson
import typer
import yaml
from sqlalchemy import select
from sqlalchemy.orm import Session
from python.common import configure_logger
from python.orm.common import get_postgres_engine
from python.orm.data_science_dev.congress import Bill, BillText, Legislator, LegislatorSocialMedia, Vote, VoteRecord
if TYPE_CHECKING:
from collections.abc import Iterator
from sqlalchemy.engine import Engine
logger = logging.getLogger(__name__)
BATCH_SIZE = 10_000
app = typer.Typer(help="Ingest unitedstates/congress data into data_science_dev.")
@app.command()
def main(
parent_dir: Annotated[
Path,
typer.Argument(help="Parent directory containing congress-tracker/ and congress-legislators/"),
],
congress: Annotated[int | None, typer.Option(help="Only ingest a specific congress number")] = None,
only: Annotated[
str | None,
typer.Option(help="Only run a specific step: legislators, social-media, bills, votes, bill-text"),
] = None,
) -> None:
"""Ingest congress data from unitedstates/congress JSON files."""
configure_logger(level="INFO")
data_dir = parent_dir / "congress-tracker/congress/data/"
legislators_dir = parent_dir / "congress-legislators"
if not data_dir.is_dir():
typer.echo(f"Expected congress-tracker/ directory: {data_dir}", err=True)
raise typer.Exit(code=1)
if not legislators_dir.is_dir():
typer.echo(f"Expected congress-legislators/ directory: {legislators_dir}", err=True)
raise typer.Exit(code=1)
engine = get_postgres_engine(name="DATA_SCIENCE_DEV")
congress_dirs = _resolve_congress_dirs(data_dir, congress)
if not congress_dirs:
typer.echo("No congress directories found.", err=True)
raise typer.Exit(code=1)
logger.info("Found %d congress directories to process", len(congress_dirs))
steps: dict[str, tuple] = {
"legislators": (ingest_legislators, (engine, legislators_dir)),
"legislators-social-media": (ingest_social_media, (engine, legislators_dir)),
"bills": (ingest_bills, (engine, congress_dirs)),
"votes": (ingest_votes, (engine, congress_dirs)),
"bill-text": (ingest_bill_text, (engine, congress_dirs)),
}
if only:
if only not in steps:
typer.echo(f"Unknown step: {only}. Choose from: {', '.join(steps)}", err=True)
raise typer.Exit(code=1)
steps = {only: steps[only]}
for step_name, (step_func, step_args) in steps.items():
logger.info("=== Starting step: %s ===", step_name)
step_func(*step_args)
logger.info("=== Finished step: %s ===", step_name)
logger.info("ingest-congress done")
def _resolve_congress_dirs(data_dir: Path, congress: int | None) -> list[Path]:
"""Find congress number directories under data_dir."""
if congress is not None:
target = data_dir / str(congress)
return [target] if target.is_dir() else []
return sorted(path for path in data_dir.iterdir() if path.is_dir() and path.name.isdigit())
def _flush_batch(session: Session, batch: list[object], label: str) -> int:
"""Add a batch of ORM objects to the session and commit. Returns count added."""
if not batch:
return 0
session.add_all(batch)
session.commit()
count = len(batch)
logger.info("Committed %d %s", count, label)
batch.clear()
return count
# ---------------------------------------------------------------------------
# Legislators — loaded from congress-legislators YAML files
# ---------------------------------------------------------------------------
def ingest_legislators(engine: Engine, legislators_dir: Path) -> None:
"""Load legislators from congress-legislators YAML files."""
legislators_data = _load_legislators_yaml(legislators_dir)
logger.info("Loaded %d legislators from YAML files", len(legislators_data))
with Session(engine) as session:
existing_legislators = {
legislator.bioguide_id: legislator for legislator in session.scalars(select(Legislator)).all()
}
logger.info("Found %d existing legislators in DB", len(existing_legislators))
total_inserted = 0
total_updated = 0
for entry in legislators_data:
bioguide_id = entry.get("id", {}).get("bioguide")
if not bioguide_id:
continue
fields = _parse_legislator(entry)
if existing := existing_legislators.get(bioguide_id):
changed = False
for field, value in fields.items():
if value is not None and getattr(existing, field) != value:
setattr(existing, field, value)
changed = True
if changed:
total_updated += 1
else:
session.add(Legislator(bioguide_id=bioguide_id, **fields))
total_inserted += 1
session.commit()
logger.info("Inserted %d new legislators, updated %d existing", total_inserted, total_updated)
def _load_legislators_yaml(legislators_dir: Path) -> list[dict]:
"""Load and combine legislators-current.yaml and legislators-historical.yaml."""
legislators: list[dict] = []
for filename in ("legislators-current.yaml", "legislators-historical.yaml"):
path = legislators_dir / filename
if not path.exists():
logger.warning("Legislators file not found: %s", path)
continue
with path.open() as file:
data = yaml.safe_load(file)
if isinstance(data, list):
legislators.extend(data)
return legislators
def _parse_legislator(entry: dict) -> dict:
"""Extract Legislator fields from a congress-legislators YAML entry."""
ids = entry.get("id", {})
name = entry.get("name", {})
bio = entry.get("bio", {})
terms = entry.get("terms", [])
latest_term = terms[-1] if terms else {}
fec_ids = ids.get("fec")
fec_ids_joined = ",".join(fec_ids) if isinstance(fec_ids, list) else fec_ids
chamber = latest_term.get("type")
chamber_normalized = {"rep": "House", "sen": "Senate"}.get(chamber, chamber)
return {
"thomas_id": ids.get("thomas"),
"lis_id": ids.get("lis"),
"govtrack_id": ids.get("govtrack"),
"opensecrets_id": ids.get("opensecrets"),
"fec_ids": fec_ids_joined,
"first_name": name.get("first"),
"last_name": name.get("last"),
"official_full_name": name.get("official_full"),
"nickname": name.get("nickname"),
"birthday": bio.get("birthday"),
"gender": bio.get("gender"),
"current_party": latest_term.get("party"),
"current_state": latest_term.get("state"),
"current_district": latest_term.get("district"),
"current_chamber": chamber_normalized,
}
# ---------------------------------------------------------------------------
# Social Media — loaded from legislators-social-media.yaml
# ---------------------------------------------------------------------------
SOCIAL_MEDIA_PLATFORMS = {
"twitter": "https://twitter.com/{account}",
"facebook": "https://facebook.com/{account}",
"youtube": "https://youtube.com/{account}",
"instagram": "https://instagram.com/{account}",
"mastodon": None,
}
def ingest_social_media(engine: Engine, legislators_dir: Path) -> None:
"""Load social media accounts from legislators-social-media.yaml."""
social_media_path = legislators_dir / "legislators-social-media.yaml"
if not social_media_path.exists():
logger.warning("Social media file not found: %s", social_media_path)
return
with social_media_path.open() as file:
social_media_data = yaml.safe_load(file)
if not isinstance(social_media_data, list):
logger.warning("Unexpected format in %s", social_media_path)
return
logger.info("Loaded %d entries from legislators-social-media.yaml", len(social_media_data))
with Session(engine) as session:
legislator_map = _build_legislator_map(session)
existing_accounts = {
(account.legislator_id, account.platform)
for account in session.scalars(select(LegislatorSocialMedia)).all()
}
logger.info("Found %d existing social media accounts in DB", len(existing_accounts))
total_inserted = 0
total_updated = 0
for entry in social_media_data:
bioguide_id = entry.get("id", {}).get("bioguide")
if not bioguide_id:
continue
legislator_id = legislator_map.get(bioguide_id)
if legislator_id is None:
continue
social = entry.get("social", {})
for platform, url_template in SOCIAL_MEDIA_PLATFORMS.items():
account_name = social.get(platform)
if not account_name:
continue
url = url_template.format(account=account_name) if url_template else None
if (legislator_id, platform) in existing_accounts:
total_updated += 1
else:
session.add(
LegislatorSocialMedia(
legislator_id=legislator_id,
platform=platform,
account_name=str(account_name),
url=url,
source="https://github.com/unitedstates/congress-legislators",
)
)
existing_accounts.add((legislator_id, platform))
total_inserted += 1
session.commit()
logger.info("Inserted %d new social media accounts, updated %d existing", total_inserted, total_updated)
def _iter_voters(position_group: object) -> Iterator[dict]:
"""Yield voter dicts from a vote position group (handles list, single dict, or string)."""
if isinstance(position_group, dict):
yield position_group
elif isinstance(position_group, list):
for voter in position_group:
if isinstance(voter, dict):
yield voter
# ---------------------------------------------------------------------------
# Bills
# ---------------------------------------------------------------------------
def ingest_bills(engine: Engine, congress_dirs: list[Path]) -> None:
"""Load bill data.json files."""
with Session(engine) as session:
existing_bills = {(bill.congress, bill.bill_type, bill.number) for bill in session.scalars(select(Bill)).all()}
logger.info("Found %d existing bills in DB", len(existing_bills))
total_inserted = 0
batch: list[Bill] = []
for congress_dir in congress_dirs:
bills_dir = congress_dir / "bills"
if not bills_dir.is_dir():
continue
logger.info("Scanning bills from %s", congress_dir.name)
for bill_file in bills_dir.rglob("data.json"):
data = _read_json(bill_file)
if data is None:
continue
bill = _parse_bill(data, existing_bills)
if bill is not None:
batch.append(bill)
if len(batch) >= BATCH_SIZE:
total_inserted += _flush_batch(session, batch, "bills")
total_inserted += _flush_batch(session, batch, "bills")
logger.info("Inserted %d new bills total", total_inserted)
def _parse_bill(data: dict, existing_bills: set[tuple[int, str, int]]) -> Bill | None:
"""Parse a bill data.json dict into a Bill ORM object, skipping existing."""
raw_congress = data.get("congress")
bill_type = data.get("bill_type")
raw_number = data.get("number")
if raw_congress is None or bill_type is None or raw_number is None:
return None
congress = int(raw_congress)
number = int(raw_number)
if (congress, bill_type, number) in existing_bills:
return None
sponsor_bioguide = None
sponsor = data.get("sponsor")
if sponsor:
sponsor_bioguide = sponsor.get("bioguide_id")
return Bill(
congress=congress,
bill_type=bill_type,
number=number,
title=data.get("short_title") or data.get("official_title"),
title_short=data.get("short_title"),
official_title=data.get("official_title"),
status=data.get("status"),
status_at=data.get("status_at"),
sponsor_bioguide_id=sponsor_bioguide,
subjects_top_term=data.get("subjects_top_term"),
)
# ---------------------------------------------------------------------------
# Votes (and vote records)
# ---------------------------------------------------------------------------
def ingest_votes(engine: Engine, congress_dirs: list[Path]) -> None:
"""Load vote data.json files with their vote records."""
with Session(engine) as session:
legislator_map = _build_legislator_map(session)
logger.info("Loaded %d legislators into lookup map", len(legislator_map))
bill_map = _build_bill_map(session)
logger.info("Loaded %d bills into lookup map", len(bill_map))
existing_votes = {
(vote.congress, vote.chamber, vote.session, vote.number) for vote in session.scalars(select(Vote)).all()
}
logger.info("Found %d existing votes in DB", len(existing_votes))
total_inserted = 0
batch: list[Vote] = []
for congress_dir in congress_dirs:
votes_dir = congress_dir / "votes"
if not votes_dir.is_dir():
continue
logger.info("Scanning votes from %s", congress_dir.name)
for vote_file in votes_dir.rglob("data.json"):
data = _read_json(vote_file)
if data is None:
continue
vote = _parse_vote(data, legislator_map, bill_map, existing_votes)
if vote is not None:
batch.append(vote)
if len(batch) >= BATCH_SIZE:
total_inserted += _flush_batch(session, batch, "votes")
total_inserted += _flush_batch(session, batch, "votes")
logger.info("Inserted %d new votes total", total_inserted)
def _build_legislator_map(session: Session) -> dict[str, int]:
"""Build a mapping of bioguide_id -> legislator.id."""
return {legislator.bioguide_id: legislator.id for legislator in session.scalars(select(Legislator)).all()}
def _build_bill_map(session: Session) -> dict[tuple[int, str, int], int]:
"""Build a mapping of (congress, bill_type, number) -> bill.id."""
return {(bill.congress, bill.bill_type, bill.number): bill.id for bill in session.scalars(select(Bill)).all()}
def _parse_vote(
data: dict,
legislator_map: dict[str, int],
bill_map: dict[tuple[int, str, int], int],
existing_votes: set[tuple[int, str, int, int]],
) -> Vote | None:
"""Parse a vote data.json dict into a Vote ORM object with records."""
raw_congress = data.get("congress")
chamber = data.get("chamber")
raw_number = data.get("number")
vote_date = data.get("date")
if raw_congress is None or chamber is None or raw_number is None or vote_date is None:
return None
raw_session = data.get("session")
if raw_session is None:
return None
congress = int(raw_congress)
number = int(raw_number)
session_number = int(raw_session)
# Normalize chamber from "h"/"s" to "House"/"Senate"
chamber_normalized = {"h": "House", "s": "Senate"}.get(chamber, chamber)
if (congress, chamber_normalized, session_number, number) in existing_votes:
return None
# Resolve linked bill
bill_id = None
bill_ref = data.get("bill")
if bill_ref:
bill_key = (
int(bill_ref.get("congress", congress)),
bill_ref.get("type"),
int(bill_ref.get("number", 0)),
)
bill_id = bill_map.get(bill_key)
raw_votes = data.get("votes", {})
vote_counts = _count_votes(raw_votes)
vote_records = _build_vote_records(raw_votes, legislator_map)
return Vote(
congress=congress,
chamber=chamber_normalized,
session=session_number,
number=number,
vote_type=data.get("type"),
question=data.get("question"),
result=data.get("result"),
result_text=data.get("result_text"),
vote_date=vote_date[:10] if isinstance(vote_date, str) else vote_date,
bill_id=bill_id,
vote_records=vote_records,
**vote_counts,
)
def _count_votes(raw_votes: dict) -> dict[str, int]:
"""Count voters per position category, correctly handling dict and list formats."""
yea_count = 0
nay_count = 0
not_voting_count = 0
present_count = 0
for position, position_group in raw_votes.items():
voter_count = sum(1 for _ in _iter_voters(position_group))
if position in ("Yea", "Aye"):
yea_count += voter_count
elif position in ("Nay", "No"):
nay_count += voter_count
elif position == "Not Voting":
not_voting_count += voter_count
elif position == "Present":
present_count += voter_count
return {
"yea_count": yea_count,
"nay_count": nay_count,
"not_voting_count": not_voting_count,
"present_count": present_count,
}
def _build_vote_records(raw_votes: dict, legislator_map: dict[str, int]) -> list[VoteRecord]:
"""Build VoteRecord objects from raw vote data."""
records: list[VoteRecord] = []
for position, position_group in raw_votes.items():
for voter in _iter_voters(position_group):
bioguide_id = voter.get("id")
if not bioguide_id:
continue
legislator_id = legislator_map.get(bioguide_id)
if legislator_id is None:
continue
records.append(
VoteRecord(
legislator_id=legislator_id,
position=position,
)
)
return records
# ---------------------------------------------------------------------------
# Bill Text
# ---------------------------------------------------------------------------
def ingest_bill_text(engine: Engine, congress_dirs: list[Path]) -> None:
"""Load bill text from text-versions directories."""
with Session(engine) as session:
bill_map = _build_bill_map(session)
logger.info("Loaded %d bills into lookup map", len(bill_map))
existing_bill_texts = {
(bill_text.bill_id, bill_text.version_code) for bill_text in session.scalars(select(BillText)).all()
}
logger.info("Found %d existing bill text versions in DB", len(existing_bill_texts))
total_inserted = 0
batch: list[BillText] = []
for congress_dir in congress_dirs:
logger.info("Scanning bill texts from %s", congress_dir.name)
for bill_text in _iter_bill_texts(congress_dir, bill_map, existing_bill_texts):
batch.append(bill_text)
if len(batch) >= BATCH_SIZE:
total_inserted += _flush_batch(session, batch, "bill texts")
total_inserted += _flush_batch(session, batch, "bill texts")
logger.info("Inserted %d new bill text versions total", total_inserted)
def _iter_bill_texts(
congress_dir: Path,
bill_map: dict[tuple[int, str, int], int],
existing_bill_texts: set[tuple[int, str]],
) -> Iterator[BillText]:
"""Yield BillText objects for a single congress directory, skipping existing."""
bills_dir = congress_dir / "bills"
if not bills_dir.is_dir():
return
for bill_dir in bills_dir.rglob("text-versions"):
if not bill_dir.is_dir():
continue
bill_key = _bill_key_from_dir(bill_dir.parent, congress_dir)
if bill_key is None:
continue
bill_id = bill_map.get(bill_key)
if bill_id is None:
continue
for version_dir in sorted(bill_dir.iterdir()):
if not version_dir.is_dir():
continue
if (bill_id, version_dir.name) in existing_bill_texts:
continue
text_content = _read_bill_text(version_dir)
version_data = _read_json(version_dir / "data.json")
yield BillText(
bill_id=bill_id,
version_code=version_dir.name,
version_name=version_data.get("version_name") if version_data else None,
date=version_data.get("issued_on") if version_data else None,
text_content=text_content,
)
def _bill_key_from_dir(bill_dir: Path, congress_dir: Path) -> tuple[int, str, int] | None:
"""Extract (congress, bill_type, number) from directory structure."""
congress = int(congress_dir.name)
bill_type = bill_dir.parent.name
name = bill_dir.name
# Directory name is like "hr3590" — strip the type prefix to get the number
number_str = name[len(bill_type) :]
if not number_str.isdigit():
return None
return (congress, bill_type, int(number_str))
def _read_bill_text(version_dir: Path) -> str | None:
"""Read bill text from a version directory, preferring .txt over .xml."""
for extension in ("txt", "htm", "html", "xml"):
candidates = list(version_dir.glob(f"document.{extension}"))
if not candidates:
candidates = list(version_dir.glob(f"*.{extension}"))
if candidates:
try:
return candidates[0].read_text(encoding="utf-8")
except Exception:
logger.exception("Failed to read %s", candidates[0])
return None
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _read_json(path: Path) -> dict | None:
"""Read and parse a JSON file, returning None on failure."""
try:
return orjson.loads(path.read_bytes())
except FileNotFoundError:
return None
except Exception:
logger.exception("Failed to parse %s", path)
return None
if __name__ == "__main__":
app()

View File

@@ -1,247 +0,0 @@
"""Ingestion pipeline for loading JSONL post files into the weekly-partitioned posts table.
Usage:
ingest-posts /path/to/files/
ingest-posts /path/to/single_file.jsonl
ingest-posts /data/dir/ --workers 4 --batch-size 5000
"""
from __future__ import annotations
import logging
from datetime import UTC, datetime
from pathlib import Path # noqa: TC003 this is needed for typer
from typing import TYPE_CHECKING, Annotated
import orjson
import psycopg
import typer
from python.common import configure_logger
from python.orm.common import get_connection_info
from python.parallelize import parallelize_process
if TYPE_CHECKING:
from collections.abc import Iterator
logger = logging.getLogger(__name__)
app = typer.Typer(help="Ingest JSONL post files into the partitioned posts table.")
@app.command()
def main(
path: Annotated[Path, typer.Argument(help="Directory containing JSONL files, or a single JSONL file")],
batch_size: Annotated[int, typer.Option(help="Rows per INSERT batch")] = 10000,
workers: Annotated[int, typer.Option(help="Parallel workers for multi-file ingestion")] = 4,
pattern: Annotated[str, typer.Option(help="Glob pattern for JSONL files")] = "*.jsonl",
) -> None:
"""Ingest JSONL post files into the weekly-partitioned posts table."""
configure_logger(level="INFO")
logger.info("starting ingest-posts")
logger.info("path=%s batch_size=%d workers=%d pattern=%s", path, batch_size, workers, pattern)
if path.is_file():
ingest_file(path, batch_size=batch_size)
elif path.is_dir():
ingest_directory(path, batch_size=batch_size, max_workers=workers, pattern=pattern)
else:
typer.echo(f"Path does not exist: {path}", err=True)
raise typer.Exit(code=1)
logger.info("ingest-posts done")
def ingest_directory(
directory: Path,
*,
batch_size: int,
max_workers: int,
pattern: str = "*.jsonl",
) -> None:
"""Ingest all JSONL files in a directory using parallel workers."""
files = sorted(directory.glob(pattern))
if not files:
logger.warning("No JSONL files found in %s", directory)
return
logger.info("Found %d JSONL files to ingest", len(files))
kwargs_list = [{"path": fp, "batch_size": batch_size} for fp in files]
parallelize_process(ingest_file, kwargs_list, max_workers=max_workers)
SCHEMA = "main"
COLUMNS = (
"post_id",
"user_id",
"instance",
"date",
"text",
"langs",
"like_count",
"reply_count",
"repost_count",
"reply_to",
"replied_author",
"thread_root",
"thread_root_author",
"repost_from",
"reposted_author",
"quotes",
"quoted_author",
"labels",
"sent_label",
"sent_score",
)
INSERT_FROM_STAGING = f"""
INSERT INTO {SCHEMA}.posts ({", ".join(COLUMNS)})
SELECT {", ".join(COLUMNS)} FROM pg_temp.staging
ON CONFLICT (post_id, date) DO NOTHING
""" # noqa: S608
FAILED_INSERT = f"""
INSERT INTO {SCHEMA}.failed_ingestion (raw_line, error)
VALUES (%(raw_line)s, %(error)s)
""" # noqa: S608
def get_psycopg_connection() -> psycopg.Connection:
"""Create a raw psycopg3 connection from environment variables."""
database, host, port, username, password = get_connection_info("DATA_SCIENCE_DEV")
return psycopg.connect(
dbname=database,
host=host,
port=int(port),
user=username,
password=password,
autocommit=False,
)
def ingest_file(path: Path, *, batch_size: int) -> None:
"""Ingest a single JSONL file into the posts table."""
log_trigger = max(100_000 // batch_size, 1)
failed_lines: list[dict] = []
try:
with get_psycopg_connection() as connection:
for index, batch in enumerate(read_jsonl_batches(path, batch_size, failed_lines), 1):
ingest_batch(connection, batch)
if index % log_trigger == 0:
logger.info("Ingested %d batches (%d rows) from %s", index, index * batch_size, path)
if failed_lines:
logger.warning("Recording %d malformed lines from %s", len(failed_lines), path.name)
with connection.cursor() as cursor:
cursor.executemany(FAILED_INSERT, failed_lines)
connection.commit()
except Exception:
logger.exception("Failed to ingest file: %s", path)
raise
def ingest_batch(connection: psycopg.Connection, batch: list[dict]) -> None:
"""COPY batch into a temp staging table, then INSERT ... ON CONFLICT into posts."""
if not batch:
return
try:
with connection.cursor() as cursor:
cursor.execute(f"""
CREATE TEMP TABLE IF NOT EXISTS staging
(LIKE {SCHEMA}.posts INCLUDING DEFAULTS)
ON COMMIT DELETE ROWS
""")
cursor.execute("TRUNCATE pg_temp.staging")
with cursor.copy(f"COPY pg_temp.staging ({', '.join(COLUMNS)}) FROM STDIN") as copy:
for row in batch:
copy.write_row(tuple(row.get(column) for column in COLUMNS))
cursor.execute(INSERT_FROM_STAGING)
connection.commit()
except Exception as error:
connection.rollback()
if len(batch) == 1:
logger.exception("Skipping bad row post_id=%s", batch[0].get("post_id"))
with connection.cursor() as cursor:
cursor.execute(
FAILED_INSERT,
{
"raw_line": orjson.dumps(batch[0], default=str).decode(),
"error": str(error),
},
)
connection.commit()
return
midpoint = len(batch) // 2
ingest_batch(connection, batch[:midpoint])
ingest_batch(connection, batch[midpoint:])
def read_jsonl_batches(file_path: Path, batch_size: int, failed_lines: list[dict]) -> Iterator[list[dict]]:
"""Stream a JSONL file and yield batches of transformed rows."""
batch: list[dict] = []
with file_path.open("r", encoding="utf-8") as handle:
for raw_line in handle:
line = raw_line.strip()
if not line:
continue
batch.extend(parse_line(line, file_path, failed_lines))
if len(batch) >= batch_size:
yield batch
batch = []
if batch:
yield batch
def parse_line(line: str, file_path: Path, failed_lines: list[dict]) -> Iterator[dict]:
"""Parse a JSONL line, handling concatenated JSON objects."""
try:
yield transform_row(orjson.loads(line))
except orjson.JSONDecodeError:
if "}{" not in line:
logger.warning("Skipping malformed line in %s: %s", file_path.name, line[:120])
failed_lines.append({"raw_line": line, "error": "malformed JSON"})
return
fragments = line.replace("}{", "}\n{").split("\n")
for fragment in fragments:
try:
yield transform_row(orjson.loads(fragment))
except (orjson.JSONDecodeError, KeyError, ValueError) as error:
logger.warning("Skipping malformed fragment in %s: %s", file_path.name, fragment[:120])
failed_lines.append({"raw_line": fragment, "error": str(error)})
except Exception as error:
logger.exception("Skipping bad row in %s: %s", file_path.name, line[:120])
failed_lines.append({"raw_line": line, "error": str(error)})
def transform_row(raw: dict) -> dict:
"""Transform a raw JSONL row into a dict matching the Posts table columns."""
raw["date"] = parse_date(raw["date"])
if raw.get("langs") is not None:
raw["langs"] = orjson.dumps(raw["langs"])
if raw.get("text") is not None:
raw["text"] = raw["text"].replace("\x00", "")
return raw
def parse_date(raw_date: int) -> datetime:
"""Parse compact YYYYMMDDHHmm integer into a naive datetime (input is UTC by spec)."""
return datetime(
raw_date // 100000000,
(raw_date // 1000000) % 100,
(raw_date // 10000) % 100,
(raw_date // 100) % 100,
raw_date % 100,
tzinfo=UTC,
)
if __name__ == "__main__":
app()

View File

@@ -90,13 +90,6 @@ DATABASES: dict[str, DatabaseConfig] = {
base_class_name="SignalBotBase", base_class_name="SignalBotBase",
models_module="python.orm.signal_bot.models", models_module="python.orm.signal_bot.models",
), ),
"data_science_dev": DatabaseConfig(
env_prefix="DATA_SCIENCE_DEV",
version_location="python/alembic/data_science_dev/versions",
base_module="python.orm.data_science_dev.base",
base_class_name="DataScienceDevBase",
models_module="python.orm.data_science_dev.models",
),
} }

View File

@@ -1,12 +1,10 @@
"""ORM package exports.""" """ORM package exports."""
from python.orm.data_science_dev.base import DataScienceDevBase
from python.orm.richie.base import RichieBase from python.orm.richie.base import RichieBase
from python.orm.signal_bot.base import SignalBotBase from python.orm.signal_bot.base import SignalBotBase
from python.orm.van_inventory.base import VanInventoryBase from python.orm.van_inventory.base import VanInventoryBase
__all__ = [ __all__ = [
"DataScienceDevBase",
"RichieBase", "RichieBase",
"SignalBotBase", "SignalBotBase",
"VanInventoryBase", "VanInventoryBase",

View File

@@ -1,11 +0,0 @@
"""Data science dev database ORM exports."""
from __future__ import annotations
from python.orm.data_science_dev.base import DataScienceDevBase, DataScienceDevTableBase, DataScienceDevTableBaseBig
__all__ = [
"DataScienceDevBase",
"DataScienceDevTableBase",
"DataScienceDevTableBaseBig",
]

View File

@@ -1,52 +0,0 @@
"""Data science dev database ORM base."""
from __future__ import annotations
from datetime import datetime
from sqlalchemy import BigInteger, DateTime, MetaData, func
from sqlalchemy.ext.declarative import AbstractConcreteBase
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from python.orm.common import NAMING_CONVENTION
class DataScienceDevBase(DeclarativeBase):
"""Base class for data_science_dev database ORM models."""
schema_name = "main"
metadata = MetaData(
schema=schema_name,
naming_convention=NAMING_CONVENTION,
)
class _TableMixin:
"""Shared timestamp columns for all table bases."""
created: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
server_default=func.now(),
)
updated: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
server_default=func.now(),
onupdate=func.now(),
)
class DataScienceDevTableBase(_TableMixin, AbstractConcreteBase, DataScienceDevBase):
"""Table with Integer primary key."""
__abstract__ = True
id: Mapped[int] = mapped_column(primary_key=True)
class DataScienceDevTableBaseBig(_TableMixin, AbstractConcreteBase, DataScienceDevBase):
"""Table with BigInteger primary key."""
__abstract__ = True
id: Mapped[int] = mapped_column(BigInteger, primary_key=True)

View File

@@ -1,14 +0,0 @@
"""init."""
from python.orm.data_science_dev.congress.bill import Bill, BillText
from python.orm.data_science_dev.congress.legislator import Legislator, LegislatorSocialMedia
from python.orm.data_science_dev.congress.vote import Vote, VoteRecord
__all__ = [
"Bill",
"BillText",
"Legislator",
"LegislatorSocialMedia",
"Vote",
"VoteRecord",
]

View File

@@ -1,66 +0,0 @@
"""Bill model - legislation introduced in Congress."""
from __future__ import annotations
from datetime import date
from typing import TYPE_CHECKING
from sqlalchemy import ForeignKey, Index, UniqueConstraint
from sqlalchemy.orm import Mapped, mapped_column, relationship
from python.orm.data_science_dev.base import DataScienceDevTableBase
if TYPE_CHECKING:
from python.orm.data_science_dev.congress.vote import Vote
class Bill(DataScienceDevTableBase):
"""Legislation with congress number, type, titles, status, and sponsor."""
__tablename__ = "bill"
congress: Mapped[int]
bill_type: Mapped[str]
number: Mapped[int]
title: Mapped[str | None]
title_short: Mapped[str | None]
official_title: Mapped[str | None]
status: Mapped[str | None]
status_at: Mapped[date | None]
sponsor_bioguide_id: Mapped[str | None]
subjects_top_term: Mapped[str | None]
votes: Mapped[list[Vote]] = relationship(
"Vote",
back_populates="bill",
)
bill_texts: Mapped[list[BillText]] = relationship(
"BillText",
back_populates="bill",
cascade="all, delete-orphan",
)
__table_args__ = (
UniqueConstraint("congress", "bill_type", "number", name="uq_bill_congress_type_number"),
Index("ix_bill_congress", "congress"),
)
class BillText(DataScienceDevTableBase):
"""Stores different text versions of a bill (introduced, enrolled, etc.)."""
__tablename__ = "bill_text"
bill_id: Mapped[int] = mapped_column(ForeignKey("main.bill.id", ondelete="CASCADE"))
version_code: Mapped[str]
version_name: Mapped[str | None]
text_content: Mapped[str | None]
date: Mapped[date | None]
bill: Mapped[Bill] = relationship("Bill", back_populates="bill_texts")
__table_args__ = (UniqueConstraint("bill_id", "version_code", name="uq_bill_text_bill_id_version_code"),)

View File

@@ -1,66 +0,0 @@
"""Legislator model - members of Congress."""
from __future__ import annotations
from datetime import date
from typing import TYPE_CHECKING
from sqlalchemy import ForeignKey, Text
from sqlalchemy.orm import Mapped, mapped_column, relationship
from python.orm.data_science_dev.base import DataScienceDevTableBase
if TYPE_CHECKING:
from python.orm.data_science_dev.congress.vote import VoteRecord
class Legislator(DataScienceDevTableBase):
"""Members of Congress with identification and current term info."""
__tablename__ = "legislator"
bioguide_id: Mapped[str] = mapped_column(Text, unique=True, index=True)
thomas_id: Mapped[str | None]
lis_id: Mapped[str | None]
govtrack_id: Mapped[int | None]
opensecrets_id: Mapped[str | None]
fec_ids: Mapped[str | None]
first_name: Mapped[str]
last_name: Mapped[str]
official_full_name: Mapped[str | None]
nickname: Mapped[str | None]
birthday: Mapped[date | None]
gender: Mapped[str | None]
current_party: Mapped[str | None]
current_state: Mapped[str | None]
current_district: Mapped[int | None]
current_chamber: Mapped[str | None]
social_media_accounts: Mapped[list[LegislatorSocialMedia]] = relationship(
"LegislatorSocialMedia",
back_populates="legislator",
cascade="all, delete-orphan",
)
vote_records: Mapped[list[VoteRecord]] = relationship(
"VoteRecord",
back_populates="legislator",
cascade="all, delete-orphan",
)
class LegislatorSocialMedia(DataScienceDevTableBase):
"""Social media account linked to a legislator."""
__tablename__ = "legislator_social_media"
legislator_id: Mapped[int] = mapped_column(ForeignKey("main.legislator.id"))
platform: Mapped[str]
account_name: Mapped[str]
url: Mapped[str | None]
source: Mapped[str]
legislator: Mapped[Legislator] = relationship(back_populates="social_media_accounts")

View File

@@ -1,79 +0,0 @@
"""Vote model - roll call votes in Congress."""
from __future__ import annotations
from datetime import date
from typing import TYPE_CHECKING
from sqlalchemy import ForeignKey, Index, UniqueConstraint
from sqlalchemy.orm import Mapped, mapped_column, relationship
from python.orm.data_science_dev.base import DataScienceDevBase, DataScienceDevTableBase
if TYPE_CHECKING:
from python.orm.data_science_dev.congress.bill import Bill
from python.orm.data_science_dev.congress.legislator import Legislator
from python.orm.data_science_dev.congress.vote import Vote
class VoteRecord(DataScienceDevBase):
"""Links a vote to a legislator with their position (Yea, Nay, etc.)."""
__tablename__ = "vote_record"
vote_id: Mapped[int] = mapped_column(
ForeignKey("main.vote.id", ondelete="CASCADE"),
primary_key=True,
)
legislator_id: Mapped[int] = mapped_column(
ForeignKey("main.legislator.id", ondelete="CASCADE"),
primary_key=True,
)
position: Mapped[str]
vote: Mapped[Vote] = relationship("Vote", back_populates="vote_records")
legislator: Mapped[Legislator] = relationship("Legislator", back_populates="vote_records")
class Vote(DataScienceDevTableBase):
"""Roll call votes with counts and optional bill linkage."""
__tablename__ = "vote"
congress: Mapped[int]
chamber: Mapped[str]
session: Mapped[int]
number: Mapped[int]
vote_type: Mapped[str | None]
question: Mapped[str | None]
result: Mapped[str | None]
result_text: Mapped[str | None]
vote_date: Mapped[date]
yea_count: Mapped[int | None]
nay_count: Mapped[int | None]
not_voting_count: Mapped[int | None]
present_count: Mapped[int | None]
bill_id: Mapped[int | None] = mapped_column(ForeignKey("main.bill.id"))
bill: Mapped[Bill | None] = relationship("Bill", back_populates="votes")
vote_records: Mapped[list[VoteRecord]] = relationship(
"VoteRecord",
back_populates="vote",
cascade="all, delete-orphan",
)
__table_args__ = (
UniqueConstraint(
"congress",
"chamber",
"session",
"number",
name="uq_vote_congress_chamber_session_number",
),
Index("ix_vote_date", "vote_date"),
Index("ix_vote_congress_chamber", "congress", "chamber"),
)

View File

@@ -1,16 +0,0 @@
"""Data science dev database ORM models."""
from __future__ import annotations
from python.orm.data_science_dev.congress import Bill, BillText, Legislator, Vote, VoteRecord
from python.orm.data_science_dev.posts import partitions # noqa: F401 — registers partition classes in metadata
from python.orm.data_science_dev.posts.tables import Posts
__all__ = [
"Bill",
"BillText",
"Legislator",
"Posts",
"Vote",
"VoteRecord",
]

View File

@@ -1,11 +0,0 @@
"""Posts module — weekly-partitioned posts table and partition ORM models."""
from __future__ import annotations
from python.orm.data_science_dev.posts.failed_ingestion import FailedIngestion
from python.orm.data_science_dev.posts.tables import Posts
__all__ = [
"FailedIngestion",
"Posts",
]

View File

@@ -1,33 +0,0 @@
"""Shared column definitions for the posts partitioned table family."""
from __future__ import annotations
from datetime import datetime
from sqlalchemy import BigInteger, SmallInteger, Text
from sqlalchemy.orm import Mapped, mapped_column
class PostsColumns:
"""Mixin providing all posts columns. Used by both the parent table and partitions."""
post_id: Mapped[int] = mapped_column(BigInteger, primary_key=True)
user_id: Mapped[int] = mapped_column(BigInteger)
instance: Mapped[str]
date: Mapped[datetime] = mapped_column(primary_key=True)
text: Mapped[str] = mapped_column(Text)
langs: Mapped[str | None]
like_count: Mapped[int]
reply_count: Mapped[int]
repost_count: Mapped[int]
reply_to: Mapped[int | None] = mapped_column(BigInteger)
replied_author: Mapped[int | None] = mapped_column(BigInteger)
thread_root: Mapped[int | None] = mapped_column(BigInteger)
thread_root_author: Mapped[int | None] = mapped_column(BigInteger)
repost_from: Mapped[int | None] = mapped_column(BigInteger)
reposted_author: Mapped[int | None] = mapped_column(BigInteger)
quotes: Mapped[int | None] = mapped_column(BigInteger)
quoted_author: Mapped[int | None] = mapped_column(BigInteger)
labels: Mapped[str | None]
sent_label: Mapped[int | None] = mapped_column(SmallInteger)
sent_score: Mapped[float | None]

View File

@@ -1,17 +0,0 @@
"""Table for storing JSONL lines that failed during post ingestion."""
from __future__ import annotations
from sqlalchemy import Text
from sqlalchemy.orm import Mapped, mapped_column
from python.orm.data_science_dev.base import DataScienceDevTableBase
class FailedIngestion(DataScienceDevTableBase):
"""Stores raw JSONL lines and their error messages when ingestion fails."""
__tablename__ = "failed_ingestion"
raw_line: Mapped[str] = mapped_column(Text)
error: Mapped[str] = mapped_column(Text)

View File

@@ -1,71 +0,0 @@
"""Dynamically generated ORM classes for each weekly partition of the posts table.
Each class maps to a PostgreSQL partition table (e.g. posts_2024_01).
These are real ORM models tracked by Alembic autogenerate.
Uses ISO week numbering (datetime.isocalendar().week). ISO years can have
52 or 53 weeks, and week boundaries are always Monday to Monday.
"""
from __future__ import annotations
import sys
from datetime import UTC, datetime
from python.orm.data_science_dev.base import DataScienceDevBase
from python.orm.data_science_dev.posts.columns import PostsColumns
PARTITION_START_YEAR = 2023
PARTITION_END_YEAR = 2026
_current_module = sys.modules[__name__]
def iso_weeks_in_year(year: int) -> int:
"""Return the number of ISO weeks in a given year (52 or 53)."""
dec_28 = datetime(year, 12, 28, tzinfo=UTC)
return dec_28.isocalendar().week
def week_bounds(year: int, week: int) -> tuple[datetime, datetime]:
"""Return (start, end) datetimes for an ISO week.
Start = Monday 00:00:00 UTC of the given ISO week.
End = Monday 00:00:00 UTC of the following ISO week.
"""
start = datetime.fromisocalendar(year, week, 1).replace(tzinfo=UTC)
if week < iso_weeks_in_year(year):
end = datetime.fromisocalendar(year, week + 1, 1).replace(tzinfo=UTC)
else:
end = datetime.fromisocalendar(year + 1, 1, 1).replace(tzinfo=UTC)
return start, end
def _build_partition_classes() -> dict[str, type]:
"""Generate one ORM class per ISO week partition."""
classes: dict[str, type] = {}
for year in range(PARTITION_START_YEAR, PARTITION_END_YEAR + 1):
for week in range(1, iso_weeks_in_year(year) + 1):
class_name = f"PostsWeek{year}W{week:02d}"
table_name = f"posts_{year}_{week:02d}"
partition_class = type(
class_name,
(PostsColumns, DataScienceDevBase),
{
"__tablename__": table_name,
"__table_args__": ({"implicit_returning": False},),
},
)
classes[class_name] = partition_class
return classes
# Generate all partition classes and register them on this module
_partition_classes = _build_partition_classes()
for _name, _cls in _partition_classes.items():
setattr(_current_module, _name, _cls)
__all__ = list(_partition_classes.keys())

View File

@@ -1,13 +0,0 @@
"""Posts parent table with PostgreSQL weekly range partitioning on date column."""
from __future__ import annotations
from python.orm.data_science_dev.base import DataScienceDevBase
from python.orm.data_science_dev.posts.columns import PostsColumns
class Posts(PostsColumns, DataScienceDevBase):
"""Parent partitioned table for posts, partitioned by week on `date`."""
__tablename__ = "posts"
__table_args__ = ({"postgresql_partition_by": "RANGE (date)"},)

View File

@@ -3,6 +3,7 @@
from __future__ import annotations from __future__ import annotations
from python.orm.richie.base import RichieBase, TableBase, TableBaseBig, TableBaseSmall from python.orm.richie.base import RichieBase, TableBase, TableBaseBig, TableBaseSmall
from python.orm.richie.congress import Bill, Legislator, Vote, VoteRecord
from python.orm.richie.contact import ( from python.orm.richie.contact import (
Contact, Contact,
ContactNeed, ContactNeed,
@@ -12,13 +13,17 @@ from python.orm.richie.contact import (
) )
__all__ = [ __all__ = [
"Bill",
"Contact", "Contact",
"ContactNeed", "ContactNeed",
"ContactRelationship", "ContactRelationship",
"Legislator",
"Need", "Need",
"RelationshipType", "RelationshipType",
"RichieBase", "RichieBase",
"TableBase", "TableBase",
"TableBaseBig", "TableBaseBig",
"TableBaseSmall", "TableBaseSmall",
"Vote",
"VoteRecord",
] ]

View File

@@ -0,0 +1,150 @@
"""Congress Tracker database models."""
from __future__ import annotations
from datetime import date
from sqlalchemy import ForeignKey, Index, Text, UniqueConstraint
from sqlalchemy.orm import Mapped, mapped_column, relationship
from python.orm.richie.base import RichieBase, TableBase
class Legislator(TableBase):
"""Legislator model - members of Congress."""
__tablename__ = "legislator"
# Natural key - bioguide ID is the authoritative identifier
bioguide_id: Mapped[str] = mapped_column(Text, unique=True, index=True)
# Other IDs for cross-referencing
thomas_id: Mapped[str | None]
lis_id: Mapped[str | None]
govtrack_id: Mapped[int | None]
opensecrets_id: Mapped[str | None]
fec_ids: Mapped[str | None] # JSON array stored as string
# Name info
first_name: Mapped[str]
last_name: Mapped[str]
official_full_name: Mapped[str | None]
nickname: Mapped[str | None]
# Bio
birthday: Mapped[date | None]
gender: Mapped[str | None] # M/F
# Current term info (denormalized for query efficiency)
current_party: Mapped[str | None]
current_state: Mapped[str | None]
current_district: Mapped[int | None] # House only
current_chamber: Mapped[str | None] # rep/sen
# Relationships
vote_records: Mapped[list[VoteRecord]] = relationship(
"VoteRecord",
back_populates="legislator",
cascade="all, delete-orphan",
)
class Bill(TableBase):
"""Bill model - legislation introduced in Congress."""
__tablename__ = "bill"
# Composite natural key: congress + bill_type + number
congress: Mapped[int]
bill_type: Mapped[str] # hr, s, hres, sres, hjres, sjres
number: Mapped[int]
# Bill info
title: Mapped[str | None]
title_short: Mapped[str | None]
official_title: Mapped[str | None]
# Status
status: Mapped[str | None]
status_at: Mapped[date | None]
# Sponsor
sponsor_bioguide_id: Mapped[str | None]
# Subjects
subjects_top_term: Mapped[str | None]
# Relationships
votes: Mapped[list[Vote]] = relationship(
"Vote",
back_populates="bill",
)
__table_args__ = (
UniqueConstraint("congress", "bill_type", "number", name="uq_bill_congress_type_number"),
Index("ix_bill_congress", "congress"),
)
class Vote(TableBase):
"""Vote model - roll call votes in Congress."""
__tablename__ = "vote"
# Composite natural key: congress + chamber + session + number
congress: Mapped[int]
chamber: Mapped[str] # house/senate
session: Mapped[int]
number: Mapped[int]
# Vote details
vote_type: Mapped[str | None]
question: Mapped[str | None]
result: Mapped[str | None]
result_text: Mapped[str | None]
# Timing
vote_date: Mapped[date]
# Vote counts (denormalized for efficiency)
yea_count: Mapped[int | None]
nay_count: Mapped[int | None]
not_voting_count: Mapped[int | None]
present_count: Mapped[int | None]
# Related bill (optional - not all votes are on bills)
bill_id: Mapped[int | None] = mapped_column(ForeignKey("main.bill.id"))
# Relationships
bill: Mapped[Bill | None] = relationship("Bill", back_populates="votes")
vote_records: Mapped[list[VoteRecord]] = relationship(
"VoteRecord",
back_populates="vote",
cascade="all, delete-orphan",
)
__table_args__ = (
UniqueConstraint("congress", "chamber", "session", "number", name="uq_vote_congress_chamber_session_number"),
Index("ix_vote_date", "vote_date"),
Index("ix_vote_congress_chamber", "congress", "chamber"),
)
class VoteRecord(RichieBase):
"""Association table: Vote <-> Legislator with position."""
__tablename__ = "vote_record"
vote_id: Mapped[int] = mapped_column(
ForeignKey("main.vote.id", ondelete="CASCADE"),
primary_key=True,
)
legislator_id: Mapped[int] = mapped_column(
ForeignKey("main.legislator.id", ondelete="CASCADE"),
primary_key=True,
)
position: Mapped[str] # Yea, Nay, Not Voting, Present
# Relationships
vote: Mapped[Vote] = relationship("Vote", back_populates="vote_records")
legislator: Mapped[Legislator] = relationship("Legislator", back_populates="vote_records")

View File

@@ -63,9 +63,9 @@ class DeviceRegistry:
return return
with Session(self.engine) as session: with Session(self.engine) as session:
device = session.scalars( device = session.execute(
select(SignalDevice).where(SignalDevice.phone_number == phone_number) select(SignalDevice).where(SignalDevice.phone_number == phone_number)
).one_or_none() ).scalar_one_or_none()
if device: if device:
if device.safety_number != safety_number and device.trust_level != TrustLevel.BLOCKED: if device.safety_number != safety_number and device.trust_level != TrustLevel.BLOCKED:
@@ -99,9 +99,9 @@ class DeviceRegistry:
Returns True if the device was found and verified. Returns True if the device was found and verified.
""" """
with Session(self.engine) as session: with Session(self.engine) as session:
device = session.scalars( device = session.execute(
select(SignalDevice).where(SignalDevice.phone_number == phone_number) select(SignalDevice).where(SignalDevice.phone_number == phone_number)
).one_or_none() ).scalar_one_or_none()
if not device: if not device:
logger.warning(f"Cannot verify unknown device: {phone_number}") logger.warning(f"Cannot verify unknown device: {phone_number}")
@@ -139,9 +139,9 @@ class DeviceRegistry:
def grant_role(self, phone_number: str, role: Role) -> bool: def grant_role(self, phone_number: str, role: Role) -> bool:
"""Add a role to a device. Called by admin over SSH.""" """Add a role to a device. Called by admin over SSH."""
with Session(self.engine) as session: with Session(self.engine) as session:
device = session.scalars( device = session.execute(
select(SignalDevice).where(SignalDevice.phone_number == phone_number) select(SignalDevice).where(SignalDevice.phone_number == phone_number)
).one_or_none() ).scalar_one_or_none()
if not device: if not device:
logger.warning(f"Cannot grant role for unknown device: {phone_number}") logger.warning(f"Cannot grant role for unknown device: {phone_number}")
@@ -150,7 +150,7 @@ class DeviceRegistry:
if any(record.name == role for record in device.roles): if any(record.name == role for record in device.roles):
return True return True
role_record = session.scalars(select(RoleRecord).where(RoleRecord.name == role)).one_or_none() role_record = session.execute(select(RoleRecord).where(RoleRecord.name == role)).scalar_one_or_none()
if not role_record: if not role_record:
logger.warning(f"Unknown role: {role}") logger.warning(f"Unknown role: {role}")
@@ -165,9 +165,9 @@ class DeviceRegistry:
def revoke_role(self, phone_number: str, role: Role) -> bool: def revoke_role(self, phone_number: str, role: Role) -> bool:
"""Remove a role from a device. Called by admin over SSH.""" """Remove a role from a device. Called by admin over SSH."""
with Session(self.engine) as session: with Session(self.engine) as session:
device = session.scalars( device = session.execute(
select(SignalDevice).where(SignalDevice.phone_number == phone_number) select(SignalDevice).where(SignalDevice.phone_number == phone_number)
).one_or_none() ).scalar_one_or_none()
if not device: if not device:
logger.warning(f"Cannot revoke role for unknown device: {phone_number}") logger.warning(f"Cannot revoke role for unknown device: {phone_number}")
@@ -182,16 +182,16 @@ class DeviceRegistry:
def set_roles(self, phone_number: str, roles: list[Role]) -> bool: def set_roles(self, phone_number: str, roles: list[Role]) -> bool:
"""Replace all roles for a device. Called by admin over SSH.""" """Replace all roles for a device. Called by admin over SSH."""
with Session(self.engine) as session: with Session(self.engine) as session:
device = session.scalars( device = session.execute(
select(SignalDevice).where(SignalDevice.phone_number == phone_number) select(SignalDevice).where(SignalDevice.phone_number == phone_number)
).one_or_none() ).scalar_one_or_none()
if not device: if not device:
logger.warning(f"Cannot set roles for unknown device: {phone_number}") logger.warning(f"Cannot set roles for unknown device: {phone_number}")
return False return False
role_names = [str(role) for role in roles] role_names = [str(role) for role in roles]
records = session.scalars(select(RoleRecord).where(RoleRecord.name.in_(role_names))).all() records = list(session.execute(select(RoleRecord).where(RoleRecord.name.in_(role_names))).scalars().all())
device.roles = records device.roles = records
session.commit() session.commit()
self._update_cache(phone_number, device) self._update_cache(phone_number, device)
@@ -203,7 +203,7 @@ class DeviceRegistry:
def list_devices(self) -> list[SignalDevice]: def list_devices(self) -> list[SignalDevice]:
"""Return all known devices.""" """Return all known devices."""
with Session(self.engine) as session: with Session(self.engine) as session:
return list(session.scalars(select(SignalDevice)).all()) return list(session.execute(select(SignalDevice)).scalars().all())
def sync_identities(self) -> None: def sync_identities(self) -> None:
"""Pull identity list from signal-cli and record any new ones.""" """Pull identity list from signal-cli and record any new ones."""
@@ -226,7 +226,9 @@ class DeviceRegistry:
def _load_device(self, phone_number: str) -> SignalDevice | None: def _load_device(self, phone_number: str) -> SignalDevice | None:
"""Fetch a device by phone number (with joined roles).""" """Fetch a device by phone number (with joined roles)."""
with Session(self.engine) as session: with Session(self.engine) as session:
return session.scalars(select(SignalDevice).where(SignalDevice.phone_number == phone_number)).one_or_none() return session.execute(
select(SignalDevice).where(SignalDevice.phone_number == phone_number)
).scalar_one_or_none()
def _update_cache(self, phone_number: str, device: SignalDevice) -> None: def _update_cache(self, phone_number: str, device: SignalDevice) -> None:
"""Refresh the cache entry for a device.""" """Refresh the cache entry for a device."""
@@ -242,9 +244,9 @@ class DeviceRegistry:
def _set_trust(self, phone_number: str, level: str, log_msg: str | None = None) -> bool: def _set_trust(self, phone_number: str, level: str, log_msg: str | None = None) -> bool:
"""Update the trust level for a device.""" """Update the trust level for a device."""
with Session(self.engine) as session: with Session(self.engine) as session:
device = session.scalars( device = session.execute(
select(SignalDevice).where(SignalDevice.phone_number == phone_number) select(SignalDevice).where(SignalDevice.phone_number == phone_number)
).one_or_none() ).scalar_one_or_none()
if not device: if not device:
return False return False
@@ -267,7 +269,7 @@ def sync_roles(engine: Engine) -> None:
expected = {role.value for role in Role} expected = {role.value for role in Role}
with Session(engine) as session: with Session(engine) as session:
existing = set(session.scalars(select(RoleRecord.name)).all()) existing = {record.name for record in session.execute(select(RoleRecord)).scalars().all()}
to_add = expected - existing to_add = expected - existing
to_remove = existing - expected to_remove = existing - expected

View File

@@ -34,9 +34,8 @@ def main(config_file: Path) -> None:
logger.error(msg) logger.error(msg)
signal_alert(msg) signal_alert(msg)
continue continue
count_lookup = get_count_lookup(config_file, dataset.name)
logger.info(f"using {count_lookup} for {dataset.name}") get_snapshots_to_delete(dataset, get_count_lookup(config_file, dataset.name))
get_snapshots_to_delete(dataset, count_lookup)
except Exception: except Exception:
logger.exception("snapshot_manager failed") logger.exception("snapshot_manager failed")
signal_alert("snapshot_manager failed") signal_alert("snapshot_manager failed")
@@ -100,7 +99,6 @@ def get_snapshots_to_delete(
""" """
snapshots = dataset.get_snapshots() snapshots = dataset.get_snapshots()
logger.info(f"calculating snapshots for {dataset.name} to be deleted")
if not snapshots: if not snapshots:
logger.info(f"{dataset.name} has no snapshots") logger.info(f"{dataset.name} has no snapshots")
return return

View File

@@ -23,7 +23,6 @@
"magistral:24b" "magistral:24b"
"ministral-3:14b" "ministral-3:14b"
"nemotron-3-nano:30b" "nemotron-3-nano:30b"
"nemotron-3-nano:4b"
"nemotron-cascade-2:30b" "nemotron-cascade-2:30b"
"qwen3-coder:30b" "qwen3-coder:30b"
"qwen3-embedding:0.6b" "qwen3-embedding:0.6b"

View File

@@ -15,20 +15,18 @@ sudo zpool add storage -o ashift=12 logs mirror
sudo zpool create scratch -o ashift=12 -O acltype=posixacl -O atime=off -O dnodesize=auto -O xattr=sa -O compression=zstd -O encryption=aes-256-gcm -O keyformat=hex -O keylocation=file:///key -m /zfs/scratch sudo zpool create scratch -o ashift=12 -O acltype=posixacl -O atime=off -O dnodesize=auto -O xattr=sa -O compression=zstd -O encryption=aes-256-gcm -O keyformat=hex -O keylocation=file:///key -m /zfs/scratch
# media datasets # media datasets
sudo zfs create media/temp -o sync=disabled -o redundant_metadata=none
sudo zfs create media/secure -o encryption=aes-256-gcm -o keyformat=hex -o keylocation=file:///root/zfs.key sudo zfs create media/secure -o encryption=aes-256-gcm -o keyformat=hex -o keylocation=file:///root/zfs.key
sudo zfs create media/secure/docker -o compression=zstd-9 sudo zfs create media/secure/docker -o compression=zstd-9
sudo zfs create media/secure/github-runners -o compression=zstd-9 -o sync=disabled sudo zfs create media/secure/github-runners -o compression=zstd-9 -o sync=disabled
sudo zfs create media/secure/home_assistant -o compression=zstd-19 sudo zfs create media/secure/home_assistant -o compression=zstd-19
sudo zfs create media/secure/notes -o copies=2 sudo zfs create media/secure/notes -o copies=2
sudo zfs create media/secure/postgres -o mountpoint=/zfs/media/database/postgres -o recordsize=16k -o primarycache=metadata sudo zfs create media/secure/postgres -o recordsize=16k -o primarycache=metadata
sudo zfs create media/secure/postgres-wal -o mountpoint=/zfs/media/database/postgres-wal -o recordsize=32k -o primarycache=metadata -o special_small_blocks=32K -o compression=lz4 -o secondarycache=none -o logbias=latency
sudo zfs create media/secure/services -o compression=zstd-9 sudo zfs create media/secure/services -o compression=zstd-9
sudo zfs create media/secure/share -o mountpoint=/zfs/media/share -o exec=off sudo zfs create media/secure/share -o mountpoint=/zfs/media/share -o exec=off
# scratch datasets # scratch datasets
sudo zfs create scratch/kafka -o mountpoint=/zfs/scratch/kafka -o recordsize=1M sudo zfs create scratch/kafka -o mountpoint=/zfs/scratch/kafka -o recordsize=1M
sudo zfs create scratch/transmission -o mountpoint=/zfs/scratch/transmission -o recordsize=16k -o sync=disabled -o redundant_metadata=none sudo zfs create scratch/transmission -o mountpoint=/zfs/scratch/transmission -o recordsize=16k -o sync=disabled
# storage datasets # storage datasets
sudo zfs create storage/ollama -o recordsize=1M -o compression=zstd-19 -o sync=disabled sudo zfs create storage/ollama -o recordsize=1M -o compression=zstd-19 -o sync=disabled

View File

@@ -1,24 +0,0 @@
{
services.hedgedoc = {
enable = true;
settings = {
host = "0.0.0.0";
port = 3000;
domain = "192.168.90.40";
urlAddPort = true;
protocolUseSSL = false;
db = {
dialect = "postgres";
database = "hedgedoc";
username = "hedgedoc";
host = "/run/postgresql";
};
};
};
networking.firewall.allowedTCPPorts = [ 3000 ];
systemd.services.hedgedoc = {
after = [ "postgresql.service" ];
requires = [ "postgresql.service" ];
};
}

View File

@@ -3,7 +3,7 @@ let
in in
{ {
services.apache-kafka = { services.apache-kafka = {
enable = false; enable = true;
settings = { settings = {
listeners = [ "PLAINTEXT://localhost:9092" ]; listeners = [ "PLAINTEXT://localhost:9092" ];
"log.dirs" = [ vars.kafka ]; "log.dirs" = [ vars.kafka ];

View File

@@ -5,10 +5,6 @@ in
{ {
networking.firewall.allowedTCPPorts = [ 5432 ]; networking.firewall.allowedTCPPorts = [ 5432 ];
# Symlink pg_wal to a ZFS dataset on the special (metadata) vdev for fast WAL writes
# this is required for systemd sandboxing
systemd.services.postgresql.serviceConfig.ReadWritePaths = [ "/zfs/media/database/postgres-wal" ];
services.postgresql = { services.postgresql = {
enable = true; enable = true;
package = pkgs.postgresql_17_jit; package = pkgs.postgresql_17_jit;
@@ -37,19 +33,12 @@ in
# signalbot # signalbot
local signalbot signalbot trust local signalbot signalbot trust
# hedgedoc
local hedgedoc hedgedoc trust
# math # math
local postgres math trust local postgres math trust
host postgres math 127.0.0.1/32 trust host postgres math 127.0.0.1/32 trust
host postgres math ::1/128 trust host postgres math ::1/128 trust
host postgres math 192.168.90.1/24 trust host postgres math 192.168.90.1/24 trust
local data_science_dev math trust
host data_science_dev math 127.0.0.1/32 trust
host data_science_dev math ::1/128 trust
host data_science_dev math 192.168.90.1/24 trust
''; '';
identMap = '' identMap = ''
@@ -119,19 +108,10 @@ in
login = true; login = true;
}; };
} }
{
name = "hedgedoc";
ensureDBOwnership = true;
ensureClauses = {
login = true;
};
}
]; ];
ensureDatabases = [ ensureDatabases = [
"data_science_dev"
"hass" "hass"
"gitea" "gitea"
"hedgedoc"
"math" "math"
"n8n" "n8n"
"richie" "richie"

View File

@@ -4,7 +4,6 @@ hourly = 24
daily = 0 daily = 0
monthly = 0 monthly = 0
# root_pool
["root_pool/home"] ["root_pool/home"]
15_min = 8 15_min = 8
hourly = 24 hourly = 24
@@ -28,96 +27,57 @@ monthly = 0
hourly = 24 hourly = 24
daily = 30 daily = 30
monthly = 6 monthly = 6
# storage
["storage/ollama"]
15_min = 2
hourly = 0
daily = 0
monthly = 0
["storage/secure"] ["storage/plex"]
15_min = 0
hourly = 0
daily = 0
monthly = 0
["storage/secure/plex"]
15_min = 6 15_min = 6
hourly = 2 hourly = 2
daily = 1 daily = 1
monthly = 0 monthly = 0
["storage/secure/transmission"] ["media/plex"]
15_min = 4 15_min = 6
hourly = 0 hourly = 2
daily = 0 daily = 1
monthly = 0 monthly = 0
["storage/secure/secrets"] ["media/notes"]
15_min = 8 15_min = 8
hourly = 24 hourly = 24
daily = 30 daily = 30
monthly = 12 monthly = 12
# media ["media/docker"]
["media/temp"] 15_min = 3
15_min = 2 hourly = 12
hourly = 0 daily = 14
daily = 0 monthly = 2
monthly = 0
["media/services"]
["media/secure"] 15_min = 3
15_min = 0 hourly = 12
hourly = 0 daily = 14
daily = 0 monthly = 2
monthly = 0
["media/home_assistant"]
["media/secure/plex"]
15_min = 6
hourly = 2
daily = 1
monthly = 0
["media/secure/postgres-wal"]
15_min = 4
hourly = 2
daily = 0
monthly = 0
["media/secure/postgres"]
15_min = 8
hourly = 24
daily = 7
monthly = 0
["media/secure/share"]
15_min = 4
hourly = 0
daily = 0
monthly = 0
["media/secure/github-runners"]
15_min = 6
hourly = 2
daily = 1
monthly = 0
["media/secure/notes"]
15_min = 8
hourly = 24
daily = 30
monthly = 12
["media/secure/docker"]
15_min = 3 15_min = 3
hourly = 12 hourly = 12
daily = 14 daily = 14
monthly = 2 monthly = 2
# scratch
["scratch/transmission"] ["scratch/transmission"]
15_min = 2 15_min = 0
hourly = 0
daily = 0
monthly = 0
["storage/transmission"]
15_min = 0
hourly = 0
daily = 0
monthly = 0
["storage/ollama"]
15_min = 0
hourly = 0 hourly = 0
daily = 0 daily = 0
monthly = 0 monthly = 0

View File

@@ -14,7 +14,6 @@
./llms.nix ./llms.nix
./open_webui.nix ./open_webui.nix
./qmk.nix ./qmk.nix
./sunshine.nix
./syncthing.nix ./syncthing.nix
inputs.nixos-hardware.nixosModules.framework-13-7040-amd inputs.nixos-hardware.nixosModules.framework-13-7040-amd
]; ];

View File

@@ -8,8 +8,9 @@
"deepscaler:1.5b" "deepscaler:1.5b"
"deepseek-r1:8b" "deepseek-r1:8b"
"gemma3:12b" "gemma3:12b"
"gemma3:27b"
"gpt-oss:20b"
"lfm2:24b" "lfm2:24b"
"nemotron-3-nano:4b"
"qwen3:14b" "qwen3:14b"
"qwen3.5:27b" "qwen3.5:27b"
]; ];

View File

@@ -1,28 +0,0 @@
{ pkgs, ... }:
{
services.sunshine = {
enable = true;
openFirewall = true;
capSysAdmin = true;
};
environment.systemPackages = [ pkgs.kdePackages.libkscreen ];
boot = {
kernelParams = [
"drm.edid_firmware=DP-4:edid/virtual-display.bin"
"video=DP-4:e"
];
};
hardware.firmware = [
(pkgs.runCommandLocal "virtual-display-edid"
{
compressFirmware = false;
}
''
mkdir -p $out/lib/firmware/edid
cp ${./edid/virtual-display.bin} $out/lib/firmware/edid/virtual-display.bin
''
)
];
}

View File

@@ -210,9 +210,9 @@ class TestContactCache:
mock_session_cls.return_value.__exit__ = MagicMock(return_value=False) mock_session_cls.return_value.__exit__ = MagicMock(return_value=False)
mock_device = MagicMock() mock_device = MagicMock()
mock_device.trust_level = TrustLevel.UNVERIFIED mock_device.trust_level = TrustLevel.UNVERIFIED
mock_session.scalars.return_value.one_or_none.return_value = mock_device mock_session.execute.return_value.scalar_one_or_none.return_value = mock_device
registry.record_contact("+1234", "abc") registry.record_contact("+1234", "abc")
mock_session.scalars.assert_called_once() mock_session.execute.assert_called_once()
class TestLocationCommand: class TestLocationCommand:

86
tools/line_counter.py Normal file
View File

@@ -0,0 +1,86 @@
"""Count lines of code in the repository, grouped by file type."""
from __future__ import annotations
import subprocess
from collections import defaultdict
from pathlib import Path
def get_tracked_files() -> list[str]:
"""Get all git-tracked files."""
result = subprocess.run(
["git", "ls-files"], # noqa: S603, S607
capture_output=True,
text=True,
check=True,
)
return [f for f in result.stdout.strip().splitlines() if f]
def count_lines(filepath: str) -> int:
"""Count lines in a file, returning 0 for binary files."""
try:
return len(Path(filepath).read_text(encoding="utf-8").splitlines())
except (UnicodeDecodeError, OSError):
return 0
def count_lines_by_type() -> dict[str, int]:
"""Count lines grouped by file extension."""
lines_by_type: dict[str, int] = defaultdict(int)
for filepath in get_tracked_files():
ext = Path(filepath).suffix.lstrip(".")
if not ext:
ext = Path(filepath).name
lines_by_type[ext] += count_lines(filepath)
# Exclude binary/non-code files
for key in ("png", "lock"):
lines_by_type.pop(key, None)
return dict(sorted(lines_by_type.items(), key=lambda x: x[1], reverse=True))
def format_report() -> str:
"""Generate a formatted line count report."""
lines_by_type = count_lines_by_type()
total = sum(lines_by_type.values())
lines = [
f"This repo has **{total:,}** lines of technical debt.",
"",
"| File Type | Lines | Percentage |",
"|-----------|------:|-----------:|",
]
for ext, count in lines_by_type.items():
if count > 0:
pct = count / total * 100
prefix = "." if not ext.startswith(".") else ""
lines.append(f"| {prefix}{ext} | {count:,} | {pct:.1f}% |")
return "\n".join(lines)
def update_readme() -> None:
"""Update README.md with the line count report."""
readme_path = Path("README.md")
report = format_report()
start_marker = "<!-- LINE-COUNT-START -->"
end_marker = "<!-- LINE-COUNT-END -->"
content = readme_path.read_text(encoding="utf-8")
section = f"{start_marker}\n{report}\n{end_marker}"
if start_marker in content:
start = content.index(start_marker)
end = content.index(end_marker) + len(end_marker)
content = content[:start] + section + content[end:]
else:
content = content.rstrip() + "\n\n" + section + "\n"
readme_path.write_text(content, encoding="utf-8")
if __name__ == "__main__":
update_readme()

View File

@@ -20,15 +20,15 @@
// turns off all sounds and announcements // turns off all sounds and announcements
"accessibility.signals.terminalCommandFailed": { "accessibility.signals.terminalCommandFailed": {
"sound": "off", "sound": "off",
"announcement": "off", "announcement": "off"
}, },
"accessibility.signals.terminalQuickFix": { "accessibility.signals.terminalQuickFix": {
"sound": "off", "sound": "off",
"announcement": "off", "announcement": "off"
}, },
"accessibility.signals.terminalBell": { "accessibility.signals.terminalBell": {
"sound": "off", "sound": "off",
"announcement": "off", "announcement": "off"
}, },
// database settings // database settings
@@ -41,8 +41,8 @@
"driver": "PostgreSQL", "driver": "PostgreSQL",
"name": "main", "name": "main",
"database": "postgres", "database": "postgres",
"username": "richie", "username": "richie"
}, }
], ],
// formatters // formatters
@@ -55,7 +55,7 @@
"[yaml]": { "editor.defaultFormatter": "redhat.vscode-yaml" }, "[yaml]": { "editor.defaultFormatter": "redhat.vscode-yaml" },
"[javascriptreact]": { "editor.defaultFormatter": "esbenp.prettier-vscode" }, "[javascriptreact]": { "editor.defaultFormatter": "esbenp.prettier-vscode" },
"[github-actions-workflow]": { "[github-actions-workflow]": {
"editor.defaultFormatter": "redhat.vscode-yaml", "editor.defaultFormatter": "redhat.vscode-yaml"
}, },
"[dockercompose]": { "[dockercompose]": {
"editor.insertSpaces": true, "editor.insertSpaces": true,
@@ -64,9 +64,9 @@
"editor.quickSuggestions": { "editor.quickSuggestions": {
"other": true, "other": true,
"comments": false, "comments": false,
"strings": true, "strings": true
}, },
"editor.defaultFormatter": "redhat.vscode-yaml", "editor.defaultFormatter": "redhat.vscode-yaml"
}, },
// spell check // spell check
@@ -78,8 +78,7 @@
"Corvidae", "Corvidae",
"drivername", "drivername",
"fastapi", "fastapi",
"sandboxing", "syncthing"
"syncthing",
], ],
// nix // nix
@@ -97,5 +96,5 @@
// new // new
"hediet.vscode-drawio.resizeImages": null, "hediet.vscode-drawio.resizeImages": null,
"hediet.vscode-drawio.appearance": "automatic", "hediet.vscode-drawio.appearance": "automatic",
"claudeCode.preferredLocation": "panel", "claudeCode.preferredLocation": "panel"
} }