From f1e394565db28e4300c098a3b4022c669d076178 Mon Sep 17 00:00:00 2001 From: Richie Cahill Date: Tue, 10 Mar 2026 09:03:43 -0400 Subject: [PATCH] migrated to tanasty and added dead letter queue --- ..._dead_letter_message_table_a1b2c3d4e5f6.py | 54 +++++++++++++ python/orm/richie/__init__.py | 2 + python/orm/richie/dead_letter_message.py | 26 +++++++ python/signal_bot/device_registry.py | 6 +- python/signal_bot/main.py | 76 +++++++++++++------ python/signal_bot/models.py | 14 +++- tests/test_signal_bot.py | 2 + 7 files changed, 153 insertions(+), 27 deletions(-) create mode 100644 python/alembic/richie/versions/2026_03_10-add_dead_letter_message_table_a1b2c3d4e5f6.py create mode 100644 python/orm/richie/dead_letter_message.py diff --git a/python/alembic/richie/versions/2026_03_10-add_dead_letter_message_table_a1b2c3d4e5f6.py b/python/alembic/richie/versions/2026_03_10-add_dead_letter_message_table_a1b2c3d4e5f6.py new file mode 100644 index 0000000..0a7e615 --- /dev/null +++ b/python/alembic/richie/versions/2026_03_10-add_dead_letter_message_table_a1b2c3d4e5f6.py @@ -0,0 +1,54 @@ +"""add dead_letter_message table. + +Revision ID: a1b2c3d4e5f6 +Revises: 99fec682516c +Create Date: 2026-03-10 12:00:00.000000 + +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +import sqlalchemy as sa +from alembic import op +from sqlalchemy.dialects import postgresql + +from python.orm import RichieBase + +if TYPE_CHECKING: + from collections.abc import Sequence + +# revision identifiers, used by Alembic. +revision: str = "a1b2c3d4e5f6" +down_revision: str | None = "99fec682516c" +branch_labels: str | Sequence[str] | None = None +depends_on: str | Sequence[str] | None = None + +schema = RichieBase.schema_name + + +def upgrade() -> None: + """Upgrade.""" + op.create_table( + "dead_letter_message", + sa.Column("source", sa.String(), nullable=False), + sa.Column("message", sa.Text(), nullable=False), + sa.Column("received_at", sa.DateTime(timezone=True), nullable=False), + sa.Column( + "status", + postgresql.ENUM("UNPROCESSED", "PROCESSED", name="message_status", schema=schema), + nullable=False, + ), + sa.Column("id", sa.Integer(), nullable=False), + sa.Column("created", sa.DateTime(timezone=True), server_default=sa.text("now()"), nullable=False), + sa.Column("updated", sa.DateTime(timezone=True), server_default=sa.text("now()"), nullable=False), + sa.PrimaryKeyConstraint("id", name=op.f("pk_dead_letter_message")), + schema=schema, + ) + + +def downgrade() -> None: + """Downgrade.""" + op.drop_table("dead_letter_message", schema=schema) + op.execute(sa.text(f"DROP TYPE IF EXISTS {schema}.message_status")) diff --git a/python/orm/richie/__init__.py b/python/orm/richie/__init__.py index 0ed7d9a..6448952 100644 --- a/python/orm/richie/__init__.py +++ b/python/orm/richie/__init__.py @@ -11,6 +11,7 @@ from python.orm.richie.contact import ( Need, RelationshipType, ) +from python.orm.richie.dead_letter_message import DeadLetterMessage from python.orm.richie.signal_device import SignalDevice __all__ = [ @@ -18,6 +19,7 @@ __all__ = [ "Contact", "ContactNeed", "ContactRelationship", + "DeadLetterMessage", "Legislator", "Need", "RelationshipType", diff --git a/python/orm/richie/dead_letter_message.py b/python/orm/richie/dead_letter_message.py new file mode 100644 index 0000000..2dee0aa --- /dev/null +++ b/python/orm/richie/dead_letter_message.py @@ -0,0 +1,26 @@ +"""Dead letter queue for Signal bot messages that fail processing.""" + +from __future__ import annotations + +from datetime import datetime + +from sqlalchemy import DateTime, Text +from sqlalchemy.dialects.postgresql import ENUM +from sqlalchemy.orm import Mapped, mapped_column + +from python.orm.richie.base import TableBase +from python.signal_bot.models import MessageStatus + + +class DeadLetterMessage(TableBase): + """A Signal message that failed processing and was sent to the dead letter queue.""" + + __tablename__ = "dead_letter_message" + + source: Mapped[str] + message: Mapped[str] = mapped_column(Text) + received_at: Mapped[datetime] = mapped_column(DateTime(timezone=True)) + status: Mapped[MessageStatus] = mapped_column( + ENUM(MessageStatus, name="message_status", create_type=True, schema="main"), + default=MessageStatus.UNPROCESSED, + ) diff --git a/python/signal_bot/device_registry.py b/python/signal_bot/device_registry.py index 367b1fb..460cf83 100644 --- a/python/signal_bot/device_registry.py +++ b/python/signal_bot/device_registry.py @@ -39,7 +39,7 @@ class DeviceRegistry: device = self._get(phone_number) return device is not None and device.trust_level == TrustLevel.VERIFIED - def record_contact(self, phone_number: str, safety_number: str | None = None) -> SignalDevice: + def record_contact(self, phone_number: str, safety_number: str | None = None) -> None: """Record seeing a device. Creates entry if new, updates last_seen.""" now = utcnow() with Session(self.engine) as session: @@ -48,7 +48,7 @@ class DeviceRegistry: ).scalar_one_or_none() if device: - if device.safety_number != safety_number: + if device.safety_number != safety_number and device.trust_level != TrustLevel.BLOCKED: logger.warning(f"Safety number changed for {phone_number}, resetting to UNVERIFIED") device.safety_number = safety_number device.trust_level = TrustLevel.UNVERIFIED @@ -64,8 +64,6 @@ class DeviceRegistry: logger.info(f"New device registered: {phone_number}") session.commit() - session.refresh(device) - return device def has_safety_number(self, phone_number: str) -> bool: """Check if a device has a safety number on file.""" diff --git a/python/signal_bot/main.py b/python/signal_bot/main.py index d05a98c..9574130 100644 --- a/python/signal_bot/main.py +++ b/python/signal_bot/main.py @@ -3,18 +3,20 @@ from __future__ import annotations import logging -import time from os import getenv from typing import Annotated import typer +from sqlalchemy.orm import Session +from tenacity import before_sleep_log, retry, stop_after_attempt, wait_exponential -from python.common import configure_logger +from python.common import configure_logger, utcnow from python.orm.common import get_postgres_engine +from python.orm.richie.dead_letter_message import DeadLetterMessage from python.signal_bot.commands.inventory import handle_inventory_update from python.signal_bot.device_registry import DeviceRegistry from python.signal_bot.llm_client import LLMClient -from python.signal_bot.models import BotConfig, SignalMessage +from python.signal_bot.models import BotConfig, MessageStatus, SignalMessage from python.signal_bot.signal_client import SignalClient logger = logging.getLogger(__name__) @@ -115,6 +117,38 @@ def dispatch( action(signal, message, llm, registry, config, cmd) +def _process_message( + message: SignalMessage, + signal: SignalClient, + llm: LLMClient, + registry: DeviceRegistry, + config: BotConfig, +) -> None: + """Process a single message, sending it to the dead letter queue after repeated failures.""" + max_attempts = config.max_message_attempts + for attempt in range(1, max_attempts + 1): + try: + safety_number = signal.get_safety_number(message.source) + registry.record_contact(message.source, safety_number) + dispatch(message, signal, llm, registry, config) + except Exception: + logger.exception(f"Failed to process message (attempt {attempt}/{max_attempts})") + else: + return + + logger.error(f"Message from {message.source} failed {max_attempts} times, sending to dead letter queue") + with Session(config.engine) as session: + session.add( + DeadLetterMessage( + source=message.source, + message=message.message, + received_at=utcnow(), + status=MessageStatus.UNPROCESSED, + ) + ) + session.commit() + + def run_loop( config: BotConfig, signal: SignalClient, @@ -124,25 +158,22 @@ def run_loop( """Listen for messages via WebSocket, reconnecting on failure.""" logger.info("Bot started — listening via WebSocket") - retries = 0 - delay = config.reconnect_delay + @retry( + stop=stop_after_attempt(config.max_retries), + wait=wait_exponential(multiplier=config.reconnect_delay, max=config.max_reconnect_delay), + before_sleep=before_sleep_log(logger, logging.WARNING), + reraise=True, + ) + def _listen() -> None: + for message in signal.listen(): + logger.info(f"Message from {message.source}: {message.message[:80]}") + _process_message(message, signal, llm, registry, config) - while retries < config.max_retries: - try: - for message in signal.listen(): - logger.info(f"Message from {message.source}: {message.message[:80]}") - safety_number = signal.get_safety_number(message.source) - registry.record_contact(message.source, safety_number) - dispatch(message, signal, llm, registry, config) - retries = 0 - delay = config.reconnect_delay - except Exception: - retries += 1 - logger.exception(f"WebSocket error ({retries}/{config.max_retries}), reconnecting in {delay}s") - time.sleep(delay) - delay = min(delay * 2, config.max_reconnect_delay) - - logger.critical("Max retries exceeded, shutting down") + try: + _listen() + except Exception: + logger.critical("Max retries exceeded, shutting down") + raise def main(log_level: Annotated[str, typer.Option()] = "INFO") -> None: @@ -162,13 +193,14 @@ def main(log_level: Annotated[str, typer.Option()] = "INFO") -> None: error = "INVENTORY_API_URL environment variable not set" raise ValueError(error) + engine = get_postgres_engine(name="SIGNALBOT") config = BotConfig( signal_api_url=signal_api_url, phone_number=phone_number, inventory_api_url=inventory_api_url, + engine=engine, ) - engine = get_postgres_engine(name="RICHIE") llm_host = getenv("LLM_HOST") llm_model = getenv("LLM_MODEL", "qwen3-vl:32b") llm_port = int(getenv("LLM_PORT", "11434")) diff --git a/python/signal_bot/models.py b/python/signal_bot/models.py index 1c78242..c9480b7 100644 --- a/python/signal_bot/models.py +++ b/python/signal_bot/models.py @@ -6,7 +6,8 @@ from datetime import datetime # noqa: TC003 - pydantic needs this at runtime from enum import StrEnum from typing import Any -from pydantic import BaseModel +from pydantic import BaseModel, ConfigDict +from sqlalchemy.engine import Engine # noqa: TC002 - pydantic needs this at runtime class TrustLevel(StrEnum): @@ -17,6 +18,13 @@ class TrustLevel(StrEnum): BLOCKED = "blocked" +class MessageStatus(StrEnum): + """Dead letter queue message status.""" + + UNPROCESSED = "unprocessed" + PROCESSED = "processed" + + class Device(BaseModel): """A registered device tracked by safety number.""" @@ -66,10 +74,14 @@ class InventoryUpdate(BaseModel): class BotConfig(BaseModel): """Top-level bot configuration.""" + model_config = ConfigDict(arbitrary_types_allowed=True) + signal_api_url: str phone_number: str inventory_api_url: str + engine: Engine cmd_prefix: str = "!" reconnect_delay: int = 5 max_reconnect_delay: int = 300 max_retries: int = 10 + max_message_attempts: int = 3 diff --git a/tests/test_signal_bot.py b/tests/test_signal_bot.py index 98cd298..533704d 100644 --- a/tests/test_signal_bot.py +++ b/tests/test_signal_bot.py @@ -142,10 +142,12 @@ class TestDispatch: @pytest.fixture def config(self): + engine = create_engine("sqlite://") return BotConfig( signal_api_url="http://localhost:8080", phone_number="+1234567890", inventory_api_url="http://localhost:9090", + engine=engine, ) def test_unverified_device_ignored(self, signal_mock, llm_mock, registry_mock, config):