From 50e764146a38dfd3970fb4d2fde428ca019c0a84 Mon Sep 17 00:00:00 2001 From: Richie Cahill Date: Tue, 24 Mar 2026 23:47:04 -0400 Subject: [PATCH] added ingest_posts.py --- ..._24-adding_failedingestion_2f43120e3ffc.py | 50 ++++++ python/alembic/env.py | 5 - python/data_science/__init__.py | 3 + python/data_science/ingest_posts.py | 170 ++++++++++++++++++ python/orm/data_science_dev/posts/__init__.py | 2 + .../posts/failed_ingestion.py | 17 ++ 6 files changed, 242 insertions(+), 5 deletions(-) create mode 100644 python/alembic/data_science_dev/versions/2026_03_24-adding_failedingestion_2f43120e3ffc.py create mode 100644 python/data_science/__init__.py create mode 100644 python/data_science/ingest_posts.py create mode 100644 python/orm/data_science_dev/posts/failed_ingestion.py diff --git a/python/alembic/data_science_dev/versions/2026_03_24-adding_failedingestion_2f43120e3ffc.py b/python/alembic/data_science_dev/versions/2026_03_24-adding_failedingestion_2f43120e3ffc.py new file mode 100644 index 0000000..507e5e7 --- /dev/null +++ b/python/alembic/data_science_dev/versions/2026_03_24-adding_failedingestion_2f43120e3ffc.py @@ -0,0 +1,50 @@ +"""adding FailedIngestion. + +Revision ID: 2f43120e3ffc +Revises: f99be864fe69 +Create Date: 2026-03-24 23:46:17.277897 + +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +import sqlalchemy as sa +from alembic import op + +from python.orm import DataScienceDevBase + +if TYPE_CHECKING: + from collections.abc import Sequence + +# revision identifiers, used by Alembic. +revision: str = "2f43120e3ffc" +down_revision: str | None = "f99be864fe69" +branch_labels: str | Sequence[str] | None = None +depends_on: str | Sequence[str] | None = None + +schema = DataScienceDevBase.schema_name + + +def upgrade() -> None: + """Upgrade.""" + # ### commands auto generated by Alembic - please adjust! 
### + op.create_table( + "failed_ingestion", + sa.Column("raw_line", sa.Text(), nullable=False), + sa.Column("error", sa.Text(), nullable=False), + sa.Column("id", sa.Integer(), nullable=False), + sa.Column("created", sa.DateTime(timezone=True), server_default=sa.text("now()"), nullable=False), + sa.Column("updated", sa.DateTime(timezone=True), server_default=sa.text("now()"), nullable=False), + sa.PrimaryKeyConstraint("id", name=op.f("pk_failed_ingestion")), + schema=schema, + ) + # ### end Alembic commands ### + + +def downgrade() -> None: + """Downgrade.""" + # ### commands auto generated by Alembic - please adjust! ### + op.drop_table("failed_ingestion", schema=schema) + # ### end Alembic commands ### diff --git a/python/alembic/env.py b/python/alembic/env.py index 7a6bda7..3055e22 100644 --- a/python/alembic/env.py +++ b/python/alembic/env.py @@ -84,11 +84,6 @@ def include_name( if type_ == "schema": # allows a database with multiple schemas to have separate alembic revisions return name == target_metadata.schema - if type_ == "table": - # Exclude weekly partition tables (e.g. posts_2024_01) from autogenerate. - # They are created via PARTITION OF in migrations. PG propagates schema changes - # from the parent table to all partitions, so only the parent needs ALTER statements. - return name and re.match(r"^posts_\d{4}_\d{2}$", name) return True diff --git a/python/data_science/__init__.py b/python/data_science/__init__.py new file mode 100644 index 0000000..5ee7ea8 --- /dev/null +++ b/python/data_science/__init__.py @@ -0,0 +1,3 @@ +"""Data science CLI tools.""" + +from __future__ import annotations diff --git a/python/data_science/ingest_posts.py b/python/data_science/ingest_posts.py new file mode 100644 index 0000000..e51f7e4 --- /dev/null +++ b/python/data_science/ingest_posts.py @@ -0,0 +1,170 @@ +"""Ingestion pipeline for loading JSONL post files into the weekly-partitioned posts table. 
+ +Usage: + ingest-posts /path/to/files/ + ingest-posts /path/to/single_file.jsonl + ingest-posts /data/dir/ --workers 4 --batch-size 5000 +""" + +from __future__ import annotations + +import json +import logging +from datetime import UTC, datetime +from pathlib import Path +from typing import TYPE_CHECKING, Annotated + +import typer +from sqlalchemy.dialects.postgresql import insert +from sqlalchemy.exc import SQLAlchemyError +from sqlalchemy.orm import Session + +from python.orm.common import get_postgres_engine +from python.orm.data_science_dev.posts.failed_ingestion import FailedIngestion +from python.orm.data_science_dev.posts.tables import Posts +from python.parallelize import parallelize_process + +if TYPE_CHECKING: + from collections.abc import Iterator + +logger = logging.getLogger(__name__) + + +app = typer.Typer(help="Ingest JSONL post files into the partitioned posts table.") + + +@app.command() +def main( + path: Annotated[Path, typer.Argument(help="Directory containing JSONL files, or a single JSONL file")], + batch_size: Annotated[int, typer.Option(help="Rows per INSERT batch")] = 10000, + workers: Annotated[int, typer.Option(help="Parallel workers for multi-file ingestion")] = 4, + pattern: Annotated[str, typer.Option(help="Glob pattern for JSONL files")] = "*.jsonl", +) -> None: + """Ingest JSONL post files into the weekly-partitioned posts table.""" + logging.basicConfig( + level=logging.INFO, + format="%(asctime)s %(levelname)s %(message)s", + datefmt="%H:%M:%S", + ) + + if path.is_file(): + ingest_file(str(path), batch_size=batch_size) + elif path.is_dir(): + ingest_directory(path, batch_size=batch_size, max_workers=workers, pattern=pattern) + else: + typer.echo(f"Path does not exist: {path}", err=True) + raise typer.Exit(code=1) + + +def ingest_directory( + directory: Path, + *, + batch_size: int, + max_workers: int, + pattern: str = "*.jsonl", +) -> int: + """Ingest all JSONL files in a directory using parallel workers.""" + files = 
sorted(directory.glob(pattern)) + if not files: + logger.warning("No JSONL files found in %s", directory) + return 0 + + logger.info("Found %d JSONL files to ingest", len(files)) + + file_paths = [str(file) for file in files] + total_rows = 0 + + kwargs_list = [{"file_path": fp, "batch_size": batch_size} for fp in file_paths] + executor_results = parallelize_process(ingest_file, kwargs_list, max_workers=max_workers) + total_rows = sum(executor_results.results) + + logger.info("Ingestion complete — %d total rows across %d files", total_rows, len(files)) + return total_rows + + +def ingest_file(file_path: str, *, batch_size: int) -> int: + """Ingest a single JSONL file into the posts table. Returns total rows inserted.""" + path = Path(file_path) + engine = get_postgres_engine(name="DATA_SCIENCE_DEV") + total_rows = 0 + + with Session(engine) as session: + for batch in read_jsonl_batches(path, batch_size): + inserted = _ingest_batch(session, batch) + total_rows += inserted + logger.info(" %s: inserted %d rows (total: %d)", path.name, inserted, total_rows) + + logger.info("Finished %s — %d rows", path.name, total_rows) + return total_rows + + +def _ingest_batch(session: Session, batch: list[dict]) -> int: + """Try bulk insert; on failure, binary-split to isolate bad rows.""" + if not batch: + return 0 + + try: + statement = insert(Posts).values(batch).on_conflict_do_nothing(index_elements=["post_id"]) + result = session.execute(statement) + session.commit() + except (OSError, SQLAlchemyError) as error: + session.rollback() + + if len(batch) == 1: + logger.exception("Skipping bad row post_id=%s", batch[0].get("post_id")) + session.add( + FailedIngestion( + raw_line=json.dumps(batch[0], default=str), + error=str(error), + ) + ) + session.commit() + return 0 + + midpoint = len(batch) // 2 + left = _ingest_batch(session, batch[:midpoint]) + right = _ingest_batch(session, batch[midpoint:]) + return left + right + else: + return result.rowcount + + +def 
read_jsonl_batches(file_path: Path, batch_size: int) -> Iterator[list[dict]]: + """Stream a JSONL file and yield batches of transformed rows.""" + batch: list[dict] = [] + with file_path.open("r", encoding="utf-8") as handle: + for raw_line in handle: + line = raw_line.strip() + if not line: + continue + row = transform_row(json.loads(line)) + batch.append(row) + if len(batch) >= batch_size: + yield batch + batch = [] + if batch: + yield batch + + +def transform_row(raw: dict) -> dict: + """Transform a raw JSONL row into a dict matching the Posts table columns.""" + raw["date"] = parse_date(raw["date"]) + if raw.get("langs") is not None: + raw["langs"] = json.dumps(raw["langs"]) + return raw + + +def parse_date(raw_date: int) -> datetime: + """Parse compact YYYYMMDDHHmm integer into a timezone-aware UTC datetime.""" + return datetime( + raw_date // 100000000, + (raw_date // 1000000) % 100, + (raw_date // 10000) % 100, + (raw_date // 100) % 100, + raw_date % 100, + tzinfo=UTC, + ) + + +if __name__ == "__main__": + app() diff --git a/python/orm/data_science_dev/posts/__init__.py b/python/orm/data_science_dev/posts/__init__.py index 0b3debc..51256d7 100644 --- a/python/orm/data_science_dev/posts/__init__.py +++ b/python/orm/data_science_dev/posts/__init__.py @@ -2,8 +2,10 @@ from __future__ import annotations +from python.orm.data_science_dev.posts.failed_ingestion import FailedIngestion from python.orm.data_science_dev.posts.tables import Posts __all__ = [ + "FailedIngestion", "Posts", ] diff --git a/python/orm/data_science_dev/posts/failed_ingestion.py b/python/orm/data_science_dev/posts/failed_ingestion.py new file mode 100644 index 0000000..57e7501 --- /dev/null +++ b/python/orm/data_science_dev/posts/failed_ingestion.py @@ -0,0 +1,17 @@ +"""Table for storing JSONL lines that failed during post ingestion.""" + +from __future__ import annotations + +from sqlalchemy import Text +from sqlalchemy.orm import Mapped, mapped_column + +from
python.orm.data_science_dev.base import DataScienceDevTableBase + + +class FailedIngestion(DataScienceDevTableBase): + """Stores raw JSONL lines and their error messages when ingestion fails.""" + + __tablename__ = "failed_ingestion" + + raw_line: Mapped[str] = mapped_column(Text) + error: Mapped[str] = mapped_column(Text)