updated actions
This commit is contained in:
+335
@@ -0,0 +1,335 @@
|
||||
"""Small Gitea API client for repository automation."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
from typing import Self
|
||||
|
||||
import httpx
|
||||
|
||||
DEFAULT_PAGE_SIZE = 100
|
||||
EXPECTED_CREATED = 201
|
||||
EXPECTED_OK = 200
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class CreatedIssue:
|
||||
"""Issue data returned by Gitea."""
|
||||
|
||||
number: int | None
|
||||
html_url: str | None
|
||||
title: str
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class PullRequest:
|
||||
"""Pull request data returned by Gitea."""
|
||||
|
||||
number: int
|
||||
title: str
|
||||
html_url: str | None
|
||||
labels: tuple[str, ...]
|
||||
head_branch: str | None
|
||||
base_branch: str | None
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class WorkflowJob:
|
||||
"""Workflow job data returned by Gitea Actions."""
|
||||
|
||||
id: int
|
||||
name: str
|
||||
run_id: int | None
|
||||
status: str | None
|
||||
conclusion: str | None
|
||||
|
||||
|
||||
class GiteaError(RuntimeError):
|
||||
"""Raised when Gitea rejects an API request."""
|
||||
|
||||
|
||||
def split_repo_name(repo: str) -> tuple[str, str]:
|
||||
"""Split an owner/repo string into its parts."""
|
||||
owner, separator, repo_name = repo.partition("/")
|
||||
if not separator or not owner or not repo_name:
|
||||
msg = f"Invalid repository name: {repo}"
|
||||
raise ValueError(msg)
|
||||
return owner, repo_name
|
||||
|
||||
|
||||
class GiteaClient:
|
||||
"""HTTP client for the subset of Gitea APIs used in this repository."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
base_url: str,
|
||||
token: str,
|
||||
timeout: int = 30,
|
||||
transport: httpx.BaseTransport | None = None,
|
||||
) -> None:
|
||||
"""Initialize the Gitea client."""
|
||||
self._client = httpx.Client(
|
||||
base_url=base_url.rstrip("/"),
|
||||
timeout=timeout,
|
||||
headers={"Authorization": f"token {token}"},
|
||||
transport=transport,
|
||||
)
|
||||
|
||||
def create_issue(
|
||||
self,
|
||||
*,
|
||||
owner: str,
|
||||
repo: str,
|
||||
title: str,
|
||||
body: str,
|
||||
labels: list[int] | None = None,
|
||||
) -> CreatedIssue:
|
||||
"""Create a Gitea issue."""
|
||||
payload: dict[str, object] = {"title": title, "body": body, "labels": labels or []}
|
||||
response = self._request(
|
||||
"POST",
|
||||
f"/api/v1/repos/{owner}/{repo}/issues",
|
||||
expected_statuses={EXPECTED_CREATED},
|
||||
json=payload,
|
||||
)
|
||||
data = response.json()
|
||||
return CreatedIssue(
|
||||
number=_optional_int(data.get("number")),
|
||||
html_url=_optional_str(data.get("html_url")),
|
||||
title=str(data.get("title", title)),
|
||||
)
|
||||
|
||||
def resolve_label_ids(self, *, owner: str, repo: str, labels: list[str]) -> list[int]:
|
||||
"""Resolve label names to Gitea label IDs."""
|
||||
if not labels:
|
||||
return []
|
||||
|
||||
available_labels: dict[str, int] = {}
|
||||
page = 1
|
||||
while True:
|
||||
response = self._request(
|
||||
"GET",
|
||||
f"/api/v1/repos/{owner}/{repo}/labels",
|
||||
params={"page": page, "limit": DEFAULT_PAGE_SIZE},
|
||||
)
|
||||
batch = response.json()
|
||||
if not batch:
|
||||
break
|
||||
for label in batch:
|
||||
label_name = str(label.get("name", ""))
|
||||
label_id = _optional_int(label.get("id"))
|
||||
if label_name and label_id is not None:
|
||||
available_labels[label_name] = label_id
|
||||
if len(batch) < DEFAULT_PAGE_SIZE:
|
||||
break
|
||||
page += 1
|
||||
|
||||
missing = [label for label in labels if label not in available_labels]
|
||||
if missing:
|
||||
missing_names = ", ".join(sorted(missing))
|
||||
msg = f"Missing Gitea labels: {missing_names}"
|
||||
raise GiteaError(msg)
|
||||
|
||||
return [available_labels[label] for label in labels]
|
||||
|
||||
def list_open_pull_requests(
|
||||
self,
|
||||
*,
|
||||
owner: str,
|
||||
repo: str,
|
||||
labels: list[str] | None = None,
|
||||
head: str | None = None,
|
||||
) -> list[PullRequest]:
|
||||
"""List open pull requests for a repository."""
|
||||
expected_labels = set(labels or [])
|
||||
pull_requests: list[PullRequest] = []
|
||||
page = 1
|
||||
while True:
|
||||
response = self._request(
|
||||
"GET",
|
||||
f"/api/v1/repos/{owner}/{repo}/pulls",
|
||||
params={"state": "open", "page": page, "limit": DEFAULT_PAGE_SIZE},
|
||||
)
|
||||
batch = response.json()
|
||||
if not batch:
|
||||
break
|
||||
|
||||
for item in batch:
|
||||
pull_request = _pull_request_from_api(item)
|
||||
if head and pull_request.head_branch != head:
|
||||
continue
|
||||
if expected_labels and not expected_labels.issubset(set(pull_request.labels)):
|
||||
continue
|
||||
pull_requests.append(pull_request)
|
||||
|
||||
if len(batch) < DEFAULT_PAGE_SIZE:
|
||||
break
|
||||
page += 1
|
||||
|
||||
return pull_requests
|
||||
|
||||
def create_pull_request(
|
||||
self,
|
||||
*,
|
||||
owner: str,
|
||||
repo: str,
|
||||
title: str,
|
||||
body: str,
|
||||
head: str,
|
||||
base: str,
|
||||
labels: list[str] | None = None,
|
||||
) -> PullRequest:
|
||||
"""Create a pull request."""
|
||||
payload: dict[str, object] = {
|
||||
"title": title,
|
||||
"body": body,
|
||||
"head": head,
|
||||
"base": base,
|
||||
}
|
||||
if labels:
|
||||
payload["labels"] = self.resolve_label_ids(owner=owner, repo=repo, labels=labels)
|
||||
|
||||
response = self._request(
|
||||
"POST",
|
||||
f"/api/v1/repos/{owner}/{repo}/pulls",
|
||||
expected_statuses={EXPECTED_CREATED},
|
||||
json=payload,
|
||||
)
|
||||
return _pull_request_from_api(response.json())
|
||||
|
||||
def merge_pull_request(
|
||||
self,
|
||||
*,
|
||||
owner: str,
|
||||
repo: str,
|
||||
number: int,
|
||||
merge_method: str = "rebase",
|
||||
head_commit_id: str | None = None,
|
||||
delete_branch_after_merge: bool = False,
|
||||
) -> None:
|
||||
"""Merge a pull request."""
|
||||
payload: dict[str, object] = {
|
||||
"Do": merge_method,
|
||||
"delete_branch_after_merge": delete_branch_after_merge,
|
||||
}
|
||||
if head_commit_id:
|
||||
payload["head_commit_id"] = head_commit_id
|
||||
|
||||
self._request(
|
||||
"POST",
|
||||
f"/api/v1/repos/{owner}/{repo}/pulls/{number}/merge",
|
||||
json=payload,
|
||||
)
|
||||
|
||||
def list_run_jobs(self, *, owner: str, repo: str, run_id: str | int) -> list[WorkflowJob]:
|
||||
"""List workflow jobs for a specific run."""
|
||||
jobs: list[WorkflowJob] = []
|
||||
page = 1
|
||||
while True:
|
||||
response = self._request(
|
||||
"GET",
|
||||
f"/api/v1/repos/{owner}/{repo}/actions/jobs",
|
||||
params={"page": page, "limit": DEFAULT_PAGE_SIZE},
|
||||
)
|
||||
payload = response.json()
|
||||
batch = payload.get("jobs", [])
|
||||
if not batch:
|
||||
break
|
||||
|
||||
for item in batch:
|
||||
if str(item.get("run_id")) != str(run_id):
|
||||
continue
|
||||
jobs.append(_workflow_job_from_api(item))
|
||||
|
||||
if len(batch) < DEFAULT_PAGE_SIZE:
|
||||
break
|
||||
page += 1
|
||||
|
||||
return jobs
|
||||
|
||||
def download_job_logs(self, *, owner: str, repo: str, job_id: int) -> str:
|
||||
"""Download logs for a workflow job."""
|
||||
response = self._request(
|
||||
"GET",
|
||||
f"/api/v1/repos/{owner}/{repo}/actions/jobs/{job_id}/logs",
|
||||
)
|
||||
return response.text
|
||||
|
||||
def close(self) -> None:
|
||||
"""Close the underlying HTTP client."""
|
||||
self._client.close()
|
||||
|
||||
def __enter__(self) -> Self:
|
||||
"""Enter the context manager."""
|
||||
return self
|
||||
|
||||
def __exit__(self, *args: object) -> None:
|
||||
"""Close the HTTP client."""
|
||||
self.close()
|
||||
|
||||
def _request(
|
||||
self,
|
||||
method: str,
|
||||
path: str,
|
||||
*,
|
||||
expected_statuses: set[int] | None = None,
|
||||
**kwargs: object,
|
||||
) -> httpx.Response:
|
||||
"""Send an HTTP request and validate the response status."""
|
||||
response = self._client.request(method, path, **kwargs)
|
||||
statuses = expected_statuses or {EXPECTED_OK}
|
||||
if response.status_code not in statuses:
|
||||
msg = f"Gitea request failed ({response.status_code}): {response.text}"
|
||||
raise GiteaError(msg)
|
||||
return response
|
||||
|
||||
|
||||
def _pull_request_from_api(data: dict[str, object]) -> PullRequest:
|
||||
"""Convert Gitea API pull-request data into a dataclass."""
|
||||
number = _optional_int(data.get("number")) or _optional_int(data.get("index"))
|
||||
if number is None:
|
||||
msg = "Gitea pull request payload is missing a number"
|
||||
raise GiteaError(msg)
|
||||
|
||||
labels = tuple(str(label.get("name", "")) for label in data.get("labels", []))
|
||||
head = data.get("head", {})
|
||||
base = data.get("base", {})
|
||||
return PullRequest(
|
||||
number=number,
|
||||
title=str(data.get("title", "")),
|
||||
html_url=_optional_str(data.get("html_url")),
|
||||
labels=tuple(label for label in labels if label),
|
||||
head_branch=_optional_str(head.get("ref")) or _optional_str(data.get("head_branch")),
|
||||
base_branch=_optional_str(base.get("ref")) or _optional_str(data.get("base_branch")),
|
||||
)
|
||||
|
||||
|
||||
def _workflow_job_from_api(data: dict[str, object]) -> WorkflowJob:
|
||||
"""Convert Gitea API workflow-job data into a dataclass."""
|
||||
job_id = _optional_int(data.get("id"))
|
||||
if job_id is None:
|
||||
msg = "Gitea workflow job payload is missing an ID"
|
||||
raise GiteaError(msg)
|
||||
|
||||
return WorkflowJob(
|
||||
id=job_id,
|
||||
name=str(data.get("name", "")),
|
||||
run_id=_optional_int(data.get("run_id")),
|
||||
status=_optional_str(data.get("status")),
|
||||
conclusion=_optional_str(data.get("conclusion")),
|
||||
)
|
||||
|
||||
|
||||
def _optional_int(value: object) -> int | None:
|
||||
"""Convert an API value to an integer when present."""
|
||||
if value is None:
|
||||
return None
|
||||
return int(value)
|
||||
|
||||
|
||||
def _optional_str(value: object) -> str | None:
|
||||
"""Convert an API value to a string when present."""
|
||||
if value is None:
|
||||
return None
|
||||
return str(value)
|
||||
@@ -0,0 +1,138 @@
|
||||
"""Automation helpers for flake.lock pull requests on Gitea."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import subprocess
|
||||
from os import getenv
|
||||
from typing import Annotated
|
||||
|
||||
import typer
|
||||
|
||||
from python.gitea import GiteaClient, PullRequest, split_repo_name
|
||||
|
||||
DEFAULT_BASE_BRANCH = "main"
|
||||
DEFAULT_BRANCH = "automation/update-flake-lock"
|
||||
DEFAULT_GITEA_URL = "https://gitea.tmmworkshop.com"
|
||||
PR_LABELS = ["dependencies", "automated", "flake_lock_update"]
|
||||
PR_TITLE = "Update flake.lock"
|
||||
PR_BODY = "Automated flake.lock update."
|
||||
|
||||
app = typer.Typer(add_completion=False)
|
||||
|
||||
|
||||
def run_cmd(cmd: list[str], *, check: bool = True) -> subprocess.CompletedProcess[str]:
|
||||
"""Run a subprocess command."""
|
||||
return subprocess.run(cmd, capture_output=True, text=True, check=check)
|
||||
|
||||
|
||||
def ensure_flake_lock_pull_request(
|
||||
client: GiteaClient,
|
||||
*,
|
||||
owner: str,
|
||||
repo: str,
|
||||
branch: str,
|
||||
base: str,
|
||||
) -> PullRequest:
|
||||
"""Return an existing flake.lock PR for the branch or create one."""
|
||||
pull_requests = client.list_open_pull_requests(owner=owner, repo=repo, head=branch)
|
||||
if pull_requests:
|
||||
return pull_requests[0]
|
||||
|
||||
return client.create_pull_request(
|
||||
owner=owner,
|
||||
repo=repo,
|
||||
title=PR_TITLE,
|
||||
body=PR_BODY,
|
||||
head=branch,
|
||||
base=base,
|
||||
labels=PR_LABELS,
|
||||
)
|
||||
|
||||
|
||||
def find_flake_lock_pull_request(client: GiteaClient, *, owner: str, repo: str) -> PullRequest | None:
|
||||
"""Find the first open flake.lock pull request."""
|
||||
pull_requests = client.list_open_pull_requests(owner=owner, repo=repo, labels=["flake_lock_update"])
|
||||
if not pull_requests:
|
||||
return None
|
||||
return pull_requests[0]
|
||||
|
||||
|
||||
def has_worktree_changes() -> bool:
|
||||
"""Return whether `flake.lock` has worktree changes."""
|
||||
result = run_cmd(["git", "diff", "--quiet", "--", "flake.lock"], check=False)
|
||||
return result.returncode != 0
|
||||
|
||||
|
||||
def commit_flake_lock_update(*, branch: str) -> None:
|
||||
"""Commit the updated lock file to the automation branch."""
|
||||
run_cmd(["git", "config", "user.name", "gitea-actions[bot]"])
|
||||
run_cmd(["git", "config", "user.email", "gitea-actions@tmmworkshop.com"])
|
||||
run_cmd(["git", "checkout", "-B", branch])
|
||||
run_cmd(["git", "add", "flake.lock"])
|
||||
run_cmd(["git", "commit", "-m", "chore: update flake.lock"])
|
||||
|
||||
|
||||
def push_branch(*, branch: str) -> None:
|
||||
"""Push the automation branch to origin."""
|
||||
run_cmd(["git", "push", "origin", f"HEAD:{branch}", "--force"])
|
||||
|
||||
|
||||
def _required_gitea_token() -> str:
|
||||
"""Read the required Gitea token from the environment."""
|
||||
token = getenv("GITEA_TOKEN")
|
||||
if token:
|
||||
return token
|
||||
|
||||
msg = "GITEA_TOKEN environment variable is required"
|
||||
raise RuntimeError(msg)
|
||||
|
||||
|
||||
@app.command()
|
||||
def update(
|
||||
repo: Annotated[str, typer.Option("--repo", help="Gitea repository in owner/repo form")],
|
||||
base: Annotated[str, typer.Option("--base", help="Base branch")] = DEFAULT_BASE_BRANCH,
|
||||
branch: Annotated[str, typer.Option("--branch", help="Automation branch")] = DEFAULT_BRANCH,
|
||||
) -> None:
|
||||
"""Commit flake.lock changes and ensure a pull request exists."""
|
||||
if not has_worktree_changes():
|
||||
typer.echo("No flake.lock changes detected")
|
||||
return
|
||||
|
||||
commit_flake_lock_update(branch=branch)
|
||||
push_branch(branch=branch)
|
||||
|
||||
owner, repo_name = split_repo_name(repo)
|
||||
with GiteaClient(
|
||||
base_url=getenv("GITEA_URL", DEFAULT_GITEA_URL),
|
||||
token=_required_gitea_token(),
|
||||
) as client:
|
||||
pull_request = ensure_flake_lock_pull_request(
|
||||
client,
|
||||
owner=owner,
|
||||
repo=repo_name,
|
||||
branch=branch,
|
||||
base=base,
|
||||
)
|
||||
typer.echo(pull_request.html_url or f"Pull request #{pull_request.number}")
|
||||
|
||||
|
||||
@app.command()
|
||||
def merge(
|
||||
repo: Annotated[str, typer.Option("--repo", help="Gitea repository in owner/repo form")],
|
||||
) -> None:
|
||||
"""Merge the first open flake.lock pull request."""
|
||||
owner, repo_name = split_repo_name(repo)
|
||||
with GiteaClient(
|
||||
base_url=getenv("GITEA_URL", DEFAULT_GITEA_URL),
|
||||
token=_required_gitea_token(),
|
||||
) as client:
|
||||
pull_request = find_flake_lock_pull_request(client, owner=owner, repo=repo_name)
|
||||
if not pull_request:
|
||||
typer.echo("No open PR found with label flake_lock_update")
|
||||
return
|
||||
client.merge_pull_request(owner=owner, repo=repo_name, number=pull_request.number, merge_method="rebase")
|
||||
typer.echo(f"Merged PR #{pull_request.number}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
app()
|
||||
Reference in New Issue
Block a user