From 24f0e8693a7035b52a4511549d9a9dedbb9c0c33 Mon Sep 17 00:00:00 2001 From: Richie Cahill Date: Fri, 10 Apr 2026 12:37:14 -0400 Subject: [PATCH] added tools dir for on off scripts i used --- .../tools/build_finetune_dataset.py | 114 ++++++++++++++++++ python/prompt_bench/tools/count_tokens.py | 97 +++++++++++++++ 2 files changed, 211 insertions(+) create mode 100644 python/prompt_bench/tools/build_finetune_dataset.py create mode 100644 python/prompt_bench/tools/count_tokens.py diff --git a/python/prompt_bench/tools/build_finetune_dataset.py b/python/prompt_bench/tools/build_finetune_dataset.py new file mode 100644 index 0000000..e3594b8 --- /dev/null +++ b/python/prompt_bench/tools/build_finetune_dataset.py @@ -0,0 +1,114 @@ +"""Build a fine-tuning JSONL dataset from batch request + output files. + +Joins the original request JSONL (system + user messages) with the batch +output JSONL (assistant completions) by custom_id to produce a ChatML-style +messages JSONL suitable for fine-tuning. 
+""" + +from __future__ import annotations + +import json +import logging +from pathlib import Path +from typing import Annotated + +import typer + +logger = logging.getLogger(__name__) + +HTTP_OK = 200 + + +def load_requests(path: Path) -> dict[str, list[dict]]: + """Parse request JSONL into {custom_id: messages}.""" + results: dict[str, list[dict]] = {} + with path.open(encoding="utf-8") as handle: + for raw_line in handle: + stripped = raw_line.strip() + if not stripped: + continue + record = json.loads(stripped) + custom_id = record["custom_id"] + messages = record["body"]["messages"] + results[custom_id] = messages + return results + + +def load_completions(path: Path) -> dict[str, str]: + """Parse batch output JSONL into {custom_id: assistant_content}.""" + results: dict[str, str] = {} + with path.open(encoding="utf-8") as handle: + for line_number, raw_line in enumerate(handle, 1): + stripped = raw_line.strip() + if not stripped: + continue + record = json.loads(stripped) + custom_id = record["custom_id"] + response = record.get("response", {}) + if response.get("status_code") != HTTP_OK: + logger.warning("Skipping %s (line %d): status %s", custom_id, line_number, response.get("status_code")) + continue + body = response.get("body", {}) + choices = body.get("choices", []) + if not choices: + logger.warning("Skipping %s (line %d): no choices", custom_id, line_number) + continue + content = choices[0].get("message", {}).get("content", "") + if not content: + logger.warning("Skipping %s (line %d): empty content", custom_id, line_number) + continue + results[custom_id] = content + return results + + +def main( + requests_path: Annotated[Path, typer.Option("--requests", help="Batch request JSONL")] = Path( + "output/openai_batch/requests.jsonl", + ), + batch_output: Annotated[Path, typer.Option("--batch-output", help="Batch output JSONL")] = Path( + "batch_69d84558d91c819091d53f08d78f9fd6_output.jsonl", + ), + output_path: Annotated[Path, typer.Option("--output", 
help="Fine-tuning JSONL output")] = Path( + "output/finetune_dataset.jsonl", + ), + log_level: Annotated[str, typer.Option(help="Log level")] = "INFO", +) -> None: + """Build fine-tuning dataset by joining request and output JSONL files.""" + logging.basicConfig(level=log_level, format="%(asctime)s %(levelname)s %(name)s: %(message)s") + + logger.info("Loading requests from %s", requests_path) + requests = load_requests(requests_path) + logger.info("Loaded %d requests", len(requests)) + + logger.info("Loading completions from %s", batch_output) + completions = load_completions(batch_output) + logger.info("Loaded %d completions", len(completions)) + + output_path.parent.mkdir(parents=True, exist_ok=True) + matched = 0 + skipped = 0 + + with output_path.open("w", encoding="utf-8") as handle: + for custom_id, messages in requests.items(): + assistant_content = completions.get(custom_id) + if assistant_content is None: + skipped += 1 + continue + + example = { + "messages": [*messages, {"role": "assistant", "content": assistant_content}], + } + handle.write(json.dumps(example, ensure_ascii=False)) + handle.write("\n") + matched += 1 + + logger.info("Wrote %d examples to %s (skipped %d unmatched)", matched, output_path, skipped) + + +def cli() -> None: + """Typer entry point.""" + typer.run(main) + + +if __name__ == "__main__": + cli() diff --git a/python/prompt_bench/tools/count_tokens.py b/python/prompt_bench/tools/count_tokens.py new file mode 100644 index 0000000..fdc05de --- /dev/null +++ b/python/prompt_bench/tools/count_tokens.py @@ -0,0 +1,97 @@ +"""Sum token usage across compressed and uncompressed run directories.""" + +from __future__ import annotations + +import json +import logging +from dataclasses import dataclass, field +from pathlib import Path +from typing import Annotated + +import typer + +logger = logging.getLogger(__name__) + + +@dataclass +class UsageTotals: + """Aggregate usage counters for a directory of run records.""" + + files: int = 0 + 
"""Aggregate token-usage counters for run-record directories."""

import json
import logging
from dataclasses import dataclass, field
from pathlib import Path

logger = logging.getLogger(__name__)


@dataclass
class UsageTotals:
    """Aggregate usage counters for a directory of run records."""

    files: int = 0  # every record seen, usable or not
    errors: int = 0  # records with no usage block (or undecodable JSON)
    prompt_tokens: int = 0
    cached_tokens: int = 0
    completion_tokens: int = 0
    reasoning_tokens: int = 0
    total_tokens: int = 0
    # (filename, prompt, completion, total) per record that had usage.
    per_file: list[tuple[str, int, int, int]] = field(default_factory=list)


def tally_directory(directory: Path) -> UsageTotals:
    """Return aggregated usage stats for every ``*.json`` record in *directory*.

    Files are processed in sorted order. Records without a ``usage`` block,
    and files that do not start with valid JSON, are counted in ``errors``
    instead of aborting the whole tally.
    """
    totals = UsageTotals()
    # raw_decode tolerates trailing bytes after the first JSON value —
    # presumably some run files carry extra content; verify against producer.
    decoder = json.JSONDecoder()
    for path in sorted(directory.glob("*.json")):
        # Explicit encoding keeps behavior platform-independent (matches the
        # sibling tool's UTF-8 handling).
        text = path.read_text(encoding="utf-8").lstrip()
        totals.files += 1
        try:
            record, _ = decoder.raw_decode(text)
        except json.JSONDecodeError:
            totals.errors += 1
            logger.warning("Skipping %s: not valid JSON", path.name)
            continue
        usage = record.get("usage")
        if not usage:
            totals.errors += 1
            continue
        prompt_tokens = usage.get("prompt_tokens", 0)
        completion_tokens = usage.get("completion_tokens", 0)
        total_tokens = usage.get("total_tokens", 0)
        # The *_details sub-objects may be absent or null; coalesce first.
        cached_tokens = (usage.get("prompt_tokens_details") or {}).get("cached_tokens", 0)
        reasoning_tokens = (usage.get("completion_tokens_details") or {}).get("reasoning_tokens", 0)
        totals.prompt_tokens += prompt_tokens
        totals.completion_tokens += completion_tokens
        totals.total_tokens += total_tokens
        totals.cached_tokens += cached_tokens
        totals.reasoning_tokens += reasoning_tokens
        totals.per_file.append((path.name, prompt_tokens, completion_tokens, total_tokens))
    return totals


def log_totals(label: str, totals: UsageTotals) -> None:
    """Log a one-block summary for a directory."""
    counted = totals.files - totals.errors
    # Guard against division by zero when nothing usable was found.
    average_total = totals.total_tokens / counted if counted else 0
    logger.info("[%s]", label)
    logger.info(" files : %d (with usage: %d, errors: %d)", totals.files, counted, totals.errors)
    logger.info(" prompt tokens : %d", totals.prompt_tokens)
    logger.info(" cached tokens : %d", totals.cached_tokens)
    logger.info(" completion tok : %d", totals.completion_tokens)
    logger.info(" reasoning tok : %d", totals.reasoning_tokens)
    logger.info(" total tokens : %d", totals.total_tokens)
    logger.info(" avg total/file : %.1f", average_total)
import logging
from pathlib import Path
from typing import Annotated

import typer

logger = logging.getLogger(__name__)


def main(
    runs_dir: Annotated[Path, typer.Option("--runs-dir")] = Path("output/openai_runs_temp_1"),
    log_level: Annotated[str, typer.Option("--log-level")] = "INFO",
) -> None:
    """Print token usage totals for the compressed and uncompressed run directories."""
    logging.basicConfig(level=log_level, format="%(message)s")

    grand_total = UsageTotals()
    counter_fields = (
        "files",
        "errors",
        "prompt_tokens",
        "cached_tokens",
        "completion_tokens",
        "reasoning_tokens",
        "total_tokens",
    )

    for label in ("compressed", "uncompressed"):
        directory = runs_dir / label
        if not directory.is_dir():
            logger.warning("%s: directory not found at %s", label, directory)
            continue
        totals = tally_directory(directory)
        log_totals(label, totals)
        # Fold this directory's counters into the running grand total
        # (per_file is deliberately not merged, matching the summary's needs).
        for name in counter_fields:
            setattr(grand_total, name, getattr(grand_total, name) + getattr(totals, name))

    log_totals("grand total", grand_total)


if __name__ == "__main__":
    typer.run(main)