fixed most ruff error

This commit is contained in:
2025-10-30 19:41:43 -04:00
parent 65fca5c8a4
commit 6a09bc66b6
11 changed files with 99 additions and 124 deletions

View File

@@ -11,6 +11,8 @@ from typing import TYPE_CHECKING, Any, Generic, Literal, TypeVar
if TYPE_CHECKING:
from collections.abc import Callable, Mapping, Sequence
logger = logging.getLogger(__name__)
R = TypeVar("R")
modes = Literal["normal", "early_error"]
@@ -45,7 +47,7 @@ def _parallelize_base(
exceptions = []
for index, future in enumerate(futures, 1):
if exception := future.exception():
logging.error(f"{future} raised {exception.__class__.__name__}")
logger.error(f"{future} raised {exception.__class__.__name__}")
exceptions.append(exception)
if mode == "early_error":
executor.shutdown(wait=False)
@@ -55,7 +57,7 @@ def _parallelize_base(
results.append(future.result())
if progress_tracker and index % progress_tracker == 0:
logging.info(f"Progress: {index}/{total_work}")
logger.info(f"Progress: {index}/{total_work}")
return ExecutorResults(results, exceptions)

View File

@@ -0,0 +1 @@
"""init."""

View File

@@ -1,20 +1,25 @@
def calculate_capacitor_capacity(voltage, farads):
"""capasitor."""
def calculate_capacitor_capacity(voltage: float, farads: float) -> float:
"""Calculate capacitor capacity."""
joules = (farads * voltage**2) // 2
return joules // 3600
def calculate_pack_capacity(cells, cell_voltage, farads):
def calculate_pack_capacity(cells: int, cell_voltage: float, farads: float) -> float:
"""Calculate pack capacity."""
return calculate_capacitor_capacity(cells * cell_voltage, farads / cells)
def calculate_pack_capacity2(cells, cell_voltage, farads, cell_cost):
capacitor_capacity = calculate_capacitor_capacity(
cells * cell_voltage, farads / cells
)
def calculate_pack_capacity2(cells: int, cell_voltage: float, farads: float, cell_cost: float) -> tuple[float, float]:
"""Calculate pack capacity."""
capacitor_capacity = calculate_capacitor_capacity(cells * cell_voltage, farads / cells)
return capacitor_capacity, cell_cost * cells
def main():
def main() -> None:
"""Main."""
watt_hours = calculate_pack_capacity(cells=10, cell_voltage=2.7, farads=500)
print(f"{watt_hours=}")
print(f"{watt_hours*16=}")

View File

@@ -1,19 +0,0 @@
def dc_charger_on(
battery_max_kwh: float,
battery_current_kwh: float,
solar_max_kwh: float,
daily_power_kwh: float,
night: bool,
) -> bool:
battery_free_kwh = battery_max_kwh - battery_current_kwh
if daily_power_kwh <= battery_current_kwh or night:
return True
if battery_current_kwh >= battery_max_kwh:
return False
if solar_max_kwh >= battery_free_kwh:
return False
return True

View File

@@ -1,4 +1,12 @@
def caculat_batry_specs(cell_amp_hour, cell_voltage, cells_per_pack, packs):
"""thing."""
def caculat_batry_specs(
cell_amp_hour: int,
cell_voltage: float,
cells_per_pack: int,
packs: int,
) -> tuple[float, float]:
"""Caculat battry specs."""
pack_voltage = cell_voltage * cells_per_pack
pack_watt_hours = pack_voltage * cell_amp_hour
@@ -14,55 +22,3 @@ battry_capacity, pack_voltage = caculat_batry_specs(300, 3.2, 8, 2)
print(f"{battry_capacity=} {pack_voltage=}")
cost = 1700
print(f"$/kWh {cost / battry_capacity}")
battry_capacity, pack_voltage = caculat_batry_specs(300, 3.2, 8, 4)
print(f"{battry_capacity=} {pack_voltage=}")
cost = 3300
print(f"$/kWh {cost / battry_capacity}")
3300/32
battry_capacity, pack_voltage = caculat_batry_specs(600, 12.8, 2, 1)
print(f"{battry_capacity=} {pack_voltage=}")
cost = (740 * 2)
print({f"{cost=}"})
print(f"$/kWh {cost / battry_capacity}")
battry_capacity, pack_voltage = caculat_batry_specs(300, 12.8, 2, 2)
print(f"{battry_capacity=} {pack_voltage=}")
cost = 330 * 4
print({f"{cost=}"})
print(f"$/kWh {cost / battry_capacity}")
print("a")
battry_capacity, pack_voltage = caculat_batry_specs(280, 3.2, 8, 1)
print(f"{battry_capacity=} {pack_voltage=}")
cost = 130 * 8
print({f"{cost=}"})
print(f"$/kWh {cost / battry_capacity}")
battry_capacity, pack_voltage = caculat_batry_specs(200, 48, 1, 1)
print(f"{battry_capacity=} {pack_voltage=}")
cost = 2060
print({f"{cost=}"})
print(f"$/kWh {cost / battry_capacity}")
battry_capacity, pack_voltage = caculat_batry_specs(600, 12, 2, 1)
print(f"{battry_capacity=} {pack_voltage=}")
cost = 740 * 2
print({f"{cost=}"})
print(f"$/kWh {cost / battry_capacity}")
battry_capacity, pack_voltage = caculat_batry_specs(400, 12, 2, 1)
print(f"{battry_capacity=} {pack_voltage=}")
cost = 590 * 2
print({f"{cost=}"})
print(f"$/kWh {cost / battry_capacity}")
battry_capacity, pack_voltage = caculat_batry_specs(100, 3.2, 8, 4)
print(f"{battry_capacity=} {pack_voltage=}")
cost = 880 * 2
print({f"{cost=}"})
print(f"$/kWh {cost / battry_capacity}")

View File

@@ -1,24 +1,26 @@
from enum import Enum
"""voltage_drop."""
import math
from enum import Enum
class TemperatureUnit(Enum):
"""Temperature unit."""
CELSIUS = "c"
FAHRENHEIT = "f"
KELVIN = "k"
class Temperature:
"""Temperature."""
def __init__(
self,
temperature: float,
unit: TemperatureUnit = TemperatureUnit.CELSIUS,
) -> None:
"""
Args:
temperature (float): Temperature in degrees Celsius
unit (TemperatureUnit, optional): Temperature unit. Defaults to TemperatureUnit.CELSIUS.
"""
"""__init__."""
unit_modifier = {
TemperatureUnit.CELSIUS: 1,
TemperatureUnit.FAHRENHEIT: 0.5556,
@@ -27,17 +29,23 @@ class Temperature:
self.temperature = temperature * unit_modifier[unit]
def __float__(self) -> float:
"""Return the temperature in degrees Celsius."""
return self.temperature
class LengthUnit(Enum):
"""Length unit."""
METERS = "m"
FEET = "ft"
INCHES = "in"
class Length:
def __init__(self, length: float, unit: LengthUnit):
"""Length."""
def __init__(self, length: float, unit: LengthUnit) -> None:
"""__init__."""
self.meters = self._convert_to_meters(length, unit)
def _convert_to_meters(self, length: float, unit: LengthUnit) -> float:
@@ -49,16 +57,21 @@ class Length:
test = thing.get(unit)
if test:
return length * test
raise ValueError(f"Unsupported unit: {unit}")
error = f"Unsupported unit: {unit}"
raise ValueError(error)
def __float__(self):
def __float__(self) -> float:
"""Return the length in meters."""
return self.meters
def feet(self) -> float:
"""Return the length in feet."""
return self.meters * 3.2808
class MaterialType(Enum):
"""Material type."""
COPPER = "copper"
ALUMINUM = "aluminum"
CCA = "cca"
@@ -68,8 +81,11 @@ class MaterialType(Enum):
def get_material_resistivity(
material: MaterialType,
temperature: Temperature = Temperature(20.0),
temperature: Temperature | None = None,
) -> float:
"""Get the resistivity of a material."""
if not temperature:
Temperature(20.0)
material_info = {
MaterialType.COPPER: (1.724e-8, 0.00393),
MaterialType.ALUMINUM: (2.908e-8, 0.00403),
@@ -83,14 +99,7 @@ def get_material_resistivity(
def calculate_awg_diameter_mm(gauge: int) -> float:
"""
Calculate wire diameter in millimeters for a given AWG gauge.
Formula: diameter = 0.127 * 92^((36-gauge)/39)
Where:
- 0.127mm is the diameter of AWG 36
- 92 is the ratio of diameters between AWG 0000 and AWG 36
- 39 is the number of steps between AWG 0000 and AWG 36
"""
"""Calculate wire diameter in millimeters for a given AWG gauge."""
return round(0.127 * 92 ** ((36 - gauge) / 39), 3)
@@ -124,6 +133,17 @@ def voltage_drop(
length: Length,
current_a: float,
) -> float:
"""Calculate the voltage drop of a wire.
Args:
gauge (int): The AWG (American Wire Gauge) number of the wire
material (MaterialType): The type of conductor material (e.g., copper, aluminum)
length (Length): The length of the wire in meters
current_a (float): The current flowing through the wire in amperes
Returns:
float: The voltage drop of the wire in volts
"""
resistivity = get_material_resistivity(material)
resistance_per_meter = resistivity / calculate_wire_area_m2(gauge)
total_resistance = resistance_per_meter * float(length) * 2 # round-trip
@@ -145,7 +165,7 @@ def max_wire_length(
material: MaterialType,
current_amps: float,
voltage_drop: float = 0.3,
temperature: Temperature = Temperature(100.0, unit=TemperatureUnit.FAHRENHEIT),
temperature: Temperature | None = None,
) -> Length:
"""Calculate the maximum allowable wire length based on voltage drop criteria.
@@ -154,10 +174,14 @@ def max_wire_length(
material (MaterialType): The type of conductor material (e.g., copper, aluminum)
current_amps (float): The current flowing through the wire in amperes
voltage_drop (float, optional): Maximum allowable voltage drop as a decimal (default 0.1 or 10%)
temperature (Temperature | None, optional): The temperature of the wire. Defaults to None.
Returns:
float: Maximum wire length in meters that maintains the specified voltage drop
"""
if not temperature:
Temperature(100.0, unit=TemperatureUnit.FAHRENHEIT)
resistivity = get_material_resistivity(material, temperature)
resistance_per_meter = resistivity / calculate_wire_area_m2(gauge)
# V = IR, solve for length where V is the allowed voltage drop

View File

@@ -14,6 +14,8 @@ from python.zfs import Zpool
if TYPE_CHECKING:
from collections.abc import Sequence
logger = logging.getLogger(__name__)
def zpool_tests(pool_names: Sequence[str], zpool_capacity_threshold: int = 90) -> list[str] | None:
"""Check the zpool health and capacity.
@@ -25,7 +27,7 @@ def zpool_tests(pool_names: Sequence[str], zpool_capacity_threshold: int = 90) -
Returns:
list[str] | None: A list of errors if any.
"""
logging.info("Testing zpool")
logger.info("Testing zpool")
errors: list[str] = []
for pool_name in pool_names:
@@ -63,7 +65,7 @@ def systemd_tests(
Returns:
list[str] | None: A list of errors if any.
"""
logging.info("Testing systemd service")
logger.info("Testing systemd service")
max_retries = max(max_retries, 1)
retry_delay_secs = max(retry_delay_secs, 1)
@@ -81,7 +83,7 @@ def systemd_tests(
for retry in range(max_retries):
if not service_names_set:
break
logging.info(f"Testing systemd service in {retry + 1} of {max_retries}")
logger.info(f"Testing systemd service in {retry + 1} of {max_retries}")
service_names_to_test = copy(service_names_set)
for service_name in service_names_to_test:
service_status, _ = bash_wrapper(f"systemctl is-active {service_name}")

View File

@@ -12,6 +12,8 @@ import typer
from python.common import configure_logger, signal_alert
from python.system_tests.components import systemd_tests, zpool_tests
logger = logging.getLogger(__name__)
def load_config_data(config_file: Path) -> dict[str, list[str]]:
"""Load a TOML configuration file.
@@ -30,7 +32,7 @@ def main(config_file: Path) -> None:
configure_logger(level=environ.get("LOG_LEVEL", "INFO"))
server_name = gethostname()
logging.info(f"Starting {server_name} validation")
logger.info(f"Starting {server_name} validation")
config_data = load_config_data(config_file)
@@ -43,16 +45,16 @@ def main(config_file: Path) -> None:
errors.extend(systemd_errors)
except Exception as error:
logging.exception(f"{server_name} validation failed")
logger.exception(f"{server_name} validation failed")
errors.append(f"{server_name} validation failed: {error}")
if errors:
logging.error(f"{server_name} validation failed: \n{'\n'.join(errors)}")
logger.error(f"{server_name} validation failed: \n{'\n'.join(errors)}")
signal_alert(f"{server_name} validation failed {errors}")
sys.exit(1)
logging.info(f"{server_name} validation passed")
logger.info(f"{server_name} validation passed")
def cli() -> None:

View File

@@ -12,35 +12,36 @@ from re import search
import typer
from python.common import configure_logger, signal_alert
from python.common import utcnow
from python.common import configure_logger, signal_alert, utcnow
from python.zfs import Dataset, get_datasets
logger = logging.getLogger(__name__)
def main(config_file: Path) -> None:
"""Main."""
configure_logger(level="DEBUG")
logging.info("Starting snapshot_manager")
logger.info("Starting snapshot_manager")
try:
time_stamp = get_time_stamp()
for dataset in get_datasets():
status = dataset.create_snapshot(time_stamp)
logging.debug(f"{status=}")
logger.debug(f"{status=}")
if status != "snapshot created":
msg = f"{dataset.name} failed to create snapshot {time_stamp}"
logging.error(msg)
logger.error(msg)
signal_alert(msg)
continue
get_snapshots_to_delete(dataset, get_count_lookup(config_file, dataset.name))
except Exception:
logging.exception("snapshot_manager failed")
logger.exception("snapshot_manager failed")
signal_alert("snapshot_manager failed")
sys.exit(1)
else:
logging.info("snapshot_manager completed")
logger.info("snapshot_manager completed")
def get_count_lookup(config_file: Path, dataset_name: str) -> dict[str, int]:
@@ -99,7 +100,7 @@ def get_snapshots_to_delete(
snapshots = dataset.get_snapshots()
if not snapshots:
logging.info(f"{dataset.name} has no snapshots")
logger.info(f"{dataset.name} has no snapshots")
return
filters = (
@@ -110,21 +111,21 @@ def get_snapshots_to_delete(
)
for filter_name, snapshot_filter in filters:
logging.debug(f"{filter_name=}\n{snapshot_filter=}")
logger.debug(f"{filter_name=}\n{snapshot_filter=}")
filtered_snapshots = sorted(snapshot.name for snapshot in snapshots if search(snapshot_filter, snapshot.name))
logging.debug(f"{filtered_snapshots=}")
logger.debug(f"{filtered_snapshots=}")
snapshots_wanted = count_lookup[filter_name]
snapshots_being_deleted = filtered_snapshots[:-snapshots_wanted] if snapshots_wanted > 0 else filtered_snapshots
logging.info(f"{snapshots_being_deleted} are being deleted")
logger.info(f"{snapshots_being_deleted} are being deleted")
for snapshot in snapshots_being_deleted:
if error := dataset.delete_snapshot(snapshot):
error_message = f"{dataset.name}@{snapshot} failed to delete: {error}"
signal_alert(error_message)
logging.error(error_message)
logger.error(error_message)
def get_time_stamp() -> str:

View File

@@ -9,6 +9,8 @@ from typing import Any
from python.common import bash_wrapper
logger = logging.getLogger(__name__)
def _zfs_list(zfs_list: str) -> dict[str, Any]:
"""Check the version of zfs."""
@@ -118,7 +120,7 @@ class Dataset:
Args:
snapshot_name (str): a snapshot name
"""
logging.debug(f"Creating {self.name}@{snapshot_name}")
logger.debug(f"Creating {self.name}@{snapshot_name}")
_, return_code = bash_wrapper(f"zfs snapshot {self.name}@{snapshot_name}")
if return_code == 0:
return "snapshot created"
@@ -136,7 +138,7 @@ class Dataset:
Args:
snapshot_name (str): a snapshot name
"""
logging.debug(f"deleting {self.name}@{snapshot_name}")
logger.debug(f"deleting {self.name}@{snapshot_name}")
msg, return_code = bash_wrapper(f"zfs destroy {self.name}@{snapshot_name}")
if return_code != 0:
if msg.startswith(f"cannot destroy '{self.name}@{snapshot_name}': snapshot has dependent clones"):
@@ -203,7 +205,7 @@ def get_datasets() -> list[Dataset]:
Returns:
list[Dataset]: A list of zfs datasets.
"""
logging.info("Getting zfs list")
logger.info("Getting zfs list")
dataset_names, _ = bash_wrapper("zfs list -Hp -t filesystem -o name")