336 lines
9.9 KiB
Python
336 lines
9.9 KiB
Python
"""Small Gitea API client for repository automation."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass
|
|
from typing import Self
|
|
|
|
import httpx
|
|
|
|
# HTTP status codes the client treats as success: 200 is the default for
# requests, 201 is expected when creating issues and pull requests.
EXPECTED_OK = 200
EXPECTED_CREATED = 201

# Value sent as the ``limit`` query parameter on paginated list requests.
DEFAULT_PAGE_SIZE = 100
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class CreatedIssue:
|
|
"""Issue data returned by Gitea."""
|
|
|
|
number: int | None
|
|
html_url: str | None
|
|
title: str
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class PullRequest:
|
|
"""Pull request data returned by Gitea."""
|
|
|
|
number: int
|
|
title: str
|
|
html_url: str | None
|
|
labels: tuple[str, ...]
|
|
head_branch: str | None
|
|
base_branch: str | None
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class WorkflowJob:
|
|
"""Workflow job data returned by Gitea Actions."""
|
|
|
|
id: int
|
|
name: str
|
|
run_id: int | None
|
|
status: str | None
|
|
conclusion: str | None
|
|
|
|
|
|
class GiteaError(RuntimeError):
    """Signal that a Gitea API call failed or returned an unusable payload."""
|
|
|
|
|
|
def split_repo_name(repo: str) -> tuple[str, str]:
    """Split an ``owner/repo`` string into its two parts.

    Args:
        repo: Repository identifier in ``owner/name`` form.

    Returns:
        A ``(owner, repo_name)`` tuple.

    Raises:
        ValueError: If ``repo`` is not exactly two non-empty segments
            separated by a single ``/``.
    """
    owner, separator, repo_name = repo.partition("/")
    # ``partition`` splits on the first slash only, so any further slash ends
    # up inside ``repo_name``. Reject it: the parts are interpolated into URL
    # paths, where an extra segment would address a different route.
    if not separator or not owner or not repo_name or "/" in repo_name:
        msg = f"Invalid repository name: {repo}"
        raise ValueError(msg)
    return owner, repo_name
|
|
|
|
|
|
class GiteaClient:
    """HTTP client for the subset of Gitea APIs used in this repository.

    The client owns an ``httpx.Client``. Use it as a context manager, or call
    ``close()`` explicitly, to release the underlying connections.
    """

    def __init__(
        self,
        *,
        base_url: str,
        token: str,
        timeout: float = 30,
        transport: httpx.BaseTransport | None = None,
    ) -> None:
        """Initialize the Gitea client.

        Args:
            base_url: Root URL of the Gitea instance; trailing slashes are
                stripped.
            token: API token sent as ``Authorization: token <token>``.
            timeout: Timeout passed to ``httpx.Client``.
            transport: Optional httpx transport override.
        """
        self._client = httpx.Client(
            base_url=base_url.rstrip("/"),
            timeout=timeout,
            headers={"Authorization": f"token {token}"},
            transport=transport,
        )

    def create_issue(
        self,
        *,
        owner: str,
        repo: str,
        title: str,
        body: str,
        labels: list[int] | None = None,
    ) -> CreatedIssue:
        """Create a Gitea issue.

        Args:
            owner: Repository owner.
            repo: Repository name.
            title: Issue title.
            body: Issue body text.
            labels: Label IDs to attach; ``None`` is sent as an empty list.

        Returns:
            The created issue as reported by the API.

        Raises:
            GiteaError: If the response status is not 201.
        """
        payload: dict[str, object] = {"title": title, "body": body, "labels": labels or []}
        response = self._request(
            "POST",
            f"/api/v1/repos/{owner}/{repo}/issues",
            expected_statuses={EXPECTED_CREATED},
            json=payload,
        )
        data = response.json()
        # A JSON null title must not be stringified into the literal "None";
        # fall back to the title that was submitted.
        returned_title = data.get("title")
        return CreatedIssue(
            number=_optional_int(data.get("number")),
            html_url=_optional_str(data.get("html_url")),
            title=title if returned_title is None else str(returned_title),
        )

    def resolve_label_ids(self, *, owner: str, repo: str, labels: list[str]) -> list[int]:
        """Resolve label names to Gitea label IDs.

        Args:
            owner: Repository owner.
            repo: Repository name.
            labels: Label names to look up.

        Returns:
            Label IDs in the same order as ``labels``; an empty list when
            ``labels`` is empty (no request is made in that case).

        Raises:
            GiteaError: If a request fails or any requested label is not
                found among the repository labels.
        """
        if not labels:
            return []

        available_labels: dict[str, int] = {}
        for label in self._paginate(f"/api/v1/repos/{owner}/{repo}/labels"):
            raw_name = label.get("name")
            label_id = _optional_int(label.get("id"))
            # Skip entries without a usable name or ID. A null name is
            # dropped rather than registered as the string "None".
            if raw_name is None or label_id is None:
                continue
            label_name = str(raw_name)
            if label_name:
                available_labels[label_name] = label_id

        missing = [label for label in labels if label not in available_labels]
        if missing:
            missing_names = ", ".join(sorted(missing))
            msg = f"Missing Gitea labels: {missing_names}"
            raise GiteaError(msg)

        return [available_labels[label] for label in labels]

    def list_open_pull_requests(
        self,
        *,
        owner: str,
        repo: str,
        labels: list[str] | None = None,
        head: str | None = None,
    ) -> list[PullRequest]:
        """List open pull requests for a repository.

        Filtering by ``labels`` and ``head`` is done client-side on the
        results returned by the API.

        Args:
            owner: Repository owner.
            repo: Repository name.
            labels: When given, keep only pull requests carrying all of
                these label names.
            head: When given, keep only pull requests whose head branch
                equals this value.

        Returns:
            Matching pull requests in API order.

        Raises:
            GiteaError: If a request fails or a payload lacks a number.
        """
        expected_labels = set(labels or [])
        pull_requests: list[PullRequest] = []
        items = self._paginate(
            f"/api/v1/repos/{owner}/{repo}/pulls",
            params={"state": "open"},
        )
        for item in items:
            pull_request = _pull_request_from_api(item)
            if head and pull_request.head_branch != head:
                continue
            # An empty expected set is a subset of anything, so no separate
            # "no label filter" branch is needed.
            if not expected_labels.issubset(pull_request.labels):
                continue
            pull_requests.append(pull_request)
        return pull_requests

    def create_pull_request(
        self,
        *,
        owner: str,
        repo: str,
        title: str,
        body: str,
        head: str,
        base: str,
        labels: list[str] | None = None,
    ) -> PullRequest:
        """Create a pull request.

        Args:
            owner: Repository owner.
            repo: Repository name.
            title: Pull request title.
            body: Pull request description.
            head: Source branch.
            base: Target branch.
            labels: Label names; resolved to IDs before the request is sent.

        Returns:
            The created pull request as reported by the API.

        Raises:
            GiteaError: If a label is missing or the response status is
                not 201.
        """
        payload: dict[str, object] = {
            "title": title,
            "body": body,
            "head": head,
            "base": base,
        }
        if labels:
            payload["labels"] = self.resolve_label_ids(owner=owner, repo=repo, labels=labels)

        response = self._request(
            "POST",
            f"/api/v1/repos/{owner}/{repo}/pulls",
            expected_statuses={EXPECTED_CREATED},
            json=payload,
        )
        return _pull_request_from_api(response.json())

    def merge_pull_request(
        self,
        *,
        owner: str,
        repo: str,
        number: int,
        merge_method: str = "rebase",
        head_commit_id: str | None = None,
        delete_branch_after_merge: bool = False,
    ) -> None:
        """Merge a pull request.

        Args:
            owner: Repository owner.
            repo: Repository name.
            number: Pull request number.
            merge_method: Value sent as the ``Do`` field of the merge request.
            head_commit_id: When set, sent as ``head_commit_id``.
            delete_branch_after_merge: Sent as ``delete_branch_after_merge``.

        Raises:
            GiteaError: If the response status is not 200.
        """
        payload: dict[str, object] = {
            "Do": merge_method,
            "delete_branch_after_merge": delete_branch_after_merge,
        }
        if head_commit_id:
            payload["head_commit_id"] = head_commit_id

        self._request(
            "POST",
            f"/api/v1/repos/{owner}/{repo}/pulls/{number}/merge",
            json=payload,
        )

    def list_run_jobs(self, *, owner: str, repo: str, run_id: str | int) -> list[WorkflowJob]:
        """List workflow jobs for a specific run.

        All repository jobs are fetched and filtered client-side by
        comparing the string form of each job's ``run_id`` with ``run_id``.

        Args:
            owner: Repository owner.
            repo: Repository name.
            run_id: Run identifier to match.

        Returns:
            Jobs belonging to the run, in API order.

        Raises:
            GiteaError: If a request fails or a job payload lacks an ID.
        """
        wanted_run = str(run_id)
        items = self._paginate(
            f"/api/v1/repos/{owner}/{repo}/actions/jobs",
            items_key="jobs",
        )
        return [
            _workflow_job_from_api(item)
            for item in items
            if str(item.get("run_id")) == wanted_run
        ]

    def download_job_logs(self, *, owner: str, repo: str, job_id: int) -> str:
        """Download logs for a workflow job and return them as text.

        Raises:
            GiteaError: If the response status is not 200.
        """
        response = self._request(
            "GET",
            f"/api/v1/repos/{owner}/{repo}/actions/jobs/{job_id}/logs",
        )
        return response.text

    def close(self) -> None:
        """Close the underlying HTTP client."""
        self._client.close()

    def __enter__(self) -> Self:
        """Enter the context manager."""
        return self

    def __exit__(self, *args: object) -> None:
        """Close the HTTP client."""
        self.close()

    def _paginate(
        self,
        path: str,
        *,
        params: dict[str, object] | None = None,
        items_key: str | None = None,
    ) -> list[dict[str, object]]:
        """Collect every item from a paginated list endpoint.

        Args:
            path: API path to request.
            params: Extra query parameters sent with every page.
            items_key: When the response body is an object, the key holding
                the item list; ``None`` when the body is the list itself.

        Returns:
            All items from all pages, in API order.

        Raises:
            GiteaError: If any page request fails.
        """
        items: list[dict[str, object]] = []
        page = 1
        while True:
            query = {**(params or {}), "page": page, "limit": DEFAULT_PAGE_SIZE}
            response = self._request("GET", path, params=query)
            payload = response.json()
            batch = payload.get(items_key) if items_key is not None else payload
            if not batch:
                break
            items.extend(batch)

            # The server may cap the page size below the requested limit, in
            # which case every page looks "short". Prefer the total reported
            # by the server and use the short-page rule only as a fallback.
            total = self._reported_total(response, payload)
            if total is not None:
                if len(items) >= total:
                    break
            elif len(batch) < DEFAULT_PAGE_SIZE:
                break
            page += 1
        return items

    @staticmethod
    def _reported_total(response: httpx.Response, payload: object) -> int | None:
        """Return the item total reported by the server, or None if unknown."""
        candidates: list[object] = [response.headers.get("X-Total-Count")]
        if isinstance(payload, dict):
            candidates.append(payload.get("total_count"))
        for candidate in candidates:
            try:
                total = int(candidate)  # type: ignore[call-overload]
            except (TypeError, ValueError):
                continue
            if total >= 0:
                return total
        return None

    def _request(
        self,
        method: str,
        path: str,
        *,
        expected_statuses: set[int] | None = None,
        **kwargs: object,
    ) -> httpx.Response:
        """Send an HTTP request and validate the response status.

        Args:
            method: HTTP method.
            path: Request path, resolved against the client's base URL.
            expected_statuses: Acceptable status codes; defaults to 200 only.
            **kwargs: Forwarded to ``httpx.Client.request``.

        Returns:
            The response, when its status is acceptable.

        Raises:
            GiteaError: If the status code is not in the accepted set.
        """
        response = self._client.request(method, path, **kwargs)
        statuses = expected_statuses or {EXPECTED_OK}
        if response.status_code not in statuses:
            msg = f"Gitea request failed ({response.status_code}): {response.text}"
            raise GiteaError(msg)
        return response
|
|
|
|
|
|
def _pull_request_from_api(data: dict[str, object]) -> PullRequest:
    """Convert Gitea API pull-request data into a ``PullRequest``.

    Args:
        data: Decoded JSON object describing one pull request.

    Returns:
        The pull request. The number is read from ``number`` (falling back
        to ``index``) and the branches from ``head.ref`` / ``base.ref``
        (falling back to ``head_branch`` / ``base_branch``).

    Raises:
        GiteaError: If the payload carries neither ``number`` nor ``index``.
    """
    number = _optional_int(data.get("number")) or _optional_int(data.get("index"))
    if number is None:
        msg = "Gitea pull request payload is missing a number"
        raise GiteaError(msg)

    # ``dict.get(key, default)`` applies the default only when the key is
    # absent. A key present with JSON null still yields None, so the nested
    # containers are normalised explicitly before use.
    raw_labels = data.get("labels")
    if not isinstance(raw_labels, (list, tuple)):
        raw_labels = []
    raw_head = data.get("head")
    head = raw_head if isinstance(raw_head, dict) else {}
    raw_base = data.get("base")
    base = raw_base if isinstance(raw_base, dict) else {}

    label_names: list[str] = []
    for label in raw_labels:
        raw_name = label.get("name") if isinstance(label, dict) else None
        # Drop null and empty names instead of recording the string "None".
        if raw_name is not None and str(raw_name):
            label_names.append(str(raw_name))

    raw_title = data.get("title")
    return PullRequest(
        number=number,
        title="" if raw_title is None else str(raw_title),
        html_url=_optional_str(data.get("html_url")),
        labels=tuple(label_names),
        head_branch=_optional_str(head.get("ref")) or _optional_str(data.get("head_branch")),
        base_branch=_optional_str(base.get("ref")) or _optional_str(data.get("base_branch")),
    )
|
|
|
|
|
|
def _workflow_job_from_api(data: dict[str, object]) -> WorkflowJob:
    """Convert Gitea API workflow-job data into a ``WorkflowJob``.

    Args:
        data: Decoded JSON object describing one workflow job.

    Returns:
        The workflow job built from the ``id``, ``name``, ``run_id``,
        ``status`` and ``conclusion`` fields.

    Raises:
        GiteaError: If the payload has no ``id``.
    """
    job_id = _optional_int(data.get("id"))
    if job_id is None:
        msg = "Gitea workflow job payload is missing an ID"
        raise GiteaError(msg)

    # A JSON null name maps to an empty string, not the literal "None".
    raw_name = data.get("name")
    return WorkflowJob(
        id=job_id,
        name="" if raw_name is None else str(raw_name),
        run_id=_optional_int(data.get("run_id")),
        status=_optional_str(data.get("status")),
        conclusion=_optional_str(data.get("conclusion")),
    )
|
|
|
|
|
|
def _optional_int(value: object) -> int | None:
|
|
"""Convert an API value to an integer when present."""
|
|
if value is None:
|
|
return None
|
|
return int(value)
|
|
|
|
|
|
def _optional_str(value: object) -> str | None:
|
|
"""Convert an API value to a string when present."""
|
|
if value is None:
|
|
return None
|
|
return str(value)
|