Compare commits

..

1 Commits

Author SHA1 Message Date
6e5a8429b3 added merge_group to build_systems and treefmt 2025-07-28 12:50:21 -04:00
160 changed files with 498 additions and 7896 deletions

View File

@@ -2,6 +2,7 @@ name: build_systems
on:
workflow_dispatch:
pull_request:
merge_group:
push:
branches: [main]
schedule:
@@ -15,14 +16,12 @@ jobs:
matrix:
system:
- "bob"
- "brain"
- "jeeves"
- "leviathan"
- "rhapsody-in-green"
continue-on-error: true
steps:
- uses: actions/checkout@v4
- name: Build default package
run: "nixos-rebuild build --flake ./#${{ matrix.system }}"
- name: copy to nix-cache
run: nix copy --accept-flake-config --to unix:///host-nix/var/nix/daemon-socket/socket .#nixosConfigurations.${{ matrix.system }}.config.system.build.toplevel
run: nix copy --to ssh://jeeves .#nixosConfigurations.${{ matrix.system }}.config.system.build.toplevel

View File

@@ -1,19 +0,0 @@
name: pytest
on:
push:
branches:
- main
pull_request:
branches:
- main
merge_group:
jobs:
pytest:
runs-on: self-hosted
steps:
- uses: actions/checkout@v4
- name: Run tests
run: nix develop .#devShells.x86_64-linux.default -c pytest tests

View File

@@ -1,19 +0,0 @@
name: pytest_safe
on:
push:
branches:
- main
pull_request:
branches:
- main
merge_group:
jobs:
pytest:
runs-on: self-hosted
steps:
- uses: actions/checkout@v4
- name: Run tests
run: unshare --map-root-user --user --net -- pytest tests

View File

@@ -2,6 +2,7 @@ name: treefmt
on:
workflow_dispatch:
pull_request:
merge_group:
push:
branches: [main]

View File

@@ -1,13 +1,9 @@
# Generate AGE keys from SSH keys with:
# ssh-keygen -A
# nix-shell -p ssh-to-age --run 'cat /etc/ssh/ssh_host_ed25519_key.pub | ssh-to-age'
keys:
- &admin_richie age1u8zj599elqqvcmhxn8zuwrufsz8w8w366d3ayrljjejljt2q45kq8mxw9c # cspell:disable-line
- &system_bob age1q47vup0tjhulkg7d6xwmdsgrw64h4ax3la3evzqpxyy4adsmk9fs56qz3y # cspell:disable-line
- &system_brain age1jhf7vm0005j60mjq63696frrmjhpy8kpc2d66mw044lqap5mjv4snmwvwm # cspell:disable-line
- &system_jeeves age13lmqgc3jvkyah5e3vcwmj4s5wsc2akctcga0lpc0x8v8du3fxprqp4ldkv # cspell:disable-line
- &system_leviathan age1l272y8udvg60z7edgje42fu49uwt4x2gxn5zvywssnv9h2krms8s094m4k # cspell:disable-line
- &system_router age1xzxryqq63x65yuza9lmmkud7crjjxpnkdew070yhx6xn7xe4tdws5twxsv # cspell:disable-line
- &system_rhapsody age1ufnewppysaq2wwcl4ugngjz8pfzc5a35yg7luq0qmuqvctajcycs5lf6k4 # cspell:disable-line
creation_rules:
@@ -16,7 +12,6 @@ creation_rules:
- age:
- *admin_richie
- *system_bob
- *system_brain
- *system_jeeves
- *system_leviathan
- *system_router
- *system_rhapsody

40
.vscode/settings.json vendored
View File

@@ -2,7 +2,6 @@
"cSpell.words": [
"aboutwelcome",
"acltype",
"addopts",
"addstr",
"advplyr",
"ahci",
@@ -10,7 +9,6 @@
"aiounifi",
"alsa",
"apiclient",
"apscheduler",
"archlinux",
"ashift",
"asrouter",
@@ -81,6 +79,7 @@
"FASTFOX",
"ffmpegthumbnailer",
"filebot",
"filebrowser",
"fileroller",
"findbar",
"Fira",
@@ -97,7 +96,6 @@
"getch",
"getmaxyx",
"ghdeploy",
"gitea",
"globalprivacycontrol",
"gparted",
"gtts",
@@ -116,9 +114,6 @@
"httpchk",
"hurlenko",
"hwloc",
"ical",
"ignorelist",
"improv",
"INITDB",
"iocharset",
"ioit",
@@ -128,8 +123,6 @@
"jnoortheen",
"jsbc",
"kagi",
"keyformat",
"keylocation",
"kuma",
"lazer",
"levelname",
@@ -155,15 +148,11 @@
"mixtral",
"mklabel",
"mkpart",
"modbus",
"modbuss",
"modesetting",
"mountpoint",
"mountpoints",
"mousewheel",
"mqtt",
"mtxr",
"mypy",
"ncdu",
"nemo",
"neofetch",
@@ -195,7 +184,6 @@
"overalljails",
"overscroll",
"overseerr",
"paho",
"partitionwise",
"pbmode",
"pciutils",
@@ -223,15 +211,13 @@
"pulseaudio",
"punycode",
"pychromecast",
"pydocstyle",
"pyfakefs",
"pylance",
"pylint",
"pymetno",
"pymodbus",
"pyopenweathermap",
"pyownet",
"pytest",
"qbit",
"qbittorrent",
"qbittorrentvpn",
"qbitvpn",
"quicksuggest",
"radarr",
"readahead",
@@ -253,7 +239,6 @@
"schemeless",
"scrollback",
"SECUREFOX",
"sessionmaker",
"sessionstore",
"shellcheck",
"signon",
@@ -265,7 +250,6 @@
"socialtracking",
"sonarr",
"sponsorblock",
"sqlalchemy",
"sqltools",
"ssdp",
"SSHOPTS",
@@ -277,7 +261,6 @@
"tabmanager",
"tamasfe",
"TCPIP",
"testdisk",
"tiktok",
"timonwong",
"titlebar",
@@ -287,13 +270,10 @@
"topstories",
"treefmt",
"twimg",
"typer",
"uaccess",
"ubiquiti",
"ublock",
"uiprotect",
"uitour",
"unifi",
"unrar",
"unsubmitted",
"uptimekuma",
@@ -305,7 +285,6 @@
"usernamehw",
"userprefs",
"vfat",
"victron",
"virt",
"virtualisation",
"vpnpromourl",
@@ -317,8 +296,6 @@
"wireshark",
"Workqueues",
"xattr",
"xcursorgen",
"xdist",
"xhci",
"yazi",
"yubikey",
@@ -330,10 +307,5 @@
"zoxide",
"zram",
"zstd"
],
"python-envs.defaultEnvManager": "ms-python.python:system",
"python-envs.pythonProjects": [],
"python.testing.pytestArgs": ["tests"],
"python.testing.unittestEnabled": false,
"python.testing.pytestEnabled": true
]
}

View File

@@ -1,5 +0,0 @@
## Dev environment tips
- use treefmt to format all files
- make python code ruff compliant
- use pytest to test python code

View File

@@ -23,7 +23,7 @@
boot = {
tmp.useTmpfs = true;
kernelPackages = lib.mkDefault pkgs.linuxPackages_6_12;
zfs.package = lib.mkDefault pkgs.zfs_2_4;
zfs.package = lib.mkDefault pkgs.zfs_2_3;
};
hardware.enableRedistributableFirmware = true;
@@ -44,10 +44,7 @@
# firmware update
fwupd.enable = true;
snapshot_manager = {
enable = lib.mkDefault true;
PYTHONPATH = "${inputs.self}/";
};
snapshot_manager.enable = lib.mkDefault true;
zfs = {
trim.enable = lib.mkDefault true;

View File

@@ -1,10 +1,4 @@
{ lib, pkgs, ... }:
let
libPath = pkgs.lib.makeLibraryPath [
pkgs.zlib
pkgs.stdenv.cc.cc.lib
];
in
{
programs.nix-ld = {
enable = lib.mkDefault true;
@@ -21,7 +15,6 @@ in
libxml2
openssl
stdenv.cc.cc
stdenv.cc.cc.lib
systemd
util-linux
xz
@@ -30,9 +23,4 @@ in
zstd
];
};
environment = {
sessionVariables.LD_LIBRARY_PATH = lib.mkDefault libPath;
variables.LD_LIBRARY_PATH = lib.mkDefault libPath;
};
}

View File

@@ -2,6 +2,6 @@
{
environment.systemPackages = with pkgs; [
git
my_python
python313
];
}

View File

@@ -1,4 +1,5 @@
{
inputs,
pkgs,
lib,
config,
@@ -10,48 +11,33 @@ in
{
options = {
services.snapshot_manager = {
enable = lib.mkEnableOption "ZFS snapshot manager";
enable = lib.mkOption {
default = true;
example = true;
description = "Whether to enable k3s-net.";
type = lib.types.bool;
};
path = lib.mkOption {
type = lib.types.path;
description = "Path that needs to be updated via git pull";
default = ./snapshot_config.toml;
description = "Path to the snapshot_manager TOML config.";
};
PYTHONPATH = lib.mkOption {
type = lib.types.str;
description = ''
the PYTHONPATH to use for the snapshot_manager service.
'';
};
EnvironmentFile = lib.mkOption {
type = lib.types.nullOr (lib.types.coercedTo lib.types.path toString lib.types.str);
default = null;
description = ''
Single environment file for the service (e.g. /etc/snapshot-manager/env).
Use a leading "-" to ignore if missing (systemd feature).
'';
};
};
};
config = lib.mkIf cfg.enable {
systemd = {
services.snapshot_manager = {
services."snapshot_manager" = {
description = "ZFS Snapshot Manager";
requires = [ "zfs-import.target" ];
after = [ "zfs-import.target" ];
path = [ pkgs.zfs ];
environment = {
PYTHONPATH = cfg.PYTHONPATH;
};
serviceConfig = {
Type = "oneshot";
ExecStart = "${pkgs.my_python}/bin/python -m python.tools.snapshot_manager ${lib.escapeShellArg cfg.path}";
}
// lib.optionalAttrs (cfg.EnvironmentFile != null) {
EnvironmentFile = cfg.EnvironmentFile;
ExecStart = "${inputs.system_tools.packages.x86_64-linux.default}/bin/snapshot_manager --config-file='${cfg.path}'";
};
};
timers.snapshot_manager = {
timers."snapshot_manager" = {
wantedBy = [ "timers.target" ];
timerConfig = {
OnBootSec = "15m";

View File

@@ -37,8 +37,6 @@
TcpKeepAlive = "no";
X11Forwarding = lib.mkDefault false;
KexAlgorithms = [
"sntrup761x25519-sha512@openssh.com"
"mlkem768x25519-sha256"
"curve25519-sha256@libssh.org"
"diffie-hellman-group-exchange-sha256"
];

View File

@@ -1,8 +1,8 @@
{ pkgs, ... }:
{
boot = {
kernelPackages = pkgs.linuxPackages_6_18;
zfs.package = pkgs.zfs_2_4;
kernelPackages = pkgs.linuxPackages_6_15;
zfs.package = pkgs.zfs_2_3;
};
hardware.bluetooth = {

View File

@@ -10,9 +10,6 @@
authorizedKeys = config.users.users.richie.openssh.authorizedKeys.keys;
};
};
availableKernelModules = [
"igb"
"r8152"
];
availableKernelModules = [ "igb" ];
};
}

View File

@@ -8,11 +8,10 @@
dataDir = "/home/richie/Syncthing";
configDir = "/home/richie/.config/syncthing";
settings.devices = {
bob.id = "CJIAPEJ-VO74RR4-F75VU6M-QNZAMYG-FYUJG7Y-6AT62HJ-355PRPL-PJFETAZ"; # cspell:disable-line
brain.id = "SSCGIPI-IV3VYKB-TRNIJE3-COV4T2H-CDBER7F-I2CGHYA-NWOEUDU-3T5QAAN"; # cspell:disable-line
ipad.id = "KI76T3X-SFUGV2L-VSNYTKR-TSIUV5L-SHWD3HE-GQRGRCN-GY4UFMD-CW6Z6AX"; # cspell:disable-line
jeeves.id = "ICRHXZW-ECYJCUZ-I4CZ64R-3XRK7CG-LL2HAAK-FGOHD22-BQA4AI6-5OAL6AG"; # cspell:disable-line
phone.id = "TBRULKD-7DZPGGZ-F6LLB7J-MSO54AY-7KLPBIN-QOFK6PX-W2HBEWI-PHM2CQI"; # cspell:disable-line
jeeves.id = "ICRHXZW-ECYJCUZ-I4CZ64R-3XRK7CG-LL2HAAK-FGOHD22-BQA4AI6-5OAL6AG"; # cspell:disable-line
ipad.id = "KI76T3X-SFUGV2L-VSNYTKR-TSIUV5L-SHWD3HE-GQRGRCN-GY4UFMD-CW6Z6AX"; # cspell:disable-line
bob.id = "CJIAPEJ-VO74RR4-F75VU6M-QNZAMYG-FYUJG7Y-6AT62HJ-355PRPL-PJFETAZ"; # cspell:disable-line
rhapsody-in-green.id = "ASL3KC4-3XEN6PA-7BQBRKE-A7JXLI6-DJT43BY-Q4WPOER-7UALUAZ-VTPQ6Q4"; # cspell:disable-line
};
};

View File

@@ -5,7 +5,5 @@
randomizedDelaySec = "1h";
persistent = true;
flake = "github:RichieCahill/dotfiles";
allowReboot = true;
dates = "Sat *-*-* 06:00:00";
};
}

View File

@@ -1,4 +0,0 @@
source "https://rubygems.org"
# The github-pages gem pins all compatible versions of Jekyll and its plugins
gem "github-pages", group: :jekyll_plugins

View File

@@ -1,23 +0,0 @@
title: "Richie Cahill"
description: "ALL THE CHAOS THAT I CANT DO AT WORK"
baseurl: "/dotfiles"
url: "https://richiecahill.github.io"
remote_theme: pages-themes/hacker@v0.2.0
plugins:
- jekyll-feed
- jekyll-remote-theme
- jekyll-seo-tag
- jekyll-sitemap
- jekyll-paginate
paginate: 5
paginate_path: "/page:num"
author:
name: "Richie Cahill"
email: "richie@tmmworkshop.com"
social_links:
github: "RichieCahill"
website: "https://tmmworkshop.com"

View File

@@ -1,13 +0,0 @@
# The MONOREPO experiment
Im testing a [MONOREPO](https://en.wikipedia.org/wiki/Monorepo) because Phil said this was a bad idea. To that i say hold my beer.
In all seriousness, I Think that for a small dev team/solo dev. The simplicity is worth higher barer to entry. One of my most annoying processes was updating my system tools. I had to build my update in a feature branch and then merge it into my main branch. then go to my dotfiles create a feature branch update the system tools merge it into main.
It will be starting with my Nix Dotfiles Python tools and now my blog.
I will be reaching ot to phil on 2030-10-31 and 2035-10-31 to give him updates on the progress.
Known Issues:
- the python tests are running on the current derivation not the one the derivation im updating to.

View File

@@ -1,17 +0,0 @@
---
layout: default
title: "Welcome"
---
Welcome to my build logs, notes, and experiments.
You can read my latest posts below
<ul>
{% for post in site.posts %}
<li>
<a href="{{ post.url | relative_url }}">{{ post.title }}</a>
<small>— {{ post.date | date: "%Y-%m-%d" }}</small>
</li>
{% endfor %}
</ul>

3
esphome/.gitignore vendored
View File

@@ -1,3 +0,0 @@
# esphome
/.esphome/
/secrets.yaml

View File

@@ -1,132 +0,0 @@
esphome:
name: batteries
friendly_name: batteries
esp32:
board: esp32dev
framework:
type: arduino
logger:
api:
encryption:
key: !secret api_key
external_components:
- source: github://syssi/esphome-jk-bms@main
ota:
- platform: esphome
password: !secret ota_password
wifi:
ssid: !secret wifi_ssid
password: !secret wifi_password
fast_connect: on
captive_portal:
esp32_ble_tracker:
scan_parameters:
interval: 1100ms
window: 1100ms
active: true
ble_client:
- mac_address: "C8:47:80:29:0F:DB"
id: jk_ble0
jk_bms_ble:
- ble_client_id: jk_ble0
protocol_version: JK02_32S
throttle: 1s
id: jk_bms0
button:
- platform: jk_bms_ble
retrieve_settings:
name: "JK0 retrieve settings"
retrieve_device_info:
name: "JK0 retrieve device info"
sensor:
- platform: jk_bms_ble
jk_bms_ble_id: jk_bms0
total_voltage:
name: "JK0 Total Voltage"
state_of_charge:
name: "JK0 SoC"
charging_power:
name: "JK0 charging power"
discharging_power:
name: "JK0 discharging power"
temperature_sensor_1:
name: "JK0 Temp 1"
temperature_sensor_2:
name: "JK0 Temp 2"
balancing:
name: "JK0 balancing"
total_runtime:
name: "JK0 total runtime"
balancing_current:
name: "JK0 balancing current"
delta_cell_voltage:
name: "JK0 cell delta voltage"
average_cell_voltage:
name: "JK0 cell average voltage"
cell_voltage_1:
name: "JK0 cell voltage 1"
cell_voltage_2:
name: "JK0 cell voltage 2"
cell_voltage_3:
name: "JK0 cell voltage 3"
cell_voltage_4:
name: "JK0 cell voltage 4"
cell_voltage_5:
name: "JK0 cell voltage 5"
cell_voltage_6:
name: "JK0 cell voltage 6"
cell_voltage_7:
name: "JK0 cell voltage 7"
cell_voltage_8:
name: "JK0 cell voltage 8"
cell_resistance_1:
name: "JK0 cell resistance 1"
cell_resistance_2:
name: "JK0 cell resistance 2"
cell_resistance_3:
name: "JK0 cell resistance 3"
cell_resistance_4:
name: "JK0 cell resistance 4"
cell_resistance_5:
name: "JK0 cell resistance 5"
cell_resistance_6:
name: "JK0 cell resistance 6"
cell_resistance_7:
name: "JK0 cell resistance 7"
cell_resistance_8:
name: "JK0 cell resistance 8"
total_charging_cycle_capacity:
name: "JK0 total charging cycle capacity"
text_sensor:
- platform: jk_bms_ble
jk_bms_ble_id: jk_bms0
errors:
name: "JK0 Errors"
switch:
- platform: jk_bms_ble
jk_bms_ble_id: jk_bms0
charging:
name: "JK0 Charging"
discharging:
name: "JK0 Discharging"
balancer:
name: "JK0 Balancing"
- platform: ble_client
ble_client_id: jk_ble0
name: "JK0 enable bluetooth connection"
id: ble_client_switch0

View File

@@ -1,132 +0,0 @@
esphome:
name: battery1
friendly_name: battery1
esp32:
board: esp32dev
framework:
type: arduino
logger:
api:
encryption:
key: !secret api_key
external_components:
- source: github://syssi/esphome-jk-bms@main
ota:
- platform: esphome
password: !secret ota_password
wifi:
ssid: !secret wifi_ssid
password: !secret wifi_password
fast_connect: on
captive_portal:
esp32_ble_tracker:
scan_parameters:
interval: 1100ms
window: 1100ms
active: true
ble_client:
- mac_address: "C8:47:80:37:9D:DD"
id: jk_ble1
jk_bms_ble:
- ble_client_id: jk_ble1
protocol_version: JK02_32S
throttle: 1s
id: jk_bms1
button:
- platform: jk_bms_ble
retrieve_settings:
name: "JK1 retrieve settings"
retrieve_device_info:
name: "JK1 retrieve device info"
sensor:
- platform: jk_bms_ble
jk_bms_ble_id: jk_bms1
total_voltage:
name: "JK1 Total Voltage"
state_of_charge:
name: "JK1 SoC"
charging_power:
name: "JK1 charging power"
discharging_power:
name: "JK1 discharging power"
temperature_sensor_1:
name: "JK1 Temp 1"
temperature_sensor_2:
name: "JK1 Temp 2"
balancing:
name: "JK1 balancing"
total_runtime:
name: "JK1 total runtime"
balancing_current:
name: "JK1 balancing current"
delta_cell_voltage:
name: "JK1 cell delta voltage"
average_cell_voltage:
name: "JK1 cell average voltage"
cell_voltage_1:
name: "JK1 cell voltage 1"
cell_voltage_2:
name: "JK1 cell voltage 2"
cell_voltage_3:
name: "JK1 cell voltage 3"
cell_voltage_4:
name: "JK1 cell voltage 4"
cell_voltage_5:
name: "JK1 cell voltage 5"
cell_voltage_6:
name: "JK1 cell voltage 6"
cell_voltage_7:
name: "JK1 cell voltage 7"
cell_voltage_8:
name: "JK1 cell voltage 8"
cell_resistance_1:
name: "JK1 cell resistance 1"
cell_resistance_2:
name: "JK1 cell resistance 2"
cell_resistance_3:
name: "JK1 cell resistance 3"
cell_resistance_4:
name: "JK1 cell resistance 4"
cell_resistance_5:
name: "JK1 cell resistance 5"
cell_resistance_6:
name: "JK1 cell resistance 6"
cell_resistance_7:
name: "JK1 cell resistance 7"
cell_resistance_8:
name: "JK1 cell resistance 8"
total_charging_cycle_capacity:
name: "JK1 total charging cycle capacity"
text_sensor:
- platform: jk_bms_ble
jk_bms_ble_id: jk_bms1
errors:
name: "JK1 Errors"
switch:
- platform: jk_bms_ble
jk_bms_ble_id: jk_bms1
charging:
name: "JK1 Charging"
discharging:
name: "JK1 Discharging"
balancer:
name: "JK1 Balancing"
- platform: ble_client
ble_client_id: jk_ble1
name: "JK1 enable bluetooth connection"
id: ble_client_switch0

View File

@@ -1,48 +0,0 @@
esphome:
name: "environment"
friendly_name: "environment"
esp32:
board: esp32dev
framework:
type: arduino
i2c:
sda: GPIO21
scl: GPIO22
scan: True
id: bus_a
sensor:
- platform: aht10
i2c_id: bus_a
address: 0x38
variant: AHT20
temperature:
name: "environment Temperature"
id: aht10_temperature
humidity:
name: "environment Humidity"
id: aht10_humidity
update_interval: 5s
web_server:
port: 80
logger:
level: DEBUG
api:
encryption:
key: !secret api_key
ota:
- platform: esphome
password: !secret ota_password
wifi:
ssid: !secret wifi_ssid
password: !secret wifi_password
fast_connect: on
captive_portal:

File diff suppressed because one or more lines are too long

135
flake.lock generated
View File

@@ -8,11 +8,11 @@
},
"locked": {
"dir": "pkgs/firefox-addons",
"lastModified": 1766762570,
"narHash": "sha256-Nevsj5NYurwp3I6nSMeh3uirwoinVSbCldqOXu4smms=",
"lastModified": 1753416229,
"narHash": "sha256-45s1L4h/6t3M+/ppqow1OFUgfk9jZHsR4jxNgxIWWmM=",
"owner": "rycee",
"repo": "nur-expressions",
"rev": "03d7d310ea91d6e4b47ed70aa86c781fcc5b38e1",
"rev": "553afee4efb5a7dea03cf654deafacd8fa1004f9",
"type": "gitlab"
},
"original": {
@@ -29,11 +29,11 @@
]
},
"locked": {
"lastModified": 1766682973,
"narHash": "sha256-GKO35onS711ThCxwWcfuvbIBKXwriahGqs+WZuJ3v9E=",
"lastModified": 1753470191,
"narHash": "sha256-hOUWU5L62G9sm8NxdiLWlLIJZz9H52VuFiDllHdwmVA=",
"owner": "nix-community",
"repo": "home-manager",
"rev": "91cdb0e2d574c64fae80d221f4bf09d5592e9ec2",
"rev": "a1817d1c0e5eabe7dfdfe4caa46c94d9d8f3fdb6",
"type": "github"
},
"original": {
@@ -44,11 +44,11 @@
},
"nixos-hardware": {
"locked": {
"lastModified": 1766568855,
"narHash": "sha256-UXVtN77D7pzKmzOotFTStgZBqpOcf8cO95FcupWp4Zo=",
"lastModified": 1753122741,
"narHash": "sha256-nFxE8lk9JvGelxClCmwuJYftbHqwnc01dRN4DVLUroM=",
"owner": "nixos",
"repo": "nixos-hardware",
"rev": "c5db9569ac9cc70929c268ac461f4003e3e5ca80",
"rev": "cc66fddc6cb04ab479a1bb062f4d4da27c936a22",
"type": "github"
},
"original": {
@@ -60,11 +60,11 @@
},
"nixpkgs": {
"locked": {
"lastModified": 1766651565,
"narHash": "sha256-QEhk0eXgyIqTpJ/ehZKg9IKS7EtlWxF3N7DXy42zPfU=",
"lastModified": 1753250450,
"narHash": "sha256-i+CQV2rPmP8wHxj0aq4siYyohHwVlsh40kV89f3nw1s=",
"owner": "nixos",
"repo": "nixpkgs",
"rev": "3e2499d5539c16d0d173ba53552a4ff8547f4539",
"rev": "fc02ee70efb805d3b2865908a13ddd4474557ecf",
"type": "github"
},
"original": {
@@ -76,11 +76,11 @@
},
"nixpkgs-master": {
"locked": {
"lastModified": 1766794443,
"narHash": "sha256-Q8IyTQ3Lu8vX/iqO3U+E4pjLbP1NsqFih6uElf8OYrQ=",
"lastModified": 1753489269,
"narHash": "sha256-Iy/9c6DaxCY9ECCLgpoo+uwY1K5YTmJLU7fSg7JeALk=",
"owner": "nixos",
"repo": "nixpkgs",
"rev": "088b069b8270ee36d83533c86b9f91d924d185d9",
"rev": "ce6b0611ca70423d97fab2a3f53f61f1fa4ff0a4",
"type": "github"
},
"original": {
@@ -106,6 +106,56 @@
"type": "github"
}
},
"pyproject-build-systems": {
"inputs": {
"nixpkgs": [
"system_tools",
"nixpkgs"
],
"pyproject-nix": [
"system_tools",
"pyproject-nix"
],
"uv2nix": [
"system_tools",
"uv2nix"
]
},
"locked": {
"lastModified": 1744599653,
"narHash": "sha256-nysSwVVjG4hKoOjhjvE6U5lIKA8sEr1d1QzEfZsannU=",
"owner": "pyproject-nix",
"repo": "build-system-pkgs",
"rev": "7dba6dbc73120e15b558754c26024f6c93015dd7",
"type": "github"
},
"original": {
"owner": "pyproject-nix",
"repo": "build-system-pkgs",
"type": "github"
}
},
"pyproject-nix": {
"inputs": {
"nixpkgs": [
"system_tools",
"nixpkgs"
]
},
"locked": {
"lastModified": 1746540146,
"narHash": "sha256-QxdHGNpbicIrw5t6U3x+ZxeY/7IEJ6lYbvsjXmcxFIM=",
"owner": "pyproject-nix",
"repo": "pyproject.nix",
"rev": "e09c10c24ebb955125fda449939bfba664c467fd",
"type": "github"
},
"original": {
"owner": "pyproject-nix",
"repo": "pyproject.nix",
"type": "github"
}
},
"root": {
"inputs": {
"firefox-addons": "firefox-addons",
@@ -115,6 +165,7 @@
"nixpkgs-master": "nixpkgs-master",
"nixpkgs-stable": "nixpkgs-stable",
"sops-nix": "sops-nix",
"system_tools": "system_tools",
"systems": "systems"
}
},
@@ -125,11 +176,11 @@
]
},
"locked": {
"lastModified": 1766289575,
"narHash": "sha256-BOKCwOQQIP4p9z8DasT5r+qjri3x7sPCOq+FTjY8Z+o=",
"lastModified": 1752544651,
"narHash": "sha256-GllP7cmQu7zLZTs9z0J2gIL42IZHa9CBEXwBY9szT0U=",
"owner": "Mic92",
"repo": "sops-nix",
"rev": "9836912e37aef546029e48c8749834735a6b9dad",
"rev": "2c8def626f54708a9c38a5861866660395bb3461",
"type": "github"
},
"original": {
@@ -138,6 +189,29 @@
"type": "github"
}
},
"system_tools": {
"inputs": {
"nixpkgs": [
"nixpkgs"
],
"pyproject-build-systems": "pyproject-build-systems",
"pyproject-nix": "pyproject-nix",
"uv2nix": "uv2nix"
},
"locked": {
"lastModified": 1747501237,
"narHash": "sha256-woyaUwmZurfNTXBEFM6M7ueSd/Udixs+4DUInhL835c=",
"owner": "RichieCahill",
"repo": "system_tools",
"rev": "68ab5d1c17ac3fe2487f73dbbb4848bd2291139e",
"type": "github"
},
"original": {
"owner": "RichieCahill",
"repo": "system_tools",
"type": "github"
}
},
"systems": {
"locked": {
"lastModified": 1689347949,
@@ -152,6 +226,31 @@
"repo": "default-linux",
"type": "github"
}
},
"uv2nix": {
"inputs": {
"nixpkgs": [
"system_tools",
"nixpkgs"
],
"pyproject-nix": [
"system_tools",
"pyproject-nix"
]
},
"locked": {
"lastModified": 1747441483,
"narHash": "sha256-W8BFXk5R0TuJcjIhcGoMpSOaIufGXpizK0pm+uTqynA=",
"owner": "pyproject-nix",
"repo": "uv2nix",
"rev": "582024dc64663e9f88d467c2f7f7b20d278349de",
"type": "github"
},
"original": {
"owner": "pyproject-nix",
"repo": "uv2nix",
"type": "github"
}
}
},
"root": "root",

View File

@@ -31,6 +31,11 @@
inputs.nixpkgs.follows = "nixpkgs";
};
system_tools = {
url = "github:RichieCahill/system_tools";
inputs.nixpkgs.follows = "nixpkgs";
};
sops-nix = {
url = "github:Mic92/sops-nix";
inputs.nixpkgs.follows = "nixpkgs";
@@ -54,7 +59,6 @@
system:
import nixpkgs {
inherit system;
overlays = builtins.attrValues outputs.overlays;
config.allowUnfree = true;
}
);
@@ -73,12 +77,6 @@
];
specialArgs = { inherit inputs outputs; };
};
brain = lib.nixosSystem {
modules = [
./systems/brain
];
specialArgs = { inherit inputs outputs; };
};
jeeves = lib.nixosSystem {
modules = [
./systems/jeeves
@@ -91,12 +89,6 @@
];
specialArgs = { inherit inputs outputs; };
};
leviathan = lib.nixosSystem {
modules = [
./systems/leviathan
];
specialArgs = { inherit inputs outputs; };
};
};
};
}

View File

@@ -3,39 +3,15 @@
# When applied, the stable nixpkgs set (declared in the flake inputs) will be accessible through 'pkgs.stable'
stable = final: _prev: {
stable = import inputs.nixpkgs-stable {
system = final.stdenv.hostPlatform.system;
system = final.system;
config.allowUnfree = true;
};
};
# When applied, the master nixpkgs set (declared in the flake inputs) will be accessible through 'pkgs.master'
master = final: _prev: {
master = import inputs.nixpkgs-master {
system = final.stdenv.hostPlatform.system;
system = final.system;
config.allowUnfree = true;
};
};
python-env = final: _prev: {
my_python = final.python313.withPackages (
ps: with ps; [
apprise
apscheduler
mypy
polars
psycopg
pyfakefs
pytest
pytest-cov
pytest-mock
pytest-xdist
requests
ruff
scalene
sqlalchemy
textual
typer
types-requests
]
);
};
}

View File

@@ -1,79 +0,0 @@
[project]
name = "system_tools"
version = "0.1.0"
description = ""
authors = [{ name = "Richie Cahill", email = "richie@tmmworkshop.com" }]
requires-python = "~=3.13.0"
readme = "README.md"
license = "MIT"
# these dependencies are a best effort and aren't guaranteed to work
dependencies = ["apprise", "apscheduler", "polars", "requests", "typer"]
[dependency-groups]
dev = [
"mypy",
"pyfakefs",
"pytest-cov",
"pytest-mock",
"pytest-xdist",
"pytest",
"ruff",
"types-requests",
]
[tool.ruff]
target-version = "py313"
line-length = 120
lint.select = ["ALL"]
lint.ignore = [
"G004", # (PERM) This is a performers nit
"COM812", # (TEMP) conflicts when used with the formatter
"ISC001", # (TEMP) conflicts when used with the formatter
"S603", # (PERM) This is known to cause a false positive
]
[tool.ruff.lint.per-file-ignores]
"tests/**" = [
"S101", # (perm) pytest needs asserts
]
"python/random/**" = [
"T201", # (perm) I don't care about print statements dir
]
"python/testing/**" = [
"T201", # (perm) I don't care about print statements dir
"ERA001", # (perm) I don't care about print statements dir
]
"python/splendor/**" = [
"S311", # (perm) there is no security issue here
"T201", # (perm) I don't care about print statements dir
"PLR2004", # (temps) need to think about this
]
[tool.ruff.lint.pydocstyle]
convention = "google"
[tool.ruff.lint.flake8-builtins]
builtins-ignorelist = ["id"]
[tool.ruff.lint.pylint]
max-args = 9
[tool.coverage.run]
source = ["system_tools"]
[tool.coverage.report]
exclude_lines = [
"pragma: no cover",
"if TYPE_CHECKING:",
"raise NotImplementedError",
"if __name__ == \"__main__\":",
]
[tool.pytest.ini_options]
addopts = "-n auto -ra"
# --cov=system_tools --cov-report=term-missing --cov-report=xml --cov-report=html --cov-branch

View File

@@ -1 +0,0 @@
"""Server Tools."""

View File

@@ -1,72 +0,0 @@
"""common."""
from __future__ import annotations
import logging
import sys
from datetime import UTC, datetime
from os import getenv
from subprocess import PIPE, Popen
from apprise import Apprise
logger = logging.getLogger(__name__)
def configure_logger(level: str = "INFO") -> None:
"""Configure the logger.
Args:
level (str, optional): The logging level. Defaults to "INFO".
"""
logging.basicConfig(
level=level,
datefmt="%Y-%m-%dT%H:%M:%S%z",
format="%(asctime)s %(levelname)s %(filename)s:%(lineno)d - %(message)s",
handlers=[logging.StreamHandler(sys.stdout)],
)
def bash_wrapper(command: str) -> tuple[str, int]:
"""Execute a bash command and capture the output.
Args:
command (str): The bash command to be executed.
Returns:
Tuple[str, int]: A tuple containing the output of the command (stdout) as a string,
the error output (stderr) as a string (optional), and the return code as an integer.
"""
# This is a acceptable risk
process = Popen(command.split(), stdout=PIPE, stderr=PIPE)
output, error = process.communicate()
if error:
logger.error(f"{error=}")
return error.decode(), process.returncode
return output.decode(), process.returncode
def signal_alert(body: str, title: str = "") -> None:
"""Send a signal alert.
Args:
body (str): The body of the alert.
title (str, optional): The title of the alert. Defaults to "".
"""
apprise_client = Apprise()
from_phone = getenv("SIGNAL_ALERT_FROM_PHONE")
to_phone = getenv("SIGNAL_ALERT_TO_PHONE")
if not from_phone or not to_phone:
logger.info("SIGNAL_ALERT_FROM_PHONE or SIGNAL_ALERT_TO_PHONE not set")
return
apprise_client.add(f"signal://localhost:8989/{from_phone}/{to_phone}")
apprise_client.notify(title=title, body=body)
def utcnow() -> datetime:
"""Get the current UTC time."""
return datetime.now(tz=UTC)

View File

@@ -1,59 +0,0 @@
"""database."""
from __future__ import annotations
import logging
from typing import TYPE_CHECKING
from sqlalchemy import inspect
from sqlalchemy.exc import NoInspectionAvailable
if TYPE_CHECKING:
from collections.abc import Sequence
from sqlalchemy.orm import Session
logger = logging.getLogger(__name__)
def safe_insert(orm_objects: Sequence[object], session: Session) -> list[tuple[Exception, object]]:
"""Safer insert at allows for partial rollbacks.
Args:
orm_objects (Sequence[object]): Tables to insert.
session (Session): Database session.
"""
if unmapped := [orm_object for orm_object in orm_objects if not _is_mapped_instance(orm_object)]:
error = f"safe_insert expects ORM-mapped instances {unmapped}"
raise TypeError(error)
return _safe_insert(orm_objects, session)
def _safe_insert(objects: Sequence[object], session: Session) -> list[tuple[Exception, object]]:
exceptions: list[tuple[Exception, object]] = []
try:
session.add_all(objects)
session.commit()
except Exception as error:
session.rollback()
objects_len = len(objects)
if objects_len == 1:
logger.exception(objects)
return [(error, objects[0])]
middle = objects_len // 2
exceptions.extend(_safe_insert(objects=objects[:middle], session=session))
exceptions.extend(_safe_insert(objects=objects[middle:], session=session))
return exceptions
def _is_mapped_instance(obj: object) -> bool:
"""Return True if `obj` is a SQLAlchemy ORM-mapped instance."""
try:
inspect(obj) # raises NoInspectionAvailable if not mapped
except NoInspectionAvailable:
return False
else:
return True

View File

@@ -1 +0,0 @@
"""installer."""

View File

@@ -1,308 +0,0 @@
"""Install NixOS on a ZFS pool."""
from __future__ import annotations
import curses
import logging
import sys
from os import getenv
from pathlib import Path
from random import getrandbits
from subprocess import PIPE, Popen, run
from time import sleep
from typing import TYPE_CHECKING
from python.common import configure_logger
from python.installer.tui import draw_menu
if TYPE_CHECKING:
from collections.abc import Sequence
logger = logging.getLogger(__name__)
def bash_wrapper(command: str) -> str:
"""Execute a bash command and capture the output.
Args:
command (str): The bash command to be executed.
Returns:
Tuple[str, int]: A tuple containing the output of the command (stdout) as a string,
the error output (stderr) as a string (optional), and the return code as an integer.
"""
logger.debug(f"running {command=}")
# This is a acceptable risk
process = Popen(command.split(), stdout=PIPE, stderr=PIPE)
output, _ = process.communicate()
if process.returncode != 0:
error = f"Failed to run command {command=} return code {process.returncode=}"
raise RuntimeError(error)
return output.decode()
def partition_disk(disk: str, swap_size: int, reserve: int = 0) -> None:
"""Partition a disk.
Args:
disk (str): The disk to partition.
swap_size (int): The size of the swap partition in GB.
minimum value is 1.
reserve (int, optional): The size of the reserve partition in GB. Defaults to 0.
minimum value is 0.
"""
logger.info(f"partitioning {disk=}")
swap_size = max(swap_size, 1)
reserve = max(reserve, 0)
bash_wrapper(f"blkdiscard -f {disk}")
if reserve > 0:
msg = f"Creating swap partition on {disk=} with size {swap_size=}GiB and reserve {reserve=}GiB"
logger.info(msg)
swap_start = swap_size + reserve
swap_partition = f"mkpart swap -{swap_start}GiB -{reserve}GiB "
else:
logger.info(f"Creating swap partition on {disk=} with size {swap_size=}GiB")
swap_start = swap_size
swap_partition = f"mkpart swap -{swap_start}GiB 100% "
logger.debug(f"{swap_partition=}")
create_partitions = (
f"parted --script --align=optimal {disk} -- "
"mklabel gpt "
"mkpart EFI 1MiB 4GiB "
f"mkpart root_pool 4GiB -{swap_start}GiB "
f"{swap_partition}"
"set 1 esp on"
)
bash_wrapper(create_partitions)
logger.info(f"{disk=} successfully partitioned")
def create_zfs_pool(pool_disks: Sequence[str], mnt_dir: str) -> None:
"""Create a ZFS pool.
Args:
pool_disks (Sequence[str]): A tuple of disks to use for the pool.
mnt_dir (str): The mount directory.
"""
if len(pool_disks) <= 0:
error = "disks must be a tuple of at least length 1"
raise ValueError(error)
zpool_create = (
"zpool create "
"-o ashift=12 "
"-o autotrim=on "
f"-R {mnt_dir} "
"-O acltype=posixacl "
"-O canmount=off "
"-O dnodesize=auto "
"-O normalization=formD "
"-O relatime=on "
"-O xattr=sa "
"-O mountpoint=legacy "
"-O compression=zstd "
"-O atime=off "
"root_pool "
)
if len(pool_disks) == 1:
zpool_create += pool_disks[0]
else:
zpool_create += "mirror "
zpool_create += " ".join(pool_disks)
bash_wrapper(zpool_create)
zpools = bash_wrapper("zpool list -o name")
if "root_pool" not in zpools.splitlines():
logger.critical("Failed to create root_pool")
sys.exit(1)
def create_zfs_datasets() -> None:
"""Create ZFS datasets."""
bash_wrapper("zfs create -o canmount=noauto -o reservation=10G root_pool/root")
bash_wrapper("zfs create root_pool/home")
bash_wrapper("zfs create root_pool/var -o reservation=1G")
bash_wrapper("zfs create -o compression=zstd-9 -o reservation=10G root_pool/nix")
datasets = bash_wrapper("zfs list -o name")
expected_datasets = {
"root_pool/root",
"root_pool/home",
"root_pool/var",
"root_pool/nix",
}
missing_datasets = expected_datasets.difference(datasets.splitlines())
if missing_datasets:
logger.critical(f"Failed to create pools {missing_datasets}")
sys.exit(1)
def get_cpu_manufacturer() -> str:
"""Get the CPU manufacturer."""
output = bash_wrapper("cat /proc/cpuinfo")
id_vendor = {"AuthenticAMD": "amd", "GenuineIntel": "intel"}
for line in output.splitlines():
if "vendor_id" in line:
return id_vendor[line.split(": ")[1].strip()]
error = "Failed to get CPU manufacturer"
raise RuntimeError(error)
def get_boot_drive_id(disk: str) -> str:
"""Get the boot drive ID."""
output = bash_wrapper(f"lsblk -o UUID {disk}-part1")
return output.splitlines()[1]
def create_nix_hardware_file(mnt_dir: str, disks: Sequence[str], encrypt: str | None) -> None:
"""Create a NixOS hardware file."""
cpu_manufacturer = get_cpu_manufacturer()
devices = ""
if encrypt:
disk = disks[0]
devices = (
f' luks.devices."luks-root-pool-{disk.split("/")[-1]}-part2"'
"= {\n"
f' device = "{disk}-part2";\n'
" bypassWorkqueues = true;\n"
" allowDiscards = true;\n"
" };\n"
)
host_id = format(getrandbits(32), "08x")
nix_hardware = (
"{ config, lib, modulesPath, ... }:\n"
"{\n"
' imports = [ (modulesPath + "/installer/scan/not-detected.nix") ];\n\n'
" boot = {\n"
" initrd = {\n"
' availableKernelModules = [ \n "ahci"\n "ehci_pci"\n "nvme"\n "sd_mod"\n'
' "usb_storage"\n "usbhid"\n "xhci_pci"\n ];\n'
" kernelModules = [ ];\n"
f" {devices}"
" };\n"
f' kernelModules = [ "kvm-{cpu_manufacturer}" ];\n'
" extraModulePackages = [ ];\n"
" };\n\n"
" fileSystems = {\n"
' "/" = lib.mkDefault {\n device = "root_pool/root";\n fsType = "zfs";\n };\n\n'
' "/home" = {\n device = "root_pool/home";\n fsType = "zfs";\n };\n\n'
' "/var" = {\n device = "root_pool/var";\n fsType = "zfs";\n };\n\n'
' "/nix" = {\n device = "root_pool/nix";\n fsType = "zfs";\n };\n\n'
' "/boot" = {\n'
f' device = "/dev/disk/by-uuid/{get_boot_drive_id(disks[0])}";\n'
' fsType = "vfat";\n options = [\n "fmask=0077"\n'
' "dmask=0077"\n ];\n };\n };\n\n'
" swapDevices = [ ];\n\n"
" networking.useDHCP = lib.mkDefault true;\n\n"
' nixpkgs.hostPlatform = lib.mkDefault "x86_64-linux";\n'
f" hardware.cpu.{cpu_manufacturer}.updateMicrocode = "
"lib.mkDefault config.hardware.enableRedistributableFirmware;\n"
f' networking.hostId = "{host_id}";\n'
"}\n"
)
Path(f"{mnt_dir}/etc/nixos/hardware-configuration.nix").write_text(nix_hardware)
def install_nixos(mnt_dir: str, disks: Sequence[str], encrypt: str | None) -> None:
"""Install NixOS."""
bash_wrapper(f"mount -o X-mount.mkdir -t zfs root_pool/root {mnt_dir}")
bash_wrapper(f"mount -o X-mount.mkdir -t zfs root_pool/home {mnt_dir}/home")
bash_wrapper(f"mount -o X-mount.mkdir -t zfs root_pool/var {mnt_dir}/var")
bash_wrapper(f"mount -o X-mount.mkdir -t zfs root_pool/nix {mnt_dir}/nix")
for disk in disks:
bash_wrapper(f"mkfs.vfat -n EFI {disk}-part1")
# set up mirroring afterwards if more than one disk
boot_partition = (
f"mount -t vfat -o fmask=0077,dmask=0077,iocharset=iso8859-1,X-mount.mkdir {disks[0]}-part1 {mnt_dir}/boot"
)
bash_wrapper(boot_partition)
bash_wrapper(f"nixos-generate-config --root {mnt_dir}")
create_nix_hardware_file(mnt_dir, disks, encrypt)
run(("nixos-install", "--root", mnt_dir), check=True)
def installer(
disks: Sequence[str],
swap_size: int,
reserve: int,
encrypt_key: str | None,
) -> None:
"""Main."""
logger.info("Starting installation")
for disk in disks:
partition_disk(disk, swap_size, reserve)
test = Popen(("printf", f"'{encrypt_key}'"), stdout=PIPE)
if encrypt_key:
sleep(1)
for command in (
f"cryptsetup luksFormat --type luks2 {disk}-part2 -",
f"cryptsetup luksOpen {disk}-part2 luks-root-pool-{disk.split('/')[-1]}-part2 -",
):
run(command, check=True, stdin=test.stdout)
mnt_dir = "/tmp/nix_install" # noqa: S108
Path(mnt_dir).mkdir(parents=True, exist_ok=True)
if encrypt_key:
pool_disks = [f"/dev/mapper/luks-root-pool-{disk.split('/')[-1]}-part2" for disk in disks]
else:
pool_disks = [f"{disk}-part2" for disk in disks]
create_zfs_pool(pool_disks, mnt_dir)
create_zfs_datasets()
install_nixos(mnt_dir, disks, encrypt_key)
logger.info("Installation complete")
def main() -> None:
"""Main."""
configure_logger("DEBUG")
state = curses.wrapper(draw_menu)
encrypt_key = getenv("ENCRYPT_KEY")
logger.info("installing_nixos")
logger.info(f"disks: {state.selected_device_ids}")
logger.info(f"swap_size: {state.swap_size}")
logger.info(f"reserve: {state.reserve_size}")
logger.info(f"encrypted: {bool(encrypt_key)}")
sleep(3)
installer(
disks=state.get_selected_devices(),
swap_size=state.swap_size,
reserve=state.reserve_size,
encrypt_key=encrypt_key,
)
if __name__ == "__main__":
main()

View File

@@ -1,498 +0,0 @@
"""TUI module."""
from __future__ import annotations
import curses
import logging
from collections import defaultdict
from subprocess import PIPE, Popen
logger = logging.getLogger(__name__)
def bash_wrapper(command: str) -> str:
"""Execute a bash command and capture the output.
Args:
command (str): The bash command to be executed.
Returns:
Tuple[str, int]: A tuple containing the output of the command (stdout) as a string,
the error output (stderr) as a string (optional), and the return code as an integer.
"""
logger.debug(f"running {command=}")
# This is a acceptable risk
process = Popen(command.split(), stdout=PIPE, stderr=PIPE)
output, _ = process.communicate()
if process.returncode != 0:
error = f"Failed to run command {command=} return code {process.returncode=}"
raise RuntimeError(error)
return output.decode()
class Cursor:
"""Cursor class."""
def __init__(self) -> None:
"""Initialize the Cursor class."""
self.x_position = 0
self.y_position = 0
self.height = 0
self.width = 0
def set_height(self, height: int) -> None:
"""Set height."""
self.height = height
def set_width(self, width: int) -> None:
"""Set width."""
self.width = width
def x_bounce_check(self, cursor: int) -> int:
"""X bounce check."""
cursor = max(0, cursor)
return min(self.width - 1, cursor)
def y_bounce_check(self, cursor: int) -> int:
"""Y bounce check."""
cursor = max(0, cursor)
return min(self.height - 1, cursor)
def set_x(self, x: int) -> None:
"""Set x."""
self.x_position = self.x_bounce_check(x)
def set_y(self, y: int) -> None:
"""Set y."""
self.y_position = self.y_bounce_check(y)
def get_x(self) -> int:
"""Get x."""
return self.x_position
def get_y(self) -> int:
"""Get y."""
return self.y_position
def move_up(self) -> None:
"""Move up."""
self.set_y(self.y_position - 1)
def move_down(self) -> None:
"""Move down."""
self.set_y(self.y_position + 1)
def move_left(self) -> None:
"""Move left."""
self.set_x(self.x_position - 1)
def move_right(self) -> None:
"""Move right."""
self.set_x(self.x_position + 1)
def navigation(self, key: int) -> None:
"""Navigation.
Args:
key (int): The key.
"""
action = {
curses.KEY_DOWN: self.move_down,
curses.KEY_UP: self.move_up,
curses.KEY_RIGHT: self.move_right,
curses.KEY_LEFT: self.move_left,
}
action.get(key, lambda: None)()
class State:
"""State class to store the state of the program."""
def __init__(self) -> None:
"""Initialize the State class."""
self.key = 0
self.cursor = Cursor()
self.swap_size = 0
self.show_swap_input = False
self.reserve_size = 0
self.show_reserve_input = False
self.selected_device_ids: set[str] = set()
def get_selected_devices(self) -> tuple[str, ...]:
"""Get selected devices."""
return tuple(self.selected_device_ids)
def get_device(raw_device: str) -> dict[str, str]:
"""Get a device.
Args:
raw_device (str): The raw device.
Returns:
dict[str, str]: The device.
"""
raw_device_components = raw_device.split(" ")
return {thing.split("=")[0].lower(): thing.split("=")[1].strip('"') for thing in raw_device_components}
def get_devices() -> list[dict[str, str]]:
"""Get a list of devices."""
# --bytes
raw_devices = bash_wrapper("lsblk --paths --pairs").splitlines()
return [get_device(raw_device) for raw_device in raw_devices]
def set_color() -> None:
"""Set the color."""
curses.start_color()
curses.use_default_colors()
for i in range(curses.COLORS):
curses.init_pair(i + 1, i, -1)
def debug_menu(std_screen: curses.window, key: int) -> None:
"""Debug menu.
Args:
std_screen (curses.window): The curses window.
key (int): The key.
"""
height, width = std_screen.getmaxyx()
std_screen.addstr(height - 4, 0, f"Width: {width}, Height: {height}", curses.color_pair(5))
key_pressed = f"Last key pressed: {key}"[: width - 1]
if key == 0:
key_pressed = "No key press detected..."[: width - 1]
std_screen.addstr(height - 3, 0, key_pressed)
for i in range(8):
std_screen.addstr(height - 2, i * 3, f"{i}██", curses.color_pair(i))
def get_text_input(std_screen: curses.window, prompt: str, y: int, x: int) -> str:
"""Get text input.
Args:
std_screen (curses.window): The curses window.
prompt (str): The prompt.
y (int): The y position.
x (int): The x position.
Returns:
str: The input string.
"""
esc_key = 27
curses.echo()
std_screen.addstr(y, x, prompt)
input_str = ""
while True:
key = std_screen.getch()
if key == ord("\n"):
break
if key == esc_key:
input_str = ""
break
if key in (curses.KEY_BACKSPACE, ord("\b"), 127):
input_str = input_str[:-1]
std_screen.addstr(y, x + len(prompt), input_str + " ")
else:
input_str += chr(key)
std_screen.refresh()
curses.noecho()
return input_str
def swap_size_input(
std_screen: curses.window,
state: State,
swap_offset: int,
) -> State:
"""Reserve size input.
Args:
std_screen (curses.window): The curses window.
state (State): The state object.
swap_offset (int): The swap offset.
Returns:
State: The updated state object.
"""
swap_size_text = "Swap size (GB): "
std_screen.addstr(swap_offset, 0, f"{swap_size_text}{state.swap_size}")
if state.key == ord("\n") and state.cursor.get_y() == swap_offset:
state.show_swap_input = True
if state.show_swap_input:
swap_size_str = get_text_input(std_screen, swap_size_text, swap_offset, 0)
try:
state.swap_size = int(swap_size_str)
state.show_swap_input = False
except ValueError:
std_screen.addstr(swap_offset, 0, "Invalid input. Press any key to continue.")
std_screen.getch()
state.show_swap_input = False
return state
def reserve_size_input(
std_screen: curses.window,
state: State,
reserve_offset: int,
) -> State:
"""Reserve size input.
Args:
std_screen (curses.window): The curses window.
state (State): The state object.
reserve_offset (int): The reserve offset.
Returns:
State: The updated state object.
"""
reserve_size_text = "reserve size (GB): "
std_screen.addstr(reserve_offset, 0, f"{reserve_size_text}{state.reserve_size}")
if state.key == ord("\n") and state.cursor.get_y() == reserve_offset:
state.show_reserve_input = True
if state.show_reserve_input:
reserve_size_str = get_text_input(std_screen, reserve_size_text, reserve_offset, 0)
try:
state.reserve_size = int(reserve_size_str)
state.show_reserve_input = False
except ValueError:
std_screen.addstr(reserve_offset, 0, "Invalid input. Press any key to continue.")
std_screen.getch()
state.show_reserve_input = False
return state
def status_bar(
std_screen: curses.window,
cursor: Cursor,
width: int,
height: int,
) -> None:
"""Draw the status bar.
Args:
std_screen (curses.window): The curses window.
cursor (Cursor): The cursor.
width (int): The width.
height (int): The height.
"""
std_screen.attron(curses.A_REVERSE)
std_screen.attron(curses.color_pair(3))
status_bar = f"Press 'q' to exit | STATUS BAR | Pos: {cursor.get_x()}, {cursor.get_y()}"
std_screen.addstr(height - 1, 0, status_bar)
std_screen.addstr(height - 1, len(status_bar), " " * (width - len(status_bar) - 1))
std_screen.attroff(curses.color_pair(3))
std_screen.attroff(curses.A_REVERSE)
def get_device_id_mapping() -> dict[str, set[str]]:
"""Get a list of device ids.
Returns:
list[str]: the list of device ids
"""
device_ids = bash_wrapper("find /dev/disk/by-id -type l").splitlines()
device_id_mapping: dict[str, set[str]] = defaultdict(set)
for device_id in device_ids:
device = bash_wrapper(f"readlink -f {device_id}").strip()
device_id_mapping[device].add(device_id)
return device_id_mapping
def calculate_device_menu_padding(devices: list[dict[str, str]], column: str, padding: int = 0) -> int:
"""Calculate the device menu padding.
Args:
devices (list[dict[str, str]]): The devices.
column (str): The column.
padding (int, optional): The padding. Defaults to 0.
Returns:
int: The calculated padding.
"""
return max(len(device[column]) for device in devices) + padding
def draw_device_ids(
state: State,
row_number: int,
menu_start_x: int,
std_screen: curses.window,
menu_width: list[int],
device_ids: set[str],
) -> tuple[State, int]:
"""Draw device IDs.
Args:
state (State): The state object.
row_number (int): The row number.
menu_start_x (int): The menu start x.
std_screen (curses.window): The curses window.
menu_width (list[int]): The menu width.
device_ids (set[str]): The device IDs.
Returns:
tuple[State, int]: The updated state object and the row number.
"""
for device_id in sorted(device_ids):
row_number = row_number + 1
if row_number == state.cursor.get_y() and state.cursor.get_x() in menu_width:
std_screen.attron(curses.A_BOLD)
if state.key == ord(" "):
if device_id not in state.selected_device_ids:
state.selected_device_ids.add(device_id)
else:
state.selected_device_ids.remove(device_id)
if device_id in state.selected_device_ids:
std_screen.attron(curses.color_pair(7))
std_screen.addstr(row_number, menu_start_x, f" {device_id}")
std_screen.attroff(curses.color_pair(7))
std_screen.attroff(curses.A_BOLD)
return state, row_number
def draw_device_menu(
std_screen: curses.window,
devices: list[dict[str, str]],
device_id_mapping: dict[str, set[str]],
state: State,
menu_start_y: int = 0,
menu_start_x: int = 0,
) -> tuple[State, int]:
"""Draw the device menu and handle user input.
Args:
std_screen (curses.window): the curses window to draw on
devices (list[dict[str, str]]): the list of devices to draw
device_id_mapping (dict[str, set[str]]): the list of device ids to draw
state (State): the state object to update
menu_start_y (int, optional): the y position to start drawing the menu. Defaults to 0.
menu_start_x (int, optional): the x position to start drawing the menu. Defaults to 0.
Returns:
State: the updated state object
"""
padding = 2
name_padding = calculate_device_menu_padding(devices, "name", padding)
size_padding = calculate_device_menu_padding(devices, "size", padding)
type_padding = calculate_device_menu_padding(devices, "type", padding)
mountpoints_padding = calculate_device_menu_padding(devices, "mountpoints", padding)
device_header = (
f"{'Name':{name_padding}}{'Size':{size_padding}}{'Type':{type_padding}}{'Mountpoints':{mountpoints_padding}}"
)
menu_width = list(range(menu_start_x, len(device_header) + menu_start_x))
std_screen.addstr(menu_start_y, menu_start_x, device_header, curses.color_pair(5))
devises_list_start = menu_start_y + 1
row_number = devises_list_start
for device in devices:
row_number = row_number + 1
device_name = device["name"]
device_row = (
f"{device_name:{name_padding}}"
f"{device['size']:{size_padding}}"
f"{device['type']:{type_padding}}"
f"{device['mountpoints']:{mountpoints_padding}}"
)
std_screen.addstr(row_number, menu_start_x, device_row)
state, row_number = draw_device_ids(
state=state,
row_number=row_number,
menu_start_x=menu_start_x,
std_screen=std_screen,
menu_width=menu_width,
device_ids=device_id_mapping[device_name],
)
return state, row_number
def draw_menu(std_screen: curses.window) -> State:
"""Draw the menu and handle user input.
Args:
std_screen (curses.window): the curses window to draw on
Returns:
State: the state object
"""
# Clear and refresh the screen for a blank canvas
std_screen.clear()
std_screen.refresh()
set_color()
state = State()
devices = get_devices()
device_id_mapping = get_device_id_mapping()
# Loop where k is the last character pressed
while state.key != ord("q"):
std_screen.clear()
height, width = std_screen.getmaxyx()
state.cursor.set_height(height)
state.cursor.set_width(width)
state.cursor.navigation(state.key)
state, device_menu_size = draw_device_menu(
std_screen=std_screen,
state=state,
devices=devices,
device_id_mapping=device_id_mapping,
)
swap_offset = device_menu_size + 2
swap_size_input(
std_screen=std_screen,
state=state,
swap_offset=swap_offset,
)
reserve_size_input(
std_screen=std_screen,
state=state,
reserve_offset=swap_offset + 1,
)
status_bar(std_screen, state.cursor, width, height)
debug_menu(std_screen, state.key)
std_screen.move(state.cursor.get_y(), state.cursor.get_x())
std_screen.refresh()
state.key = std_screen.getch()
return state

View File

@@ -1,155 +0,0 @@
"""Thing."""
from __future__ import annotations
import logging
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from dataclasses import dataclass
from multiprocessing import cpu_count
from typing import TYPE_CHECKING, Any, Literal, TypeVar
if TYPE_CHECKING:
from collections.abc import Callable, Mapping, Sequence
logger = logging.getLogger(__name__)
R = TypeVar("R")
modes = Literal["normal", "early_error"]
@dataclass
class ExecutorResults[R]:
"""Dataclass to store the results and exceptions of the parallel execution."""
results: list[R]
exceptions: list[BaseException]
def __repr__(self) -> str:
"""Return a string representation of the object."""
return f"results={self.results} exceptions={self.exceptions}"
def _parallelize_base[R](
executor_type: type[ThreadPoolExecutor | ProcessPoolExecutor],
func: Callable[..., R],
kwargs_list: Sequence[Mapping[str, Any]],
max_workers: int | None,
progress_tracker: int | None,
mode: modes,
) -> ExecutorResults:
total_work = len(kwargs_list)
with executor_type(max_workers=max_workers) as executor:
futures = [executor.submit(func, **kwarg) for kwarg in kwargs_list]
results = []
exceptions = []
for index, future in enumerate(futures, 1):
if exception := future.exception():
logger.error(f"{future} raised {exception.__class__.__name__}")
exceptions.append(exception)
if mode == "early_error":
executor.shutdown(wait=False)
raise exception
continue
results.append(future.result())
if progress_tracker and index % progress_tracker == 0:
logger.info(f"Progress: {index}/{total_work}")
return ExecutorResults(results, exceptions)
def parallelize_thread[R](
func: Callable[..., R],
kwargs_list: Sequence[Mapping[str, Any]],
max_workers: int | None = None,
progress_tracker: int | None = None,
mode: modes = "normal",
) -> ExecutorResults:
"""Generic function to run a function with multiple arguments in threads.
Args:
func (Callable[..., R]): Function to run in threads.
kwargs_list (Sequence[Mapping[str, Any]]): List of dictionaries with the arguments for the function.
max_workers (int, optional): Number of workers to use. Defaults to 8.
progress_tracker (int, optional): Number of tasks to complete before logging progress.
mode (modes, optional): Mode to use. Defaults to "normal".
Returns:
tuple[list[R], list[Exception]]: List with the results and a list with the exceptions.
"""
return _parallelize_base(
executor_type=ThreadPoolExecutor,
func=func,
kwargs_list=kwargs_list,
max_workers=max_workers,
progress_tracker=progress_tracker,
mode=mode,
)
def parallelize_process[R](
func: Callable[..., R],
kwargs_list: Sequence[Mapping[str, Any]],
max_workers: int | None = None,
progress_tracker: int | None = None,
mode: modes = "normal",
) -> ExecutorResults:
"""Generic function to run a function with multiple arguments in process.
Args:
func (Callable[..., R]): Function to run in process.
kwargs_list (Sequence[Mapping[str, Any]]): List of dictionaries with the arguments for the function.
max_workers (int, optional): Number of workers to use. Defaults to 4.
progress_tracker (int, optional): Number of tasks to complete before logging progress.
mode (modes, optional): Mode to use. Defaults to "normal".
Returns:
tuple[list[R], list[Exception]]: List with the results and a list with the exceptions.
"""
if max_workers and max_workers > cpu_count():
error = f"max_workers must be less than or equal to {cpu_count()}"
raise RuntimeError(error)
return process_executor_unchecked(
func=func,
kwargs_list=kwargs_list,
max_workers=max_workers,
progress_tracker=progress_tracker,
mode=mode,
)
def process_executor_unchecked[R](
func: Callable[..., R],
kwargs_list: Sequence[Mapping[str, Any]],
max_workers: int | None,
progress_tracker: int | None,
mode: modes = "normal",
) -> ExecutorResults:
"""Generic function to run a function with multiple arguments in parallel.
Note: this function does not check if the number of workers is greater than the number of CPUs.
This can cause the system to become unresponsive.
Args:
func (Callable[..., R]): Function to run in parallel.
kwargs_list (Sequence[Mapping[str, Any]]): List of dictionaries with the arguments for the function.
max_workers (int, optional): Number of workers to use. Defaults to 8.
progress_tracker (int, optional): Number of tasks to complete before logging progress.
mode (modes, optional): Mode to use. Defaults to "normal".
Returns:
tuple[list[R], list[Exception]]: List with the results and a list with the exceptions.
"""
return _parallelize_base(
executor_type=ProcessPoolExecutor,
func=func,
kwargs_list=kwargs_list,
max_workers=max_workers,
progress_tracker=progress_tracker,
mode=mode,
)

View File

@@ -1 +0,0 @@
"""init."""

View File

@@ -1,40 +0,0 @@
"""capasitor."""
def calculate_capacitor_capacity(voltage: float, farads: float) -> float:
"""Calculate capacitor capacity."""
joules = (farads * voltage**2) // 2
return joules // 3600
def calculate_pack_capacity(cells: int, cell_voltage: float, farads: float) -> float:
"""Calculate pack capacity."""
return calculate_capacitor_capacity(cells * cell_voltage, farads / cells)
def calculate_pack_capacity2(cells: int, cell_voltage: float, farads: float, cell_cost: float) -> tuple[float, float]:
"""Calculate pack capacity."""
capacitor_capacity = calculate_capacitor_capacity(cells * cell_voltage, farads / cells)
return capacitor_capacity, cell_cost * cells
def main() -> None:
"""Main."""
watt_hours = calculate_pack_capacity(cells=10, cell_voltage=2.7, farads=500)
print(f"{watt_hours=}")
print(f"{watt_hours*16=}")
watt_hours = calculate_pack_capacity(cells=1, cell_voltage=2.7, farads=5000)
print(f"{watt_hours=}")
watt_hours, cost = calculate_pack_capacity2(
cells=10,
cell_voltage=2.7,
farads=3000,
cell_cost=11.60,
)
print(f"{watt_hours=}")
print(f"{cost=}")
if __name__ == "__main__":
main()

View File

@@ -1,25 +0,0 @@
"""thing."""
def caculat_batry_specs(
cell_amp_hour: int,
cell_voltage: float,
cells_per_pack: int,
packs: int,
) -> tuple[float, float]:
"""Caculat battry specs."""
pack_voltage = cell_voltage * cells_per_pack
pack_watt_hours = pack_voltage * cell_amp_hour
battry_capacity = pack_watt_hours * packs
return (
battry_capacity,
pack_voltage,
)
battry_capacity, pack_voltage = caculat_batry_specs(300, 3.2, 8, 2)
print(f"{battry_capacity=} {pack_voltage=}")
cost = 1700
print(f"$/kWh {cost / battry_capacity}")

View File

@@ -1,196 +0,0 @@
"""voltage_drop."""
import math
from enum import Enum
class TemperatureUnit(Enum):
"""Temperature unit."""
CELSIUS = "c"
FAHRENHEIT = "f"
KELVIN = "k"
class Temperature:
"""Temperature."""
def __init__(
self,
temperature: float,
unit: TemperatureUnit = TemperatureUnit.CELSIUS,
) -> None:
"""__init__."""
unit_modifier = {
TemperatureUnit.CELSIUS: 1,
TemperatureUnit.FAHRENHEIT: 0.5556,
TemperatureUnit.KELVIN: 1.8,
}
self.temperature = temperature * unit_modifier[unit]
def __float__(self) -> float:
"""Return the temperature in degrees Celsius."""
return self.temperature
class LengthUnit(Enum):
"""Length unit."""
METERS = "m"
FEET = "ft"
INCHES = "in"
class Length:
"""Length."""
def __init__(self, length: float, unit: LengthUnit) -> None:
"""__init__."""
self.meters = self._convert_to_meters(length, unit)
def _convert_to_meters(self, length: float, unit: LengthUnit) -> float:
thing = {
LengthUnit.METERS: 1,
LengthUnit.FEET: 0.3048,
LengthUnit.INCHES: 0.0254,
}
test = thing.get(unit)
if test:
return length * test
error = f"Unsupported unit: {unit}"
raise ValueError(error)
def __float__(self) -> float:
"""Return the length in meters."""
return self.meters
def feet(self) -> float:
"""Return the length in feet."""
return self.meters * 3.2808
class MaterialType(Enum):
"""Material type."""
COPPER = "copper"
ALUMINUM = "aluminum"
CCA = "cca"
SILVER = "silver"
GOLD = "gold"
def get_material_resistivity(
material: MaterialType,
temperature: Temperature | None = None,
) -> float:
"""Get the resistivity of a material."""
if not temperature:
temperature = Temperature(20.0)
material_info = {
MaterialType.COPPER: (1.724e-8, 0.00393),
MaterialType.ALUMINUM: (2.908e-8, 0.00403),
MaterialType.CCA: (2.577e-8, 0.00397),
MaterialType.SILVER: (1.632e-8, 0.00380),
MaterialType.GOLD: (2.503e-8, 0.00340),
}
base_resistivity, temp_coefficient = material_info[material]
return base_resistivity * (1 + temp_coefficient * float(temperature))
def calculate_awg_diameter_mm(gauge: int) -> float:
"""Calculate wire diameter in millimeters for a given AWG gauge."""
return round(0.127 * 92 ** ((36 - gauge) / 39), 3)
def calculate_wire_area_m2(gauge: int) -> float:
"""Calculate the area of a wire in square meters.
Args:
gauge (int): The AWG (American Wire Gauge) number of the wire
Returns:
float: The area of the wire in square meters
"""
return math.pi * (calculate_awg_diameter_mm(gauge) / 2000) ** 2
def calculate_resistance_per_meter(gauge: int) -> float:
"""Calculate the resistance per meter of a wire.
Args:
gauge (int): The AWG (American Wire Gauge) number of the wire
Returns:
float: The resistance per meter of the wire
"""
return get_material_resistivity(MaterialType.COPPER) / calculate_wire_area_m2(gauge)
def voltage_drop(
gauge: int,
material: MaterialType,
length: Length,
current_a: float,
) -> float:
"""Calculate the voltage drop of a wire.
Args:
gauge (int): The AWG (American Wire Gauge) number of the wire
material (MaterialType): The type of conductor material (e.g., copper, aluminum)
length (Length): The length of the wire in meters
current_a (float): The current flowing through the wire in amperes
Returns:
float: The voltage drop of the wire in volts
"""
resistivity = get_material_resistivity(material)
resistance_per_meter = resistivity / calculate_wire_area_m2(gauge)
total_resistance = resistance_per_meter * float(length) * 2 # round-trip
return total_resistance * current_a
print(
voltage_drop(
gauge=10,
material=MaterialType.CCA,
length=Length(length=20, unit=LengthUnit.FEET),
current_a=20,
)
)
def max_wire_length(
gauge: int,
material: MaterialType,
current_amps: float,
voltage_drop: float = 0.3,
temperature: Temperature | None = None,
) -> Length:
"""Calculate the maximum allowable wire length based on voltage drop criteria.
Args:
gauge (int): The AWG (American Wire Gauge) number of the wire
material (MaterialType): The type of conductor material (e.g., copper, aluminum)
current_amps (float): The current flowing through the wire in amperes
voltage_drop (float, optional): Maximum allowable voltage drop as a decimal (default 0.1 or 10%)
temperature (Temperature | None, optional): The temperature of the wire. Defaults to None.
Returns:
float: Maximum wire length in meters that maintains the specified voltage drop
"""
if not temperature:
temperature = Temperature(100.0, unit=TemperatureUnit.FAHRENHEIT)
resistivity = get_material_resistivity(material, temperature)
resistance_per_meter = resistivity / calculate_wire_area_m2(gauge)
# V = IR, solve for length where V is the allowed voltage drop
return Length(
voltage_drop / (current_amps * resistance_per_meter),
LengthUnit.METERS,
)
print(max_wire_length(gauge=10, material=MaterialType.CCA, current_amps=20).feet())
print(max_wire_length(gauge=10, material=MaterialType.CCA, current_amps=10).feet())
print(max_wire_length(gauge=10, material=MaterialType.CCA, current_amps=5).feet())

View File

@@ -1 +0,0 @@
game_data/

View File

@@ -1 +0,0 @@
"""init."""

View File

@@ -1,675 +0,0 @@
"""Base logic for the Splendor game."""
from __future__ import annotations
import itertools
import json
import random
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Literal, Protocol
if TYPE_CHECKING:
from collections.abc import Sequence
from pathlib import Path
GemColor = Literal["white", "blue", "green", "red", "black", "gold"]
GEM_COLORS: tuple[GemColor, ...] = (
"white",
"blue",
"green",
"red",
"black",
"gold",
)
BASE_COLORS: tuple[GemColor, ...] = (
"white",
"blue",
"green",
"red",
"black",
)
GEM_ORDER: list[GemColor] = list(GEM_COLORS)
GEM_INDEX: dict[GemColor, int] = {c: i for i, c in enumerate(GEM_ORDER)}
BASE_INDEX: dict[GemColor, int] = {c: i for i, c in enumerate(BASE_COLORS)}
@dataclass(frozen=True)
class Card:
"""Development card: gives points + a permanent gem discount."""
tier: int
points: int
color: GemColor
cost: dict[GemColor, int]
@dataclass(frozen=True)
class Noble:
"""Noble tile: gives points if you have enough bonuses."""
name: str
points: int
requirements: dict[GemColor, int]
@dataclass
class PlayerState:
"""State of a player in the game."""
strategy: Strategy
tokens: dict[GemColor, int] = field(default_factory=lambda: dict.fromkeys(GEM_COLORS, 0))
discounts: dict[GemColor, int] = field(default_factory=lambda: dict.fromkeys(GEM_COLORS, 0))
cards: list[Card] = field(default_factory=list)
reserved: list[Card] = field(default_factory=list)
nobles: list[Noble] = field(default_factory=list)
card_score: int = 0
noble_score: int = 0
def total_tokens(self) -> int:
"""Total tokens in player's bank."""
return sum(self.tokens.values())
def add_noble(self, noble: Noble) -> None:
"""Add a noble to the player."""
self.nobles.append(noble)
self.noble_score = sum(noble.points for noble in self.nobles)
def add_card(self, card: Card) -> None:
"""Add a card to the player."""
self.cards.append(card)
self.card_score = sum(card.points for card in self.cards)
@property
def score(self) -> int:
"""Total points in player's cards + nobles."""
return self.card_score + self.noble_score
def can_afford(self, card: Card) -> bool:
"""Check if player can afford card, using discounts + gold."""
missing = 0
gold = self.tokens["gold"]
for color, cost in card.cost.items():
missing += max(0, cost - self.discounts.get(color, 0) - self.tokens.get(color, 0))
if missing > gold:
return False
return True
def pay_for_card(self, card: Card) -> dict[GemColor, int]:
"""Pay tokens for card, move card to tableau, return payment for bank."""
if not self.can_afford(card):
msg = f"cannot afford card {card}"
raise ValueError(msg)
payment: dict[GemColor, int] = dict.fromkeys(GEM_COLORS, 0)
gold_available = self.tokens["gold"]
for color in BASE_COLORS:
cost = card.cost.get(color, 0)
effective_cost = max(0, cost - self.discounts.get(color, 0))
use = min(self.tokens[color], effective_cost)
self.tokens[color] -= use
payment[color] += use
remaining = effective_cost - use
if remaining > 0:
use_gold = min(gold_available, remaining)
gold_available -= use_gold
self.tokens["gold"] -= use_gold
payment["gold"] += use_gold
self.add_card(card)
self.discounts[card.color] += 1
return payment
def get_default_starting_tokens(player_count: int) -> dict[GemColor, int]:
"""get_default_starting_tokens."""
token_count = (player_count * player_count - 3 * player_count + 10) // 2
return {
"white": token_count,
"blue": token_count,
"green": token_count,
"red": token_count,
"black": token_count,
"gold": 5,
}
@dataclass
class GameConfig:
"""Game configuration: gems, bank, cards, nobles, etc."""
win_score: int = 15
table_cards_per_tier: int = 4
reserve_limit: int = 3
token_limit: int = 10
turn_limit: int = 1000
minimum_tokens_to_buy_2: int = 4
max_token_take: int = 3
cards: list[Card] = field(default_factory=list)
nobles: list[Noble] = field(default_factory=list)
class GameState:
"""Game state: players, bank, decks, table, available nobles, etc."""
def __init__(
self,
config: GameConfig,
players: list[PlayerState],
bank: dict[GemColor, int],
decks_by_tier: dict[int, list[Card]],
table_by_tier: dict[int, list[Card]],
available_nobles: list[Noble],
) -> None:
"""Game state."""
self.config = config
self.players = players
self.bank = bank
self.decks_by_tier = decks_by_tier
self.table_by_tier = table_by_tier
self.available_nobles = available_nobles
self.noble_min_requirements = 0
self.get_noble_min_requirements()
self.current_player_index = 0
self.finished = False
def get_noble_min_requirements(self) -> None:
"""Find the minimum requirement for all available nobles."""
test = 0
for noble in self.available_nobles:
test = max(test, min(foo for foo in noble.requirements.values()))
self.noble_min_requirements = test
def next_player(self) -> None:
"""Advance to the next player."""
self.current_player_index = (self.current_player_index + 1) % len(self.players)
@property
def current_player(self) -> PlayerState:
"""Current player."""
return self.players[self.current_player_index]
def refill_table(self) -> None:
"""Refill face-up cards from decks."""
for tier, deck in self.decks_by_tier.items():
table = self.table_by_tier[tier]
while len(table) < self.config.table_cards_per_tier and deck:
table.append(deck.pop())
def check_winner_simple(self) -> PlayerState | None:
"""Simplified: end immediately when someone hits win_score."""
eligible = [player for player in self.players if player.score >= self.config.win_score]
if not eligible:
return None
eligible.sort(
key=lambda p: (p.score, -len(p.cards)),
reverse=True,
)
self.finished = True
return eligible[0]
class Action:
"""Marker protocol for actions."""
@dataclass
class TakeDifferent(Action):
"""Take up to 3 different gem colors."""
colors: list[GemColor]
@dataclass
class TakeDouble(Action):
"""Take two of the same color."""
color: GemColor
@dataclass
class BuyCard(Action):
"""Buy a face-up card."""
tier: int
index: int
@dataclass
class BuyCardReserved(Action):
"""Buy a face-up card."""
index: int
@dataclass
class ReserveCard(Action):
"""Reserve a face-up card."""
tier: int
index: int | None = None
from_deck: bool = False
class Strategy(Protocol):
"""Implement this to make a bot or human controller."""
def __init__(self, name: str) -> None:
"""Initialize a strategy."""
self.name = name
def choose_action(self, game: GameState, player: PlayerState) -> Action | None:
"""Return an Action, or None to concede/end."""
raise NotImplementedError
def choose_discard(
self,
game: GameState, # noqa: ARG002
player: PlayerState,
excess: int,
) -> dict[GemColor, int]:
"""Called if player has more than token_limit tokens after an action.
Default: naive auto-discard.
"""
return auto_discard_tokens(player, excess)
def choose_noble(
self,
game: GameState, # noqa: ARG002
player: PlayerState, # noqa: ARG002
nobles: list[Noble],
) -> Noble:
"""Called if player qualifies for multiple nobles. Default: first."""
return nobles[0]
def auto_discard_tokens(player: PlayerState, excess: int) -> dict[GemColor, int]:
"""Very dumb discard logic: discard from colors you have the most of."""
to_discard: dict[GemColor, int] = dict.fromkeys(GEM_COLORS, 0)
remaining = excess
while remaining > 0:
color = max(player.tokens, key=lambda c: player.tokens[c])
if player.tokens[color] == 0:
break
player.tokens[color] -= 1
to_discard[color] += 1
remaining -= 1
return to_discard
def enforce_token_limit(
game: GameState,
strategy: Strategy,
player: PlayerState,
) -> None:
"""If player has more than token_limit tokens, force discards."""
limit = game.config.token_limit
total = player.total_tokens()
if total <= limit:
return
excess = total - limit
discards = strategy.choose_discard(game, player, excess)
for color, amount in discards.items():
available = player.tokens[color]
to_remove = min(amount, available)
if to_remove <= 0:
continue
player.tokens[color] -= to_remove
game.bank[color] += to_remove
remaining = player.total_tokens() - limit
if remaining > 0:
auto = auto_discard_tokens(player, remaining)
for color, amount in auto.items():
game.bank[color] += amount
def _check_nobles_for_player(player: PlayerState, noble: Noble) -> bool:
# this rule is slower
for color, cost in noble.requirements.items(): # noqa: SIM110
if player.discounts[color] < cost:
return False
return True
def check_nobles_for_player(
game: GameState,
strategy: Strategy,
player: PlayerState,
) -> None:
"""Award at most one noble to player if they qualify."""
if game.noble_min_requirements > max(player.discounts.values()):
return
candidates = [noble for noble in game.available_nobles if _check_nobles_for_player(player, noble)]
if not candidates:
return
chosen = candidates[0] if len(candidates) == 1 else strategy.choose_noble(game, player, candidates)
if chosen not in game.available_nobles:
return
game.available_nobles.remove(chosen)
game.get_noble_min_requirements()
player.add_noble(chosen)
def apply_take_different(game: GameState, strategy: Strategy, action: TakeDifferent) -> None:
"""Mutate game state according to action."""
player = game.current_player
colors = [color for color in action.colors if color in BASE_COLORS and game.bank[color] > 0]
if not (1 <= len(colors) <= game.config.max_token_take):
return
for color in colors:
game.bank[color] -= 1
player.tokens[color] += 1
enforce_token_limit(game, strategy, player)
def apply_take_double(game: GameState, strategy: Strategy, action: TakeDouble) -> None:
"""Mutate game state according to action."""
player = game.current_player
color = action.color
if color not in BASE_COLORS:
return
if game.bank[color] < game.config.minimum_tokens_to_buy_2:
return
game.bank[color] -= 2
player.tokens[color] += 2
enforce_token_limit(game, strategy, player)
def apply_buy_card(game: GameState, _strategy: Strategy, action: BuyCard) -> None:
"""Mutate game state according to action."""
player = game.current_player
row = game.table_by_tier.get(action.tier)
if row is None or not (0 <= action.index < len(row)):
return
card = row[action.index]
if not player.can_afford(card):
return
row.pop(action.index)
payment = player.pay_for_card(card)
for color, amount in payment.items():
game.bank[color] += amount
game.refill_table()
def apply_buy_card_reserved(game: GameState, _strategy: Strategy, action: BuyCardReserved) -> None:
"""Mutate game state according to action."""
player = game.current_player
if not (0 <= action.index < len(player.reserved)):
return
card = player.reserved[action.index]
if not player.can_afford(card):
return
player.reserved.pop(action.index)
payment = player.pay_for_card(card)
for color, amount in payment.items():
game.bank[color] += amount
def apply_reserve_card(game: GameState, strategy: Strategy, action: ReserveCard) -> None:
"""Mutate game state according to action."""
player = game.current_player
if len(player.reserved) >= game.config.reserve_limit:
return
card: Card | None = None
if action.from_deck:
deck = game.decks_by_tier.get(action.tier)
if deck:
card = deck.pop()
else:
row = game.table_by_tier.get(action.tier)
if row is None:
return
if action.index is None or not (0 <= action.index < len(row)):
return
card = row.pop(action.index)
game.refill_table()
if card is None:
return
player.reserved.append(card)
if game.bank["gold"] > 0:
game.bank["gold"] -= 1
player.tokens["gold"] += 1
enforce_token_limit(game, strategy, player)
def apply_action(game: GameState, strategy: Strategy, action: Action) -> None:
"""Mutate game state according to action."""
actions = {
TakeDifferent: apply_take_different,
TakeDouble: apply_take_double,
BuyCard: apply_buy_card,
ReserveCard: apply_reserve_card,
BuyCardReserved: apply_buy_card_reserved,
}
action_func = actions.get(type(action))
if action_func is None:
msg = f"Unknown action type: {type(action)}"
raise ValueError(msg)
action_func(game, strategy, action)
# not sure how to simplify this yet
def get_legal_actions( # noqa: C901
game: GameState,
player: PlayerState | None = None,
) -> list[Action]:
"""Enumerate all syntactically legal actions for the given player.
This enforces:
- token-taking rules
- reserve limits
- affordability for buys
"""
if player is None:
player = game.players[game.current_player_index]
actions: list[Action] = []
colors_available = [c for c in BASE_COLORS if game.bank[c] > 0]
for r in (1, 2, 3):
actions.extend(TakeDifferent(colors=list(combo)) for combo in itertools.combinations(colors_available, r))
actions.extend(
TakeDouble(color=color) for color in BASE_COLORS if game.bank[color] >= game.config.minimum_tokens_to_buy_2
)
for tier, row in game.table_by_tier.items():
for idx, card in enumerate(row):
if player.can_afford(card):
actions.append(BuyCard(tier=tier, index=idx))
for idx, card in enumerate(player.reserved):
if player.can_afford(card):
actions.append(BuyCardReserved(index=idx))
if len(player.reserved) < game.config.reserve_limit:
for tier, row in game.table_by_tier.items():
for idx, _ in enumerate(row):
actions.append(
ReserveCard(tier=tier, index=idx, from_deck=False),
)
for tier, deck in game.decks_by_tier.items():
if deck:
actions.append(
ReserveCard(tier=tier, index=None, from_deck=True),
)
return actions
def create_random_cards_tier(
tier: int,
card_count: int,
cost_choices: list[int],
point_choices: list[int],
) -> list[Card]:
"""Create a random set of cards for a given tier."""
cards: list[Card] = []
for color in BASE_COLORS:
for _ in range(card_count):
cost = dict.fromkeys(GEM_COLORS, 0)
for c in BASE_COLORS:
if c == color:
continue
cost[c] = random.choice(cost_choices)
points = random.choice(point_choices)
cards.append(Card(tier=tier, points=points, color=color, cost=cost))
return cards
def create_random_cards() -> list[Card]:
"""Generate a generic but Splendor-ish set of cards.
This is not the official deck, but structured similarly enough for play.
"""
cards: list[Card] = []
cards.extend(
create_random_cards_tier(
tier=1,
card_count=5,
cost_choices=[0, 1, 1, 2],
point_choices=[0, 0, 1],
)
)
cards.extend(
create_random_cards_tier(
tier=2,
card_count=4,
cost_choices=[2, 3, 4],
point_choices=[1, 2, 2, 3],
)
)
cards.extend(
create_random_cards_tier(
tier=3,
card_count=3,
cost_choices=[4, 5, 6],
point_choices=[3, 4, 5],
)
)
random.shuffle(cards)
return cards
def create_random_nobles() -> list[Noble]:
"""A small set of noble tiles, roughly Splendor-ish."""
nobles: list[Noble] = []
base_requirements: list[dict[GemColor, int]] = [
{"white": 3, "blue": 3, "green": 3},
{"blue": 3, "green": 3, "red": 3},
{"green": 3, "red": 3, "black": 3},
{"red": 3, "black": 3, "white": 3},
{"black": 3, "white": 3, "blue": 3},
{"white": 4, "blue": 4},
{"green": 4, "red": 4},
{"blue": 4, "black": 4},
]
for idx, req in enumerate(base_requirements, start=1):
nobles.append(
Noble(
name=f"Noble {idx}",
points=3,
requirements=dict(req.items()),
),
)
return nobles
def load_nobles(file: Path) -> list[Noble]:
"""Load nobles from a file."""
nobles = json.loads(file.read_text())
return [Noble(**noble) for noble in nobles]
def load_cards(file: Path) -> list[Card]:
"""Load cards from a file."""
cards = json.loads(file.read_text())
return [Card(**card) for card in cards]
def new_game(
strategies: Sequence[Strategy],
config: GameConfig,
) -> GameState:
"""Create a new game state from a config + list of players."""
num_players = len(strategies)
bank = get_default_starting_tokens(num_players)
decks_by_tier: dict[int, list[Card]] = {1: [], 2: [], 3: []}
for card in config.cards:
decks_by_tier.setdefault(card.tier, []).append(card)
for deck in decks_by_tier.values():
random.shuffle(deck)
table_by_tier: dict[int, list[Card]] = {1: [], 2: [], 3: []}
players = [PlayerState(strategy=strategy) for strategy in strategies]
nobles = list(config.nobles)
random.shuffle(nobles)
nobles = nobles[: num_players + 1]
game = GameState(
config=config,
players=players,
bank=bank,
decks_by_tier=decks_by_tier,
table_by_tier=table_by_tier,
available_nobles=nobles,
)
game.refill_table()
return game
def run_game(game: GameState) -> tuple[PlayerState, int]:
"""Run a full game loop until someone wins or a player returns None."""
turn_count = 0
while not game.finished:
turn_count += 1
player = game.current_player
strategy = player.strategy
action = strategy.choose_action(game, player)
if action is None:
game.finished = True
break
apply_action(game, strategy, action)
check_nobles_for_player(game, strategy, player)
winner = game.check_winner_simple()
if winner is not None:
return winner, turn_count
game.next_player()
if turn_count >= game.config.turn_limit:
break
fallback = max(game.players, key=lambda player: player.score)
return fallback, turn_count

View File

@@ -1,288 +0,0 @@
"""Bot for Splendor game."""
from __future__ import annotations
import random
from .base import (
BASE_COLORS,
Action,
BuyCard,
BuyCardReserved,
Card,
GameState,
GemColor,
PlayerState,
ReserveCard,
Strategy,
TakeDifferent,
TakeDouble,
auto_discard_tokens,
get_legal_actions,
)
def can_bot_afford(player: PlayerState, card: Card) -> bool:
"""Check if player can afford card, using discounts + gold."""
missing = 0
gold = player.tokens["gold"]
for color, cost in card.cost.items():
missing += max(0, cost - player.discounts.get(color, 0) - player.tokens.get(color, 0))
if missing > gold:
return False
return True
class RandomBot(Strategy):
"""Dumb bot that follows rules but doesn't think."""
def __init__(self, name: str) -> None:
"""Initialize the bot."""
super().__init__(name=name)
def choose_action(self, game: GameState, player: PlayerState) -> Action | None:
"""Choose an action for the current player."""
affordable: list[tuple[int, int]] = []
for tier, row in game.table_by_tier.items():
for idx, card in enumerate(row):
if can_bot_afford(player, card):
affordable.append((tier, idx))
if affordable and random.random() < 0.5:
tier, idx = random.choice(affordable)
return BuyCard(tier=tier, index=idx)
if random.random() < 0.2:
tier = random.choice([1, 2, 3])
row = game.table_by_tier.get(tier, [])
if row:
idx = random.randrange(len(row))
return ReserveCard(tier=tier, index=idx, from_deck=False)
if random.random() < 0.5:
colors_for_double = [c for c in BASE_COLORS if game.bank[c] >= 4]
if colors_for_double:
return TakeDouble(color=random.choice(colors_for_double))
colors_for_diff = [c for c in BASE_COLORS if game.bank[c] > 0]
random.shuffle(colors_for_diff)
return TakeDifferent(colors=colors_for_diff[:3])
def choose_discard(
self,
game: GameState, # noqa: ARG002
player: PlayerState,
excess: int,
) -> dict[GemColor, int]:
"""Choose how many tokens to discard."""
return auto_discard_tokens(player, excess)
def check_cards_in_tier(row: list[Card], player: PlayerState) -> list[int]:
"""Check if player can afford card, using discounts + gold."""
return [index for index, card in enumerate(row) if can_bot_afford(player, card)]
class PersonalizedBot(Strategy):
"""PersonalizedBot."""
"""Dumb bot that follows rules but doesn't think."""
def __init__(self, name: str) -> None:
"""Initialize the bot."""
super().__init__(name=name)
def choose_action(self, game: GameState, player: PlayerState) -> Action | None:
"""Choose an action for the current player."""
for tier in (1, 2, 3):
row = game.table_by_tier[tier]
if affordable := check_cards_in_tier(row, player):
index = random.choice(affordable)
return BuyCard(tier=tier, index=index)
colors_for_diff = [c for c in BASE_COLORS if game.bank[c] > 0]
random.shuffle(colors_for_diff)
return TakeDifferent(colors=colors_for_diff[:3])
def choose_discard(
self,
game: GameState, # noqa: ARG002
player: PlayerState,
excess: int,
) -> dict[GemColor, int]:
"""Choose how many tokens to discard."""
return auto_discard_tokens(player, excess)
class PersonalizedBot2(Strategy):
"""PersonalizedBot2."""
"""Dumb bot that follows rules but doesn't think."""
def __init__(self, name: str) -> None:
"""Initialize the bot."""
super().__init__(name=name)
def choose_action(self, game: GameState, player: PlayerState) -> Action | None:
"""Choose an action for the current player."""
tiers = (1, 2, 3)
for tier in tiers:
row = game.table_by_tier[tier]
if affordable := check_cards_in_tier(row, player):
index = random.choice(affordable)
return BuyCard(tier=tier, index=index)
if affordable := check_cards_in_tier(player.reserved, player):
index = random.choice(affordable)
return BuyCardReserved(index=index)
colors_for_diff = [c for c in BASE_COLORS if game.bank[c] > 0]
if len(colors_for_diff) >= 3:
random.shuffle(colors_for_diff)
return TakeDifferent(colors=colors_for_diff[:3])
for tier in tiers:
len_deck = len(game.decks_by_tier[tier])
if len_deck:
return ReserveCard(tier=tier, index=None, from_deck=True)
return TakeDifferent(colors=colors_for_diff[:3])
def choose_discard(
self,
game: GameState, # noqa: ARG002
player: PlayerState,
excess: int,
) -> dict[GemColor, int]:
"""Choose how many tokens to discard."""
return auto_discard_tokens(player, excess)
def buy_card_reserved(player: PlayerState) -> Action | None:
"""Buy a card reserved."""
if affordable := check_cards_in_tier(player.reserved, player):
index = random.choice(affordable)
return BuyCardReserved(index=index)
return None
def buy_card(game: GameState, player: PlayerState) -> Action | None:
"""Buy a card."""
for tier in (1, 2, 3):
row = game.table_by_tier[tier]
if affordable := check_cards_in_tier(row, player):
index = random.choice(affordable)
return BuyCard(tier=tier, index=index)
return None
def take_tokens(game: GameState) -> Action | None:
"""Take tokens."""
colors_for_diff = [color for color in BASE_COLORS if game.bank[color] > 0]
if len(colors_for_diff) >= 3:
random.shuffle(colors_for_diff)
return TakeDifferent(colors=colors_for_diff[: game.config.max_token_take])
return None
class PersonalizedBot3(Strategy):
"""PersonalizedBot3."""
"""Dumb bot that follows rules but doesn't think."""
def __init__(self, name: str) -> None:
"""Initialize the bot."""
super().__init__(name=name)
def choose_action(self, game: GameState, player: PlayerState) -> Action | None:
"""Choose an action for the current player."""
print(len(get_legal_actions(game, player)))
print(get_legal_actions(game, player))
if action := buy_card_reserved(player):
return action
if action := buy_card(game, player):
return action
colors_for_diff = [color for color in BASE_COLORS if game.bank[color] > 0]
if len(colors_for_diff) >= 3:
random.shuffle(colors_for_diff)
return TakeDifferent(colors=colors_for_diff[:3])
for tier in (1, 2, 3):
len_deck = len(game.decks_by_tier[tier])
if len_deck:
return ReserveCard(tier=tier, index=None, from_deck=True)
return TakeDifferent(colors=colors_for_diff[:3])
def choose_discard(
self,
game: GameState, # noqa: ARG002
player: PlayerState,
excess: int,
) -> dict[GemColor, int]:
"""Choose how many tokens to discard."""
return auto_discard_tokens(player, excess)
def estimate_value_of_card(game: GameState, player: PlayerState, color: GemColor) -> int:
"""Estimate value of a color in the player's bank."""
return game.bank[color] - player.discounts.get(color, 0)
def estimate_value_of_token(game: GameState, player: PlayerState, color: GemColor) -> int:
"""Estimate value of a color in the player's bank."""
return game.bank[color] - player.discounts.get(color, 0)
class PersonalizedBot4(Strategy):
"""PersonalizedBot4."""
def __init__(self, name: str) -> None:
"""Initialize the bot."""
super().__init__(name=name)
def filter_actions(self, actions: list[Action]) -> list[Action]:
"""Filter actions to only take different."""
return [
action
for action in actions
if (isinstance(action, TakeDifferent) and len(action.colors) == 3) or not isinstance(action, TakeDifferent)
]
def choose_action(self, game: GameState, player: PlayerState) -> Action | None:
"""Choose an action for the current player."""
legal_actions = get_legal_actions(game, player)
print(len(legal_actions))
good_actions = self.filter_actions(legal_actions)
print(len(good_actions))
print(good_actions)
print(len(get_legal_actions(game, player)))
if action := buy_card_reserved(player):
return action
if action := buy_card(game, player):
return action
colors_for_diff = [color for color in BASE_COLORS if game.bank[color] > 0]
if len(colors_for_diff) >= 3:
random.shuffle(colors_for_diff)
return TakeDifferent(colors=colors_for_diff[:3])
for tier in (1, 2, 3):
len_deck = len(game.decks_by_tier[tier])
if len_deck:
return ReserveCard(tier=tier, index=None, from_deck=True)
return TakeDifferent(colors=colors_for_diff[:3])
def choose_discard(
self,
game: GameState, # noqa: ARG002
player: PlayerState,
excess: int,
) -> dict[GemColor, int]:
"""Choose how many tokens to discard."""
return auto_discard_tokens(player, excess)

View File

@@ -1,724 +0,0 @@
"""Splendor game."""
from __future__ import annotations
import sys
from typing import TYPE_CHECKING, Any
from textual.app import App, ComposeResult
from textual.containers import Horizontal, Vertical
from textual.widget import Widget
from textual.widgets import Footer, Header, Input, Static
from .base import (
BASE_COLORS,
GEM_COLORS,
Action,
BuyCard,
BuyCardReserved,
Card,
GameState,
GemColor,
Noble,
PlayerState,
ReserveCard,
Strategy,
TakeDifferent,
TakeDouble,
)
if TYPE_CHECKING:
from collections.abc import Mapping
# Abbreviations used when rendering costs
COST_ABBR: dict[GemColor, str] = {
"white": "W",
"blue": "B",
"green": "G",
"red": "R",
"black": "K",
"gold": "O",
}
# Abbreviations players can type on the command line
COLOR_ABBR_TO_FULL: dict[str, GemColor] = {
"w": "white",
"b": "blue",
"g": "green",
"r": "red",
"k": "black",
"o": "gold",
}
def parse_color_token(raw: str) -> GemColor:
"""Convert user input into a GemColor.
Supports:
- full names: white, blue, green, red, black, gold
- abbreviations: w, b, g, r, k, o
"""
key = raw.lower()
# full color names first
if key in BASE_COLORS:
return key # type: ignore[return-value]
# abbreviations
if key in COLOR_ABBR_TO_FULL:
return COLOR_ABBR_TO_FULL[key]
error = f"Unknown color: {raw}"
raise ValueError(error)
def format_cost(cost: Mapping[GemColor, int]) -> str:
"""Format a cost/requirements dict as colored tokens like 'B:2, R:1'.
Uses `color_token` internally so colors are guaranteed to match your bank.
"""
parts: list[str] = []
for color in GEM_COLORS:
n = cost.get(color, 0)
if not n:
continue
# color_token gives us e.g. "[blue]blue: 3[/]"
token = color_token(color, n)
# Turn the leading color name into the abbreviation (blue: 3 → B:3)
# We only replace the first occurrence.
full = f"{color}:"
abbr = f"{COST_ABBR[color]}:"
token = token.replace(full, abbr, 1)
parts.append(token)
return ", ".join(parts) if parts else "-"
def format_card(card: Card) -> str:
"""Readable card line using dataclass fields instead of __str__."""
color_abbr = COST_ABBR[card.color]
header = f"T{card.tier} {color_abbr} P{card.points}"
cost_str = format_cost(card.cost)
return f"{header} ({cost_str})"
def format_noble(noble: Noble) -> str:
"""Readable noble line using dataclass fields instead of __str__."""
cost_str = format_cost(noble.requirements)
return f"{noble.name} +{noble.points} ({cost_str})"
def format_tokens(tokens: Mapping[GemColor, int]) -> str:
"""Colored 'color: n' list for a token dict."""
return " ".join(color_token(c, tokens.get(c, 0)) for c in GEM_COLORS)
def format_discounts(discounts: Mapping[GemColor, int]) -> str:
"""Colored discounts, skipping zeros."""
parts: list[str] = []
for c in GEM_COLORS:
n = discounts.get(c, 0)
if not n:
continue
abbr = COST_ABBR[c]
fg, bg = COLOR_STYLE[c]
parts.append(f"[{fg} on {bg}]{abbr}:{n}[/{fg} on {bg}]")
return ", ".join(parts) if parts else "-"
COLOR_STYLE: dict[GemColor, tuple[str, str]] = {
"white": ("black", "white"), # fg, bg
"blue": ("bright_white", "blue"),
"green": ("bright_white", "sea_green4"),
"red": ("white", "red3"),
"black": ("white", "grey0"),
"gold": ("black", "yellow3"),
}
def fmt_gem(color: GemColor) -> str:
"""Render gem name with fg/bg matching real token color."""
fg, bg = COLOR_STYLE[color]
return f"[{fg} on {bg}] {color} [/{fg} on {bg}]"
def fmt_number(value: int) -> str:
"""Return a Rich-markup colored 'value' string."""
return f"[bold cyan]{value}[/]"
def color_token(name: GemColor, amount: int) -> str:
"""Return a Rich-markup colored 'name: n' string."""
# Map Splendor colors -> terminal colors
color_map: Mapping[GemColor, str] = {
"white": "white",
"blue": "blue",
"green": "green",
"red": "red",
"black": "grey70", # 'black' is unreadable on dark backgrounds
"gold": "yellow",
}
style = color_map.get(name, "white")
return f"[{style}]{name}: {amount}[/]"
class Board(Widget):
"""Big board widget with the layout you sketched."""
def __init__(self, game: GameState, me: PlayerState, **kwargs: Any) -> None: # noqa: ANN401
"""Initialize the board widget."""
super().__init__(**kwargs)
self.game = game
self.me = me
def compose(self) -> ComposeResult:
"""Compose the board widget."""
# Structure:
# ┌ bank row
# ├ middle row (tiers | nobles)
# └ players row
with Vertical(id="board_root"):
yield Static(id="bank_box")
with Horizontal(id="middle_row"):
with Vertical(id="tiers_box"):
yield Static(id="tier1_box")
yield Static(id="tier2_box")
yield Static(id="tier3_box")
yield Static(id="nobles_box")
yield Static(id="players_box")
def on_mount(self) -> None:
"""Refresh the board content."""
self.refresh_content()
def refresh_content(self) -> None:
"""Refresh the board content."""
self._render_bank()
self._render_tiers()
self._render_nobles()
self._render_players()
# --- sections ----------------------------------------------------
def _render_bank(self) -> None:
bank = self.game.bank
parts: list[str] = ["[b]Bank:[/b]"]
# One line, all tokens colored
parts.append(format_tokens(bank))
self.query_one("#bank_box", Static).update("\n".join(parts))
def _render_tiers(self) -> None:
for tier in (1, 2, 3):
box = self.query_one(f"#tier{tier}_box", Static)
cards: list[Card] = self.game.table_by_tier.get(tier, [])
lines: list[str] = [f"[b]Tier {tier} cards:[/b]"]
if not cards:
lines.append(" (none)")
else:
for idx, card in enumerate(cards):
lines.append(f" [{idx}] {format_card(card)}")
box.update("\n".join(lines))
def _render_nobles(self) -> None:
nobles_box = self.query_one("#nobles_box", Static)
lines: list[str] = ["[b]Nobles[/b]"]
if not self.game.available_nobles:
lines.append(" (none)")
else:
lines.extend(" - " + format_noble(noble) for noble in self.game.available_nobles)
nobles_box.update("\n".join(lines))
def _render_players(self) -> None:
players_box = self.query_one("#players_box", Static)
lines: list[str] = ["[b]Players:[/b]", ""]
for player in self.game.players:
mark = "*" if player is self.me else " "
token_str = format_tokens(player.tokens)
discount_str = format_discounts(player.discounts)
lines.append(
f"{mark} {player.name:10} Score={player.score:2d} Discounts={discount_str}",
)
lines.append(f" Tokens: {token_str}")
if player.nobles:
noble_names = ", ".join(n.name for n in player.nobles)
lines.append(f" Nobles: {noble_names}")
# Optional: show counts of cards / reserved
if player.cards:
lines.append(f" Cards: {len(player.cards)}")
if player.reserved:
lines.append(f" Reserved: {len(player.reserved)}")
lines.append("")
players_box.update("\n".join(lines))
class ActionApp(App[None]):
"""Textual app that asks for a single action command and returns an Action."""
CSS = """
Screen {
/* 3 rows: command zone, board, footer */
layout: grid;
grid-size: 1 3;
grid-rows: auto 1fr auto;
}
/* Top area with input + instructions */
#command_zone {
grid-columns: 1;
grid-rows: 1;
padding: 1 1;
}
/* Board sits in the middle row and can grow */
#board {
grid-columns: 1;
grid-rows: 2;
padding: 0 1 1 1;
}
Footer {
grid-columns: 1;
grid-rows: 3;
}
Input {
border: round $accent;
}
/* === Board layout === */
#board_root {
/* outer frame around the whole board area */
border: heavy white;
padding: 0 1;
}
/* Bank row: full width */
#bank_box {
border: heavy white;
padding: 0 1;
}
/* Middle row: tiers (left) + nobles (right) */
#middle_row {
layout: horizontal;
}
#tiers_box {
border: heavy white;
padding: 0 1;
width: 70%;
}
#tier1_box,
#tier2_box,
#tier3_box {
border-bottom: heavy white;
padding: 0 0 1 0;
margin-bottom: 1;
}
#nobles_box {
border: heavy white;
padding: 0 1;
width: 30%;
}
/* Players row: full width at bottom */
#players_box {
border: heavy white;
padding: 0 1;
}
"""
def __init__(self, game: GameState, player: PlayerState) -> None:
"""Initialize the action app."""
super().__init__()
self.game = game
self.player = player
self.result: Action | None = None
self.message: str = ""
def compose(self) -> ComposeResult:
"""Compose the action app."""
# Row 1: input + Actions text
with Vertical(id="command_zone"):
yield Input(
placeholder="Enter command, e.g. '1 white blue red' or '1 w b r' or 'q'",
id="input_line",
)
yield Static("", id="prompt")
# Row 2: board
yield Board(self.game, self.player, id="board")
# Row 3: footer
yield Footer()
def on_mount(self) -> None:
"""Mount the action app."""
self._update_prompt()
self.query_one(Input).focus()
def _update_prompt(self) -> None:
lines: list[str] = []
lines.append("[bold underline]Actions:[/]")
lines.append(
" [bold green]1[/] <colors...> - Take up to 3 different gem colors "
"(e.g. [cyan]1 white blue red[/] or [cyan]1 w b r[/])",
)
lines.append(
f" [bold green]2[/] <color> - Take 2 of the same color (needs {fmt_number(4)} in bank, "
"e.g. [cyan]2 blue[/] or [cyan]2 b[/])",
)
lines.append(
" [bold green]3[/] <tier> <idx> - Buy a face-up card (e.g. [cyan]3 1 0[/] for tier 1, index 0)",
)
lines.append(" [bold green]4[/] <idx> - Buy a reserved card")
lines.append(" [bold green]5[/] <tier> <idx> - Reserve a face-up card")
lines.append(" [bold green]6[/] <tier> - Reserve top card of a deck")
lines.append(" [bold red]q[/] - Quit game")
if self.message:
lines.append("")
lines.append(f"[bold red]Message:[/] {self.message}")
self.query_one("#prompt", Static).update("\n".join(lines))
def _cmd_1(self, parts: list[str]) -> str | None:
"""Take up to 3 different gem colors: 1 white blue red OR 1 w b r."""
color_names = parts[1:]
if not color_names:
return "Need at least one color (full name or abbreviation)."
colors: list[GemColor] = []
for name in color_names:
color = parse_color_token(name)
if self.game.bank[color] <= 0:
return f"No tokens left for color: {color}"
colors.append(color)
self.result = TakeDifferent(colors=colors[:3])
self.exit()
return None
def _cmd_2(self, parts: list[str]) -> str | None:
"""Take two of the same color."""
if len(parts) < 2:
return "Usage: 2 <color>"
color = parse_color_token(parts[1])
if self.game.bank[color] < self.game.config.minimum_tokens_to_buy_2:
return "Bank must have at least 4 of that color."
self.result = TakeDouble(color=color)
self.exit()
return None
def _cmd_3(self, parts: list[str]) -> str | None:
"""Buy face-up card."""
if len(parts) < 3:
return "Usage: 3 <tier> <index>"
tier = int(parts[1])
idx = int(parts[2])
self.result = BuyCard(tier=tier, index=idx)
self.exit()
return None
def _cmd_4(self, parts: list[str]) -> str | None:
"""Buy reserved card."""
if len(parts) < 2:
return "Usage: 4 <reserved_index>"
idx = int(parts[1])
if not (0 <= idx < len(self.player.reserved)):
return "Reserved index out of range."
self.result = BuyCardReserved(tier=0, index=idx)
self.exit()
return None
def _cmd_5(self, parts: list[str]) -> str | None:
"""Reserve face-up card."""
if len(parts) < 3:
return "Usage: 5 <tier> <index>"
tier = int(parts[1])
idx = int(parts[2])
self.result = ReserveCard(tier=tier, index=idx, from_deck=False)
self.exit()
return None
def _cmd_6(self, parts: list[str]) -> str | None:
"""Reserve top of deck."""
if len(parts) < 2:
return "Usage: 6 <tier>"
tier = int(parts[1])
self.result = ReserveCard(tier=tier, index=None, from_deck=True)
self.exit()
return None
def _unknown_cmd(self, _parts: list[str]) -> str:
return "Unknown command."
def on_input_submitted(self, event: Input.Submitted) -> None:
"""Handle user input."""
text = (event.value or "").strip()
event.input.value = ""
if not text:
return
if text.lower() in {"q", "quit", "0"}:
self.result = None
self.exit()
return
parts = text.split()
cmds = {
"1": self._cmd_1,
"2": self._cmd_2,
"3": self._cmd_3,
"4": self._cmd_4,
"5": self._cmd_5,
"6": self._cmd_6,
}
cmd = parts[0]
error = cmds.get(cmd, self._unknown_cmd)(parts)
if error:
self.message = error
self._update_prompt()
return
class DiscardApp(App[None]):
"""Textual app to choose discards when over token limit."""
CSS = """
Screen {
layout: vertical;
}
#command_zone {
padding: 1 1;
}
#board {
padding: 0 1 1 1;
}
Input {
border: round $accent;
}
"""
def __init__(self, game: GameState, player: PlayerState) -> None:
"""Initialize the discard app."""
super().__init__()
self.game = game
self.player = player
self.discards: dict[GemColor, int] = dict.fromkeys(GEM_COLORS, 0)
self.message: str = ""
def compose(self) -> ComposeResult: # type: ignore[override]
"""Compose the discard app."""
yield Header(show_clock=False)
with Vertical(id="command_zone"):
yield Input(
placeholder="Enter color to discard, e.g. 'blue' or 'b'",
id="input_line",
)
yield Static("", id="prompt")
# Board directly under the command zone
yield Board(self.game, self.player, id="board")
yield Footer()
def on_mount(self) -> None: # type: ignore[override]
"""Mount the discard app."""
self._update_prompt()
self.query_one(Input).focus()
def _remaining_to_discard(self) -> int:
return self.player.total_tokens() - sum(self.discards.values()) - self.game.config.token_limit
def _update_prompt(self) -> None:
remaining = max(self._remaining_to_discard(), 0)
lines: list[str] = []
lines.append(
"You must discard "
f"{fmt_number(remaining)} token(s) "
f"to get down to {fmt_number(self.game.config.token_limit)}.",
)
disc_str = ", ".join(f"{fmt_gem(c)}={fmt_number(self.discards[c])}" for c in GEM_COLORS)
lines.append(f"Current planned discards: {{ {disc_str} }}")
lines.append(
"Type a color name or abbreviation (e.g. 'blue' or 'b') to discard one token.",
)
if self.message:
lines.append("")
lines.append(f"[bold red]Message:[/] {self.message}")
self.query_one("#prompt", Static).update("\n".join(lines))
def on_input_submitted(self, event: Input.Submitted) -> None: # type: ignore[override]
"""Handle user input."""
raw = (event.value or "").strip()
event.input.value = ""
if not raw:
return
try:
color = parse_color_token(raw)
except ValueError:
self.message = f"Unknown color: {raw}"
self._update_prompt()
return
available = self.player.tokens[color] - self.discards[color]
if available <= 0:
self.message = f"No more {color} tokens available to discard."
self._update_prompt()
return
self.discards[color] += 1
if self._remaining_to_discard() <= 0:
self.exit()
return
self.message = ""
self._update_prompt()
# ---------------------------------------------------------------------------
# Noble choice app
# ---------------------------------------------------------------------------
class NobleChoiceApp(App[None]):
"""Textual app to choose one noble."""
CSS = """
Screen {
layout: vertical;
}
#command_zone {
padding: 1 1;
}
#board {
padding: 0 1 1 1;
}
Input {
border: round $accent;
}
"""
def __init__(
self,
game: GameState,
player: PlayerState,
nobles: list[Noble],
) -> None:
"""Initialize the noble choice app."""
super().__init__()
self.game = game
self.player = player
self.nobles = nobles
self.result: Noble | None = None
self.message: str = ""
def compose(self) -> ComposeResult: # type: ignore[override]
"""Compose the noble choice app."""
yield Header(show_clock=False)
with Vertical(id="command_zone"):
yield Input(
placeholder="Enter noble index, e.g. '0'",
id="input_line",
)
yield Static("", id="prompt")
# Board directly under the command zone
yield Board(self.game, self.player, id="board")
yield Footer()
def on_mount(self) -> None: # type: ignore[override]
"""Mount the noble choice app."""
self._update_prompt()
self.query_one(Input).focus()
def _update_prompt(self) -> None:
lines: list[str] = []
lines.append("[bold underline]You qualify for nobles:[/]")
for i, noble in enumerate(self.nobles):
lines.append(f" [bright_cyan]{i})[/] {format_noble(noble)}")
lines.append("Enter the index of the noble you want.")
if self.message:
lines.append("")
lines.append(f"[bold red]Message:[/] {self.message}")
self.query_one("#prompt", Static).update("\n".join(lines))
def on_input_submitted(self, event: Input.Submitted) -> None: # type: ignore[override]
"""Handle user input."""
raw = (event.value or "").strip()
event.input.value = ""
if not raw:
return
try:
idx = int(raw)
except ValueError:
self.message = "Please enter a valid integer index."
self._update_prompt()
return
if not (0 <= idx < len(self.nobles)):
self.message = "Index out of range."
self._update_prompt()
return
self.result = self.nobles[idx]
self.exit()
class TuiHuman(Strategy):
"""Textual-based human player Strategy with colorful board."""
def choose_action(
self,
game: GameState,
player: PlayerState,
) -> Action | None:
"""Choose an action for the player."""
if not sys.stdout.isatty():
return None
app = ActionApp(game, player)
app.run()
return app.result
def choose_discard(
self,
game: GameState,
player: PlayerState,
excess: int, # noqa: ARG002
) -> dict[GemColor, int]:
"""Choose tokens to discard."""
if not sys.stdout.isatty():
return dict.fromkeys(GEM_COLORS, 0)
app = DiscardApp(game, player)
app.run()
return app.discards
def choose_noble(
self,
game: GameState,
player: PlayerState,
nobles: list[Noble],
) -> Noble:
"""Choose a noble for the player."""
if not sys.stdout.isatty():
return nobles[0]
app = NobleChoiceApp(game, player, nobles)
app.run()
return app.result

View File

@@ -1,19 +0,0 @@
"""Main entry point for Splendor game."""
from __future__ import annotations
from .base import new_game, run_game
from .bot import RandomBot
from .human import TuiHuman
def main() -> None:
"""Main entry point."""
human = TuiHuman()
bot = RandomBot()
game_state = new_game(["You", "Bot A"])
run_game(game_state, [human, bot])
if __name__ == "__main__":
main()

View File

@@ -1,111 +0,0 @@
"""Public state for RL/search."""
from __future__ import annotations
from dataclasses import dataclass
from .base import (
BASE_COLORS,
BASE_INDEX,
GEM_ORDER,
Card,
GameState,
Noble,
PlayerState,
)
@dataclass(frozen=True)
class ObsCard:
"""Numeric-ish card view for RL/search."""
tier: int
points: int
color_index: int
cost: list[int]
@dataclass(frozen=True)
class ObsNoble:
"""Numeric-ish noble view for RL/search."""
points: int
requirements: list[int]
@dataclass(frozen=True)
class ObsPlayer:
"""Numeric-ish player view for RL/search."""
tokens: list[int]
discounts: list[int]
score: int
cards: list[ObsCard]
reserved: list[ObsCard]
nobles: list[ObsNoble]
@dataclass(frozen=True)
class Observation:
"""Full public state for RL/search."""
current_player: int
bank: list[int]
players: list[ObsPlayer]
table_by_tier: dict[int, list[ObsCard]]
decks_remaining: dict[int, int]
available_nobles: list[ObsNoble]
def _encode_card(card: Card) -> ObsCard:
color_index = BASE_INDEX.get(card.color, -1)
cost_vec = [card.cost.get(c, 0) for c in BASE_COLORS]
return ObsCard(
tier=card.tier,
points=card.points,
color_index=color_index,
cost=cost_vec,
)
def _encode_noble(noble: Noble) -> ObsNoble:
req_vec = [noble.requirements.get(c, 0) for c in BASE_COLORS]
return ObsNoble(
points=noble.points,
requirements=req_vec,
)
def _encode_player(player: PlayerState) -> ObsPlayer:
tokens_vec = [player.tokens[c] for c in GEM_ORDER]
discounts_vec = [player.discounts[c] for c in GEM_ORDER]
cards_enc = [_encode_card(c) for c in player.cards]
reserved_enc = [_encode_card(c) for c in player.reserved]
nobles_enc = [_encode_noble(n) for n in player.nobles]
return ObsPlayer(
tokens=tokens_vec,
discounts=discounts_vec,
score=player.score,
cards=cards_enc,
reserved=reserved_enc,
nobles=nobles_enc,
)
def to_observation(game: GameState) -> Observation:
"""Create a structured observation of the full public state."""
bank_vec = [game.bank[c] for c in GEM_ORDER]
players_enc = [_encode_player(p) for p in game.players]
table_enc: dict[int, list[ObsCard]] = {
tier: [_encode_card(c) for c in row] for tier, row in game.table_by_tier.items()
}
decks_remaining = {tier: len(deck) for tier, deck in game.decks_by_tier.items()}
nobles_enc = [_encode_noble(n) for n in game.available_nobles]
return Observation(
current_player=game.current_player_index,
bank=bank_vec,
players=players_enc,
table_by_tier=table_enc,
decks_remaining=decks_remaining,
available_nobles=nobles_enc,
)

View File

@@ -1,36 +0,0 @@
"""Simulate a step in the game."""
from __future__ import annotations
import copy
from .base import Action, GameState, PlayerState, apply_action, check_nobles_for_player
from .bot import RandomBot
class SimStrategy(RandomBot):
"""Strategy used in simulate_step.
We never call choose_action here (caller chooses actions),
but we reuse discard/noble-selection logic.
"""
def choose_action(self, game: GameState, player: PlayerState) -> Action | None: # noqa: ARG002
"""Choose an action for the current player."""
msg = "SimStrategy.choose_action should not be used in simulate_step"
raise RuntimeError(msg)
def simulate_step(game: GameState, action: Action) -> GameState:
"""Return a deep-copied next state after applying action for the current player.
Useful for tree search / MCTS:
next_state = simulate_step(state, action)
"""
next_state = copy.deepcopy(game)
sim_strategy = SimStrategy()
apply_action(next_state, sim_strategy, action)
check_nobles_for_player(next_state, sim_strategy, next_state.current_player)
next_state.next_player()
return next_state

View File

@@ -1,50 +0,0 @@
"""Simulator for Splendor game."""
from __future__ import annotations
from collections import defaultdict
from pathlib import Path
from statistics import mean
from .base import GameConfig, load_cards, load_nobles, new_game, run_game
from .bot import PersonalizedBot4, RandomBot
def main() -> None:
"""Main entry point."""
turn_limit = 1000
good_games = 0
games = 1
winners: dict[str, list] = defaultdict(list)
game_data = Path(__file__).parent / "game_data"
cards = load_cards(game_data / "cards/default.json")
nobles = load_nobles(game_data / "nobles/default.json")
for _ in range(games):
bot_a = RandomBot("bot_a")
bot_b = RandomBot("bot_b")
bot_c = RandomBot("bot_c")
bot_d = PersonalizedBot4("my_bot")
config = GameConfig(
cards=cards,
nobles=nobles,
turn_limit=turn_limit,
)
players = (bot_a, bot_b, bot_c, bot_d)
game_state = new_game(players, config)
winner, turns = run_game(game_state)
if turns < turn_limit:
good_games += 1
winners[winner.strategy.name].append(turns)
print(
f"out of {games} {turn_limit} turn games with {len(players)}"
f"random bots there where {good_games} games where a bot won"
)
for name, turns in winners.items():
print(f"{name} won {len(turns)} games in {mean(turns):.2f} turns")
if __name__ == "__main__":
main()

View File

@@ -1 +0,0 @@
"""system_tests."""

View File

@@ -1,99 +0,0 @@
"""Validate Jeeves."""
from __future__ import annotations
import logging
from copy import copy
from re import search
from time import sleep
from typing import TYPE_CHECKING
from python.common import bash_wrapper
from python.zfs import Zpool
if TYPE_CHECKING:
from collections.abc import Sequence
logger = logging.getLogger(__name__)
def zpool_tests(pool_names: Sequence[str], zpool_capacity_threshold: int = 90) -> list[str] | None:
"""Check the zpool health and capacity.
Args:
pool_names (Sequence[str]): A list of pool names to test.
zpool_capacity_threshold (int, optional): The threshold for the zpool capacity. Defaults to 90.
Returns:
list[str] | None: A list of errors if any.
"""
logger.info("Testing zpool")
errors: list[str] = []
for pool_name in pool_names:
pool = Zpool(pool_name)
if pool.health != "ONLINE":
errors.append(f"{pool.name} is {pool.health}")
if pool.capacity >= zpool_capacity_threshold:
errors.append(f"{pool.name} is low on space")
upgrade_status, _ = bash_wrapper("zpool upgrade")
if not search(r"Every feature flags pool has all supported and requested features enabled.", upgrade_status):
errors.append("ZPool out of date run `sudo zpool upgrade -a`")
return errors
def systemd_tests(
service_names: Sequence[str],
max_retries: int = 30,
retry_delay_secs: int = 1,
retryable_statuses: Sequence[str] | None = None,
valid_statuses: Sequence[str] | None = None,
) -> list[str] | None:
"""Tests a systemd services.
Args:
service_names (Sequence[str]): A list of service names to test.
max_retries (int, optional): The maximum number of retries. Defaults to 30.
minimum value is 1.
retry_delay_secs (int, optional): The delay between retries in seconds. Defaults to 1.
minimum value is 1.
retryable_statuses (Sequence[str] | None, optional): A list of retryable statuses. Defaults to None.
valid_statuses (Sequence[str] | None, optional): A list of valid statuses. Defaults to None.
Returns:
list[str] | None: A list of errors if any.
"""
logger.info("Testing systemd service")
max_retries = max(max_retries, 1)
retry_delay_secs = max(retry_delay_secs, 1)
last_try = max_retries - 1
if retryable_statuses is None:
retryable_statuses = ("inactive\n", "activating\n")
if valid_statuses is None:
valid_statuses = ("active\n",)
service_names_set = set(service_names)
errors: set[str] = set()
for retry in range(max_retries):
if not service_names_set:
break
logger.info(f"Testing systemd service in {retry + 1} of {max_retries}")
service_names_to_test = copy(service_names_set)
for service_name in service_names_to_test:
service_status, _ = bash_wrapper(f"systemctl is-active {service_name}")
if service_status in valid_statuses:
service_names_set.remove(service_name)
continue
if service_status in retryable_statuses and retry < last_try:
continue
errors.add(f"{service_name} is {service_status.strip()}")
sleep(retry_delay_secs)
return list(errors)

View File

@@ -1,66 +0,0 @@
"""Validate {server_name}."""
import logging
import sys
import tomllib
from os import environ
from pathlib import Path
from socket import gethostname
import typer
from python.common import configure_logger, signal_alert
from python.system_tests.components import systemd_tests, zpool_tests
logger = logging.getLogger(__name__)
def load_config_data(config_file: Path) -> dict[str, list[str]]:
"""Load a TOML configuration file.
Args:
config_file (Path): The path to the configuration file.
Returns:
dict: The configuration data.
"""
return tomllib.loads(config_file.read_text())
def main(config_file: Path) -> None:
"""Main."""
configure_logger(level=environ.get("LOG_LEVEL", "INFO"))
server_name = gethostname()
logger.info(f"Starting {server_name} validation")
config_data = load_config_data(config_file)
errors: list[str] = []
try:
if config_data.get("zpools") and (zpool_errors := zpool_tests(config_data["zpools"])):
errors.extend(zpool_errors)
if config_data.get("services") and (systemd_errors := systemd_tests(config_data["services"])):
errors.extend(systemd_errors)
except Exception as error:
logger.exception(f"{server_name} validation failed")
errors.append(f"{server_name} validation failed: {error}")
if errors:
logger.error(f"{server_name} validation failed: \n{'\n'.join(errors)}")
signal_alert(f"{server_name} validation failed {errors}")
sys.exit(1)
logger.info(f"{server_name} validation passed")
def cli() -> None:
"""CLI."""
typer.run(main)
if __name__ == "__main__":
cli()

View File

@@ -1 +0,0 @@
"""init."""

View File

@@ -1 +0,0 @@
"""init."""

View File

@@ -1,11 +0,0 @@
"""Bar."""
import logging
logger = logging.getLogger(__name__)
def bar() -> None:
"""Bar."""
logger.debug(f"bar {__name__}")
logger.debug("bar")

View File

@@ -1,20 +0,0 @@
"""configure_logger."""
import logging
import sys
def configure_logger(level: str = "INFO", test: str | None = None) -> None:
"""Configure the logger.
Args:
level (str, optional): The logging level. Defaults to "INFO".
test (str | None, optional): The test name. Defaults to None.
"""
logging.basicConfig(
level=level,
datefmt="%Y-%m-%dT%H:%M:%S%z",
format="%(asctime)s %(levelname)s %(filename)s:%(lineno)d - %(message)s" # this is nesiseary
f" {test}",
handlers=[logging.StreamHandler(sys.stdout)],
)

View File

@@ -1,17 +0,0 @@
"""foo."""
import logging
from python.testing.logging.bar import bar
from python.testing.logging.configure_logger import configure_logger
logger = logging.getLogger(__name__)
def foo() -> None:
"""Foo."""
configure_logger("DEBUG", "FOO")
logger.debug(f"foo {__name__}")
logger.debug("foo")
bar()

View File

@@ -1,33 +0,0 @@
"""main."""
import logging
from python.testing.logging.bar import bar
from python.testing.logging.configure_logger import configure_logger
from python.testing.logging.foo import foo
logger = logging.getLogger(__name__)
def main() -> None:
"""Main."""
configure_logger("DEBUG")
# handler = logging.StreamHandler()
# Create and attach a formatter
# formatter = logging.Formatter(
# "%(asctime)s %(levelname)s %(filename)s:%(lineno)d - %(message)s FOO"
# )
# handler.setFormatter(formatter)
# Attach handler to logger
# foo_logger = logging.getLogger("python.testing.logging.foo")
# foo_logger.addHandler(handler)
# foo_logger.propagate = True
logger.debug("main")
foo()
bar()
if __name__ == "__main__":
main()

View File

@@ -1 +0,0 @@
"""Server Tools."""

View File

@@ -1,144 +0,0 @@
"""snapshot_manager."""
from __future__ import annotations
import logging
import sys
import tomllib
from functools import cache
from pathlib import Path # noqa: TC003 This is required for the typer CLI
from re import compile as re_compile
from re import search
import typer
from python.common import configure_logger, signal_alert, utcnow
from python.zfs import Dataset, get_datasets
logger = logging.getLogger(__name__)
def main(config_file: Path) -> None:
"""Main."""
configure_logger(level="DEBUG")
logger.info("Starting snapshot_manager")
try:
time_stamp = get_time_stamp()
for dataset in get_datasets():
status = dataset.create_snapshot(time_stamp)
logger.debug(f"{status=}")
if status != "snapshot created":
msg = f"{dataset.name} failed to create snapshot {time_stamp}"
logger.error(msg)
signal_alert(msg)
continue
get_snapshots_to_delete(dataset, get_count_lookup(config_file, dataset.name))
except Exception:
logger.exception("snapshot_manager failed")
signal_alert("snapshot_manager failed")
sys.exit(1)
else:
logger.info("snapshot_manager completed")
def get_count_lookup(config_file: Path, dataset_name: str) -> dict[str, int]:
"""Get the count lookup.
Args:
config_file (Path): The path to the configuration file.
dataset_name (str): The name of the dataset.
Returns:
dict[str, int]: The count lookup.
"""
config_data = load_config_data(config_file)
return config_data.get(dataset_name, get_default_config(config_data))
def get_default_config(config_data: dict[str, dict[str, int]]) -> dict[str, int]:
"""Get the default configuration.
Args:
config_data (dict[str, dict[str, int]]): The configuration data.
Returns:
dict[str, int]: The default configuration.
"""
return config_data.get(
"default",
{"15_min": 4, "hourly": 12, "daily": 0, "monthly": 0},
)
@cache
def load_config_data(config_file: Path) -> dict[str, dict[str, int]]:
"""Load a TOML configuration file.
Args:
config_file (Path): The path to the configuration file.
Returns:
dict: The configuration data.
"""
return tomllib.loads(config_file.read_text())
def get_snapshots_to_delete(
dataset: Dataset,
count_lookup: dict[str, int],
) -> None:
"""Get snapshots to delete.
Args:
dataset (Dataset): the dataset
count_lookup (dict[str, int]): the count lookup
"""
snapshots = dataset.get_snapshots()
if not snapshots:
logger.info(f"{dataset.name} has no snapshots")
return
filters = (
("15_min", re_compile(r"auto_\d{10}(?:15|30|45)")),
("hourly", re_compile(r"auto_\d{8}(?!00)\d{2}00")),
("daily", re_compile(r"auto_\d{6}(?!01)\d{2}0000")),
("monthly", re_compile(r"auto_\d{6}010000")),
)
for filter_name, snapshot_filter in filters:
logger.debug(f"{filter_name=}\n{snapshot_filter=}")
filtered_snapshots = sorted(snapshot.name for snapshot in snapshots if search(snapshot_filter, snapshot.name))
logger.debug(f"{filtered_snapshots=}")
snapshots_wanted = count_lookup[filter_name]
snapshots_being_deleted = filtered_snapshots[:-snapshots_wanted] if snapshots_wanted > 0 else filtered_snapshots
logger.info(f"{snapshots_being_deleted} are being deleted")
for snapshot in snapshots_being_deleted:
if error := dataset.delete_snapshot(snapshot):
error_message = f"{dataset.name}@{snapshot} failed to delete: {error}"
signal_alert(error_message)
logger.error(error_message)
def get_time_stamp() -> str:
"""Get the time stamp."""
now = utcnow()
nearest_15_min = now.replace(minute=(now.minute - (now.minute % 15)))
return nearest_15_min.strftime("auto_%Y%m%d%H%M")
def cli() -> None:
"""CLI."""
typer.run(main)
if __name__ == "__main__":
cli()

View File

@@ -1,11 +0,0 @@
"""init."""
from python.zfs.dataset import Dataset, Snapshot, get_datasets
from python.zfs.zpool import Zpool
__all__ = [
"Dataset",
"Snapshot",
"Zpool",
"get_datasets",
]

View File

@@ -1,214 +0,0 @@
"""dataset."""
from __future__ import annotations
import json
import logging
from datetime import UTC, datetime
from typing import Any
from python.common import bash_wrapper
logger = logging.getLogger(__name__)
def _zfs_list(zfs_list: str) -> dict[str, Any]:
"""Check the version of zfs."""
raw_zfs_list_data, _ = bash_wrapper(zfs_list)
zfs_list_data = json.loads(raw_zfs_list_data)
vers_major = zfs_list_data["output_version"]["vers_major"]
vers_minor = zfs_list_data["output_version"]["vers_minor"]
command = zfs_list_data["output_version"]["command"]
if vers_major != 0 or vers_minor != 1 or command != "zfs list":
error = f"Datasets are not in the correct format {vers_major=} {vers_minor=} {command=}"
raise RuntimeError(error)
return zfs_list_data
class Snapshot:
"""Snapshot."""
def __init__(self, snapshot_data: dict[str, Any]) -> None:
"""__init__."""
properties = snapshot_data["properties"]
self.createtxg = int(snapshot_data["createtxg"])
self.creation = datetime.fromtimestamp(int(properties["creation"]["value"]), tz=UTC)
self.defer_destroy = properties["defer_destroy"]["value"]
self.guid = int(properties["guid"]["value"])
self.name = snapshot_data["name"].split("@")[1]
self.objsetid = int(properties["objsetid"]["value"])
self.referenced = int(properties["referenced"]["value"])
self.used = int(properties["used"]["value"])
self.userrefs = int(properties["userrefs"]["value"])
self.version = int(properties["version"]["value"])
self.written = int(properties["written"]["value"])
def __repr__(self) -> str:
"""__repr__."""
return f"name={self.name} used={self.used} refer={self.referenced}"
class Dataset:
"""Dataset."""
def __init__(self, name: str) -> None:
"""__init__."""
dataset_data = _zfs_list(f"zfs list {name} -pHj -o all")
properties = dataset_data["datasets"][name]["properties"]
self.aclinherit = properties["aclinherit"]["value"]
self.aclmode = properties["aclmode"]["value"]
self.acltype = properties["acltype"]["value"]
self.available = int(properties["available"]["value"])
self.canmount = properties["canmount"]["value"]
self.checksum = properties["checksum"]["value"]
self.clones = properties["clones"]["value"]
self.compression = properties["compression"]["value"]
self.copies = int(properties["copies"]["value"])
self.createtxg = int(properties["createtxg"]["value"])
self.creation = datetime.fromtimestamp(int(properties["creation"]["value"]), tz=UTC)
self.dedup = properties["dedup"]["value"]
self.devices = properties["devices"]["value"]
self.encryption = properties["encryption"]["value"]
self.exec = properties["exec"]["value"]
self.filesystem_limit = properties["filesystem_limit"]["value"]
self.guid = int(properties["guid"]["value"])
self.keystatus = properties["keystatus"]["value"]
self.logbias = properties["logbias"]["value"]
self.mlslabel = properties["mlslabel"]["value"]
self.mounted = properties["mounted"]["value"]
self.mountpoint = properties["mountpoint"]["value"]
self.name = name
self.quota = int(properties["quota"]["value"])
self.readonly = properties["readonly"]["value"]
self.recordsize = int(properties["recordsize"]["value"])
self.redundant_metadata = properties["redundant_metadata"]["value"]
self.referenced = int(properties["referenced"]["value"])
self.refquota = int(properties["refquota"]["value"])
self.refreservation = int(properties["refreservation"]["value"])
self.reservation = int(properties["reservation"]["value"])
self.setuid = properties["setuid"]["value"]
self.sharenfs = properties["sharenfs"]["value"]
self.snapdir = properties["snapdir"]["value"]
self.snapshot_limit = properties["snapshot_limit"]["value"]
self.sync = properties["sync"]["value"]
self.used = int(properties["used"]["value"])
self.usedbychildren = int(properties["usedbychildren"]["value"])
self.usedbydataset = int(properties["usedbydataset"]["value"])
self.usedbysnapshots = int(properties["usedbysnapshots"]["value"])
self.version = int(properties["version"]["value"])
self.volmode = properties["volmode"]["value"]
self.volsize = properties["volsize"]["value"]
self.vscan = properties["vscan"]["value"]
self.written = int(properties["written"]["value"])
self.xattr = properties["xattr"]["value"]
def get_snapshots(self) -> list[Snapshot] | None:
"""Get all snapshots from zfs and process then is test dicts of sets."""
snapshots_data = _zfs_list(f"zfs list -t snapshot -pHj {self.name} -o all")
return [Snapshot(properties) for properties in snapshots_data["datasets"].values()]
def create_snapshot(self, snapshot_name: str) -> str:
"""Creates a zfs snapshot.
Args:
snapshot_name (str): a snapshot name
"""
logger.debug(f"Creating {self.name}@{snapshot_name}")
_, return_code = bash_wrapper(f"zfs snapshot {self.name}@{snapshot_name}")
if return_code == 0:
return "snapshot created"
if snapshots := self.get_snapshots():
snapshot_names = {snapshot.name for snapshot in snapshots}
if snapshot_name in snapshot_names:
return f"Snapshot {snapshot_name} already exists for {self.name}"
return f"Failed to create snapshot {snapshot_name} for {self.name}"
def delete_snapshot(self, snapshot_name: str) -> str | None:
"""Deletes a zfs snapshot.
Args:
snapshot_name (str): a snapshot name
"""
logger.debug(f"deleting {self.name}@{snapshot_name}")
msg, return_code = bash_wrapper(f"zfs destroy {self.name}@{snapshot_name}")
if return_code != 0:
if msg.startswith(f"cannot destroy '{self.name}@{snapshot_name}': snapshot has dependent clones"):
return "snapshot has dependent clones"
error = f"Failed to delete snapshot {snapshot_name=} for {self.name}"
raise RuntimeError(error)
return None
def __repr__(self) -> str:
"""__repr__."""
return (
f"{self.aclinherit=}\n"
f"{self.aclmode=}\n"
f"{self.acltype=}\n"
f"{self.available=}\n"
f"{self.canmount=}\n"
f"{self.checksum=}\n"
f"{self.clones=}\n"
f"{self.compression=}\n"
f"{self.copies=}\n"
f"{self.createtxg=}\n"
f"{self.creation=}\n"
f"{self.dedup=}\n"
f"{self.devices=}\n"
f"{self.encryption=}\n"
f"{self.exec=}\n"
f"{self.filesystem_limit=}\n"
f"{self.guid=}\n"
f"{self.keystatus=}\n"
f"{self.logbias=}\n"
f"{self.mlslabel=}\n"
f"{self.mounted=}\n"
f"{self.mountpoint=}\n"
f"{self.name=}\n"
f"{self.quota=}\n"
f"{self.readonly=}\n"
f"{self.recordsize=}\n"
f"{self.redundant_metadata=}\n"
f"{self.referenced=}\n"
f"{self.refquota=}\n"
f"{self.refreservation=}\n"
f"{self.reservation=}\n"
f"{self.setuid=}\n"
f"{self.sharenfs=}\n"
f"{self.snapdir=}\n"
f"{self.snapshot_limit=}\n"
f"{self.sync=}\n"
f"{self.used=}\n"
f"{self.usedbychildren=}\n"
f"{self.usedbydataset=}\n"
f"{self.usedbysnapshots=}\n"
f"{self.version=}\n"
f"{self.volmode=}\n"
f"{self.volsize=}\n"
f"{self.vscan=}\n"
f"{self.written=}\n"
f"{self.xattr=}\n"
)
def get_datasets() -> list[Dataset]:
"""Get zfs list.
Returns:
list[Dataset]: A list of zfs datasets.
"""
logger.info("Getting zfs list")
dataset_names, _ = bash_wrapper("zfs list -Hp -t filesystem -o name")
cleaned_datasets = dataset_names.strip().split("\n")
return [Dataset(dataset_name) for dataset_name in cleaned_datasets if "/" in dataset_name]

View File

@@ -1,86 +0,0 @@
"""test."""
from __future__ import annotations
import json
from typing import Any
from python.common import bash_wrapper
def _zpool_list(zfs_list: str) -> dict[str, Any]:
"""Check the version of zfs."""
raw_zfs_list_data, _ = bash_wrapper(zfs_list)
zfs_list_data = json.loads(raw_zfs_list_data)
vers_major = zfs_list_data["output_version"]["vers_major"]
vers_minor = zfs_list_data["output_version"]["vers_minor"]
command = zfs_list_data["output_version"]["command"]
if vers_major != 0 or vers_minor != 1 or command != "zpool list":
error = f"Datasets are not in the correct format {vers_major=} {vers_minor=} {command=}"
raise RuntimeError(error)
return zfs_list_data
class Zpool:
"""Zpool."""
def __init__(
self,
name: str,
) -> None:
"""__init__."""
zpool_data = _zpool_list(f"zpool list {name} -pHj -o all")
properties = zpool_data["pools"][name]["properties"]
self.name = name
self.allocated = int(properties["allocated"]["value"])
self.altroot = properties["altroot"]["value"]
self.ashift = int(properties["ashift"]["value"])
self.autoexpand = properties["autoexpand"]["value"]
self.autoreplace = properties["autoreplace"]["value"]
self.autotrim = properties["autotrim"]["value"]
self.capacity = int(properties["capacity"]["value"])
self.comment = properties["comment"]["value"]
self.dedupratio = properties["dedupratio"]["value"]
self.delegation = properties["delegation"]["value"]
self.expandsize = properties["expandsize"]["value"]
self.failmode = properties["failmode"]["value"]
self.fragmentation = int(properties["fragmentation"]["value"])
self.free = properties["free"]["value"]
self.freeing = int(properties["freeing"]["value"])
self.guid = int(properties["guid"]["value"])
self.health = properties["health"]["value"]
self.leaked = int(properties["leaked"]["value"])
self.readonly = properties["readonly"]["value"]
self.size = int(properties["size"]["value"])
def __repr__(self) -> str:
"""__repr__."""
return (
f"{self.name=}\n"
f"{self.allocated=}\n"
f"{self.altroot=}\n"
f"{self.ashift=}\n"
f"{self.autoexpand=}\n"
f"{self.autoreplace=}\n"
f"{self.autotrim=}\n"
f"{self.capacity=}\n"
f"{self.comment=}\n"
f"{self.dedupratio=}\n"
f"{self.delegation=}\n"
f"{self.expandsize=}\n"
f"{self.failmode=}\n"
f"{self.fragmentation=}\n"
f"{self.freeing=}\n"
f"{self.guid=}\n"
f"{self.health=}\n"
f"{self.leaked=}\n"
f"{self.readonly=}\n"
f"{self.size=}"
)

View File

@@ -6,14 +6,13 @@
default = pkgs.mkShell {
NIX_CONFIG = "extra-experimental-features = nix-command flakes ca-derivations";
nativeBuildInputs = with pkgs; [
age
busybox
git
gnupg
home-manager
my_python
nix
home-manager
git
ssh-to-age
gnupg
age
];
};
}

View File

@@ -1,21 +1,21 @@
{ inputs, ... }:
{
imports = [
"${inputs.self}/users/richie"
"${inputs.self}/users/gaming"
"${inputs.self}/common/global"
"${inputs.self}/common/optional/desktop.nix"
"${inputs.self}/common/optional/docker.nix"
"${inputs.self}/common/optional/scanner.nix"
"${inputs.self}/common/optional/steam.nix"
"${inputs.self}/common/optional/syncthing_base.nix"
"${inputs.self}/common/optional/systemd-boot.nix"
"${inputs.self}/common/optional/update.nix"
"${inputs.self}/common/optional/yubikey.nix"
"${inputs.self}/common/optional/zerotier.nix"
"${inputs.self}/common/optional/nvidia.nix"
../../users/richie
../../users/gaming
../../common/global
../../common/optional/desktop.nix
../../common/optional/docker.nix
../../common/optional/scanner.nix
../../common/optional/steam.nix
../../common/optional/syncthing_base.nix
../../common/optional/systemd-boot.nix
../../common/optional/update.nix
../../common/optional/yubikey.nix
../../common/optional/zerotier.nix
../../common/optional/nvidia.nix
./hardware.nix
./syncthing.nix
./games.nix
./llms.nix
];

View File

@@ -1,7 +1,7 @@
{ pkgs, ... }:
{
environment.systemPackages = with pkgs; [
filebot
docker-compose
osu-lazer-bin
jellyfin-media-player
];
}

View File

@@ -16,6 +16,7 @@
"Qihoo360-Light-R1-32B"
];
models = "/zfs/models";
acceleration = "cuda";
openFirewall = true;
};
# open-webui = {

View File

@@ -3,7 +3,6 @@
"dotfiles" = {
path = "/home/richie/dotfiles";
devices = [
"brain"
"jeeves"
"rhapsody-in-green"
];
@@ -13,9 +12,8 @@
id = "4ckma-gtshs"; # cspell:disable-line
path = "/home/richie/important";
devices = [
"brain"
"jeeves"
"phone"
"jeeves"
"rhapsody-in-green"
];
fsWatcherEnabled = true;

View File

@@ -1,39 +0,0 @@
{ inputs, ... }:
{
imports = [
"${inputs.self}/users/richie"
"${inputs.self}/common/global"
"${inputs.self}/common/optional/docker.nix"
"${inputs.self}/common/optional/ssh_decrypt.nix"
"${inputs.self}/common/optional/syncthing_base.nix"
"${inputs.self}/common/optional/systemd-boot.nix"
"${inputs.self}/common/optional/update.nix"
"${inputs.self}/common/optional/zerotier.nix"
./docker
./hardware.nix
./programs.nix
./services
./syncthing.nix
inputs.nixos-hardware.nixosModules.framework-11th-gen-intel
];
networking = {
hostName = "brain";
hostId = "93a06c6e";
firewall.enable = true;
networkmanager.enable = true;
};
hardware.bluetooth = {
enable = true;
powerOnBoot = true;
};
services = {
openssh.ports = [ 129 ];
smartd.enable = true;
};
system.stateVersion = "25.05";
}

View File

@@ -1,11 +0,0 @@
{ lib, ... }:
{
imports =
let
files = builtins.attrNames (builtins.readDir ./.);
nixFiles = builtins.filter (name: lib.hasSuffix ".nix" name && name != "default.nix") files;
in
map (file: ./. + "/${file}") nixFiles;
virtualisation.oci-containers.backend = "docker";
}

View File

@@ -1,3 +0,0 @@
# docker_networks
docker network create -d bridge web

View File

@@ -1,71 +0,0 @@
{
config,
lib,
modulesPath,
...
}:
{
imports = [ (modulesPath + "/installer/scan/not-detected.nix") ];
boot = {
initrd = {
availableKernelModules = [
"ahci"
"ehci_pci"
"nvme"
"sd_mod"
"uas"
"usb_storage"
"usbhid"
"xhci_pci"
];
kernelModules = [ ];
luks.devices."luks-root-pool-nvme-Samsung_SSD_990_PRO_2TB_S7KHNJ0Y121613P-part2" = {
device = "/dev/disk/by-id/nvme-Samsung_SSD_990_PRO_2TB_S7KHNJ0Y121613P-part2";
bypassWorkqueues = true;
allowDiscards = true;
keyFileSize = 4096;
keyFile = "/dev/disk/by-id/usb-USB_SanDisk_3.2Gen1_03021630090925173333-0:0";
fallbackToPassword = true;
};
};
kernelModules = [ "kvm-intel" ];
extraModulePackages = [ ];
};
fileSystems = {
"/" = lib.mkDefault {
device = "root_pool/root";
fsType = "zfs";
};
"/home" = {
device = "root_pool/home";
fsType = "zfs";
};
"/var" = {
device = "root_pool/var";
fsType = "zfs";
};
"/nix" = {
device = "root_pool/nix";
fsType = "zfs";
};
"/boot" = {
device = "/dev/disk/by-uuid/12CE-A600";
fsType = "vfat";
options = [
"fmask=0077"
"dmask=0077"
];
};
};
swapDevices = [ ];
nixpkgs.hostPlatform = lib.mkDefault "x86_64-linux";
hardware.cpu.intel.updateMicrocode = lib.mkDefault config.hardware.enableRedistributableFirmware;
}

View File

@@ -1,9 +0,0 @@
{ lib, ... }:
{
imports =
let
files = builtins.attrNames (builtins.readDir ./.);
nixFiles = builtins.filter (name: lib.hasSuffix ".nix" name && name != "default.nix") files;
in
map (file: ./. + "/${file}") nixFiles;
}

View File

@@ -1,86 +0,0 @@
{
users = {
users.hass = {
isSystemUser = true;
group = "hass";
};
groups.hass = { };
};
services = {
home-assistant = {
enable = true;
openFirewall = true;
config = {
http = {
server_port = 8123;
server_host = [
"192.168.90.35"
"192.168.95.35"
"127.0.0.1"
];
};
homeassistant = {
time_zone = "America/New_York";
unit_system = "us_customary";
temperature_unit = "F";
packages = {
victron_modbuss = "!include ${./home_assistant/victron_modbuss.yaml}";
battery_sensors = "!include ${./home_assistant/battery_sensors.yaml}";
};
};
recorder = {
db_url = "postgresql://@/hass";
auto_purge = true;
purge_keep_days = 3650;
db_retry_wait = 15;
};
assist_pipeline = { };
backup = { };
bluetooth = { };
config = { };
dhcp = { };
energy = { };
history = { };
homeassistant_alerts = { };
image_upload = { };
logbook = { };
media_source = { };
mobile_app = { };
ssdp = { };
sun = { };
webhook = { };
cloud = { };
zeroconf = { };
automation = "!include automations.yaml";
script = "!include scripts.yaml";
scene = "!include scenes.yaml";
group = "!include groups.yaml";
};
extraPackages =
python3Packages: with python3Packages; [
aioesphomeapi # for esphome
aiounifi # for ubiquiti integration
bleak-esphome # for esphome
esphome-dashboard-api # for esphome
forecast-solar # for solar forecast
gtts # not sure what wants this
ical # for todo
jellyfin-apiclient-python # for jellyfin
paho-mqtt # for mqtt
psycopg2 # for postgresql
py-improv-ble-client # for esphome
pymodbus # for modbus
pyopenweathermap # for weather
uiprotect # for ubiquiti integration
unifi-discovery # for ubiquiti integration
];
extraComponents = [ "isal" ];
};
esphome = {
enable = true;
openFirewall = true;
address = "192.168.90.35";
};
};
}

View File

@@ -1,61 +0,0 @@
sensor:
# Battery 0
- platform: integration
source: sensor.batteries_jk0_charging_power
name: "JK0 energy in"
unique_id: jk0_energy_in_kwh
unit_prefix: k
method: trapezoidal
round: 3
max_sub_interval:
minutes: 5
- platform: integration
source: sensor.batteries_jk0_discharging_power
name: "JK0 energy out"
unique_id: jk0_energy_out_kwh
unit_prefix: k
method: trapezoidal
round: 3
max_sub_interval:
minutes: 5
# Battery 1
- platform: integration
source: sensor.battery1_jk1_charging_power
name: "JK1 energy in"
unique_id: jk1_energy_in_kwh
unit_prefix: k
method: trapezoidal
round: 3
max_sub_interval:
minutes: 5
- platform: integration
source: sensor.battery1_jk1_discharging_power
name: "JK1 energy out"
unique_id: jk1_energy_out_kwh
unit_prefix: k
method: trapezoidal
round: 3
max_sub_interval:
minutes: 5
utility_meter:
# Battery 0
jk0_energy_in_daily:
source: sensor.jk0_energy_in
name: "JK0 Energy In Daily"
cycle: daily
jk0_energy_out_daily:
source: sensor.jk0_energy_out
name: "JK0 Energy Out Daily"
cycle: daily
# Battery 1
jk1_energy_in_daily:
source: sensor.jk1_energy_in
name: "JK1 Energy In Daily"
cycle: daily
jk1_energy_out_daily:
source: sensor.jk1_energy_out
name: "JK1 Energy Out Daily"
cycle: daily

View File

@@ -1,347 +0,0 @@
modbus:
- name: victron_gx
type: tcp
host: 192.168.103.30
port: 502
timeout: 3
delay: 2
sensors:
# ---- SOLAR CHARGER (Unit ID 226) ----
- name: Solar Voltage
slave: 226
address: 776
input_type: holding
data_type: uint16
scale: 0.01
precision: 2
unit_of_measurement: "V"
device_class: voltage
state_class: measurement
- name: Solar Amperage
slave: 226
address: 777
input_type: holding
data_type: int16
scale: 0.1
precision: 1
unit_of_measurement: "A"
device_class: current
state_class: measurement
- name: Solar Wattage
slave: 226
address: 789
input_type: holding
data_type: uint16
scale: 0.1
unit_of_measurement: "W"
device_class: power
state_class: measurement
- name: Solar Yield Today
slave: 226
address: 784
input_type: holding
data_type: uint16
scale: 0.1
precision: 3
unit_of_measurement: "kWh"
device_class: energy
state_class: total
# DC system
- name: DC Voltage
slave: 100
address: 840
input_type: holding
data_type: uint16
scale: 0.1
precision: 2
unit_of_measurement: "V"
device_class: voltage
state_class: measurement
unique_id: dc_voltage
- name: DC Wattage
slave: 100
address: 860
input_type: holding
data_type: int16
scale: 1
precision: 0
unit_of_measurement: "W"
device_class: power
state_class: measurement
unique_id: dc_wattage
# GPS
- name: GPS Latitude
slave: 1
address: 2800
input_type: holding
data_type: int32
scale: 0.0000001
precision: 7
state_class: measurement
unique_id: gps_latitude
- name: GPS Longitude
slave: 1
address: 2802
input_type: holding
data_type: int32
scale: 0.0000001
precision: 7
state_class: measurement
unique_id: gps_longitude
- name: GPS Course
slave: 1
address: 2804
input_type: holding
data_type: uint16
scale: 0.01
precision: 2
unit_of_measurement: "°"
state_class: measurement
unique_id: gps_course
- name: GPS Speed
slave: 1
address: 2805
input_type: holding
data_type: uint16
scale: 0.01
precision: 2
unit_of_measurement: "m/s"
state_class: measurement
unique_id: gps_speed
- name: GPS Fix
slave: 1
address: 2806
input_type: holding
data_type: uint16
scale: 1
state_class: measurement
unique_id: gps_fix
- name: GPS Satellites
slave: 1
address: 2807
input_type: holding
data_type: uint16
scale: 1
state_class: measurement
unique_id: gps_satellites
- name: GPS Altitude
slave: 1
address: 2808
input_type: holding
data_type: int32
scale: 0.16
precision: 1
unit_of_measurement: "m"
state_class: measurement
unique_id: gps_altitude
# ---- CHARGER (Unit ID 223) ----
- name: Charger Output 1 Voltage
slave: 223
address: 2307
input_type: holding
data_type: uint16
scale: 0.01
precision: 2
unit_of_measurement: "V"
device_class: voltage
state_class: measurement
unique_id: charger_output_1_voltage
- name: Charger Output 1 Current
slave: 223
address: 2308
input_type: holding
data_type: int16
scale: 0.1
precision: 1
unit_of_measurement: "A"
device_class: current
state_class: measurement
unique_id: charger_output_1_current
- name: Charger Output 1 Temperature
slave: 223
address: 2309
input_type: holding
data_type: int16
scale: 0.1
precision: 1
unit_of_measurement: "°C"
device_class: temperature
state_class: measurement
unique_id: charger_output_1_temperature
- name: Charger AC Current
slave: 223
address: 2314
input_type: holding
data_type: int16
scale: 0.1
precision: 1
unit_of_measurement: "A"
device_class: current
state_class: measurement
unique_id: charger_ac_current
- name: Charger AC Current Limit
slave: 223
address: 2316
input_type: holding
data_type: int16
scale: 0.1
precision: 1
unit_of_measurement: "A"
device_class: current
state_class: measurement
unique_id: charger_ac_current_limit
- name: Charger On Off Raw
slave: 223
address: 2317
input_type: holding
data_type: uint16
scale: 1
state_class: measurement
unique_id: charger_on_off_raw
- name: Charger Charge State Raw
slave: 223
address: 2318
input_type: holding
data_type: uint16
scale: 1
state_class: measurement
unique_id: charger_charge_state_raw
- name: Charger Error Code
slave: 223
address: 2319
input_type: holding
data_type: uint16
scale: 1
state_class: measurement
unique_id: charger_error_code
- name: Charger Relay State
slave: 223
address: 2320
input_type: holding
data_type: uint16
scale: 1
state_class: measurement
unique_id: charger_relay_state
- name: Charger Low Voltage Alarm
slave: 223
address: 2321
input_type: holding
data_type: uint16
scale: 1
state_class: measurement
unique_id: charger_low_voltage_alarm
- name: Charger High Voltage Alarm
slave: 223
address: 2322
input_type: holding
data_type: uint16
scale: 1
state_class: measurement
unique_id: charger_high_voltage_alarm
template:
- sensor:
- name: Charger On Off
state: >-
{% set v = states('sensor.charger_on_off_raw')|int %}
{{ {0:'Off',1:'On',2:'Error',3:'Unavailable'}.get(v, 'Unknown') }}
- name: Charger Charge State
state: >-
{% set v = states('sensor.charger_charge_state_raw')|int %}
{{ {
0:'Off',1:'Low Power',2:'Fault',3:'Bulk',4:'Absorption',5:'Float',
6:'Storage',7:'Equalize/Manual',8:'External Control'
}.get(v,'Unknown') }}
- name: "Charger DC Wattage"
unique_id: charger_dc_wattage
unit_of_measurement: "W"
device_class: power
state_class: measurement
state: >-
{% set v = states('sensor.charger_output_1_voltage')|float(0) %}
{% set a = states('sensor.charger_output_1_current')|float(0) %}
{{ (v * a) | round(1) }}
- binary_sensor:
- name: Charger Low Voltage Alarm Active
state: "{{ states('sensor.charger_low_voltage_alarm')|int == 2 }}"
- name: Charger High Voltage Alarm Active
state: "{{ states('sensor.charger_high_voltage_alarm')|int == 2 }}"
sensor:
- platform: integration
source: sensor.dc_wattage
name: DC System Energy
unit_prefix: k
round: 2
method: trapezoidal
max_sub_interval:
minutes: 5
- platform: integration
source: sensor.solar_wattage
name: Solar Yield
unit_prefix: k
round: 2
method: trapezoidal
max_sub_interval:
minutes: 5
- platform: integration
source: sensor.charger_dc_wattage
name: DC Charger Energy
unit_prefix: k
round: 2
method: trapezoidal
max_sub_interval:
minutes: 5
utility_meter:
dc_load_energy_daily:
source: sensor.dc_system_energy
cycle: daily
dc_load_energy_monthly:
source: sensor.dc_system_energy
cycle: monthly
solar_yield_daily:
source: sensor.solar_yield
cycle: daily
solar_yield_monthly:
source: sensor.solar_yield
cycle: monthly
charger_dc_wattage_daily:
source: sensor.dc_charger_energy
cycle: daily
charger_dc_wattage_monthly:
source: sensor.dc_charger_energy
cycle: monthly

View File

@@ -1,6 +0,0 @@
{
services.jellyfin = {
enable = true;
openFirewall = true;
};
}

View File

@@ -1,151 +0,0 @@
{ pkgs, ... }:
{
networking.firewall.allowedTCPPorts = [ 5432 ];
services.postgresql = {
enable = true;
package = pkgs.postgresql_17_jit;
enableTCPIP = true;
enableJIT = true;
authentication = pkgs.lib.mkOverride 10 ''
# admins
local all postgres trust
host all postgres 127.0.0.1/32 trust
host all postgres ::1/128 trust
local all richie trust
host all richie 127.0.0.1/32 trust
host all richie ::1/128 trust
host all richie 192.168.90.1/24 trust
host all richie 192.168.99.1/24 trust
#type database DBuser origin-address auth-method
local hass hass trust
# ipv4
host hass hass 192.168.90.1/24 trust
host hass hass 127.0.0.1/32 trust
# ipv6
host hass hass ::1/128 trust
'';
identMap = ''
# ArbitraryMapName systemUser DBUser
superuser_map root postgres
superuser_map postgres postgres
# Let other names login as themselves
superuser_map richie postgres
superuser_map hass hass
'';
ensureUsers = [
{
name = "postgres";
ensureClauses = {
superuser = true;
login = true;
createrole = true;
createdb = true;
replication = true;
};
}
{
name = "richie";
ensureDBOwnership = true;
ensureClauses = {
superuser = true;
login = true;
createrole = true;
createdb = true;
replication = true;
};
}
{
name = "hass";
ensureDBOwnership = true;
ensureClauses = {
login = true;
createrole = true;
createdb = true;
replication = true;
};
}
];
ensureDatabases = [
"hass"
"richie"
];
# Thank you NotAShelf
# https://github.com/NotAShelf/nyx/blob/d407b4d6e5ab7f60350af61a3d73a62a5e9ac660/modules/core/roles/server/system/services/databases/postgresql.nix#L74
settings = {
# Connectivity;
max_connections = 100;
superuser_reserved_connections = 3;
# Memory Settings;
shared_buffers = "1024 MB";
work_mem = "32 MB";
maintenance_work_mem = "320 MB";
huge_pages = "off";
effective_cache_size = "2 GB";
effective_io_concurrency = 100; # concurrent IO only really activated if OS supports posix_fadvise function;
random_page_cost = 1.25; # speed of random disk access relative to sequential access (1.0);
# Monitoring;
shared_preload_libraries = "pg_stat_statements,auto_explain"; # per statement resource usage stats & log explain statements for slow queries
track_io_timing = "on"; # measure exact block IO times;
track_functions = "pl"; # track execution times of pl-language procedures if any;
# Replication;
wal_level = "replica"; # consider using at least "replica";
max_wal_senders = 0;
synchronous_commit = "on";
# Checkpointing: ;
checkpoint_timeout = "15 min";
checkpoint_completion_target = 0.9;
max_wal_size = "1024 MB";
min_wal_size = "512 MB";
# WAL writing;
wal_compression = "on";
wal_buffers = -1; # auto-tuned by Postgres till maximum of segment size (16MB by default);
wal_writer_delay = "200ms";
wal_writer_flush_after = "1MB";
# Background writer;
bgwriter_delay = "200ms";
bgwriter_lru_maxpages = 100;
bgwriter_lru_multiplier = 2.0;
bgwriter_flush_after = 0;
# Parallel queries: ;
max_worker_processes = 6;
max_parallel_workers_per_gather = 3;
max_parallel_maintenance_workers = 3;
max_parallel_workers = 6;
parallel_leader_participation = "on";
# Advanced features ;
enable_partitionwise_join = "on";
enable_partitionwise_aggregate = "on";
jit = "on";
jit_above_cost = 100000;
jit_inline_above_cost = 150000;
jit_optimize_above_cost = 500000;
# log slow queries
log_min_duration_statement = 100;
"auto_explain.log_min_duration" = 100;
# logging configuration
log_connections = true;
log_statement = "ddl";
logging_collector = true;
log_disconnections = true;
log_rotation_age = "14d";
};
};
}

View File

@@ -1,30 +0,0 @@
{
networking.firewall.allowedTCPPorts = [ 8384 ];
services.syncthing = {
overrideFolders = false;
guiAddress = "192.168.90.35:8384";
settings = {
"dotfiles" = {
path = "/home/richie/dotfiles";
devices = [
"bob"
"jeeves"
"rhapsody-in-green"
];
fsWatcherEnabled = true;
};
"important" = {
id = "4ckma-gtshs"; # cspell:disable-line
path = "/home/richie/important";
devices = [
"bob"
"jeeves"
"phone"
"rhapsody-in-green"
];
fsWatcherEnabled = true;
};
};
};
}

View File

@@ -1,22 +1,16 @@
{ inputs, ... }:
let
vars = import ./vars.nix;
in
{
imports = [
"${inputs.self}/users/richie"
"${inputs.self}/users/math"
"${inputs.self}/users/dov"
"${inputs.self}/common/global"
"${inputs.self}/common/optional/docker.nix"
"${inputs.self}/common/optional/ssh_decrypt.nix"
"${inputs.self}/common/optional/syncthing_base.nix"
"${inputs.self}/common/optional/update.nix"
"${inputs.self}/common/optional/zerotier.nix"
../../users/richie
../../common/global
../../common/optional/docker.nix
../../common/optional/ssh_decrypt.nix
../../common/optional/syncthing_base.nix
../../common/optional/zerotier.nix
./docker
./services
./hardware.nix
./networking.nix
./nvidia.nix
./programs.nix
./runners
./syncthing.nix
@@ -27,12 +21,7 @@ in
smartd.enable = true;
snapshot_manager = {
path = ./snapshot_config.toml;
EnvironmentFile = "${vars.secrets}/services/snapshot_manager";
};
zerotierone.joinNetworks = [ "a09acf02330d37b9" ];
snapshot_manager.path = ./snapshot_config.toml;
};
system.stateVersion = "24.05";

View File

@@ -1,6 +1,7 @@
{
config,
pkgs,
lib,
...
}:

View File

@@ -0,0 +1,47 @@
let
vars = import ../vars.nix;
in
{
networking.firewall = {
allowedTCPPorts = [
6882
8081
8118
];
allowedUDPPorts = [ 6882 ];
};
virtualisation.oci-containers.containers.qbitvpn = {
image = "binhex/arch-qbittorrentvpn:5.0.3-1-01";
devices = [ "/dev/net/tun:/dev/net/tun" ];
extraOptions = [ "--cap-add=NET_ADMIN" ];
ports = [
"6882:6881"
"6882:6881/udp"
"8081:8081"
"8118:8118"
];
volumes = [
"${vars.docker_configs}/qbitvpn:/config"
"${vars.qbitvpn}:/data"
"${vars.qbitvpn_scratch}:/data/incomplete"
"/etc/localtime:/etc/localtime:ro"
];
environment = {
WEBUI_PORT = "8081";
PUID = "600";
PGID = "100";
VPN_ENABLED = "yes";
VPN_CLIENT = "openvpn";
STRICT_PORT_FORWARD = "yes";
ENABLE_PRIVOXY = "yes";
LAN_NETWORK = "192.168.90.0/24";
NAME_SERVERS = "1.1.1.1,1.0.0.1";
UMASK = "000";
DEBUG = "false";
DELUGE_DAEMON_LOG_LEVEL = "debug";
DELUGE_WEB_LOG_LEVEL = "debug";
};
environmentFiles = [ "${vars.secrets}/docker/qbitvpn" ];
autoStart = true;
};
}

View File

@@ -1,21 +0,0 @@
let
vars = import ../vars.nix;
in
{
networking.firewall.allowedTCPPorts = [
8989
];
virtualisation.oci-containers.containers.signal_cli_rest_api = {
image = "bbernhard/signal-cli-rest-api:latest";
ports = [
"8989:8080"
];
volumes = [
"${vars.docker_configs}/signal-cli-config:/home/.local/share/signal-cli"
];
environment = {
MODE = "json-rpc";
};
autoStart = true;
};
}

View File

@@ -71,6 +71,12 @@ in
"luks-media_pool-nvme-INTEL_SSDPE2ME012T4_CVMD5130000U1P2HGN-part1" =
makeLuksSSD "/dev/disk/by-id/nvme-INTEL_SSDPE2ME012T4_CVMD5130000U1P2HGN-part1";
# Scratch pool
"luks-scratch-pool-ata-CT480BX500SSD1_2314E6C3C01C-part1" =
makeLuksSSD "/dev/disk/by-id/ata-CT480BX500SSD1_2314E6C3C01C-part1";
"luks-scratch-pool-ata-CT480BX500SSD1_2314E6C3C01E-part1" =
makeLuksSSD "/dev/disk/by-id/ata-CT480BX500SSD1_2314E6C3C01E-part1";
# Storage pool
"luks-storage_pool-nvme-Samsung_SSD_970_EVO_Plus_2TB_S6S2NS0T834822N-part1" =
makeLuksSSD "/dev/disk/by-id/nvme-Samsung_SSD_970_EVO_Plus_2TB_S6S2NS0T834822N-part1";

View File

@@ -2,71 +2,31 @@
networking = {
hostName = "jeeves";
hostId = "0e15ce35";
firewall = {
enable = true;
interfaces.br-nix-builder = {
allowedTCPPorts = [ ];
allowedUDPPorts = [ ];
};
};
firewall.enable = true;
useNetworkd = true;
};
systemd.network = {
enable = true;
wait-online = {
enable = false;
anyInterface = true;
};
netdevs = {
"20-br-nix-builder" = {
netdevConfig = {
Kind = "bridge";
Name = "br-nix-builder";
};
};
"30-internet-vlan" = {
netdevConfig = {
Kind = "vlan";
Name = "internet-vlan";
};
vlanConfig.Id = 100;
};
};
networks = {
"10-1GB_Primary" = {
matchConfig.Name = "enp97s0f1";
matchConfig.Name = "enp98s0f0";
address = [ "192.168.99.14/24" ];
routes = [ { Gateway = "192.168.99.1"; } ];
vlan = [ "internet-vlan" ];
linkConfig.RequiredForOnline = "routable";
};
"50-internet-vlan" = {
matchConfig.Name = "internet-vlan";
bridge = [ "br-nix-builder" ];
linkConfig.RequiredForOnline = "no";
"10-1GB_Secondary" = {
matchConfig.Name = "enp98s0f1";
DHCP = "yes";
};
"60-br-nix-builder" = {
matchConfig.Name = "br-nix-builder";
bridgeConfig = { };
address = [ "192.168.3.10/24" ];
routingPolicyRules = [
{
From = "192.168.3.0/24";
Table = 100;
Priority = 100;
}
];
routes = [
{
Gateway = "192.168.3.1";
Table = 100;
GatewayOnLink = false;
Metric = 2048;
PreferredSource = "192.168.3.10";
}
];
linkConfig.RequiredForOnline = "no";
"10-10GB_Primary" = {
matchConfig.Name = "enp97s0f0np0";
DHCP = "yes";
linkConfig.RequiredForOnline = "routable";
};
"10-10GB_Secondary" = {
matchConfig.Name = "enp97s0f1np1";
DHCP = "yes";
};
};
};

16
systems/jeeves/nvidia.nix Normal file
View File

@@ -0,0 +1,16 @@
{ config, ... }:
{
nixpkgs.config.cudaSupport = true;
services.xserver.videoDrivers = [ "nvidia" ];
hardware = {
nvidia = {
modesetting.enable = true;
powerManagement.enable = true;
package = config.boot.kernelPackages.nvidiaPackages.beta;
nvidiaSettings = true;
open = false;
};
nvidia-container-toolkit.enable = true;
};
}

View File

@@ -16,20 +16,11 @@
};
services.nix_builder.containers = {
nix-builder-00.enable = true;
nix-builder-01.enable = true;
nix-builder-02.enable = true;
nix-builder-03.enable = true;
nix-builder-04.enable = true;
nix-builder-05.enable = true;
nix-builder-06.enable = true;
nix-builder-07.enable = true;
nix-builder-08.enable = true;
nix-builder-09.enable = true;
nix-builder-10.enable = true;
nix-builder-11.enable = true;
nix-builder-12.enable = true;
nix-builder-13.enable = true;
nix-builder-14.enable = true;
nix-builder-0.enable = true;
nix-builder-1.enable = true;
nix-builder-2.enable = true;
nix-builder-3.enable = true;
nix-builder-4.enable = true;
nix-builder-5.enable = true;
};
}

View File

@@ -1,24 +1,12 @@
{
config,
lib,
outputs,
...
}:
{ config, lib, ... }:
with lib;
let
vars = import ../vars.nix;
cfg = config.services.nix_builder;
in
{
options.services.nix_builder = {
bridgeName = mkOption {
type = types.str;
default = "br-nix-builder";
description = "Bridge name for the builder containers.";
};
containers = mkOption {
options.services.nix_builder.containers = mkOption {
type = types.attrsOf (
types.submodule (
{ name, ... }:
@@ -30,32 +18,18 @@ in
default = { };
description = "GitHub runner container configurations";
};
};
config = {
containers = mapAttrs (
name: containerCfg:
mkIf containerCfg.enable {
config.containers = mapAttrs (
name: cfg:
mkIf cfg.enable {
autoStart = true;
privateNetwork = true;
hostBridge = cfg.bridgeName;
ephemeral = true;
bindMounts = {
storage = {
hostPath = "/zfs/media/github-runners/${name}";
"/storage" = {
mountPoint = "/zfs/media/github-runners/${name}";
isReadOnly = false;
};
host-nix = {
mountPoint = "/host-nix/var/nix/daemon-socket";
hostPath = "/nix/var/nix/daemon-socket";
isReadOnly = false;
};
pat = {
hostPath = "${vars.secrets}/services/github-runners/runner_pat";
mountPoint = "${vars.secrets}/services/github-runners/runner_pat";
isReadOnly = true;
};
"/secrets".mountPoint = "${vars.secrets}/services/github-runners/${name}";
"ssh-keys".mountPoint = "${vars.secrets}/services/github-runners/id_ed25519_github-runners";
};
config =
{
@@ -65,12 +39,6 @@ in
...
}:
{
networking = {
useDHCP = lib.mkDefault true;
interfaces.eth0.useDHCP = true;
# Ensure containers don't inherit the host's stub resolver (127.0.0.53) which was causing issues
useHostResolvConf = false;
};
nix.settings = {
trusted-substituters = [
"https://cache.nixos.org"
@@ -91,32 +59,30 @@ in
"flakes"
"nix-command"
];
sandbox = true;
allowed-users = [ "github-runners" ];
trusted-users = [
"root"
"github-runners"
];
};
nixpkgs = {
overlays = builtins.attrValues outputs.overlays;
config.allowUnfree = true;
};
programs.ssh.extraConfig = ''
Host jeeves
Port 629
User github-runners
HostName 192.168.99.14
IdentityFile ${vars.secrets}/services/github-runners/id_ed25519_github-runners
StrictHostKeyChecking no
UserKnownHostsFile /dev/null
'';
services.github-runners.${name} = {
enable = true;
replace = true;
workDir = "/zfs/media/github-runners/${name}";
url = "https://github.com/RichieCahill/dotfiles";
extraLabels = [ "nixos" ];
tokenFile = "${vars.secrets}/services/github-runners/runner_pat";
tokenFile = "${vars.secrets}/services/github-runners/${name}";
user = "github-runners";
group = "github-runners";
extraPackages = with pkgs; [
busybox
nixfmt-rfc-style
nixos-rebuild
openssh
treefmt
my_python
];
};
users = {
@@ -128,9 +94,8 @@ in
};
groups.github-runners.gid = 601;
};
system.stateVersion = "24.05";
system.stateVersion = "24.11";
};
}
) cfg.containers;
};
) config.services.nix_builder.containers;
}

View File

@@ -12,7 +12,7 @@ sudo zpool add storage -o ashift=12 special mirror
sudo zpool add storage -o ashift=12 logs mirror
# scratch
sudo zpool create scratch -o ashift=12 -O acltype=posixacl -O atime=off -O dnodesize=auto -O xattr=sa -O compression=zstd -O encryption=aes-256-gcm -O keyformat=hex -O keylocation=file:///key -m /zfs/scratch
sudo zpool create -o ashift=12 -O acltype=posixacl -O atime=off -O dnodesize=auto -O xattr=sa -O compression=zstd -m /zfs/scratch scratch
# media datasets
sudo zfs create -o compression=zstd-9 media/docker
@@ -25,8 +25,8 @@ sudo zfs create -o exec=off media/share
sudo zfs create -o recordsize=16k -o primarycache=metadata -o mountpoint=/zfs/media/database/postgres media/postgres
# scratch datasets
sudo zfs create scratch/kafka -o mountpoint=/zfs/scratch/kafka -o recordsize=1M
sudo zfs create scratch/transmission -o mountpoint=/zfs/scratch/transmission -o recordsize=16k -o sync=disabled
sudo zfs create -o recordsize=16k -o sync=disabled scratch/qbitvpn
sudo zfs create -o recordsize=16k -o sync=disabled scratch/transmission
# storage datasets
sudo zfs create -o recordsize=1M -o compression=zstd-19 storage/archive
@@ -38,4 +38,3 @@ sudo zfs create -o compression=zstd-19 storage/syncthing
sudo zfs create -o recordsize=1M -o compression=zstd-9 -o exec=off -o sync=disabled storage/qbitvpn
sudo zfs create -o recordsize=1M -o compression=zstd-9 -o exec=off -o sync=disabled storage/transmission
sudo zfs create -o recordsize=1M -o compression=zstd-19 storage/library
sudo zfs create -o recordsize=1M -o compression=zstd-19 -o sync=disabled storage/ollama

View File

@@ -0,0 +1,21 @@
{
pkgs,
...
}:
let
vars = import ../vars.nix;
in
{
systemd.services.filebrowser = {
description = "filebrowser";
after = [ "network.target" ];
wantedBy = [ "multi-user.target" ];
serviceConfig = {
Type = "simple";
User = "richie";
Group = "users";
ExecStart = "${pkgs.filebrowser}/bin/filebrowser --root=/zfs --address=0.0.0.0 --database=${vars.docker_configs}/filebrowser/filebrowser.db";
Restart = "on-failure";
};
};
}

View File

@@ -1,46 +0,0 @@
let
vars = import ../vars.nix;
in
{
networking.firewall.allowedTCPPorts = [ 6443 ];
services.gitea = {
enable = true;
appName = "TMM Workshop";
stateDir = "${vars.services}/gitea/";
lfs.enable = true;
database = {
type = "postgres";
name = "gitea";
user = "gitea";
socket = "/run/postgresql";
port = 5432;
createDatabase = false;
};
settings = {
service.DISABLE_REGISTRATION = true;
server = {
DOMAIN = "tmmworkshop.com";
ROOT_URL = "https://gitea.tmmworkshop.com/";
HTTP_PORT = 6443;
SSH_PORT = 2223;
SSH_LISTEN_PORT = 2224;
START_SSH_SERVER = true;
PUBLIC_URL_DETECTION = "auto";
};
repository = {
ENABLE_PUSH_CREATE_USER = true;
DEFAULT_MERGE_STYLE = "rebase-merge";
};
log = {
LEVEL = "Trace";
ENABLE_SSH_LOG = true;
};
};
};
systemd.services.gitea = {
requires = [ "docker.service" ];
after = [ "docker.service" ];
};
}

View File

@@ -27,21 +27,19 @@ frontend ContentSwitching
# tmmworkshop.com
acl host_audiobookshelf hdr(host) -i audiobookshelf.tmmworkshop.com
acl host_cache hdr(host) -i cache.tmmworkshop.com
acl host_filebrowser hdr(host) -i filebrowser.tmmworkshop.com
acl host_homeassistant hdr(host) -i homeassistant.tmmworkshop.com
acl host_jellyfin hdr(host) -i jellyfin.tmmworkshop.com
acl host_share hdr(host) -i share.tmmworkshop.com
acl host_gcw hdr(host) -i gcw.tmmworkshop.com
acl host_n8n hdr(host) -i n8n.tmmworkshop.com
acl host_gitea hdr(host) -i gitea.tmmworkshop.com
use_backend audiobookshelf_nodes if host_audiobookshelf
use_backend cache_nodes if host_cache
use_backend filebrowser_nodes if host_filebrowser
use_backend homeassistant_nodes if host_homeassistant
use_backend jellyfin if host_jellyfin
use_backend share_nodes if host_share
use_backend gcw_nodes if host_gcw
use_backend n8n if host_n8n
use_backend gitea if host_gitea
backend audiobookshelf_nodes
mode http
@@ -51,9 +49,13 @@ backend cache_nodes
mode http
server server 127.0.0.1:5000
backend filebrowser_nodes
mode http
server server 127.0.0.1:8080
backend homeassistant_nodes
mode http
server server 192.168.90.35:8123
server server 127.0.0.1:8123
backend jellyfin
option httpchk
@@ -69,11 +71,3 @@ backend share_nodes
backend gcw_nodes
mode http
server server 127.0.0.1:8092
backend n8n
mode http
server server 127.0.0.1:5678
backend gitea
mode http
server server 127.0.0.1:6443

View File

@@ -68,7 +68,7 @@ in
jellyfin-apiclient-python
psycopg2
pymetno
aio-ownet
pyownet
rokuecp
uiprotect
wakeonlan

View File

@@ -1,12 +0,0 @@
let
vars = import ../vars.nix;
in
{
services.apache-kafka = {
enable = false;
settings = {
listeners = [ "PLAINTEXT://localhost:9092" ];
"log.dirs" = [ vars.kafka ];
};
};
}

View File

@@ -1,38 +0,0 @@
let
vars = import ../vars.nix;
in
{
services = {
ollama = {
user = "ollama";
enable = true;
host = "0.0.0.0";
loadModels = [
"codellama:7b"
"deepseek-r1:14b"
"deepseek-r1:32b"
"deepseek-r1:8b"
"gemma3:12b"
"gemma3:27b"
"gpt-oss:120b"
"gpt-oss:20b"
"qwen3:14b"
"qwen3:30b"
];
models = vars.ollama;
openFirewall = true;
};
};
systemd.services = {
ollama.serviceConfig = {
Nice = 19;
IOSchedulingPriority = 7;
};
ollama-model-loader.serviceConfig = {
Nice = 19;
CPUWeight = 50;
IOSchedulingClass = "idle";
IOSchedulingPriority = 7;
};
};
}

View File

@@ -1,10 +1,8 @@
{ pkgs, ... }:
let
vars = import ../vars.nix;
in
{
services.nix-serve = {
package = pkgs.nix-serve-ng;
enable = true;
secretKeyFile = "${vars.secrets}/services/nix-cache/cache-priv-key.pem";
openFirewall = true;

View File

@@ -28,7 +28,13 @@ in
#type database DBuser origin-address auth-method
local hass hass trust
local gitea gitea trust
# ipv4
host hass hass 192.168.90.1/24 trust
host hass hass 127.0.0.1/32 trust
# ipv6
host hass hass ::1/128 trust
# megan
host megan megan 192.168.90.1/24 trust
@@ -42,12 +48,6 @@ in
host gcw gcw 192.168.90.1/24 trust
host gcw gcw 127.0.0.1/32 trust
# math
local postgres math trust
host postgres math 127.0.0.1/32 trust
host postgres math ::1/128 trust
host postgres math 192.168.90.1/24 trust
'';
identMap = ''
@@ -90,16 +90,6 @@ in
replication = true;
};
}
{
name = "gitea";
ensureDBOwnership = true;
ensureClauses = {
login = true;
createrole = true;
createdb = true;
replication = true;
};
}
{
name = "megan";
ensureDBOwnership = true;
@@ -120,26 +110,13 @@ in
replication = true;
};
}
{
name = "math";
ensureDBOwnership = true;
ensureClauses = {
login = true;
createrole = true;
createdb = true;
replication = true;
};
}
];
ensureDatabases = [
"gcw"
"hass"
"gitea"
"math"
"megan"
"mxr_dev"
"mxr_prod"
"n8n"
"richie"
];
# Thank you NotAShelf
@@ -207,10 +184,9 @@ in
# logging configuration
log_connections = true;
log_statement = "ddl";
log_statement = "all";
logging_collector = true;
log_disconnections = true;
log_rotation_age = "14d";
};
};
}

View File

@@ -1,6 +1,6 @@
{
pkgs,
inputs,
pkgs,
...
}:
let
@@ -22,13 +22,10 @@ in
wantedBy = [ "multi-user.target" ];
description = "validates startup";
path = [ pkgs.zfs ];
environment = {
PYTHONPATH = "${inputs.self}/";
};
serviceConfig = {
EnvironmentFile = "${vars.secrets}/services/server-validation";
Type = "oneshot";
ExecStart = "${pkgs.my_python}/bin/python -m python.system_tests.validate_system '${./validate_system.toml}'";
ExecStart = "${inputs.system_tools.packages.x86_64-linux.default}/bin/validate_system --config-file='${./validate_system.toml}'";
};
};
};

View File

@@ -3,7 +3,9 @@ services = [
"audiobookshelf",
"cloud_flare_tunnel",
"haproxy",
"docker-qbitvpn",
"docker",
"filebrowser",
"home-assistant",
"jellyfin",
]

View File

@@ -51,33 +51,3 @@ monthly = 12
hourly = 12
daily = 14
monthly = 2
["media/services"]
15_min = 3
hourly = 12
daily = 14
monthly = 2
["media/home_assistant"]
15_min = 3
hourly = 12
daily = 14
monthly = 2
["scratch/transmission"]
15_min = 0
hourly = 0
daily = 0
monthly = 0
["storage/transmission"]
15_min = 0
hourly = 0
daily = 0
monthly = 0
["storage/ollama"]
15_min = 0
hourly = 0
daily = 0
monthly = 0

Some files were not shown because too many files have changed in this diff Show More