"""Small Gitea API client for repository automation."""

from __future__ import annotations

from collections.abc import Iterator
from dataclasses import dataclass
from typing import Self
from urllib.parse import quote

import httpx

# Page size requested from list endpoints. The server may return fewer items
# per page than requested, so pagination never relies on this value alone when
# the server reports a total (see ``GiteaClient._paginate``).
DEFAULT_PAGE_SIZE = 100
EXPECTED_NO_CONTENT = 204
EXPECTED_CREATED = 201
EXPECTED_OK = 200


@dataclass(frozen=True)
class CreatedIssue:
    """Issue data returned by Gitea."""

    # "number" field of the create-issue response; None when absent.
    number: int | None
    # "html_url" field of the response; None when absent.
    html_url: str | None
    # "title" field of the response, or the requested title when absent.
    title: str


@dataclass(frozen=True)
class PullRequest:
    """Pull request data returned by Gitea."""

    # Taken from the payload's "number" field, falling back to "index".
    number: int
    title: str
    html_url: str | None
    # Non-empty label names, in payload order.
    labels: tuple[str, ...]
    # "head.ref" / "base.ref", falling back to "head_branch" / "base_branch".
    head_branch: str | None
    base_branch: str | None


@dataclass(frozen=True)
class WorkflowJob:
    """Workflow job data returned by Gitea Actions."""

    id: int
    name: str
    run_id: int | None
    status: str | None
    conclusion: str | None


class GiteaError(RuntimeError):
    """Raised when Gitea rejects an API request."""


def split_repo_name(repo: str) -> tuple[str, str]:
    """Split an owner/repo string into its parts.

    Only the first ``/`` acts as the separator; anything after it (including
    further slashes) is returned as the repository name.

    Args:
        repo: Repository in ``owner/name`` form.

    Returns:
        The ``(owner, name)`` pair.

    Raises:
        ValueError: If there is no ``/`` or either side of it is empty.
    """
    owner, separator, repo_name = repo.partition("/")
    if not separator or not owner or not repo_name:
        msg = f"Invalid repository name: {repo}"
        raise ValueError(msg)
    return owner, repo_name


class GiteaClient:
    """HTTP client for the subset of Gitea APIs used in this repository.

    The client owns an ``httpx.Client``; use it as a context manager or call
    ``close()`` when done.
    """

    def __init__(
        self,
        *,
        base_url: str,
        token: str,
        timeout: int = 30,
        transport: httpx.BaseTransport | None = None,
    ) -> None:
        """Initialize the Gitea client.

        Args:
            base_url: Root URL of the Gitea instance; trailing slashes are
                stripped before it is handed to ``httpx.Client``.
            token: API token, sent as ``Authorization: token <token>``.
            timeout: Timeout value passed through to ``httpx.Client``.
            transport: Optional httpx transport passed through to
                ``httpx.Client``.
        """
        self._client = httpx.Client(
            base_url=base_url.rstrip("/"),
            timeout=timeout,
            headers={"Authorization": f"token {token}"},
            transport=transport,
        )

    def create_issue(
        self,
        *,
        owner: str,
        repo: str,
        title: str,
        body: str,
        labels: list[int] | None = None,
    ) -> CreatedIssue:
        """Create a Gitea issue.

        Args:
            owner: Repository owner.
            repo: Repository name.
            title: Issue title.
            body: Issue body text.
            labels: Label IDs to attach; ``None`` is sent as an empty list.

        Returns:
            The created issue as reported by the response body.

        Raises:
            GiteaError: If the response status is not 201.
        """
        payload: dict[str, object] = {"title": title, "body": body, "labels": labels or []}
        response = self._request(
            "POST",
            f"/api/v1/repos/{owner}/{repo}/issues",
            expected_statuses={EXPECTED_CREATED},
            json=payload,
        )
        data = response.json()
        return CreatedIssue(
            number=_optional_int(data.get("number")),
            html_url=_optional_str(data.get("html_url")),
            title=str(data.get("title", title)),
        )

    def resolve_label_ids(self, *, owner: str, repo: str, labels: list[str]) -> list[int]:
        """Resolve label names to Gitea label IDs.

        Args:
            owner: Repository owner.
            repo: Repository name.
            labels: Label names to look up. An empty list short-circuits
                without issuing any request.

        Returns:
            Label IDs in the same order as ``labels``.

        Raises:
            GiteaError: If a request fails or any requested name is not among
                the repository's labels.
        """
        if not labels:
            return []

        available_labels: dict[str, int] = {}
        for label in self._paginate(f"/api/v1/repos/{owner}/{repo}/labels"):
            label_name = str(label.get("name", ""))
            label_id = _optional_int(label.get("id"))
            # Entries without a usable name or ID cannot be resolved; skip them.
            if label_name and label_id is not None:
                available_labels[label_name] = label_id

        missing = [label for label in labels if label not in available_labels]
        if missing:
            missing_names = ", ".join(sorted(missing))
            msg = f"Missing Gitea labels: {missing_names}"
            raise GiteaError(msg)
        return [available_labels[label] for label in labels]

    def list_open_pull_requests(
        self,
        *,
        owner: str,
        repo: str,
        labels: list[str] | None = None,
        head: str | None = None,
    ) -> list[PullRequest]:
        """List open pull requests for a repository.

        Filtering by ``labels`` and ``head`` happens client-side, after each
        page has been fetched with ``state=open``.

        Args:
            owner: Repository owner.
            repo: Repository name.
            labels: When given and non-empty, only pull requests carrying all
                of these label names are returned.
            head: When given and non-empty, only pull requests whose head
                branch equals this value are returned.

        Returns:
            Matching pull requests in the order the server returned them.

        Raises:
            GiteaError: If a request fails or a payload lacks a number.
        """
        expected_labels = set(labels or [])
        pull_requests: list[PullRequest] = []
        for item in self._paginate(
            f"/api/v1/repos/{owner}/{repo}/pulls",
            params={"state": "open"},
        ):
            pull_request = _pull_request_from_api(item)
            if head and pull_request.head_branch != head:
                continue
            if expected_labels and not expected_labels.issubset(pull_request.labels):
                continue
            pull_requests.append(pull_request)
        return pull_requests

    def create_pull_request(
        self,
        *,
        owner: str,
        repo: str,
        title: str,
        body: str,
        head: str,
        base: str,
        labels: list[str] | None = None,
    ) -> PullRequest:
        """Create a pull request.

        Args:
            owner: Repository owner.
            repo: Repository name.
            title: Pull request title.
            body: Pull request description.
            head: Source branch.
            base: Target branch.
            labels: Label names; when non-empty they are resolved to IDs via
                ``resolve_label_ids`` and sent as ``labels``.

        Returns:
            The created pull request parsed from the response body.

        Raises:
            GiteaError: If a label is missing, the response status is not 201,
                or the response lacks a pull request number.
        """
        payload: dict[str, object] = {
            "title": title,
            "body": body,
            "head": head,
            "base": base,
        }
        if labels:
            payload["labels"] = self.resolve_label_ids(owner=owner, repo=repo, labels=labels)
        response = self._request(
            "POST",
            f"/api/v1/repos/{owner}/{repo}/pulls",
            expected_statuses={EXPECTED_CREATED},
            json=payload,
        )
        return _pull_request_from_api(response.json())

    def merge_pull_request(
        self,
        *,
        owner: str,
        repo: str,
        number: int,
        merge_method: str = "rebase",
        head_commit_id: str | None = None,
        delete_branch_after_merge: bool = False,
    ) -> None:
        """Merge a pull request.

        Args:
            owner: Repository owner.
            repo: Repository name.
            number: Pull request number.
            merge_method: Value sent as the ``Do`` field of the request.
            head_commit_id: When non-empty, sent as ``head_commit_id``.
            delete_branch_after_merge: Sent as ``delete_branch_after_merge``.

        Raises:
            GiteaError: If the response status is not 200.
        """
        payload: dict[str, object] = {
            "Do": merge_method,
            "delete_branch_after_merge": delete_branch_after_merge,
        }
        if head_commit_id:
            payload["head_commit_id"] = head_commit_id
        self._request(
            "POST",
            f"/api/v1/repos/{owner}/{repo}/pulls/{number}/merge",
            json=payload,
        )

    def dispatch_workflow(self, *, owner: str, repo: str, workflow_id: str, ref: str) -> None:
        """Trigger a workflow_dispatch run.

        Args:
            owner: Repository owner.
            repo: Repository name.
            workflow_id: Workflow identifier; it is fully percent-encoded
                (including ``/``) before being placed in the URL path.
            ref: Git ref sent as the ``ref`` field of the request.

        Raises:
            GiteaError: If the response status is neither 200 nor 204.
        """
        workflow_path = quote(workflow_id, safe="")
        self._request(
            "POST",
            f"/api/v1/repos/{owner}/{repo}/actions/workflows/{workflow_path}/dispatches",
            expected_statuses={EXPECTED_OK, EXPECTED_NO_CONTENT},
            json={"ref": ref},
        )

    def list_run_jobs(self, *, owner: str, repo: str, run_id: str | int) -> list[WorkflowJob]:
        """List workflow jobs for a specific run.

        The repository-wide jobs listing is fetched page by page and filtered
        client-side; run IDs are compared as strings so ``"12"`` and ``12``
        match the same jobs.

        Args:
            owner: Repository owner.
            repo: Repository name.
            run_id: Run whose jobs should be returned.

        Returns:
            Jobs whose ``run_id`` matches, in the order the server returned
            them.

        Raises:
            GiteaError: If a request fails or a matching job lacks an ID.
        """
        wanted_run = str(run_id)
        return [
            _workflow_job_from_api(item)
            for item in self._paginate(
                f"/api/v1/repos/{owner}/{repo}/actions/jobs",
                items_key="jobs",
            )
            if str(item.get("run_id")) == wanted_run
        ]

    def download_job_logs(self, *, owner: str, repo: str, job_id: int) -> str:
        """Download logs for a workflow job.

        Returns:
            The response body as text.

        Raises:
            GiteaError: If the response status is not 200.
        """
        response = self._request(
            "GET",
            f"/api/v1/repos/{owner}/{repo}/actions/jobs/{job_id}/logs",
        )
        return response.text

    def close(self) -> None:
        """Close the underlying HTTP client."""
        self._client.close()

    def __enter__(self) -> Self:
        """Enter the context manager."""
        return self

    def __exit__(self, *args: object) -> None:
        """Close the HTTP client."""
        self.close()

    def _paginate(
        self,
        path: str,
        *,
        params: dict[str, object] | None = None,
        items_key: str | None = None,
    ) -> Iterator[dict[str, object]]:
        """Yield every item of a paginated list endpoint.

        Pages are requested with ``page`` starting at 1 and
        ``limit=DEFAULT_PAGE_SIZE``. Iteration ends on an empty page. When the
        server reports a total (``X-Total-Count`` header for bare-list
        responses, ``total_count`` field for enveloped ones) iteration also
        ends once that many items have been seen. Only when no total is
        reported does a page shorter than ``DEFAULT_PAGE_SIZE`` end the
        iteration: a server that caps the page size below the requested limit
        would otherwise make every page look like the last one.

        Args:
            path: Endpoint path.
            params: Extra query parameters sent with every page request.
            items_key: When set, the response is expected to be a JSON object
                whose ``items_key`` entry holds the page's items; otherwise
                the response body itself is the list of items.

        Yields:
            The raw item dictionaries, in server order.

        Raises:
            GiteaError: If any page request does not return status 200.
        """
        base_params = dict(params or {})
        fetched = 0
        page = 1
        while True:
            response = self._request(
                "GET",
                path,
                params={**base_params, "page": page, "limit": DEFAULT_PAGE_SIZE},
            )
            payload = response.json()
            if items_key is None:
                batch = payload
                total = _total_count(response.headers.get("X-Total-Count"))
            else:
                batch = payload.get(items_key)
                total = _total_count(payload.get("total_count"))
            if not batch:
                return
            yield from batch
            fetched += len(batch)
            if total is not None:
                if fetched >= total:
                    return
            elif len(batch) < DEFAULT_PAGE_SIZE:
                # No total available: fall back to the short-page heuristic.
                return
            page += 1

    def _request(
        self,
        method: str,
        path: str,
        *,
        expected_statuses: set[int] | None = None,
        **kwargs: object,
    ) -> httpx.Response:
        """Send an HTTP request and validate the response status.

        Args:
            method: HTTP method.
            path: Request path, resolved against the client's base URL.
            expected_statuses: Acceptable status codes; ``None`` or an empty
                set means only 200 is accepted.
            **kwargs: Passed through to ``httpx.Client.request``.

        Returns:
            The response, once its status has been accepted.

        Raises:
            GiteaError: If the status code is not one of the accepted ones;
                the message carries the status code and response text.
        """
        response = self._client.request(method, path, **kwargs)
        statuses = expected_statuses or {EXPECTED_OK}
        if response.status_code not in statuses:
            msg = f"Gitea request failed ({response.status_code}): {response.text}"
            raise GiteaError(msg)
        return response


def _pull_request_from_api(data: dict[str, object]) -> PullRequest:
    """Convert Gitea API pull-request data into a dataclass.

    ``labels``, ``head`` and ``base`` are treated as empty when they are
    missing or JSON ``null``.

    Raises:
        GiteaError: If neither ``number`` nor ``index`` yields a number.
    """
    number = _optional_int(data.get("number")) or _optional_int(data.get("index"))
    if number is None:
        msg = "Gitea pull request payload is missing a number"
        raise GiteaError(msg)

    # ``.get(key, default)`` only covers a missing key; ``or`` also covers an
    # explicit null, which would otherwise crash the iteration / ``.get`` below.
    raw_labels = data.get("labels") or []
    head = data.get("head") or {}
    base = data.get("base") or {}

    label_names = (str(label.get("name", "")) for label in raw_labels)
    return PullRequest(
        number=number,
        title=str(data.get("title", "")),
        html_url=_optional_str(data.get("html_url")),
        labels=tuple(name for name in label_names if name),
        head_branch=_optional_str(head.get("ref")) or _optional_str(data.get("head_branch")),
        base_branch=_optional_str(base.get("ref")) or _optional_str(data.get("base_branch")),
    )


def _workflow_job_from_api(data: dict[str, object]) -> WorkflowJob:
    """Convert Gitea API workflow-job data into a dataclass.

    Raises:
        GiteaError: If the payload has no ``id``.
    """
    job_id = _optional_int(data.get("id"))
    if job_id is None:
        msg = "Gitea workflow job payload is missing an ID"
        raise GiteaError(msg)
    return WorkflowJob(
        id=job_id,
        name=str(data.get("name", "")),
        run_id=_optional_int(data.get("run_id")),
        status=_optional_str(data.get("status")),
        conclusion=_optional_str(data.get("conclusion")),
    )


def _total_count(value: object) -> int | None:
    """Parse a server-reported item total, returning None when unusable."""
    if value is None:
        return None
    try:
        total = int(value)
    except (TypeError, ValueError):
        # A malformed total must not break pagination; act as if none was sent.
        return None
    return total if total >= 0 else None


def _optional_int(value: object) -> int | None:
    """Convert an API value to an integer when present."""
    if value is None:
        return None
    return int(value)


def _optional_str(value: object) -> str | None:
    """Convert an API value to a string when present."""
    if value is None:
        return None
    return str(value)