From 4fa21414610ccf9fb6c2f4bc4c6acd6ddb5ade77 Mon Sep 17 00:00:00 2001
From: Richie Cahill
Date: Tue, 28 Oct 2025 19:31:52 -0400
Subject: [PATCH] added common and parallelize

---
 python/__init__.py    |   1 +
 python/common.py      |  72 ++++++++++++++++++++
 python/parallelize.py | 153 ++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 226 insertions(+)
 create mode 100644 python/__init__.py
 create mode 100644 python/common.py
 create mode 100644 python/parallelize.py

diff --git a/python/__init__.py b/python/__init__.py
new file mode 100644
index 0000000..fa22635
--- /dev/null
+++ b/python/__init__.py
@@ -0,0 +1 @@
+"""Server Tools."""
diff --git a/python/common.py b/python/common.py
new file mode 100644
index 0000000..1e717dc
--- /dev/null
+++ b/python/common.py
@@ -0,0 +1,72 @@
+"""Shared helpers: logging setup, shell command execution, and Signal alerts."""
+
+from __future__ import annotations
+
+import logging
+import sys
+from datetime import UTC, datetime
+from os import getenv
+from subprocess import PIPE, Popen
+
+from apprise import Apprise
+
+logger = logging.getLogger(__name__)
+
+
+def configure_logger(level: str = "INFO") -> None:
+    """Configure the root logger to write to stdout with ISO-8601 timestamps.
+
+    Args:
+        level (str, optional): The logging level. Defaults to "INFO".
+    """
+    logging.basicConfig(
+        level=level,
+        datefmt="%Y-%m-%dT%H:%M:%S%z",
+        format="%(asctime)s %(levelname)s %(filename)s:%(lineno)d - %(message)s",
+        handlers=[logging.StreamHandler(sys.stdout)],
+    )
+
+
+def bash_wrapper(command: str) -> tuple[str, int]:
+    """Execute a command and capture the output.
+
+    Args:
+        command (str): The command to be executed (split on whitespace, no shell).
+
+    Returns:
+        tuple[str, int]: The decoded stderr and the return code when the command
+        wrote to stderr, otherwise the decoded stdout and the return code.
+    """
+    # NOTE: accepted risk — the command is split and run without a shell.
+    process = Popen(command.split(), stdout=PIPE, stderr=PIPE)
+    output, error = process.communicate()
+    if error:
+        logger.error(f"{error=}")
+        return error.decode(), process.returncode
+
+    return output.decode(), process.returncode
+
+
+def signal_alert(body: str, title: str = "") -> None:
+    """Send a signal alert.
+
+    Args:
+        body (str): The body of the alert.
+        title (str, optional): The title of the alert. Defaults to "".
+    """
+    apprise_client = Apprise()
+
+    from_phone = getenv("SIGNAL_ALERT_FROM_PHONE")
+    to_phone = getenv("SIGNAL_ALERT_TO_PHONE")
+    if not from_phone or not to_phone:
+        logger.info("SIGNAL_ALERT_FROM_PHONE or SIGNAL_ALERT_TO_PHONE not set")
+        return
+
+    apprise_client.add(f"signal://localhost:8989/{from_phone}/{to_phone}")
+
+    apprise_client.notify(title=title, body=body)
+
+
+def utcnow() -> datetime:
+    """Get the current UTC time as a timezone-aware datetime."""
+    return datetime.now(tz=UTC)
diff --git a/python/parallelize.py b/python/parallelize.py
new file mode 100644
index 0000000..4922fac
--- /dev/null
+++ b/python/parallelize.py
@@ -0,0 +1,153 @@
+"""Run a function across many argument sets in parallel threads or processes."""
+
+from __future__ import annotations
+
+import logging
+from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
+from dataclasses import dataclass
+from multiprocessing import cpu_count
+from typing import TYPE_CHECKING, Any, Generic, Literal, TypeVar
+
+if TYPE_CHECKING:
+    from collections.abc import Callable, Mapping, Sequence
+
+R = TypeVar("R")
+
+modes = Literal["normal", "early_error"]
+
+
+@dataclass
+class ExecutorResults(Generic[R]):
+    """Dataclass to store the results and exceptions of the parallel execution."""
+
+    results: list[R]
+    exceptions: list[BaseException]
+
+    def __repr__(self) -> str:
+        """Return a string representation of the object."""
+        return f"results={self.results} exceptions={self.exceptions}"
+
+
+def _parallelize_base(
+    executor_type: type[ThreadPoolExecutor | ProcessPoolExecutor],
+    func: Callable[..., R],
+    kwargs_list: Sequence[Mapping[str, Any]],
+    max_workers: int | None,
+    progress_tracker: int | None,
+    mode: modes,
+) -> ExecutorResults[R]:
+    total_work = len(kwargs_list)
+
+    with executor_type(max_workers=max_workers) as executor:
+        futures = [executor.submit(func, **kwarg) for kwarg in kwargs_list]
+
+        results = []
+        exceptions = []
+        for index, future in enumerate(futures, 1):
+            if exception := future.exception():
+                logging.error(f"{future} raised {exception.__class__.__name__}")
+                exceptions.append(exception)
+                if mode == "early_error":
+                    executor.shutdown(wait=False, cancel_futures=True)
+                    raise exception
+                continue
+
+            results.append(future.result())
+
+            if progress_tracker and index % progress_tracker == 0:
+                logging.info(f"Progress: {index}/{total_work}")
+
+    return ExecutorResults(results, exceptions)
+
+
+def parallelize_thread(
+    func: Callable[..., R],
+    kwargs_list: Sequence[Mapping[str, Any]],
+    max_workers: int | None = None,
+    progress_tracker: int | None = None,
+    mode: modes = "normal",
+) -> ExecutorResults[R]:
+    """Generic function to run a function with multiple arguments in threads.
+
+    Args:
+        func (Callable[..., R]): Function to run in threads.
+        kwargs_list (Sequence[Mapping[str, Any]]): List of dictionaries with the arguments for the function.
+        max_workers (int, optional): Number of workers to use. Defaults to None (executor default).
+        progress_tracker (int, optional): Number of tasks to complete before logging progress.
+        mode (modes, optional): Mode to use. Defaults to "normal".
+
+    Returns:
+        ExecutorResults[R]: The collected results and any exceptions raised.
+    """
+    return _parallelize_base(
+        executor_type=ThreadPoolExecutor,
+        func=func,
+        kwargs_list=kwargs_list,
+        max_workers=max_workers,
+        progress_tracker=progress_tracker,
+        mode=mode,
+    )
+
+
+def parallelize_process(
+    func: Callable[..., R],
+    kwargs_list: Sequence[Mapping[str, Any]],
+    max_workers: int | None = None,
+    progress_tracker: int | None = None,
+    mode: modes = "normal",
+) -> ExecutorResults[R]:
+    """Generic function to run a function with multiple arguments in processes.
+
+    Args:
+        func (Callable[..., R]): Function to run in processes.
+        kwargs_list (Sequence[Mapping[str, Any]]): List of dictionaries with the arguments for the function.
+        max_workers (int, optional): Number of workers to use. Defaults to None (executor default).
+        progress_tracker (int, optional): Number of tasks to complete before logging progress.
+        mode (modes, optional): Mode to use. Defaults to "normal".
+
+    Returns:
+        ExecutorResults[R]: The collected results and any exceptions raised.
+    """
+    if max_workers and max_workers > cpu_count():
+        error = f"max_workers must be less than or equal to {cpu_count()}"
+        raise RuntimeError(error)
+
+    return process_executor_unchecked(
+        func=func,
+        kwargs_list=kwargs_list,
+        max_workers=max_workers,
+        progress_tracker=progress_tracker,
+        mode=mode,
+    )
+
+
+def process_executor_unchecked(
+    func: Callable[..., R],
+    kwargs_list: Sequence[Mapping[str, Any]],
+    max_workers: int | None,
+    progress_tracker: int | None,
+    mode: modes = "normal",
+) -> ExecutorResults[R]:
+    """Generic function to run a function with multiple arguments in parallel.
+
+    Note: this function does not check if the number of workers is greater than the number of CPUs.
+    This can cause the system to become unresponsive.
+
+    Args:
+        func (Callable[..., R]): Function to run in parallel.
+        kwargs_list (Sequence[Mapping[str, Any]]): List of dictionaries with the arguments for the function.
+        max_workers (int, optional): Number of workers to use. Defaults to None (executor default).
+        progress_tracker (int, optional): Number of tasks to complete before logging progress.
+        mode (modes, optional): Mode to use. Defaults to "normal".
+
+    Returns:
+        ExecutorResults[R]: The collected results and any exceptions raised.
+    """
+    return _parallelize_base(
+        executor_type=ProcessPoolExecutor,
+        func=func,
+        kwargs_list=kwargs_list,
+        max_workers=max_workers,
+        progress_tracker=progress_tracker,
+        mode=mode,
    )