making more generic exception handling

This commit is contained in:
2026-03-26 08:07:39 -04:00
parent 345ba94a59
commit e40ab757ca
2 changed files with 97 additions and 14 deletions

View File

@@ -126,18 +126,21 @@ def ingest_file(path: Path, *, batch_size: int) -> None:
"""Ingest a single JSONL file into the posts table."""
log_trigger = max(100_000 // batch_size, 1)
failed_lines: list[dict] = []
try:
with get_psycopg_connection() as connection:
for index, batch in enumerate(read_jsonl_batches(path, batch_size, failed_lines), 1):
ingest_batch(connection, batch)
if index % log_trigger == 0:
logger.info("Ingested %d batches (%d rows) from %s", index, index * batch_size, path)
with get_psycopg_connection() as connection:
for index, batch in enumerate(read_jsonl_batches(path, batch_size, failed_lines), 1):
ingest_batch(connection, batch)
if index % log_trigger == 0:
logger.info("Ingested %d batches (%d rows) from %s", index, index * batch_size, path)
if failed_lines:
logger.warning("Recording %d malformed lines from %s", len(failed_lines), path.name)
with connection.cursor() as cursor:
cursor.executemany(FAILED_INSERT, failed_lines)
connection.commit()
if failed_lines:
logger.warning("Recording %d malformed lines from %s", len(failed_lines), path.name)
with connection.cursor() as cursor:
cursor.executemany(FAILED_INSERT, failed_lines)
connection.commit()
except Exception:
logger.exception("Failed to ingest file: %s", path)
raise
def ingest_batch(connection: psycopg.Connection, batch: list[dict]) -> None:
@@ -160,7 +163,7 @@ def ingest_batch(connection: psycopg.Connection, batch: list[dict]) -> None:
cursor.execute(INSERT_FROM_STAGING)
connection.commit()
except (OSError, psycopg.Error) as error:
except Exception as error:
connection.rollback()
if len(batch) == 1:
@@ -213,8 +216,8 @@ def parse_line(line: str, file_path: Path, failed_lines: list[dict]) -> Iterator
except (orjson.JSONDecodeError, KeyError, ValueError) as error:
logger.warning("Skipping malformed fragment in %s: %s", file_path.name, fragment[:120])
failed_lines.append({"raw_line": fragment, "error": str(error)})
except (KeyError, ValueError) as error:
logger.warning("Skipping bad row in %s: %s", file_path.name, line[:120])
except Exception as error:
logger.exception("Skipping bad row in %s: %s", file_path.name, line[:120])
failed_lines.append({"raw_line": line, "error": str(error)})