cleaned up installer.py
treefmt / nix fmt (pull_request) Successful in 6s
pytest / pytest (pull_request) Successful in 31s
build_systems / build-brain (pull_request) Successful in 47s
build_systems / build-bob (pull_request) Successful in 51s
build_systems / build-leviathan (pull_request) Successful in 56s
build_systems / build-rhapsody-in-green (pull_request) Successful in 1m3s
build_systems / build-jeeves (pull_request) Successful in 2m39s

This commit is contained in:
2026-06-12 13:24:31 -04:00
parent 8d43fcbc09
commit 19312962f8
+53 -51
View File
@@ -1,3 +1,4 @@
# ruff: noqa: LOG015, E501, D102, D103, D107 These need the be fixed
"""Install NixOS on a ZFS pool.""" """Install NixOS on a ZFS pool."""
from __future__ import annotations from __future__ import annotations
@@ -19,6 +20,7 @@ if TYPE_CHECKING:
def configure_logger(level: str = "INFO") -> None: def configure_logger(level: str = "INFO") -> None:
"""Configure the logger. """Configure the logger.
Args: Args:
level (str, optional): The logging level. Defaults to "INFO". level (str, optional): The logging level. Defaults to "INFO".
""" """
@@ -32,15 +34,17 @@ def configure_logger(level: str = "INFO") -> None:
def bash_wrapper(command: str) -> str: def bash_wrapper(command: str) -> str:
"""Execute a bash command and capture the output. """Execute a bash command and capture the output.
Args: Args:
command (str): The bash command to be executed. command (str): The bash command to be executed.
Returns: Returns:
Tuple[str, int]: A tuple containing the output of the command (stdout) as a string, Tuple[str, int]: A tuple containing the output of the command (stdout) as a string,
the error output (stderr) as a string (optional), and the return code as an integer. the error output (stderr) as a string (optional), and the return code as an integer.
""" """
logging.debug(f"running {command=}") logging.debug(f"running {command=}")
# This is a acceptable risk # This is a acceptable risk
process = Popen(command.split(), stdout=PIPE, stderr=PIPE) # noqa: S603 process = Popen(command.split(), stdout=PIPE, stderr=PIPE)
output, _ = process.communicate() output, _ = process.communicate()
if process.returncode != 0: if process.returncode != 0:
error = f"Failed to run command {command=} return code {process.returncode=}" error = f"Failed to run command {command=} return code {process.returncode=}"
@@ -51,6 +55,7 @@ def bash_wrapper(command: str) -> str:
def partition_disk(disk: str, swap_size: int, reserve: int = 0) -> None: def partition_disk(disk: str, swap_size: int, reserve: int = 0) -> None:
"""Partition a disk. """Partition a disk.
Args: Args:
disk (str): The disk to partition. disk (str): The disk to partition.
swap_size (int): The size of the swap partition in GB. swap_size (int): The size of the swap partition in GB.
@@ -92,8 +97,9 @@ def partition_disk(disk: str, swap_size: int, reserve: int = 0) -> None:
def create_zfs_pool(pool_disks: Sequence[str], mnt_dir: str) -> None: def create_zfs_pool(pool_disks: Sequence[str], mnt_dir: str) -> None:
"""Create a ZFS pool. """Create a ZFS pool.
Args: Args:
disks (Sequence[str]): A tuple of disks to use for the pool. pool_disks (Sequence[str]): A tuple of disks to use for the pool.
mnt_dir (str): The mount directory. mnt_dir (str): The mount directory.
""" """
if len(pool_disks) <= 0: if len(pool_disks) <= 0:
@@ -131,7 +137,6 @@ def create_zfs_pool(pool_disks: Sequence[str], mnt_dir: str) -> None:
def create_zfs_datasets() -> None: def create_zfs_datasets() -> None:
"""Create ZFS datasets.""" """Create ZFS datasets."""
bash_wrapper("zfs create -o canmount=noauto -o reservation=10G root_pool/root") bash_wrapper("zfs create -o canmount=noauto -o reservation=10G root_pool/root")
bash_wrapper("zfs create root_pool/home") bash_wrapper("zfs create root_pool/home")
bash_wrapper("zfs create root_pool/var -o reservation=1G") bash_wrapper("zfs create root_pool/var -o reservation=1G")
@@ -160,6 +165,9 @@ def get_cpu_manufacturer() -> str:
if "vendor_id" in line: if "vendor_id" in line:
return id_vendor[line.split(": ")[1].strip()] return id_vendor[line.split(": ")[1].strip()]
error = "CPU manufacturer not found"
raise RuntimeError(error)
def get_boot_drive_id(disk: str) -> str: def get_boot_drive_id(disk: str) -> str:
"""Get the boot drive ID.""" """Get the boot drive ID."""
@@ -167,9 +175,8 @@ def get_boot_drive_id(disk: str) -> str:
return output.splitlines()[1] return output.splitlines()[1]
def create_nix_hardware_file(mnt_dir: str, disks: Sequence[str], encrypt: bool) -> None: def create_nix_hardware_file(mnt_dir: str, disks: Sequence[str], *, encrypt: bool) -> None:
"""Create a NixOS hardware file.""" """Create a NixOS hardware file."""
cpu_manufacturer = get_cpu_manufacturer() cpu_manufacturer = get_cpu_manufacturer()
devices = "" devices = ""
@@ -219,7 +226,7 @@ def create_nix_hardware_file(mnt_dir: str, disks: Sequence[str], encrypt: bool)
Path(f"{mnt_dir}/etc/nixos/hardware-configuration.nix").write_text(nix_hardware) Path(f"{mnt_dir}/etc/nixos/hardware-configuration.nix").write_text(nix_hardware)
def install_nixos(mnt_dir: str, disks: Sequence[str], encrypt: bool) -> None: def install_nixos(mnt_dir: str, disks: Sequence[str], *, encrypt: bool) -> None:
"""Install NixOS.""" """Install NixOS."""
bash_wrapper(f"mount -o X-mount.mkdir -t zfs root_pool/root {mnt_dir}") bash_wrapper(f"mount -o X-mount.mkdir -t zfs root_pool/root {mnt_dir}")
bash_wrapper(f"mount -o X-mount.mkdir -t zfs root_pool/home {mnt_dir}/home") bash_wrapper(f"mount -o X-mount.mkdir -t zfs root_pool/home {mnt_dir}/home")
@@ -230,14 +237,16 @@ def install_nixos(mnt_dir: str, disks: Sequence[str], encrypt: bool) -> None:
bash_wrapper(f"mkfs.vfat -n EFI {disk}-part1") bash_wrapper(f"mkfs.vfat -n EFI {disk}-part1")
# set up mirroring afterwards if more than one disk # set up mirroring afterwards if more than one disk
boot_partition = f"mount -t vfat -o fmask=0077,dmask=0077,iocharset=iso8859-1,X-mount.mkdir {disks[0]}-part1 {mnt_dir}/boot" boot_partition = (
f"mount -t vfat -o fmask=0077,dmask=0077,iocharset=iso8859-1,X-mount.mkdir {disks[0]}-part1 {mnt_dir}/boot"
)
bash_wrapper(boot_partition) bash_wrapper(boot_partition)
bash_wrapper(f"nixos-generate-config --root {mnt_dir}") bash_wrapper(f"nixos-generate-config --root {mnt_dir}")
create_nix_hardware_file(mnt_dir, disks, encrypt) create_nix_hardware_file(mnt_dir, disks, encrypt=encrypt)
run(("nixos-install", "--root", mnt_dir), check=True) # noqa: S603 run(("nixos-install", "--root", mnt_dir), check=True)
def installer( def installer(
@@ -258,16 +267,14 @@ def installer(
f'printf "{encrypt_key}" | cryptsetup luksFormat --type luks2 {disk}-part2 -', f'printf "{encrypt_key}" | cryptsetup luksFormat --type luks2 {disk}-part2 -',
f'printf "{encrypt_key}" | cryptsetup luksOpen {disk}-part2 luks-root-pool-{disk.split("/")[-1]}-part2 -', f'printf "{encrypt_key}" | cryptsetup luksOpen {disk}-part2 luks-root-pool-{disk.split("/")[-1]}-part2 -',
): ):
run(command, shell=True, check=True) run(command, shell=True, check=True) # noqa: S602
mnt_dir = "/tmp/nix_install" # noqa: S108 mnt_dir = "/tmp/nix_install" # noqa: S108
Path(mnt_dir).mkdir(parents=True, exist_ok=True) Path(mnt_dir).mkdir(parents=True, exist_ok=True)
if encrypt_key: if encrypt_key:
pool_disks = [ pool_disks = [f"/dev/mapper/luks-root-pool-{disk.split('/')[-1]}-part2" for disk in disks]
f"/dev/mapper/luks-root-pool-{disk.split('/')[-1]}-part2" for disk in disks
]
else: else:
pool_disks = [f"{disk}-part2" for disk in disks] pool_disks = [f"{disk}-part2" for disk in disks]
@@ -275,22 +282,24 @@ def installer(
create_zfs_datasets() create_zfs_datasets()
install_nixos(mnt_dir, disks, encrypt_key) install_nixos(mnt_dir, disks, encrypt=encrypt_key)
logging.info("Installation complete") logging.info("Installation complete")
class Cursor: class Cursor:
def __init__(self): """Cursor class to store the cursor position."""
def __init__(self) -> None:
self.x_position = 0 self.x_position = 0
self.y_position = 0 self.y_position = 0
self.height = 0 self.height = 0
self.width = 0 self.width = 0
def set_height(self, height: int): def set_height(self, height: int) -> None:
self.height = height self.height = height
def set_width(self, width: int): def set_width(self, width: int) -> None:
self.width = width self.width = width
def x_bounce_check(self, cursor: int) -> int: def x_bounce_check(self, cursor: int) -> int:
@@ -301,10 +310,10 @@ class Cursor:
cursor = max(0, cursor) cursor = max(0, cursor)
return min(self.height - 1, cursor) return min(self.height - 1, cursor)
def set_x(self, x: int): def set_x(self, x: int) -> None:
self.x_position = self.x_bounce_check(x) self.x_position = self.x_bounce_check(x)
def set_y(self, y: int): def set_y(self, y: int) -> None:
self.y_position = self.y_bounce_check(y) self.y_position = self.y_bounce_check(y)
def get_x(self) -> int: def get_x(self) -> int:
@@ -313,16 +322,16 @@ class Cursor:
def get_y(self) -> int: def get_y(self) -> int:
return self.y_position return self.y_position
def move_up(self): def move_up(self) -> None:
self.set_y(self.y_position - 1) self.set_y(self.y_position - 1)
def move_down(self): def move_down(self) -> None:
self.set_y(self.y_position + 1) self.set_y(self.y_position + 1)
def move_left(self): def move_left(self) -> None:
self.set_x(self.x_position - 1) self.set_x(self.x_position - 1)
def move_right(self): def move_right(self) -> None:
self.set_x(self.x_position + 1) self.set_x(self.x_position + 1)
def navigation(self, key: int) -> None: def navigation(self, key: int) -> None:
@@ -339,7 +348,7 @@ class Cursor:
class State: class State:
"""State class to store the state of the program.""" """State class to store the state of the program."""
def __init__(self): def __init__(self) -> None:
self.key = 0 self.key = 0
self.cursor = Cursor() self.cursor = Cursor()
@@ -358,10 +367,7 @@ class State:
def get_device(raw_device: str) -> dict[str, str]: def get_device(raw_device: str) -> dict[str, str]:
raw_device_components = raw_device.split(" ") raw_device_components = raw_device.split(" ")
return { return {thing.split("=")[0].lower(): thing.split("=")[1].strip('"') for thing in raw_device_components}
thing.split("=")[0].lower(): thing.split("=")[1].strip('"')
for thing in raw_device_components
}
def get_devices() -> list[dict[str, str]]: def get_devices() -> list[dict[str, str]]:
@@ -373,6 +379,7 @@ def get_devices() -> list[dict[str, str]]:
def get_device_id_mapping() -> dict[str, set[str]]: def get_device_id_mapping() -> dict[str, set[str]]:
"""Get a list of device ids. """Get a list of device ids.
Returns: Returns:
list[str]: the list of device ids list[str]: the list of device ids
""" """
@@ -387,9 +394,7 @@ def get_device_id_mapping() -> dict[str, set[str]]:
return device_id_mapping return device_id_mapping
def calculate_device_menu_padding( def calculate_device_menu_padding(devices: list[dict[str, str]], column: str, padding: int = 0) -> int:
devices: list[dict[str, str]], column: str, padding: int = 0
) -> int:
return max(len(device[column]) for device in devices) + padding return max(len(device[column]) for device in devices) + padding
@@ -430,7 +435,8 @@ def draw_device_menu(
menu_start_y: int = 0, menu_start_y: int = 0,
menu_start_x: int = 0, menu_start_x: int = 0,
) -> State: ) -> State:
"""draw the device menu and handle user input """Draw the device menu and handle user input.
Args: Args:
std_screen (curses.window): the curses window to draw on std_screen (curses.window): the curses window to draw on
devices (list[dict[str, str]]): the list of devices to draw devices (list[dict[str, str]]): the list of devices to draw
@@ -438,6 +444,7 @@ def draw_device_menu(
state (State): the state object to update state (State): the state object to update
menu_start_y (int, optional): the y position to start drawing the menu. Defaults to 0. menu_start_y (int, optional): the y position to start drawing the menu. Defaults to 0.
menu_start_x (int, optional): the x position to start drawing the menu. Defaults to 0. menu_start_x (int, optional): the x position to start drawing the menu. Defaults to 0.
Returns: Returns:
State: the updated state object State: the updated state object
""" """
@@ -448,7 +455,9 @@ def draw_device_menu(
type_padding = calculate_device_menu_padding(devices, "type", padding) type_padding = calculate_device_menu_padding(devices, "type", padding)
mountpoints_padding = calculate_device_menu_padding(devices, "mountpoints", padding) mountpoints_padding = calculate_device_menu_padding(devices, "mountpoints", padding)
device_header = f"{'Name':{name_padding}}{'Size':{size_padding}}{'Type':{type_padding}}{'Mountpoints':{mountpoints_padding}}" device_header = (
f"{'Name':{name_padding}}{'Size':{size_padding}}{'Type':{type_padding}}{'Mountpoints':{mountpoints_padding}}"
)
menu_width = range(menu_start_x, len(device_header) + menu_start_x) menu_width = range(menu_start_x, len(device_header) + menu_start_x)
@@ -482,7 +491,7 @@ def draw_device_menu(
def debug_menu(std_screen: curses.window, key: int) -> None: def debug_menu(std_screen: curses.window, key: int) -> None:
height, width = std_screen.getmaxyx() height, width = std_screen.getmaxyx()
width_height = "Width: {}, Height: {}".format(width, height) width_height = f"Width: {width}, Height: {height}"
std_screen.addstr(height - 4, 0, width_height, curses.color_pair(5)) std_screen.addstr(height - 4, 0, width_height, curses.color_pair(5))
key_pressed = f"Last key pressed: {key}"[: width - 1] key_pressed = f"Last key pressed: {key}"[: width - 1]
@@ -490,7 +499,7 @@ def debug_menu(std_screen: curses.window, key: int) -> None:
key_pressed = "No key press detected..."[: width - 1] key_pressed = "No key press detected..."[: width - 1]
std_screen.addstr(height - 3, 0, key_pressed) std_screen.addstr(height - 3, 0, key_pressed)
for i in range(0, 8): for i in range(8):
std_screen.addstr(height - 2, i * 3, f"{i}██", curses.color_pair(i)) std_screen.addstr(height - 2, i * 3, f"{i}██", curses.color_pair(i))
@@ -503,9 +512,7 @@ def status_bar(
std_screen.attron(curses.A_REVERSE) std_screen.attron(curses.A_REVERSE)
std_screen.attron(curses.color_pair(3)) std_screen.attron(curses.color_pair(3))
status_bar = ( status_bar = f"Press 'q' to exit | STATUS BAR | Pos: {cursor.get_x()}, {cursor.get_y()}"
f"Press 'q' to exit | STATUS BAR | Pos: {cursor.get_x()}, {cursor.get_y()}"
)
std_screen.addstr(height - 1, 0, status_bar) std_screen.addstr(height - 1, 0, status_bar)
std_screen.addstr(height - 1, len(status_bar), " " * (width - len(status_bar) - 1)) std_screen.addstr(height - 1, len(status_bar), " " * (width - len(status_bar) - 1))
@@ -516,7 +523,7 @@ def status_bar(
def set_color() -> None: def set_color() -> None:
curses.start_color() curses.start_color()
curses.use_default_colors() curses.use_default_colors()
for i in range(0, curses.COLORS): for i in range(curses.COLORS):
curses.init_pair(i + 1, i, -1) curses.init_pair(i + 1, i, -1)
@@ -528,10 +535,10 @@ def get_text_input(std_screen: curses.window, prompt: str, y: int, x: int) -> st
key = std_screen.getch() key = std_screen.getch()
if key == ord("\n"): if key == ord("\n"):
break break
elif key == 27: # ESC key if key == 27: # ESC key # noqa: PLR2004
input_str = "" input_str = ""
break break
elif key in (curses.KEY_BACKSPACE, ord("\b"), 127): if key in (curses.KEY_BACKSPACE, ord("\b"), 127):
input_str = input_str[:-1] input_str = input_str[:-1]
std_screen.addstr(y, x + len(prompt), input_str + " ") std_screen.addstr(y, x + len(prompt), input_str + " ")
else: else:
@@ -557,9 +564,7 @@ def swap_size_input(
state.swap_size = int(swap_size_str) state.swap_size = int(swap_size_str)
state.show_swap_input = False state.show_swap_input = False
except ValueError: except ValueError:
std_screen.addstr( std_screen.addstr(swap_offset, 0, "Invalid input. Press any key to continue.")
swap_offset, 0, "Invalid input. Press any key to continue."
)
std_screen.getch() std_screen.getch()
state.show_swap_input = False state.show_swap_input = False
@@ -577,16 +582,12 @@ def reserve_size_input(
state.show_reserve_input = True state.show_reserve_input = True
if state.show_reserve_input: if state.show_reserve_input:
reserve_size_str = get_text_input( reserve_size_str = get_text_input(std_screen, reserve_size_text, reserve_offset, 0)
std_screen, reserve_size_text, reserve_offset, 0
)
try: try:
state.reserve_size = int(reserve_size_str) state.reserve_size = int(reserve_size_str)
state.show_reserve_input = False state.show_reserve_input = False
except ValueError: except ValueError:
std_screen.addstr( std_screen.addstr(reserve_offset, 0, "Invalid input. Press any key to continue.")
reserve_offset, 0, "Invalid input. Press any key to continue."
)
std_screen.getch() std_screen.getch()
state.show_reserve_input = False state.show_reserve_input = False
@@ -594,7 +595,8 @@ def reserve_size_input(
def draw_menu(std_screen: curses.window) -> State: def draw_menu(std_screen: curses.window) -> State:
"""draw the menu and handle user input """Draw the menu and handle user input.
Args: Args:
std_screen (curses.window): the curses window to draw on std_screen (curses.window): the curses window to draw on
Returns: Returns: