mirror of
https://github.com/RichieCahill/dotfiles.git
synced 2026-04-21 06:39:09 -04:00
Compare commits
2 Commits
feature/se
...
claude/add
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
56b0b6651e | ||
| f627a5ac6e |
25
README.md
25
README.md
@@ -1 +1,26 @@
|
|||||||
# dotfiles
|
# dotfiles
|
||||||
|
|
||||||
|
<!-- LINE-COUNT-START -->

This repo has **20,055** lines of technical debt.

| File Type | Lines | Percentage |
|-----------|------:|-----------:|
| .py | 11,441 | 57.0% |
| .nix | 4,471 | 22.3% |
| .yaml | 1,121 | 5.6% |
| .html | 1,009 | 5.0% |
| .json | 555 | 2.8% |
| .yml | 479 | 2.4% |
| .toml | 290 | 1.4% |
| .css | 212 | 1.1% |
| .gitignore | 199 | 1.0% |
| .md | 75 | 0.4% |
| .cfg | 73 | 0.4% |
| .sh | 48 | 0.2% |
| .mako | 36 | 0.2% |
| .LICENSE | 21 | 0.1% |
| .conf | 17 | 0.1% |
| .Gemfile | 4 | 0.0% |
| .svg | 3 | 0.0% |
| .new | 1 | 0.0% |

<!-- LINE-COUNT-END -->
|
||||||
|
|||||||
@@ -21,12 +21,10 @@
|
|||||||
alembic
|
alembic
|
||||||
apprise
|
apprise
|
||||||
apscheduler
|
apscheduler
|
||||||
confluent-kafka
|
|
||||||
fastapi
|
fastapi
|
||||||
fastapi-cli
|
fastapi-cli
|
||||||
httpx
|
httpx
|
||||||
mypy
|
mypy
|
||||||
orjson
|
|
||||||
polars
|
polars
|
||||||
psycopg
|
psycopg
|
||||||
pydantic
|
pydantic
|
||||||
|
|||||||
File diff suppressed because it is too large
Load Diff
@@ -1,50 +0,0 @@
|
|||||||
"""adding FailedIngestion.
|
|
||||||
|
|
||||||
Revision ID: 2f43120e3ffc
|
|
||||||
Revises: f99be864fe69
|
|
||||||
Create Date: 2026-03-24 23:46:17.277897
|
|
||||||
|
|
||||||
"""
|
|
||||||
|
|
||||||
from __future__ import annotations
|
|
||||||
|
|
||||||
from typing import TYPE_CHECKING
|
|
||||||
|
|
||||||
import sqlalchemy as sa
|
|
||||||
from alembic import op
|
|
||||||
|
|
||||||
from python.orm import DataScienceDevBase
|
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
|
||||||
from collections.abc import Sequence
|
|
||||||
|
|
||||||
# revision identifiers, used by Alembic.
|
|
||||||
revision: str = "2f43120e3ffc"
|
|
||||||
down_revision: str | None = "f99be864fe69"
|
|
||||||
branch_labels: str | Sequence[str] | None = None
|
|
||||||
depends_on: str | Sequence[str] | None = None
|
|
||||||
|
|
||||||
schema = DataScienceDevBase.schema_name
|
|
||||||
|
|
||||||
|
|
||||||
def upgrade() -> None:
|
|
||||||
"""Upgrade."""
|
|
||||||
# ### commands auto generated by Alembic - please adjust! ###
|
|
||||||
op.create_table(
|
|
||||||
"failed_ingestion",
|
|
||||||
sa.Column("raw_line", sa.Text(), nullable=False),
|
|
||||||
sa.Column("error", sa.Text(), nullable=False),
|
|
||||||
sa.Column("id", sa.Integer(), nullable=False),
|
|
||||||
sa.Column("created", sa.DateTime(timezone=True), server_default=sa.text("now()"), nullable=False),
|
|
||||||
sa.Column("updated", sa.DateTime(timezone=True), server_default=sa.text("now()"), nullable=False),
|
|
||||||
sa.PrimaryKeyConstraint("id", name=op.f("pk_failed_ingestion")),
|
|
||||||
schema=schema,
|
|
||||||
)
|
|
||||||
# ### end Alembic commands ###
|
|
||||||
|
|
||||||
|
|
||||||
def downgrade() -> None:
|
|
||||||
"""Downgrade."""
|
|
||||||
# ### commands auto generated by Alembic - please adjust! ###
|
|
||||||
op.drop_table("failed_ingestion", schema=schema)
|
|
||||||
# ### end Alembic commands ###
|
|
||||||
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
@@ -1,80 +0,0 @@
|
|||||||
"""Attach all partition tables to the posts parent table.
|
|
||||||
|
|
||||||
Alembic autogenerate creates partition tables as standalone tables but does not
|
|
||||||
emit the ALTER TABLE ... ATTACH PARTITION statements needed for PostgreSQL to
|
|
||||||
route inserts to the correct partition.
|
|
||||||
|
|
||||||
Revision ID: a1b2c3d4e5f6
|
|
||||||
Revises: 605b1794838f
|
|
||||||
Create Date: 2026-03-25 10:00:00.000000
|
|
||||||
|
|
||||||
"""
|
|
||||||
|
|
||||||
from __future__ import annotations
|
|
||||||
|
|
||||||
from typing import TYPE_CHECKING
|
|
||||||
|
|
||||||
from alembic import op
|
|
||||||
from sqlalchemy import text
|
|
||||||
|
|
||||||
from python.orm import DataScienceDevBase
|
|
||||||
from python.orm.data_science_dev.posts.partitions import (
|
|
||||||
PARTITION_END_YEAR,
|
|
||||||
PARTITION_START_YEAR,
|
|
||||||
iso_weeks_in_year,
|
|
||||||
week_bounds,
|
|
||||||
)
|
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
|
||||||
from collections.abc import Sequence
|
|
||||||
|
|
||||||
# revision identifiers, used by Alembic.
|
|
||||||
revision: str = "a1b2c3d4e5f6"
|
|
||||||
down_revision: str | None = "605b1794838f"
|
|
||||||
branch_labels: str | Sequence[str] | None = None
|
|
||||||
depends_on: str | Sequence[str] | None = None
|
|
||||||
|
|
||||||
schema = DataScienceDevBase.schema_name
|
|
||||||
|
|
||||||
ALREADY_ATTACHED_QUERY = text("""
|
|
||||||
SELECT inhrelid::regclass::text
|
|
||||||
FROM pg_inherits
|
|
||||||
WHERE inhparent = :parent::regclass
|
|
||||||
""")
|
|
||||||
|
|
||||||
|
|
||||||
def upgrade() -> None:
|
|
||||||
"""Attach all weekly partition tables to the posts parent table."""
|
|
||||||
connection = op.get_bind()
|
|
||||||
already_attached = {
|
|
||||||
row[0]
|
|
||||||
for row in connection.execute(
|
|
||||||
ALREADY_ATTACHED_QUERY, {"parent": f"{schema}.posts"}
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
for year in range(PARTITION_START_YEAR, PARTITION_END_YEAR + 1):
|
|
||||||
for week in range(1, iso_weeks_in_year(year) + 1):
|
|
||||||
table_name = f"posts_{year}_{week:02d}"
|
|
||||||
qualified_name = f"{schema}.{table_name}"
|
|
||||||
if qualified_name in already_attached:
|
|
||||||
continue
|
|
||||||
start, end = week_bounds(year, week)
|
|
||||||
start_str = start.strftime("%Y-%m-%d %H:%M:%S")
|
|
||||||
end_str = end.strftime("%Y-%m-%d %H:%M:%S")
|
|
||||||
op.execute(
|
|
||||||
f"ALTER TABLE {schema}.posts "
|
|
||||||
f"ATTACH PARTITION {qualified_name} "
|
|
||||||
f"FOR VALUES FROM ('{start_str}') TO ('{end_str}')"
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def downgrade() -> None:
|
|
||||||
"""Detach all weekly partition tables from the posts parent table."""
|
|
||||||
for year in range(PARTITION_START_YEAR, PARTITION_END_YEAR + 1):
|
|
||||||
for week in range(1, iso_weeks_in_year(year) + 1):
|
|
||||||
table_name = f"posts_{year}_{week:02d}"
|
|
||||||
op.execute(
|
|
||||||
f"ALTER TABLE {schema}.posts "
|
|
||||||
f"DETACH PARTITION {schema}.{table_name}"
|
|
||||||
)
|
|
||||||
@@ -3,7 +3,6 @@
|
|||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
import re
|
|
||||||
import sys
|
import sys
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import TYPE_CHECKING, Any, Literal
|
from typing import TYPE_CHECKING, Any, Literal
|
||||||
@@ -82,7 +81,6 @@ def include_name(
|
|||||||
|
|
||||||
"""
|
"""
|
||||||
if type_ == "schema":
|
if type_ == "schema":
|
||||||
# allows a database with multiple schemas to have separate alembic revisions
|
|
||||||
return name == target_metadata.schema
|
return name == target_metadata.schema
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
|||||||
@@ -1,3 +0,0 @@
|
|||||||
"""Data science CLI tools."""
|
|
||||||
|
|
||||||
from __future__ import annotations
|
|
||||||
@@ -1,104 +0,0 @@
|
|||||||
"""Utilities for converting Bluesky identifiers to numeric database IDs.
|
|
||||||
|
|
||||||
Handles DID-to-user_id hashing, TID-to-post_id decoding, and AT-URI parsing.
|
|
||||||
"""
|
|
||||||
|
|
||||||
from __future__ import annotations
|
|
||||||
|
|
||||||
import hashlib
|
|
||||||
|
|
||||||
TID_CHARSET = "234567abcdefghijklmnopqrstuvwxyz"
|
|
||||||
_TID_LENGTH = 13
|
|
||||||
_BIGINT_MASK = 0x7FFFFFFFFFFFFFFF
|
|
||||||
_AT_URI_SEGMENT_COUNT = 3
|
|
||||||
|
|
||||||
|
|
||||||
def did_to_user_id(did: str) -> int:
|
|
||||||
"""Convert a DID string to a deterministic 63-bit integer for user_id.
|
|
||||||
|
|
||||||
Uses SHA-256, truncated to 63 bits (positive signed BigInteger range).
|
|
||||||
Collision probability is negligible at Bluesky's scale (~tens of millions of users).
|
|
||||||
|
|
||||||
Args:
|
|
||||||
did: A Bluesky DID string, e.g. "did:plc:abc123".
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
A positive 63-bit integer suitable for BigInteger storage.
|
|
||||||
"""
|
|
||||||
digest = hashlib.sha256(did.encode()).digest()
|
|
||||||
return int.from_bytes(digest[:8], "big") & _BIGINT_MASK
|
|
||||||
|
|
||||||
|
|
||||||
def tid_to_integer(tid: str) -> int:
|
|
||||||
"""Decode a Bluesky TID (base32-sortbase) into a 64-bit integer for post_id.
|
|
||||||
|
|
||||||
TIDs are 13-character, base32-sortbase encoded identifiers that encode a
|
|
||||||
microsecond timestamp plus a clock ID. They are globally unique by construction.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
tid: A 13-character TID string, e.g. "3abc2defghijk".
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
A positive integer suitable for BigInteger storage.
|
|
||||||
|
|
||||||
Raises:
|
|
||||||
ValueError: If the TID is malformed (wrong length or invalid characters).
|
|
||||||
"""
|
|
||||||
if len(tid) != _TID_LENGTH:
|
|
||||||
message = f"TID must be {_TID_LENGTH} characters, got {len(tid)}: {tid!r}"
|
|
||||||
raise ValueError(message)
|
|
||||||
|
|
||||||
result = 0
|
|
||||||
for char in tid:
|
|
||||||
index = TID_CHARSET.find(char)
|
|
||||||
if index == -1:
|
|
||||||
message = f"Invalid character {char!r} in TID {tid!r}"
|
|
||||||
raise ValueError(message)
|
|
||||||
result = result * 32 + index
|
|
||||||
return result
|
|
||||||
|
|
||||||
|
|
||||||
def parse_at_uri(uri: str) -> tuple[str, str, str]:
|
|
||||||
"""Parse an AT-URI into its components.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
uri: An AT-URI string, e.g. "at://did:plc:abc123/app.bsky.feed.post/3abc2defghijk".
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
A tuple of (did, collection, rkey).
|
|
||||||
|
|
||||||
Raises:
|
|
||||||
ValueError: If the URI doesn't have the expected format.
|
|
||||||
"""
|
|
||||||
stripped = uri.removeprefix("at://")
|
|
||||||
parts = stripped.split("/", maxsplit=2)
|
|
||||||
if len(parts) != _AT_URI_SEGMENT_COUNT:
|
|
||||||
message = f"Expected {_AT_URI_SEGMENT_COUNT} path segments in AT-URI, got {len(parts)}: {uri!r}"
|
|
||||||
raise ValueError(message)
|
|
||||||
return parts[0], parts[1], parts[2]
|
|
||||||
|
|
||||||
|
|
||||||
def post_id_from_uri(uri: str) -> int:
|
|
||||||
"""Extract and decode the post_id (TID) from an AT-URI.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
uri: An AT-URI pointing to a post.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
The post_id as an integer.
|
|
||||||
"""
|
|
||||||
_did, _collection, rkey = parse_at_uri(uri)
|
|
||||||
return tid_to_integer(rkey)
|
|
||||||
|
|
||||||
|
|
||||||
def user_id_from_uri(uri: str) -> int:
|
|
||||||
"""Extract and hash the user_id (DID) from an AT-URI.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
uri: An AT-URI pointing to a post.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
The user_id as an integer.
|
|
||||||
"""
|
|
||||||
did, _collection, _rkey = parse_at_uri(uri)
|
|
||||||
return did_to_user_id(did)
|
|
||||||
@@ -1,143 +0,0 @@
|
|||||||
"""Transform Bluesky Jetstream messages into rows matching the Posts table schema."""
|
|
||||||
|
|
||||||
from __future__ import annotations
|
|
||||||
|
|
||||||
import json
|
|
||||||
import logging
|
|
||||||
from datetime import datetime
|
|
||||||
|
|
||||||
from python.data_science.bluesky_ids import (
|
|
||||||
did_to_user_id,
|
|
||||||
post_id_from_uri,
|
|
||||||
tid_to_integer,
|
|
||||||
user_id_from_uri,
|
|
||||||
)
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
INSTANCE = "bsky"
|
|
||||||
POST_COLLECTION = "app.bsky.feed.post"
|
|
||||||
EMBED_RECORD_TYPE = "app.bsky.embed.record"
|
|
||||||
EMBED_RECORD_WITH_MEDIA_TYPE = "app.bsky.embed.recordWithMedia"
|
|
||||||
|
|
||||||
|
|
||||||
def transform_jetstream_post(message: dict) -> dict:
    """Transform a Jetstream commit message into a dict matching Posts table columns.

    Expects a Jetstream message with kind=commit, operation=create,
    collection=app.bsky.feed.post.

    Args:
        message: The full Jetstream JSON message.

    Returns:
        A dict with keys matching the Posts table columns.
    """
    author_did = message["did"]
    commit_payload = message["commit"]
    post_record = commit_payload["record"]

    # Engagement counts start at zero; reference columns start as None and are
    # filled in below when the record carries reply/quote references.
    row: dict = {
        "post_id": tid_to_integer(commit_payload["rkey"]),
        "user_id": did_to_user_id(author_did),
        "instance": INSTANCE,
        "date": datetime.fromisoformat(post_record["createdAt"]),
        "text": post_record.get("text", ""),
        "langs": _extract_langs(post_record),
        "like_count": 0,
        "reply_count": 0,
        "repost_count": 0,
        "reply_to": None,
        "replied_author": None,
        "thread_root": None,
        "thread_root_author": None,
        "repost_from": None,
        "reposted_author": None,
        "quotes": None,
        "quoted_author": None,
        "labels": _extract_labels(post_record),
        "sent_label": None,
        "sent_score": None,
    }

    _extract_reply_refs(post_record, row)
    _extract_quote_refs(post_record, row)

    return row
|
|
||||||
|
|
||||||
|
|
||||||
def is_post_create(message: dict) -> bool:
    """Check if a Jetstream message is a post creation event.

    Args:
        message: The full Jetstream JSON message.

    Returns:
        True if this is a create commit for app.bsky.feed.post.
    """
    # Anything that is not a commit (identity/account events) is ignored.
    if message.get("kind") != "commit":
        return False
    commit = message.get("commit", {})
    is_create = commit.get("operation") == "create"
    return is_create and commit.get("collection") == POST_COLLECTION
|
|
||||||
|
|
||||||
|
|
||||||
def _extract_langs(record: dict) -> str | None:
|
|
||||||
"""Extract langs array as a JSON string, or None if absent."""
|
|
||||||
langs = record.get("langs")
|
|
||||||
if langs is None:
|
|
||||||
return None
|
|
||||||
return json.dumps(langs)
|
|
||||||
|
|
||||||
|
|
||||||
def _extract_labels(record: dict) -> str | None:
|
|
||||||
"""Extract self-labels as a JSON string, or None if absent."""
|
|
||||||
labels_obj = record.get("labels")
|
|
||||||
if labels_obj is None:
|
|
||||||
return None
|
|
||||||
values = labels_obj.get("values", [])
|
|
||||||
if not values:
|
|
||||||
return None
|
|
||||||
label_strings = [label.get("val", "") for label in values]
|
|
||||||
return json.dumps(label_strings)
|
|
||||||
|
|
||||||
|
|
||||||
def _extract_reply_refs(record: dict, row: dict) -> None:
    """Populate reply_to, replied_author, thread_root, thread_root_author from record.reply."""
    reply = record.get("reply")
    if reply is None:
        return

    # Parent (direct reply target) first, then thread root — same columns shape.
    for ref_key, id_column, author_column in (
        ("parent", "reply_to", "replied_author"),
        ("root", "thread_root", "thread_root_author"),
    ):
        uri = reply.get(ref_key, {}).get("uri")
        if uri:
            row[id_column] = post_id_from_uri(uri)
            row[author_column] = user_id_from_uri(uri)
|
|
||||||
|
|
||||||
|
|
||||||
def _extract_quote_refs(record: dict, row: dict) -> None:
    """Populate quotes and quoted_author from embed record references."""
    embed = record.get("embed")
    if embed is None:
        return

    embed_type = embed.get("$type", "")
    if embed_type == EMBED_RECORD_TYPE:
        record_ref = embed.get("record", {})
    elif embed_type == EMBED_RECORD_WITH_MEDIA_TYPE:
        # recordWithMedia nests the quoted record one level deeper.
        record_ref = embed.get("record", {}).get("record", {})
    else:
        return
    _set_quote_from_record(record_ref, row)
|
|
||||||
|
|
||||||
|
|
||||||
def _set_quote_from_record(record_ref: dict, row: dict) -> None:
    """Set quotes and quoted_author from a record reference object."""
    uri = record_ref.get("uri")
    # Only post references count as quotes; other record types are ignored.
    if not uri or POST_COLLECTION not in uri:
        return
    row["quotes"] = post_id_from_uri(uri)
    row["quoted_author"] = user_id_from_uri(uri)
|
|
||||||
@@ -1,203 +0,0 @@
|
|||||||
"""Kafka consumer that ingests Bluesky posts into the partitioned Posts table.
|
|
||||||
|
|
||||||
Consumes Jetstream messages from Kafka, transforms them into Posts rows,
|
|
||||||
and batch-inserts them into PostgreSQL with manual offset commits.
|
|
||||||
|
|
||||||
Usage:
|
|
||||||
firehose-consumer
|
|
||||||
firehose-consumer --kafka-servers kafka:9092 --batch-size 500
|
|
||||||
"""
|
|
||||||
|
|
||||||
from __future__ import annotations
|
|
||||||
|
|
||||||
import json
|
|
||||||
import logging
|
|
||||||
import signal
|
|
||||||
from os import getenv
|
|
||||||
from threading import Event
|
|
||||||
from typing import Annotated
|
|
||||||
|
|
||||||
import typer
|
|
||||||
from confluent_kafka import Consumer, KafkaError, KafkaException
|
|
||||||
from sqlalchemy.orm import Session
|
|
||||||
|
|
||||||
from python.data_science.bluesky_transform import is_post_create, transform_jetstream_post
|
|
||||||
from python.data_science.ingest_posts import ingest_batch
|
|
||||||
from python.orm.common import get_postgres_engine
|
|
||||||
from python.orm.data_science_dev.posts.failed_ingestion import FailedIngestion
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
DEFAULT_TOPIC = "bluesky.firehose.posts"
|
|
||||||
DEFAULT_KAFKA_SERVERS = "localhost:9092"
|
|
||||||
DEFAULT_GROUP_ID = "bluesky-posts-ingestor"
|
|
||||||
DEFAULT_BATCH_SIZE = 500
|
|
||||||
POLL_TIMEOUT_SECONDS = 5.0
|
|
||||||
|
|
||||||
shutdown_event = Event()
|
|
||||||
|
|
||||||
app = typer.Typer(help="Consume Bluesky posts from Kafka and ingest into PostgreSQL.")
|
|
||||||
|
|
||||||
|
|
||||||
@app.command()
def main(
    kafka_servers: Annotated[str, typer.Option(help="Kafka bootstrap servers")] = "",
    topic: Annotated[str, typer.Option(help="Kafka topic to consume from")] = "",
    group_id: Annotated[str, typer.Option(help="Kafka consumer group ID")] = "",
    batch_size: Annotated[int, typer.Option(help="Messages per DB insert batch")] = DEFAULT_BATCH_SIZE,
) -> None:
    """Consume Bluesky posts from Kafka and ingest into the partitioned posts table."""
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(message)s",
        datefmt="%H:%M:%S",
    )

    # CLI options take precedence, then environment variables, then defaults.
    servers = kafka_servers or getenv("KAFKA_BOOTSTRAP_SERVERS", DEFAULT_KAFKA_SERVERS)
    topic_name = topic or getenv("BLUESKY_FIREHOSE_TOPIC", DEFAULT_TOPIC)
    group = group_id or getenv("KAFKA_GROUP_ID", DEFAULT_GROUP_ID)

    for shutdown_signal in (signal.SIGTERM, signal.SIGINT):
        signal.signal(shutdown_signal, _handle_shutdown)

    consumer = _create_consumer(servers, group)
    consumer.subscribe([topic_name])

    engine = get_postgres_engine(name="DATA_SCIENCE_DEV")
    total_inserted = 0

    logger.info("Starting firehose consumer: topic=%s group=%s batch_size=%d", topic_name, group, batch_size)

    try:
        with Session(engine) as session:
            # Loop until a shutdown signal flips the event.
            while not shutdown_event.is_set():
                inserted = _consume_batch(consumer, session, batch_size)
                total_inserted += inserted
                if inserted:
                    logger.info("Batch inserted %d rows (total: %d)", inserted, total_inserted)
    except KafkaException:
        logger.exception("Fatal Kafka error")
    finally:
        # Always close so the group coordinator rebalances promptly.
        logger.info("Closing consumer (total inserted: %d)", total_inserted)
        consumer.close()
|
|
||||||
|
|
||||||
|
|
||||||
def _consume_batch(consumer: Consumer, session: Session, batch_size: int) -> int:
    """Poll a batch of messages, transform, and insert into the database.

    Args:
        consumer: The Kafka consumer instance.
        session: SQLAlchemy database session.
        batch_size: Maximum number of messages to consume per batch.

    Returns:
        Number of rows successfully inserted.
    """
    messages = consumer.consume(num_messages=batch_size, timeout=POLL_TIMEOUT_SECONDS)
    if not messages:
        return 0

    rows: list[dict] = []
    for message in messages:
        error = message.error()
        if error is not None:
            # End-of-partition is informational, not a failure; skip quietly.
            if error.code() != KafkaError._PARTITION_EOF:  # noqa: SLF001 — confluent-kafka exposes this as a pseudo-private constant; no public alternative exists
                logger.error("Consumer error: %s", error)
            continue
        row = _safe_transform(message.value(), session)
        if row is not None:
            rows.append(row)

    inserted = ingest_batch(session, rows) if rows else 0
    # Commit offsets even when nothing was inserted so bad/empty batches
    # are not replayed forever.
    consumer.commit(asynchronous=False)
    return inserted
|
|
||||||
|
|
||||||
|
|
||||||
def _safe_transform(raw_value: bytes | None, session: Session) -> dict | None:
|
|
||||||
"""Transform a Kafka message value into a Posts row, logging failures.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
raw_value: Raw message bytes from Kafka.
|
|
||||||
session: SQLAlchemy session for logging failures.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
A transformed row dict, or None if transformation failed.
|
|
||||||
"""
|
|
||||||
if raw_value is None:
|
|
||||||
return None
|
|
||||||
|
|
||||||
try:
|
|
||||||
message = json.loads(raw_value)
|
|
||||||
except (json.JSONDecodeError, UnicodeDecodeError):
|
|
||||||
logger.exception("Failed to decode Kafka message")
|
|
||||||
_log_failed_ingestion(session, raw_value, "JSON decode error")
|
|
||||||
return None
|
|
||||||
|
|
||||||
if not is_post_create(message):
|
|
||||||
return None
|
|
||||||
|
|
||||||
try:
|
|
||||||
return transform_jetstream_post(message)
|
|
||||||
except (KeyError, ValueError, TypeError):
|
|
||||||
logger.exception("Failed to transform Jetstream message")
|
|
||||||
_log_failed_ingestion(session, raw_value, "Transform error")
|
|
||||||
return None
|
|
||||||
|
|
||||||
|
|
||||||
def _log_failed_ingestion(session: Session, raw_value: bytes, error: str) -> None:
    """Log a failed ingestion to the FailedIngestion table.

    Best-effort: a failure to persist the failure record must never take
    down the consumer, so any exception is rolled back and logged.

    Args:
        session: SQLAlchemy session.
        raw_value: The raw message bytes.
        error: Description of the error.
    """
    try:
        failure = FailedIngestion(
            # Replace undecodable bytes and cap the stored line at 10k chars.
            raw_line=raw_value.decode(errors="replace")[:10000],
            error=error,
        )
        session.add(failure)
        session.commit()
    except Exception:
        session.rollback()
        logger.exception("Failed to log ingestion failure")
|
|
||||||
|
|
||||||
|
|
||||||
def _create_consumer(servers: str, group: str) -> Consumer:
    """Create a configured Kafka consumer.

    Args:
        servers: Kafka bootstrap servers string.
        group: Consumer group ID.

    Returns:
        A configured confluent_kafka.Consumer.
    """
    return Consumer(
        {
            "bootstrap.servers": servers,
            "group.id": group,
            # New groups start from the earliest offset; offsets are committed
            # manually after each successful batch insert.
            "auto.offset.reset": "earliest",
            "enable.auto.commit": False,
            "max.poll.interval.ms": 300000,
            "fetch.min.bytes": 1024,
            "session.timeout.ms": 30000,
        }
    )
|
|
||||||
|
|
||||||
|
|
||||||
def _handle_shutdown(_signum: int, _frame: object) -> None:
    """Signal handler to trigger graceful shutdown."""
    logger.info("Shutdown signal received")
    # The main loop polls this event and exits at the next batch boundary.
    shutdown_event.set()


if __name__ == "__main__":
    app()
|
|
||||||
@@ -1,230 +0,0 @@
|
|||||||
"""Bluesky Jetstream firehose to Kafka producer.
|
|
||||||
|
|
||||||
Connects to the Bluesky Jetstream WebSocket API with zstd compression,
|
|
||||||
filters for post creation events, and produces them to a Kafka topic.
|
|
||||||
|
|
||||||
Usage:
|
|
||||||
firehose-producer
|
|
||||||
firehose-producer --kafka-servers kafka:9092 --topic bluesky.firehose.posts
|
|
||||||
"""
|
|
||||||
|
|
||||||
from __future__ import annotations
|
|
||||||
|
|
||||||
import json
|
|
||||||
import logging
|
|
||||||
import signal
|
|
||||||
from os import getenv
|
|
||||||
from threading import Event
|
|
||||||
from typing import Annotated
|
|
||||||
|
|
||||||
import typer
|
|
||||||
from compression import zstd
|
|
||||||
from confluent_kafka import KafkaError, KafkaException, Producer
|
|
||||||
from websockets.exceptions import ConnectionClosed
|
|
||||||
from websockets.sync.client import connect
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
JETSTREAM_URL = "wss://jetstream2.us-east.bsky.network/subscribe"
|
|
||||||
DEFAULT_TOPIC = "bluesky.firehose.posts"
|
|
||||||
DEFAULT_KAFKA_SERVERS = "localhost:9092"
|
|
||||||
POLL_INTERVAL = 100
|
|
||||||
POST_COLLECTION = "app.bsky.feed.post"
|
|
||||||
|
|
||||||
shutdown_event = Event()
|
|
||||||
|
|
||||||
app = typer.Typer(help="Stream Bluesky firehose posts into Kafka.")
|
|
||||||
|
|
||||||
|
|
||||||
@app.command()
def main(
    kafka_servers: Annotated[str, typer.Option(help="Kafka bootstrap servers")] = "",
    topic: Annotated[str, typer.Option(help="Kafka topic to produce to")] = "",
    collections: Annotated[str, typer.Option(help="Comma-separated collections to subscribe to")] = POST_COLLECTION,
) -> None:
    """Connect to Bluesky Jetstream and produce post events to Kafka."""
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(message)s",
        datefmt="%H:%M:%S",
    )

    # CLI options take precedence, then environment variables, then defaults.
    servers = kafka_servers or getenv("KAFKA_BOOTSTRAP_SERVERS", DEFAULT_KAFKA_SERVERS)
    topic_name = topic or getenv("BLUESKY_FIREHOSE_TOPIC", DEFAULT_TOPIC)

    for shutdown_signal in (signal.SIGTERM, signal.SIGINT):
        signal.signal(shutdown_signal, _handle_shutdown)

    producer = _create_producer(servers)
    cursor: int | None = None

    logger.info("Starting firehose producer → %s on %s", topic_name, servers)

    # Outer reconnect loop: each _stream_loop call runs until the socket
    # drops or Kafka errors, then we resume from the last cursor.
    while not shutdown_event.is_set():
        try:
            cursor = _stream_loop(producer, topic_name, collections, cursor)
        except (ConnectionClosed, OSError):
            logger.exception("WebSocket disconnected, reconnecting")
        except KafkaException:
            logger.exception("Kafka error, reconnecting")

        if not shutdown_event.is_set():
            logger.info("Reconnecting in 5 seconds (cursor=%s)", cursor)
            shutdown_event.wait(timeout=5)

    logger.info("Shutting down, flushing producer")
    producer.flush(timeout=30)
    logger.info("Producer shutdown complete")
|
|
||||||
|
|
||||||
|
|
||||||
def _stream_loop(
    producer: Producer,
    topic: str,
    collections: str,
    cursor: int | None,
) -> int | None:
    """Connect to Jetstream and stream messages to Kafka until disconnected.

    Args:
        producer: The Kafka producer instance.
        topic: Kafka topic name.
        collections: Comma-separated AT Protocol collections to subscribe to.
        cursor: Optional microsecond timestamp to resume from.

    Returns:
        The last processed time_us cursor value.
    """
    url = _build_jetstream_url(collections, cursor)
    logger.info("Connecting to %s", url)

    message_count = 0
    last_cursor = cursor

    def _send(key: bytes, payload: str | bytes) -> None:
        # If the local queue is full, drain in-flight messages once and retry.
        value = payload.encode() if isinstance(payload, str) else payload
        try:
            producer.produce(topic, key=key, value=value, callback=_delivery_callback)
        except BufferError:
            logger.warning("Producer buffer full, flushing")
            producer.flush(timeout=10)
            producer.produce(topic, key=key, value=value, callback=_delivery_callback)

    with connect(url, additional_headers={"Accept-Encoding": "zstd"}) as websocket:
        logger.info("Connected to Jetstream")

        while not shutdown_event.is_set():
            try:
                raw_frame = websocket.recv(timeout=10)
            except TimeoutError:
                # Quiet stream: keep servicing delivery callbacks while waiting.
                producer.poll(0)
                continue

            text = _decode_frame(raw_frame)
            message = json.loads(text)

            # Track the cursor for every message so reconnects resume precisely,
            # even when the message itself is filtered out below.
            time_us = message.get("time_us")
            if time_us is not None:
                last_cursor = time_us

            if not _is_post_create(message):
                continue

            _send(message.get("did", "").encode(), text)

            message_count += 1
            if message_count % POLL_INTERVAL == 0:
                producer.poll(0)
            if message_count % 10000 == 0:
                logger.info("Produced %d messages (cursor=%s)", message_count, last_cursor)

    return last_cursor
|
|
||||||
|
|
||||||
|
|
||||||
def _build_jetstream_url(collections: str, cursor: int | None) -> str:
    """Build the Jetstream WebSocket URL with query parameters.

    Args:
        collections: Comma-separated collection names.
        cursor: Optional microsecond timestamp for resumption.

    Returns:
        The full WebSocket URL.
    """
    params = ["compress=true"]
    # One wantedCollections parameter per non-empty collection name.
    params.extend(
        f"wantedCollections={collection.strip()}"
        for collection in collections.split(",")
        if collection.strip()
    )
    if cursor is not None:
        params.append(f"cursor={cursor}")
    return f"{JETSTREAM_URL}?{'&'.join(params)}"
|
|
||||||
|
|
||||||
|
|
||||||
def _decode_frame(frame: str | bytes) -> str:
|
|
||||||
"""Decode a WebSocket frame, decompressing zstd if binary.
|
|
||||||
|
|
||||||
Jetstream with compress=true sends zstd-compressed binary frames.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
frame: Raw WebSocket frame data.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
The decoded JSON string.
|
|
||||||
"""
|
|
||||||
if isinstance(frame, bytes):
|
|
||||||
return zstd.decompress(frame).decode()
|
|
||||||
return frame
|
|
||||||
|
|
||||||
|
|
||||||
def _is_post_create(message: dict) -> bool:
    """Check if a Jetstream message is a post creation commit."""
    if message.get("kind") != "commit":
        return False
    commit = message.get("commit", {})
    is_create = commit.get("operation") == "create"
    return is_create and commit.get("collection") == POST_COLLECTION
|
|
||||||
|
|
||||||
|
|
||||||
def _create_producer(servers: str) -> Producer:
    """Create a configured Kafka producer.

    Args:
        servers: Kafka bootstrap servers string.

    Returns:
        A configured confluent_kafka.Producer.
    """
    # Favor throughput (batching + zstd compression) while keeping durable
    # delivery: acks=all with a bounded retry budget.
    return Producer(
        {
            "bootstrap.servers": servers,
            "linger.ms": 50,
            "batch.size": 65536,
            "compression.type": "zstd",
            "acks": "all",
            "retries": 5,
            "retry.backoff.ms": 500,
        }
    )
def _delivery_callback(error: KafkaError | None, _message: object) -> None:
|
|
||||||
"""Log delivery failures from the Kafka producer."""
|
|
||||||
if error is not None:
|
|
||||||
logger.error("Kafka delivery failed: %s", error)
|
|
||||||
|
|
||||||
|
|
||||||
def _handle_shutdown(_signum: int, _frame: object) -> None:
    """Signal handler: flag the main loop to stop gracefully."""
    # Setting the event lets the produce loop exit at a safe point instead
    # of being killed mid-message.
    logger.info("Shutdown signal received")
    shutdown_event.set()
if __name__ == "__main__":  # pragma: no cover - CLI entry point
    app()
@@ -1,247 +0,0 @@
|
|||||||
"""Ingestion pipeline for loading JSONL post files into the weekly-partitioned posts table.
|
|
||||||
|
|
||||||
Usage:
|
|
||||||
ingest-posts /path/to/files/
|
|
||||||
ingest-posts /path/to/single_file.jsonl
|
|
||||||
ingest-posts /data/dir/ --workers 4 --batch-size 5000
|
|
||||||
"""
|
|
||||||
|
|
||||||
from __future__ import annotations
|
|
||||||
|
|
||||||
import logging
|
|
||||||
from datetime import UTC, datetime
|
|
||||||
from pathlib import Path # noqa: TC003 this is needed for typer
|
|
||||||
from typing import TYPE_CHECKING, Annotated
|
|
||||||
|
|
||||||
import orjson
|
|
||||||
import psycopg
|
|
||||||
import typer
|
|
||||||
|
|
||||||
from python.common import configure_logger
|
|
||||||
from python.orm.common import get_connection_info
|
|
||||||
from python.parallelize import parallelize_process
|
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
|
||||||
from collections.abc import Iterator
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
|
|
||||||
app = typer.Typer(help="Ingest JSONL post files into the partitioned posts table.")
|
|
||||||
|
|
||||||
|
|
||||||
@app.command()
def main(
    path: Annotated[Path, typer.Argument(help="Directory containing JSONL files, or a single JSONL file")],
    batch_size: Annotated[int, typer.Option(help="Rows per INSERT batch")] = 10000,
    workers: Annotated[int, typer.Option(help="Parallel workers for multi-file ingestion")] = 4,
    pattern: Annotated[str, typer.Option(help="Glob pattern for JSONL files")] = "*.jsonl",
) -> None:
    """Ingest JSONL post files into the weekly-partitioned posts table."""
    configure_logger(level="INFO")

    logger.info("starting ingest-posts")
    logger.info("path=%s batch_size=%d workers=%d pattern=%s", path, batch_size, workers, pattern)

    # A path is either a directory (fan out over files), a single file, or
    # missing entirely (exit with an error).
    if path.is_dir():
        ingest_directory(path, batch_size=batch_size, max_workers=workers, pattern=pattern)
    elif path.is_file():
        ingest_file(path, batch_size=batch_size)
    else:
        typer.echo(f"Path does not exist: {path}", err=True)
        raise typer.Exit(code=1)

    logger.info("ingest-posts done")
def ingest_directory(
    directory: Path,
    *,
    batch_size: int,
    max_workers: int,
    pattern: str = "*.jsonl",
) -> None:
    """Ingest every JSONL file under *directory* using parallel workers.

    Args:
        directory: Directory to scan for JSONL files.
        batch_size: Rows per INSERT batch, forwarded to ingest_file.
        max_workers: Number of parallel worker processes.
        pattern: Glob pattern selecting the files to ingest.
    """
    # Sort for a deterministic ingestion order across runs.
    matched = sorted(directory.glob(pattern))
    if not matched:
        logger.warning("No JSONL files found in %s", directory)
        return

    logger.info("Found %d JSONL files to ingest", len(matched))

    parallelize_process(
        ingest_file,
        [{"path": file_path, "batch_size": batch_size} for file_path in matched],
        max_workers=max_workers,
    )
# Target schema for both the posts table and the failed_ingestion table.
SCHEMA = "main"

# Column order shared by the COPY into staging and the INSERT into posts.
COLUMNS = (
    "post_id",
    "user_id",
    "instance",
    "date",
    "text",
    "langs",
    "like_count",
    "reply_count",
    "repost_count",
    "reply_to",
    "replied_author",
    "thread_root",
    "thread_root_author",
    "repost_from",
    "reposted_author",
    "quotes",
    "quoted_author",
    "labels",
    "sent_label",
    "sent_score",
)

_COLUMN_LIST = ", ".join(COLUMNS)

INSERT_FROM_STAGING = f"""
INSERT INTO {SCHEMA}.posts ({_COLUMN_LIST})
SELECT {_COLUMN_LIST} FROM pg_temp.staging
ON CONFLICT (post_id, date) DO NOTHING
"""  # noqa: S608

FAILED_INSERT = f"""
INSERT INTO {SCHEMA}.failed_ingestion (raw_line, error)
VALUES (%(raw_line)s, %(error)s)
"""  # noqa: S608
def get_psycopg_connection() -> psycopg.Connection:
    """Create a raw psycopg3 connection from environment variables."""
    database, host, port, username, password = get_connection_info("DATA_SCIENCE_DEV")
    connection_kwargs = {
        "dbname": database,
        "host": host,
        "port": int(port),
        "user": username,
        "password": password,
        # Explicit transactions: batch commits are driven by the caller.
        "autocommit": False,
    }
    return psycopg.connect(**connection_kwargs)
def ingest_file(path: Path, *, batch_size: int) -> None:
    """Ingest a single JSONL file into the posts table.

    Malformed lines collected while streaming are recorded in the
    failed_ingestion table at the end of the run.
    """
    # Log roughly every 100k rows regardless of the configured batch size.
    log_every = max(100_000 // batch_size, 1)
    failed_lines: list[dict] = []
    try:
        with get_psycopg_connection() as connection:
            batch_number = 0
            for batch in read_jsonl_batches(path, batch_size, failed_lines):
                batch_number += 1
                ingest_batch(connection, batch)
                if batch_number % log_every == 0:
                    logger.info(
                        "Ingested %d batches (%d rows) from %s",
                        batch_number,
                        batch_number * batch_size,
                        path,
                    )

            if failed_lines:
                logger.warning("Recording %d malformed lines from %s", len(failed_lines), path.name)
                with connection.cursor() as cursor:
                    cursor.executemany(FAILED_INSERT, failed_lines)
                connection.commit()
    except Exception:
        logger.exception("Failed to ingest file: %s", path)
        raise
def ingest_batch(connection: psycopg.Connection, batch: list[dict]) -> None:
    """COPY batch into a temp staging table, then INSERT ... ON CONFLICT into posts.

    On failure the batch is bisected and retried so that a single bad row
    cannot sink the whole batch; the bad row ends up in failed_ingestion.
    """
    if not batch:
        return

    try:
        with connection.cursor() as cursor:
            # Staging mirrors the posts table; ON COMMIT DELETE ROWS keeps it
            # empty between transactions.
            cursor.execute(f"""
            CREATE TEMP TABLE IF NOT EXISTS staging
            (LIKE {SCHEMA}.posts INCLUDING DEFAULTS)
            ON COMMIT DELETE ROWS
            """)
            cursor.execute("TRUNCATE pg_temp.staging")

            copy_statement = f"COPY pg_temp.staging ({', '.join(COLUMNS)}) FROM STDIN"
            with cursor.copy(copy_statement) as copy:
                for row in batch:
                    copy.write_row(tuple(row.get(column) for column in COLUMNS))

            cursor.execute(INSERT_FROM_STAGING)
        connection.commit()
    except Exception as error:
        connection.rollback()

        if len(batch) > 1:
            # Bisect and retry each half to isolate the offending row(s).
            midpoint = len(batch) // 2
            ingest_batch(connection, batch[:midpoint])
            ingest_batch(connection, batch[midpoint:])
            return

        # Single row: record it in failed_ingestion and move on.
        logger.exception("Skipping bad row post_id=%s", batch[0].get("post_id"))
        with connection.cursor() as cursor:
            cursor.execute(
                FAILED_INSERT,
                {
                    "raw_line": orjson.dumps(batch[0], default=str).decode(),
                    "error": str(error),
                },
            )
        connection.commit()
def read_jsonl_batches(file_path: Path, batch_size: int, failed_lines: list[dict]) -> Iterator[list[dict]]:
    """Stream a JSONL file and yield batches of transformed rows.

    Args:
        file_path: JSONL file to read.
        batch_size: Minimum number of rows per yielded batch.
        failed_lines: Accumulator for lines that fail to parse.

    Yields:
        Lists of row dicts ready for ingest_batch.
    """
    pending: list[dict] = []
    with file_path.open("r", encoding="utf-8") as handle:
        for raw_line in handle:
            stripped = raw_line.strip()
            if not stripped:
                continue  # tolerate blank lines between records
            # A single physical line may contain several concatenated objects.
            pending.extend(parse_line(stripped, file_path, failed_lines))
            if len(pending) >= batch_size:
                yield pending
                pending = []
    if pending:
        yield pending
def parse_line(line: str, file_path: Path, failed_lines: list[dict]) -> Iterator[dict]:
    """Parse a JSONL line, handling concatenated JSON objects.

    Lines that fail to parse are logged and appended to *failed_lines*
    rather than raised.
    """
    try:
        yield transform_row(orjson.loads(line))
    except orjson.JSONDecodeError:
        # Some lines contain several JSON objects glued together ("}{");
        # anything else that fails to decode is recorded as malformed.
        if "}{" not in line:
            logger.warning("Skipping malformed line in %s: %s", file_path.name, line[:120])
            failed_lines.append({"raw_line": line, "error": "malformed JSON"})
            return
        pieces = line.replace("}{", "}\n{").split("\n")
        for piece in pieces:
            try:
                yield transform_row(orjson.loads(piece))
            except (orjson.JSONDecodeError, KeyError, ValueError) as error:
                logger.warning("Skipping malformed fragment in %s: %s", file_path.name, piece[:120])
                failed_lines.append({"raw_line": piece, "error": str(error)})
    except Exception as error:
        # transform_row can raise on missing/invalid fields.
        logger.exception("Skipping bad row in %s: %s", file_path.name, line[:120])
        failed_lines.append({"raw_line": line, "error": str(error)})
def transform_row(raw: dict) -> dict:
    """Transform a raw JSONL row (in place) to match the Posts table columns."""
    raw["date"] = parse_date(raw["date"])
    if raw.get("langs") is not None:
        # NOTE(review): orjson.dumps returns *bytes*; confirm the driver is
        # expected to receive bytes for the text `langs` column.
        raw["langs"] = orjson.dumps(raw["langs"])
    if raw.get("text") is not None:
        # NUL bytes are stripped — presumably because the target text column
        # rejects them; confirm against the database.
        raw["text"] = raw["text"].replace("\x00", "")
    return raw
def parse_date(raw_date: int) -> datetime:
|
|
||||||
"""Parse compact YYYYMMDDHHmm integer into a naive datetime (input is UTC by spec)."""
|
|
||||||
return datetime(
|
|
||||||
raw_date // 100000000,
|
|
||||||
(raw_date // 1000000) % 100,
|
|
||||||
(raw_date // 10000) % 100,
|
|
||||||
(raw_date // 100) % 100,
|
|
||||||
raw_date % 100,
|
|
||||||
tzinfo=UTC,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":  # pragma: no cover - CLI entry point
    app()
@@ -90,13 +90,6 @@ DATABASES: dict[str, DatabaseConfig] = {
|
|||||||
base_class_name="SignalBotBase",
|
base_class_name="SignalBotBase",
|
||||||
models_module="python.orm.signal_bot.models",
|
models_module="python.orm.signal_bot.models",
|
||||||
),
|
),
|
||||||
"data_science_dev": DatabaseConfig(
|
|
||||||
env_prefix="DATA_SCIENCE_DEV",
|
|
||||||
version_location="python/alembic/data_science_dev/versions",
|
|
||||||
base_module="python.orm.data_science_dev.base",
|
|
||||||
base_class_name="DataScienceDevBase",
|
|
||||||
models_module="python.orm.data_science_dev.models",
|
|
||||||
),
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -1,12 +1,10 @@
|
|||||||
"""ORM package exports."""
|
"""ORM package exports."""
|
||||||
|
|
||||||
from python.orm.data_science_dev.base import DataScienceDevBase
|
|
||||||
from python.orm.richie.base import RichieBase
|
from python.orm.richie.base import RichieBase
|
||||||
from python.orm.signal_bot.base import SignalBotBase
|
from python.orm.signal_bot.base import SignalBotBase
|
||||||
from python.orm.van_inventory.base import VanInventoryBase
|
from python.orm.van_inventory.base import VanInventoryBase
|
||||||
|
|
||||||
__all__ = [
|
__all__ = [
|
||||||
"DataScienceDevBase",
|
|
||||||
"RichieBase",
|
"RichieBase",
|
||||||
"SignalBotBase",
|
"SignalBotBase",
|
||||||
"VanInventoryBase",
|
"VanInventoryBase",
|
||||||
|
|||||||
@@ -1,11 +0,0 @@
|
|||||||
"""Data science dev database ORM exports."""
|
|
||||||
|
|
||||||
from __future__ import annotations
|
|
||||||
|
|
||||||
from python.orm.data_science_dev.base import DataScienceDevBase, DataScienceDevTableBase, DataScienceDevTableBaseBig
|
|
||||||
|
|
||||||
__all__ = [
|
|
||||||
"DataScienceDevBase",
|
|
||||||
"DataScienceDevTableBase",
|
|
||||||
"DataScienceDevTableBaseBig",
|
|
||||||
]
|
|
||||||
@@ -1,52 +0,0 @@
|
|||||||
"""Data science dev database ORM base."""
|
|
||||||
|
|
||||||
from __future__ import annotations
|
|
||||||
|
|
||||||
from datetime import datetime
|
|
||||||
|
|
||||||
from sqlalchemy import BigInteger, DateTime, MetaData, func
|
|
||||||
from sqlalchemy.ext.declarative import AbstractConcreteBase
|
|
||||||
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
|
|
||||||
|
|
||||||
from python.orm.common import NAMING_CONVENTION
|
|
||||||
|
|
||||||
|
|
||||||
class DataScienceDevBase(DeclarativeBase):
|
|
||||||
"""Base class for data_science_dev database ORM models."""
|
|
||||||
|
|
||||||
schema_name = "main"
|
|
||||||
|
|
||||||
metadata = MetaData(
|
|
||||||
schema=schema_name,
|
|
||||||
naming_convention=NAMING_CONVENTION,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
class _TableMixin:
|
|
||||||
"""Shared timestamp columns for all table bases."""
|
|
||||||
|
|
||||||
created: Mapped[datetime] = mapped_column(
|
|
||||||
DateTime(timezone=True),
|
|
||||||
server_default=func.now(),
|
|
||||||
)
|
|
||||||
updated: Mapped[datetime] = mapped_column(
|
|
||||||
DateTime(timezone=True),
|
|
||||||
server_default=func.now(),
|
|
||||||
onupdate=func.now(),
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
class DataScienceDevTableBase(_TableMixin, AbstractConcreteBase, DataScienceDevBase):
|
|
||||||
"""Table with Integer primary key."""
|
|
||||||
|
|
||||||
__abstract__ = True
|
|
||||||
|
|
||||||
id: Mapped[int] = mapped_column(primary_key=True)
|
|
||||||
|
|
||||||
|
|
||||||
class DataScienceDevTableBaseBig(_TableMixin, AbstractConcreteBase, DataScienceDevBase):
|
|
||||||
"""Table with BigInteger primary key."""
|
|
||||||
|
|
||||||
__abstract__ = True
|
|
||||||
|
|
||||||
id: Mapped[int] = mapped_column(BigInteger, primary_key=True)
|
|
||||||
@@ -1,10 +0,0 @@
|
|||||||
"""Data science dev database ORM models."""
|
|
||||||
|
|
||||||
from __future__ import annotations
|
|
||||||
|
|
||||||
from python.orm.data_science_dev.posts import partitions # noqa: F401 — registers partition classes in metadata
|
|
||||||
from python.orm.data_science_dev.posts.tables import Posts
|
|
||||||
|
|
||||||
__all__ = [
|
|
||||||
"Posts",
|
|
||||||
]
|
|
||||||
@@ -1,11 +0,0 @@
|
|||||||
"""Posts module — weekly-partitioned posts table and partition ORM models."""
|
|
||||||
|
|
||||||
from __future__ import annotations
|
|
||||||
|
|
||||||
from python.orm.data_science_dev.posts.failed_ingestion import FailedIngestion
|
|
||||||
from python.orm.data_science_dev.posts.tables import Posts
|
|
||||||
|
|
||||||
__all__ = [
|
|
||||||
"FailedIngestion",
|
|
||||||
"Posts",
|
|
||||||
]
|
|
||||||
@@ -1,33 +0,0 @@
|
|||||||
"""Shared column definitions for the posts partitioned table family."""
|
|
||||||
|
|
||||||
from __future__ import annotations
|
|
||||||
|
|
||||||
from datetime import datetime
|
|
||||||
|
|
||||||
from sqlalchemy import BigInteger, SmallInteger, Text
|
|
||||||
from sqlalchemy.orm import Mapped, mapped_column
|
|
||||||
|
|
||||||
|
|
||||||
class PostsColumns:
    """Mixin providing all posts columns. Used by both the parent table and partitions."""

    # Composite primary key (post_id, date) — `date` is also the range
    # partitioning key on the parent table.
    post_id: Mapped[int] = mapped_column(BigInteger, primary_key=True)
    user_id: Mapped[int] = mapped_column(BigInteger)
    instance: Mapped[str]
    date: Mapped[datetime] = mapped_column(primary_key=True)
    text: Mapped[str] = mapped_column(Text)
    langs: Mapped[str | None]
    # Engagement counters.
    like_count: Mapped[int]
    reply_count: Mapped[int]
    repost_count: Mapped[int]
    # Reply / thread linkage.
    reply_to: Mapped[int | None] = mapped_column(BigInteger)
    replied_author: Mapped[int | None] = mapped_column(BigInteger)
    thread_root: Mapped[int | None] = mapped_column(BigInteger)
    thread_root_author: Mapped[int | None] = mapped_column(BigInteger)
    # Repost / quote linkage.
    repost_from: Mapped[int | None] = mapped_column(BigInteger)
    reposted_author: Mapped[int | None] = mapped_column(BigInteger)
    quotes: Mapped[int | None] = mapped_column(BigInteger)
    quoted_author: Mapped[int | None] = mapped_column(BigInteger)
    labels: Mapped[str | None]
    # sent_label / sent_score — presumably sentiment model output; confirm
    # against the pipeline that populates them.
    sent_label: Mapped[int | None] = mapped_column(SmallInteger)
    sent_score: Mapped[float | None]
@@ -1,17 +0,0 @@
|
|||||||
"""Table for storing JSONL lines that failed during post ingestion."""
|
|
||||||
|
|
||||||
from __future__ import annotations
|
|
||||||
|
|
||||||
from sqlalchemy import Text
|
|
||||||
from sqlalchemy.orm import Mapped, mapped_column
|
|
||||||
|
|
||||||
from python.orm.data_science_dev.base import DataScienceDevTableBase
|
|
||||||
|
|
||||||
|
|
||||||
class FailedIngestion(DataScienceDevTableBase):
    """Stores raw JSONL lines and their error messages when ingestion fails."""

    __tablename__ = "failed_ingestion"

    # The offending input line, verbatim.
    raw_line: Mapped[str] = mapped_column(Text)
    # The exception text captured at ingestion time.
    error: Mapped[str] = mapped_column(Text)
@@ -1,71 +0,0 @@
|
|||||||
"""Dynamically generated ORM classes for each weekly partition of the posts table.
|
|
||||||
|
|
||||||
Each class maps to a PostgreSQL partition table (e.g. posts_2024_01).
|
|
||||||
These are real ORM models tracked by Alembic autogenerate.
|
|
||||||
|
|
||||||
Uses ISO week numbering (datetime.isocalendar().week). ISO years can have
|
|
||||||
52 or 53 weeks, and week boundaries are always Monday to Monday.
|
|
||||||
"""
|
|
||||||
|
|
||||||
from __future__ import annotations
|
|
||||||
|
|
||||||
import sys
|
|
||||||
from datetime import UTC, datetime
|
|
||||||
|
|
||||||
from python.orm.data_science_dev.base import DataScienceDevBase
|
|
||||||
from python.orm.data_science_dev.posts.columns import PostsColumns
|
|
||||||
|
|
||||||
# Inclusive range of ISO years to generate weekly partition classes for.
PARTITION_START_YEAR = 2023
PARTITION_END_YEAR = 2026

# This module object — generated partition classes are attached to it below.
_current_module = sys.modules[__name__]
def iso_weeks_in_year(year: int) -> int:
|
|
||||||
"""Return the number of ISO weeks in a given year (52 or 53)."""
|
|
||||||
dec_28 = datetime(year, 12, 28, tzinfo=UTC)
|
|
||||||
return dec_28.isocalendar().week
|
|
||||||
|
|
||||||
|
|
||||||
def week_bounds(year: int, week: int) -> tuple[datetime, datetime]:
|
|
||||||
"""Return (start, end) datetimes for an ISO week.
|
|
||||||
|
|
||||||
Start = Monday 00:00:00 UTC of the given ISO week.
|
|
||||||
End = Monday 00:00:00 UTC of the following ISO week.
|
|
||||||
"""
|
|
||||||
start = datetime.fromisocalendar(year, week, 1).replace(tzinfo=UTC)
|
|
||||||
if week < iso_weeks_in_year(year):
|
|
||||||
end = datetime.fromisocalendar(year, week + 1, 1).replace(tzinfo=UTC)
|
|
||||||
else:
|
|
||||||
end = datetime.fromisocalendar(year + 1, 1, 1).replace(tzinfo=UTC)
|
|
||||||
return start, end
|
|
||||||
|
|
||||||
|
|
||||||
def _build_partition_classes() -> dict[str, type]:
    """Generate one ORM class per ISO week partition.

    Returns:
        Mapping of generated class name (e.g. "PostsWeek2024W01") to class.
    """
    generated: dict[str, type] = {}

    for year in range(PARTITION_START_YEAR, PARTITION_END_YEAR + 1):
        for week in range(1, iso_weeks_in_year(year) + 1):
            class_name = f"PostsWeek{year}W{week:02d}"
            # implicit_returning=False is set on every partition table —
            # presumably to avoid RETURNING against partitions; confirm.
            generated[class_name] = type(
                class_name,
                (PostsColumns, DataScienceDevBase),
                {
                    "__tablename__": f"posts_{year}_{week:02d}",
                    "__table_args__": ({"implicit_returning": False},),
                },
            )

    return generated
# Generate all partition classes and expose each one as a module attribute
# so Alembic autogenerate (and normal imports) can see them.
_partition_classes = _build_partition_classes()
for _name, _cls in _partition_classes.items():
    setattr(_current_module, _name, _cls)
__all__ = list(_partition_classes)
@@ -1,13 +0,0 @@
|
|||||||
"""Posts parent table with PostgreSQL weekly range partitioning on date column."""
|
|
||||||
|
|
||||||
from __future__ import annotations
|
|
||||||
|
|
||||||
from python.orm.data_science_dev.base import DataScienceDevBase
|
|
||||||
from python.orm.data_science_dev.posts.columns import PostsColumns
|
|
||||||
|
|
||||||
|
|
||||||
class Posts(PostsColumns, DataScienceDevBase):
    """Parent partitioned table for posts, partitioned by week on `date`."""

    __tablename__ = "posts"
    # Range partitioning on the `date` column; the actual weekly partitions
    # are the generated classes in the `partitions` module.
    __table_args__ = ({"postgresql_partition_by": "RANGE (date)"},)
@@ -15,7 +15,6 @@ sudo zpool add storage -o ashift=12 logs mirror
|
|||||||
sudo zpool create scratch -o ashift=12 -O acltype=posixacl -O atime=off -O dnodesize=auto -O xattr=sa -O compression=zstd -O encryption=aes-256-gcm -O keyformat=hex -O keylocation=file:///key -m /zfs/scratch
|
sudo zpool create scratch -o ashift=12 -O acltype=posixacl -O atime=off -O dnodesize=auto -O xattr=sa -O compression=zstd -O encryption=aes-256-gcm -O keyformat=hex -O keylocation=file:///key -m /zfs/scratch
|
||||||
|
|
||||||
# media datasets
|
# media datasets
|
||||||
sudo zfs create media/temp -o sync=disabled -o redundant_metadata=none
|
|
||||||
sudo zfs create media/secure -o encryption=aes-256-gcm -o keyformat=hex -o keylocation=file:///root/zfs.key
|
sudo zfs create media/secure -o encryption=aes-256-gcm -o keyformat=hex -o keylocation=file:///root/zfs.key
|
||||||
sudo zfs create media/secure/docker -o compression=zstd-9
|
sudo zfs create media/secure/docker -o compression=zstd-9
|
||||||
sudo zfs create media/secure/github-runners -o compression=zstd-9 -o sync=disabled
|
sudo zfs create media/secure/github-runners -o compression=zstd-9 -o sync=disabled
|
||||||
@@ -27,7 +26,7 @@ sudo zfs create media/secure/share -o mountpoint=/zfs/media/share -o exec=off
|
|||||||
|
|
||||||
# scratch datasets
|
# scratch datasets
|
||||||
sudo zfs create scratch/kafka -o mountpoint=/zfs/scratch/kafka -o recordsize=1M
|
sudo zfs create scratch/kafka -o mountpoint=/zfs/scratch/kafka -o recordsize=1M
|
||||||
sudo zfs create scratch/transmission -o mountpoint=/zfs/scratch/transmission -o recordsize=16k -o sync=disabled -o redundant_metadata=none
|
sudo zfs create scratch/transmission -o mountpoint=/zfs/scratch/transmission -o recordsize=16k -o sync=disabled
|
||||||
|
|
||||||
# storage datasets
|
# storage datasets
|
||||||
sudo zfs create storage/ollama -o recordsize=1M -o compression=zstd-19 -o sync=disabled
|
sudo zfs create storage/ollama -o recordsize=1M -o compression=zstd-19 -o sync=disabled
|
||||||
|
|||||||
@@ -1,96 +0,0 @@
|
|||||||
{
  pkgs,
  inputs,
  ...
}:
let
  # Environment shared by the producer and consumer daemons.
  commonEnv = {
    PYTHONPATH = "${inputs.self}";
    KAFKA_BOOTSTRAP_SERVERS = "localhost:9092";
    BLUESKY_FIREHOSE_TOPIC = "bluesky.firehose.posts";
  };
  # Hardened long-running-service defaults shared by both daemons.
  commonServiceConfig = {
    Type = "simple";
    WorkingDirectory = "${inputs.self}";
    User = "richie";
    Group = "users";
    Restart = "on-failure";
    RestartSec = "10s";
    StandardOutput = "journal";
    StandardError = "journal";
    NoNewPrivileges = true;
    ProtectSystem = "strict";
    ProtectHome = "read-only";
    PrivateTmp = true;
    ReadOnlyPaths = [ "${inputs.self}" ];
  };
in
{
  # One-shot unit that creates the Kafka topic before either daemon starts.
  systemd.services.bluesky-firehose-topic-init = {
    description = "Create Kafka topic for Bluesky firehose";
    after = [ "apache-kafka.service" ];
    requires = [ "apache-kafka.service" ];
    wantedBy = [ "multi-user.target" ];

    serviceConfig = {
      Type = "oneshot";
      RemainAfterExit = true;
      ExecStart = pkgs.writeShellScript "create-bluesky-topic" ''
        ${pkgs.apacheKafka}/bin/kafka-topics.sh \
          --bootstrap-server localhost:9092 \
          --create \
          --if-not-exists \
          --topic bluesky.firehose.posts \
          --partitions 6 \
          --replication-factor 1
      '';
    };
  };

  systemd.services.bluesky-firehose-producer = {
    description = "Bluesky Jetstream to Kafka producer";
    after = [
      "network.target"
      "apache-kafka.service"
      "bluesky-firehose-topic-init.service"
    ];
    requires = [
      "apache-kafka.service"
      "bluesky-firehose-topic-init.service"
    ];
    wantedBy = [ "multi-user.target" ];

    environment = commonEnv;

    serviceConfig = commonServiceConfig // {
      ExecStart = "${pkgs.my_python}/bin/python -m python.data_science.firehose_producer";
    };
  };

  systemd.services.bluesky-firehose-consumer = {
    description = "Bluesky Kafka to PostgreSQL consumer";
    after = [
      "network.target"
      "apache-kafka.service"
      "bluesky-firehose-topic-init.service"
      "postgresql.service"
    ];
    requires = [
      "apache-kafka.service"
      "bluesky-firehose-topic-init.service"
      "postgresql.service"
    ];
    wantedBy = [ "multi-user.target" ];

    # The consumer additionally needs PostgreSQL connection settings.
    environment = commonEnv // {
      DATA_SCIENCE_DEV_DB = "data_science_dev";
      DATA_SCIENCE_DEV_USER = "richie";
      DATA_SCIENCE_DEV_HOST = "/run/postgresql";
      DATA_SCIENCE_DEV_PORT = "5432";
    };

    serviceConfig = commonServiceConfig // {
      ExecStart = "${pkgs.my_python}/bin/python -m python.data_science.firehose_consumer";
    };
  };
}
@@ -3,10 +3,17 @@ let
|
|||||||
in
|
in
|
||||||
{
|
{
|
||||||
services.apache-kafka = {
|
services.apache-kafka = {
|
||||||
enable = false;
|
enable = true;
|
||||||
settings = {
|
settings = {
|
||||||
listeners = [ "PLAINTEXT://localhost:9092" ];
|
listeners = [ "PLAINTEXT://localhost:9092" ];
|
||||||
"log.dirs" = [ vars.kafka ];
|
"log.dirs" = [ vars.kafka ];
|
||||||
|
"num.partitions" = 6;
|
||||||
|
"default.replication.factor" = 1;
|
||||||
|
"log.retention.hours" = 168;
|
||||||
|
"log.retention.bytes" = 10737418240;
|
||||||
|
"log.segment.bytes" = 1073741824;
|
||||||
|
"log.cleanup.policy" = "delete";
|
||||||
|
"auto.create.topics.enable" = false;
|
||||||
};
|
};
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -39,10 +39,6 @@ in
|
|||||||
host postgres math ::1/128 trust
|
host postgres math ::1/128 trust
|
||||||
host postgres math 192.168.90.1/24 trust
|
host postgres math 192.168.90.1/24 trust
|
||||||
|
|
||||||
local data_science_dev math trust
|
|
||||||
host data_science_dev math 127.0.0.1/32 trust
|
|
||||||
host data_science_dev math ::1/128 trust
|
|
||||||
host data_science_dev math 192.168.90.1/24 trust
|
|
||||||
'';
|
'';
|
||||||
|
|
||||||
identMap = ''
|
identMap = ''
|
||||||
@@ -114,7 +110,6 @@ in
|
|||||||
}
|
}
|
||||||
];
|
];
|
||||||
ensureDatabases = [
|
ensureDatabases = [
|
||||||
"data_science_dev"
|
|
||||||
"hass"
|
"hass"
|
||||||
"gitea"
|
"gitea"
|
||||||
"math"
|
"math"
|
||||||
|
|||||||
86
tools/line_counter.py
Normal file
86
tools/line_counter.py
Normal file
@@ -0,0 +1,86 @@
|
|||||||
|
"""Count lines of code in the repository, grouped by file type."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import subprocess
|
||||||
|
from collections import defaultdict
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
|
||||||
|
def get_tracked_files() -> list[str]:
    """Get all git-tracked files as repo-relative paths."""
    listing = subprocess.run(
        ["git", "ls-files"],  # noqa: S603, S607
        capture_output=True,
        text=True,
        check=True,
    )
    # Drop any empty entries from the newline-separated output.
    return [entry for entry in listing.stdout.strip().splitlines() if entry]
def count_lines(filepath: str) -> int:
    """Count lines in a file, returning 0 for binary or unreadable files."""
    try:
        text = Path(filepath).read_text(encoding="utf-8")
    except (UnicodeDecodeError, OSError):
        # Binary content or missing/unreadable file — count as zero lines.
        return 0
    return len(text.splitlines())
def count_lines_by_type() -> dict[str, int]:
    """Count lines grouped by file extension, largest first."""
    totals: dict[str, int] = defaultdict(int)
    for filepath in get_tracked_files():
        file = Path(filepath)
        # Fall back to the bare filename for extension-less files
        # (e.g. ".gitignore", "Gemfile").
        key = file.suffix.lstrip(".") or file.name
        totals[key] += count_lines(filepath)
    # Exclude binary/non-code files.
    for excluded in ("png", "lock"):
        totals.pop(excluded, None)
    return dict(sorted(totals.items(), key=lambda item: item[1], reverse=True))
def format_report() -> str:
    """Generate a formatted Markdown line-count report."""
    lines_by_type = count_lines_by_type()
    total = sum(lines_by_type.values())

    rows = [
        f"This repo has **{total:,}** lines of technical debt.",
        "",
        "| File Type | Lines | Percentage |",
        "|-----------|------:|-----------:|",
    ]
    for ext, count in lines_by_type.items():
        if count <= 0:
            continue
        share = count / total * 100
        # Display every type with a leading dot, even name-keyed entries.
        label = ext if ext.startswith(".") else f".{ext}"
        rows.append(f"| {label} | {count:,} | {share:.1f}% |")

    return "\n".join(rows)
def update_readme() -> None:
    """Update README.md with the line count report."""
    readme_path = Path("README.md")
    report = format_report()

    start_marker = "<!-- LINE-COUNT-START -->"
    end_marker = "<!-- LINE-COUNT-END -->"
    section = f"{start_marker}\n{report}\n{end_marker}"

    content = readme_path.read_text(encoding="utf-8")

    if start_marker in content:
        # Replace the existing marker-delimited section in place.
        head = content[: content.index(start_marker)]
        tail = content[content.index(end_marker) + len(end_marker):]
        updated = head + section + tail
    else:
        # First run: append the section to the end of the README.
        updated = content.rstrip() + "\n\n" + section + "\n"

    readme_path.write_text(updated, encoding="utf-8")
if __name__ == "__main__":  # pragma: no cover - CLI entry point
    update_readme()
Reference in New Issue
Block a user