From e40ab757cad77b8177dd0517d9e1cf0ab41ae9b4 Mon Sep 17 00:00:00 2001 From: Richie Cahill Date: Thu, 26 Mar 2026 08:07:39 -0400 Subject: [PATCH] making more generic exception handling --- .../2026_03_25-attach_partitions_to_posts.py | 80 +++++++++++++++++++ python/data_science/ingest_posts.py | 31 +++---- 2 files changed, 97 insertions(+), 14 deletions(-) create mode 100644 python/alembic/data_science_dev/versions/2026_03_25-attach_partitions_to_posts.py diff --git a/python/alembic/data_science_dev/versions/2026_03_25-attach_partitions_to_posts.py b/python/alembic/data_science_dev/versions/2026_03_25-attach_partitions_to_posts.py new file mode 100644 index 0000000..a5b44da --- /dev/null +++ b/python/alembic/data_science_dev/versions/2026_03_25-attach_partitions_to_posts.py @@ -0,0 +1,80 @@ +"""Attach all partition tables to the posts parent table. + +Alembic autogenerate creates partition tables as standalone tables but does not +emit the ALTER TABLE ... ATTACH PARTITION statements needed for PostgreSQL to +route inserts to the correct partition. + +Revision ID: a1b2c3d4e5f6 +Revises: 605b1794838f +Create Date: 2026-03-25 10:00:00.000000 + +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +from alembic import op +from sqlalchemy import text + +from python.orm import DataScienceDevBase +from python.orm.data_science_dev.posts.partitions import ( + PARTITION_END_YEAR, + PARTITION_START_YEAR, + iso_weeks_in_year, + week_bounds, +) + +if TYPE_CHECKING: + from collections.abc import Sequence + +# revision identifiers, used by Alembic. +revision: str = "a1b2c3d4e5f6" +down_revision: str | None = "605b1794838f" +branch_labels: str | Sequence[str] | None = None +depends_on: str | Sequence[str] | None = None + +schema = DataScienceDevBase.schema_name + +ALREADY_ATTACHED_QUERY = text(""" + SELECT inhrelid::regclass::text + FROM pg_inherits + WHERE inhparent = :parent::regclass +""") + + +def upgrade() -> None: + """Attach all weekly partition tables to the posts parent table.""" + connection = op.get_bind() + already_attached = { + row[0] + for row in connection.execute( + ALREADY_ATTACHED_QUERY, {"parent": f"{schema}.posts"} + ) + } + + for year in range(PARTITION_START_YEAR, PARTITION_END_YEAR + 1): + for week in range(1, iso_weeks_in_year(year) + 1): + table_name = f"posts_{year}_{week:02d}" + qualified_name = f"{schema}.{table_name}" + if qualified_name in already_attached: + continue + start, end = week_bounds(year, week) + start_str = start.strftime("%Y-%m-%d %H:%M:%S") + end_str = end.strftime("%Y-%m-%d %H:%M:%S") + op.execute( + f"ALTER TABLE {schema}.posts " + f"ATTACH PARTITION {qualified_name} " + f"FOR VALUES FROM ('{start_str}') TO ('{end_str}')" + ) + + +def downgrade() -> None: + """Detach all weekly partition tables from the posts parent table.""" + for year in range(PARTITION_START_YEAR, PARTITION_END_YEAR + 1): + for week in range(1, iso_weeks_in_year(year) + 1): + table_name = f"posts_{year}_{week:02d}" + op.execute( + f"ALTER TABLE {schema}.posts " + f"DETACH PARTITION {schema}.{table_name}" + ) diff --git a/python/data_science/ingest_posts.py b/python/data_science/ingest_posts.py index c43adf5..82c26c9 100644 --- a/python/data_science/ingest_posts.py +++ b/python/data_science/ingest_posts.py @@ -126,18 +126,21 @@ def ingest_file(path: Path, *, batch_size: int) -> None: """Ingest a single JSONL file into the posts table.""" log_trigger = max(100_000 // batch_size, 1) failed_lines: list[dict] = [] + try: + with get_psycopg_connection() as connection: + for index, batch in enumerate(read_jsonl_batches(path, batch_size, failed_lines), 1): + ingest_batch(connection, batch) + if index % log_trigger == 0: + logger.info("Ingested %d batches (%d rows) from %s", index, index * batch_size, path) - with get_psycopg_connection() as connection: - for index, batch in enumerate(read_jsonl_batches(path, batch_size, failed_lines), 1): - ingest_batch(connection, batch) - if index % log_trigger == 0: - logger.info("Ingested %d batches (%d rows) from %s", index, index * batch_size, path) - - if failed_lines: - logger.warning("Recording %d malformed lines from %s", len(failed_lines), path.name) - with connection.cursor() as cursor: - cursor.executemany(FAILED_INSERT, failed_lines) - connection.commit() + if failed_lines: + logger.warning("Recording %d malformed lines from %s", len(failed_lines), path.name) + with connection.cursor() as cursor: + cursor.executemany(FAILED_INSERT, failed_lines) + connection.commit() + except Exception: + logger.exception("Failed to ingest file: %s", path) + raise def ingest_batch(connection: psycopg.Connection, batch: list[dict]) -> None: @@ -160,7 +163,7 @@ def ingest_batch(connection: psycopg.Connection, batch: list[dict]) -> None: cursor.execute(INSERT_FROM_STAGING) connection.commit() - except (OSError, psycopg.Error) as error: + except Exception as error: connection.rollback() if len(batch) == 1: @@ -213,8 +216,8 @@ def parse_line(line: str, file_path: Path, failed_lines: list[dict]) -> Iterator except (orjson.JSONDecodeError, KeyError, ValueError) as error: logger.warning("Skipping malformed fragment in %s: %s", file_path.name, fragment[:120]) failed_lines.append({"raw_line": fragment, "error": str(error)}) - except (KeyError, ValueError) as error: - logger.warning("Skipping bad row in %s: %s", file_path.name, line[:120]) + except Exception as error: + logger.exception("Skipping bad row in %s: %s", file_path.name, line[:120]) failed_lines.append({"raw_line": line, "error": str(error)})