From a906e59a8ccb477e9068b3ccf1d0e3cf7194ea99 Mon Sep 17 00:00:00 2001 From: Richie Cahill Date: Thu, 30 Apr 2026 11:46:18 -0400 Subject: [PATCH] updated actions --- .github/workflows/fix_eval_warnings.yml | 30 -- .github/workflows/merge_flake_lock_update.yml | 20 +- .github/workflows/pytest.yml | 1 - .github/workflows/update-flake-lock.yml | 24 +- python/gitea.py | 335 ++++++++++++++++++ python/gitea_flake_lock.py | 138 ++++++++ 6 files changed, 493 insertions(+), 55 deletions(-) delete mode 100644 .github/workflows/fix_eval_warnings.yml create mode 100644 python/gitea.py create mode 100644 python/gitea_flake_lock.py diff --git a/.github/workflows/fix_eval_warnings.yml b/.github/workflows/fix_eval_warnings.yml deleted file mode 100644 index 79eeb4a..0000000 --- a/.github/workflows/fix_eval_warnings.yml +++ /dev/null @@ -1,30 +0,0 @@ -name: fix_eval_warnings -on: - workflow_run: - workflows: ["build_systems"] - types: [completed] - -jobs: - check-warnings: - if: >- - github.event.workflow_run.conclusion != 'cancelled' && - github.event.workflow_run.head_branch == 'main' && - (github.event.workflow_run.event == 'push' || github.event.workflow_run.event == 'schedule') - runs-on: self-hosted - permissions: - contents: write - pull-requests: write - - steps: - - uses: actions/checkout@v4 - - - name: Fix eval warnings - env: - GH_TOKEN: ${{ secrets.GH_TOKEN_FOR_UPDATES }} - run: >- - nix develop .#devShells.x86_64-linux.default -c - python -m python.eval_warnings.main - --run-id "${{ github.event.workflow_run.id }}" - --repo "${{ github.repository }}" - --ollama-url "${{ secrets.OLLAMA_URL }}" - --run-url "${{ github.event.workflow_run.html_url }}" diff --git a/.github/workflows/merge_flake_lock_update.yml b/.github/workflows/merge_flake_lock_update.yml index 83c5d57..e4d34b6 100644 --- a/.github/workflows/merge_flake_lock_update.yml +++ b/.github/workflows/merge_flake_lock_update.yml @@ -6,24 +6,18 @@ on: jobs: merge: - runs-on: ubuntu-latest + runs-on: self-hosted permissions: contents: write pull-requests: write steps: - - name: Checkout repository - uses: actions/checkout@v4 - - name: merge_flake_lock_update - run: | - pr_number=$(gh pr list --state open --author RichieCahill --label flake_lock_update --json number --jq '.[0].number') - echo "pr_number=$pr_number" >> $GITHUB_ENV - if [ -n "$pr_number" ]; then - gh pr merge "$pr_number" --rebase - else - echo "No open PR found with label flake_lock_update" - fi + run: >- + nix develop .#devShells.x86_64-linux.default -c + python -m python.gitea_flake_lock merge + --repo "${{ github.repository }}" env: - GITHUB_TOKEN: ${{ secrets.GH_TOKEN_FOR_UPDATES }} + GITEA_TOKEN: ${{ secrets.GITEA_TOKEN }} + GITEA_URL: https://gitea.tmmworkshop.com diff --git a/.github/workflows/pytest.yml b/.github/workflows/pytest.yml index 063a3f5..3e2fe6d 100644 --- a/.github/workflows/pytest.yml +++ b/.github/workflows/pytest.yml @@ -7,7 +7,6 @@ on: pull_request: branches: - main - merge_group: jobs: pytest: diff --git a/.github/workflows/update-flake-lock.yml b/.github/workflows/update-flake-lock.yml index 6b7ce74..d13563d 100644 --- a/.github/workflows/update-flake-lock.yml +++ b/.github/workflows/update-flake-lock.yml @@ -6,18 +6,20 @@ on: jobs: lockfile: - runs-on: ubuntu-latest + runs-on: self-hosted + permissions: + contents: write + pull-requests: write steps: - name: Checkout repository uses: actions/checkout@v4 - - name: Install Nix - uses: DeterminateSystems/nix-installer-action@main - name: Update flake.lock - uses: DeterminateSystems/update-flake-lock@main - with: - token: ${{ secrets.GH_TOKEN_FOR_UPDATES }} - pr-title: "Update flake.lock" - pr-labels: | - dependencies - automated - flake_lock_update + run: nix flake update + - name: Create or update flake.lock PR + env: + GITEA_TOKEN: ${{ secrets.GITEA_TOKEN }} + GITEA_URL: https://gitea.tmmworkshop.com + run: >- + nix develop .#devShells.x86_64-linux.default -c + python -m python.gitea_flake_lock update + --repo "${{ github.repository }}" diff --git a/python/gitea.py b/python/gitea.py new file mode 100644 index 0000000..8b57fdf --- /dev/null +++ b/python/gitea.py @@ -0,0 +1,335 @@ +"""Small Gitea API client for repository automation.""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import Self + +import httpx + +DEFAULT_PAGE_SIZE = 100 +EXPECTED_CREATED = 201 +EXPECTED_OK = 200 + + +@dataclass(frozen=True) +class CreatedIssue: + """Issue data returned by Gitea.""" + + number: int | None + html_url: str | None + title: str + + +@dataclass(frozen=True) +class PullRequest: + """Pull request data returned by Gitea.""" + + number: int + title: str + html_url: str | None + labels: tuple[str, ...] + head_branch: str | None + base_branch: str | None + + +@dataclass(frozen=True) +class WorkflowJob: + """Workflow job data returned by Gitea Actions.""" + + id: int + name: str + run_id: int | None + status: str | None + conclusion: str | None + + +class GiteaError(RuntimeError): + """Raised when Gitea rejects an API request.""" + + +def split_repo_name(repo: str) -> tuple[str, str]: + """Split an owner/repo string into its parts.""" + owner, separator, repo_name = repo.partition("/") + if not separator or not owner or not repo_name: + msg = f"Invalid repository name: {repo}" + raise ValueError(msg) + return owner, repo_name + + +class GiteaClient: + """HTTP client for the subset of Gitea APIs used in this repository.""" + + def __init__( + self, + *, + base_url: str, + token: str, + timeout: int = 30, + transport: httpx.BaseTransport | None = None, + ) -> None: + """Initialize the Gitea client.""" + self._client = httpx.Client( + base_url=base_url.rstrip("/"), + timeout=timeout, + headers={"Authorization": f"token {token}"}, + transport=transport, + ) + + def create_issue( + self, + *, + owner: str, + repo: str, + title: str, + body: str, + labels: list[int] | None = None, + ) -> CreatedIssue: + """Create a Gitea issue.""" + payload: dict[str, object] = {"title": title, "body": body, "labels": labels or []} + response = self._request( + "POST", + f"/api/v1/repos/{owner}/{repo}/issues", + expected_statuses={EXPECTED_CREATED}, + json=payload, + ) + data = response.json() + return CreatedIssue( + number=_optional_int(data.get("number")), + html_url=_optional_str(data.get("html_url")), + title=str(data.get("title", title)), + ) + + def resolve_label_ids(self, *, owner: str, repo: str, labels: list[str]) -> list[int]: + """Resolve label names to Gitea label IDs.""" + if not labels: + return [] + + available_labels: dict[str, int] = {} + page = 1 + while True: + response = self._request( + "GET", + f"/api/v1/repos/{owner}/{repo}/labels", + params={"page": page, "limit": DEFAULT_PAGE_SIZE}, + ) + batch = response.json() + if not batch: + break + for label in batch: + label_name = str(label.get("name", "")) + label_id = _optional_int(label.get("id")) + if label_name and label_id is not None: + available_labels[label_name] = label_id + if len(batch) < DEFAULT_PAGE_SIZE: + break + page += 1 + + missing = [label for label in labels if label not in available_labels] + if missing: + missing_names = ", ".join(sorted(missing)) + msg = f"Missing Gitea labels: {missing_names}" + raise GiteaError(msg) + + return [available_labels[label] for label in labels] + + def list_open_pull_requests( + self, + *, + owner: str, + repo: str, + labels: list[str] | None = None, + head: str | None = None, + ) -> list[PullRequest]: + """List open pull requests for a repository.""" + expected_labels = set(labels or []) + pull_requests: list[PullRequest] = [] + page = 1 + while True: + response = self._request( + "GET", + f"/api/v1/repos/{owner}/{repo}/pulls", + params={"state": "open", "page": page, "limit": DEFAULT_PAGE_SIZE}, + ) + batch = response.json() + if not batch: + break + + for item in batch: + pull_request = _pull_request_from_api(item) + if head and pull_request.head_branch != head: + continue + if expected_labels and not expected_labels.issubset(set(pull_request.labels)): + continue + pull_requests.append(pull_request) + + if len(batch) < DEFAULT_PAGE_SIZE: + break + page += 1 + + return pull_requests + + def create_pull_request( + self, + *, + owner: str, + repo: str, + title: str, + body: str, + head: str, + base: str, + labels: list[str] | None = None, + ) -> PullRequest: + """Create a pull request.""" + payload: dict[str, object] = { + "title": title, + "body": body, + "head": head, + "base": base, + } + if labels: + payload["labels"] = self.resolve_label_ids(owner=owner, repo=repo, labels=labels) + + response = self._request( + "POST", + f"/api/v1/repos/{owner}/{repo}/pulls", + expected_statuses={EXPECTED_CREATED}, + json=payload, + ) + return _pull_request_from_api(response.json()) + + def merge_pull_request( + self, + *, + owner: str, + repo: str, + number: int, + merge_method: str = "rebase", + head_commit_id: str | None = None, + delete_branch_after_merge: bool = False, + ) -> None: + """Merge a pull request.""" + payload: dict[str, object] = { + "Do": merge_method, + "delete_branch_after_merge": delete_branch_after_merge, + } + if head_commit_id: + payload["head_commit_id"] = head_commit_id + + self._request( + "POST", + f"/api/v1/repos/{owner}/{repo}/pulls/{number}/merge", + json=payload, + ) + + def list_run_jobs(self, *, owner: str, repo: str, run_id: str | int) -> list[WorkflowJob]: + """List workflow jobs for a specific run.""" + jobs: list[WorkflowJob] = [] + page = 1 + while True: + response = self._request( + "GET", + f"/api/v1/repos/{owner}/{repo}/actions/jobs", + params={"page": page, "limit": DEFAULT_PAGE_SIZE}, + ) + payload = response.json() + batch = payload.get("jobs", []) + if not batch: + break + + for item in batch: + if str(item.get("run_id")) != str(run_id): + continue + jobs.append(_workflow_job_from_api(item)) + + if len(batch) < DEFAULT_PAGE_SIZE: + break + page += 1 + + return jobs + + def download_job_logs(self, *, owner: str, repo: str, job_id: int) -> str: + """Download logs for a workflow job.""" + response = self._request( + "GET", + f"/api/v1/repos/{owner}/{repo}/actions/jobs/{job_id}/logs", + ) + return response.text + + def close(self) -> None: + """Close the underlying HTTP client.""" + self._client.close() + + def __enter__(self) -> Self: + """Enter the context manager.""" + return self + + def __exit__(self, *args: object) -> None: + """Close the HTTP client.""" + self.close() + + def _request( + self, + method: str, + path: str, + *, + expected_statuses: set[int] | None = None, + **kwargs: object, + ) -> httpx.Response: + """Send an HTTP request and validate the response status.""" + response = self._client.request(method, path, **kwargs) + statuses = expected_statuses or {EXPECTED_OK} + if response.status_code not in statuses: + msg = f"Gitea request failed ({response.status_code}): {response.text}" + raise GiteaError(msg) + return response + + +def _pull_request_from_api(data: dict[str, object]) -> PullRequest: + """Convert Gitea API pull-request data into a dataclass.""" + number = _optional_int(data.get("number")) or _optional_int(data.get("index")) + if number is None: + msg = "Gitea pull request payload is missing a number" + raise GiteaError(msg) + + labels = tuple(str(label.get("name", "")) for label in data.get("labels", [])) + head = data.get("head", {}) + base = data.get("base", {}) + return PullRequest( + number=number, + title=str(data.get("title", "")), + html_url=_optional_str(data.get("html_url")), + labels=tuple(label for label in labels if label), + head_branch=_optional_str(head.get("ref")) or _optional_str(data.get("head_branch")), + base_branch=_optional_str(base.get("ref")) or _optional_str(data.get("base_branch")), + ) + + +def _workflow_job_from_api(data: dict[str, object]) -> WorkflowJob: + """Convert Gitea API workflow-job data into a dataclass.""" + job_id = _optional_int(data.get("id")) + if job_id is None: + msg = "Gitea workflow job payload is missing an ID" + raise GiteaError(msg) + + return WorkflowJob( + id=job_id, + name=str(data.get("name", "")), + run_id=_optional_int(data.get("run_id")), + status=_optional_str(data.get("status")), + conclusion=_optional_str(data.get("conclusion")), + ) + + +def _optional_int(value: object) -> int | None: + """Convert an API value to an integer when present.""" + if value is None: + return None + return int(value) + + +def _optional_str(value: object) -> str | None: + """Convert an API value to a string when present.""" + if value is None: + return None + return str(value) diff --git a/python/gitea_flake_lock.py b/python/gitea_flake_lock.py new file mode 100644 index 0000000..581438a --- /dev/null +++ b/python/gitea_flake_lock.py @@ -0,0 +1,138 @@ +"""Automation helpers for flake.lock pull requests on Gitea.""" + +from __future__ import annotations + +import subprocess +from os import getenv +from typing import Annotated + +import typer + +from python.gitea import GiteaClient, PullRequest, split_repo_name + +DEFAULT_BASE_BRANCH = "main" +DEFAULT_BRANCH = "automation/update-flake-lock" +DEFAULT_GITEA_URL = "https://gitea.tmmworkshop.com" +PR_LABELS = ["dependencies", "automated", "flake_lock_update"] +PR_TITLE = "Update flake.lock" +PR_BODY = "Automated flake.lock update." + +app = typer.Typer(add_completion=False) + + +def run_cmd(cmd: list[str], *, check: bool = True) -> subprocess.CompletedProcess[str]: + """Run a subprocess command.""" + return subprocess.run(cmd, capture_output=True, text=True, check=check) + + +def ensure_flake_lock_pull_request( + client: GiteaClient, + *, + owner: str, + repo: str, + branch: str, + base: str, +) -> PullRequest: + """Return an existing flake.lock PR for the branch or create one.""" + pull_requests = client.list_open_pull_requests(owner=owner, repo=repo, head=branch) + if pull_requests: + return pull_requests[0] + + return client.create_pull_request( + owner=owner, + repo=repo, + title=PR_TITLE, + body=PR_BODY, + head=branch, + base=base, + labels=PR_LABELS, + ) + + +def find_flake_lock_pull_request(client: GiteaClient, *, owner: str, repo: str) -> PullRequest | None: + """Find the first open flake.lock pull request.""" + pull_requests = client.list_open_pull_requests(owner=owner, repo=repo, labels=["flake_lock_update"]) + if not pull_requests: + return None + return pull_requests[0] + + +def has_worktree_changes() -> bool: + """Return whether `flake.lock` has worktree changes.""" + result = run_cmd(["git", "diff", "--quiet", "--", "flake.lock"], check=False) + return result.returncode != 0 + + +def commit_flake_lock_update(*, branch: str) -> None: + """Commit the updated lock file to the automation branch.""" + run_cmd(["git", "config", "user.name", "gitea-actions[bot]"]) + run_cmd(["git", "config", "user.email", "gitea-actions@tmmworkshop.com"]) + run_cmd(["git", "checkout", "-B", branch]) + run_cmd(["git", "add", "flake.lock"]) + run_cmd(["git", "commit", "-m", "chore: update flake.lock"]) + + +def push_branch(*, branch: str) -> None: + """Push the automation branch to origin.""" + run_cmd(["git", "push", "origin", f"HEAD:{branch}", "--force"]) + + +def _required_gitea_token() -> str: + """Read the required Gitea token from the environment.""" + token = getenv("GITEA_TOKEN") + if token: + return token + + msg = "GITEA_TOKEN environment variable is required" + raise RuntimeError(msg) + + +@app.command() +def update( + repo: Annotated[str, typer.Option("--repo", help="Gitea repository in owner/repo form")], + base: Annotated[str, typer.Option("--base", help="Base branch")] = DEFAULT_BASE_BRANCH, + branch: Annotated[str, typer.Option("--branch", help="Automation branch")] = DEFAULT_BRANCH, +) -> None: + """Commit flake.lock changes and ensure a pull request exists.""" + if not has_worktree_changes(): + typer.echo("No flake.lock changes detected") + return + + commit_flake_lock_update(branch=branch) + push_branch(branch=branch) + + owner, repo_name = split_repo_name(repo) + with GiteaClient( + base_url=getenv("GITEA_URL", DEFAULT_GITEA_URL), + token=_required_gitea_token(), + ) as client: + pull_request = ensure_flake_lock_pull_request( + client, + owner=owner, + repo=repo_name, + branch=branch, + base=base, + ) + typer.echo(pull_request.html_url or f"Pull request #{pull_request.number}") + + +@app.command() +def merge( + repo: Annotated[str, typer.Option("--repo", help="Gitea repository in owner/repo form")], +) -> None: + """Merge the first open flake.lock pull request.""" + owner, repo_name = split_repo_name(repo) + with GiteaClient( + base_url=getenv("GITEA_URL", DEFAULT_GITEA_URL), + token=_required_gitea_token(), + ) as client: + pull_request = find_flake_lock_pull_request(client, owner=owner, repo=repo_name) + if not pull_request: + typer.echo("No open PR found with label flake_lock_update") + return + client.merge_pull_request(owner=owner, repo=repo_name, number=pull_request.number, merge_method="rebase") + typer.echo(f"Merged PR #{pull_request.number}") + + +if __name__ == "__main__": + app()