Compare commits

..

1 Commits

Author SHA1 Message Date
0248bc320c update desktop.nix kernelPackages 2025-06-19 15:05:39 -04:00
131 changed files with 128 additions and 5598 deletions

View File

@@ -15,14 +15,12 @@ jobs:
matrix: matrix:
system: system:
- "bob" - "bob"
- "brain"
- "jeeves" - "jeeves"
- "leviathan"
- "rhapsody-in-green" - "rhapsody-in-green"
continue-on-error: true
steps: steps:
- uses: actions/checkout@v4 - uses: actions/checkout@v4
- name: Build default package - name: Build default package
run: "nixos-rebuild build --flake ./#${{ matrix.system }}" run: "nixos-rebuild build --flake ./#${{ matrix.system }}"
- name: copy to nix-cache - name: copy to nix-cache
run: nix copy --to ssh://jeeves .#nixosConfigurations.${{ matrix.system }}.config.system.build.toplevel run: nix copy --to ssh://jeeves .#nixosConfigurations.${{ matrix.system }}.config.system.build.toplevel

View File

@@ -1,29 +0,0 @@
name: merge_flake_lock_update
on:
workflow_dispatch:
schedule:
- cron: "0 2 * * 6"
jobs:
merge:
runs-on: ubuntu-latest
permissions:
contents: write
pull-requests: write
steps:
- name: Checkout repository
uses: actions/checkout@v4
- name: merge_flake_lock_update
run: |
pr_number=$(gh pr list --state open --author RichieCahill --label flake_lock_update --json number --jq '.[0].number')
echo "pr_number=$pr_number" >> $GITHUB_ENV
if [ -n "$pr_number" ]; then
gh pr merge "$pr_number" --rebase
else
echo "No open PR found with label flake_lock_update"
fi
env:
GITHUB_TOKEN: ${{ secrets.GH_TOKEN_FOR_UPDATES }}

View File

@@ -1,19 +0,0 @@
name: pytest
on:
push:
branches:
- main
pull_request:
branches:
- main
merge_group:
jobs:
pytest:
runs-on: self-hosted
steps:
- uses: actions/checkout@v4
- name: Run tests
run: pytest tests

View File

@@ -2,7 +2,7 @@ name: update-flake-lock
on: on:
workflow_dispatch: workflow_dispatch:
schedule: schedule:
- cron: "0 0 * * 6" - cron: "0 0 * * *"
jobs: jobs:
lockfile: lockfile:
@@ -20,4 +20,3 @@ jobs:
pr-labels: | pr-labels: |
dependencies dependencies
automated automated
flake_lock_update

View File

@@ -1,13 +1,9 @@
# Generate AGE keys from SSH keys with:
# ssh-keygen -A
# nix-shell -p ssh-to-age --run 'cat /etc/ssh/ssh_host_ed25519_key.pub | ssh-to-age'
keys: keys:
- &admin_richie age1u8zj599elqqvcmhxn8zuwrufsz8w8w366d3ayrljjejljt2q45kq8mxw9c # cspell:disable-line - &admin_richie age1u8zj599elqqvcmhxn8zuwrufsz8w8w366d3ayrljjejljt2q45kq8mxw9c # cspell:disable-line
- &system_bob age1q47vup0tjhulkg7d6xwmdsgrw64h4ax3la3evzqpxyy4adsmk9fs56qz3y # cspell:disable-line - &system_bob age1q47vup0tjhulkg7d6xwmdsgrw64h4ax3la3evzqpxyy4adsmk9fs56qz3y # cspell:disable-line
- &system_brain age1jhf7vm0005j60mjq63696frrmjhpy8kpc2d66mw044lqap5mjv4snmwvwm # cspell:disable-line
- &system_jeeves age13lmqgc3jvkyah5e3vcwmj4s5wsc2akctcga0lpc0x8v8du3fxprqp4ldkv # cspell:disable-line - &system_jeeves age13lmqgc3jvkyah5e3vcwmj4s5wsc2akctcga0lpc0x8v8du3fxprqp4ldkv # cspell:disable-line
- &system_leviathan age1l272y8udvg60z7edgje42fu49uwt4x2gxn5zvywssnv9h2krms8s094m4k # cspell:disable-line - &system_router age1xzxryqq63x65yuza9lmmkud7crjjxpnkdew070yhx6xn7xe4tdws5twxsv # cspell:disable-line
- &system_rhapsody age1ufnewppysaq2wwcl4ugngjz8pfzc5a35yg7luq0qmuqvctajcycs5lf6k4 # cspell:disable-line - &system_rhapsody age1ufnewppysaq2wwcl4ugngjz8pfzc5a35yg7luq0qmuqvctajcycs5lf6k4 # cspell:disable-line
creation_rules: creation_rules:
@@ -16,7 +12,6 @@ creation_rules:
- age: - age:
- *admin_richie - *admin_richie
- *system_bob - *system_bob
- *system_brain
- *system_jeeves - *system_jeeves
- *system_leviathan - *system_router
- *system_rhapsody - *system_rhapsody

18
.vscode/settings.json vendored
View File

@@ -2,7 +2,6 @@
"cSpell.words": [ "cSpell.words": [
"aboutwelcome", "aboutwelcome",
"acltype", "acltype",
"addopts",
"addstr", "addstr",
"advplyr", "advplyr",
"ahci", "ahci",
@@ -10,7 +9,6 @@
"aiounifi", "aiounifi",
"alsa", "alsa",
"apiclient", "apiclient",
"apscheduler",
"archlinux", "archlinux",
"ashift", "ashift",
"asrouter", "asrouter",
@@ -116,7 +114,6 @@
"httpchk", "httpchk",
"hurlenko", "hurlenko",
"hwloc", "hwloc",
"ignorelist",
"INITDB", "INITDB",
"iocharset", "iocharset",
"ioit", "ioit",
@@ -151,15 +148,11 @@
"mixtral", "mixtral",
"mklabel", "mklabel",
"mkpart", "mkpart",
"modbus",
"modbuss",
"modesetting", "modesetting",
"mountpoint", "mountpoint",
"mountpoints", "mountpoints",
"mousewheel", "mousewheel",
"mqtt",
"mtxr", "mtxr",
"mypy",
"ncdu", "ncdu",
"nemo", "nemo",
"neofetch", "neofetch",
@@ -191,7 +184,6 @@
"overalljails", "overalljails",
"overscroll", "overscroll",
"overseerr", "overseerr",
"paho",
"partitionwise", "partitionwise",
"pbmode", "pbmode",
"pciutils", "pciutils",
@@ -219,14 +211,9 @@
"pulseaudio", "pulseaudio",
"punycode", "punycode",
"pychromecast", "pychromecast",
"pydocstyle",
"pyfakefs",
"pylance", "pylance",
"pylint",
"pymetno", "pymetno",
"pymodbus",
"pyownet", "pyownet",
"pytest",
"qbit", "qbit",
"qbittorrent", "qbittorrent",
"qbittorrentvpn", "qbittorrentvpn",
@@ -274,7 +261,6 @@
"tabmanager", "tabmanager",
"tamasfe", "tamasfe",
"TCPIP", "TCPIP",
"testdisk",
"tiktok", "tiktok",
"timonwong", "timonwong",
"titlebar", "titlebar",
@@ -284,7 +270,6 @@
"topstories", "topstories",
"treefmt", "treefmt",
"twimg", "twimg",
"typer",
"uaccess", "uaccess",
"ublock", "ublock",
"uiprotect", "uiprotect",
@@ -300,7 +285,6 @@
"usernamehw", "usernamehw",
"userprefs", "userprefs",
"vfat", "vfat",
"victron",
"virt", "virt",
"virtualisation", "virtualisation",
"vpnpromourl", "vpnpromourl",
@@ -312,8 +296,6 @@
"wireshark", "wireshark",
"Workqueues", "Workqueues",
"xattr", "xattr",
"xcursorgen",
"xdist",
"xhci", "xhci",
"yazi", "yazi",
"yubikey", "yubikey",

View File

@@ -1,10 +1,4 @@
{ lib, pkgs, ... }: { lib, pkgs, ... }:
let
libPath = pkgs.lib.makeLibraryPath [
pkgs.zlib
pkgs.stdenv.cc.cc.lib
];
in
{ {
programs.nix-ld = { programs.nix-ld = {
enable = lib.mkDefault true; enable = lib.mkDefault true;
@@ -21,7 +15,6 @@ in
libxml2 libxml2
openssl openssl
stdenv.cc.cc stdenv.cc.cc
stdenv.cc.cc.lib
systemd systemd
util-linux util-linux
xz xz
@@ -30,9 +23,4 @@ in
zstd zstd
]; ];
}; };
environment = {
sessionVariables.LD_LIBRARY_PATH = lib.mkDefault libPath;
variables.LD_LIBRARY_PATH = lib.mkDefault libPath;
};
} }

View File

@@ -2,6 +2,6 @@
{ {
environment.systemPackages = with pkgs; [ environment.systemPackages = with pkgs; [
git git
my_python python313
]; ];
} }

View File

@@ -11,41 +11,33 @@ in
{ {
options = { options = {
services.snapshot_manager = { services.snapshot_manager = {
enable = lib.mkEnableOption "ZFS snapshot manager"; enable = lib.mkOption {
default = true;
example = true;
description = "Whether to enable k3s-net.";
type = lib.types.bool;
};
path = lib.mkOption { path = lib.mkOption {
type = lib.types.path; type = lib.types.path;
description = "Path that needs to be updated via git pull";
default = ./snapshot_config.toml; default = ./snapshot_config.toml;
description = "Path to the snapshot_manager TOML config.";
};
EnvironmentFile = lib.mkOption {
type = lib.types.nullOr (lib.types.coercedTo lib.types.path toString lib.types.str);
default = null;
description = ''
Single environment file for the service (e.g. /etc/snapshot-manager/env).
Use a leading "-" to ignore if missing (systemd feature).
'';
}; };
}; };
}; };
config = lib.mkIf cfg.enable { config = lib.mkIf cfg.enable {
systemd = { systemd = {
services.snapshot_manager = { services."snapshot_manager" = {
description = "ZFS Snapshot Manager"; description = "ZFS Snapshot Manager";
requires = [ "zfs-import.target" ]; requires = [ "zfs-import.target" ];
after = [ "zfs-import.target" ]; after = [ "zfs-import.target" ];
path = [ pkgs.zfs ]; path = [ pkgs.zfs ];
serviceConfig = { serviceConfig = {
Type = "oneshot"; Type = "oneshot";
ExecStart = "${ ExecStart = "${inputs.system_tools.packages.x86_64-linux.default}/bin/snapshot_manager --config-file='${cfg.path}'";
inputs.system_tools.packages.${pkgs.system}.default
}/bin/snapshot_manager ${lib.escapeShellArg cfg.path}";
}
// lib.optionalAttrs (cfg.EnvironmentFile != null) {
EnvironmentFile = cfg.EnvironmentFile;
}; };
}; };
timers.snapshot_manager = { timers."snapshot_manager" = {
wantedBy = [ "timers.target" ]; wantedBy = [ "timers.target" ];
timerConfig = { timerConfig = {
OnBootSec = "15m"; OnBootSec = "15m";

View File

@@ -1,7 +1,7 @@
{ pkgs, ... }: { pkgs, ... }:
{ {
boot = { boot = {
kernelPackages = pkgs.linuxPackages_6_17; kernelPackages = pkgs.linuxPackages_6_15;
zfs.package = pkgs.zfs_2_3; zfs.package = pkgs.zfs_2_3;
}; };

View File

@@ -10,9 +10,6 @@
authorizedKeys = config.users.users.richie.openssh.authorizedKeys.keys; authorizedKeys = config.users.users.richie.openssh.authorizedKeys.keys;
}; };
}; };
availableKernelModules = [ availableKernelModules = [ "igb" ];
"igb"
"r8152"
];
}; };
} }

View File

@@ -8,11 +8,10 @@
dataDir = "/home/richie/Syncthing"; dataDir = "/home/richie/Syncthing";
configDir = "/home/richie/.config/syncthing"; configDir = "/home/richie/.config/syncthing";
settings.devices = { settings.devices = {
bob.id = "CJIAPEJ-VO74RR4-F75VU6M-QNZAMYG-FYUJG7Y-6AT62HJ-355PRPL-PJFETAZ"; # cspell:disable-line
brain.id = "SSCGIPI-IV3VYKB-TRNIJE3-COV4T2H-CDBER7F-I2CGHYA-NWOEUDU-3T5QAAN"; # cspell:disable-line
ipad.id = "KI76T3X-SFUGV2L-VSNYTKR-TSIUV5L-SHWD3HE-GQRGRCN-GY4UFMD-CW6Z6AX"; # cspell:disable-line
jeeves.id = "ICRHXZW-ECYJCUZ-I4CZ64R-3XRK7CG-LL2HAAK-FGOHD22-BQA4AI6-5OAL6AG"; # cspell:disable-line
phone.id = "TBRULKD-7DZPGGZ-F6LLB7J-MSO54AY-7KLPBIN-QOFK6PX-W2HBEWI-PHM2CQI"; # cspell:disable-line phone.id = "TBRULKD-7DZPGGZ-F6LLB7J-MSO54AY-7KLPBIN-QOFK6PX-W2HBEWI-PHM2CQI"; # cspell:disable-line
jeeves.id = "ICRHXZW-ECYJCUZ-I4CZ64R-3XRK7CG-LL2HAAK-FGOHD22-BQA4AI6-5OAL6AG"; # cspell:disable-line
ipad.id = "KI76T3X-SFUGV2L-VSNYTKR-TSIUV5L-SHWD3HE-GQRGRCN-GY4UFMD-CW6Z6AX"; # cspell:disable-line
bob.id = "CJIAPEJ-VO74RR4-F75VU6M-QNZAMYG-FYUJG7Y-6AT62HJ-355PRPL-PJFETAZ"; # cspell:disable-line
rhapsody-in-green.id = "ASL3KC4-3XEN6PA-7BQBRKE-A7JXLI6-DJT43BY-Q4WPOER-7UALUAZ-VTPQ6Q4"; # cspell:disable-line rhapsody-in-green.id = "ASL3KC4-3XEN6PA-7BQBRKE-A7JXLI6-DJT43BY-Q4WPOER-7UALUAZ-VTPQ6Q4"; # cspell:disable-line
}; };
}; };

View File

@@ -5,7 +5,5 @@
randomizedDelaySec = "1h"; randomizedDelaySec = "1h";
persistent = true; persistent = true;
flake = "github:RichieCahill/dotfiles"; flake = "github:RichieCahill/dotfiles";
allowReboot = true;
dates = "Sat *-*-* 06:00:00";
}; };
} }

View File

@@ -1,4 +0,0 @@
source "https://rubygems.org"
# The github-pages gem pins all compatible versions of Jekyll and its plugins
gem "github-pages", group: :jekyll_plugins

View File

@@ -1,23 +0,0 @@
title: "Richie Cahill"
description: "ALL THE CHAOS THAT I CAN'T DO AT WORK"
baseurl: "/dotfiles"
url: "https://richiecahill.github.io"
remote_theme: pages-themes/hacker@v0.2.0
plugins:
- jekyll-feed
- jekyll-remote-theme
- jekyll-seo-tag
- jekyll-sitemap
- jekyll-paginate
paginate: 5
paginate_path: "/page:num"
author:
name: "Richie Cahill"
email: "richie@tmmworkshop.com"
social_links:
github: "RichieCahill"
website: "https://tmmworkshop.com"

View File

@@ -1,13 +0,0 @@
# The MONOREPO experiment
I'm testing a [MONOREPO](https://en.wikipedia.org/wiki/Monorepo) because Phil said this was a bad idea. To that I say: hold my beer.
In all seriousness, I think that for a small dev team or a solo dev, the simplicity is worth the higher barrier to entry. One of my most annoying processes was updating my system tools: I had to build my update in a feature branch and merge it into my main branch, then go to my dotfiles, create a feature branch, update the system tools, and merge that into main.
It will be starting with my Nix Dotfiles Python tools and now my blog.
I will be reaching out to Phil on 2030-10-31 and 2035-10-31 to give him updates on the progress.
Known Issues:
- The Python tests run against the current derivation, not the derivation I'm updating to.

View File

@@ -1,17 +0,0 @@
---
layout: default
title: "Welcome"
---
Welcome to my build logs, notes, and experiments.
You can read my latest posts below
<ul>
{% for post in site.posts %}
<li>
<a href="{{ post.url | relative_url }}">{{ post.title }}</a>
<small>— {{ post.date | date: "%Y-%m-%d" }}</small>
</li>
{% endfor %}
</ul>

3
esphome/.gitignore vendored
View File

@@ -1,3 +0,0 @@
# esphome
/.esphome/
/secrets.yaml

View File

@@ -1,129 +0,0 @@
esphome:
name: batteries
friendly_name: batteries
esp32:
board: esp32dev
framework:
type: arduino
logger:
api:
encryption:
key: !secret api_key
external_components:
- source: github://syssi/esphome-jk-bms@main
ota:
- platform: esphome
password: !secret ota_password
wifi:
ssid: !secret wifi_ssid
password: !secret wifi_password
captive_portal:
esp32_ble_tracker:
scan_parameters:
interval: 1100ms
window: 1100ms
active: true
ble_client:
- mac_address: "C8:47:80:29:0F:DB"
id: jk_ble0
- mac_address: "C8:47:80:37:9D:DD"
id: jk_ble1
jk_bms_ble:
- ble_client_id: jk_ble0
protocol_version: JK02_32S
throttle: 1s
id: jk_bms0
- ble_client_id: jk_ble1
protocol_version: JK02_32S
throttle: 1s
id: jk_bms1
sensor:
# BMS1 sensors
- platform: jk_bms_ble
jk_bms_ble_id: jk_bms0
total_voltage:
name: "JK0 Total Voltage"
current:
name: "JK0 Current"
state_of_charge:
name: "JK0 SoC"
power:
name: "JK0 Power"
temperature_sensor_1:
name: "JK0 Temp 1"
temperature_sensor_2:
name: "JK0 Temp 2"
balancing:
name: "JK0 balancing"
charging_cycles:
name: "JK0 charging cycles"
total_runtime:
name: "JK0 total runtime"
balancing_current:
name: "JK0 balancing current"
# BMS2 sensors
- platform: jk_bms_ble
jk_bms_ble_id: jk_bms1
total_voltage:
name: "JK1 Total Voltage"
current:
name: "JK1 Current"
state_of_charge:
name: "JK1 SoC"
power:
name: "Jk1 Power"
temperature_sensor_1:
name: "JK1 Temp 1"
temperature_sensor_2:
name: "Jk1 Temp 2"
balancing:
name: "JK1 balancing"
charging_cycles:
name: "JK1 charging cycles"
total_runtime:
name: "JK1 total runtime"
balancing_current:
name: "JK1 balancing current"
text_sensor:
- platform: jk_bms_ble
jk_bms_ble_id: jk_bms0
errors:
name: "JK0 Errors"
- platform: jk_bms_ble
jk_bms_ble_id: jk_bms1
errors:
name: "JK1 Errors"
switch:
- platform: jk_bms_ble
jk_bms_ble_id: jk_bms0
charging:
name: "JK0 Charging"
discharging:
name: "JK0 Discharging"
balancer:
name: "JK0 Balancing"
- platform: jk_bms_ble
jk_bms_ble_id: jk_bms1
charging:
name: "JK1 Charging"
discharging:
name: "JK1 Discharging"
balancer:
name: "JK1 Balancing"

File diff suppressed because one or more lines are too long

42
flake.lock generated
View File

@@ -8,11 +8,11 @@
}, },
"locked": { "locked": {
"dir": "pkgs/firefox-addons", "dir": "pkgs/firefox-addons",
"lastModified": 1760673822, "lastModified": 1750219402,
"narHash": "sha256-h+liPhhMw1yYvkDGLHzQJQShQs+yLjNgjfAyZX+sRrM=", "narHash": "sha256-b3y7V7db0VwLGtpcLRmT1Aa9dpAKoHQdem55UhgB/fw=",
"owner": "rycee", "owner": "rycee",
"repo": "nur-expressions", "repo": "nur-expressions",
"rev": "5cca27f1bb30a26140d0cf60ab34daa45b4fa11f", "rev": "a00ce73b626ed274fbfe9f51627861e140b08f6d",
"type": "gitlab" "type": "gitlab"
}, },
"original": { "original": {
@@ -29,11 +29,11 @@
] ]
}, },
"locked": { "locked": {
"lastModified": 1760662441, "lastModified": 1750275572,
"narHash": "sha256-mlDqR1Ntgs9uYYEAUR1IhamKBO0lxoNS4zGLzEZaY0A=", "narHash": "sha256-upC/GIlsIgtdtWRGd1obzdXWYQptNkfzZeyAFWgsgf0=",
"owner": "nix-community", "owner": "nix-community",
"repo": "home-manager", "repo": "home-manager",
"rev": "722792af097dff5790f1a66d271a47759f477755", "rev": "0f355844e54e4c70906b1ef5cc35a0047d666c04",
"type": "github" "type": "github"
}, },
"original": { "original": {
@@ -44,11 +44,11 @@
}, },
"nixos-hardware": { "nixos-hardware": {
"locked": { "locked": {
"lastModified": 1760106635, "lastModified": 1750083401,
"narHash": "sha256-2GoxVaKWTHBxRoeUYSjv0AfSOx4qw5CWSFz2b+VolKU=", "narHash": "sha256-ynqbgIYrg7P1fAKYqe8I/PMiLABBcNDYG9YaAP/d/C4=",
"owner": "nixos", "owner": "nixos",
"repo": "nixos-hardware", "repo": "nixos-hardware",
"rev": "9ed85f8afebf2b7478f25db0a98d0e782c0ed903", "rev": "61837d2a33ccc1582c5fabb7bf9130d39fee59ad",
"type": "github" "type": "github"
}, },
"original": { "original": {
@@ -60,11 +60,11 @@
}, },
"nixpkgs": { "nixpkgs": {
"locked": { "locked": {
"lastModified": 1760524057, "lastModified": 1750134718,
"narHash": "sha256-EVAqOteLBFmd7pKkb0+FIUyzTF61VKi7YmvP1tw4nEw=", "narHash": "sha256-v263g4GbxXv87hMXMCpjkIxd/viIF7p3JpJrwgKdNiI=",
"owner": "nixos", "owner": "nixos",
"repo": "nixpkgs", "repo": "nixpkgs",
"rev": "544961dfcce86422ba200ed9a0b00dd4b1486ec5", "rev": "9e83b64f727c88a7711a2c463a7b16eedb69a84c",
"type": "github" "type": "github"
}, },
"original": { "original": {
@@ -76,11 +76,11 @@
}, },
"nixpkgs-master": { "nixpkgs-master": {
"locked": { "locked": {
"lastModified": 1760751316, "lastModified": 1750291913,
"narHash": "sha256-1296zQfPiLZNrLKzX1t+kunadeI/mH82hKze3voduEI=", "narHash": "sha256-JW40+zIiDS+rZavb9IYdIN40/GmErO2+0+A66rM6/b8=",
"owner": "nixos", "owner": "nixos",
"repo": "nixpkgs", "repo": "nixpkgs",
"rev": "d85429339c0bcf0428084fe1306c970aed364417", "rev": "ba92ab5dc0759a8740003ca34b5c1b888f4766d4",
"type": "github" "type": "github"
}, },
"original": { "original": {
@@ -176,11 +176,11 @@
] ]
}, },
"locked": { "locked": {
"lastModified": 1760393368, "lastModified": 1750119275,
"narHash": "sha256-8mN3kqyqa2PKY0wwZ2UmMEYMcxvNTwLaOrrDsw6Qi4E=", "narHash": "sha256-Rr7Pooz9zQbhdVxux16h7URa6mA80Pb/G07T4lHvh0M=",
"owner": "Mic92", "owner": "Mic92",
"repo": "sops-nix", "repo": "sops-nix",
"rev": "ab8d56e85b8be14cff9d93735951e30c3e86a437", "rev": "77c423a03b9b2b79709ea2cb63336312e78b72e2",
"type": "github" "type": "github"
}, },
"original": { "original": {
@@ -199,11 +199,11 @@
"uv2nix": "uv2nix" "uv2nix": "uv2nix"
}, },
"locked": { "locked": {
"lastModified": 1760751967, "lastModified": 1747501237,
"narHash": "sha256-u/uciy9kpM/CBZKl05iAZRaOTwUHiuI0L/qbkk2mLUg=", "narHash": "sha256-woyaUwmZurfNTXBEFM6M7ueSd/Udixs+4DUInhL835c=",
"owner": "RichieCahill", "owner": "RichieCahill",
"repo": "system_tools", "repo": "system_tools",
"rev": "a125c3e5c01cecbc3f2a842ffb1abb1210c35706", "rev": "68ab5d1c17ac3fe2487f73dbbb4848bd2291139e",
"type": "github" "type": "github"
}, },
"original": { "original": {

View File

@@ -77,12 +77,6 @@
]; ];
specialArgs = { inherit inputs outputs; }; specialArgs = { inherit inputs outputs; };
}; };
brain = lib.nixosSystem {
modules = [
./systems/brain
];
specialArgs = { inherit inputs outputs; };
};
jeeves = lib.nixosSystem { jeeves = lib.nixosSystem {
modules = [ modules = [
./systems/jeeves ./systems/jeeves
@@ -95,12 +89,6 @@
]; ];
specialArgs = { inherit inputs outputs; }; specialArgs = { inherit inputs outputs; };
}; };
leviathan = lib.nixosSystem {
modules = [
./systems/leviathan
];
specialArgs = { inherit inputs outputs; };
};
}; };
}; };
} }

View File

@@ -14,24 +14,4 @@
config.allowUnfree = true; config.allowUnfree = true;
}; };
}; };
python-env = final: _prev: {
my_python = final.python313.withPackages (
ps: with ps; [
apprise
apscheduler
mypy
polars
pyfakefs
pytest
pytest-cov
pytest-mock
pytest-xdist
requests
ruff
typer
types-requests
]
);
};
} }

View File

@@ -1,73 +0,0 @@
[project]
name = "system_tools"
version = "0.1.0"
description = ""
authors = [{ name = "Richie Cahill", email = "richie@tmmworkshop.com" }]
requires-python = "~=3.13.0"
readme = "README.md"
license = "MIT"
# these dependencies are a best effort and aren't guaranteed to work
dependencies = ["apprise", "apscheduler", "polars", "requests", "typer"]
[dependency-groups]
dev = [
"mypy",
"pyfakefs",
"pytest-cov",
"pytest-mock",
"pytest-xdist",
"pytest",
"ruff",
"types-requests",
]
[tool.ruff]
target-version = "py313"
line-length = 120
lint.select = ["ALL"]
lint.ignore = [
"G004", # (PERM) This is a performers nit
"COM812", # (TEMP) conflicts when used with the formatter
"ISC001", # (TEMP) conflicts when used with the formatter
"S603", # (PERM) This is known to cause a false positive
]
[tool.ruff.lint.per-file-ignores]
"tests/**" = [
"S101", # (perm) pytest needs asserts
]
"python/random/**" = [
"T201", # (perm) I don't care about print statements dir
]
"python/testing/**" = [
"T201", # (perm) I don't care about print statements dir
"ERA001", # (perm) I don't care about print statements dir
]
[tool.ruff.lint.pydocstyle]
convention = "google"
[tool.ruff.lint.flake8-builtins]
builtins-ignorelist = ["id"]
[tool.ruff.lint.pylint]
max-args = 9
[tool.coverage.run]
source = ["system_tools"]
[tool.coverage.report]
exclude_lines = [
"pragma: no cover",
"if TYPE_CHECKING:",
"raise NotImplementedError",
"if __name__ == \"__main__\":",
]
[tool.pytest.ini_options]
addopts = "-n auto -ra"
# --cov=system_tools --cov-report=term-missing --cov-report=xml --cov-report=html --cov-branch

View File

@@ -1 +0,0 @@
"""Server Tools."""

View File

@@ -1,72 +0,0 @@
"""common."""
from __future__ import annotations
import logging
import sys
from datetime import UTC, datetime
from os import getenv
from subprocess import PIPE, Popen
from apprise import Apprise
logger = logging.getLogger(__name__)
def configure_logger(level: str = "INFO") -> None:
"""Configure the logger.
Args:
level (str, optional): The logging level. Defaults to "INFO".
"""
logging.basicConfig(
level=level,
datefmt="%Y-%m-%dT%H:%M:%S%z",
format="%(asctime)s %(levelname)s %(filename)s:%(lineno)d - %(message)s",
handlers=[logging.StreamHandler(sys.stdout)],
)
def bash_wrapper(command: str) -> tuple[str, int]:
    """Run a command and capture its output and return code.

    The command string is split on whitespace, so arguments containing
    spaces are not supported. Note the quirk: if the process writes
    anything to stderr, the decoded stderr is returned in place of stdout.

    Args:
        command (str): The command line to execute.

    Returns:
        tuple[str, int]: Decoded stderr (if any was produced) or decoded
        stdout, together with the process return code.
    """
    # This is a acceptable risk
    child = Popen(command.split(), stdout=PIPE, stderr=PIPE)
    output, error = child.communicate()
    if not error:
        return output.decode(), child.returncode
    logger.error(f"{error=}")
    return error.decode(), child.returncode
def signal_alert(body: str, title: str = "") -> None:
    """Send a Signal alert via an Apprise gateway on localhost:8989.

    Source and destination numbers are read from the environment variables
    SIGNAL_ALERT_FROM_PHONE and SIGNAL_ALERT_TO_PHONE; if either is unset
    the alert is skipped (logged at INFO level).

    Args:
        body (str): The body of the alert.
        title (str, optional): The title of the alert. Defaults to "".
    """
    apprise_client = Apprise()
    sender = getenv("SIGNAL_ALERT_FROM_PHONE")
    receiver = getenv("SIGNAL_ALERT_TO_PHONE")
    if not (sender and receiver):
        logger.info("SIGNAL_ALERT_FROM_PHONE or SIGNAL_ALERT_TO_PHONE not set")
        return
    apprise_client.add(f"signal://localhost:8989/{sender}/{receiver}")
    apprise_client.notify(title=title, body=body)
def utcnow() -> datetime:
"""Get the current UTC time."""
return datetime.now(tz=UTC)

View File

@@ -1 +0,0 @@
"""installer."""

View File

@@ -1,308 +0,0 @@
"""Install NixOS on a ZFS pool."""
from __future__ import annotations
import curses
import logging
import sys
from os import getenv
from pathlib import Path
from random import getrandbits
from subprocess import PIPE, Popen, run
from time import sleep
from typing import TYPE_CHECKING
from python.common import configure_logger
from python.installer.tui import draw_menu
if TYPE_CHECKING:
from collections.abc import Sequence
logger = logging.getLogger(__name__)


def bash_wrapper(command: str) -> str:
    """Run a command, returning its stdout; raise if it exits non-zero.

    The command string is split on whitespace, so arguments containing
    spaces are not supported.

    Args:
        command (str): The command line to execute.

    Returns:
        str: The decoded stdout of the command.

    Raises:
        RuntimeError: If the command exits with a non-zero return code.
    """
    logger.debug(f"running {command=}")
    # Pre-split argv, no shell — this is an acceptable risk
    process = Popen(command.split(), stdout=PIPE, stderr=PIPE)
    captured_stdout, _ = process.communicate()
    if process.returncode != 0:
        error = f"Failed to run command {command=} return code {process.returncode=}"
        raise RuntimeError(error)
    return captured_stdout.decode()
def partition_disk(disk: str, swap_size: int, reserve: int = 0) -> None:
    """Partition a disk.

    Creates a GPT label with an EFI partition (1MiB-4GiB), a root_pool
    partition filling the middle, and a swap partition at the tail of the
    disk, optionally leaving `reserve` GiB unpartitioned at the very end.
    DESTRUCTIVE: the whole device is discarded first.

    Args:
        disk (str): The disk to partition.
        swap_size (int): The size of the swap partition in GB.
            minimum value is 1.
        reserve (int, optional): The size of the reserve partition in GB. Defaults to 0.
            minimum value is 0.
    """
    logger.info(f"partitioning {disk=}")
    # Clamp instead of raising — enforce the documented minimums.
    swap_size = max(swap_size, 1)
    reserve = max(reserve, 0)
    # Discard the entire device first (DESTRUCTIVE — wipes existing data).
    bash_wrapper(f"blkdiscard -f {disk}")
    if reserve > 0:
        msg = f"Creating swap partition on {disk=} with size {swap_size=}GiB and reserve {reserve=}GiB"
        logger.info(msg)
        # Swap occupies [-(swap+reserve)GiB, -reserveGiB), leaving `reserve`
        # GiB free at the end of the disk.
        swap_start = swap_size + reserve
        swap_partition = f"mkpart swap -{swap_start}GiB -{reserve}GiB "
    else:
        logger.info(f"Creating swap partition on {disk=} with size {swap_size=}GiB")
        # No reserve: swap runs from -swap_sizeGiB to the end of the disk.
        swap_start = swap_size
        swap_partition = f"mkpart swap -{swap_start}GiB 100% "
    logger.debug(f"{swap_partition=}")
    # Single parted invocation: GPT label, EFI system partition, root_pool
    # partition, then the swap partition string computed above.
    create_partitions = (
        f"parted --script --align=optimal {disk} -- "
        "mklabel gpt "
        "mkpart EFI 1MiB 4GiB "
        f"mkpart root_pool 4GiB -{swap_start}GiB "
        f"{swap_partition}"
        "set 1 esp on"
    )
    bash_wrapper(create_partitions)
    logger.info(f"{disk=} successfully partitioned")
def create_zfs_pool(pool_disks: Sequence[str], mnt_dir: str) -> None:
    """Create the root_pool ZFS pool on the given disks.

    One disk yields a plain pool; two or more are combined into a mirror.
    Exits the process if root_pool does not appear in `zpool list` afterwards.

    Args:
        pool_disks (Sequence[str]): A tuple of disks to use for the pool.
        mnt_dir (str): The mount directory.

    Raises:
        ValueError: If `pool_disks` is empty.
    """
    if not pool_disks:
        error = "disks must be a tuple of at least length 1"
        raise ValueError(error)
    # Pool-wide (-o) and root-dataset (-O) properties; legacy mountpoints
    # keep mounting under the installer's explicit control.
    options = (
        "-o ashift=12 "
        "-o autotrim=on "
        f"-R {mnt_dir} "
        "-O acltype=posixacl "
        "-O canmount=off "
        "-O dnodesize=auto "
        "-O normalization=formD "
        "-O relatime=on "
        "-O xattr=sa "
        "-O mountpoint=legacy "
        "-O compression=zstd "
        "-O atime=off "
    )
    if len(pool_disks) == 1:
        vdev_spec = pool_disks[0]
    else:
        vdev_spec = "mirror " + " ".join(pool_disks)
    bash_wrapper("zpool create " + options + "root_pool " + vdev_spec)
    existing_pools = bash_wrapper("zpool list -o name").splitlines()
    if "root_pool" not in existing_pools:
        logger.critical("Failed to create root_pool")
        sys.exit(1)
def create_zfs_datasets() -> None:
    """Create the standard datasets on root_pool and verify they all exist.

    Exits the process if any expected dataset is missing afterwards.
    """
    bash_wrapper("zfs create -o canmount=noauto -o reservation=10G root_pool/root")
    bash_wrapper("zfs create root_pool/home")
    bash_wrapper("zfs create root_pool/var -o reservation=1G")
    # /nix gets heavier compression since it is large, read-mostly content.
    bash_wrapper("zfs create -o compression=zstd-9 -o reservation=10G root_pool/nix")
    created = set(bash_wrapper("zfs list -o name").splitlines())
    required = {
        "root_pool/root",
        "root_pool/home",
        "root_pool/var",
        "root_pool/nix",
    }
    missing_datasets = required - created
    if missing_datasets:
        logger.critical(f"Failed to create pools {missing_datasets}")
        sys.exit(1)
def get_cpu_manufacturer() -> str:
    """Return "amd" or "intel" based on /proc/cpuinfo's vendor_id line.

    Returns:
        str: "amd" for AuthenticAMD, "intel" for GenuineIntel.

    Raises:
        RuntimeError: If no vendor_id line is found, or the vendor is not
            one of the two supported manufacturers (previously this case
            leaked a bare KeyError).
    """
    output = bash_wrapper("cat /proc/cpuinfo")
    id_vendor = {"AuthenticAMD": "amd", "GenuineIntel": "intel"}
    for line in output.splitlines():
        if "vendor_id" in line:
            vendor = line.split(": ")[1].strip()
            manufacturer = id_vendor.get(vendor)
            if manufacturer is None:
                # Explicit error instead of a KeyError from the lookup table.
                error = f"Unsupported CPU vendor {vendor!r}"
                raise RuntimeError(error)
            return manufacturer
    error = "Failed to get CPU manufacturer"
    raise RuntimeError(error)
def get_boot_drive_id(disk: str) -> str:
    """Return the filesystem UUID of the first partition on `disk`.

    Reads `lsblk -o UUID` for the -part1 device; line 0 is the column
    header, so the UUID is on line 1.
    """
    lsblk_output = bash_wrapper(f"lsblk -o UUID {disk}-part1")
    lines = lsblk_output.splitlines()
    return lines[1]
def create_nix_hardware_file(mnt_dir: str, disks: Sequence[str], encrypt: str | None) -> None:
    """Create a NixOS hardware file.

    Writes hardware-configuration.nix under `mnt_dir`/etc/nixos with
    filesystem entries for the ZFS datasets and /boot, a LUKS device entry
    when `encrypt` is set, microcode settings, and a random ZFS hostId.
    """
    cpu_manufacturer = get_cpu_manufacturer()
    devices = ""
    if encrypt:
        # NOTE(review): only the first disk gets a LUKS initrd entry here —
        # confirm multi-disk encrypted setups are handled elsewhere.
        disk = disks[0]
        devices = (
            f' luks.devices."luks-root-pool-{disk.split("/")[-1]}-part2"'
            "= {\n"
            f' device = "{disk}-part2";\n'
            " bypassWorkqueues = true;\n"
            " allowDiscards = true;\n"
            " };\n"
        )
    # Random 32-bit hostId rendered as 8 hex digits (required by ZFS on NixOS).
    host_id = format(getrandbits(32), "08x")
    # NOTE(review): the literal whitespace inside these strings appears
    # collapsed in this copy of the file — verify the emitted Nix
    # indentation against the original before relying on it.
    nix_hardware = (
        "{ config, lib, modulesPath, ... }:\n"
        "{\n"
        ' imports = [ (modulesPath + "/installer/scan/not-detected.nix") ];\n\n'
        " boot = {\n"
        " initrd = {\n"
        ' availableKernelModules = [ \n "ahci"\n "ehci_pci"\n "nvme"\n "sd_mod"\n'
        ' "usb_storage"\n "usbhid"\n "xhci_pci"\n ];\n'
        " kernelModules = [ ];\n"
        f" {devices}"
        " };\n"
        f' kernelModules = [ "kvm-{cpu_manufacturer}" ];\n'
        " extraModulePackages = [ ];\n"
        " };\n\n"
        " fileSystems = {\n"
        ' "/" = lib.mkDefault {\n device = "root_pool/root";\n fsType = "zfs";\n };\n\n'
        ' "/home" = {\n device = "root_pool/home";\n fsType = "zfs";\n };\n\n'
        ' "/var" = {\n device = "root_pool/var";\n fsType = "zfs";\n };\n\n'
        ' "/nix" = {\n device = "root_pool/nix";\n fsType = "zfs";\n };\n\n'
        ' "/boot" = {\n'
        f' device = "/dev/disk/by-uuid/{get_boot_drive_id(disks[0])}";\n'
        ' fsType = "vfat";\n options = [\n "fmask=0077"\n'
        ' "dmask=0077"\n ];\n };\n };\n\n'
        " swapDevices = [ ];\n\n"
        " networking.useDHCP = lib.mkDefault true;\n\n"
        ' nixpkgs.hostPlatform = lib.mkDefault "x86_64-linux";\n'
        f" hardware.cpu.{cpu_manufacturer}.updateMicrocode = "
        "lib.mkDefault config.hardware.enableRedistributableFirmware;\n"
        f' networking.hostId = "{host_id}";\n'
        "}\n"
    )
    Path(f"{mnt_dir}/etc/nixos/hardware-configuration.nix").write_text(nix_hardware)
def install_nixos(mnt_dir: str, disks: Sequence[str], encrypt: str | None) -> None:
    """Install NixOS.

    Mounts the ZFS datasets and the EFI boot partition under `mnt_dir`,
    generates a NixOS config, replaces the generated hardware file with a
    custom one, and runs nixos-install.
    """
    # Mount the root dataset first, then its children beneath it.
    bash_wrapper(f"mount -o X-mount.mkdir -t zfs root_pool/root {mnt_dir}")
    bash_wrapper(f"mount -o X-mount.mkdir -t zfs root_pool/home {mnt_dir}/home")
    bash_wrapper(f"mount -o X-mount.mkdir -t zfs root_pool/var {mnt_dir}/var")
    bash_wrapper(f"mount -o X-mount.mkdir -t zfs root_pool/nix {mnt_dir}/nix")
    # Every disk gets an EFI filesystem, but only the first is mounted.
    for disk in disks:
        bash_wrapper(f"mkfs.vfat -n EFI {disk}-part1")
    # set up mirroring afterwards if more than one disk
    boot_partition = (
        f"mount -t vfat -o fmask=0077,dmask=0077,iocharset=iso8859-1,X-mount.mkdir {disks[0]}-part1 {mnt_dir}/boot"
    )
    bash_wrapper(boot_partition)
    bash_wrapper(f"nixos-generate-config --root {mnt_dir}")
    # Overwrite the generated hardware-configuration.nix with our own.
    create_nix_hardware_file(mnt_dir, disks, encrypt)
    # run() so nixos-install's output streams to the console; check=True
    # raises CalledProcessError on failure.
    run(("nixos-install", "--root", mnt_dir), check=True)
def installer(
    disks: Sequence[str],
    swap_size: int,
    reserve: int,
    encrypt_key: str | None,
) -> None:
    """Partition, optionally encrypt, pool, and install NixOS onto `disks`.

    Args:
        disks (Sequence[str]): Block devices to install onto.
        swap_size (int): Swap partition size in GB (minimum 1).
        reserve (int): Unpartitioned reserve at the end of each disk, in GB.
        encrypt_key (str | None): LUKS passphrase; None disables encryption.
    """
    logger.info("Starting installation")
    for disk in disks:
        partition_disk(disk, swap_size, reserve)
        if encrypt_key:
            # Give the kernel a moment to settle the new partition table.
            sleep(1)
            for command in (
                f"cryptsetup luksFormat --type luks2 {disk}-part2 -",
                f"cryptsetup luksOpen {disk}-part2 luks-root-pool-{disk.split('/')[-1]}-part2 -",
            ):
                # Feed the passphrase on stdin. A fresh printf per command is
                # required because a pipe can only be consumed once, and the
                # "%s" format stops printf from interpreting the key. (The
                # previous code wrapped the key in literal single quotes,
                # which became part of the passphrase, and reused one pipe
                # for both commands.)
                key_feed = Popen(("printf", "%s", encrypt_key), stdout=PIPE)
                # run() needs an argv sequence, not a whole command string.
                run(command.split(), check=True, stdin=key_feed.stdout)
                key_feed.wait()
    mnt_dir = "/tmp/nix_install"  # noqa: S108
    Path(mnt_dir).mkdir(parents=True, exist_ok=True)
    if encrypt_key:
        pool_disks = [f"/dev/mapper/luks-root-pool-{disk.split('/')[-1]}-part2" for disk in disks]
    else:
        pool_disks = [f"{disk}-part2" for disk in disks]
    create_zfs_pool(pool_disks, mnt_dir)
    create_zfs_datasets()
    install_nixos(mnt_dir, disks, encrypt_key)
    logger.info("Installation complete")
def main() -> None:
    """Entry point: collect choices from the curses TUI, then install."""
    configure_logger("DEBUG")
    state = curses.wrapper(draw_menu)
    encrypt_key = getenv("ENCRYPT_KEY")
    logger.info("installing_nixos")
    # Summarize the chosen settings before committing to the install.
    for label, value in (
        ("disks", state.selected_device_ids),
        ("swap_size", state.swap_size),
        ("reserve", state.reserve_size),
        ("encrypted", bool(encrypt_key)),
    ):
        logger.info(f"{label}: {value}")
    sleep(3)
    installer(
        disks=state.get_selected_devices(),
        swap_size=state.swap_size,
        reserve=state.reserve_size,
        encrypt_key=encrypt_key,
    )
# Run the installer TUI when this module is executed directly as a script.
if __name__ == "__main__":
    main()

View File

@@ -1,498 +0,0 @@
"""TUI module."""
from __future__ import annotations
import curses
import logging
from collections import defaultdict
from subprocess import PIPE, Popen
logger = logging.getLogger(__name__)


def bash_wrapper(command: str) -> str:
    """Execute a command and return its decoded stdout.

    Args:
        command (str): The command line to run; split on whitespace, so
            arguments containing spaces are unsupported.

    Returns:
        str: The command's stdout, decoded.

    Raises:
        RuntimeError: If the process exits with a non-zero return code.
    """
    logger.debug(f"running {command=}")
    # Pre-split argv with no shell — this is an acceptable risk
    process = Popen(command.split(), stdout=PIPE, stderr=PIPE)
    stdout_data = process.communicate()[0]
    if process.returncode:
        error = f"Failed to run command {command=} return code {process.returncode=}"
        raise RuntimeError(error)
    return stdout_data.decode()
class Cursor:
    """A cursor position clamped to a height x width region."""

    def __init__(self) -> None:
        """Start at the origin with a zero-sized region."""
        self.x_position = 0
        self.y_position = 0
        self.height = 0
        self.width = 0

    def set_height(self, height: int) -> None:
        """Set the number of rows available to the cursor."""
        self.height = height

    def set_width(self, width: int) -> None:
        """Set the number of columns available to the cursor."""
        self.width = width

    def x_bounce_check(self, cursor: int) -> int:
        """Clamp an x coordinate into [0, width - 1]."""
        return min(self.width - 1, max(0, cursor))

    def y_bounce_check(self, cursor: int) -> int:
        """Clamp a y coordinate into [0, height - 1]."""
        return min(self.height - 1, max(0, cursor))

    def set_x(self, x: int) -> None:
        """Move to column x, clamped to the region."""
        self.x_position = self.x_bounce_check(x)

    def set_y(self, y: int) -> None:
        """Move to row y, clamped to the region."""
        self.y_position = self.y_bounce_check(y)

    def get_x(self) -> int:
        """Current column."""
        return self.x_position

    def get_y(self) -> int:
        """Current row."""
        return self.y_position

    def move_up(self) -> None:
        """Step one row up (clamped)."""
        self.set_y(self.get_y() - 1)

    def move_down(self) -> None:
        """Step one row down (clamped)."""
        self.set_y(self.get_y() + 1)

    def move_left(self) -> None:
        """Step one column left (clamped)."""
        self.set_x(self.get_x() - 1)

    def move_right(self) -> None:
        """Step one column right (clamped)."""
        self.set_x(self.get_x() + 1)

    def navigation(self, key: int) -> None:
        """Dispatch a curses arrow-key code to the matching move method.

        Args:
            key (int): The key.
        """
        handlers = {
            curses.KEY_DOWN: self.move_down,
            curses.KEY_UP: self.move_up,
            curses.KEY_RIGHT: self.move_right,
            curses.KEY_LEFT: self.move_left,
        }
        handler = handlers.get(key)
        if handler is not None:
            handler()
class State:
    """State class to store the state of the program."""

    def __init__(self) -> None:
        """Initialize the State class."""
        self.key = 0  # last key code read from curses (0 = none yet)
        self.cursor = Cursor()
        self.swap_size = 0  # swap size in GB
        self.show_swap_input = False  # True while the swap text prompt is open
        self.reserve_size = 0  # reserved space in GB
        self.show_reserve_input = False  # True while the reserve text prompt is open
        self.selected_device_ids: set[str] = set()  # /dev/disk/by-id paths picked by the user

    def get_selected_devices(self) -> tuple[str, ...]:
        """Get selected devices."""
        return tuple(self.selected_device_ids)
def get_device(raw_device: str) -> dict[str, str]:
    """Parse one `lsblk --pairs` line into a key/value mapping.

    Args:
        raw_device (str): One KEY="value" pairs line from lsblk.

    Returns:
        dict[str, str]: Lower-cased keys mapped to their unquoted values.
    """
    # shlex.split honors the quoting, so values containing spaces
    # (e.g. mountpoints) no longer break the parse; maxsplit=1 keeps
    # any '=' inside a value intact.
    raw_device_components = shlex.split(raw_device)
    return {
        component.split("=", 1)[0].lower(): component.split("=", 1)[1].strip('"')
        for component in raw_device_components
    }
def get_devices() -> list[dict[str, str]]:
    """Get a list of devices.

    Returns:
        list[dict[str, str]]: One mapping per block device, parsed from
        `lsblk --paths --pairs` output.
    """
    # --bytes could be added to the flags if exact sizes are ever needed.
    raw_devices = bash_wrapper("lsblk --paths --pairs").splitlines()
    return [get_device(raw_device) for raw_device in raw_devices]
def set_color() -> None:
    """Initialize curses colors: pair i+1 is color i on the default background."""
    curses.start_color()
    curses.use_default_colors()
    for i in range(curses.COLORS):
        curses.init_pair(i + 1, i, -1)
def debug_menu(std_screen: curses.window, key: int) -> None:
    """Debug menu: window size, last key code, and a color swatch strip.

    Args:
        std_screen (curses.window): The curses window.
        key (int): The key.
    """
    height, width = std_screen.getmaxyx()
    std_screen.addstr(height - 4, 0, f"Width: {width}, Height: {height}", curses.color_pair(5))
    # Truncate to the window width to avoid addstr errors on narrow terminals.
    key_pressed = f"Last key pressed: {key}"[: width - 1]
    if key == 0:
        key_pressed = "No key press detected..."[: width - 1]
    std_screen.addstr(height - 3, 0, key_pressed)
    for i in range(8):
        std_screen.addstr(height - 2, i * 3, f"{i}██", curses.color_pair(i))
def get_text_input(std_screen: curses.window, prompt: str, y: int, x: int) -> str:
    """Get text input.

    Enter submits, Esc cancels (returns ""), Backspace edits in place.

    Args:
        std_screen (curses.window): The curses window.
        prompt (str): The prompt.
        y (int): The y position.
        x (int): The x position.

    Returns:
        str: The input string.
    """
    esc_key = 27
    curses.echo()
    std_screen.addstr(y, x, prompt)
    input_str = ""
    while True:
        key = std_screen.getch()
        if key == ord("\n"):
            break
        if key == esc_key:
            input_str = ""
            break
        if key in (curses.KEY_BACKSPACE, ord("\b"), 127):
            input_str = input_str[:-1]
            # Redraw with a trailing space to erase the deleted character.
            std_screen.addstr(y, x + len(prompt), input_str + " ")
        else:
            # NOTE(review): chr(key) assumes a printable code point; special
            # keys (> 255) would insert unexpected characters — confirm.
            input_str += chr(key)
        std_screen.refresh()
    curses.noecho()
    return input_str
def swap_size_input(
    std_screen: curses.window,
    state: State,
    swap_offset: int,
) -> State:
    """Swap size input.

    Draws the swap-size row and, when Enter is pressed on it, opens a text
    prompt and stores the parsed value in ``state.swap_size``.

    Args:
        std_screen (curses.window): The curses window.
        state (State): The state object.
        swap_offset (int): The row on which the swap field is drawn.

    Returns:
        State: The updated state object.
    """
    swap_size_text = "Swap size (GB): "
    std_screen.addstr(swap_offset, 0, f"{swap_size_text}{state.swap_size}")
    if state.key == ord("\n") and state.cursor.get_y() == swap_offset:
        state.show_swap_input = True
    if state.show_swap_input:
        swap_size_str = get_text_input(std_screen, swap_size_text, swap_offset, 0)
        try:
            state.swap_size = int(swap_size_str)
            state.show_swap_input = False
        except ValueError:
            # Non-numeric entry: notify the user and drop back to the menu.
            std_screen.addstr(swap_offset, 0, "Invalid input. Press any key to continue.")
            std_screen.getch()
            state.show_swap_input = False
    return state
def reserve_size_input(
    std_screen: curses.window,
    state: State,
    reserve_offset: int,
) -> State:
    """Reserve size input.

    Draws the reserve-size row and, when Enter is pressed on it, opens a text
    prompt and stores the parsed value in ``state.reserve_size``.

    Args:
        std_screen (curses.window): The curses window.
        state (State): The state object.
        reserve_offset (int): The row on which the reserve field is drawn.

    Returns:
        State: The updated state object.
    """
    reserve_size_text = "reserve size (GB): "
    std_screen.addstr(reserve_offset, 0, f"{reserve_size_text}{state.reserve_size}")
    if state.key == ord("\n") and state.cursor.get_y() == reserve_offset:
        state.show_reserve_input = True
    if state.show_reserve_input:
        reserve_size_str = get_text_input(std_screen, reserve_size_text, reserve_offset, 0)
        try:
            state.reserve_size = int(reserve_size_str)
            state.show_reserve_input = False
        except ValueError:
            # Non-numeric entry: notify the user and drop back to the menu.
            std_screen.addstr(reserve_offset, 0, "Invalid input. Press any key to continue.")
            std_screen.getch()
            state.show_reserve_input = False
    return state
def status_bar(
    std_screen: curses.window,
    cursor: Cursor,
    width: int,
    height: int,
) -> None:
    """Draw the reverse-video status bar on the bottom row.

    Args:
        std_screen (curses.window): The curses window.
        cursor (Cursor): The cursor.
        width (int): The width.
        height (int): The height.
    """
    std_screen.attron(curses.A_REVERSE)
    std_screen.attron(curses.color_pair(3))
    # NOTE(review): this local intentionally shadows the function name.
    status_bar = f"Press 'q' to exit | STATUS BAR | Pos: {cursor.get_x()}, {cursor.get_y()}"
    std_screen.addstr(height - 1, 0, status_bar)
    # Pad the remainder of the row so the bar spans the full screen width.
    std_screen.addstr(height - 1, len(status_bar), " " * (width - len(status_bar) - 1))
    std_screen.attroff(curses.color_pair(3))
    std_screen.attroff(curses.A_REVERSE)
def get_device_id_mapping() -> dict[str, set[str]]:
    """Map each resolved block device to its /dev/disk/by-id symlinks.

    Returns:
        dict[str, set[str]]: device path -> set of by-id symlink paths.
    """
    device_ids = bash_wrapper("find /dev/disk/by-id -type l").splitlines()
    device_id_mapping: dict[str, set[str]] = defaultdict(set)
    for device_id in device_ids:
        # Resolve the symlink to the real device node (e.g. /dev/sda).
        device = bash_wrapper(f"readlink -f {device_id}").strip()
        device_id_mapping[device].add(device_id)
    return device_id_mapping
def calculate_device_menu_padding(devices: list[dict[str, str]], column: str, padding: int = 0) -> int:
    """Width of the longest value in ``column``, plus extra padding.

    Args:
        devices (list[dict[str, str]]): The devices.
        column (str): The column.
        padding (int, optional): The padding. Defaults to 0.

    Returns:
        int: The calculated padding.
    """
    longest_value = max(len(device[column]) for device in devices)
    return longest_value + padding
def draw_device_ids(
    state: State,
    row_number: int,
    menu_start_x: int,
    std_screen: curses.window,
    menu_width: list[int],
    device_ids: set[str],
) -> tuple[State, int]:
    """Draw a device's by-id symlinks and toggle selection on Space.

    Args:
        state (State): The state object.
        row_number (int): The row number.
        menu_start_x (int): The menu start x.
        std_screen (curses.window): The curses window.
        menu_width (list[int]): Columns considered "inside" the menu.
        device_ids (set[str]): The device IDs.

    Returns:
        tuple[State, int]: The updated state object and the last row drawn.
    """
    for device_id in sorted(device_ids):
        row_number = row_number + 1
        # Highlight the row under the cursor; Space toggles its selection.
        if row_number == state.cursor.get_y() and state.cursor.get_x() in menu_width:
            std_screen.attron(curses.A_BOLD)
            if state.key == ord(" "):
                if device_id not in state.selected_device_ids:
                    state.selected_device_ids.add(device_id)
                else:
                    state.selected_device_ids.remove(device_id)
        # Selected entries are drawn in color pair 7.
        if device_id in state.selected_device_ids:
            std_screen.attron(curses.color_pair(7))
        std_screen.addstr(row_number, menu_start_x, f" {device_id}")
        std_screen.attroff(curses.color_pair(7))
        std_screen.attroff(curses.A_BOLD)
    return state, row_number
def draw_device_menu(
    std_screen: curses.window,
    devices: list[dict[str, str]],
    device_id_mapping: dict[str, set[str]],
    state: State,
    menu_start_y: int = 0,
    menu_start_x: int = 0,
) -> tuple[State, int]:
    """Draw the device menu and handle user input.

    Args:
        std_screen (curses.window): the curses window to draw on
        devices (list[dict[str, str]]): the list of devices to draw
        device_id_mapping (dict[str, set[str]]): by-id symlinks per device path
        state (State): the state object to update
        menu_start_y (int, optional): the y position to start drawing the menu. Defaults to 0.
        menu_start_x (int, optional): the x position to start drawing the menu. Defaults to 0.

    Returns:
        tuple[State, int]: the updated state object and the last row drawn
    """
    padding = 2
    # Column widths sized to the longest value plus breathing room.
    name_padding = calculate_device_menu_padding(devices, "name", padding)
    size_padding = calculate_device_menu_padding(devices, "size", padding)
    type_padding = calculate_device_menu_padding(devices, "type", padding)
    mountpoints_padding = calculate_device_menu_padding(devices, "mountpoints", padding)
    device_header = (
        f"{'Name':{name_padding}}{'Size':{size_padding}}{'Type':{type_padding}}{'Mountpoints':{mountpoints_padding}}"
    )
    # Columns the cursor may occupy for a row to count as "on the menu".
    menu_width = list(range(menu_start_x, len(device_header) + menu_start_x))
    std_screen.addstr(menu_start_y, menu_start_x, device_header, curses.color_pair(5))
    devises_list_start = menu_start_y + 1
    row_number = devises_list_start
    for device in devices:
        row_number = row_number + 1
        device_name = device["name"]
        device_row = (
            f"{device_name:{name_padding}}"
            f"{device['size']:{size_padding}}"
            f"{device['type']:{type_padding}}"
            f"{device['mountpoints']:{mountpoints_padding}}"
        )
        std_screen.addstr(row_number, menu_start_x, device_row)
        # Each device's selectable by-id symlinks are listed below it.
        state, row_number = draw_device_ids(
            state=state,
            row_number=row_number,
            menu_start_x=menu_start_x,
            std_screen=std_screen,
            menu_width=menu_width,
            device_ids=device_id_mapping[device_name],
        )
    return state, row_number
def draw_menu(std_screen: curses.window) -> State:
    """Draw the menu and handle user input.

    Args:
        std_screen (curses.window): the curses window to draw on

    Returns:
        State: the state object
    """
    # Clear and refresh the screen for a blank canvas
    std_screen.clear()
    std_screen.refresh()
    set_color()
    state = State()
    devices = get_devices()
    device_id_mapping = get_device_id_mapping()
    # Loop until 'q' is pressed; state.key holds the last character read.
    while state.key != ord("q"):
        std_screen.clear()
        height, width = std_screen.getmaxyx()
        state.cursor.set_height(height)
        state.cursor.set_width(width)
        state.cursor.navigation(state.key)
        state, device_menu_size = draw_device_menu(
            std_screen=std_screen,
            state=state,
            devices=devices,
            device_id_mapping=device_id_mapping,
        )
        swap_offset = device_menu_size + 2
        # Return values are ignored: both helpers mutate `state` in place.
        swap_size_input(
            std_screen=std_screen,
            state=state,
            swap_offset=swap_offset,
        )
        reserve_size_input(
            std_screen=std_screen,
            state=state,
            reserve_offset=swap_offset + 1,
        )
        status_bar(std_screen, state.cursor, width, height)
        debug_menu(std_screen, state.key)
        std_screen.move(state.cursor.get_y(), state.cursor.get_x())
        std_screen.refresh()
        state.key = std_screen.getch()
    return state

View File

@@ -1,155 +0,0 @@
"""Thing."""
from __future__ import annotations
import logging
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from dataclasses import dataclass
from multiprocessing import cpu_count
from typing import TYPE_CHECKING, Any, Literal, TypeVar
if TYPE_CHECKING:
from collections.abc import Callable, Mapping, Sequence
logger = logging.getLogger(__name__)
R = TypeVar("R")
modes = Literal["normal", "early_error"]
@dataclass
class ExecutorResults[R]:
"""Dataclass to store the results and exceptions of the parallel execution."""
results: list[R]
exceptions: list[BaseException]
def __repr__(self) -> str:
"""Return a string representation of the object."""
return f"results={self.results} exceptions={self.exceptions}"
def _parallelize_base[R](
executor_type: type[ThreadPoolExecutor | ProcessPoolExecutor],
func: Callable[..., R],
kwargs_list: Sequence[Mapping[str, Any]],
max_workers: int | None,
progress_tracker: int | None,
mode: modes,
) -> ExecutorResults:
total_work = len(kwargs_list)
with executor_type(max_workers=max_workers) as executor:
futures = [executor.submit(func, **kwarg) for kwarg in kwargs_list]
results = []
exceptions = []
for index, future in enumerate(futures, 1):
if exception := future.exception():
logger.error(f"{future} raised {exception.__class__.__name__}")
exceptions.append(exception)
if mode == "early_error":
executor.shutdown(wait=False)
raise exception
continue
results.append(future.result())
if progress_tracker and index % progress_tracker == 0:
logger.info(f"Progress: {index}/{total_work}")
return ExecutorResults(results, exceptions)
def parallelize_thread[R](
    func: Callable[..., R],
    kwargs_list: Sequence[Mapping[str, Any]],
    max_workers: int | None = None,
    progress_tracker: int | None = None,
    mode: modes = "normal",
) -> ExecutorResults:
    """Generic function to run a function with multiple arguments in threads.

    Args:
        func (Callable[..., R]): Function to run in threads.
        kwargs_list (Sequence[Mapping[str, Any]]): List of dictionaries with the arguments for the function.
        max_workers (int | None, optional): Number of workers to use. Defaults to None
            (the executor's own default).
        progress_tracker (int | None, optional): Log progress every N completed tasks. Defaults to None.
        mode (modes, optional): Mode to use. Defaults to "normal"; "early_error" re-raises
            the first exception.

    Returns:
        ExecutorResults: Results in submission order plus any captured exceptions.
    """
    return _parallelize_base(
        executor_type=ThreadPoolExecutor,
        func=func,
        kwargs_list=kwargs_list,
        max_workers=max_workers,
        progress_tracker=progress_tracker,
        mode=mode,
    )
def parallelize_process[R](
    func: Callable[..., R],
    kwargs_list: Sequence[Mapping[str, Any]],
    max_workers: int | None = None,
    progress_tracker: int | None = None,
    mode: modes = "normal",
) -> ExecutorResults:
    """Generic function to run a function with multiple arguments in processes.

    Args:
        func (Callable[..., R]): Function to run in processes.
        kwargs_list (Sequence[Mapping[str, Any]]): List of dictionaries with the arguments for the function.
        max_workers (int | None, optional): Number of workers to use. Defaults to None
            (the executor's own default).
        progress_tracker (int | None, optional): Log progress every N completed tasks. Defaults to None.
        mode (modes, optional): Mode to use. Defaults to "normal".

    Returns:
        ExecutorResults: Results in submission order plus any captured exceptions.

    Raises:
        RuntimeError: If max_workers exceeds the machine's CPU count.
    """
    # Guard against oversubscription; use process_executor_unchecked to bypass.
    if max_workers and max_workers > cpu_count():
        error = f"max_workers must be less than or equal to {cpu_count()}"
        raise RuntimeError(error)
    return process_executor_unchecked(
        func=func,
        kwargs_list=kwargs_list,
        max_workers=max_workers,
        progress_tracker=progress_tracker,
        mode=mode,
    )
def process_executor_unchecked[R](
    func: Callable[..., R],
    kwargs_list: Sequence[Mapping[str, Any]],
    max_workers: int | None,
    progress_tracker: int | None,
    mode: modes = "normal",
) -> ExecutorResults:
    """Generic function to run a function with multiple arguments in parallel.

    Note: this function does not check if the number of workers is greater than the number of CPUs.
    This can cause the system to become unresponsive.

    Args:
        func (Callable[..., R]): Function to run in parallel.
        kwargs_list (Sequence[Mapping[str, Any]]): List of dictionaries with the arguments for the function.
        max_workers (int | None): Number of workers to use (None = executor default).
        progress_tracker (int | None): Log progress every N completed tasks (None = never).
        mode (modes, optional): Mode to use. Defaults to "normal".

    Returns:
        ExecutorResults: Results in submission order plus any captured exceptions.
    """
    return _parallelize_base(
        executor_type=ProcessPoolExecutor,
        func=func,
        kwargs_list=kwargs_list,
        max_workers=max_workers,
        progress_tracker=progress_tracker,
        mode=mode,
    )

View File

@@ -1 +0,0 @@
"""init."""

View File

@@ -1,40 +0,0 @@
"""capasitor."""
def calculate_capacitor_capacity(voltage: float, farads: float) -> float:
    """Usable stored energy of a capacitor, in watt-hours.

    Args:
        voltage (float): Charge voltage in volts.
        farads (float): Capacitance in farads.

    Returns:
        float: Energy E = C*V^2/2 converted from joules to watt-hours.
    """
    # Bug fix: floor division (//) silently truncated the fractional
    # joules and watt-hours; use true division for a physical quantity.
    joules = (farads * voltage**2) / 2
    return joules / 3600
def calculate_pack_capacity(cells: int, cell_voltage: float, farads: float) -> float:
    """Watt-hour capacity of a series pack of identical capacitors."""
    # Series pack: voltages add, capacitance divides by the cell count.
    return calculate_capacitor_capacity(cells * cell_voltage, farads / cells)
def calculate_pack_capacity2(cells: int, cell_voltage: float, farads: float, cell_cost: float) -> tuple[float, float]:
    """Calculate pack capacity.

    Returns:
        tuple[float, float]: (capacity in watt-hours, total cost of the cells).
    """
    capacitor_capacity = calculate_capacitor_capacity(cells * cell_voltage, farads / cells)
    return capacitor_capacity, cell_cost * cells
def main() -> None:
    """Print capacity/cost figures for a few example capacitor packs."""
    # 10 series cells, 2.7 V / 500 F each (capacitance divides in series).
    watt_hours = calculate_pack_capacity(cells=10, cell_voltage=2.7, farads=500)
    print(f"{watt_hours=}")
    print(f"{watt_hours*16=}")
    watt_hours = calculate_pack_capacity(cells=1, cell_voltage=2.7, farads=5000)
    print(f"{watt_hours=}")
    watt_hours, cost = calculate_pack_capacity2(
        cells=10,
        cell_voltage=2.7,
        farads=3000,
        cell_cost=11.60,
    )
    print(f"{watt_hours=}")
    print(f"{cost=}")


if __name__ == "__main__":
    main()

View File

@@ -1,25 +0,0 @@
"""thing."""
def caculat_batry_specs(
    cell_amp_hour: int,
    cell_voltage: float,
    cells_per_pack: int,
    packs: int,
) -> tuple[float, float]:
    """Battery capacity (Wh) and pack voltage for a series pack.

    Note: the public (misspelled) name is kept for existing callers.
    """
    volts_per_pack = cell_voltage * cells_per_pack
    watt_hours_per_pack = volts_per_pack * cell_amp_hour
    total_watt_hours = watt_hours_per_pack * packs
    return (total_watt_hours, volts_per_pack)
battry_capacity, pack_voltage = caculat_batry_specs(300, 3.2, 8, 2)
print(f"{battry_capacity=} {pack_voltage=}")
cost = 1700
# Bug fix: battry_capacity is in Wh, so the label "$/kWh" needs a
# Wh -> kWh conversion; the old line printed $/Wh (off by 1000x).
print(f"$/kWh {cost / (battry_capacity / 1000)}")

View File

@@ -1,196 +0,0 @@
"""voltage_drop."""
import math
from enum import Enum
class TemperatureUnit(Enum):
    """Temperature unit."""

    CELSIUS = "c"
    FAHRENHEIT = "f"
    KELVIN = "k"


class Temperature:
    """A temperature stored internally in degrees Celsius."""

    def __init__(
        self,
        temperature: float,
        unit: TemperatureUnit = TemperatureUnit.CELSIUS,
    ) -> None:
        """Convert ``temperature`` from ``unit`` to degrees Celsius.

        Bug fix: the previous implementation multiplied by a constant
        factor, which is only valid for temperature *differences*;
        absolute readings require the offset conversions below.
        """
        if unit is TemperatureUnit.FAHRENHEIT:
            self.temperature = (temperature - 32.0) * 5.0 / 9.0
        elif unit is TemperatureUnit.KELVIN:
            self.temperature = temperature - 273.15
        else:
            self.temperature = temperature

    def __float__(self) -> float:
        """Return the temperature in degrees Celsius."""
        return self.temperature
class LengthUnit(Enum):
    """Length unit."""

    METERS = "m"
    FEET = "ft"
    INCHES = "in"


class Length:
    """A length stored internally in meters."""

    def __init__(self, length: float, unit: LengthUnit) -> None:
        """Convert ``length`` from ``unit`` and store it in meters."""
        self.meters = self._convert_to_meters(length, unit)

    def _convert_to_meters(self, length: float, unit: LengthUnit) -> float:
        # Meters per one of each supported unit.
        meters_per_unit = {
            LengthUnit.METERS: 1,
            LengthUnit.FEET: 0.3048,
            LengthUnit.INCHES: 0.0254,
        }
        factor = meters_per_unit.get(unit)
        if factor is None:
            error = f"Unsupported unit: {unit}"
            raise ValueError(error)
        return length * factor

    def __float__(self) -> float:
        """Return the length in meters."""
        return self.meters

    def feet(self) -> float:
        """Return the length in feet."""
        return self.meters * 3.2808
class MaterialType(Enum):
    """Material type."""

    COPPER = "copper"
    ALUMINUM = "aluminum"
    CCA = "cca"  # presumably copper-clad aluminum — confirm
    SILVER = "silver"
    GOLD = "gold"
def get_material_resistivity(
    material: MaterialType,
    temperature: Temperature | None = None,
) -> float:
    """Resistivity (ohm-meters) of a material at a temperature.

    Args:
        material (MaterialType): Conductor material.
        temperature (Temperature | None): Conductor temperature; defaults
            to 20 degrees Celsius when omitted.

    Returns:
        float: base_resistivity * (1 + temp_coefficient * T_celsius).
    """
    if temperature is None:
        # Bug fix: the default Temperature(20.0) was previously constructed
        # and discarded, leaving temperature as None and crashing float().
        temperature = Temperature(20.0)
    # (base resistivity at 20 C in ohm-m, temperature coefficient per C)
    material_info = {
        MaterialType.COPPER: (1.724e-8, 0.00393),
        MaterialType.ALUMINUM: (2.908e-8, 0.00403),
        MaterialType.CCA: (2.577e-8, 0.00397),
        MaterialType.SILVER: (1.632e-8, 0.00380),
        MaterialType.GOLD: (2.503e-8, 0.00340),
    }
    base_resistivity, temp_coefficient = material_info[material]
    return base_resistivity * (1 + temp_coefficient * float(temperature))
def calculate_awg_diameter_mm(gauge: int) -> float:
    """Wire diameter in millimeters for an AWG gauge (rounded to 3 places)."""
    # AWG definition: 36 AWG is 0.127 mm; every 39 steps scale by 92x.
    scale = 92 ** ((36 - gauge) / 39)
    return round(0.127 * scale, 3)
def calculate_wire_area_m2(gauge: int) -> float:
    """Calculate the area of a wire in square meters.

    Args:
        gauge (int): The AWG (American Wire Gauge) number of the wire

    Returns:
        float: The area of the wire in square meters
    """
    # Diameter is in mm; /2000 converts mm diameter to radius in meters.
    return math.pi * (calculate_awg_diameter_mm(gauge) / 2000) ** 2
def calculate_resistance_per_meter(gauge: int) -> float:
    """Calculate the resistance per meter of a wire.

    Args:
        gauge (int): The AWG (American Wire Gauge) number of the wire

    Returns:
        float: The resistance per meter of a copper wire at the default temperature
    """
    # NOTE(review): get_material_resistivity's default-temperature branch
    # discards its Temperature(20.0) and then calls float(None), so this
    # call path crashes until that is fixed — confirm.
    return get_material_resistivity(MaterialType.COPPER) / calculate_wire_area_m2(gauge)
def voltage_drop(
    gauge: int,
    material: MaterialType,
    length: Length,
    current_a: float,
) -> float:
    """Calculate the voltage drop of a wire.

    Args:
        gauge (int): The AWG (American Wire Gauge) number of the wire
        material (MaterialType): The type of conductor material (e.g., copper, aluminum)
        length (Length): The one-way run length of the wire
        current_a (float): The current flowing through the wire in amperes

    Returns:
        float: The voltage drop of the wire in volts
    """
    # NOTE(review): called without a temperature — relies on the default
    # path of get_material_resistivity, which crashes until its
    # discarded-default bug is fixed; confirm.
    resistivity = get_material_resistivity(material)
    resistance_per_meter = resistivity / calculate_wire_area_m2(gauge)
    total_resistance = resistance_per_meter * float(length) * 2  # round-trip
    return total_resistance * current_a
# Example: round-trip drop for 20 A over a 20 ft run of 10 AWG CCA.
print(
    voltage_drop(
        gauge=10,
        material=MaterialType.CCA,
        length=Length(length=20, unit=LengthUnit.FEET),
        current_a=20,
    )
)
def max_wire_length(
    gauge: int,
    material: MaterialType,
    current_amps: float,
    voltage_drop: float = 0.3,
    temperature: Temperature | None = None,
) -> Length:
    """Calculate the maximum allowable wire length based on voltage drop criteria.

    Args:
        gauge (int): The AWG (American Wire Gauge) number of the wire
        material (MaterialType): The type of conductor material (e.g., copper, aluminum)
        current_amps (float): The current flowing through the wire in amperes
        voltage_drop (float, optional): Maximum allowable voltage drop in volts. Defaults to 0.3.
        temperature (Temperature | None, optional): The temperature of the wire.
            Defaults to 100 degrees Fahrenheit.

    Returns:
        Length: Maximum wire length that keeps the drop at or below ``voltage_drop``
    """
    if temperature is None:
        # Bug fix: the default Temperature was previously constructed and
        # discarded, so the resistivity lookup received temperature=None.
        temperature = Temperature(100.0, unit=TemperatureUnit.FAHRENHEIT)
    resistivity = get_material_resistivity(material, temperature)
    resistance_per_meter = resistivity / calculate_wire_area_m2(gauge)
    # V = IR, solve for length where V is the allowed voltage drop
    return Length(
        voltage_drop / (current_amps * resistance_per_meter),
        LengthUnit.METERS,
    )
# Max run lengths (in feet) for 10 AWG CCA at decreasing currents.
print(max_wire_length(gauge=10, material=MaterialType.CCA, current_amps=20).feet())
print(max_wire_length(gauge=10, material=MaterialType.CCA, current_amps=10).feet())
print(max_wire_length(gauge=10, material=MaterialType.CCA, current_amps=5).feet())

View File

@@ -1 +0,0 @@
"""system_tests."""

View File

@@ -1,99 +0,0 @@
"""Validate Jeeves."""
from __future__ import annotations
import logging
from copy import copy
from re import search
from time import sleep
from typing import TYPE_CHECKING
from python.common import bash_wrapper
from python.zfs import Zpool
if TYPE_CHECKING:
from collections.abc import Sequence
logger = logging.getLogger(__name__)
def zpool_tests(pool_names: Sequence[str], zpool_capacity_threshold: int = 90) -> list[str]:
    """Check the zpool health and capacity.

    Args:
        pool_names (Sequence[str]): A list of pool names to test.
        zpool_capacity_threshold (int, optional): The threshold for the zpool capacity. Defaults to 90.

    Returns:
        list[str]: One error string per failed check (empty when healthy).
    """
    logger.info("Testing zpool")
    errors: list[str] = []
    for pool_name in pool_names:
        pool = Zpool(pool_name)
        if pool.health != "ONLINE":
            errors.append(f"{pool.name} is {pool.health}")
        if pool.capacity >= zpool_capacity_threshold:
            errors.append(f"{pool.name} is low on space")
    # Feature-flag check runs once for the whole host, not per pool.
    upgrade_status, _ = bash_wrapper("zpool upgrade")
    if not search(r"Every feature flags pool has all supported and requested features enabled.", upgrade_status):
        errors.append("ZPool out of date run `sudo zpool upgrade -a`")
    return errors
def systemd_tests(
    service_names: Sequence[str],
    max_retries: int = 30,
    retry_delay_secs: int = 1,
    retryable_statuses: Sequence[str] | None = None,
    valid_statuses: Sequence[str] | None = None,
) -> list[str]:
    """Test systemd services, retrying while they are still starting up.

    Args:
        service_names (Sequence[str]): A list of service names to test.
        max_retries (int, optional): The maximum number of retries. Defaults to 30.
            minimum value is 1.
        retry_delay_secs (int, optional): The delay between retries in seconds. Defaults to 1.
            minimum value is 1.
        retryable_statuses (Sequence[str] | None, optional): Statuses worth retrying.
            Defaults to ("inactive\n", "activating\n").
        valid_statuses (Sequence[str] | None, optional): Statuses counted as healthy.
            Defaults to ("active\n",).

    Returns:
        list[str]: One error string per service that never reached a valid status.
    """
    logger.info("Testing systemd service")
    max_retries = max(max_retries, 1)
    retry_delay_secs = max(retry_delay_secs, 1)
    last_try = max_retries - 1
    if retryable_statuses is None:
        retryable_statuses = ("inactive\n", "activating\n")
    if valid_statuses is None:
        valid_statuses = ("active\n",)
    service_names_set = set(service_names)
    errors: set[str] = set()
    for retry in range(max_retries):
        if not service_names_set:
            break
        logger.info(f"Testing systemd service in {retry + 1} of {max_retries}")
        # Iterate over a copy: healthy services are removed as we go.
        for service_name in copy(service_names_set):
            service_status, _ = bash_wrapper(f"systemctl is-active {service_name}")
            if service_status in valid_statuses:
                service_names_set.remove(service_name)
                continue
            if service_status in retryable_statuses and retry < last_try:
                continue
            errors.add(f"{service_name} is {service_status.strip()}")
        # Bug fix: only sleep when another retry round will actually run;
        # previously we slept after every pass, including the final one
        # and the pass on which everything was already healthy.
        if service_names_set and retry < last_try:
            sleep(retry_delay_secs)
    return list(errors)

View File

@@ -1,66 +0,0 @@
"""Validate {server_name}."""
import logging
import sys
import tomllib
from os import environ
from pathlib import Path
from socket import gethostname
import typer
from python.common import configure_logger, signal_alert
from python.system_tests.components import systemd_tests, zpool_tests
logger = logging.getLogger(__name__)
def load_config_data(config_file: Path) -> dict[str, list[str]]:
"""Load a TOML configuration file.
Args:
config_file (Path): The path to the configuration file.
Returns:
dict: The configuration data.
"""
return tomllib.loads(config_file.read_text())
def main(config_file: Path) -> None:
    """Validate this host: run the zpool and systemd checks from the config.

    Exits with status 1 (after signalling an alert) if any check fails.
    """
    configure_logger(level=environ.get("LOG_LEVEL", "INFO"))
    server_name = gethostname()
    logger.info(f"Starting {server_name} validation")
    config_data = load_config_data(config_file)
    errors: list[str] = []
    try:
        # Each suite only runs when its config section is present and non-empty.
        if config_data.get("zpools") and (zpool_errors := zpool_tests(config_data["zpools"])):
            errors.extend(zpool_errors)
        if config_data.get("services") and (systemd_errors := systemd_tests(config_data["services"])):
            errors.extend(systemd_errors)
    except Exception as error:
        # A crashed test suite counts as a validation failure, not a crash.
        logger.exception(f"{server_name} validation failed")
        errors.append(f"{server_name} validation failed: {error}")
    if errors:
        logger.error(f"{server_name} validation failed: \n{'\n'.join(errors)}")
        signal_alert(f"{server_name} validation failed {errors}")
        sys.exit(1)
    logger.info(f"{server_name} validation passed")
def cli() -> None:
    """Typer entry point wrapping ``main``."""
    typer.run(main)


if __name__ == "__main__":
    cli()

View File

@@ -1 +0,0 @@
"""init."""

View File

@@ -1 +0,0 @@
"""init."""

View File

@@ -1,11 +0,0 @@
"""Bar."""
import logging
logger = logging.getLogger(__name__)
def bar() -> None:
    """Emit two debug records (module is part of the logging experiments)."""
    logger.debug(f"bar {__name__}")
    logger.debug("bar")

View File

@@ -1,20 +0,0 @@
"""configure_logger."""
import logging
import sys
def configure_logger(level: str = "INFO", test: str | None = None) -> None:
"""Configure the logger.
Args:
level (str, optional): The logging level. Defaults to "INFO".
test (str | None, optional): The test name. Defaults to None.
"""
logging.basicConfig(
level=level,
datefmt="%Y-%m-%dT%H:%M:%S%z",
format="%(asctime)s %(levelname)s %(filename)s:%(lineno)d - %(message)s" # this is nesiseary
f" {test}",
handlers=[logging.StreamHandler(sys.stdout)],
)

View File

@@ -1,17 +0,0 @@
"""foo."""
import logging
from python.testing.logging.bar import bar
from python.testing.logging.configure_logger import configure_logger
logger = logging.getLogger(__name__)
def foo() -> None:
    """Configure logging with a "FOO" format tag, log, then call bar()."""
    configure_logger("DEBUG", "FOO")
    logger.debug(f"foo {__name__}")
    logger.debug("foo")
    bar()
View File

@@ -1,33 +0,0 @@
"""main."""
import logging
from python.testing.logging.bar import bar
from python.testing.logging.configure_logger import configure_logger
from python.testing.logging.foo import foo
logger = logging.getLogger(__name__)
def main() -> None:
    """Exercise logger configuration/propagation across foo and bar."""
    configure_logger("DEBUG")
    # Kept for reference: a per-module handler experiment for foo's logger.
    # handler = logging.StreamHandler()
    # Create and attach a formatter
    # formatter = logging.Formatter(
    #     "%(asctime)s %(levelname)s %(filename)s:%(lineno)d - %(message)s FOO"
    # )
    # handler.setFormatter(formatter)
    # Attach handler to logger
    # foo_logger = logging.getLogger("python.testing.logging.foo")
    # foo_logger.addHandler(handler)
    # foo_logger.propagate = True
    logger.debug("main")
    foo()
    bar()


if __name__ == "__main__":
    main()

View File

@@ -1 +0,0 @@
"""Server Tools."""

View File

@@ -1,144 +0,0 @@
"""snapshot_manager."""
from __future__ import annotations
import logging
import sys
import tomllib
from functools import cache
from pathlib import Path # noqa: TC003 This is required for the typer CLI
from re import compile as re_compile
from re import search
import typer
from python.common import configure_logger, signal_alert, utcnow
from python.zfs import Dataset, get_datasets
logger = logging.getLogger(__name__)
def main(config_file: Path) -> None:
    """Create a timestamped snapshot of every dataset, then prune old ones.

    Exits with status 1 (after signalling an alert) on unexpected failure.
    """
    configure_logger(level="DEBUG")
    logger.info("Starting snapshot_manager")
    try:
        time_stamp = get_time_stamp()
        for dataset in get_datasets():
            status = dataset.create_snapshot(time_stamp)
            logger.debug(f"{status=}")
            if status != "snapshot created":
                # Alert and skip pruning for this dataset.
                msg = f"{dataset.name} failed to create snapshot {time_stamp}"
                logger.error(msg)
                signal_alert(msg)
                continue
            # Despite its name, this also deletes the surplus snapshots.
            get_snapshots_to_delete(dataset, get_count_lookup(config_file, dataset.name))
    except Exception:
        logger.exception("snapshot_manager failed")
        signal_alert("snapshot_manager failed")
        sys.exit(1)
    else:
        logger.info("snapshot_manager completed")
def get_count_lookup(config_file: Path, dataset_name: str) -> dict[str, int]:
    """Get the count lookup.

    Args:
        config_file (Path): The path to the configuration file.
        dataset_name (str): The name of the dataset.

    Returns:
        dict[str, int]: Retention counts for the dataset, falling back to the
        config's "default" section (or the built-in defaults).
    """
    config_data = load_config_data(config_file)
    return config_data.get(dataset_name, get_default_config(config_data))
def get_default_config(config_data: dict[str, dict[str, int]]) -> dict[str, int]:
    """Get the default configuration.

    Args:
        config_data (dict[str, dict[str, int]]): The configuration data.

    Returns:
        dict[str, int]: The "default" section if present, else built-in retention counts.
    """
    fallback = {"15_min": 4, "hourly": 12, "daily": 0, "monthly": 0}
    return config_data.get("default", fallback)
@cache
def load_config_data(config_file: Path) -> dict[str, dict[str, int]]:
"""Load a TOML configuration file.
Args:
config_file (Path): The path to the configuration file.
Returns:
dict: The configuration data.
"""
return tomllib.loads(config_file.read_text())
def get_snapshots_to_delete(
    dataset: Dataset,
    count_lookup: dict[str, int],
) -> None:
    """Prune a dataset's auto snapshots down to the configured counts.

    Despite the name, this deletes the surplus snapshots (via
    ``dataset.delete_snapshot``) and alerts on deletion failures.

    Args:
        dataset (Dataset): the dataset
        count_lookup (dict[str, int]): retention count per filter name
    """
    snapshots = dataset.get_snapshots()
    if not snapshots:
        logger.info(f"{dataset.name} has no snapshots")
        return
    # Buckets are mutually exclusive by construction:
    # 15_min: minutes 15/30/45; hourly: minute 00 except hour 00;
    # daily: midnight except the 1st of the month; monthly: midnight on the 1st.
    filters = (
        ("15_min", re_compile(r"auto_\d{10}(?:15|30|45)")),
        ("hourly", re_compile(r"auto_\d{8}(?!00)\d{2}00")),
        ("daily", re_compile(r"auto_\d{6}(?!01)\d{2}0000")),
        ("monthly", re_compile(r"auto_\d{6}010000")),
    )
    for filter_name, snapshot_filter in filters:
        logger.debug(f"{filter_name=}\n{snapshot_filter=}")
        filtered_snapshots = sorted(snapshot.name for snapshot in snapshots if search(snapshot_filter, snapshot.name))
        logger.debug(f"{filtered_snapshots=}")
        snapshots_wanted = count_lookup[filter_name]
        # Keep the newest N (names sort chronologically); N <= 0 deletes all.
        snapshots_being_deleted = filtered_snapshots[:-snapshots_wanted] if snapshots_wanted > 0 else filtered_snapshots
        logger.info(f"{snapshots_being_deleted} are being deleted")
        for snapshot in snapshots_being_deleted:
            if error := dataset.delete_snapshot(snapshot):
                error_message = f"{dataset.name}@{snapshot} failed to delete: {error}"
                signal_alert(error_message)
                logger.error(error_message)
def get_time_stamp() -> str:
    """Get the time stamp.

    Returns:
        str: "auto_%Y%m%d%H%M" floored to the previous 15-minute boundary.
    """
    now = utcnow()
    # Floor the minutes onto the enclosing 15-minute boundary.
    nearest_15_min = now.replace(minute=(now.minute - (now.minute % 15)))
    return nearest_15_min.strftime("auto_%Y%m%d%H%M")
def cli() -> None:
    """Typer entry point wrapping ``main``."""
    typer.run(main)


if __name__ == "__main__":
    cli()

View File

@@ -1,11 +0,0 @@
"""init."""
from python.zfs.dataset import Dataset, Snapshot, get_datasets
from python.zfs.zpool import Zpool
__all__ = [
"Dataset",
"Snapshot",
"Zpool",
"get_datasets",
]

View File

@@ -1,214 +0,0 @@
"""dataset."""
from __future__ import annotations
import json
import logging
from datetime import UTC, datetime
from typing import Any
from python.common import bash_wrapper
logger = logging.getLogger(__name__)
def _zfs_list(zfs_list: str) -> dict[str, Any]:
    """Run a ``zfs list`` command and return its parsed JSON output.

    The JSON envelope's ``output_version`` header is validated so a zfs
    CLI emitting an unexpected schema fails loudly instead of being
    misparsed.

    Raises:
        RuntimeError: If the output is not ``zfs list`` schema 0.1.
    """
    raw_output, _ = bash_wrapper(zfs_list)
    parsed = json.loads(raw_output)
    version_info = parsed["output_version"]
    vers_major = version_info["vers_major"]
    vers_minor = version_info["vers_minor"]
    command = version_info["command"]
    if vers_major != 0 or vers_minor != 1 or command != "zfs list":
        error = f"Datasets are not in the correct format {vers_major=} {vers_minor=} {command=}"
        raise RuntimeError(error)
    return parsed
class Snapshot:
"""Snapshot."""
def __init__(self, snapshot_data: dict[str, Any]) -> None:
"""__init__."""
properties = snapshot_data["properties"]
self.createtxg = int(snapshot_data["createtxg"])
self.creation = datetime.fromtimestamp(int(properties["creation"]["value"]), tz=UTC)
self.defer_destroy = properties["defer_destroy"]["value"]
self.guid = int(properties["guid"]["value"])
self.name = snapshot_data["name"].split("@")[1]
self.objsetid = int(properties["objsetid"]["value"])
self.referenced = int(properties["referenced"]["value"])
self.used = int(properties["used"]["value"])
self.userrefs = int(properties["userrefs"]["value"])
self.version = int(properties["version"]["value"])
self.written = int(properties["written"]["value"])
def __repr__(self) -> str:
"""__repr__."""
return f"name={self.name} used={self.used} refer={self.referenced}"
class Dataset:
    """A ZFS filesystem, loaded from ``zfs list -j`` at construction time."""

    def __init__(self, name: str) -> None:
        """Query ``zfs list`` for ``name`` and populate every property.

        Args:
            name: Full dataset name, e.g. ``"pool/home"``.
        """
        dataset_data = _zfs_list(f"zfs list {name} -pHj -o all")
        props = dataset_data["datasets"][name]["properties"]
        self.aclinherit = props["aclinherit"]["value"]
        self.aclmode = props["aclmode"]["value"]
        self.acltype = props["acltype"]["value"]
        self.available = int(props["available"]["value"])
        self.canmount = props["canmount"]["value"]
        self.checksum = props["checksum"]["value"]
        self.clones = props["clones"]["value"]
        self.compression = props["compression"]["value"]
        self.copies = int(props["copies"]["value"])
        self.createtxg = int(props["createtxg"]["value"])
        self.creation = datetime.fromtimestamp(int(props["creation"]["value"]), tz=UTC)
        self.dedup = props["dedup"]["value"]
        self.devices = props["devices"]["value"]
        self.encryption = props["encryption"]["value"]
        self.exec = props["exec"]["value"]
        self.filesystem_limit = props["filesystem_limit"]["value"]
        self.guid = int(props["guid"]["value"])
        self.keystatus = props["keystatus"]["value"]
        self.logbias = props["logbias"]["value"]
        self.mlslabel = props["mlslabel"]["value"]
        self.mounted = props["mounted"]["value"]
        self.mountpoint = props["mountpoint"]["value"]
        self.name = name
        self.quota = int(props["quota"]["value"])
        self.readonly = props["readonly"]["value"]
        self.recordsize = int(props["recordsize"]["value"])
        self.redundant_metadata = props["redundant_metadata"]["value"]
        self.referenced = int(props["referenced"]["value"])
        self.refquota = int(props["refquota"]["value"])
        self.refreservation = int(props["refreservation"]["value"])
        self.reservation = int(props["reservation"]["value"])
        self.setuid = props["setuid"]["value"]
        self.sharenfs = props["sharenfs"]["value"]
        self.snapdir = props["snapdir"]["value"]
        self.snapshot_limit = props["snapshot_limit"]["value"]
        self.sync = props["sync"]["value"]
        self.used = int(props["used"]["value"])
        self.usedbychildren = int(props["usedbychildren"]["value"])
        self.usedbydataset = int(props["usedbydataset"]["value"])
        self.usedbysnapshots = int(props["usedbysnapshots"]["value"])
        self.version = int(props["version"]["value"])
        self.volmode = props["volmode"]["value"]
        self.volsize = props["volsize"]["value"]
        self.vscan = props["vscan"]["value"]
        self.written = int(props["written"]["value"])
        self.xattr = props["xattr"]["value"]

    def get_snapshots(self) -> list[Snapshot] | None:
        """Return every snapshot of this dataset.

        NOTE(review): the annotation allows None, but this implementation
        always returns a list — confirm before tightening the signature.
        """
        snapshots_data = _zfs_list(f"zfs list -t snapshot -pHj {self.name} -o all")
        return [Snapshot(entry) for entry in snapshots_data["datasets"].values()]

    def create_snapshot(self, snapshot_name: str) -> str:
        """Creates a zfs snapshot.

        Args:
            snapshot_name (str): a snapshot name

        Returns:
            str: a human-readable status message.
        """
        logger.debug(f"Creating {self.name}@{snapshot_name}")
        _, status = bash_wrapper(f"zfs snapshot {self.name}@{snapshot_name}")
        if status == 0:
            return "snapshot created"
        # Creation failed: distinguish "already exists" from other errors.
        existing = self.get_snapshots() or []
        if any(snap.name == snapshot_name for snap in existing):
            return f"Snapshot {snapshot_name} already exists for {self.name}"
        return f"Failed to create snapshot {snapshot_name} for {self.name}"

    def delete_snapshot(self, snapshot_name: str) -> str | None:
        """Deletes a zfs snapshot.

        Args:
            snapshot_name (str): a snapshot name

        Returns:
            str | None: None on success, an explanation when the snapshot
            has dependent clones.

        Raises:
            RuntimeError: On any other destroy failure.
        """
        logger.debug(f"deleting {self.name}@{snapshot_name}")
        output, status = bash_wrapper(f"zfs destroy {self.name}@{snapshot_name}")
        if status == 0:
            return None
        if output.startswith(f"cannot destroy '{self.name}@{snapshot_name}': snapshot has dependent clones"):
            return "snapshot has dependent clones"
        error = f"Failed to delete snapshot {snapshot_name=} for {self.name}"
        raise RuntimeError(error)

    def __repr__(self) -> str:
        """Dump every dataset property, one per line."""
        return (
            f"{self.aclinherit=}\n"
            f"{self.aclmode=}\n"
            f"{self.acltype=}\n"
            f"{self.available=}\n"
            f"{self.canmount=}\n"
            f"{self.checksum=}\n"
            f"{self.clones=}\n"
            f"{self.compression=}\n"
            f"{self.copies=}\n"
            f"{self.createtxg=}\n"
            f"{self.creation=}\n"
            f"{self.dedup=}\n"
            f"{self.devices=}\n"
            f"{self.encryption=}\n"
            f"{self.exec=}\n"
            f"{self.filesystem_limit=}\n"
            f"{self.guid=}\n"
            f"{self.keystatus=}\n"
            f"{self.logbias=}\n"
            f"{self.mlslabel=}\n"
            f"{self.mounted=}\n"
            f"{self.mountpoint=}\n"
            f"{self.name=}\n"
            f"{self.quota=}\n"
            f"{self.readonly=}\n"
            f"{self.recordsize=}\n"
            f"{self.redundant_metadata=}\n"
            f"{self.referenced=}\n"
            f"{self.refquota=}\n"
            f"{self.refreservation=}\n"
            f"{self.reservation=}\n"
            f"{self.setuid=}\n"
            f"{self.sharenfs=}\n"
            f"{self.snapdir=}\n"
            f"{self.snapshot_limit=}\n"
            f"{self.sync=}\n"
            f"{self.used=}\n"
            f"{self.usedbychildren=}\n"
            f"{self.usedbydataset=}\n"
            f"{self.usedbysnapshots=}\n"
            f"{self.version=}\n"
            f"{self.volmode=}\n"
            f"{self.volsize=}\n"
            f"{self.vscan=}\n"
            f"{self.written=}\n"
            f"{self.xattr=}\n"
        )
def get_datasets() -> list[Dataset]:
    """Enumerate every non-root ZFS filesystem on the host.

    Pool roots (names without a ``/``) are skipped.

    Returns:
        list[Dataset]: A list of zfs datasets.
    """
    logger.info("Getting zfs list")
    raw_names, _ = bash_wrapper("zfs list -Hp -t filesystem -o name")
    names = raw_names.strip().split("\n")
    return [Dataset(name) for name in names if "/" in name]

View File

@@ -1,86 +0,0 @@
"""test."""
from __future__ import annotations
import json
from typing import Any
from python.common import bash_wrapper
def _zpool_list(zfs_list: str) -> dict[str, Any]:
    """Run a ``zpool list`` command and return its parsed JSON output.

    The JSON envelope's ``output_version`` header is validated so a CLI
    emitting an unexpected schema fails loudly instead of being misparsed.

    Args:
        zfs_list (str): The full ``zpool list`` command line to execute.

    Returns:
        dict[str, Any]: The parsed JSON document.

    Raises:
        RuntimeError: If the output is not ``zpool list`` schema 0.1.
    """
    raw_zfs_list_data, _ = bash_wrapper(zfs_list)
    zfs_list_data = json.loads(raw_zfs_list_data)
    vers_major = zfs_list_data["output_version"]["vers_major"]
    vers_minor = zfs_list_data["output_version"]["vers_minor"]
    command = zfs_list_data["output_version"]["command"]
    if vers_major != 0 or vers_minor != 1 or command != "zpool list":
        # The original message said "Datasets" (copy-paste from the zfs
        # variant); this function validates pool output.
        error = f"Pools are not in the correct format {vers_major=} {vers_minor=} {command=}"
        raise RuntimeError(error)
    return zfs_list_data
class Zpool:
    """A ZFS pool parsed from ``zpool list -j`` output."""

    def __init__(
        self,
        name: str,
    ) -> None:
        """Query ``zpool list`` for ``name`` and populate every property.

        Args:
            name: The pool name, e.g. ``"storage"``.
        """
        zpool_data = _zpool_list(f"zpool list {name} -pHj -o all")
        properties = zpool_data["pools"][name]["properties"]
        self.name = name
        self.allocated = int(properties["allocated"]["value"])
        self.altroot = properties["altroot"]["value"]
        self.ashift = int(properties["ashift"]["value"])
        self.autoexpand = properties["autoexpand"]["value"]
        self.autoreplace = properties["autoreplace"]["value"]
        self.autotrim = properties["autotrim"]["value"]
        self.capacity = int(properties["capacity"]["value"])
        self.comment = properties["comment"]["value"]
        self.dedupratio = properties["dedupratio"]["value"]
        self.delegation = properties["delegation"]["value"]
        self.expandsize = properties["expandsize"]["value"]
        self.failmode = properties["failmode"]["value"]
        self.fragmentation = int(properties["fragmentation"]["value"])
        # Fix: `free` is a byte count like `allocated`/`size`; with the
        # parseable (-p) flag it converts cleanly to int. It was the only
        # size field previously left as a string.
        self.free = int(properties["free"]["value"])
        self.freeing = int(properties["freeing"]["value"])
        self.guid = int(properties["guid"]["value"])
        self.health = properties["health"]["value"]
        self.leaked = int(properties["leaked"]["value"])
        self.readonly = properties["readonly"]["value"]
        self.size = int(properties["size"]["value"])

    def __repr__(self) -> str:
        """Dump every pool property, one per line."""
        return (
            f"{self.name=}\n"
            f"{self.allocated=}\n"
            f"{self.altroot=}\n"
            f"{self.ashift=}\n"
            f"{self.autoexpand=}\n"
            f"{self.autoreplace=}\n"
            f"{self.autotrim=}\n"
            f"{self.capacity=}\n"
            f"{self.comment=}\n"
            f"{self.dedupratio=}\n"
            f"{self.delegation=}\n"
            f"{self.expandsize=}\n"
            f"{self.failmode=}\n"
            f"{self.fragmentation=}\n"
            f"{self.free=}\n"  # was missing from the property dump
            f"{self.freeing=}\n"
            f"{self.guid=}\n"
            f"{self.health=}\n"
            f"{self.leaked=}\n"
            f"{self.readonly=}\n"
            f"{self.size=}"
        )

View File

@@ -15,6 +15,7 @@
../../common/optional/nvidia.nix ../../common/optional/nvidia.nix
./hardware.nix ./hardware.nix
./syncthing.nix ./syncthing.nix
./games.nix
./llms.nix ./llms.nix
]; ];

View File

@@ -1,7 +1,7 @@
{ pkgs, ... }: { pkgs, ... }:
{ {
environment.systemPackages = with pkgs; [ environment.systemPackages = with pkgs; [
filebot osu-lazer-bin
docker-compose jellyfin-media-player
]; ];
} }

View File

@@ -3,7 +3,6 @@
"dotfiles" = { "dotfiles" = {
path = "/home/richie/dotfiles"; path = "/home/richie/dotfiles";
devices = [ devices = [
"brain"
"jeeves" "jeeves"
"rhapsody-in-green" "rhapsody-in-green"
]; ];
@@ -13,9 +12,8 @@
id = "4ckma-gtshs"; # cspell:disable-line id = "4ckma-gtshs"; # cspell:disable-line
path = "/home/richie/important"; path = "/home/richie/important";
devices = [ devices = [
"brain"
"jeeves"
"phone" "phone"
"jeeves"
"rhapsody-in-green" "rhapsody-in-green"
]; ];
fsWatcherEnabled = true; fsWatcherEnabled = true;

View File

@@ -1,39 +0,0 @@
# Host configuration for "brain": a Framework 11th-gen Intel laptop.
{ inputs, ... }:
{
  imports = [
    ../../users/richie
    ../../common/global
    ../../common/optional/docker.nix
    ../../common/optional/ssh_decrypt.nix
    ../../common/optional/syncthing_base.nix
    ../../common/optional/systemd-boot.nix
    ../../common/optional/update.nix
    ../../common/optional/zerotier.nix
    ./docker
    ./hardware.nix
    ./programs.nix
    ./services
    ./syncthing.nix
    # Vendor quirks for this exact laptop model.
    inputs.nixos-hardware.nixosModules.framework-11th-gen-intel
  ];
  networking = {
    hostName = "brain";
    # hostId is required by ZFS and must be unique per machine.
    hostId = "93a06c6e";
    firewall.enable = true;
    networkmanager.enable = true;
  };
  hardware.bluetooth = {
    enable = true;
    powerOnBoot = true;
  };
  services = {
    # SSH listens on a non-standard port.
    openssh.ports = [ 129 ];
    smartd.enable = true;
  };
  system.stateVersion = "25.05";
}

View File

@@ -1,11 +0,0 @@
# Auto-import every *.nix file in this directory (except default.nix)
# and force the Docker backend for declarative oci-containers.
{ lib, ... }:
{
  imports =
    let
      files = builtins.attrNames (builtins.readDir ./.);
      nixFiles = builtins.filter (name: lib.hasSuffix ".nix" name && name != "default.nix") files;
    in
    map (file: ./. + "/${file}") nixFiles;
  virtualisation.oci-containers.backend = "docker";
}

View File

@@ -1,3 +0,0 @@
# docker_networks
docker network create -d bridge web

View File

@@ -1,71 +0,0 @@
# Hardware configuration: LUKS-encrypted ZFS root pool, unlocked by a raw
# key read from a USB stick with interactive passphrase fallback.
{
  config,
  lib,
  modulesPath,
  ...
}:
{
  imports = [ (modulesPath + "/installer/scan/not-detected.nix") ];
  boot = {
    initrd = {
      availableKernelModules = [
        "ahci"
        "ehci_pci"
        "nvme"
        "sd_mod"
        "uas"
        "usb_storage"
        "usbhid"
        "xhci_pci"
      ];
      kernelModules = [ ];
      luks.devices."luks-root-pool-nvme-Samsung_SSD_990_PRO_2TB_S7KHNJ0Y121613P-part2" = {
        device = "/dev/disk/by-id/nvme-Samsung_SSD_990_PRO_2TB_S7KHNJ0Y121613P-part2";
        # SSD-friendly settings: skip dm-crypt workqueues and pass TRIM through.
        bypassWorkqueues = true;
        allowDiscards = true;
        # Read the first 4096 bytes of the USB stick as the key material;
        # fall back to typing a passphrase when the stick is absent.
        keyFileSize = 4096;
        keyFile = "/dev/disk/by-id/usb-USB_SanDisk_3.2Gen1_03021630090925173333-0:0";
        fallbackToPassword = true;
      };
    };
    kernelModules = [ "kvm-intel" ];
    extraModulePackages = [ ];
  };
  fileSystems = {
    "/" = lib.mkDefault {
      device = "root_pool/root";
      fsType = "zfs";
    };
    "/home" = {
      device = "root_pool/home";
      fsType = "zfs";
    };
    "/var" = {
      device = "root_pool/var";
      fsType = "zfs";
    };
    "/nix" = {
      device = "root_pool/nix";
      fsType = "zfs";
    };
    "/boot" = {
      device = "/dev/disk/by-uuid/12CE-A600";
      fsType = "vfat";
      # Restrict boot-partition permissions to root.
      options = [
        "fmask=0077"
        "dmask=0077"
      ];
    };
  };
  swapDevices = [ ];
  nixpkgs.hostPlatform = lib.mkDefault "x86_64-linux";
  hardware.cpu.intel.updateMicrocode = lib.mkDefault config.hardware.enableRedistributableFirmware;
}

View File

@@ -1,9 +0,0 @@
# Auto-import every *.nix file in this directory except default.nix.
{ lib, ... }:
{
  imports =
    let
      files = builtins.attrNames (builtins.readDir ./.);
      nixFiles = builtins.filter (name: lib.hasSuffix ".nix" name && name != "default.nix") files;
    in
    map (file: ./. + "/${file}") nixFiles;
}

View File

@@ -1,82 +0,0 @@
{
users = {
users.hass = {
isSystemUser = true;
group = "hass";
};
groups.hass = { };
};
services = {
home-assistant = {
enable = true;
openFirewall = true;
config = {
http = {
server_port = 8123;
server_host = [
"192.168.90.35"
"192.168.95.35"
"127.0.0.1"
];
use_x_forwarded_for = true;
};
homeassistant = {
time_zone = "America/New_York";
unit_system = "us_customary";
temperature_unit = "F";
packages = {
victron_modbuss = "!include ${./home_assistant/victron_modbuss.yaml}";
battery_sensors = "!include ${./home_assistant/battery_sensors.yaml}";
};
};
recorder = {
db_url = "postgresql://@/hass";
auto_purge = true;
purge_keep_days = 3650;
db_retry_wait = 15;
};
assist_pipeline = { };
backup = { };
bluetooth = { };
config = { };
dhcp = { };
energy = { };
history = { };
homeassistant_alerts = { };
image_upload = { };
logbook = { };
media_source = { };
mobile_app = { };
ssdp = { };
sun = { };
webhook = { };
cloud = { };
zeroconf = { };
automation = "!include automations.yaml";
script = "!include scripts.yaml";
scene = "!include scenes.yaml";
group = "!include groups.yaml";
};
extraPackages =
python3Packages: with python3Packages; [
pymodbus # for modbus
gtts # not sure what wants this
jellyfin-apiclient-python # for jellyfin
paho-mqtt # for mqtt
psycopg2 # for postgresql
forecast-solar # for solar forecast
aioesphomeapi # for esphome
esphome-dashboard-api # for esphome
py-improv-ble-client # for esphome
bleak-esphome # for esphome
];
extraComponents = [ "isal" ];
};
esphome = {
enable = true;
openFirewall = true;
address = "192.168.90.35";
};
};
}

View File

@@ -1,99 +0,0 @@
template:
- sensor:
# Battery 0
- name: "JK0 charge power W"
unique_id: jk0_charge_power_w
unit_of_measurement: W
device_class: power
state_class: measurement
state: >
{% set p = states('sensor.batteries_jk0_power')|float(0) %}
{{ max(0, p) }}
- name: "JK0 discharge power W"
unique_id: jk0_discharge_power_w
unit_of_measurement: W
device_class: power
state_class: measurement
state: >
{% set p = states('sensor.batteries_jk0_power')|float(0) %}
{{ max(0, -p) }}
# Battery 1
- name: "JK1 charge power W"
unique_id: jk1_charge_power_w
unit_of_measurement: W
device_class: power
state_class: measurement
state: >
{% set p = states('sensor.batteries_jk1_power')|float(0) %}
{{ max(0, p) }}
- name: "JK1 discharge power W"
unique_id: jk1_discharge_power_w
unit_of_measurement: W
device_class: power
state_class: measurement
state: >
{% set p = states('sensor.batteries_jk1_power')|float(0) %}
{{ max(0, -p) }}
sensor:
# Battery 0
- platform: integration
source: sensor.jk0_charge_power_w
name: "JK0 energy in"
unique_id: jk0_energy_in_kwh
unit_prefix: k
method: trapezoidal
round: 3
max_sub_interval:
minutes: 5
- platform: integration
source: sensor.jk0_discharge_power_w
name: "JK0 energy out"
unique_id: jk0_energy_out_kwh
unit_prefix: k
method: trapezoidal
round: 3
max_sub_interval:
minutes: 5
# Battery 1
- platform: integration
source: sensor.jk1_charge_power_w
name: "JK1 energy in"
unique_id: jk1_energy_in_kwh
unit_prefix: k
method: trapezoidal
round: 3
max_sub_interval:
minutes: 5
- platform: integration
source: sensor.jk1_discharge_power_w
name: "JK1 energy out"
unique_id: jk1_energy_out_kwh
unit_prefix: k
method: trapezoidal
round: 3
max_sub_interval:
minutes: 5
utility_meter:
# Battery 0
jk0_energy_in_daily:
source: sensor.jk0_energy_in
name: "JK0 Energy In Daily"
cycle: daily
jk0_energy_out_daily:
source: sensor.jk0_energy_out
name: "JK0 Energy Out Daily"
cycle: daily
# Battery 1
jk1_energy_in_daily:
source: sensor.jk1_energy_in
name: "JK1 Energy In Daily"
cycle: daily
jk1_energy_out_daily:
source: sensor.jk1_energy_out
name: "JK1 Energy Out Daily"
cycle: daily

View File

@@ -1,347 +0,0 @@
modbus:
- name: victron_gx
type: tcp
host: 192.168.103.30
port: 502
timeout: 3
delay: 2
sensors:
# ---- SOLAR CHARGER (Unit ID 226) ----
- name: Solar Voltage
slave: 226
address: 776
input_type: holding
data_type: uint16
scale: 0.01
precision: 2
unit_of_measurement: "V"
device_class: voltage
state_class: measurement
- name: Solar Amperage
slave: 226
address: 777
input_type: holding
data_type: int16
scale: 0.1
precision: 1
unit_of_measurement: "A"
device_class: current
state_class: measurement
- name: Solar Wattage
slave: 226
address: 789
input_type: holding
data_type: uint16
scale: 0.1
unit_of_measurement: "W"
device_class: power
state_class: measurement
- name: Solar Yield Today
slave: 226
address: 784
input_type: holding
data_type: uint16
scale: 0.1
precision: 3
unit_of_measurement: "kWh"
device_class: energy
state_class: total
# DC system
- name: DC Voltage
slave: 100
address: 840
input_type: holding
data_type: uint16
scale: 0.1
precision: 2
unit_of_measurement: "V"
device_class: voltage
state_class: measurement
unique_id: dc_voltage
- name: DC Wattage
slave: 100
address: 860
input_type: holding
data_type: int16
scale: 1
precision: 0
unit_of_measurement: "W"
device_class: power
state_class: measurement
unique_id: dc_wattage
# GPS
- name: GPS Latitude
slave: 100
address: 2800
input_type: holding
data_type: int32
scale: 0.0000001
precision: 7
state_class: measurement
unique_id: gps_latitude
- name: GPS Longitude
slave: 100
address: 2802
input_type: holding
data_type: int32
scale: 0.0000001
precision: 7
state_class: measurement
unique_id: gps_longitude
- name: GPS Course
slave: 100
address: 2804
input_type: holding
data_type: uint16
scale: 0.01
precision: 2
unit_of_measurement: "°"
state_class: measurement
unique_id: gps_course
- name: GPS Speed
slave: 100
address: 2805
input_type: holding
data_type: uint16
scale: 0.01
precision: 2
unit_of_measurement: "m/s"
state_class: measurement
unique_id: gps_speed
- name: GPS Fix
slave: 100
address: 2806
input_type: holding
data_type: uint16
scale: 1
state_class: measurement
unique_id: gps_fix
- name: GPS Satellites
slave: 100
address: 2807
input_type: holding
data_type: uint16
scale: 1
state_class: measurement
unique_id: gps_satellites
- name: GPS Altitude
slave: 100
address: 2808
input_type: holding
data_type: int32
scale: 0.16
precision: 1
unit_of_measurement: "m"
state_class: measurement
unique_id: gps_altitude
# ---- CHARGER (Unit ID 223) ----
- name: Charger Output 1 Voltage
slave: 223
address: 2307
input_type: holding
data_type: uint16
scale: 0.01
precision: 2
unit_of_measurement: "V"
device_class: voltage
state_class: measurement
unique_id: charger_output_1_voltage
- name: Charger Output 1 Current
slave: 223
address: 2308
input_type: holding
data_type: int16
scale: 0.1
precision: 1
unit_of_measurement: "A"
device_class: current
state_class: measurement
unique_id: charger_output_1_current
- name: Charger Output 1 Temperature
slave: 223
address: 2309
input_type: holding
data_type: int16
scale: 0.1
precision: 1
unit_of_measurement: "°C"
device_class: temperature
state_class: measurement
unique_id: charger_output_1_temperature
- name: Charger AC Current
slave: 223
address: 2314
input_type: holding
data_type: int16
scale: 0.1
precision: 1
unit_of_measurement: "A"
device_class: current
state_class: measurement
unique_id: charger_ac_current
- name: Charger AC Current Limit
slave: 223
address: 2316
input_type: holding
data_type: int16
scale: 0.1
precision: 1
unit_of_measurement: "A"
device_class: current
state_class: measurement
unique_id: charger_ac_current_limit
- name: Charger On Off Raw
slave: 223
address: 2317
input_type: holding
data_type: uint16
scale: 1
state_class: measurement
unique_id: charger_on_off_raw
- name: Charger Charge State Raw
slave: 223
address: 2318
input_type: holding
data_type: uint16
scale: 1
state_class: measurement
unique_id: charger_charge_state_raw
- name: Charger Error Code
slave: 223
address: 2319
input_type: holding
data_type: uint16
scale: 1
state_class: measurement
unique_id: charger_error_code
- name: Charger Relay State
slave: 223
address: 2320
input_type: holding
data_type: uint16
scale: 1
state_class: measurement
unique_id: charger_relay_state
- name: Charger Low Voltage Alarm
slave: 223
address: 2321
input_type: holding
data_type: uint16
scale: 1
state_class: measurement
unique_id: charger_low_voltage_alarm
- name: Charger High Voltage Alarm
slave: 223
address: 2322
input_type: holding
data_type: uint16
scale: 1
state_class: measurement
unique_id: charger_high_voltage_alarm
template:
- sensor:
- name: Charger On Off
state: >-
{% set v = states('sensor.charger_on_off_raw')|int %}
{{ {0:'Off',1:'On',2:'Error',3:'Unavailable'}.get(v, 'Unknown') }}
- name: Charger Charge State
state: >-
{% set v = states('sensor.charger_charge_state_raw')|int %}
{{ {
0:'Off',1:'Low Power',2:'Fault',3:'Bulk',4:'Absorption',5:'Float',
6:'Storage',7:'Equalize/Manual',8:'External Control'
}.get(v,'Unknown') }}
- name: "Charger DC Wattage"
unique_id: charger_dc_wattage
unit_of_measurement: "W"
device_class: power
state_class: measurement
state: >-
{% set v = states('sensor.charger_output_1_voltage')|float(0) %}
{% set a = states('sensor.charger_output_1_current')|float(0) %}
{{ (v * a) | round(1) }}
- binary_sensor:
- name: Charger Low Voltage Alarm Active
state: "{{ states('sensor.charger_low_voltage_alarm')|int == 2 }}"
- name: Charger High Voltage Alarm Active
state: "{{ states('sensor.charger_high_voltage_alarm')|int == 2 }}"
sensor:
- platform: integration
source: sensor.dc_wattage
name: DC System Energy
unit_prefix: k
round: 2
method: trapezoidal
max_sub_interval:
minutes: 5
- platform: integration
source: sensor.solar_wattage
name: Solar Yield
unit_prefix: k
round: 2
method: trapezoidal
max_sub_interval:
minutes: 5
- platform: integration
source: sensor.charger_dc_wattage
name: DC Charger Energy
unit_prefix: k
round: 2
method: trapezoidal
max_sub_interval:
minutes: 5
utility_meter:
dc_load_energy_daily:
source: sensor.dc_system_energy
cycle: daily
dc_load_energy_monthly:
source: sensor.dc_system_energy
cycle: monthly
solar_yield_daily:
source: sensor.solar_yield
cycle: daily
solar_yield_monthly:
source: sensor.solar_yield
cycle: monthly
charger_dc_wattage_daily:
source: sensor.dc_charger_energy
cycle: daily
charger_dc_wattage_monthly:
source: sensor.dc_charger_energy
cycle: monthly

View File

@@ -1,6 +0,0 @@
# Jellyfin media server; openFirewall exposes its default ports.
{
  services.jellyfin = {
    enable = true;
    openFirewall = true;
  };
}

View File

@@ -1,151 +0,0 @@
{ pkgs, ... }:
{
networking.firewall.allowedTCPPorts = [ 5432 ];
services.postgresql = {
enable = true;
package = pkgs.postgresql_17_jit;
enableTCPIP = true;
enableJIT = true;
authentication = pkgs.lib.mkOverride 10 ''
# admins
local all postgres trust
host all postgres 127.0.0.1/32 trust
host all postgres ::1/128 trust
local all richie trust
host all richie 127.0.0.1/32 trust
host all richie ::1/128 trust
host all richie 192.168.90.1/24 trust
host all richie 192.168.99.1/24 trust
#type database DBuser origin-address auth-method
local hass hass trust
# ipv4
host hass hass 192.168.90.1/24 trust
host hass hass 127.0.0.1/32 trust
# ipv6
host hass hass ::1/128 trust
'';
identMap = ''
# ArbitraryMapName systemUser DBUser
superuser_map root postgres
superuser_map postgres postgres
# Let other names login as themselves
superuser_map richie postgres
superuser_map hass hass
'';
ensureUsers = [
{
name = "postgres";
ensureClauses = {
superuser = true;
login = true;
createrole = true;
createdb = true;
replication = true;
};
}
{
name = "richie";
ensureDBOwnership = true;
ensureClauses = {
superuser = true;
login = true;
createrole = true;
createdb = true;
replication = true;
};
}
{
name = "hass";
ensureDBOwnership = true;
ensureClauses = {
login = true;
createrole = true;
createdb = true;
replication = true;
};
}
];
ensureDatabases = [
"hass"
"richie"
];
# Thank you NotAShelf
# https://github.com/NotAShelf/nyx/blob/d407b4d6e5ab7f60350af61a3d73a62a5e9ac660/modules/core/roles/server/system/services/databases/postgresql.nix#L74
settings = {
# Connectivity;
max_connections = 100;
superuser_reserved_connections = 3;
# Memory Settings;
shared_buffers = "1024 MB";
work_mem = "32 MB";
maintenance_work_mem = "320 MB";
huge_pages = "off";
effective_cache_size = "2 GB";
effective_io_concurrency = 100; # concurrent IO only really activated if OS supports posix_fadvise function;
random_page_cost = 1.25; # speed of random disk access relative to sequential access (1.0);
# Monitoring;
shared_preload_libraries = "pg_stat_statements,auto_explain"; # per statement resource usage stats & log explain statements for slow queries
track_io_timing = "on"; # measure exact block IO times;
track_functions = "pl"; # track execution times of pl-language procedures if any;
# Replication;
wal_level = "replica"; # consider using at least "replica";
max_wal_senders = 0;
synchronous_commit = "on";
# Checkpointing: ;
checkpoint_timeout = "15 min";
checkpoint_completion_target = 0.9;
max_wal_size = "1024 MB";
min_wal_size = "512 MB";
# WAL writing;
wal_compression = "on";
wal_buffers = -1; # auto-tuned by Postgres till maximum of segment size (16MB by default);
wal_writer_delay = "200ms";
wal_writer_flush_after = "1MB";
# Background writer;
bgwriter_delay = "200ms";
bgwriter_lru_maxpages = 100;
bgwriter_lru_multiplier = 2.0;
bgwriter_flush_after = 0;
# Parallel queries: ;
max_worker_processes = 6;
max_parallel_workers_per_gather = 3;
max_parallel_maintenance_workers = 3;
max_parallel_workers = 6;
parallel_leader_participation = "on";
# Advanced features ;
enable_partitionwise_join = "on";
enable_partitionwise_aggregate = "on";
jit = "on";
jit_above_cost = 100000;
jit_inline_above_cost = 150000;
jit_optimize_above_cost = 500000;
# log slow queries
log_min_duration_statement = 100;
"auto_explain.log_min_duration" = 100;
# logging configuration
log_connections = true;
log_statement = "ddl";
logging_collector = true;
log_disconnections = true;
log_rotation_age = "14d";
};
};
}

View File

@@ -1,30 +0,0 @@
# Syncthing folder shares for this host; the GUI is only reachable on
# 192.168.90.35 (TCP 8384 opened below).
{
  networking.firewall.allowedTCPPorts = [ 8384 ];
  services.syncthing = {
    # Keep folders added through the web UI instead of pruning them.
    overrideFolders = false;
    guiAddress = "192.168.90.35:8384";
    settings = {
      "dotfiles" = {
        path = "/home/richie/dotfiles";
        devices = [
          "bob"
          "jeeves"
          "rhapsody-in-green"
        ];
        fsWatcherEnabled = true;
      };
      "important" = {
        id = "4ckma-gtshs"; # cspell:disable-line
        path = "/home/richie/important";
        devices = [
          "bob"
          "jeeves"
          "phone"
          "rhapsody-in-green"
        ];
        fsWatcherEnabled = true;
      };
    };
  };
}

View File

@@ -1,16 +1,10 @@
let
vars = import ./vars.nix;
in
{ {
imports = [ imports = [
../../users/richie ../../users/richie
../../users/math
../../users/dov
../../common/global ../../common/global
../../common/optional/docker.nix ../../common/optional/docker.nix
../../common/optional/ssh_decrypt.nix ../../common/optional/ssh_decrypt.nix
../../common/optional/syncthing_base.nix ../../common/optional/syncthing_base.nix
../../common/optional/update.nix
../../common/optional/zerotier.nix ../../common/optional/zerotier.nix
./docker ./docker
./services ./services
@@ -27,12 +21,7 @@ in
smartd.enable = true; smartd.enable = true;
snapshot_manager = { snapshot_manager.path = ./snapshot_config.toml;
path = ./snapshot_config.toml;
EnvironmentFile = "${vars.secrets}/services/snapshot_manager";
};
zerotierone.joinNetworks = [ "a09acf02330d37b9" ];
}; };
system.stateVersion = "24.05"; system.stateVersion = "24.05";

View File

@@ -1,21 +0,0 @@
let
vars = import ../vars.nix;
in
{
networking.firewall.allowedTCPPorts = [
8989
];
virtualisation.oci-containers.containers.signal_cli_rest_api = {
image = "bbernhard/signal-cli-rest-api:latest";
ports = [
"8989:8080"
];
volumes = [
"${vars.docker_configs}/signal-cli-config:/home/.local/share/signal-cli"
];
environment = {
MODE = "json-rpc";
};
autoStart = true;
};
}

View File

@@ -1,9 +1,4 @@
{ { config, lib, ... }:
config,
lib,
outputs,
...
}:
with lib; with lib;
@@ -69,15 +64,11 @@ in
Host jeeves Host jeeves
Port 629 Port 629
User github-runners User github-runners
HostName jeeves HostName 192.168.99.14
IdentityFile ${vars.secrets}/services/github-runners/id_ed25519_github-runners IdentityFile ${vars.secrets}/services/github-runners/id_ed25519_github-runners
StrictHostKeyChecking no StrictHostKeyChecking no
UserKnownHostsFile /dev/null UserKnownHostsFile /dev/null
''; '';
nixpkgs = {
overlays = builtins.attrValues outputs.overlays;
config.allowUnfree = true;
};
services.github-runners.${name} = { services.github-runners.${name} = {
enable = true; enable = true;
replace = true; replace = true;
@@ -92,7 +83,6 @@ in
nixos-rebuild nixos-rebuild
openssh openssh
treefmt treefmt
my_python
]; ];
}; };
users = { users = {

View File

@@ -27,7 +27,6 @@ sudo zfs create -o recordsize=16k -o primarycache=metadata -o mountpoint=/zfs/me
# scratch datasets # scratch datasets
sudo zfs create -o recordsize=16k -o sync=disabled scratch/qbitvpn sudo zfs create -o recordsize=16k -o sync=disabled scratch/qbitvpn
sudo zfs create -o recordsize=16k -o sync=disabled scratch/transmission sudo zfs create -o recordsize=16k -o sync=disabled scratch/transmission
sudo zfs create -o recordsize=1M scratch/kafka
# storage datasets # storage datasets
sudo zfs create -o recordsize=1M -o compression=zstd-19 storage/archive sudo zfs create -o recordsize=1M -o compression=zstd-19 storage/archive
@@ -39,4 +38,3 @@ sudo zfs create -o compression=zstd-19 storage/syncthing
sudo zfs create -o recordsize=1M -o compression=zstd-9 -o exec=off -o sync=disabled storage/qbitvpn sudo zfs create -o recordsize=1M -o compression=zstd-9 -o exec=off -o sync=disabled storage/qbitvpn
sudo zfs create -o recordsize=1M -o compression=zstd-9 -o exec=off -o sync=disabled storage/transmission sudo zfs create -o recordsize=1M -o compression=zstd-9 -o exec=off -o sync=disabled storage/transmission
sudo zfs create -o recordsize=1M -o compression=zstd-19 storage/library sudo zfs create -o recordsize=1M -o compression=zstd-19 storage/library
sudo zfs create -o recordsize=1M -o compression=zstd-19 -o sync=disabled storage/ollama

View File

@@ -32,7 +32,6 @@ frontend ContentSwitching
acl host_jellyfin hdr(host) -i jellyfin.tmmworkshop.com acl host_jellyfin hdr(host) -i jellyfin.tmmworkshop.com
acl host_share hdr(host) -i share.tmmworkshop.com acl host_share hdr(host) -i share.tmmworkshop.com
acl host_gcw hdr(host) -i gcw.tmmworkshop.com acl host_gcw hdr(host) -i gcw.tmmworkshop.com
acl host_n8n hdr(host) -i n8n.tmmworkshop.com
use_backend audiobookshelf_nodes if host_audiobookshelf use_backend audiobookshelf_nodes if host_audiobookshelf
use_backend cache_nodes if host_cache use_backend cache_nodes if host_cache
@@ -41,7 +40,6 @@ frontend ContentSwitching
use_backend jellyfin if host_jellyfin use_backend jellyfin if host_jellyfin
use_backend share_nodes if host_share use_backend share_nodes if host_share
use_backend gcw_nodes if host_gcw use_backend gcw_nodes if host_gcw
use_backend n8n if host_n8n
backend audiobookshelf_nodes backend audiobookshelf_nodes
mode http mode http
@@ -57,7 +55,7 @@ backend filebrowser_nodes
backend homeassistant_nodes backend homeassistant_nodes
mode http mode http
server server 192.168.90.35:8123 server server 127.0.0.1:8123
backend jellyfin backend jellyfin
option httpchk option httpchk
@@ -73,7 +71,3 @@ backend share_nodes
backend gcw_nodes backend gcw_nodes
mode http mode http
server server 127.0.0.1:8092 server server 127.0.0.1:8092
backend n8n
mode http
server server 127.0.0.1:5678

View File

@@ -72,6 +72,7 @@ in
rokuecp rokuecp
uiprotect uiprotect
wakeonlan wakeonlan
wyoming
]; ];
extraComponents = [ "isal" ]; extraComponents = [ "isal" ];
}; };
@@ -80,5 +81,23 @@ in
openFirewall = true; openFirewall = true;
address = "192.168.90.40"; address = "192.168.90.40";
}; };
wyoming = {
faster-whisper.servers.main = {
enable = true;
uri = "tcp://0.0.0.0:10300";
model = "medium.en";
language = "en";
device = "cuda";
};
piper.servers.main = {
enable = true;
uri = "tcp://0.0.0.0:10200";
voice = "en_GB-alba-medium";
};
openwakeword = {
enable = true;
uri = "tcp://0.0.0.0:10400";
};
};
}; };
} }

View File

@@ -1,12 +0,0 @@
let
vars = import ../vars.nix;
in
{
services.apache-kafka = {
enable = false;
settings = {
listeners = [ "PLAINTEXT://localhost:9092" ];
"log.dirs" = [ vars.kafka ];
};
};
}

View File

@@ -1,38 +0,0 @@
let
vars = import ../vars.nix;
in
{
services = {
ollama = {
user = "ollama";
enable = true;
host = "0.0.0.0";
loadModels = [
"codellama:7b"
"deepseek-r1:14b"
"deepseek-r1:32b"
"deepseek-r1:8b"
"gemma3:12b"
"gemma3:27b"
"gpt-oss:120b"
"gpt-oss:20b"
"qwen3:14b"
"qwen3:30b"
];
models = vars.ollama;
openFirewall = true;
};
};
systemd.services = {
ollama.serviceConfig = {
Nice = 19;
IOSchedulingPriority = 7;
};
ollama-model-loader.serviceConfig = {
Nice = 19;
CPUWeight = 50;
IOSchedulingClass = "idle";
IOSchedulingPriority = 7;
};
};
}

View File

@@ -1,10 +1,8 @@
{ pkgs, ... }:
let let
vars = import ../vars.nix; vars = import ../vars.nix;
in in
{ {
services.nix-serve = { services.nix-serve = {
package = pkgs.nix-serve-ng;
enable = true; enable = true;
secretKeyFile = "${vars.secrets}/services/nix-cache/cache-priv-key.pem"; secretKeyFile = "${vars.secrets}/services/nix-cache/cache-priv-key.pem";
openFirewall = true; openFirewall = true;

View File

@@ -48,12 +48,6 @@ in
host gcw gcw 192.168.90.1/24 trust host gcw gcw 192.168.90.1/24 trust
host gcw gcw 127.0.0.1/32 trust host gcw gcw 127.0.0.1/32 trust
# math
local postgres math trust
host postgres math 127.0.0.1/32 trust
host postgres math ::1/128 trust
host postgres math 192.168.90.1/24 trust
''; '';
identMap = '' identMap = ''
@@ -116,25 +110,13 @@ in
replication = true; replication = true;
}; };
} }
{
name = "math";
ensureDBOwnership = true;
ensureClauses = {
login = true;
createrole = true;
createdb = true;
replication = true;
};
}
]; ];
ensureDatabases = [ ensureDatabases = [
"gcw" "gcw"
"hass" "hass"
"math"
"megan" "megan"
"mxr_dev" "mxr_dev"
"mxr_prod" "mxr_prod"
"n8n"
"richie" "richie"
]; ];
# Thank you NotAShelf # Thank you NotAShelf
@@ -202,10 +184,9 @@ in
# logging configuration # logging configuration
log_connections = true; log_connections = true;
log_statement = "ddl"; log_statement = "all";
logging_collector = true; logging_collector = true;
log_disconnections = true; log_disconnections = true;
log_rotation_age = "14d";
}; };
}; };
} }

View File

@@ -25,7 +25,7 @@ in
serviceConfig = { serviceConfig = {
EnvironmentFile = "${vars.secrets}/services/server-validation"; EnvironmentFile = "${vars.secrets}/services/server-validation";
Type = "oneshot"; Type = "oneshot";
ExecStart = "${inputs.system_tools.packages.x86_64-linux.default}/bin/validate_system '${./validate_system.toml}'"; ExecStart = "${inputs.system_tools.packages.x86_64-linux.default}/bin/validate_system --config-file='${./validate_system.toml}'";
}; };
}; };
}; };

View File

@@ -51,45 +51,3 @@ monthly = 12
hourly = 12 hourly = 12
daily = 14 daily = 14
monthly = 2 monthly = 2
["media/services"]
15_min = 3
hourly = 12
daily = 14
monthly = 2
["media/home_assistant"]
15_min = 3
hourly = 12
daily = 14
monthly = 2
["scratch/qbitvpn"]
15_min = 0
hourly = 0
daily = 0
monthly = 0
["scratch/transmission"]
15_min = 0
hourly = 0
daily = 0
monthly = 0
["storage/qbitvpn"]
15_min = 0
hourly = 0
daily = 0
monthly = 0
["storage/transmission"]
15_min = 0
hourly = 0
daily = 0
monthly = 0
["storage/ollama"]
15_min = 0
hourly = 0
daily = 0
monthly = 0

View File

@@ -14,7 +14,6 @@ in
path = "/home/richie/dotfiles"; path = "/home/richie/dotfiles";
devices = [ devices = [
"bob" "bob"
"brain"
"rhapsody-in-green" "rhapsody-in-green"
]; ];
fsWatcherEnabled = true; fsWatcherEnabled = true;
@@ -24,10 +23,7 @@ in
path = vars.notes; path = vars.notes;
devices = [ devices = [
"rhapsody-in-green" "rhapsody-in-green"
{ "davids-server"
name = "davids-server";
encryptionPasswordFile = "${vars.secrets}/services/syncthing/davids-server";
}
]; ];
fsWatcherEnabled = true; fsWatcherEnabled = true;
}; };
@@ -36,9 +32,8 @@ in
path = "${vars.syncthing}/important"; path = "${vars.syncthing}/important";
devices = [ devices = [
"bob" "bob"
"brain"
"phone"
"rhapsody-in-green" "rhapsody-in-green"
"phone"
]; ];
fsWatcherEnabled = true; fsWatcherEnabled = true;
}; };
@@ -72,20 +67,14 @@ in
path = "/home/richie/vault"; path = "/home/richie/vault";
devices = [ devices = [
"rhapsody-in-green" "rhapsody-in-green"
{ "davids-server"
name = "davids-server";
encryptionPasswordFile = "${vars.secrets}/services/syncthing/davids-server";
}
]; ];
fsWatcherEnabled = true; fsWatcherEnabled = true;
}; };
"backup" = { "backup" = {
path = "${vars.syncthing}/backup"; path = "${vars.syncthing}/backup";
devices = [ devices = [
{ "davids-server"
name = "davids-server";
encryptionPasswordFile = "${vars.secrets}/services/syncthing/davids-server";
}
]; ];
fsWatcherEnabled = true; fsWatcherEnabled = true;
}; };

View File

@@ -17,7 +17,5 @@ in
share = "${zfs_media}/share"; share = "${zfs_media}/share";
syncthing = "${zfs_storage}/syncthing"; syncthing = "${zfs_storage}/syncthing";
transmission = "${zfs_storage}/transmission"; transmission = "${zfs_storage}/transmission";
ollama = "${zfs_storage}/ollama";
transmission_scratch = "${zfs_scratch}/transmission"; transmission_scratch = "${zfs_scratch}/transmission";
kafka = "${zfs_scratch}/kafka";
} }

View File

@@ -1,28 +0,0 @@
{ inputs, ... }:
{
imports = [
../../users/elise
../../users/richie
../../common/global
../../common/optional/desktop.nix
../../common/optional/steam.nix
../../common/optional/systemd-boot.nix
../../common/optional/update.nix
../../common/optional/zerotier.nix
./hardware.nix
inputs.nixos-hardware.nixosModules.framework-13-7040-amd
];
networking = {
hostName = "leviathan";
hostId = "cb9b64d8";
firewall.enable = true;
networkmanager.enable = true;
};
services = {
openssh.ports = [ 332 ];
};
system.stateVersion = "25.05";
}

View File

@@ -1,69 +0,0 @@
{
config,
lib,
modulesPath,
...
}:
{
imports = [ (modulesPath + "/installer/scan/not-detected.nix") ];
boot = {
initrd = {
availableKernelModules = [
"ahci"
"ehci_pci"
"nvme"
"sd_mod"
"usb_storage"
"usbhid"
"xhci_pci"
];
kernelModules = [ ];
luks.devices."luks-root-pool-nvme-Samsung_SSD_970_EVO_Plus_1TB_S6S1NS0T617615W-part2" = {
device = "/dev/disk/by-id/nvme-Samsung_SSD_970_EVO_Plus_1TB_S6S1NS0T617615W-part2";
bypassWorkqueues = true;
allowDiscards = true;
};
};
kernelModules = [ "kvm-amd" ];
extraModulePackages = [ ];
};
fileSystems = {
"/" = lib.mkDefault {
device = "root_pool/root";
fsType = "zfs";
};
"/home" = {
device = "root_pool/home";
fsType = "zfs";
};
"/var" = {
device = "root_pool/var";
fsType = "zfs";
};
"/nix" = {
device = "root_pool/nix";
fsType = "zfs";
};
"/boot" = {
device = "/dev/disk/by-uuid/12CE-A600";
fsType = "vfat";
options = [
"fmask=0077"
"dmask=0077"
];
};
};
swapDevices = [ ];
networking.useDHCP = lib.mkDefault true;
nixpkgs.hostPlatform = lib.mkDefault "x86_64-linux";
hardware.cpu.amd.updateMicrocode = lib.mkDefault config.hardware.enableRedistributableFirmware;
}

View File

@@ -11,7 +11,6 @@
../../common/optional/yubikey.nix ../../common/optional/yubikey.nix
../../common/optional/zerotier.nix ../../common/optional/zerotier.nix
./hardware.nix ./hardware.nix
./llms.nix
./syncthing.nix ./syncthing.nix
inputs.nixos-hardware.nixosModules.framework-13-7040-amd inputs.nixos-hardware.nixosModules.framework-13-7040-amd
]; ];
@@ -19,10 +18,7 @@
networking = { networking = {
hostName = "rhapsody-in-green"; hostName = "rhapsody-in-green";
hostId = "6404140d"; hostId = "6404140d";
firewall = { firewall.enable = true;
enable = true;
allowedTCPPorts = [ ];
};
networkmanager.enable = true; networkmanager.enable = true;
}; };

View File

@@ -1,30 +0,0 @@
{
services.ollama = {
user = "ollama";
enable = true;
host = "127.0.0.1";
loadModels = [
"codellama:7b"
"deepseek-r1:14b"
"deepseek-r1:32b"
"deepseek-r1:8b"
"gemma3:12b"
"gemma3:27b"
"gpt-oss:20b"
"qwen3:14b"
"qwen3:30b"
];
};
systemd.services = {
ollama.serviceConfig = {
Nice = 19;
IOSchedulingPriority = 7;
};
ollama-model-loader.serviceConfig = {
Nice = 19;
CPUWeight = 50;
IOSchedulingClass = "idle";
IOSchedulingPriority = 7;
};
};
}

View File

@@ -3,9 +3,8 @@
"dotfiles" = { "dotfiles" = {
path = "/home/richie/dotfiles"; path = "/home/richie/dotfiles";
devices = [ devices = [
"bob"
"brain"
"jeeves" "jeeves"
"bob"
]; ];
fsWatcherEnabled = true; fsWatcherEnabled = true;
}; };
@@ -22,7 +21,6 @@
path = "/home/richie/important"; path = "/home/richie/important";
devices = [ devices = [
"bob" "bob"
"brain"
"jeeves" "jeeves"
"phone" "phone"
]; ];

View File

@@ -1,258 +0,0 @@
{
"nodes": {
"firefox-addons": {
"inputs": {
"nixpkgs": [
"nixpkgs"
]
},
"locked": {
"dir": "pkgs/firefox-addons",
"lastModified": 1757649814,
"narHash": "sha256-VjtA+fqkraKHbGzjKJBPfDj+SXysXiR4SrghTr10HoY=",
"owner": "rycee",
"repo": "nur-expressions",
"rev": "789920825fc982a93a2bf91a714367fa8f7ea0a6",
"type": "gitlab"
},
"original": {
"dir": "pkgs/firefox-addons",
"owner": "rycee",
"repo": "nur-expressions",
"type": "gitlab"
}
},
"home-manager": {
"inputs": {
"nixpkgs": [
"nixpkgs"
]
},
"locked": {
"lastModified": 1757698511,
"narHash": "sha256-UqHHGydF/q3jfYXCpvYLA0TWtvByOp1NwOKCUjhYmPs=",
"owner": "nix-community",
"repo": "home-manager",
"rev": "a3fcc92180c7462082cd849498369591dfb20855",
"type": "github"
},
"original": {
"owner": "nix-community",
"repo": "home-manager",
"type": "github"
}
},
"nixos-hardware": {
"locked": {
"lastModified": 1757103352,
"narHash": "sha256-PtT7ix43ss8PONJ1VJw3f6t2yAoGH+q462Sn8lrmWmk=",
"owner": "nixos",
"repo": "nixos-hardware",
"rev": "11b2a10c7be726321bb854403fdeec391e798bf0",
"type": "github"
},
"original": {
"owner": "nixos",
"ref": "master",
"repo": "nixos-hardware",
"type": "github"
}
},
"nixpkgs": {
"locked": {
"lastModified": 1757487488,
"narHash": "sha256-zwE/e7CuPJUWKdvvTCB7iunV4E/+G0lKfv4kk/5Izdg=",
"owner": "nixos",
"repo": "nixpkgs",
"rev": "ab0f3607a6c7486ea22229b92ed2d355f1482ee0",
"type": "github"
},
"original": {
"owner": "nixos",
"ref": "nixos-unstable",
"repo": "nixpkgs",
"type": "github"
}
},
"nixpkgs-master": {
"locked": {
"lastModified": 1757720853,
"narHash": "sha256-VBS5+YKIT8Aj81ZW+8Bg9MuYoI6OqO6HSrwG4dpHpW4=",
"owner": "nixos",
"repo": "nixpkgs",
"rev": "2ca437b4796d049192eb30576a50fef139038c09",
"type": "github"
},
"original": {
"owner": "nixos",
"ref": "master",
"repo": "nixpkgs",
"type": "github"
}
},
"nixpkgs-stable": {
"locked": {
"lastModified": 1735563628,
"narHash": "sha256-OnSAY7XDSx7CtDoqNh8jwVwh4xNL/2HaJxGjryLWzX8=",
"owner": "nixos",
"repo": "nixpkgs",
"rev": "b134951a4c9f3c995fd7be05f3243f8ecd65d798",
"type": "github"
},
"original": {
"owner": "nixos",
"ref": "nixos-24.05",
"repo": "nixpkgs",
"type": "github"
}
},
"pyproject-build-systems": {
"inputs": {
"nixpkgs": [
"system_tools",
"nixpkgs"
],
"pyproject-nix": [
"system_tools",
"pyproject-nix"
],
"uv2nix": [
"system_tools",
"uv2nix"
]
},
"locked": {
"lastModified": 1744599653,
"narHash": "sha256-nysSwVVjG4hKoOjhjvE6U5lIKA8sEr1d1QzEfZsannU=",
"owner": "pyproject-nix",
"repo": "build-system-pkgs",
"rev": "7dba6dbc73120e15b558754c26024f6c93015dd7",
"type": "github"
},
"original": {
"owner": "pyproject-nix",
"repo": "build-system-pkgs",
"type": "github"
}
},
"pyproject-nix": {
"inputs": {
"nixpkgs": [
"system_tools",
"nixpkgs"
]
},
"locked": {
"lastModified": 1746540146,
"narHash": "sha256-QxdHGNpbicIrw5t6U3x+ZxeY/7IEJ6lYbvsjXmcxFIM=",
"owner": "pyproject-nix",
"repo": "pyproject.nix",
"rev": "e09c10c24ebb955125fda449939bfba664c467fd",
"type": "github"
},
"original": {
"owner": "pyproject-nix",
"repo": "pyproject.nix",
"type": "github"
}
},
"root": {
"inputs": {
"firefox-addons": "firefox-addons",
"home-manager": "home-manager",
"nixos-hardware": "nixos-hardware",
"nixpkgs": "nixpkgs",
"nixpkgs-master": "nixpkgs-master",
"nixpkgs-stable": "nixpkgs-stable",
"sops-nix": "sops-nix",
"system_tools": "system_tools",
"systems": "systems"
}
},
"sops-nix": {
"inputs": {
"nixpkgs": [
"nixpkgs"
]
},
"locked": {
"lastModified": 1757503115,
"narHash": "sha256-S9F6bHUBh+CFEUalv/qxNImRapCxvSnOzWBUZgK1zDU=",
"owner": "Mic92",
"repo": "sops-nix",
"rev": "0bf793823386187dff101ee2a9d4ed26de8bbf8c",
"type": "github"
},
"original": {
"owner": "Mic92",
"repo": "sops-nix",
"type": "github"
}
},
"system_tools": {
"inputs": {
"nixpkgs": [
"nixpkgs"
],
"pyproject-build-systems": "pyproject-build-systems",
"pyproject-nix": "pyproject-nix",
"uv2nix": "uv2nix"
},
"locked": {
"lastModified": 1757910132,
"narHash": "sha256-6r45DD/tMN+hYgnMc2/c82Z0bb1A7FnI/nvU8kZf/Us=",
"owner": "RichieCahill",
"repo": "system_tools",
"rev": "d63c486fe3b76c24b2ed2fff33d6f54c847b50e8",
"type": "github"
},
"original": {
"owner": "RichieCahill",
"repo": "system_tools",
"type": "github"
}
},
"systems": {
"locked": {
"lastModified": 1689347949,
"narHash": "sha256-12tWmuL2zgBgZkdoB6qXZsgJEH9LR3oUgpaQq2RbI80=",
"owner": "nix-systems",
"repo": "default-linux",
"rev": "31732fcf5e8fea42e59c2488ad31a0e651500f68",
"type": "github"
},
"original": {
"owner": "nix-systems",
"repo": "default-linux",
"type": "github"
}
},
"uv2nix": {
"inputs": {
"nixpkgs": [
"system_tools",
"nixpkgs"
],
"pyproject-nix": [
"system_tools",
"pyproject-nix"
]
},
"locked": {
"lastModified": 1747441483,
"narHash": "sha256-W8BFXk5R0TuJcjIhcGoMpSOaIufGXpizK0pm+uTqynA=",
"owner": "pyproject-nix",
"repo": "uv2nix",
"rev": "582024dc64663e9f88d467c2f7f7b20d278349de",
"type": "github"
},
"original": {
"owner": "pyproject-nix",
"repo": "uv2nix",
"type": "github"
}
}
},
"root": "root",
"version": 7
}

View File

@@ -1 +0,0 @@
"""Tests."""

View File

@@ -1,61 +0,0 @@
"""test_common."""
from __future__ import annotations
from os import environ
from typing import TYPE_CHECKING
from apprise import Apprise
from python.common import bash_wrapper, signal_alert, utcnow
if TYPE_CHECKING:
from pytest_mock import MockerFixture
def test_utcnow() -> None:
"""test_utcnow."""
utcnow()
def test_signal_alert(mocker: MockerFixture) -> None:
"""test_signal_alert."""
environ["SIGNAL_ALERT_FROM_PHONE"] = "1234567890"
environ["SIGNAL_ALERT_TO_PHONE"] = "0987654321"
mock_logger = mocker.patch("python.common.logger")
mock_apprise_client = mocker.MagicMock(spec=Apprise)
mocker.patch("python.common.Apprise", return_value=mock_apprise_client)
signal_alert("test")
mock_logger.info.assert_not_called()
mock_apprise_client.add.assert_called_once_with("signal://localhost:8989/1234567890/0987654321")
mock_apprise_client.notify.assert_called_once_with(title="", body="test")
def test_signal_alert_no_phones(mocker: MockerFixture) -> None:
"""test_signal_alert_no_phones."""
if "SIGNAL_ALERT_FROM_PHONE" in environ:
del environ["SIGNAL_ALERT_FROM_PHONE"]
if "SIGNAL_ALERT_TO_PHONE" in environ:
del environ["SIGNAL_ALERT_TO_PHONE"]
mock_logger = mocker.patch("python.common.logger")
signal_alert("test")
mock_logger.info.assert_called_once_with("SIGNAL_ALERT_FROM_PHONE or SIGNAL_ALERT_TO_PHONE not set")
def test_test_bash_wrapper() -> None:
"""test_test_bash_wrapper."""
stdout, returncode = bash_wrapper("echo test")
assert stdout == "test\n"
assert returncode == 0
def test_test_bash_wrapper_error() -> None:
"""test_test_bash_wrapper_error."""
expected_error = 2
stdout, returncode = bash_wrapper("ls /this/path/does/not/exist")
assert stdout == "ls: cannot access '/this/path/does/not/exist': No such file or directory\n"
assert returncode == expected_error

View File

@@ -1,104 +0,0 @@
"""test_components."""
from pytest_mock import MockerFixture
from python.system_tests.components import systemd_tests, zpool_tests
from python.zfs import Zpool
temp = "Every feature flags pool has all supported and requested features enabled.\n"
SYSTEM_TESTS_COMPONENTS = "python.system_tests.components"
def test_zpool_tests(mocker: MockerFixture) -> None:
"""test_zpool_tests."""
mock_zpool = mocker.MagicMock(spec=Zpool)
mock_zpool.health = "ONLINE"
mock_zpool.capacity = 70
mock_zpool.name = "Main"
mocker.patch(f"{SYSTEM_TESTS_COMPONENTS}.Zpool", return_value=mock_zpool)
mocker.patch(f"{SYSTEM_TESTS_COMPONENTS}.bash_wrapper", return_value=(temp, ""))
errors = zpool_tests(("Main",))
assert errors == []
def test_zpool_tests_out_of_date(mocker: MockerFixture) -> None:
"""test_zpool_tests_out_of_date."""
mock_zpool = mocker.MagicMock(spec=Zpool)
mock_zpool.health = "ONLINE"
mock_zpool.capacity = 70
mock_zpool.name = "Main"
mocker.patch(f"{SYSTEM_TESTS_COMPONENTS}.Zpool", return_value=mock_zpool)
mocker.patch(f"{SYSTEM_TESTS_COMPONENTS}.bash_wrapper", return_value=("", ""))
errors = zpool_tests(("Main",))
assert errors == ["ZPool out of date run `sudo zpool upgrade -a`"]
def test_zpool_tests_out_of_space(mocker: MockerFixture) -> None:
"""test_zpool_tests_out_of_space."""
mock_zpool = mocker.MagicMock(spec=Zpool)
mock_zpool.health = "ONLINE"
mock_zpool.capacity = 100
mock_zpool.name = "Main"
mocker.patch(f"{SYSTEM_TESTS_COMPONENTS}.Zpool", return_value=mock_zpool)
mocker.patch(f"{SYSTEM_TESTS_COMPONENTS}.bash_wrapper", return_value=(temp, ""))
errors = zpool_tests(("Main",))
assert errors == ["Main is low on space"]
def test_zpool_tests_offline(mocker: MockerFixture) -> None:
"""test_zpool_tests_offline."""
mock_zpool = mocker.MagicMock(spec=Zpool)
mock_zpool.health = "OFFLINE"
mock_zpool.capacity = 70
mock_zpool.name = "Main"
mocker.patch(f"{SYSTEM_TESTS_COMPONENTS}.Zpool", return_value=mock_zpool)
mocker.patch(f"{SYSTEM_TESTS_COMPONENTS}.bash_wrapper", return_value=(temp, ""))
errors = zpool_tests(("Main",))
assert errors == ["Main is OFFLINE"]
def test_systemd_tests(mocker: MockerFixture) -> None:
"""test_systemd_tests."""
mocker.patch(
f"{SYSTEM_TESTS_COMPONENTS}.bash_wrapper",
side_effect=[
("inactive\n", ""),
("active\n", ""),
],
)
errors = systemd_tests(("docker",))
assert errors == []
"""test_systemd_tests."""
def test_systemd_tests_multiple_negative_retries(mocker: MockerFixture) -> None:
"""test_systemd_tests_fail."""
mocker.patch(f"{SYSTEM_TESTS_COMPONENTS}.bash_wrapper", return_value=("active\n", ""))
errors = systemd_tests(("docker",), max_retries=-1, retry_delay_secs=-1)
assert errors == []
def test_systemd_tests_multiple_pass(mocker: MockerFixture) -> None:
"""test_systemd_tests_fail."""
mocker.patch(
f"{SYSTEM_TESTS_COMPONENTS}.bash_wrapper",
side_effect=[
("inactive\n", ""),
("activating\n", ""),
("active\n", ""),
],
)
errors = systemd_tests(
("docker",),
retryable_statuses=("inactive\n", "activating\n"),
valid_statuses=("active\n",),
)
assert errors == []
def test_systemd_tests_fail(mocker: MockerFixture) -> None:
"""test_systemd_tests_fail."""
mocker.patch(f"{SYSTEM_TESTS_COMPONENTS}.bash_wrapper", return_value=("inactive\n", ""))
errors = systemd_tests(("docker",), max_retries=5)
assert errors == ["docker is inactive"]

View File

@@ -1,123 +0,0 @@
"""test_executors."""
from __future__ import annotations
import logging
from concurrent.futures import Future, ThreadPoolExecutor
from typing import TYPE_CHECKING, Any
import pytest
from python.parallelize import _parallelize_base, parallelize_process, parallelize_thread
if TYPE_CHECKING:
from collections.abc import Callable
from pytest_mock import MockerFixture
class MockFuture(Future):
"""MockFuture."""
def __init__(self, result: Any) -> None: # noqa: ANN401
"""Init."""
super().__init__()
self._result = result
self._exception: BaseException | None = None
self.set_result(result)
def exception(self, timeout: float | None = None) -> BaseException | None:
"""Exception."""
logging.debug(f"{timeout}=")
return self._exception
def result(self, timeout: float | None = None) -> Any: # noqa: ANN401
"""Result."""
logging.debug(f"{timeout}=")
return self._result
class MockPoolExecutor(ThreadPoolExecutor):
"""MockPoolExecutor."""
def __init__(self, *args: Any, **kwargs: Any) -> None: # noqa: ANN401
"""Initializes a new ThreadPoolExecutor instance."""
super().__init__(*args, **kwargs)
def submit(self, fn: Callable[..., Any], /, *args: Any, **kwargs: Any) -> Future: # noqa: ANN401
"""Submits a callable to be executed with the given arguments.
Args:
fn: The callable to execute.
*args: The positional arguments to pass to the callable.
**kwargs: The keyword arguments to pass to the callable.
Returns:
A Future instance representing the execution of the callable.
"""
result = fn(*args, **kwargs)
return MockFuture(result)
def add(a: int, b: int) -> int:
"""Add."""
return a + b
def test_parallelize_thread() -> None:
"""test_parallelize_thread."""
kwargs_list = [{"a": 1, "b": 2}, {"a": 3, "b": 4}]
results = parallelize_thread(func=add, kwargs_list=kwargs_list, progress_tracker=1)
assert results.results == [3, 7]
assert not results.exceptions
def test_parallelize_thread_exception() -> None:
"""test_parallelize_thread."""
kwargs_list: list[dict[str, int | None]] = [{"a": 1, "b": 2}, {"a": 3, "b": None}]
results = parallelize_thread(func=add, kwargs_list=kwargs_list)
assert results.results == [3]
output = """[TypeError("unsupported operand type(s) for +: 'int' and 'NoneType'")]"""
assert str(results.exceptions) == output
def test_parallelize_process() -> None:
"""test_parallelize_process."""
kwargs_list = [{"a": 1, "b": 2}, {"a": 3, "b": 4}]
results = parallelize_process(func=add, kwargs_list=kwargs_list)
assert results.results == [3, 7]
assert not results.exceptions
def test_parallelize_process_to_many_max_workers(mocker: MockerFixture) -> None:
"""test_parallelize_process."""
mocker.patch(target="python.parallelize.cpu_count", return_value=1)
with pytest.raises(RuntimeError, match="max_workers must be less than or equal to 1"):
parallelize_process(func=add, kwargs_list=[{"a": 1, "b": 2}], max_workers=8)
def test_executor_results_repr() -> None:
"""test_ExecutorResults_repr."""
results = parallelize_thread(func=add, kwargs_list=[{"a": 1, "b": 2}])
assert repr(results) == "results=[3] exceptions=[]"
def test_early_error() -> None:
"""test_early_error."""
kwargs_list: list[dict[str, int | None]] = [{"a": 1, "b": 2}, {"a": 3, "b": None}]
with pytest.raises(TypeError, match=r"unsupported operand type\(s\) for \+\: 'int' and 'NoneType'"):
parallelize_thread(func=add, kwargs_list=kwargs_list, mode="early_error")
def test_mock_pool_executor() -> None:
"""test_mock_pool_executor."""
results = _parallelize_base(
executor_type=MockPoolExecutor,
func=add,
kwargs_list=[{"a": 1, "b": 2}, {"a": 3, "b": 4}],
max_workers=None,
progress_tracker=None,
mode="normal",
)
assert repr(results) == "results=[3, 7] exceptions=[]"

View File

@@ -1,60 +0,0 @@
"""test_server_validate_scripts."""
from __future__ import annotations
from pathlib import Path
from typing import TYPE_CHECKING
import pytest
from pytest_mock import MockerFixture
from python.system_tests.validate_system import main
if TYPE_CHECKING:
from pyfakefs.fake_filesystem import FakeFilesystem
from pytest_mock import MockerFixture
VALIDATE_SYSTEM = "python.system_tests.validate_system"
def test_validate_system(mocker: MockerFixture, fs: FakeFilesystem) -> None:
"""test_validate_system."""
fs.create_file(
"/mock_snapshot_config.toml",
contents='zpool = ["root_pool", "storage", "media"]\nservices = ["docker"]\n',
)
mocker.patch(f"{VALIDATE_SYSTEM}.systemd_tests", return_value=None)
mocker.patch(f"{VALIDATE_SYSTEM}.zpool_tests", return_value=None)
main(Path("/mock_snapshot_config.toml"))
def test_validate_system_errors(mocker: MockerFixture, fs: FakeFilesystem) -> None:
"""test_validate_system_errors."""
fs.create_file(
"/mock_snapshot_config.toml",
contents='zpool = ["root_pool", "storage", "media"]\nservices = ["docker"]\n',
)
mocker.patch(f"{VALIDATE_SYSTEM}.systemd_tests", return_value=["systemd_tests error"])
mocker.patch(f"{VALIDATE_SYSTEM}.zpool_tests", return_value=["zpool_tests error"])
with pytest.raises(SystemExit) as exception_info:
main(Path("/mock_snapshot_config.toml"))
assert exception_info.value.code == 1
def test_validate_system_execution(mocker: MockerFixture, fs: FakeFilesystem) -> None:
"""test_validate_system_execution."""
fs.create_file(
"/mock_snapshot_config.toml",
contents='zpool = ["root_pool", "storage", "media"]\nservices = ["docker"]\n',
)
mocker.patch(f"{VALIDATE_SYSTEM}.zpool_tests", side_effect=RuntimeError("zpool_tests error"))
with pytest.raises(SystemExit) as exception_info:
main(Path("/mock_snapshot_config.toml"))
assert exception_info.value.code == 1

View File

@@ -1,167 +0,0 @@
"""test_snapshot_manager."""
from __future__ import annotations
from datetime import UTC, datetime
from pathlib import Path
from typing import TYPE_CHECKING
import pytest
from python.tools.snapshot_manager import get_snapshots_to_delete, get_time_stamp, load_config_data, main
from python.zfs.dataset import Dataset, Snapshot
if TYPE_CHECKING:
from pyfakefs.fake_filesystem import FakeFilesystem
from pytest_mock import MockerFixture
SNAPSHOT_MANAGER = "python.tools.snapshot_manager"
def patch_utcnow(mocker: MockerFixture, datetime_value: datetime) -> None:
"""patch_utcnow."""
mocker.patch("python.tools.snapshot_manager.utcnow", return_value=datetime_value)
def create_mock_snapshot(mocker: MockerFixture, name: str) -> Snapshot:
"""create_mock_snapshot."""
mock_snapshot = mocker.MagicMock(spec=Snapshot)
mock_snapshot.name = name
return mock_snapshot
def test_main(mocker: MockerFixture, fs: FakeFilesystem) -> None:
"""Test main."""
load_config_data.cache_clear()
mocker.patch(f"{SNAPSHOT_MANAGER}.get_time_stamp", return_value="2023-01-01T00:00:00")
mock_dataset = mocker.MagicMock(spec=Dataset)
mock_dataset.name = "test_dataset"
mock_dataset.create_snapshot.return_value = "snapshot created"
mock_get_datasets = mocker.patch(f"{SNAPSHOT_MANAGER}.get_datasets", return_value=(mock_dataset,))
mock_get_snapshots_to_delete = mocker.patch(f"{SNAPSHOT_MANAGER}.get_snapshots_to_delete")
mock_signal_alert = mocker.patch(f"{SNAPSHOT_MANAGER}.signal_alert")
mock_snapshot_config_toml = '["default"]\n15_min = 8\nhourly = 24\ndaily = 0\nmonthly = 0\n'
fs.create_file("/mock_snapshot_config.toml", contents=mock_snapshot_config_toml)
main(Path("/mock_snapshot_config.toml"))
mock_signal_alert.assert_not_called()
mock_get_datasets.assert_called_once()
mock_get_snapshots_to_delete.assert_called_once_with(
mock_dataset,
{
"15_min": 8,
"hourly": 24,
"daily": 0,
"monthly": 0,
},
)
def test_main_create_snapshot_failure(mocker: MockerFixture, fs: FakeFilesystem) -> None:
"""Test main."""
load_config_data.cache_clear()
mocker.patch(f"{SNAPSHOT_MANAGER}.get_time_stamp", return_value="2023-01-01T00:00:00")
mock_dataset = mocker.MagicMock(spec=Dataset)
mock_dataset.name = "test_dataset"
mock_dataset.create_snapshot.return_value = "snapshot not created"
mock_get_datasets = mocker.patch(f"{SNAPSHOT_MANAGER}.get_datasets", return_value=(mock_dataset,))
mock_get_snapshots_to_delete = mocker.patch(f"{SNAPSHOT_MANAGER}.get_snapshots_to_delete")
mock_signal_alert = mocker.patch(f"{SNAPSHOT_MANAGER}.signal_alert")
mock_snapshot_config_toml = '["default"]\n15_min = 8\nhourly = 24\ndaily = 0\nmonthly = 0\n'
fs.create_file("/mock_snapshot_config.toml", contents=mock_snapshot_config_toml)
main(Path("/mock_snapshot_config.toml"))
mock_signal_alert.assert_called_once_with("test_dataset failed to create snapshot 2023-01-01T00:00:00")
mock_get_datasets.assert_called_once()
mock_get_snapshots_to_delete.assert_not_called()
def test_main_exception(mocker: MockerFixture, fs: FakeFilesystem) -> None:
"""Test main."""
load_config_data.cache_clear()
mocker.patch(f"{SNAPSHOT_MANAGER}.get_time_stamp", return_value="2023-01-01T00:00:00")
mock_dataset = mocker.MagicMock(spec=Dataset)
mock_dataset.name = "test_dataset"
mock_dataset.create_snapshot.return_value = "snapshot created"
mock_get_datasets = mocker.patch(f"{SNAPSHOT_MANAGER}.get_datasets", side_effect=Exception("test"))
mock_get_snapshots_to_delete = mocker.patch(f"{SNAPSHOT_MANAGER}.get_snapshots_to_delete")
mock_signal_alert = mocker.patch(f"{SNAPSHOT_MANAGER}.signal_alert")
mock_snapshot_config_toml = '["default"]\n15_min = 8\nhourly = 24\ndaily = 0\nmonthly = 0\n'
fs.create_file("/mock_snapshot_config.toml", contents=mock_snapshot_config_toml)
with pytest.raises(SystemExit) as pytest_wrapped_e:
main(Path("/mock_snapshot_config.toml"))
assert isinstance(pytest_wrapped_e.value, SystemExit)
assert pytest_wrapped_e.value.code == 1
mock_signal_alert.assert_called_once_with("snapshot_manager failed")
mock_get_datasets.assert_called_once()
mock_get_snapshots_to_delete.assert_not_called()
def test_get_snapshots_to_delete(mocker: MockerFixture) -> None:
"""test_get_snapshots_to_delete."""
mock_snapshot_0 = create_mock_snapshot(mocker, "auto_202509150415")
mock_snapshot_1 = create_mock_snapshot(mocker, "auto_202509150415")
mock_dataset = mocker.MagicMock(spec=Dataset)
mock_dataset.name = "test_dataset"
mock_dataset.get_snapshots.return_value = (mock_snapshot_0, mock_snapshot_1)
mock_dataset.delete_snapshot.return_value = None
mock_signal_alert = mocker.patch(f"{SNAPSHOT_MANAGER}.signal_alert")
get_snapshots_to_delete(mock_dataset, {"15_min": 1, "hourly": 0, "daily": 0, "monthly": 0})
mock_signal_alert.assert_not_called()
mock_dataset.delete_snapshot.assert_called_once_with("auto_202509150415")
def test_get_snapshots_to_delete_no_snapshot(mocker: MockerFixture) -> None:
"""test_get_snapshots_to_delete_no_snapshot."""
mock_dataset = mocker.MagicMock(spec=Dataset)
mock_dataset.name = "test_dataset"
mock_dataset.get_snapshots.return_value = ()
mock_dataset.delete_snapshot.return_value = None
mock_signal_alert = mocker.patch(f"{SNAPSHOT_MANAGER}.signal_alert")
get_snapshots_to_delete(mock_dataset, {"15_min": 1, "hourly": 0, "daily": 0, "monthly": 0})
mock_signal_alert.assert_not_called()
mock_dataset.delete_snapshot.assert_not_called()
def test_get_snapshots_to_delete_errored(mocker: MockerFixture) -> None:
"""test_get_snapshots_to_delete_errored."""
mock_snapshot_0 = create_mock_snapshot(mocker, "auto_202509150415")
mock_snapshot_1 = create_mock_snapshot(mocker, "auto_202509150415")
mock_dataset = mocker.MagicMock(spec=Dataset)
mock_dataset.name = "test_dataset"
mock_dataset.get_snapshots.return_value = (mock_snapshot_0, mock_snapshot_1)
mock_dataset.delete_snapshot.return_value = "snapshot has dependent clones"
mock_signal_alert = mocker.patch(f"{SNAPSHOT_MANAGER}.signal_alert")
get_snapshots_to_delete(mock_dataset, {"15_min": 1, "hourly": 0, "daily": 0, "monthly": 0})
mock_signal_alert.assert_called_once_with(
"test_dataset@auto_202509150415 failed to delete: snapshot has dependent clones"
)
mock_dataset.delete_snapshot.assert_called_once_with("auto_202509150415")
def test_get_time_stamp(mocker: MockerFixture) -> None:
"""Test get_time_stamp."""
patch_utcnow(mocker, datetime(2023, 1, 1, 0, 0, 0, tzinfo=UTC))
assert get_time_stamp() == "auto_202301010000"

View File

@@ -1,309 +0,0 @@
"""Test zfs."""
import json
from datetime import UTC, datetime
from unittest.mock import call
import pytest
from pytest_mock import MockerFixture
from python.zfs import Dataset, Snapshot, Zpool, get_datasets
from python.zfs.dataset import _zfs_list
from python.zfs.zpool import _zpool_list
# Dotted module paths of the code under test, used to build mocker.patch targets.
DATASET = "python.zfs.dataset"
ZPOOL = "python.zfs.zpool"

# One snapshot record shaped like the JSON emitted by `zfs list -j`.
# Every property value arrives as a string; the Snapshot class is expected to
# coerce the numeric ones (see test_snapshot_initialization below).
SAMPLE_SNAPSHOT_DATA = {
    "createtxg": "123",
    "properties": {
        "creation": {"value": "1620000000"},  # unix epoch -> 2021-05-03 00:00 UTC
        "defer_destroy": {"value": "off"},
        "guid": {"value": "456"},
        "objsetid": {"value": "789"},
        "referenced": {"value": "1024"},
        "used": {"value": "512"},
        "userrefs": {"value": "0"},
        "version": {"value": "1"},
        "written": {"value": "2048"},
    },
    # Full snapshot path; Snapshot keeps only the part after '@' as its name.
    "name": "pool/dataset@snap1",
}
# Payload shaped like `zfs list -j -o all` for a single dataset.
# The "output_version" header (vers_major 0 / vers_minor 1) is what
# _zfs_list validates; the property values are all strings, which Dataset
# selectively converts to ints / datetimes (see test_dataset_initialization).
SAMPLE_DATASET_DATA = {
    "output_version": {"vers_major": 0, "vers_minor": 1, "command": "zfs list"},
    "datasets": {
        "pool/dataset": {
            "properties": {
                "aclinherit": {"value": "restricted"},
                "aclmode": {"value": "discard"},
                "acltype": {"value": "off"},
                "available": {"value": "1000000"},
                "canmount": {"value": "on"},
                "checksum": {"value": "on"},
                "clones": {"value": ""},
                "compression": {"value": "lz4"},
                "copies": {"value": "1"},
                "createtxg": {"value": "1234"},
                "creation": {"value": "1620000000"},  # unix epoch -> 2021-05-03 00:00 UTC
                "dedup": {"value": "off"},
                "devices": {"value": "on"},
                "encryption": {"value": "off"},
                "exec": {"value": "on"},
                "filesystem_limit": {"value": "none"},
                "guid": {"value": "5678"},
                "keystatus": {"value": "none"},
                "logbias": {"value": "latency"},
                "mlslabel": {"value": "none"},
                "mounted": {"value": "yes"},
                "mountpoint": {"value": "/pool/dataset"},
                "quota": {"value": "0"},
                "readonly": {"value": "off"},
                "recordsize": {"value": "131072"},
                "redundant_metadata": {"value": "all"},
                "referenced": {"value": "512000"},
                "refquota": {"value": "0"},
                "refreservation": {"value": "0"},
                "reservation": {"value": "0"},
                "setuid": {"value": "on"},
                "sharenfs": {"value": "off"},
                "snapdir": {"value": "hidden"},
                "snapshot_limit": {"value": "none"},
                "sync": {"value": "standard"},
                "used": {"value": "1024000"},
                "usedbychildren": {"value": "512000"},
                "usedbydataset": {"value": "256000"},
                "usedbysnapshots": {"value": "256000"},
                "version": {"value": "5"},
                "volmode": {"value": "default"},
                "volsize": {"value": "none"},
                "vscan": {"value": "off"},
                "written": {"value": "4096"},
                "xattr": {"value": "on"},
            }
        }
    },
}
# Payload shaped like `zpool list -j -o all` for a single pool.
# Same version header contract as SAMPLE_DATASET_DATA, validated by
# _zpool_list. Property values are strings; Zpool coerces some to ints
# (see the expected dict in test_zpool_initialization).
SAMPLE_ZPOOL_DATA = {
    "output_version": {"vers_major": 0, "vers_minor": 1, "command": "zpool list"},
    "pools": {
        "testpool": {
            "properties": {
                "allocated": {"value": "1000000"},
                "altroot": {"value": "none"},
                "ashift": {"value": "12"},
                "autoexpand": {"value": "off"},
                "autoreplace": {"value": "off"},
                "autotrim": {"value": "off"},
                "capacity": {"value": "50"},
                "comment": {"value": "test pool"},
                "dedupratio": {"value": "1.00x"},
                "delegation": {"value": "on"},
                "expandsize": {"value": "0"},
                "failmode": {"value": "wait"},
                "fragmentation": {"value": "20"},
                "free": {"value": "1000000"},
                "freeing": {"value": "0"},
                "guid": {"value": "12345678"},
                "health": {"value": "ONLINE"},
                "leaked": {"value": "0"},
                "readonly": {"value": "off"},
                "size": {"value": "2000000"},
            }
        }
    },
}
def test_dataset_initialization(mocker: MockerFixture) -> None:
    """Dataset copies every ZFS property onto the instance, coercing typed fields."""
    mocker.patch(f"{DATASET}._zfs_list", return_value=SAMPLE_DATASET_DATA)

    expected = {
        "aclinherit": "restricted",
        "aclmode": "discard",
        "acltype": "off",
        "available": 1000000,
        "canmount": "on",
        "checksum": "on",
        "clones": "",
        "compression": "lz4",
        "copies": 1,
        "createtxg": 1234,
        "creation": datetime(2021, 5, 3, 0, 0, tzinfo=UTC),
        "dedup": "off",
        "devices": "on",
        "encryption": "off",
        "exec": "on",
        "filesystem_limit": "none",
        "guid": 5678,
        "keystatus": "none",
        "logbias": "latency",
        "mlslabel": "none",
        "mounted": "yes",
        "mountpoint": "/pool/dataset",
        "name": "pool/dataset",
        "quota": 0,
        "readonly": "off",
        "recordsize": 131072,
        "redundant_metadata": "all",
        "referenced": 512000,
        "refquota": 0,
        "refreservation": 0,
        "reservation": 0,
        "setuid": "on",
        "sharenfs": "off",
        "snapdir": "hidden",
        "snapshot_limit": "none",
        "sync": "standard",
        "used": 1024000,
        "usedbychildren": 512000,
        "usedbydataset": 256000,
        "usedbysnapshots": 256000,
        "version": 5,
        "volmode": "default",
        "volsize": "none",
        "vscan": "off",
        "written": 4096,
        "xattr": "on",
    }
    assert vars(Dataset("pool/dataset")) == expected
def test_snapshot_initialization() -> None:
    """Snapshot parses raw property strings into typed attributes."""
    expected = {
        "createtxg": 123,
        "creation": datetime(2021, 5, 3, 0, 0, tzinfo=UTC),
        "defer_destroy": "off",
        "guid": 456,
        "name": "snap1",  # only the part after '@' is kept
        "objsetid": 789,
        "referenced": 1024,
        "used": 512,
        "userrefs": 0,
        "version": 1,
        "written": 2048,
    }
    assert vars(Snapshot(SAMPLE_SNAPSHOT_DATA)) == expected
def test_zfs_list_version_check(mocker: MockerFixture) -> None:
    """_zfs_list raises RuntimeError for an unsupported output_version."""
    unsupported = {"output_version": {"vers_major": 1, "vers_minor": 0, "command": "zfs list"}}
    mocker.patch(f"{DATASET}.bash_wrapper", return_value=(json.dumps(unsupported), 0))

    with pytest.raises(RuntimeError) as excinfo:
        _zfs_list("zfs list invalid -pHj -o all")
    assert "Datasets are not in the correct format" in str(excinfo.value)
def test_get_datasets(mocker: MockerFixture) -> None:
    """get_datasets lists filesystem names and wraps each valid entry in Dataset."""
    bash_stub = mocker.patch(
        f"{DATASET}.bash_wrapper", return_value=("pool/dataset\npool/other\ninvalid", 0)
    )
    dataset_cls_stub = mocker.patch(f"{DATASET}.Dataset")

    get_datasets()

    bash_stub.assert_called_once_with("zfs list -Hp -t filesystem -o name")
    # "invalid" must be skipped; only the two real dataset names are wrapped.
    dataset_cls_stub.assert_has_calls([call("pool/dataset"), call("pool/other")])
def test_zpool_initialization(mocker: MockerFixture) -> None:
    """Zpool copies every pool property onto the instance, coercing numeric fields."""
    mocker.patch(f"{ZPOOL}._zpool_list", return_value=SAMPLE_ZPOOL_DATA)

    expected = {
        "name": "testpool",
        "allocated": 1000000,
        "altroot": "none",
        "ashift": 12,
        "autoexpand": "off",
        "autoreplace": "off",
        "autotrim": "off",
        "capacity": 50,
        "comment": "test pool",
        "dedupratio": "1.00x",
        "delegation": "on",
        "expandsize": "0",
        "failmode": "wait",
        "fragmentation": 20,
        "free": "1000000",
        "freeing": 0,
        "guid": 12345678,
        "health": "ONLINE",
        "leaked": 0,
        "readonly": "off",
        "size": 2000000,
    }
    assert vars(Zpool("testpool")) == expected
def test_zpool_repr(mocker: MockerFixture) -> None:
    """repr(Zpool) exposes every pool attribute as '<attr>='.

    Fix: "free" was missing from the expected attribute list even though
    Zpool instances carry a ``free`` attribute (see the expected dict in
    test_zpool_initialization), so a repr that silently dropped the
    free-space figure would still have passed. Note "free=" is not a
    substring of "freeing=", so the new entry is a real extra check.
    """
    mocker.patch(f"{ZPOOL}._zpool_list", return_value=SAMPLE_ZPOOL_DATA)
    repr_string = repr(Zpool("testpool"))

    expected_attrs = [
        "name",
        "allocated",
        "altroot",
        "ashift",
        "autoexpand",
        "autoreplace",
        "autotrim",
        "capacity",
        "comment",
        "dedupratio",
        "delegation",
        "expandsize",
        "failmode",
        "fragmentation",
        "free",  # previously omitted
        "freeing",
        "guid",
        "health",
        "leaked",
        "readonly",
        "size",
    ]
    for attr in expected_attrs:
        assert f"{attr}=" in repr_string
def test_zpool_list(mocker: MockerFixture) -> None:
    """_zpool_list returns the parsed JSON when output_version is 0.1."""
    payload = {"output_version": {"vers_major": 0, "vers_minor": 1, "command": "zpool list"}}
    mocker.patch(f"{ZPOOL}.bash_wrapper", return_value=(json.dumps(payload), 0))

    assert _zpool_list("zpool list invalid -pHj -o all") == {
        "output_version": {"command": "zpool list", "vers_major": 0, "vers_minor": 1}
    }
def test_zpool_list_version_check(mocker: MockerFixture) -> None:
    """_zpool_list raises RuntimeError for an unsupported output_version."""
    unsupported = {"output_version": {"vers_major": 1, "vers_minor": 0, "command": "zpool list"}}
    mocker.patch(f"{ZPOOL}.bash_wrapper", return_value=(json.dumps(unsupported), 0))

    with pytest.raises(RuntimeError) as excinfo:
        _zpool_list("zpool list invalid -pHj -o all")
    assert "Datasets are not in the correct format" in str(excinfo.value)

View File

@@ -12,15 +12,3 @@ command = "nixfmt"
#options = []
# Glob pattern of files to include
includes = ["*.nix"]
[formatter.ruff-format]
command = "ruff"
options = ["format"]
includes = ["python/**/*.py"]
priority = 0
[formatter.ruff]
command = "ruff"
options = ["check", "--fix"]
includes = ["python/**/*.py"]
priority = 1

View File

@@ -1,26 +0,0 @@
{
  pkgs,
  ...
}:
{
  # NixOS user account for dov (fixed uid/gid 1004 so file ownership is
  # stable across hosts); login is via the listed SSH key.
  users = {
    users.dov = {
      isNormalUser = true;
      shell = pkgs.zsh;
      group = "dov";
      openssh.authorizedKeys.keys = [
        "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABgQCoSBmMfvp6aThkrfkLQ4TDwZJD0HCC0lsTIRNREIAWeduNkVFdkO3T1SMLmCKJ+zlL9xziNBEqB8NFl4TabAhptIGlKgTOc8C9eDaPQMQV8PB/4DxZhCt7O2qb4Vfcz82IHPtdwwaIsGpssgh81mQ4tPtP8BI0TluRBy+2v095s20j+PYRGrAXJtOWpVY9zaXxEJ8dXDhHDF2xzxvUcwu9NsoY8t+4/ZJ1mrTTG+eGp5gLAKnSVhAGgsmqCY577Nkso1jSzKer1XcCiaGIedpLuTzmUbOFFEVhhTSy+Ki1NLEcjGW2e6Vyg5Pm8VGN75MqyDZsi5igv9Grxq62EHQ4mFea9rns48B74O/bNQ1OoLVO9u/HwdLCgBTZzptrCmNwML6kBWrmCogoH3ueVbHwDCW5kTTMBCoVV+HaZ+qDWu7xZVx49MuCK29QGZj/IrN0N7h78KL0NYajdei87R0mcmWGP0YaJTdCQ4iKi9c77BUYQp+Qpqt+mnIX0cgjQOU= dkruger@kzin" # cspell:disable-line
      ];
      # Supplementary groups for sound, display, and shared files.
      extraGroups = [
        "audio"
        "video"
        "users"
      ];
      uid = 1004;
    };
    groups.dov.gid = 1004;
  };
}

View File

@@ -1,9 +0,0 @@
{
  # Aggregator for shell/CLI home-manager modules.
  imports = [
    ./direnv.nix
    ./git.nix
    ./zsh.nix
  ];
  # Starship cross-shell prompt.
  programs.starship.enable = true;
}

View File

@@ -1,8 +0,0 @@
{
  # direnv with nix-direnv for cached nix dev environments, hooked into zsh.
  programs.direnv = {
    enable = true;
    enableZshIntegration = true;
    nix-direnv.enable = true;
  };
}

View File

@@ -1,12 +0,0 @@
{
  # Git identity and defaults.
  programs.git = {
    enable = true;
    userEmail = "dov.kruger@gmail.com";
    userName = "Dov Kruger";
    extraConfig = {
      # Rebase local commits on pull instead of creating merge commits.
      pull.rebase = true;
      color.ui = true;
    };
    # Git LFS support for large binary files.
    lfs.enable = true;
  };
}

View File

@@ -1,27 +0,0 @@
{
  # Zsh with oh-my-zsh plugins and eza-based listing aliases.
  programs.zsh = {
    enable = true;
    syntaxHighlighting.enable = true;
    history.size = 10000;
    oh-my-zsh = {
      enable = true;
      plugins = [
        "git"
        "docker"
        "docker-compose"
        "colored-man-pages"
        "rust"
        "systemd"
        "tmux"
        "ufw"
        "z"
      ];
    };
    # eza stands in for ls; aliases mirror common ls spellings.
    shellAliases = {
      "lrt" = "eza --icons -lsnew";
      "ls" = "eza";
      "ll" = "eza --long --group";
      "la" = "eza --all";
    };
  };
}

View File

@@ -1,22 +0,0 @@
{ config, ... }:
{
  # Shared home-manager baseline imported by per-system configs.
  imports = [
    ./cli
    ./programs.nix
    ./ssh_config.nix
  ];
  programs = {
    # Let home-manager manage its own installation.
    home-manager.enable = true;
    git.enable = true;
  };
  home = {
    username = "dov";
    # Derive the home directory from the username declared above.
    homeDirectory = "/home/${config.home.username}";
    # home-manager state format version; changing it can alter migrations.
    stateVersion = "24.05";
    sessionVariables = {
      # Flake location exported for CLI tooling — presumably consumed by
      # tools like `nh`; TODO confirm against the user's shell workflow.
      FLAKE = "$HOME/dotfiles";
    };
  };
}

View File

@@ -1,58 +0,0 @@
{ pkgs, ... }:
{
  # User-level package set, grouped by purpose.
  home.packages = with pkgs; [
    # cli: everyday terminal tools
    bat
    btop
    eza
    fd
    ffmpegthumbnailer
    fzf
    git
    gnupg
    imagemagick
    jq
    ncdu
    neofetch
    ouch
    p7zip
    poppler
    rar
    ripgrep
    starship
    tmux
    unzip
    yazi
    zoxide
    # system info: hardware and health inspection
    hwloc
    lynis
    pciutils
    smartmontools
    usbutils
    # networking: diagnostics and transfers
    iperf3
    nmap
    wget
    # python: packaging/lint tooling
    poetry
    ruff
    uv
    # nodejs
    nodejs
    # Rust packages: cargo ecosystem helpers
    trunk
    wasm-pack
    cargo-watch
    cargo-generate
    cargo-audit
    cargo-update
    # nix: authoring, inspection, and formatting tools
    nix-init
    nix-output-monitor
    nix-prefetch
    nix-tree
    nixfmt-rfc-style
    treefmt
  ];
}

View File

@@ -1,6 +0,0 @@
{
  # Manage ~/.ssh/config via home-manager.
  programs.ssh = {
    enable = true;
    # Skip home-manager's built-in default config block; any host settings
    # must be declared explicitly.
    enableDefaultConfig = false;
  };
}

View File

@@ -1,5 +0,0 @@
{
  # Per-system entry point that just reuses the shared home config.
  imports = [
    ../home/global.nix
  ];
}

View File

@@ -1,45 +0,0 @@
{
  pkgs,
  config,
  ...
}:
let
  # Keep only the groups that actually exist on this host, so the same
  # module can be shared between machines with different services enabled.
  ifTheyExist = groups: builtins.filter (group: builtins.hasAttr group config.users.groups) groups;
in
{
  # sops-encrypted password hash; must be decrypted before user creation.
  sops.secrets.elise_password = {
    sopsFile = ../secrets.yaml;
    neededForUsers = true;
  };
  users = {
    users.elise = {
      isNormalUser = true;
      hashedPasswordFile = "${config.sops.secrets.elise_password.path}";
      shell = pkgs.zsh;
      group = "elise";
      openssh.authorizedKeys.keys = [
        "ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIJYZFsc9CSH03ZUP7y81AHwSyjLwFmcshVFCyxDcYhBT rhapsody-in-green" # cspell:disable-line
      ];
      # Always-present groups, plus optional ones added only on hosts
      # that define them.
      extraGroups = [
        "audio"
        "video"
        "users"
      ]
      ++ ifTheyExist [
        "dialout"
        "networkmanager"
        "plugdev"
        "scanner"
      ];
      # Fixed uid/gid 1010 for stable ownership across hosts.
      uid = 1010;
    };
    groups.elise.gid = 1010;
  };
  # Per-host home-manager configuration selected by hostname.
  home-manager.users.elise = import ./systems/${config.networking.hostName}.nix;
}

Some files were not shown because too many files have changed in this diff Show More