Compare commits

..

1 Commits

Author SHA1 Message Date
dc210d7380 adding xf86_input_wacom 2025-11-02 09:54:38 -05:00
47 changed files with 481 additions and 2456 deletions

View File

@@ -16,4 +16,4 @@ jobs:
steps:
- uses: actions/checkout@v4
- name: Run tests
run: nix develop .#devShells.x86_64-linux.default -c pytest tests
run: pytest tests

View File

@@ -252,7 +252,6 @@
"schemeless",
"scrollback",
"SECUREFOX",
"sessionmaker",
"sessionstore",
"shellcheck",
"signon",
@@ -264,7 +263,6 @@
"socialtracking",
"sonarr",
"sponsorblock",
"sqlalchemy",
"sqltools",
"ssdp",
"SSHOPTS",
@@ -327,10 +325,5 @@
"zoxide",
"zram",
"zstd"
],
"python-envs.defaultEnvManager": "ms-python.python:system",
"python-envs.pythonProjects": [],
"python.testing.pytestArgs": ["tests"],
"python.testing.unittestEnabled": false,
"python.testing.pytestEnabled": true
]
}

View File

@@ -16,7 +16,6 @@
./nh.nix
./nix.nix
./programs.nix
./safe_reboot.nix
./ssh.nix
./snapshot_manager.nix
];
@@ -45,15 +44,7 @@
# firmware update
fwupd.enable = true;
snapshot_manager = {
enable = lib.mkDefault true;
PYTHONPATH = "${inputs.self}/";
};
safe_reboot = {
enable = lib.mkDefault true;
datasetPrefix = "root_pool/";
};
snapshot_manager.enable = lib.mkDefault true;
zfs = {
trim.enable = lib.mkDefault true;

View File

@@ -1,56 +0,0 @@
{
config,
inputs,
lib,
pkgs,
...
}:
let
cfg = config.services.safe_reboot;
python_command =
lib.escapeShellArgs (
[
"${pkgs.my_python}/bin/python"
"-m"
"python.tools.safe_reboot"
]
++ lib.optionals (cfg.drivePath != null) [ cfg.drivePath ]
++ [
"--dataset-prefix"
cfg.datasetPrefix
"--check-only"
]
);
in
{
options.services.safe_reboot = {
enable = lib.mkEnableOption "Safe reboot dataset/drive validation";
datasetPrefix = lib.mkOption {
type = lib.types.str;
default = "root_pool/";
description = "Dataset prefix that must have exec enabled before rebooting.";
};
drivePath = lib.mkOption {
type = lib.types.nullOr lib.types.str;
default = null;
description = "Drive path that must exist before rebooting. Set to null to skip.";
};
};
config = lib.mkIf cfg.enable {
systemd.services.safe-reboot-check = {
description = "Safe reboot validation";
before = [ "systemd-reboot.service" ];
wantedBy = [ "reboot.target" ];
partOf = [ "reboot.target" ];
path = [ pkgs.zfs ];
environment = {
PYTHONPATH = "${inputs.self}/";
};
serviceConfig = {
Type = "oneshot";
ExecStart = python_command;
};
};
};
}

View File

@@ -1,4 +1,5 @@
{
inputs,
pkgs,
lib,
config,
@@ -16,12 +17,6 @@ in
default = ./snapshot_config.toml;
description = "Path to the snapshot_manager TOML config.";
};
PYTHONPATH = lib.mkOption {
type = lib.types.str;
description = ''
the PYTHONPATH to use for the snapshot_manager service.
'';
};
EnvironmentFile = lib.mkOption {
type = lib.types.nullOr (lib.types.coercedTo lib.types.path toString lib.types.str);
default = null;
@@ -40,12 +35,11 @@ in
requires = [ "zfs-import.target" ];
after = [ "zfs-import.target" ];
path = [ pkgs.zfs ];
environment = {
PYTHONPATH = cfg.PYTHONPATH;
};
serviceConfig = {
Type = "oneshot";
ExecStart = "${pkgs.my_python}/bin/python -m python.tools.snapshot_manager ${lib.escapeShellArg cfg.path}";
ExecStart = "${
inputs.system_tools.packages.${pkgs.system}.default
}/bin/snapshot_manager ${lib.escapeShellArg cfg.path}";
}
// lib.optionalAttrs (cfg.EnvironmentFile != null) {
EnvironmentFile = cfg.EnvironmentFile;

View File

@@ -37,8 +37,6 @@
TcpKeepAlive = "no";
X11Forwarding = lib.mkDefault false;
KexAlgorithms = [
"sntrup761x25519-sha512@openssh.com"
"mlkem768x25519-sha256"
"curve25519-sha256@libssh.org"
"diffie-hellman-group-exchange-sha256"
];

View File

@@ -1,8 +1,8 @@
{ pkgs, ... }:
{
boot = {
kernelPackages = pkgs.linuxPackages_6_17;
zfs.package = pkgs.zfs_unstable;
kernelPackages = pkgs.linuxPackages_6_16;
zfs.package = pkgs.zfs_2_3;
};
hardware.bluetooth = {

135
flake.lock generated
View File

@@ -8,11 +8,11 @@
},
"locked": {
"dir": "pkgs/firefox-addons",
"lastModified": 1763697825,
"narHash": "sha256-AgCCcVPOi1tuzuW5/StlwqBjRWSX62oL97qWuxrq5UA=",
"lastModified": 1760673822,
"narHash": "sha256-h+liPhhMw1yYvkDGLHzQJQShQs+yLjNgjfAyZX+sRrM=",
"owner": "rycee",
"repo": "nur-expressions",
"rev": "cefce78793603231be226fa77e7ad58e0e4899b8",
"rev": "5cca27f1bb30a26140d0cf60ab34daa45b4fa11f",
"type": "gitlab"
},
"original": {
@@ -29,11 +29,11 @@
]
},
"locked": {
"lastModified": 1763748372,
"narHash": "sha256-AUc78Qv3sWir0hvbmfXoZ7Jzq9VVL97l+sP9Jgms+JU=",
"lastModified": 1760662441,
"narHash": "sha256-mlDqR1Ntgs9uYYEAUR1IhamKBO0lxoNS4zGLzEZaY0A=",
"owner": "nix-community",
"repo": "home-manager",
"rev": "d10a9b16b2a3ee28433f3d1c603f4e9f1fecb8e1",
"rev": "722792af097dff5790f1a66d271a47759f477755",
"type": "github"
},
"original": {
@@ -44,11 +44,11 @@
},
"nixos-hardware": {
"locked": {
"lastModified": 1762847253,
"narHash": "sha256-BWWnUUT01lPwCWUvS0p6Px5UOBFeXJ8jR+ZdLX8IbrU=",
"lastModified": 1760106635,
"narHash": "sha256-2GoxVaKWTHBxRoeUYSjv0AfSOx4qw5CWSFz2b+VolKU=",
"owner": "nixos",
"repo": "nixos-hardware",
"rev": "899dc449bc6428b9ee6b3b8f771ca2b0ef945ab9",
"rev": "9ed85f8afebf2b7478f25db0a98d0e782c0ed903",
"type": "github"
},
"original": {
@@ -60,11 +60,11 @@
},
"nixpkgs": {
"locked": {
"lastModified": 1763421233,
"narHash": "sha256-Stk9ZYRkGrnnpyJ4eqt9eQtdFWRRIvMxpNRf4sIegnw=",
"lastModified": 1760524057,
"narHash": "sha256-EVAqOteLBFmd7pKkb0+FIUyzTF61VKi7YmvP1tw4nEw=",
"owner": "nixos",
"repo": "nixpkgs",
"rev": "89c2b2330e733d6cdb5eae7b899326930c2c0648",
"rev": "544961dfcce86422ba200ed9a0b00dd4b1486ec5",
"type": "github"
},
"original": {
@@ -76,11 +76,11 @@
},
"nixpkgs-master": {
"locked": {
"lastModified": 1763774007,
"narHash": "sha256-PPeHfKA11P09kBkBD5pS3tIAFjnG5muHQnODQGTY87g=",
"lastModified": 1760751316,
"narHash": "sha256-1296zQfPiLZNrLKzX1t+kunadeI/mH82hKze3voduEI=",
"owner": "nixos",
"repo": "nixpkgs",
"rev": "8a7cf7e9e18384533d9ecd0bfbcf475ac1dc497e",
"rev": "d85429339c0bcf0428084fe1306c970aed364417",
"type": "github"
},
"original": {
@@ -106,6 +106,56 @@
"type": "github"
}
},
"pyproject-build-systems": {
"inputs": {
"nixpkgs": [
"system_tools",
"nixpkgs"
],
"pyproject-nix": [
"system_tools",
"pyproject-nix"
],
"uv2nix": [
"system_tools",
"uv2nix"
]
},
"locked": {
"lastModified": 1744599653,
"narHash": "sha256-nysSwVVjG4hKoOjhjvE6U5lIKA8sEr1d1QzEfZsannU=",
"owner": "pyproject-nix",
"repo": "build-system-pkgs",
"rev": "7dba6dbc73120e15b558754c26024f6c93015dd7",
"type": "github"
},
"original": {
"owner": "pyproject-nix",
"repo": "build-system-pkgs",
"type": "github"
}
},
"pyproject-nix": {
"inputs": {
"nixpkgs": [
"system_tools",
"nixpkgs"
]
},
"locked": {
"lastModified": 1746540146,
"narHash": "sha256-QxdHGNpbicIrw5t6U3x+ZxeY/7IEJ6lYbvsjXmcxFIM=",
"owner": "pyproject-nix",
"repo": "pyproject.nix",
"rev": "e09c10c24ebb955125fda449939bfba664c467fd",
"type": "github"
},
"original": {
"owner": "pyproject-nix",
"repo": "pyproject.nix",
"type": "github"
}
},
"root": {
"inputs": {
"firefox-addons": "firefox-addons",
@@ -115,6 +165,7 @@
"nixpkgs-master": "nixpkgs-master",
"nixpkgs-stable": "nixpkgs-stable",
"sops-nix": "sops-nix",
"system_tools": "system_tools",
"systems": "systems"
}
},
@@ -125,11 +176,11 @@
]
},
"locked": {
"lastModified": 1763607916,
"narHash": "sha256-VefBA1JWRXM929mBAFohFUtQJLUnEwZ2vmYUNkFnSjE=",
"lastModified": 1760393368,
"narHash": "sha256-8mN3kqyqa2PKY0wwZ2UmMEYMcxvNTwLaOrrDsw6Qi4E=",
"owner": "Mic92",
"repo": "sops-nix",
"rev": "877bb495a6f8faf0d89fc10bd142c4b7ed2bcc0b",
"rev": "ab8d56e85b8be14cff9d93735951e30c3e86a437",
"type": "github"
},
"original": {
@@ -138,6 +189,29 @@
"type": "github"
}
},
"system_tools": {
"inputs": {
"nixpkgs": [
"nixpkgs"
],
"pyproject-build-systems": "pyproject-build-systems",
"pyproject-nix": "pyproject-nix",
"uv2nix": "uv2nix"
},
"locked": {
"lastModified": 1760751967,
"narHash": "sha256-u/uciy9kpM/CBZKl05iAZRaOTwUHiuI0L/qbkk2mLUg=",
"owner": "RichieCahill",
"repo": "system_tools",
"rev": "a125c3e5c01cecbc3f2a842ffb1abb1210c35706",
"type": "github"
},
"original": {
"owner": "RichieCahill",
"repo": "system_tools",
"type": "github"
}
},
"systems": {
"locked": {
"lastModified": 1689347949,
@@ -152,6 +226,31 @@
"repo": "default-linux",
"type": "github"
}
},
"uv2nix": {
"inputs": {
"nixpkgs": [
"system_tools",
"nixpkgs"
],
"pyproject-nix": [
"system_tools",
"pyproject-nix"
]
},
"locked": {
"lastModified": 1747441483,
"narHash": "sha256-W8BFXk5R0TuJcjIhcGoMpSOaIufGXpizK0pm+uTqynA=",
"owner": "pyproject-nix",
"repo": "uv2nix",
"rev": "582024dc64663e9f88d467c2f7f7b20d278349de",
"type": "github"
},
"original": {
"owner": "pyproject-nix",
"repo": "uv2nix",
"type": "github"
}
}
},
"root": "root",

View File

@@ -31,6 +31,11 @@
inputs.nixpkgs.follows = "nixpkgs";
};
system_tools = {
url = "github:RichieCahill/system_tools";
inputs.nixpkgs.follows = "nixpkgs";
};
sops-nix = {
url = "github:Mic92/sops-nix";
inputs.nixpkgs.follows = "nixpkgs";
@@ -54,7 +59,6 @@
system:
import nixpkgs {
inherit system;
overlays = builtins.attrValues outputs.overlays;
config.allowUnfree = true;
}
);

View File

@@ -22,7 +22,6 @@
apscheduler
mypy
polars
psycopg
pyfakefs
pytest
pytest-cov
@@ -30,9 +29,6 @@
pytest-xdist
requests
ruff
scalene
sqlalchemy
textual
typer
types-requests
]

View File

@@ -48,12 +48,6 @@ lint.ignore = [
"ERA001", # (perm) I don't care about print statements dir
]
"python/splendor/**" = [
"S311", # (perm) there is no security issue here
"T201", # (perm) I don't care about print statements dir
"PLR2004", # (temps) need to think about this
]
[tool.ruff.lint.pydocstyle]
convention = "google"

View File

@@ -1,59 +0,0 @@
"""database."""
from __future__ import annotations
import logging
from typing import TYPE_CHECKING
from sqlalchemy import inspect
from sqlalchemy.exc import NoInspectionAvailable
if TYPE_CHECKING:
from collections.abc import Sequence
from sqlalchemy.orm import Session
logger = logging.getLogger(__name__)
def safe_insert(orm_objects: Sequence[object], session: Session) -> list[tuple[Exception, object]]:
"""Safer insert at allows for partial rollbacks.
Args:
orm_objects (Sequence[object]): Tables to insert.
session (Session): Database session.
"""
if unmapped := [orm_object for orm_object in orm_objects if not _is_mapped_instance(orm_object)]:
error = f"safe_insert expects ORM-mapped instances {unmapped}"
raise TypeError(error)
return _safe_insert(orm_objects, session)
def _safe_insert(objects: Sequence[object], session: Session) -> list[tuple[Exception, object]]:
exceptions: list[tuple[Exception, object]] = []
try:
session.add_all(objects)
session.commit()
except Exception as error:
session.rollback()
objects_len = len(objects)
if objects_len == 1:
logger.exception(objects)
return [(error, objects[0])]
middle = objects_len // 2
exceptions.extend(_safe_insert(objects=objects[:middle], session=session))
exceptions.extend(_safe_insert(objects=objects[middle:], session=session))
return exceptions
def _is_mapped_instance(obj: object) -> bool:
"""Return True if `obj` is a SQLAlchemy ORM-mapped instance."""
try:
inspect(obj) # raises NoInspectionAvailable if not mapped
except NoInspectionAvailable:
return False
else:
return True

View File

@@ -85,7 +85,7 @@ def get_material_resistivity(
) -> float:
"""Get the resistivity of a material."""
if not temperature:
temperature = Temperature(20.0)
Temperature(20.0)
material_info = {
MaterialType.COPPER: (1.724e-8, 0.00393),
MaterialType.ALUMINUM: (2.908e-8, 0.00403),
@@ -180,7 +180,7 @@ def max_wire_length(
float: Maximum wire length in meters that maintains the specified voltage drop
"""
if not temperature:
temperature = Temperature(100.0, unit=TemperatureUnit.FAHRENHEIT)
Temperature(100.0, unit=TemperatureUnit.FAHRENHEIT)
resistivity = get_material_resistivity(material, temperature)
resistance_per_meter = resistivity / calculate_wire_area_m2(gauge)

View File

@@ -1 +0,0 @@
game_data/

View File

@@ -1 +0,0 @@
"""init."""

View File

@@ -1,675 +0,0 @@
"""Base logic for the Splendor game."""
from __future__ import annotations
import itertools
import json
import random
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Literal, Protocol
if TYPE_CHECKING:
from collections.abc import Sequence
from pathlib import Path
GemColor = Literal["white", "blue", "green", "red", "black", "gold"]
GEM_COLORS: tuple[GemColor, ...] = (
"white",
"blue",
"green",
"red",
"black",
"gold",
)
BASE_COLORS: tuple[GemColor, ...] = (
"white",
"blue",
"green",
"red",
"black",
)
GEM_ORDER: list[GemColor] = list(GEM_COLORS)
GEM_INDEX: dict[GemColor, int] = {c: i for i, c in enumerate(GEM_ORDER)}
BASE_INDEX: dict[GemColor, int] = {c: i for i, c in enumerate(BASE_COLORS)}
@dataclass(frozen=True)
class Card:
"""Development card: gives points + a permanent gem discount."""
tier: int
points: int
color: GemColor
cost: dict[GemColor, int]
@dataclass(frozen=True)
class Noble:
"""Noble tile: gives points if you have enough bonuses."""
name: str
points: int
requirements: dict[GemColor, int]
@dataclass
class PlayerState:
"""State of a player in the game."""
strategy: Strategy
tokens: dict[GemColor, int] = field(default_factory=lambda: dict.fromkeys(GEM_COLORS, 0))
discounts: dict[GemColor, int] = field(default_factory=lambda: dict.fromkeys(GEM_COLORS, 0))
cards: list[Card] = field(default_factory=list)
reserved: list[Card] = field(default_factory=list)
nobles: list[Noble] = field(default_factory=list)
card_score: int = 0
noble_score: int = 0
def total_tokens(self) -> int:
"""Total tokens in player's bank."""
return sum(self.tokens.values())
def add_noble(self, noble: Noble) -> None:
"""Add a noble to the player."""
self.nobles.append(noble)
self.noble_score = sum(noble.points for noble in self.nobles)
def add_card(self, card: Card) -> None:
"""Add a card to the player."""
self.cards.append(card)
self.card_score = sum(card.points for card in self.cards)
@property
def score(self) -> int:
"""Total points in player's cards + nobles."""
return self.card_score + self.noble_score
def can_afford(self, card: Card) -> bool:
"""Check if player can afford card, using discounts + gold."""
missing = 0
gold = self.tokens["gold"]
for color, cost in card.cost.items():
missing += max(0, cost - self.discounts.get(color, 0) - self.tokens.get(color, 0))
if missing > gold:
return False
return True
def pay_for_card(self, card: Card) -> dict[GemColor, int]:
"""Pay tokens for card, move card to tableau, return payment for bank."""
if not self.can_afford(card):
msg = f"cannot afford card {card}"
raise ValueError(msg)
payment: dict[GemColor, int] = dict.fromkeys(GEM_COLORS, 0)
gold_available = self.tokens["gold"]
for color in BASE_COLORS:
cost = card.cost.get(color, 0)
effective_cost = max(0, cost - self.discounts.get(color, 0))
use = min(self.tokens[color], effective_cost)
self.tokens[color] -= use
payment[color] += use
remaining = effective_cost - use
if remaining > 0:
use_gold = min(gold_available, remaining)
gold_available -= use_gold
self.tokens["gold"] -= use_gold
payment["gold"] += use_gold
self.add_card(card)
self.discounts[card.color] += 1
return payment
def get_default_starting_tokens(player_count: int) -> dict[GemColor, int]:
"""get_default_starting_tokens."""
token_count = (player_count * player_count - 3 * player_count + 10) // 2
return {
"white": token_count,
"blue": token_count,
"green": token_count,
"red": token_count,
"black": token_count,
"gold": 5,
}
@dataclass
class GameConfig:
"""Game configuration: gems, bank, cards, nobles, etc."""
win_score: int = 15
table_cards_per_tier: int = 4
reserve_limit: int = 3
token_limit: int = 10
turn_limit: int = 1000
minimum_tokens_to_buy_2: int = 4
max_token_take: int = 3
cards: list[Card] = field(default_factory=list)
nobles: list[Noble] = field(default_factory=list)
class GameState:
"""Game state: players, bank, decks, table, available nobles, etc."""
def __init__(
self,
config: GameConfig,
players: list[PlayerState],
bank: dict[GemColor, int],
decks_by_tier: dict[int, list[Card]],
table_by_tier: dict[int, list[Card]],
available_nobles: list[Noble],
) -> None:
"""Game state."""
self.config = config
self.players = players
self.bank = bank
self.decks_by_tier = decks_by_tier
self.table_by_tier = table_by_tier
self.available_nobles = available_nobles
self.noble_min_requirements = 0
self.get_noble_min_requirements()
self.current_player_index = 0
self.finished = False
def get_noble_min_requirements(self) -> None:
"""Find the minimum requirement for all available nobles."""
test = 0
for noble in self.available_nobles:
test = max(test, min(foo for foo in noble.requirements.values()))
self.noble_min_requirements = test
def next_player(self) -> None:
"""Advance to the next player."""
self.current_player_index = (self.current_player_index + 1) % len(self.players)
@property
def current_player(self) -> PlayerState:
"""Current player."""
return self.players[self.current_player_index]
def refill_table(self) -> None:
"""Refill face-up cards from decks."""
for tier, deck in self.decks_by_tier.items():
table = self.table_by_tier[tier]
while len(table) < self.config.table_cards_per_tier and deck:
table.append(deck.pop())
def check_winner_simple(self) -> PlayerState | None:
"""Simplified: end immediately when someone hits win_score."""
eligible = [player for player in self.players if player.score >= self.config.win_score]
if not eligible:
return None
eligible.sort(
key=lambda p: (p.score, -len(p.cards)),
reverse=True,
)
self.finished = True
return eligible[0]
class Action:
"""Marker protocol for actions."""
@dataclass
class TakeDifferent(Action):
"""Take up to 3 different gem colors."""
colors: list[GemColor]
@dataclass
class TakeDouble(Action):
"""Take two of the same color."""
color: GemColor
@dataclass
class BuyCard(Action):
"""Buy a face-up card."""
tier: int
index: int
@dataclass
class BuyCardReserved(Action):
"""Buy a face-up card."""
index: int
@dataclass
class ReserveCard(Action):
"""Reserve a face-up card."""
tier: int
index: int | None = None
from_deck: bool = False
class Strategy(Protocol):
"""Implement this to make a bot or human controller."""
def __init__(self, name: str) -> None:
"""Initialize a strategy."""
self.name = name
def choose_action(self, game: GameState, player: PlayerState) -> Action | None:
"""Return an Action, or None to concede/end."""
raise NotImplementedError
def choose_discard(
self,
game: GameState, # noqa: ARG002
player: PlayerState,
excess: int,
) -> dict[GemColor, int]:
"""Called if player has more than token_limit tokens after an action.
Default: naive auto-discard.
"""
return auto_discard_tokens(player, excess)
def choose_noble(
self,
game: GameState, # noqa: ARG002
player: PlayerState, # noqa: ARG002
nobles: list[Noble],
) -> Noble:
"""Called if player qualifies for multiple nobles. Default: first."""
return nobles[0]
def auto_discard_tokens(player: PlayerState, excess: int) -> dict[GemColor, int]:
"""Very dumb discard logic: discard from colors you have the most of."""
to_discard: dict[GemColor, int] = dict.fromkeys(GEM_COLORS, 0)
remaining = excess
while remaining > 0:
color = max(player.tokens, key=lambda c: player.tokens[c])
if player.tokens[color] == 0:
break
player.tokens[color] -= 1
to_discard[color] += 1
remaining -= 1
return to_discard
def enforce_token_limit(
game: GameState,
strategy: Strategy,
player: PlayerState,
) -> None:
"""If player has more than token_limit tokens, force discards."""
limit = game.config.token_limit
total = player.total_tokens()
if total <= limit:
return
excess = total - limit
discards = strategy.choose_discard(game, player, excess)
for color, amount in discards.items():
available = player.tokens[color]
to_remove = min(amount, available)
if to_remove <= 0:
continue
player.tokens[color] -= to_remove
game.bank[color] += to_remove
remaining = player.total_tokens() - limit
if remaining > 0:
auto = auto_discard_tokens(player, remaining)
for color, amount in auto.items():
game.bank[color] += amount
def _check_nobles_for_player(player: PlayerState, noble: Noble) -> bool:
# this rule is slower
for color, cost in noble.requirements.items(): # noqa: SIM110
if player.discounts[color] < cost:
return False
return True
def check_nobles_for_player(
game: GameState,
strategy: Strategy,
player: PlayerState,
) -> None:
"""Award at most one noble to player if they qualify."""
if game.noble_min_requirements > max(player.discounts.values()):
return
candidates = [noble for noble in game.available_nobles if _check_nobles_for_player(player, noble)]
if not candidates:
return
chosen = candidates[0] if len(candidates) == 1 else strategy.choose_noble(game, player, candidates)
if chosen not in game.available_nobles:
return
game.available_nobles.remove(chosen)
game.get_noble_min_requirements()
player.add_noble(chosen)
def apply_take_different(game: GameState, strategy: Strategy, action: TakeDifferent) -> None:
"""Mutate game state according to action."""
player = game.current_player
colors = [color for color in action.colors if color in BASE_COLORS and game.bank[color] > 0]
if not (1 <= len(colors) <= game.config.max_token_take):
return
for color in colors:
game.bank[color] -= 1
player.tokens[color] += 1
enforce_token_limit(game, strategy, player)
def apply_take_double(game: GameState, strategy: Strategy, action: TakeDouble) -> None:
"""Mutate game state according to action."""
player = game.current_player
color = action.color
if color not in BASE_COLORS:
return
if game.bank[color] < game.config.minimum_tokens_to_buy_2:
return
game.bank[color] -= 2
player.tokens[color] += 2
enforce_token_limit(game, strategy, player)
def apply_buy_card(game: GameState, _strategy: Strategy, action: BuyCard) -> None:
"""Mutate game state according to action."""
player = game.current_player
row = game.table_by_tier.get(action.tier)
if row is None or not (0 <= action.index < len(row)):
return
card = row[action.index]
if not player.can_afford(card):
return
row.pop(action.index)
payment = player.pay_for_card(card)
for color, amount in payment.items():
game.bank[color] += amount
game.refill_table()
def apply_buy_card_reserved(game: GameState, _strategy: Strategy, action: BuyCardReserved) -> None:
"""Mutate game state according to action."""
player = game.current_player
if not (0 <= action.index < len(player.reserved)):
return
card = player.reserved[action.index]
if not player.can_afford(card):
return
player.reserved.pop(action.index)
payment = player.pay_for_card(card)
for color, amount in payment.items():
game.bank[color] += amount
def apply_reserve_card(game: GameState, strategy: Strategy, action: ReserveCard) -> None:
"""Mutate game state according to action."""
player = game.current_player
if len(player.reserved) >= game.config.reserve_limit:
return
card: Card | None = None
if action.from_deck:
deck = game.decks_by_tier.get(action.tier)
if deck:
card = deck.pop()
else:
row = game.table_by_tier.get(action.tier)
if row is None:
return
if action.index is None or not (0 <= action.index < len(row)):
return
card = row.pop(action.index)
game.refill_table()
if card is None:
return
player.reserved.append(card)
if game.bank["gold"] > 0:
game.bank["gold"] -= 1
player.tokens["gold"] += 1
enforce_token_limit(game, strategy, player)
def apply_action(game: GameState, strategy: Strategy, action: Action) -> None:
"""Mutate game state according to action."""
actions = {
TakeDifferent: apply_take_different,
TakeDouble: apply_take_double,
BuyCard: apply_buy_card,
ReserveCard: apply_reserve_card,
BuyCardReserved: apply_buy_card_reserved,
}
action_func = actions.get(type(action))
if action_func is None:
msg = f"Unknown action type: {type(action)}"
raise ValueError(msg)
action_func(game, strategy, action)
# not sure how to simplify this yet
def get_legal_actions( # noqa: C901
game: GameState,
player: PlayerState | None = None,
) -> list[Action]:
"""Enumerate all syntactically legal actions for the given player.
This enforces:
- token-taking rules
- reserve limits
- affordability for buys
"""
if player is None:
player = game.players[game.current_player_index]
actions: list[Action] = []
colors_available = [c for c in BASE_COLORS if game.bank[c] > 0]
for r in (1, 2, 3):
actions.extend(TakeDifferent(colors=list(combo)) for combo in itertools.combinations(colors_available, r))
actions.extend(
TakeDouble(color=color) for color in BASE_COLORS if game.bank[color] >= game.config.minimum_tokens_to_buy_2
)
for tier, row in game.table_by_tier.items():
for idx, card in enumerate(row):
if player.can_afford(card):
actions.append(BuyCard(tier=tier, index=idx))
for idx, card in enumerate(player.reserved):
if player.can_afford(card):
actions.append(BuyCardReserved(index=idx))
if len(player.reserved) < game.config.reserve_limit:
for tier, row in game.table_by_tier.items():
for idx, _ in enumerate(row):
actions.append(
ReserveCard(tier=tier, index=idx, from_deck=False),
)
for tier, deck in game.decks_by_tier.items():
if deck:
actions.append(
ReserveCard(tier=tier, index=None, from_deck=True),
)
return actions
def create_random_cards_tier(
tier: int,
card_count: int,
cost_choices: list[int],
point_choices: list[int],
) -> list[Card]:
"""Create a random set of cards for a given tier."""
cards: list[Card] = []
for color in BASE_COLORS:
for _ in range(card_count):
cost = dict.fromkeys(GEM_COLORS, 0)
for c in BASE_COLORS:
if c == color:
continue
cost[c] = random.choice(cost_choices)
points = random.choice(point_choices)
cards.append(Card(tier=tier, points=points, color=color, cost=cost))
return cards
def create_random_cards() -> list[Card]:
"""Generate a generic but Splendor-ish set of cards.
This is not the official deck, but structured similarly enough for play.
"""
cards: list[Card] = []
cards.extend(
create_random_cards_tier(
tier=1,
card_count=5,
cost_choices=[0, 1, 1, 2],
point_choices=[0, 0, 1],
)
)
cards.extend(
create_random_cards_tier(
tier=2,
card_count=4,
cost_choices=[2, 3, 4],
point_choices=[1, 2, 2, 3],
)
)
cards.extend(
create_random_cards_tier(
tier=3,
card_count=3,
cost_choices=[4, 5, 6],
point_choices=[3, 4, 5],
)
)
random.shuffle(cards)
return cards
def create_random_nobles() -> list[Noble]:
"""A small set of noble tiles, roughly Splendor-ish."""
nobles: list[Noble] = []
base_requirements: list[dict[GemColor, int]] = [
{"white": 3, "blue": 3, "green": 3},
{"blue": 3, "green": 3, "red": 3},
{"green": 3, "red": 3, "black": 3},
{"red": 3, "black": 3, "white": 3},
{"black": 3, "white": 3, "blue": 3},
{"white": 4, "blue": 4},
{"green": 4, "red": 4},
{"blue": 4, "black": 4},
]
for idx, req in enumerate(base_requirements, start=1):
nobles.append(
Noble(
name=f"Noble {idx}",
points=3,
requirements=dict(req.items()),
),
)
return nobles
def load_nobles(file: Path) -> list[Noble]:
"""Load nobles from a file."""
nobles = json.loads(file.read_text())
return [Noble(**noble) for noble in nobles]
def load_cards(file: Path) -> list[Card]:
"""Load cards from a file."""
cards = json.loads(file.read_text())
return [Card(**card) for card in cards]
def new_game(
strategies: Sequence[Strategy],
config: GameConfig,
) -> GameState:
"""Create a new game state from a config + list of players."""
num_players = len(strategies)
bank = get_default_starting_tokens(num_players)
decks_by_tier: dict[int, list[Card]] = {1: [], 2: [], 3: []}
for card in config.cards:
decks_by_tier.setdefault(card.tier, []).append(card)
for deck in decks_by_tier.values():
random.shuffle(deck)
table_by_tier: dict[int, list[Card]] = {1: [], 2: [], 3: []}
players = [PlayerState(strategy=strategy) for strategy in strategies]
nobles = list(config.nobles)
random.shuffle(nobles)
nobles = nobles[: num_players + 1]
game = GameState(
config=config,
players=players,
bank=bank,
decks_by_tier=decks_by_tier,
table_by_tier=table_by_tier,
available_nobles=nobles,
)
game.refill_table()
return game
def run_game(game: GameState) -> tuple[PlayerState, int]:
"""Run a full game loop until someone wins or a player returns None."""
turn_count = 0
while not game.finished:
turn_count += 1
player = game.current_player
strategy = player.strategy
action = strategy.choose_action(game, player)
if action is None:
game.finished = True
break
apply_action(game, strategy, action)
check_nobles_for_player(game, strategy, player)
winner = game.check_winner_simple()
if winner is not None:
return winner, turn_count
game.next_player()
if turn_count >= game.config.turn_limit:
break
fallback = max(game.players, key=lambda player: player.score)
return fallback, turn_count

View File

@@ -1,288 +0,0 @@
"""Bot for Splendor game."""
from __future__ import annotations
import random
from .base import (
BASE_COLORS,
Action,
BuyCard,
BuyCardReserved,
Card,
GameState,
GemColor,
PlayerState,
ReserveCard,
Strategy,
TakeDifferent,
TakeDouble,
auto_discard_tokens,
get_legal_actions,
)
def can_bot_afford(player: PlayerState, card: Card) -> bool:
"""Check if player can afford card, using discounts + gold."""
missing = 0
gold = player.tokens["gold"]
for color, cost in card.cost.items():
missing += max(0, cost - player.discounts.get(color, 0) - player.tokens.get(color, 0))
if missing > gold:
return False
return True
class RandomBot(Strategy):
"""Dumb bot that follows rules but doesn't think."""
def __init__(self, name: str) -> None:
"""Initialize the bot."""
super().__init__(name=name)
def choose_action(self, game: GameState, player: PlayerState) -> Action | None:
"""Choose an action for the current player."""
affordable: list[tuple[int, int]] = []
for tier, row in game.table_by_tier.items():
for idx, card in enumerate(row):
if can_bot_afford(player, card):
affordable.append((tier, idx))
if affordable and random.random() < 0.5:
tier, idx = random.choice(affordable)
return BuyCard(tier=tier, index=idx)
if random.random() < 0.2:
tier = random.choice([1, 2, 3])
row = game.table_by_tier.get(tier, [])
if row:
idx = random.randrange(len(row))
return ReserveCard(tier=tier, index=idx, from_deck=False)
if random.random() < 0.5:
colors_for_double = [c for c in BASE_COLORS if game.bank[c] >= 4]
if colors_for_double:
return TakeDouble(color=random.choice(colors_for_double))
colors_for_diff = [c for c in BASE_COLORS if game.bank[c] > 0]
random.shuffle(colors_for_diff)
return TakeDifferent(colors=colors_for_diff[:3])
def choose_discard(
self,
game: GameState, # noqa: ARG002
player: PlayerState,
excess: int,
) -> dict[GemColor, int]:
"""Choose how many tokens to discard."""
return auto_discard_tokens(player, excess)
def check_cards_in_tier(row: list[Card], player: PlayerState) -> list[int]:
"""Check if player can afford card, using discounts + gold."""
return [index for index, card in enumerate(row) if can_bot_afford(player, card)]
class PersonalizedBot(Strategy):
"""PersonalizedBot."""
"""Dumb bot that follows rules but doesn't think."""
def __init__(self, name: str) -> None:
"""Initialize the bot."""
super().__init__(name=name)
def choose_action(self, game: GameState, player: PlayerState) -> Action | None:
"""Choose an action for the current player."""
for tier in (1, 2, 3):
row = game.table_by_tier[tier]
if affordable := check_cards_in_tier(row, player):
index = random.choice(affordable)
return BuyCard(tier=tier, index=index)
colors_for_diff = [c for c in BASE_COLORS if game.bank[c] > 0]
random.shuffle(colors_for_diff)
return TakeDifferent(colors=colors_for_diff[:3])
def choose_discard(
self,
game: GameState, # noqa: ARG002
player: PlayerState,
excess: int,
) -> dict[GemColor, int]:
"""Choose how many tokens to discard."""
return auto_discard_tokens(player, excess)
class PersonalizedBot2(Strategy):
"""PersonalizedBot2."""
"""Dumb bot that follows rules but doesn't think."""
def __init__(self, name: str) -> None:
"""Initialize the bot."""
super().__init__(name=name)
def choose_action(self, game: GameState, player: PlayerState) -> Action | None:
"""Choose an action for the current player."""
tiers = (1, 2, 3)
for tier in tiers:
row = game.table_by_tier[tier]
if affordable := check_cards_in_tier(row, player):
index = random.choice(affordable)
return BuyCard(tier=tier, index=index)
if affordable := check_cards_in_tier(player.reserved, player):
index = random.choice(affordable)
return BuyCardReserved(index=index)
colors_for_diff = [c for c in BASE_COLORS if game.bank[c] > 0]
if len(colors_for_diff) >= 3:
random.shuffle(colors_for_diff)
return TakeDifferent(colors=colors_for_diff[:3])
for tier in tiers:
len_deck = len(game.decks_by_tier[tier])
if len_deck:
return ReserveCard(tier=tier, index=None, from_deck=True)
return TakeDifferent(colors=colors_for_diff[:3])
def choose_discard(
self,
game: GameState, # noqa: ARG002
player: PlayerState,
excess: int,
) -> dict[GemColor, int]:
"""Choose how many tokens to discard."""
return auto_discard_tokens(player, excess)
def buy_card_reserved(player: PlayerState) -> Action | None:
"""Buy a card reserved."""
if affordable := check_cards_in_tier(player.reserved, player):
index = random.choice(affordable)
return BuyCardReserved(index=index)
return None
def buy_card(game: GameState, player: PlayerState) -> Action | None:
"""Buy a card."""
for tier in (1, 2, 3):
row = game.table_by_tier[tier]
if affordable := check_cards_in_tier(row, player):
index = random.choice(affordable)
return BuyCard(tier=tier, index=index)
return None
def take_tokens(game: GameState) -> Action | None:
"""Take tokens."""
colors_for_diff = [color for color in BASE_COLORS if game.bank[color] > 0]
if len(colors_for_diff) >= 3:
random.shuffle(colors_for_diff)
return TakeDifferent(colors=colors_for_diff[: game.config.max_token_take])
return None
class PersonalizedBot3(Strategy):
"""PersonalizedBot3."""
"""Dumb bot that follows rules but doesn't think."""
def __init__(self, name: str) -> None:
"""Initialize the bot."""
super().__init__(name=name)
def choose_action(self, game: GameState, player: PlayerState) -> Action | None:
"""Choose an action for the current player."""
print(len(get_legal_actions(game, player)))
print(get_legal_actions(game, player))
if action := buy_card_reserved(player):
return action
if action := buy_card(game, player):
return action
colors_for_diff = [color for color in BASE_COLORS if game.bank[color] > 0]
if len(colors_for_diff) >= 3:
random.shuffle(colors_for_diff)
return TakeDifferent(colors=colors_for_diff[:3])
for tier in (1, 2, 3):
len_deck = len(game.decks_by_tier[tier])
if len_deck:
return ReserveCard(tier=tier, index=None, from_deck=True)
return TakeDifferent(colors=colors_for_diff[:3])
def choose_discard(
self,
game: GameState, # noqa: ARG002
player: PlayerState,
excess: int,
) -> dict[GemColor, int]:
"""Choose how many tokens to discard."""
return auto_discard_tokens(player, excess)
def estimate_value_of_card(game: GameState, player: PlayerState, color: GemColor) -> int:
"""Estimate value of a color in the player's bank."""
return game.bank[color] - player.discounts.get(color, 0)
def estimate_value_of_token(game: GameState, player: PlayerState, color: GemColor) -> int:
"""Estimate value of a color in the player's bank."""
return game.bank[color] - player.discounts.get(color, 0)
class PersonalizedBot4(Strategy):
"""PersonalizedBot4."""
def __init__(self, name: str) -> None:
"""Initialize the bot."""
super().__init__(name=name)
def filter_actions(self, actions: list[Action]) -> list[Action]:
"""Filter actions to only take different."""
return [
action
for action in actions
if (isinstance(action, TakeDifferent) and len(action.colors) == 3) or not isinstance(action, TakeDifferent)
]
def choose_action(self, game: GameState, player: PlayerState) -> Action | None:
"""Choose an action for the current player."""
legal_actions = get_legal_actions(game, player)
print(len(legal_actions))
good_actions = self.filter_actions(legal_actions)
print(len(good_actions))
print(good_actions)
print(len(get_legal_actions(game, player)))
if action := buy_card_reserved(player):
return action
if action := buy_card(game, player):
return action
colors_for_diff = [color for color in BASE_COLORS if game.bank[color] > 0]
if len(colors_for_diff) >= 3:
random.shuffle(colors_for_diff)
return TakeDifferent(colors=colors_for_diff[:3])
for tier in (1, 2, 3):
len_deck = len(game.decks_by_tier[tier])
if len_deck:
return ReserveCard(tier=tier, index=None, from_deck=True)
return TakeDifferent(colors=colors_for_diff[:3])
def choose_discard(
self,
game: GameState, # noqa: ARG002
player: PlayerState,
excess: int,
) -> dict[GemColor, int]:
"""Choose how many tokens to discard."""
return auto_discard_tokens(player, excess)

View File

@@ -1,724 +0,0 @@
"""Splendor game."""
from __future__ import annotations
import sys
from typing import TYPE_CHECKING, Any
from textual.app import App, ComposeResult
from textual.containers import Horizontal, Vertical
from textual.widget import Widget
from textual.widgets import Footer, Header, Input, Static
from .base import (
BASE_COLORS,
GEM_COLORS,
Action,
BuyCard,
BuyCardReserved,
Card,
GameState,
GemColor,
Noble,
PlayerState,
ReserveCard,
Strategy,
TakeDifferent,
TakeDouble,
)
if TYPE_CHECKING:
from collections.abc import Mapping
# Abbreviations used when rendering costs
COST_ABBR: dict[GemColor, str] = {
"white": "W",
"blue": "B",
"green": "G",
"red": "R",
"black": "K",
"gold": "O",
}
# Abbreviations players can type on the command line
COLOR_ABBR_TO_FULL: dict[str, GemColor] = {
"w": "white",
"b": "blue",
"g": "green",
"r": "red",
"k": "black",
"o": "gold",
}
def parse_color_token(raw: str) -> GemColor:
"""Convert user input into a GemColor.
Supports:
- full names: white, blue, green, red, black, gold
- abbreviations: w, b, g, r, k, o
"""
key = raw.lower()
# full color names first
if key in BASE_COLORS:
return key # type: ignore[return-value]
# abbreviations
if key in COLOR_ABBR_TO_FULL:
return COLOR_ABBR_TO_FULL[key]
error = f"Unknown color: {raw}"
raise ValueError(error)
def format_cost(cost: Mapping[GemColor, int]) -> str:
"""Format a cost/requirements dict as colored tokens like 'B:2, R:1'.
Uses `color_token` internally so colors are guaranteed to match your bank.
"""
parts: list[str] = []
for color in GEM_COLORS:
n = cost.get(color, 0)
if not n:
continue
# color_token gives us e.g. "[blue]blue: 3[/]"
token = color_token(color, n)
# Turn the leading color name into the abbreviation (blue: 3 → B:3)
# We only replace the first occurrence.
full = f"{color}:"
abbr = f"{COST_ABBR[color]}:"
token = token.replace(full, abbr, 1)
parts.append(token)
return ", ".join(parts) if parts else "-"
def format_card(card: Card) -> str:
"""Readable card line using dataclass fields instead of __str__."""
color_abbr = COST_ABBR[card.color]
header = f"T{card.tier} {color_abbr} P{card.points}"
cost_str = format_cost(card.cost)
return f"{header} ({cost_str})"
def format_noble(noble: Noble) -> str:
"""Readable noble line using dataclass fields instead of __str__."""
cost_str = format_cost(noble.requirements)
return f"{noble.name} +{noble.points} ({cost_str})"
def format_tokens(tokens: Mapping[GemColor, int]) -> str:
"""Colored 'color: n' list for a token dict."""
return " ".join(color_token(c, tokens.get(c, 0)) for c in GEM_COLORS)
def format_discounts(discounts: Mapping[GemColor, int]) -> str:
"""Colored discounts, skipping zeros."""
parts: list[str] = []
for c in GEM_COLORS:
n = discounts.get(c, 0)
if not n:
continue
abbr = COST_ABBR[c]
fg, bg = COLOR_STYLE[c]
parts.append(f"[{fg} on {bg}]{abbr}:{n}[/{fg} on {bg}]")
return ", ".join(parts) if parts else "-"
COLOR_STYLE: dict[GemColor, tuple[str, str]] = {
"white": ("black", "white"), # fg, bg
"blue": ("bright_white", "blue"),
"green": ("bright_white", "sea_green4"),
"red": ("white", "red3"),
"black": ("white", "grey0"),
"gold": ("black", "yellow3"),
}
def fmt_gem(color: GemColor) -> str:
"""Render gem name with fg/bg matching real token color."""
fg, bg = COLOR_STYLE[color]
return f"[{fg} on {bg}] {color} [/{fg} on {bg}]"
def fmt_number(value: int) -> str:
"""Return a Rich-markup colored 'value' string."""
return f"[bold cyan]{value}[/]"
def color_token(name: GemColor, amount: int) -> str:
"""Return a Rich-markup colored 'name: n' string."""
# Map Splendor colors -> terminal colors
color_map: Mapping[GemColor, str] = {
"white": "white",
"blue": "blue",
"green": "green",
"red": "red",
"black": "grey70", # 'black' is unreadable on dark backgrounds
"gold": "yellow",
}
style = color_map.get(name, "white")
return f"[{style}]{name}: {amount}[/]"
class Board(Widget):
"""Big board widget with the layout you sketched."""
def __init__(self, game: GameState, me: PlayerState, **kwargs: Any) -> None: # noqa: ANN401
"""Initialize the board widget."""
super().__init__(**kwargs)
self.game = game
self.me = me
def compose(self) -> ComposeResult:
"""Compose the board widget."""
# Structure:
# ┌ bank row
# ├ middle row (tiers | nobles)
# └ players row
with Vertical(id="board_root"):
yield Static(id="bank_box")
with Horizontal(id="middle_row"):
with Vertical(id="tiers_box"):
yield Static(id="tier1_box")
yield Static(id="tier2_box")
yield Static(id="tier3_box")
yield Static(id="nobles_box")
yield Static(id="players_box")
def on_mount(self) -> None:
"""Refresh the board content."""
self.refresh_content()
def refresh_content(self) -> None:
"""Refresh the board content."""
self._render_bank()
self._render_tiers()
self._render_nobles()
self._render_players()
# --- sections ----------------------------------------------------
def _render_bank(self) -> None:
bank = self.game.bank
parts: list[str] = ["[b]Bank:[/b]"]
# One line, all tokens colored
parts.append(format_tokens(bank))
self.query_one("#bank_box", Static).update("\n".join(parts))
def _render_tiers(self) -> None:
for tier in (1, 2, 3):
box = self.query_one(f"#tier{tier}_box", Static)
cards: list[Card] = self.game.table_by_tier.get(tier, [])
lines: list[str] = [f"[b]Tier {tier} cards:[/b]"]
if not cards:
lines.append(" (none)")
else:
for idx, card in enumerate(cards):
lines.append(f" [{idx}] {format_card(card)}")
box.update("\n".join(lines))
def _render_nobles(self) -> None:
nobles_box = self.query_one("#nobles_box", Static)
lines: list[str] = ["[b]Nobles[/b]"]
if not self.game.available_nobles:
lines.append(" (none)")
else:
lines.extend(" - " + format_noble(noble) for noble in self.game.available_nobles)
nobles_box.update("\n".join(lines))
def _render_players(self) -> None:
players_box = self.query_one("#players_box", Static)
lines: list[str] = ["[b]Players:[/b]", ""]
for player in self.game.players:
mark = "*" if player is self.me else " "
token_str = format_tokens(player.tokens)
discount_str = format_discounts(player.discounts)
lines.append(
f"{mark} {player.name:10} Score={player.score:2d} Discounts={discount_str}",
)
lines.append(f" Tokens: {token_str}")
if player.nobles:
noble_names = ", ".join(n.name for n in player.nobles)
lines.append(f" Nobles: {noble_names}")
# Optional: show counts of cards / reserved
if player.cards:
lines.append(f" Cards: {len(player.cards)}")
if player.reserved:
lines.append(f" Reserved: {len(player.reserved)}")
lines.append("")
players_box.update("\n".join(lines))
class ActionApp(App[None]):
"""Textual app that asks for a single action command and returns an Action."""
CSS = """
Screen {
/* 3 rows: command zone, board, footer */
layout: grid;
grid-size: 1 3;
grid-rows: auto 1fr auto;
}
/* Top area with input + instructions */
#command_zone {
grid-columns: 1;
grid-rows: 1;
padding: 1 1;
}
/* Board sits in the middle row and can grow */
#board {
grid-columns: 1;
grid-rows: 2;
padding: 0 1 1 1;
}
Footer {
grid-columns: 1;
grid-rows: 3;
}
Input {
border: round $accent;
}
/* === Board layout === */
#board_root {
/* outer frame around the whole board area */
border: heavy white;
padding: 0 1;
}
/* Bank row: full width */
#bank_box {
border: heavy white;
padding: 0 1;
}
/* Middle row: tiers (left) + nobles (right) */
#middle_row {
layout: horizontal;
}
#tiers_box {
border: heavy white;
padding: 0 1;
width: 70%;
}
#tier1_box,
#tier2_box,
#tier3_box {
border-bottom: heavy white;
padding: 0 0 1 0;
margin-bottom: 1;
}
#nobles_box {
border: heavy white;
padding: 0 1;
width: 30%;
}
/* Players row: full width at bottom */
#players_box {
border: heavy white;
padding: 0 1;
}
"""
def __init__(self, game: GameState, player: PlayerState) -> None:
"""Initialize the action app."""
super().__init__()
self.game = game
self.player = player
self.result: Action | None = None
self.message: str = ""
def compose(self) -> ComposeResult:
"""Compose the action app."""
# Row 1: input + Actions text
with Vertical(id="command_zone"):
yield Input(
placeholder="Enter command, e.g. '1 white blue red' or '1 w b r' or 'q'",
id="input_line",
)
yield Static("", id="prompt")
# Row 2: board
yield Board(self.game, self.player, id="board")
# Row 3: footer
yield Footer()
def on_mount(self) -> None:
"""Mount the action app."""
self._update_prompt()
self.query_one(Input).focus()
def _update_prompt(self) -> None:
lines: list[str] = []
lines.append("[bold underline]Actions:[/]")
lines.append(
" [bold green]1[/] <colors...> - Take up to 3 different gem colors "
"(e.g. [cyan]1 white blue red[/] or [cyan]1 w b r[/])",
)
lines.append(
f" [bold green]2[/] <color> - Take 2 of the same color (needs {fmt_number(4)} in bank, "
"e.g. [cyan]2 blue[/] or [cyan]2 b[/])",
)
lines.append(
" [bold green]3[/] <tier> <idx> - Buy a face-up card (e.g. [cyan]3 1 0[/] for tier 1, index 0)",
)
lines.append(" [bold green]4[/] <idx> - Buy a reserved card")
lines.append(" [bold green]5[/] <tier> <idx> - Reserve a face-up card")
lines.append(" [bold green]6[/] <tier> - Reserve top card of a deck")
lines.append(" [bold red]q[/] - Quit game")
if self.message:
lines.append("")
lines.append(f"[bold red]Message:[/] {self.message}")
self.query_one("#prompt", Static).update("\n".join(lines))
def _cmd_1(self, parts: list[str]) -> str | None:
"""Take up to 3 different gem colors: 1 white blue red OR 1 w b r."""
color_names = parts[1:]
if not color_names:
return "Need at least one color (full name or abbreviation)."
colors: list[GemColor] = []
for name in color_names:
color = parse_color_token(name)
if self.game.bank[color] <= 0:
return f"No tokens left for color: {color}"
colors.append(color)
self.result = TakeDifferent(colors=colors[:3])
self.exit()
return None
def _cmd_2(self, parts: list[str]) -> str | None:
"""Take two of the same color."""
if len(parts) < 2:
return "Usage: 2 <color>"
color = parse_color_token(parts[1])
if self.game.bank[color] < self.game.config.minimum_tokens_to_buy_2:
return "Bank must have at least 4 of that color."
self.result = TakeDouble(color=color)
self.exit()
return None
def _cmd_3(self, parts: list[str]) -> str | None:
"""Buy face-up card."""
if len(parts) < 3:
return "Usage: 3 <tier> <index>"
tier = int(parts[1])
idx = int(parts[2])
self.result = BuyCard(tier=tier, index=idx)
self.exit()
return None
def _cmd_4(self, parts: list[str]) -> str | None:
"""Buy reserved card."""
if len(parts) < 2:
return "Usage: 4 <reserved_index>"
idx = int(parts[1])
if not (0 <= idx < len(self.player.reserved)):
return "Reserved index out of range."
self.result = BuyCardReserved(tier=0, index=idx)
self.exit()
return None
def _cmd_5(self, parts: list[str]) -> str | None:
"""Reserve face-up card."""
if len(parts) < 3:
return "Usage: 5 <tier> <index>"
tier = int(parts[1])
idx = int(parts[2])
self.result = ReserveCard(tier=tier, index=idx, from_deck=False)
self.exit()
return None
def _cmd_6(self, parts: list[str]) -> str | None:
"""Reserve top of deck."""
if len(parts) < 2:
return "Usage: 6 <tier>"
tier = int(parts[1])
self.result = ReserveCard(tier=tier, index=None, from_deck=True)
self.exit()
return None
def _unknown_cmd(self, _parts: list[str]) -> str:
return "Unknown command."
def on_input_submitted(self, event: Input.Submitted) -> None:
"""Handle user input."""
text = (event.value or "").strip()
event.input.value = ""
if not text:
return
if text.lower() in {"q", "quit", "0"}:
self.result = None
self.exit()
return
parts = text.split()
cmds = {
"1": self._cmd_1,
"2": self._cmd_2,
"3": self._cmd_3,
"4": self._cmd_4,
"5": self._cmd_5,
"6": self._cmd_6,
}
cmd = parts[0]
error = cmds.get(cmd, self._unknown_cmd)(parts)
if error:
self.message = error
self._update_prompt()
return
class DiscardApp(App[None]):
"""Textual app to choose discards when over token limit."""
CSS = """
Screen {
layout: vertical;
}
#command_zone {
padding: 1 1;
}
#board {
padding: 0 1 1 1;
}
Input {
border: round $accent;
}
"""
def __init__(self, game: GameState, player: PlayerState) -> None:
"""Initialize the discard app."""
super().__init__()
self.game = game
self.player = player
self.discards: dict[GemColor, int] = dict.fromkeys(GEM_COLORS, 0)
self.message: str = ""
def compose(self) -> ComposeResult: # type: ignore[override]
"""Compose the discard app."""
yield Header(show_clock=False)
with Vertical(id="command_zone"):
yield Input(
placeholder="Enter color to discard, e.g. 'blue' or 'b'",
id="input_line",
)
yield Static("", id="prompt")
# Board directly under the command zone
yield Board(self.game, self.player, id="board")
yield Footer()
def on_mount(self) -> None: # type: ignore[override]
"""Mount the discard app."""
self._update_prompt()
self.query_one(Input).focus()
def _remaining_to_discard(self) -> int:
return self.player.total_tokens() - sum(self.discards.values()) - self.game.config.token_limit
def _update_prompt(self) -> None:
remaining = max(self._remaining_to_discard(), 0)
lines: list[str] = []
lines.append(
"You must discard "
f"{fmt_number(remaining)} token(s) "
f"to get down to {fmt_number(self.game.config.token_limit)}.",
)
disc_str = ", ".join(f"{fmt_gem(c)}={fmt_number(self.discards[c])}" for c in GEM_COLORS)
lines.append(f"Current planned discards: {{ {disc_str} }}")
lines.append(
"Type a color name or abbreviation (e.g. 'blue' or 'b') to discard one token.",
)
if self.message:
lines.append("")
lines.append(f"[bold red]Message:[/] {self.message}")
self.query_one("#prompt", Static).update("\n".join(lines))
def on_input_submitted(self, event: Input.Submitted) -> None: # type: ignore[override]
"""Handle user input."""
raw = (event.value or "").strip()
event.input.value = ""
if not raw:
return
try:
color = parse_color_token(raw)
except ValueError:
self.message = f"Unknown color: {raw}"
self._update_prompt()
return
available = self.player.tokens[color] - self.discards[color]
if available <= 0:
self.message = f"No more {color} tokens available to discard."
self._update_prompt()
return
self.discards[color] += 1
if self._remaining_to_discard() <= 0:
self.exit()
return
self.message = ""
self._update_prompt()
# ---------------------------------------------------------------------------
# Noble choice app
# ---------------------------------------------------------------------------
class NobleChoiceApp(App[None]):
"""Textual app to choose one noble."""
CSS = """
Screen {
layout: vertical;
}
#command_zone {
padding: 1 1;
}
#board {
padding: 0 1 1 1;
}
Input {
border: round $accent;
}
"""
def __init__(
self,
game: GameState,
player: PlayerState,
nobles: list[Noble],
) -> None:
"""Initialize the noble choice app."""
super().__init__()
self.game = game
self.player = player
self.nobles = nobles
self.result: Noble | None = None
self.message: str = ""
def compose(self) -> ComposeResult: # type: ignore[override]
"""Compose the noble choice app."""
yield Header(show_clock=False)
with Vertical(id="command_zone"):
yield Input(
placeholder="Enter noble index, e.g. '0'",
id="input_line",
)
yield Static("", id="prompt")
# Board directly under the command zone
yield Board(self.game, self.player, id="board")
yield Footer()
def on_mount(self) -> None: # type: ignore[override]
"""Mount the noble choice app."""
self._update_prompt()
self.query_one(Input).focus()
def _update_prompt(self) -> None:
lines: list[str] = []
lines.append("[bold underline]You qualify for nobles:[/]")
for i, noble in enumerate(self.nobles):
lines.append(f" [bright_cyan]{i})[/] {format_noble(noble)}")
lines.append("Enter the index of the noble you want.")
if self.message:
lines.append("")
lines.append(f"[bold red]Message:[/] {self.message}")
self.query_one("#prompt", Static).update("\n".join(lines))
def on_input_submitted(self, event: Input.Submitted) -> None: # type: ignore[override]
"""Handle user input."""
raw = (event.value or "").strip()
event.input.value = ""
if not raw:
return
try:
idx = int(raw)
except ValueError:
self.message = "Please enter a valid integer index."
self._update_prompt()
return
if not (0 <= idx < len(self.nobles)):
self.message = "Index out of range."
self._update_prompt()
return
self.result = self.nobles[idx]
self.exit()
class TuiHuman(Strategy):
"""Textual-based human player Strategy with colorful board."""
def choose_action(
self,
game: GameState,
player: PlayerState,
) -> Action | None:
"""Choose an action for the player."""
if not sys.stdout.isatty():
return None
app = ActionApp(game, player)
app.run()
return app.result
def choose_discard(
self,
game: GameState,
player: PlayerState,
excess: int, # noqa: ARG002
) -> dict[GemColor, int]:
"""Choose tokens to discard."""
if not sys.stdout.isatty():
return dict.fromkeys(GEM_COLORS, 0)
app = DiscardApp(game, player)
app.run()
return app.discards
def choose_noble(
self,
game: GameState,
player: PlayerState,
nobles: list[Noble],
) -> Noble:
"""Choose a noble for the player."""
if not sys.stdout.isatty():
return nobles[0]
app = NobleChoiceApp(game, player, nobles)
app.run()
return app.result

View File

@@ -1,19 +0,0 @@
"""Main entry point for Splendor game."""
from __future__ import annotations
from .base import new_game, run_game
from .bot import RandomBot
from .human import TuiHuman
def main() -> None:
"""Main entry point."""
human = TuiHuman()
bot = RandomBot()
game_state = new_game(["You", "Bot A"])
run_game(game_state, [human, bot])
if __name__ == "__main__":
main()

View File

@@ -1,111 +0,0 @@
"""Public state for RL/search."""
from __future__ import annotations
from dataclasses import dataclass
from .base import (
BASE_COLORS,
BASE_INDEX,
GEM_ORDER,
Card,
GameState,
Noble,
PlayerState,
)
@dataclass(frozen=True)
class ObsCard:
"""Numeric-ish card view for RL/search."""
tier: int
points: int
color_index: int
cost: list[int]
@dataclass(frozen=True)
class ObsNoble:
"""Numeric-ish noble view for RL/search."""
points: int
requirements: list[int]
@dataclass(frozen=True)
class ObsPlayer:
"""Numeric-ish player view for RL/search."""
tokens: list[int]
discounts: list[int]
score: int
cards: list[ObsCard]
reserved: list[ObsCard]
nobles: list[ObsNoble]
@dataclass(frozen=True)
class Observation:
"""Full public state for RL/search."""
current_player: int
bank: list[int]
players: list[ObsPlayer]
table_by_tier: dict[int, list[ObsCard]]
decks_remaining: dict[int, int]
available_nobles: list[ObsNoble]
def _encode_card(card: Card) -> ObsCard:
color_index = BASE_INDEX.get(card.color, -1)
cost_vec = [card.cost.get(c, 0) for c in BASE_COLORS]
return ObsCard(
tier=card.tier,
points=card.points,
color_index=color_index,
cost=cost_vec,
)
def _encode_noble(noble: Noble) -> ObsNoble:
req_vec = [noble.requirements.get(c, 0) for c in BASE_COLORS]
return ObsNoble(
points=noble.points,
requirements=req_vec,
)
def _encode_player(player: PlayerState) -> ObsPlayer:
tokens_vec = [player.tokens[c] for c in GEM_ORDER]
discounts_vec = [player.discounts[c] for c in GEM_ORDER]
cards_enc = [_encode_card(c) for c in player.cards]
reserved_enc = [_encode_card(c) for c in player.reserved]
nobles_enc = [_encode_noble(n) for n in player.nobles]
return ObsPlayer(
tokens=tokens_vec,
discounts=discounts_vec,
score=player.score,
cards=cards_enc,
reserved=reserved_enc,
nobles=nobles_enc,
)
def to_observation(game: GameState) -> Observation:
"""Create a structured observation of the full public state."""
bank_vec = [game.bank[c] for c in GEM_ORDER]
players_enc = [_encode_player(p) for p in game.players]
table_enc: dict[int, list[ObsCard]] = {
tier: [_encode_card(c) for c in row] for tier, row in game.table_by_tier.items()
}
decks_remaining = {tier: len(deck) for tier, deck in game.decks_by_tier.items()}
nobles_enc = [_encode_noble(n) for n in game.available_nobles]
return Observation(
current_player=game.current_player_index,
bank=bank_vec,
players=players_enc,
table_by_tier=table_enc,
decks_remaining=decks_remaining,
available_nobles=nobles_enc,
)

View File

@@ -1,36 +0,0 @@
"""Simulate a step in the game."""
from __future__ import annotations
import copy
from .base import Action, GameState, PlayerState, apply_action, check_nobles_for_player
from .bot import RandomBot
class SimStrategy(RandomBot):
"""Strategy used in simulate_step.
We never call choose_action here (caller chooses actions),
but we reuse discard/noble-selection logic.
"""
def choose_action(self, game: GameState, player: PlayerState) -> Action | None: # noqa: ARG002
"""Choose an action for the current player."""
msg = "SimStrategy.choose_action should not be used in simulate_step"
raise RuntimeError(msg)
def simulate_step(game: GameState, action: Action) -> GameState:
"""Return a deep-copied next state after applying action for the current player.
Useful for tree search / MCTS:
next_state = simulate_step(state, action)
"""
next_state = copy.deepcopy(game)
sim_strategy = SimStrategy()
apply_action(next_state, sim_strategy, action)
check_nobles_for_player(next_state, sim_strategy, next_state.current_player)
next_state.next_player()
return next_state

View File

@@ -1,50 +0,0 @@
"""Simulator for Splendor game."""
from __future__ import annotations
from collections import defaultdict
from pathlib import Path
from statistics import mean
from .base import GameConfig, load_cards, load_nobles, new_game, run_game
from .bot import PersonalizedBot4, RandomBot
def main() -> None:
"""Main entry point."""
turn_limit = 1000
good_games = 0
games = 1
winners: dict[str, list] = defaultdict(list)
game_data = Path(__file__).parent / "game_data"
cards = load_cards(game_data / "cards/default.json")
nobles = load_nobles(game_data / "nobles/default.json")
for _ in range(games):
bot_a = RandomBot("bot_a")
bot_b = RandomBot("bot_b")
bot_c = RandomBot("bot_c")
bot_d = PersonalizedBot4("my_bot")
config = GameConfig(
cards=cards,
nobles=nobles,
turn_limit=turn_limit,
)
players = (bot_a, bot_b, bot_c, bot_d)
game_state = new_game(players, config)
winner, turns = run_game(game_state)
if turns < turn_limit:
good_games += 1
winners[winner.strategy.name].append(turns)
print(
f"out of {games} {turn_limit} turn games with {len(players)}"
f"random bots there where {good_games} games where a bot won"
)
for name, turns in winners.items():
print(f"{name} won {len(turns)} games in {mean(turns):.2f} turns")
if __name__ == "__main__":
main()

View File

@@ -1,111 +0,0 @@
"""Safe reboot helper."""
from __future__ import annotations
import logging
import sys
from pathlib import Path
from typing import TYPE_CHECKING, Annotated
import typer
from python.common import bash_wrapper, configure_logger
from python.zfs import Dataset, get_datasets
if TYPE_CHECKING:
from collections.abc import Sequence
logger = logging.getLogger(__name__)
def get_root_pool_datasets(dataset_prefix: str) -> list[Dataset]:
"""Return datasets that start with the provided prefix."""
return [dataset for dataset in get_datasets() if dataset.name.startswith(dataset_prefix)]
def get_non_executable_datasets(datasets: Sequence[Dataset]) -> list[str]:
"""Return dataset names that have exec disabled."""
return [dataset.name for dataset in datasets if dataset.exec.lower() != "on"]
def drive_present(drive: str) -> bool:
"""Check whether the provided drive exists."""
drive_path = drive.strip()
if not drive_path:
error = "Drive path cannot be empty"
raise ValueError(error)
return Path(drive_path).exists()
def reboot_system() -> None:
"""Call systemctl reboot."""
output, return_code = bash_wrapper("systemctl reboot")
if return_code != 0:
raise RuntimeError(output.strip() or "Failed to issue reboot command")
def validate_state(drive: str | None, dataset_prefix: str) -> list[str]:
"""Validate dataset and drive state."""
datasets = get_root_pool_datasets(dataset_prefix)
errors: list[str] = []
if not datasets:
errors.append(f"No datasets found with prefix {dataset_prefix}")
else:
non_exec_datasets = get_non_executable_datasets(datasets)
if non_exec_datasets:
errors.append(f"Datasets missing exec=on: {', '.join(non_exec_datasets)}")
if drive:
try:
if not drive_present(drive):
errors.append(f"Drive {drive} is not present")
except ValueError as err:
errors.append(str(err))
return errors
def reboot(
drive: Annotated[str | None, typer.Argument(help="Drive that must exist before rebooting.")] = None,
dataset_prefix: Annotated[
str,
typer.Option(
"--dataset-prefix",
"-p",
help="Datasets with this prefix are validated.",
),
] = "root_pool/",
dry_run: Annotated[
bool,
typer.Option(
"--check-only",
help="Only validate state without issuing the reboot command.",
),
] = False,
) -> None:
"""Validate datasets and drive before rebooting."""
configure_logger()
logger.info("Starting safe reboot checks")
if errors := validate_state(drive, dataset_prefix):
for error in errors:
logger.error(error)
sys.exit(1)
if dry_run:
logger.info("All checks passed")
return
logger.info("All checks passed, issuing reboot")
reboot_system()
def cli() -> None:
"""CLI entry point."""
typer.run(reboot)
if __name__ == "__main__":
cli()

View File

@@ -9,7 +9,6 @@
nix
home-manager
git
my_python
ssh-to-age
gnupg

View File

@@ -1,19 +1,18 @@
{ inputs, ... }:
{
imports = [
"${inputs.self}/users/richie"
"${inputs.self}/users/gaming"
"${inputs.self}/common/global"
"${inputs.self}/common/optional/desktop.nix"
"${inputs.self}/common/optional/docker.nix"
"${inputs.self}/common/optional/scanner.nix"
"${inputs.self}/common/optional/steam.nix"
"${inputs.self}/common/optional/syncthing_base.nix"
"${inputs.self}/common/optional/systemd-boot.nix"
"${inputs.self}/common/optional/update.nix"
"${inputs.self}/common/optional/yubikey.nix"
"${inputs.self}/common/optional/zerotier.nix"
"${inputs.self}/common/optional/nvidia.nix"
../../users/richie
../../users/gaming
../../common/global
../../common/optional/desktop.nix
../../common/optional/docker.nix
../../common/optional/scanner.nix
../../common/optional/steam.nix
../../common/optional/syncthing_base.nix
../../common/optional/systemd-boot.nix
../../common/optional/update.nix
../../common/optional/yubikey.nix
../../common/optional/zerotier.nix
../../common/optional/nvidia.nix
./hardware.nix
./syncthing.nix
./llms.nix

View File

@@ -1,14 +1,14 @@
{ inputs, ... }:
{
imports = [
"${inputs.self}/users/richie"
"${inputs.self}/common/global"
"${inputs.self}/common/optional/docker.nix"
"${inputs.self}/common/optional/ssh_decrypt.nix"
"${inputs.self}/common/optional/syncthing_base.nix"
"${inputs.self}/common/optional/systemd-boot.nix"
"${inputs.self}/common/optional/update.nix"
"${inputs.self}/common/optional/zerotier.nix"
../../users/richie
../../common/global
../../common/optional/docker.nix
../../common/optional/ssh_decrypt.nix
../../common/optional/syncthing_base.nix
../../common/optional/systemd-boot.nix
../../common/optional/update.nix
../../common/optional/zerotier.nix
./docker
./hardware.nix
./programs.nix

View File

@@ -19,6 +19,7 @@
"192.168.95.35"
"127.0.0.1"
];
use_x_forwarded_for = true;
};
homeassistant = {
time_zone = "America/New_York";
@@ -59,17 +60,16 @@
};
extraPackages =
python3Packages: with python3Packages; [
aioesphomeapi # for esphome
bleak-esphome # for esphome
esphome-dashboard-api # for esphome
forecast-solar # for solar forecast
pymodbus # for modbus
gtts # not sure what wants this
jellyfin-apiclient-python # for jellyfin
paho-mqtt # for mqtt
psycopg2 # for postgresql
forecast-solar # for solar forecast
aioesphomeapi # for esphome
esphome-dashboard-api # for esphome
py-improv-ble-client # for esphome
pymodbus # for modbus
pyopenweathermap # for weather
bleak-esphome # for esphome
];
extraComponents = [ "isal" ];
};

View File

@@ -77,7 +77,7 @@ modbus:
# GPS
- name: GPS Latitude
slave: 1
slave: 100
address: 2800
input_type: holding
data_type: int32
@@ -87,7 +87,7 @@ modbus:
unique_id: gps_latitude
- name: GPS Longitude
slave: 1
slave: 100
address: 2802
input_type: holding
data_type: int32
@@ -97,7 +97,7 @@ modbus:
unique_id: gps_longitude
- name: GPS Course
slave: 1
slave: 100
address: 2804
input_type: holding
data_type: uint16
@@ -108,7 +108,7 @@ modbus:
unique_id: gps_course
- name: GPS Speed
slave: 1
slave: 100
address: 2805
input_type: holding
data_type: uint16
@@ -119,7 +119,7 @@ modbus:
unique_id: gps_speed
- name: GPS Fix
slave: 1
slave: 100
address: 2806
input_type: holding
data_type: uint16
@@ -128,7 +128,7 @@ modbus:
unique_id: gps_fix
- name: GPS Satellites
slave: 1
slave: 100
address: 2807
input_type: holding
data_type: uint16
@@ -137,7 +137,7 @@ modbus:
unique_id: gps_satellites
- name: GPS Altitude
slave: 1
slave: 100
address: 2808
input_type: holding
data_type: int32

View File

@@ -1,18 +1,17 @@
{ inputs, ... }:
let
vars = import ./vars.nix;
in
{
imports = [
"${inputs.self}/users/richie"
"${inputs.self}/users/math"
"${inputs.self}/users/dov"
"${inputs.self}/common/global"
"${inputs.self}/common/optional/docker.nix"
"${inputs.self}/common/optional/ssh_decrypt.nix"
"${inputs.self}/common/optional/syncthing_base.nix"
"${inputs.self}/common/optional/update.nix"
"${inputs.self}/common/optional/zerotier.nix"
../../users/richie
../../users/math
../../users/dov
../../common/global
../../common/optional/docker.nix
../../common/optional/ssh_decrypt.nix
../../common/optional/syncthing_base.nix
../../common/optional/update.nix
../../common/optional/zerotier.nix
./docker
./services
./hardware.nix

View File

@@ -1,6 +1,7 @@
{
config,
pkgs,
lib,
...
}:

View File

@@ -68,7 +68,7 @@ in
jellyfin-apiclient-python
psycopg2
pymetno
aio-ownet
pyownet
rokuecp
uiprotect
wakeonlan

View File

@@ -1,6 +1,6 @@
{
pkgs,
inputs,
pkgs,
...
}:
let
@@ -22,13 +22,10 @@ in
wantedBy = [ "multi-user.target" ];
description = "validates startup";
path = [ pkgs.zfs ];
environment = {
PYTHONPATH = "${inputs.self}/";
};
serviceConfig = {
EnvironmentFile = "${vars.secrets}/services/server-validation";
Type = "oneshot";
ExecStart = "${pkgs.my_python}/bin/python -m python.system_tests.validate_system '${./validate_system.toml}'";
ExecStart = "${inputs.system_tools.packages.x86_64-linux.default}/bin/validate_system '${./validate_system.toml}'";
};
};
};

View File

@@ -1,14 +1,14 @@
{ inputs, ... }:
{
imports = [
"${inputs.self}/users/elise"
"${inputs.self}/users/richie"
"${inputs.self}/common/global"
"${inputs.self}/common/optional/desktop.nix"
"${inputs.self}/common/optional/steam.nix"
"${inputs.self}/common/optional/systemd-boot.nix"
"${inputs.self}/common/optional/update.nix"
"${inputs.self}/common/optional/zerotier.nix"
../../users/elise
../../users/richie
../../common/global
../../common/optional/desktop.nix
../../common/optional/steam.nix
../../common/optional/systemd-boot.nix
../../common/optional/update.nix
../../common/optional/zerotier.nix
./hardware.nix
inputs.nixos-hardware.nixosModules.framework-13-7040-amd
];

View File

@@ -1,15 +1,15 @@
{ inputs, ... }:
{
imports = [
"${inputs.self}/users/richie"
"${inputs.self}/common/global"
"${inputs.self}/common/optional/desktop.nix"
"${inputs.self}/common/optional/docker.nix"
"${inputs.self}/common/optional/steam.nix"
"${inputs.self}/common/optional/syncthing_base.nix"
"${inputs.self}/common/optional/systemd-boot.nix"
"${inputs.self}/common/optional/yubikey.nix"
"${inputs.self}/common/optional/zerotier.nix"
../../users/richie
../../common/global
../../common/optional/desktop.nix
../../common/optional/docker.nix
../../common/optional/steam.nix
../../common/optional/syncthing_base.nix
../../common/optional/systemd-boot.nix
../../common/optional/yubikey.nix
../../common/optional/zerotier.nix
./hardware.nix
./llms.nix
./syncthing.nix

258
temp_flake.lock Normal file
View File

@@ -0,0 +1,258 @@
{
"nodes": {
"firefox-addons": {
"inputs": {
"nixpkgs": [
"nixpkgs"
]
},
"locked": {
"dir": "pkgs/firefox-addons",
"lastModified": 1757649814,
"narHash": "sha256-VjtA+fqkraKHbGzjKJBPfDj+SXysXiR4SrghTr10HoY=",
"owner": "rycee",
"repo": "nur-expressions",
"rev": "789920825fc982a93a2bf91a714367fa8f7ea0a6",
"type": "gitlab"
},
"original": {
"dir": "pkgs/firefox-addons",
"owner": "rycee",
"repo": "nur-expressions",
"type": "gitlab"
}
},
"home-manager": {
"inputs": {
"nixpkgs": [
"nixpkgs"
]
},
"locked": {
"lastModified": 1757698511,
"narHash": "sha256-UqHHGydF/q3jfYXCpvYLA0TWtvByOp1NwOKCUjhYmPs=",
"owner": "nix-community",
"repo": "home-manager",
"rev": "a3fcc92180c7462082cd849498369591dfb20855",
"type": "github"
},
"original": {
"owner": "nix-community",
"repo": "home-manager",
"type": "github"
}
},
"nixos-hardware": {
"locked": {
"lastModified": 1757103352,
"narHash": "sha256-PtT7ix43ss8PONJ1VJw3f6t2yAoGH+q462Sn8lrmWmk=",
"owner": "nixos",
"repo": "nixos-hardware",
"rev": "11b2a10c7be726321bb854403fdeec391e798bf0",
"type": "github"
},
"original": {
"owner": "nixos",
"ref": "master",
"repo": "nixos-hardware",
"type": "github"
}
},
"nixpkgs": {
"locked": {
"lastModified": 1757487488,
"narHash": "sha256-zwE/e7CuPJUWKdvvTCB7iunV4E/+G0lKfv4kk/5Izdg=",
"owner": "nixos",
"repo": "nixpkgs",
"rev": "ab0f3607a6c7486ea22229b92ed2d355f1482ee0",
"type": "github"
},
"original": {
"owner": "nixos",
"ref": "nixos-unstable",
"repo": "nixpkgs",
"type": "github"
}
},
"nixpkgs-master": {
"locked": {
"lastModified": 1757720853,
"narHash": "sha256-VBS5+YKIT8Aj81ZW+8Bg9MuYoI6OqO6HSrwG4dpHpW4=",
"owner": "nixos",
"repo": "nixpkgs",
"rev": "2ca437b4796d049192eb30576a50fef139038c09",
"type": "github"
},
"original": {
"owner": "nixos",
"ref": "master",
"repo": "nixpkgs",
"type": "github"
}
},
"nixpkgs-stable": {
"locked": {
"lastModified": 1735563628,
"narHash": "sha256-OnSAY7XDSx7CtDoqNh8jwVwh4xNL/2HaJxGjryLWzX8=",
"owner": "nixos",
"repo": "nixpkgs",
"rev": "b134951a4c9f3c995fd7be05f3243f8ecd65d798",
"type": "github"
},
"original": {
"owner": "nixos",
"ref": "nixos-24.05",
"repo": "nixpkgs",
"type": "github"
}
},
"pyproject-build-systems": {
"inputs": {
"nixpkgs": [
"system_tools",
"nixpkgs"
],
"pyproject-nix": [
"system_tools",
"pyproject-nix"
],
"uv2nix": [
"system_tools",
"uv2nix"
]
},
"locked": {
"lastModified": 1744599653,
"narHash": "sha256-nysSwVVjG4hKoOjhjvE6U5lIKA8sEr1d1QzEfZsannU=",
"owner": "pyproject-nix",
"repo": "build-system-pkgs",
"rev": "7dba6dbc73120e15b558754c26024f6c93015dd7",
"type": "github"
},
"original": {
"owner": "pyproject-nix",
"repo": "build-system-pkgs",
"type": "github"
}
},
"pyproject-nix": {
"inputs": {
"nixpkgs": [
"system_tools",
"nixpkgs"
]
},
"locked": {
"lastModified": 1746540146,
"narHash": "sha256-QxdHGNpbicIrw5t6U3x+ZxeY/7IEJ6lYbvsjXmcxFIM=",
"owner": "pyproject-nix",
"repo": "pyproject.nix",
"rev": "e09c10c24ebb955125fda449939bfba664c467fd",
"type": "github"
},
"original": {
"owner": "pyproject-nix",
"repo": "pyproject.nix",
"type": "github"
}
},
"root": {
"inputs": {
"firefox-addons": "firefox-addons",
"home-manager": "home-manager",
"nixos-hardware": "nixos-hardware",
"nixpkgs": "nixpkgs",
"nixpkgs-master": "nixpkgs-master",
"nixpkgs-stable": "nixpkgs-stable",
"sops-nix": "sops-nix",
"system_tools": "system_tools",
"systems": "systems"
}
},
"sops-nix": {
"inputs": {
"nixpkgs": [
"nixpkgs"
]
},
"locked": {
"lastModified": 1757503115,
"narHash": "sha256-S9F6bHUBh+CFEUalv/qxNImRapCxvSnOzWBUZgK1zDU=",
"owner": "Mic92",
"repo": "sops-nix",
"rev": "0bf793823386187dff101ee2a9d4ed26de8bbf8c",
"type": "github"
},
"original": {
"owner": "Mic92",
"repo": "sops-nix",
"type": "github"
}
},
"system_tools": {
"inputs": {
"nixpkgs": [
"nixpkgs"
],
"pyproject-build-systems": "pyproject-build-systems",
"pyproject-nix": "pyproject-nix",
"uv2nix": "uv2nix"
},
"locked": {
"lastModified": 1757910132,
"narHash": "sha256-6r45DD/tMN+hYgnMc2/c82Z0bb1A7FnI/nvU8kZf/Us=",
"owner": "RichieCahill",
"repo": "system_tools",
"rev": "d63c486fe3b76c24b2ed2fff33d6f54c847b50e8",
"type": "github"
},
"original": {
"owner": "RichieCahill",
"repo": "system_tools",
"type": "github"
}
},
"systems": {
"locked": {
"lastModified": 1689347949,
"narHash": "sha256-12tWmuL2zgBgZkdoB6qXZsgJEH9LR3oUgpaQq2RbI80=",
"owner": "nix-systems",
"repo": "default-linux",
"rev": "31732fcf5e8fea42e59c2488ad31a0e651500f68",
"type": "github"
},
"original": {
"owner": "nix-systems",
"repo": "default-linux",
"type": "github"
}
},
"uv2nix": {
"inputs": {
"nixpkgs": [
"system_tools",
"nixpkgs"
],
"pyproject-nix": [
"system_tools",
"pyproject-nix"
]
},
"locked": {
"lastModified": 1747441483,
"narHash": "sha256-W8BFXk5R0TuJcjIhcGoMpSOaIufGXpizK0pm+uTqynA=",
"owner": "pyproject-nix",
"repo": "uv2nix",
"rev": "582024dc64663e9f88d467c2f7f7b20d278349de",
"type": "github"
},
"original": {
"owner": "pyproject-nix",
"repo": "uv2nix",
"type": "github"
}
}
},
"root": "root",
"version": 7
}

View File

@@ -1,67 +0,0 @@
"""test_database."""
from __future__ import annotations
from typing import TYPE_CHECKING
import pytest
from sqlalchemy import Integer, String, create_engine, select
from sqlalchemy.orm import DeclarativeBase, Mapped, Session, mapped_column, sessionmaker
from python.database import safe_insert
if TYPE_CHECKING:
from collections.abc import Generator
class TestingBase(DeclarativeBase):
"""TestingBase."""
class Item(TestingBase):
"""Item."""
__tablename__ = "items"
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
name: Mapped[str] = mapped_column(String(50), nullable=False, unique=True)
@pytest.fixture
def session() -> Generator[Session]:
"""Fresh in-memory DB + tables for each test."""
engine = create_engine("sqlite+pysqlite:///:memory:", echo=False, future=True)
TestingBase.metadata.create_all(engine)
with sessionmaker(bind=engine, expire_on_commit=False, future=True)() as s:
yield s
def test_partial_failure_unique_constraint(session: Session) -> None:
"""Duplicate name should fail only for the conflicting row; others commit."""
objs = [Item(name="a"), Item(name="b"), Item(name="a"), Item(name="c")]
failures = safe_insert(objs, session)
assert len(failures) == 1
exc, failed_obj = failures[0]
assert isinstance(exc, Exception)
assert isinstance(failed_obj, Item)
assert failed_obj.name == "a"
rows = session.scalars(select(Item.name)).all()
assert sorted(rows) == ["a", "b", "c"]
assert rows.count("a") == 1
def test_all_good_inserts(session: Session) -> None:
"""No failures when all rows are valid."""
objs = [Item(name="x"), Item(name="y")]
failures = safe_insert(objs, session)
assert failures == []
rows = session.scalars(select(Item.name).where(Item.name.in_(("x", "y")))).all()
assert sorted(rows) == ["x", "y"]
def test_unmapped_object_raises(session: Session) -> None:
"""Non-ORM instances should raise TypeError immediately."""
with pytest.raises(TypeError):
safe_insert([object()], session)

View File

@@ -1,96 +0,0 @@
"""Tests for safe_reboot."""
from __future__ import annotations
from typing import TYPE_CHECKING
import pytest
from python.tools.safe_reboot import reboot
from python.zfs.dataset import Dataset
if TYPE_CHECKING:
from pytest_mock import MockerFixture
SAFE_REBOOT = "python.tools.safe_reboot"
def create_dataset(mocker: MockerFixture, name: str, exec_state: str) -> Dataset:
"""Create a mock dataset."""
dataset = mocker.MagicMock(spec=Dataset)
dataset.name = name
dataset.exec = exec_state
return dataset
def test_reboot_reboots_when_checks_pass(mocker: MockerFixture) -> None:
"""The command should reboot when all checks pass."""
dataset = create_dataset(mocker, "root_pool/root", "on")
mocker.patch(f"{SAFE_REBOOT}.get_datasets", return_value=(dataset,))
mocker.patch(f"{SAFE_REBOOT}.drive_present", return_value=True)
mock_bash = mocker.patch(f"{SAFE_REBOOT}.bash_wrapper", return_value=("", 0))
reboot("/dev/disk/root-drive")
mock_bash.assert_called_once_with("systemctl reboot")
def test_reboot_reboots_without_drive_requirement(mocker: MockerFixture) -> None:
"""The command should reboot even when no drive is provided."""
dataset = create_dataset(mocker, "root_pool/root", "on")
mocker.patch(f"{SAFE_REBOOT}.get_datasets", return_value=(dataset,))
mock_bash = mocker.patch(f"{SAFE_REBOOT}.bash_wrapper", return_value=("", 0))
reboot(None)
mock_bash.assert_called_once_with("systemctl reboot")
def test_reboot_errors_on_non_exec_dataset(mocker: MockerFixture) -> None:
"""The command should exit when a dataset lacks exec."""
dataset = create_dataset(mocker, "root_pool/root", "off")
mocker.patch(f"{SAFE_REBOOT}.get_datasets", return_value=(dataset,))
mocker.patch(f"{SAFE_REBOOT}.drive_present", return_value=True)
mocker.patch(f"{SAFE_REBOOT}.bash_wrapper", return_value=("", 0))
with pytest.raises(SystemExit) as excinfo:
reboot("/dev/disk/root-drive")
assert excinfo.value.code == 1
def test_reboot_errors_when_driver_missing(mocker: MockerFixture) -> None:
"""The command should exit when the requested driver is absent."""
dataset = create_dataset(mocker, "root_pool/root", "on")
mocker.patch(f"{SAFE_REBOOT}.get_datasets", return_value=(dataset,))
mocker.patch(f"{SAFE_REBOOT}.drive_present", return_value=False)
mocker.patch(f"{SAFE_REBOOT}.bash_wrapper", return_value=("", 0))
with pytest.raises(SystemExit) as excinfo:
reboot("/dev/disk/root-drive")
assert excinfo.value.code == 1
def test_reboot_errors_when_no_datasets_found(mocker: MockerFixture) -> None:
"""The command should exit when no datasets match the prefix."""
mocker.patch(f"{SAFE_REBOOT}.get_datasets", return_value=())
mocker.patch(f"{SAFE_REBOOT}.drive_present", return_value=True)
mocker.patch(f"{SAFE_REBOOT}.bash_wrapper", return_value=("", 0))
with pytest.raises(SystemExit) as excinfo:
reboot("/dev/disk/root-drive")
assert excinfo.value.code == 1
def test_reboot_check_only_skips_reboot(mocker: MockerFixture) -> None:
"""The command should only validate when --check-only is provided."""
dataset = create_dataset(mocker, "root_pool/root", "on")
mocker.patch(f"{SAFE_REBOOT}.get_datasets", return_value=(dataset,))
mocker.patch(f"{SAFE_REBOOT}.drive_present", return_value=True)
mock_bash = mocker.patch(f"{SAFE_REBOOT}.bash_wrapper", return_value=("", 0))
reboot("/dev/disk/root-drive", check_only=True)
mock_bash.assert_not_called()

View File

@@ -1,11 +1,9 @@
{
programs.git = {
enable = true;
settings = {
user = {
email = "dov.kruger@gmail.com";
name = "Dov Kruger";
};
userEmail = "dov.kruger@gmail.com";
userName = "Dov Kruger";
extraConfig = {
pull.rebase = true;
color.ui = true;
};

View File

@@ -1,11 +1,9 @@
{
programs.git = {
enable = true;
settings = {
user = {
email = "DumbPuppy208@gmail.com";
name = "Elise Corvidae";
};
userEmail = "DumbPuppy208@gmail.com";
userName = "Elise Corvidae";
extraConfig = {
pull.rebase = true;
color.ui = true;
};

View File

@@ -15,6 +15,7 @@
# graphics tools
gimp3
xcursorgen
xf86_input_wacom
# browser
chromium
firefox

View File

@@ -1,4 +1,4 @@
{ pkgs, ... }:
{ pkgs, inputs, ... }:
{
home.packages = with pkgs; [
# cli

View File

@@ -1,11 +1,9 @@
{
programs.git = {
enable = true;
settings = {
user = {
email = "matthew.michal11@gmail.com";
name = "Matthew Michal";
};
userEmail = "matthew.michal11@gmail.com";
userName = "Matthew Michal";
extraConfig = {
pull.rebase = true;
color.ui = true;
};

View File

@@ -1,11 +1,9 @@
{
programs.git = {
enable = true;
settings = {
user = {
email = "mousikos112@gmail.com";
name = "megan";
};
userEmail = "mousikos112@gmail.com";
userName = "megan";
extraConfig = {
pull.rebase = true;
color.ui = true;
};

View File

@@ -1,11 +1,9 @@
{
programs.git = {
enable = true;
settings = {
user = {
email = "Richie@tmmworkshop.com";
name = "Richie Cahill";
};
userEmail = "Richie@tmmworkshop.com";
userName = "Richie Cahill";
extraConfig = {
pull.rebase = true;
color.ui = true;
};

View File

@@ -88,6 +88,5 @@
"redhat.telemetry.enabled": true,
"gitlens.plusFeatures.enabled": false,
// new
"hediet.vscode-drawio.resizeImages": null,
"hediet.vscode-drawio.appearance": "automatic"
"hediet.vscode-drawio.resizeImages": null
}

View File

@@ -1,4 +1,4 @@
{ pkgs, ... }:
{ pkgs, inputs, ... }:
{
home.packages = with pkgs; [
# cli
@@ -58,5 +58,6 @@
nix-tree
nixfmt-rfc-style
treefmt
inputs.system_tools.packages.x86_64-linux.default
];
}

View File

@@ -43,6 +43,12 @@
identityFile = "~/.ssh/id_ed25519";
port = 922;
};
leviathan = {
hostname = "192.168.90.127";
user = "richie";
identityFile = "~/.ssh/id_ed25519";
port = 332;
};
};
};
}