diff --git a/tools/installer.py b/tools/installer.py index 08bb738..9cf2b1a 100644 --- a/tools/installer.py +++ b/tools/installer.py @@ -2,6 +2,7 @@ from __future__ import annotations +import curses import logging import sys from os import getenv @@ -110,6 +111,8 @@ def create_zfs_pool(pool_disks: Sequence[str], mnt_dir: str) -> None: "-O relatime=on " "-O xattr=sa " "-O mountpoint=none " + "-O primarycache=metadata " + "-O compression=zstd " "root_pool " ) if len(pool_disks) == 1: @@ -180,20 +183,15 @@ def install_nixos(mnt_dir: str, disks: Sequence[str], encrypt: bool) -> None: run(("nixos-install", "--root", mnt_dir), check=True) # noqa: S603 -def main() -> None: +def installer( + disks: set[str], + swap_size: int, + reserve: int, + encrypt_key: str | None, +) -> None: """Main.""" - configure_logger("DEBUG") - logging.info("Starting installation") - disks = ("/dev/disk/by-id/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",) - - # Set swap size in GB, set to 1 if you don't want swap to take up too much space - swap_size = 1 - reserve = 0 - - encrypt_key = getenv("ENCRYPT_KEY") - for disk in disks: partition_disk(disk, swap_size, reserve) @@ -225,5 +223,334 @@ def main() -> None: logging.info("Installation complete") +class Cursor: + def __init__(self): + self.x_position = 0 + self.y_position = 0 + self.height = 0 + self.width = 0 + + def set_height(self, height: int): + self.height = height + + def set_width(self, width: int): + self.width = width + + def x_bounce_check(self, cursor: int) -> int: + cursor = max(0, cursor) + return min(self.width - 1, cursor) + + def y_bounce_check(self, cursor: int) -> int: + cursor = max(0, cursor) + return min(self.height - 1, cursor) + + def set_x(self, x: int): + self.x_position = self.x_bounce_check(x) + + def set_y(self, y: int): + self.y_position = self.y_bounce_check(y) + + def get_x(self) -> int: + return self.x_position + + def get_y(self) -> int: + return self.y_position + + def move_up(self): + self.set_y(self.y_position - 1) + + def move_down(self): + self.set_y(self.y_position + 1) + + def move_left(self): + self.set_x(self.x_position - 1) + + def move_right(self): + self.set_x(self.x_position + 1) + + def navigation(self, key: int) -> None: + action = { + curses.KEY_DOWN: self.move_down, + curses.KEY_UP: self.move_up, + curses.KEY_RIGHT: self.move_right, + curses.KEY_LEFT: self.move_left, + } + + action.get(key, lambda: None)() + + +class State: + def __init__(self): + self.key = 0 + self.cursor = Cursor() + + self.swap_size = 0 + self.show_swap_input = False + + self.reserve_size = 0 + self.show_reserve_input = False + + self.selected_devices = set() + + +def get_device(raw_device: str) -> dict[str, str]: + raw_device_components = raw_device.split(" ") + return { + thing.split("=")[0].lower(): thing.split("=")[1].strip('"') + for thing in raw_device_components + } + + +def get_devices() -> list[dict[str, str]]: + """Get a list of devices.""" + # --bytes + raw_devices = bash_wrapper("lsblk --paths --pairs").splitlines() + return [get_device(raw_device) for raw_device in raw_devices] + + +def calculate_device_menu_padding( + devices: list[dict[str, str]], column: str, padding: int = 0 +) -> int: + return max(len(device[column]) for device in devices) + padding + + +def draw_device_menu( + std_screen: curses.window, + devices: list[dict[str, str]], + state: State, + menu_start_y: int = 0, + menu_start_x: int = 0, +) -> State: + padding = 2 + + name_padding = calculate_device_menu_padding(devices, "name", padding) + size_padding = calculate_device_menu_padding(devices, "size", padding) + type_padding = calculate_device_menu_padding(devices, "type", padding) + mountpoints_padding = calculate_device_menu_padding(devices, "mountpoints", padding) + + device_header = f"{"Name":{name_padding}}{"Size":{size_padding}}{"Type":{type_padding}}{"Mountpoints":{mountpoints_padding}}" + + menu_width = range(menu_start_x, len(device_header) + menu_start_x) + + std_screen.addstr(menu_start_y, menu_start_x, device_header, curses.color_pair(5)) + devises_list_start = menu_start_y + 1 + + for index, device in enumerate(devices): + device_row_y = devises_list_start + index + device_row = ( + f"{device['name']:{name_padding}}" + f"{device['size']:{size_padding}}" + f"{device['type']:{type_padding}}" + f"{device['mountpoints']:{mountpoints_padding}}" + ) + + if device_row_y == state.cursor.get_y() and state.cursor.get_x() in menu_width: + std_screen.attron(curses.A_BOLD) + if state.key == ord(" "): + if device["name"] not in state.selected_devices: + state.selected_devices.add(device["name"]) + else: + state.selected_devices.remove(device["name"]) + + if device["name"] in state.selected_devices: + std_screen.attron(curses.color_pair(7)) + + std_screen.addstr(device_row_y, menu_start_x, device_row) + + std_screen.attroff(curses.color_pair(7)) + std_screen.attroff(curses.A_BOLD) + + return state + + +def debug_menu(std_screen: curses.window, key: int) -> None: + height, width = std_screen.getmaxyx() + width_height = "Width: {}, Height: {}".format(width, height) + std_screen.addstr(height - 4, 0, width_height, curses.color_pair(5)) + + key_pressed = f"Last key pressed: {key}"[: width - 1] + if key == 0: + key_pressed = "No key press detected..."[: width - 1] + std_screen.addstr(height - 3, 0, key_pressed) + + for i in range(0, 8): + std_screen.addstr(height - 2, i * 3, f"{i}██", curses.color_pair(i)) + + +def status_bar( + std_screen: curses.window, + cursor: Cursor, + width: int, + height: int, +) -> None: + std_screen.attron(curses.A_REVERSE) + std_screen.attron(curses.color_pair(3)) + + status_bar = ( + f"Press 'q' to exit | STATUS BAR | Pos: {cursor.get_x()}, {cursor.get_y()}" + ) + std_screen.addstr(height - 1, 0, status_bar) + std_screen.addstr(height - 1, len(status_bar), " " * (width - len(status_bar) - 1)) + + std_screen.attroff(curses.color_pair(3)) + std_screen.attroff(curses.A_REVERSE) + + +def set_color() -> None: + curses.start_color() + curses.use_default_colors() + for i in range(0, curses.COLORS): + curses.init_pair(i + 1, i, -1) + + +def get_text_input(std_screen: curses.window, prompt: str, y: int, x: int) -> str: + curses.echo() + std_screen.addstr(y, x, prompt) + input_str = "" + while True: + key = std_screen.getch() + if key == ord("\n"): + break + elif key == 27: # ESC key + input_str = "" + break + elif key in (curses.KEY_BACKSPACE, ord("\b"), 127): + input_str = input_str[:-1] + std_screen.addstr(y, x + len(prompt), input_str + " ") + else: + input_str += chr(key) + std_screen.refresh() + curses.noecho() + return input_str + + +def swap_size_input( + std_screen: curses.window, + state: State, + swap_offset: int, +) -> State: + swap_size_text = "Swap size (GB): " + std_screen.addstr(swap_offset, 0, f"{swap_size_text}{state.swap_size}") + if state.key == ord("\n") and state.cursor.get_y() == swap_offset: + state.show_swap_input = True + + if state.show_swap_input: + swap_size_str = get_text_input(std_screen, swap_size_text, swap_offset, 0) + try: + state.swap_size = int(swap_size_str) + state.show_swap_input = False + except ValueError: + std_screen.addstr( + swap_offset, 0, "Invalid input. Press any key to continue." + ) + std_screen.getch() + state.show_swap_input = False + + return state + + +def reserve_size_input( + std_screen: curses.window, + state: State, + reserve_offset: int, +) -> State: + reserve_size_text = "reserve size (GB): " + std_screen.addstr(reserve_offset, 0, f"{reserve_size_text}{state.reserve_size}") + if state.key == ord("\n") and state.cursor.get_y() == reserve_offset: + state.show_reserve_input = True + + if state.show_reserve_input: + reserve_size_str = get_text_input( + std_screen, reserve_size_text, reserve_offset, 0 + ) + try: + state.reserve_size = int(reserve_size_str) + state.show_reserve_input = False + except ValueError: + std_screen.addstr( + reserve_offset, 0, "Invalid input. Press any key to continue." + ) + std_screen.getch() + state.show_reserve_input = False + + return state + + +def draw_menu(std_screen: curses.window) -> State: + # Clear and refresh the screen for a blank canvas + std_screen.clear() + std_screen.refresh() + + set_color() + + state = State() + + devices = get_devices() + + # Loop where k is the last character pressed + while state.key != ord("q"): + std_screen.clear() + height, width = std_screen.getmaxyx() + + state.cursor.set_height(height) + state.cursor.set_width(width) + + state.cursor.navigation(state.key) + + state = draw_device_menu( + std_screen=std_screen, + state=state, + devices=devices, + ) + + device_count = len(devices) + + swap_offset = device_count + 2 + + swap_size_input( + std_screen=std_screen, + state=state, + swap_offset=swap_offset, + ) + reserve_size_input( + std_screen=std_screen, + state=state, + reserve_offset=swap_offset + 1, + ) + + status_bar(std_screen, state.cursor, width, height) + + debug_menu(std_screen, state.key) + + std_screen.move(state.cursor.get_y(), state.cursor.get_x()) + + std_screen.refresh() + + state.key = std_screen.getch() + + return state + + +def main() -> None: + configure_logger("DEBUG") + + state = curses.wrapper(draw_menu) + + encrypt_key = getenv("ENCRYPT_KEY") + + logging.info("installing_nixos") + logging.info(f"disks: {state.selected_devices}") + logging.info(f"swap_size: {state.swap_size}") + logging.info(f"reserve: {state.reserve_size}") + logging.info(f"encrypted: {bool(encrypt_key)}") + + installer( + disks=state.selected_devices, + swap_size=state.swap_size, + reserve=state.reserve_size, + encrypt_key=encrypt_key, + ) + + if __name__ == "__main__": main()