got alembic working

This commit is contained in:
2026-04-21 11:36:58 -04:00
parent e5ba089479
commit be4b473a3c
14 changed files with 144 additions and 40 deletions
+1
View File
@@ -0,0 +1 @@
"""Init."""
+72
View File
@@ -0,0 +1,72 @@
"""common."""
from __future__ import annotations
import logging
import sys
from datetime import UTC, datetime
from os import getenv
from subprocess import PIPE, Popen
from apprise import Apprise
logger = logging.getLogger(__name__)
def configure_logger(level: str = "INFO") -> None:
    """Set up logging so that records are written to stdout.

    Records are formatted as
    ``<timestamp> <level> <file>:<line> - <message>``.

    Args:
        level (str, optional): The logging level. Defaults to "INFO".
    """
    stdout_handler = logging.StreamHandler(sys.stdout)
    logging.basicConfig(
        handlers=[stdout_handler],
        format="%(asctime)s %(levelname)s %(filename)s:%(lineno)d - %(message)s",
        datefmt="%Y-%m-%dT%H:%M:%S%z",
        level=level,
    )
def bash_wrapper(command: str) -> tuple[str, int]:
    """Run a command in a subprocess and return its text output and return code.

    The command string is split on whitespace and executed directly, without a
    shell, so quoting, pipes and redirections are not interpreted.

    Args:
        command (str): The command line to execute.

    Returns:
        tuple[str, int]: A two-element tuple. The first element is the decoded
            stderr if the command wrote anything to stderr, otherwise the
            decoded stdout. The second element is the process return code.
    """
    # This is an acceptable risk (presumably: executing a caller-supplied command line).
    argv = command.split()
    process = Popen(argv, stdout=PIPE, stderr=PIPE)
    output, error = process.communicate()
    return_code = process.returncode

    if not error:
        return output.decode(), return_code

    # Anything on stderr is logged and takes precedence over stdout.
    logger.error(f"{error=}")
    return error.decode(), return_code
def signal_alert(body: str, title: str = "") -> None:
    """Send a Signal alert through Apprise.

    The sender and recipient numbers are read from the SIGNAL_ALERT_FROM_PHONE
    and SIGNAL_ALERT_TO_PHONE environment variables. If either is unset or
    empty, nothing is sent and an info message is logged instead. Delivery
    problems are logged as errors; this function never raises for them.

    Args:
        body (str): The body of the alert.
        title (str, optional): The title of the alert. Defaults to "".
    """
    from_phone = getenv("SIGNAL_ALERT_FROM_PHONE")
    to_phone = getenv("SIGNAL_ALERT_TO_PHONE")
    if not from_phone or not to_phone:
        logger.info("SIGNAL_ALERT_FROM_PHONE or SIGNAL_ALERT_TO_PHONE not set")
        return

    # Only build the client once we know there is somewhere to send the alert.
    apprise_client = Apprise()
    if not apprise_client.add(f"signal://localhost:8989/{from_phone}/{to_phone}"):
        logger.error("Failed to register the Signal notification URL")
        return

    # notify() reports failure through its return value rather than by raising,
    # so surface it in the log instead of silently dropping the alert.
    if not apprise_client.notify(title=title, body=body):
        logger.error("Failed to send Signal alert")
def utcnow() -> datetime:
"""Get the current UTC time."""
return datetime.now(tz=UTC)
+89
View File
@@ -0,0 +1,89 @@
from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
import tomllib
@dataclass
class LoraConfig:
    """LoRA adapter hyperparameters.

    ``FinetuneConfig.from_toml`` builds this from the ``lora`` sub-table of the
    ``finetune`` TOML table by keyword expansion, so the field names must match
    the keys of that table exactly.
    """

    rank: int
    alpha: int
    dropout: float
    targets: list[str]
@dataclass
class TrainingConfig:
    """Training loop hyperparameters.

    ``FinetuneConfig.from_toml`` builds this from the ``training`` sub-table of
    the ``finetune`` TOML table by keyword expansion, so the field names must
    match the keys of that table exactly.
    """

    learning_rate: float
    epochs: int
    batch_size: int
    gradient_accumulation: int
    max_seq_length: int
    warmup_ratio: float
    weight_decay: float
    logging_steps: int
    save_steps: int
@dataclass
class FinetuneConfig:
"""Top-level finetune configuration."""
base_model: str
lora: LoraConfig
training: TrainingConfig
@classmethod
def from_toml(cls, config_path: Path) -> FinetuneConfig:
"""Load finetune config from a TOML file."""
raw = tomllib.loads(config_path.read_text())["finetune"]
return cls(
base_model=raw["base_model"],
lora=LoraConfig(**raw["lora"]),
training=TrainingConfig(**raw["training"]),
)
class BenchmarkConfig:
"""Top-level benchmark configuration loaded from TOML."""
models: list[str]
model_dir: str
port: int
gpu_memory_utilization: float
temperature: float
timeout: int
concurrency: int
vllm_startup_timeout: int
@classmethod
def from_toml(cls, config_path: Path) -> BenchmarkConfig:
"""Load benchmark config from a TOML file."""
raw = tomllib.loads(config_path.read_text())["bench"]
return cls(**raw)
def get_config_dir() -> Path:
    """Return the ``config`` directory under the third parent of this module's file."""
    ancestor = Path(__file__).resolve()
    # Step up from the file itself through three parent levels.
    for _ in range(3):
        ancestor = ancestor.parent
    return ancestor / "config"
def default_config_path() -> Path:
    """Return the default config file path: ``config.toml`` inside the config directory."""
    config_dir = get_config_dir()
    return config_dir / "config.toml"
def get_finetune_config(config_path: Path | None = None) -> FinetuneConfig:
    """Load the finetune configuration.

    Args:
        config_path (Path | None, optional): TOML file to read. When None, the
            path returned by ``default_config_path()`` is used.

    Returns:
        FinetuneConfig: The configuration parsed by ``FinetuneConfig.from_toml``.
    """
    toml_path = default_config_path() if config_path is None else config_path
    return FinetuneConfig.from_toml(toml_path)
def get_benchmark_config(config_path: Path | None = None) -> BenchmarkConfig:
    """Load the benchmark configuration.

    Args:
        config_path (Path | None, optional): TOML file to read. When None, the
            path returned by ``default_config_path()`` is used.

    Returns:
        BenchmarkConfig: The configuration parsed by ``BenchmarkConfig.from_toml``.
    """
    toml_path = default_config_path() if config_path is None else config_path
    return BenchmarkConfig.from_toml(toml_path)
+1 -1
View File
@@ -21,7 +21,7 @@ import yaml
from sqlalchemy import select
from sqlalchemy.orm import Session
from pipelines.common import configure_logger
from pipelines.pipelines.common import configure_logger
from pipelines.orm.common import get_postgres_engine
from pipelines.orm.data_science_dev.congress import (
Bill,
+2 -2
View File
@@ -17,9 +17,9 @@ import orjson
import psycopg
import typer
from pipelines.common import configure_logger
from pipelines.pipelines.common import configure_logger
from pipelines.orm.common import get_connection_info
from pipelines.parallelize import parallelize_process
from pipelines.pipelines.parallelize import parallelize_process
if TYPE_CHECKING:
from collections.abc import Iterator
+155
View File
@@ -0,0 +1,155 @@
"""Thing."""
from __future__ import annotations
import logging
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from dataclasses import dataclass
from multiprocessing import cpu_count
from typing import TYPE_CHECKING, Any, Literal, TypeVar
if TYPE_CHECKING:
from collections.abc import Callable, Mapping, Sequence
logger = logging.getLogger(__name__)
R = TypeVar("R")
modes = Literal["normal", "early_error"]
@dataclass
class ExecutorResults[R]:
"""Dataclass to store the results and exceptions of the parallel execution."""
results: list[R]
exceptions: list[BaseException]
def __repr__(self) -> str:
"""Return a string representation of the object."""
return f"results={self.results} exceptions={self.exceptions}"
def _parallelize_base[R](
executor_type: type[ThreadPoolExecutor | ProcessPoolExecutor],
func: Callable[..., R],
kwargs_list: Sequence[Mapping[str, Any]],
max_workers: int | None,
progress_tracker: int | None,
mode: modes,
) -> ExecutorResults:
total_work = len(kwargs_list)
with executor_type(max_workers=max_workers) as executor:
futures = [executor.submit(func, **kwarg) for kwarg in kwargs_list]
results = []
exceptions = []
for index, future in enumerate(futures, 1):
if exception := future.exception():
logger.error(f"{future} raised {exception.__class__.__name__}")
exceptions.append(exception)
if mode == "early_error":
executor.shutdown(wait=False)
raise exception
continue
results.append(future.result())
if progress_tracker and index % progress_tracker == 0:
logger.info(f"Progress: {index}/{total_work}")
return ExecutorResults(results, exceptions)
def parallelize_thread[R](
func: Callable[..., R],
kwargs_list: Sequence[Mapping[str, Any]],
max_workers: int | None = None,
progress_tracker: int | None = None,
mode: modes = "normal",
) -> ExecutorResults:
"""Generic function to run a function with multiple arguments in threads.
Args:
func (Callable[..., R]): Function to run in threads.
kwargs_list (Sequence[Mapping[str, Any]]): List of dictionaries with the arguments for the function.
max_workers (int, optional): Number of workers to use. Defaults to 8.
progress_tracker (int, optional): Number of tasks to complete before logging progress.
mode (modes, optional): Mode to use. Defaults to "normal".
Returns:
tuple[list[R], list[Exception]]: List with the results and a list with the exceptions.
"""
return _parallelize_base(
executor_type=ThreadPoolExecutor,
func=func,
kwargs_list=kwargs_list,
max_workers=max_workers,
progress_tracker=progress_tracker,
mode=mode,
)
def parallelize_process[R](
func: Callable[..., R],
kwargs_list: Sequence[Mapping[str, Any]],
max_workers: int | None = None,
progress_tracker: int | None = None,
mode: modes = "normal",
) -> ExecutorResults:
"""Generic function to run a function with multiple arguments in process.
Args:
func (Callable[..., R]): Function to run in process.
kwargs_list (Sequence[Mapping[str, Any]]): List of dictionaries with the arguments for the function.
max_workers (int, optional): Number of workers to use. Defaults to 4.
progress_tracker (int, optional): Number of tasks to complete before logging progress.
mode (modes, optional): Mode to use. Defaults to "normal".
Returns:
tuple[list[R], list[Exception]]: List with the results and a list with the exceptions.
"""
if max_workers and max_workers > cpu_count():
error = f"max_workers must be less than or equal to {cpu_count()}"
raise RuntimeError(error)
return process_executor_unchecked(
func=func,
kwargs_list=kwargs_list,
max_workers=max_workers,
progress_tracker=progress_tracker,
mode=mode,
)
def process_executor_unchecked[R](
func: Callable[..., R],
kwargs_list: Sequence[Mapping[str, Any]],
max_workers: int | None,
progress_tracker: int | None,
mode: modes = "normal",
) -> ExecutorResults:
"""Generic function to run a function with multiple arguments in parallel.
Note: this function does not check if the number of workers is greater than the number of CPUs.
This can cause the system to become unresponsive.
Args:
func (Callable[..., R]): Function to run in parallel.
kwargs_list (Sequence[Mapping[str, Any]]): List of dictionaries with the arguments for the function.
max_workers (int, optional): Number of workers to use. Defaults to 8.
progress_tracker (int, optional): Number of tasks to complete before logging progress.
mode (modes, optional): Mode to use. Defaults to "normal".
Returns:
tuple[list[R], list[Exception]]: List with the results and a list with the exceptions.
"""
return _parallelize_base(
executor_type=ProcessPoolExecutor,
func=func,
kwargs_list=kwargs_list,
max_workers=max_workers,
progress_tracker=progress_tracker,
mode=mode,
)