Files
dotfiles/python/installer/tui.py

499 lines
14 KiB
Python

"""TUI module."""
from __future__ import annotations
import curses
import logging
from collections import defaultdict
from subprocess import PIPE, Popen
logger = logging.getLogger(__name__)
def bash_wrapper(command: str) -> str:
"""Execute a bash command and capture the output.
Args:
command (str): The bash command to be executed.
Returns:
Tuple[str, int]: A tuple containing the output of the command (stdout) as a string,
the error output (stderr) as a string (optional), and the return code as an integer.
"""
logger.debug(f"running {command=}")
# This is a acceptable risk
process = Popen(command.split(), stdout=PIPE, stderr=PIPE)
output, _ = process.communicate()
if process.returncode != 0:
error = f"Failed to run command {command=} return code {process.returncode=}"
raise RuntimeError(error)
return output.decode()
class Cursor:
"""Cursor class."""
def __init__(self) -> None:
"""Initialize the Cursor class."""
self.x_position = 0
self.y_position = 0
self.height = 0
self.width = 0
def set_height(self, height: int) -> None:
"""Set height."""
self.height = height
def set_width(self, width: int) -> None:
"""Set width."""
self.width = width
def x_bounce_check(self, cursor: int) -> int:
"""X bounce check."""
cursor = max(0, cursor)
return min(self.width - 1, cursor)
def y_bounce_check(self, cursor: int) -> int:
"""Y bounce check."""
cursor = max(0, cursor)
return min(self.height - 1, cursor)
def set_x(self, x: int) -> None:
"""Set x."""
self.x_position = self.x_bounce_check(x)
def set_y(self, y: int) -> None:
"""Set y."""
self.y_position = self.y_bounce_check(y)
def get_x(self) -> int:
"""Get x."""
return self.x_position
def get_y(self) -> int:
"""Get y."""
return self.y_position
def move_up(self) -> None:
"""Move up."""
self.set_y(self.y_position - 1)
def move_down(self) -> None:
"""Move down."""
self.set_y(self.y_position + 1)
def move_left(self) -> None:
"""Move left."""
self.set_x(self.x_position - 1)
def move_right(self) -> None:
"""Move right."""
self.set_x(self.x_position + 1)
def navigation(self, key: int) -> None:
"""Navigation.
Args:
key (int): The key.
"""
action = {
curses.KEY_DOWN: self.move_down,
curses.KEY_UP: self.move_up,
curses.KEY_RIGHT: self.move_right,
curses.KEY_LEFT: self.move_left,
}
action.get(key, lambda: None)()
class State:
"""State class to store the state of the program."""
def __init__(self) -> None:
"""Initialize the State class."""
self.key = 0
self.cursor = Cursor()
self.swap_size = 0
self.show_swap_input = False
self.reserve_size = 0
self.show_reserve_input = False
self.selected_device_ids: set[str] = set()
def get_selected_devices(self) -> tuple[str, ...]:
"""Get selected devices."""
return tuple(self.selected_device_ids)
def get_device(raw_device: str) -> dict[str, str]:
"""Get a device.
Args:
raw_device (str): The raw device.
Returns:
dict[str, str]: The device.
"""
raw_device_components = raw_device.split(" ")
return {thing.split("=")[0].lower(): thing.split("=")[1].strip('"') for thing in raw_device_components}
def get_devices() -> list[dict[str, str]]:
"""Get a list of devices."""
# --bytes
raw_devices = bash_wrapper("lsblk --paths --pairs").splitlines()
return [get_device(raw_device) for raw_device in raw_devices]
def set_color() -> None:
"""Set the color."""
curses.start_color()
curses.use_default_colors()
for i in range(curses.COLORS):
curses.init_pair(i + 1, i, -1)
def debug_menu(std_screen: curses.window, key: int) -> None:
"""Debug menu.
Args:
std_screen (curses.window): The curses window.
key (int): The key.
"""
height, width = std_screen.getmaxyx()
std_screen.addstr(height - 4, 0, f"Width: {width}, Height: {height}", curses.color_pair(5))
key_pressed = f"Last key pressed: {key}"[: width - 1]
if key == 0:
key_pressed = "No key press detected..."[: width - 1]
std_screen.addstr(height - 3, 0, key_pressed)
for i in range(8):
std_screen.addstr(height - 2, i * 3, f"{i}██", curses.color_pair(i))
def get_text_input(std_screen: curses.window, prompt: str, y: int, x: int) -> str:
"""Get text input.
Args:
std_screen (curses.window): The curses window.
prompt (str): The prompt.
y (int): The y position.
x (int): The x position.
Returns:
str: The input string.
"""
esc_key = 27
curses.echo()
std_screen.addstr(y, x, prompt)
input_str = ""
while True:
key = std_screen.getch()
if key == ord("\n"):
break
if key == esc_key:
input_str = ""
break
if key in (curses.KEY_BACKSPACE, ord("\b"), 127):
input_str = input_str[:-1]
std_screen.addstr(y, x + len(prompt), input_str + " ")
else:
input_str += chr(key)
std_screen.refresh()
curses.noecho()
return input_str
def swap_size_input(
std_screen: curses.window,
state: State,
swap_offset: int,
) -> State:
"""Reserve size input.
Args:
std_screen (curses.window): The curses window.
state (State): The state object.
swap_offset (int): The swap offset.
Returns:
State: The updated state object.
"""
swap_size_text = "Swap size (GB): "
std_screen.addstr(swap_offset, 0, f"{swap_size_text}{state.swap_size}")
if state.key == ord("\n") and state.cursor.get_y() == swap_offset:
state.show_swap_input = True
if state.show_swap_input:
swap_size_str = get_text_input(std_screen, swap_size_text, swap_offset, 0)
try:
state.swap_size = int(swap_size_str)
state.show_swap_input = False
except ValueError:
std_screen.addstr(swap_offset, 0, "Invalid input. Press any key to continue.")
std_screen.getch()
state.show_swap_input = False
return state
def reserve_size_input(
std_screen: curses.window,
state: State,
reserve_offset: int,
) -> State:
"""Reserve size input.
Args:
std_screen (curses.window): The curses window.
state (State): The state object.
reserve_offset (int): The reserve offset.
Returns:
State: The updated state object.
"""
reserve_size_text = "reserve size (GB): "
std_screen.addstr(reserve_offset, 0, f"{reserve_size_text}{state.reserve_size}")
if state.key == ord("\n") and state.cursor.get_y() == reserve_offset:
state.show_reserve_input = True
if state.show_reserve_input:
reserve_size_str = get_text_input(std_screen, reserve_size_text, reserve_offset, 0)
try:
state.reserve_size = int(reserve_size_str)
state.show_reserve_input = False
except ValueError:
std_screen.addstr(reserve_offset, 0, "Invalid input. Press any key to continue.")
std_screen.getch()
state.show_reserve_input = False
return state
def status_bar(
std_screen: curses.window,
cursor: Cursor,
width: int,
height: int,
) -> None:
"""Draw the status bar.
Args:
std_screen (curses.window): The curses window.
cursor (Cursor): The cursor.
width (int): The width.
height (int): The height.
"""
std_screen.attron(curses.A_REVERSE)
std_screen.attron(curses.color_pair(3))
status_bar = f"Press 'q' to exit | STATUS BAR | Pos: {cursor.get_x()}, {cursor.get_y()}"
std_screen.addstr(height - 1, 0, status_bar)
std_screen.addstr(height - 1, len(status_bar), " " * (width - len(status_bar) - 1))
std_screen.attroff(curses.color_pair(3))
std_screen.attroff(curses.A_REVERSE)
def get_device_id_mapping() -> dict[str, set[str]]:
"""Get a list of device ids.
Returns:
list[str]: the list of device ids
"""
device_ids = bash_wrapper("find /dev/disk/by-id -type l").splitlines()
device_id_mapping: dict[str, set[str]] = defaultdict(set)
for device_id in device_ids:
device = bash_wrapper(f"readlink -f {device_id}").strip()
device_id_mapping[device].add(device_id)
return device_id_mapping
def calculate_device_menu_padding(devices: list[dict[str, str]], column: str, padding: int = 0) -> int:
"""Calculate the device menu padding.
Args:
devices (list[dict[str, str]]): The devices.
column (str): The column.
padding (int, optional): The padding. Defaults to 0.
Returns:
int: The calculated padding.
"""
return max(len(device[column]) for device in devices) + padding
def draw_device_ids(
state: State,
row_number: int,
menu_start_x: int,
std_screen: curses.window,
menu_width: list[int],
device_ids: set[str],
) -> tuple[State, int]:
"""Draw device IDs.
Args:
state (State): The state object.
row_number (int): The row number.
menu_start_x (int): The menu start x.
std_screen (curses.window): The curses window.
menu_width (list[int]): The menu width.
device_ids (set[str]): The device IDs.
Returns:
tuple[State, int]: The updated state object and the row number.
"""
for device_id in sorted(device_ids):
row_number = row_number + 1
if row_number == state.cursor.get_y() and state.cursor.get_x() in menu_width:
std_screen.attron(curses.A_BOLD)
if state.key == ord(" "):
if device_id not in state.selected_device_ids:
state.selected_device_ids.add(device_id)
else:
state.selected_device_ids.remove(device_id)
if device_id in state.selected_device_ids:
std_screen.attron(curses.color_pair(7))
std_screen.addstr(row_number, menu_start_x, f" {device_id}")
std_screen.attroff(curses.color_pair(7))
std_screen.attroff(curses.A_BOLD)
return state, row_number
def draw_device_menu(
std_screen: curses.window,
devices: list[dict[str, str]],
device_id_mapping: dict[str, set[str]],
state: State,
menu_start_y: int = 0,
menu_start_x: int = 0,
) -> tuple[State, int]:
"""Draw the device menu and handle user input.
Args:
std_screen (curses.window): the curses window to draw on
devices (list[dict[str, str]]): the list of devices to draw
device_id_mapping (dict[str, set[str]]): the list of device ids to draw
state (State): the state object to update
menu_start_y (int, optional): the y position to start drawing the menu. Defaults to 0.
menu_start_x (int, optional): the x position to start drawing the menu. Defaults to 0.
Returns:
State: the updated state object
"""
padding = 2
name_padding = calculate_device_menu_padding(devices, "name", padding)
size_padding = calculate_device_menu_padding(devices, "size", padding)
type_padding = calculate_device_menu_padding(devices, "type", padding)
mountpoints_padding = calculate_device_menu_padding(devices, "mountpoints", padding)
device_header = (
f"{'Name':{name_padding}}{'Size':{size_padding}}{'Type':{type_padding}}{'Mountpoints':{mountpoints_padding}}"
)
menu_width = list(range(menu_start_x, len(device_header) + menu_start_x))
std_screen.addstr(menu_start_y, menu_start_x, device_header, curses.color_pair(5))
devises_list_start = menu_start_y + 1
row_number = devises_list_start
for device in devices:
row_number = row_number + 1
device_name = device["name"]
device_row = (
f"{device_name:{name_padding}}"
f"{device['size']:{size_padding}}"
f"{device['type']:{type_padding}}"
f"{device['mountpoints']:{mountpoints_padding}}"
)
std_screen.addstr(row_number, menu_start_x, device_row)
state, row_number = draw_device_ids(
state=state,
row_number=row_number,
menu_start_x=menu_start_x,
std_screen=std_screen,
menu_width=menu_width,
device_ids=device_id_mapping[device_name],
)
return state, row_number
def draw_menu(std_screen: curses.window) -> State:
"""Draw the menu and handle user input.
Args:
std_screen (curses.window): the curses window to draw on
Returns:
State: the state object
"""
# Clear and refresh the screen for a blank canvas
std_screen.clear()
std_screen.refresh()
set_color()
state = State()
devices = get_devices()
device_id_mapping = get_device_id_mapping()
# Loop where k is the last character pressed
while state.key != ord("q"):
std_screen.clear()
height, width = std_screen.getmaxyx()
state.cursor.set_height(height)
state.cursor.set_width(width)
state.cursor.navigation(state.key)
state, device_menu_size = draw_device_menu(
std_screen=std_screen,
state=state,
devices=devices,
device_id_mapping=device_id_mapping,
)
swap_offset = device_menu_size + 2
swap_size_input(
std_screen=std_screen,
state=state,
swap_offset=swap_offset,
)
reserve_size_input(
std_screen=std_screen,
state=state,
reserve_offset=swap_offset + 1,
)
status_bar(std_screen, state.cursor, width, height)
debug_menu(std_screen, state.key)
std_screen.move(state.cursor.get_y(), state.cursor.get_x())
std_screen.refresh()
state.key = std_screen.getch()
return state