diff --git a/overlays/default.nix b/overlays/default.nix
index 1bcdcd0..2b6c693 100644
--- a/overlays/default.nix
+++ b/overlays/default.nix
@@ -21,6 +21,7 @@
     alembic
     apprise
     apscheduler
+    confluent-kafka
     fastapi
     fastapi-cli
     httpx
diff --git a/python/data_science/bluesky_ids.py b/python/data_science/bluesky_ids.py
new file mode 100644
index 0000000..d7f4d4f
--- /dev/null
+++ b/python/data_science/bluesky_ids.py
@@ -0,0 +1,104 @@
+"""Utilities for converting Bluesky identifiers to numeric database IDs.
+
+Handles DID-to-user_id hashing, TID-to-post_id decoding, and AT-URI parsing.
+"""
+
+from __future__ import annotations
+
+import hashlib
+
+TID_CHARSET = "234567abcdefghijklmnopqrstuvwxyz"
+_TID_LENGTH = 13
+_BIGINT_MASK = 0x7FFFFFFFFFFFFFFF
+_AT_URI_SEGMENT_COUNT = 3
+
+
+def did_to_user_id(did: str) -> int:
+    """Convert a DID string to a deterministic 63-bit integer for user_id.
+
+    Uses SHA-256, truncated to 63 bits (positive signed BigInteger range).
+    Collision probability is negligible at Bluesky's scale (~tens of millions of users).
+
+    Args:
+        did: A Bluesky DID string, e.g. "did:plc:abc123".
+
+    Returns:
+        A positive 63-bit integer suitable for BigInteger storage.
+    """
+    digest = hashlib.sha256(did.encode()).digest()
+    return int.from_bytes(digest[:8], "big") & _BIGINT_MASK
+
+
+def tid_to_integer(tid: str) -> int:
+    """Decode a Bluesky TID (base32-sortable) into a 64-bit integer for post_id.
+
+    TIDs are 13-character, base32-sortable encoded identifiers that encode a
+    microsecond timestamp plus a clock ID. They are globally unique by construction.
+
+    Args:
+        tid: A 13-character TID string, e.g. "3abc2defghijk".
+
+    Returns:
+        A positive integer suitable for BigInteger storage.
+
+    Raises:
+        ValueError: If the TID is malformed (wrong length or invalid characters).
+    """
+    if len(tid) != _TID_LENGTH:
+        message = f"TID must be {_TID_LENGTH} characters, got {len(tid)}: {tid!r}"
+        raise ValueError(message)
+
+    result = 0
+    for char in tid:
+        index = TID_CHARSET.find(char)
+        if index == -1:
+            message = f"Invalid character {char!r} in TID {tid!r}"
+            raise ValueError(message)
+        result = result * 32 + index
+    return result
+
+
+def parse_at_uri(uri: str) -> tuple[str, str, str]:
+    """Parse an AT-URI into its components.
+
+    Args:
+        uri: An AT-URI string, e.g. "at://did:plc:abc123/app.bsky.feed.post/3abc2defghijk".
+
+    Returns:
+        A tuple of (did, collection, rkey).
+
+    Raises:
+        ValueError: If the URI doesn't have the expected format.
+    """
+    stripped = uri.removeprefix("at://")
+    parts = stripped.split("/", maxsplit=2)
+    if len(parts) != _AT_URI_SEGMENT_COUNT:
+        message = f"Expected {_AT_URI_SEGMENT_COUNT} path segments in AT-URI, got {len(parts)}: {uri!r}"
+        raise ValueError(message)
+    return parts[0], parts[1], parts[2]
+
+
+def post_id_from_uri(uri: str) -> int:
+    """Extract and decode the post_id (TID) from an AT-URI.
+
+    Args:
+        uri: An AT-URI pointing to a post.
+
+    Returns:
+        The post_id as an integer.
+    """
+    _did, _collection, rkey = parse_at_uri(uri)
+    return tid_to_integer(rkey)
+
+
+def user_id_from_uri(uri: str) -> int:
+    """Extract and hash the user_id (DID) from an AT-URI.
+
+    Args:
+        uri: An AT-URI pointing to a post.
+
+    Returns:
+        The user_id as an integer.
+    """
+    did, _collection, _rkey = parse_at_uri(uri)
+    return did_to_user_id(did)
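A quick illustration of how these helpers compose. The AT-URI below is made up; the two derived integers are whatever the hash and TID decode produce, so they are printed rather than asserted.

```python
from python.data_science.bluesky_ids import parse_at_uri, post_id_from_uri, user_id_from_uri

uri = "at://did:plc:abc123/app.bsky.feed.post/3jzfcijpj2z2a"

print(parse_at_uri(uri))       # ('did:plc:abc123', 'app.bsky.feed.post', '3jzfcijpj2z2a')
print(post_id_from_uri(uri))   # the rkey TID decoded to a positive 64-bit integer
print(user_id_from_uri(uri))   # SHA-256 of the DID, truncated to 63 bits
```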
diff --git a/python/data_science/bluesky_transform.py b/python/data_science/bluesky_transform.py
new file mode 100644
index 0000000..3c4e19b
--- /dev/null
+++ b/python/data_science/bluesky_transform.py
@@ -0,0 +1,143 @@
+"""Transform Bluesky Jetstream messages into rows matching the Posts table schema."""
+
+from __future__ import annotations
+
+import json
+import logging
+from datetime import datetime
+
+from python.data_science.bluesky_ids import (
+    did_to_user_id,
+    post_id_from_uri,
+    tid_to_integer,
+    user_id_from_uri,
+)
+
+logger = logging.getLogger(__name__)
+
+INSTANCE = "bsky"
+POST_COLLECTION = "app.bsky.feed.post"
+EMBED_RECORD_TYPE = "app.bsky.embed.record"
+EMBED_RECORD_WITH_MEDIA_TYPE = "app.bsky.embed.recordWithMedia"
+
+
+def transform_jetstream_post(message: dict) -> dict:
+    """Transform a Jetstream commit message into a dict matching Posts table columns.
+
+    Expects a Jetstream message with kind=commit, operation=create,
+    collection=app.bsky.feed.post.
+
+    Args:
+        message: The full Jetstream JSON message.
+
+    Returns:
+        A dict with keys matching the Posts table columns.
+    """
+    did = message["did"]
+    commit = message["commit"]
+    record = commit["record"]
+
+    row: dict = {
+        "post_id": tid_to_integer(commit["rkey"]),
+        "user_id": did_to_user_id(did),
+        "instance": INSTANCE,
+        "date": datetime.fromisoformat(record["createdAt"]),
+        "text": record.get("text", ""),
+        "langs": _extract_langs(record),
+        "like_count": 0,
+        "reply_count": 0,
+        "repost_count": 0,
+        "reply_to": None,
+        "replied_author": None,
+        "thread_root": None,
+        "thread_root_author": None,
+        "repost_from": None,
+        "reposted_author": None,
+        "quotes": None,
+        "quoted_author": None,
+        "labels": _extract_labels(record),
+        "sent_label": None,
+        "sent_score": None,
+    }
+
+    _extract_reply_refs(record, row)
+    _extract_quote_refs(record, row)
+
+    return row
+
+
+def is_post_create(message: dict) -> bool:
+    """Check if a Jetstream message is a post creation event.
+
+    Args:
+        message: The full Jetstream JSON message.
+
+    Returns:
+        True if this is a create commit for app.bsky.feed.post.
+    """
+    if message.get("kind") != "commit":
+        return False
+    commit = message.get("commit", {})
+    return commit.get("operation") == "create" and commit.get("collection") == POST_COLLECTION
+
+
+def _extract_langs(record: dict) -> str | None:
+    """Extract langs array as a JSON string, or None if absent."""
+    langs = record.get("langs")
+    if langs is None:
+        return None
+    return json.dumps(langs)
+
+
+def _extract_labels(record: dict) -> str | None:
+    """Extract self-labels as a JSON string, or None if absent."""
+    labels_obj = record.get("labels")
+    if labels_obj is None:
+        return None
+    values = labels_obj.get("values", [])
+    if not values:
+        return None
+    label_strings = [label.get("val", "") for label in values]
+    return json.dumps(label_strings)
+
+
+def _extract_reply_refs(record: dict, row: dict) -> None:
+    """Populate reply_to, replied_author, thread_root, thread_root_author from record.reply."""
+    reply = record.get("reply")
+    if reply is None:
+        return
+
+    parent = reply.get("parent", {})
+    parent_uri = parent.get("uri")
+    if parent_uri:
+        row["reply_to"] = post_id_from_uri(parent_uri)
+        row["replied_author"] = user_id_from_uri(parent_uri)
+
+    root = reply.get("root", {})
+    root_uri = root.get("uri")
+    if root_uri:
+        row["thread_root"] = post_id_from_uri(root_uri)
+        row["thread_root_author"] = user_id_from_uri(root_uri)
+
+
+def _extract_quote_refs(record: dict, row: dict) -> None:
+    """Populate quotes and quoted_author from embed record references."""
+    embed = record.get("embed")
+    if embed is None:
+        return
+
+    embed_type = embed.get("$type", "")
+
+    if embed_type == EMBED_RECORD_TYPE:
+        _set_quote_from_record(embed.get("record", {}), row)
+    elif embed_type == EMBED_RECORD_WITH_MEDIA_TYPE:
+        inner_record = embed.get("record", {}).get("record", {})
+        _set_quote_from_record(inner_record, row)
+
+
+def _set_quote_from_record(record_ref: dict, row: dict) -> None:
+    """Set quotes and quoted_author from a record reference object."""
+    uri = record_ref.get("uri")
+    if uri and POST_COLLECTION in uri:
+        row["quotes"] = post_id_from_uri(uri)
+        row["quoted_author"] = user_id_from_uri(uri)
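For reference, a minimal sketch of the Jetstream commit shape this module reads. The values are made up and real events carry more fields; only the keys used above matter here.

```python
from python.data_science.bluesky_transform import is_post_create, transform_jetstream_post

message = {
    "did": "did:plc:abc123",
    "kind": "commit",
    "commit": {
        "operation": "create",
        "collection": "app.bsky.feed.post",
        "rkey": "3jzfcijpj2z2a",
        "record": {
            "createdAt": "2025-01-01T12:00:00+00:00",
            "text": "hello world",
            "langs": ["en"],
        },
    },
}

assert is_post_create(message)
row = transform_jetstream_post(message)
# post_id/user_id come from bluesky_ids; counters start at 0; reply/quote columns stay None
```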
diff --git a/python/data_science/firehose_consumer.py b/python/data_science/firehose_consumer.py
new file mode 100644
index 0000000..7507d49
--- /dev/null
+++ b/python/data_science/firehose_consumer.py
@@ -0,0 +1,203 @@
+"""Kafka consumer that ingests Bluesky posts into the partitioned Posts table.
+
+Consumes Jetstream messages from Kafka, transforms them into Posts rows,
+and batch-inserts them into PostgreSQL with manual offset commits.
+
+Usage:
+    firehose-consumer
+    firehose-consumer --kafka-servers kafka:9092 --batch-size 500
+"""
+
+from __future__ import annotations
+
+import json
+import logging
+import signal
+from os import getenv
+from threading import Event
+from typing import Annotated
+
+import typer
+from confluent_kafka import Consumer, KafkaError, KafkaException
+from sqlalchemy.orm import Session
+
+from python.data_science.bluesky_transform import is_post_create, transform_jetstream_post
+from python.data_science.ingest_posts import ingest_batch
+from python.orm.common import get_postgres_engine
+from python.orm.data_science_dev.posts.failed_ingestion import FailedIngestion
+
+logger = logging.getLogger(__name__)
+
+DEFAULT_TOPIC = "bluesky.firehose.posts"
+DEFAULT_KAFKA_SERVERS = "localhost:9092"
+DEFAULT_GROUP_ID = "bluesky-posts-ingestor"
+DEFAULT_BATCH_SIZE = 500
+POLL_TIMEOUT_SECONDS = 5.0
+
+shutdown_event = Event()
+
+app = typer.Typer(help="Consume Bluesky posts from Kafka and ingest into PostgreSQL.")
+
+
+@app.command()
+def main(
+    kafka_servers: Annotated[str, typer.Option(help="Kafka bootstrap servers")] = "",
+    topic: Annotated[str, typer.Option(help="Kafka topic to consume from")] = "",
+    group_id: Annotated[str, typer.Option(help="Kafka consumer group ID")] = "",
+    batch_size: Annotated[int, typer.Option(help="Messages per DB insert batch")] = DEFAULT_BATCH_SIZE,
+) -> None:
+    """Consume Bluesky posts from Kafka and ingest into the partitioned posts table."""
+    logging.basicConfig(
+        level=logging.INFO,
+        format="%(asctime)s %(levelname)s %(message)s",
+        datefmt="%H:%M:%S",
+    )
+
+    servers = kafka_servers or getenv("KAFKA_BOOTSTRAP_SERVERS", DEFAULT_KAFKA_SERVERS)
+    topic_name = topic or getenv("BLUESKY_FIREHOSE_TOPIC", DEFAULT_TOPIC)
+    group = group_id or getenv("KAFKA_GROUP_ID", DEFAULT_GROUP_ID)
+
+    signal.signal(signal.SIGTERM, _handle_shutdown)
+    signal.signal(signal.SIGINT, _handle_shutdown)
+
+    consumer = _create_consumer(servers, group)
+    consumer.subscribe([topic_name])
+
+    engine = get_postgres_engine(name="DATA_SCIENCE_DEV")
+    total_inserted = 0
+
+    logger.info("Starting firehose consumer: topic=%s group=%s batch_size=%d", topic_name, group, batch_size)
+
+    try:
+        with Session(engine) as session:
+            while not shutdown_event.is_set():
+                inserted = _consume_batch(consumer, session, batch_size)
+                total_inserted += inserted
+                if inserted > 0:
+                    logger.info("Batch inserted %d rows (total: %d)", inserted, total_inserted)
+    except KafkaException:
+        logger.exception("Fatal Kafka error")
+    finally:
+        logger.info("Closing consumer (total inserted: %d)", total_inserted)
+        consumer.close()
+
+
+def _consume_batch(consumer: Consumer, session: Session, batch_size: int) -> int:
+    """Poll a batch of messages, transform, and insert into the database.
+
+    Args:
+        consumer: The Kafka consumer instance.
+        session: SQLAlchemy database session.
+        batch_size: Maximum number of messages to consume per batch.
+
+    Returns:
+        Number of rows successfully inserted.
+    """
+    messages = consumer.consume(num_messages=batch_size, timeout=POLL_TIMEOUT_SECONDS)
+    if not messages:
+        return 0
+
+    rows: list[dict] = []
+    for message in messages:
+        error = message.error()
+        if error is not None:
+            if error.code() == KafkaError._PARTITION_EOF:  # noqa: SLF001 — confluent-kafka exposes this as a pseudo-private constant; no public alternative exists
+                continue
+            logger.error("Consumer error: %s", error)
+            continue
+
+        row = _safe_transform(message.value(), session)
+        if row is not None:
+            rows.append(row)
+
+    if not rows:
+        consumer.commit(asynchronous=False)
+        return 0
+
+    inserted = ingest_batch(session, rows)
+    consumer.commit(asynchronous=False)
+    return inserted
+
+
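Offsets are committed only after `ingest_batch` returns, so a crash between the insert and the commit replays the whole batch on restart; the insert path therefore has to tolerate duplicates. `ingest_batch` itself is not part of this diff; the sketch below only illustrates that idempotency requirement, assuming a SQLAlchemy `posts` Table handle with `post_id` as the conflict key.

```python
from sqlalchemy import Table
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.orm import Session


def insert_rows_idempotently(session: Session, posts: Table, rows: list[dict]) -> None:
    # ON CONFLICT DO NOTHING keyed on post_id: a replayed Kafka batch re-inserts
    # the same rows and the duplicates are silently skipped.
    session.execute(insert(posts).on_conflict_do_nothing(index_elements=["post_id"]), rows)
    session.commit()
```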
+def _safe_transform(raw_value: bytes | None, session: Session) -> dict | None:
+    """Transform a Kafka message value into a Posts row, logging failures.
+
+    Args:
+        raw_value: Raw message bytes from Kafka.
+        session: SQLAlchemy session for logging failures.
+
+    Returns:
+        A transformed row dict, or None if transformation failed.
+    """
+    if raw_value is None:
+        return None
+
+    try:
+        message = json.loads(raw_value)
+    except (json.JSONDecodeError, UnicodeDecodeError):
+        logger.exception("Failed to decode Kafka message")
+        _log_failed_ingestion(session, raw_value, "JSON decode error")
+        return None
+
+    if not is_post_create(message):
+        return None
+
+    try:
+        return transform_jetstream_post(message)
+    except (KeyError, ValueError, TypeError):
+        logger.exception("Failed to transform Jetstream message")
+        _log_failed_ingestion(session, raw_value, "Transform error")
+        return None
+
+
+def _log_failed_ingestion(session: Session, raw_value: bytes, error: str) -> None:
+    """Log a failed ingestion to the FailedIngestion table.
+
+    Args:
+        session: SQLAlchemy session.
+        raw_value: The raw message bytes.
+        error: Description of the error.
+    """
+    try:
+        session.add(
+            FailedIngestion(
+                raw_line=raw_value.decode(errors="replace")[:10000],
+                error=error,
+            )
+        )
+        session.commit()
+    except Exception:
+        session.rollback()
+        logger.exception("Failed to log ingestion failure")
+
+
+def _create_consumer(servers: str, group: str) -> Consumer:
+    """Create a configured Kafka consumer.
+
+    Args:
+        servers: Kafka bootstrap servers string.
+        group: Consumer group ID.
+
+    Returns:
+        A configured confluent_kafka.Consumer.
+    """
+    config = {
+        "bootstrap.servers": servers,
+        "group.id": group,
+        "auto.offset.reset": "earliest",
+        "enable.auto.commit": False,
+        "max.poll.interval.ms": 300000,
+        "fetch.min.bytes": 1024,
+        "session.timeout.ms": 30000,
+    }
+    return Consumer(config)
+
+
+def _handle_shutdown(_signum: int, _frame: object) -> None:
+    """Signal handler to trigger graceful shutdown."""
+    logger.info("Shutdown signal received")
+    shutdown_event.set()
+
+
+if __name__ == "__main__":
+    app()
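A throwaway smoke test for the same broker and topic, using a separate consumer group so the real group's offsets are untouched (a sketch; adjust the bootstrap address to match your environment).

```python
from confluent_kafka import Consumer

consumer = Consumer(
    {
        "bootstrap.servers": "localhost:9092",
        "group.id": "bluesky-smoke-test",  # throwaway group, separate from bluesky-posts-ingestor
        "auto.offset.reset": "earliest",
        "enable.auto.commit": False,
    }
)
consumer.subscribe(["bluesky.firehose.posts"])
message = consumer.poll(10.0)
print(None if message is None else message.value()[:200])
consumer.close()
```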
diff --git a/python/data_science/firehose_producer.py b/python/data_science/firehose_producer.py
new file mode 100644
index 0000000..cd0872c
--- /dev/null
+++ b/python/data_science/firehose_producer.py
@@ -0,0 +1,230 @@
+"""Bluesky Jetstream firehose to Kafka producer.
+
+Connects to the Bluesky Jetstream WebSocket API with zstd compression,
+filters for post creation events, and produces them to a Kafka topic.
+
+Usage:
+    firehose-producer
+    firehose-producer --kafka-servers kafka:9092 --topic bluesky.firehose.posts
+"""
+
+from __future__ import annotations
+
+import json
+import logging
+import signal
+from os import getenv
+from threading import Event
+from typing import Annotated
+
+import typer
+from compression import zstd
+from confluent_kafka import KafkaError, KafkaException, Producer
+from websockets.exceptions import ConnectionClosed
+from websockets.sync.client import connect
+
+logger = logging.getLogger(__name__)
+
+JETSTREAM_URL = "wss://jetstream2.us-east.bsky.network/subscribe"
+DEFAULT_TOPIC = "bluesky.firehose.posts"
+DEFAULT_KAFKA_SERVERS = "localhost:9092"
+POLL_INTERVAL = 100
+POST_COLLECTION = "app.bsky.feed.post"
+
+shutdown_event = Event()
+
+app = typer.Typer(help="Stream Bluesky firehose posts into Kafka.")
+
+
+@app.command()
+def main(
+    kafka_servers: Annotated[str, typer.Option(help="Kafka bootstrap servers")] = "",
+    topic: Annotated[str, typer.Option(help="Kafka topic to produce to")] = "",
+    collections: Annotated[str, typer.Option(help="Comma-separated collections to subscribe to")] = POST_COLLECTION,
+) -> None:
+    """Connect to Bluesky Jetstream and produce post events to Kafka."""
+    logging.basicConfig(
+        level=logging.INFO,
+        format="%(asctime)s %(levelname)s %(message)s",
+        datefmt="%H:%M:%S",
+    )
+
+    servers = kafka_servers or getenv("KAFKA_BOOTSTRAP_SERVERS", DEFAULT_KAFKA_SERVERS)
+    topic_name = topic or getenv("BLUESKY_FIREHOSE_TOPIC", DEFAULT_TOPIC)
+
+    signal.signal(signal.SIGTERM, _handle_shutdown)
+    signal.signal(signal.SIGINT, _handle_shutdown)
+
+    producer = _create_producer(servers)
+    cursor: int | None = None
+
+    logger.info("Starting firehose producer → %s on %s", topic_name, servers)
+
+    while not shutdown_event.is_set():
+        try:
+            cursor = _stream_loop(producer, topic_name, collections, cursor)
+        except (ConnectionClosed, OSError):
+            logger.exception("WebSocket disconnected, reconnecting")
+        except KafkaException:
+            logger.exception("Kafka error, reconnecting")
+
+        if not shutdown_event.is_set():
+            logger.info("Reconnecting in 5 seconds (cursor=%s)", cursor)
+            shutdown_event.wait(timeout=5)
+
+    logger.info("Shutting down, flushing producer")
+    producer.flush(timeout=30)
+    logger.info("Producer shutdown complete")
+
+
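A one-off producer check against the same topic could look like the sketch below. The payload is deliberately not a Jetstream commit, so the downstream consumer's `is_post_create` filter drops it instead of inserting junk rows.

```python
from confluent_kafka import Producer

producer = Producer({"bootstrap.servers": "localhost:9092", "compression.type": "zstd", "acks": "all"})
producer.produce(
    "bluesky.firehose.posts",
    key=b"did:plc:smoketest",
    value=b'{"kind": "smoke-test"}',
    callback=lambda err, msg: print(err or f"delivered to partition {msg.partition()}"),
)
producer.flush(10)
```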
+def _stream_loop(
+    producer: Producer,
+    topic: str,
+    collections: str,
+    cursor: int | None,
+) -> int | None:
+    """Connect to Jetstream and stream messages to Kafka until disconnected.
+
+    Args:
+        producer: The Kafka producer instance.
+        topic: Kafka topic name.
+        collections: Comma-separated AT Protocol collections to subscribe to.
+        cursor: Optional microsecond timestamp to resume from.
+
+    Returns:
+        The last processed time_us cursor value.
+    """
+    url = _build_jetstream_url(collections, cursor)
+    logger.info("Connecting to %s", url)
+
+    message_count = 0
+    last_cursor = cursor
+
+    with connect(url, additional_headers={"Accept-Encoding": "zstd"}) as websocket:
+        logger.info("Connected to Jetstream")
+
+        while not shutdown_event.is_set():
+            try:
+                raw_frame = websocket.recv(timeout=10)
+            except TimeoutError:
+                producer.poll(0)
+                continue
+
+            text = _decode_frame(raw_frame)
+            message = json.loads(text)
+
+            time_us = message.get("time_us")
+            if time_us is not None:
+                last_cursor = time_us
+
+            if not _is_post_create(message):
+                continue
+
+            did = message.get("did", "")
+
+            try:
+                producer.produce(
+                    topic,
+                    key=did.encode(),
+                    value=text.encode() if isinstance(text, str) else text,
+                    callback=_delivery_callback,
+                )
+            except BufferError:
+                logger.warning("Producer buffer full, flushing")
+                producer.flush(timeout=10)
+                producer.produce(
+                    topic,
+                    key=did.encode(),
+                    value=text.encode() if isinstance(text, str) else text,
+                    callback=_delivery_callback,
+                )
+
+            message_count += 1
+            if message_count % POLL_INTERVAL == 0:
+                producer.poll(0)
+
+            if message_count % 10000 == 0:
+                logger.info("Produced %d messages (cursor=%s)", message_count, last_cursor)
+
+    return last_cursor
+
+
+def _build_jetstream_url(collections: str, cursor: int | None) -> str:
+    """Build the Jetstream WebSocket URL with query parameters.
+
+    Args:
+        collections: Comma-separated collection names.
+        cursor: Optional microsecond timestamp for resumption.
+
+    Returns:
+        The full WebSocket URL.
+    """
+    params = ["compress=true"]
+    for raw_collection in collections.split(","):
+        cleaned = raw_collection.strip()
+        if cleaned:
+            params.append(f"wantedCollections={cleaned}")
+    if cursor is not None:
+        params.append(f"cursor={cursor}")
+    return f"{JETSTREAM_URL}?{'&'.join(params)}"
+
+
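Examples of the URLs this helper produces, derived directly from the code above (the cursor value is made up):

```python
from python.data_science.firehose_producer import _build_jetstream_url

_build_jetstream_url("app.bsky.feed.post", None)
# wss://jetstream2.us-east.bsky.network/subscribe?compress=true&wantedCollections=app.bsky.feed.post

_build_jetstream_url("app.bsky.feed.post,app.bsky.feed.repost", 1700000000000000)
# ...?compress=true&wantedCollections=app.bsky.feed.post&wantedCollections=app.bsky.feed.repost&cursor=1700000000000000
```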
+def _decode_frame(frame: str | bytes) -> str:
+    """Decode a WebSocket frame, decompressing zstd if binary.
+
+    Jetstream with compress=true sends zstd-compressed binary frames.
+
+    Args:
+        frame: Raw WebSocket frame data.
+
+    Returns:
+        The decoded JSON string.
+    """
+    if isinstance(frame, bytes):
+        return zstd.decompress(frame).decode()
+    return frame
+
+
+def _is_post_create(message: dict) -> bool:
+    """Check if a Jetstream message is a post creation commit."""
+    if message.get("kind") != "commit":
+        return False
+    commit = message.get("commit", {})
+    return commit.get("operation") == "create" and commit.get("collection") == POST_COLLECTION
+
+
+def _create_producer(servers: str) -> Producer:
+    """Create a configured Kafka producer.
+
+    Args:
+        servers: Kafka bootstrap servers string.
+
+    Returns:
+        A configured confluent_kafka.Producer.
+    """
+    config = {
+        "bootstrap.servers": servers,
+        "linger.ms": 50,
+        "batch.size": 65536,
+        "compression.type": "zstd",
+        "acks": "all",
+        "retries": 5,
+        "retry.backoff.ms": 500,
+    }
+    return Producer(config)
+
+
+def _delivery_callback(error: KafkaError | None, _message: object) -> None:
+    """Log delivery failures from the Kafka producer."""
+    if error is not None:
+        logger.error("Kafka delivery failed: %s", error)
+
+
+def _handle_shutdown(_signum: int, _frame: object) -> None:
+    """Signal handler to trigger graceful shutdown."""
+    logger.info("Shutdown signal received")
+    shutdown_event.set()
+
+
+if __name__ == "__main__":
+    app()
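A small illustration of the frame handling: text frames pass through `_decode_frame` unchanged, and only create commits for the post collection survive `_is_post_create`.

```python
import json

from python.data_science.firehose_producer import _decode_frame, _is_post_create

post_frame = json.dumps({"kind": "commit", "commit": {"operation": "create", "collection": "app.bsky.feed.post"}})
assert _is_post_create(json.loads(_decode_frame(post_frame)))

like_frame = json.dumps({"kind": "commit", "commit": {"operation": "create", "collection": "app.bsky.feed.like"}})
assert not _is_post_create(json.loads(_decode_frame(like_frame)))
```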
diff --git a/systems/jeeves/services/bluesky_firehose.nix b/systems/jeeves/services/bluesky_firehose.nix
new file mode 100644
index 0000000..d3c29af
--- /dev/null
+++ b/systems/jeeves/services/bluesky_firehose.nix
@@ -0,0 +1,96 @@
+{
+  pkgs,
+  inputs,
+  ...
+}:
+let
+  commonEnv = {
+    PYTHONPATH = "${inputs.self}";
+    KAFKA_BOOTSTRAP_SERVERS = "localhost:9092";
+    BLUESKY_FIREHOSE_TOPIC = "bluesky.firehose.posts";
+  };
+  commonServiceConfig = {
+    Type = "simple";
+    WorkingDirectory = "${inputs.self}";
+    User = "richie";
+    Group = "users";
+    Restart = "on-failure";
+    RestartSec = "10s";
+    StandardOutput = "journal";
+    StandardError = "journal";
+    NoNewPrivileges = true;
+    ProtectSystem = "strict";
+    ProtectHome = "read-only";
+    PrivateTmp = true;
+    ReadOnlyPaths = [ "${inputs.self}" ];
+  };
+in
+{
+  systemd.services.bluesky-firehose-topic-init = {
+    description = "Create Kafka topic for Bluesky firehose";
+    after = [ "apache-kafka.service" ];
+    requires = [ "apache-kafka.service" ];
+    wantedBy = [ "multi-user.target" ];
+
+    serviceConfig = {
+      Type = "oneshot";
+      RemainAfterExit = true;
+      ExecStart = pkgs.writeShellScript "create-bluesky-topic" ''
+        ${pkgs.apacheKafka}/bin/kafka-topics.sh \
+          --bootstrap-server localhost:9092 \
+          --create \
+          --if-not-exists \
+          --topic bluesky.firehose.posts \
+          --partitions 6 \
+          --replication-factor 1
+      '';
+    };
+  };
+
+  systemd.services.bluesky-firehose-producer = {
+    description = "Bluesky Jetstream to Kafka producer";
+    after = [
+      "network.target"
+      "apache-kafka.service"
+      "bluesky-firehose-topic-init.service"
+    ];
+    requires = [
+      "apache-kafka.service"
+      "bluesky-firehose-topic-init.service"
+    ];
+    wantedBy = [ "multi-user.target" ];
+
+    environment = commonEnv;
+
+    serviceConfig = commonServiceConfig // {
+      ExecStart = "${pkgs.my_python}/bin/python -m python.data_science.firehose_producer";
+    };
+  };
+
+  systemd.services.bluesky-firehose-consumer = {
+    description = "Bluesky Kafka to PostgreSQL consumer";
+    after = [
+      "network.target"
+      "apache-kafka.service"
+      "bluesky-firehose-topic-init.service"
+      "postgresql.service"
+    ];
+    requires = [
+      "apache-kafka.service"
+      "bluesky-firehose-topic-init.service"
+      "postgresql.service"
+    ];
+    wantedBy = [ "multi-user.target" ];
+
+    environment = commonEnv // {
+      DATA_SCIENCE_DEV_DB = "data_science_dev";
+      DATA_SCIENCE_DEV_USER = "richie";
+      DATA_SCIENCE_DEV_HOST = "/run/postgresql";
+      DATA_SCIENCE_DEV_PORT = "5432";
+    };
+
+    serviceConfig = commonServiceConfig // {
+      ExecStart = "${pkgs.my_python}/bin/python -m python.data_science.firehose_consumer";
+    };
+  };
+}
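Once the units are running, a short check on the host (a sketch using confluent-kafka's admin client) can confirm that the topic-init service created the topic before the producer and consumer start publishing and reading.

```python
from confluent_kafka.admin import AdminClient

admin = AdminClient({"bootstrap.servers": "localhost:9092"})
metadata = admin.list_topics(timeout=10)
assert "bluesky.firehose.posts" in metadata.topics
```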