Compare commits
4 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| f71c0c8ed6 | |||
| 8cf7f3cc4a | |||
| 1d1bafbd30 | |||
| e1c4ae0d6e |
@@ -0,0 +1,103 @@
|
||||
"""adding haproxy data.
|
||||
|
||||
Revision ID: 96d72c748c24
|
||||
Revises: c460105682d2
|
||||
Create Date: 2026-06-23 16:37:17.768851
|
||||
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
import sqlalchemy as sa
|
||||
from alembic import op
|
||||
|
||||
from python.orm import RichieBase
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from collections.abc import Sequence
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision: str = "96d72c748c24"
|
||||
down_revision: str | None = "c460105682d2"
|
||||
branch_labels: str | Sequence[str] | None = None
|
||||
depends_on: str | Sequence[str] | None = None
|
||||
|
||||
schema = RichieBase.schema_name
|
||||
|
||||
|
||||
def upgrade() -> None:
    """Create the ``haproxy_request`` table and its single-column indexes.

    The table is created in the schema named by the module-level ``schema``
    value, with a primary key on ``id`` and a unique constraint on
    ``line_hash``. Eight non-unique indexes are then added, one per column
    listed in ``indexed_columns``.
    """
    table_name = "haproxy_request"
    # Creation order of the indexes; downgrade() removes them in reverse.
    indexed_columns = (
        "backend",
        "client_ip",
        "host",
        "path",
        "requested_at",
        "status_code",
        "time_response",
        "user_agent",
    )

    op.create_table(
        table_name,
        # Dedup key and request identity.
        sa.Column("line_hash", sa.String(), nullable=False),
        sa.Column("requested_at", sa.DateTime(timezone=True), nullable=False),
        sa.Column("client_ip", sa.String(), nullable=False),
        sa.Column("client_port", sa.Integer(), nullable=False),
        # Routing.
        sa.Column("frontend", sa.String(), nullable=False),
        sa.Column("ssl", sa.Boolean(), nullable=False),
        sa.Column("backend", sa.String(), nullable=False),
        sa.Column("server", sa.String(), nullable=False),
        # Timers.
        sa.Column("time_request", sa.Integer(), nullable=False),
        sa.Column("time_queue", sa.Integer(), nullable=False),
        sa.Column("time_connect", sa.Integer(), nullable=False),
        sa.Column("time_response", sa.Integer(), nullable=False),
        sa.Column("time_total", sa.Integer(), nullable=False),
        # Response summary.
        sa.Column("status_code", sa.Integer(), nullable=False),
        sa.Column("bytes_read", sa.BigInteger(), nullable=False),
        sa.Column("termination_state", sa.String(), nullable=False),
        # Connection and queue counters.
        sa.Column("active_connections", sa.Integer(), nullable=False),
        sa.Column("frontend_connections", sa.Integer(), nullable=False),
        sa.Column("backend_connections", sa.Integer(), nullable=False),
        sa.Column("server_connections", sa.Integer(), nullable=False),
        sa.Column("retries", sa.Integer(), nullable=False),
        sa.Column("server_queue", sa.Integer(), nullable=False),
        sa.Column("backend_queue", sa.Integer(), nullable=False),
        # Captured request headers (nullable).
        sa.Column("host", sa.String(), nullable=True),
        sa.Column("user_agent", sa.String(), nullable=True),
        # Request line.
        sa.Column("method", sa.String(), nullable=False),
        sa.Column("target", sa.String(), nullable=False),
        sa.Column("path", sa.String(), nullable=False),
        sa.Column("query", sa.String(), nullable=True),
        sa.Column("http_version", sa.String(), nullable=False),
        # Surrogate key and bookkeeping timestamps.
        sa.Column("id", sa.Integer(), nullable=False),
        sa.Column("created", sa.DateTime(timezone=True), server_default=sa.text("now()"), nullable=False),
        sa.Column("updated", sa.DateTime(timezone=True), server_default=sa.text("now()"), nullable=False),
        sa.PrimaryKeyConstraint("id", name=op.f("pk_haproxy_request")),
        sa.UniqueConstraint("line_hash", name=op.f("uq_haproxy_request_line_hash")),
        schema=schema,
    )

    for column in indexed_columns:
        op.create_index(
            op.f(f"ix_{table_name}_{column}"),
            table_name,
            [column],
            unique=False,
            schema=schema,
        )
|
||||
|
||||
|
||||
def downgrade() -> None:
    """Drop the ``haproxy_request`` indexes, then the table itself.

    Indexes are removed in the reverse of the order in which ``upgrade``
    creates them, all within the schema named by the module-level ``schema``.
    """
    table_name = "haproxy_request"
    # Reverse of the creation order used by upgrade().
    indexed_columns = (
        "user_agent",
        "time_response",
        "status_code",
        "requested_at",
        "path",
        "host",
        "client_ip",
        "backend",
    )

    for column in indexed_columns:
        op.drop_index(op.f(f"ix_{table_name}_{column}"), table_name=table_name, schema=schema)

    op.drop_table(table_name, schema=schema)
|
||||
@@ -0,0 +1 @@
|
||||
"""Load HAProxy ``option httplog`` lines into the Richie PostgreSQL database and query them."""
|
||||
@@ -0,0 +1,54 @@
|
||||
"""Command-line interface: load HAProxy logs into the Richie database.
|
||||
|
||||
The table schema is managed with alembic (``database richie upgrade head``); this
|
||||
command only inserts rows.
|
||||
|
||||
Examples:
|
||||
# stream the live log into the database (commit every line)
|
||||
journalctl -u haproxy -o cat -f | haproxy-logs ingest --batch-size 1
|
||||
|
||||
# backfill from a saved log file
|
||||
haproxy-logs ingest --file /tmp/haproxy.log
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from typing import TYPE_CHECKING, Annotated
|
||||
|
||||
import typer
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from python.haproxy_logs.ingest import ingest_lines
|
||||
from python.orm.common import get_postgres_engine
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from collections.abc import Iterable
|
||||
|
||||
app = typer.Typer(help="Load HAProxy logs into the Richie database.", no_args_is_help=True)
|
||||
|
||||
|
||||
@app.command()
def ingest(
    file: Annotated[str | None, typer.Option(help="Read lines from a file instead of stdin.")] = None,
    batch_size: Annotated[int, typer.Option(help="Rows per commit; use 1 when tailing a live log.")] = 100,
) -> None:
    """Parse HAProxy log lines from stdin (or a file) and store them in the Richie DB."""
    # The docstring above doubles as the command's --help text, so it is kept verbatim.
    with Session(get_postgres_engine(name="RICHIE")) as session:
        summary = ingest_lines(_read_lines(file), session, batch_size=batch_size)

    # One-line report of what the run did.
    typer.echo(f"inserted={summary.inserted} duplicates={summary.duplicates} skipped={summary.skipped}")
|
||||
|
||||
|
||||
def _read_lines(file: str | None) -> Iterable[str]:
    """Lazily produce log lines from ``file``, falling back to stdin.

    Args:
        file: Path of a log file to read, or ``None`` to read standard input.

    Yields:
        Each line exactly as read, including its trailing newline if present.
    """
    if file is not None:
        # Undecodable bytes are replaced rather than raising.
        with Path(file).open(encoding="utf-8", errors="replace") as source:
            yield from source
    else:
        yield from sys.stdin
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
app()
|
||||
@@ -0,0 +1,88 @@
|
||||
"""Parse HAProxy log lines and persist them to the database.
|
||||
|
||||
Ingestion is idempotent: each row carries a ``line_hash`` (a unique column), and
|
||||
this module skips lines whose hash already exists, so the same logs can be
|
||||
re-ingested without creating duplicate records.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from sqlalchemy import select
|
||||
|
||||
from python.haproxy_logs.parser import parse_line
|
||||
from python.orm.richie.haproxy import HaproxyRequest
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from collections.abc import Iterable
|
||||
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
|
||||
@dataclass(frozen=True)
class IngestResult:
    """Immutable tally returned by an ingest run."""

    # Rows actually written to the database.
    inserted: int
    # Non-blank lines that could not be parsed as a request log.
    skipped: int
    # Parsed lines that were not written because their hash was already known.
    duplicates: int
|
||||
|
||||
|
||||
def ingest_lines(lines: Iterable[str], session: Session, *, batch_size: int = 100) -> IngestResult:
    """Parse log lines and write the new ones to the database in batches.

    Lines whose ``line_hash`` is already stored are not written again, and a
    line repeated within one batch is only buffered once. A non-blank line that
    does not parse is counted as ``skipped`` instead of raising, so a stray
    startup message does not abort a streaming ingest. Blank lines are ignored
    and not counted.

    Args:
        lines: Iterable of raw log lines (with or without a journald prefix).
        session: Database session to insert into.
        batch_size: Number of distinct rows to buffer before flushing. Use
            ``1`` when tailing a live log so rows land immediately.

    Returns:
        Counts of inserted, skipped (unparseable) and duplicate lines, where
        duplicates is the number of parsed lines minus the number inserted.
    """
    pending: dict[str, HaproxyRequest] = {}
    written = 0
    unparseable = 0
    recognised = 0

    for raw in lines:
        columns = parse_line(raw)
        if columns is not None:
            recognised += 1
            # Keyed by hash, so a repeat within the batch replaces the earlier entry.
            pending[columns["line_hash"]] = HaproxyRequest(**columns)
            if len(pending) >= batch_size:
                written += _flush(pending, session)
        elif raw.strip():
            unparseable += 1

    # Write whatever is left over from the final, partial batch.
    written += _flush(pending, session)
    return IngestResult(inserted=written, skipped=unparseable, duplicates=recognised - written)
|
||||
|
||||
|
||||
def _flush(batch: dict[str, HaproxyRequest], session: Session) -> int:
    """Write the batch's not-yet-stored rows and empty the batch.

    The stored hashes are looked up with a single ``IN`` query over the batch
    keys. The batch is cleared whether or not anything is written, and the
    session is committed only when at least one row is added.

    Returns:
        The number of rows added in this flush.
    """
    if not batch:
        return 0

    lookup = select(HaproxyRequest.line_hash).where(HaproxyRequest.line_hash.in_(batch.keys()))
    already_stored = set(session.scalars(lookup))
    fresh = [row for key, row in batch.items() if key not in already_stored]
    batch.clear()

    if fresh:
        session.add_all(fresh)
        session.commit()
    return len(fresh)
|
||||
@@ -0,0 +1,132 @@
|
||||
"""Parse HAProxy ``option httplog`` lines into column mappings.
|
||||
|
||||
The expected format (with the request-header capture this project configures) is::
|
||||
|
||||
<client_ip>:<port> [<accept_date>] <frontend> <backend>/<server>
|
||||
<TR>/<Tw>/<Tc>/<Tr>/<Ta> <status> <bytes> <req_cookie> <resp_cookie>
|
||||
<term_state> <ac>/<fc>/<bc>/<sc>/<rc> <srv_q>/<back_q>
|
||||
{<host>|<user_agent>} "<method> <target> <version>"
|
||||
|
||||
Lines may still carry a systemd-journal prefix (``... haproxy[123]: ``); it is
|
||||
stripped before parsing. Lines that are not request logs (startup messages,
|
||||
health checks, ...) return ``None``.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import hashlib
|
||||
import re
|
||||
from datetime import datetime
|
||||
from typing import Any
|
||||
|
||||
# Matches an optional journal prefix such as "Jun 22 20:17:46 jeeves haproxy[688739]: "
# so it can be removed. The match is lazy, so it ends at the first "haproxy...: ".
_JOURNAL_PREFIX = re.compile(r"^.*?haproxy(?:\[\d+\])?:\s+")

# One complete request-log line with the journal prefix already removed.
# Compiled in verbose mode: whitespace inside the pattern is not significant.
# The two cookie fields are matched but not captured.
_LOG_LINE = re.compile(
    r"""
    ^
    (?P<client_ip>[0-9a-fA-F:.]+):(?P<client_port>\d+)\s+
    \[(?P<accept_date>[^\]]+)\]\s+
    (?P<frontend>\S+)\s+
    (?P<backend>[^/\s]+)/(?P<server>\S+)\s+
    (?P<time_request>-?\d+)/(?P<time_queue>-?\d+)/(?P<time_connect>-?\d+)/
    (?P<time_response>-?\d+)/(?P<time_total>-?\d+)\s+
    (?P<status_code>-?\d+)\s+
    (?P<bytes_read>\d+)\s+
    \S+\s+\S+\s+ # captured request/response cookies
    (?P<termination_state>\S+)\s+
    (?P<active_connections>\d+)/(?P<frontend_connections>\d+)/
    (?P<backend_connections>\d+)/(?P<server_connections>\d+)/(?P<retries>\d+)\s+
    (?P<server_queue>\d+)/(?P<backend_queue>\d+)\s+
    (?P<captures>(?:\{[^}]*\}\s+)*)
    "(?P<request>[^"]*)"
    \s*$
    """,
    re.VERBOSE,
)

# strptime format of the bracketed accept date, e.g. "22/Jun/2026:20:17:46.123".
_ACCEPT_DATE_FORMAT = "%d/%b/%Y:%H:%M:%S.%f"
# A single "{...}" header-capture group; group 1 is the text between the braces.
_HEADER_CAPTURE = re.compile(r"\{([^}]*)\}")
|
||||
|
||||
|
||||
def parse_line(line: str) -> dict[str, Any] | None:
    """Parse one HAProxy http-log line into a ``HaproxyRequest`` column mapping.

    The line is first matched as-is. Only if that fails is a journald prefix
    stripped and the match retried. Stripping unconditionally would truncate an
    un-prefixed line whose own content (for example the captured User-Agent)
    contains ``haproxy: ``, and that line would then be dropped as unparseable.

    Args:
        line: A raw log line, optionally still carrying a journald prefix.

    Returns:
        A mapping of column name to value, or ``None`` if the line is not a
        request log (blank lines, startup messages, aborted health checks, ...)
        or if its accept date cannot be parsed.
    """
    candidate = line.strip()
    match = _LOG_LINE.match(candidate)
    if match is None:
        # Fall back to removing a journal prefix and trying once more.
        candidate = _JOURNAL_PREFIX.sub("", candidate)
        match = _LOG_LINE.match(candidate)
        if match is None:
            return None

    groups = match.groupdict()

    # The regex accepts any bracketed text as the date. A value strptime rejects
    # makes this "not a request log" rather than an exception that would abort
    # the whole ingest.
    try:
        requested_at = _parse_accept_date(groups["accept_date"])
    except ValueError:
        return None

    frontend = groups["frontend"]
    # A trailing "~" on the frontend name is recorded as the ssl flag and removed
    # from the stored name.
    is_ssl = frontend.endswith("~")
    host, user_agent = _split_request_headers(groups["captures"])
    method, target, http_version = _split_request(groups["request"])
    path, _, query = target.partition("?")

    return {
        # Hash of the prefix-free line, so the same request hashes identically
        # whether or not it arrived with a journal prefix.
        "line_hash": hashlib.sha256(candidate.encode("utf-8")).hexdigest(),
        "requested_at": requested_at,
        "client_ip": groups["client_ip"],
        "client_port": int(groups["client_port"]),
        "frontend": frontend.rstrip("~"),
        "ssl": is_ssl,
        "backend": groups["backend"],
        "server": groups["server"],
        "time_request": int(groups["time_request"]),
        "time_queue": int(groups["time_queue"]),
        "time_connect": int(groups["time_connect"]),
        "time_response": int(groups["time_response"]),
        "time_total": int(groups["time_total"]),
        "status_code": int(groups["status_code"]),
        "bytes_read": int(groups["bytes_read"]),
        "termination_state": groups["termination_state"],
        "active_connections": int(groups["active_connections"]),
        "frontend_connections": int(groups["frontend_connections"]),
        "backend_connections": int(groups["backend_connections"]),
        "server_connections": int(groups["server_connections"]),
        "retries": int(groups["retries"]),
        "server_queue": int(groups["server_queue"]),
        "backend_queue": int(groups["backend_queue"]),
        "host": host,
        "user_agent": user_agent,
        "method": method,
        "target": target,
        "path": path,
        # An absent or empty query string is stored as None.
        "query": query or None,
        "http_version": http_version,
    }
|
||||
|
||||
|
||||
def _parse_accept_date(value: str) -> datetime:
    """Turn HAProxy's accept date into a timezone-aware datetime.

    Raises:
        ValueError: If ``value`` does not match the accept-date format.
    """
    # The logged value is naive local wall-clock time. Calling astimezone() on a
    # naive datetime treats it as being in the host's local zone and returns an
    # aware datetime.
    naive_local = datetime.strptime(value, _ACCEPT_DATE_FORMAT)
    return naive_local.astimezone()
|
||||
|
||||
|
||||
def _split_request_headers(captures: str) -> tuple[str | None, str | None]:
    """Extract Host and User-Agent from the first ``{host|user-agent}`` capture.

    Returns:
        ``(host, user_agent)``. Either element is ``None`` when it is empty,
        and both are ``None`` when ``captures`` holds no ``{...}`` group.
    """
    first_capture = _HEADER_CAPTURE.search(captures)
    if first_capture is None:
        return None, None

    # Split on the first "|" only: everything after it belongs to the user agent.
    host, _separator, agent = first_capture.group(1).partition("|")
    return (host if host else None), (agent if agent else None)
|
||||
|
||||
|
||||
def _split_request(request: str) -> tuple[str, str, str]:
    """Split a request line into method, target and HTTP version.

    The method is the text before the first space. When the last
    space-separated token starts with ``HTTP/`` it is taken as the version, and
    everything between method and version is the target. A target containing
    spaces therefore stays intact instead of leaking into the version. Any
    other input is split on the first two spaces.

    Tolerates malformed values such as ``<BADREQ>`` by returning empty strings
    for the missing parts.

    Args:
        request: The quoted request field of the log line, without the quotes.

    Returns:
        ``(method, target, http_version)``; missing parts are empty strings.
    """
    method, _, remainder = request.partition(" ")

    # Preferred split: peel the version off the right-hand side.
    target, separator, http_version = remainder.rpartition(" ")
    if separator and http_version.startswith("HTTP/"):
        return method, target, http_version

    # No recognisable version token: fall back to the plain left-to-right split.
    target, _, http_version = remainder.partition(" ")
    return method, target, http_version
|
||||
@@ -0,0 +1,71 @@
|
||||
"""HAProxy request-log ORM model.
|
||||
|
||||
The columns mirror HAProxy's ``option httplog`` format plus the captured Host and User-Agent request headers. The table lives in
|
||||
the Richie database; manage its schema with the usual alembic workflow::
|
||||
|
||||
database richie revision --autogenerate -m "add haproxy_request table"
|
||||
database richie upgrade head
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import datetime
|
||||
|
||||
from sqlalchemy import BigInteger, DateTime
|
||||
from sqlalchemy.orm import Mapped, mapped_column
|
||||
|
||||
from python.orm.richie.base import TableBase
|
||||
|
||||
|
||||
class HaproxyRequest(TableBase):
    """One HAProxy HTTP request, as recorded by a single log line.

    The ``time_*`` timer columns hold milliseconds and may be ``-1`` when the
    corresponding phase did not complete, for example when a client
    disconnected before the response was fully sent.
    """

    __tablename__ = "haproxy_request"

    # Dedup key: SHA-256 of the normalized log line. HAProxy log lines have no
    # natural key, so uniqueness of this hash is what makes re-ingesting the
    # same lines a no-op.
    line_hash: Mapped[str] = mapped_column(unique=True)

    # Moment HAProxy accepted the request. The log carries local wall-clock
    # time; the parser attaches the host's timezone, so the stored value is
    # timezone-aware.
    requested_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), index=True)

    # Client endpoint.
    client_ip: Mapped[str] = mapped_column(index=True)
    client_port: Mapped[int]

    # Routing: which frontend took the request and where it was sent.
    frontend: Mapped[str]
    ssl: Mapped[bool]
    backend: Mapped[str] = mapped_column(index=True)
    server: Mapped[str]

    # Timers (see the class docstring for units and the -1 convention).
    time_request: Mapped[int]
    time_queue: Mapped[int]
    time_connect: Mapped[int]
    time_response: Mapped[int] = mapped_column(index=True)
    time_total: Mapped[int]

    # Response summary.
    status_code: Mapped[int] = mapped_column(index=True)
    bytes_read: Mapped[int] = mapped_column(BigInteger)
    termination_state: Mapped[str]

    # Connection and queue counters reported on the log line.
    active_connections: Mapped[int]
    frontend_connections: Mapped[int]
    backend_connections: Mapped[int]
    server_connections: Mapped[int]
    retries: Mapped[int]
    server_queue: Mapped[int]
    backend_queue: Mapped[int]

    # Captured request headers; nullable because a capture may be absent or empty.
    host: Mapped[str | None] = mapped_column(index=True)
    user_agent: Mapped[str | None] = mapped_column(index=True)

    # Request line and its parts; ``path`` and ``query`` are split from ``target``.
    method: Mapped[str]
    target: Mapped[str]
    path: Mapped[str] = mapped_column(index=True)
    query: Mapped[str | None]
    http_version: Mapped[str]
|
||||
@@ -0,0 +1,529 @@
|
||||
{
|
||||
"annotations": {
|
||||
"list": []
|
||||
},
|
||||
"editable": false,
|
||||
"fiscalYearStartMonth": 0,
|
||||
"graphTooltip": 0,
|
||||
"links": [],
|
||||
"panels": [
|
||||
{
|
||||
"id": 1,
|
||||
"type": "stat",
|
||||
"title": "Total requests",
|
||||
"datasource": {
|
||||
"type": "postgres",
|
||||
"uid": "richie-postgres"
|
||||
},
|
||||
"gridPos": {
|
||||
"h": 4,
|
||||
"w": 6,
|
||||
"x": 0,
|
||||
"y": 0
|
||||
},
|
||||
"fieldConfig": {
|
||||
"defaults": {
|
||||
"color": {
|
||||
"mode": "thresholds"
|
||||
},
|
||||
"thresholds": {
|
||||
"mode": "absolute",
|
||||
"steps": [
|
||||
{
|
||||
"color": "blue",
|
||||
"value": null
|
||||
}
|
||||
]
|
||||
}
|
||||
},
|
||||
"overrides": []
|
||||
},
|
||||
"options": {
|
||||
"colorMode": "value",
|
||||
"graphMode": "area",
|
||||
"justifyMode": "auto",
|
||||
"reduceOptions": {
|
||||
"calcs": [
|
||||
"lastNotNull"
|
||||
],
|
||||
"fields": "",
|
||||
"values": false
|
||||
},
|
||||
"textMode": "auto"
|
||||
},
|
||||
"targets": [
|
||||
{
|
||||
"datasource": {
|
||||
"type": "postgres",
|
||||
"uid": "richie-postgres"
|
||||
},
|
||||
"editorMode": "code",
|
||||
"format": "table",
|
||||
"rawQuery": true,
|
||||
"rawSql": "SELECT count(*) AS \"Total requests\" FROM main.haproxy_request WHERE $__timeFilter(requested_at)",
|
||||
"refId": "A"
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"id": 2,
|
||||
"type": "stat",
|
||||
"title": "Unique client IPs",
|
||||
"datasource": {
|
||||
"type": "postgres",
|
||||
"uid": "richie-postgres"
|
||||
},
|
||||
"gridPos": {
|
||||
"h": 4,
|
||||
"w": 6,
|
||||
"x": 6,
|
||||
"y": 0
|
||||
},
|
||||
"fieldConfig": {
|
||||
"defaults": {
|
||||
"color": {
|
||||
"mode": "thresholds"
|
||||
},
|
||||
"thresholds": {
|
||||
"mode": "absolute",
|
||||
"steps": [
|
||||
{
|
||||
"color": "blue",
|
||||
"value": null
|
||||
}
|
||||
]
|
||||
}
|
||||
},
|
||||
"overrides": []
|
||||
},
|
||||
"options": {
|
||||
"colorMode": "value",
|
||||
"graphMode": "area",
|
||||
"justifyMode": "auto",
|
||||
"reduceOptions": {
|
||||
"calcs": [
|
||||
"lastNotNull"
|
||||
],
|
||||
"fields": "",
|
||||
"values": false
|
||||
},
|
||||
"textMode": "auto"
|
||||
},
|
||||
"targets": [
|
||||
{
|
||||
"datasource": {
|
||||
"type": "postgres",
|
||||
"uid": "richie-postgres"
|
||||
},
|
||||
"editorMode": "code",
|
||||
"format": "table",
|
||||
"rawQuery": true,
|
||||
"rawSql": "SELECT count(DISTINCT client_ip) AS \"Unique IPs\" FROM main.haproxy_request WHERE $__timeFilter(requested_at)",
|
||||
"refId": "A"
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"id": 3,
|
||||
"type": "stat",
|
||||
"title": "Distinct user-agents",
|
||||
"datasource": {
|
||||
"type": "postgres",
|
||||
"uid": "richie-postgres"
|
||||
},
|
||||
"gridPos": {
|
||||
"h": 4,
|
||||
"w": 6,
|
||||
"x": 12,
|
||||
"y": 0
|
||||
},
|
||||
"fieldConfig": {
|
||||
"defaults": {
|
||||
"color": {
|
||||
"mode": "thresholds"
|
||||
},
|
||||
"thresholds": {
|
||||
"mode": "absolute",
|
||||
"steps": [
|
||||
{
|
||||
"color": "blue",
|
||||
"value": null
|
||||
}
|
||||
]
|
||||
}
|
||||
},
|
||||
"overrides": []
|
||||
},
|
||||
"options": {
|
||||
"colorMode": "value",
|
||||
"graphMode": "area",
|
||||
"justifyMode": "auto",
|
||||
"reduceOptions": {
|
||||
"calcs": [
|
||||
"lastNotNull"
|
||||
],
|
||||
"fields": "",
|
||||
"values": false
|
||||
},
|
||||
"textMode": "auto"
|
||||
},
|
||||
"targets": [
|
||||
{
|
||||
"datasource": {
|
||||
"type": "postgres",
|
||||
"uid": "richie-postgres"
|
||||
},
|
||||
"editorMode": "code",
|
||||
"format": "table",
|
||||
"rawQuery": true,
|
||||
"rawSql": "SELECT count(DISTINCT user_agent) AS \"User-Agents\" FROM main.haproxy_request WHERE $__timeFilter(requested_at)",
|
||||
"refId": "A"
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"id": 4,
|
||||
"type": "stat",
|
||||
"title": "4xx/5xx responses",
|
||||
"datasource": {
|
||||
"type": "postgres",
|
||||
"uid": "richie-postgres"
|
||||
},
|
||||
"gridPos": {
|
||||
"h": 4,
|
||||
"w": 6,
|
||||
"x": 18,
|
||||
"y": 0
|
||||
},
|
||||
"fieldConfig": {
|
||||
"defaults": {
|
||||
"color": {
|
||||
"mode": "thresholds"
|
||||
},
|
||||
"thresholds": {
|
||||
"mode": "absolute",
|
||||
"steps": [
|
||||
{
|
||||
"color": "blue",
|
||||
"value": null
|
||||
}
|
||||
]
|
||||
}
|
||||
},
|
||||
"overrides": []
|
||||
},
|
||||
"options": {
|
||||
"colorMode": "value",
|
||||
"graphMode": "area",
|
||||
"justifyMode": "auto",
|
||||
"reduceOptions": {
|
||||
"calcs": [
|
||||
"lastNotNull"
|
||||
],
|
||||
"fields": "",
|
||||
"values": false
|
||||
},
|
||||
"textMode": "auto"
|
||||
},
|
||||
"targets": [
|
||||
{
|
||||
"datasource": {
|
||||
"type": "postgres",
|
||||
"uid": "richie-postgres"
|
||||
},
|
||||
"editorMode": "code",
|
||||
"format": "table",
|
||||
"rawQuery": true,
|
||||
"rawSql": "SELECT count(*) FILTER (WHERE status_code >= 400) AS \"Errors\" FROM main.haproxy_request WHERE $__timeFilter(requested_at)",
|
||||
"refId": "A"
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"id": 5,
|
||||
"type": "timeseries",
|
||||
"title": "Requests by backend",
|
||||
"datasource": {
|
||||
"type": "postgres",
|
||||
"uid": "richie-postgres"
|
||||
},
|
||||
"gridPos": {
|
||||
"h": 8,
|
||||
"w": 12,
|
||||
"x": 0,
|
||||
"y": 4
|
||||
},
|
||||
"fieldConfig": {
|
||||
"defaults": {
|
||||
"custom": {
|
||||
"drawStyle": "line",
|
||||
"fillOpacity": 10,
|
||||
"showPoints": "never"
|
||||
},
|
||||
"unit": "reqps"
|
||||
},
|
||||
"overrides": []
|
||||
},
|
||||
"options": {
|
||||
"legend": {
|
||||
"displayMode": "list",
|
||||
"placement": "bottom",
|
||||
"showLegend": true
|
||||
},
|
||||
"tooltip": {
|
||||
"mode": "multi",
|
||||
"sort": "desc"
|
||||
}
|
||||
},
|
||||
"targets": [
|
||||
{
|
||||
"datasource": {
|
||||
"type": "postgres",
|
||||
"uid": "richie-postgres"
|
||||
},
|
||||
"editorMode": "code",
|
||||
"format": "time_series",
|
||||
"rawQuery": true,
|
||||
"rawSql": "SELECT $__timeGroupAlias(requested_at, $__interval), backend AS metric, count(*) AS requests FROM main.haproxy_request WHERE $__timeFilter(requested_at) GROUP BY 1, backend ORDER BY 1",
|
||||
"refId": "A"
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"id": 6,
|
||||
"type": "timeseries",
|
||||
"title": "Backend response time percentiles",
|
||||
"datasource": {
|
||||
"type": "postgres",
|
||||
"uid": "richie-postgres"
|
||||
},
|
||||
"gridPos": {
|
||||
"h": 8,
|
||||
"w": 12,
|
||||
"x": 12,
|
||||
"y": 4
|
||||
},
|
||||
"fieldConfig": {
|
||||
"defaults": {
|
||||
"custom": {
|
||||
"drawStyle": "line",
|
||||
"fillOpacity": 10,
|
||||
"showPoints": "never"
|
||||
},
|
||||
"unit": "ms"
|
||||
},
|
||||
"overrides": []
|
||||
},
|
||||
"options": {
|
||||
"legend": {
|
||||
"displayMode": "list",
|
||||
"placement": "bottom",
|
||||
"showLegend": true
|
||||
},
|
||||
"tooltip": {
|
||||
"mode": "multi",
|
||||
"sort": "desc"
|
||||
}
|
||||
},
|
||||
"targets": [
|
||||
{
|
||||
"datasource": {
|
||||
"type": "postgres",
|
||||
"uid": "richie-postgres"
|
||||
},
|
||||
"editorMode": "code",
|
||||
"format": "time_series",
|
||||
"rawQuery": true,
|
||||
"rawSql": "SELECT $__timeGroupAlias(requested_at, $__interval), percentile_cont(0.50) WITHIN GROUP (ORDER BY time_response) AS \"p50\", percentile_cont(0.90) WITHIN GROUP (ORDER BY time_response) AS \"p90\", percentile_cont(0.95) WITHIN GROUP (ORDER BY time_response) AS \"p95\", percentile_cont(0.99) WITHIN GROUP (ORDER BY time_response) AS \"p99\" FROM main.haproxy_request WHERE $__timeFilter(requested_at) AND time_response >= 0 GROUP BY 1 ORDER BY 1",
|
||||
"refId": "A"
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"id": 7,
|
||||
"type": "table",
|
||||
"title": "Top user-agents (bots)",
|
||||
"datasource": {
|
||||
"type": "postgres",
|
||||
"uid": "richie-postgres"
|
||||
},
|
||||
"gridPos": {
|
||||
"h": 8,
|
||||
"w": 12,
|
||||
"x": 0,
|
||||
"y": 12
|
||||
},
|
||||
"fieldConfig": {
|
||||
"defaults": {
|
||||
"custom": {
|
||||
"filterable": true
|
||||
}
|
||||
},
|
||||
"overrides": []
|
||||
},
|
||||
"options": {
|
||||
"showHeader": true,
|
||||
"sortBy": []
|
||||
},
|
||||
"targets": [
|
||||
{
|
||||
"datasource": {
|
||||
"type": "postgres",
|
||||
"uid": "richie-postgres"
|
||||
},
|
||||
"editorMode": "code",
|
||||
"format": "table",
|
||||
"rawQuery": true,
|
||||
"rawSql": "SELECT user_agent AS \"User-Agent\", count(*) AS \"Requests\", count(DISTINCT client_ip) AS \"Distinct IPs\" FROM main.haproxy_request WHERE $__timeFilter(requested_at) GROUP BY user_agent ORDER BY \"Requests\" DESC LIMIT 25",
|
||||
"refId": "A"
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"id": 8,
|
||||
"type": "table",
|
||||
"title": "Top client IPs",
|
||||
"datasource": {
|
||||
"type": "postgres",
|
||||
"uid": "richie-postgres"
|
||||
},
|
||||
"gridPos": {
|
||||
"h": 8,
|
||||
"w": 12,
|
||||
"x": 12,
|
||||
"y": 12
|
||||
},
|
||||
"fieldConfig": {
|
||||
"defaults": {
|
||||
"custom": {
|
||||
"filterable": true
|
||||
}
|
||||
},
|
||||
"overrides": []
|
||||
},
|
||||
"options": {
|
||||
"showHeader": true,
|
||||
"sortBy": []
|
||||
},
|
||||
"targets": [
|
||||
{
|
||||
"datasource": {
|
||||
"type": "postgres",
|
||||
"uid": "richie-postgres"
|
||||
},
|
||||
"editorMode": "code",
|
||||
"format": "table",
|
||||
"rawQuery": true,
|
||||
"rawSql": "SELECT client_ip AS \"Client IP\", count(*) AS \"Requests\", max(user_agent) AS \"User-Agent (sample)\" FROM main.haproxy_request WHERE $__timeFilter(requested_at) GROUP BY client_ip ORDER BY \"Requests\" DESC LIMIT 25",
|
||||
"refId": "A"
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"id": 9,
|
||||
"type": "table",
|
||||
"title": "Top endpoints",
|
||||
"datasource": {
|
||||
"type": "postgres",
|
||||
"uid": "richie-postgres"
|
||||
},
|
||||
"gridPos": {
|
||||
"h": 8,
|
||||
"w": 16,
|
||||
"x": 0,
|
||||
"y": 20
|
||||
},
|
||||
"fieldConfig": {
|
||||
"defaults": {
|
||||
"custom": {
|
||||
"filterable": true
|
||||
}
|
||||
},
|
||||
"overrides": []
|
||||
},
|
||||
"options": {
|
||||
"showHeader": true,
|
||||
"sortBy": []
|
||||
},
|
||||
"targets": [
|
||||
{
|
||||
"datasource": {
|
||||
"type": "postgres",
|
||||
"uid": "richie-postgres"
|
||||
},
|
||||
"editorMode": "code",
|
||||
"format": "table",
|
||||
"rawQuery": true,
|
||||
"rawSql": "SELECT host AS \"Host\", path AS \"Path\", count(*) AS \"Requests\", round(avg(time_response)) AS \"Avg ms\" FROM main.haproxy_request WHERE $__timeFilter(requested_at) AND time_response >= 0 GROUP BY host, path ORDER BY \"Requests\" DESC LIMIT 25",
|
||||
"refId": "A"
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"id": 10,
|
||||
"type": "piechart",
|
||||
"title": "Requests by status code",
|
||||
"datasource": {
|
||||
"type": "postgres",
|
||||
"uid": "richie-postgres"
|
||||
},
|
||||
"gridPos": {
|
||||
"h": 8,
|
||||
"w": 8,
|
||||
"x": 16,
|
||||
"y": 20
|
||||
},
|
||||
"fieldConfig": {
|
||||
"defaults": {},
|
||||
"overrides": []
|
||||
},
|
||||
"options": {
|
||||
"legend": {
|
||||
"displayMode": "list",
|
||||
"placement": "right",
|
||||
"showLegend": true
|
||||
},
|
||||
"pieType": "donut",
|
||||
"reduceOptions": {
|
||||
"calcs": [
|
||||
"lastNotNull"
|
||||
],
|
||||
"values": true
|
||||
}
|
||||
},
|
||||
"targets": [
|
||||
{
|
||||
"datasource": {
|
||||
"type": "postgres",
|
||||
"uid": "richie-postgres"
|
||||
},
|
||||
"editorMode": "code",
|
||||
"format": "table",
|
||||
"rawQuery": true,
|
||||
"rawSql": "SELECT status_code::text AS metric, count(*) AS value FROM main.haproxy_request WHERE $__timeFilter(requested_at) GROUP BY status_code ORDER BY value DESC",
|
||||
"refId": "A"
|
||||
}
|
||||
]
|
||||
}
|
||||
],
|
||||
"refresh": "30s",
|
||||
"schemaVersion": 39,
|
||||
"style": "dark",
|
||||
"tags": [
|
||||
"haproxy",
|
||||
"richie"
|
||||
],
|
||||
"templating": {
|
||||
"list": []
|
||||
},
|
||||
"time": {
|
||||
"from": "now-24h",
|
||||
"to": "now"
|
||||
},
|
||||
"timepicker": {},
|
||||
"timezone": "",
|
||||
"title": "HAProxy Requests",
|
||||
"uid": "haproxy-requests",
|
||||
"version": 1,
|
||||
"weekStart": ""
|
||||
}
|
||||
@@ -62,6 +62,21 @@ in
|
||||
uid = "prom-pid-short";
|
||||
url = "http://127.0.0.1:9092";
|
||||
}
|
||||
{
|
||||
access = "proxy";
|
||||
editable = false;
|
||||
name = "richie-postgres";
|
||||
type = "postgres";
|
||||
uid = "richie-postgres";
|
||||
url = "/run/postgresql";
|
||||
user = "richie";
|
||||
jsonData = {
|
||||
database = "richie";
|
||||
sslmode = "disable";
|
||||
postgresVersion = 1700;
|
||||
timescaledb = false;
|
||||
};
|
||||
}
|
||||
];
|
||||
};
|
||||
};
|
||||
@@ -71,6 +86,7 @@ in
|
||||
services.grafana.after = [
|
||||
"prometheus-main.service"
|
||||
"prometheus-pid-short.service"
|
||||
"postgresql.service"
|
||||
];
|
||||
|
||||
tmpfiles.rules = [
|
||||
|
||||
@@ -29,7 +29,7 @@ frontend ContentSwitching
|
||||
# ACME challenge routing (must be first)
|
||||
acl is_acme path_beg /.well-known/acme-challenge/
|
||||
|
||||
# tmmworkshop.com
|
||||
# Host ACLs (defined early so rate-limiting can scope to a single vhost)
|
||||
acl host_audiobookshelf hdr(host) -i audiobookshelf.tmmworkshop.com
|
||||
acl host_cache hdr(host) -i cache.tmmworkshop.com
|
||||
acl host_jellyfin hdr(host) -i jellyfin.tmmworkshop.com
|
||||
@@ -37,6 +37,53 @@ frontend ContentSwitching
|
||||
acl host_gitea hdr(host) -i gitea.tmmworkshop.com
|
||||
acl host_norn_sight hdr(host) -i www.norn-sight.com
|
||||
|
||||
# --- Rate limiting (Gitea only, per source IP) ---
|
||||
# Trusted devices exempt from rate limiting (add one line per IP/CIDR).
|
||||
# Internal / reserved-for-private-use ranges:
|
||||
# IPv4: RFC 1918 private, loopback, link-local
|
||||
acl rate_limit_allowlist src 10.0.0.0/8 172.16.0.0/12 192.168.0.0/16 127.0.0.0/8 169.254.0.0/16
|
||||
# IPv6: loopback, unique local (ULA), link-local
|
||||
acl rate_limit_allowlist src ::1/128 fc00::/7 fe80::/10
|
||||
# Add specific public devices below as needed:
|
||||
# acl rate_limit_allowlist src 192.0.2.50
|
||||
|
||||
# Logged-in Gitea sessions bypass the rate limits. Gitea sets the
|
||||
# `i_like_gitea` session cookie on login, and it is only sent to the Gitea
|
||||
# vhost, so this only affects Gitea traffic. Note: this matches cookie
|
||||
# PRESENCE, not validity, so it filters anonymous crawlers (which carry no
|
||||
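# cookie) rather than acting as a hard security boundary.
|
||||
# NOTE(review): Gitea may also issue this session cookie to anonymous
|
||||
# visitors (not only on login); if so, any client that keeps cookies bypasses
|
||||
# the limits after its first request -- confirm against the deployed version.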
|
||||
acl gitea_logged_in req.cook(i_like_gitea) -m found
|
||||
|
||||
# Track HTTP request rate per client IP over a 10s sliding window. Only Gitea
|
||||
# is rate-limited; all other vhosts are left alone.
|
||||
# ipv6 table type also covers IPv4 (mapped), so it works for both binds.
|
||||
stick-table type ipv6 size 100k expire 30s store http_req_rate(10s)
|
||||
http-request track-sc0 src if host_gitea !is_acme !rate_limit_allowlist !gitea_logged_in
|
||||
# Threshold: deny (429) when a client exceeds this many requests per 10s.
|
||||
acl over_rate_limit sc_http_req_rate(0) gt 10
|
||||
http-request deny deny_status 429 if over_rate_limit host_gitea !is_acme !rate_limit_allowlist !gitea_logged_in
|
||||
|
||||
# --- Request logging ---
|
||||
# Capture the Host header and User-Agent so the httplog shows who is
|
||||
# requesting what. They appear in the log's {captured|headers} field,
|
||||
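# in this order: {host|user-agent}. Client IP is already logged by httplog.
|
||||
# Note: these capture rules come after the Gitea rate-limit deny above, and a
|
||||
# deny stops rule evaluation, so requests rejected there (429) are logged
|
||||
# without the Host/User-Agent values. Move the captures above the deny if
|
||||
# throttled requests should be identifiable in the log.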
|
||||
http-request capture req.hdr(Host) len 100
|
||||
http-request capture req.hdr(User-Agent) len 128
|
||||
|
||||
# --- robots.txt ---
|
||||
# Serve a single global robots.txt for every vhost (asks crawlers to wait
|
||||
# 10s between requests via Crawl-delay). Returned for both HTTP and HTTPS.
|
||||
# File is deployed to /etc/haproxy/robots.txt by haproxy.nix.
|
||||
acl is_robots path /robots.txt
|
||||
http-request return status 200 content-type "text/plain" file /etc/haproxy/robots.txt if is_robots
|
||||
|
||||
# --- Per-endpoint limit: Gitea compare/diff is expensive; cap at 1 req / 5 min / IP ---
|
||||
# Tracked in a separate 5-minute table (st_compare) since a proxy has only one
|
||||
# inline stick-table. Allow-listed (internal) IPs are exempt.
|
||||
acl is_gitea_compare path_beg /Richie/dotfiles/compare
|
||||
http-request track-sc1 src table st_compare if host_gitea is_gitea_compare !rate_limit_allowlist !gitea_logged_in
|
||||
http-request deny deny_status 429 if host_gitea is_gitea_compare !rate_limit_allowlist !gitea_logged_in { sc_http_req_rate(1,st_compare) gt 1 }
|
||||
|
||||
# Hosts allowed to serve plain HTTP (add entries to skip the HTTPS redirect)
|
||||
acl allow_http hdr(host) -i __none__
|
||||
# acl allow_http hdr(host) -i example.tmmworkshop.com
|
||||
@@ -52,6 +99,11 @@ frontend ContentSwitching
|
||||
use_backend gitea if host_gitea
|
||||
use_backend norn_sight if host_norn_sight
|
||||
|
||||
# Stick-table only (no servers): tracks per-IP request rate to Gitea's compare
|
||||
# endpoint over a 5-minute window so the frontend can cap it at 1 per 5 min.
|
||||
backend st_compare
|
||||
stick-table type ipv6 size 100k expire 600s store http_req_rate(300s)
|
||||
|
||||
backend acme_challenge
|
||||
mode http
|
||||
server acme 127.0.0.1:8402
|
||||
|
||||
@@ -4,6 +4,9 @@
|
||||
443
|
||||
];
|
||||
|
||||
# Global robots.txt served by HAProxy for every vhost (see haproxy.cfg).
|
||||
environment.etc."haproxy/robots.txt".source = ./robots.txt;
|
||||
|
||||
services.haproxy = {
|
||||
enable = true;
|
||||
config = builtins.readFile ./haproxy.cfg;
|
||||
|
||||
@@ -0,0 +1,2 @@
|
||||
User-agent: *
|
||||
Crawl-delay: 10
|
||||
@@ -0,0 +1,127 @@
|
||||
"""test_haproxy_logs."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
import pytest
|
||||
from sqlalchemy import create_engine, func, select
|
||||
from sqlalchemy.orm import Session
|
||||
from sqlalchemy.pool import StaticPool
|
||||
|
||||
from python.haproxy_logs.ingest import ingest_lines
|
||||
from python.haproxy_logs.parser import parse_line
|
||||
from python.orm.richie.haproxy import HaproxyRequest
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from collections.abc import Iterator
|
||||
|
||||
GPTBOT_LINE = (
|
||||
"Jun 22 20:17:46 jeeves haproxy[688739]: 74.7.242.30:59644 "
|
||||
"[22/Jun/2026:20:17:46.227] ContentSwitching~ gitea/server 0/0/0/133/148 "
|
||||
"200 292890 - - ---- 7/7/5/5/0 0/0 "
|
||||
"{gitea.tmmworkshop.com|like Gecko; compatible; GPTBot/1.4; +https://openai.com/gptbot)} "
|
||||
'"GET https://gitea.tmmworkshop.com/Richie/dotfiles/src/commit/abc/installer.py?display=source HTTP/2.0"'
|
||||
)
|
||||
|
||||
|
||||
def _line(client_ip: str, path: str, time_response: int, user_agent: str = "curl/8") -> str:
|
||||
return (
|
||||
f"{client_ip}:50000 [22/Jun/2026:20:17:46.227] ContentSwitching~ gitea/server "
|
||||
f"0/0/0/{time_response}/{time_response} 200 100 - - ---- 1/1/1/1/0 0/0 "
|
||||
f'{{gitea.tmmworkshop.com|{user_agent}}} "GET https://gitea.tmmworkshop.com{path} HTTP/2.0"'
|
||||
)
|
||||
|
||||
|
||||
def test_parse_real_gptbot_line() -> None:
|
||||
"""A real GPTBot request line parses into the expected fields."""
|
||||
parsed = parse_line(GPTBOT_LINE)
|
||||
|
||||
assert parsed is not None
|
||||
assert parsed["client_ip"] == "74.7.242.30"
|
||||
assert parsed["client_port"] == 59644
|
||||
assert parsed["frontend"] == "ContentSwitching"
|
||||
assert parsed["ssl"] is True
|
||||
assert parsed["backend"] == "gitea"
|
||||
assert parsed["time_response"] == 133
|
||||
assert parsed["time_total"] == 148
|
||||
assert parsed["status_code"] == 200
|
||||
assert parsed["bytes_read"] == 292890
|
||||
assert parsed["host"] == "gitea.tmmworkshop.com"
|
||||
assert "GPTBot/1.4" in parsed["user_agent"]
|
||||
assert parsed["method"] == "GET"
|
||||
assert parsed["path"] == "https://gitea.tmmworkshop.com/Richie/dotfiles/src/commit/abc/installer.py"
|
||||
assert parsed["query"] == "display=source"
|
||||
assert parsed["http_version"] == "HTTP/2.0"
|
||||
|
||||
|
||||
def test_parse_non_request_line_returns_none() -> None:
|
||||
"""A non-request log line is ignored rather than raising."""
|
||||
assert parse_line("Jun 22 20:00:00 jeeves haproxy[1]: Proxy ContentSwitching started.") is None
|
||||
assert parse_line("") is None
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def session() -> Iterator[Session]:
|
||||
"""In-memory SQLite session with just the haproxy_request table created."""
|
||||
engine = create_engine(
|
||||
"sqlite://",
|
||||
connect_args={"check_same_thread": False},
|
||||
poolclass=StaticPool,
|
||||
)
|
||||
HaproxyRequest.__table__.create(bind=engine, checkfirst=True)
|
||||
with Session(engine) as open_session:
|
||||
yield open_session
|
||||
|
||||
|
||||
def test_ingest_inserts_and_skips(session: Session) -> None:
|
||||
"""Parseable lines are inserted; non-request lines are skipped, not fatal."""
|
||||
lines = [
|
||||
_line("10.0.0.1", "/a", 10),
|
||||
_line("10.0.0.2", "/b", 20, user_agent="GPTBot/1.4"),
|
||||
"Jun 22 20:00:00 jeeves haproxy[1]: Proxy ContentSwitching started.",
|
||||
"",
|
||||
]
|
||||
result = ingest_lines(lines, session)
|
||||
|
||||
assert result.inserted == 2
|
||||
assert result.skipped == 1
|
||||
assert result.duplicates == 0
|
||||
assert session.scalar(select(func.count()).select_from(HaproxyRequest)) == 2
|
||||
|
||||
|
||||
def test_reingest_is_idempotent(session: Session) -> None:
|
||||
"""Re-ingesting the same lines creates no duplicate rows."""
|
||||
lines = [_line("10.0.0.1", "/a", 10), _line("10.0.0.2", "/b", 20)]
|
||||
|
||||
first = ingest_lines(lines, session)
|
||||
assert first.inserted == 2
|
||||
assert first.duplicates == 0
|
||||
|
||||
second = ingest_lines(lines, session)
|
||||
assert second.inserted == 0
|
||||
assert second.duplicates == 2
|
||||
|
||||
assert session.scalar(select(func.count()).select_from(HaproxyRequest)) == 2
|
||||
|
||||
|
||||
def test_duplicate_lines_within_one_run_collapse(session: Session) -> None:
|
||||
"""Identical lines in a single run are stored once."""
|
||||
line = _line("10.0.0.1", "/a", 10)
|
||||
result = ingest_lines([line, line, line], session)
|
||||
|
||||
assert result.inserted == 1
|
||||
assert result.duplicates == 2
|
||||
assert session.scalar(select(func.count()).select_from(HaproxyRequest)) == 1
|
||||
|
||||
|
||||
def test_ingest_persists_parsed_fields(session: Session) -> None:
|
||||
"""A stored row keeps the fields pulled out by the parser."""
|
||||
ingest_lines([GPTBOT_LINE], session)
|
||||
|
||||
stored = session.scalar(select(HaproxyRequest))
|
||||
assert stored is not None
|
||||
assert stored.client_ip == "74.7.242.30"
|
||||
assert stored.backend == "gitea"
|
||||
assert stored.time_response == 133
|
||||
assert "GPTBot/1.4" in stored.user_agent
|
||||
Reference in New Issue
Block a user