import os import time import pytest import requests import json import re import xml.etree.ElementTree as ET from pathlib import Path import base64 IS_TEST_RUN = False def vprint(*args, **kwargs): if IS_TEST_RUN or os.environ.get("AUTOGITS_PRINT_FIXTURES") == "1": print(*args, **kwargs) class GiteaAPIClient: def __init__(self, base_url, token, sudo=None): self.base_url = base_url self.headers = {"Authorization": f"token {token}", "Content-Type": "application/json"} if sudo: self.headers["Sudo"] = sudo self._cache = {} self.use_cache = False def _request(self, method, path, **kwargs): # Very basic cache for GET requests to speed up setup cache_key = (method, path, json.dumps(kwargs, sort_keys=True)) if self.use_cache and method == "GET" and cache_key in self._cache: return self._cache[cache_key], 0.0 url = f"{self.base_url}/api/v1/{path}" start_time = time.time() try: response = requests.request(method, url, headers=self.headers, **kwargs) duration = time.time() - start_time response.raise_for_status() if self.use_cache: if method == "GET": self._cache[cache_key] = response else: self._cache.clear() return response, duration except requests.exceptions.HTTPError as e: duration = time.time() - start_time vprint(f"[{duration:.3f}s] HTTPError in _request: {e}") vprint(f"Response Content: {e.response.text}") raise except requests.exceptions.RequestException as e: duration = time.time() - start_time vprint(f"[{duration:.3f}s] Request failed: {e}") raise def get_file_info(self, owner: str, repo: str, file_path: str, branch: str = "main"): url = f"repos/{owner}/{repo}/contents/{file_path}" if branch and branch != "main": url += f"?ref={branch}" try: response, duration = self._request("GET", url) return response.json() except requests.exceptions.HTTPError as e: if e.response.status_code == 404: return None raise def create_user(self, username, password, email): vprint(f"--- Creating user: {username} ---") data = { "username": username, "password": password, "email": email, "must_change_password": False, "send_notify": False } try: response, duration = self._request("POST", "admin/users", json=data) vprint(f"[{duration:.3f}s] User '{username}' created.") return True except requests.exceptions.HTTPError as e: if e.response.status_code == 422: # Already exists vprint(f"User '{username}' already exists. Updating password...") # Update password to be sure it matches our expectation response, duration = self._request("PATCH", f"admin/users/{username}", json={"password": password, "login_name": username}) return False else: raise def get_user_token(self, username, password, token_name="test-token"): vprint(f"--- Getting token for user: {username} ---") url = f"{self.base_url}/api/v1/users/{username}/tokens" # Create new token using Basic Auth response = requests.post(url, auth=(username, password), json={"name": token_name}) if response.status_code == 201: return response.json()["sha1"] response.raise_for_status() def create_org(self, org_name): vprint(f"--- Checking organization: {org_name} ---") try: response, duration = self._request("GET", f"orgs/{org_name}") vprint(f"[{duration:.3f}s] Organization '{org_name}' already exists.") return False except requests.exceptions.HTTPError as e: if e.response.status_code == 404: vprint(f"Creating organization '{org_name}'...") data = {"username": org_name, "full_name": org_name} response, duration = self._request("POST", "orgs", json=data) vprint(f"[{duration:.3f}s] Organization '{org_name}' created.") return True else: raise def create_repo(self, org_name, repo_name): vprint(f"--- Checking repository: {org_name}/{repo_name} ---") try: response, duration = self._request("GET", f"repos/{org_name}/{repo_name}") vprint(f"[{duration:.3f}s] Repository '{org_name}/{repo_name}' already exists.") return False except requests.exceptions.HTTPError as e: if e.response.status_code == 404: vprint(f"Creating repository '{org_name}/{repo_name}'...") data = { "name": repo_name, "auto_init": True, "default_branch": "main", "gitignores": "Go", "license": "MIT", "private": False, "readme": "Default", "object_format_name": "sha256" } response, duration = self._request("POST", f"orgs/{org_name}/repos", json=data) vprint(f"[{duration:.3f}s] Repository '{org_name}/{repo_name}' created with a README.") time.sleep(0.1) # Added delay to allow Git operations to become available return True else: raise def add_collaborator(self, org_name, repo_name, collaborator_name, permission="write"): vprint(f"--- Adding {collaborator_name} as a collaborator to {org_name}/{repo_name} with '{permission}' permission ---") # Check if already a collaborator to provide accurate stats try: self._request("GET", f"repos/{org_name}/{repo_name}/collaborators/{collaborator_name}") vprint(f"{collaborator_name} is already a collaborator of {org_name}/{repo_name}.") return False except requests.exceptions.HTTPError as e: if e.response.status_code != 404: raise data = {"permission": permission} # Gitea API returns 204 No Content on success and doesn't fail if already present. response, duration = self._request("PUT", f"repos/{org_name}/{repo_name}/collaborators/{collaborator_name}", json=data) vprint(f"[{duration:.3f}s] Added {collaborator_name} to {org_name}/{repo_name}.") return True def add_submodules(self, org_name, repo_name): vprint(f"--- Adding submodules to {org_name}/{repo_name} using diffpatch ---") parent_repo_path = f"repos/{org_name}/{repo_name}" try: response, duration = self._request("GET", f"{parent_repo_path}/contents/.gitmodules") vprint(f"[{duration:.3f}s] Submodules appear to be already added. Skipping.") return except requests.exceptions.HTTPError as e: if e.response.status_code != 404: raise # Get latest commit SHAs for the submodules response_a, duration_a = self._request("GET", "repos/mypool/pkgA/branches/main") pkg_a_sha = response_a.json()["commit"]["id"] response_b, duration_b = self._request("GET", "repos/mypool/pkgB/branches/main") pkg_b_sha = response_b.json()["commit"]["id"] if not pkg_a_sha or not pkg_b_sha: raise Exception("Error: Could not get submodule commit SHAs. Cannot apply patch.") diff_content = f"""diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 00000000..f1838bd9 --- /dev/null +++ b/.gitmodules @@ -0,0 +1,6 @@ +[submodule "pkgA"] + path = pkgA + url = ../../mypool/pkgA.git +[submodule "pkgB"] + path = pkgB + url = ../../mypool/pkgB.git diff --git a/pkgA b/pkgA new file mode 160000 index 00000000..{pkg_a_sha} --- /dev/null +++ b/pkgA @@ -0,0 +1 @@ +Subproject commit {pkg_a_sha} diff --git a/pkgB b/pkgB new file mode 160000 index 00000000..{pkg_b_sha} --- /dev/null +++ b/pkgB @@ -0,0 +1 @@ +Subproject commit {pkg_b_sha} """ message = "Add pkgA and pkgB as submodules" data = { "branch": "main", "content": diff_content, "message": message } vprint(f"Applying submodule patch to {org_name}/{repo_name}...") response, duration = self._request("POST", f"{parent_repo_path}/diffpatch", json=data) vprint(f"[{duration:.3f}s] Submodule patch applied.") def update_repo_settings(self, org_name, repo_name): vprint(f"--- Updating repository settings for: {org_name}/{repo_name} ---") response, duration = self._request("GET", f"repos/{org_name}/{repo_name}") repo_data = response.json() # Ensure these are boolean values, not string repo_data["allow_manual_merge"] = True repo_data["autodetect_manual_merge"] = True response, duration = self._request("PATCH", f"repos/{org_name}/{repo_name}", json=repo_data) vprint(f"[{duration:.3f}s] Repository settings for '{org_name}/{repo_name}' updated.") def create_webhook(self, owner: str, repo: str, target_url: str): vprint(f"--- Checking webhook for {owner}/{repo} -> {target_url} ---") url = f"repos/{owner}/{repo}/hooks" try: response, duration = self._request("GET", url) hooks = response.json() for hook in hooks: if hook["config"]["url"] == target_url: vprint(f"Webhook for {owner}/{repo} already exists with correct URL.") return False elif "gitea-publisher" in hook["config"]["url"] or "10.89.0." in hook["config"]["url"]: vprint(f"Found old webhook {hook['id']} with URL {hook['config']['url']}. Deleting...") self._request("DELETE", f"{url}/{hook['id']}") except requests.exceptions.HTTPError: pass vprint(f"--- Creating webhook for {owner}/{repo} -> {target_url} ---") data = { "type": "gitea", "config": { "url": target_url, "content_type": "json" }, "events": ["push", "pull_request", "pull_request_review", "issue_comment"], "active": True } response, duration = self._request("POST", url, json=data) vprint(f"[{duration:.3f}s] Webhook created for {owner}/{repo}.") return True def create_label(self, owner: str, repo: str, name: str, color: str = "#abcdef"): vprint(f"--- Checking label '{name}' in {owner}/{repo} ---") url = f"repos/{owner}/{repo}/labels" # Check if label exists first try: response, duration = self._request("GET", url) labels = response.json() for label in labels: if label["name"] == name: vprint(f"Label '{name}' already exists in {owner}/{repo}.") return False except requests.exceptions.HTTPError: pass vprint(f"--- Creating label '{name}' in {owner}/{repo} ---") data = { "name": name, "color": color } try: response, duration = self._request("POST", url, json=data) vprint(f"[{duration:.3f}s] Label '{name}' created.") return True except requests.exceptions.HTTPError as e: if e.response.status_code == 422: # Already exists (race condition or other reason) vprint(f"Label '{name}' already exists.") return False else: raise def create_file(self, owner: str, repo: str, file_path: str, content: str, branch: str = "main", message: str = "Add file"): file_info = self.get_file_info(owner, repo, file_path, branch=branch) data = { "content": base64.b64encode(content.encode('utf-8')).decode('ascii'), "branch": branch, "message": message } if file_info: vprint(f"--- Updating file {file_path} in {owner}/{repo} ---") # Re-fetch file_info to get the latest SHA right before update latest_file_info = self.get_file_info(owner, repo, file_path, branch=branch) if not latest_file_info: raise Exception(f"File {file_path} disappeared during update attempt.") data["sha"] = latest_file_info["sha"] data["message"] = f"Update {file_path}" method = "PUT" else: vprint(f"--- Creating file {file_path} in {owner}/{repo} ---") method = "POST" url = f"repos/{owner}/{repo}/contents/{file_path}" response, duration = self._request(method, url, json=data) vprint(f"[{duration:.3f}s] File {file_path} {'updated' if file_info else 'created'} in {owner}/{repo}.") def create_gitea_pr(self, repo_full_name: str, diff_content: str, title: str, use_fork: bool, base_branch: str = "main", body: str = ""): owner, repo = repo_full_name.split("/") head_owner, head_repo = owner, repo new_branch_name = f"pr-branch-{int(time.time()*1000)}" if use_fork: sudo_user = self.headers.get("Sudo") head_owner = sudo_user head_repo = repo vprint(f"--- Forking {repo_full_name} ---") try: response, duration = self._request("POST", f"repos/{owner}/{repo}/forks", json={}) vprint(f"[{duration:.3f}s] --- Forked to {head_owner}/{head_repo} ---") time.sleep(0.5) # Give more time for fork to be ready except requests.exceptions.HTTPError as e: if e.response.status_code == 409: # Already forked vprint(f"--- Already forked to {head_owner}/{head_repo} ---") else: raise # Apply the diff using diffpatch and create the new branch automatically vprint(f"--- Applying diff to {head_owner}/{head_repo} from {base_branch} to new branch {new_branch_name} ---") response, duration = self._request("POST", f"repos/{head_owner}/{head_repo}/diffpatch", json={ "branch": base_branch, "new_branch": new_branch_name, "content": diff_content, "message": title }) # Now create the PR in the ORIGINAL repo data = { "head": f"{head_owner}:{new_branch_name}" if head_owner != owner else new_branch_name, "base": base_branch, "title": title, "body": body, "allow_maintainer_edit": True } vprint(f"--- Creating PR in {repo_full_name} from {data['head']} ---") response, duration = self._request("POST", f"repos/{owner}/{repo}/pulls", json=data) return response.json() def create_branch(self, owner: str, repo: str, new_branch_name: str, old_ref: str): vprint(f"--- Checking branch '{new_branch_name}' in {owner}/{repo} ---") try: response, duration = self._request("GET", f"repos/{owner}/{repo}/branches/{new_branch_name}") vprint(f"[{duration:.3f}s] Branch '{new_branch_name}' already exists.") return False except requests.exceptions.HTTPError as e: if e.response.status_code != 404: raise # Re-raise other HTTP errors vprint(f"--- Creating branch '{new_branch_name}' in {owner}/{repo} from {old_ref} ---") url = f"repos/{owner}/{repo}/branches" data = { "new_branch_name": new_branch_name, "old_ref": old_ref } response, duration = self._request("POST", url, json=data) vprint(f"[{duration:.3f}s] Branch '{new_branch_name}' created in {owner}/{repo}.") return True def ensure_branch_exists(self, owner: str, repo: str, branch: str = "main", timeout: int = 10): vprint(f"--- Ensuring branch '{branch}' exists in {owner}/{repo} ---") start_time = time.time() while time.time() - start_time < timeout: try: response, duration = self._request("GET", f"repos/{owner}/{repo}/branches/{branch}") vprint(f"[{duration:.3f}s] Branch '{branch}' confirmed in {owner}/{repo}.") return except requests.exceptions.HTTPError as e: if e.response.status_code == 404: vprint(f"Branch '{branch}' not found yet in {owner}/{repo}. Retrying...") time.sleep(1) continue raise raise Exception(f"Timeout waiting for branch {branch} in {owner}/{repo}") def modify_gitea_pr(self, repo_full_name: str, pr_number: int, diff_content: str, message: str): owner, repo = repo_full_name.split("/") # Get PR details to find the head branch AND head repo response, duration = self._request("GET", f"repos/{owner}/{repo}/pulls/{pr_number}") pr_details = response.json() head_branch = pr_details["head"]["ref"] head_repo_owner = pr_details["head"]["repo"]["owner"]["login"] head_repo_name = pr_details["head"]["repo"]["name"] # Apply the diff using diffpatch vprint(f"--- Modifying PR #{pr_number} in {head_repo_owner}/{head_repo_name} branch {head_branch} ---") response, duration = self._request("POST", f"repos/{head_repo_owner}/{head_repo_name}/diffpatch", json={ "branch": head_branch, "content": diff_content, "message": message }) def update_gitea_pr_properties(self, repo_full_name: str, pr_number: int, **kwargs): owner, repo = repo_full_name.split("/") url = f"repos/{owner}/{repo}/pulls/{pr_number}" response, duration = self._request("PATCH", url, json=kwargs) return response.json() def create_issue_comment(self, repo_full_name: str, issue_number: int, body: str): owner, repo = repo_full_name.split("/") url = f"repos/{owner}/{repo}/issues/{issue_number}/comments" data = {"body": body} vprint(f"--- Creating comment on {repo_full_name} issue #{issue_number} ---") response, duration = self._request("POST", url, json=data) return response.json() def get_timeline_events(self, repo_full_name: str, pr_number: int): owner, repo = repo_full_name.split("/") url = f"repos/{owner}/{repo}/issues/{pr_number}/timeline" # Retry logic for timeline events for i in range(10): # Try up to 10 times try: response, duration = self._request("GET", url) timeline_events = response.json() if timeline_events: # Check if timeline_events list is not empty return timeline_events vprint(f"Attempt {i+1}: Timeline for PR {pr_number} is empty. Retrying in 1 seconds...") time.sleep(1) except requests.exceptions.HTTPError as e: if e.response.status_code == 404: vprint(f"Attempt {i+1}: Timeline for PR {pr_number} not found yet. Retrying in 1 seconds...") time.sleep(1) else: raise # Re-raise other HTTP errors raise Exception(f"Failed to retrieve timeline for PR {pr_number} after multiple retries.") def get_comments(self, repo_full_name: str, pr_number: int): owner, repo = repo_full_name.split("/") url = f"repos/{owner}/{repo}/issues/{pr_number}/comments" # Retry logic for comments for i in range(10): # Try up to 10 times try: response, duration = self._request("GET", url) comments = response.json() vprint(f"[{duration:.3f}s] Attempt {i+1}: Comments for PR {pr_number} received: {comments}") # Added debug print if comments: # Check if comments list is not empty return comments vprint(f"Attempt {i+1}: Comments for PR {pr_number} are empty. Retrying in 1 seconds...") time.sleep(1) except requests.exceptions.HTTPError as e: if e.response.status_code == 404: vprint(f"Attempt {i+1}: Comments for PR {pr_number} not found yet. Retrying in 1 seconds...") time.sleep(1) else: raise # Re-raise other HTTP errors raise Exception(f"Failed to retrieve comments for PR {pr_number} after multiple retries.") def get_pr_details(self, repo_full_name: str, pr_number: int): owner, repo = repo_full_name.split("/") url = f"repos/{owner}/{repo}/pulls/{pr_number}" response, duration = self._request("GET", url) return response.json() def create_review(self, repo_full_name: str, pr_number: int, event: str = "APPROVED", body: str = "LGTM"): owner, repo = repo_full_name.split("/") # Check if this user already has an APPROVED review to avoid 422 current_user = self.headers.get("Sudo") or "admin" # simplified existing_reviews = self.list_reviews(repo_full_name, pr_number) for r in existing_reviews: if r["user"]["login"] == current_user and r["state"] == "APPROVED" and event == "APPROVED": vprint(f"User {current_user} already has an APPROVED review for {repo_full_name} PR #{pr_number}") return r url = f"repos/{owner}/{repo}/pulls/{pr_number}/reviews" data = { "event": event, "body": body } vprint(f"--- Creating and submitting review ({event}) for {repo_full_name} PR #{pr_number} as {current_user} ---") try: response, duration = self._request("POST", url, json=data) review = response.json() except requests.exceptions.HTTPError as e: # If it fails with 422, it might be because a review is already pending or something else vprint(f"Failed to create review: {e.response.text}") # Try to find a pending review to submit existing_reviews = self.list_reviews(repo_full_name, pr_number) pending_review = next((r for r in existing_reviews if r["user"]["login"] == current_user and r["state"] == "PENDING"), None) if pending_review: review = pending_review else: raise # If the state is PENDING, we submit it. if review.get("state") == "PENDING": review_id = review["id"] submit_url = f"repos/{owner}/{repo}/pulls/{pr_number}/reviews/{review_id}" submit_data = { "event": event, "body": body } try: response, duration = self._request("POST", submit_url, json=submit_data) vprint(f"[{duration:.3f}s] --- Review {review_id} submitted ---") except requests.exceptions.HTTPError as e: if "already" in e.response.text.lower() or "stay pending" in e.response.text.lower(): vprint(f"Review {review_id} could not be submitted further: {e.response.text}") else: raise return review def list_reviews(self, repo_full_name: str, pr_number: int): owner, repo = repo_full_name.split("/") url = f"repos/{owner}/{repo}/pulls/{pr_number}/reviews" response, duration = self._request("GET", url) return response.json() def approve_requested_reviews(self, repo_full_name: str, pr_number: int): vprint(f"--- Checking for REQUEST_REVIEW state in {repo_full_name} PR #{pr_number} ---") reviews = self.list_reviews(repo_full_name, pr_number) requested_reviews = [r for r in reviews if r["state"] == "REQUEST_REVIEW"] if not requested_reviews: vprint(f"No reviews in REQUEST_REVIEW state found for {repo_full_name} PR #{pr_number}") return admin_token = self.headers["Authorization"].split(" ")[1] for r in requested_reviews: reviewer_username = r["user"]["login"] vprint(f"Reacting on REQUEST_REVIEW for user {reviewer_username} by approving...") reviewer_client = GiteaAPIClient(base_url=self.base_url, token=admin_token, sudo=reviewer_username) time.sleep(1) # give a chance to avoid possible concurrency issues with reviews request/approval reviewer_client.create_review(repo_full_name, pr_number, event="APPROVED", body="Approving requested review") def wait_for_project_pr(self, package_pr_repo, package_pr_number, project_pr_repo="myproducts/mySLFO", timeout=60): vprint(f"Polling {package_pr_repo} PR #{package_pr_number} timeline for forwarded PR event in {project_pr_repo}...") for _ in range(timeout): time.sleep(1) timeline_events = self.get_timeline_events(package_pr_repo, package_pr_number) for event in timeline_events: if event.get("type") == "pull_ref": if not (ref_issue := event.get("ref_issue")): continue url_to_check = ref_issue.get("html_url", "") match = re.search(fr"{project_pr_repo}/pulls/(\d+)", url_to_check) if match: return int(match.group(1)) return None def approve_and_wait_merge(self, package_pr_repo, package_pr_number, project_pr_number, project_pr_repo="myproducts/mySLFO", timeout=30): vprint(f"Approving reviews and verifying both PRs are merged ({package_pr_repo}#{package_pr_number} and {project_pr_repo}#{project_pr_number})...") package_merged = False project_merged = False for i in range(timeout): self.approve_requested_reviews(package_pr_repo, package_pr_number) self.approve_requested_reviews(project_pr_repo, project_pr_number) if not package_merged: pkg_details = self.get_pr_details(package_pr_repo, package_pr_number) if pkg_details.get("merged"): package_merged = True vprint(f"Package PR {package_pr_repo}#{package_pr_number} merged.") if not project_merged: prj_details = self.get_pr_details(project_pr_repo, project_pr_number) if prj_details.get("merged"): project_merged = True vprint(f"Project PR {project_pr_repo}#{project_pr_number} merged.") if package_merged and project_merged: return True, True time.sleep(1) return package_merged, project_merged