From bcd855cb880b45bc6910593930d7db09abaccf01 Mon Sep 17 00:00:00 2001 From: Richie Cahill Date: Fri, 12 Jun 2026 13:24:31 -0400 Subject: [PATCH] cleaned up installer.py --- python/tools/installer.py | 104 +++++++++++++++++++------------------- 1 file changed, 53 insertions(+), 51 deletions(-) diff --git a/python/tools/installer.py b/python/tools/installer.py index 6b2bf91..cb87c92 100644 --- a/python/tools/installer.py +++ b/python/tools/installer.py @@ -1,3 +1,4 @@ +# ruff: noqa: LOG015, E501, D102, D103, D107 These need the be fixed """Install NixOS on a ZFS pool.""" from __future__ import annotations @@ -19,6 +20,7 @@ if TYPE_CHECKING: def configure_logger(level: str = "INFO") -> None: """Configure the logger. + Args: level (str, optional): The logging level. Defaults to "INFO". """ @@ -32,15 +34,17 @@ def configure_logger(level: str = "INFO") -> None: def bash_wrapper(command: str) -> str: """Execute a bash command and capture the output. + Args: command (str): The bash command to be executed. + Returns: Tuple[str, int]: A tuple containing the output of the command (stdout) as a string, the error output (stderr) as a string (optional), and the return code as an integer. """ logging.debug(f"running {command=}") # This is a acceptable risk - process = Popen(command.split(), stdout=PIPE, stderr=PIPE) # noqa: S603 + process = Popen(command.split(), stdout=PIPE, stderr=PIPE) output, _ = process.communicate() if process.returncode != 0: error = f"Failed to run command {command=} return code {process.returncode=}" @@ -51,6 +55,7 @@ def bash_wrapper(command: str) -> str: def partition_disk(disk: str, swap_size: int, reserve: int = 0) -> None: """Partition a disk. + Args: disk (str): The disk to partition. swap_size (int): The size of the swap partition in GB. @@ -92,8 +97,9 @@ def partition_disk(disk: str, swap_size: int, reserve: int = 0) -> None: def create_zfs_pool(pool_disks: Sequence[str], mnt_dir: str) -> None: """Create a ZFS pool. + Args: - disks (Sequence[str]): A tuple of disks to use for the pool. + pool_disks (Sequence[str]): A tuple of disks to use for the pool. mnt_dir (str): The mount directory. """ if len(pool_disks) <= 0: @@ -131,7 +137,6 @@ def create_zfs_pool(pool_disks: Sequence[str], mnt_dir: str) -> None: def create_zfs_datasets() -> None: """Create ZFS datasets.""" - bash_wrapper("zfs create -o canmount=noauto -o reservation=10G root_pool/root") bash_wrapper("zfs create root_pool/home") bash_wrapper("zfs create root_pool/var -o reservation=1G") @@ -160,6 +165,9 @@ def get_cpu_manufacturer() -> str: if "vendor_id" in line: return id_vendor[line.split(": ")[1].strip()] + error = "CPU manufacturer not found" + raise RuntimeError(error) + def get_boot_drive_id(disk: str) -> str: """Get the boot drive ID.""" @@ -167,9 +175,8 @@ def get_boot_drive_id(disk: str) -> str: return output.splitlines()[1] -def create_nix_hardware_file(mnt_dir: str, disks: Sequence[str], encrypt: bool) -> None: +def create_nix_hardware_file(mnt_dir: str, disks: Sequence[str], *, encrypt: bool) -> None: """Create a NixOS hardware file.""" - cpu_manufacturer = get_cpu_manufacturer() devices = "" @@ -219,7 +226,7 @@ def create_nix_hardware_file(mnt_dir: str, disks: Sequence[str], encrypt: bool) Path(f"{mnt_dir}/etc/nixos/hardware-configuration.nix").write_text(nix_hardware) -def install_nixos(mnt_dir: str, disks: Sequence[str], encrypt: bool) -> None: +def install_nixos(mnt_dir: str, disks: Sequence[str], *, encrypt: bool) -> None: """Install NixOS.""" bash_wrapper(f"mount -o X-mount.mkdir -t zfs root_pool/root {mnt_dir}") bash_wrapper(f"mount -o X-mount.mkdir -t zfs root_pool/home {mnt_dir}/home") @@ -230,14 +237,16 @@ def install_nixos(mnt_dir: str, disks: Sequence[str], encrypt: bool) -> None: bash_wrapper(f"mkfs.vfat -n EFI {disk}-part1") # set up mirroring afterwards if more than one disk - boot_partition = f"mount -t vfat -o fmask=0077,dmask=0077,iocharset=iso8859-1,X-mount.mkdir {disks[0]}-part1 {mnt_dir}/boot" + boot_partition = ( + f"mount -t vfat -o fmask=0077,dmask=0077,iocharset=iso8859-1,X-mount.mkdir {disks[0]}-part1 {mnt_dir}/boot" + ) bash_wrapper(boot_partition) bash_wrapper(f"nixos-generate-config --root {mnt_dir}") - create_nix_hardware_file(mnt_dir, disks, encrypt) + create_nix_hardware_file(mnt_dir, disks, encrypt=encrypt) - run(("nixos-install", "--root", mnt_dir), check=True) # noqa: S603 + run(("nixos-install", "--root", mnt_dir), check=True) def installer( @@ -258,16 +267,14 @@ def installer( f'printf "{encrypt_key}" | cryptsetup luksFormat --type luks2 {disk}-part2 -', f'printf "{encrypt_key}" | cryptsetup luksOpen {disk}-part2 luks-root-pool-{disk.split("/")[-1]}-part2 -', ): - run(command, shell=True, check=True) + run(command, shell=True, check=True) # noqa: S602 mnt_dir = "/tmp/nix_install" # noqa: S108 Path(mnt_dir).mkdir(parents=True, exist_ok=True) if encrypt_key: - pool_disks = [ - f"/dev/mapper/luks-root-pool-{disk.split('/')[-1]}-part2" for disk in disks - ] + pool_disks = [f"/dev/mapper/luks-root-pool-{disk.split('/')[-1]}-part2" for disk in disks] else: pool_disks = [f"{disk}-part2" for disk in disks] @@ -275,22 +282,24 @@ def installer( create_zfs_datasets() - install_nixos(mnt_dir, disks, encrypt_key) + install_nixos(mnt_dir, disks, encrypt=encrypt_key) logging.info("Installation complete") class Cursor: - def __init__(self): + """Cursor class to store the cursor position.""" + + def __init__(self) -> None: self.x_position = 0 self.y_position = 0 self.height = 0 self.width = 0 - def set_height(self, height: int): + def set_height(self, height: int) -> None: self.height = height - def set_width(self, width: int): + def set_width(self, width: int) -> None: self.width = width def x_bounce_check(self, cursor: int) -> int: @@ -301,10 +310,10 @@ class Cursor: cursor = max(0, cursor) return min(self.height - 1, cursor) - def set_x(self, x: int): + def set_x(self, x: int) -> None: self.x_position = self.x_bounce_check(x) - def set_y(self, y: int): + def set_y(self, y: int) -> None: self.y_position = self.y_bounce_check(y) def get_x(self) -> int: @@ -313,16 +322,16 @@ class Cursor: def get_y(self) -> int: return self.y_position - def move_up(self): + def move_up(self) -> None: self.set_y(self.y_position - 1) - def move_down(self): + def move_down(self) -> None: self.set_y(self.y_position + 1) - def move_left(self): + def move_left(self) -> None: self.set_x(self.x_position - 1) - def move_right(self): + def move_right(self) -> None: self.set_x(self.x_position + 1) def navigation(self, key: int) -> None: @@ -339,7 +348,7 @@ class Cursor: class State: """State class to store the state of the program.""" - def __init__(self): + def __init__(self) -> None: self.key = 0 self.cursor = Cursor() @@ -358,10 +367,7 @@ class State: def get_device(raw_device: str) -> dict[str, str]: raw_device_components = raw_device.split(" ") - return { - thing.split("=")[0].lower(): thing.split("=")[1].strip('"') - for thing in raw_device_components - } + return {thing.split("=")[0].lower(): thing.split("=")[1].strip('"') for thing in raw_device_components} def get_devices() -> list[dict[str, str]]: @@ -373,6 +379,7 @@ def get_devices() -> list[dict[str, str]]: def get_device_id_mapping() -> dict[str, set[str]]: """Get a list of device ids. + Returns: list[str]: the list of device ids """ @@ -387,9 +394,7 @@ def get_device_id_mapping() -> dict[str, set[str]]: return device_id_mapping -def calculate_device_menu_padding( - devices: list[dict[str, str]], column: str, padding: int = 0 -) -> int: +def calculate_device_menu_padding(devices: list[dict[str, str]], column: str, padding: int = 0) -> int: return max(len(device[column]) for device in devices) + padding @@ -430,7 +435,8 @@ def draw_device_menu( menu_start_y: int = 0, menu_start_x: int = 0, ) -> State: - """draw the device menu and handle user input + """Draw the device menu and handle user input. + Args: std_screen (curses.window): the curses window to draw on devices (list[dict[str, str]]): the list of devices to draw @@ -438,6 +444,7 @@ def draw_device_menu( state (State): the state object to update menu_start_y (int, optional): the y position to start drawing the menu. Defaults to 0. menu_start_x (int, optional): the x position to start drawing the menu. Defaults to 0. + Returns: State: the updated state object """ @@ -448,7 +455,9 @@ def draw_device_menu( type_padding = calculate_device_menu_padding(devices, "type", padding) mountpoints_padding = calculate_device_menu_padding(devices, "mountpoints", padding) - device_header = f"{'Name':{name_padding}}{'Size':{size_padding}}{'Type':{type_padding}}{'Mountpoints':{mountpoints_padding}}" + device_header = ( + f"{'Name':{name_padding}}{'Size':{size_padding}}{'Type':{type_padding}}{'Mountpoints':{mountpoints_padding}}" + ) menu_width = range(menu_start_x, len(device_header) + menu_start_x) @@ -482,7 +491,7 @@ def draw_device_menu( def debug_menu(std_screen: curses.window, key: int) -> None: height, width = std_screen.getmaxyx() - width_height = "Width: {}, Height: {}".format(width, height) + width_height = f"Width: {width}, Height: {height}" std_screen.addstr(height - 4, 0, width_height, curses.color_pair(5)) key_pressed = f"Last key pressed: {key}"[: width - 1] @@ -490,7 +499,7 @@ def debug_menu(std_screen: curses.window, key: int) -> None: key_pressed = "No key press detected..."[: width - 1] std_screen.addstr(height - 3, 0, key_pressed) - for i in range(0, 8): + for i in range(8): std_screen.addstr(height - 2, i * 3, f"{i}██", curses.color_pair(i)) @@ -503,9 +512,7 @@ def status_bar( std_screen.attron(curses.A_REVERSE) std_screen.attron(curses.color_pair(3)) - status_bar = ( - f"Press 'q' to exit | STATUS BAR | Pos: {cursor.get_x()}, {cursor.get_y()}" - ) + status_bar = f"Press 'q' to exit | STATUS BAR | Pos: {cursor.get_x()}, {cursor.get_y()}" std_screen.addstr(height - 1, 0, status_bar) std_screen.addstr(height - 1, len(status_bar), " " * (width - len(status_bar) - 1)) @@ -516,7 +523,7 @@ def status_bar( def set_color() -> None: curses.start_color() curses.use_default_colors() - for i in range(0, curses.COLORS): + for i in range(curses.COLORS): curses.init_pair(i + 1, i, -1) @@ -528,10 +535,10 @@ def get_text_input(std_screen: curses.window, prompt: str, y: int, x: int) -> st key = std_screen.getch() if key == ord("\n"): break - elif key == 27: # ESC key + if key == 27: # ESC key # noqa: PLR2004 input_str = "" break - elif key in (curses.KEY_BACKSPACE, ord("\b"), 127): + if key in (curses.KEY_BACKSPACE, ord("\b"), 127): input_str = input_str[:-1] std_screen.addstr(y, x + len(prompt), input_str + " ") else: @@ -557,9 +564,7 @@ def swap_size_input( state.swap_size = int(swap_size_str) state.show_swap_input = False except ValueError: - std_screen.addstr( - swap_offset, 0, "Invalid input. Press any key to continue." - ) + std_screen.addstr(swap_offset, 0, "Invalid input. Press any key to continue.") std_screen.getch() state.show_swap_input = False @@ -577,16 +582,12 @@ def reserve_size_input( state.show_reserve_input = True if state.show_reserve_input: - reserve_size_str = get_text_input( - std_screen, reserve_size_text, reserve_offset, 0 - ) + reserve_size_str = get_text_input(std_screen, reserve_size_text, reserve_offset, 0) try: state.reserve_size = int(reserve_size_str) state.show_reserve_input = False except ValueError: - std_screen.addstr( - reserve_offset, 0, "Invalid input. Press any key to continue." - ) + std_screen.addstr(reserve_offset, 0, "Invalid input. Press any key to continue.") std_screen.getch() state.show_reserve_input = False @@ -594,7 +595,8 @@ def reserve_size_input( def draw_menu(std_screen: curses.window) -> State: - """draw the menu and handle user input + """Draw the menu and handle user input. + Args: std_screen (curses.window): the curses window to draw on Returns: