# forked from git-workflow/autogits
# - add "object_format_name": "sha256" to the API payload in create_repo()
# - update add_submodules() and its diff to use sha256-style hashes
import os
|
|
import time
|
|
import pytest
|
|
import requests
|
|
import json
|
|
import re
|
|
import xml.etree.ElementTree as ET
|
|
from pathlib import Path
|
|
import base64
|
|
|
|
IS_TEST_RUN = False


def vprint(*args, **kwargs):
    """Verbose print helper: forward to ``print`` only when fixture output is enabled.

    Output is enabled either by the module-level ``IS_TEST_RUN`` flag or by
    setting the ``AUTOGITS_PRINT_FIXTURES=1`` environment variable.
    """
    enabled = IS_TEST_RUN or os.environ.get("AUTOGITS_PRINT_FIXTURES") == "1"
    if enabled:
        print(*args, **kwargs)
|
|
|
|
class GiteaAPIClient:
|
|
def __init__(self, base_url, token, sudo=None):
|
|
self.base_url = base_url
|
|
self.headers = {"Authorization": f"token {token}", "Content-Type": "application/json"}
|
|
if sudo:
|
|
self.headers["Sudo"] = sudo
|
|
self._cache = {}
|
|
self.use_cache = False
|
|
|
|
    def _request(self, method, path, **kwargs):
        """Issue *method* against the Gitea v1 API path *path*.

        Returns a ``(response, duration_seconds)`` tuple. HTTP errors are
        logged via vprint and re-raised. When ``use_cache`` is enabled,
        successful GET responses are memoized; any non-GET request clears the
        whole cache since it may have invalidated earlier reads.
        """
        # Very basic cache for GET requests to speed up setup
        # NOTE: kwargs must be JSON-serializable for this cache key to work.
        cache_key = (method, path, json.dumps(kwargs, sort_keys=True))
        if self.use_cache and method == "GET" and cache_key in self._cache:
            # Cache hits report a 0.0 duration since no request was made.
            return self._cache[cache_key], 0.0

        url = f"{self.base_url}/api/v1/{path}"
        start_time = time.time()
        try:
            response = requests.request(method, url, headers=self.headers, **kwargs)
            duration = time.time() - start_time
            response.raise_for_status()

            if self.use_cache:
                if method == "GET":
                    self._cache[cache_key] = response
                else:
                    # A write may invalidate previously cached GET responses.
                    self._cache.clear()

            return response, duration
        except requests.exceptions.HTTPError as e:
            # 4xx/5xx: log status body for debugging, then propagate.
            duration = time.time() - start_time
            vprint(f"[{duration:.3f}s] HTTPError in _request: {e}")
            vprint(f"Response Content: {e.response.text}")
            raise
        except requests.exceptions.RequestException as e:
            # Connection-level failures (DNS, timeout, refused, ...).
            duration = time.time() - start_time
            vprint(f"[{duration:.3f}s] Request failed: {e}")
            raise
|
|
|
|
def get_file_info(self, owner: str, repo: str, file_path: str, branch: str = "main"):
|
|
url = f"repos/{owner}/{repo}/contents/{file_path}"
|
|
if branch and branch != "main":
|
|
url += f"?ref={branch}"
|
|
try:
|
|
response, duration = self._request("GET", url)
|
|
return response.json()
|
|
except requests.exceptions.HTTPError as e:
|
|
if e.response.status_code == 404:
|
|
return None
|
|
raise
|
|
|
|
def create_user(self, username, password, email):
|
|
vprint(f"--- Creating user: {username} ---")
|
|
data = {
|
|
"username": username,
|
|
"password": password,
|
|
"email": email,
|
|
"must_change_password": False,
|
|
"send_notify": False
|
|
}
|
|
try:
|
|
response, duration = self._request("POST", "admin/users", json=data)
|
|
vprint(f"[{duration:.3f}s] User '{username}' created.")
|
|
return True
|
|
except requests.exceptions.HTTPError as e:
|
|
if e.response.status_code == 422: # Already exists
|
|
vprint(f"User '{username}' already exists. Updating password...")
|
|
# Update password to be sure it matches our expectation
|
|
response, duration = self._request("PATCH", f"admin/users/{username}", json={"password": password, "login_name": username})
|
|
return False
|
|
else:
|
|
raise
|
|
|
|
def get_user_token(self, username, password, token_name="test-token"):
|
|
vprint(f"--- Getting token for user: {username} ---")
|
|
url = f"{self.base_url}/api/v1/users/{username}/tokens"
|
|
|
|
# Create new token using Basic Auth
|
|
response = requests.post(url, auth=(username, password), json={"name": token_name})
|
|
if response.status_code == 201:
|
|
return response.json()["sha1"]
|
|
response.raise_for_status()
|
|
|
|
def create_org(self, org_name):
|
|
vprint(f"--- Checking organization: {org_name} ---")
|
|
try:
|
|
response, duration = self._request("GET", f"orgs/{org_name}")
|
|
vprint(f"[{duration:.3f}s] Organization '{org_name}' already exists.")
|
|
return False
|
|
except requests.exceptions.HTTPError as e:
|
|
if e.response.status_code == 404:
|
|
vprint(f"Creating organization '{org_name}'...")
|
|
data = {"username": org_name, "full_name": org_name}
|
|
response, duration = self._request("POST", "orgs", json=data)
|
|
vprint(f"[{duration:.3f}s] Organization '{org_name}' created.")
|
|
return True
|
|
else:
|
|
raise
|
|
|
|
def create_repo(self, org_name, repo_name):
|
|
vprint(f"--- Checking repository: {org_name}/{repo_name} ---")
|
|
try:
|
|
response, duration = self._request("GET", f"repos/{org_name}/{repo_name}")
|
|
vprint(f"[{duration:.3f}s] Repository '{org_name}/{repo_name}' already exists.")
|
|
return False
|
|
except requests.exceptions.HTTPError as e:
|
|
if e.response.status_code == 404:
|
|
vprint(f"Creating repository '{org_name}/{repo_name}'...")
|
|
data = {
|
|
"name": repo_name,
|
|
"auto_init": True,
|
|
"default_branch": "main",
|
|
"gitignores": "Go",
|
|
"license": "MIT",
|
|
"private": False,
|
|
"readme": "Default",
|
|
"object_format_name": "sha256"
|
|
}
|
|
response, duration = self._request("POST", f"orgs/{org_name}/repos", json=data)
|
|
vprint(f"[{duration:.3f}s] Repository '{org_name}/{repo_name}' created with a README.")
|
|
time.sleep(0.1) # Added delay to allow Git operations to become available
|
|
return True
|
|
else:
|
|
raise
|
|
|
|
def add_collaborator(self, org_name, repo_name, collaborator_name, permission="write"):
|
|
vprint(f"--- Adding {collaborator_name} as a collaborator to {org_name}/{repo_name} with '{permission}' permission ---")
|
|
|
|
# Check if already a collaborator to provide accurate stats
|
|
try:
|
|
self._request("GET", f"repos/{org_name}/{repo_name}/collaborators/{collaborator_name}")
|
|
vprint(f"{collaborator_name} is already a collaborator of {org_name}/{repo_name}.")
|
|
return False
|
|
except requests.exceptions.HTTPError as e:
|
|
if e.response.status_code != 404:
|
|
raise
|
|
|
|
data = {"permission": permission}
|
|
# Gitea API returns 204 No Content on success and doesn't fail if already present.
|
|
response, duration = self._request("PUT", f"repos/{org_name}/{repo_name}/collaborators/{collaborator_name}", json=data)
|
|
vprint(f"[{duration:.3f}s] Added {collaborator_name} to {org_name}/{repo_name}.")
|
|
return True
|
|
|
|
    def add_submodules(self, org_name, repo_name):
        """Add mypool/pkgA and mypool/pkgB as submodules of the given repo.

        Idempotent: skips entirely if ``.gitmodules`` already exists. The
        submodules are added by POSTing a synthesized git diff to Gitea's
        ``diffpatch`` endpoint rather than via a local clone. The ``index``
        lines embed the full head-commit hashes of the submodule repos
        (sha256-style per the module header note — TODO confirm against the
        Gitea server's object format).
        """
        vprint(f"--- Adding submodules to {org_name}/{repo_name} using diffpatch ---")
        parent_repo_path = f"repos/{org_name}/{repo_name}"

        # Idempotency guard: a .gitmodules file means the patch was applied.
        try:
            response, duration = self._request("GET", f"{parent_repo_path}/contents/.gitmodules")
            vprint(f"[{duration:.3f}s] Submodules appear to be already added. Skipping.")
            return
        except requests.exceptions.HTTPError as e:
            if e.response.status_code != 404:
                raise

        # Get latest commit SHAs for the submodules
        response_a, duration_a = self._request("GET", "repos/mypool/pkgA/branches/main")
        pkg_a_sha = response_a.json()["commit"]["id"]
        response_b, duration_b = self._request("GET", "repos/mypool/pkgB/branches/main")
        pkg_b_sha = response_b.json()["commit"]["id"]

        if not pkg_a_sha or not pkg_b_sha:
            raise Exception("Error: Could not get submodule commit SHAs. Cannot apply patch.")

        # Synthesized diff: creates .gitmodules plus two gitlink entries
        # (mode 160000 == submodule). Relative URLs resolve against the
        # parent repo's location on the same Gitea host.
        diff_content = f"""diff --git a/.gitmodules b/.gitmodules
new file mode 100644
index 00000000..f1838bd9
--- /dev/null
+++ b/.gitmodules
@@ -0,0 +1,6 @@
+[submodule "pkgA"]
+ path = pkgA
+ url = ../../mypool/pkgA.git
+[submodule "pkgB"]
+ path = pkgB
+ url = ../../mypool/pkgB.git
diff --git a/pkgA b/pkgA
new file mode 160000
index 00000000..{pkg_a_sha}
--- /dev/null
+++ b/pkgA
@@ -0,0 +1 @@
+Subproject commit {pkg_a_sha}
diff --git a/pkgB b/pkgB
new file mode 160000
index 00000000..{pkg_b_sha}
--- /dev/null
+++ b/pkgB
@@ -0,0 +1 @@
+Subproject commit {pkg_b_sha}
"""
        message = "Add pkgA and pkgB as submodules"
        data = {
            "branch": "main",
            "content": diff_content,
            "message": message
        }
        vprint(f"Applying submodule patch to {org_name}/{repo_name}...")
        response, duration = self._request("POST", f"{parent_repo_path}/diffpatch", json=data)
        vprint(f"[{duration:.3f}s] Submodule patch applied.")
|
|
|
|
def update_repo_settings(self, org_name, repo_name):
|
|
vprint(f"--- Updating repository settings for: {org_name}/{repo_name} ---")
|
|
response, duration = self._request("GET", f"repos/{org_name}/{repo_name}")
|
|
repo_data = response.json()
|
|
|
|
# Ensure these are boolean values, not string
|
|
repo_data["allow_manual_merge"] = True
|
|
repo_data["autodetect_manual_merge"] = True
|
|
|
|
response, duration = self._request("PATCH", f"repos/{org_name}/{repo_name}", json=repo_data)
|
|
vprint(f"[{duration:.3f}s] Repository settings for '{org_name}/{repo_name}' updated.")
|
|
|
|
def create_webhook(self, owner: str, repo: str, target_url: str):
|
|
vprint(f"--- Checking webhook for {owner}/{repo} -> {target_url} ---")
|
|
url = f"repos/{owner}/{repo}/hooks"
|
|
|
|
try:
|
|
response, duration = self._request("GET", url)
|
|
hooks = response.json()
|
|
for hook in hooks:
|
|
if hook["config"]["url"] == target_url:
|
|
vprint(f"Webhook for {owner}/{repo} already exists with correct URL.")
|
|
return False
|
|
elif "gitea-publisher" in hook["config"]["url"] or "10.89.0." in hook["config"]["url"]:
|
|
vprint(f"Found old webhook {hook['id']} with URL {hook['config']['url']}. Deleting...")
|
|
self._request("DELETE", f"{url}/{hook['id']}")
|
|
except requests.exceptions.HTTPError:
|
|
pass
|
|
|
|
vprint(f"--- Creating webhook for {owner}/{repo} -> {target_url} ---")
|
|
data = {
|
|
"type": "gitea",
|
|
"config": {
|
|
"url": target_url,
|
|
"content_type": "json"
|
|
},
|
|
"events": ["push", "pull_request", "pull_request_review", "issue_comment"],
|
|
"active": True
|
|
}
|
|
response, duration = self._request("POST", url, json=data)
|
|
vprint(f"[{duration:.3f}s] Webhook created for {owner}/{repo}.")
|
|
return True
|
|
|
|
def create_label(self, owner: str, repo: str, name: str, color: str = "#abcdef"):
|
|
vprint(f"--- Checking label '{name}' in {owner}/{repo} ---")
|
|
url = f"repos/{owner}/{repo}/labels"
|
|
|
|
# Check if label exists first
|
|
try:
|
|
response, duration = self._request("GET", url)
|
|
labels = response.json()
|
|
for label in labels:
|
|
if label["name"] == name:
|
|
vprint(f"Label '{name}' already exists in {owner}/{repo}.")
|
|
return False
|
|
except requests.exceptions.HTTPError:
|
|
pass
|
|
|
|
vprint(f"--- Creating label '{name}' in {owner}/{repo} ---")
|
|
data = {
|
|
"name": name,
|
|
"color": color
|
|
}
|
|
try:
|
|
response, duration = self._request("POST", url, json=data)
|
|
vprint(f"[{duration:.3f}s] Label '{name}' created.")
|
|
return True
|
|
except requests.exceptions.HTTPError as e:
|
|
if e.response.status_code == 422: # Already exists (race condition or other reason)
|
|
vprint(f"Label '{name}' already exists.")
|
|
return False
|
|
else:
|
|
raise
|
|
|
|
def create_file(self, owner: str, repo: str, file_path: str, content: str, branch: str = "main", message: str = "Add file"):
|
|
file_info = self.get_file_info(owner, repo, file_path, branch=branch)
|
|
|
|
data = {
|
|
"content": base64.b64encode(content.encode('utf-8')).decode('ascii'),
|
|
"branch": branch,
|
|
"message": message
|
|
}
|
|
|
|
if file_info:
|
|
vprint(f"--- Updating file {file_path} in {owner}/{repo} ---")
|
|
# Re-fetch file_info to get the latest SHA right before update
|
|
latest_file_info = self.get_file_info(owner, repo, file_path, branch=branch)
|
|
if not latest_file_info:
|
|
raise Exception(f"File {file_path} disappeared during update attempt.")
|
|
data["sha"] = latest_file_info["sha"]
|
|
data["message"] = f"Update {file_path}"
|
|
method = "PUT"
|
|
else:
|
|
vprint(f"--- Creating file {file_path} in {owner}/{repo} ---")
|
|
method = "POST"
|
|
|
|
url = f"repos/{owner}/{repo}/contents/{file_path}"
|
|
response, duration = self._request(method, url, json=data)
|
|
vprint(f"[{duration:.3f}s] File {file_path} {'updated' if file_info else 'created'} in {owner}/{repo}.")
|
|
|
|
    def create_gitea_pr(self, repo_full_name: str, diff_content: str, title: str, use_fork: bool, base_branch: str = "main", body: str = ""):
        """Open a PR on *repo_full_name* by applying *diff_content* to a new branch.

        When *use_fork* is true, the repo is first forked as the current Sudo
        user and the branch is pushed to the fork; otherwise the branch lives
        in the target repo itself. Returns the created PR as parsed JSON.
        """
        owner, repo = repo_full_name.split("/")

        head_owner, head_repo = owner, repo
        # Millisecond timestamp keeps branch names unique across quick calls.
        new_branch_name = f"pr-branch-{int(time.time()*1000)}"

        if use_fork:
            # The fork lands under the Sudo user's namespace with the same name.
            sudo_user = self.headers.get("Sudo")
            head_owner = sudo_user
            head_repo = repo

            vprint(f"--- Forking {repo_full_name} ---")
            try:
                response, duration = self._request("POST", f"repos/{owner}/{repo}/forks", json={})
                vprint(f"[{duration:.3f}s] --- Forked to {head_owner}/{head_repo} ---")
                time.sleep(0.5) # Give more time for fork to be ready
            except requests.exceptions.HTTPError as e:
                if e.response.status_code == 409: # Already forked
                    vprint(f"--- Already forked to {head_owner}/{head_repo} ---")
                else:
                    raise

        # Apply the diff using diffpatch and create the new branch automatically
        vprint(f"--- Applying diff to {head_owner}/{head_repo} from {base_branch} to new branch {new_branch_name} ---")
        response, duration = self._request("POST", f"repos/{head_owner}/{head_repo}/diffpatch", json={
            "branch": base_branch,
            "new_branch": new_branch_name,
            "content": diff_content,
            "message": title
        })

        # Now create the PR in the ORIGINAL repo
        # Cross-repo heads use the "owner:branch" form; same-repo heads are bare.
        data = {
            "head": f"{head_owner}:{new_branch_name}" if head_owner != owner else new_branch_name,
            "base": base_branch,
            "title": title,
            "body": body,
            "allow_maintainer_edit": True
        }
        vprint(f"--- Creating PR in {repo_full_name} from {data['head']} ---")
        response, duration = self._request("POST", f"repos/{owner}/{repo}/pulls", json=data)
        return response.json()
|
|
|
|
def create_branch(self, owner: str, repo: str, new_branch_name: str, old_ref: str):
|
|
vprint(f"--- Checking branch '{new_branch_name}' in {owner}/{repo} ---")
|
|
try:
|
|
response, duration = self._request("GET", f"repos/{owner}/{repo}/branches/{new_branch_name}")
|
|
vprint(f"[{duration:.3f}s] Branch '{new_branch_name}' already exists.")
|
|
return False
|
|
except requests.exceptions.HTTPError as e:
|
|
if e.response.status_code != 404:
|
|
raise # Re-raise other HTTP errors
|
|
|
|
vprint(f"--- Creating branch '{new_branch_name}' in {owner}/{repo} from {old_ref} ---")
|
|
url = f"repos/{owner}/{repo}/branches"
|
|
data = {
|
|
"new_branch_name": new_branch_name,
|
|
"old_ref": old_ref
|
|
}
|
|
response, duration = self._request("POST", url, json=data)
|
|
vprint(f"[{duration:.3f}s] Branch '{new_branch_name}' created in {owner}/{repo}.")
|
|
return True
|
|
|
|
def ensure_branch_exists(self, owner: str, repo: str, branch: str = "main", timeout: int = 10):
|
|
vprint(f"--- Ensuring branch '{branch}' exists in {owner}/{repo} ---")
|
|
start_time = time.time()
|
|
while time.time() - start_time < timeout:
|
|
try:
|
|
response, duration = self._request("GET", f"repos/{owner}/{repo}/branches/{branch}")
|
|
vprint(f"[{duration:.3f}s] Branch '{branch}' confirmed in {owner}/{repo}.")
|
|
return
|
|
except requests.exceptions.HTTPError as e:
|
|
if e.response.status_code == 404:
|
|
vprint(f"Branch '{branch}' not found yet in {owner}/{repo}. Retrying...")
|
|
time.sleep(1)
|
|
continue
|
|
raise
|
|
raise Exception(f"Timeout waiting for branch {branch} in {owner}/{repo}")
|
|
|
|
def modify_gitea_pr(self, repo_full_name: str, pr_number: int, diff_content: str, message: str):
|
|
owner, repo = repo_full_name.split("/")
|
|
|
|
# Get PR details to find the head branch AND head repo
|
|
response, duration = self._request("GET", f"repos/{owner}/{repo}/pulls/{pr_number}")
|
|
pr_details = response.json()
|
|
head_branch = pr_details["head"]["ref"]
|
|
head_repo_owner = pr_details["head"]["repo"]["owner"]["login"]
|
|
head_repo_name = pr_details["head"]["repo"]["name"]
|
|
|
|
# Apply the diff using diffpatch
|
|
vprint(f"--- Modifying PR #{pr_number} in {head_repo_owner}/{head_repo_name} branch {head_branch} ---")
|
|
response, duration = self._request("POST", f"repos/{head_repo_owner}/{head_repo_name}/diffpatch", json={
|
|
"branch": head_branch,
|
|
"content": diff_content,
|
|
"message": message
|
|
})
|
|
|
|
def update_gitea_pr_properties(self, repo_full_name: str, pr_number: int, **kwargs):
|
|
owner, repo = repo_full_name.split("/")
|
|
url = f"repos/{owner}/{repo}/pulls/{pr_number}"
|
|
response, duration = self._request("PATCH", url, json=kwargs)
|
|
return response.json()
|
|
|
|
def create_issue_comment(self, repo_full_name: str, issue_number: int, body: str):
|
|
owner, repo = repo_full_name.split("/")
|
|
url = f"repos/{owner}/{repo}/issues/{issue_number}/comments"
|
|
data = {"body": body}
|
|
vprint(f"--- Creating comment on {repo_full_name} issue #{issue_number} ---")
|
|
response, duration = self._request("POST", url, json=data)
|
|
return response.json()
|
|
|
|
def get_timeline_events(self, repo_full_name: str, pr_number: int):
|
|
owner, repo = repo_full_name.split("/")
|
|
url = f"repos/{owner}/{repo}/issues/{pr_number}/timeline"
|
|
|
|
# Retry logic for timeline events
|
|
for i in range(10): # Try up to 10 times
|
|
try:
|
|
response, duration = self._request("GET", url)
|
|
timeline_events = response.json()
|
|
if timeline_events: # Check if timeline_events list is not empty
|
|
return timeline_events
|
|
vprint(f"Attempt {i+1}: Timeline for PR {pr_number} is empty. Retrying in 1 seconds...")
|
|
time.sleep(1)
|
|
except requests.exceptions.HTTPError as e:
|
|
if e.response.status_code == 404:
|
|
vprint(f"Attempt {i+1}: Timeline for PR {pr_number} not found yet. Retrying in 1 seconds...")
|
|
time.sleep(1)
|
|
else:
|
|
raise # Re-raise other HTTP errors
|
|
raise Exception(f"Failed to retrieve timeline for PR {pr_number} after multiple retries.")
|
|
|
|
def get_comments(self, repo_full_name: str, pr_number: int):
|
|
owner, repo = repo_full_name.split("/")
|
|
url = f"repos/{owner}/{repo}/issues/{pr_number}/comments"
|
|
|
|
# Retry logic for comments
|
|
for i in range(10): # Try up to 10 times
|
|
try:
|
|
response, duration = self._request("GET", url)
|
|
comments = response.json()
|
|
vprint(f"[{duration:.3f}s] Attempt {i+1}: Comments for PR {pr_number} received: {comments}") # Added debug print
|
|
if comments: # Check if comments list is not empty
|
|
return comments
|
|
vprint(f"Attempt {i+1}: Comments for PR {pr_number} are empty. Retrying in 1 seconds...")
|
|
time.sleep(1)
|
|
except requests.exceptions.HTTPError as e:
|
|
if e.response.status_code == 404:
|
|
vprint(f"Attempt {i+1}: Comments for PR {pr_number} not found yet. Retrying in 1 seconds...")
|
|
time.sleep(1)
|
|
else:
|
|
raise # Re-raise other HTTP errors
|
|
raise Exception(f"Failed to retrieve comments for PR {pr_number} after multiple retries.")
|
|
|
|
def get_pr_details(self, repo_full_name: str, pr_number: int):
|
|
owner, repo = repo_full_name.split("/")
|
|
url = f"repos/{owner}/{repo}/pulls/{pr_number}"
|
|
response, duration = self._request("GET", url)
|
|
return response.json()
|
|
|
|
    def create_review(self, repo_full_name: str, pr_number: int, event: str = "APPROVED", body: str = "LGTM"):
        """Create and submit a review on a PR as the current (possibly Sudo'd) user.

        Idempotent for approvals: if the user already has an APPROVED review
        and *event* is APPROVED, that review is returned unchanged. On a
        creation failure (e.g. 422), falls back to the user's PENDING review
        if one exists; any PENDING review is then submitted with *event*.
        Returns the review dict.
        """
        owner, repo = repo_full_name.split("/")

        # Check if this user already has an APPROVED review to avoid 422
        current_user = self.headers.get("Sudo") or "admin" # simplified
        existing_reviews = self.list_reviews(repo_full_name, pr_number)
        for r in existing_reviews:
            if r["user"]["login"] == current_user and r["state"] == "APPROVED" and event == "APPROVED":
                vprint(f"User {current_user} already has an APPROVED review for {repo_full_name} PR #{pr_number}")
                return r

        url = f"repos/{owner}/{repo}/pulls/{pr_number}/reviews"
        data = {
            "event": event,
            "body": body
        }
        vprint(f"--- Creating and submitting review ({event}) for {repo_full_name} PR #{pr_number} as {current_user} ---")
        try:
            response, duration = self._request("POST", url, json=data)
            review = response.json()
        except requests.exceptions.HTTPError as e:
            # If it fails with 422, it might be because a review is already pending or something else
            vprint(f"Failed to create review: {e.response.text}")
            # Try to find a pending review to submit
            existing_reviews = self.list_reviews(repo_full_name, pr_number)
            pending_review = next((r for r in existing_reviews if r["user"]["login"] == current_user and r["state"] == "PENDING"), None)
            if pending_review:
                review = pending_review
            else:
                raise

        # If the state is PENDING, we submit it.
        if review.get("state") == "PENDING":
            review_id = review["id"]
            submit_url = f"repos/{owner}/{repo}/pulls/{pr_number}/reviews/{review_id}"
            submit_data = {
                "event": event,
                "body": body
            }
            try:
                response, duration = self._request("POST", submit_url, json=submit_data)
                vprint(f"[{duration:.3f}s] --- Review {review_id} submitted ---")
            except requests.exceptions.HTTPError as e:
                # Tolerate "already submitted"/"stay pending" style errors;
                # anything else is a genuine failure.
                if "already" in e.response.text.lower() or "stay pending" in e.response.text.lower():
                    vprint(f"Review {review_id} could not be submitted further: {e.response.text}")
                else:
                    raise

        # NOTE(review): when a PENDING review is submitted, the returned dict
        # still carries the pre-submission state — callers relying on the
        # final state should re-fetch. TODO confirm intended.
        return review
|
|
|
|
def list_reviews(self, repo_full_name: str, pr_number: int):
|
|
owner, repo = repo_full_name.split("/")
|
|
url = f"repos/{owner}/{repo}/pulls/{pr_number}/reviews"
|
|
response, duration = self._request("GET", url)
|
|
return response.json()
|
|
|
|
def approve_requested_reviews(self, repo_full_name: str, pr_number: int):
|
|
vprint(f"--- Checking for REQUEST_REVIEW state in {repo_full_name} PR #{pr_number} ---")
|
|
reviews = self.list_reviews(repo_full_name, pr_number)
|
|
|
|
requested_reviews = [r for r in reviews if r["state"] == "REQUEST_REVIEW"]
|
|
if not requested_reviews:
|
|
vprint(f"No reviews in REQUEST_REVIEW state found for {repo_full_name} PR #{pr_number}")
|
|
return
|
|
|
|
admin_token = self.headers["Authorization"].split(" ")[1]
|
|
for r in requested_reviews:
|
|
reviewer_username = r["user"]["login"]
|
|
vprint(f"Reacting on REQUEST_REVIEW for user {reviewer_username} by approving...")
|
|
|
|
reviewer_client = GiteaAPIClient(base_url=self.base_url, token=admin_token, sudo=reviewer_username)
|
|
time.sleep(1) # give a chance to avoid possible concurrency issues with reviews request/approval
|
|
reviewer_client.create_review(repo_full_name, pr_number, event="APPROVED", body="Approving requested review")
|
|
|
|
def wait_for_project_pr(self, package_pr_repo, package_pr_number, project_pr_repo="myproducts/mySLFO", timeout=60):
|
|
vprint(f"Polling {package_pr_repo} PR #{package_pr_number} timeline for forwarded PR event in {project_pr_repo}...")
|
|
for _ in range(timeout):
|
|
time.sleep(1)
|
|
timeline_events = self.get_timeline_events(package_pr_repo, package_pr_number)
|
|
for event in timeline_events:
|
|
if event.get("type") == "pull_ref":
|
|
if not (ref_issue := event.get("ref_issue")):
|
|
continue
|
|
url_to_check = ref_issue.get("html_url", "")
|
|
match = re.search(fr"{project_pr_repo}/pulls/(\d+)", url_to_check)
|
|
if match:
|
|
return int(match.group(1))
|
|
return None
|
|
|
|
def approve_and_wait_merge(self, package_pr_repo, package_pr_number, project_pr_number, project_pr_repo="myproducts/mySLFO", timeout=30):
|
|
vprint(f"Approving reviews and verifying both PRs are merged ({package_pr_repo}#{package_pr_number} and {project_pr_repo}#{project_pr_number})...")
|
|
package_merged = False
|
|
project_merged = False
|
|
|
|
for i in range(timeout):
|
|
self.approve_requested_reviews(package_pr_repo, package_pr_number)
|
|
self.approve_requested_reviews(project_pr_repo, project_pr_number)
|
|
|
|
if not package_merged:
|
|
pkg_details = self.get_pr_details(package_pr_repo, package_pr_number)
|
|
if pkg_details.get("merged"):
|
|
package_merged = True
|
|
vprint(f"Package PR {package_pr_repo}#{package_pr_number} merged.")
|
|
|
|
if not project_merged:
|
|
prj_details = self.get_pr_details(project_pr_repo, project_pr_number)
|
|
if prj_details.get("merged"):
|
|
project_merged = True
|
|
vprint(f"Project PR {project_pr_repo}#{project_pr_number} merged.")
|
|
|
|
if package_merged and project_merged:
|
|
return True, True
|
|
|
|
time.sleep(1)
|
|
return package_merged, project_merged
|