Compare commits

..

37 Commits

Author SHA1 Message Date
Richie 7740ebb594 opning ports for testing
pytest / pytest (pull_request) Failing after 26s
build_systems / build-bob (pull_request) Successful in 47s
build_systems / build-rhapsody-in-green (pull_request) Successful in 1m0s
treefmt / nix fmt (pull_request) Successful in 5s
build_systems / build-brain (pull_request) Successful in 46s
build_systems / build-leviathan (pull_request) Successful in 53s
build_systems / build-jeeves (pull_request) Successful in 2m37s
2026-06-13 22:17:46 -04:00
Richie 07a9adfdd5 added a index for the VEctor DB 2026-06-13 22:17:46 -04:00
Richie 74e4c2e921 improved BM25 write 2026-06-13 22:17:46 -04:00
Richie 70d24c2a85 added ZstdMiddleware to ebook_search 2026-06-13 22:17:46 -04:00
Richie 773e9f9d4a added vector_engine to fix name postgres name space issue 2026-06-13 22:17:46 -04:00
Richie 8b608f7aa0 reworked ebook_search routers 2026-06-13 22:17:46 -04:00
Richie be6d8c9db9 made fastapi tools 2026-06-13 22:17:46 -04:00
Richie 6bb6f935b1 added proper cache invalidation to load_bm25_corpus 2026-06-13 22:17:46 -04:00
Richie c4e8a395d2 updated tests 2026-06-13 22:17:46 -04:00
Richie d51ed42919 improved reranking weights 2026-06-13 22:17:46 -04:00
Richie 7466c7ed3a fixed duplicat enrichment 2026-06-13 22:17:46 -04:00
Richie e9b574aa58 improved queary for vector search 2026-06-13 22:17:46 -04:00
Richie bcd855cb88 cleaned up installer.py 2026-06-13 22:17:46 -04:00
Richie b976fbf13f add .ebook_search_bm25 to gitignore 2026-06-13 22:17:46 -04:00
Richie 5ba41feb2d updated python 2026-06-13 22:17:46 -04:00
Richie 3ebd9df21f setup tests 2026-06-13 22:17:46 -04:00
Richie 73177ef399 build api and frountend 2026-06-13 22:17:46 -04:00
Richie d81f5a0ec1 added answer.py and config 2026-06-13 22:17:46 -04:00
Richie 4dac3d1c60 added __init__ 2026-06-13 22:17:46 -04:00
Richie a802dbd2b3 made llm_interface.py 2026-06-13 22:17:46 -04:00
Richie 8e6a2809b0 added rerank 2026-06-13 22:17:46 -04:00
Richie c5293b0dcf built ingest 2026-06-13 22:17:46 -04:00
Richie d740b25b2c built rag search setup 2026-06-13 22:17:46 -04:00
Richie febb88dc77 set up embedding system 2026-06-13 22:17:46 -04:00
Richie b9949e8d72 built BM25 search foundation 2026-06-13 22:17:46 -04:00
Richie 345384e76f clean up 2026-06-13 22:17:46 -04:00
Richie ac899d5fca added ebook embedding to orm 2026-06-13 22:17:46 -04:00
Richie 76e206a727 removed hedgedoc 2026-06-13 22:17:46 -04:00
Richie 090e8dddca adding embedding Models to jeeves 2026-06-13 22:17:46 -04:00
Richie c8505d413c updated series_index to float and added UniqueConstraint to audiobook and audiobook_author 2026-06-13 22:17:46 -04:00
Richie ff685112a6 fixed omnibus for audio books 2026-06-13 22:17:46 -04:00
Richie e113dc3ef3 moved installer to python dir 2026-06-13 22:17:46 -04:00
Richie 340f37f114 deleted frontend dir 2026-06-13 22:17:46 -04:00
Richie 5611daab97 added llm_tool_calling.py 2026-06-13 22:17:46 -04:00
Richie f20bee82ec built workflow 2026-06-13 22:17:46 -04:00
Richie ef4e6f75a5 Add catalog.py for manually adding authors and series to the database. 2026-06-13 22:17:46 -04:00
Richie 1ab5d3d650 adding audiobook data to DB 2026-06-13 22:17:46 -04:00
50 changed files with 3086 additions and 153 deletions
+1
View File
@@ -64,6 +64,7 @@
ruff
scalene
sqlalchemy
sqlalchemy
bm25s
tenacity
textual
+5 -7
View File
@@ -3,7 +3,7 @@ name = "system_tools"
version = "0.1.0"
description = ""
authors = [{ name = "Richie Cahill", email = "richie@tmmworkshop.com" }]
requires-python = "~=3.14.0"
requires-python = "~=3.13.0"
readme = "README.md"
license = "MIT"
# these dependencies are a best effort and aren't guaranteed to work
@@ -12,22 +12,20 @@ dependencies = [
"alembic",
"apprise",
"apscheduler",
"fastapi",
"fastapi-cli",
"httpx",
"python-multipart",
"polars",
"psycopg[binary]",
"pydantic",
"python-multipart",
"pyyaml",
"sqlalchemy",
"tenacity",
"tinytuya",
"typer",
"websockets",
]
[project.scripts]
database = "python.database_cli:app"
van-inventory = "python.van_inventory.main:serve"
whisper-transcribe = "python.tools.whisper.transcribe:main"
[dependency-groups]
@@ -43,7 +41,7 @@ dev = [
[tool.ruff]
target-version = "py314"
target-version = "py313"
line-length = 120
+2 -6
View File
@@ -1,10 +1,9 @@
"""FastAPI interface for Contact database."""
from __future__ import annotations
import logging
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from typing import TYPE_CHECKING, Annotated
from typing import Annotated
import typer
import uvicorn
@@ -15,9 +14,6 @@ from python.common import configure_logger
from python.fastapi_tools import ZstdMiddleware
from python.orm.common import get_postgres_engine
if TYPE_CHECKING:
from collections.abc import AsyncIterator
logger = logging.getLogger(__name__)
+1 -1
View File
@@ -9,7 +9,7 @@ from pydantic import BaseModel
from sqlalchemy import select
from sqlalchemy.orm import selectinload
from python.fastapi_tools.db import DbSession # noqa: TC001 this is a FastAPI needed at runtime
from python.fastapi_tools.db import DbSession
from python.orm.richie.contact import Contact, ContactRelationship, Need, RelationshipType
TEMPLATES_DIR = Path(__file__).parent.parent / "templates"
+1 -1
View File
@@ -9,7 +9,7 @@ from fastapi.templating import Jinja2Templates
from sqlalchemy import select
from sqlalchemy.orm import Session, selectinload
from python.fastapi_tools.db import DbSession # noqa: TC001 this is a FastAPI needed at runtime
from python.fastapi_tools.db import DbSession
from python.orm.richie.contact import Contact, ContactRelationship, Need, RelationshipType
TEMPLATES_DIR = Path(__file__).parent.parent / "templates"
+15 -3
View File
@@ -4,10 +4,12 @@ Usage:
database <db_name> <command> [args...]
Examples:
database van_inventory upgrade head
database van_inventory downgrade head-1
database van_inventory revision --autogenerate -m "add meals table"
database van_inventory check
database richie check
database richie upgrade head
database richie downgrade head-1
database richie revision --autogenerate -m "add meals table"
"""
from __future__ import annotations
@@ -46,7 +48,10 @@ class DatabaseConfig:
def alembic_config(self) -> Config:
"""Build an alembic Config for this database."""
cfg = Config()
# Runtime import needed — Config is in TYPE_CHECKING for the return type annotation
from alembic.config import Config as AlembicConfig # noqa: PLC0415
cfg = AlembicConfig()
cfg.set_main_option("script_location", self.script_location)
cfg.set_main_option("file_template", self.file_template)
cfg.set_main_option("prepend_sys_path", ".")
@@ -71,6 +76,13 @@ DATABASES: dict[str, DatabaseConfig] = {
base_class_name="RichieBase",
models_module="python.orm.richie",
),
"van_inventory": DatabaseConfig(
env_prefix="VAN_INVENTORY",
version_location="python/alembic/van_inventory/versions",
base_module="python.orm.van_inventory.base",
base_class_name="VanInventoryBase",
models_module="python.orm.van_inventory.models",
),
}
+1 -7
View File
@@ -13,7 +13,6 @@ from sqlalchemy import literal, select
from sqlalchemy.orm import Session
from python.ebook_search.bm25_corpus import (
BM25CorpusUnavailableError,
load_bm25_corpus,
score_bm25_corpus,
)
@@ -253,12 +252,7 @@ def vector_candidates(engine: Engine, query: str, config: EbookSearchConfig) ->
def bm25_candidates(query: str, config: EbookSearchConfig) -> list[SearchResult]:
"""Return BM25-ranked lexical candidates using the persisted corpus."""
try:
corpus = load_bm25_corpus(config)
except BM25CorpusUnavailableError as error:
logger.warning("ebook_bm25_index_unavailable_skipping error=%s", error)
return []
corpus = load_bm25_corpus(config)
if not corpus.records:
logger.info("ebook_bm25_search_complete corpus=0 candidates=0")
return []
+2 -6
View File
@@ -1,15 +1,11 @@
"""FastAPI dependencies."""
from __future__ import annotations
from typing import TYPE_CHECKING, Annotated
from collections.abc import Iterator
from typing import Annotated
from fastapi import Depends, Request
from sqlalchemy.orm import Session
if TYPE_CHECKING:
from collections.abc import Iterator
def get_db(request: Request) -> Iterator[Session]:
"""Get database session from app state."""
+1 -5
View File
@@ -1,14 +1,10 @@
"""Zstd response compression middleware."""
from compression import zstd
from typing import TYPE_CHECKING
from starlette.middleware.base import BaseHTTPMiddleware, RequestResponseEndpoint
from starlette.requests import Request
from starlette.responses import Response
if TYPE_CHECKING:
from starlette.requests import Request
MINIMUM_RESPONSE_SIZE = 500
+2 -6
View File
@@ -1,10 +1,9 @@
"""FastAPI heater control service."""
from __future__ import annotations
import logging
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from typing import TYPE_CHECKING, Annotated
from typing import Annotated
import typer
import uvicorn
@@ -14,9 +13,6 @@ from python.common import configure_logger
from python.heater.controller import HeaterController
from python.heater.models import ActionResult, DeviceConfig, HeaterStatus
if TYPE_CHECKING:
from collections.abc import AsyncIterator
logger = logging.getLogger(__name__)
-1
View File
@@ -262,7 +262,6 @@ def installer(
):
run(command, check=True, stdin=test.stdout)
# Fixed mount point for the new system; the installer runs as root on a fresh disk
mnt_dir = "/tmp/nix_install" # noqa: S108
Path(mnt_dir).mkdir(parents=True, exist_ok=True)
+2
View File
@@ -1,7 +1,9 @@
"""ORM package exports."""
from python.orm.richie.base import RichieBase
from python.orm.van_inventory.base import VanInventoryBase
__all__ = [
"RichieBase",
"VanInventoryBase",
]
+4 -4
View File
@@ -2,7 +2,7 @@
from __future__ import annotations
from sqlalchemy import ForeignKey, UniqueConstraint
from sqlalchemy import ForeignKey, String, UniqueConstraint
from sqlalchemy.orm import Mapped, mapped_column, relationship
from python.orm.richie.base import TableBase
@@ -14,7 +14,7 @@ class AudiobookAuthor(TableBase):
__tablename__ = "audiobook_author"
__table_args__ = (UniqueConstraint("name"),)
name: Mapped[str]
name: Mapped[str] = mapped_column(String, unique=True)
books: Mapped[list[Audiobook]] = relationship("Audiobook", back_populates="author")
series: Mapped[list[AudiobookSeries]] = relationship("AudiobookSeries", back_populates="author")
@@ -26,7 +26,7 @@ class AudiobookSeries(TableBase):
__tablename__ = "audiobook_series"
__table_args__ = (UniqueConstraint("author_id", "name"),)
name: Mapped[str]
name: Mapped[str] = mapped_column(String)
author_id: Mapped[int] = mapped_column(ForeignKey("main.audiobook_author.id", ondelete="CASCADE"))
author: Mapped[AudiobookAuthor] = relationship("AudiobookAuthor", back_populates="series")
@@ -46,7 +46,7 @@ class Audiobook(TableBase):
),
)
title: Mapped[str]
title: Mapped[str] = mapped_column(String)
author_id: Mapped[int] = mapped_column(ForeignKey("main.audiobook_author.id", ondelete="CASCADE"))
series_id: Mapped[int | None] = mapped_column(ForeignKey("main.audiobook_series.id", ondelete="SET NULL"))
series_index: Mapped[float] = mapped_column(default=0.0)
+1
View File
@@ -0,0 +1 @@
"""Van inventory database ORM exports."""
+39
View File
@@ -0,0 +1,39 @@
"""Van inventory database ORM base."""
from __future__ import annotations
from datetime import datetime
from sqlalchemy import DateTime, MetaData, func
from sqlalchemy.ext.declarative import AbstractConcreteBase
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from python.orm.common import NAMING_CONVENTION
class VanInventoryBase(DeclarativeBase):
"""Base class for van_inventory database ORM models."""
schema_name = "main"
metadata = MetaData(
schema=schema_name,
naming_convention=NAMING_CONVENTION,
)
class VanTableBase(AbstractConcreteBase, VanInventoryBase):
"""Abstract concrete base for van_inventory tables with IDs and timestamps."""
__abstract__ = True
id: Mapped[int] = mapped_column(primary_key=True)
created: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
server_default=func.now(),
)
updated: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
server_default=func.now(),
onupdate=func.now(),
)
+46
View File
@@ -0,0 +1,46 @@
"""Van inventory ORM models."""
from __future__ import annotations
from sqlalchemy import ForeignKey, UniqueConstraint
from sqlalchemy.orm import Mapped, mapped_column, relationship
from python.orm.van_inventory.base import VanTableBase
class Item(VanTableBase):
"""A food item in the van."""
__tablename__ = "items"
name: Mapped[str] = mapped_column(unique=True)
quantity: Mapped[float] = mapped_column(default=0)
unit: Mapped[str]
category: Mapped[str | None]
meal_ingredients: Mapped[list[MealIngredient]] = relationship(back_populates="item")
class Meal(VanTableBase):
"""A meal that can be made from items in the van."""
__tablename__ = "meals"
name: Mapped[str] = mapped_column(unique=True)
instructions: Mapped[str | None]
ingredients: Mapped[list[MealIngredient]] = relationship(back_populates="meal")
class MealIngredient(VanTableBase):
"""Links a meal to the items it requires, with quantities."""
__tablename__ = "meal_ingredients"
__table_args__ = (UniqueConstraint("meal_id", "item_id"),)
meal_id: Mapped[int] = mapped_column(ForeignKey("meals.id"))
item_id: Mapped[int] = mapped_column(ForeignKey("items.id"))
quantity_needed: Mapped[float]
meal: Mapped[Meal] = relationship(back_populates="ingredients")
item: Mapped[Item] = relationship(back_populates="meal_ingredients")
+1
View File
@@ -0,0 +1 @@
game_data/
+1
View File
@@ -0,0 +1 @@
"""init."""
+675
View File
@@ -0,0 +1,675 @@
"""Base logic for the Splendor game."""
from __future__ import annotations
import itertools
import json
import random
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Literal, Protocol
if TYPE_CHECKING:
from collections.abc import Sequence
from pathlib import Path
GemColor = Literal["white", "blue", "green", "red", "black", "gold"]
GEM_COLORS: tuple[GemColor, ...] = (
"white",
"blue",
"green",
"red",
"black",
"gold",
)
BASE_COLORS: tuple[GemColor, ...] = (
"white",
"blue",
"green",
"red",
"black",
)
GEM_ORDER: list[GemColor] = list(GEM_COLORS)
GEM_INDEX: dict[GemColor, int] = {c: i for i, c in enumerate(GEM_ORDER)}
BASE_INDEX: dict[GemColor, int] = {c: i for i, c in enumerate(BASE_COLORS)}
@dataclass(frozen=True)
class Card:
"""Development card: gives points + a permanent gem discount."""
tier: int
points: int
color: GemColor
cost: dict[GemColor, int]
@dataclass(frozen=True)
class Noble:
"""Noble tile: gives points if you have enough bonuses."""
name: str
points: int
requirements: dict[GemColor, int]
@dataclass
class PlayerState:
"""State of a player in the game."""
strategy: Strategy
tokens: dict[GemColor, int] = field(default_factory=lambda: dict.fromkeys(GEM_COLORS, 0))
discounts: dict[GemColor, int] = field(default_factory=lambda: dict.fromkeys(GEM_COLORS, 0))
cards: list[Card] = field(default_factory=list)
reserved: list[Card] = field(default_factory=list)
nobles: list[Noble] = field(default_factory=list)
card_score: int = 0
noble_score: int = 0
def total_tokens(self) -> int:
"""Total tokens in player's bank."""
return sum(self.tokens.values())
def add_noble(self, noble: Noble) -> None:
"""Add a noble to the player."""
self.nobles.append(noble)
self.noble_score = sum(noble.points for noble in self.nobles)
def add_card(self, card: Card) -> None:
"""Add a card to the player."""
self.cards.append(card)
self.card_score = sum(card.points for card in self.cards)
@property
def score(self) -> int:
"""Total points in player's cards + nobles."""
return self.card_score + self.noble_score
def can_afford(self, card: Card) -> bool:
"""Check if player can afford card, using discounts + gold."""
missing = 0
gold = self.tokens["gold"]
for color, cost in card.cost.items():
missing += max(0, cost - self.discounts.get(color, 0) - self.tokens.get(color, 0))
if missing > gold:
return False
return True
def pay_for_card(self, card: Card) -> dict[GemColor, int]:
"""Pay tokens for card, move card to tableau, return payment for bank."""
if not self.can_afford(card):
msg = f"cannot afford card {card}"
raise ValueError(msg)
payment: dict[GemColor, int] = dict.fromkeys(GEM_COLORS, 0)
gold_available = self.tokens["gold"]
for color in BASE_COLORS:
cost = card.cost.get(color, 0)
effective_cost = max(0, cost - self.discounts.get(color, 0))
use = min(self.tokens[color], effective_cost)
self.tokens[color] -= use
payment[color] += use
remaining = effective_cost - use
if remaining > 0:
use_gold = min(gold_available, remaining)
gold_available -= use_gold
self.tokens["gold"] -= use_gold
payment["gold"] += use_gold
self.add_card(card)
self.discounts[card.color] += 1
return payment
def get_default_starting_tokens(player_count: int) -> dict[GemColor, int]:
"""get_default_starting_tokens."""
token_count = (player_count * player_count - 3 * player_count + 10) // 2
return {
"white": token_count,
"blue": token_count,
"green": token_count,
"red": token_count,
"black": token_count,
"gold": 5,
}
@dataclass
class GameConfig:
"""Game configuration: gems, bank, cards, nobles, etc."""
win_score: int = 15
table_cards_per_tier: int = 4
reserve_limit: int = 3
token_limit: int = 10
turn_limit: int = 1000
minimum_tokens_to_buy_2: int = 4
max_token_take: int = 3
cards: list[Card] = field(default_factory=list)
nobles: list[Noble] = field(default_factory=list)
class GameState:
"""Game state: players, bank, decks, table, available nobles, etc."""
def __init__(
self,
config: GameConfig,
players: list[PlayerState],
bank: dict[GemColor, int],
decks_by_tier: dict[int, list[Card]],
table_by_tier: dict[int, list[Card]],
available_nobles: list[Noble],
) -> None:
"""Game state."""
self.config = config
self.players = players
self.bank = bank
self.decks_by_tier = decks_by_tier
self.table_by_tier = table_by_tier
self.available_nobles = available_nobles
self.noble_min_requirements = 0
self.get_noble_min_requirements()
self.current_player_index = 0
self.finished = False
def get_noble_min_requirements(self) -> None:
"""Find the minimum requirement for all available nobles."""
test = 0
for noble in self.available_nobles:
test = max(test, min(foo for foo in noble.requirements.values()))
self.noble_min_requirements = test
def next_player(self) -> None:
"""Advance to the next player."""
self.current_player_index = (self.current_player_index + 1) % len(self.players)
@property
def current_player(self) -> PlayerState:
"""Current player."""
return self.players[self.current_player_index]
def refill_table(self) -> None:
"""Refill face-up cards from decks."""
for tier, deck in self.decks_by_tier.items():
table = self.table_by_tier[tier]
while len(table) < self.config.table_cards_per_tier and deck:
table.append(deck.pop())
def check_winner_simple(self) -> PlayerState | None:
"""Simplified: end immediately when someone hits win_score."""
eligible = [player for player in self.players if player.score >= self.config.win_score]
if not eligible:
return None
eligible.sort(
key=lambda p: (p.score, -len(p.cards)),
reverse=True,
)
self.finished = True
return eligible[0]
class Action:
"""Marker protocol for actions."""
@dataclass
class TakeDifferent(Action):
"""Take up to 3 different gem colors."""
colors: list[GemColor]
@dataclass
class TakeDouble(Action):
"""Take two of the same color."""
color: GemColor
@dataclass
class BuyCard(Action):
"""Buy a face-up card."""
tier: int
index: int
@dataclass
class BuyCardReserved(Action):
"""Buy a face-up card."""
index: int
@dataclass
class ReserveCard(Action):
"""Reserve a face-up card."""
tier: int
index: int | None = None
from_deck: bool = False
class Strategy(Protocol):
"""Implement this to make a bot or human controller."""
def __init__(self, name: str) -> None:
"""Initialize a strategy."""
self.name = name
def choose_action(self, game: GameState, player: PlayerState) -> Action | None:
"""Return an Action, or None to concede/end."""
raise NotImplementedError
def choose_discard(
self,
game: GameState, # noqa: ARG002
player: PlayerState,
excess: int,
) -> dict[GemColor, int]:
"""Called if player has more than token_limit tokens after an action.
Default: naive auto-discard.
"""
return auto_discard_tokens(player, excess)
def choose_noble(
self,
game: GameState, # noqa: ARG002
player: PlayerState, # noqa: ARG002
nobles: list[Noble],
) -> Noble:
"""Called if player qualifies for multiple nobles. Default: first."""
return nobles[0]
def auto_discard_tokens(player: PlayerState, excess: int) -> dict[GemColor, int]:
"""Very dumb discard logic: discard from colors you have the most of."""
to_discard: dict[GemColor, int] = dict.fromkeys(GEM_COLORS, 0)
remaining = excess
while remaining > 0:
color = max(player.tokens, key=lambda c: player.tokens[c])
if player.tokens[color] == 0:
break
player.tokens[color] -= 1
to_discard[color] += 1
remaining -= 1
return to_discard
def enforce_token_limit(
game: GameState,
strategy: Strategy,
player: PlayerState,
) -> None:
"""If player has more than token_limit tokens, force discards."""
limit = game.config.token_limit
total = player.total_tokens()
if total <= limit:
return
excess = total - limit
discards = strategy.choose_discard(game, player, excess)
for color, amount in discards.items():
available = player.tokens[color]
to_remove = min(amount, available)
if to_remove <= 0:
continue
player.tokens[color] -= to_remove
game.bank[color] += to_remove
remaining = player.total_tokens() - limit
if remaining > 0:
auto = auto_discard_tokens(player, remaining)
for color, amount in auto.items():
game.bank[color] += amount
def _check_nobles_for_player(player: PlayerState, noble: Noble) -> bool:
# this rule is slower
for color, cost in noble.requirements.items(): # noqa: SIM110
if player.discounts[color] < cost:
return False
return True
def check_nobles_for_player(
game: GameState,
strategy: Strategy,
player: PlayerState,
) -> None:
"""Award at most one noble to player if they qualify."""
if game.noble_min_requirements > max(player.discounts.values()):
return
candidates = [noble for noble in game.available_nobles if _check_nobles_for_player(player, noble)]
if not candidates:
return
chosen = candidates[0] if len(candidates) == 1 else strategy.choose_noble(game, player, candidates)
if chosen not in game.available_nobles:
return
game.available_nobles.remove(chosen)
game.get_noble_min_requirements()
player.add_noble(chosen)
def apply_take_different(game: GameState, strategy: Strategy, action: TakeDifferent) -> None:
"""Mutate game state according to action."""
player = game.current_player
colors = [color for color in action.colors if color in BASE_COLORS and game.bank[color] > 0]
if not (1 <= len(colors) <= game.config.max_token_take):
return
for color in colors:
game.bank[color] -= 1
player.tokens[color] += 1
enforce_token_limit(game, strategy, player)
def apply_take_double(game: GameState, strategy: Strategy, action: TakeDouble) -> None:
"""Mutate game state according to action."""
player = game.current_player
color = action.color
if color not in BASE_COLORS:
return
if game.bank[color] < game.config.minimum_tokens_to_buy_2:
return
game.bank[color] -= 2
player.tokens[color] += 2
enforce_token_limit(game, strategy, player)
def apply_buy_card(game: GameState, _strategy: Strategy, action: BuyCard) -> None:
"""Mutate game state according to action."""
player = game.current_player
row = game.table_by_tier.get(action.tier)
if row is None or not (0 <= action.index < len(row)):
return
card = row[action.index]
if not player.can_afford(card):
return
row.pop(action.index)
payment = player.pay_for_card(card)
for color, amount in payment.items():
game.bank[color] += amount
game.refill_table()
def apply_buy_card_reserved(game: GameState, _strategy: Strategy, action: BuyCardReserved) -> None:
"""Mutate game state according to action."""
player = game.current_player
if not (0 <= action.index < len(player.reserved)):
return
card = player.reserved[action.index]
if not player.can_afford(card):
return
player.reserved.pop(action.index)
payment = player.pay_for_card(card)
for color, amount in payment.items():
game.bank[color] += amount
def apply_reserve_card(game: GameState, strategy: Strategy, action: ReserveCard) -> None:
"""Mutate game state according to action."""
player = game.current_player
if len(player.reserved) >= game.config.reserve_limit:
return
card: Card | None = None
if action.from_deck:
deck = game.decks_by_tier.get(action.tier)
if deck:
card = deck.pop()
else:
row = game.table_by_tier.get(action.tier)
if row is None:
return
if action.index is None or not (0 <= action.index < len(row)):
return
card = row.pop(action.index)
game.refill_table()
if card is None:
return
player.reserved.append(card)
if game.bank["gold"] > 0:
game.bank["gold"] -= 1
player.tokens["gold"] += 1
enforce_token_limit(game, strategy, player)
def apply_action(game: GameState, strategy: Strategy, action: Action) -> None:
"""Mutate game state according to action."""
actions = {
TakeDifferent: apply_take_different,
TakeDouble: apply_take_double,
BuyCard: apply_buy_card,
ReserveCard: apply_reserve_card,
BuyCardReserved: apply_buy_card_reserved,
}
action_func = actions.get(type(action))
if action_func is None:
msg = f"Unknown action type: {type(action)}"
raise ValueError(msg)
action_func(game, strategy, action)
# not sure how to simplify this yet
def get_legal_actions( # noqa: C901
game: GameState,
player: PlayerState | None = None,
) -> list[Action]:
"""Enumerate all syntactically legal actions for the given player.
This enforces:
- token-taking rules
- reserve limits
- affordability for buys
"""
if player is None:
player = game.players[game.current_player_index]
actions: list[Action] = []
colors_available = [c for c in BASE_COLORS if game.bank[c] > 0]
for r in (1, 2, 3):
actions.extend(TakeDifferent(colors=list(combo)) for combo in itertools.combinations(colors_available, r))
actions.extend(
TakeDouble(color=color) for color in BASE_COLORS if game.bank[color] >= game.config.minimum_tokens_to_buy_2
)
for tier, row in game.table_by_tier.items():
for idx, card in enumerate(row):
if player.can_afford(card):
actions.append(BuyCard(tier=tier, index=idx))
for idx, card in enumerate(player.reserved):
if player.can_afford(card):
actions.append(BuyCardReserved(index=idx))
if len(player.reserved) < game.config.reserve_limit:
for tier, row in game.table_by_tier.items():
for idx, _ in enumerate(row):
actions.append(
ReserveCard(tier=tier, index=idx, from_deck=False),
)
for tier, deck in game.decks_by_tier.items():
if deck:
actions.append(
ReserveCard(tier=tier, index=None, from_deck=True),
)
return actions
def create_random_cards_tier(
tier: int,
card_count: int,
cost_choices: list[int],
point_choices: list[int],
) -> list[Card]:
"""Create a random set of cards for a given tier."""
cards: list[Card] = []
for color in BASE_COLORS:
for _ in range(card_count):
cost = dict.fromkeys(GEM_COLORS, 0)
for c in BASE_COLORS:
if c == color:
continue
cost[c] = random.choice(cost_choices)
points = random.choice(point_choices)
cards.append(Card(tier=tier, points=points, color=color, cost=cost))
return cards
def create_random_cards() -> list[Card]:
"""Generate a generic but Splendor-ish set of cards.
This is not the official deck, but structured similarly enough for play.
"""
cards: list[Card] = []
cards.extend(
create_random_cards_tier(
tier=1,
card_count=5,
cost_choices=[0, 1, 1, 2],
point_choices=[0, 0, 1],
)
)
cards.extend(
create_random_cards_tier(
tier=2,
card_count=4,
cost_choices=[2, 3, 4],
point_choices=[1, 2, 2, 3],
)
)
cards.extend(
create_random_cards_tier(
tier=3,
card_count=3,
cost_choices=[4, 5, 6],
point_choices=[3, 4, 5],
)
)
random.shuffle(cards)
return cards
def create_random_nobles() -> list[Noble]:
"""A small set of noble tiles, roughly Splendor-ish."""
nobles: list[Noble] = []
base_requirements: list[dict[GemColor, int]] = [
{"white": 3, "blue": 3, "green": 3},
{"blue": 3, "green": 3, "red": 3},
{"green": 3, "red": 3, "black": 3},
{"red": 3, "black": 3, "white": 3},
{"black": 3, "white": 3, "blue": 3},
{"white": 4, "blue": 4},
{"green": 4, "red": 4},
{"blue": 4, "black": 4},
]
for idx, req in enumerate(base_requirements, start=1):
nobles.append(
Noble(
name=f"Noble {idx}",
points=3,
requirements=dict(req.items()),
),
)
return nobles
def load_nobles(file: Path) -> list[Noble]:
"""Load nobles from a file."""
nobles = json.loads(file.read_text())
return [Noble(**noble) for noble in nobles]
def load_cards(file: Path) -> list[Card]:
"""Load cards from a file."""
cards = json.loads(file.read_text())
return [Card(**card) for card in cards]
def new_game(
strategies: Sequence[Strategy],
config: GameConfig,
) -> GameState:
"""Create a new game state from a config + list of players."""
num_players = len(strategies)
bank = get_default_starting_tokens(num_players)
decks_by_tier: dict[int, list[Card]] = {1: [], 2: [], 3: []}
for card in config.cards:
decks_by_tier.setdefault(card.tier, []).append(card)
for deck in decks_by_tier.values():
random.shuffle(deck)
table_by_tier: dict[int, list[Card]] = {1: [], 2: [], 3: []}
players = [PlayerState(strategy=strategy) for strategy in strategies]
nobles = list(config.nobles)
random.shuffle(nobles)
nobles = nobles[: num_players + 1]
game = GameState(
config=config,
players=players,
bank=bank,
decks_by_tier=decks_by_tier,
table_by_tier=table_by_tier,
available_nobles=nobles,
)
game.refill_table()
return game
def run_game(game: GameState) -> tuple[PlayerState, int]:
"""Run a full game loop until someone wins or a player returns None."""
turn_count = 0
while not game.finished:
turn_count += 1
player = game.current_player
strategy = player.strategy
action = strategy.choose_action(game, player)
if action is None:
game.finished = True
break
apply_action(game, strategy, action)
check_nobles_for_player(game, strategy, player)
winner = game.check_winner_simple()
if winner is not None:
return winner, turn_count
game.next_player()
if turn_count >= game.config.turn_limit:
break
fallback = max(game.players, key=lambda player: player.score)
return fallback, turn_count
+288
View File
@@ -0,0 +1,288 @@
"""Bot for Splendor game."""
from __future__ import annotations
import random
from .base import (
BASE_COLORS,
Action,
BuyCard,
BuyCardReserved,
Card,
GameState,
GemColor,
PlayerState,
ReserveCard,
Strategy,
TakeDifferent,
TakeDouble,
auto_discard_tokens,
get_legal_actions,
)
def can_bot_afford(player: PlayerState, card: Card) -> bool:
"""Check if player can afford card, using discounts + gold."""
missing = 0
gold = player.tokens["gold"]
for color, cost in card.cost.items():
missing += max(0, cost - player.discounts.get(color, 0) - player.tokens.get(color, 0))
if missing > gold:
return False
return True
class RandomBot(Strategy):
"""Dumb bot that follows rules but doesn't think."""
def __init__(self, name: str) -> None:
"""Initialize the bot."""
super().__init__(name=name)
def choose_action(self, game: GameState, player: PlayerState) -> Action | None:
"""Choose an action for the current player."""
affordable: list[tuple[int, int]] = []
for tier, row in game.table_by_tier.items():
for idx, card in enumerate(row):
if can_bot_afford(player, card):
affordable.append((tier, idx))
if affordable and random.random() < 0.5:
tier, idx = random.choice(affordable)
return BuyCard(tier=tier, index=idx)
if random.random() < 0.2:
tier = random.choice([1, 2, 3])
row = game.table_by_tier.get(tier, [])
if row:
idx = random.randrange(len(row))
return ReserveCard(tier=tier, index=idx, from_deck=False)
if random.random() < 0.5:
colors_for_double = [c for c in BASE_COLORS if game.bank[c] >= 4]
if colors_for_double:
return TakeDouble(color=random.choice(colors_for_double))
colors_for_diff = [c for c in BASE_COLORS if game.bank[c] > 0]
random.shuffle(colors_for_diff)
return TakeDifferent(colors=colors_for_diff[:3])
def choose_discard(
self,
game: GameState, # noqa: ARG002
player: PlayerState,
excess: int,
) -> dict[GemColor, int]:
"""Choose how many tokens to discard."""
return auto_discard_tokens(player, excess)
def check_cards_in_tier(row: list[Card], player: PlayerState) -> list[int]:
"""Check if player can afford card, using discounts + gold."""
return [index for index, card in enumerate(row) if can_bot_afford(player, card)]
class PersonalizedBot(Strategy):
"""PersonalizedBot."""
"""Dumb bot that follows rules but doesn't think."""
def __init__(self, name: str) -> None:
"""Initialize the bot."""
super().__init__(name=name)
def choose_action(self, game: GameState, player: PlayerState) -> Action | None:
"""Choose an action for the current player."""
for tier in (1, 2, 3):
row = game.table_by_tier[tier]
if affordable := check_cards_in_tier(row, player):
index = random.choice(affordable)
return BuyCard(tier=tier, index=index)
colors_for_diff = [c for c in BASE_COLORS if game.bank[c] > 0]
random.shuffle(colors_for_diff)
return TakeDifferent(colors=colors_for_diff[:3])
def choose_discard(
self,
game: GameState, # noqa: ARG002
player: PlayerState,
excess: int,
) -> dict[GemColor, int]:
"""Choose how many tokens to discard."""
return auto_discard_tokens(player, excess)
class PersonalizedBot2(Strategy):
"""PersonalizedBot2."""
"""Dumb bot that follows rules but doesn't think."""
def __init__(self, name: str) -> None:
"""Initialize the bot."""
super().__init__(name=name)
def choose_action(self, game: GameState, player: PlayerState) -> Action | None:
"""Choose an action for the current player."""
tiers = (1, 2, 3)
for tier in tiers:
row = game.table_by_tier[tier]
if affordable := check_cards_in_tier(row, player):
index = random.choice(affordable)
return BuyCard(tier=tier, index=index)
if affordable := check_cards_in_tier(player.reserved, player):
index = random.choice(affordable)
return BuyCardReserved(index=index)
colors_for_diff = [c for c in BASE_COLORS if game.bank[c] > 0]
if len(colors_for_diff) >= 3:
random.shuffle(colors_for_diff)
return TakeDifferent(colors=colors_for_diff[:3])
for tier in tiers:
len_deck = len(game.decks_by_tier[tier])
if len_deck:
return ReserveCard(tier=tier, index=None, from_deck=True)
return TakeDifferent(colors=colors_for_diff[:3])
def choose_discard(
self,
game: GameState, # noqa: ARG002
player: PlayerState,
excess: int,
) -> dict[GemColor, int]:
"""Choose how many tokens to discard."""
return auto_discard_tokens(player, excess)
def buy_card_reserved(player: PlayerState) -> Action | None:
"""Buy a card reserved."""
if affordable := check_cards_in_tier(player.reserved, player):
index = random.choice(affordable)
return BuyCardReserved(index=index)
return None
def buy_card(game: GameState, player: PlayerState) -> Action | None:
"""Buy a card."""
for tier in (1, 2, 3):
row = game.table_by_tier[tier]
if affordable := check_cards_in_tier(row, player):
index = random.choice(affordable)
return BuyCard(tier=tier, index=index)
return None
def take_tokens(game: GameState) -> Action | None:
"""Take tokens."""
colors_for_diff = [color for color in BASE_COLORS if game.bank[color] > 0]
if len(colors_for_diff) >= 3:
random.shuffle(colors_for_diff)
return TakeDifferent(colors=colors_for_diff[: game.config.max_token_take])
return None
class PersonalizedBot3(Strategy):
"""PersonalizedBot3."""
"""Dumb bot that follows rules but doesn't think."""
def __init__(self, name: str) -> None:
"""Initialize the bot."""
super().__init__(name=name)
def choose_action(self, game: GameState, player: PlayerState) -> Action | None:
"""Choose an action for the current player."""
print(len(get_legal_actions(game, player)))
print(get_legal_actions(game, player))
if action := buy_card_reserved(player):
return action
if action := buy_card(game, player):
return action
colors_for_diff = [color for color in BASE_COLORS if game.bank[color] > 0]
if len(colors_for_diff) >= 3:
random.shuffle(colors_for_diff)
return TakeDifferent(colors=colors_for_diff[:3])
for tier in (1, 2, 3):
len_deck = len(game.decks_by_tier[tier])
if len_deck:
return ReserveCard(tier=tier, index=None, from_deck=True)
return TakeDifferent(colors=colors_for_diff[:3])
def choose_discard(
self,
game: GameState, # noqa: ARG002
player: PlayerState,
excess: int,
) -> dict[GemColor, int]:
"""Choose how many tokens to discard."""
return auto_discard_tokens(player, excess)
def estimate_value_of_card(game: GameState, player: PlayerState, color: GemColor) -> int:
"""Estimate value of a color in the player's bank."""
return game.bank[color] - player.discounts.get(color, 0)
def estimate_value_of_token(game: GameState, player: PlayerState, color: GemColor) -> int:
"""Estimate value of a color in the player's bank."""
return game.bank[color] - player.discounts.get(color, 0)
class PersonalizedBot4(Strategy):
"""PersonalizedBot4."""
def __init__(self, name: str) -> None:
"""Initialize the bot."""
super().__init__(name=name)
def filter_actions(self, actions: list[Action]) -> list[Action]:
"""Filter actions to only take different."""
return [
action
for action in actions
if (isinstance(action, TakeDifferent) and len(action.colors) == 3) or not isinstance(action, TakeDifferent)
]
def choose_action(self, game: GameState, player: PlayerState) -> Action | None:
"""Choose an action for the current player."""
legal_actions = get_legal_actions(game, player)
print(len(legal_actions))
good_actions = self.filter_actions(legal_actions)
print(len(good_actions))
print(good_actions)
print(len(get_legal_actions(game, player)))
if action := buy_card_reserved(player):
return action
if action := buy_card(game, player):
return action
colors_for_diff = [color for color in BASE_COLORS if game.bank[color] > 0]
if len(colors_for_diff) >= 3:
random.shuffle(colors_for_diff)
return TakeDifferent(colors=colors_for_diff[:3])
for tier in (1, 2, 3):
len_deck = len(game.decks_by_tier[tier])
if len_deck:
return ReserveCard(tier=tier, index=None, from_deck=True)
return TakeDifferent(colors=colors_for_diff[:3])
def choose_discard(
self,
game: GameState, # noqa: ARG002
player: PlayerState,
excess: int,
) -> dict[GemColor, int]:
"""Choose how many tokens to discard."""
return auto_discard_tokens(player, excess)
+724
View File
@@ -0,0 +1,724 @@
"""Splendor game."""
from __future__ import annotations
import sys
from typing import TYPE_CHECKING, Any
from textual.app import App, ComposeResult
from textual.containers import Horizontal, Vertical
from textual.widget import Widget
from textual.widgets import Footer, Header, Input, Static
from .base import (
BASE_COLORS,
GEM_COLORS,
Action,
BuyCard,
BuyCardReserved,
Card,
GameState,
GemColor,
Noble,
PlayerState,
ReserveCard,
Strategy,
TakeDifferent,
TakeDouble,
)
if TYPE_CHECKING:
from collections.abc import Mapping
# Abbreviations used when rendering costs
COST_ABBR: dict[GemColor, str] = {
"white": "W",
"blue": "B",
"green": "G",
"red": "R",
"black": "K",
"gold": "O",
}
# Abbreviations players can type on the command line
COLOR_ABBR_TO_FULL: dict[str, GemColor] = {
"w": "white",
"b": "blue",
"g": "green",
"r": "red",
"k": "black",
"o": "gold",
}
def parse_color_token(raw: str) -> GemColor:
"""Convert user input into a GemColor.
Supports:
- full names: white, blue, green, red, black, gold
- abbreviations: w, b, g, r, k, o
"""
key = raw.lower()
# full color names first
if key in BASE_COLORS:
return key # type: ignore[return-value]
# abbreviations
if key in COLOR_ABBR_TO_FULL:
return COLOR_ABBR_TO_FULL[key]
error = f"Unknown color: {raw}"
raise ValueError(error)
def format_cost(cost: Mapping[GemColor, int]) -> str:
"""Format a cost/requirements dict as colored tokens like 'B:2, R:1'.
Uses `color_token` internally so colors are guaranteed to match your bank.
"""
parts: list[str] = []
for color in GEM_COLORS:
n = cost.get(color, 0)
if not n:
continue
# color_token gives us e.g. "[blue]blue: 3[/]"
token = color_token(color, n)
# Turn the leading color name into the abbreviation (blue: 3 → B:3)
# We only replace the first occurrence.
full = f"{color}:"
abbr = f"{COST_ABBR[color]}:"
token = token.replace(full, abbr, 1)
parts.append(token)
return ", ".join(parts) if parts else "-"
def format_card(card: Card) -> str:
"""Readable card line using dataclass fields instead of __str__."""
color_abbr = COST_ABBR[card.color]
header = f"T{card.tier} {color_abbr} P{card.points}"
cost_str = format_cost(card.cost)
return f"{header} ({cost_str})"
def format_noble(noble: Noble) -> str:
"""Readable noble line using dataclass fields instead of __str__."""
cost_str = format_cost(noble.requirements)
return f"{noble.name} +{noble.points} ({cost_str})"
def format_tokens(tokens: Mapping[GemColor, int]) -> str:
"""Colored 'color: n' list for a token dict."""
return " ".join(color_token(c, tokens.get(c, 0)) for c in GEM_COLORS)
def format_discounts(discounts: Mapping[GemColor, int]) -> str:
"""Colored discounts, skipping zeros."""
parts: list[str] = []
for c in GEM_COLORS:
n = discounts.get(c, 0)
if not n:
continue
abbr = COST_ABBR[c]
fg, bg = COLOR_STYLE[c]
parts.append(f"[{fg} on {bg}]{abbr}:{n}[/{fg} on {bg}]")
return ", ".join(parts) if parts else "-"
COLOR_STYLE: dict[GemColor, tuple[str, str]] = {
"white": ("black", "white"), # fg, bg
"blue": ("bright_white", "blue"),
"green": ("bright_white", "sea_green4"),
"red": ("white", "red3"),
"black": ("white", "grey0"),
"gold": ("black", "yellow3"),
}
def fmt_gem(color: GemColor) -> str:
"""Render gem name with fg/bg matching real token color."""
fg, bg = COLOR_STYLE[color]
return f"[{fg} on {bg}] {color} [/{fg} on {bg}]"
def fmt_number(value: int) -> str:
"""Return a Rich-markup colored 'value' string."""
return f"[bold cyan]{value}[/]"
def color_token(name: GemColor, amount: int) -> str:
"""Return a Rich-markup colored 'name: n' string."""
# Map Splendor colors -> terminal colors
color_map: Mapping[GemColor, str] = {
"white": "white",
"blue": "blue",
"green": "green",
"red": "red",
"black": "grey70", # 'black' is unreadable on dark backgrounds
"gold": "yellow",
}
style = color_map.get(name, "white")
return f"[{style}]{name}: {amount}[/]"
class Board(Widget):
"""Big board widget with the layout you sketched."""
def __init__(self, game: GameState, me: PlayerState, **kwargs: Any) -> None: # noqa: ANN401
"""Initialize the board widget."""
super().__init__(**kwargs)
self.game = game
self.me = me
def compose(self) -> ComposeResult:
"""Compose the board widget."""
# Structure:
# ┌ bank row
# ├ middle row (tiers | nobles)
# └ players row
with Vertical(id="board_root"):
yield Static(id="bank_box")
with Horizontal(id="middle_row"):
with Vertical(id="tiers_box"):
yield Static(id="tier1_box")
yield Static(id="tier2_box")
yield Static(id="tier3_box")
yield Static(id="nobles_box")
yield Static(id="players_box")
def on_mount(self) -> None:
"""Refresh the board content."""
self.refresh_content()
def refresh_content(self) -> None:
"""Refresh the board content."""
self._render_bank()
self._render_tiers()
self._render_nobles()
self._render_players()
# --- sections ----------------------------------------------------
def _render_bank(self) -> None:
bank = self.game.bank
parts: list[str] = ["[b]Bank:[/b]"]
# One line, all tokens colored
parts.append(format_tokens(bank))
self.query_one("#bank_box", Static).update("\n".join(parts))
def _render_tiers(self) -> None:
for tier in (1, 2, 3):
box = self.query_one(f"#tier{tier}_box", Static)
cards: list[Card] = self.game.table_by_tier.get(tier, [])
lines: list[str] = [f"[b]Tier {tier} cards:[/b]"]
if not cards:
lines.append(" (none)")
else:
for idx, card in enumerate(cards):
lines.append(f" [{idx}] {format_card(card)}")
box.update("\n".join(lines))
def _render_nobles(self) -> None:
nobles_box = self.query_one("#nobles_box", Static)
lines: list[str] = ["[b]Nobles[/b]"]
if not self.game.available_nobles:
lines.append(" (none)")
else:
lines.extend(" - " + format_noble(noble) for noble in self.game.available_nobles)
nobles_box.update("\n".join(lines))
def _render_players(self) -> None:
players_box = self.query_one("#players_box", Static)
lines: list[str] = ["[b]Players:[/b]", ""]
for player in self.game.players:
mark = "*" if player is self.me else " "
token_str = format_tokens(player.tokens)
discount_str = format_discounts(player.discounts)
lines.append(
f"{mark} {player.name:10} Score={player.score:2d} Discounts={discount_str}",
)
lines.append(f" Tokens: {token_str}")
if player.nobles:
noble_names = ", ".join(n.name for n in player.nobles)
lines.append(f" Nobles: {noble_names}")
# Optional: show counts of cards / reserved
if player.cards:
lines.append(f" Cards: {len(player.cards)}")
if player.reserved:
lines.append(f" Reserved: {len(player.reserved)}")
lines.append("")
players_box.update("\n".join(lines))
class ActionApp(App[None]):
"""Textual app that asks for a single action command and returns an Action."""
CSS = """
Screen {
/* 3 rows: command zone, board, footer */
layout: grid;
grid-size: 1 3;
grid-rows: auto 1fr auto;
}
/* Top area with input + instructions */
#command_zone {
grid-columns: 1;
grid-rows: 1;
padding: 1 1;
}
/* Board sits in the middle row and can grow */
#board {
grid-columns: 1;
grid-rows: 2;
padding: 0 1 1 1;
}
Footer {
grid-columns: 1;
grid-rows: 3;
}
Input {
border: round $accent;
}
/* === Board layout === */
#board_root {
/* outer frame around the whole board area */
border: heavy white;
padding: 0 1;
}
/* Bank row: full width */
#bank_box {
border: heavy white;
padding: 0 1;
}
/* Middle row: tiers (left) + nobles (right) */
#middle_row {
layout: horizontal;
}
#tiers_box {
border: heavy white;
padding: 0 1;
width: 70%;
}
#tier1_box,
#tier2_box,
#tier3_box {
border-bottom: heavy white;
padding: 0 0 1 0;
margin-bottom: 1;
}
#nobles_box {
border: heavy white;
padding: 0 1;
width: 30%;
}
/* Players row: full width at bottom */
#players_box {
border: heavy white;
padding: 0 1;
}
"""
def __init__(self, game: GameState, player: PlayerState) -> None:
"""Initialize the action app."""
super().__init__()
self.game = game
self.player = player
self.result: Action | None = None
self.message: str = ""
def compose(self) -> ComposeResult:
"""Compose the action app."""
# Row 1: input + Actions text
with Vertical(id="command_zone"):
yield Input(
placeholder="Enter command, e.g. '1 white blue red' or '1 w b r' or 'q'",
id="input_line",
)
yield Static("", id="prompt")
# Row 2: board
yield Board(self.game, self.player, id="board")
# Row 3: footer
yield Footer()
def on_mount(self) -> None:
"""Mount the action app."""
self._update_prompt()
self.query_one(Input).focus()
def _update_prompt(self) -> None:
lines: list[str] = []
lines.append("[bold underline]Actions:[/]")
lines.append(
" [bold green]1[/] <colors...> - Take up to 3 different gem colors "
"(e.g. [cyan]1 white blue red[/] or [cyan]1 w b r[/])",
)
lines.append(
f" [bold green]2[/] <color> - Take 2 of the same color (needs {fmt_number(4)} in bank, "
"e.g. [cyan]2 blue[/] or [cyan]2 b[/])",
)
lines.append(
" [bold green]3[/] <tier> <idx> - Buy a face-up card (e.g. [cyan]3 1 0[/] for tier 1, index 0)",
)
lines.append(" [bold green]4[/] <idx> - Buy a reserved card")
lines.append(" [bold green]5[/] <tier> <idx> - Reserve a face-up card")
lines.append(" [bold green]6[/] <tier> - Reserve top card of a deck")
lines.append(" [bold red]q[/] - Quit game")
if self.message:
lines.append("")
lines.append(f"[bold red]Message:[/] {self.message}")
self.query_one("#prompt", Static).update("\n".join(lines))
def _cmd_1(self, parts: list[str]) -> str | None:
"""Take up to 3 different gem colors: 1 white blue red OR 1 w b r."""
color_names = parts[1:]
if not color_names:
return "Need at least one color (full name or abbreviation)."
colors: list[GemColor] = []
for name in color_names:
color = parse_color_token(name)
if self.game.bank[color] <= 0:
return f"No tokens left for color: {color}"
colors.append(color)
self.result = TakeDifferent(colors=colors[:3])
self.exit()
return None
def _cmd_2(self, parts: list[str]) -> str | None:
"""Take two of the same color."""
if len(parts) < 2:
return "Usage: 2 <color>"
color = parse_color_token(parts[1])
if self.game.bank[color] < self.game.config.minimum_tokens_to_buy_2:
return "Bank must have at least 4 of that color."
self.result = TakeDouble(color=color)
self.exit()
return None
def _cmd_3(self, parts: list[str]) -> str | None:
"""Buy face-up card."""
if len(parts) < 3:
return "Usage: 3 <tier> <index>"
tier = int(parts[1])
idx = int(parts[2])
self.result = BuyCard(tier=tier, index=idx)
self.exit()
return None
def _cmd_4(self, parts: list[str]) -> str | None:
"""Buy reserved card."""
if len(parts) < 2:
return "Usage: 4 <reserved_index>"
idx = int(parts[1])
if not (0 <= idx < len(self.player.reserved)):
return "Reserved index out of range."
self.result = BuyCardReserved(tier=0, index=idx)
self.exit()
return None
def _cmd_5(self, parts: list[str]) -> str | None:
"""Reserve face-up card."""
if len(parts) < 3:
return "Usage: 5 <tier> <index>"
tier = int(parts[1])
idx = int(parts[2])
self.result = ReserveCard(tier=tier, index=idx, from_deck=False)
self.exit()
return None
def _cmd_6(self, parts: list[str]) -> str | None:
"""Reserve top of deck."""
if len(parts) < 2:
return "Usage: 6 <tier>"
tier = int(parts[1])
self.result = ReserveCard(tier=tier, index=None, from_deck=True)
self.exit()
return None
def _unknown_cmd(self, _parts: list[str]) -> str:
return "Unknown command."
def on_input_submitted(self, event: Input.Submitted) -> None:
"""Handle user input."""
text = (event.value or "").strip()
event.input.value = ""
if not text:
return
if text.lower() in {"q", "quit", "0"}:
self.result = None
self.exit()
return
parts = text.split()
cmds = {
"1": self._cmd_1,
"2": self._cmd_2,
"3": self._cmd_3,
"4": self._cmd_4,
"5": self._cmd_5,
"6": self._cmd_6,
}
cmd = parts[0]
error = cmds.get(cmd, self._unknown_cmd)(parts)
if error:
self.message = error
self._update_prompt()
return
class DiscardApp(App[None]):
"""Textual app to choose discards when over token limit."""
CSS = """
Screen {
layout: vertical;
}
#command_zone {
padding: 1 1;
}
#board {
padding: 0 1 1 1;
}
Input {
border: round $accent;
}
"""
def __init__(self, game: GameState, player: PlayerState) -> None:
"""Initialize the discard app."""
super().__init__()
self.game = game
self.player = player
self.discards: dict[GemColor, int] = dict.fromkeys(GEM_COLORS, 0)
self.message: str = ""
def compose(self) -> ComposeResult: # type: ignore[override]
"""Compose the discard app."""
yield Header(show_clock=False)
with Vertical(id="command_zone"):
yield Input(
placeholder="Enter color to discard, e.g. 'blue' or 'b'",
id="input_line",
)
yield Static("", id="prompt")
# Board directly under the command zone
yield Board(self.game, self.player, id="board")
yield Footer()
def on_mount(self) -> None: # type: ignore[override]
"""Mount the discard app."""
self._update_prompt()
self.query_one(Input).focus()
def _remaining_to_discard(self) -> int:
return self.player.total_tokens() - sum(self.discards.values()) - self.game.config.token_limit
def _update_prompt(self) -> None:
remaining = max(self._remaining_to_discard(), 0)
lines: list[str] = []
lines.append(
"You must discard "
f"{fmt_number(remaining)} token(s) "
f"to get down to {fmt_number(self.game.config.token_limit)}.",
)
disc_str = ", ".join(f"{fmt_gem(c)}={fmt_number(self.discards[c])}" for c in GEM_COLORS)
lines.append(f"Current planned discards: {{ {disc_str} }}")
lines.append(
"Type a color name or abbreviation (e.g. 'blue' or 'b') to discard one token.",
)
if self.message:
lines.append("")
lines.append(f"[bold red]Message:[/] {self.message}")
self.query_one("#prompt", Static).update("\n".join(lines))
def on_input_submitted(self, event: Input.Submitted) -> None: # type: ignore[override]
"""Handle user input."""
raw = (event.value or "").strip()
event.input.value = ""
if not raw:
return
try:
color = parse_color_token(raw)
except ValueError:
self.message = f"Unknown color: {raw}"
self._update_prompt()
return
available = self.player.tokens[color] - self.discards[color]
if available <= 0:
self.message = f"No more {color} tokens available to discard."
self._update_prompt()
return
self.discards[color] += 1
if self._remaining_to_discard() <= 0:
self.exit()
return
self.message = ""
self._update_prompt()
# ---------------------------------------------------------------------------
# Noble choice app
# ---------------------------------------------------------------------------
class NobleChoiceApp(App[None]):
"""Textual app to choose one noble."""
CSS = """
Screen {
layout: vertical;
}
#command_zone {
padding: 1 1;
}
#board {
padding: 0 1 1 1;
}
Input {
border: round $accent;
}
"""
def __init__(
self,
game: GameState,
player: PlayerState,
nobles: list[Noble],
) -> None:
"""Initialize the noble choice app."""
super().__init__()
self.game = game
self.player = player
self.nobles = nobles
self.result: Noble | None = None
self.message: str = ""
def compose(self) -> ComposeResult: # type: ignore[override]
"""Compose the noble choice app."""
yield Header(show_clock=False)
with Vertical(id="command_zone"):
yield Input(
placeholder="Enter noble index, e.g. '0'",
id="input_line",
)
yield Static("", id="prompt")
# Board directly under the command zone
yield Board(self.game, self.player, id="board")
yield Footer()
def on_mount(self) -> None: # type: ignore[override]
"""Mount the noble choice app."""
self._update_prompt()
self.query_one(Input).focus()
def _update_prompt(self) -> None:
lines: list[str] = []
lines.append("[bold underline]You qualify for nobles:[/]")
for i, noble in enumerate(self.nobles):
lines.append(f" [bright_cyan]{i})[/] {format_noble(noble)}")
lines.append("Enter the index of the noble you want.")
if self.message:
lines.append("")
lines.append(f"[bold red]Message:[/] {self.message}")
self.query_one("#prompt", Static).update("\n".join(lines))
def on_input_submitted(self, event: Input.Submitted) -> None: # type: ignore[override]
"""Handle user input."""
raw = (event.value or "").strip()
event.input.value = ""
if not raw:
return
try:
idx = int(raw)
except ValueError:
self.message = "Please enter a valid integer index."
self._update_prompt()
return
if not (0 <= idx < len(self.nobles)):
self.message = "Index out of range."
self._update_prompt()
return
self.result = self.nobles[idx]
self.exit()
class TuiHuman(Strategy):
"""Textual-based human player Strategy with colorful board."""
def choose_action(
self,
game: GameState,
player: PlayerState,
) -> Action | None:
"""Choose an action for the player."""
if not sys.stdout.isatty():
return None
app = ActionApp(game, player)
app.run()
return app.result
def choose_discard(
self,
game: GameState,
player: PlayerState,
excess: int, # noqa: ARG002
) -> dict[GemColor, int]:
"""Choose tokens to discard."""
if not sys.stdout.isatty():
return dict.fromkeys(GEM_COLORS, 0)
app = DiscardApp(game, player)
app.run()
return app.discards
def choose_noble(
self,
game: GameState,
player: PlayerState,
nobles: list[Noble],
) -> Noble:
"""Choose a noble for the player."""
if not sys.stdout.isatty():
return nobles[0]
app = NobleChoiceApp(game, player, nobles)
app.run()
return app.result
+19
View File
@@ -0,0 +1,19 @@
"""Main entry point for Splendor game."""
from __future__ import annotations
from .base import new_game, run_game
from .bot import RandomBot
from .human import TuiHuman
def main() -> None:
"""Main entry point."""
human = TuiHuman()
bot = RandomBot()
game_state = new_game(["You", "Bot A"])
run_game(game_state, [human, bot])
if __name__ == "__main__":
main()
+111
View File
@@ -0,0 +1,111 @@
"""Public state for RL/search."""
from __future__ import annotations
from dataclasses import dataclass
from .base import (
BASE_COLORS,
BASE_INDEX,
GEM_ORDER,
Card,
GameState,
Noble,
PlayerState,
)
@dataclass(frozen=True)
class ObsCard:
"""Numeric-ish card view for RL/search."""
tier: int
points: int
color_index: int
cost: list[int]
@dataclass(frozen=True)
class ObsNoble:
"""Numeric-ish noble view for RL/search."""
points: int
requirements: list[int]
@dataclass(frozen=True)
class ObsPlayer:
"""Numeric-ish player view for RL/search."""
tokens: list[int]
discounts: list[int]
score: int
cards: list[ObsCard]
reserved: list[ObsCard]
nobles: list[ObsNoble]
@dataclass(frozen=True)
class Observation:
"""Full public state for RL/search."""
current_player: int
bank: list[int]
players: list[ObsPlayer]
table_by_tier: dict[int, list[ObsCard]]
decks_remaining: dict[int, int]
available_nobles: list[ObsNoble]
def _encode_card(card: Card) -> ObsCard:
color_index = BASE_INDEX.get(card.color, -1)
cost_vec = [card.cost.get(c, 0) for c in BASE_COLORS]
return ObsCard(
tier=card.tier,
points=card.points,
color_index=color_index,
cost=cost_vec,
)
def _encode_noble(noble: Noble) -> ObsNoble:
req_vec = [noble.requirements.get(c, 0) for c in BASE_COLORS]
return ObsNoble(
points=noble.points,
requirements=req_vec,
)
def _encode_player(player: PlayerState) -> ObsPlayer:
tokens_vec = [player.tokens[c] for c in GEM_ORDER]
discounts_vec = [player.discounts[c] for c in GEM_ORDER]
cards_enc = [_encode_card(c) for c in player.cards]
reserved_enc = [_encode_card(c) for c in player.reserved]
nobles_enc = [_encode_noble(n) for n in player.nobles]
return ObsPlayer(
tokens=tokens_vec,
discounts=discounts_vec,
score=player.score,
cards=cards_enc,
reserved=reserved_enc,
nobles=nobles_enc,
)
def to_observation(game: GameState) -> Observation:
"""Create a structured observation of the full public state."""
bank_vec = [game.bank[c] for c in GEM_ORDER]
players_enc = [_encode_player(p) for p in game.players]
table_enc: dict[int, list[ObsCard]] = {
tier: [_encode_card(c) for c in row] for tier, row in game.table_by_tier.items()
}
decks_remaining = {tier: len(deck) for tier, deck in game.decks_by_tier.items()}
nobles_enc = [_encode_noble(n) for n in game.available_nobles]
return Observation(
current_player=game.current_player_index,
bank=bank_vec,
players=players_enc,
table_by_tier=table_enc,
decks_remaining=decks_remaining,
available_nobles=nobles_enc,
)
+36
View File
@@ -0,0 +1,36 @@
"""Simulate a step in the game."""
from __future__ import annotations
import copy
from .base import Action, GameState, PlayerState, apply_action, check_nobles_for_player
from .bot import RandomBot
class SimStrategy(RandomBot):
"""Strategy used in simulate_step.
We never call choose_action here (caller chooses actions),
but we reuse discard/noble-selection logic.
"""
def choose_action(self, game: GameState, player: PlayerState) -> Action | None: # noqa: ARG002
"""Choose an action for the current player."""
msg = "SimStrategy.choose_action should not be used in simulate_step"
raise RuntimeError(msg)
def simulate_step(game: GameState, action: Action) -> GameState:
"""Return a deep-copied next state after applying action for the current player.
Useful for tree search / MCTS:
next_state = simulate_step(state, action)
"""
next_state = copy.deepcopy(game)
sim_strategy = SimStrategy()
apply_action(next_state, sim_strategy, action)
check_nobles_for_player(next_state, sim_strategy, next_state.current_player)
next_state.next_player()
return next_state
+50
View File
@@ -0,0 +1,50 @@
"""Simulator for Splendor game."""
from __future__ import annotations
from collections import defaultdict
from pathlib import Path
from statistics import mean
from .base import GameConfig, load_cards, load_nobles, new_game, run_game
from .bot import PersonalizedBot4, RandomBot
def main() -> None:
"""Main entry point."""
turn_limit = 1000
good_games = 0
games = 1
winners: dict[str, list] = defaultdict(list)
game_data = Path(__file__).parent / "game_data"
cards = load_cards(game_data / "cards/default.json")
nobles = load_nobles(game_data / "nobles/default.json")
for _ in range(games):
bot_a = RandomBot("bot_a")
bot_b = RandomBot("bot_b")
bot_c = RandomBot("bot_c")
bot_d = PersonalizedBot4("my_bot")
config = GameConfig(
cards=cards,
nobles=nobles,
turn_limit=turn_limit,
)
players = (bot_a, bot_b, bot_c, bot_d)
game_state = new_game(players, config)
winner, turns = run_game(game_state)
if turns < turn_limit:
good_games += 1
winners[winner.strategy.name].append(turns)
print(
f"out of {games} {turn_limit} turn games with {len(players)}"
f"random bots there where {good_games} games where a bot won"
)
for name, turns in winners.items():
print(f"{name} won {len(turns)} games in {mean(turns):.2f} turns")
if __name__ == "__main__":
main()
+1 -1
View File
@@ -4,7 +4,7 @@ import logging
import sys
import tomllib
from os import environ
from pathlib import Path # noqa: TC003 This is required for the typer CLI
from pathlib import Path
from socket import gethostname
import typer
+1 -1
View File
@@ -451,7 +451,7 @@ def convert_aax_file_with_agent(aax_file: Path, config: ConversionConfig) -> Non
destination.parent.mkdir(parents=True, exist_ok=True)
try:
temp_file.replace(destination)
except OSError as error:
except Exception as error: # noqa: BLE001
write_review_file(
destination=destination,
ffprobe_metadata=ffprobe_metadata,
+1
View File
@@ -169,6 +169,7 @@ def csv_id(row: dict[str, str | None], csv_path: Path, row_number: int) -> int |
except ValueError as error:
msg = f"{csv_path}:{row_number}: id must be an integer: {value}"
raise CatalogImportError(msg) from error
return None
if __name__ == "__main__":
@@ -1,3 +1,4 @@
# ruff: noqa: LOG015, E501, D102, D103, D107 These need the be fixed
"""Install NixOS on a ZFS pool."""
from __future__ import annotations
@@ -16,9 +17,6 @@ from typing import TYPE_CHECKING
if TYPE_CHECKING:
from collections.abc import Sequence
logger = logging.getLogger(__name__)
ESCAPE_KEY = 27
def configure_logger(level: str = "INFO") -> None:
"""Configure the logger.
@@ -44,7 +42,7 @@ def bash_wrapper(command: str) -> str:
Tuple[str, int]: A tuple containing the output of the command (stdout) as a string,
the error output (stderr) as a string (optional), and the return code as an integer.
"""
logger.debug(f"running {command=}")
logging.debug(f"running {command=}")
# This is a acceptable risk
process = Popen(command.split(), stdout=PIPE, stderr=PIPE)
output, _ = process.communicate()
@@ -65,7 +63,7 @@ def partition_disk(disk: str, swap_size: int, reserve: int = 0) -> None:
reserve (int, optional): The size of the reserve partition in GB. Defaults to 0.
minimum value is 0.
"""
logger.info(f"partitioning {disk=}")
logging.info(f"partitioning {disk=}")
swap_size = max(swap_size, 1)
reserve = max(reserve, 0)
@@ -73,16 +71,16 @@ def partition_disk(disk: str, swap_size: int, reserve: int = 0) -> None:
if reserve > 0:
msg = f"Creating swap partition on {disk=} with size {swap_size=}GiB and reserve {reserve=}GiB"
logger.info(msg)
logging.info(msg)
swap_start = swap_size + reserve
swap_partition = f"mkpart swap -{swap_start}GiB -{reserve}GiB "
else:
logger.info(f"Creating swap partition on {disk=} with size {swap_size=}GiB")
logging.info(f"Creating swap partition on {disk=} with size {swap_size=}GiB")
swap_start = swap_size
swap_partition = f"mkpart swap -{swap_start}GiB 100% "
logger.debug(f"{swap_partition=}")
logging.debug(f"{swap_partition=}")
create_partitions = (
f"parted --script --align=optimal {disk} -- "
@@ -94,7 +92,7 @@ def partition_disk(disk: str, swap_size: int, reserve: int = 0) -> None:
)
bash_wrapper(create_partitions)
logger.info(f"{disk=} successfully partitioned")
logging.info(f"{disk=} successfully partitioned")
def create_zfs_pool(pool_disks: Sequence[str], mnt_dir: str) -> None:
@@ -133,7 +131,7 @@ def create_zfs_pool(pool_disks: Sequence[str], mnt_dir: str) -> None:
bash_wrapper(zpool_create)
zpools = bash_wrapper("zpool list -o name")
if "root_pool" not in zpools.splitlines():
logger.critical("Failed to create root_pool")
logging.critical("Failed to create root_pool")
sys.exit(1)
@@ -153,7 +151,7 @@ def create_zfs_datasets() -> None:
}
missing_datasets = expected_datasets.difference(datasets.splitlines())
if missing_datasets:
logger.critical(f"Failed to create pools {missing_datasets}")
logging.critical(f"Failed to create pools {missing_datasets}")
sys.exit(1)
@@ -166,7 +164,8 @@ def get_cpu_manufacturer() -> str:
for line in output.splitlines():
if "vendor_id" in line:
return id_vendor[line.split(": ")[1].strip()]
error = "Failed to get CPU manufacturer"
error = "CPU manufacturer not found"
raise RuntimeError(error)
@@ -201,15 +200,7 @@ def create_nix_hardware_file(mnt_dir: str, disks: Sequence[str], *, encrypt: boo
' imports = [ (modulesPath + "/installer/scan/not-detected.nix") ];\n\n'
" boot = {\n"
" initrd = {\n"
" availableKernelModules = [ \n"
' "ahci"\n'
' "ehci_pci"\n'
' "nvme"\n'
' "sd_mod"\n'
' "usb_storage"\n'
' "usbhid"\n'
' "xhci_pci"\n'
" ];\n"
' availableKernelModules = [ \n "ahci"\n "ehci_pci"\n "nvme"\n "sd_mod"\n "usb_storage"\n "usbhid"\n "xhci_pci"\n ];\n'
" kernelModules = [ ];\n"
f" {devices}"
" };\n"
@@ -223,18 +214,11 @@ def create_nix_hardware_file(mnt_dir: str, disks: Sequence[str], *, encrypt: boo
' "/nix" = {\n device = "root_pool/nix";\n fsType = "zfs";\n };\n\n'
' "/boot" = {\n'
f' device = "/dev/disk/by-uuid/{get_boot_drive_id(disks[0])}";\n'
' fsType = "vfat";\n'
" options = [\n"
' "fmask=0077"\n'
' "dmask=0077"\n'
" ];\n"
" };\n"
" };\n\n"
' fsType = "vfat";\n options = [\n "fmask=0077"\n "dmask=0077"\n ];\n };\n };\n\n'
" swapDevices = [ ];\n\n"
" networking.useDHCP = lib.mkDefault true;\n\n"
' nixpkgs.hostPlatform = lib.mkDefault "x86_64-linux";\n'
f" hardware.cpu.{cpu_manufacturer}.updateMicrocode = lib.mkDefault "
"config.hardware.enableRedistributableFirmware;\n"
f" hardware.cpu.{cpu_manufacturer}.updateMicrocode = lib.mkDefault config.hardware.enableRedistributableFirmware;\n"
f' networking.hostId = "{host_id}";\n'
"}\n"
)
@@ -272,32 +256,19 @@ def installer(
encrypt_key: str | None,
) -> None:
"""Main."""
logger.info("Starting installation")
logging.info("Starting installation")
for disk in disks:
partition_disk(disk, swap_size, reserve)
if encrypt_key:
sleep(1)
key_input = encrypt_key.encode()
run(
("cryptsetup", "luksFormat", "--type", "luks2", f"{disk}-part2", "-"),
input=key_input,
check=True,
)
run(
(
"cryptsetup",
"luksOpen",
f"{disk}-part2",
f"luks-root-pool-{disk.split('/')[-1]}-part2",
"-",
),
input=key_input,
check=True,
)
for command in (
f'printf "{encrypt_key}" | cryptsetup luksFormat --type luks2 {disk}-part2 -',
f'printf "{encrypt_key}" | cryptsetup luksOpen {disk}-part2 luks-root-pool-{disk.split("/")[-1]}-part2 -',
):
run(command, shell=True, check=True) # noqa: S602
# Fixed mount point for the new system; the installer runs as root on a fresh disk
mnt_dir = "/tmp/nix_install" # noqa: S108
Path(mnt_dir).mkdir(parents=True, exist_ok=True)
@@ -311,73 +282,59 @@ def installer(
create_zfs_datasets()
install_nixos(mnt_dir, disks, encrypt=bool(encrypt_key))
install_nixos(mnt_dir, disks, encrypt=encrypt_key)
logger.info("Installation complete")
logging.info("Installation complete")
class Cursor:
"""Track cursor position and constrain movement to screen bounds."""
"""Cursor class to store the cursor position."""
def __init__(self) -> None:
"""Initialize cursor position and screen dimensions."""
self.x_position = 0
self.y_position = 0
self.height = 0
self.width = 0
def set_height(self, height: int) -> None:
"""Set the maximum screen height."""
self.height = height
def set_width(self, width: int) -> None:
"""Set the maximum screen width."""
self.width = width
def x_bounce_check(self, cursor: int) -> int:
"""Clamp an x position to the screen width."""
cursor = max(0, cursor)
return min(self.width - 1, cursor)
def y_bounce_check(self, cursor: int) -> int:
"""Clamp a y position to the screen height."""
cursor = max(0, cursor)
return min(self.height - 1, cursor)
def set_x(self, x: int) -> None:
"""Set the cursor x position."""
self.x_position = self.x_bounce_check(x)
def set_y(self, y: int) -> None:
"""Set the cursor y position."""
self.y_position = self.y_bounce_check(y)
def get_x(self) -> int:
"""Get the cursor x position."""
return self.x_position
def get_y(self) -> int:
"""Get the cursor y position."""
return self.y_position
def move_up(self) -> None:
"""Move the cursor up one row."""
self.set_y(self.y_position - 1)
def move_down(self) -> None:
"""Move the cursor down one row."""
self.set_y(self.y_position + 1)
def move_left(self) -> None:
"""Move the cursor left one column."""
self.set_x(self.x_position - 1)
def move_right(self) -> None:
"""Move the cursor right one column."""
self.set_x(self.x_position + 1)
def navigation(self, key: int) -> None:
"""Move the cursor for a curses navigation key."""
action = {
curses.KEY_DOWN: self.move_down,
curses.KEY_UP: self.move_up,
@@ -392,7 +349,6 @@ class State:
"""State class to store the state of the program."""
def __init__(self) -> None:
"""Initialize installer menu state."""
self.key = 0
self.cursor = Cursor()
@@ -410,7 +366,6 @@ class State:
def get_device(raw_device: str) -> dict[str, str]:
"""Parse an lsblk key-value device row."""
raw_device_components = raw_device.split(" ")
return {thing.split("=")[0].lower(): thing.split("=")[1].strip('"') for thing in raw_device_components}
@@ -440,7 +395,6 @@ def get_device_id_mapping() -> dict[str, set[str]]:
def calculate_device_menu_padding(devices: list[dict[str, str]], column: str, padding: int = 0) -> int:
"""Calculate the width needed for a device menu column."""
return max(len(device[column]) for device in devices) + padding
@@ -452,7 +406,6 @@ def draw_device_ids(
menu_width: list[int],
device_ids: set[str],
) -> tuple[State, int]:
"""Draw selectable device IDs for a device row."""
for device_id in sorted(device_ids):
row_number = row_number + 1
if row_number == state.cursor.get_y() and state.cursor.get_x() in menu_width:
@@ -481,7 +434,7 @@ def draw_device_menu(
state: State,
menu_start_y: int = 0,
menu_start_x: int = 0,
) -> tuple[State, int]:
) -> State:
"""Draw the device menu and handle user input.
Args:
@@ -537,7 +490,6 @@ def draw_device_menu(
def debug_menu(std_screen: curses.window, key: int) -> None:
"""Draw debug information for the current curses screen."""
height, width = std_screen.getmaxyx()
width_height = f"Width: {width}, Height: {height}"
std_screen.addstr(height - 4, 0, width_height, curses.color_pair(5))
@@ -557,7 +509,6 @@ def status_bar(
width: int,
height: int,
) -> None:
"""Draw the footer status bar."""
std_screen.attron(curses.A_REVERSE)
std_screen.attron(curses.color_pair(3))
@@ -570,7 +521,6 @@ def status_bar(
def set_color() -> None:
"""Initialize curses color pairs."""
curses.start_color()
curses.use_default_colors()
for i in range(curses.COLORS):
@@ -578,7 +528,6 @@ def set_color() -> None:
def get_text_input(std_screen: curses.window, prompt: str, y: int, x: int) -> str:
"""Read text input from a curses screen."""
curses.echo()
std_screen.addstr(y, x, prompt)
input_str = ""
@@ -586,7 +535,7 @@ def get_text_input(std_screen: curses.window, prompt: str, y: int, x: int) -> st
key = std_screen.getch()
if key == ord("\n"):
break
if key == ESCAPE_KEY:
if key == 27: # ESC key # noqa: PLR2004
input_str = ""
break
if key in (curses.KEY_BACKSPACE, ord("\b"), 127):
@@ -604,7 +553,6 @@ def swap_size_input(
state: State,
swap_offset: int,
) -> State:
"""Handle swap size input."""
swap_size_text = "Swap size (GB): "
std_screen.addstr(swap_offset, 0, f"{swap_size_text}{state.swap_size}")
if state.key == ord("\n") and state.cursor.get_y() == swap_offset:
@@ -628,7 +576,6 @@ def reserve_size_input(
state: State,
reserve_offset: int,
) -> State:
"""Handle reserve size input."""
reserve_size_text = "reserve size (GB): "
std_screen.addstr(reserve_offset, 0, f"{reserve_size_text}{state.reserve_size}")
if state.key == ord("\n") and state.cursor.get_y() == reserve_offset:
@@ -652,7 +599,6 @@ def draw_menu(std_screen: curses.window) -> State:
Args:
std_screen (curses.window): the curses window to draw on
Returns:
State: the state object
"""
@@ -712,18 +658,17 @@ def draw_menu(std_screen: curses.window) -> State:
def main() -> None:
"""Run the installer menu and start installation."""
configure_logger("DEBUG")
state = curses.wrapper(draw_menu)
encrypt_key = getenv("ENCRYPT_KEY")
logger.info("installing_nixos")
logger.info(f"disks: {state.selected_device_ids}")
logger.info(f"swap_size: {state.swap_size}")
logger.info(f"reserve: {state.reserve_size}")
logger.info(f"encrypted: {bool(encrypt_key)}")
logging.info("installing_nixos")
logging.info(f"disks: {state.selected_device_ids}")
logging.info(f"swap_size: {state.swap_size}")
logging.info(f"reserve: {state.reserve_size}")
logging.info(f"encrypted: {bool(encrypt_key)}")
sleep(3)
+1
View File
@@ -0,0 +1 @@
"""Van inventory FastAPI application."""
+16
View File
@@ -0,0 +1,16 @@
"""FastAPI dependencies for van inventory."""
from collections.abc import Iterator
from typing import Annotated
from fastapi import Depends, Request
from sqlalchemy.orm import Session
def get_db(request: Request) -> Iterator[Session]:
"""Get database session from app state."""
with Session(request.app.state.engine) as session:
yield session
DbSession = Annotated[Session, Depends(get_db)]
+56
View File
@@ -0,0 +1,56 @@
"""FastAPI app for van inventory."""
from __future__ import annotations
import logging
from contextlib import asynccontextmanager
from pathlib import Path
from typing import TYPE_CHECKING, Annotated
import typer
import uvicorn
from fastapi import FastAPI
from fastapi.staticfiles import StaticFiles
from python.common import configure_logger
from python.orm.common import get_postgres_engine
from python.van_inventory.routers import api_router, frontend_router
STATIC_DIR = Path(__file__).resolve().parent / "static"
if TYPE_CHECKING:
from collections.abc import AsyncIterator
logger = logging.getLogger(__name__)
def create_app() -> FastAPI:
"""Create and configure the FastAPI application."""
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncIterator[None]:
app.state.engine = get_postgres_engine(name="VAN_INVENTORY")
yield
app.state.engine.dispose()
app = FastAPI(title="Van Inventory", lifespan=lifespan)
app.mount("/static", StaticFiles(directory=STATIC_DIR), name="static")
app.include_router(api_router)
app.include_router(frontend_router)
return app
def serve(
# Intentionally binds all interfaces — this is a LAN-only van server
host: Annotated[str, typer.Option("--host", "-h", help="Host to bind to")] = "0.0.0.0", # noqa: S104
port: Annotated[int, typer.Option("--port", "-p", help="Port to bind to")] = 8001,
log_level: Annotated[str, typer.Option("--log-level", "-l", help="Log level")] = "INFO",
) -> None:
"""Start the Van Inventory server."""
configure_logger(log_level)
app = create_app()
uvicorn.run(app, host=host, port=port)
if __name__ == "__main__":
typer.run(serve)
+6
View File
@@ -0,0 +1,6 @@
"""Van inventory API routers."""
from python.van_inventory.routers.api import router as api_router
from python.van_inventory.routers.frontend import router as frontend_router
__all__ = ["api_router", "frontend_router"]
+314
View File
@@ -0,0 +1,314 @@
"""Van inventory API router."""
from __future__ import annotations
from typing import TYPE_CHECKING
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel, Field
from sqlalchemy import select
from sqlalchemy.orm import selectinload
from python.orm.van_inventory.models import Item, Meal, MealIngredient
if TYPE_CHECKING:
from python.van_inventory.dependencies import DbSession
# --- Schemas ---
class ItemCreate(BaseModel):
"""Schema for creating an item."""
name: str
quantity: float = Field(default=0, ge=0)
unit: str
category: str | None = None
class ItemUpdate(BaseModel):
"""Schema for updating an item."""
name: str | None = None
quantity: float | None = Field(default=None, ge=0)
unit: str | None = None
category: str | None = None
class ItemResponse(BaseModel):
"""Schema for item response."""
id: int
name: str
quantity: float
unit: str
category: str | None
model_config = {"from_attributes": True}
class IngredientCreate(BaseModel):
"""Schema for adding an ingredient to a meal."""
item_id: int
quantity_needed: float = Field(gt=0)
class MealCreate(BaseModel):
"""Schema for creating a meal."""
name: str
instructions: str | None = None
ingredients: list[IngredientCreate] = []
class MealUpdate(BaseModel):
"""Schema for updating a meal."""
name: str | None = None
instructions: str | None = None
class IngredientResponse(BaseModel):
"""Schema for ingredient response."""
item_id: int
item_name: str
quantity_needed: float
unit: str
model_config = {"from_attributes": True}
class MealResponse(BaseModel):
"""Schema for meal response."""
id: int
name: str
instructions: str | None
ingredients: list[IngredientResponse] = []
model_config = {"from_attributes": True}
@classmethod
def from_meal(cls, meal: Meal) -> MealResponse:
"""Build a MealResponse from an ORM Meal with loaded ingredients."""
return cls(
id=meal.id,
name=meal.name,
instructions=meal.instructions,
ingredients=[
IngredientResponse(
item_id=mi.item_id,
item_name=mi.item.name,
quantity_needed=mi.quantity_needed,
unit=mi.item.unit,
)
for mi in meal.ingredients
],
)
class ShoppingItem(BaseModel):
"""An item needed for a meal that is short on stock."""
item_name: str
unit: str
needed: float
have: float
short: float
class MealAvailability(BaseModel):
"""Availability status for a meal."""
meal_id: int
meal_name: str
can_make: bool
missing: list[ShoppingItem] = []
# --- Routes ---
router = APIRouter(prefix="/api", tags=["van_inventory"])
# Items
@router.post("/items", response_model=ItemResponse)
def create_item(item: ItemCreate, db: DbSession) -> Item:
"""Create a new inventory item."""
db_item = Item(**item.model_dump())
db.add(db_item)
db.commit()
db.refresh(db_item)
return db_item
@router.get("/items", response_model=list[ItemResponse])
def list_items(db: DbSession) -> list[Item]:
"""List all inventory items."""
return list(db.scalars(select(Item).order_by(Item.name)).all())
@router.get("/items/{item_id}", response_model=ItemResponse)
def get_item(item_id: int, db: DbSession) -> Item:
"""Get an item by ID."""
item = db.get(Item, item_id)
if not item:
raise HTTPException(status_code=404, detail="Item not found")
return item
@router.patch("/items/{item_id}", response_model=ItemResponse)
def update_item(item_id: int, item: ItemUpdate, db: DbSession) -> Item:
"""Update an item by ID."""
db_item = db.get(Item, item_id)
if not db_item:
raise HTTPException(status_code=404, detail="Item not found")
for key, value in item.model_dump(exclude_unset=True).items():
setattr(db_item, key, value)
db.commit()
db.refresh(db_item)
return db_item
@router.delete("/items/{item_id}")
def delete_item(item_id: int, db: DbSession) -> dict[str, bool]:
"""Delete an item by ID."""
item = db.get(Item, item_id)
if not item:
raise HTTPException(status_code=404, detail="Item not found")
db.delete(item)
db.commit()
return {"deleted": True}
# Meals
@router.post("/meals", response_model=MealResponse)
def create_meal(meal: MealCreate, db: DbSession) -> MealResponse:
"""Create a new meal with optional ingredients."""
for ing in meal.ingredients:
if not db.get(Item, ing.item_id):
raise HTTPException(status_code=422, detail=f"Item {ing.item_id} not found")
db_meal = Meal(name=meal.name, instructions=meal.instructions)
db.add(db_meal)
db.flush()
for ing in meal.ingredients:
db.add(MealIngredient(meal_id=db_meal.id, item_id=ing.item_id, quantity_needed=ing.quantity_needed))
db.commit()
db_meal = db.scalar(
select(Meal)
.where(Meal.id == db_meal.id)
.options(selectinload(Meal.ingredients).selectinload(MealIngredient.item))
)
return MealResponse.from_meal(db_meal)
@router.get("/meals", response_model=list[MealResponse])
def list_meals(db: DbSession) -> list[MealResponse]:
"""List all meals with ingredients."""
meals = list(
db.scalars(
select(Meal).options(selectinload(Meal.ingredients).selectinload(MealIngredient.item)).order_by(Meal.name)
).all()
)
return [MealResponse.from_meal(m) for m in meals]
@router.get("/meals/availability", response_model=list[MealAvailability])
def check_all_meals(db: DbSession) -> list[MealAvailability]:
"""Check which meals can be made with current inventory."""
meals = list(
db.scalars(select(Meal).options(selectinload(Meal.ingredients).selectinload(MealIngredient.item))).all()
)
return [_check_meal(m) for m in meals]
@router.get("/meals/{meal_id}", response_model=MealResponse)
def get_meal(meal_id: int, db: DbSession) -> MealResponse:
"""Get a meal by ID with ingredients."""
meal = db.scalar(
select(Meal).where(Meal.id == meal_id).options(selectinload(Meal.ingredients).selectinload(MealIngredient.item))
)
if not meal:
raise HTTPException(status_code=404, detail="Meal not found")
return MealResponse.from_meal(meal)
@router.delete("/meals/{meal_id}")
def delete_meal(meal_id: int, db: DbSession) -> dict[str, bool]:
"""Delete a meal by ID."""
meal = db.get(Meal, meal_id)
if not meal:
raise HTTPException(status_code=404, detail="Meal not found")
db.delete(meal)
db.commit()
return {"deleted": True}
@router.post("/meals/{meal_id}/ingredients", response_model=MealResponse)
def add_ingredient(meal_id: int, ingredient: IngredientCreate, db: DbSession) -> MealResponse:
"""Add an ingredient to a meal."""
meal = db.get(Meal, meal_id)
if not meal:
raise HTTPException(status_code=404, detail="Meal not found")
if not db.get(Item, ingredient.item_id):
raise HTTPException(status_code=422, detail="Item not found")
existing = db.scalar(
select(MealIngredient).where(MealIngredient.meal_id == meal_id, MealIngredient.item_id == ingredient.item_id)
)
if existing:
raise HTTPException(status_code=409, detail="Ingredient already exists for this meal")
db.add(MealIngredient(meal_id=meal_id, item_id=ingredient.item_id, quantity_needed=ingredient.quantity_needed))
db.commit()
meal = db.scalar(
select(Meal).where(Meal.id == meal_id).options(selectinload(Meal.ingredients).selectinload(MealIngredient.item))
)
return MealResponse.from_meal(meal)
@router.delete("/meals/{meal_id}/ingredients/{item_id}")
def remove_ingredient(meal_id: int, item_id: int, db: DbSession) -> dict[str, bool]:
"""Remove an ingredient from a meal."""
mi = db.scalar(select(MealIngredient).where(MealIngredient.meal_id == meal_id, MealIngredient.item_id == item_id))
if not mi:
raise HTTPException(status_code=404, detail="Ingredient not found")
db.delete(mi)
db.commit()
return {"deleted": True}
@router.get("/meals/{meal_id}/availability", response_model=MealAvailability)
def check_meal(meal_id: int, db: DbSession) -> MealAvailability:
"""Check if a specific meal can be made and what's missing."""
meal = db.scalar(
select(Meal).where(Meal.id == meal_id).options(selectinload(Meal.ingredients).selectinload(MealIngredient.item))
)
if not meal:
raise HTTPException(status_code=404, detail="Meal not found")
return _check_meal(meal)
def _check_meal(meal: Meal) -> MealAvailability:
missing = [
ShoppingItem(
item_name=mi.item.name,
unit=mi.item.unit,
needed=mi.quantity_needed,
have=mi.item.quantity,
short=mi.quantity_needed - mi.item.quantity,
)
for mi in meal.ingredients
if mi.item.quantity < mi.quantity_needed
]
return MealAvailability(
meal_id=meal.id,
meal_name=meal.name,
can_make=len(missing) == 0,
missing=missing,
)
+198
View File
@@ -0,0 +1,198 @@
"""HTMX frontend routes for van inventory."""
from __future__ import annotations
from pathlib import Path
from typing import Annotated
from fastapi import APIRouter, Form, HTTPException, Request
from fastapi.responses import HTMLResponse
from fastapi.templating import Jinja2Templates
from sqlalchemy import select
from sqlalchemy.orm import selectinload
from python.orm.van_inventory.models import Item, Meal, MealIngredient
# FastAPI needs DbSession at runtime to resolve the Depends() annotation
from python.van_inventory.dependencies import DbSession # noqa: TC001
from python.van_inventory.routers.api import _check_meal
TEMPLATE_DIR = Path(__file__).resolve().parent.parent / "templates"
templates = Jinja2Templates(directory=TEMPLATE_DIR)
router = APIRouter(tags=["frontend"])
# --- Items ---
@router.get("/", response_class=HTMLResponse)
def items_page(request: Request, db: DbSession) -> HTMLResponse:
"""Render the inventory page."""
items = list(db.scalars(select(Item).order_by(Item.name)).all())
return templates.TemplateResponse(request, "items.html", {"items": items})
@router.post("/items", response_class=HTMLResponse)
def htmx_create_item(
request: Request,
db: DbSession,
name: Annotated[str, Form()],
quantity: Annotated[float, Form()] = 0,
unit: Annotated[str, Form()] = "",
category: Annotated[str | None, Form()] = None,
) -> HTMLResponse:
"""Create an item and return updated item rows."""
if quantity < 0:
raise HTTPException(status_code=422, detail="Quantity must not be negative")
db.add(Item(name=name, quantity=quantity, unit=unit, category=category or None))
db.commit()
items = list(db.scalars(select(Item).order_by(Item.name)).all())
return templates.TemplateResponse(request, "partials/item_rows.html", {"items": items})
@router.patch("/items/{item_id}", response_class=HTMLResponse)
def htmx_update_item(
request: Request,
item_id: int,
db: DbSession,
quantity: Annotated[float, Form()],
) -> HTMLResponse:
"""Update an item's quantity and return updated item rows."""
if quantity < 0:
raise HTTPException(status_code=422, detail="Quantity must not be negative")
item = db.get(Item, item_id)
if item:
item.quantity = quantity
db.commit()
items = list(db.scalars(select(Item).order_by(Item.name)).all())
return templates.TemplateResponse(request, "partials/item_rows.html", {"items": items})
@router.delete("/items/{item_id}", response_class=HTMLResponse)
def htmx_delete_item(request: Request, item_id: int, db: DbSession) -> HTMLResponse:
"""Delete an item and return updated item rows."""
item = db.get(Item, item_id)
if item:
db.delete(item)
db.commit()
items = list(db.scalars(select(Item).order_by(Item.name)).all())
return templates.TemplateResponse(request, "partials/item_rows.html", {"items": items})
# --- Meals ---
def _load_meals(db: DbSession) -> list[Meal]:
return list(
db.scalars(
select(Meal).options(selectinload(Meal.ingredients).selectinload(MealIngredient.item)).order_by(Meal.name)
).all()
)
@router.get("/meals", response_class=HTMLResponse)
def meals_page(request: Request, db: DbSession) -> HTMLResponse:
"""Render the meals page."""
meals = _load_meals(db)
return templates.TemplateResponse(request, "meals.html", {"meals": meals})
@router.post("/meals", response_class=HTMLResponse)
def htmx_create_meal(
request: Request,
db: DbSession,
name: Annotated[str, Form()],
instructions: Annotated[str | None, Form()] = None,
) -> HTMLResponse:
"""Create a meal and return updated meal rows."""
db.add(Meal(name=name, instructions=instructions or None))
db.commit()
meals = _load_meals(db)
return templates.TemplateResponse(request, "partials/meal_rows.html", {"meals": meals})
@router.delete("/meals/{meal_id}", response_class=HTMLResponse)
def htmx_delete_meal(request: Request, meal_id: int, db: DbSession) -> HTMLResponse:
"""Delete a meal and return updated meal rows."""
meal = db.get(Meal, meal_id)
if meal:
db.delete(meal)
db.commit()
meals = _load_meals(db)
return templates.TemplateResponse(request, "partials/meal_rows.html", {"meals": meals})
# --- Meal detail ---
def _load_meal(db: DbSession, meal_id: int) -> Meal | None:
return db.scalar(
select(Meal).where(Meal.id == meal_id).options(selectinload(Meal.ingredients).selectinload(MealIngredient.item))
)
@router.get("/meals/{meal_id}", response_class=HTMLResponse)
def meal_detail_page(request: Request, meal_id: int, db: DbSession) -> HTMLResponse:
"""Render the meal detail page."""
meal = _load_meal(db, meal_id)
if not meal:
raise HTTPException(status_code=404, detail="Meal not found")
items = list(db.scalars(select(Item).order_by(Item.name)).all())
return templates.TemplateResponse(request, "meal_detail.html", {"meal": meal, "items": items})
@router.post("/meals/{meal_id}/ingredients", response_class=HTMLResponse)
def htmx_add_ingredient(
request: Request,
meal_id: int,
db: DbSession,
item_id: Annotated[int, Form()],
quantity_needed: Annotated[float, Form()],
) -> HTMLResponse:
"""Add an ingredient to a meal and return updated ingredient rows."""
if quantity_needed <= 0:
raise HTTPException(status_code=422, detail="Quantity must be positive")
meal = db.get(Meal, meal_id)
if not meal:
raise HTTPException(status_code=404, detail="Meal not found")
if not db.get(Item, item_id):
raise HTTPException(status_code=422, detail="Item not found")
existing = db.scalar(
select(MealIngredient).where(MealIngredient.meal_id == meal_id, MealIngredient.item_id == item_id)
)
if existing:
raise HTTPException(status_code=409, detail="Ingredient already exists for this meal")
db.add(MealIngredient(meal_id=meal_id, item_id=item_id, quantity_needed=quantity_needed))
db.commit()
meal = _load_meal(db, meal_id)
return templates.TemplateResponse(request, "partials/ingredient_rows.html", {"meal": meal})
@router.delete("/meals/{meal_id}/ingredients/{item_id}", response_class=HTMLResponse)
def htmx_remove_ingredient(
request: Request,
meal_id: int,
item_id: int,
db: DbSession,
) -> HTMLResponse:
"""Remove an ingredient from a meal and return updated ingredient rows."""
mi = db.scalar(select(MealIngredient).where(MealIngredient.meal_id == meal_id, MealIngredient.item_id == item_id))
if mi:
db.delete(mi)
db.commit()
meal = _load_meal(db, meal_id)
return templates.TemplateResponse(request, "partials/ingredient_rows.html", {"meal": meal})
# --- Availability ---
@router.get("/availability", response_class=HTMLResponse)
def availability_page(request: Request, db: DbSession) -> HTMLResponse:
"""Render the meal availability page."""
meals = list(
db.scalars(select(Meal).options(selectinload(Meal.ingredients).selectinload(MealIngredient.item))).all()
)
availability = [_check_meal(m) for m in meals]
return templates.TemplateResponse(request, "availability.html", {"availability": availability})
+212
View File
@@ -0,0 +1,212 @@
:root {
--neon-pink: #ff2a6d;
--neon-cyan: #05d9e8;
--neon-yellow: #f9f002;
--neon-purple: #d300c5;
--bg-dark: #0a0a0f;
--bg-panel: #0d0d1a;
--bg-input: #111128;
--border: #1a1a3e;
--text: #c0c0d0;
--text-dim: #8e8ea0;
}
* { box-sizing: border-box; margin: 0; padding: 0; }
body {
font-family: 'Share Tech Mono', monospace;
max-width: 900px;
margin: 0 auto;
padding: 1rem;
background: var(--bg-dark);
color: var(--text);
position: relative;
}
/* Scanline overlay */
body::before {
content: '';
position: fixed;
top: 0; left: 0; right: 0; bottom: 0;
background: repeating-linear-gradient(
0deg,
transparent,
transparent 2px,
rgba(0, 0, 0, 0.08) 2px,
rgba(0, 0, 0, 0.08) 4px
);
pointer-events: none;
z-index: 9999;
}
h1, h2, h3 {
font-family: 'Orbitron', sans-serif;
margin-bottom: 0.5rem;
color: var(--neon-cyan);
text-shadow: 0 0 10px rgba(5, 217, 232, 0.5), 0 0 40px rgba(5, 217, 232, 0.2);
text-transform: uppercase;
letter-spacing: 2px;
}
a { color: var(--neon-pink); text-decoration: none; transition: all 0.2s; }
a:hover {
text-shadow: 0 0 8px rgba(255, 42, 109, 0.8), 0 0 20px rgba(255, 42, 109, 0.4);
}
nav {
display: flex;
gap: 1.5rem;
padding: 1rem 0;
border-bottom: 1px solid var(--border);
margin-bottom: 1.5rem;
position: relative;
}
nav::after {
content: '';
position: absolute;
bottom: -1px;
left: 0;
right: 0;
height: 1px;
background: linear-gradient(90deg, var(--neon-pink), var(--neon-cyan), var(--neon-purple));
opacity: 0.6;
}
nav a {
font-family: 'Orbitron', sans-serif;
font-weight: 700;
font-size: 0.85rem;
letter-spacing: 1px;
text-transform: uppercase;
padding: 0.3rem 0;
border-bottom: 2px solid transparent;
transition: all 0.2s;
}
nav a:hover {
border-bottom-color: var(--neon-pink);
text-shadow: 0 0 8px rgba(255, 42, 109, 0.8);
}
table {
width: 100%;
border-collapse: collapse;
margin: 1rem 0;
border: 1px solid var(--border);
}
th, td {
text-align: left;
padding: 0.6rem 0.75rem;
border-bottom: 1px solid var(--border);
}
th {
font-family: 'Orbitron', sans-serif;
color: var(--neon-cyan);
font-size: 0.7rem;
text-transform: uppercase;
letter-spacing: 2px;
background: var(--bg-panel);
border-bottom: 1px solid var(--neon-cyan);
text-shadow: 0 0 6px rgba(5, 217, 232, 0.3);
}
tr:hover td {
background: rgba(5, 217, 232, 0.03);
}
form {
display: flex;
flex-wrap: wrap;
gap: 0.5rem;
align-items: end;
margin: 1rem 0;
padding: 1rem;
border: 1px solid var(--border);
background: var(--bg-panel);
}
input, select {
padding: 0.5rem 0.6rem;
border: 1px solid var(--border);
border-radius: 2px;
background: var(--bg-input);
color: var(--neon-cyan);
font-family: 'Share Tech Mono', monospace;
transition: all 0.2s;
}
input:focus, select:focus {
outline: none;
border-color: var(--neon-cyan);
box-shadow: 0 0 8px rgba(5, 217, 232, 0.3), inset 0 0 8px rgba(5, 217, 232, 0.05);
}
button {
padding: 0.5rem 1.2rem;
border: 1px solid var(--neon-pink);
border-radius: 2px;
background: transparent;
color: var(--neon-pink);
cursor: pointer;
font-family: 'Orbitron', sans-serif;
font-weight: 700;
font-size: 0.7rem;
letter-spacing: 1px;
text-transform: uppercase;
transition: all 0.2s;
}
button:hover {
background: var(--neon-pink);
color: var(--bg-dark);
box-shadow: 0 0 15px rgba(255, 42, 109, 0.5), 0 0 30px rgba(255, 42, 109, 0.2);
}
button.danger {
border-color: var(--text-dim);
color: var(--text-dim);
}
button.danger:hover {
border-color: var(--neon-pink);
background: var(--neon-pink);
color: var(--bg-dark);
box-shadow: 0 0 15px rgba(255, 42, 109, 0.5);
}
.badge {
display: inline-block;
padding: 0.2rem 0.6rem;
border-radius: 2px;
font-family: 'Orbitron', sans-serif;
font-size: 0.65rem;
font-weight: 700;
letter-spacing: 1px;
text-transform: uppercase;
}
.badge.yes {
background: rgba(5, 217, 232, 0.1);
color: var(--neon-cyan);
border: 1px solid var(--neon-cyan);
text-shadow: 0 0 6px rgba(5, 217, 232, 0.5);
}
.badge.no {
background: rgba(255, 42, 109, 0.1);
color: var(--neon-pink);
border: 1px solid var(--neon-pink);
text-shadow: 0 0 6px rgba(255, 42, 109, 0.5);
}
.missing-list { font-size: 0.85rem; color: var(--text-dim); }
label {
font-size: 0.75rem;
color: var(--text-dim);
display: flex;
flex-direction: column;
gap: 0.2rem;
text-transform: uppercase;
letter-spacing: 1px;
}
.flash {
padding: 0.5rem 1rem;
margin: 0.5rem 0;
border-radius: 2px;
background: rgba(5, 217, 232, 0.1);
color: var(--neon-cyan);
border: 1px solid var(--neon-cyan);
}
@@ -0,0 +1,30 @@
{% extends "base.html" %}
{% block title %}What Can I Make? - Van{% endblock %}
{% block content %}
<h1>What Can I Make?</h1>
<table>
<thead>
<tr><th>Meal</th><th>Status</th><th>Missing</th></tr>
</thead>
<tbody>
{% for meal in availability %}
<tr>
<td><a href="/meals/{{ meal.meal_id }}">{{ meal.meal_name }}</a></td>
<td>
{% if meal.can_make %}
<span class="badge yes">Ready</span>
{% else %}
<span class="badge no">Missing items</span>
{% endif %}
</td>
<td class="missing-list">
{% for m in meal.missing %}
{{ m.item_name }}: need {{ m.short }} more {{ m.unit }}{% if not loop.last %}, {% endif %}
{% endfor %}
</td>
</tr>
{% endfor %}
</tbody>
</table>
{% endblock %}
+20
View File
@@ -0,0 +1,20 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>{% block title %}Van Inventory{% endblock %}</title>
<script src="https://unpkg.com/htmx.org@2.0.4"></script>
<link rel="preconnect" href="https://fonts.googleapis.com">
<link href="https://fonts.googleapis.com/css2?family=Orbitron:wght@400;700;900&family=Share+Tech+Mono&display=swap" rel="stylesheet">
<link rel="stylesheet" href="/static/style.css">
</head>
<body>
<nav>
<a href="/">Inventory</a>
<a href="/meals">Meals</a>
<a href="/availability">What Can I Make?</a>
</nav>
{% block content %}{% endblock %}
</body>
</html>
+17
View File
@@ -0,0 +1,17 @@
{% extends "base.html" %}
{% block title %}Inventory - Van{% endblock %}
{% block content %}
<h1>Van Inventory</h1>
<form hx-post="/items" hx-target="#item-list" hx-swap="innerHTML" hx-on::after-request="if(event.detail.successful) this.reset()">
<label>Name <input type="text" name="name" required></label>
<label>Qty <input type="number" name="quantity" step="any" value="0" min="0" required></label>
<label>Unit <input type="text" name="unit" required placeholder="lbs, cans, etc"></label>
<label>Category <input type="text" name="category" placeholder="optional"></label>
<button type="submit">Add Item</button>
</form>
<div id="item-list">
{% include "partials/item_rows.html" %}
</div>
{% endblock %}
@@ -0,0 +1,24 @@
{% extends "base.html" %}
{% block title %}{{ meal.name }} - Van{% endblock %}
{% block content %}
<h1>{{ meal.name }}</h1>
{% if meal.instructions %}<p>{{ meal.instructions }}</p>{% endif %}
<h2>Ingredients</h2>
<form hx-post="/meals/{{ meal.id }}/ingredients" hx-target="#ingredient-list" hx-swap="innerHTML" hx-on::after-request="if(event.detail.successful) this.reset()">
<label>Item
<select name="item_id" required>
<option value="">--</option>
{% for item in items %}
<option value="{{ item.id }}">{{ item.name }} ({{ item.unit }})</option>
{% endfor %}
</select>
</label>
<label>Qty needed <input type="number" name="quantity_needed" step="any" min="0.01" required></label>
<button type="submit">Add</button>
</form>
<div id="ingredient-list">
{% include "partials/ingredient_rows.html" %}
</div>
{% endblock %}
+15
View File
@@ -0,0 +1,15 @@
{% extends "base.html" %}
{% block title %}Meals - Van{% endblock %}
{% block content %}
<h1>Meals</h1>
<form hx-post="/meals" hx-target="#meal-list" hx-swap="innerHTML" hx-on::after-request="if(event.detail.successful) this.reset()">
<label>Name <input type="text" name="name" required></label>
<label>Instructions <input type="text" name="instructions" placeholder="optional"></label>
<button type="submit">Add Meal</button>
</form>
<div id="meal-list">
{% include "partials/meal_rows.html" %}
</div>
{% endblock %}
@@ -0,0 +1,16 @@
<table>
<thead>
<tr><th>Item</th><th>Needed</th><th>Have</th><th>Unit</th><th></th></tr>
</thead>
<tbody>
{% for mi in meal.ingredients %}
<tr>
<td>{{ mi.item.name }}</td>
<td>{{ mi.quantity_needed }}</td>
<td>{{ mi.item.quantity }}</td>
<td>{{ mi.item.unit }}</td>
<td><button class="danger" hx-delete="/meals/{{ meal.id }}/ingredients/{{ mi.item_id }}" hx-target="#ingredient-list" hx-swap="innerHTML" hx-confirm="Remove {{ mi.item.name }}?">X</button></td>
</tr>
{% endfor %}
</tbody>
</table>
@@ -0,0 +1,21 @@
<table>
<thead>
<tr><th>Name</th><th>Qty</th><th>Unit</th><th>Category</th><th></th></tr>
</thead>
<tbody>
{% for item in items %}
<tr>
<td>{{ item.name }}</td>
<td>
<form hx-patch="/items/{{ item.id }}" hx-target="#item-list" hx-swap="innerHTML" style="display:inline; margin:0;">
<input type="number" name="quantity" value="{{ item.quantity }}" step="any" min="0" style="width:5rem">
<button type="submit" style="padding:0.2rem 0.5rem; font-size:0.8rem;">Update</button>
</form>
</td>
<td>{{ item.unit }}</td>
<td>{{ item.category or "" }}</td>
<td><button class="danger" hx-delete="/items/{{ item.id }}" hx-target="#item-list" hx-swap="innerHTML" hx-confirm="Delete {{ item.name }}?">X</button></td>
</tr>
{% endfor %}
</tbody>
</table>
@@ -0,0 +1,15 @@
<table>
<thead>
<tr><th>Name</th><th>Ingredients</th><th>Instructions</th><th></th></tr>
</thead>
<tbody>
{% for meal in meals %}
<tr>
<td><a href="/meals/{{ meal.id }}">{{ meal.name }}</a></td>
<td>{{ meal.ingredients | length }}</td>
<td>{{ (meal.instructions or "")[:50] }}</td>
<td><button class="danger" hx-delete="/meals/{{ meal.id }}" hx-target="#meal-list" hx-swap="innerHTML" hx-confirm="Delete {{ meal.name }}?">X</button></td>
</tr>
{% endfor %}
</tbody>
</table>
+1 -1
View File
@@ -257,7 +257,7 @@ def update_weather(config: Config) -> None:
logger.info(f"Masked location: {masked_lat}, {masked_lon}")
weather = fetch_weather(config.pirate_weather_api_key, masked_lat, masked_lon)
weather = fetch_weather(config.pirate_weather_api_key, lat, lon)
logger.info(f"Weather: {weather.temperature}°F, {weather.condition}")
post_to_ha(config.ha_url, config.ha_token, weather)
+1 -3
View File
@@ -1,8 +1,6 @@
"""Models for van weather service."""
from __future__ import annotations
from datetime import datetime # noqa: TC003 This is required for pydantic
from datetime import datetime
from pydantic import BaseModel, field_serializer
+5 -5
View File
@@ -108,7 +108,7 @@ class Dataset:
self.written = int(properties["written"]["value"])
self.xattr = properties["xattr"]["value"]
def get_snapshots(self) -> list[Snapshot]:
def get_snapshots(self) -> list[Snapshot] | None:
"""Get all snapshots from zfs and process then is test dicts of sets."""
snapshots_data = _zfs_list(f"zfs list -t snapshot -pHj {self.name} -o all")
@@ -125,10 +125,10 @@ class Dataset:
if return_code == 0:
return "snapshot created"
snapshots = self.get_snapshots()
snapshot_names = {snapshot.name for snapshot in snapshots}
if snapshot_name in snapshot_names:
return f"Snapshot {snapshot_name} already exists for {self.name}"
if snapshots := self.get_snapshots():
snapshot_names = {snapshot.name for snapshot in snapshots}
if snapshot_name in snapshot_names:
return f"Snapshot {snapshot_name} already exists for {self.name}"
return f"Failed to create snapshot {snapshot_name} for {self.name}"
+50
View File
@@ -0,0 +1,50 @@
{
pkgs,
inputs,
...
}:
{
networking.firewall.allowedTCPPorts = [ 8001 ];
users = {
users.vaninventory = {
isSystemUser = true;
group = "vaninventory";
};
groups.vaninventory = { };
};
systemd.services.van_inventory = {
description = "Van Inventory API";
after = [
"network.target"
"postgresql.service"
];
requires = [ "postgresql.service" ];
wantedBy = [ "multi-user.target" ];
environment = {
PYTHONPATH = "${inputs.self}/";
VAN_INVENTORY_DB = "vaninventory";
VAN_INVENTORY_USER = "vaninventory";
VAN_INVENTORY_HOST = "/run/postgresql";
VAN_INVENTORY_PORT = "5432";
};
serviceConfig = {
Type = "simple";
User = "vaninventory";
Group = "vaninventory";
ExecStart = "${pkgs.my_python}/bin/python -m python.van_inventory.main --host 0.0.0.0 --port 8001";
Restart = "on-failure";
RestartSec = "5s";
StandardOutput = "journal";
StandardError = "journal";
NoNewPrivileges = true;
ProtectSystem = "strict";
ProtectHome = "read-only";
PrivateTmp = true;
ReadOnlyPaths = [ "${inputs.self}" ];
};
};
}
+4 -4
View File
@@ -19,7 +19,7 @@ if TYPE_CHECKING:
class MockFuture(Future):
"""MockFuture."""
def __init__(self, result: Any) -> None:
def __init__(self, result: Any) -> None: # noqa: ANN401
"""Init."""
super().__init__()
self._result = result
@@ -31,7 +31,7 @@ class MockFuture(Future):
logging.debug(f"{timeout}=")
return self._exception
def result(self, timeout: float | None = None) -> Any:
def result(self, timeout: float | None = None) -> Any: # noqa: ANN401
"""Result."""
logging.debug(f"{timeout}=")
return self._result
@@ -40,11 +40,11 @@ class MockFuture(Future):
class MockPoolExecutor(ThreadPoolExecutor):
"""MockPoolExecutor."""
def __init__(self, *args: Any, **kwargs: Any) -> None:
def __init__(self, *args: Any, **kwargs: Any) -> None: # noqa: ANN401
"""Initializes a new ThreadPoolExecutor instance."""
super().__init__(*args, **kwargs)
def submit(self, fn: Callable[..., Any], /, *args: Any, **kwargs: Any) -> Future:
def submit(self, fn: Callable[..., Any], /, *args: Any, **kwargs: Any) -> Future: # noqa: ANN401
"""Submits a callable to be executed with the given arguments.
Args:
+3 -6
View File
@@ -21,7 +21,7 @@ def test_validate_system(mocker: MockerFixture, fs: FakeFilesystem) -> None:
"""test_validate_system."""
fs.create_file(
"/mock_snapshot_config.toml",
contents='zpools = ["root_pool", "storage", "media"]\nservices = ["docker"]\n',
contents='zpool = ["root_pool", "storage", "media"]\nservices = ["docker"]\n',
)
mocker.patch(f"{VALIDATE_SYSTEM}.systemd_tests", return_value=None)
@@ -33,10 +33,9 @@ def test_validate_system_errors(mocker: MockerFixture, fs: FakeFilesystem) -> No
"""test_validate_system_errors."""
fs.create_file(
"/mock_snapshot_config.toml",
contents='zpools = ["root_pool", "storage", "media"]\nservices = ["docker"]\n',
contents='zpool = ["root_pool", "storage", "media"]\nservices = ["docker"]\n',
)
mocker.patch(f"{VALIDATE_SYSTEM}.signal_alert")
mocker.patch(f"{VALIDATE_SYSTEM}.systemd_tests", return_value=["systemd_tests error"])
mocker.patch(f"{VALIDATE_SYSTEM}.zpool_tests", return_value=["zpool_tests error"])
@@ -50,11 +49,9 @@ def test_validate_system_execution(mocker: MockerFixture, fs: FakeFilesystem) ->
"""test_validate_system_execution."""
fs.create_file(
"/mock_snapshot_config.toml",
contents='zpools = ["root_pool", "storage", "media"]\nservices = ["docker"]\n',
contents='zpool = ["root_pool", "storage", "media"]\nservices = ["docker"]\n',
)
mocker.patch(f"{VALIDATE_SYSTEM}.signal_alert")
mocker.patch(f"{VALIDATE_SYSTEM}.systemd_tests", return_value=None)
mocker.patch(f"{VALIDATE_SYSTEM}.zpool_tests", side_effect=RuntimeError("zpool_tests error"))
with pytest.raises(SystemExit) as exception_info: