added common and parallelize

2025-10-28 19:31:52 -04:00
parent 626bd70d67
commit 4fa2141461
3 changed files with 226 additions and 0 deletions

python/__init__.py Normal file

@@ -0,0 +1 @@
"""Server Tools."""

python/common.py Normal file

@@ -0,0 +1,72 @@
"""common."""
from __future__ import annotations
import logging
import sys
from datetime import UTC, datetime
from os import getenv
from subprocess import PIPE, Popen
from apprise import Apprise
logger = logging.getLogger(__name__)
def configure_logger(level: str = "INFO") -> None:
"""Configure the logger.
Args:
level (str, optional): The logging level. Defaults to "INFO".
"""
logging.basicConfig(
level=level,
datefmt="%Y-%m-%dT%H:%M:%S%z",
format="%(asctime)s %(levelname)s %(filename)s:%(lineno)d - %(message)s",
handlers=[logging.StreamHandler(sys.stdout)],
)
def bash_wrapper(command: str) -> tuple[str, int]:
"""Execute a bash command and capture the output.
Args:
command (str): The bash command to be executed.
    Returns:
        tuple[str, int]: A tuple containing the command output (stdout, or the
        decoded stderr when the command wrote error output) as a string and the
        return code as an integer.
"""
    # Splitting on whitespace is an acceptable risk here; quoted arguments are not supported.
process = Popen(command.split(), stdout=PIPE, stderr=PIPE)
output, error = process.communicate()
if error:
logger.error(f"{error=}")
return error.decode(), process.returncode
return output.decode(), process.returncode
def signal_alert(body: str, title: str = "") -> None:
"""Send a signal alert.
Args:
body (str): The body of the alert.
title (str, optional): The title of the alert. Defaults to "".
"""
apprise_client = Apprise()
from_phone = getenv("SIGNAL_ALERT_FROM_PHONE")
to_phone = getenv("SIGNAL_ALERT_TO_PHONE")
if not from_phone or not to_phone:
logger.info("SIGNAL_ALERT_FROM_PHONE or SIGNAL_ALERT_TO_PHONE not set")
return
apprise_client.add(f"signal://localhost:8989/{from_phone}/{to_phone}")
apprise_client.notify(title=title, body=body)
def utcnow() -> datetime:
"""Get the current UTC time."""
return datetime.now(tz=UTC)
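A quick usage sketch for these helpers (the command and alert text are illustrative only, and the import assumes common.py is importable as common; signal_alert additionally expects SIGNAL_ALERT_FROM_PHONE and SIGNAL_ALERT_TO_PHONE in the environment plus a Signal gateway on localhost:8989, as the code above wires up):

from common import bash_wrapper, configure_logger, signal_alert, utcnow

configure_logger("INFO")
# Run a hypothetical health-check command and alert on failure.
output, returncode = bash_wrapper("uptime")
if returncode != 0:
    signal_alert(body=f"uptime failed at {utcnow().isoformat()}: {output}", title="server-tools")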

python/parallelize.py Normal file

@@ -0,0 +1,153 @@
"""Thing."""
from __future__ import annotations
import logging
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from dataclasses import dataclass
from multiprocessing import cpu_count
from typing import TYPE_CHECKING, Any, Generic, Literal, TypeVar
if TYPE_CHECKING:
from collections.abc import Callable, Mapping, Sequence
logger = logging.getLogger(__name__)
R = TypeVar("R")
modes = Literal["normal", "early_error"]
@dataclass
class ExecutorResults(Generic[R]):
"""Dataclass to store the results and exceptions of the parallel execution."""
results: list[R]
exceptions: list[BaseException]
def __repr__(self) -> str:
"""Return a string representation of the object."""
return f"results={self.results} exceptions={self.exceptions}"
def _parallelize_base(
executor_type: type[ThreadPoolExecutor | ProcessPoolExecutor],
func: Callable[..., R],
kwargs_list: Sequence[Mapping[str, Any]],
max_workers: int | None,
progress_tracker: int | None,
mode: modes,
) -> ExecutorResults[R]:
total_work = len(kwargs_list)
with executor_type(max_workers=max_workers) as executor:
futures = [executor.submit(func, **kwarg) for kwarg in kwargs_list]
results = []
exceptions = []
for index, future in enumerate(futures, 1):
if exception := future.exception():
                logger.error(f"{future} raised {exception.__class__.__name__}")
exceptions.append(exception)
                if mode == "early_error":
                    # Cancel tasks that have not started yet before propagating the failure.
                    executor.shutdown(wait=False, cancel_futures=True)
                    raise exception
continue
results.append(future.result())
if progress_tracker and index % progress_tracker == 0:
                logger.info(f"Progress: {index}/{total_work}")
return ExecutorResults(results, exceptions)
def parallelize_thread(
func: Callable[..., R],
kwargs_list: Sequence[Mapping[str, Any]],
max_workers: int | None = None,
progress_tracker: int | None = None,
mode: modes = "normal",
) -> ExecutorResults[R]:
"""Generic function to run a function with multiple arguments in threads.
Args:
func (Callable[..., R]): Function to run in threads.
kwargs_list (Sequence[Mapping[str, Any]]): List of dictionaries with the arguments for the function.
        max_workers (int | None, optional): Number of workers to use. Defaults to
            None, letting the executor choose.
progress_tracker (int, optional): Number of tasks to complete before logging progress.
mode (modes, optional): Mode to use. Defaults to "normal".
    Returns:
        ExecutorResults[R]: The collected results and any raised exceptions.
"""
return _parallelize_base(
executor_type=ThreadPoolExecutor,
func=func,
kwargs_list=kwargs_list,
max_workers=max_workers,
progress_tracker=progress_tracker,
mode=mode,
)
def parallelize_process(
func: Callable[..., R],
kwargs_list: Sequence[Mapping[str, Any]],
max_workers: int | None = None,
progress_tracker: int | None = None,
mode: modes = "normal",
) -> ExecutorResults[R]:
"""Generic function to run a function with multiple arguments in process.
Args:
func (Callable[..., R]): Function to run in process.
kwargs_list (Sequence[Mapping[str, Any]]): List of dictionaries with the arguments for the function.
        max_workers (int | None, optional): Number of workers to use. Defaults to
            None, letting the executor choose (capped at the CPU count).
progress_tracker (int, optional): Number of tasks to complete before logging progress.
mode (modes, optional): Mode to use. Defaults to "normal".
    Returns:
        ExecutorResults[R]: The collected results and any raised exceptions.
"""
if max_workers and max_workers > cpu_count():
error = f"max_workers must be less than or equal to {cpu_count()}"
raise RuntimeError(error)
return process_executor_unchecked(
func=func,
kwargs_list=kwargs_list,
max_workers=max_workers,
progress_tracker=progress_tracker,
mode=mode,
)
def process_executor_unchecked(
func: Callable[..., R],
kwargs_list: Sequence[Mapping[str, Any]],
max_workers: int | None,
progress_tracker: int | None,
mode: modes = "normal",
) -> ExecutorResults[R]:
"""Generic function to run a function with multiple arguments in parallel.
Note: this function does not check if the number of workers is greater than the number of CPUs.
This can cause the system to become unresponsive.
Args:
func (Callable[..., R]): Function to run in parallel.
kwargs_list (Sequence[Mapping[str, Any]]): List of dictionaries with the arguments for the function.
        max_workers (int | None, optional): Number of workers to use. Defaults to
            None, letting the executor choose.
progress_tracker (int, optional): Number of tasks to complete before logging progress.
mode (modes, optional): Mode to use. Defaults to "normal".
    Returns:
        ExecutorResults[R]: The collected results and any raised exceptions.
"""
return _parallelize_base(
executor_type=ProcessPoolExecutor,
func=func,
kwargs_list=kwargs_list,
max_workers=max_workers,
progress_tracker=progress_tracker,
mode=mode,
)
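A short sketch of how the parallel helpers compose (slow_square and its argument list are hypothetical, not part of this commit; the __main__ guard matters for ProcessPoolExecutor on platforms that spawn workers):

from parallelize import parallelize_process, parallelize_thread

def slow_square(x: int) -> int:
    """Toy worker standing in for real I/O- or CPU-bound work."""
    return x * x

if __name__ == "__main__":
    kwargs_list = [{"x": i} for i in range(100)]
    # Threads: log progress every 25 completed tasks; exceptions are collected, not raised.
    out = parallelize_thread(slow_square, kwargs_list, max_workers=8, progress_tracker=25)
    print(len(out.results), len(out.exceptions))
    # Processes: "early_error" re-raises the first failure and cancels pending tasks.
    out = parallelize_process(slow_square, kwargs_list, mode="early_error")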