From 42ede194726abc3e762dc1fa60eb8d01146efebb Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 8 Mar 2026 20:34:29 +0000 Subject: [PATCH] Replace polling with WebSocket for real-time Signal message reception Switch from polling /v1/receive every 2s to a persistent WebSocket connection at ws://.../v1/receive/. Messages now arrive instantly via the signal-cli-rest-api WebSocket endpoint. - Add `listen()` generator to SignalClient using websockets library - Extract `_parse_envelope()` as standalone function - Replace `run_loop` polling with WebSocket listener + reconnect logic - Remove `poll_interval` from BotConfig and CLI args - Add websockets to Nix overlay and pyproject.toml dependencies https://claude.ai/code/session_01AKXQBuVBsW7J1YbukDiQ7A --- overlays/default.nix | 1 + pyproject.toml | 1 + python/signal_bot/main.py | 15 +++---- python/signal_bot/models.py | 1 - python/signal_bot/signal_client.py | 72 ++++++++++++++++++------------ 5 files changed, 53 insertions(+), 37 deletions(-) diff --git a/overlays/default.nix b/overlays/default.nix index d0e6f61..82ea850 100644 --- a/overlays/default.nix +++ b/overlays/default.nix @@ -43,6 +43,7 @@ tinytuya typer types-requests + websockets ] ); }; diff --git a/pyproject.toml b/pyproject.toml index b8b3551..3f58e7b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -21,6 +21,7 @@ dependencies = [ "requests", "sqlalchemy", "typer", + "websockets", ] [project.scripts] diff --git a/python/signal_bot/main.py b/python/signal_bot/main.py index caf2e62..d142f30 100644 --- a/python/signal_bot/main.py +++ b/python/signal_bot/main.py @@ -31,6 +31,8 @@ Available commands: Send a receipt photo with the message "!inventory" to scan it.\ """ +RECONNECT_DELAY = 5 + def dispatch( message: SignalMessage, @@ -84,20 +86,19 @@ def run_loop( llm: LLMClient, registry: DeviceRegistry, ) -> None: - """Main polling loop.""" + """Listen for messages via WebSocket, reconnecting on failure.""" inventory_path = Path(config.inventory_file) - logger.info(f"Bot started — polling every {config.poll_interval}s") + logger.info("Bot started — listening via WebSocket") while True: try: - messages = signal.receive() - for message in messages: + for message in signal.listen(): logger.info(f"Message from {message.source}: {message.message[:80]}") registry.record_contact(message.source, "") dispatch(message, signal, llm, registry, inventory_path) except Exception: - logger.exception("Error in message loop") - time.sleep(config.poll_interval) + logger.exception(f"WebSocket error, reconnecting in {RECONNECT_DELAY}s") + time.sleep(RECONNECT_DELAY) def main( @@ -106,7 +107,6 @@ def main( llm_host: Annotated[str, typer.Option(envvar="LLM_HOST")], llm_model: Annotated[str, typer.Option(envvar="LLM_MODEL")] = "qwen3-vl:32b", llm_port: Annotated[int, typer.Option(envvar="LLM_PORT")] = 11434, - poll_interval: Annotated[int, typer.Option(help="Seconds between polls")] = 2, inventory_file: Annotated[str, typer.Option(envvar="INVENTORY_FILE")] = "/var/lib/signal-bot/van_inventory.json", registry_file: Annotated[str, typer.Option(envvar="REGISTRY_FILE")] = "/var/lib/signal-bot/device_registry.json", log_level: Annotated[str, typer.Option()] = "INFO", @@ -119,7 +119,6 @@ def main( signal_api_url=signal_api_url, phone_number=phone_number, llm=llm_config, - poll_interval=poll_interval, inventory_file=inventory_file, ) diff --git a/python/signal_bot/models.py b/python/signal_bot/models.py index bcdd7bd..c71f900 100644 --- a/python/signal_bot/models.py +++ b/python/signal_bot/models.py @@ -82,5 +82,4 @@ class BotConfig(BaseModel): signal_api_url: str phone_number: str llm: LLMConfig - poll_interval: int = 2 inventory_file: str = "van_inventory.json" diff --git a/python/signal_bot/signal_client.py b/python/signal_bot/signal_client.py index aa7ba25..ea18e4f 100644 --- a/python/signal_bot/signal_client.py +++ b/python/signal_bot/signal_client.py @@ -2,16 +2,41 @@ from __future__ import annotations +import json import logging -from typing import Any +from typing import TYPE_CHECKING, Any import httpx +import websockets.sync.client + +if TYPE_CHECKING: + from collections.abc import Generator from python.signal_bot.models import SignalMessage logger = logging.getLogger(__name__) +def _parse_envelope(envelope: dict[str, Any]) -> SignalMessage | None: + """Parse a signal-cli envelope into a SignalMessage, or None if not a data message.""" + data_message = envelope.get("dataMessage") + if not data_message: + return None + + attachment_ids = [att["id"] for att in data_message.get("attachments", []) if "id" in att] + + group_info = data_message.get("groupInfo") + group_id = group_info.get("groupId") if group_info else None + + return SignalMessage( + source=envelope.get("source", ""), + timestamp=envelope.get("timestamp", 0), + message=data_message.get("message", "") or "", + attachments=attachment_ids, + group_id=group_id, + ) + + class SignalClient: """Communicate with signal-cli-rest-api. @@ -25,35 +50,26 @@ class SignalClient: self.phone_number = phone_number self._client = httpx.Client(base_url=self.base_url, timeout=30) - def receive(self) -> list[SignalMessage]: - """Poll for new messages.""" - response = self._client.get(f"/v1/receive/{self.phone_number}") - response.raise_for_status() - envelopes: list[dict[str, Any]] = response.json() + def _ws_url(self) -> str: + """Build the WebSocket URL from the base HTTP URL.""" + url = self.base_url.replace("http://", "ws://").replace("https://", "wss://") + return f"{url}/v1/receive/{self.phone_number}" - messages: list[SignalMessage] = [] - for raw in envelopes: - envelope = raw.get("envelope", {}) - data_message = envelope.get("dataMessage") - if not data_message: - continue + def listen(self) -> Generator[SignalMessage]: + """Connect via WebSocket and yield messages as they arrive.""" + ws_url = self._ws_url() + logger.info(f"Connecting to WebSocket: {ws_url}") - attachment_ids = [att["id"] for att in data_message.get("attachments", []) if "id" in att] - - group_info = data_message.get("groupInfo") - group_id = group_info.get("groupId") if group_info else None - - messages.append( - SignalMessage( - source=envelope.get("source", ""), - timestamp=envelope.get("timestamp", 0), - message=data_message.get("message", "") or "", - attachments=attachment_ids, - group_id=group_id, - ), - ) - - return messages + with websockets.sync.client.connect(ws_url) as ws: + for raw in ws: + try: + data = json.loads(raw) + envelope = data.get("envelope", {}) + message = _parse_envelope(envelope) + if message: + yield message + except json.JSONDecodeError: + logger.warning(f"Non-JSON WebSocket frame: {raw[:200]}") def send(self, recipient: str, message: str) -> None: """Send a text message."""