converted to device ids instead of device aliases

This commit is contained in:
2025-01-05 10:49:00 -05:00
parent d991f94d7e
commit b3be9dd2c8

View File

@@ -5,6 +5,7 @@ from __future__ import annotations
import curses import curses
import logging import logging
import sys import sys
from collections import defaultdict
from os import getenv from os import getenv
from pathlib import Path from pathlib import Path
from random import getrandbits from random import getrandbits
@@ -138,7 +139,12 @@ def create_zfs_datasets() -> None:
datasets = bash_wrapper("zfs list -o name") datasets = bash_wrapper("zfs list -o name")
expected_datasets = {"root_pool/root", "root_pool/home", "root_pool/var"} expected_datasets = {
"root_pool/root",
"root_pool/home",
"root_pool/var",
"root_pool/nix",
}
missing_datasets = expected_datasets.difference(datasets.splitlines()) missing_datasets = expected_datasets.difference(datasets.splitlines())
if missing_datasets: if missing_datasets:
logging.critical(f"Failed to create pools {missing_datasets}") logging.critical(f"Failed to create pools {missing_datasets}")
@@ -202,13 +208,12 @@ def create_nix_hardware_file(mnt_dir: str, disks: Sequence[str], encrypt: bool)
' "/nix" = {\n device = "root_pool/var";\n fsType = "zfs";\n };\n\n' ' "/nix" = {\n device = "root_pool/var";\n fsType = "zfs";\n };\n\n'
' "/boot" = {\n' ' "/boot" = {\n'
f" device = /dev/disk/by-uuid/{get_boot_drive_id(disks[0])};\n" f" device = /dev/disk/by-uuid/{get_boot_drive_id(disks[0])};\n"
' fsType = "vfat";\n' ' fsType = "vfat";\n options = [\n "fmask=0077"\n "dmask=0077"\n ];\n };\n };\n\n'
' options = [\n "fmask=0077"\n "dmask=0077"\n ];\n };\n };\n\n'
" swapDevices = [ ];\n\n" " swapDevices = [ ];\n\n"
" networking.useDHCP = lib.mkDefault true;\n\n" " networking.useDHCP = lib.mkDefault true;\n\n"
' nixpkgs.hostPlatform = lib.mkDefault "x86_64-linux";\n' ' nixpkgs.hostPlatform = lib.mkDefault "x86_64-linux";\n'
f" hardware.cpu.{cpu_manufacturer}.updateMicrocode = lib.mkDefault config.hardware.enableRedistributableFirmware;\n" f" hardware.cpu.{cpu_manufacturer}.updateMicrocode = lib.mkDefault config.hardware.enableRedistributableFirmware;\n"
f'networking.hostId = "{host_id}";\n' f' networking.hostId = "{host_id}";\n'
"}\n" "}\n"
) )
@@ -331,6 +336,8 @@ class Cursor:
class State: class State:
"""State class to store the state of the program."""
def __init__(self): def __init__(self):
self.key = 0 self.key = 0
self.cursor = Cursor() self.cursor = Cursor()
@@ -341,7 +348,11 @@ class State:
self.reserve_size = 0 self.reserve_size = 0
self.show_reserve_input = False self.show_reserve_input = False
self.selected_devices = set() self.selected_device_ids = set()
def get_selected_devices(self) -> tuple[str]:
"""Get selected devices."""
return tuple(self.selected_device_ids)
def get_device(raw_device: str) -> dict[str, str]: def get_device(raw_device: str) -> dict[str, str]:
@@ -359,19 +370,76 @@ def get_devices() -> list[dict[str, str]]:
return [get_device(raw_device) for raw_device in raw_devices] return [get_device(raw_device) for raw_device in raw_devices]
def get_device_id_mapping() -> dict[str, set[str]]:
"""Get a list of device ids.
Returns:
list[str]: the list of device ids
"""
device_ids = bash_wrapper("find /dev/disk/by-id -type l").splitlines()
device_id_mapping: dict[str, set[str]] = defaultdict(set)
for device_id in device_ids:
device = bash_wrapper(f"readlink -f {device_id}").strip()
device_id_mapping[device].add(device_id)
return device_id_mapping
def calculate_device_menu_padding( def calculate_device_menu_padding(
devices: list[dict[str, str]], column: str, padding: int = 0 devices: list[dict[str, str]], column: str, padding: int = 0
) -> int: ) -> int:
return max(len(device[column]) for device in devices) + padding return max(len(device[column]) for device in devices) + padding
def draw_device_ids(
state: State,
row_number: int,
menu_start_x: int,
std_screen: curses.window,
menu_width: list[int],
device_ids: set[str],
) -> tuple[State, int]:
for device_id in sorted(device_ids):
row_number = row_number + 1
if row_number == state.cursor.get_y() and state.cursor.get_x() in menu_width:
std_screen.attron(curses.A_BOLD)
if state.key == ord(" "):
if device_id not in state.selected_device_ids:
state.selected_device_ids.add(device_id)
else:
state.selected_device_ids.remove(device_id)
if device_id in state.selected_device_ids:
std_screen.attron(curses.color_pair(7))
std_screen.addstr(row_number, menu_start_x, f" {device_id}")
std_screen.attroff(curses.color_pair(7))
std_screen.attroff(curses.A_BOLD)
return state, row_number
def draw_device_menu( def draw_device_menu(
std_screen: curses.window, std_screen: curses.window,
devices: list[dict[str, str]], devices: list[dict[str, str]],
device_id_mapping: dict[str, set[str]],
state: State, state: State,
menu_start_y: int = 0, menu_start_y: int = 0,
menu_start_x: int = 0, menu_start_x: int = 0,
) -> State: ) -> State:
"""draw the device menu and handle user input
Args:
std_screen (curses.window): the curses window to draw on
devices (list[dict[str, str]]): the list of devices to draw
device_id_mapping (dict[str, set[str]]): the list of device ids to draw
state (State): the state object to update
menu_start_y (int, optional): the y position to start drawing the menu. Defaults to 0.
menu_start_x (int, optional): the x position to start drawing the menu. Defaults to 0.
Returns:
State: the updated state object
"""
padding = 2 padding = 2
name_padding = calculate_device_menu_padding(devices, "name", padding) name_padding = calculate_device_menu_padding(devices, "name", padding)
@@ -386,32 +454,29 @@ def draw_device_menu(
std_screen.addstr(menu_start_y, menu_start_x, device_header, curses.color_pair(5)) std_screen.addstr(menu_start_y, menu_start_x, device_header, curses.color_pair(5))
devises_list_start = menu_start_y + 1 devises_list_start = menu_start_y + 1
for index, device in enumerate(devices): row_number = devises_list_start
device_row_y = devises_list_start + index
for device in devices:
row_number = row_number + 1
device_name = device["name"]
device_row = ( device_row = (
f"{device['name']:{name_padding}}" f"{device_name:{name_padding}}"
f"{device['size']:{size_padding}}" f"{device['size']:{size_padding}}"
f"{device['type']:{type_padding}}" f"{device['type']:{type_padding}}"
f"{device['mountpoints']:{mountpoints_padding}}" f"{device['mountpoints']:{mountpoints_padding}}"
) )
std_screen.addstr(row_number, menu_start_x, device_row)
if device_row_y == state.cursor.get_y() and state.cursor.get_x() in menu_width: state, row_number = draw_device_ids(
std_screen.attron(curses.A_BOLD) state=state,
if state.key == ord(" "): row_number=row_number,
if device["name"] not in state.selected_devices: menu_start_x=menu_start_x,
state.selected_devices.add(device["name"]) std_screen=std_screen,
else: menu_width=menu_width,
state.selected_devices.remove(device["name"]) device_ids=device_id_mapping[device_name],
)
if device["name"] in state.selected_devices: return state, row_number
std_screen.attron(curses.color_pair(7))
std_screen.addstr(device_row_y, menu_start_x, device_row)
std_screen.attroff(curses.color_pair(7))
std_screen.attroff(curses.A_BOLD)
return state
def debug_menu(std_screen: curses.window, key: int) -> None: def debug_menu(std_screen: curses.window, key: int) -> None:
@@ -528,6 +593,12 @@ def reserve_size_input(
def draw_menu(std_screen: curses.window) -> State: def draw_menu(std_screen: curses.window) -> State:
"""draw the menu and handle user input
Args:
std_screen (curses.window): the curses window to draw on
Returns:
State: the state object
"""
# Clear and refresh the screen for a blank canvas # Clear and refresh the screen for a blank canvas
std_screen.clear() std_screen.clear()
std_screen.refresh() std_screen.refresh()
@@ -538,6 +609,8 @@ def draw_menu(std_screen: curses.window) -> State:
devices = get_devices() devices = get_devices()
device_id_mapping = get_device_id_mapping()
# Loop where k is the last character pressed # Loop where k is the last character pressed
while state.key != ord("q"): while state.key != ord("q"):
std_screen.clear() std_screen.clear()
@@ -548,15 +621,14 @@ def draw_menu(std_screen: curses.window) -> State:
state.cursor.navigation(state.key) state.cursor.navigation(state.key)
state = draw_device_menu( state, device_menu_size = draw_device_menu(
std_screen=std_screen, std_screen=std_screen,
state=state, state=state,
devices=devices, devices=devices,
device_id_mapping=device_id_mapping,
) )
device_count = len(devices) swap_offset = device_menu_size + 2
swap_offset = device_count + 2
swap_size_input( swap_size_input(
std_screen=std_screen, std_screen=std_screen,
@@ -590,13 +662,15 @@ def main() -> None:
encrypt_key = getenv("ENCRYPT_KEY") encrypt_key = getenv("ENCRYPT_KEY")
logging.info("installing_nixos") logging.info("installing_nixos")
logging.info(f"disks: {state.selected_devices}") logging.info(f"disks: {state.selected_device_ids}")
logging.info(f"swap_size: {state.swap_size}") logging.info(f"swap_size: {state.swap_size}")
logging.info(f"reserve: {state.reserve_size}") logging.info(f"reserve: {state.reserve_size}")
logging.info(f"encrypted: {bool(encrypt_key)}") logging.info(f"encrypted: {bool(encrypt_key)}")
sleep(3)
installer( installer(
disks=state.selected_devices, disks=state.get_selected_devices(),
swap_size=state.swap_size, swap_size=state.swap_size,
reserve=state.reserve_size, reserve=state.reserve_size,
encrypt_key=encrypt_key, encrypt_key=encrypt_key,