From 6a09bc66b604b93ac63b8855084d48b0bc89ddf8 Mon Sep 17 00:00:00 2001 From: Richie Cahill Date: Thu, 30 Oct 2025 19:41:43 -0400 Subject: [PATCH] fixed most ruff error --- python/parallelize.py | 6 ++- python/random/__init__.py | 1 + python/random/capasitor.py | 19 +++++--- python/random/dc_charger.py | 19 -------- python/random/thing.py | 62 ++++---------------------- python/random/voltage_drop.py | 62 ++++++++++++++++++-------- python/system_tests/components.py | 8 ++-- python/system_tests/validate_system.py | 10 +++-- python/tools/snapshot_manager.py | 25 ++++++----- python/zfs/dataset.py | 8 ++-- tests/test_parallelize.py | 3 +- 11 files changed, 99 insertions(+), 124 deletions(-) create mode 100644 python/random/__init__.py delete mode 100644 python/random/dc_charger.py diff --git a/python/parallelize.py b/python/parallelize.py index 4922fac..99d990f 100644 --- a/python/parallelize.py +++ b/python/parallelize.py @@ -11,6 +11,8 @@ from typing import TYPE_CHECKING, Any, Generic, Literal, TypeVar if TYPE_CHECKING: from collections.abc import Callable, Mapping, Sequence +logger = logging.getLogger(__name__) + R = TypeVar("R") modes = Literal["normal", "early_error"] @@ -45,7 +47,7 @@ def _parallelize_base( exceptions = [] for index, future in enumerate(futures, 1): if exception := future.exception(): - logging.error(f"{future} raised {exception.__class__.__name__}") + logger.error(f"{future} raised {exception.__class__.__name__}") exceptions.append(exception) if mode == "early_error": executor.shutdown(wait=False) @@ -55,7 +57,7 @@ def _parallelize_base( results.append(future.result()) if progress_tracker and index % progress_tracker == 0: - logging.info(f"Progress: {index}/{total_work}") + logger.info(f"Progress: {index}/{total_work}") return ExecutorResults(results, exceptions) diff --git a/python/random/__init__.py b/python/random/__init__.py new file mode 100644 index 0000000..525291c --- /dev/null +++ b/python/random/__init__.py @@ -0,0 +1 @@ +"""init.""" diff --git a/python/random/capasitor.py b/python/random/capasitor.py index 1b3eb30..74ca426 100644 --- a/python/random/capasitor.py +++ b/python/random/capasitor.py @@ -1,20 +1,25 @@ -def calculate_capacitor_capacity(voltage, farads): +"""capasitor.""" + + +def calculate_capacitor_capacity(voltage: float, farads: float) -> float: + """Calculate capacitor capacity.""" joules = (farads * voltage**2) // 2 return joules // 3600 -def calculate_pack_capacity(cells, cell_voltage, farads): +def calculate_pack_capacity(cells: int, cell_voltage: float, farads: float) -> float: + """Calculate pack capacity.""" return calculate_capacitor_capacity(cells * cell_voltage, farads / cells) -def calculate_pack_capacity2(cells, cell_voltage, farads, cell_cost): - capacitor_capacity = calculate_capacitor_capacity( - cells * cell_voltage, farads / cells - ) +def calculate_pack_capacity2(cells: int, cell_voltage: float, farads: float, cell_cost: float) -> tuple[float, float]: + """Calculate pack capacity.""" + capacitor_capacity = calculate_capacitor_capacity(cells * cell_voltage, farads / cells) return capacitor_capacity, cell_cost * cells -def main(): +def main() -> None: + """Main.""" watt_hours = calculate_pack_capacity(cells=10, cell_voltage=2.7, farads=500) print(f"{watt_hours=}") print(f"{watt_hours*16=}") diff --git a/python/random/dc_charger.py b/python/random/dc_charger.py deleted file mode 100644 index 079affe..0000000 --- a/python/random/dc_charger.py +++ /dev/null @@ -1,19 +0,0 @@ -def dc_charger_on( - battery_max_kwh: float, - battery_current_kwh: float, - solar_max_kwh: float, - daily_power_kwh: float, - night: bool, -) -> bool: - battery_free_kwh = battery_max_kwh - battery_current_kwh - - if daily_power_kwh <= battery_current_kwh or night: - return True - - if battery_current_kwh >= battery_max_kwh: - return False - - if solar_max_kwh >= battery_free_kwh: - return False - - return True diff --git a/python/random/thing.py b/python/random/thing.py index 1c30c71..f493877 100644 --- a/python/random/thing.py +++ b/python/random/thing.py @@ -1,4 +1,12 @@ -def caculat_batry_specs(cell_amp_hour, cell_voltage, cells_per_pack, packs): +"""thing.""" + +def caculat_batry_specs( + cell_amp_hour: int, + cell_voltage: float, + cells_per_pack: int, + packs: int, +) -> tuple[float, float]: + """Caculat battry specs.""" pack_voltage = cell_voltage * cells_per_pack pack_watt_hours = pack_voltage * cell_amp_hour @@ -14,55 +22,3 @@ battry_capacity, pack_voltage = caculat_batry_specs(300, 3.2, 8, 2) print(f"{battry_capacity=} {pack_voltage=}") cost = 1700 print(f"$/kWh {cost / battry_capacity}") - -battry_capacity, pack_voltage = caculat_batry_specs(300, 3.2, 8, 4) -print(f"{battry_capacity=} {pack_voltage=}") -cost = 3300 -print(f"$/kWh {cost / battry_capacity}") - - -3300/32 - -battry_capacity, pack_voltage = caculat_batry_specs(600, 12.8, 2, 1) -print(f"{battry_capacity=} {pack_voltage=}") -cost = (740 * 2) -print({f"{cost=}"}) -print(f"$/kWh {cost / battry_capacity}") - -battry_capacity, pack_voltage = caculat_batry_specs(300, 12.8, 2, 2) -print(f"{battry_capacity=} {pack_voltage=}") -cost = 330 * 4 -print({f"{cost=}"}) -print(f"$/kWh {cost / battry_capacity}") -print("a") - -battry_capacity, pack_voltage = caculat_batry_specs(280, 3.2, 8, 1) -print(f"{battry_capacity=} {pack_voltage=}") -cost = 130 * 8 -print({f"{cost=}"}) -print(f"$/kWh {cost / battry_capacity}") - -battry_capacity, pack_voltage = caculat_batry_specs(200, 48, 1, 1) -print(f"{battry_capacity=} {pack_voltage=}") -cost = 2060 -print({f"{cost=}"}) -print(f"$/kWh {cost / battry_capacity}") - -battry_capacity, pack_voltage = caculat_batry_specs(600, 12, 2, 1) -print(f"{battry_capacity=} {pack_voltage=}") -cost = 740 * 2 -print({f"{cost=}"}) -print(f"$/kWh {cost / battry_capacity}") - -battry_capacity, pack_voltage = caculat_batry_specs(400, 12, 2, 1) -print(f"{battry_capacity=} {pack_voltage=}") -cost = 590 * 2 -print({f"{cost=}"}) -print(f"$/kWh {cost / battry_capacity}") - -battry_capacity, pack_voltage = caculat_batry_specs(100, 3.2, 8, 4) -print(f"{battry_capacity=} {pack_voltage=}") -cost = 880 * 2 -print({f"{cost=}"}) -print(f"$/kWh {cost / battry_capacity}") - diff --git a/python/random/voltage_drop.py b/python/random/voltage_drop.py index 14ba028..7e7ef42 100644 --- a/python/random/voltage_drop.py +++ b/python/random/voltage_drop.py @@ -1,24 +1,26 @@ -from enum import Enum +"""voltage_drop.""" + import math +from enum import Enum class TemperatureUnit(Enum): + """Temperature unit.""" + CELSIUS = "c" FAHRENHEIT = "f" KELVIN = "k" class Temperature: + """Temperature.""" + def __init__( self, temperature: float, unit: TemperatureUnit = TemperatureUnit.CELSIUS, ) -> None: - """ - Args: - temperature (float): Temperature in degrees Celsius - unit (TemperatureUnit, optional): Temperature unit. Defaults to TemperatureUnit.CELSIUS. - """ + """__init__.""" unit_modifier = { TemperatureUnit.CELSIUS: 1, TemperatureUnit.FAHRENHEIT: 0.5556, @@ -27,17 +29,23 @@ class Temperature: self.temperature = temperature * unit_modifier[unit] def __float__(self) -> float: + """Return the temperature in degrees Celsius.""" return self.temperature class LengthUnit(Enum): + """Length unit.""" + METERS = "m" FEET = "ft" INCHES = "in" class Length: - def __init__(self, length: float, unit: LengthUnit): + """Length.""" + + def __init__(self, length: float, unit: LengthUnit) -> None: + """__init__.""" self.meters = self._convert_to_meters(length, unit) def _convert_to_meters(self, length: float, unit: LengthUnit) -> float: @@ -49,16 +57,21 @@ class Length: test = thing.get(unit) if test: return length * test - raise ValueError(f"Unsupported unit: {unit}") + error = f"Unsupported unit: {unit}" + raise ValueError(error) - def __float__(self): + def __float__(self) -> float: + """Return the length in meters.""" return self.meters def feet(self) -> float: + """Return the length in feet.""" return self.meters * 3.2808 class MaterialType(Enum): + """Material type.""" + COPPER = "copper" ALUMINUM = "aluminum" CCA = "cca" @@ -68,8 +81,11 @@ class MaterialType(Enum): def get_material_resistivity( material: MaterialType, - temperature: Temperature = Temperature(20.0), + temperature: Temperature | None = None, ) -> float: + """Get the resistivity of a material.""" + if not temperature: + Temperature(20.0) material_info = { MaterialType.COPPER: (1.724e-8, 0.00393), MaterialType.ALUMINUM: (2.908e-8, 0.00403), @@ -83,14 +99,7 @@ def get_material_resistivity( def calculate_awg_diameter_mm(gauge: int) -> float: - """ - Calculate wire diameter in millimeters for a given AWG gauge. - Formula: diameter = 0.127 * 92^((36-gauge)/39) - Where: - - 0.127mm is the diameter of AWG 36 - - 92 is the ratio of diameters between AWG 0000 and AWG 36 - - 39 is the number of steps between AWG 0000 and AWG 36 - """ + """Calculate wire diameter in millimeters for a given AWG gauge.""" return round(0.127 * 92 ** ((36 - gauge) / 39), 3) @@ -124,6 +133,17 @@ def voltage_drop( length: Length, current_a: float, ) -> float: + """Calculate the voltage drop of a wire. + + Args: + gauge (int): The AWG (American Wire Gauge) number of the wire + material (MaterialType): The type of conductor material (e.g., copper, aluminum) + length (Length): The length of the wire in meters + current_a (float): The current flowing through the wire in amperes + + Returns: + float: The voltage drop of the wire in volts + """ resistivity = get_material_resistivity(material) resistance_per_meter = resistivity / calculate_wire_area_m2(gauge) total_resistance = resistance_per_meter * float(length) * 2 # round-trip @@ -145,7 +165,7 @@ def max_wire_length( material: MaterialType, current_amps: float, voltage_drop: float = 0.3, - temperature: Temperature = Temperature(100.0, unit=TemperatureUnit.FAHRENHEIT), + temperature: Temperature | None = None, ) -> Length: """Calculate the maximum allowable wire length based on voltage drop criteria. @@ -154,10 +174,14 @@ def max_wire_length( material (MaterialType): The type of conductor material (e.g., copper, aluminum) current_amps (float): The current flowing through the wire in amperes voltage_drop (float, optional): Maximum allowable voltage drop as a decimal (default 0.1 or 10%) + temperature (Temperature | None, optional): The temperature of the wire. Defaults to None. Returns: float: Maximum wire length in meters that maintains the specified voltage drop """ + if not temperature: + Temperature(100.0, unit=TemperatureUnit.FAHRENHEIT) + resistivity = get_material_resistivity(material, temperature) resistance_per_meter = resistivity / calculate_wire_area_m2(gauge) # V = IR, solve for length where V is the allowed voltage drop diff --git a/python/system_tests/components.py b/python/system_tests/components.py index 416995e..239c997 100644 --- a/python/system_tests/components.py +++ b/python/system_tests/components.py @@ -14,6 +14,8 @@ from python.zfs import Zpool if TYPE_CHECKING: from collections.abc import Sequence +logger = logging.getLogger(__name__) + def zpool_tests(pool_names: Sequence[str], zpool_capacity_threshold: int = 90) -> list[str] | None: """Check the zpool health and capacity. @@ -25,7 +27,7 @@ def zpool_tests(pool_names: Sequence[str], zpool_capacity_threshold: int = 90) - Returns: list[str] | None: A list of errors if any. """ - logging.info("Testing zpool") + logger.info("Testing zpool") errors: list[str] = [] for pool_name in pool_names: @@ -63,7 +65,7 @@ def systemd_tests( Returns: list[str] | None: A list of errors if any. """ - logging.info("Testing systemd service") + logger.info("Testing systemd service") max_retries = max(max_retries, 1) retry_delay_secs = max(retry_delay_secs, 1) @@ -81,7 +83,7 @@ def systemd_tests( for retry in range(max_retries): if not service_names_set: break - logging.info(f"Testing systemd service in {retry + 1} of {max_retries}") + logger.info(f"Testing systemd service in {retry + 1} of {max_retries}") service_names_to_test = copy(service_names_set) for service_name in service_names_to_test: service_status, _ = bash_wrapper(f"systemctl is-active {service_name}") diff --git a/python/system_tests/validate_system.py b/python/system_tests/validate_system.py index 8de5564..547820c 100644 --- a/python/system_tests/validate_system.py +++ b/python/system_tests/validate_system.py @@ -12,6 +12,8 @@ import typer from python.common import configure_logger, signal_alert from python.system_tests.components import systemd_tests, zpool_tests +logger = logging.getLogger(__name__) + def load_config_data(config_file: Path) -> dict[str, list[str]]: """Load a TOML configuration file. @@ -30,7 +32,7 @@ def main(config_file: Path) -> None: configure_logger(level=environ.get("LOG_LEVEL", "INFO")) server_name = gethostname() - logging.info(f"Starting {server_name} validation") + logger.info(f"Starting {server_name} validation") config_data = load_config_data(config_file) @@ -43,16 +45,16 @@ def main(config_file: Path) -> None: errors.extend(systemd_errors) except Exception as error: - logging.exception(f"{server_name} validation failed") + logger.exception(f"{server_name} validation failed") errors.append(f"{server_name} validation failed: {error}") if errors: - logging.error(f"{server_name} validation failed: \n{'\n'.join(errors)}") + logger.error(f"{server_name} validation failed: \n{'\n'.join(errors)}") signal_alert(f"{server_name} validation failed {errors}") sys.exit(1) - logging.info(f"{server_name} validation passed") + logger.info(f"{server_name} validation passed") def cli() -> None: diff --git a/python/tools/snapshot_manager.py b/python/tools/snapshot_manager.py index d00f6b2..f6ce0f0 100644 --- a/python/tools/snapshot_manager.py +++ b/python/tools/snapshot_manager.py @@ -12,35 +12,36 @@ from re import search import typer -from python.common import configure_logger, signal_alert -from python.common import utcnow +from python.common import configure_logger, signal_alert, utcnow from python.zfs import Dataset, get_datasets +logger = logging.getLogger(__name__) + def main(config_file: Path) -> None: """Main.""" configure_logger(level="DEBUG") - logging.info("Starting snapshot_manager") + logger.info("Starting snapshot_manager") try: time_stamp = get_time_stamp() for dataset in get_datasets(): status = dataset.create_snapshot(time_stamp) - logging.debug(f"{status=}") + logger.debug(f"{status=}") if status != "snapshot created": msg = f"{dataset.name} failed to create snapshot {time_stamp}" - logging.error(msg) + logger.error(msg) signal_alert(msg) continue get_snapshots_to_delete(dataset, get_count_lookup(config_file, dataset.name)) except Exception: - logging.exception("snapshot_manager failed") + logger.exception("snapshot_manager failed") signal_alert("snapshot_manager failed") sys.exit(1) else: - logging.info("snapshot_manager completed") + logger.info("snapshot_manager completed") def get_count_lookup(config_file: Path, dataset_name: str) -> dict[str, int]: @@ -99,7 +100,7 @@ def get_snapshots_to_delete( snapshots = dataset.get_snapshots() if not snapshots: - logging.info(f"{dataset.name} has no snapshots") + logger.info(f"{dataset.name} has no snapshots") return filters = ( @@ -110,21 +111,21 @@ def get_snapshots_to_delete( ) for filter_name, snapshot_filter in filters: - logging.debug(f"{filter_name=}\n{snapshot_filter=}") + logger.debug(f"{filter_name=}\n{snapshot_filter=}") filtered_snapshots = sorted(snapshot.name for snapshot in snapshots if search(snapshot_filter, snapshot.name)) - logging.debug(f"{filtered_snapshots=}") + logger.debug(f"{filtered_snapshots=}") snapshots_wanted = count_lookup[filter_name] snapshots_being_deleted = filtered_snapshots[:-snapshots_wanted] if snapshots_wanted > 0 else filtered_snapshots - logging.info(f"{snapshots_being_deleted} are being deleted") + logger.info(f"{snapshots_being_deleted} are being deleted") for snapshot in snapshots_being_deleted: if error := dataset.delete_snapshot(snapshot): error_message = f"{dataset.name}@{snapshot} failed to delete: {error}" signal_alert(error_message) - logging.error(error_message) + logger.error(error_message) def get_time_stamp() -> str: diff --git a/python/zfs/dataset.py b/python/zfs/dataset.py index ec58bbb..35f5689 100644 --- a/python/zfs/dataset.py +++ b/python/zfs/dataset.py @@ -9,6 +9,8 @@ from typing import Any from python.common import bash_wrapper +logger = logging.getLogger(__name__) + def _zfs_list(zfs_list: str) -> dict[str, Any]: """Check the version of zfs.""" @@ -118,7 +120,7 @@ class Dataset: Args: snapshot_name (str): a snapshot name """ - logging.debug(f"Creating {self.name}@{snapshot_name}") + logger.debug(f"Creating {self.name}@{snapshot_name}") _, return_code = bash_wrapper(f"zfs snapshot {self.name}@{snapshot_name}") if return_code == 0: return "snapshot created" @@ -136,7 +138,7 @@ class Dataset: Args: snapshot_name (str): a snapshot name """ - logging.debug(f"deleting {self.name}@{snapshot_name}") + logger.debug(f"deleting {self.name}@{snapshot_name}") msg, return_code = bash_wrapper(f"zfs destroy {self.name}@{snapshot_name}") if return_code != 0: if msg.startswith(f"cannot destroy '{self.name}@{snapshot_name}': snapshot has dependent clones"): @@ -203,7 +205,7 @@ def get_datasets() -> list[Dataset]: Returns: list[Dataset]: A list of zfs datasets. """ - logging.info("Getting zfs list") + logger.info("Getting zfs list") dataset_names, _ = bash_wrapper("zfs list -Hp -t filesystem -o name") diff --git a/tests/test_parallelize.py b/tests/test_parallelize.py index c00619a..8627164 100644 --- a/tests/test_parallelize.py +++ b/tests/test_parallelize.py @@ -91,8 +91,7 @@ def test_parallelize_process() -> None: def test_parallelize_process_to_many_max_workers(mocker: MockerFixture) -> None: """test_parallelize_process.""" - mocker.patch(target="python" - ".parallelize.cpu_count", return_value=1) + mocker.patch(target="python.parallelize.cpu_count", return_value=1) with pytest.raises(RuntimeError, match="max_workers must be less than or equal to 1"): parallelize_process(func=add, kwargs_list=[{"a": 1, "b": 2}], max_workers=8)