mirror of
https://github.com/RichieCahill/dotfiles.git
synced 2026-04-17 13:08:19 -04:00
fixed all ruff error in python
This commit is contained in:
@@ -40,6 +40,10 @@ lint.ignore = [
|
|||||||
"tests/**" = [
|
"tests/**" = [
|
||||||
"S101", # (perm) pytest needs asserts
|
"S101", # (perm) pytest needs asserts
|
||||||
]
|
]
|
||||||
|
"python/random/**" = [
|
||||||
|
"T201", # (perm) I don't care about print statements dir
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
[tool.ruff.lint.pydocstyle]
|
[tool.ruff.lint.pydocstyle]
|
||||||
convention = "google"
|
convention = "google"
|
||||||
|
|||||||
@@ -18,6 +18,8 @@ from python.installer.tui import draw_menu
|
|||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from collections.abc import Sequence
|
from collections.abc import Sequence
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
def bash_wrapper(command: str) -> str:
|
def bash_wrapper(command: str) -> str:
|
||||||
"""Execute a bash command and capture the output.
|
"""Execute a bash command and capture the output.
|
||||||
@@ -29,7 +31,7 @@ def bash_wrapper(command: str) -> str:
|
|||||||
Tuple[str, int]: A tuple containing the output of the command (stdout) as a string,
|
Tuple[str, int]: A tuple containing the output of the command (stdout) as a string,
|
||||||
the error output (stderr) as a string (optional), and the return code as an integer.
|
the error output (stderr) as a string (optional), and the return code as an integer.
|
||||||
"""
|
"""
|
||||||
logging.debug(f"running {command=}")
|
logger.debug(f"running {command=}")
|
||||||
# This is a acceptable risk
|
# This is a acceptable risk
|
||||||
process = Popen(command.split(), stdout=PIPE, stderr=PIPE)
|
process = Popen(command.split(), stdout=PIPE, stderr=PIPE)
|
||||||
output, _ = process.communicate()
|
output, _ = process.communicate()
|
||||||
@@ -50,7 +52,7 @@ def partition_disk(disk: str, swap_size: int, reserve: int = 0) -> None:
|
|||||||
reserve (int, optional): The size of the reserve partition in GB. Defaults to 0.
|
reserve (int, optional): The size of the reserve partition in GB. Defaults to 0.
|
||||||
minimum value is 0.
|
minimum value is 0.
|
||||||
"""
|
"""
|
||||||
logging.info(f"partitioning {disk=}")
|
logger.info(f"partitioning {disk=}")
|
||||||
swap_size = max(swap_size, 1)
|
swap_size = max(swap_size, 1)
|
||||||
reserve = max(reserve, 0)
|
reserve = max(reserve, 0)
|
||||||
|
|
||||||
@@ -58,16 +60,16 @@ def partition_disk(disk: str, swap_size: int, reserve: int = 0) -> None:
|
|||||||
|
|
||||||
if reserve > 0:
|
if reserve > 0:
|
||||||
msg = f"Creating swap partition on {disk=} with size {swap_size=}GiB and reserve {reserve=}GiB"
|
msg = f"Creating swap partition on {disk=} with size {swap_size=}GiB and reserve {reserve=}GiB"
|
||||||
logging.info(msg)
|
logger.info(msg)
|
||||||
|
|
||||||
swap_start = swap_size + reserve
|
swap_start = swap_size + reserve
|
||||||
swap_partition = f"mkpart swap -{swap_start}GiB -{reserve}GiB "
|
swap_partition = f"mkpart swap -{swap_start}GiB -{reserve}GiB "
|
||||||
else:
|
else:
|
||||||
logging.info(f"Creating swap partition on {disk=} with size {swap_size=}GiB")
|
logger.info(f"Creating swap partition on {disk=} with size {swap_size=}GiB")
|
||||||
swap_start = swap_size
|
swap_start = swap_size
|
||||||
swap_partition = f"mkpart swap -{swap_start}GiB 100% "
|
swap_partition = f"mkpart swap -{swap_start}GiB 100% "
|
||||||
|
|
||||||
logging.debug(f"{swap_partition=}")
|
logger.debug(f"{swap_partition=}")
|
||||||
|
|
||||||
create_partitions = (
|
create_partitions = (
|
||||||
f"parted --script --align=optimal {disk} -- "
|
f"parted --script --align=optimal {disk} -- "
|
||||||
@@ -79,7 +81,7 @@ def partition_disk(disk: str, swap_size: int, reserve: int = 0) -> None:
|
|||||||
)
|
)
|
||||||
bash_wrapper(create_partitions)
|
bash_wrapper(create_partitions)
|
||||||
|
|
||||||
logging.info(f"{disk=} successfully partitioned")
|
logger.info(f"{disk=} successfully partitioned")
|
||||||
|
|
||||||
|
|
||||||
def create_zfs_pool(pool_disks: Sequence[str], mnt_dir: str) -> None:
|
def create_zfs_pool(pool_disks: Sequence[str], mnt_dir: str) -> None:
|
||||||
@@ -118,7 +120,7 @@ def create_zfs_pool(pool_disks: Sequence[str], mnt_dir: str) -> None:
|
|||||||
bash_wrapper(zpool_create)
|
bash_wrapper(zpool_create)
|
||||||
zpools = bash_wrapper("zpool list -o name")
|
zpools = bash_wrapper("zpool list -o name")
|
||||||
if "root_pool" not in zpools.splitlines():
|
if "root_pool" not in zpools.splitlines():
|
||||||
logging.critical("Failed to create root_pool")
|
logger.critical("Failed to create root_pool")
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
|
|
||||||
|
|
||||||
@@ -138,7 +140,7 @@ def create_zfs_datasets() -> None:
|
|||||||
}
|
}
|
||||||
missing_datasets = expected_datasets.difference(datasets.splitlines())
|
missing_datasets = expected_datasets.difference(datasets.splitlines())
|
||||||
if missing_datasets:
|
if missing_datasets:
|
||||||
logging.critical(f"Failed to create pools {missing_datasets}")
|
logger.critical(f"Failed to create pools {missing_datasets}")
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
|
|
||||||
|
|
||||||
@@ -246,7 +248,7 @@ def installer(
|
|||||||
encrypt_key: str | None,
|
encrypt_key: str | None,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Main."""
|
"""Main."""
|
||||||
logging.info("Starting installation")
|
logger.info("Starting installation")
|
||||||
|
|
||||||
for disk in disks:
|
for disk in disks:
|
||||||
partition_disk(disk, swap_size, reserve)
|
partition_disk(disk, swap_size, reserve)
|
||||||
@@ -275,7 +277,7 @@ def installer(
|
|||||||
|
|
||||||
install_nixos(mnt_dir, disks, encrypt_key)
|
install_nixos(mnt_dir, disks, encrypt_key)
|
||||||
|
|
||||||
logging.info("Installation complete")
|
logger.info("Installation complete")
|
||||||
|
|
||||||
|
|
||||||
def main() -> None:
|
def main() -> None:
|
||||||
@@ -286,11 +288,11 @@ def main() -> None:
|
|||||||
|
|
||||||
encrypt_key = getenv("ENCRYPT_KEY")
|
encrypt_key = getenv("ENCRYPT_KEY")
|
||||||
|
|
||||||
logging.info("installing_nixos")
|
logger.info("installing_nixos")
|
||||||
logging.info(f"disks: {state.selected_device_ids}")
|
logger.info(f"disks: {state.selected_device_ids}")
|
||||||
logging.info(f"swap_size: {state.swap_size}")
|
logger.info(f"swap_size: {state.swap_size}")
|
||||||
logging.info(f"reserve: {state.reserve_size}")
|
logger.info(f"reserve: {state.reserve_size}")
|
||||||
logging.info(f"encrypted: {bool(encrypt_key)}")
|
logger.info(f"encrypted: {bool(encrypt_key)}")
|
||||||
|
|
||||||
sleep(3)
|
sleep(3)
|
||||||
|
|
||||||
|
|||||||
@@ -7,6 +7,8 @@ import logging
|
|||||||
from collections import defaultdict
|
from collections import defaultdict
|
||||||
from subprocess import PIPE, Popen
|
from subprocess import PIPE, Popen
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
def bash_wrapper(command: str) -> str:
|
def bash_wrapper(command: str) -> str:
|
||||||
"""Execute a bash command and capture the output.
|
"""Execute a bash command and capture the output.
|
||||||
@@ -18,7 +20,7 @@ def bash_wrapper(command: str) -> str:
|
|||||||
Tuple[str, int]: A tuple containing the output of the command (stdout) as a string,
|
Tuple[str, int]: A tuple containing the output of the command (stdout) as a string,
|
||||||
the error output (stderr) as a string (optional), and the return code as an integer.
|
the error output (stderr) as a string (optional), and the return code as an integer.
|
||||||
"""
|
"""
|
||||||
logging.debug(f"running {command=}")
|
logger.debug(f"running {command=}")
|
||||||
# This is a acceptable risk
|
# This is a acceptable risk
|
||||||
process = Popen(command.split(), stdout=PIPE, stderr=PIPE)
|
process = Popen(command.split(), stdout=PIPE, stderr=PIPE)
|
||||||
output, _ = process.communicate()
|
output, _ = process.communicate()
|
||||||
|
|||||||
@@ -6,7 +6,7 @@ import logging
|
|||||||
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
|
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
from multiprocessing import cpu_count
|
from multiprocessing import cpu_count
|
||||||
from typing import TYPE_CHECKING, Any, Generic, Literal, TypeVar
|
from typing import TYPE_CHECKING, Any, Literal, TypeVar
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from collections.abc import Callable, Mapping, Sequence
|
from collections.abc import Callable, Mapping, Sequence
|
||||||
@@ -19,7 +19,7 @@ modes = Literal["normal", "early_error"]
|
|||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class ExecutorResults(Generic[R]):
|
class ExecutorResults[R]:
|
||||||
"""Dataclass to store the results and exceptions of the parallel execution."""
|
"""Dataclass to store the results and exceptions of the parallel execution."""
|
||||||
|
|
||||||
results: list[R]
|
results: list[R]
|
||||||
@@ -30,7 +30,7 @@ class ExecutorResults(Generic[R]):
|
|||||||
return f"results={self.results} exceptions={self.exceptions}"
|
return f"results={self.results} exceptions={self.exceptions}"
|
||||||
|
|
||||||
|
|
||||||
def _parallelize_base(
|
def _parallelize_base[R](
|
||||||
executor_type: type[ThreadPoolExecutor | ProcessPoolExecutor],
|
executor_type: type[ThreadPoolExecutor | ProcessPoolExecutor],
|
||||||
func: Callable[..., R],
|
func: Callable[..., R],
|
||||||
kwargs_list: Sequence[Mapping[str, Any]],
|
kwargs_list: Sequence[Mapping[str, Any]],
|
||||||
@@ -62,7 +62,7 @@ def _parallelize_base(
|
|||||||
return ExecutorResults(results, exceptions)
|
return ExecutorResults(results, exceptions)
|
||||||
|
|
||||||
|
|
||||||
def parallelize_thread(
|
def parallelize_thread[R](
|
||||||
func: Callable[..., R],
|
func: Callable[..., R],
|
||||||
kwargs_list: Sequence[Mapping[str, Any]],
|
kwargs_list: Sequence[Mapping[str, Any]],
|
||||||
max_workers: int | None = None,
|
max_workers: int | None = None,
|
||||||
@@ -91,7 +91,7 @@ def parallelize_thread(
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def parallelize_process(
|
def parallelize_process[R](
|
||||||
func: Callable[..., R],
|
func: Callable[..., R],
|
||||||
kwargs_list: Sequence[Mapping[str, Any]],
|
kwargs_list: Sequence[Mapping[str, Any]],
|
||||||
max_workers: int | None = None,
|
max_workers: int | None = None,
|
||||||
@@ -123,7 +123,7 @@ def parallelize_process(
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def process_executor_unchecked(
|
def process_executor_unchecked[R](
|
||||||
func: Callable[..., R],
|
func: Callable[..., R],
|
||||||
kwargs_list: Sequence[Mapping[str, Any]],
|
kwargs_list: Sequence[Mapping[str, Any]],
|
||||||
max_workers: int | None,
|
max_workers: int | None,
|
||||||
|
|||||||
Reference in New Issue
Block a user