mirror of
https://github.com/RichieCahill/dotfiles.git
synced 2026-04-17 04:58:19 -04:00
add Signal command and control bot service
Python service for jeeves that communicates over Signal via signal-cli-rest-api. Implements device verification via safety numbers (unverified devices cannot run commands until verified over SSH), and a van inventory command that uses an LLM on BOB (ollama) to parse receipt photos or text lists into structured inventory data. The LLM backend is configurable to swap models easily. https://claude.ai/code/session_01AKXQBuVBsW7J1YbukDiQ7A
This commit is contained in:
1
python/signal_bot/__init__.py
Normal file
1
python/signal_bot/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
"""Signal command and control bot."""
|
||||
1
python/signal_bot/commands/__init__.py
Normal file
1
python/signal_bot/commands/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
"""Signal bot commands."""
|
||||
134
python/signal_bot/commands/inventory.py
Normal file
134
python/signal_bot/commands/inventory.py
Normal file
@@ -0,0 +1,134 @@
|
||||
"""Van inventory command — parse receipts and item lists via LLM."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import logging
|
||||
from typing import TYPE_CHECKING, Any
|
||||
|
||||
from python.signal_bot.models import InventoryItem, InventoryUpdate
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from pathlib import Path
|
||||
|
||||
from python.signal_bot.llm_client import LLMClient
|
||||
from python.signal_bot.models import SignalMessage
|
||||
from python.signal_bot.signal_client import SignalClient
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
SYSTEM_PROMPT = """\
|
||||
You are an inventory assistant. Extract items from the input and return ONLY
|
||||
a JSON array. Each element must have these fields:
|
||||
- "name": item name (string)
|
||||
- "quantity": integer count (default 1)
|
||||
- "category": category like "food", "tools", "supplies", etc.
|
||||
- "notes": any extra detail (empty string if none)
|
||||
|
||||
Example output:
|
||||
[{"name": "water bottles", "quantity": 6, "category": "supplies", "notes": "1 gallon each"}]
|
||||
|
||||
Return ONLY the JSON array, no other text.\
|
||||
"""
|
||||
|
||||
IMAGE_PROMPT = "Extract all items from this receipt or inventory photo."
|
||||
TEXT_PROMPT = "Extract all items from this inventory list."
|
||||
|
||||
|
||||
def parse_llm_response(raw: str) -> list[InventoryItem]:
|
||||
"""Parse the LLM JSON response into InventoryItem list."""
|
||||
text = raw.strip()
|
||||
# Strip markdown code fences if present
|
||||
if text.startswith("```"):
|
||||
lines = text.split("\n")
|
||||
lines = [line for line in lines if not line.startswith("```")]
|
||||
text = "\n".join(lines)
|
||||
|
||||
items_data: list[dict[str, Any]] = json.loads(text)
|
||||
return [InventoryItem.model_validate(item) for item in items_data]
|
||||
|
||||
|
||||
def load_inventory(path: Path) -> list[InventoryItem]:
|
||||
"""Load existing inventory from disk."""
|
||||
if not path.exists():
|
||||
return []
|
||||
data: list[dict[str, Any]] = json.loads(path.read_text())
|
||||
return [InventoryItem.model_validate(item) for item in data]
|
||||
|
||||
|
||||
def save_inventory(path: Path, items: list[InventoryItem]) -> None:
|
||||
"""Save inventory to disk."""
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
data = [item.model_dump() for item in items]
|
||||
path.write_text(json.dumps(data, indent=2) + "\n")
|
||||
|
||||
|
||||
def handle_inventory_update(
|
||||
message: SignalMessage,
|
||||
signal: SignalClient,
|
||||
llm: LLMClient,
|
||||
inventory_path: Path,
|
||||
) -> InventoryUpdate:
|
||||
"""Process an inventory update from a Signal message.
|
||||
|
||||
Accepts either an image (receipt photo) or text list.
|
||||
Uses the LLM to extract structured items, then merges into inventory.
|
||||
"""
|
||||
try:
|
||||
if message.attachments:
|
||||
image_data = signal.get_attachment(message.attachments[0])
|
||||
raw_response = llm.chat_with_image(
|
||||
IMAGE_PROMPT,
|
||||
image_data,
|
||||
system=SYSTEM_PROMPT,
|
||||
)
|
||||
source_type = "receipt_photo"
|
||||
elif message.message.strip():
|
||||
raw_response = llm.chat(
|
||||
f"{TEXT_PROMPT}\n\n{message.message}",
|
||||
system=SYSTEM_PROMPT,
|
||||
)
|
||||
source_type = "text_list"
|
||||
else:
|
||||
signal.reply(message, "Send a photo of a receipt or a text list of items to update inventory.")
|
||||
return InventoryUpdate()
|
||||
|
||||
new_items = parse_llm_response(raw_response)
|
||||
existing = load_inventory(inventory_path)
|
||||
merged = _merge_items(existing, new_items)
|
||||
save_inventory(inventory_path, merged)
|
||||
|
||||
summary = _format_summary(new_items)
|
||||
signal.reply(message, f"Inventory updated with {len(new_items)} item(s):\n{summary}")
|
||||
|
||||
return InventoryUpdate(items=new_items, raw_response=raw_response, source_type=source_type)
|
||||
|
||||
except Exception:
|
||||
logger.exception("Failed to process inventory update")
|
||||
signal.reply(message, "Failed to process inventory update. Check logs for details.")
|
||||
return InventoryUpdate()
|
||||
|
||||
|
||||
def _merge_items(existing: list[InventoryItem], new: list[InventoryItem]) -> list[InventoryItem]:
|
||||
"""Merge new items into existing inventory, summing quantities for matches."""
|
||||
by_name: dict[str, InventoryItem] = {item.name.lower(): item for item in existing}
|
||||
for item in new:
|
||||
key = item.name.lower()
|
||||
if key in by_name:
|
||||
current = by_name[key]
|
||||
by_name[key] = current.model_copy(
|
||||
update={
|
||||
"quantity": current.quantity + item.quantity,
|
||||
"category": item.category or current.category,
|
||||
"notes": item.notes or current.notes,
|
||||
},
|
||||
)
|
||||
else:
|
||||
by_name[key] = item
|
||||
return list(by_name.values())
|
||||
|
||||
|
||||
def _format_summary(items: list[InventoryItem]) -> str:
|
||||
"""Format items into a readable summary."""
|
||||
lines = [f" - {item.name} x{item.quantity} [{item.category}]" for item in items]
|
||||
return "\n".join(lines)
|
||||
135
python/signal_bot/device_registry.py
Normal file
135
python/signal_bot/device_registry.py
Normal file
@@ -0,0 +1,135 @@
|
||||
"""Device registry — tracks verified/unverified devices by safety number."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import logging
|
||||
from typing import TYPE_CHECKING, Any
|
||||
|
||||
from python.common import utcnow
|
||||
from python.signal_bot.models import Device, TrustLevel
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from pathlib import Path
|
||||
|
||||
from python.signal_bot.signal_client import SignalClient
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class DeviceRegistry:
|
||||
"""Manage device trust based on Signal safety numbers.
|
||||
|
||||
Devices start as UNVERIFIED. An admin verifies them over SSH by calling
|
||||
``verify(phone_number)`` which marks the device VERIFIED and also tells
|
||||
signal-cli to trust the identity.
|
||||
|
||||
Only VERIFIED devices may execute commands.
|
||||
|
||||
Args:
|
||||
signal_client: The Signal API client (used to sync identities).
|
||||
registry_path: Path to the JSON file that persists device state.
|
||||
"""
|
||||
|
||||
def __init__(self, signal_client: SignalClient, registry_path: Path) -> None:
|
||||
self.signal_client = signal_client
|
||||
self.registry_path = registry_path
|
||||
self._devices: dict[str, Device] = {}
|
||||
self._load()
|
||||
|
||||
def is_verified(self, phone_number: str) -> bool:
|
||||
"""Check if a phone number is verified."""
|
||||
device = self._devices.get(phone_number)
|
||||
return device is not None and device.trust_level == TrustLevel.VERIFIED
|
||||
|
||||
def is_blocked(self, phone_number: str) -> bool:
|
||||
"""Check if a phone number is blocked."""
|
||||
device = self._devices.get(phone_number)
|
||||
return device is not None and device.trust_level == TrustLevel.BLOCKED
|
||||
|
||||
def record_contact(self, phone_number: str, safety_number: str) -> Device:
|
||||
"""Record seeing a device. Creates entry if new, updates last_seen."""
|
||||
now = utcnow()
|
||||
if phone_number in self._devices:
|
||||
device = self._devices[phone_number]
|
||||
if device.safety_number != safety_number:
|
||||
logger.warning(f"Safety number changed for {phone_number}, resetting to UNVERIFIED")
|
||||
device.safety_number = safety_number
|
||||
device.trust_level = TrustLevel.UNVERIFIED
|
||||
device.last_seen = now
|
||||
else:
|
||||
device = Device(
|
||||
phone_number=phone_number,
|
||||
safety_number=safety_number,
|
||||
trust_level=TrustLevel.UNVERIFIED,
|
||||
first_seen=now,
|
||||
last_seen=now,
|
||||
)
|
||||
self._devices[phone_number] = device
|
||||
logger.info(f"New device registered: {phone_number}")
|
||||
|
||||
self._save()
|
||||
return device
|
||||
|
||||
def verify(self, phone_number: str) -> bool:
|
||||
"""Mark a device as verified. Called by admin over SSH.
|
||||
|
||||
Returns True if the device was found and verified.
|
||||
"""
|
||||
device = self._devices.get(phone_number)
|
||||
if not device:
|
||||
logger.warning(f"Cannot verify unknown device: {phone_number}")
|
||||
return False
|
||||
|
||||
device.trust_level = TrustLevel.VERIFIED
|
||||
self.signal_client.trust_identity(phone_number, trust_all_known_keys=True)
|
||||
self._save()
|
||||
logger.info(f"Device verified: {phone_number}")
|
||||
return True
|
||||
|
||||
def block(self, phone_number: str) -> bool:
|
||||
"""Block a device."""
|
||||
device = self._devices.get(phone_number)
|
||||
if not device:
|
||||
return False
|
||||
device.trust_level = TrustLevel.BLOCKED
|
||||
self._save()
|
||||
logger.info(f"Device blocked: {phone_number}")
|
||||
return True
|
||||
|
||||
def unverify(self, phone_number: str) -> bool:
|
||||
"""Reset a device to unverified."""
|
||||
device = self._devices.get(phone_number)
|
||||
if not device:
|
||||
return False
|
||||
device.trust_level = TrustLevel.UNVERIFIED
|
||||
self._save()
|
||||
return True
|
||||
|
||||
def list_devices(self) -> list[Device]:
|
||||
"""Return all known devices."""
|
||||
return list(self._devices.values())
|
||||
|
||||
def sync_identities(self) -> None:
|
||||
"""Pull identity list from signal-cli and record any new ones."""
|
||||
identities = self.signal_client.get_identities()
|
||||
for identity in identities:
|
||||
number = identity.get("number", "")
|
||||
safety = identity.get("safety_number", identity.get("fingerprint", ""))
|
||||
if number:
|
||||
self.record_contact(number, safety)
|
||||
|
||||
def _load(self) -> None:
|
||||
"""Load registry from disk."""
|
||||
if not self.registry_path.exists():
|
||||
return
|
||||
data: list[dict[str, Any]] = json.loads(self.registry_path.read_text())
|
||||
for entry in data:
|
||||
device = Device.model_validate(entry)
|
||||
self._devices[device.phone_number] = device
|
||||
|
||||
def _save(self) -> None:
|
||||
"""Persist registry to disk."""
|
||||
self.registry_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
data = [device.model_dump(mode="json") for device in self._devices.values()]
|
||||
self.registry_path.write_text(json.dumps(data, indent=2) + "\n")
|
||||
73
python/signal_bot/llm_client.py
Normal file
73
python/signal_bot/llm_client.py
Normal file
@@ -0,0 +1,73 @@
|
||||
"""Flexible LLM client for ollama backends."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import base64
|
||||
import logging
|
||||
from typing import TYPE_CHECKING, Any
|
||||
|
||||
import httpx
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from python.signal_bot.models import LLMConfig
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class LLMClient:
|
||||
"""Talk to an ollama instance.
|
||||
|
||||
Designed to be swappable — change ``config.model`` to try a new LLM
|
||||
without touching any calling code.
|
||||
|
||||
Args:
|
||||
config: LLM connection and model configuration.
|
||||
"""
|
||||
|
||||
def __init__(self, config: LLMConfig) -> None:
|
||||
self.config = config
|
||||
self._client = httpx.Client(base_url=config.base_url, timeout=120)
|
||||
|
||||
def chat(self, prompt: str, *, system: str = "") -> str:
|
||||
"""Send a text prompt and return the response."""
|
||||
messages: list[dict[str, Any]] = []
|
||||
if system:
|
||||
messages.append({"role": "system", "content": system})
|
||||
messages.append({"role": "user", "content": prompt})
|
||||
return self._generate(messages)
|
||||
|
||||
def chat_with_image(self, prompt: str, image_data: bytes, *, system: str = "") -> str:
|
||||
"""Send a prompt with an image and return the response.
|
||||
|
||||
Requires a vision-capable model (e.g. qwen3-vl).
|
||||
"""
|
||||
encoded = base64.b64encode(image_data).decode()
|
||||
messages: list[dict[str, Any]] = []
|
||||
if system:
|
||||
messages.append({"role": "system", "content": system})
|
||||
messages.append({"role": "user", "content": prompt, "images": [encoded]})
|
||||
return self._generate(messages)
|
||||
|
||||
def _generate(self, messages: list[dict[str, Any]]) -> str:
|
||||
"""Call the ollama chat API."""
|
||||
payload = {
|
||||
"model": self.config.model,
|
||||
"messages": messages,
|
||||
"stream": False,
|
||||
"options": {"temperature": self.config.temperature},
|
||||
}
|
||||
logger.info(f"LLM request to {self.config.model}")
|
||||
response = self._client.post("/api/chat", json=payload)
|
||||
response.raise_for_status()
|
||||
data = response.json()
|
||||
return data["message"]["content"]
|
||||
|
||||
def list_models(self) -> list[str]:
|
||||
"""List available models on the ollama instance."""
|
||||
response = self._client.get("/api/tags")
|
||||
response.raise_for_status()
|
||||
return [m["name"] for m in response.json().get("models", [])]
|
||||
|
||||
def close(self) -> None:
|
||||
"""Close the HTTP client."""
|
||||
self._client.close()
|
||||
138
python/signal_bot/main.py
Normal file
138
python/signal_bot/main.py
Normal file
@@ -0,0 +1,138 @@
|
||||
"""Signal command and control bot — main entry point."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import time
|
||||
from pathlib import Path
|
||||
from typing import Annotated
|
||||
|
||||
import typer
|
||||
|
||||
from python.common import configure_logger
|
||||
from python.signal_bot.commands.inventory import handle_inventory_update
|
||||
from python.signal_bot.device_registry import DeviceRegistry
|
||||
from python.signal_bot.llm_client import LLMClient
|
||||
from python.signal_bot.models import BotConfig, LLMConfig, SignalMessage
|
||||
from python.signal_bot.signal_client import SignalClient
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Command prefix — messages must start with this to be treated as commands.
|
||||
CMD_PREFIX = "!"
|
||||
|
||||
HELP_TEXT = """\
|
||||
Available commands:
|
||||
!inventory <text list> — update van inventory from a text list
|
||||
!inventory (+ photo) — update van inventory from a receipt photo
|
||||
!status — show bot status
|
||||
!help — show this help message
|
||||
|
||||
Send a receipt photo with the message "!inventory" to scan it.\
|
||||
"""
|
||||
|
||||
|
||||
def dispatch(
|
||||
message: SignalMessage,
|
||||
signal: SignalClient,
|
||||
llm: LLMClient,
|
||||
registry: DeviceRegistry,
|
||||
inventory_path: Path,
|
||||
) -> None:
|
||||
"""Route an incoming message to the right command handler."""
|
||||
source = message.source
|
||||
|
||||
if registry.is_blocked(source):
|
||||
return
|
||||
|
||||
if not registry.is_verified(source):
|
||||
signal.reply(
|
||||
message,
|
||||
"Your device is not verified. Ask the admin to verify your safety number over SSH.",
|
||||
)
|
||||
return
|
||||
|
||||
text = message.message.strip()
|
||||
|
||||
if not text.startswith(CMD_PREFIX) and not message.attachments:
|
||||
return
|
||||
|
||||
cmd = text.lstrip(CMD_PREFIX).split()[0].lower() if text.startswith(CMD_PREFIX) else ""
|
||||
|
||||
if cmd == "help":
|
||||
signal.reply(message, HELP_TEXT)
|
||||
|
||||
elif cmd == "status":
|
||||
models = llm.list_models()
|
||||
model_list = ", ".join(models[:10])
|
||||
device_count = len(registry.list_devices())
|
||||
signal.reply(
|
||||
message,
|
||||
f"Bot online.\nLLM: {llm.config.model}\nAvailable models: {model_list}\nKnown devices: {device_count}",
|
||||
)
|
||||
|
||||
elif cmd == "inventory" or (message.attachments and not text.startswith(CMD_PREFIX)):
|
||||
handle_inventory_update(message, signal, llm, inventory_path)
|
||||
|
||||
else:
|
||||
signal.reply(message, f"Unknown command: {cmd}\n\n{HELP_TEXT}")
|
||||
|
||||
|
||||
def run_loop(
|
||||
config: BotConfig,
|
||||
signal: SignalClient,
|
||||
llm: LLMClient,
|
||||
registry: DeviceRegistry,
|
||||
) -> None:
|
||||
"""Main polling loop."""
|
||||
inventory_path = Path(config.inventory_file)
|
||||
logger.info(f"Bot started — polling every {config.poll_interval}s")
|
||||
|
||||
while True:
|
||||
try:
|
||||
messages = signal.receive()
|
||||
for message in messages:
|
||||
logger.info(f"Message from {message.source}: {message.message[:80]}")
|
||||
registry.record_contact(message.source, "")
|
||||
dispatch(message, signal, llm, registry, inventory_path)
|
||||
except Exception:
|
||||
logger.exception("Error in message loop")
|
||||
time.sleep(config.poll_interval)
|
||||
|
||||
|
||||
def main(
|
||||
signal_api_url: Annotated[str, typer.Option(envvar="SIGNAL_API_URL")],
|
||||
phone_number: Annotated[str, typer.Option(envvar="SIGNAL_PHONE_NUMBER")],
|
||||
llm_host: Annotated[str, typer.Option(envvar="LLM_HOST")],
|
||||
llm_model: Annotated[str, typer.Option(envvar="LLM_MODEL")] = "qwen3-vl:32b",
|
||||
llm_port: Annotated[int, typer.Option(envvar="LLM_PORT")] = 11434,
|
||||
poll_interval: Annotated[int, typer.Option(help="Seconds between polls")] = 2,
|
||||
inventory_file: Annotated[str, typer.Option(envvar="INVENTORY_FILE")] = "/var/lib/signal-bot/van_inventory.json",
|
||||
registry_file: Annotated[str, typer.Option(envvar="REGISTRY_FILE")] = "/var/lib/signal-bot/device_registry.json",
|
||||
log_level: Annotated[str, typer.Option()] = "INFO",
|
||||
) -> None:
|
||||
"""Run the Signal command and control bot."""
|
||||
configure_logger(log_level)
|
||||
|
||||
llm_config = LLMConfig(model=llm_model, host=llm_host, port=llm_port)
|
||||
config = BotConfig(
|
||||
signal_api_url=signal_api_url,
|
||||
phone_number=phone_number,
|
||||
llm=llm_config,
|
||||
poll_interval=poll_interval,
|
||||
inventory_file=inventory_file,
|
||||
)
|
||||
|
||||
signal = SignalClient(config.signal_api_url, config.phone_number)
|
||||
llm = LLMClient(llm_config)
|
||||
registry = DeviceRegistry(signal, Path(registry_file))
|
||||
|
||||
try:
|
||||
run_loop(config, signal, llm, registry)
|
||||
finally:
|
||||
signal.close()
|
||||
llm.close()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
typer.run(main)
|
||||
86
python/signal_bot/models.py
Normal file
86
python/signal_bot/models.py
Normal file
@@ -0,0 +1,86 @@
|
||||
"""Models for the Signal command and control bot."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import datetime # noqa: TC003 - pydantic needs this at runtime
|
||||
from enum import StrEnum
|
||||
from typing import Any
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
|
||||
class TrustLevel(StrEnum):
|
||||
"""Device trust level."""
|
||||
|
||||
VERIFIED = "verified"
|
||||
UNVERIFIED = "unverified"
|
||||
BLOCKED = "blocked"
|
||||
|
||||
|
||||
class Device(BaseModel):
|
||||
"""A registered device tracked by safety number."""
|
||||
|
||||
phone_number: str
|
||||
safety_number: str
|
||||
trust_level: TrustLevel = TrustLevel.UNVERIFIED
|
||||
first_seen: datetime
|
||||
last_seen: datetime
|
||||
|
||||
|
||||
class SignalMessage(BaseModel):
|
||||
"""An incoming Signal message."""
|
||||
|
||||
source: str
|
||||
timestamp: int
|
||||
message: str = ""
|
||||
attachments: list[str] = []
|
||||
group_id: str | None = None
|
||||
is_receipt: bool = False
|
||||
|
||||
|
||||
class SignalEnvelope(BaseModel):
|
||||
"""Raw envelope from signal-cli-rest-api."""
|
||||
|
||||
envelope: dict[str, Any]
|
||||
account: str | None = None
|
||||
|
||||
|
||||
class InventoryItem(BaseModel):
|
||||
"""An item in the van inventory."""
|
||||
|
||||
name: str
|
||||
quantity: int = 1
|
||||
category: str = ""
|
||||
notes: str = ""
|
||||
|
||||
|
||||
class InventoryUpdate(BaseModel):
|
||||
"""Result of processing an inventory update."""
|
||||
|
||||
items: list[InventoryItem] = []
|
||||
raw_response: str = ""
|
||||
source_type: str = "" # "receipt_photo" or "text_list"
|
||||
|
||||
|
||||
class LLMConfig(BaseModel):
|
||||
"""Configuration for an LLM backend."""
|
||||
|
||||
model: str
|
||||
host: str
|
||||
port: int = 11434
|
||||
temperature: float = 0.1
|
||||
|
||||
@property
|
||||
def base_url(self) -> str:
|
||||
"""Ollama API base URL."""
|
||||
return f"http://{self.host}:{self.port}"
|
||||
|
||||
|
||||
class BotConfig(BaseModel):
|
||||
"""Top-level bot configuration."""
|
||||
|
||||
signal_api_url: str
|
||||
phone_number: str
|
||||
llm: LLMConfig
|
||||
poll_interval: int = 2
|
||||
inventory_file: str = "van_inventory.json"
|
||||
112
python/signal_bot/signal_client.py
Normal file
112
python/signal_bot/signal_client.py
Normal file
@@ -0,0 +1,112 @@
|
||||
"""Client for the signal-cli-rest-api."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from typing import Any
|
||||
|
||||
import httpx
|
||||
|
||||
from python.signal_bot.models import SignalMessage
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class SignalClient:
|
||||
"""Communicate with signal-cli-rest-api.
|
||||
|
||||
Args:
|
||||
base_url: URL of the signal-cli-rest-api (e.g. http://localhost:8989).
|
||||
phone_number: The registered phone number to send/receive as.
|
||||
"""
|
||||
|
||||
def __init__(self, base_url: str, phone_number: str) -> None:
|
||||
self.base_url = base_url.rstrip("/")
|
||||
self.phone_number = phone_number
|
||||
self._client = httpx.Client(base_url=self.base_url, timeout=30)
|
||||
|
||||
def receive(self) -> list[SignalMessage]:
|
||||
"""Poll for new messages."""
|
||||
response = self._client.get(f"/v1/receive/{self.phone_number}")
|
||||
response.raise_for_status()
|
||||
envelopes: list[dict[str, Any]] = response.json()
|
||||
|
||||
messages: list[SignalMessage] = []
|
||||
for raw in envelopes:
|
||||
envelope = raw.get("envelope", {})
|
||||
data_message = envelope.get("dataMessage")
|
||||
if not data_message:
|
||||
continue
|
||||
|
||||
attachment_ids = [
|
||||
att["id"] for att in data_message.get("attachments", []) if "id" in att
|
||||
]
|
||||
|
||||
group_info = data_message.get("groupInfo")
|
||||
group_id = group_info.get("groupId") if group_info else None
|
||||
|
||||
messages.append(
|
||||
SignalMessage(
|
||||
source=envelope.get("source", ""),
|
||||
timestamp=envelope.get("timestamp", 0),
|
||||
message=data_message.get("message", "") or "",
|
||||
attachments=attachment_ids,
|
||||
group_id=group_id,
|
||||
),
|
||||
)
|
||||
|
||||
return messages
|
||||
|
||||
def send(self, recipient: str, message: str) -> None:
|
||||
"""Send a text message."""
|
||||
payload = {
|
||||
"message": message,
|
||||
"number": self.phone_number,
|
||||
"recipients": [recipient],
|
||||
}
|
||||
response = self._client.post("/v2/send", json=payload)
|
||||
response.raise_for_status()
|
||||
|
||||
def send_to_group(self, group_id: str, message: str) -> None:
|
||||
"""Send a message to a group."""
|
||||
payload = {
|
||||
"message": message,
|
||||
"number": self.phone_number,
|
||||
"recipients": [group_id],
|
||||
}
|
||||
response = self._client.post("/v2/send", json=payload)
|
||||
response.raise_for_status()
|
||||
|
||||
def get_attachment(self, attachment_id: str) -> bytes:
|
||||
"""Download an attachment by ID."""
|
||||
response = self._client.get(f"/v1/attachments/{attachment_id}")
|
||||
response.raise_for_status()
|
||||
return response.content
|
||||
|
||||
def get_identities(self) -> list[dict[str, Any]]:
|
||||
"""List known identities and their trust levels."""
|
||||
response = self._client.get(f"/v1/identities/{self.phone_number}")
|
||||
response.raise_for_status()
|
||||
return response.json()
|
||||
|
||||
def trust_identity(self, number_to_trust: str, *, trust_all_known_keys: bool = False) -> None:
|
||||
"""Trust an identity (verify safety number)."""
|
||||
payload: dict[str, Any] = {}
|
||||
if trust_all_known_keys:
|
||||
payload["trust_all_known_keys"] = True
|
||||
response = self._client.put(
|
||||
f"/v1/identities/{self.phone_number}/trust/{number_to_trust}",
|
||||
json=payload,
|
||||
)
|
||||
response.raise_for_status()
|
||||
|
||||
def reply(self, message: SignalMessage, text: str) -> None:
|
||||
"""Reply to a message, routing to group or individual."""
|
||||
if message.group_id:
|
||||
self.send_to_group(message.group_id, text)
|
||||
else:
|
||||
self.send(message.source, text)
|
||||
|
||||
def close(self) -> None:
|
||||
"""Close the HTTP client."""
|
||||
self._client.close()
|
||||
Reference in New Issue
Block a user