adding database_cli.py and alembic

This commit is contained in:
2026-04-21 11:08:00 -04:00
parent 0c99a63347
commit e5ba089479
12 changed files with 6214 additions and 0 deletions

View File

@@ -0,0 +1,50 @@
"""adding FailedIngestion.
Revision ID: 2f43120e3ffc
Revises: f99be864fe69
Create Date: 2026-03-24 23:46:17.277897
"""
from __future__ import annotations
from typing import TYPE_CHECKING
import sqlalchemy as sa
from alembic import op
from python.orm import DataScienceDevBase
if TYPE_CHECKING:
from collections.abc import Sequence
# revision identifiers, used by Alembic.
revision: str = "2f43120e3ffc"
down_revision: str | None = "f99be864fe69"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
schema = DataScienceDevBase.schema_name
def upgrade() -> None:
"""Upgrade."""
# ### commands auto generated by Alembic - please adjust! ###
op.create_table(
"failed_ingestion",
sa.Column("raw_line", sa.Text(), nullable=False),
sa.Column("error", sa.Text(), nullable=False),
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("created", sa.DateTime(timezone=True), server_default=sa.text("now()"), nullable=False),
sa.Column("updated", sa.DateTime(timezone=True), server_default=sa.text("now()"), nullable=False),
sa.PrimaryKeyConstraint("id", name=op.f("pk_failed_ingestion")),
schema=schema,
)
# ### end Alembic commands ###
def downgrade() -> None:
"""Downgrade."""
# ### commands auto generated by Alembic - please adjust! ###
op.drop_table("failed_ingestion", schema=schema)
# ### end Alembic commands ###

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,72 @@
"""Attach all partition tables to the posts parent table.
Alembic autogenerate creates partition tables as standalone tables but does not
emit the ALTER TABLE ... ATTACH PARTITION statements needed for PostgreSQL to
route inserts to the correct partition.
Revision ID: a1b2c3d4e5f6
Revises: 605b1794838f
Create Date: 2026-03-25 10:00:00.000000
"""
from __future__ import annotations
from typing import TYPE_CHECKING
from alembic import op
from sqlalchemy import text
from python.orm import DataScienceDevBase
from python.orm.data_science_dev.posts.partitions import (
PARTITION_END_YEAR,
PARTITION_START_YEAR,
iso_weeks_in_year,
week_bounds,
)
if TYPE_CHECKING:
from collections.abc import Sequence
# revision identifiers, used by Alembic.
revision: str = "a1b2c3d4e5f6"
down_revision: str | None = "605b1794838f"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
schema = DataScienceDevBase.schema_name
ALREADY_ATTACHED_QUERY = text("""
SELECT inhrelid::regclass::text
FROM pg_inherits
WHERE inhparent = :parent::regclass
""")
def upgrade() -> None:
"""Attach all weekly partition tables to the posts parent table."""
connection = op.get_bind()
already_attached = {row[0] for row in connection.execute(ALREADY_ATTACHED_QUERY, {"parent": f"{schema}.posts"})}
for year in range(PARTITION_START_YEAR, PARTITION_END_YEAR + 1):
for week in range(1, iso_weeks_in_year(year) + 1):
table_name = f"posts_{year}_{week:02d}"
qualified_name = f"{schema}.{table_name}"
if qualified_name in already_attached:
continue
start, end = week_bounds(year, week)
start_str = start.strftime("%Y-%m-%d %H:%M:%S")
end_str = end.strftime("%Y-%m-%d %H:%M:%S")
op.execute(
f"ALTER TABLE {schema}.posts "
f"ATTACH PARTITION {qualified_name} "
f"FOR VALUES FROM ('{start_str}') TO ('{end_str}')"
)
def downgrade() -> None:
"""Detach all weekly partition tables from the posts parent table."""
for year in range(PARTITION_START_YEAR, PARTITION_END_YEAR + 1):
for week in range(1, iso_weeks_in_year(year) + 1):
table_name = f"posts_{year}_{week:02d}"
op.execute(f"ALTER TABLE {schema}.posts DETACH PARTITION {schema}.{table_name}")

View File

@@ -0,0 +1,153 @@
"""adding congress data.
Revision ID: 83bfc8af92d8
Revises: a1b2c3d4e5f6
Create Date: 2026-03-27 10:43:02.324510
"""
from __future__ import annotations
from typing import TYPE_CHECKING
import sqlalchemy as sa
from alembic import op
from python.orm import DataScienceDevBase
if TYPE_CHECKING:
from collections.abc import Sequence
# revision identifiers, used by Alembic.
revision: str = "83bfc8af92d8"
down_revision: str | None = "a1b2c3d4e5f6"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
schema = DataScienceDevBase.schema_name
def upgrade() -> None:
"""Upgrade."""
# ### commands auto generated by Alembic - please adjust! ###
op.create_table(
"bill",
sa.Column("congress", sa.Integer(), nullable=False),
sa.Column("bill_type", sa.String(), nullable=False),
sa.Column("number", sa.Integer(), nullable=False),
sa.Column("title", sa.String(), nullable=True),
sa.Column("title_short", sa.String(), nullable=True),
sa.Column("official_title", sa.String(), nullable=True),
sa.Column("status", sa.String(), nullable=True),
sa.Column("status_at", sa.Date(), nullable=True),
sa.Column("sponsor_bioguide_id", sa.String(), nullable=True),
sa.Column("subjects_top_term", sa.String(), nullable=True),
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("created", sa.DateTime(timezone=True), server_default=sa.text("now()"), nullable=False),
sa.Column("updated", sa.DateTime(timezone=True), server_default=sa.text("now()"), nullable=False),
sa.PrimaryKeyConstraint("id", name=op.f("pk_bill")),
sa.UniqueConstraint("congress", "bill_type", "number", name="uq_bill_congress_type_number"),
schema=schema,
)
op.create_index("ix_bill_congress", "bill", ["congress"], unique=False, schema=schema)
op.create_table(
"legislator",
sa.Column("bioguide_id", sa.Text(), nullable=False),
sa.Column("thomas_id", sa.String(), nullable=True),
sa.Column("lis_id", sa.String(), nullable=True),
sa.Column("govtrack_id", sa.Integer(), nullable=True),
sa.Column("opensecrets_id", sa.String(), nullable=True),
sa.Column("fec_ids", sa.String(), nullable=True),
sa.Column("first_name", sa.String(), nullable=False),
sa.Column("last_name", sa.String(), nullable=False),
sa.Column("official_full_name", sa.String(), nullable=True),
sa.Column("nickname", sa.String(), nullable=True),
sa.Column("birthday", sa.Date(), nullable=True),
sa.Column("gender", sa.String(), nullable=True),
sa.Column("current_party", sa.String(), nullable=True),
sa.Column("current_state", sa.String(), nullable=True),
sa.Column("current_district", sa.Integer(), nullable=True),
sa.Column("current_chamber", sa.String(), nullable=True),
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("created", sa.DateTime(timezone=True), server_default=sa.text("now()"), nullable=False),
sa.Column("updated", sa.DateTime(timezone=True), server_default=sa.text("now()"), nullable=False),
sa.PrimaryKeyConstraint("id", name=op.f("pk_legislator")),
schema=schema,
)
op.create_index(op.f("ix_legislator_bioguide_id"), "legislator", ["bioguide_id"], unique=True, schema=schema)
op.create_table(
"bill_text",
sa.Column("bill_id", sa.Integer(), nullable=False),
sa.Column("version_code", sa.String(), nullable=False),
sa.Column("version_name", sa.String(), nullable=True),
sa.Column("text_content", sa.String(), nullable=True),
sa.Column("date", sa.Date(), nullable=True),
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("created", sa.DateTime(timezone=True), server_default=sa.text("now()"), nullable=False),
sa.Column("updated", sa.DateTime(timezone=True), server_default=sa.text("now()"), nullable=False),
sa.ForeignKeyConstraint(
["bill_id"], [f"{schema}.bill.id"], name=op.f("fk_bill_text_bill_id_bill"), ondelete="CASCADE"
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_bill_text")),
sa.UniqueConstraint("bill_id", "version_code", name="uq_bill_text_bill_id_version_code"),
schema=schema,
)
op.create_table(
"vote",
sa.Column("congress", sa.Integer(), nullable=False),
sa.Column("chamber", sa.String(), nullable=False),
sa.Column("session", sa.Integer(), nullable=False),
sa.Column("number", sa.Integer(), nullable=False),
sa.Column("vote_type", sa.String(), nullable=True),
sa.Column("question", sa.String(), nullable=True),
sa.Column("result", sa.String(), nullable=True),
sa.Column("result_text", sa.String(), nullable=True),
sa.Column("vote_date", sa.Date(), nullable=False),
sa.Column("yea_count", sa.Integer(), nullable=True),
sa.Column("nay_count", sa.Integer(), nullable=True),
sa.Column("not_voting_count", sa.Integer(), nullable=True),
sa.Column("present_count", sa.Integer(), nullable=True),
sa.Column("bill_id", sa.Integer(), nullable=True),
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("created", sa.DateTime(timezone=True), server_default=sa.text("now()"), nullable=False),
sa.Column("updated", sa.DateTime(timezone=True), server_default=sa.text("now()"), nullable=False),
sa.ForeignKeyConstraint(["bill_id"], [f"{schema}.bill.id"], name=op.f("fk_vote_bill_id_bill")),
sa.PrimaryKeyConstraint("id", name=op.f("pk_vote")),
sa.UniqueConstraint("congress", "chamber", "session", "number", name="uq_vote_congress_chamber_session_number"),
schema=schema,
)
op.create_index("ix_vote_congress_chamber", "vote", ["congress", "chamber"], unique=False, schema=schema)
op.create_index("ix_vote_date", "vote", ["vote_date"], unique=False, schema=schema)
op.create_table(
"vote_record",
sa.Column("vote_id", sa.Integer(), nullable=False),
sa.Column("legislator_id", sa.Integer(), nullable=False),
sa.Column("position", sa.String(), nullable=False),
sa.ForeignKeyConstraint(
["legislator_id"],
[f"{schema}.legislator.id"],
name=op.f("fk_vote_record_legislator_id_legislator"),
ondelete="CASCADE",
),
sa.ForeignKeyConstraint(
["vote_id"], [f"{schema}.vote.id"], name=op.f("fk_vote_record_vote_id_vote"), ondelete="CASCADE"
),
sa.PrimaryKeyConstraint("vote_id", "legislator_id", name=op.f("pk_vote_record")),
schema=schema,
)
# ### end Alembic commands ###
def downgrade() -> None:
"""Downgrade."""
# ### commands auto generated by Alembic - please adjust! ###
op.drop_table("vote_record", schema=schema)
op.drop_index("ix_vote_date", table_name="vote", schema=schema)
op.drop_index("ix_vote_congress_chamber", table_name="vote", schema=schema)
op.drop_table("vote", schema=schema)
op.drop_table("bill_text", schema=schema)
op.drop_index(op.f("ix_legislator_bioguide_id"), table_name="legislator", schema=schema)
op.drop_table("legislator", schema=schema)
op.drop_index("ix_bill_congress", table_name="bill", schema=schema)
op.drop_table("bill", schema=schema)
# ### end Alembic commands ###

View File

@@ -0,0 +1,58 @@
"""adding LegislatorSocialMedia.
Revision ID: 5cd7eee3549d
Revises: 83bfc8af92d8
Create Date: 2026-03-29 11:53:44.224799
"""
from __future__ import annotations
from typing import TYPE_CHECKING
import sqlalchemy as sa
from alembic import op
from python.orm import DataScienceDevBase
if TYPE_CHECKING:
from collections.abc import Sequence
# revision identifiers, used by Alembic.
revision: str = "5cd7eee3549d"
down_revision: str | None = "83bfc8af92d8"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
schema = DataScienceDevBase.schema_name
def upgrade() -> None:
"""Upgrade."""
# ### commands auto generated by Alembic - please adjust! ###
op.create_table(
"legislator_social_media",
sa.Column("legislator_id", sa.Integer(), nullable=False),
sa.Column("platform", sa.String(), nullable=False),
sa.Column("account_name", sa.String(), nullable=False),
sa.Column("url", sa.String(), nullable=True),
sa.Column("source", sa.String(), nullable=False),
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("created", sa.DateTime(timezone=True), server_default=sa.text("now()"), nullable=False),
sa.Column("updated", sa.DateTime(timezone=True), server_default=sa.text("now()"), nullable=False),
sa.ForeignKeyConstraint(
["legislator_id"],
[f"{schema}.legislator.id"],
name=op.f("fk_legislator_social_media_legislator_id_legislator"),
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_legislator_social_media")),
schema=schema,
)
# ### end Alembic commands ###
def downgrade() -> None:
"""Downgrade."""
# ### commands auto generated by Alembic - please adjust! ###
op.drop_table("legislator_social_media", schema=schema)
# ### end Alembic commands ###

122
alembic/env.py Normal file
View File

@@ -0,0 +1,122 @@
"""Alembic."""
from __future__ import annotations
import logging
import sys
from pathlib import Path
from typing import TYPE_CHECKING, Any, Literal
from alembic import context
from alembic.script import write_hooks
from sqlalchemy.schema import CreateSchema
from pipelines.common import bash_wrapper
from pipelines.orm.common import get_postgres_engine
if TYPE_CHECKING:
from collections.abc import MutableMapping
from sqlalchemy.orm import DeclarativeBase
config = context.config
base_class: type[DeclarativeBase] = config.attributes.get("base")
if base_class is None:
error = "No base class provided. Use the database CLI to run alembic commands."
raise RuntimeError(error)
target_metadata = base_class.metadata
logging.basicConfig(
level="DEBUG",
datefmt="%Y-%m-%dT%H:%M:%S%z",
format="%(asctime)s %(levelname)s %(filename)s:%(lineno)d - %(message)s",
handlers=[logging.StreamHandler(sys.stdout)],
)
@write_hooks.register("dynamic_schema")
def dynamic_schema(filename: str, _options: dict[Any, Any]) -> None:
"""Dynamic schema."""
original_file = Path(filename).read_text()
schema_name = base_class.schema_name
dynamic_schema_file_part1 = original_file.replace(f"schema='{schema_name}'", "schema=schema")
dynamic_schema_file = dynamic_schema_file_part1.replace(f"'{schema_name}.", "f'{schema}.")
Path(filename).write_text(dynamic_schema_file)
@write_hooks.register("import_postgresql")
def import_postgresql(filename: str, _options: dict[Any, Any]) -> None:
"""Add postgresql dialect import when postgresql types are used."""
content = Path(filename).read_text()
if "postgresql." in content and "from sqlalchemy.dialects import postgresql" not in content:
content = content.replace(
"import sqlalchemy as sa\n",
"import sqlalchemy as sa\nfrom sqlalchemy.dialects import postgresql\n",
)
Path(filename).write_text(content)
@write_hooks.register("ruff")
def ruff_check_and_format(filename: str, _options: dict[Any, Any]) -> None:
"""Docstring for ruff_check_and_format."""
bash_wrapper(f"ruff check --fix {filename}")
bash_wrapper(f"ruff format {filename}")
def include_name(
name: str | None,
type_: Literal["schema", "table", "column", "index", "unique_constraint", "foreign_key_constraint"],
_parent_names: MutableMapping[Literal["schema_name", "table_name", "schema_qualified_table_name"], str | None],
) -> bool:
"""Filter tables to be included in the migration.
Args:
name (str): The name of the table.
type_ (str): The type of the table.
_parent_names (MutableMapping): The names of the parent tables.
Returns:
bool: True if the table should be included, False otherwise.
"""
if type_ == "schema":
# allows a database with multiple schemas to have separate alembic revisions
return name == target_metadata.schema
return True
def run_migrations_online() -> None:
"""Run migrations in 'online' mode.
In this scenario we need to create an Engine
and associate a connection with the context.
"""
env_prefix = config.attributes.get("env_prefix", "POSTGRES")
connectable = get_postgres_engine(name=env_prefix)
with connectable.connect() as connection:
schema = base_class.schema_name
if not connectable.dialect.has_schema(connection, schema):
answer = input(f"Schema {schema!r} does not exist. Create it? [y/N] ")
if answer.lower() != "y":
error = f"Schema {schema!r} does not exist. Exiting."
raise SystemExit(error)
connection.execute(CreateSchema(schema))
connection.commit()
context.configure(
connection=connection,
target_metadata=target_metadata,
include_schemas=True,
version_table_schema=schema,
include_name=include_name,
)
with context.begin_transaction():
context.run_migrations()
connection.commit()
run_migrations_online()

36
alembic/script.py.mako Normal file
View File

@@ -0,0 +1,36 @@
"""${message}.
Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}
"""
from __future__ import annotations
from typing import TYPE_CHECKING
import sqlalchemy as sa
from alembic import op
from python.orm import ${config.attributes["base"].__name__}
if TYPE_CHECKING:
from collections.abc import Sequence
# revision identifiers, used by Alembic.
revision: str = ${repr(up_revision)}
down_revision: str | None = ${repr(down_revision)}
branch_labels: str | Sequence[str] | None = ${repr(branch_labels)}
depends_on: str | Sequence[str] | None = ${repr(depends_on)}
schema=${config.attributes["base"].__name__}.schema_name
def upgrade() -> None:
"""Upgrade."""
${upgrades if upgrades else "pass"}
def downgrade() -> None:
"""Downgrade."""
${downgrades if downgrades else "pass"}

123
database_cli.py Normal file
View File

@@ -0,0 +1,123 @@
"""CLI wrapper around alembic for multi-database support.
Usage:
database <db_name> <command> [args...]
Examples:
database van_inventory upgrade head
database van_inventory downgrade head-1
database van_inventory revision --autogenerate -m "add meals table"
database van_inventory check
database richie check
database richie upgrade head
"""
from __future__ import annotations
from dataclasses import dataclass
from importlib import import_module
from typing import TYPE_CHECKING, Annotated
import typer
from alembic.config import CommandLine, Config
if TYPE_CHECKING:
from sqlalchemy.orm import DeclarativeBase
@dataclass(frozen=True)
class DatabaseConfig:
"""Configuration for a database."""
env_prefix: str
version_location: str
base_module: str
base_class_name: str
models_module: str
script_location: str = "python/alembic"
file_template: str = "%%(year)d_%%(month).2d_%%(day).2d-%%(slug)s_%%(rev)s"
def get_base(self) -> type[DeclarativeBase]:
"""Import and return the Base class."""
module = import_module(self.base_module)
return getattr(module, self.base_class_name)
def import_models(self) -> None:
"""Import ORM models so alembic autogenerate can detect them."""
import_module(self.models_module)
def alembic_config(self) -> Config:
"""Build an alembic Config for this database."""
# Runtime import needed — Config is in TYPE_CHECKING for the return type annotation
from alembic.config import Config as AlembicConfig # noqa: PLC0415
cfg = AlembicConfig()
cfg.set_main_option("script_location", self.script_location)
cfg.set_main_option("file_template", self.file_template)
cfg.set_main_option("prepend_sys_path", ".")
cfg.set_main_option("version_path_separator", "os")
cfg.set_main_option("version_locations", self.version_location)
cfg.set_main_option("revision_environment", "true")
cfg.set_section_option(
"post_write_hooks", "hooks", "dynamic_schema,import_postgresql,ruff"
)
cfg.set_section_option(
"post_write_hooks", "dynamic_schema.type", "dynamic_schema"
)
cfg.set_section_option(
"post_write_hooks", "import_postgresql.type", "import_postgresql"
)
cfg.set_section_option("post_write_hooks", "ruff.type", "ruff")
cfg.attributes["base"] = self.get_base()
cfg.attributes["env_prefix"] = self.env_prefix
self.import_models()
return cfg
DATABASES: dict[str, DatabaseConfig] = {
"data_science_dev": DatabaseConfig(
env_prefix="DATA_SCIENCE_DEV",
version_location="python/alembic/data_science_dev/versions",
base_module="python.orm.data_science_dev.base",
base_class_name="DataScienceDevBase",
models_module="python.orm.data_science_dev.models",
),
}
app = typer.Typer(help="Multi-database alembic wrapper.")
@app.command(
context_settings={"allow_extra_args": True, "ignore_unknown_options": True},
)
def main(
ctx: typer.Context,
db_name: Annotated[
str, typer.Argument(help=f"Database name. Options: {', '.join(DATABASES)}")
],
command: Annotated[
str,
typer.Argument(
help="Alembic command (upgrade, downgrade, revision, check, etc.)"
),
],
) -> None:
"""Run an alembic command against the specified database."""
db_config = DATABASES.get(db_name)
if not db_config:
typer.echo(
f"Unknown database: {db_name!r}. Available: {', '.join(DATABASES)}",
err=True,
)
raise typer.Exit(code=1)
alembic_cfg = db_config.alembic_config()
cmd_line = CommandLine()
options = cmd_line.parser.parse_args([command, *ctx.args])
cmd_line.run_cmd(alembic_cfg, options)
if __name__ == "__main__":
app()

View File

22
pyproject.toml Normal file
View File

@@ -0,0 +1,22 @@
[project]
name = "ds-testing-pipelines"
version = "0.1.0"
description = "Data science pipeline tools and legislative dashboard."
requires-python = ">=3.12"
dependencies = [
"fastapi",
"uvicorn[standard]",
"jinja2",
"sqlalchemy",
"psycopg",
]
[project.optional-dependencies]
test = [
"httpx",
"pytest",
]
[tool.pytest.ini_options]
testpaths = ["tests"]
pythonpath = ["."]