Refactor create_gitea_pr() in common_test_utils.py to leverage the 'new_branch' parameter of the Gitea 'diffpatch' API call. This lets the target branch be created automatically while the diff is applied, eliminating the need for explicit branch creation via the branches endpoint. It also fixes the strange behavior in which diffpatch damaged the history of a pre-created branch.
558 lines
24 KiB
Python
558 lines
24 KiB
Python
import os
|
|
import time
|
|
import pytest
|
|
import requests
|
|
import json
|
|
import re
|
|
import xml.etree.ElementTree as ET
|
|
from pathlib import Path
|
|
import base64
|
|
import subprocess
|
|
|
|
# Directory with static test fixtures (../data relative to this file's package).
TEST_DATA_DIR = Path(__file__).parent.parent / "data"
# Template build-result XML whose <status> elements get filled in per test.
BUILD_RESULT_TEMPLATE = TEST_DATA_DIR / "build_result.xml.template"
# Directory the mock OBS server serves canned responses from.
MOCK_RESPONSES_DIR = Path(__file__).parent.parent.parent / "mock-obs" / "responses"
# Canned responses for build-result queries: the PullRequest staging project
# and the base project, respectively.
MOCK_BUILD_RESULT_FILE = (
    MOCK_RESPONSES_DIR / "GET_build_openSUSE:Leap:16.0:PullRequest:*__result"
)
MOCK_BUILD_RESULT_FILE1 = MOCK_RESPONSES_DIR / "GET_build_openSUSE:Leap:16.0__result"
|
|
|
|
|
|
@pytest.fixture
def mock_build_result():
    """
    Fixture to create a mock build result file from the template.

    Yields a factory function that the test calls with (package_name, code);
    the factory rewrites every <status> element of the template XML and
    writes the result to both mock-response locations, returning the first
    path as a string.
    """

    def _create_result_file(package_name: str, code: str):
        tree = ET.parse(BUILD_RESULT_TEMPLATE)
        root = tree.getroot()
        for status_tag in root.findall(".//status"):
            status_tag.set("package", package_name)
            status_tag.set("code", code)

        MOCK_RESPONSES_DIR.mkdir(exist_ok=True)
        tree.write(MOCK_BUILD_RESULT_FILE)
        tree.write(MOCK_BUILD_RESULT_FILE1)
        return str(MOCK_BUILD_RESULT_FILE)

    yield _create_result_file

    # Teardown: remove each generated file independently. The previous code
    # gated BOTH unlink() calls on MOCK_BUILD_RESULT_FILE.exists(), which
    # raised FileNotFoundError when FILE1 was missing and leaked FILE1 when
    # only FILE was missing.
    MOCK_BUILD_RESULT_FILE.unlink(missing_ok=True)
    MOCK_BUILD_RESULT_FILE1.unlink(missing_ok=True)
|
|
|
|
|
|
class GiteaAPIClient:
    """Thin wrapper around the Gitea REST API used by the integration tests."""

    def __init__(self, base_url, token, sudo=None):
        # Base URL of the Gitea instance, e.g. "http://localhost:3000".
        self.base_url = base_url
        headers = {
            "Authorization": f"token {token}",
            "Content-Type": "application/json",
        }
        # When set, Gitea performs every request on behalf of this user.
        if sudo:
            headers["Sudo"] = sudo
        self.headers = headers
|
|
|
|
    def _request(self, method, path, **kwargs):
        """Send an authenticated request to the Gitea v1 API.

        *path* is relative to <base_url>/api/v1/; **kwargs are forwarded to
        requests.request (e.g. json=...). Raises requests.exceptions.HTTPError
        on non-2xx responses after printing the response body, and returns the
        Response object otherwise.
        """
        url = f"{self.base_url}/api/v1/{path}"
        response = requests.request(method, url, headers=self.headers, **kwargs)
        try:
            response.raise_for_status()
        except requests.exceptions.HTTPError as e:
            # Log the body before re-raising: Gitea puts the error detail there.
            print(f"HTTPError in _request: {e}")
            print(f"Response Content: {e.response.text}")
            raise
        return response
|
|
|
|
def get_file_info(self, owner: str, repo: str, file_path: str, branch: str = "main"):
|
|
url = f"repos/{owner}/{repo}/contents/{file_path}"
|
|
if branch and branch != "main":
|
|
url += f"?ref={branch}"
|
|
try:
|
|
response = self._request("GET", url)
|
|
return response.json()
|
|
except requests.exceptions.HTTPError as e:
|
|
if e.response.status_code == 404:
|
|
return None
|
|
raise
|
|
|
|
def create_user(self, username, password, email):
|
|
print(f"--- Creating user: {username} ---")
|
|
data = {
|
|
"username": username,
|
|
"password": password,
|
|
"email": email,
|
|
"must_change_password": False,
|
|
"send_notify": False
|
|
}
|
|
try:
|
|
self._request("POST", "admin/users", json=data)
|
|
print(f"User '{username}' created.")
|
|
except requests.exceptions.HTTPError as e:
|
|
if e.response.status_code == 422: # Already exists
|
|
print(f"User '{username}' already exists. Updating password...")
|
|
# Update password to be sure it matches our expectation
|
|
self._request("PATCH", f"admin/users/{username}", json={"password": password, "login_name": username})
|
|
else:
|
|
raise
|
|
|
|
def get_user_token(self, username, password, token_name="test-token"):
|
|
print(f"--- Getting token for user: {username} ---")
|
|
url = f"{self.base_url}/api/v1/users/{username}/tokens"
|
|
|
|
# Create new token using Basic Auth
|
|
response = requests.post(url, auth=(username, password), json={"name": token_name})
|
|
if response.status_code == 201:
|
|
return response.json()["sha1"]
|
|
response.raise_for_status()
|
|
|
|
def create_org(self, org_name):
|
|
print(f"--- Checking organization: {org_name} ---")
|
|
try:
|
|
self._request("GET", f"orgs/{org_name}")
|
|
print(f"Organization '{org_name}' already exists.")
|
|
except requests.exceptions.HTTPError as e:
|
|
if e.response.status_code == 404:
|
|
print(f"Creating organization '{org_name}'...")
|
|
data = {"username": org_name, "full_name": org_name}
|
|
self._request("POST", "orgs", json=data)
|
|
print(f"Organization '{org_name}' created.")
|
|
else:
|
|
raise
|
|
print(f"--- Checking organization: {org_name} ---")
|
|
try:
|
|
self._request("GET", f"orgs/{org_name}")
|
|
print(f"Organization '{org_name}' already exists.")
|
|
except requests.exceptions.HTTPError as e:
|
|
if e.response.status_code == 404:
|
|
print(f"Creating organization '{org_name}'...")
|
|
data = {"username": org_name, "full_name": org_name}
|
|
self._request("POST", "orgs", json=data)
|
|
print(f"Organization '{org_name}' created.")
|
|
else:
|
|
raise
|
|
|
|
def create_repo(self, org_name, repo_name):
|
|
print(f"--- Checking repository: {org_name}/{repo_name} ---")
|
|
try:
|
|
self._request("GET", f"repos/{org_name}/{repo_name}")
|
|
print(f"Repository '{org_name}/{repo_name}' already exists.")
|
|
except requests.exceptions.HTTPError as e:
|
|
if e.response.status_code == 404:
|
|
print(f"Creating repository '{org_name}/{repo_name}'...")
|
|
data = {
|
|
"name": repo_name,
|
|
"auto_init": True,
|
|
"default_branch": "main",
|
|
"gitignores": "Go",
|
|
"license": "MIT",
|
|
"private": False,
|
|
"readme": "Default"
|
|
}
|
|
self._request("POST", f"orgs/{org_name}/repos", json=data)
|
|
print(f"Repository '{org_name}/{repo_name}' created with a README.")
|
|
time.sleep(0.1) # Added delay to allow Git operations to become available
|
|
else:
|
|
raise
|
|
|
|
def add_collaborator(self, org_name, repo_name, collaborator_name, permission="write"):
|
|
print(f"--- Adding {collaborator_name} as a collaborator to {org_name}/{repo_name} with '{permission}' permission ---")
|
|
data = {"permission": permission}
|
|
# Gitea API returns 204 No Content on success and doesn't fail if already present.
|
|
self._request("PUT", f"repos/{org_name}/{repo_name}/collaborators/{collaborator_name}", json=data)
|
|
print(f"Attempted to add {collaborator_name} to {org_name}/{repo_name}.")
|
|
|
|
    def add_submodules(self, org_name, repo_name):
        """Add pkgA and pkgB (from the 'mypool' org) as submodules of the repo.

        Builds a unified diff by hand (pinning each submodule to its current
        main-branch commit) and applies it through the Gitea 'diffpatch'
        endpoint. A no-op when .gitmodules already exists.
        """
        print(f"--- Adding submodules to {org_name}/{repo_name} using diffpatch ---")
        parent_repo_path = f"repos/{org_name}/{repo_name}"

        # Idempotence check: a .gitmodules file means the patch was applied before.
        try:
            self._request("GET", f"{parent_repo_path}/contents/.gitmodules")
            print("Submodules appear to be already added. Skipping.")
            return
        except requests.exceptions.HTTPError as e:
            if e.response.status_code != 404:
                raise

        # Get latest commit SHAs for the submodules
        pkg_a_sha = self._request("GET", "repos/mypool/pkgA/branches/main").json()["commit"]["id"]
        pkg_b_sha = self._request("GET", "repos/mypool/pkgB/branches/main").json()["commit"]["id"]

        if not pkg_a_sha or not pkg_b_sha:
            raise Exception("Error: Could not get submodule commit SHAs. Cannot apply patch.")

        # Mode 160000 marks a gitlink (submodule) entry; relative URLs keep the
        # setup portable across Gitea hosts.
        diff_content = f"""diff --git a/.gitmodules b/.gitmodules
new file mode 100644
index 0000000..f1838bd
--- /dev/null
+++ b/.gitmodules
@@ -0,0 +1,6 @@
+[submodule "pkgA"]
+ path = pkgA
+ url = ../../mypool/pkgA.git
+[submodule "pkgB"]
+ path = pkgB
+ url = ../../mypool/pkgB.git
diff --git a/pkgA b/pkgA
new file mode 160000
index 0000000..{pkg_a_sha}
--- /dev/null
+++ b/pkgA
@@ -0,0 +1 @@
+Subproject commit {pkg_a_sha}
diff --git a/pkgB b/pkgB
new file mode 160000
index 0000000..{pkg_b_sha}
--- /dev/null
+++ b/pkgB
@@ -0,0 +1 @@
+Subproject commit {pkg_b_sha}
"""
        message = "Add pkgA and pkgB as submodules"
        data = {
            "branch": "main",
            "content": diff_content,
            "message": message
        }
        print(f"Applying submodule patch to {org_name}/{repo_name}...")
        self._request("POST", f"{parent_repo_path}/diffpatch", json=data)
        print("Submodule patch applied.")
|
|
|
|
def update_repo_settings(self, org_name, repo_name):
|
|
print(f"--- Updating repository settings for: {org_name}/{repo_name} ---")
|
|
repo_data = self._request("GET", f"repos/{org_name}/{repo_name}").json()
|
|
|
|
# Ensure these are boolean values, not string
|
|
repo_data["allow_manual_merge"] = True
|
|
repo_data["autodetect_manual_merge"] = True
|
|
|
|
self._request("PATCH", f"repos/{org_name}/{repo_name}", json=repo_data)
|
|
print(f"Repository settings for '{org_name}/{repo_name}' updated.")
|
|
|
|
def create_label(self, owner: str, repo: str, name: str, color: str = "#abcdef"):
|
|
print(f"--- Creating label '{name}' in {owner}/{repo} ---")
|
|
url = f"repos/{owner}/{repo}/labels"
|
|
data = {
|
|
"name": name,
|
|
"color": color
|
|
}
|
|
try:
|
|
self._request("POST", url, json=data)
|
|
print(f"Label '{name}' created.")
|
|
except requests.exceptions.HTTPError as e:
|
|
if e.response.status_code == 422: # Already exists
|
|
print(f"Label '{name}' already exists.")
|
|
else:
|
|
raise
|
|
|
|
def create_file(self, owner: str, repo: str, file_path: str, content: str, branch: str = "main", message: str = "Add file"):
|
|
file_info = self.get_file_info(owner, repo, file_path, branch=branch)
|
|
|
|
data = {
|
|
"content": base64.b64encode(content.encode('utf-8')).decode('ascii'),
|
|
"branch": branch,
|
|
"message": message
|
|
}
|
|
|
|
if file_info:
|
|
print(f"--- Updating file {file_path} in {owner}/{repo} ---")
|
|
# Re-fetch file_info to get the latest SHA right before update
|
|
latest_file_info = self.get_file_info(owner, repo, file_path, branch=branch)
|
|
if not latest_file_info:
|
|
raise Exception(f"File {file_path} disappeared during update attempt.")
|
|
data["sha"] = latest_file_info["sha"]
|
|
data["message"] = f"Update {file_path}"
|
|
method = "PUT"
|
|
else:
|
|
print(f"--- Creating file {file_path} in {owner}/{repo} ---")
|
|
method = "POST"
|
|
|
|
url = f"repos/{owner}/{repo}/contents/{file_path}"
|
|
self._request(method, url, json=data)
|
|
print(f"File {file_path} {'updated' if file_info else 'created'} in {owner}/{repo}.")
|
|
|
|
    def create_gitea_pr(self, repo_full_name: str, diff_content: str, title: str, use_fork: bool, base_branch: str = "main", body: str = ""):
        """Open a pull request carrying *diff_content* against *repo_full_name*.

        The diff is applied via the Gitea 'diffpatch' endpoint with a
        'new_branch' parameter, so the head branch is created automatically
        while the patch is applied — no separate branch-creation call (which
        previously could damage the pre-created branch's history). When
        *use_fork* is true the patch goes to the Sudo user's fork and the PR
        head becomes "<fork_owner>:<branch>". Returns the created PR as JSON.
        """
        owner, repo = repo_full_name.split("/")

        head_owner, head_repo = owner, repo
        # Millisecond timestamp keeps head-branch names unique across calls.
        new_branch_name = f"pr-branch-{int(time.time()*1000)}"

        if use_fork:
            # Fork under the impersonated (Sudo) user; the fork keeps the repo name.
            sudo_user = self.headers.get("Sudo")
            head_owner = sudo_user
            head_repo = repo

            print(f"--- Forking {repo_full_name} ---")
            try:
                self._request("POST", f"repos/{owner}/{repo}/forks", json={})
                print(f"--- Forked to {head_owner}/{head_repo} ---")
                time.sleep(0.5)  # Give more time for fork to be ready
            except requests.exceptions.HTTPError as e:
                if e.response.status_code == 409:  # Already forked
                    print(f"--- Already forked to {head_owner}/{head_repo} ---")
                else:
                    raise

        # Apply the diff using diffpatch and create the new branch automatically
        print(f"--- Applying diff to {head_owner}/{head_repo} from {base_branch} to new branch {new_branch_name} ---")
        self._request("POST", f"repos/{head_owner}/{head_repo}/diffpatch", json={
            "branch": base_branch,
            "new_branch": new_branch_name,
            "content": diff_content,
            "message": title
        })

        # Now create the PR in the ORIGINAL repo
        data = {
            # Cross-repo heads need the "owner:branch" form.
            "head": f"{head_owner}:{new_branch_name}" if head_owner != owner else new_branch_name,
            "base": base_branch,
            "title": title,
            "body": body,
            "allow_maintainer_edit": True
        }
        print(f"--- Creating PR in {repo_full_name} from {data['head']} ---")
        response = self._request("POST", f"repos/{owner}/{repo}/pulls", json=data)
        return response.json()
|
|
|
|
def create_branch(self, owner: str, repo: str, new_branch_name: str, old_ref: str):
|
|
print(f"--- Checking branch '{new_branch_name}' in {owner}/{repo} ---")
|
|
try:
|
|
self._request("GET", f"repos/{owner}/{repo}/branches/{new_branch_name}")
|
|
print(f"Branch '{new_branch_name}' already exists.")
|
|
return
|
|
except requests.exceptions.HTTPError as e:
|
|
if e.response.status_code != 404:
|
|
raise # Re-raise other HTTP errors
|
|
|
|
print(f"--- Creating branch '{new_branch_name}' in {owner}/{repo} from {old_ref} ---")
|
|
url = f"repos/{owner}/{repo}/branches"
|
|
data = {
|
|
"new_branch_name": new_branch_name,
|
|
"old_ref": old_ref
|
|
}
|
|
self._request("POST", url, json=data)
|
|
print(f"Branch '{new_branch_name}' created in {owner}/{repo}.")
|
|
|
|
def ensure_branch_exists(self, owner: str, repo: str, branch: str = "main", timeout: int = 10):
|
|
print(f"--- Ensuring branch '{branch}' exists in {owner}/{repo} ---")
|
|
start_time = time.time()
|
|
while time.time() - start_time < timeout:
|
|
try:
|
|
self._request("GET", f"repos/{owner}/{repo}/branches/{branch}")
|
|
print(f"Branch '{branch}' confirmed in {owner}/{repo}.")
|
|
return
|
|
except requests.exceptions.HTTPError as e:
|
|
if e.response.status_code == 404:
|
|
print(f"Branch '{branch}' not found yet in {owner}/{repo}. Retrying...")
|
|
time.sleep(1)
|
|
continue
|
|
raise
|
|
raise Exception(f"Timeout waiting for branch {branch} in {owner}/{repo}")
|
|
|
|
|
|
|
|
def modify_gitea_pr(self, repo_full_name: str, pr_number: int, diff_content: str, message: str):
|
|
owner, repo = repo_full_name.split("/")
|
|
|
|
# Get PR details to find the head branch AND head repo
|
|
pr_details = self._request("GET", f"repos/{owner}/{repo}/pulls/{pr_number}").json()
|
|
head_branch = pr_details["head"]["ref"]
|
|
head_repo_owner = pr_details["head"]["repo"]["owner"]["login"]
|
|
head_repo_name = pr_details["head"]["repo"]["name"]
|
|
|
|
# Apply the diff using diffpatch
|
|
print(f"--- Modifying PR #{pr_number} in {head_repo_owner}/{head_repo_name} branch {head_branch} ---")
|
|
self._request("POST", f"repos/{head_repo_owner}/{head_repo_name}/diffpatch", json={
|
|
"branch": head_branch,
|
|
"content": diff_content,
|
|
"message": message
|
|
})
|
|
|
|
def update_gitea_pr_properties(self, repo_full_name: str, pr_number: int, **kwargs):
|
|
owner, repo = repo_full_name.split("/")
|
|
url = f"repos/{owner}/{repo}/pulls/{pr_number}"
|
|
response = self._request("PATCH", url, json=kwargs)
|
|
return response.json()
|
|
|
|
def create_issue_comment(self, repo_full_name: str, issue_number: int, body: str):
|
|
owner, repo = repo_full_name.split("/")
|
|
url = f"repos/{owner}/{repo}/issues/{issue_number}/comments"
|
|
data = {"body": body}
|
|
print(f"--- Creating comment on {repo_full_name} issue #{issue_number} ---")
|
|
response = self._request("POST", url, json=data)
|
|
return response.json()
|
|
|
|
def get_timeline_events(self, repo_full_name: str, pr_number: int):
|
|
owner, repo = repo_full_name.split("/")
|
|
url = f"repos/{owner}/{repo}/issues/{pr_number}/timeline"
|
|
|
|
# Retry logic for timeline events
|
|
for i in range(10): # Try up to 10 times
|
|
try:
|
|
response = self._request("GET", url)
|
|
timeline_events = response.json()
|
|
if timeline_events: # Check if timeline_events list is not empty
|
|
return timeline_events
|
|
print(f"Attempt {i+1}: Timeline for PR {pr_number} is empty. Retrying in 1 seconds...")
|
|
time.sleep(1)
|
|
except requests.exceptions.HTTPError as e:
|
|
if e.response.status_code == 404:
|
|
print(f"Attempt {i+1}: Timeline for PR {pr_number} not found yet. Retrying in 1 seconds...")
|
|
time.sleep(1)
|
|
else:
|
|
raise # Re-raise other HTTP errors
|
|
raise Exception(f"Failed to retrieve timeline for PR {pr_number} after multiple retries.")
|
|
|
|
def get_comments(self, repo_full_name: str, pr_number: int):
|
|
owner, repo = repo_full_name.split("/")
|
|
url = f"repos/{owner}/{repo}/issues/{pr_number}/comments"
|
|
|
|
# Retry logic for comments
|
|
for i in range(10): # Try up to 10 times
|
|
try:
|
|
response = self._request("GET", url)
|
|
comments = response.json()
|
|
print(f"Attempt {i+1}: Comments for PR {pr_number} received: {comments}") # Added debug print
|
|
if comments: # Check if comments list is not empty
|
|
return comments
|
|
print(f"Attempt {i+1}: Comments for PR {pr_number} are empty. Retrying in 1 seconds...")
|
|
time.sleep(1)
|
|
except requests.exceptions.HTTPError as e:
|
|
if e.response.status_code == 404:
|
|
print(f"Attempt {i+1}: Comments for PR {pr_number} not found yet. Retrying in 1 seconds...")
|
|
time.sleep(1)
|
|
else:
|
|
raise # Re-raise other HTTP errors
|
|
raise Exception(f"Failed to retrieve comments for PR {pr_number} after multiple retries.")
|
|
|
|
def get_pr_details(self, repo_full_name: str, pr_number: int):
|
|
owner, repo = repo_full_name.split("/")
|
|
url = f"repos/{owner}/{repo}/pulls/{pr_number}"
|
|
response = self._request("GET", url)
|
|
return response.json()
|
|
|
|
    def create_review(self, repo_full_name: str, pr_number: int, event: str = "APPROVED", body: str = "LGTM"):
        """Create (and, if it stays PENDING, submit) a PR review with *event*.

        Skips creation when the acting user already has an APPROVED review and
        an approval was requested. If the create POST fails, falls back to an
        existing PENDING review by the same user. Returns the review JSON.
        """
        owner, repo = repo_full_name.split("/")

        # Check if this user already has an APPROVED review to avoid 422
        current_user = self.headers.get("Sudo") or "admin" # simplified
        existing_reviews = self.list_reviews(repo_full_name, pr_number)
        for r in existing_reviews:
            if r["user"]["login"] == current_user and r["state"] == "APPROVED" and event == "APPROVED":
                print(f"User {current_user} already has an APPROVED review for {repo_full_name} PR #{pr_number}")
                return r

        url = f"repos/{owner}/{repo}/pulls/{pr_number}/reviews"
        data = {
            "event": event,
            "body": body
        }
        print(f"--- Creating and submitting review ({event}) for {repo_full_name} PR #{pr_number} as {current_user} ---")
        try:
            response = self._request("POST", url, json=data)
            review = response.json()
        except requests.exceptions.HTTPError as e:
            # If it fails with 422, it might be because a review is already pending or something else
            print(f"Failed to create review: {e.response.text}")
            # Try to find a pending review to submit
            existing_reviews = self.list_reviews(repo_full_name, pr_number)
            pending_review = next((r for r in existing_reviews if r["user"]["login"] == current_user and r["state"] == "PENDING"), None)
            if pending_review:
                review = pending_review
            else:
                raise

        # If the state is PENDING, we submit it.
        if review.get("state") == "PENDING":
            review_id = review["id"]
            submit_url = f"repos/{owner}/{repo}/pulls/{pr_number}/reviews/{review_id}"
            submit_data = {
                "event": event,
                "body": body
            }
            try:
                self._request("POST", submit_url, json=submit_data)
                print(f"--- Review {review_id} submitted ---")
            except requests.exceptions.HTTPError as e:
                # Gitea phrases these errors when a review cannot advance further;
                # treat them as benign and keep the review object we have.
                if "already" in e.response.text.lower() or "stay pending" in e.response.text.lower():
                    print(f"Review {review_id} could not be submitted further: {e.response.text}")
                else:
                    raise

        return review
|
|
|
|
def list_reviews(self, repo_full_name: str, pr_number: int):
|
|
owner, repo = repo_full_name.split("/")
|
|
url = f"repos/{owner}/{repo}/pulls/{pr_number}/reviews"
|
|
response = self._request("GET", url)
|
|
return response.json()
|
|
|
|
def approve_requested_reviews(self, repo_full_name: str, pr_number: int):
|
|
print(f"--- Checking for REQUEST_REVIEW state in {repo_full_name} PR #{pr_number} ---")
|
|
reviews = self.list_reviews(repo_full_name, pr_number)
|
|
|
|
requested_reviews = [r for r in reviews if r["state"] == "REQUEST_REVIEW"]
|
|
if not requested_reviews:
|
|
print(f"No reviews in REQUEST_REVIEW state found for {repo_full_name} PR #{pr_number}")
|
|
return
|
|
|
|
admin_token = self.headers["Authorization"].split(" ")[1]
|
|
for r in requested_reviews:
|
|
reviewer_username = r["user"]["login"]
|
|
print(f"Reacting on REQUEST_REVIEW for user {reviewer_username} by approving...")
|
|
|
|
reviewer_client = GiteaAPIClient(base_url=self.base_url, token=admin_token, sudo=reviewer_username)
|
|
time.sleep(1) # give a chance to avoid possible concurrency issues with reviews request/approval
|
|
reviewer_client.create_review(repo_full_name, pr_number, event="APPROVED", body="Approving requested review")
|
|
|
|
def restart_service(self, service_name: str):
|
|
print(f"--- Restarting service: {service_name} ---")
|
|
try:
|
|
# Assumes podman-compose.yml is in the parent directory of tests/lib
|
|
subprocess.run(["podman-compose", "restart", service_name], check=True, cwd=os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir, os.pardir)))
|
|
print(f"Service {service_name} restarted successfully.")
|
|
except subprocess.CalledProcessError as e:
|
|
print(f"Error restarting service {service_name}: {e}")
|
|
raise
|
|
|
|
def wait_for_project_pr(self, package_pr_repo, package_pr_number, project_pr_repo="myproducts/mySLFO", timeout=60):
|
|
print(f"Polling {package_pr_repo} PR #{package_pr_number} timeline for forwarded PR event in {project_pr_repo}...")
|
|
for _ in range(timeout):
|
|
time.sleep(1)
|
|
timeline_events = self.get_timeline_events(package_pr_repo, package_pr_number)
|
|
for event in timeline_events:
|
|
if event.get("type") == "pull_ref":
|
|
if not (ref_issue := event.get("ref_issue")):
|
|
continue
|
|
url_to_check = ref_issue.get("html_url", "")
|
|
match = re.search(fr"{project_pr_repo}/pulls/(\d+)", url_to_check)
|
|
if match:
|
|
return int(match.group(1))
|
|
return None
|
|
|
|
def approve_and_wait_merge(self, package_pr_repo, package_pr_number, project_pr_number, project_pr_repo="myproducts/mySLFO", timeout=30):
|
|
print(f"Approving reviews and verifying both PRs are merged ({package_pr_repo}#{package_pr_number} and {project_pr_repo}#{project_pr_number})...")
|
|
package_merged = False
|
|
project_merged = False
|
|
|
|
for i in range(timeout):
|
|
self.approve_requested_reviews(package_pr_repo, package_pr_number)
|
|
self.approve_requested_reviews(project_pr_repo, project_pr_number)
|
|
|
|
if not package_merged:
|
|
pkg_details = self.get_pr_details(package_pr_repo, package_pr_number)
|
|
if pkg_details.get("merged"):
|
|
package_merged = True
|
|
print(f"Package PR {package_pr_repo}#{package_pr_number} merged.")
|
|
|
|
if not project_merged:
|
|
prj_details = self.get_pr_details(project_pr_repo, project_pr_number)
|
|
if prj_details.get("merged"):
|
|
project_merged = True
|
|
print(f"Project PR {project_pr_repo}#{project_pr_number} merged.")
|
|
|
|
if package_merged and project_merged:
|
|
return True, True
|
|
|
|
time.sleep(1)
|
|
return package_merged, project_merged
|
|
|