From 1d1bafbd307f626a8b56c7fad2a1331021b4920b Mon Sep 17 00:00:00 2001 From: Richie Cahill Date: Tue, 23 Jun 2026 21:13:20 -0400 Subject: [PATCH] feat(haproxy-logs): ingest HAProxy request logs into Richie DB Add a pipeline to load HAProxy `option httplog` lines into the Richie database so bot/crawler traffic can be analyzed. - model: HaproxyRequest mirroring the httplog format, with a unique line_hash dedup key and indexes on common filter columns - migration: create the haproxy_request table (unique line_hash + indexes) - haproxy_logs package: - parser: httplog line -> columns, strips the journald prefix and hashes the normalized line - ingest: batched, idempotent insert that skips rows whose line_hash already exists, so re-ingesting the same logs is a no-op - cli: ingest-only `haproxy-logs` command reading stdin or a file - tests: parsing of a real GPTBot line and idempotent re-ingestion --- ..._06_23-adding_haproxy_data_96d72c748c24.py | 103 ++++++++++++++ python/haproxy_logs/__init__.py | 1 + python/haproxy_logs/cli.py | 54 +++++++ python/haproxy_logs/ingest.py | 88 ++++++++++++ python/haproxy_logs/parser.py | 132 ++++++++++++++++++ python/orm/richie/haproxy.py | 71 ++++++++++ tests/test_haproxy_logs.py | 127 +++++++++++++++++ 7 files changed, 576 insertions(+) create mode 100644 python/alembic/richie/versions/2026_06_23-adding_haproxy_data_96d72c748c24.py create mode 100644 python/haproxy_logs/__init__.py create mode 100644 python/haproxy_logs/cli.py create mode 100644 python/haproxy_logs/ingest.py create mode 100644 python/haproxy_logs/parser.py create mode 100644 python/orm/richie/haproxy.py create mode 100644 tests/test_haproxy_logs.py diff --git a/python/alembic/richie/versions/2026_06_23-adding_haproxy_data_96d72c748c24.py b/python/alembic/richie/versions/2026_06_23-adding_haproxy_data_96d72c748c24.py new file mode 100644 index 0000000..78ea272 --- /dev/null +++ b/python/alembic/richie/versions/2026_06_23-adding_haproxy_data_96d72c748c24.py @@ -0,0 +1,103 @@ +"""adding haproxy data. + +Revision ID: 96d72c748c24 +Revises: c460105682d2 +Create Date: 2026-06-23 16:37:17.768851 + +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +import sqlalchemy as sa +from alembic import op + +from python.orm import RichieBase + +if TYPE_CHECKING: + from collections.abc import Sequence + +# revision identifiers, used by Alembic. +revision: str = "96d72c748c24" +down_revision: str | None = "c460105682d2" +branch_labels: str | Sequence[str] | None = None +depends_on: str | Sequence[str] | None = None + +schema = RichieBase.schema_name + + +def upgrade() -> None: + """Upgrade.""" + # ### commands auto generated by Alembic - please adjust! ### + op.create_table( + "haproxy_request", + sa.Column("line_hash", sa.String(), nullable=False), + sa.Column("requested_at", sa.DateTime(timezone=True), nullable=False), + sa.Column("client_ip", sa.String(), nullable=False), + sa.Column("client_port", sa.Integer(), nullable=False), + sa.Column("frontend", sa.String(), nullable=False), + sa.Column("ssl", sa.Boolean(), nullable=False), + sa.Column("backend", sa.String(), nullable=False), + sa.Column("server", sa.String(), nullable=False), + sa.Column("time_request", sa.Integer(), nullable=False), + sa.Column("time_queue", sa.Integer(), nullable=False), + sa.Column("time_connect", sa.Integer(), nullable=False), + sa.Column("time_response", sa.Integer(), nullable=False), + sa.Column("time_total", sa.Integer(), nullable=False), + sa.Column("status_code", sa.Integer(), nullable=False), + sa.Column("bytes_read", sa.BigInteger(), nullable=False), + sa.Column("termination_state", sa.String(), nullable=False), + sa.Column("active_connections", sa.Integer(), nullable=False), + sa.Column("frontend_connections", sa.Integer(), nullable=False), + sa.Column("backend_connections", sa.Integer(), nullable=False), + sa.Column("server_connections", sa.Integer(), nullable=False), + sa.Column("retries", sa.Integer(), nullable=False), + sa.Column("server_queue", sa.Integer(), nullable=False), + sa.Column("backend_queue", sa.Integer(), nullable=False), + sa.Column("host", sa.String(), nullable=True), + sa.Column("user_agent", sa.String(), nullable=True), + sa.Column("method", sa.String(), nullable=False), + sa.Column("target", sa.String(), nullable=False), + sa.Column("path", sa.String(), nullable=False), + sa.Column("query", sa.String(), nullable=True), + sa.Column("http_version", sa.String(), nullable=False), + sa.Column("id", sa.Integer(), nullable=False), + sa.Column("created", sa.DateTime(timezone=True), server_default=sa.text("now()"), nullable=False), + sa.Column("updated", sa.DateTime(timezone=True), server_default=sa.text("now()"), nullable=False), + sa.PrimaryKeyConstraint("id", name=op.f("pk_haproxy_request")), + sa.UniqueConstraint("line_hash", name=op.f("uq_haproxy_request_line_hash")), + schema=schema, + ) + op.create_index(op.f("ix_haproxy_request_backend"), "haproxy_request", ["backend"], unique=False, schema=schema) + op.create_index(op.f("ix_haproxy_request_client_ip"), "haproxy_request", ["client_ip"], unique=False, schema=schema) + op.create_index(op.f("ix_haproxy_request_host"), "haproxy_request", ["host"], unique=False, schema=schema) + op.create_index(op.f("ix_haproxy_request_path"), "haproxy_request", ["path"], unique=False, schema=schema) + op.create_index( + op.f("ix_haproxy_request_requested_at"), "haproxy_request", ["requested_at"], unique=False, schema=schema + ) + op.create_index( + op.f("ix_haproxy_request_status_code"), "haproxy_request", ["status_code"], unique=False, schema=schema + ) + op.create_index( + op.f("ix_haproxy_request_time_response"), "haproxy_request", ["time_response"], unique=False, schema=schema + ) + op.create_index( + op.f("ix_haproxy_request_user_agent"), "haproxy_request", ["user_agent"], unique=False, schema=schema + ) + # ### end Alembic commands ### + + +def downgrade() -> None: + """Downgrade.""" + # ### commands auto generated by Alembic - please adjust! ### + op.drop_index(op.f("ix_haproxy_request_user_agent"), table_name="haproxy_request", schema=schema) + op.drop_index(op.f("ix_haproxy_request_time_response"), table_name="haproxy_request", schema=schema) + op.drop_index(op.f("ix_haproxy_request_status_code"), table_name="haproxy_request", schema=schema) + op.drop_index(op.f("ix_haproxy_request_requested_at"), table_name="haproxy_request", schema=schema) + op.drop_index(op.f("ix_haproxy_request_path"), table_name="haproxy_request", schema=schema) + op.drop_index(op.f("ix_haproxy_request_host"), table_name="haproxy_request", schema=schema) + op.drop_index(op.f("ix_haproxy_request_client_ip"), table_name="haproxy_request", schema=schema) + op.drop_index(op.f("ix_haproxy_request_backend"), table_name="haproxy_request", schema=schema) + op.drop_table("haproxy_request", schema=schema) + # ### end Alembic commands ### diff --git a/python/haproxy_logs/__init__.py b/python/haproxy_logs/__init__.py new file mode 100644 index 0000000..5ff71f3 --- /dev/null +++ b/python/haproxy_logs/__init__.py @@ -0,0 +1 @@ +"""Load HAProxy ``option httplog`` lines into SQLite and query them.""" diff --git a/python/haproxy_logs/cli.py b/python/haproxy_logs/cli.py new file mode 100644 index 0000000..59607fb --- /dev/null +++ b/python/haproxy_logs/cli.py @@ -0,0 +1,54 @@ +"""Command-line interface: load HAProxy logs into the Richie database. + +The table schema is managed with alembic (``database richie upgrade head``); this +command only inserts rows. + +Examples: + # stream the live log into the database (commit every line) + journalctl -u haproxy -o cat -f | haproxy-logs ingest --batch-size 1 + + # backfill from a saved log file + haproxy-logs ingest --file /tmp/haproxy.log +""" + +from __future__ import annotations + +import sys +from pathlib import Path +from typing import TYPE_CHECKING, Annotated + +import typer +from sqlalchemy.orm import Session + +from python.haproxy_logs.ingest import ingest_lines +from python.orm.common import get_postgres_engine + +if TYPE_CHECKING: + from collections.abc import Iterable + +app = typer.Typer(help="Load HAProxy logs into the Richie database.", no_args_is_help=True) + + +@app.command() +def ingest( + file: Annotated[str | None, typer.Option(help="Read lines from a file instead of stdin.")] = None, + batch_size: Annotated[int, typer.Option(help="Rows per commit; use 1 when tailing a live log.")] = 100, +) -> None: + """Parse HAProxy log lines from stdin (or a file) and store them in the Richie DB.""" + engine = get_postgres_engine(name="RICHIE") + with Session(engine) as session: + result = ingest_lines(_read_lines(file), session, batch_size=batch_size) + typer.echo(f"inserted={result.inserted} duplicates={result.duplicates} skipped={result.skipped}") + + +def _read_lines(file: str | None) -> Iterable[str]: + """Yield log lines from a file, or from stdin when no file is given.""" + if file is None: + yield from sys.stdin + return + with Path(file).open(encoding="utf-8", errors="replace") as handle: + yield from handle + + +if __name__ == "__main__": + app() diff --git a/python/haproxy_logs/ingest.py b/python/haproxy_logs/ingest.py new file mode 100644 index 0000000..a50520a --- /dev/null +++ b/python/haproxy_logs/ingest.py @@ -0,0 +1,88 @@ +"""Parse HAProxy log lines and persist them to the database. + +Ingestion is idempotent: each row carries a ``line_hash`` (a unique column), and +this module skips lines whose hash already exists, so the same logs can be +re-ingested without creating duplicate records. +""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import TYPE_CHECKING + +from sqlalchemy import select + +from python.haproxy_logs.parser import parse_line +from python.orm.richie.haproxy import HaproxyRequest + +if TYPE_CHECKING: + from collections.abc import Iterable + + from sqlalchemy.orm import Session + + +@dataclass(frozen=True) +class IngestResult: + """Summary counts for an ingest run.""" + + inserted: int + skipped: int + duplicates: int + + +def ingest_lines(lines: Iterable[str], session: Session, *, batch_size: int = 100) -> IngestResult: + """Parse log lines and insert new ones into the database in batches. + + Lines already present (same ``line_hash``) are dropped, as are duplicate + lines within the same run. Unparseable non-blank lines are counted as + ``skipped`` rather than raising, so a stray startup message never aborts a + streaming ingest. + + Args: + lines: Iterable of raw log lines (with or without a journald prefix). + session: Database session to insert into. + batch_size: Number of distinct rows to buffer before committing. Use + ``1`` when tailing a live log so rows land immediately. + + Returns: + Counts of inserted, skipped (unparseable) and duplicate lines. + """ + inserted = 0 + skipped = 0 + parsed_count = 0 + batch: dict[str, HaproxyRequest] = {} + + for line in lines: + parsed = parse_line(line) + if parsed is None: + if line.strip(): + skipped += 1 + continue + parsed_count += 1 + batch[parsed["line_hash"]] = HaproxyRequest(**parsed) + if len(batch) >= batch_size: + inserted += _flush(batch, session) + + inserted += _flush(batch, session) + return IngestResult(inserted=inserted, skipped=skipped, duplicates=parsed_count - inserted) + + +def _flush(batch: dict[str, HaproxyRequest], session: Session) -> int: + """Insert the rows whose hash is not already stored, then clear the batch. + + Returns the number of rows actually written. + """ + if not batch: + return 0 + + existing = set( + session.scalars(select(HaproxyRequest.line_hash).where(HaproxyRequest.line_hash.in_(batch.keys()))), + ) + new_rows = [row for line_hash, row in batch.items() if line_hash not in existing] + batch.clear() + + if not new_rows: + return 0 + session.add_all(new_rows) + session.commit() + return len(new_rows) diff --git a/python/haproxy_logs/parser.py b/python/haproxy_logs/parser.py new file mode 100644 index 0000000..3b3dbd8 --- /dev/null +++ b/python/haproxy_logs/parser.py @@ -0,0 +1,132 @@ +"""Parse HAProxy ``option httplog`` lines into column mappings. + +The expected format (with the request-header capture this project configures) is:: + + : [] / + //// + //// / + {|} " " + +Lines may still carry a systemd-journal prefix (``... haproxy[123]: ``); it is +stripped before parsing. Lines that are not request logs (startup messages, +health checks, ...) return ``None``. +""" + +from __future__ import annotations + +import hashlib +import re +from datetime import datetime +from typing import Any + +# Strips an optional journal prefix such as "Jun 22 20:17:46 jeeves haproxy[688739]: ". +_JOURNAL_PREFIX = re.compile(r"^.*?haproxy(?:\[\d+\])?:\s+") + +_LOG_LINE = re.compile( + r""" + ^ + (?P[0-9a-fA-F:.]+):(?P\d+)\s+ + \[(?P[^\]]+)\]\s+ + (?P\S+)\s+ + (?P[^/\s]+)/(?P\S+)\s+ + (?P-?\d+)/(?P-?\d+)/(?P-?\d+)/ + (?P-?\d+)/(?P-?\d+)\s+ + (?P-?\d+)\s+ + (?P\d+)\s+ + \S+\s+\S+\s+ # captured request/response cookies + (?P\S+)\s+ + (?P\d+)/(?P\d+)/ + (?P\d+)/(?P\d+)/(?P\d+)\s+ + (?P\d+)/(?P\d+)\s+ + (?P(?:\{[^}]*\}\s+)*) + "(?P[^"]*)" + \s*$ + """, + re.VERBOSE, +) + +_ACCEPT_DATE_FORMAT = "%d/%b/%Y:%H:%M:%S.%f" +_HEADER_CAPTURE = re.compile(r"\{([^}]*)\}") + + +def parse_line(line: str) -> dict[str, Any] | None: + """Parse one HAProxy http-log line into a ``HaproxyRequest`` column mapping. + + Args: + line: A raw log line, optionally still carrying a journald prefix. + + Returns: + A mapping of column name to value, or ``None`` if the line is not a + request log (blank lines, startup messages, aborted health checks, ...). + """ + stripped = _JOURNAL_PREFIX.sub("", line.strip()) + match = _LOG_LINE.match(stripped) + if match is None: + return None + + groups = match.groupdict() + frontend = groups["frontend"] + is_ssl = frontend.endswith("~") + host, user_agent = _split_request_headers(groups["captures"]) + method, target, http_version = _split_request(groups["request"]) + path, _, query = target.partition("?") + + return { + "line_hash": hashlib.sha256(stripped.encode("utf-8")).hexdigest(), + "requested_at": _parse_accept_date(groups["accept_date"]), + "client_ip": groups["client_ip"], + "client_port": int(groups["client_port"]), + "frontend": frontend.rstrip("~"), + "ssl": is_ssl, + "backend": groups["backend"], + "server": groups["server"], + "time_request": int(groups["time_request"]), + "time_queue": int(groups["time_queue"]), + "time_connect": int(groups["time_connect"]), + "time_response": int(groups["time_response"]), + "time_total": int(groups["time_total"]), + "status_code": int(groups["status_code"]), + "bytes_read": int(groups["bytes_read"]), + "termination_state": groups["termination_state"], + "active_connections": int(groups["active_connections"]), + "frontend_connections": int(groups["frontend_connections"]), + "backend_connections": int(groups["backend_connections"]), + "server_connections": int(groups["server_connections"]), + "retries": int(groups["retries"]), + "server_queue": int(groups["server_queue"]), + "backend_queue": int(groups["backend_queue"]), + "host": host, + "user_agent": user_agent, + "method": method, + "target": target, + "path": path, + "query": query or None, + "http_version": http_version, + } + + +def _parse_accept_date(value: str) -> datetime: + """Parse HAProxy's accept date and attach the host's local timezone.""" + # HAProxy logs naive local wall-clock time; astimezone() interprets it as the + # host's local zone and returns a timezone-aware datetime. + return datetime.strptime(value, _ACCEPT_DATE_FORMAT).astimezone() + + +def _split_request_headers(captures: str) -> tuple[str | None, str | None]: + """Pull the Host and User-Agent out of the first ``{host|user-agent}`` capture.""" + match = _HEADER_CAPTURE.search(captures) + if match is None: + return None, None + host, _, user_agent = match.group(1).partition("|") + return (host or None), (user_agent or None) + + +def _split_request(request: str) -> tuple[str, str, str]: + """Split a request line into method, target and HTTP version. + + Tolerates malformed values such as ```` by returning empty strings + for the missing parts. + """ + method, _, remainder = request.partition(" ") + target, _, http_version = remainder.partition(" ") + return method, target, http_version diff --git a/python/orm/richie/haproxy.py b/python/orm/richie/haproxy.py new file mode 100644 index 0000000..17b7570 --- /dev/null +++ b/python/orm/richie/haproxy.py @@ -0,0 +1,71 @@ +"""HAProxy request-log ORM model. + +The columns mirror HAProxy's default ``option httplog`` format. The table lives in +the Richie database; manage its schema with the usual alembic workflow:: + + database richie revision --autogenerate -m "add haproxy_request table" + database richie upgrade head +""" + +from __future__ import annotations + +from datetime import datetime + +from sqlalchemy import BigInteger, DateTime +from sqlalchemy.orm import Mapped, mapped_column + +from python.orm.richie.base import TableBase + + +class HaproxyRequest(TableBase): + """A single HAProxy HTTP request log line. + + Timer fields (``time_*``) are milliseconds and may be ``-1`` when a phase did + not complete, for example a client that disconnected before the response was + fully sent. + """ + + __tablename__ = "haproxy_request" + + # SHA-256 of the normalized log line. HAProxy log lines have no natural key, + # so this hash is the dedup key: re-ingesting the same lines is a no-op. + line_hash: Mapped[str] = mapped_column(unique=True) + + # When HAProxy accepted the request. HAProxy logs local wall-clock time; the + # parser attaches the host's timezone so this is stored timezone-aware. + requested_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), index=True) + + client_ip: Mapped[str] = mapped_column(index=True) + client_port: Mapped[int] + + frontend: Mapped[str] + ssl: Mapped[bool] + backend: Mapped[str] = mapped_column(index=True) + server: Mapped[str] + + time_request: Mapped[int] + time_queue: Mapped[int] + time_connect: Mapped[int] + time_response: Mapped[int] = mapped_column(index=True) + time_total: Mapped[int] + + status_code: Mapped[int] = mapped_column(index=True) + bytes_read: Mapped[int] = mapped_column(BigInteger) + termination_state: Mapped[str] + + active_connections: Mapped[int] + frontend_connections: Mapped[int] + backend_connections: Mapped[int] + server_connections: Mapped[int] + retries: Mapped[int] + server_queue: Mapped[int] + backend_queue: Mapped[int] + + host: Mapped[str | None] = mapped_column(index=True) + user_agent: Mapped[str | None] = mapped_column(index=True) + + method: Mapped[str] + target: Mapped[str] + path: Mapped[str] = mapped_column(index=True) + query: Mapped[str | None] + http_version: Mapped[str] diff --git a/tests/test_haproxy_logs.py b/tests/test_haproxy_logs.py new file mode 100644 index 0000000..4cd6571 --- /dev/null +++ b/tests/test_haproxy_logs.py @@ -0,0 +1,127 @@ +"""test_haproxy_logs.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +import pytest +from sqlalchemy import create_engine, func, select +from sqlalchemy.orm import Session +from sqlalchemy.pool import StaticPool + +from python.haproxy_logs.ingest import ingest_lines +from python.haproxy_logs.parser import parse_line +from python.orm.richie.haproxy import HaproxyRequest + +if TYPE_CHECKING: + from collections.abc import Iterator + +GPTBOT_LINE = ( + "Jun 22 20:17:46 jeeves haproxy[688739]: 74.7.242.30:59644 " + "[22/Jun/2026:20:17:46.227] ContentSwitching~ gitea/server 0/0/0/133/148 " + "200 292890 - - ---- 7/7/5/5/0 0/0 " + "{gitea.tmmworkshop.com|like Gecko; compatible; GPTBot/1.4; +https://openai.com/gptbot)} " + '"GET https://gitea.tmmworkshop.com/Richie/dotfiles/src/commit/abc/installer.py?display=source HTTP/2.0"' +) + + +def _line(client_ip: str, path: str, time_response: int, user_agent: str = "curl/8") -> str: + return ( + f"{client_ip}:50000 [22/Jun/2026:20:17:46.227] ContentSwitching~ gitea/server " + f"0/0/0/{time_response}/{time_response} 200 100 - - ---- 1/1/1/1/0 0/0 " + f'{{gitea.tmmworkshop.com|{user_agent}}} "GET https://gitea.tmmworkshop.com{path} HTTP/2.0"' + ) + + +def test_parse_real_gptbot_line() -> None: + """A real GPTBot request line parses into the expected fields.""" + parsed = parse_line(GPTBOT_LINE) + + assert parsed is not None + assert parsed["client_ip"] == "74.7.242.30" + assert parsed["client_port"] == 59644 + assert parsed["frontend"] == "ContentSwitching" + assert parsed["ssl"] is True + assert parsed["backend"] == "gitea" + assert parsed["time_response"] == 133 + assert parsed["time_total"] == 148 + assert parsed["status_code"] == 200 + assert parsed["bytes_read"] == 292890 + assert parsed["host"] == "gitea.tmmworkshop.com" + assert "GPTBot/1.4" in parsed["user_agent"] + assert parsed["method"] == "GET" + assert parsed["path"] == "https://gitea.tmmworkshop.com/Richie/dotfiles/src/commit/abc/installer.py" + assert parsed["query"] == "display=source" + assert parsed["http_version"] == "HTTP/2.0" + + +def test_parse_non_request_line_returns_none() -> None: + """A non-request log line is ignored rather than raising.""" + assert parse_line("Jun 22 20:00:00 jeeves haproxy[1]: Proxy ContentSwitching started.") is None + assert parse_line("") is None + + +@pytest.fixture +def session() -> Iterator[Session]: + """In-memory SQLite session with just the haproxy_request table created.""" + engine = create_engine( + "sqlite://", + connect_args={"check_same_thread": False}, + poolclass=StaticPool, + ) + HaproxyRequest.__table__.create(bind=engine, checkfirst=True) + with Session(engine) as open_session: + yield open_session + + +def test_ingest_inserts_and_skips(session: Session) -> None: + """Parseable lines are inserted; non-request lines are skipped, not fatal.""" + lines = [ + _line("10.0.0.1", "/a", 10), + _line("10.0.0.2", "/b", 20, user_agent="GPTBot/1.4"), + "Jun 22 20:00:00 jeeves haproxy[1]: Proxy ContentSwitching started.", + "", + ] + result = ingest_lines(lines, session) + + assert result.inserted == 2 + assert result.skipped == 1 + assert result.duplicates == 0 + assert session.scalar(select(func.count()).select_from(HaproxyRequest)) == 2 + + +def test_reingest_is_idempotent(session: Session) -> None: + """Re-ingesting the same lines creates no duplicate rows.""" + lines = [_line("10.0.0.1", "/a", 10), _line("10.0.0.2", "/b", 20)] + + first = ingest_lines(lines, session) + assert first.inserted == 2 + assert first.duplicates == 0 + + second = ingest_lines(lines, session) + assert second.inserted == 0 + assert second.duplicates == 2 + + assert session.scalar(select(func.count()).select_from(HaproxyRequest)) == 2 + + +def test_duplicate_lines_within_one_run_collapse(session: Session) -> None: + """Identical lines in a single run are stored once.""" + line = _line("10.0.0.1", "/a", 10) + result = ingest_lines([line, line, line], session) + + assert result.inserted == 1 + assert result.duplicates == 2 + assert session.scalar(select(func.count()).select_from(HaproxyRequest)) == 1 + + +def test_ingest_persists_parsed_fields(session: Session) -> None: + """A stored row keeps the fields pulled out by the parser.""" + ingest_lines([GPTBOT_LINE], session) + + stored = session.scalar(select(HaproxyRequest)) + assert stored is not None + assert stored.client_ip == "74.7.242.30" + assert stored.backend == "gitea" + assert stored.time_response == 133 + assert "GPTBot/1.4" in stored.user_agent