"""Parse HAProxy log lines and persist them to the database. Ingestion is idempotent: each row carries a ``line_hash`` (a unique column), and this module skips lines whose hash already exists, so the same logs can be re-ingested without creating duplicate records. """ from __future__ import annotations from dataclasses import dataclass from typing import TYPE_CHECKING from sqlalchemy import select from python.haproxy_logs.parser import parse_line from python.orm.richie.haproxy import HaproxyRequest if TYPE_CHECKING: from collections.abc import Iterable from sqlalchemy.orm import Session @dataclass(frozen=True) class IngestResult: """Summary counts for an ingest run.""" inserted: int skipped: int duplicates: int def ingest_lines(lines: Iterable[str], session: Session, *, batch_size: int = 100) -> IngestResult: """Parse log lines and insert new ones into the database in batches. Lines already present (same ``line_hash``) are dropped, as are duplicate lines within the same run. Unparseable non-blank lines are counted as ``skipped`` rather than raising, so a stray startup message never aborts a streaming ingest. Args: lines: Iterable of raw log lines (with or without a journald prefix). session: Database session to insert into. batch_size: Number of distinct rows to buffer before committing. Use ``1`` when tailing a live log so rows land immediately. Returns: Counts of inserted, skipped (unparseable) and duplicate lines. """ inserted = 0 skipped = 0 parsed_count = 0 batch: dict[str, HaproxyRequest] = {} for line in lines: parsed = parse_line(line) if parsed is None: if line.strip(): skipped += 1 continue parsed_count += 1 batch[parsed["line_hash"]] = HaproxyRequest(**parsed) if len(batch) >= batch_size: inserted += _flush(batch, session) inserted += _flush(batch, session) return IngestResult(inserted=inserted, skipped=skipped, duplicates=parsed_count - inserted) def _flush(batch: dict[str, HaproxyRequest], session: Session) -> int: """Insert the rows whose hash is not already stored, then clear the batch. Returns the number of rows actually written. """ if not batch: return 0 existing = set( session.scalars(select(HaproxyRequest.line_hash).where(HaproxyRequest.line_hash.in_(batch.keys()))), ) new_rows = [row for line_hash, row in batch.items() if line_hash not in existing] batch.clear() if not new_rows: return 0 session.add_all(new_rows) session.commit() return len(new_rows)