migrated to tanasty and added dead letter queue

This commit is contained in:
2026-03-10 09:03:43 -04:00
parent 754ced4822
commit f1e394565d
7 changed files with 153 additions and 27 deletions

View File

@@ -0,0 +1,54 @@
"""add dead_letter_message table.
Revision ID: a1b2c3d4e5f6
Revises: 99fec682516c
Create Date: 2026-03-10 12:00:00.000000
"""
from __future__ import annotations
from typing import TYPE_CHECKING
import sqlalchemy as sa
from alembic import op
from sqlalchemy.dialects import postgresql
from python.orm import RichieBase
if TYPE_CHECKING:
from collections.abc import Sequence
# revision identifiers, used by Alembic.
revision: str = "a1b2c3d4e5f6"
down_revision: str | None = "99fec682516c"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
schema = RichieBase.schema_name
def upgrade() -> None:
"""Upgrade."""
op.create_table(
"dead_letter_message",
sa.Column("source", sa.String(), nullable=False),
sa.Column("message", sa.Text(), nullable=False),
sa.Column("received_at", sa.DateTime(timezone=True), nullable=False),
sa.Column(
"status",
postgresql.ENUM("UNPROCESSED", "PROCESSED", name="message_status", schema=schema),
nullable=False,
),
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("created", sa.DateTime(timezone=True), server_default=sa.text("now()"), nullable=False),
sa.Column("updated", sa.DateTime(timezone=True), server_default=sa.text("now()"), nullable=False),
sa.PrimaryKeyConstraint("id", name=op.f("pk_dead_letter_message")),
schema=schema,
)
def downgrade() -> None:
"""Downgrade."""
op.drop_table("dead_letter_message", schema=schema)
op.execute(sa.text(f"DROP TYPE IF EXISTS {schema}.message_status"))

View File

@@ -11,6 +11,7 @@ from python.orm.richie.contact import (
Need, Need,
RelationshipType, RelationshipType,
) )
from python.orm.richie.dead_letter_message import DeadLetterMessage
from python.orm.richie.signal_device import SignalDevice from python.orm.richie.signal_device import SignalDevice
__all__ = [ __all__ = [
@@ -18,6 +19,7 @@ __all__ = [
"Contact", "Contact",
"ContactNeed", "ContactNeed",
"ContactRelationship", "ContactRelationship",
"DeadLetterMessage",
"Legislator", "Legislator",
"Need", "Need",
"RelationshipType", "RelationshipType",

View File

@@ -0,0 +1,26 @@
"""Dead letter queue for Signal bot messages that fail processing."""
from __future__ import annotations
from datetime import datetime
from sqlalchemy import DateTime, Text
from sqlalchemy.dialects.postgresql import ENUM
from sqlalchemy.orm import Mapped, mapped_column
from python.orm.richie.base import TableBase
from python.signal_bot.models import MessageStatus
class DeadLetterMessage(TableBase):
"""A Signal message that failed processing and was sent to the dead letter queue."""
__tablename__ = "dead_letter_message"
source: Mapped[str]
message: Mapped[str] = mapped_column(Text)
received_at: Mapped[datetime] = mapped_column(DateTime(timezone=True))
status: Mapped[MessageStatus] = mapped_column(
ENUM(MessageStatus, name="message_status", create_type=True, schema="main"),
default=MessageStatus.UNPROCESSED,
)

View File

@@ -39,7 +39,7 @@ class DeviceRegistry:
device = self._get(phone_number) device = self._get(phone_number)
return device is not None and device.trust_level == TrustLevel.VERIFIED return device is not None and device.trust_level == TrustLevel.VERIFIED
def record_contact(self, phone_number: str, safety_number: str | None = None) -> SignalDevice: def record_contact(self, phone_number: str, safety_number: str | None = None) -> None:
"""Record seeing a device. Creates entry if new, updates last_seen.""" """Record seeing a device. Creates entry if new, updates last_seen."""
now = utcnow() now = utcnow()
with Session(self.engine) as session: with Session(self.engine) as session:
@@ -48,7 +48,7 @@ class DeviceRegistry:
).scalar_one_or_none() ).scalar_one_or_none()
if device: if device:
if device.safety_number != safety_number: if device.safety_number != safety_number and device.trust_level != TrustLevel.BLOCKED:
logger.warning(f"Safety number changed for {phone_number}, resetting to UNVERIFIED") logger.warning(f"Safety number changed for {phone_number}, resetting to UNVERIFIED")
device.safety_number = safety_number device.safety_number = safety_number
device.trust_level = TrustLevel.UNVERIFIED device.trust_level = TrustLevel.UNVERIFIED
@@ -64,8 +64,6 @@ class DeviceRegistry:
logger.info(f"New device registered: {phone_number}") logger.info(f"New device registered: {phone_number}")
session.commit() session.commit()
session.refresh(device)
return device
def has_safety_number(self, phone_number: str) -> bool: def has_safety_number(self, phone_number: str) -> bool:
"""Check if a device has a safety number on file.""" """Check if a device has a safety number on file."""

View File

@@ -3,18 +3,20 @@
from __future__ import annotations from __future__ import annotations
import logging import logging
import time
from os import getenv from os import getenv
from typing import Annotated from typing import Annotated
import typer import typer
from sqlalchemy.orm import Session
from tenacity import before_sleep_log, retry, stop_after_attempt, wait_exponential
from python.common import configure_logger from python.common import configure_logger, utcnow
from python.orm.common import get_postgres_engine from python.orm.common import get_postgres_engine
from python.orm.richie.dead_letter_message import DeadLetterMessage
from python.signal_bot.commands.inventory import handle_inventory_update from python.signal_bot.commands.inventory import handle_inventory_update
from python.signal_bot.device_registry import DeviceRegistry from python.signal_bot.device_registry import DeviceRegistry
from python.signal_bot.llm_client import LLMClient from python.signal_bot.llm_client import LLMClient
from python.signal_bot.models import BotConfig, SignalMessage from python.signal_bot.models import BotConfig, MessageStatus, SignalMessage
from python.signal_bot.signal_client import SignalClient from python.signal_bot.signal_client import SignalClient
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -115,6 +117,38 @@ def dispatch(
action(signal, message, llm, registry, config, cmd) action(signal, message, llm, registry, config, cmd)
def _process_message(
message: SignalMessage,
signal: SignalClient,
llm: LLMClient,
registry: DeviceRegistry,
config: BotConfig,
) -> None:
"""Process a single message, sending it to the dead letter queue after repeated failures."""
max_attempts = config.max_message_attempts
for attempt in range(1, max_attempts + 1):
try:
safety_number = signal.get_safety_number(message.source)
registry.record_contact(message.source, safety_number)
dispatch(message, signal, llm, registry, config)
except Exception:
logger.exception(f"Failed to process message (attempt {attempt}/{max_attempts})")
else:
return
logger.error(f"Message from {message.source} failed {max_attempts} times, sending to dead letter queue")
with Session(config.engine) as session:
session.add(
DeadLetterMessage(
source=message.source,
message=message.message,
received_at=utcnow(),
status=MessageStatus.UNPROCESSED,
)
)
session.commit()
def run_loop( def run_loop(
config: BotConfig, config: BotConfig,
signal: SignalClient, signal: SignalClient,
@@ -124,25 +158,22 @@ def run_loop(
"""Listen for messages via WebSocket, reconnecting on failure.""" """Listen for messages via WebSocket, reconnecting on failure."""
logger.info("Bot started — listening via WebSocket") logger.info("Bot started — listening via WebSocket")
retries = 0 @retry(
delay = config.reconnect_delay stop=stop_after_attempt(config.max_retries),
wait=wait_exponential(multiplier=config.reconnect_delay, max=config.max_reconnect_delay),
before_sleep=before_sleep_log(logger, logging.WARNING),
reraise=True,
)
def _listen() -> None:
for message in signal.listen():
logger.info(f"Message from {message.source}: {message.message[:80]}")
_process_message(message, signal, llm, registry, config)
while retries < config.max_retries: try:
try: _listen()
for message in signal.listen(): except Exception:
logger.info(f"Message from {message.source}: {message.message[:80]}") logger.critical("Max retries exceeded, shutting down")
safety_number = signal.get_safety_number(message.source) raise
registry.record_contact(message.source, safety_number)
dispatch(message, signal, llm, registry, config)
retries = 0
delay = config.reconnect_delay
except Exception:
retries += 1
logger.exception(f"WebSocket error ({retries}/{config.max_retries}), reconnecting in {delay}s")
time.sleep(delay)
delay = min(delay * 2, config.max_reconnect_delay)
logger.critical("Max retries exceeded, shutting down")
def main(log_level: Annotated[str, typer.Option()] = "INFO") -> None: def main(log_level: Annotated[str, typer.Option()] = "INFO") -> None:
@@ -162,13 +193,14 @@ def main(log_level: Annotated[str, typer.Option()] = "INFO") -> None:
error = "INVENTORY_API_URL environment variable not set" error = "INVENTORY_API_URL environment variable not set"
raise ValueError(error) raise ValueError(error)
engine = get_postgres_engine(name="SIGNALBOT")
config = BotConfig( config = BotConfig(
signal_api_url=signal_api_url, signal_api_url=signal_api_url,
phone_number=phone_number, phone_number=phone_number,
inventory_api_url=inventory_api_url, inventory_api_url=inventory_api_url,
engine=engine,
) )
engine = get_postgres_engine(name="RICHIE")
llm_host = getenv("LLM_HOST") llm_host = getenv("LLM_HOST")
llm_model = getenv("LLM_MODEL", "qwen3-vl:32b") llm_model = getenv("LLM_MODEL", "qwen3-vl:32b")
llm_port = int(getenv("LLM_PORT", "11434")) llm_port = int(getenv("LLM_PORT", "11434"))

View File

@@ -6,7 +6,8 @@ from datetime import datetime # noqa: TC003 - pydantic needs this at runtime
from enum import StrEnum from enum import StrEnum
from typing import Any from typing import Any
from pydantic import BaseModel from pydantic import BaseModel, ConfigDict
from sqlalchemy.engine import Engine # noqa: TC002 - pydantic needs this at runtime
class TrustLevel(StrEnum): class TrustLevel(StrEnum):
@@ -17,6 +18,13 @@ class TrustLevel(StrEnum):
BLOCKED = "blocked" BLOCKED = "blocked"
class MessageStatus(StrEnum):
"""Dead letter queue message status."""
UNPROCESSED = "unprocessed"
PROCESSED = "processed"
class Device(BaseModel): class Device(BaseModel):
"""A registered device tracked by safety number.""" """A registered device tracked by safety number."""
@@ -66,10 +74,14 @@ class InventoryUpdate(BaseModel):
class BotConfig(BaseModel): class BotConfig(BaseModel):
"""Top-level bot configuration.""" """Top-level bot configuration."""
model_config = ConfigDict(arbitrary_types_allowed=True)
signal_api_url: str signal_api_url: str
phone_number: str phone_number: str
inventory_api_url: str inventory_api_url: str
engine: Engine
cmd_prefix: str = "!" cmd_prefix: str = "!"
reconnect_delay: int = 5 reconnect_delay: int = 5
max_reconnect_delay: int = 300 max_reconnect_delay: int = 300
max_retries: int = 10 max_retries: int = 10
max_message_attempts: int = 3

View File

@@ -142,10 +142,12 @@ class TestDispatch:
@pytest.fixture @pytest.fixture
def config(self): def config(self):
engine = create_engine("sqlite://")
return BotConfig( return BotConfig(
signal_api_url="http://localhost:8080", signal_api_url="http://localhost:8080",
phone_number="+1234567890", phone_number="+1234567890",
inventory_api_url="http://localhost:9090", inventory_api_url="http://localhost:9090",
engine=engine,
) )
def test_unverified_device_ignored(self, signal_mock, llm_mock, registry_mock, config): def test_unverified_device_ignored(self, signal_mock, llm_mock, registry_mock, config):