Replace polling with WebSocket for real-time Signal message reception

Switch from polling /v1/receive every 2s to a persistent WebSocket
connection at ws://.../v1/receive/<number>. Messages now arrive
instantly via the signal-cli-rest-api WebSocket endpoint.

- Add `listen()` generator to SignalClient using websockets library
- Extract `_parse_envelope()` as standalone function
- Replace `run_loop` polling with WebSocket listener + reconnect logic
- Remove `poll_interval` from BotConfig and CLI args
- Add websockets to Nix overlay and pyproject.toml dependencies

https://claude.ai/code/session_01AKXQBuVBsW7J1YbukDiQ7A
This commit is contained in:
Claude
2026-03-08 20:34:29 +00:00
committed by Richie Cahill
parent f4f33eacc4
commit 42ede19472
5 changed files with 53 additions and 37 deletions

View File

@@ -43,6 +43,7 @@
tinytuya tinytuya
typer typer
types-requests types-requests
websockets
] ]
); );
}; };

View File

@@ -21,6 +21,7 @@ dependencies = [
"requests", "requests",
"sqlalchemy", "sqlalchemy",
"typer", "typer",
"websockets",
] ]
[project.scripts] [project.scripts]

View File

@@ -31,6 +31,8 @@ Available commands:
Send a receipt photo with the message "!inventory" to scan it.\ Send a receipt photo with the message "!inventory" to scan it.\
""" """
RECONNECT_DELAY = 5
def dispatch( def dispatch(
message: SignalMessage, message: SignalMessage,
@@ -84,20 +86,19 @@ def run_loop(
llm: LLMClient, llm: LLMClient,
registry: DeviceRegistry, registry: DeviceRegistry,
) -> None: ) -> None:
"""Main polling loop.""" """Listen for messages via WebSocket, reconnecting on failure."""
inventory_path = Path(config.inventory_file) inventory_path = Path(config.inventory_file)
logger.info(f"Bot started — polling every {config.poll_interval}s") logger.info("Bot started — listening via WebSocket")
while True: while True:
try: try:
messages = signal.receive() for message in signal.listen():
for message in messages:
logger.info(f"Message from {message.source}: {message.message[:80]}") logger.info(f"Message from {message.source}: {message.message[:80]}")
registry.record_contact(message.source, "") registry.record_contact(message.source, "")
dispatch(message, signal, llm, registry, inventory_path) dispatch(message, signal, llm, registry, inventory_path)
except Exception: except Exception:
logger.exception("Error in message loop") logger.exception(f"WebSocket error, reconnecting in {RECONNECT_DELAY}s")
time.sleep(config.poll_interval) time.sleep(RECONNECT_DELAY)
def main( def main(
@@ -106,7 +107,6 @@ def main(
llm_host: Annotated[str, typer.Option(envvar="LLM_HOST")], llm_host: Annotated[str, typer.Option(envvar="LLM_HOST")],
llm_model: Annotated[str, typer.Option(envvar="LLM_MODEL")] = "qwen3-vl:32b", llm_model: Annotated[str, typer.Option(envvar="LLM_MODEL")] = "qwen3-vl:32b",
llm_port: Annotated[int, typer.Option(envvar="LLM_PORT")] = 11434, llm_port: Annotated[int, typer.Option(envvar="LLM_PORT")] = 11434,
poll_interval: Annotated[int, typer.Option(help="Seconds between polls")] = 2,
inventory_file: Annotated[str, typer.Option(envvar="INVENTORY_FILE")] = "/var/lib/signal-bot/van_inventory.json", inventory_file: Annotated[str, typer.Option(envvar="INVENTORY_FILE")] = "/var/lib/signal-bot/van_inventory.json",
registry_file: Annotated[str, typer.Option(envvar="REGISTRY_FILE")] = "/var/lib/signal-bot/device_registry.json", registry_file: Annotated[str, typer.Option(envvar="REGISTRY_FILE")] = "/var/lib/signal-bot/device_registry.json",
log_level: Annotated[str, typer.Option()] = "INFO", log_level: Annotated[str, typer.Option()] = "INFO",
@@ -119,7 +119,6 @@ def main(
signal_api_url=signal_api_url, signal_api_url=signal_api_url,
phone_number=phone_number, phone_number=phone_number,
llm=llm_config, llm=llm_config,
poll_interval=poll_interval,
inventory_file=inventory_file, inventory_file=inventory_file,
) )

View File

@@ -82,5 +82,4 @@ class BotConfig(BaseModel):
signal_api_url: str signal_api_url: str
phone_number: str phone_number: str
llm: LLMConfig llm: LLMConfig
poll_interval: int = 2
inventory_file: str = "van_inventory.json" inventory_file: str = "van_inventory.json"

View File

@@ -2,16 +2,41 @@
from __future__ import annotations from __future__ import annotations
import json
import logging import logging
from typing import Any from typing import TYPE_CHECKING, Any
import httpx import httpx
import websockets.sync.client
if TYPE_CHECKING:
from collections.abc import Generator
from python.signal_bot.models import SignalMessage from python.signal_bot.models import SignalMessage
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def _parse_envelope(envelope: dict[str, Any]) -> SignalMessage | None:
"""Parse a signal-cli envelope into a SignalMessage, or None if not a data message."""
data_message = envelope.get("dataMessage")
if not data_message:
return None
attachment_ids = [att["id"] for att in data_message.get("attachments", []) if "id" in att]
group_info = data_message.get("groupInfo")
group_id = group_info.get("groupId") if group_info else None
return SignalMessage(
source=envelope.get("source", ""),
timestamp=envelope.get("timestamp", 0),
message=data_message.get("message", "") or "",
attachments=attachment_ids,
group_id=group_id,
)
class SignalClient: class SignalClient:
"""Communicate with signal-cli-rest-api. """Communicate with signal-cli-rest-api.
@@ -25,35 +50,26 @@ class SignalClient:
self.phone_number = phone_number self.phone_number = phone_number
self._client = httpx.Client(base_url=self.base_url, timeout=30) self._client = httpx.Client(base_url=self.base_url, timeout=30)
def receive(self) -> list[SignalMessage]: def _ws_url(self) -> str:
"""Poll for new messages.""" """Build the WebSocket URL from the base HTTP URL."""
response = self._client.get(f"/v1/receive/{self.phone_number}") url = self.base_url.replace("http://", "ws://").replace("https://", "wss://")
response.raise_for_status() return f"{url}/v1/receive/{self.phone_number}"
envelopes: list[dict[str, Any]] = response.json()
messages: list[SignalMessage] = [] def listen(self) -> Generator[SignalMessage]:
for raw in envelopes: """Connect via WebSocket and yield messages as they arrive."""
envelope = raw.get("envelope", {}) ws_url = self._ws_url()
data_message = envelope.get("dataMessage") logger.info(f"Connecting to WebSocket: {ws_url}")
if not data_message:
continue
attachment_ids = [att["id"] for att in data_message.get("attachments", []) if "id" in att] with websockets.sync.client.connect(ws_url) as ws:
for raw in ws:
group_info = data_message.get("groupInfo") try:
group_id = group_info.get("groupId") if group_info else None data = json.loads(raw)
envelope = data.get("envelope", {})
messages.append( message = _parse_envelope(envelope)
SignalMessage( if message:
source=envelope.get("source", ""), yield message
timestamp=envelope.get("timestamp", 0), except json.JSONDecodeError:
message=data_message.get("message", "") or "", logger.warning(f"Non-JSON WebSocket frame: {raw[:200]}")
attachments=attachment_ids,
group_id=group_id,
),
)
return messages
def send(self, recipient: str, message: str) -> None: def send(self, recipient: str, message: str) -> None:
"""Send a text message.""" """Send a text message."""