Compare commits
2 Commits
5d3a851137
...
a9a96db944
| Author | SHA1 | Date | |
|---|---|---|---|
| a9a96db944 | |||
| d34154541d |
@@ -16,9 +16,13 @@ from typing import TYPE_CHECKING
|
|||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from collections.abc import Sequence
|
from collections.abc import Sequence
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
ESCAPE_KEY = 27
|
||||||
|
|
||||||
|
|
||||||
def configure_logger(level: str = "INFO") -> None:
|
def configure_logger(level: str = "INFO") -> None:
|
||||||
"""Configure the logger.
|
"""Configure the logger.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
level (str, optional): The logging level. Defaults to "INFO".
|
level (str, optional): The logging level. Defaults to "INFO".
|
||||||
"""
|
"""
|
||||||
@@ -32,15 +36,17 @@ def configure_logger(level: str = "INFO") -> None:
|
|||||||
|
|
||||||
def bash_wrapper(command: str) -> str:
|
def bash_wrapper(command: str) -> str:
|
||||||
"""Execute a bash command and capture the output.
|
"""Execute a bash command and capture the output.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
command (str): The bash command to be executed.
|
command (str): The bash command to be executed.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
Tuple[str, int]: A tuple containing the output of the command (stdout) as a string,
|
Tuple[str, int]: A tuple containing the output of the command (stdout) as a string,
|
||||||
the error output (stderr) as a string (optional), and the return code as an integer.
|
the error output (stderr) as a string (optional), and the return code as an integer.
|
||||||
"""
|
"""
|
||||||
logging.debug(f"running {command=}")
|
logger.debug(f"running {command=}")
|
||||||
# This is a acceptable risk
|
# This is a acceptable risk
|
||||||
process = Popen(command.split(), stdout=PIPE, stderr=PIPE) # noqa: S603
|
process = Popen(command.split(), stdout=PIPE, stderr=PIPE)
|
||||||
output, _ = process.communicate()
|
output, _ = process.communicate()
|
||||||
if process.returncode != 0:
|
if process.returncode != 0:
|
||||||
error = f"Failed to run command {command=} return code {process.returncode=}"
|
error = f"Failed to run command {command=} return code {process.returncode=}"
|
||||||
@@ -51,6 +57,7 @@ def bash_wrapper(command: str) -> str:
|
|||||||
|
|
||||||
def partition_disk(disk: str, swap_size: int, reserve: int = 0) -> None:
|
def partition_disk(disk: str, swap_size: int, reserve: int = 0) -> None:
|
||||||
"""Partition a disk.
|
"""Partition a disk.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
disk (str): The disk to partition.
|
disk (str): The disk to partition.
|
||||||
swap_size (int): The size of the swap partition in GB.
|
swap_size (int): The size of the swap partition in GB.
|
||||||
@@ -58,7 +65,7 @@ def partition_disk(disk: str, swap_size: int, reserve: int = 0) -> None:
|
|||||||
reserve (int, optional): The size of the reserve partition in GB. Defaults to 0.
|
reserve (int, optional): The size of the reserve partition in GB. Defaults to 0.
|
||||||
minimum value is 0.
|
minimum value is 0.
|
||||||
"""
|
"""
|
||||||
logging.info(f"partitioning {disk=}")
|
logger.info(f"partitioning {disk=}")
|
||||||
swap_size = max(swap_size, 1)
|
swap_size = max(swap_size, 1)
|
||||||
reserve = max(reserve, 0)
|
reserve = max(reserve, 0)
|
||||||
|
|
||||||
@@ -66,16 +73,16 @@ def partition_disk(disk: str, swap_size: int, reserve: int = 0) -> None:
|
|||||||
|
|
||||||
if reserve > 0:
|
if reserve > 0:
|
||||||
msg = f"Creating swap partition on {disk=} with size {swap_size=}GiB and reserve {reserve=}GiB"
|
msg = f"Creating swap partition on {disk=} with size {swap_size=}GiB and reserve {reserve=}GiB"
|
||||||
logging.info(msg)
|
logger.info(msg)
|
||||||
|
|
||||||
swap_start = swap_size + reserve
|
swap_start = swap_size + reserve
|
||||||
swap_partition = f"mkpart swap -{swap_start}GiB -{reserve}GiB "
|
swap_partition = f"mkpart swap -{swap_start}GiB -{reserve}GiB "
|
||||||
else:
|
else:
|
||||||
logging.info(f"Creating swap partition on {disk=} with size {swap_size=}GiB")
|
logger.info(f"Creating swap partition on {disk=} with size {swap_size=}GiB")
|
||||||
swap_start = swap_size
|
swap_start = swap_size
|
||||||
swap_partition = f"mkpart swap -{swap_start}GiB 100% "
|
swap_partition = f"mkpart swap -{swap_start}GiB 100% "
|
||||||
|
|
||||||
logging.debug(f"{swap_partition=}")
|
logger.debug(f"{swap_partition=}")
|
||||||
|
|
||||||
create_partitions = (
|
create_partitions = (
|
||||||
f"parted --script --align=optimal {disk} -- "
|
f"parted --script --align=optimal {disk} -- "
|
||||||
@@ -87,13 +94,14 @@ def partition_disk(disk: str, swap_size: int, reserve: int = 0) -> None:
|
|||||||
)
|
)
|
||||||
bash_wrapper(create_partitions)
|
bash_wrapper(create_partitions)
|
||||||
|
|
||||||
logging.info(f"{disk=} successfully partitioned")
|
logger.info(f"{disk=} successfully partitioned")
|
||||||
|
|
||||||
|
|
||||||
def create_zfs_pool(pool_disks: Sequence[str], mnt_dir: str) -> None:
|
def create_zfs_pool(pool_disks: Sequence[str], mnt_dir: str) -> None:
|
||||||
"""Create a ZFS pool.
|
"""Create a ZFS pool.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
disks (Sequence[str]): A tuple of disks to use for the pool.
|
pool_disks (Sequence[str]): A tuple of disks to use for the pool.
|
||||||
mnt_dir (str): The mount directory.
|
mnt_dir (str): The mount directory.
|
||||||
"""
|
"""
|
||||||
if len(pool_disks) <= 0:
|
if len(pool_disks) <= 0:
|
||||||
@@ -125,13 +133,12 @@ def create_zfs_pool(pool_disks: Sequence[str], mnt_dir: str) -> None:
|
|||||||
bash_wrapper(zpool_create)
|
bash_wrapper(zpool_create)
|
||||||
zpools = bash_wrapper("zpool list -o name")
|
zpools = bash_wrapper("zpool list -o name")
|
||||||
if "root_pool" not in zpools.splitlines():
|
if "root_pool" not in zpools.splitlines():
|
||||||
logging.critical("Failed to create root_pool")
|
logger.critical("Failed to create root_pool")
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
|
|
||||||
|
|
||||||
def create_zfs_datasets() -> None:
|
def create_zfs_datasets() -> None:
|
||||||
"""Create ZFS datasets."""
|
"""Create ZFS datasets."""
|
||||||
|
|
||||||
bash_wrapper("zfs create -o canmount=noauto -o reservation=10G root_pool/root")
|
bash_wrapper("zfs create -o canmount=noauto -o reservation=10G root_pool/root")
|
||||||
bash_wrapper("zfs create root_pool/home")
|
bash_wrapper("zfs create root_pool/home")
|
||||||
bash_wrapper("zfs create root_pool/var -o reservation=1G")
|
bash_wrapper("zfs create root_pool/var -o reservation=1G")
|
||||||
@@ -146,7 +153,7 @@ def create_zfs_datasets() -> None:
|
|||||||
}
|
}
|
||||||
missing_datasets = expected_datasets.difference(datasets.splitlines())
|
missing_datasets = expected_datasets.difference(datasets.splitlines())
|
||||||
if missing_datasets:
|
if missing_datasets:
|
||||||
logging.critical(f"Failed to create pools {missing_datasets}")
|
logger.critical(f"Failed to create pools {missing_datasets}")
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
|
|
||||||
|
|
||||||
@@ -159,6 +166,8 @@ def get_cpu_manufacturer() -> str:
|
|||||||
for line in output.splitlines():
|
for line in output.splitlines():
|
||||||
if "vendor_id" in line:
|
if "vendor_id" in line:
|
||||||
return id_vendor[line.split(": ")[1].strip()]
|
return id_vendor[line.split(": ")[1].strip()]
|
||||||
|
error = "Failed to get CPU manufacturer"
|
||||||
|
raise RuntimeError(error)
|
||||||
|
|
||||||
|
|
||||||
def get_boot_drive_id(disk: str) -> str:
|
def get_boot_drive_id(disk: str) -> str:
|
||||||
@@ -167,9 +176,8 @@ def get_boot_drive_id(disk: str) -> str:
|
|||||||
return output.splitlines()[1]
|
return output.splitlines()[1]
|
||||||
|
|
||||||
|
|
||||||
def create_nix_hardware_file(mnt_dir: str, disks: Sequence[str], encrypt: bool) -> None:
|
def create_nix_hardware_file(mnt_dir: str, disks: Sequence[str], *, encrypt: bool) -> None:
|
||||||
"""Create a NixOS hardware file."""
|
"""Create a NixOS hardware file."""
|
||||||
|
|
||||||
cpu_manufacturer = get_cpu_manufacturer()
|
cpu_manufacturer = get_cpu_manufacturer()
|
||||||
|
|
||||||
devices = ""
|
devices = ""
|
||||||
@@ -193,7 +201,15 @@ def create_nix_hardware_file(mnt_dir: str, disks: Sequence[str], encrypt: bool)
|
|||||||
' imports = [ (modulesPath + "/installer/scan/not-detected.nix") ];\n\n'
|
' imports = [ (modulesPath + "/installer/scan/not-detected.nix") ];\n\n'
|
||||||
" boot = {\n"
|
" boot = {\n"
|
||||||
" initrd = {\n"
|
" initrd = {\n"
|
||||||
' availableKernelModules = [ \n "ahci"\n "ehci_pci"\n "nvme"\n "sd_mod"\n "usb_storage"\n "usbhid"\n "xhci_pci"\n ];\n'
|
" availableKernelModules = [ \n"
|
||||||
|
' "ahci"\n'
|
||||||
|
' "ehci_pci"\n'
|
||||||
|
' "nvme"\n'
|
||||||
|
' "sd_mod"\n'
|
||||||
|
' "usb_storage"\n'
|
||||||
|
' "usbhid"\n'
|
||||||
|
' "xhci_pci"\n'
|
||||||
|
" ];\n"
|
||||||
" kernelModules = [ ];\n"
|
" kernelModules = [ ];\n"
|
||||||
f" {devices}"
|
f" {devices}"
|
||||||
" };\n"
|
" };\n"
|
||||||
@@ -207,11 +223,18 @@ def create_nix_hardware_file(mnt_dir: str, disks: Sequence[str], encrypt: bool)
|
|||||||
' "/nix" = {\n device = "root_pool/nix";\n fsType = "zfs";\n };\n\n'
|
' "/nix" = {\n device = "root_pool/nix";\n fsType = "zfs";\n };\n\n'
|
||||||
' "/boot" = {\n'
|
' "/boot" = {\n'
|
||||||
f' device = "/dev/disk/by-uuid/{get_boot_drive_id(disks[0])}";\n'
|
f' device = "/dev/disk/by-uuid/{get_boot_drive_id(disks[0])}";\n'
|
||||||
' fsType = "vfat";\n options = [\n "fmask=0077"\n "dmask=0077"\n ];\n };\n };\n\n'
|
' fsType = "vfat";\n'
|
||||||
|
" options = [\n"
|
||||||
|
' "fmask=0077"\n'
|
||||||
|
' "dmask=0077"\n'
|
||||||
|
" ];\n"
|
||||||
|
" };\n"
|
||||||
|
" };\n\n"
|
||||||
" swapDevices = [ ];\n\n"
|
" swapDevices = [ ];\n\n"
|
||||||
" networking.useDHCP = lib.mkDefault true;\n\n"
|
" networking.useDHCP = lib.mkDefault true;\n\n"
|
||||||
' nixpkgs.hostPlatform = lib.mkDefault "x86_64-linux";\n'
|
' nixpkgs.hostPlatform = lib.mkDefault "x86_64-linux";\n'
|
||||||
f" hardware.cpu.{cpu_manufacturer}.updateMicrocode = lib.mkDefault config.hardware.enableRedistributableFirmware;\n"
|
f" hardware.cpu.{cpu_manufacturer}.updateMicrocode = lib.mkDefault "
|
||||||
|
"config.hardware.enableRedistributableFirmware;\n"
|
||||||
f' networking.hostId = "{host_id}";\n'
|
f' networking.hostId = "{host_id}";\n'
|
||||||
"}\n"
|
"}\n"
|
||||||
)
|
)
|
||||||
@@ -219,7 +242,7 @@ def create_nix_hardware_file(mnt_dir: str, disks: Sequence[str], encrypt: bool)
|
|||||||
Path(f"{mnt_dir}/etc/nixos/hardware-configuration.nix").write_text(nix_hardware)
|
Path(f"{mnt_dir}/etc/nixos/hardware-configuration.nix").write_text(nix_hardware)
|
||||||
|
|
||||||
|
|
||||||
def install_nixos(mnt_dir: str, disks: Sequence[str], encrypt: bool) -> None:
|
def install_nixos(mnt_dir: str, disks: Sequence[str], *, encrypt: bool) -> None:
|
||||||
"""Install NixOS."""
|
"""Install NixOS."""
|
||||||
bash_wrapper(f"mount -o X-mount.mkdir -t zfs root_pool/root {mnt_dir}")
|
bash_wrapper(f"mount -o X-mount.mkdir -t zfs root_pool/root {mnt_dir}")
|
||||||
bash_wrapper(f"mount -o X-mount.mkdir -t zfs root_pool/home {mnt_dir}/home")
|
bash_wrapper(f"mount -o X-mount.mkdir -t zfs root_pool/home {mnt_dir}/home")
|
||||||
@@ -230,14 +253,16 @@ def install_nixos(mnt_dir: str, disks: Sequence[str], encrypt: bool) -> None:
|
|||||||
bash_wrapper(f"mkfs.vfat -n EFI {disk}-part1")
|
bash_wrapper(f"mkfs.vfat -n EFI {disk}-part1")
|
||||||
|
|
||||||
# set up mirroring afterwards if more than one disk
|
# set up mirroring afterwards if more than one disk
|
||||||
boot_partition = f"mount -t vfat -o fmask=0077,dmask=0077,iocharset=iso8859-1,X-mount.mkdir {disks[0]}-part1 {mnt_dir}/boot"
|
boot_partition = (
|
||||||
|
f"mount -t vfat -o fmask=0077,dmask=0077,iocharset=iso8859-1,X-mount.mkdir {disks[0]}-part1 {mnt_dir}/boot"
|
||||||
|
)
|
||||||
bash_wrapper(boot_partition)
|
bash_wrapper(boot_partition)
|
||||||
|
|
||||||
bash_wrapper(f"nixos-generate-config --root {mnt_dir}")
|
bash_wrapper(f"nixos-generate-config --root {mnt_dir}")
|
||||||
|
|
||||||
create_nix_hardware_file(mnt_dir, disks, encrypt)
|
create_nix_hardware_file(mnt_dir, disks, encrypt=encrypt)
|
||||||
|
|
||||||
run(("nixos-install", "--root", mnt_dir), check=True) # noqa: S603
|
run(("nixos-install", "--root", mnt_dir), check=True)
|
||||||
|
|
||||||
|
|
||||||
def installer(
|
def installer(
|
||||||
@@ -247,27 +272,37 @@ def installer(
|
|||||||
encrypt_key: str | None,
|
encrypt_key: str | None,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Main."""
|
"""Main."""
|
||||||
logging.info("Starting installation")
|
logger.info("Starting installation")
|
||||||
|
|
||||||
for disk in disks:
|
for disk in disks:
|
||||||
partition_disk(disk, swap_size, reserve)
|
partition_disk(disk, swap_size, reserve)
|
||||||
|
|
||||||
if encrypt_key:
|
if encrypt_key:
|
||||||
sleep(1)
|
sleep(1)
|
||||||
for command in (
|
key_input = encrypt_key.encode()
|
||||||
f'printf "{encrypt_key}" | cryptsetup luksFormat --type luks2 {disk}-part2 -',
|
run(
|
||||||
f'printf "{encrypt_key}" | cryptsetup luksOpen {disk}-part2 luks-root-pool-{disk.split("/")[-1]}-part2 -',
|
("cryptsetup", "luksFormat", "--type", "luks2", f"{disk}-part2", "-"),
|
||||||
):
|
input=key_input,
|
||||||
run(command, shell=True, check=True)
|
check=True,
|
||||||
|
)
|
||||||
|
run(
|
||||||
|
(
|
||||||
|
"cryptsetup",
|
||||||
|
"luksOpen",
|
||||||
|
f"{disk}-part2",
|
||||||
|
f"luks-root-pool-{disk.split('/')[-1]}-part2",
|
||||||
|
"-",
|
||||||
|
),
|
||||||
|
input=key_input,
|
||||||
|
check=True,
|
||||||
|
)
|
||||||
|
|
||||||
mnt_dir = "/tmp/nix_install" # noqa: S108
|
mnt_dir = "/tmp/nix_install" # noqa: S108
|
||||||
|
|
||||||
Path(mnt_dir).mkdir(parents=True, exist_ok=True)
|
Path(mnt_dir).mkdir(parents=True, exist_ok=True)
|
||||||
|
|
||||||
if encrypt_key:
|
if encrypt_key:
|
||||||
pool_disks = [
|
pool_disks = [f"/dev/mapper/luks-root-pool-{disk.split('/')[-1]}-part2" for disk in disks]
|
||||||
f"/dev/mapper/luks-root-pool-{disk.split('/')[-1]}-part2" for disk in disks
|
|
||||||
]
|
|
||||||
else:
|
else:
|
||||||
pool_disks = [f"{disk}-part2" for disk in disks]
|
pool_disks = [f"{disk}-part2" for disk in disks]
|
||||||
|
|
||||||
@@ -275,57 +310,73 @@ def installer(
|
|||||||
|
|
||||||
create_zfs_datasets()
|
create_zfs_datasets()
|
||||||
|
|
||||||
install_nixos(mnt_dir, disks, encrypt_key)
|
install_nixos(mnt_dir, disks, encrypt=bool(encrypt_key))
|
||||||
|
|
||||||
logging.info("Installation complete")
|
logger.info("Installation complete")
|
||||||
|
|
||||||
|
|
||||||
class Cursor:
|
class Cursor:
|
||||||
def __init__(self):
|
"""Track cursor position and constrain movement to screen bounds."""
|
||||||
|
|
||||||
|
def __init__(self) -> None:
|
||||||
|
"""Initialize cursor position and screen dimensions."""
|
||||||
self.x_position = 0
|
self.x_position = 0
|
||||||
self.y_position = 0
|
self.y_position = 0
|
||||||
self.height = 0
|
self.height = 0
|
||||||
self.width = 0
|
self.width = 0
|
||||||
|
|
||||||
def set_height(self, height: int):
|
def set_height(self, height: int) -> None:
|
||||||
|
"""Set the maximum screen height."""
|
||||||
self.height = height
|
self.height = height
|
||||||
|
|
||||||
def set_width(self, width: int):
|
def set_width(self, width: int) -> None:
|
||||||
|
"""Set the maximum screen width."""
|
||||||
self.width = width
|
self.width = width
|
||||||
|
|
||||||
def x_bounce_check(self, cursor: int) -> int:
|
def x_bounce_check(self, cursor: int) -> int:
|
||||||
|
"""Clamp an x position to the screen width."""
|
||||||
cursor = max(0, cursor)
|
cursor = max(0, cursor)
|
||||||
return min(self.width - 1, cursor)
|
return min(self.width - 1, cursor)
|
||||||
|
|
||||||
def y_bounce_check(self, cursor: int) -> int:
|
def y_bounce_check(self, cursor: int) -> int:
|
||||||
|
"""Clamp a y position to the screen height."""
|
||||||
cursor = max(0, cursor)
|
cursor = max(0, cursor)
|
||||||
return min(self.height - 1, cursor)
|
return min(self.height - 1, cursor)
|
||||||
|
|
||||||
def set_x(self, x: int):
|
def set_x(self, x: int) -> None:
|
||||||
|
"""Set the cursor x position."""
|
||||||
self.x_position = self.x_bounce_check(x)
|
self.x_position = self.x_bounce_check(x)
|
||||||
|
|
||||||
def set_y(self, y: int):
|
def set_y(self, y: int) -> None:
|
||||||
|
"""Set the cursor y position."""
|
||||||
self.y_position = self.y_bounce_check(y)
|
self.y_position = self.y_bounce_check(y)
|
||||||
|
|
||||||
def get_x(self) -> int:
|
def get_x(self) -> int:
|
||||||
|
"""Get the cursor x position."""
|
||||||
return self.x_position
|
return self.x_position
|
||||||
|
|
||||||
def get_y(self) -> int:
|
def get_y(self) -> int:
|
||||||
|
"""Get the cursor y position."""
|
||||||
return self.y_position
|
return self.y_position
|
||||||
|
|
||||||
def move_up(self):
|
def move_up(self) -> None:
|
||||||
|
"""Move the cursor up one row."""
|
||||||
self.set_y(self.y_position - 1)
|
self.set_y(self.y_position - 1)
|
||||||
|
|
||||||
def move_down(self):
|
def move_down(self) -> None:
|
||||||
|
"""Move the cursor down one row."""
|
||||||
self.set_y(self.y_position + 1)
|
self.set_y(self.y_position + 1)
|
||||||
|
|
||||||
def move_left(self):
|
def move_left(self) -> None:
|
||||||
|
"""Move the cursor left one column."""
|
||||||
self.set_x(self.x_position - 1)
|
self.set_x(self.x_position - 1)
|
||||||
|
|
||||||
def move_right(self):
|
def move_right(self) -> None:
|
||||||
|
"""Move the cursor right one column."""
|
||||||
self.set_x(self.x_position + 1)
|
self.set_x(self.x_position + 1)
|
||||||
|
|
||||||
def navigation(self, key: int) -> None:
|
def navigation(self, key: int) -> None:
|
||||||
|
"""Move the cursor for a curses navigation key."""
|
||||||
action = {
|
action = {
|
||||||
curses.KEY_DOWN: self.move_down,
|
curses.KEY_DOWN: self.move_down,
|
||||||
curses.KEY_UP: self.move_up,
|
curses.KEY_UP: self.move_up,
|
||||||
@@ -339,7 +390,8 @@ class Cursor:
|
|||||||
class State:
|
class State:
|
||||||
"""State class to store the state of the program."""
|
"""State class to store the state of the program."""
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self) -> None:
|
||||||
|
"""Initialize installer menu state."""
|
||||||
self.key = 0
|
self.key = 0
|
||||||
self.cursor = Cursor()
|
self.cursor = Cursor()
|
||||||
|
|
||||||
@@ -357,11 +409,9 @@ class State:
|
|||||||
|
|
||||||
|
|
||||||
def get_device(raw_device: str) -> dict[str, str]:
|
def get_device(raw_device: str) -> dict[str, str]:
|
||||||
|
"""Parse an lsblk key-value device row."""
|
||||||
raw_device_components = raw_device.split(" ")
|
raw_device_components = raw_device.split(" ")
|
||||||
return {
|
return {thing.split("=")[0].lower(): thing.split("=")[1].strip('"') for thing in raw_device_components}
|
||||||
thing.split("=")[0].lower(): thing.split("=")[1].strip('"')
|
|
||||||
for thing in raw_device_components
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
def get_devices() -> list[dict[str, str]]:
|
def get_devices() -> list[dict[str, str]]:
|
||||||
@@ -373,6 +423,7 @@ def get_devices() -> list[dict[str, str]]:
|
|||||||
|
|
||||||
def get_device_id_mapping() -> dict[str, set[str]]:
|
def get_device_id_mapping() -> dict[str, set[str]]:
|
||||||
"""Get a list of device ids.
|
"""Get a list of device ids.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
list[str]: the list of device ids
|
list[str]: the list of device ids
|
||||||
"""
|
"""
|
||||||
@@ -387,9 +438,8 @@ def get_device_id_mapping() -> dict[str, set[str]]:
|
|||||||
return device_id_mapping
|
return device_id_mapping
|
||||||
|
|
||||||
|
|
||||||
def calculate_device_menu_padding(
|
def calculate_device_menu_padding(devices: list[dict[str, str]], column: str, padding: int = 0) -> int:
|
||||||
devices: list[dict[str, str]], column: str, padding: int = 0
|
"""Calculate the width needed for a device menu column."""
|
||||||
) -> int:
|
|
||||||
return max(len(device[column]) for device in devices) + padding
|
return max(len(device[column]) for device in devices) + padding
|
||||||
|
|
||||||
|
|
||||||
@@ -401,6 +451,7 @@ def draw_device_ids(
|
|||||||
menu_width: list[int],
|
menu_width: list[int],
|
||||||
device_ids: set[str],
|
device_ids: set[str],
|
||||||
) -> tuple[State, int]:
|
) -> tuple[State, int]:
|
||||||
|
"""Draw selectable device IDs for a device row."""
|
||||||
for device_id in sorted(device_ids):
|
for device_id in sorted(device_ids):
|
||||||
row_number = row_number + 1
|
row_number = row_number + 1
|
||||||
if row_number == state.cursor.get_y() and state.cursor.get_x() in menu_width:
|
if row_number == state.cursor.get_y() and state.cursor.get_x() in menu_width:
|
||||||
@@ -429,8 +480,9 @@ def draw_device_menu(
|
|||||||
state: State,
|
state: State,
|
||||||
menu_start_y: int = 0,
|
menu_start_y: int = 0,
|
||||||
menu_start_x: int = 0,
|
menu_start_x: int = 0,
|
||||||
) -> State:
|
) -> tuple[State, int]:
|
||||||
"""draw the device menu and handle user input
|
"""Draw the device menu and handle user input.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
std_screen (curses.window): the curses window to draw on
|
std_screen (curses.window): the curses window to draw on
|
||||||
devices (list[dict[str, str]]): the list of devices to draw
|
devices (list[dict[str, str]]): the list of devices to draw
|
||||||
@@ -438,6 +490,7 @@ def draw_device_menu(
|
|||||||
state (State): the state object to update
|
state (State): the state object to update
|
||||||
menu_start_y (int, optional): the y position to start drawing the menu. Defaults to 0.
|
menu_start_y (int, optional): the y position to start drawing the menu. Defaults to 0.
|
||||||
menu_start_x (int, optional): the x position to start drawing the menu. Defaults to 0.
|
menu_start_x (int, optional): the x position to start drawing the menu. Defaults to 0.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
State: the updated state object
|
State: the updated state object
|
||||||
"""
|
"""
|
||||||
@@ -448,7 +501,9 @@ def draw_device_menu(
|
|||||||
type_padding = calculate_device_menu_padding(devices, "type", padding)
|
type_padding = calculate_device_menu_padding(devices, "type", padding)
|
||||||
mountpoints_padding = calculate_device_menu_padding(devices, "mountpoints", padding)
|
mountpoints_padding = calculate_device_menu_padding(devices, "mountpoints", padding)
|
||||||
|
|
||||||
device_header = f"{'Name':{name_padding}}{'Size':{size_padding}}{'Type':{type_padding}}{'Mountpoints':{mountpoints_padding}}"
|
device_header = (
|
||||||
|
f"{'Name':{name_padding}}{'Size':{size_padding}}{'Type':{type_padding}}{'Mountpoints':{mountpoints_padding}}"
|
||||||
|
)
|
||||||
|
|
||||||
menu_width = range(menu_start_x, len(device_header) + menu_start_x)
|
menu_width = range(menu_start_x, len(device_header) + menu_start_x)
|
||||||
|
|
||||||
@@ -481,8 +536,9 @@ def draw_device_menu(
|
|||||||
|
|
||||||
|
|
||||||
def debug_menu(std_screen: curses.window, key: int) -> None:
|
def debug_menu(std_screen: curses.window, key: int) -> None:
|
||||||
|
"""Draw debug information for the current curses screen."""
|
||||||
height, width = std_screen.getmaxyx()
|
height, width = std_screen.getmaxyx()
|
||||||
width_height = "Width: {}, Height: {}".format(width, height)
|
width_height = f"Width: {width}, Height: {height}"
|
||||||
std_screen.addstr(height - 4, 0, width_height, curses.color_pair(5))
|
std_screen.addstr(height - 4, 0, width_height, curses.color_pair(5))
|
||||||
|
|
||||||
key_pressed = f"Last key pressed: {key}"[: width - 1]
|
key_pressed = f"Last key pressed: {key}"[: width - 1]
|
||||||
@@ -490,7 +546,7 @@ def debug_menu(std_screen: curses.window, key: int) -> None:
|
|||||||
key_pressed = "No key press detected..."[: width - 1]
|
key_pressed = "No key press detected..."[: width - 1]
|
||||||
std_screen.addstr(height - 3, 0, key_pressed)
|
std_screen.addstr(height - 3, 0, key_pressed)
|
||||||
|
|
||||||
for i in range(0, 8):
|
for i in range(8):
|
||||||
std_screen.addstr(height - 2, i * 3, f"{i}██", curses.color_pair(i))
|
std_screen.addstr(height - 2, i * 3, f"{i}██", curses.color_pair(i))
|
||||||
|
|
||||||
|
|
||||||
@@ -500,12 +556,11 @@ def status_bar(
|
|||||||
width: int,
|
width: int,
|
||||||
height: int,
|
height: int,
|
||||||
) -> None:
|
) -> None:
|
||||||
|
"""Draw the footer status bar."""
|
||||||
std_screen.attron(curses.A_REVERSE)
|
std_screen.attron(curses.A_REVERSE)
|
||||||
std_screen.attron(curses.color_pair(3))
|
std_screen.attron(curses.color_pair(3))
|
||||||
|
|
||||||
status_bar = (
|
status_bar = f"Press 'q' to exit | STATUS BAR | Pos: {cursor.get_x()}, {cursor.get_y()}"
|
||||||
f"Press 'q' to exit | STATUS BAR | Pos: {cursor.get_x()}, {cursor.get_y()}"
|
|
||||||
)
|
|
||||||
std_screen.addstr(height - 1, 0, status_bar)
|
std_screen.addstr(height - 1, 0, status_bar)
|
||||||
std_screen.addstr(height - 1, len(status_bar), " " * (width - len(status_bar) - 1))
|
std_screen.addstr(height - 1, len(status_bar), " " * (width - len(status_bar) - 1))
|
||||||
|
|
||||||
@@ -514,13 +569,15 @@ def status_bar(
|
|||||||
|
|
||||||
|
|
||||||
def set_color() -> None:
|
def set_color() -> None:
|
||||||
|
"""Initialize curses color pairs."""
|
||||||
curses.start_color()
|
curses.start_color()
|
||||||
curses.use_default_colors()
|
curses.use_default_colors()
|
||||||
for i in range(0, curses.COLORS):
|
for i in range(curses.COLORS):
|
||||||
curses.init_pair(i + 1, i, -1)
|
curses.init_pair(i + 1, i, -1)
|
||||||
|
|
||||||
|
|
||||||
def get_text_input(std_screen: curses.window, prompt: str, y: int, x: int) -> str:
|
def get_text_input(std_screen: curses.window, prompt: str, y: int, x: int) -> str:
|
||||||
|
"""Read text input from a curses screen."""
|
||||||
curses.echo()
|
curses.echo()
|
||||||
std_screen.addstr(y, x, prompt)
|
std_screen.addstr(y, x, prompt)
|
||||||
input_str = ""
|
input_str = ""
|
||||||
@@ -528,10 +585,10 @@ def get_text_input(std_screen: curses.window, prompt: str, y: int, x: int) -> st
|
|||||||
key = std_screen.getch()
|
key = std_screen.getch()
|
||||||
if key == ord("\n"):
|
if key == ord("\n"):
|
||||||
break
|
break
|
||||||
elif key == 27: # ESC key
|
if key == ESCAPE_KEY:
|
||||||
input_str = ""
|
input_str = ""
|
||||||
break
|
break
|
||||||
elif key in (curses.KEY_BACKSPACE, ord("\b"), 127):
|
if key in (curses.KEY_BACKSPACE, ord("\b"), 127):
|
||||||
input_str = input_str[:-1]
|
input_str = input_str[:-1]
|
||||||
std_screen.addstr(y, x + len(prompt), input_str + " ")
|
std_screen.addstr(y, x + len(prompt), input_str + " ")
|
||||||
else:
|
else:
|
||||||
@@ -546,6 +603,7 @@ def swap_size_input(
|
|||||||
state: State,
|
state: State,
|
||||||
swap_offset: int,
|
swap_offset: int,
|
||||||
) -> State:
|
) -> State:
|
||||||
|
"""Handle swap size input."""
|
||||||
swap_size_text = "Swap size (GB): "
|
swap_size_text = "Swap size (GB): "
|
||||||
std_screen.addstr(swap_offset, 0, f"{swap_size_text}{state.swap_size}")
|
std_screen.addstr(swap_offset, 0, f"{swap_size_text}{state.swap_size}")
|
||||||
if state.key == ord("\n") and state.cursor.get_y() == swap_offset:
|
if state.key == ord("\n") and state.cursor.get_y() == swap_offset:
|
||||||
@@ -557,9 +615,7 @@ def swap_size_input(
|
|||||||
state.swap_size = int(swap_size_str)
|
state.swap_size = int(swap_size_str)
|
||||||
state.show_swap_input = False
|
state.show_swap_input = False
|
||||||
except ValueError:
|
except ValueError:
|
||||||
std_screen.addstr(
|
std_screen.addstr(swap_offset, 0, "Invalid input. Press any key to continue.")
|
||||||
swap_offset, 0, "Invalid input. Press any key to continue."
|
|
||||||
)
|
|
||||||
std_screen.getch()
|
std_screen.getch()
|
||||||
state.show_swap_input = False
|
state.show_swap_input = False
|
||||||
|
|
||||||
@@ -571,22 +627,19 @@ def reserve_size_input(
|
|||||||
state: State,
|
state: State,
|
||||||
reserve_offset: int,
|
reserve_offset: int,
|
||||||
) -> State:
|
) -> State:
|
||||||
|
"""Handle reserve size input."""
|
||||||
reserve_size_text = "reserve size (GB): "
|
reserve_size_text = "reserve size (GB): "
|
||||||
std_screen.addstr(reserve_offset, 0, f"{reserve_size_text}{state.reserve_size}")
|
std_screen.addstr(reserve_offset, 0, f"{reserve_size_text}{state.reserve_size}")
|
||||||
if state.key == ord("\n") and state.cursor.get_y() == reserve_offset:
|
if state.key == ord("\n") and state.cursor.get_y() == reserve_offset:
|
||||||
state.show_reserve_input = True
|
state.show_reserve_input = True
|
||||||
|
|
||||||
if state.show_reserve_input:
|
if state.show_reserve_input:
|
||||||
reserve_size_str = get_text_input(
|
reserve_size_str = get_text_input(std_screen, reserve_size_text, reserve_offset, 0)
|
||||||
std_screen, reserve_size_text, reserve_offset, 0
|
|
||||||
)
|
|
||||||
try:
|
try:
|
||||||
state.reserve_size = int(reserve_size_str)
|
state.reserve_size = int(reserve_size_str)
|
||||||
state.show_reserve_input = False
|
state.show_reserve_input = False
|
||||||
except ValueError:
|
except ValueError:
|
||||||
std_screen.addstr(
|
std_screen.addstr(reserve_offset, 0, "Invalid input. Press any key to continue.")
|
||||||
reserve_offset, 0, "Invalid input. Press any key to continue."
|
|
||||||
)
|
|
||||||
std_screen.getch()
|
std_screen.getch()
|
||||||
state.show_reserve_input = False
|
state.show_reserve_input = False
|
||||||
|
|
||||||
@@ -594,9 +647,11 @@ def reserve_size_input(
|
|||||||
|
|
||||||
|
|
||||||
def draw_menu(std_screen: curses.window) -> State:
|
def draw_menu(std_screen: curses.window) -> State:
|
||||||
"""draw the menu and handle user input
|
"""Draw the menu and handle user input.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
std_screen (curses.window): the curses window to draw on
|
std_screen (curses.window): the curses window to draw on
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
State: the state object
|
State: the state object
|
||||||
"""
|
"""
|
||||||
@@ -656,17 +711,18 @@ def draw_menu(std_screen: curses.window) -> State:
|
|||||||
|
|
||||||
|
|
||||||
def main() -> None:
|
def main() -> None:
|
||||||
|
"""Run the installer menu and start installation."""
|
||||||
configure_logger("DEBUG")
|
configure_logger("DEBUG")
|
||||||
|
|
||||||
state = curses.wrapper(draw_menu)
|
state = curses.wrapper(draw_menu)
|
||||||
|
|
||||||
encrypt_key = getenv("ENCRYPT_KEY")
|
encrypt_key = getenv("ENCRYPT_KEY")
|
||||||
|
|
||||||
logging.info("installing_nixos")
|
logger.info("installing_nixos")
|
||||||
logging.info(f"disks: {state.selected_device_ids}")
|
logger.info(f"disks: {state.selected_device_ids}")
|
||||||
logging.info(f"swap_size: {state.swap_size}")
|
logger.info(f"swap_size: {state.swap_size}")
|
||||||
logging.info(f"reserve: {state.reserve_size}")
|
logger.info(f"reserve: {state.reserve_size}")
|
||||||
logging.info(f"encrypted: {bool(encrypt_key)}")
|
logger.info(f"encrypted: {bool(encrypt_key)}")
|
||||||
|
|
||||||
sleep(3)
|
sleep(3)
|
||||||
|
|
||||||
Reference in New Issue
Block a user