Compare commits

..

10 Commits

56 changed files with 858 additions and 2281 deletions

View File

@@ -0,0 +1,9 @@
---
description: Format code using treefmt
---
// turbo
1. Run treefmt
```bash
treefmt
```

View File

@@ -25,4 +25,4 @@ jobs:
- name: Build default package
run: "nixos-rebuild build --flake ./#${{ matrix.system }}"
- name: copy to nix-cache
run: nix copy --accept-flake-config --to unix:///host-nix/var/nix/daemon-socket/socket .#nixosConfigurations.${{ matrix.system }}.config.system.build.toplevel
run: nix copy --to ssh://jeeves .#nixosConfigurations.${{ matrix.system }}.config.system.build.toplevel

48
.github/workflows/fix_eval_warnings.yml vendored Normal file
View File

@@ -0,0 +1,48 @@
name: Fix Evaluation Warnings
on:
workflow_run:
workflows: ["build_systems"]
types:
- completed
permissions:
contents: write
pull-requests: write
actions: read
jobs:
analyze-and-fix:
runs-on: self-hosted
if: ${{ github.event.workflow_run.conclusion == 'success' || github.event.workflow_run.conclusion == 'failure' }}
steps:
- name: Checkout code
uses: actions/checkout@v4
- name: Download logs
env:
GH_TOKEN: ${{ github.token }}
RUN_ID: ${{ github.event.workflow_run.id }}
run: |
gh run view $RUN_ID --log > build.log
- name: Run Fix Script
env:
GITHUB_TOKEN: ${{ github.token }}
GITHUB_REPOSITORY: ${{ github.repository }}
RUN_ID: ${{ github.event.workflow_run.id }}
PYTHONPATH: .
run: |
python3 python/tools/fix_eval_warnings.py build.log
- name: Create Pull Request
if: hashFiles('fix_suggestions.md') != ''
uses: peter-evans/create-pull-request@v6
with:
token: ${{ github.token }}
commit-message: "fix: automated evaluation warning fixes"
title: "fix: automated evaluation warning fixes"
body-path: fix_suggestions.md
branch: "auto-fix-eval-warnings-${{ github.event.workflow_run.id }}"
base: main
labels: "automated-fix"

1
.gitignore vendored
View File

@@ -165,3 +165,4 @@ test.*
# syncthing
.stfolder
fix_suggestions.md

13
.vscode/settings.json vendored
View File

@@ -81,6 +81,7 @@
"FASTFOX",
"ffmpegthumbnailer",
"filebot",
"filebrowser",
"fileroller",
"findbar",
"Fira",
@@ -97,7 +98,6 @@
"getch",
"getmaxyx",
"ghdeploy",
"gitea",
"globalprivacycontrol",
"gparted",
"gtts",
@@ -116,9 +116,7 @@
"httpchk",
"hurlenko",
"hwloc",
"ical",
"ignorelist",
"improv",
"INITDB",
"iocharset",
"ioit",
@@ -128,8 +126,6 @@
"jnoortheen",
"jsbc",
"kagi",
"keyformat",
"keylocation",
"kuma",
"lazer",
"levelname",
@@ -229,9 +225,12 @@
"pylint",
"pymetno",
"pymodbus",
"pyopenweathermap",
"pyownet",
"pytest",
"qbit",
"qbittorrent",
"qbittorrentvpn",
"qbitvpn",
"quicksuggest",
"radarr",
"readahead",
@@ -289,11 +288,9 @@
"twimg",
"typer",
"uaccess",
"ubiquiti",
"ublock",
"uiprotect",
"uitour",
"unifi",
"unrar",
"unsubmitted",
"uptimekuma",

View File

@@ -1,5 +1,12 @@
## Dev environment tips
- use treefmt to format all files
- make python code ruff compliant
- use pytest to test python code
- keep new code consistent with the existing style
### Python
- make code `ruff` compliant
- use pytest to test python code tests should be put in `tests` directory
- dont use global state
- use google style docstrings
- use typer over argparse

View File

@@ -23,7 +23,7 @@
boot = {
tmp.useTmpfs = true;
kernelPackages = lib.mkDefault pkgs.linuxPackages_6_12;
zfs.package = lib.mkDefault pkgs.zfs_2_4;
zfs.package = lib.mkDefault pkgs.zfs_2_3;
};
hardware.enableRedistributableFirmware = true;

View File

@@ -1,8 +1,8 @@
{ pkgs, ... }:
{
boot = {
kernelPackages = pkgs.linuxPackages_6_18;
zfs.package = pkgs.zfs_2_4;
kernelPackages = pkgs.linuxPackages_6_17;
zfs.package = pkgs.zfs_unstable;
};
hardware.bluetooth = {

36
flake.lock generated
View File

@@ -8,11 +8,11 @@
},
"locked": {
"dir": "pkgs/firefox-addons",
"lastModified": 1766762570,
"narHash": "sha256-Nevsj5NYurwp3I6nSMeh3uirwoinVSbCldqOXu4smms=",
"lastModified": 1763697825,
"narHash": "sha256-AgCCcVPOi1tuzuW5/StlwqBjRWSX62oL97qWuxrq5UA=",
"owner": "rycee",
"repo": "nur-expressions",
"rev": "03d7d310ea91d6e4b47ed70aa86c781fcc5b38e1",
"rev": "cefce78793603231be226fa77e7ad58e0e4899b8",
"type": "gitlab"
},
"original": {
@@ -29,11 +29,11 @@
]
},
"locked": {
"lastModified": 1766682973,
"narHash": "sha256-GKO35onS711ThCxwWcfuvbIBKXwriahGqs+WZuJ3v9E=",
"lastModified": 1763748372,
"narHash": "sha256-AUc78Qv3sWir0hvbmfXoZ7Jzq9VVL97l+sP9Jgms+JU=",
"owner": "nix-community",
"repo": "home-manager",
"rev": "91cdb0e2d574c64fae80d221f4bf09d5592e9ec2",
"rev": "d10a9b16b2a3ee28433f3d1c603f4e9f1fecb8e1",
"type": "github"
},
"original": {
@@ -44,11 +44,11 @@
},
"nixos-hardware": {
"locked": {
"lastModified": 1766568855,
"narHash": "sha256-UXVtN77D7pzKmzOotFTStgZBqpOcf8cO95FcupWp4Zo=",
"lastModified": 1762847253,
"narHash": "sha256-BWWnUUT01lPwCWUvS0p6Px5UOBFeXJ8jR+ZdLX8IbrU=",
"owner": "nixos",
"repo": "nixos-hardware",
"rev": "c5db9569ac9cc70929c268ac461f4003e3e5ca80",
"rev": "899dc449bc6428b9ee6b3b8f771ca2b0ef945ab9",
"type": "github"
},
"original": {
@@ -60,11 +60,11 @@
},
"nixpkgs": {
"locked": {
"lastModified": 1766651565,
"narHash": "sha256-QEhk0eXgyIqTpJ/ehZKg9IKS7EtlWxF3N7DXy42zPfU=",
"lastModified": 1763421233,
"narHash": "sha256-Stk9ZYRkGrnnpyJ4eqt9eQtdFWRRIvMxpNRf4sIegnw=",
"owner": "nixos",
"repo": "nixpkgs",
"rev": "3e2499d5539c16d0d173ba53552a4ff8547f4539",
"rev": "89c2b2330e733d6cdb5eae7b899326930c2c0648",
"type": "github"
},
"original": {
@@ -76,11 +76,11 @@
},
"nixpkgs-master": {
"locked": {
"lastModified": 1766794443,
"narHash": "sha256-Q8IyTQ3Lu8vX/iqO3U+E4pjLbP1NsqFih6uElf8OYrQ=",
"lastModified": 1763774007,
"narHash": "sha256-PPeHfKA11P09kBkBD5pS3tIAFjnG5muHQnODQGTY87g=",
"owner": "nixos",
"repo": "nixpkgs",
"rev": "088b069b8270ee36d83533c86b9f91d924d185d9",
"rev": "8a7cf7e9e18384533d9ecd0bfbcf475ac1dc497e",
"type": "github"
},
"original": {
@@ -125,11 +125,11 @@
]
},
"locked": {
"lastModified": 1766289575,
"narHash": "sha256-BOKCwOQQIP4p9z8DasT5r+qjri3x7sPCOq+FTjY8Z+o=",
"lastModified": 1763607916,
"narHash": "sha256-VefBA1JWRXM929mBAFohFUtQJLUnEwZ2vmYUNkFnSjE=",
"owner": "Mic92",
"repo": "sops-nix",
"rev": "9836912e37aef546029e48c8749834735a6b9dad",
"rev": "877bb495a6f8faf0d89fc10bd142c4b7ed2bcc0b",
"type": "github"
},
"original": {

View File

@@ -30,9 +30,7 @@
pytest-xdist
requests
ruff
scalene
sqlalchemy
textual
typer
types-requests
]

View File

@@ -48,12 +48,6 @@ lint.ignore = [
"ERA001", # (perm) I don't care about print statements dir
]
"python/splendor/**" = [
"S311", # (perm) there is no security issue here
"T201", # (perm) I don't care about print statements dir
"PLR2004", # (temps) need to think about this
]
[tool.ruff.lint.pydocstyle]
convention = "google"
@@ -64,7 +58,7 @@ builtins-ignorelist = ["id"]
max-args = 9
[tool.coverage.run]
source = ["system_tools"]
source = ["python"]
[tool.coverage.report]
exclude_lines = [

View File

@@ -1 +0,0 @@
game_data/

View File

@@ -1 +0,0 @@
"""init."""

View File

@@ -1,675 +0,0 @@
"""Base logic for the Splendor game."""
from __future__ import annotations
import itertools
import json
import random
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Literal, Protocol
if TYPE_CHECKING:
from collections.abc import Sequence
from pathlib import Path
GemColor = Literal["white", "blue", "green", "red", "black", "gold"]
GEM_COLORS: tuple[GemColor, ...] = (
"white",
"blue",
"green",
"red",
"black",
"gold",
)
BASE_COLORS: tuple[GemColor, ...] = (
"white",
"blue",
"green",
"red",
"black",
)
GEM_ORDER: list[GemColor] = list(GEM_COLORS)
GEM_INDEX: dict[GemColor, int] = {c: i for i, c in enumerate(GEM_ORDER)}
BASE_INDEX: dict[GemColor, int] = {c: i for i, c in enumerate(BASE_COLORS)}
@dataclass(frozen=True)
class Card:
"""Development card: gives points + a permanent gem discount."""
tier: int
points: int
color: GemColor
cost: dict[GemColor, int]
@dataclass(frozen=True)
class Noble:
"""Noble tile: gives points if you have enough bonuses."""
name: str
points: int
requirements: dict[GemColor, int]
@dataclass
class PlayerState:
"""State of a player in the game."""
strategy: Strategy
tokens: dict[GemColor, int] = field(default_factory=lambda: dict.fromkeys(GEM_COLORS, 0))
discounts: dict[GemColor, int] = field(default_factory=lambda: dict.fromkeys(GEM_COLORS, 0))
cards: list[Card] = field(default_factory=list)
reserved: list[Card] = field(default_factory=list)
nobles: list[Noble] = field(default_factory=list)
card_score: int = 0
noble_score: int = 0
def total_tokens(self) -> int:
"""Total tokens in player's bank."""
return sum(self.tokens.values())
def add_noble(self, noble: Noble) -> None:
"""Add a noble to the player."""
self.nobles.append(noble)
self.noble_score = sum(noble.points for noble in self.nobles)
def add_card(self, card: Card) -> None:
"""Add a card to the player."""
self.cards.append(card)
self.card_score = sum(card.points for card in self.cards)
@property
def score(self) -> int:
"""Total points in player's cards + nobles."""
return self.card_score + self.noble_score
def can_afford(self, card: Card) -> bool:
"""Check if player can afford card, using discounts + gold."""
missing = 0
gold = self.tokens["gold"]
for color, cost in card.cost.items():
missing += max(0, cost - self.discounts.get(color, 0) - self.tokens.get(color, 0))
if missing > gold:
return False
return True
def pay_for_card(self, card: Card) -> dict[GemColor, int]:
"""Pay tokens for card, move card to tableau, return payment for bank."""
if not self.can_afford(card):
msg = f"cannot afford card {card}"
raise ValueError(msg)
payment: dict[GemColor, int] = dict.fromkeys(GEM_COLORS, 0)
gold_available = self.tokens["gold"]
for color in BASE_COLORS:
cost = card.cost.get(color, 0)
effective_cost = max(0, cost - self.discounts.get(color, 0))
use = min(self.tokens[color], effective_cost)
self.tokens[color] -= use
payment[color] += use
remaining = effective_cost - use
if remaining > 0:
use_gold = min(gold_available, remaining)
gold_available -= use_gold
self.tokens["gold"] -= use_gold
payment["gold"] += use_gold
self.add_card(card)
self.discounts[card.color] += 1
return payment
def get_default_starting_tokens(player_count: int) -> dict[GemColor, int]:
"""get_default_starting_tokens."""
token_count = (player_count * player_count - 3 * player_count + 10) // 2
return {
"white": token_count,
"blue": token_count,
"green": token_count,
"red": token_count,
"black": token_count,
"gold": 5,
}
@dataclass
class GameConfig:
"""Game configuration: gems, bank, cards, nobles, etc."""
win_score: int = 15
table_cards_per_tier: int = 4
reserve_limit: int = 3
token_limit: int = 10
turn_limit: int = 1000
minimum_tokens_to_buy_2: int = 4
max_token_take: int = 3
cards: list[Card] = field(default_factory=list)
nobles: list[Noble] = field(default_factory=list)
class GameState:
"""Game state: players, bank, decks, table, available nobles, etc."""
def __init__(
self,
config: GameConfig,
players: list[PlayerState],
bank: dict[GemColor, int],
decks_by_tier: dict[int, list[Card]],
table_by_tier: dict[int, list[Card]],
available_nobles: list[Noble],
) -> None:
"""Game state."""
self.config = config
self.players = players
self.bank = bank
self.decks_by_tier = decks_by_tier
self.table_by_tier = table_by_tier
self.available_nobles = available_nobles
self.noble_min_requirements = 0
self.get_noble_min_requirements()
self.current_player_index = 0
self.finished = False
def get_noble_min_requirements(self) -> None:
"""Find the minimum requirement for all available nobles."""
test = 0
for noble in self.available_nobles:
test = max(test, min(foo for foo in noble.requirements.values()))
self.noble_min_requirements = test
def next_player(self) -> None:
"""Advance to the next player."""
self.current_player_index = (self.current_player_index + 1) % len(self.players)
@property
def current_player(self) -> PlayerState:
"""Current player."""
return self.players[self.current_player_index]
def refill_table(self) -> None:
"""Refill face-up cards from decks."""
for tier, deck in self.decks_by_tier.items():
table = self.table_by_tier[tier]
while len(table) < self.config.table_cards_per_tier and deck:
table.append(deck.pop())
def check_winner_simple(self) -> PlayerState | None:
"""Simplified: end immediately when someone hits win_score."""
eligible = [player for player in self.players if player.score >= self.config.win_score]
if not eligible:
return None
eligible.sort(
key=lambda p: (p.score, -len(p.cards)),
reverse=True,
)
self.finished = True
return eligible[0]
class Action:
"""Marker protocol for actions."""
@dataclass
class TakeDifferent(Action):
"""Take up to 3 different gem colors."""
colors: list[GemColor]
@dataclass
class TakeDouble(Action):
"""Take two of the same color."""
color: GemColor
@dataclass
class BuyCard(Action):
"""Buy a face-up card."""
tier: int
index: int
@dataclass
class BuyCardReserved(Action):
"""Buy a face-up card."""
index: int
@dataclass
class ReserveCard(Action):
"""Reserve a face-up card."""
tier: int
index: int | None = None
from_deck: bool = False
class Strategy(Protocol):
"""Implement this to make a bot or human controller."""
def __init__(self, name: str) -> None:
"""Initialize a strategy."""
self.name = name
def choose_action(self, game: GameState, player: PlayerState) -> Action | None:
"""Return an Action, or None to concede/end."""
raise NotImplementedError
def choose_discard(
self,
game: GameState, # noqa: ARG002
player: PlayerState,
excess: int,
) -> dict[GemColor, int]:
"""Called if player has more than token_limit tokens after an action.
Default: naive auto-discard.
"""
return auto_discard_tokens(player, excess)
def choose_noble(
self,
game: GameState, # noqa: ARG002
player: PlayerState, # noqa: ARG002
nobles: list[Noble],
) -> Noble:
"""Called if player qualifies for multiple nobles. Default: first."""
return nobles[0]
def auto_discard_tokens(player: PlayerState, excess: int) -> dict[GemColor, int]:
"""Very dumb discard logic: discard from colors you have the most of."""
to_discard: dict[GemColor, int] = dict.fromkeys(GEM_COLORS, 0)
remaining = excess
while remaining > 0:
color = max(player.tokens, key=lambda c: player.tokens[c])
if player.tokens[color] == 0:
break
player.tokens[color] -= 1
to_discard[color] += 1
remaining -= 1
return to_discard
def enforce_token_limit(
game: GameState,
strategy: Strategy,
player: PlayerState,
) -> None:
"""If player has more than token_limit tokens, force discards."""
limit = game.config.token_limit
total = player.total_tokens()
if total <= limit:
return
excess = total - limit
discards = strategy.choose_discard(game, player, excess)
for color, amount in discards.items():
available = player.tokens[color]
to_remove = min(amount, available)
if to_remove <= 0:
continue
player.tokens[color] -= to_remove
game.bank[color] += to_remove
remaining = player.total_tokens() - limit
if remaining > 0:
auto = auto_discard_tokens(player, remaining)
for color, amount in auto.items():
game.bank[color] += amount
def _check_nobles_for_player(player: PlayerState, noble: Noble) -> bool:
# this rule is slower
for color, cost in noble.requirements.items(): # noqa: SIM110
if player.discounts[color] < cost:
return False
return True
def check_nobles_for_player(
game: GameState,
strategy: Strategy,
player: PlayerState,
) -> None:
"""Award at most one noble to player if they qualify."""
if game.noble_min_requirements > max(player.discounts.values()):
return
candidates = [noble for noble in game.available_nobles if _check_nobles_for_player(player, noble)]
if not candidates:
return
chosen = candidates[0] if len(candidates) == 1 else strategy.choose_noble(game, player, candidates)
if chosen not in game.available_nobles:
return
game.available_nobles.remove(chosen)
game.get_noble_min_requirements()
player.add_noble(chosen)
def apply_take_different(game: GameState, strategy: Strategy, action: TakeDifferent) -> None:
"""Mutate game state according to action."""
player = game.current_player
colors = [color for color in action.colors if color in BASE_COLORS and game.bank[color] > 0]
if not (1 <= len(colors) <= game.config.max_token_take):
return
for color in colors:
game.bank[color] -= 1
player.tokens[color] += 1
enforce_token_limit(game, strategy, player)
def apply_take_double(game: GameState, strategy: Strategy, action: TakeDouble) -> None:
"""Mutate game state according to action."""
player = game.current_player
color = action.color
if color not in BASE_COLORS:
return
if game.bank[color] < game.config.minimum_tokens_to_buy_2:
return
game.bank[color] -= 2
player.tokens[color] += 2
enforce_token_limit(game, strategy, player)
def apply_buy_card(game: GameState, _strategy: Strategy, action: BuyCard) -> None:
"""Mutate game state according to action."""
player = game.current_player
row = game.table_by_tier.get(action.tier)
if row is None or not (0 <= action.index < len(row)):
return
card = row[action.index]
if not player.can_afford(card):
return
row.pop(action.index)
payment = player.pay_for_card(card)
for color, amount in payment.items():
game.bank[color] += amount
game.refill_table()
def apply_buy_card_reserved(game: GameState, _strategy: Strategy, action: BuyCardReserved) -> None:
"""Mutate game state according to action."""
player = game.current_player
if not (0 <= action.index < len(player.reserved)):
return
card = player.reserved[action.index]
if not player.can_afford(card):
return
player.reserved.pop(action.index)
payment = player.pay_for_card(card)
for color, amount in payment.items():
game.bank[color] += amount
def apply_reserve_card(game: GameState, strategy: Strategy, action: ReserveCard) -> None:
"""Mutate game state according to action."""
player = game.current_player
if len(player.reserved) >= game.config.reserve_limit:
return
card: Card | None = None
if action.from_deck:
deck = game.decks_by_tier.get(action.tier)
if deck:
card = deck.pop()
else:
row = game.table_by_tier.get(action.tier)
if row is None:
return
if action.index is None or not (0 <= action.index < len(row)):
return
card = row.pop(action.index)
game.refill_table()
if card is None:
return
player.reserved.append(card)
if game.bank["gold"] > 0:
game.bank["gold"] -= 1
player.tokens["gold"] += 1
enforce_token_limit(game, strategy, player)
def apply_action(game: GameState, strategy: Strategy, action: Action) -> None:
"""Mutate game state according to action."""
actions = {
TakeDifferent: apply_take_different,
TakeDouble: apply_take_double,
BuyCard: apply_buy_card,
ReserveCard: apply_reserve_card,
BuyCardReserved: apply_buy_card_reserved,
}
action_func = actions.get(type(action))
if action_func is None:
msg = f"Unknown action type: {type(action)}"
raise ValueError(msg)
action_func(game, strategy, action)
# not sure how to simplify this yet
def get_legal_actions( # noqa: C901
game: GameState,
player: PlayerState | None = None,
) -> list[Action]:
"""Enumerate all syntactically legal actions for the given player.
This enforces:
- token-taking rules
- reserve limits
- affordability for buys
"""
if player is None:
player = game.players[game.current_player_index]
actions: list[Action] = []
colors_available = [c for c in BASE_COLORS if game.bank[c] > 0]
for r in (1, 2, 3):
actions.extend(TakeDifferent(colors=list(combo)) for combo in itertools.combinations(colors_available, r))
actions.extend(
TakeDouble(color=color) for color in BASE_COLORS if game.bank[color] >= game.config.minimum_tokens_to_buy_2
)
for tier, row in game.table_by_tier.items():
for idx, card in enumerate(row):
if player.can_afford(card):
actions.append(BuyCard(tier=tier, index=idx))
for idx, card in enumerate(player.reserved):
if player.can_afford(card):
actions.append(BuyCardReserved(index=idx))
if len(player.reserved) < game.config.reserve_limit:
for tier, row in game.table_by_tier.items():
for idx, _ in enumerate(row):
actions.append(
ReserveCard(tier=tier, index=idx, from_deck=False),
)
for tier, deck in game.decks_by_tier.items():
if deck:
actions.append(
ReserveCard(tier=tier, index=None, from_deck=True),
)
return actions
def create_random_cards_tier(
tier: int,
card_count: int,
cost_choices: list[int],
point_choices: list[int],
) -> list[Card]:
"""Create a random set of cards for a given tier."""
cards: list[Card] = []
for color in BASE_COLORS:
for _ in range(card_count):
cost = dict.fromkeys(GEM_COLORS, 0)
for c in BASE_COLORS:
if c == color:
continue
cost[c] = random.choice(cost_choices)
points = random.choice(point_choices)
cards.append(Card(tier=tier, points=points, color=color, cost=cost))
return cards
def create_random_cards() -> list[Card]:
"""Generate a generic but Splendor-ish set of cards.
This is not the official deck, but structured similarly enough for play.
"""
cards: list[Card] = []
cards.extend(
create_random_cards_tier(
tier=1,
card_count=5,
cost_choices=[0, 1, 1, 2],
point_choices=[0, 0, 1],
)
)
cards.extend(
create_random_cards_tier(
tier=2,
card_count=4,
cost_choices=[2, 3, 4],
point_choices=[1, 2, 2, 3],
)
)
cards.extend(
create_random_cards_tier(
tier=3,
card_count=3,
cost_choices=[4, 5, 6],
point_choices=[3, 4, 5],
)
)
random.shuffle(cards)
return cards
def create_random_nobles() -> list[Noble]:
"""A small set of noble tiles, roughly Splendor-ish."""
nobles: list[Noble] = []
base_requirements: list[dict[GemColor, int]] = [
{"white": 3, "blue": 3, "green": 3},
{"blue": 3, "green": 3, "red": 3},
{"green": 3, "red": 3, "black": 3},
{"red": 3, "black": 3, "white": 3},
{"black": 3, "white": 3, "blue": 3},
{"white": 4, "blue": 4},
{"green": 4, "red": 4},
{"blue": 4, "black": 4},
]
for idx, req in enumerate(base_requirements, start=1):
nobles.append(
Noble(
name=f"Noble {idx}",
points=3,
requirements=dict(req.items()),
),
)
return nobles
def load_nobles(file: Path) -> list[Noble]:
"""Load nobles from a file."""
nobles = json.loads(file.read_text())
return [Noble(**noble) for noble in nobles]
def load_cards(file: Path) -> list[Card]:
"""Load cards from a file."""
cards = json.loads(file.read_text())
return [Card(**card) for card in cards]
def new_game(
strategies: Sequence[Strategy],
config: GameConfig,
) -> GameState:
"""Create a new game state from a config + list of players."""
num_players = len(strategies)
bank = get_default_starting_tokens(num_players)
decks_by_tier: dict[int, list[Card]] = {1: [], 2: [], 3: []}
for card in config.cards:
decks_by_tier.setdefault(card.tier, []).append(card)
for deck in decks_by_tier.values():
random.shuffle(deck)
table_by_tier: dict[int, list[Card]] = {1: [], 2: [], 3: []}
players = [PlayerState(strategy=strategy) for strategy in strategies]
nobles = list(config.nobles)
random.shuffle(nobles)
nobles = nobles[: num_players + 1]
game = GameState(
config=config,
players=players,
bank=bank,
decks_by_tier=decks_by_tier,
table_by_tier=table_by_tier,
available_nobles=nobles,
)
game.refill_table()
return game
def run_game(game: GameState) -> tuple[PlayerState, int]:
"""Run a full game loop until someone wins or a player returns None."""
turn_count = 0
while not game.finished:
turn_count += 1
player = game.current_player
strategy = player.strategy
action = strategy.choose_action(game, player)
if action is None:
game.finished = True
break
apply_action(game, strategy, action)
check_nobles_for_player(game, strategy, player)
winner = game.check_winner_simple()
if winner is not None:
return winner, turn_count
game.next_player()
if turn_count >= game.config.turn_limit:
break
fallback = max(game.players, key=lambda player: player.score)
return fallback, turn_count

View File

@@ -1,288 +0,0 @@
"""Bot for Splendor game."""
from __future__ import annotations
import random
from .base import (
BASE_COLORS,
Action,
BuyCard,
BuyCardReserved,
Card,
GameState,
GemColor,
PlayerState,
ReserveCard,
Strategy,
TakeDifferent,
TakeDouble,
auto_discard_tokens,
get_legal_actions,
)
def can_bot_afford(player: PlayerState, card: Card) -> bool:
"""Check if player can afford card, using discounts + gold."""
missing = 0
gold = player.tokens["gold"]
for color, cost in card.cost.items():
missing += max(0, cost - player.discounts.get(color, 0) - player.tokens.get(color, 0))
if missing > gold:
return False
return True
class RandomBot(Strategy):
"""Dumb bot that follows rules but doesn't think."""
def __init__(self, name: str) -> None:
"""Initialize the bot."""
super().__init__(name=name)
def choose_action(self, game: GameState, player: PlayerState) -> Action | None:
"""Choose an action for the current player."""
affordable: list[tuple[int, int]] = []
for tier, row in game.table_by_tier.items():
for idx, card in enumerate(row):
if can_bot_afford(player, card):
affordable.append((tier, idx))
if affordable and random.random() < 0.5:
tier, idx = random.choice(affordable)
return BuyCard(tier=tier, index=idx)
if random.random() < 0.2:
tier = random.choice([1, 2, 3])
row = game.table_by_tier.get(tier, [])
if row:
idx = random.randrange(len(row))
return ReserveCard(tier=tier, index=idx, from_deck=False)
if random.random() < 0.5:
colors_for_double = [c for c in BASE_COLORS if game.bank[c] >= 4]
if colors_for_double:
return TakeDouble(color=random.choice(colors_for_double))
colors_for_diff = [c for c in BASE_COLORS if game.bank[c] > 0]
random.shuffle(colors_for_diff)
return TakeDifferent(colors=colors_for_diff[:3])
def choose_discard(
self,
game: GameState, # noqa: ARG002
player: PlayerState,
excess: int,
) -> dict[GemColor, int]:
"""Choose how many tokens to discard."""
return auto_discard_tokens(player, excess)
def check_cards_in_tier(row: list[Card], player: PlayerState) -> list[int]:
"""Check if player can afford card, using discounts + gold."""
return [index for index, card in enumerate(row) if can_bot_afford(player, card)]
class PersonalizedBot(Strategy):
"""PersonalizedBot."""
"""Dumb bot that follows rules but doesn't think."""
def __init__(self, name: str) -> None:
"""Initialize the bot."""
super().__init__(name=name)
def choose_action(self, game: GameState, player: PlayerState) -> Action | None:
"""Choose an action for the current player."""
for tier in (1, 2, 3):
row = game.table_by_tier[tier]
if affordable := check_cards_in_tier(row, player):
index = random.choice(affordable)
return BuyCard(tier=tier, index=index)
colors_for_diff = [c for c in BASE_COLORS if game.bank[c] > 0]
random.shuffle(colors_for_diff)
return TakeDifferent(colors=colors_for_diff[:3])
def choose_discard(
self,
game: GameState, # noqa: ARG002
player: PlayerState,
excess: int,
) -> dict[GemColor, int]:
"""Choose how many tokens to discard."""
return auto_discard_tokens(player, excess)
class PersonalizedBot2(Strategy):
"""PersonalizedBot2."""
"""Dumb bot that follows rules but doesn't think."""
def __init__(self, name: str) -> None:
"""Initialize the bot."""
super().__init__(name=name)
def choose_action(self, game: GameState, player: PlayerState) -> Action | None:
"""Choose an action for the current player."""
tiers = (1, 2, 3)
for tier in tiers:
row = game.table_by_tier[tier]
if affordable := check_cards_in_tier(row, player):
index = random.choice(affordable)
return BuyCard(tier=tier, index=index)
if affordable := check_cards_in_tier(player.reserved, player):
index = random.choice(affordable)
return BuyCardReserved(index=index)
colors_for_diff = [c for c in BASE_COLORS if game.bank[c] > 0]
if len(colors_for_diff) >= 3:
random.shuffle(colors_for_diff)
return TakeDifferent(colors=colors_for_diff[:3])
for tier in tiers:
len_deck = len(game.decks_by_tier[tier])
if len_deck:
return ReserveCard(tier=tier, index=None, from_deck=True)
return TakeDifferent(colors=colors_for_diff[:3])
def choose_discard(
self,
game: GameState, # noqa: ARG002
player: PlayerState,
excess: int,
) -> dict[GemColor, int]:
"""Choose how many tokens to discard."""
return auto_discard_tokens(player, excess)
def buy_card_reserved(player: PlayerState) -> Action | None:
"""Buy a card reserved."""
if affordable := check_cards_in_tier(player.reserved, player):
index = random.choice(affordable)
return BuyCardReserved(index=index)
return None
def buy_card(game: GameState, player: PlayerState) -> Action | None:
"""Buy a card."""
for tier in (1, 2, 3):
row = game.table_by_tier[tier]
if affordable := check_cards_in_tier(row, player):
index = random.choice(affordable)
return BuyCard(tier=tier, index=index)
return None
def take_tokens(game: GameState) -> Action | None:
"""Take tokens."""
colors_for_diff = [color for color in BASE_COLORS if game.bank[color] > 0]
if len(colors_for_diff) >= 3:
random.shuffle(colors_for_diff)
return TakeDifferent(colors=colors_for_diff[: game.config.max_token_take])
return None
class PersonalizedBot3(Strategy):
"""PersonalizedBot3."""
"""Dumb bot that follows rules but doesn't think."""
def __init__(self, name: str) -> None:
"""Initialize the bot."""
super().__init__(name=name)
def choose_action(self, game: GameState, player: PlayerState) -> Action | None:
"""Choose an action for the current player."""
print(len(get_legal_actions(game, player)))
print(get_legal_actions(game, player))
if action := buy_card_reserved(player):
return action
if action := buy_card(game, player):
return action
colors_for_diff = [color for color in BASE_COLORS if game.bank[color] > 0]
if len(colors_for_diff) >= 3:
random.shuffle(colors_for_diff)
return TakeDifferent(colors=colors_for_diff[:3])
for tier in (1, 2, 3):
len_deck = len(game.decks_by_tier[tier])
if len_deck:
return ReserveCard(tier=tier, index=None, from_deck=True)
return TakeDifferent(colors=colors_for_diff[:3])
def choose_discard(
self,
game: GameState, # noqa: ARG002
player: PlayerState,
excess: int,
) -> dict[GemColor, int]:
"""Choose how many tokens to discard."""
return auto_discard_tokens(player, excess)
def estimate_value_of_card(game: GameState, player: PlayerState, color: GemColor) -> int:
"""Estimate value of a color in the player's bank."""
return game.bank[color] - player.discounts.get(color, 0)
def estimate_value_of_token(game: GameState, player: PlayerState, color: GemColor) -> int:
"""Estimate value of a color in the player's bank."""
return game.bank[color] - player.discounts.get(color, 0)
class PersonalizedBot4(Strategy):
"""PersonalizedBot4."""
def __init__(self, name: str) -> None:
"""Initialize the bot."""
super().__init__(name=name)
def filter_actions(self, actions: list[Action]) -> list[Action]:
"""Filter actions to only take different."""
return [
action
for action in actions
if (isinstance(action, TakeDifferent) and len(action.colors) == 3) or not isinstance(action, TakeDifferent)
]
def choose_action(self, game: GameState, player: PlayerState) -> Action | None:
"""Choose an action for the current player."""
legal_actions = get_legal_actions(game, player)
print(len(legal_actions))
good_actions = self.filter_actions(legal_actions)
print(len(good_actions))
print(good_actions)
print(len(get_legal_actions(game, player)))
if action := buy_card_reserved(player):
return action
if action := buy_card(game, player):
return action
colors_for_diff = [color for color in BASE_COLORS if game.bank[color] > 0]
if len(colors_for_diff) >= 3:
random.shuffle(colors_for_diff)
return TakeDifferent(colors=colors_for_diff[:3])
for tier in (1, 2, 3):
len_deck = len(game.decks_by_tier[tier])
if len_deck:
return ReserveCard(tier=tier, index=None, from_deck=True)
return TakeDifferent(colors=colors_for_diff[:3])
def choose_discard(
self,
game: GameState, # noqa: ARG002
player: PlayerState,
excess: int,
) -> dict[GemColor, int]:
"""Choose how many tokens to discard."""
return auto_discard_tokens(player, excess)

View File

@@ -1,724 +0,0 @@
"""Splendor game."""
from __future__ import annotations
import sys
from typing import TYPE_CHECKING, Any
from textual.app import App, ComposeResult
from textual.containers import Horizontal, Vertical
from textual.widget import Widget
from textual.widgets import Footer, Header, Input, Static
from .base import (
BASE_COLORS,
GEM_COLORS,
Action,
BuyCard,
BuyCardReserved,
Card,
GameState,
GemColor,
Noble,
PlayerState,
ReserveCard,
Strategy,
TakeDifferent,
TakeDouble,
)
if TYPE_CHECKING:
from collections.abc import Mapping
# Abbreviations used when rendering costs
COST_ABBR: dict[GemColor, str] = {
"white": "W",
"blue": "B",
"green": "G",
"red": "R",
"black": "K",
"gold": "O",
}
# Abbreviations players can type on the command line
COLOR_ABBR_TO_FULL: dict[str, GemColor] = {
"w": "white",
"b": "blue",
"g": "green",
"r": "red",
"k": "black",
"o": "gold",
}
def parse_color_token(raw: str) -> GemColor:
"""Convert user input into a GemColor.
Supports:
- full names: white, blue, green, red, black, gold
- abbreviations: w, b, g, r, k, o
"""
key = raw.lower()
# full color names first
if key in BASE_COLORS:
return key # type: ignore[return-value]
# abbreviations
if key in COLOR_ABBR_TO_FULL:
return COLOR_ABBR_TO_FULL[key]
error = f"Unknown color: {raw}"
raise ValueError(error)
def format_cost(cost: Mapping[GemColor, int]) -> str:
"""Format a cost/requirements dict as colored tokens like 'B:2, R:1'.
Uses `color_token` internally so colors are guaranteed to match your bank.
"""
parts: list[str] = []
for color in GEM_COLORS:
n = cost.get(color, 0)
if not n:
continue
# color_token gives us e.g. "[blue]blue: 3[/]"
token = color_token(color, n)
# Turn the leading color name into the abbreviation (blue: 3 → B:3)
# We only replace the first occurrence.
full = f"{color}:"
abbr = f"{COST_ABBR[color]}:"
token = token.replace(full, abbr, 1)
parts.append(token)
return ", ".join(parts) if parts else "-"
def format_card(card: Card) -> str:
"""Readable card line using dataclass fields instead of __str__."""
color_abbr = COST_ABBR[card.color]
header = f"T{card.tier} {color_abbr} P{card.points}"
cost_str = format_cost(card.cost)
return f"{header} ({cost_str})"
def format_noble(noble: Noble) -> str:
"""Readable noble line using dataclass fields instead of __str__."""
cost_str = format_cost(noble.requirements)
return f"{noble.name} +{noble.points} ({cost_str})"
def format_tokens(tokens: Mapping[GemColor, int]) -> str:
"""Colored 'color: n' list for a token dict."""
return " ".join(color_token(c, tokens.get(c, 0)) for c in GEM_COLORS)
def format_discounts(discounts: Mapping[GemColor, int]) -> str:
"""Colored discounts, skipping zeros."""
parts: list[str] = []
for c in GEM_COLORS:
n = discounts.get(c, 0)
if not n:
continue
abbr = COST_ABBR[c]
fg, bg = COLOR_STYLE[c]
parts.append(f"[{fg} on {bg}]{abbr}:{n}[/{fg} on {bg}]")
return ", ".join(parts) if parts else "-"
COLOR_STYLE: dict[GemColor, tuple[str, str]] = {
"white": ("black", "white"), # fg, bg
"blue": ("bright_white", "blue"),
"green": ("bright_white", "sea_green4"),
"red": ("white", "red3"),
"black": ("white", "grey0"),
"gold": ("black", "yellow3"),
}
def fmt_gem(color: GemColor) -> str:
"""Render gem name with fg/bg matching real token color."""
fg, bg = COLOR_STYLE[color]
return f"[{fg} on {bg}] {color} [/{fg} on {bg}]"
def fmt_number(value: int) -> str:
"""Return a Rich-markup colored 'value' string."""
return f"[bold cyan]{value}[/]"
def color_token(name: GemColor, amount: int) -> str:
"""Return a Rich-markup colored 'name: n' string."""
# Map Splendor colors -> terminal colors
color_map: Mapping[GemColor, str] = {
"white": "white",
"blue": "blue",
"green": "green",
"red": "red",
"black": "grey70", # 'black' is unreadable on dark backgrounds
"gold": "yellow",
}
style = color_map.get(name, "white")
return f"[{style}]{name}: {amount}[/]"
class Board(Widget):
"""Big board widget with the layout you sketched."""
def __init__(self, game: GameState, me: PlayerState, **kwargs: Any) -> None: # noqa: ANN401
"""Initialize the board widget."""
super().__init__(**kwargs)
self.game = game
self.me = me
def compose(self) -> ComposeResult:
"""Compose the board widget."""
# Structure:
# ┌ bank row
# ├ middle row (tiers | nobles)
# └ players row
with Vertical(id="board_root"):
yield Static(id="bank_box")
with Horizontal(id="middle_row"):
with Vertical(id="tiers_box"):
yield Static(id="tier1_box")
yield Static(id="tier2_box")
yield Static(id="tier3_box")
yield Static(id="nobles_box")
yield Static(id="players_box")
def on_mount(self) -> None:
"""Refresh the board content."""
self.refresh_content()
def refresh_content(self) -> None:
"""Refresh the board content."""
self._render_bank()
self._render_tiers()
self._render_nobles()
self._render_players()
# --- sections ----------------------------------------------------
def _render_bank(self) -> None:
bank = self.game.bank
parts: list[str] = ["[b]Bank:[/b]"]
# One line, all tokens colored
parts.append(format_tokens(bank))
self.query_one("#bank_box", Static).update("\n".join(parts))
def _render_tiers(self) -> None:
for tier in (1, 2, 3):
box = self.query_one(f"#tier{tier}_box", Static)
cards: list[Card] = self.game.table_by_tier.get(tier, [])
lines: list[str] = [f"[b]Tier {tier} cards:[/b]"]
if not cards:
lines.append(" (none)")
else:
for idx, card in enumerate(cards):
lines.append(f" [{idx}] {format_card(card)}")
box.update("\n".join(lines))
def _render_nobles(self) -> None:
nobles_box = self.query_one("#nobles_box", Static)
lines: list[str] = ["[b]Nobles[/b]"]
if not self.game.available_nobles:
lines.append(" (none)")
else:
lines.extend(" - " + format_noble(noble) for noble in self.game.available_nobles)
nobles_box.update("\n".join(lines))
def _render_players(self) -> None:
players_box = self.query_one("#players_box", Static)
lines: list[str] = ["[b]Players:[/b]", ""]
for player in self.game.players:
mark = "*" if player is self.me else " "
token_str = format_tokens(player.tokens)
discount_str = format_discounts(player.discounts)
lines.append(
f"{mark} {player.name:10} Score={player.score:2d} Discounts={discount_str}",
)
lines.append(f" Tokens: {token_str}")
if player.nobles:
noble_names = ", ".join(n.name for n in player.nobles)
lines.append(f" Nobles: {noble_names}")
# Optional: show counts of cards / reserved
if player.cards:
lines.append(f" Cards: {len(player.cards)}")
if player.reserved:
lines.append(f" Reserved: {len(player.reserved)}")
lines.append("")
players_box.update("\n".join(lines))
class ActionApp(App[None]):
"""Textual app that asks for a single action command and returns an Action."""
CSS = """
Screen {
/* 3 rows: command zone, board, footer */
layout: grid;
grid-size: 1 3;
grid-rows: auto 1fr auto;
}
/* Top area with input + instructions */
#command_zone {
grid-columns: 1;
grid-rows: 1;
padding: 1 1;
}
/* Board sits in the middle row and can grow */
#board {
grid-columns: 1;
grid-rows: 2;
padding: 0 1 1 1;
}
Footer {
grid-columns: 1;
grid-rows: 3;
}
Input {
border: round $accent;
}
/* === Board layout === */
#board_root {
/* outer frame around the whole board area */
border: heavy white;
padding: 0 1;
}
/* Bank row: full width */
#bank_box {
border: heavy white;
padding: 0 1;
}
/* Middle row: tiers (left) + nobles (right) */
#middle_row {
layout: horizontal;
}
#tiers_box {
border: heavy white;
padding: 0 1;
width: 70%;
}
#tier1_box,
#tier2_box,
#tier3_box {
border-bottom: heavy white;
padding: 0 0 1 0;
margin-bottom: 1;
}
#nobles_box {
border: heavy white;
padding: 0 1;
width: 30%;
}
/* Players row: full width at bottom */
#players_box {
border: heavy white;
padding: 0 1;
}
"""
def __init__(self, game: GameState, player: PlayerState) -> None:
"""Initialize the action app."""
super().__init__()
self.game = game
self.player = player
self.result: Action | None = None
self.message: str = ""
def compose(self) -> ComposeResult:
"""Compose the action app."""
# Row 1: input + Actions text
with Vertical(id="command_zone"):
yield Input(
placeholder="Enter command, e.g. '1 white blue red' or '1 w b r' or 'q'",
id="input_line",
)
yield Static("", id="prompt")
# Row 2: board
yield Board(self.game, self.player, id="board")
# Row 3: footer
yield Footer()
def on_mount(self) -> None:
"""Mount the action app."""
self._update_prompt()
self.query_one(Input).focus()
def _update_prompt(self) -> None:
lines: list[str] = []
lines.append("[bold underline]Actions:[/]")
lines.append(
" [bold green]1[/] <colors...> - Take up to 3 different gem colors "
"(e.g. [cyan]1 white blue red[/] or [cyan]1 w b r[/])",
)
lines.append(
f" [bold green]2[/] <color> - Take 2 of the same color (needs {fmt_number(4)} in bank, "
"e.g. [cyan]2 blue[/] or [cyan]2 b[/])",
)
lines.append(
" [bold green]3[/] <tier> <idx> - Buy a face-up card (e.g. [cyan]3 1 0[/] for tier 1, index 0)",
)
lines.append(" [bold green]4[/] <idx> - Buy a reserved card")
lines.append(" [bold green]5[/] <tier> <idx> - Reserve a face-up card")
lines.append(" [bold green]6[/] <tier> - Reserve top card of a deck")
lines.append(" [bold red]q[/] - Quit game")
if self.message:
lines.append("")
lines.append(f"[bold red]Message:[/] {self.message}")
self.query_one("#prompt", Static).update("\n".join(lines))
def _cmd_1(self, parts: list[str]) -> str | None:
"""Take up to 3 different gem colors: 1 white blue red OR 1 w b r."""
color_names = parts[1:]
if not color_names:
return "Need at least one color (full name or abbreviation)."
colors: list[GemColor] = []
for name in color_names:
color = parse_color_token(name)
if self.game.bank[color] <= 0:
return f"No tokens left for color: {color}"
colors.append(color)
self.result = TakeDifferent(colors=colors[:3])
self.exit()
return None
def _cmd_2(self, parts: list[str]) -> str | None:
"""Take two of the same color."""
if len(parts) < 2:
return "Usage: 2 <color>"
color = parse_color_token(parts[1])
if self.game.bank[color] < self.game.config.minimum_tokens_to_buy_2:
return "Bank must have at least 4 of that color."
self.result = TakeDouble(color=color)
self.exit()
return None
def _cmd_3(self, parts: list[str]) -> str | None:
"""Buy face-up card."""
if len(parts) < 3:
return "Usage: 3 <tier> <index>"
tier = int(parts[1])
idx = int(parts[2])
self.result = BuyCard(tier=tier, index=idx)
self.exit()
return None
def _cmd_4(self, parts: list[str]) -> str | None:
"""Buy reserved card."""
if len(parts) < 2:
return "Usage: 4 <reserved_index>"
idx = int(parts[1])
if not (0 <= idx < len(self.player.reserved)):
return "Reserved index out of range."
self.result = BuyCardReserved(tier=0, index=idx)
self.exit()
return None
def _cmd_5(self, parts: list[str]) -> str | None:
"""Reserve face-up card."""
if len(parts) < 3:
return "Usage: 5 <tier> <index>"
tier = int(parts[1])
idx = int(parts[2])
self.result = ReserveCard(tier=tier, index=idx, from_deck=False)
self.exit()
return None
def _cmd_6(self, parts: list[str]) -> str | None:
"""Reserve top of deck."""
if len(parts) < 2:
return "Usage: 6 <tier>"
tier = int(parts[1])
self.result = ReserveCard(tier=tier, index=None, from_deck=True)
self.exit()
return None
def _unknown_cmd(self, _parts: list[str]) -> str:
return "Unknown command."
def on_input_submitted(self, event: Input.Submitted) -> None:
"""Handle user input."""
text = (event.value or "").strip()
event.input.value = ""
if not text:
return
if text.lower() in {"q", "quit", "0"}:
self.result = None
self.exit()
return
parts = text.split()
cmds = {
"1": self._cmd_1,
"2": self._cmd_2,
"3": self._cmd_3,
"4": self._cmd_4,
"5": self._cmd_5,
"6": self._cmd_6,
}
cmd = parts[0]
error = cmds.get(cmd, self._unknown_cmd)(parts)
if error:
self.message = error
self._update_prompt()
return
class DiscardApp(App[None]):
"""Textual app to choose discards when over token limit."""
CSS = """
Screen {
layout: vertical;
}
#command_zone {
padding: 1 1;
}
#board {
padding: 0 1 1 1;
}
Input {
border: round $accent;
}
"""
def __init__(self, game: GameState, player: PlayerState) -> None:
"""Initialize the discard app."""
super().__init__()
self.game = game
self.player = player
self.discards: dict[GemColor, int] = dict.fromkeys(GEM_COLORS, 0)
self.message: str = ""
def compose(self) -> ComposeResult: # type: ignore[override]
"""Compose the discard app."""
yield Header(show_clock=False)
with Vertical(id="command_zone"):
yield Input(
placeholder="Enter color to discard, e.g. 'blue' or 'b'",
id="input_line",
)
yield Static("", id="prompt")
# Board directly under the command zone
yield Board(self.game, self.player, id="board")
yield Footer()
def on_mount(self) -> None: # type: ignore[override]
"""Mount the discard app."""
self._update_prompt()
self.query_one(Input).focus()
def _remaining_to_discard(self) -> int:
return self.player.total_tokens() - sum(self.discards.values()) - self.game.config.token_limit
def _update_prompt(self) -> None:
remaining = max(self._remaining_to_discard(), 0)
lines: list[str] = []
lines.append(
"You must discard "
f"{fmt_number(remaining)} token(s) "
f"to get down to {fmt_number(self.game.config.token_limit)}.",
)
disc_str = ", ".join(f"{fmt_gem(c)}={fmt_number(self.discards[c])}" for c in GEM_COLORS)
lines.append(f"Current planned discards: {{ {disc_str} }}")
lines.append(
"Type a color name or abbreviation (e.g. 'blue' or 'b') to discard one token.",
)
if self.message:
lines.append("")
lines.append(f"[bold red]Message:[/] {self.message}")
self.query_one("#prompt", Static).update("\n".join(lines))
def on_input_submitted(self, event: Input.Submitted) -> None: # type: ignore[override]
"""Handle user input."""
raw = (event.value or "").strip()
event.input.value = ""
if not raw:
return
try:
color = parse_color_token(raw)
except ValueError:
self.message = f"Unknown color: {raw}"
self._update_prompt()
return
available = self.player.tokens[color] - self.discards[color]
if available <= 0:
self.message = f"No more {color} tokens available to discard."
self._update_prompt()
return
self.discards[color] += 1
if self._remaining_to_discard() <= 0:
self.exit()
return
self.message = ""
self._update_prompt()
# ---------------------------------------------------------------------------
# Noble choice app
# ---------------------------------------------------------------------------
class NobleChoiceApp(App[None]):
"""Textual app to choose one noble."""
CSS = """
Screen {
layout: vertical;
}
#command_zone {
padding: 1 1;
}
#board {
padding: 0 1 1 1;
}
Input {
border: round $accent;
}
"""
def __init__(
self,
game: GameState,
player: PlayerState,
nobles: list[Noble],
) -> None:
"""Initialize the noble choice app."""
super().__init__()
self.game = game
self.player = player
self.nobles = nobles
self.result: Noble | None = None
self.message: str = ""
def compose(self) -> ComposeResult: # type: ignore[override]
"""Compose the noble choice app."""
yield Header(show_clock=False)
with Vertical(id="command_zone"):
yield Input(
placeholder="Enter noble index, e.g. '0'",
id="input_line",
)
yield Static("", id="prompt")
# Board directly under the command zone
yield Board(self.game, self.player, id="board")
yield Footer()
def on_mount(self) -> None: # type: ignore[override]
"""Mount the noble choice app."""
self._update_prompt()
self.query_one(Input).focus()
def _update_prompt(self) -> None:
lines: list[str] = []
lines.append("[bold underline]You qualify for nobles:[/]")
for i, noble in enumerate(self.nobles):
lines.append(f" [bright_cyan]{i})[/] {format_noble(noble)}")
lines.append("Enter the index of the noble you want.")
if self.message:
lines.append("")
lines.append(f"[bold red]Message:[/] {self.message}")
self.query_one("#prompt", Static).update("\n".join(lines))
def on_input_submitted(self, event: Input.Submitted) -> None: # type: ignore[override]
"""Handle user input."""
raw = (event.value or "").strip()
event.input.value = ""
if not raw:
return
try:
idx = int(raw)
except ValueError:
self.message = "Please enter a valid integer index."
self._update_prompt()
return
if not (0 <= idx < len(self.nobles)):
self.message = "Index out of range."
self._update_prompt()
return
self.result = self.nobles[idx]
self.exit()
class TuiHuman(Strategy):
"""Textual-based human player Strategy with colorful board."""
def choose_action(
self,
game: GameState,
player: PlayerState,
) -> Action | None:
"""Choose an action for the player."""
if not sys.stdout.isatty():
return None
app = ActionApp(game, player)
app.run()
return app.result
def choose_discard(
self,
game: GameState,
player: PlayerState,
excess: int, # noqa: ARG002
) -> dict[GemColor, int]:
"""Choose tokens to discard."""
if not sys.stdout.isatty():
return dict.fromkeys(GEM_COLORS, 0)
app = DiscardApp(game, player)
app.run()
return app.discards
def choose_noble(
self,
game: GameState,
player: PlayerState,
nobles: list[Noble],
) -> Noble:
"""Choose a noble for the player."""
if not sys.stdout.isatty():
return nobles[0]
app = NobleChoiceApp(game, player, nobles)
app.run()
return app.result

View File

@@ -1,19 +0,0 @@
"""Main entry point for Splendor game."""
from __future__ import annotations
from .base import new_game, run_game
from .bot import RandomBot
from .human import TuiHuman
def main() -> None:
"""Main entry point."""
human = TuiHuman()
bot = RandomBot()
game_state = new_game(["You", "Bot A"])
run_game(game_state, [human, bot])
if __name__ == "__main__":
main()

View File

@@ -1,111 +0,0 @@
"""Public state for RL/search."""
from __future__ import annotations
from dataclasses import dataclass
from .base import (
BASE_COLORS,
BASE_INDEX,
GEM_ORDER,
Card,
GameState,
Noble,
PlayerState,
)
@dataclass(frozen=True)
class ObsCard:
"""Numeric-ish card view for RL/search."""
tier: int
points: int
color_index: int
cost: list[int]
@dataclass(frozen=True)
class ObsNoble:
"""Numeric-ish noble view for RL/search."""
points: int
requirements: list[int]
@dataclass(frozen=True)
class ObsPlayer:
"""Numeric-ish player view for RL/search."""
tokens: list[int]
discounts: list[int]
score: int
cards: list[ObsCard]
reserved: list[ObsCard]
nobles: list[ObsNoble]
@dataclass(frozen=True)
class Observation:
"""Full public state for RL/search."""
current_player: int
bank: list[int]
players: list[ObsPlayer]
table_by_tier: dict[int, list[ObsCard]]
decks_remaining: dict[int, int]
available_nobles: list[ObsNoble]
def _encode_card(card: Card) -> ObsCard:
color_index = BASE_INDEX.get(card.color, -1)
cost_vec = [card.cost.get(c, 0) for c in BASE_COLORS]
return ObsCard(
tier=card.tier,
points=card.points,
color_index=color_index,
cost=cost_vec,
)
def _encode_noble(noble: Noble) -> ObsNoble:
req_vec = [noble.requirements.get(c, 0) for c in BASE_COLORS]
return ObsNoble(
points=noble.points,
requirements=req_vec,
)
def _encode_player(player: PlayerState) -> ObsPlayer:
tokens_vec = [player.tokens[c] for c in GEM_ORDER]
discounts_vec = [player.discounts[c] for c in GEM_ORDER]
cards_enc = [_encode_card(c) for c in player.cards]
reserved_enc = [_encode_card(c) for c in player.reserved]
nobles_enc = [_encode_noble(n) for n in player.nobles]
return ObsPlayer(
tokens=tokens_vec,
discounts=discounts_vec,
score=player.score,
cards=cards_enc,
reserved=reserved_enc,
nobles=nobles_enc,
)
def to_observation(game: GameState) -> Observation:
"""Create a structured observation of the full public state."""
bank_vec = [game.bank[c] for c in GEM_ORDER]
players_enc = [_encode_player(p) for p in game.players]
table_enc: dict[int, list[ObsCard]] = {
tier: [_encode_card(c) for c in row] for tier, row in game.table_by_tier.items()
}
decks_remaining = {tier: len(deck) for tier, deck in game.decks_by_tier.items()}
nobles_enc = [_encode_noble(n) for n in game.available_nobles]
return Observation(
current_player=game.current_player_index,
bank=bank_vec,
players=players_enc,
table_by_tier=table_enc,
decks_remaining=decks_remaining,
available_nobles=nobles_enc,
)

View File

@@ -1,36 +0,0 @@
"""Simulate a step in the game."""
from __future__ import annotations
import copy
from .base import Action, GameState, PlayerState, apply_action, check_nobles_for_player
from .bot import RandomBot
class SimStrategy(RandomBot):
"""Strategy used in simulate_step.
We never call choose_action here (caller chooses actions),
but we reuse discard/noble-selection logic.
"""
def choose_action(self, game: GameState, player: PlayerState) -> Action | None: # noqa: ARG002
"""Choose an action for the current player."""
msg = "SimStrategy.choose_action should not be used in simulate_step"
raise RuntimeError(msg)
def simulate_step(game: GameState, action: Action) -> GameState:
"""Return a deep-copied next state after applying action for the current player.
Useful for tree search / MCTS:
next_state = simulate_step(state, action)
"""
next_state = copy.deepcopy(game)
sim_strategy = SimStrategy()
apply_action(next_state, sim_strategy, action)
check_nobles_for_player(next_state, sim_strategy, next_state.current_player)
next_state.next_player()
return next_state

View File

@@ -1,50 +0,0 @@
"""Simulator for Splendor game."""
from __future__ import annotations
from collections import defaultdict
from pathlib import Path
from statistics import mean
from .base import GameConfig, load_cards, load_nobles, new_game, run_game
from .bot import PersonalizedBot4, RandomBot
def main() -> None:
"""Main entry point."""
turn_limit = 1000
good_games = 0
games = 1
winners: dict[str, list] = defaultdict(list)
game_data = Path(__file__).parent / "game_data"
cards = load_cards(game_data / "cards/default.json")
nobles = load_nobles(game_data / "nobles/default.json")
for _ in range(games):
bot_a = RandomBot("bot_a")
bot_b = RandomBot("bot_b")
bot_c = RandomBot("bot_c")
bot_d = PersonalizedBot4("my_bot")
config = GameConfig(
cards=cards,
nobles=nobles,
turn_limit=turn_limit,
)
players = (bot_a, bot_b, bot_c, bot_d)
game_state = new_game(players, config)
winner, turns = run_game(game_state)
if turns < turn_limit:
good_games += 1
winners[winner.strategy.name].append(turns)
print(
f"out of {games} {turn_limit} turn games with {len(players)}"
f"random bots there where {good_games} games where a bot won"
)
for name, turns in winners.items():
print(f"{name} won {len(turns)} games in {mean(turns):.2f} turns")
if __name__ == "__main__":
main()

161
python/tools/fix_eval_warnings.py Executable file
View File

@@ -0,0 +1,161 @@
#!/usr/bin/env python3
"""fix_eval_warnings."""
from __future__ import annotations
import logging
import os
from dataclasses import dataclass
from pathlib import Path
import requests
import typer
from python.common import configure_logger
logger = logging.getLogger(__name__)
@dataclass
class Config:
"""Configuration for the script.
Attributes:
github_token (str): GitHub token for API authentication.
model_name (str): The name of the LLM model to use. Defaults to "gpt-4o".
api_base (str): The base URL for the GitHub Models API.
Defaults to "https://models.inference.ai.azure.com".
"""
github_token: str
model_name: str = "gpt-4o"
api_base: str = "https://models.inference.ai.azure.com"
def get_log_content(run_id: str) -> None:
"""Fetch the logs for a specific workflow run.
Args:
run_id (str): The run ID.
"""
logger.info(f"Fetching logs for run ID: {run_id}")
# List artifacts to find logs (or use jobs API)
# For simplicity, we might need to use 'gh' cli in the workflow to download logs
# But let's try to read from a file if passed as argument, which is easier for the workflow
def parse_warnings(log_file_path: Path) -> list[str]:
"""Parse the log file for evaluation warnings.
Args:
log_file_path (Path): The path to the log file.
Returns:
list[str]: A list of warning messages.
"""
warnings = []
with log_file_path.open(encoding="utf-8", errors="ignore") as f:
warnings.extend(line.strip() for line in f if "evaluation warning:" in line)
return warnings
def generate_fix(warning_msg: str, config: Config) -> str | None:
"""Call GitHub Models to generate a fix for the warning.
Args:
warning_msg (str): The warning message.
config (Config): The configuration object.
Returns:
Optional[str]: The suggested fix or None.
"""
logger.info(f"Generating fix for: {warning_msg}")
prompt = f"""
I encountered the following Nix evaluation warning:
`{warning_msg}`
Please explain what this warning means and suggest how to fix it in the Nix code.
If possible, provide the exact code change in a diff format or a clear description of what to change.
"""
headers = {"Content-Type": "application/json", "Authorization": f"Bearer {config.github_token}"}
payload = {
"messages": [
{"role": "system", "content": "You are an expert NixOS and Nix language developer."},
{"role": "user", "content": prompt},
],
"model": config.model_name,
"temperature": 0.1,
}
try:
response = requests.post(f"{config.api_base}/chat/completions", headers=headers, json=payload, timeout=30)
response.raise_for_status()
result = response.json()
return result["choices"][0]["message"]["content"] # type: ignore[no-any-return]
except Exception:
logger.exception("Error calling LLM")
return None
def main(
log_file: Path = typer.Argument(..., help="Path to the build log file"), # noqa: B008
model_name: str = typer.Option("gpt-4o", envvar="MODEL_NAME", help="LLM Model Name"),
) -> None:
"""Detect evaluation warnings in logs and suggest fixes using GitHub Models.
Args:
log_file (Path): Path to the build log file containing evaluation warnings.
model_name (str): The name of the LLM model to use for generating fixes.
Defaults to "gpt-4o", can be overridden by MODEL_NAME environment variable.
"""
configure_logger()
github_token = os.environ.get("GITHUB_TOKEN")
if not github_token:
logger.warning("GITHUB_TOKEN not set. LLM calls will fail.")
config = Config(github_token=github_token or "", model_name=model_name)
if not log_file.exists():
logger.error(f"Log file not found: {log_file}")
raise typer.Exit(code=1)
warnings = parse_warnings(log_file)
if not warnings:
logger.info("No evaluation warnings found.")
raise typer.Exit(code=0)
logger.info(f"Found {len(warnings)} warnings.")
# Process unique warnings to save tokens
unique_warnings = list(set(warnings))
fixes = []
for warning in unique_warnings:
if not config.github_token:
logger.warning("Skipping LLM call due to missing GITHUB_TOKEN")
continue
fix = generate_fix(warning, config)
if fix:
fixes.append(f"## Warning\n`{warning}`\n\n## Suggested Fix\n{fix}\n")
# Output fixes to a markdown file for the PR body
if fixes:
with Path("fix_suggestions.md").open("w") as f:
f.write("# Automated Fix Suggestions\n\n")
f.write("\n---\n".join(fixes))
logger.info("Fix suggestions written to fix_suggestions.md")
else:
logger.info("No fixes generated.")
app = typer.Typer()
app.command()(main)
if __name__ == "__main__":
app()

View File

@@ -16,6 +16,7 @@
"Qihoo360-Light-R1-32B"
];
models = "/zfs/models";
acceleration = "cuda";
openFirewall = true;
};
# open-webui = {

View File

@@ -60,20 +60,16 @@
extraPackages =
python3Packages: with python3Packages; [
aioesphomeapi # for esphome
aiounifi # for ubiquiti integration
bleak-esphome # for esphome
esphome-dashboard-api # for esphome
forecast-solar # for solar forecast
gtts # not sure what wants this
ical # for todo
jellyfin-apiclient-python # for jellyfin
paho-mqtt # for mqtt
psycopg2 # for postgresql
py-improv-ble-client # for esphome
pymodbus # for modbus
pyopenweathermap # for weather
uiprotect # for ubiquiti integration
unifi-discovery # for ubiquiti integration
];
extraComponents = [ "isal" ];
};

View File

@@ -10,7 +10,7 @@ sensor:
max_sub_interval:
minutes: 5
- platform: integration
source: sensor.batteries_jk0_discharging_power
source: sensor.batteries_jk0_charging_power
name: "JK0 energy out"
unique_id: jk0_energy_out_kwh
unit_prefix: k
@@ -30,7 +30,7 @@ sensor:
max_sub_interval:
minutes: 5
- platform: integration
source: sensor.battery1_jk1_discharging_power
source: sensor.battery1_jk1_discharge_power
name: "JK1 energy out"
unique_id: jk1_energy_out_kwh
unit_prefix: k

View File

@@ -17,6 +17,7 @@ in
./services
./hardware.nix
./networking.nix
./nvidia.nix
./programs.nix
./runners
./syncthing.nix

View File

@@ -0,0 +1,60 @@
{
config,
pkgs,
...
}:
let
vars = import ../vars.nix;
in
{
# environment.systemPackages = with pkgs; [ php.withExtensions ({ all, ... }: [ all.pdo_pgsql ]) ];
services.httpd = {
enable = true;
adminAddr = "webmaster@localhost";
enablePHP = true;
phpPackage = pkgs.php.withExtensions (
{ enabled, all }:
enabled
++ [
all.pdo
all.pdo_pgsql
]
);
extraModules = [ "rewrite" ];
virtualHosts.great_cloud_of_witnesses = {
hostName = "localhost";
listen = [
{
ip = "*";
port = 8092;
}
];
documentRoot = "${vars.services}/great_cloud_of_witnesses";
extraConfig = ''
<Directory "${vars.services}/great_cloud_of_witnesses">
AllowOverride All
Require all granted
</Directory>
'';
};
};
sops.secrets.gcw_password = {
sopsFile = ../../../users/secrets.yaml;
neededForUsers = true;
};
users = {
users.gcw = {
isSystemUser = true;
hashedPasswordFile = config.sops.secrets.gcw_password.path;
group = "gcw";
};
groups.gcw = { };
};
}

View File

@@ -0,0 +1,47 @@
let
vars = import ../vars.nix;
in
{
networking.firewall = {
allowedTCPPorts = [
6882
8081
8118
];
allowedUDPPorts = [ 6882 ];
};
virtualisation.oci-containers.containers.qbitvpn = {
image = "binhex/arch-qbittorrentvpn:5.0.3-1-01";
devices = [ "/dev/net/tun:/dev/net/tun" ];
extraOptions = [ "--cap-add=NET_ADMIN" ];
ports = [
"6882:6881"
"6882:6881/udp"
"8081:8081"
"8118:8118"
];
volumes = [
"${vars.docker_configs}/qbitvpn:/config"
"${vars.qbitvpn}:/data"
"${vars.qbitvpn_scratch}:/data/incomplete"
"/etc/localtime:/etc/localtime:ro"
];
environment = {
WEBUI_PORT = "8081";
PUID = "600";
PGID = "100";
VPN_ENABLED = "yes";
VPN_CLIENT = "openvpn";
STRICT_PORT_FORWARD = "yes";
ENABLE_PRIVOXY = "yes";
LAN_NETWORK = "192.168.90.0/24";
NAME_SERVERS = "1.1.1.1,1.0.0.1";
UMASK = "000";
DEBUG = "false";
DELUGE_DAEMON_LOG_LEVEL = "debug";
DELUGE_WEB_LOG_LEVEL = "debug";
};
environmentFiles = [ "${vars.secrets}/docker/qbitvpn" ];
autoStart = true;
};
}

View File

@@ -61,7 +61,31 @@ in
"luks-root-pool-wwn-0x55cd2e4150f01556-part2" =
makeLuksSSD "/dev/disk/by-id/wwn-0x55cd2e4150f01556-part2";
# Media pool
"luks-media_pool-nvme-INTEL_SSDPEK1A118GA_BTOC14120V2J118B-part1" =
makeLuksSSD "/dev/disk/by-id/nvme-INTEL_SSDPEK1A118GA_BTOC14120V2J118B-part1";
"luks-media_pool-nvme-INTEL_SSDPEK1A118GA_BTOC14120WAG118B-part1" =
makeLuksSSD "/dev/disk/by-id/nvme-INTEL_SSDPEK1A118GA_BTOC14120WAG118B-part1";
"luks-media_pool-nvme-INTEL_SSDPE2ME012T4_CVMD5130000G1P2HGN-part1" =
makeLuksSSD "/dev/disk/by-id/nvme-INTEL_SSDPE2ME012T4_CVMD5130000G1P2HGN-part1";
"luks-media_pool-nvme-INTEL_SSDPE2ME012T4_CVMD5130000U1P2HGN-part1" =
makeLuksSSD "/dev/disk/by-id/nvme-INTEL_SSDPE2ME012T4_CVMD5130000U1P2HGN-part1";
# Scratch pool
"luks-scratch-pool-ata-CT480BX500SSD1_2314E6C3C01C-part1" =
makeLuksSSD "/dev/disk/by-id/ata-CT480BX500SSD1_2314E6C3C01C-part1";
"luks-scratch-pool-ata-CT480BX500SSD1_2314E6C3C01E-part1" =
makeLuksSSD "/dev/disk/by-id/ata-CT480BX500SSD1_2314E6C3C01E-part1";
# Storage pool
"luks-storage_pool-nvme-Samsung_SSD_970_EVO_Plus_2TB_S6S2NS0T834822N-part1" =
makeLuksSSD "/dev/disk/by-id/nvme-Samsung_SSD_970_EVO_Plus_2TB_S6S2NS0T834822N-part1";
"luks-storage_pool-nvme-Samsung_SSD_970_EVO_Plus_2TB_S6S2NS0T834817F-part1" =
makeLuksSSD "/dev/disk/by-id/nvme-Samsung_SSD_970_EVO_Plus_2TB_S6S2NS0T834817F-part1";
"luks-storage_pool-nvme-INTEL_MEMPEK1W016GA_PHBT828104DF016D-part1" =
makeLuksSSD "/dev/disk/by-id/nvme-INTEL_MEMPEK1W016GA_PHBT828104DF016D-part1";
"luks-storage_pool-nvme-INTEL_MEMPEK1W016GA_PHBT828105A8016D-part1" =
makeLuksSSD "/dev/disk/by-id/nvme-INTEL_MEMPEK1W016GA_PHBT828105A8016D-part1";
"luks-storage_pool-wwn-0x5000cca23bc438dd-part1" =
makeLuksDevice "/dev/disk/by-id/wwn-0x5000cca23bc438dd-part1";
"luks-storage_pool-wwn-0x5000cca23bd035f5-part1" =

View File

@@ -2,71 +2,31 @@
networking = {
hostName = "jeeves";
hostId = "0e15ce35";
firewall = {
enable = true;
interfaces.br-nix-builder = {
allowedTCPPorts = [ ];
allowedUDPPorts = [ ];
};
};
firewall.enable = true;
useNetworkd = true;
};
systemd.network = {
enable = true;
wait-online = {
enable = false;
anyInterface = true;
};
netdevs = {
"20-br-nix-builder" = {
netdevConfig = {
Kind = "bridge";
Name = "br-nix-builder";
};
};
"30-internet-vlan" = {
netdevConfig = {
Kind = "vlan";
Name = "internet-vlan";
};
vlanConfig.Id = 100;
};
};
networks = {
"10-1GB_Primary" = {
matchConfig.Name = "enp97s0f1";
matchConfig.Name = "enp98s0f0";
address = [ "192.168.99.14/24" ];
routes = [ { Gateway = "192.168.99.1"; } ];
vlan = [ "internet-vlan" ];
linkConfig.RequiredForOnline = "routable";
};
"50-internet-vlan" = {
matchConfig.Name = "internet-vlan";
bridge = [ "br-nix-builder" ];
linkConfig.RequiredForOnline = "no";
"10-1GB_Secondary" = {
matchConfig.Name = "enp98s0f1";
DHCP = "yes";
};
"60-br-nix-builder" = {
matchConfig.Name = "br-nix-builder";
bridgeConfig = { };
address = [ "192.168.3.10/24" ];
routingPolicyRules = [
{
From = "192.168.3.0/24";
Table = 100;
Priority = 100;
}
];
routes = [
{
Gateway = "192.168.3.1";
Table = 100;
GatewayOnLink = false;
Metric = 2048;
PreferredSource = "192.168.3.10";
}
];
linkConfig.RequiredForOnline = "no";
"10-10GB_Primary" = {
matchConfig.Name = "enp97s0f0np0";
DHCP = "yes";
linkConfig.RequiredForOnline = "routable";
};
"10-10GB_Secondary" = {
matchConfig.Name = "enp97s0f1np1";
DHCP = "yes";
};
};
};

16
systems/jeeves/nvidia.nix Normal file
View File

@@ -0,0 +1,16 @@
{ config, ... }:
{
nixpkgs.config.cudaSupport = true;
services.xserver.videoDrivers = [ "nvidia" ];
hardware = {
nvidia = {
modesetting.enable = true;
powerManagement.enable = true;
package = config.boot.kernelPackages.nvidiaPackages.beta;
nvidiaSettings = true;
open = false;
};
nvidia-container-toolkit.enable = true;
};
}

View File

@@ -16,20 +16,11 @@
};
services.nix_builder.containers = {
nix-builder-00.enable = true;
nix-builder-01.enable = true;
nix-builder-02.enable = true;
nix-builder-03.enable = true;
nix-builder-04.enable = true;
nix-builder-05.enable = true;
nix-builder-06.enable = true;
nix-builder-07.enable = true;
nix-builder-08.enable = true;
nix-builder-09.enable = true;
nix-builder-10.enable = true;
nix-builder-11.enable = true;
nix-builder-12.enable = true;
nix-builder-13.enable = true;
nix-builder-14.enable = true;
nix-builder-0.enable = true;
nix-builder-1.enable = true;
nix-builder-2.enable = true;
nix-builder-3.enable = true;
nix-builder-4.enable = true;
nix-builder-5.enable = true;
};
}

View File

@@ -6,19 +6,12 @@
}:
with lib;
let
vars = import ../vars.nix;
cfg = config.services.nix_builder;
in
{
options.services.nix_builder = {
bridgeName = mkOption {
type = types.str;
default = "br-nix-builder";
description = "Bridge name for the builder containers.";
};
containers = mkOption {
options.services.nix_builder.containers = mkOption {
type = types.attrsOf (
types.submodule (
{ name, ... }:
@@ -30,32 +23,18 @@ in
default = { };
description = "GitHub runner container configurations";
};
};
config = {
containers = mapAttrs (
name: containerCfg:
mkIf containerCfg.enable {
config.containers = mapAttrs (
name: cfg:
mkIf cfg.enable {
autoStart = true;
privateNetwork = true;
hostBridge = cfg.bridgeName;
ephemeral = true;
bindMounts = {
storage = {
hostPath = "/zfs/media/github-runners/${name}";
"/storage" = {
mountPoint = "/zfs/media/github-runners/${name}";
isReadOnly = false;
};
host-nix = {
mountPoint = "/host-nix/var/nix/daemon-socket";
hostPath = "/nix/var/nix/daemon-socket";
isReadOnly = false;
};
pat = {
hostPath = "${vars.secrets}/services/github-runners/runner_pat";
mountPoint = "${vars.secrets}/services/github-runners/runner_pat";
isReadOnly = true;
};
"/secrets".mountPoint = "${vars.secrets}/services/github-runners/${name}";
"ssh-keys".mountPoint = "${vars.secrets}/services/github-runners/id_ed25519_github-runners";
};
config =
{
@@ -65,12 +44,6 @@ in
...
}:
{
networking = {
useDHCP = lib.mkDefault true;
interfaces.eth0.useDHCP = true;
# Ensure containers don't inherit the host's stub resolver (127.0.0.53) which was causing issues
useHostResolvConf = false;
};
nix.settings = {
trusted-substituters = [
"https://cache.nixos.org"
@@ -91,13 +64,16 @@ in
"flakes"
"nix-command"
];
sandbox = true;
allowed-users = [ "github-runners" ];
trusted-users = [
"root"
"github-runners"
];
};
programs.ssh.extraConfig = ''
Host jeeves
Port 629
User github-runners
HostName jeeves
IdentityFile ${vars.secrets}/services/github-runners/id_ed25519_github-runners
StrictHostKeyChecking no
UserKnownHostsFile /dev/null
'';
nixpkgs = {
overlays = builtins.attrValues outputs.overlays;
config.allowUnfree = true;
@@ -108,12 +84,13 @@ in
workDir = "/zfs/media/github-runners/${name}";
url = "https://github.com/RichieCahill/dotfiles";
extraLabels = [ "nixos" ];
tokenFile = "${vars.secrets}/services/github-runners/runner_pat";
tokenFile = "${vars.secrets}/services/github-runners/${name}";
user = "github-runners";
group = "github-runners";
extraPackages = with pkgs; [
nixfmt-rfc-style
nixos-rebuild
openssh
treefmt
my_python
];
@@ -127,9 +104,8 @@ in
};
groups.github-runners.gid = 601;
};
system.stateVersion = "24.05";
system.stateVersion = "24.11";
};
}
) cfg.containers;
};
) config.services.nix_builder.containers;
}

View File

@@ -12,30 +12,31 @@ sudo zpool add storage -o ashift=12 special mirror
sudo zpool add storage -o ashift=12 logs mirror
# scratch
sudo zpool create scratch -o ashift=12 -O acltype=posixacl -O atime=off -O dnodesize=auto -O xattr=sa -O compression=zstd -O encryption=aes-256-gcm -O keyformat=hex -O keylocation=file:///key -m /zfs/scratch
sudo zpool create -o ashift=12 -O acltype=posixacl -O atime=off -O dnodesize=auto -O xattr=sa -O compression=zstd -m /zfs/scratch scratch
# media datasets
sudo zfs create media/secure -o encryption=aes-256-gcm -o keyformat=hex -o keylocation=file:///root/zfs.key
sudo zfs create media/secure/docker -o compression=zstd-9
sudo zfs create media/secure/github-runners -o compression=zstd-9 -o sync=disabled
sudo zfs create media/secure/home_assistant -o compression=zstd-19
sudo zfs create media/secure/notes -o copies=2
sudo zfs create media/secure/postgres -o recordsize=16k -o primarycache=metadata
sudo zfs create media/secure/services -o compression=zstd-9
sudo zfs create media/secure/share -o mountpoint=/zfs/media/share -o exec=off
sudo zfs create -o compression=zstd-9 media/docker
sudo zfs create -o compression=zstd-9 -o sync=disabled media/github-runners
sudo zfs create -o copies=3 media/notes
sudo zfs create -o compression=zstd-9 media/plex
sudo zfs create -o compression=zstd-9 media/services
sudo zfs create -o compression=zstd-19 media/home_assistant
sudo zfs create -o exec=off media/share
sudo zfs create -o recordsize=16k -o primarycache=metadata -o mountpoint=/zfs/media/database/postgres media/postgres
# scratch datasets
sudo zfs create scratch/kafka -o mountpoint=/zfs/scratch/kafka -o recordsize=1M
sudo zfs create scratch/transmission -o mountpoint=/zfs/scratch/transmission -o recordsize=16k -o sync=disabled
sudo zfs create -o recordsize=16k -o sync=disabled scratch/qbitvpn
sudo zfs create -o recordsize=16k -o sync=disabled scratch/transmission
sudo zfs create -o recordsize=1M scratch/kafka
# storage datasets
sudo zfs create storage/ollama -o recordsize=1M -o compression=zstd-19 -o sync=disabled
sudo zfs create storage/secure -o encryption=aes-256-gcm -o keyformat=hex -o keylocation=file:///root/zfs.key
sudo zfs create storage/secure/archive -o recordsize=1M -o compression=zstd-19
sudo zfs create storage/secure/library -o recordsize=1M -o compression=zstd-19
sudo zfs create storage/secure/main -o compression=zstd-19
sudo zfs create storage/secure/photos -o recordsize=16K -o compression=zstd-19 -o copies=2
sudo zfs create storage/secure/plex -o recordsize=1M -o compression=zstd-19
sudo zfs create storage/secure/secrets -o compression=zstd-19 -o copies=3
sudo zfs create storage/secure/syncthing -o compression=zstd-19
sudo zfs create storage/secure/transmission -o recordsize=1M -o compression=zstd-9 -o exec=off -o sync=disabled
sudo zfs create -o recordsize=1M -o compression=zstd-19 storage/archive
sudo zfs create -o compression=zstd-19 storage/main
sudo zfs create -o recordsize=16K -o compression=zstd-19 -o copies=2 storage/photos
sudo zfs create -o recordsize=1M -o compression=zstd-19 storage/plex
sudo zfs create -o compression=zstd-19 -o copies=3 storage/secrets
sudo zfs create -o compression=zstd-19 storage/syncthing
sudo zfs create -o recordsize=1M -o compression=zstd-9 -o exec=off -o sync=disabled storage/qbitvpn
sudo zfs create -o recordsize=1M -o compression=zstd-9 -o exec=off -o sync=disabled storage/transmission
sudo zfs create -o recordsize=1M -o compression=zstd-19 storage/library
sudo zfs create -o recordsize=1M -o compression=zstd-19 -o sync=disabled storage/ollama

View File

@@ -0,0 +1,21 @@
{
pkgs,
...
}:
let
vars = import ../vars.nix;
in
{
systemd.services.filebrowser = {
description = "filebrowser";
after = [ "network.target" ];
wantedBy = [ "multi-user.target" ];
serviceConfig = {
Type = "simple";
User = "richie";
Group = "users";
ExecStart = "${pkgs.filebrowser}/bin/filebrowser --root=/zfs --address=0.0.0.0 --database=${vars.docker_configs}/filebrowser/filebrowser.db";
Restart = "on-failure";
};
};
}

View File

@@ -1,46 +0,0 @@
let
vars = import ../vars.nix;
in
{
networking.firewall.allowedTCPPorts = [ 6443 ];
services.gitea = {
enable = true;
appName = "TMM Workshop";
stateDir = "${vars.services}/gitea/";
lfs.enable = true;
database = {
type = "postgres";
name = "gitea";
user = "gitea";
socket = "/run/postgresql";
port = 5432;
createDatabase = false;
};
settings = {
service.DISABLE_REGISTRATION = true;
server = {
DOMAIN = "tmmworkshop.com";
ROOT_URL = "https://gitea.tmmworkshop.com/";
HTTP_PORT = 6443;
SSH_PORT = 2223;
SSH_LISTEN_PORT = 2224;
START_SSH_SERVER = true;
PUBLIC_URL_DETECTION = "auto";
};
repository = {
ENABLE_PUSH_CREATE_USER = true;
DEFAULT_MERGE_STYLE = "rebase-merge";
};
log = {
LEVEL = "Trace";
ENABLE_SSH_LOG = true;
};
};
};
systemd.services.gitea = {
requires = [ "docker.service" ];
after = [ "docker.service" ];
};
}

View File

@@ -27,21 +27,21 @@ frontend ContentSwitching
# tmmworkshop.com
acl host_audiobookshelf hdr(host) -i audiobookshelf.tmmworkshop.com
acl host_cache hdr(host) -i cache.tmmworkshop.com
acl host_filebrowser hdr(host) -i filebrowser.tmmworkshop.com
acl host_homeassistant hdr(host) -i homeassistant.tmmworkshop.com
acl host_jellyfin hdr(host) -i jellyfin.tmmworkshop.com
acl host_share hdr(host) -i share.tmmworkshop.com
acl host_gcw hdr(host) -i gcw.tmmworkshop.com
acl host_n8n hdr(host) -i n8n.tmmworkshop.com
acl host_gitea hdr(host) -i gitea.tmmworkshop.com
use_backend audiobookshelf_nodes if host_audiobookshelf
use_backend cache_nodes if host_cache
use_backend filebrowser_nodes if host_filebrowser
use_backend homeassistant_nodes if host_homeassistant
use_backend jellyfin if host_jellyfin
use_backend share_nodes if host_share
use_backend gcw_nodes if host_gcw
use_backend n8n if host_n8n
use_backend gitea if host_gitea
backend audiobookshelf_nodes
mode http
@@ -51,6 +51,10 @@ backend cache_nodes
mode http
server server 127.0.0.1:5000
backend filebrowser_nodes
mode http
server server 127.0.0.1:8080
backend homeassistant_nodes
mode http
server server 192.168.90.35:8123
@@ -73,7 +77,3 @@ backend gcw_nodes
backend n8n
mode http
server server 127.0.0.1:5678
backend gitea
mode http
server server 127.0.0.1:6443

View File

@@ -9,25 +9,13 @@ in
host = "0.0.0.0";
loadModels = [
"codellama:7b"
"deepscaler:1.5b"
"deepseek-r1:14b"
"deepseek-r1:32b"
"deepseek-r1:8b"
"devstral-small-2:24b"
"functiongemma:270m"
"gemma3:12b"
"gemma3:27b"
"gpt-oss:120b"
"gpt-oss:20b"
"llama3.1:70b"
"llama3.1:8b"
"llama3.2:1b"
"llama3.2:3b"
"magistral:24b"
"ministral-3:14b"
"nemotron-3-nano:30b"
"qwen3-coder:30b"
"qwen3-vl:32b"
"qwen3:14b"
"qwen3:30b"
];

View File

@@ -1,16 +0,0 @@
let
vars = import ../vars.nix;
in
{
services.open-webui = {
stateDir = "${vars.services}/open_webui/";
enable = true;
openFirewall = true;
environment = {
ANONYMIZED_TELEMETRY = "False";
DO_NOT_TRACK = "True";
SCARF_NO_ANALYTICS = "True";
OLLAMA_API_BASE_URL = "http://127.0.0.1:11434";
};
};
}

View File

@@ -28,7 +28,25 @@ in
#type database DBuser origin-address auth-method
local hass hass trust
local gitea gitea trust
# ipv4
host hass hass 192.168.90.1/24 trust
host hass hass 127.0.0.1/32 trust
# ipv6
host hass hass ::1/128 trust
# megan
host megan megan 192.168.90.1/24 trust
host megan megan 127.0.0.1/32 trust
host gcw megan 192.168.90.1/24 trust
host gcw megan 127.0.0.1/32 trust
# gcw
local gcw gcw trust
host gcw gcw 192.168.90.1/24 trust
host gcw gcw 127.0.0.1/32 trust
# math
local postgres math trust
@@ -79,7 +97,17 @@ in
};
}
{
name = "gitea";
name = "megan";
ensureDBOwnership = true;
ensureClauses = {
login = true;
createrole = true;
createdb = true;
replication = true;
};
}
{
name = "gcw";
ensureDBOwnership = true;
ensureClauses = {
login = true;
@@ -100,9 +128,12 @@ in
}
];
ensureDatabases = [
"gcw"
"hass"
"gitea"
"math"
"megan"
"mxr_dev"
"mxr_prod"
"n8n"
"richie"
];

View File

@@ -3,7 +3,9 @@ services = [
"audiobookshelf",
"cloud_flare_tunnel",
"haproxy",
"docker-qbitvpn",
"docker",
"filebrowser",
"home-assistant",
"jellyfin",
]

View File

@@ -64,12 +64,24 @@ hourly = 12
daily = 14
monthly = 2
["scratch/qbitvpn"]
15_min = 0
hourly = 0
daily = 0
monthly = 0
["scratch/transmission"]
15_min = 0
hourly = 0
daily = 0
monthly = 0
["storage/qbitvpn"]
15_min = 0
hourly = 0
daily = 0
monthly = 0
["storage/transmission"]
15_min = 0
hourly = 0

View File

@@ -10,6 +10,8 @@ in
docker_configs = "${zfs_media}/docker/configs";
home_assistant = "${zfs_media}/home_assistant";
notes = "${zfs_media}/notes";
qbitvpn = "${zfs_storage}/qbitvpn";
qbitvpn_scratch = "${zfs_scratch}/qbitvpn";
secrets = "${zfs_storage}/secrets";
services = "${zfs_media}/services";
share = "${zfs_media}/share";

View File

@@ -13,7 +13,6 @@
./hardware.nix
./llms.nix
./syncthing.nix
./qmk.nix
inputs.nixos-hardware.nixosModules.framework-13-7040-amd
];

View File

@@ -1,34 +0,0 @@
{ pkgs, ... }:
{
environment.systemPackages = with pkgs; [
qmk
vial
];
services = {
udev = {
packages = [ pkgs.qmk-udev-rules ];
extraRules = ''
# Keychron keyboards
KERNEL=="hidraw*", ATTRS{idVendor}=="3434", MODE="0660", GROUP="plugdev"
SUBSYSTEM=="usb", ATTR{idVendor}=="3434", MODE="0660", GROUP="plugdev"
# Some boards use 32f0 as vendor id
KERNEL=="hidraw*", ATTRS{idVendor}=="32f0", MODE="0660", GROUP="plugdev"
SUBSYSTEM=="usb", ATTR{idVendor}=="32f0", MODE="0660", GROUP="plugdev"
# Keychron HID device permissions
SUBSYSTEM=="usb", ATTR{idVendor}=="3434", MODE="0660", GROUP="plugdev"
SUBSYSTEM=="hidraw", ATTRS{idVendor}=="3434", MODE="0660", GROUP="plugdev"
KERNEL=="hidraw*", ATTRS{idVendor}=="32f0", MODE="0660", GROUP="plugdev"
SUBSYSTEM=="usb", ATTR{idVendor}=="32f0", MODE="0660", GROUP="plugdev"
# Keychron / QMK common bootloaders
SUBSYSTEM=="usb", ATTR{idVendor}=="0483", ATTR{idProduct}=="df11", MODE="0660", GROUP="plugdev"
SUBSYSTEM=="usb", ATTR{idVendor}=="03eb", MODE="0660", GROUP="plugdev"
SUBSYSTEM=="usb", ATTR{idVendor}=="16c0", MODE="0660", GROUP="plugdev"
'';
};
};
}

6
tests/conftest.py Normal file
View File

@@ -0,0 +1,6 @@
"""Fixtures for tests."""
from __future__ import annotations
PASSWORD = "password" # noqa: S105
TOKEN = "token" # noqa: S105

View File

@@ -0,0 +1,75 @@
"""test_fix_eval_warnings."""
from __future__ import annotations
from pathlib import Path
from typing import TYPE_CHECKING
from typer.testing import CliRunner
from python.tools.fix_eval_warnings import Config, app, generate_fix, parse_warnings
from tests.conftest import TOKEN
if TYPE_CHECKING:
from pyfakefs.fake_filesystem import FakeFilesystem
from pytest_mock import MockerFixture
runner = CliRunner()
def test_parse_warnings(fs: FakeFilesystem) -> None:
"""test_parse_warnings."""
log_file = Path("/build.log")
fs.create_file(
log_file,
contents="Some output\nevaluation warning: 'system' is deprecated\nMore output",
encoding="utf-8",
)
warnings = parse_warnings(log_file)
assert len(warnings) == 1
assert warnings[0] == "evaluation warning: 'system' is deprecated"
def test_generate_fix(mocker: MockerFixture) -> None:
"""test_generate_fix."""
mock_post = mocker.patch("python.tools.fix_eval_warnings.requests.post")
mock_response = mocker.MagicMock()
mock_response.json.return_value = {
"choices": [{"message": {"content": "Use stdenv.hostPlatform.system"}}]
}
mock_post.return_value = mock_response
config = Config(github_token=TOKEN)
fix = generate_fix("evaluation warning: 'system' is deprecated", config)
assert fix == "Use stdenv.hostPlatform.system"
mock_post.assert_called_once()
def test_main(mocker: MockerFixture, fs: FakeFilesystem) -> None:
"""test_main."""
log_file = Path("/build.log")
fs.create_file(
log_file,
contents="Some output\nevaluation warning: 'system' is deprecated\nMore output",
encoding="utf-8",
)
mock_generate_fix = mocker.patch("python.tools.fix_eval_warnings.generate_fix")
mock_generate_fix.return_value = "Fixed it"
mock_logger = mocker.patch("python.tools.fix_eval_warnings.logger")
# We need to mock GITHUB_TOKEN env var or the script will warn/fail
mocker.patch.dict("os.environ", {"GITHUB_TOKEN": TOKEN})
result = runner.invoke(app, [str(log_file)])
assert result.exit_code == 0
# Verify logger calls instead of stdout, as CliRunner might not capture logging output correctly
# when logging is configured to write to sys.stdout directly.
assert any("Found 1 warnings" in str(call) for call in mock_logger.info.call_args_list)
assert any(
"Fix suggestions written to fix_suggestions.md" in str(call)
for call in mock_logger.info.call_args_list
)
assert Path("fix_suggestions.md").exists()

30
users/megan/default.nix Normal file
View File

@@ -0,0 +1,30 @@
{
pkgs,
config,
...
}:
{
sops.secrets.megan_password = {
sopsFile = ../secrets.yaml;
neededForUsers = true;
};
users = {
users.megan = {
isNormalUser = true;
hashedPasswordFile = "${config.sops.secrets.megan_password.path}";
shell = pkgs.zsh;
group = "megan";
extraGroups = [
"audio"
"video"
"users"
];
uid = 1101;
};
groups.megan.gid = 1101;
};
home-manager.users.megan = import ./systems/${config.networking.hostName}.nix;
}

View File

@@ -0,0 +1,9 @@
{
imports = [
./direnv.nix
./git.nix
./zsh.nix
];
programs.starship.enable = true;
}

View File

@@ -0,0 +1,8 @@
{
programs.direnv = {
enable = true;
enableZshIntegration = true;
nix-direnv.enable = true;
};
}

View File

@@ -0,0 +1,14 @@
{
programs.git = {
enable = true;
settings = {
user = {
email = "mousikos112@gmail.com";
name = "megan";
};
pull.rebase = true;
color.ui = true;
};
lfs.enable = true;
};
}

View File

@@ -0,0 +1,31 @@
{
programs.zsh = {
enable = true;
syntaxHighlighting.enable = true;
history.size = 10000;
oh-my-zsh = {
enable = true;
plugins = [
"git"
"docker"
"docker-compose"
"colored-man-pages"
"rust"
"systemd"
"tmux"
"ufw"
"z"
];
};
shellAliases = {
"lrt" = "eza --icons -lsnew";
"ls" = "eza";
"ll" = "eza --long --group";
"la" = "eza --all";
"rspace" = "'for f in *\ *; do mv \"$f\" \"\${f// /_}\"; done'";
"rebuild" = "sudo nixos-rebuild switch --flake /home/richie/dotfiles#$HOST";
"nix-test" = "nixos-rebuild test --flake /home/richie/dotfiles";
};
};
}

View File

@@ -0,0 +1,18 @@
{ config, ... }:
{
imports = [
./cli
./programs.nix
];
programs = {
home-manager.enable = true;
git.enable = true;
};
home = {
username = "megan";
homeDirectory = "/home/${config.home.username}";
stateVersion = "24.05";
};
}

View File

@@ -0,0 +1,41 @@
{ pkgs, ... }:
{
home.packages = with pkgs; [
# cli
bat
btop
eza
fd
ffmpegthumbnailer
fzf
git
gnupg
imagemagick
jq
ncdu
neofetch
ouch
p7zip
poppler
rar
ripgrep
starship
tmux
unzip
yazi
zoxide
# system info
hwloc
lynis
pciutils
smartmontools
usbutils
# networking
iperf3
nmap
wget
# python
poetry
ruff
];
}

View File

@@ -0,0 +1,5 @@
{
imports = [
../home/global.nix
];
}

View File

@@ -41,7 +41,6 @@ in
"scanner"
"transmission"
"uaccess"
"uucp"
"wireshark"
];
uid = 1000;

View File

@@ -23,11 +23,10 @@
signal-desktop
zoom-us
# dev tools
claude-code
gparted
jetbrains.datagrip
master.antigravity-fhs
proxychains
master.antigravity-fhs
gparted
# games
dwarf-fortress
tower-pixel-dungeon