diff --git a/python/installer/old_installer.py b/python/installer/old_installer.py index 6b2bf91..38be8ec 100644 --- a/python/installer/old_installer.py +++ b/python/installer/old_installer.py @@ -16,9 +16,13 @@ from typing import TYPE_CHECKING if TYPE_CHECKING: from collections.abc import Sequence +logger = logging.getLogger(__name__) +ESCAPE_KEY = 27 + def configure_logger(level: str = "INFO") -> None: """Configure the logger. + Args: level (str, optional): The logging level. Defaults to "INFO". """ @@ -32,15 +36,17 @@ def configure_logger(level: str = "INFO") -> None: def bash_wrapper(command: str) -> str: """Execute a bash command and capture the output. + Args: command (str): The bash command to be executed. + Returns: Tuple[str, int]: A tuple containing the output of the command (stdout) as a string, the error output (stderr) as a string (optional), and the return code as an integer. """ - logging.debug(f"running {command=}") + logger.debug(f"running {command=}") # This is a acceptable risk - process = Popen(command.split(), stdout=PIPE, stderr=PIPE) # noqa: S603 + process = Popen(command.split(), stdout=PIPE, stderr=PIPE) output, _ = process.communicate() if process.returncode != 0: error = f"Failed to run command {command=} return code {process.returncode=}" @@ -51,6 +57,7 @@ def bash_wrapper(command: str) -> str: def partition_disk(disk: str, swap_size: int, reserve: int = 0) -> None: """Partition a disk. + Args: disk (str): The disk to partition. swap_size (int): The size of the swap partition in GB. @@ -58,7 +65,7 @@ def partition_disk(disk: str, swap_size: int, reserve: int = 0) -> None: reserve (int, optional): The size of the reserve partition in GB. Defaults to 0. minimum value is 0. """ - logging.info(f"partitioning {disk=}") + logger.info(f"partitioning {disk=}") swap_size = max(swap_size, 1) reserve = max(reserve, 0) @@ -66,16 +73,16 @@ def partition_disk(disk: str, swap_size: int, reserve: int = 0) -> None: if reserve > 0: msg = f"Creating swap partition on {disk=} with size {swap_size=}GiB and reserve {reserve=}GiB" - logging.info(msg) + logger.info(msg) swap_start = swap_size + reserve swap_partition = f"mkpart swap -{swap_start}GiB -{reserve}GiB " else: - logging.info(f"Creating swap partition on {disk=} with size {swap_size=}GiB") + logger.info(f"Creating swap partition on {disk=} with size {swap_size=}GiB") swap_start = swap_size swap_partition = f"mkpart swap -{swap_start}GiB 100% " - logging.debug(f"{swap_partition=}") + logger.debug(f"{swap_partition=}") create_partitions = ( f"parted --script --align=optimal {disk} -- " @@ -87,13 +94,14 @@ def partition_disk(disk: str, swap_size: int, reserve: int = 0) -> None: ) bash_wrapper(create_partitions) - logging.info(f"{disk=} successfully partitioned") + logger.info(f"{disk=} successfully partitioned") def create_zfs_pool(pool_disks: Sequence[str], mnt_dir: str) -> None: """Create a ZFS pool. + Args: - disks (Sequence[str]): A tuple of disks to use for the pool. + pool_disks (Sequence[str]): A tuple of disks to use for the pool. mnt_dir (str): The mount directory. """ if len(pool_disks) <= 0: @@ -125,13 +133,12 @@ def create_zfs_pool(pool_disks: Sequence[str], mnt_dir: str) -> None: bash_wrapper(zpool_create) zpools = bash_wrapper("zpool list -o name") if "root_pool" not in zpools.splitlines(): - logging.critical("Failed to create root_pool") + logger.critical("Failed to create root_pool") sys.exit(1) def create_zfs_datasets() -> None: """Create ZFS datasets.""" - bash_wrapper("zfs create -o canmount=noauto -o reservation=10G root_pool/root") bash_wrapper("zfs create root_pool/home") bash_wrapper("zfs create root_pool/var -o reservation=1G") @@ -146,7 +153,7 @@ def create_zfs_datasets() -> None: } missing_datasets = expected_datasets.difference(datasets.splitlines()) if missing_datasets: - logging.critical(f"Failed to create pools {missing_datasets}") + logger.critical(f"Failed to create pools {missing_datasets}") sys.exit(1) @@ -159,6 +166,8 @@ def get_cpu_manufacturer() -> str: for line in output.splitlines(): if "vendor_id" in line: return id_vendor[line.split(": ")[1].strip()] + error = "Failed to get CPU manufacturer" + raise RuntimeError(error) def get_boot_drive_id(disk: str) -> str: @@ -167,9 +176,8 @@ def get_boot_drive_id(disk: str) -> str: return output.splitlines()[1] -def create_nix_hardware_file(mnt_dir: str, disks: Sequence[str], encrypt: bool) -> None: +def create_nix_hardware_file(mnt_dir: str, disks: Sequence[str], *, encrypt: bool) -> None: """Create a NixOS hardware file.""" - cpu_manufacturer = get_cpu_manufacturer() devices = "" @@ -193,7 +201,15 @@ def create_nix_hardware_file(mnt_dir: str, disks: Sequence[str], encrypt: bool) ' imports = [ (modulesPath + "/installer/scan/not-detected.nix") ];\n\n' " boot = {\n" " initrd = {\n" - ' availableKernelModules = [ \n "ahci"\n "ehci_pci"\n "nvme"\n "sd_mod"\n "usb_storage"\n "usbhid"\n "xhci_pci"\n ];\n' + " availableKernelModules = [ \n" + ' "ahci"\n' + ' "ehci_pci"\n' + ' "nvme"\n' + ' "sd_mod"\n' + ' "usb_storage"\n' + ' "usbhid"\n' + ' "xhci_pci"\n' + " ];\n" " kernelModules = [ ];\n" f" {devices}" " };\n" @@ -207,11 +223,18 @@ def create_nix_hardware_file(mnt_dir: str, disks: Sequence[str], encrypt: bool) ' "/nix" = {\n device = "root_pool/nix";\n fsType = "zfs";\n };\n\n' ' "/boot" = {\n' f' device = "/dev/disk/by-uuid/{get_boot_drive_id(disks[0])}";\n' - ' fsType = "vfat";\n options = [\n "fmask=0077"\n "dmask=0077"\n ];\n };\n };\n\n' + ' fsType = "vfat";\n' + " options = [\n" + ' "fmask=0077"\n' + ' "dmask=0077"\n' + " ];\n" + " };\n" + " };\n\n" " swapDevices = [ ];\n\n" " networking.useDHCP = lib.mkDefault true;\n\n" ' nixpkgs.hostPlatform = lib.mkDefault "x86_64-linux";\n' - f" hardware.cpu.{cpu_manufacturer}.updateMicrocode = lib.mkDefault config.hardware.enableRedistributableFirmware;\n" + f" hardware.cpu.{cpu_manufacturer}.updateMicrocode = lib.mkDefault " + "config.hardware.enableRedistributableFirmware;\n" f' networking.hostId = "{host_id}";\n' "}\n" ) @@ -219,7 +242,7 @@ def create_nix_hardware_file(mnt_dir: str, disks: Sequence[str], encrypt: bool) Path(f"{mnt_dir}/etc/nixos/hardware-configuration.nix").write_text(nix_hardware) -def install_nixos(mnt_dir: str, disks: Sequence[str], encrypt: bool) -> None: +def install_nixos(mnt_dir: str, disks: Sequence[str], *, encrypt: bool) -> None: """Install NixOS.""" bash_wrapper(f"mount -o X-mount.mkdir -t zfs root_pool/root {mnt_dir}") bash_wrapper(f"mount -o X-mount.mkdir -t zfs root_pool/home {mnt_dir}/home") @@ -230,14 +253,16 @@ def install_nixos(mnt_dir: str, disks: Sequence[str], encrypt: bool) -> None: bash_wrapper(f"mkfs.vfat -n EFI {disk}-part1") # set up mirroring afterwards if more than one disk - boot_partition = f"mount -t vfat -o fmask=0077,dmask=0077,iocharset=iso8859-1,X-mount.mkdir {disks[0]}-part1 {mnt_dir}/boot" + boot_partition = ( + f"mount -t vfat -o fmask=0077,dmask=0077,iocharset=iso8859-1,X-mount.mkdir {disks[0]}-part1 {mnt_dir}/boot" + ) bash_wrapper(boot_partition) bash_wrapper(f"nixos-generate-config --root {mnt_dir}") - create_nix_hardware_file(mnt_dir, disks, encrypt) + create_nix_hardware_file(mnt_dir, disks, encrypt=encrypt) - run(("nixos-install", "--root", mnt_dir), check=True) # noqa: S603 + run(("nixos-install", "--root", mnt_dir), check=True) def installer( @@ -247,27 +272,37 @@ def installer( encrypt_key: str | None, ) -> None: """Main.""" - logging.info("Starting installation") + logger.info("Starting installation") for disk in disks: partition_disk(disk, swap_size, reserve) if encrypt_key: sleep(1) - for command in ( - f'printf "{encrypt_key}" | cryptsetup luksFormat --type luks2 {disk}-part2 -', - f'printf "{encrypt_key}" | cryptsetup luksOpen {disk}-part2 luks-root-pool-{disk.split("/")[-1]}-part2 -', - ): - run(command, shell=True, check=True) + key_input = encrypt_key.encode() + run( + ("cryptsetup", "luksFormat", "--type", "luks2", f"{disk}-part2", "-"), + input=key_input, + check=True, + ) + run( + ( + "cryptsetup", + "luksOpen", + f"{disk}-part2", + f"luks-root-pool-{disk.split('/')[-1]}-part2", + "-", + ), + input=key_input, + check=True, + ) mnt_dir = "/tmp/nix_install" # noqa: S108 Path(mnt_dir).mkdir(parents=True, exist_ok=True) if encrypt_key: - pool_disks = [ - f"/dev/mapper/luks-root-pool-{disk.split('/')[-1]}-part2" for disk in disks - ] + pool_disks = [f"/dev/mapper/luks-root-pool-{disk.split('/')[-1]}-part2" for disk in disks] else: pool_disks = [f"{disk}-part2" for disk in disks] @@ -275,57 +310,73 @@ def installer( create_zfs_datasets() - install_nixos(mnt_dir, disks, encrypt_key) + install_nixos(mnt_dir, disks, encrypt=bool(encrypt_key)) - logging.info("Installation complete") + logger.info("Installation complete") class Cursor: - def __init__(self): + """Track cursor position and constrain movement to screen bounds.""" + + def __init__(self) -> None: + """Initialize cursor position and screen dimensions.""" self.x_position = 0 self.y_position = 0 self.height = 0 self.width = 0 - def set_height(self, height: int): + def set_height(self, height: int) -> None: + """Set the maximum screen height.""" self.height = height - def set_width(self, width: int): + def set_width(self, width: int) -> None: + """Set the maximum screen width.""" self.width = width def x_bounce_check(self, cursor: int) -> int: + """Clamp an x position to the screen width.""" cursor = max(0, cursor) return min(self.width - 1, cursor) def y_bounce_check(self, cursor: int) -> int: + """Clamp a y position to the screen height.""" cursor = max(0, cursor) return min(self.height - 1, cursor) - def set_x(self, x: int): + def set_x(self, x: int) -> None: + """Set the cursor x position.""" self.x_position = self.x_bounce_check(x) - def set_y(self, y: int): + def set_y(self, y: int) -> None: + """Set the cursor y position.""" self.y_position = self.y_bounce_check(y) def get_x(self) -> int: + """Get the cursor x position.""" return self.x_position def get_y(self) -> int: + """Get the cursor y position.""" return self.y_position - def move_up(self): + def move_up(self) -> None: + """Move the cursor up one row.""" self.set_y(self.y_position - 1) - def move_down(self): + def move_down(self) -> None: + """Move the cursor down one row.""" self.set_y(self.y_position + 1) - def move_left(self): + def move_left(self) -> None: + """Move the cursor left one column.""" self.set_x(self.x_position - 1) - def move_right(self): + def move_right(self) -> None: + """Move the cursor right one column.""" self.set_x(self.x_position + 1) def navigation(self, key: int) -> None: + """Move the cursor for a curses navigation key.""" action = { curses.KEY_DOWN: self.move_down, curses.KEY_UP: self.move_up, @@ -339,7 +390,8 @@ class Cursor: class State: """State class to store the state of the program.""" - def __init__(self): + def __init__(self) -> None: + """Initialize installer menu state.""" self.key = 0 self.cursor = Cursor() @@ -357,11 +409,9 @@ class State: def get_device(raw_device: str) -> dict[str, str]: + """Parse an lsblk key-value device row.""" raw_device_components = raw_device.split(" ") - return { - thing.split("=")[0].lower(): thing.split("=")[1].strip('"') - for thing in raw_device_components - } + return {thing.split("=")[0].lower(): thing.split("=")[1].strip('"') for thing in raw_device_components} def get_devices() -> list[dict[str, str]]: @@ -373,6 +423,7 @@ def get_devices() -> list[dict[str, str]]: def get_device_id_mapping() -> dict[str, set[str]]: """Get a list of device ids. + Returns: list[str]: the list of device ids """ @@ -387,9 +438,8 @@ def get_device_id_mapping() -> dict[str, set[str]]: return device_id_mapping -def calculate_device_menu_padding( - devices: list[dict[str, str]], column: str, padding: int = 0 -) -> int: +def calculate_device_menu_padding(devices: list[dict[str, str]], column: str, padding: int = 0) -> int: + """Calculate the width needed for a device menu column.""" return max(len(device[column]) for device in devices) + padding @@ -401,6 +451,7 @@ def draw_device_ids( menu_width: list[int], device_ids: set[str], ) -> tuple[State, int]: + """Draw selectable device IDs for a device row.""" for device_id in sorted(device_ids): row_number = row_number + 1 if row_number == state.cursor.get_y() and state.cursor.get_x() in menu_width: @@ -429,8 +480,9 @@ def draw_device_menu( state: State, menu_start_y: int = 0, menu_start_x: int = 0, -) -> State: - """draw the device menu and handle user input +) -> tuple[State, int]: + """Draw the device menu and handle user input. + Args: std_screen (curses.window): the curses window to draw on devices (list[dict[str, str]]): the list of devices to draw @@ -438,6 +490,7 @@ def draw_device_menu( state (State): the state object to update menu_start_y (int, optional): the y position to start drawing the menu. Defaults to 0. menu_start_x (int, optional): the x position to start drawing the menu. Defaults to 0. + Returns: State: the updated state object """ @@ -448,7 +501,9 @@ def draw_device_menu( type_padding = calculate_device_menu_padding(devices, "type", padding) mountpoints_padding = calculate_device_menu_padding(devices, "mountpoints", padding) - device_header = f"{'Name':{name_padding}}{'Size':{size_padding}}{'Type':{type_padding}}{'Mountpoints':{mountpoints_padding}}" + device_header = ( + f"{'Name':{name_padding}}{'Size':{size_padding}}{'Type':{type_padding}}{'Mountpoints':{mountpoints_padding}}" + ) menu_width = range(menu_start_x, len(device_header) + menu_start_x) @@ -481,8 +536,9 @@ def draw_device_menu( def debug_menu(std_screen: curses.window, key: int) -> None: + """Draw debug information for the current curses screen.""" height, width = std_screen.getmaxyx() - width_height = "Width: {}, Height: {}".format(width, height) + width_height = f"Width: {width}, Height: {height}" std_screen.addstr(height - 4, 0, width_height, curses.color_pair(5)) key_pressed = f"Last key pressed: {key}"[: width - 1] @@ -490,7 +546,7 @@ def debug_menu(std_screen: curses.window, key: int) -> None: key_pressed = "No key press detected..."[: width - 1] std_screen.addstr(height - 3, 0, key_pressed) - for i in range(0, 8): + for i in range(8): std_screen.addstr(height - 2, i * 3, f"{i}██", curses.color_pair(i)) @@ -500,12 +556,11 @@ def status_bar( width: int, height: int, ) -> None: + """Draw the footer status bar.""" std_screen.attron(curses.A_REVERSE) std_screen.attron(curses.color_pair(3)) - status_bar = ( - f"Press 'q' to exit | STATUS BAR | Pos: {cursor.get_x()}, {cursor.get_y()}" - ) + status_bar = f"Press 'q' to exit | STATUS BAR | Pos: {cursor.get_x()}, {cursor.get_y()}" std_screen.addstr(height - 1, 0, status_bar) std_screen.addstr(height - 1, len(status_bar), " " * (width - len(status_bar) - 1)) @@ -514,13 +569,15 @@ def status_bar( def set_color() -> None: + """Initialize curses color pairs.""" curses.start_color() curses.use_default_colors() - for i in range(0, curses.COLORS): + for i in range(curses.COLORS): curses.init_pair(i + 1, i, -1) def get_text_input(std_screen: curses.window, prompt: str, y: int, x: int) -> str: + """Read text input from a curses screen.""" curses.echo() std_screen.addstr(y, x, prompt) input_str = "" @@ -528,10 +585,10 @@ def get_text_input(std_screen: curses.window, prompt: str, y: int, x: int) -> st key = std_screen.getch() if key == ord("\n"): break - elif key == 27: # ESC key + if key == ESCAPE_KEY: input_str = "" break - elif key in (curses.KEY_BACKSPACE, ord("\b"), 127): + if key in (curses.KEY_BACKSPACE, ord("\b"), 127): input_str = input_str[:-1] std_screen.addstr(y, x + len(prompt), input_str + " ") else: @@ -546,6 +603,7 @@ def swap_size_input( state: State, swap_offset: int, ) -> State: + """Handle swap size input.""" swap_size_text = "Swap size (GB): " std_screen.addstr(swap_offset, 0, f"{swap_size_text}{state.swap_size}") if state.key == ord("\n") and state.cursor.get_y() == swap_offset: @@ -557,9 +615,7 @@ def swap_size_input( state.swap_size = int(swap_size_str) state.show_swap_input = False except ValueError: - std_screen.addstr( - swap_offset, 0, "Invalid input. Press any key to continue." - ) + std_screen.addstr(swap_offset, 0, "Invalid input. Press any key to continue.") std_screen.getch() state.show_swap_input = False @@ -571,22 +627,19 @@ def reserve_size_input( state: State, reserve_offset: int, ) -> State: + """Handle reserve size input.""" reserve_size_text = "reserve size (GB): " std_screen.addstr(reserve_offset, 0, f"{reserve_size_text}{state.reserve_size}") if state.key == ord("\n") and state.cursor.get_y() == reserve_offset: state.show_reserve_input = True if state.show_reserve_input: - reserve_size_str = get_text_input( - std_screen, reserve_size_text, reserve_offset, 0 - ) + reserve_size_str = get_text_input(std_screen, reserve_size_text, reserve_offset, 0) try: state.reserve_size = int(reserve_size_str) state.show_reserve_input = False except ValueError: - std_screen.addstr( - reserve_offset, 0, "Invalid input. Press any key to continue." - ) + std_screen.addstr(reserve_offset, 0, "Invalid input. Press any key to continue.") std_screen.getch() state.show_reserve_input = False @@ -594,9 +647,11 @@ def reserve_size_input( def draw_menu(std_screen: curses.window) -> State: - """draw the menu and handle user input + """Draw the menu and handle user input. + Args: std_screen (curses.window): the curses window to draw on + Returns: State: the state object """ @@ -656,17 +711,18 @@ def draw_menu(std_screen: curses.window) -> State: def main() -> None: + """Run the installer menu and start installation.""" configure_logger("DEBUG") state = curses.wrapper(draw_menu) encrypt_key = getenv("ENCRYPT_KEY") - logging.info("installing_nixos") - logging.info(f"disks: {state.selected_device_ids}") - logging.info(f"swap_size: {state.swap_size}") - logging.info(f"reserve: {state.reserve_size}") - logging.info(f"encrypted: {bool(encrypt_key)}") + logger.info("installing_nixos") + logger.info(f"disks: {state.selected_device_ids}") + logger.info(f"swap_size: {state.swap_size}") + logger.info(f"reserve: {state.reserve_size}") + logger.info(f"encrypted: {bool(encrypt_key)}") sleep(3)