diff --git a/pyproject.toml b/pyproject.toml index adbc03e..b8b3551 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -56,7 +56,10 @@ lint.ignore = [ [tool.ruff.lint.per-file-ignores] "tests/**" = [ - "S101", # (perm) pytest needs asserts + "ANN", # (perm) type annotations not needed in tests + "D", # (perm) docstrings not needed in tests + "PLR2004", # (perm) magic values are fine in test assertions + "S101", # (perm) pytest needs asserts ] "python/stuff/**" = [ "T201", # (perm) I don't care about print statements dir @@ -82,6 +85,9 @@ lint.ignore = [ "python/alembic/**" = [ "INP001", # (perm) this creates LSP issues for alembic ] +"python/signal_bot/**" = [ + "D107", # (perm) class docstrings cover __init__ +] [tool.ruff.lint.pydocstyle] convention = "google" diff --git a/python/signal_bot/__init__.py b/python/signal_bot/__init__.py new file mode 100644 index 0000000..78fa05f --- /dev/null +++ b/python/signal_bot/__init__.py @@ -0,0 +1 @@ +"""Signal command and control bot.""" diff --git a/python/signal_bot/commands/__init__.py b/python/signal_bot/commands/__init__.py new file mode 100644 index 0000000..71ffd0c --- /dev/null +++ b/python/signal_bot/commands/__init__.py @@ -0,0 +1 @@ +"""Signal bot commands.""" diff --git a/python/signal_bot/commands/inventory.py b/python/signal_bot/commands/inventory.py new file mode 100644 index 0000000..61b6a8c --- /dev/null +++ b/python/signal_bot/commands/inventory.py @@ -0,0 +1,134 @@ +"""Van inventory command — parse receipts and item lists via LLM.""" + +from __future__ import annotations + +import json +import logging +from typing import TYPE_CHECKING, Any + +from python.signal_bot.models import InventoryItem, InventoryUpdate + +if TYPE_CHECKING: + from pathlib import Path + + from python.signal_bot.llm_client import LLMClient + from python.signal_bot.models import SignalMessage + from python.signal_bot.signal_client import SignalClient + +logger = logging.getLogger(__name__) + +SYSTEM_PROMPT = """\ +You are an inventory assistant. Extract items from the input and return ONLY +a JSON array. Each element must have these fields: + - "name": item name (string) + - "quantity": integer count (default 1) + - "category": category like "food", "tools", "supplies", etc. + - "notes": any extra detail (empty string if none) + +Example output: +[{"name": "water bottles", "quantity": 6, "category": "supplies", "notes": "1 gallon each"}] + +Return ONLY the JSON array, no other text.\ +""" + +IMAGE_PROMPT = "Extract all items from this receipt or inventory photo." +TEXT_PROMPT = "Extract all items from this inventory list." + + +def parse_llm_response(raw: str) -> list[InventoryItem]: + """Parse the LLM JSON response into InventoryItem list.""" + text = raw.strip() + # Strip markdown code fences if present + if text.startswith("```"): + lines = text.split("\n") + lines = [line for line in lines if not line.startswith("```")] + text = "\n".join(lines) + + items_data: list[dict[str, Any]] = json.loads(text) + return [InventoryItem.model_validate(item) for item in items_data] + + +def load_inventory(path: Path) -> list[InventoryItem]: + """Load existing inventory from disk.""" + if not path.exists(): + return [] + data: list[dict[str, Any]] = json.loads(path.read_text()) + return [InventoryItem.model_validate(item) for item in data] + + +def save_inventory(path: Path, items: list[InventoryItem]) -> None: + """Save inventory to disk.""" + path.parent.mkdir(parents=True, exist_ok=True) + data = [item.model_dump() for item in items] + path.write_text(json.dumps(data, indent=2) + "\n") + + +def handle_inventory_update( + message: SignalMessage, + signal: SignalClient, + llm: LLMClient, + inventory_path: Path, +) -> InventoryUpdate: + """Process an inventory update from a Signal message. + + Accepts either an image (receipt photo) or text list. + Uses the LLM to extract structured items, then merges into inventory. + """ + try: + if message.attachments: + image_data = signal.get_attachment(message.attachments[0]) + raw_response = llm.chat_with_image( + IMAGE_PROMPT, + image_data, + system=SYSTEM_PROMPT, + ) + source_type = "receipt_photo" + elif message.message.strip(): + raw_response = llm.chat( + f"{TEXT_PROMPT}\n\n{message.message}", + system=SYSTEM_PROMPT, + ) + source_type = "text_list" + else: + signal.reply(message, "Send a photo of a receipt or a text list of items to update inventory.") + return InventoryUpdate() + + new_items = parse_llm_response(raw_response) + existing = load_inventory(inventory_path) + merged = _merge_items(existing, new_items) + save_inventory(inventory_path, merged) + + summary = _format_summary(new_items) + signal.reply(message, f"Inventory updated with {len(new_items)} item(s):\n{summary}") + + return InventoryUpdate(items=new_items, raw_response=raw_response, source_type=source_type) + + except Exception: + logger.exception("Failed to process inventory update") + signal.reply(message, "Failed to process inventory update. Check logs for details.") + return InventoryUpdate() + + +def _merge_items(existing: list[InventoryItem], new: list[InventoryItem]) -> list[InventoryItem]: + """Merge new items into existing inventory, summing quantities for matches.""" + by_name: dict[str, InventoryItem] = {item.name.lower(): item for item in existing} + for item in new: + key = item.name.lower() + if key in by_name: + current = by_name[key] + by_name[key] = current.model_copy( + update={ + "quantity": current.quantity + item.quantity, + "category": item.category or current.category, + "notes": item.notes or current.notes, + }, + ) + else: + by_name[key] = item + return list(by_name.values()) + + +def _format_summary(items: list[InventoryItem]) -> str: + """Format items into a readable summary.""" + lines = [f" - {item.name} x{item.quantity} [{item.category}]" for item in items] + return "\n".join(lines) diff --git a/python/signal_bot/device_registry.py b/python/signal_bot/device_registry.py new file mode 100644 index 0000000..3f5f03f --- /dev/null +++ b/python/signal_bot/device_registry.py @@ -0,0 +1,135 @@ +"""Device registry — tracks verified/unverified devices by safety number.""" + +from __future__ import annotations + +import json +import logging +from typing import TYPE_CHECKING, Any + +from python.common import utcnow +from python.signal_bot.models import Device, TrustLevel + +if TYPE_CHECKING: + from pathlib import Path + + from python.signal_bot.signal_client import SignalClient + +logger = logging.getLogger(__name__) + + +class DeviceRegistry: + """Manage device trust based on Signal safety numbers. + + Devices start as UNVERIFIED. An admin verifies them over SSH by calling + ``verify(phone_number)`` which marks the device VERIFIED and also tells + signal-cli to trust the identity. + + Only VERIFIED devices may execute commands. + + Args: + signal_client: The Signal API client (used to sync identities). + registry_path: Path to the JSON file that persists device state. + """ + + def __init__(self, signal_client: SignalClient, registry_path: Path) -> None: + self.signal_client = signal_client + self.registry_path = registry_path + self._devices: dict[str, Device] = {} + self._load() + + def is_verified(self, phone_number: str) -> bool: + """Check if a phone number is verified.""" + device = self._devices.get(phone_number) + return device is not None and device.trust_level == TrustLevel.VERIFIED + + def is_blocked(self, phone_number: str) -> bool: + """Check if a phone number is blocked.""" + device = self._devices.get(phone_number) + return device is not None and device.trust_level == TrustLevel.BLOCKED + + def record_contact(self, phone_number: str, safety_number: str) -> Device: + """Record seeing a device. Creates entry if new, updates last_seen.""" + now = utcnow() + if phone_number in self._devices: + device = self._devices[phone_number] + if device.safety_number != safety_number: + logger.warning(f"Safety number changed for {phone_number}, resetting to UNVERIFIED") + device.safety_number = safety_number + device.trust_level = TrustLevel.UNVERIFIED + device.last_seen = now + else: + device = Device( + phone_number=phone_number, + safety_number=safety_number, + trust_level=TrustLevel.UNVERIFIED, + first_seen=now, + last_seen=now, + ) + self._devices[phone_number] = device + logger.info(f"New device registered: {phone_number}") + + self._save() + return device + + def verify(self, phone_number: str) -> bool: + """Mark a device as verified. Called by admin over SSH. + + Returns True if the device was found and verified. + """ + device = self._devices.get(phone_number) + if not device: + logger.warning(f"Cannot verify unknown device: {phone_number}") + return False + + device.trust_level = TrustLevel.VERIFIED + self.signal_client.trust_identity(phone_number, trust_all_known_keys=True) + self._save() + logger.info(f"Device verified: {phone_number}") + return True + + def block(self, phone_number: str) -> bool: + """Block a device.""" + device = self._devices.get(phone_number) + if not device: + return False + device.trust_level = TrustLevel.BLOCKED + self._save() + logger.info(f"Device blocked: {phone_number}") + return True + + def unverify(self, phone_number: str) -> bool: + """Reset a device to unverified.""" + device = self._devices.get(phone_number) + if not device: + return False + device.trust_level = TrustLevel.UNVERIFIED + self._save() + return True + + def list_devices(self) -> list[Device]: + """Return all known devices.""" + return list(self._devices.values()) + + def sync_identities(self) -> None: + """Pull identity list from signal-cli and record any new ones.""" + identities = self.signal_client.get_identities() + for identity in identities: + number = identity.get("number", "") + safety = identity.get("safety_number", identity.get("fingerprint", "")) + if number: + self.record_contact(number, safety) + + def _load(self) -> None: + """Load registry from disk.""" + if not self.registry_path.exists(): + return + data: list[dict[str, Any]] = json.loads(self.registry_path.read_text()) + for entry in data: + device = Device.model_validate(entry) + self._devices[device.phone_number] = device + + def _save(self) -> None: + """Persist registry to disk.""" + self.registry_path.parent.mkdir(parents=True, exist_ok=True) + data = [device.model_dump(mode="json") for device in self._devices.values()] + self.registry_path.write_text(json.dumps(data, indent=2) + "\n") diff --git a/python/signal_bot/llm_client.py b/python/signal_bot/llm_client.py new file mode 100644 index 0000000..38de976 --- /dev/null +++ b/python/signal_bot/llm_client.py @@ -0,0 +1,73 @@ +"""Flexible LLM client for ollama backends.""" + +from __future__ import annotations + +import base64 +import logging +from typing import TYPE_CHECKING, Any + +import httpx + +if TYPE_CHECKING: + from python.signal_bot.models import LLMConfig + +logger = logging.getLogger(__name__) + + +class LLMClient: + """Talk to an ollama instance. + + Designed to be swappable — change ``config.model`` to try a new LLM + without touching any calling code. + + Args: + config: LLM connection and model configuration. + """ + + def __init__(self, config: LLMConfig) -> None: + self.config = config + self._client = httpx.Client(base_url=config.base_url, timeout=120) + + def chat(self, prompt: str, *, system: str = "") -> str: + """Send a text prompt and return the response.""" + messages: list[dict[str, Any]] = [] + if system: + messages.append({"role": "system", "content": system}) + messages.append({"role": "user", "content": prompt}) + return self._generate(messages) + + def chat_with_image(self, prompt: str, image_data: bytes, *, system: str = "") -> str: + """Send a prompt with an image and return the response. + + Requires a vision-capable model (e.g. qwen3-vl). + """ + encoded = base64.b64encode(image_data).decode() + messages: list[dict[str, Any]] = [] + if system: + messages.append({"role": "system", "content": system}) + messages.append({"role": "user", "content": prompt, "images": [encoded]}) + return self._generate(messages) + + def _generate(self, messages: list[dict[str, Any]]) -> str: + """Call the ollama chat API.""" + payload = { + "model": self.config.model, + "messages": messages, + "stream": False, + "options": {"temperature": self.config.temperature}, + } + logger.info(f"LLM request to {self.config.model}") + response = self._client.post("/api/chat", json=payload) + response.raise_for_status() + data = response.json() + return data["message"]["content"] + + def list_models(self) -> list[str]: + """List available models on the ollama instance.""" + response = self._client.get("/api/tags") + response.raise_for_status() + return [m["name"] for m in response.json().get("models", [])] + + def close(self) -> None: + """Close the HTTP client.""" + self._client.close() diff --git a/python/signal_bot/main.py b/python/signal_bot/main.py new file mode 100644 index 0000000..caf2e62 --- /dev/null +++ b/python/signal_bot/main.py @@ -0,0 +1,138 @@ +"""Signal command and control bot — main entry point.""" + +from __future__ import annotations + +import logging +import time +from pathlib import Path +from typing import Annotated + +import typer + +from python.common import configure_logger +from python.signal_bot.commands.inventory import handle_inventory_update +from python.signal_bot.device_registry import DeviceRegistry +from python.signal_bot.llm_client import LLMClient +from python.signal_bot.models import BotConfig, LLMConfig, SignalMessage +from python.signal_bot.signal_client import SignalClient + +logger = logging.getLogger(__name__) + +# Command prefix — messages must start with this to be treated as commands. +CMD_PREFIX = "!" + +HELP_TEXT = """\ +Available commands: + !inventory — update van inventory from a text list + !inventory (+ photo) — update van inventory from a receipt photo + !status — show bot status + !help — show this help message + +Send a receipt photo with the message "!inventory" to scan it.\ +""" + + +def dispatch( + message: SignalMessage, + signal: SignalClient, + llm: LLMClient, + registry: DeviceRegistry, + inventory_path: Path, +) -> None: + """Route an incoming message to the right command handler.""" + source = message.source + + if registry.is_blocked(source): + return + + if not registry.is_verified(source): + signal.reply( + message, + "Your device is not verified. Ask the admin to verify your safety number over SSH.", + ) + return + + text = message.message.strip() + + if not text.startswith(CMD_PREFIX) and not message.attachments: + return + + cmd = text.lstrip(CMD_PREFIX).split()[0].lower() if text.startswith(CMD_PREFIX) else "" + + if cmd == "help": + signal.reply(message, HELP_TEXT) + + elif cmd == "status": + models = llm.list_models() + model_list = ", ".join(models[:10]) + device_count = len(registry.list_devices()) + signal.reply( + message, + f"Bot online.\nLLM: {llm.config.model}\nAvailable models: {model_list}\nKnown devices: {device_count}", + ) + + elif cmd == "inventory" or (message.attachments and not text.startswith(CMD_PREFIX)): + handle_inventory_update(message, signal, llm, inventory_path) + + else: + signal.reply(message, f"Unknown command: {cmd}\n\n{HELP_TEXT}") + + +def run_loop( + config: BotConfig, + signal: SignalClient, + llm: LLMClient, + registry: DeviceRegistry, +) -> None: + """Main polling loop.""" + inventory_path = Path(config.inventory_file) + logger.info(f"Bot started — polling every {config.poll_interval}s") + + while True: + try: + messages = signal.receive() + for message in messages: + logger.info(f"Message from {message.source}: {message.message[:80]}") + registry.record_contact(message.source, "") + dispatch(message, signal, llm, registry, inventory_path) + except Exception: + logger.exception("Error in message loop") + time.sleep(config.poll_interval) + + +def main( + signal_api_url: Annotated[str, typer.Option(envvar="SIGNAL_API_URL")], + phone_number: Annotated[str, typer.Option(envvar="SIGNAL_PHONE_NUMBER")], + llm_host: Annotated[str, typer.Option(envvar="LLM_HOST")], + llm_model: Annotated[str, typer.Option(envvar="LLM_MODEL")] = "qwen3-vl:32b", + llm_port: Annotated[int, typer.Option(envvar="LLM_PORT")] = 11434, + poll_interval: Annotated[int, typer.Option(help="Seconds between polls")] = 2, + inventory_file: Annotated[str, typer.Option(envvar="INVENTORY_FILE")] = "/var/lib/signal-bot/van_inventory.json", + registry_file: Annotated[str, typer.Option(envvar="REGISTRY_FILE")] = "/var/lib/signal-bot/device_registry.json", + log_level: Annotated[str, typer.Option()] = "INFO", +) -> None: + """Run the Signal command and control bot.""" + configure_logger(log_level) + + llm_config = LLMConfig(model=llm_model, host=llm_host, port=llm_port) + config = BotConfig( + signal_api_url=signal_api_url, + phone_number=phone_number, + llm=llm_config, + poll_interval=poll_interval, + inventory_file=inventory_file, + ) + + signal = SignalClient(config.signal_api_url, config.phone_number) + llm = LLMClient(llm_config) + registry = DeviceRegistry(signal, Path(registry_file)) + + try: + run_loop(config, signal, llm, registry) + finally: + signal.close() + llm.close() + + +if __name__ == "__main__": + typer.run(main) diff --git a/python/signal_bot/models.py b/python/signal_bot/models.py new file mode 100644 index 0000000..bcdd7bd --- /dev/null +++ b/python/signal_bot/models.py @@ -0,0 +1,86 @@ +"""Models for the Signal command and control bot.""" + +from __future__ import annotations + +from datetime import datetime # noqa: TC003 - pydantic needs this at runtime +from enum import StrEnum +from typing import Any + +from pydantic import BaseModel + + +class TrustLevel(StrEnum): + """Device trust level.""" + + VERIFIED = "verified" + UNVERIFIED = "unverified" + BLOCKED = "blocked" + + +class Device(BaseModel): + """A registered device tracked by safety number.""" + + phone_number: str + safety_number: str + trust_level: TrustLevel = TrustLevel.UNVERIFIED + first_seen: datetime + last_seen: datetime + + +class SignalMessage(BaseModel): + """An incoming Signal message.""" + + source: str + timestamp: int + message: str = "" + attachments: list[str] = [] + group_id: str | None = None + is_receipt: bool = False + + +class SignalEnvelope(BaseModel): + """Raw envelope from signal-cli-rest-api.""" + + envelope: dict[str, Any] + account: str | None = None + + +class InventoryItem(BaseModel): + """An item in the van inventory.""" + + name: str + quantity: int = 1 + category: str = "" + notes: str = "" + + +class InventoryUpdate(BaseModel): + """Result of processing an inventory update.""" + + items: list[InventoryItem] = [] + raw_response: str = "" + source_type: str = "" # "receipt_photo" or "text_list" + + +class LLMConfig(BaseModel): + """Configuration for an LLM backend.""" + + model: str + host: str + port: int = 11434 + temperature: float = 0.1 + + @property + def base_url(self) -> str: + """Ollama API base URL.""" + return f"http://{self.host}:{self.port}" + + +class BotConfig(BaseModel): + """Top-level bot configuration.""" + + signal_api_url: str + phone_number: str + llm: LLMConfig + poll_interval: int = 2 + inventory_file: str = "van_inventory.json" diff --git a/python/signal_bot/signal_client.py b/python/signal_bot/signal_client.py new file mode 100644 index 0000000..9f6bd0f --- /dev/null +++ b/python/signal_bot/signal_client.py @@ -0,0 +1,112 @@ +"""Client for the signal-cli-rest-api.""" + +from __future__ import annotations + +import logging +from typing import Any + +import httpx + +from python.signal_bot.models import SignalMessage + +logger = logging.getLogger(__name__) + + +class SignalClient: + """Communicate with signal-cli-rest-api. + + Args: + base_url: URL of the signal-cli-rest-api (e.g. http://localhost:8989). + phone_number: The registered phone number to send/receive as. + """ + + def __init__(self, base_url: str, phone_number: str) -> None: + self.base_url = base_url.rstrip("/") + self.phone_number = phone_number + self._client = httpx.Client(base_url=self.base_url, timeout=30) + + def receive(self) -> list[SignalMessage]: + """Poll for new messages.""" + response = self._client.get(f"/v1/receive/{self.phone_number}") + response.raise_for_status() + envelopes: list[dict[str, Any]] = response.json() + + messages: list[SignalMessage] = [] + for raw in envelopes: + envelope = raw.get("envelope", {}) + data_message = envelope.get("dataMessage") + if not data_message: + continue + + attachment_ids = [ + att["id"] for att in data_message.get("attachments", []) if "id" in att + ] + + group_info = data_message.get("groupInfo") + group_id = group_info.get("groupId") if group_info else None + + messages.append( + SignalMessage( + source=envelope.get("source", ""), + timestamp=envelope.get("timestamp", 0), + message=data_message.get("message", "") or "", + attachments=attachment_ids, + group_id=group_id, + ), + ) + + return messages + + def send(self, recipient: str, message: str) -> None: + """Send a text message.""" + payload = { + "message": message, + "number": self.phone_number, + "recipients": [recipient], + } + response = self._client.post("/v2/send", json=payload) + response.raise_for_status() + + def send_to_group(self, group_id: str, message: str) -> None: + """Send a message to a group.""" + payload = { + "message": message, + "number": self.phone_number, + "recipients": [group_id], + } + response = self._client.post("/v2/send", json=payload) + response.raise_for_status() + + def get_attachment(self, attachment_id: str) -> bytes: + """Download an attachment by ID.""" + response = self._client.get(f"/v1/attachments/{attachment_id}") + response.raise_for_status() + return response.content + + def get_identities(self) -> list[dict[str, Any]]: + """List known identities and their trust levels.""" + response = self._client.get(f"/v1/identities/{self.phone_number}") + response.raise_for_status() + return response.json() + + def trust_identity(self, number_to_trust: str, *, trust_all_known_keys: bool = False) -> None: + """Trust an identity (verify safety number).""" + payload: dict[str, Any] = {} + if trust_all_known_keys: + payload["trust_all_known_keys"] = True + response = self._client.put( + f"/v1/identities/{self.phone_number}/trust/{number_to_trust}", + json=payload, + ) + response.raise_for_status() + + def reply(self, message: SignalMessage, text: str) -> None: + """Reply to a message, routing to group or individual.""" + if message.group_id: + self.send_to_group(message.group_id, text) + else: + self.send(message.source, text) + + def close(self) -> None: + """Close the HTTP client.""" + self._client.close() diff --git a/systems/jeeves/services/signal_bot.nix b/systems/jeeves/services/signal_bot.nix new file mode 100644 index 0000000..289d5af --- /dev/null +++ b/systems/jeeves/services/signal_bot.nix @@ -0,0 +1,42 @@ +{ + pkgs, + inputs, + ... +}: +let + vars = import ../vars.nix; +in +{ + systemd.services.signal-bot = { + description = "Signal command and control bot"; + after = [ + "network.target" + "podman-signal_cli_rest_api.service" + ]; + wants = [ "podman-signal_cli_rest_api.service" ]; + wantedBy = [ "multi-user.target" ]; + + environment = { + PYTHONPATH = "${inputs.self}"; + }; + + serviceConfig = { + Type = "simple"; + EnvironmentFile = "${vars.secrets}/services/signal-bot"; + ExecStart = "${pkgs.my_python}/bin/python -m python.signal_bot.main"; + StateDirectory = "signal-bot"; + Restart = "on-failure"; + RestartSec = "10s"; + StandardOutput = "journal"; + StandardError = "journal"; + NoNewPrivileges = true; + ProtectSystem = "strict"; + ProtectHome = "read-only"; + PrivateTmp = true; + ReadWritePaths = [ "/var/lib/signal-bot" ]; + ReadOnlyPaths = [ + "${inputs.self}" + ]; + }; + }; +} diff --git a/tests/test_signal_bot.py b/tests/test_signal_bot.py new file mode 100644 index 0000000..954593c --- /dev/null +++ b/tests/test_signal_bot.py @@ -0,0 +1,209 @@ +"""Tests for the Signal command and control bot.""" + +from __future__ import annotations + +import json +from unittest.mock import MagicMock + +import pytest + +from python.signal_bot.commands.inventory import ( + _format_summary, + _merge_items, + load_inventory, + parse_llm_response, + save_inventory, +) +from python.signal_bot.device_registry import DeviceRegistry +from python.signal_bot.llm_client import LLMClient +from python.signal_bot.main import dispatch +from python.signal_bot.models import ( + InventoryItem, + LLMConfig, + SignalMessage, + TrustLevel, +) +from python.signal_bot.signal_client import SignalClient + + +class TestModels: + def test_llm_config_base_url(self): + config = LLMConfig(model="test:7b", host="bob.local", port=11434) + assert config.base_url == "http://bob.local:11434" + + def test_trust_level_values(self): + assert TrustLevel.VERIFIED == "verified" + assert TrustLevel.UNVERIFIED == "unverified" + assert TrustLevel.BLOCKED == "blocked" + + def test_signal_message_defaults(self): + msg = SignalMessage(source="+1234", timestamp=0) + assert msg.message == "" + assert msg.attachments == [] + assert msg.group_id is None + + def test_inventory_item_defaults(self): + item = InventoryItem(name="wrench") + assert item.quantity == 1 + assert item.category == "" + + +class TestInventoryParsing: + def test_parse_llm_response_basic(self): + raw = '[{"name": "water", "quantity": 6, "category": "supplies", "notes": ""}]' + items = parse_llm_response(raw) + assert len(items) == 1 + assert items[0].name == "water" + assert items[0].quantity == 6 + + def test_parse_llm_response_with_code_fence(self): + raw = '```json\n[{"name": "tape", "quantity": 1, "category": "tools", "notes": ""}]\n```' + items = parse_llm_response(raw) + assert len(items) == 1 + assert items[0].name == "tape" + + def test_parse_llm_response_invalid_json(self): + with pytest.raises(json.JSONDecodeError): + parse_llm_response("not json at all") + + def test_merge_items_new(self): + existing = [InventoryItem(name="tape", quantity=2, category="tools")] + new = [InventoryItem(name="rope", quantity=1, category="supplies")] + merged = _merge_items(existing, new) + assert len(merged) == 2 + + def test_merge_items_duplicate_sums_quantity(self): + existing = [InventoryItem(name="tape", quantity=2, category="tools")] + new = [InventoryItem(name="tape", quantity=3, category="tools")] + merged = _merge_items(existing, new) + assert len(merged) == 1 + assert merged[0].quantity == 5 + + def test_merge_items_case_insensitive(self): + existing = [InventoryItem(name="Tape", quantity=1)] + new = [InventoryItem(name="tape", quantity=2)] + merged = _merge_items(existing, new) + assert len(merged) == 1 + assert merged[0].quantity == 3 + + def test_format_summary(self): + items = [InventoryItem(name="water", quantity=6, category="supplies")] + summary = _format_summary(items) + assert "water" in summary + assert "x6" in summary + + def test_save_and_load_inventory(self, tmp_path): + path = tmp_path / "inventory.json" + items = [InventoryItem(name="wrench", quantity=1, category="tools")] + save_inventory(path, items) + loaded = load_inventory(path) + assert len(loaded) == 1 + assert loaded[0].name == "wrench" + + def test_load_inventory_missing_file(self, tmp_path): + path = tmp_path / "does_not_exist.json" + assert load_inventory(path) == [] + + +class TestDeviceRegistry: + @pytest.fixture + def signal_mock(self): + return MagicMock(spec=SignalClient) + + @pytest.fixture + def registry(self, tmp_path, signal_mock): + return DeviceRegistry(signal_mock, tmp_path / "devices.json") + + def test_new_device_is_unverified(self, registry): + registry.record_contact("+1234", "abc123") + assert not registry.is_verified("+1234") + + def test_verify_device(self, registry): + registry.record_contact("+1234", "abc123") + assert registry.verify("+1234") + assert registry.is_verified("+1234") + + def test_verify_unknown_device(self, registry): + assert not registry.verify("+9999") + + def test_block_device(self, registry): + registry.record_contact("+1234", "abc123") + registry.block("+1234") + assert registry.is_blocked("+1234") + + def test_safety_number_change_resets_trust(self, registry): + registry.record_contact("+1234", "abc123") + registry.verify("+1234") + assert registry.is_verified("+1234") + registry.record_contact("+1234", "different_safety_number") + assert not registry.is_verified("+1234") + + def test_persistence(self, tmp_path, signal_mock): + path = tmp_path / "devices.json" + reg1 = DeviceRegistry(signal_mock, path) + reg1.record_contact("+1234", "abc123") + reg1.verify("+1234") + + reg2 = DeviceRegistry(signal_mock, path) + assert reg2.is_verified("+1234") + + def test_list_devices(self, registry): + registry.record_contact("+1234", "abc") + registry.record_contact("+5678", "def") + assert len(registry.list_devices()) == 2 + + +class TestDispatch: + @pytest.fixture + def signal_mock(self): + return MagicMock(spec=SignalClient) + + @pytest.fixture + def llm_mock(self): + return MagicMock(spec=LLMClient) + + @pytest.fixture + def registry_mock(self): + mock = MagicMock(spec=DeviceRegistry) + mock.is_blocked.return_value = False + mock.is_verified.return_value = True + return mock + + def test_blocked_device_ignored(self, signal_mock, llm_mock, registry_mock, tmp_path): + registry_mock.is_blocked.return_value = True + msg = SignalMessage(source="+1234", timestamp=0, message="!help") + dispatch(msg, signal_mock, llm_mock, registry_mock, tmp_path / "inv.json") + signal_mock.reply.assert_not_called() + + def test_unverified_device_gets_warning(self, signal_mock, llm_mock, registry_mock, tmp_path): + registry_mock.is_verified.return_value = False + msg = SignalMessage(source="+1234", timestamp=0, message="!help") + dispatch(msg, signal_mock, llm_mock, registry_mock, tmp_path / "inv.json") + signal_mock.reply.assert_called_once() + assert "not verified" in signal_mock.reply.call_args[0][1] + + def test_help_command(self, signal_mock, llm_mock, registry_mock, tmp_path): + msg = SignalMessage(source="+1234", timestamp=0, message="!help") + dispatch(msg, signal_mock, llm_mock, registry_mock, tmp_path / "inv.json") + signal_mock.reply.assert_called_once() + assert "Available commands" in signal_mock.reply.call_args[0][1] + + def test_unknown_command(self, signal_mock, llm_mock, registry_mock, tmp_path): + msg = SignalMessage(source="+1234", timestamp=0, message="!foobar") + dispatch(msg, signal_mock, llm_mock, registry_mock, tmp_path / "inv.json") + signal_mock.reply.assert_called_once() + assert "Unknown command" in signal_mock.reply.call_args[0][1] + + def test_non_command_message_ignored(self, signal_mock, llm_mock, registry_mock, tmp_path): + msg = SignalMessage(source="+1234", timestamp=0, message="hello there") + dispatch(msg, signal_mock, llm_mock, registry_mock, tmp_path / "inv.json") + signal_mock.reply.assert_not_called() + + def test_status_command(self, signal_mock, llm_mock, registry_mock, tmp_path): + llm_mock.list_models.return_value = ["model1", "model2"] + llm_mock.config = LLMConfig(model="test:7b", host="bob") + registry_mock.list_devices.return_value = [] + msg = SignalMessage(source="+1234", timestamp=0, message="!status") + dispatch(msg, signal_mock, llm_mock, registry_mock, tmp_path / "inv.json") + signal_mock.reply.assert_called_once() + assert "Bot online" in signal_mock.reply.call_args[0][1]