Files
dotfiles/python/haproxy_logs/ingest.py
T
Richie 1d1bafbd30 feat(haproxy-logs): ingest HAProxy request logs into Richie DB
Add a pipeline to load HAProxy `option httplog` lines into the Richie
database so bot/crawler traffic can be analyzed.

- model: HaproxyRequest mirroring the httplog format, with a unique
  line_hash dedup key and indexes on common filter columns
- migration: create the haproxy_request table (unique line_hash + indexes)
- haproxy_logs package:
  - parser: httplog line -> columns, strips the journald prefix and
    hashes the normalized line
  - ingest: batched, idempotent insert that skips rows whose line_hash
    already exists, so re-ingesting the same logs is a no-op
  - cli: ingest-only `haproxy-logs` command reading stdin or a file
- tests: parsing of a real GPTBot line and idempotent re-ingestion
2026-06-23 21:13:20 -04:00

89 lines
2.7 KiB
Python

"""Parse HAProxy log lines and persist them to the database.
Ingestion is idempotent: each row carries a ``line_hash`` (a unique column), and
this module skips lines whose hash already exists, so the same logs can be
re-ingested without creating duplicate records.
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import TYPE_CHECKING
from sqlalchemy import select
from python.haproxy_logs.parser import parse_line
from python.orm.richie.haproxy import HaproxyRequest
if TYPE_CHECKING:
from collections.abc import Iterable
from sqlalchemy.orm import Session
@dataclass(frozen=True)
class IngestResult:
    """Immutable tally describing the outcome of one ingest run."""

    # Rows actually written to the database.
    inserted: int
    # Non-blank input lines that could not be parsed.
    skipped: int
    # Parsed lines that were not written (already stored, or repeated in the run).
    duplicates: int
def ingest_lines(lines: Iterable[str], session: Session, *, batch_size: int = 100) -> IngestResult:
    """Parse raw log lines and store the ones not yet in the database.

    Parsed rows are buffered by their ``line_hash`` and handed to ``_flush``
    whenever the buffer reaches ``batch_size`` distinct hashes, plus once more
    after the input is exhausted. A hash that repeats inside the buffer
    overwrites the earlier entry, so only one row per hash is ever flushed.

    Lines for which ``parse_line`` returns ``None`` never raise: blank ones are
    ignored silently and non-blank ones are tallied as ``skipped``.

    Args:
        lines: Iterable of raw log lines (with or without a journald prefix).
        session: Database session the rows are written through.
        batch_size: Number of distinct rows to buffer before flushing. Use
            ``1`` when tailing a live log so rows land immediately.

    Returns:
        An ``IngestResult`` with the number of rows inserted, the number of
        non-blank unparseable lines, and the number of parsed lines that were
        not inserted (``duplicates``).
    """
    written = 0
    unparseable = 0
    recognized = 0
    pending: dict[str, HaproxyRequest] = {}

    for raw in lines:
        fields = parse_line(raw)
        if fields is not None:
            recognized += 1
            # Keyed by hash so a repeated line within the buffer collapses to one row.
            pending[fields["line_hash"]] = HaproxyRequest(**fields)
            if len(pending) >= batch_size:
                written += _flush(pending, session)
        elif raw.strip():
            # Only non-blank garbage counts as skipped; empty lines are just noise.
            unparseable += 1

    # Write whatever is left over from the last, partially filled buffer.
    written += _flush(pending, session)

    # Every parsed line that did not end up as a new row is a duplicate.
    return IngestResult(inserted=written, skipped=unparseable, duplicates=recognized - written)
def _flush(batch: dict[str, HaproxyRequest], session: Session) -> int:
    """Insert the rows whose hash is not already stored, then clear the batch.

    A single query looks up which of the buffered ``line_hash`` values already
    exist; only the remaining rows are added and committed. The batch is
    emptied in place before the commit, whether or not anything is written.

    Args:
        batch: Buffered rows keyed by ``line_hash``. Mutated: cleared on return.
        session: Database session used for the lookup and the insert.

    Returns:
        The number of rows actually written.

    Raises:
        Exception: Whatever ``session.commit()`` raises is re-raised unchanged,
            after the session has been rolled back.
    """
    if not batch:
        return 0

    # Materialize the keys so the IN clause receives a plain list, not a dict view.
    wanted_hashes = list(batch)
    existing = set(
        session.scalars(
            select(HaproxyRequest.line_hash).where(HaproxyRequest.line_hash.in_(wanted_hashes)),
        ),
    )
    new_rows = [row for line_hash, row in batch.items() if line_hash not in existing]
    batch.clear()
    if not new_rows:
        return 0

    session.add_all(new_rows)
    try:
        session.commit()
    except Exception:
        # This function owns the commit, so it also owns the cleanup: roll back
        # so the caller's session is not left holding a failed transaction.
        session.rollback()
        raise
    return len(new_rows)