302 lines
12 KiB
Python
302 lines
12 KiB
Python
import os
|
|
import time
|
|
import pytest
|
|
import requests
|
|
import json
|
|
import xml.etree.ElementTree as ET
|
|
from pathlib import Path
|
|
import base64
|
|
|
|
# Directory holding static test fixtures, relative to this test module.
TEST_DATA_DIR = Path(__file__).parent.parent / "data"
# XML template used to fabricate OBS build-result responses.
BUILD_RESULT_TEMPLATE = TEST_DATA_DIR / "build_result.xml.template"
# Directory the mock OBS server serves canned responses from.
MOCK_RESPONSES_DIR = Path(__file__).parent.parent.parent / "mock-obs" / "responses"
# Canned response for the staging project's build-result endpoint
# (filename encodes the mocked request: GET /build/<project>/_result).
MOCK_BUILD_RESULT_FILE = (
    MOCK_RESPONSES_DIR / "GET_build_openSUSE:Leap:16.0:PullRequest:*__result"
)
# Canned response for the main project's build-result endpoint.
MOCK_BUILD_RESULT_FILE1 = MOCK_RESPONSES_DIR / "GET_build_openSUSE:Leap:16.0__result"
|
|
|
|
|
|
@pytest.fixture
def mock_build_result():
    """
    Fixture to create a mock build result file from the template.
    Returns a factory function that the test can call with parameters.

    Yields:
        A factory ``_create_result_file(package_name, code)`` that writes the
        templated build result to both mock-response files and returns the
        path of the first one as a string.
    """

    def _create_result_file(package_name: str, code: str):
        # Stamp every <status> element in the template with the requested
        # package name and build-result code.
        tree = ET.parse(BUILD_RESULT_TEMPLATE)
        root = tree.getroot()
        for status_tag in root.findall(".//status"):
            status_tag.set("package", package_name)
            status_tag.set("code", code)

        # parents=True so the fixture also works on a clean checkout where
        # the mock-obs/responses directory tree does not exist yet.
        MOCK_RESPONSES_DIR.mkdir(parents=True, exist_ok=True)
        tree.write(MOCK_BUILD_RESULT_FILE)
        tree.write(MOCK_BUILD_RESULT_FILE1)
        return str(MOCK_BUILD_RESULT_FILE)

    yield _create_result_file

    # Teardown: remove whichever mock files were actually created.
    # (The previous version only checked the first file's existence and then
    # unlinked both inside that branch, raising FileNotFoundError whenever
    # just one of the two files was present.)
    for mock_file in (MOCK_BUILD_RESULT_FILE, MOCK_BUILD_RESULT_FILE1):
        if mock_file.exists():
            mock_file.unlink()
|
|
|
|
|
|
class GiteaAPIClient:
    """Thin wrapper around the Gitea REST API used by the test setup.

    Every call goes through ``_request``, which raises
    ``requests.exceptions.HTTPError`` for non-2xx responses after printing
    the response body, so the helpers can use EAFP-style 404 handling.
    """

    def __init__(self, base_url, token):
        self.base_url = base_url
        # Gitea expects the "token <value>" authorization scheme.
        self.headers = {"Authorization": f"token {token}", "Content-Type": "application/json"}

    def _request(self, method, path, **kwargs):
        """Send one API request and return the ``requests`` response.

        Raises:
            requests.exceptions.HTTPError: for non-2xx responses, after
                printing the error and response body (helps debugging CI).
        """
        url = f"{self.base_url}/api/v1/{path}"
        response = requests.request(method, url, headers=self.headers, **kwargs)
        try:
            response.raise_for_status()
        except requests.exceptions.HTTPError as e:
            print(f"HTTPError in _request: {e}")
            print(f"Response Content: {e.response.text}")
            raise
        return response

    def create_org(self, org_name):
        """Create organization ``org_name`` if it does not already exist."""
        print(f"--- Checking organization: {org_name} ---")
        try:
            self._request("GET", f"orgs/{org_name}")
            print(f"Organization '{org_name}' already exists.")
        except requests.exceptions.HTTPError as e:
            # 404 means "missing" -> create it; anything else is a real error.
            if e.response.status_code == 404:
                print(f"Creating organization '{org_name}'...")
                data = {"username": org_name, "full_name": org_name}
                self._request("POST", "orgs", json=data)
                print(f"Organization '{org_name}' created.")
            else:
                raise

    def create_repo(self, org_name, repo_name):
        """Create repo ``org_name/repo_name`` (auto-initialized) if missing."""
        print(f"--- Checking repository: {org_name}/{repo_name} ---")
        try:
            self._request("GET", f"repos/{org_name}/{repo_name}")
            print(f"Repository '{org_name}/{repo_name}' already exists.")
        except requests.exceptions.HTTPError as e:
            if e.response.status_code == 404:
                print(f"Creating repository '{org_name}/{repo_name}'...")
                data = {
                    "name": repo_name,
                    "auto_init": True,
                    "default_branch": "main",
                    "gitignores": "Go",
                    "license": "MIT",
                    "private": False,
                    "readme": "Default",
                }
                self._request("POST", f"orgs/{org_name}/repos", json=data)
                print(f"Repository '{org_name}/{repo_name}' created with a README.")
                time.sleep(1)  # Added delay to allow Git operations to become available
            else:
                raise

    def add_collaborator(self, org_name, repo_name, collaborator_name, permission="write"):
        """Grant ``collaborator_name`` the given permission on a repository."""
        print(f"--- Adding {collaborator_name} as a collaborator to {org_name}/{repo_name} with '{permission}' permission ---")
        data = {"permission": permission}
        # Gitea API returns 204 No Content on success and doesn't fail if already present.
        self._request("PUT", f"repos/{org_name}/{repo_name}/collaborators/{collaborator_name}", json=data)
        print(f"Attempted to add {collaborator_name} to {org_name}/{repo_name}.")

    def add_submodules(self, org_name, repo_name):
        """Add pkgA/pkgB submodules plus config files via the diffpatch API.

        Idempotent: skips if ``.gitmodules`` already exists in the repo.
        """
        print(f"--- Adding submodules to {org_name}/{repo_name} using diffpatch ---")
        parent_repo_path = f"repos/{org_name}/{repo_name}"

        try:
            self._request("GET", f"{parent_repo_path}/contents/.gitmodules")
            print("Submodules appear to be already added. Skipping.")
            return
        except requests.exceptions.HTTPError as e:
            # 404 (no .gitmodules yet) is expected on first run.
            if e.response.status_code != 404:
                raise

        # Get latest commit SHAs for the submodules
        pkg_a_sha = self._request("GET", "repos/pool/pkgA/branches/main").json()["commit"]["id"]
        pkg_b_sha = self._request("GET", "repos/pool/pkgB/branches/main").json()["commit"]["id"]

        # Guards against an empty "id" field (a missing key would already
        # have raised KeyError above).
        if not pkg_a_sha or not pkg_b_sha:
            raise Exception("Error: Could not get submodule commit SHAs. Cannot apply patch.")

        # NOTE(review): the hunk headers "@@ -0,0 +7 @@" / "@@ -0,0 +3 @@"
        # are nonstandard (git emits "+1,7" / "+1,3") — confirm Gitea's
        # diffpatch endpoint tolerates them before touching this payload.
        diff_content = f"""diff --git a/.gitmodules b/.gitmodules
new file mode 100644
index 0000000..f1838bd
--- /dev/null
+++ b/.gitmodules
@@ -0,0 +1,6 @@
+[submodule "pkgA"]
+ path = pkgA
+ url = ../../pool/pkgA.git
+[submodule "pkgB"]
+ path = pkgB
+ url = ../../pool/pkgB.git
diff --git a/pkgA b/pkgA
new file mode 160000
index 0000000..{pkg_a_sha}
--- /dev/null
+++ b/pkgA
@@ -0,0 +1 @@
+Subproject commit {pkg_a_sha}
diff --git a/pkgB b/pkgB
new file mode 160000
index 0000000..{pkg_b_sha}
--- /dev/null
+++ b/pkgB
@@ -0,0 +1 @@
+Subproject commit {pkg_b_sha}
diff --git a/workflow.config b/workflow.config
new file mode 100644
--- /dev/null
+++ b/workflow.config
@@ -0,0 +7 @@
+{{
+ "Workflows": ["pr"],
+ "GitProjectName": "products/SLFO#main",
+ "Organization": "pool",
+ "Branch": "main",
+ "ManualMergeProject": true,
+ "Reviewers": [ "-autogits_obs_staging_bot" ]
+}}
diff --git a/staging.config b/staging.config
new file mode 100644
--- /dev/null
+++ b/staging.config
@@ -0,0 +3 @@
+{{
+ "ObsProject": "openSUSE:Leap:16.0",
+ "StagingProject": "openSUSE:Leap:16.0:PullRequest"
+}}
"""
        message = "Add pkgA and pkgB as submodules and config files"
        data = {
            "branch": "main",
            "content": diff_content,
            "message": message,
        }
        print(f"Applying submodule patch to {org_name}/{repo_name}...")
        self._request("POST", f"{parent_repo_path}/diffpatch", json=data)
        print("Submodule patch applied.")

    def update_repo_settings(self, org_name, repo_name):
        """Enable manual-merge settings on a repository.

        Sends a partial PATCH body with only the two flags.  The previous
        version GET-ed the full repo object and PATCH-ed it back, which
        re-submits read-only fields and can clobber settings changed
        concurrently; Gitea's repoEdit accepts partial bodies.
        """
        print(f"--- Updating repository settings for: {org_name}/{repo_name} ---")

        # Ensure these are boolean values, not string
        settings = {
            "allow_manual_merge": True,
            "autodetect_manual_merge": True,
        }

        self._request("PATCH", f"repos/{org_name}/{repo_name}", json=settings)
        print(f"Repository settings for '{org_name}/{repo_name}' updated.")

    def create_gitea_pr(self, repo_full_name: str, diff_content: str, title: str):
        """Create a PR against ``main`` from a freshly pushed branch.

        ``diff_content`` is currently unused (kept for interface
        compatibility with callers); the PR content is a generated test
        file committed onto the new branch.

        Returns:
            dict: the created pull request as returned by the API.
        """
        owner, repo = repo_full_name.split("/")
        url = f"repos/{owner}/{repo}/pulls"
        base_branch = "main"

        # Create a new branch for the PR; the timestamp keeps names unique.
        new_branch_name = f"pr-branch-{int(time.time())}"

        # Get the latest commit SHA of the base branch
        base_commit_sha = self._request("GET", f"repos/{owner}/{repo}/branches/{base_branch}").json()["commit"]["id"]

        # Create the new branch
        self._request("POST", f"repos/{owner}/{repo}/branches", json={
            "new_branch_name": new_branch_name,
            "old_ref": base_commit_sha,  # Use the commit SHA directly
        })

        # Create a new file or modify an existing one in the new branch
        file_path = f"test-file-{int(time.time())}.txt"
        file_content = "This is a test file for the PR."
        self._request("POST", f"repos/{owner}/{repo}/contents/{file_path}", json={
            # Gitea's contents API requires base64-encoded file content.
            "content": base64.b64encode(file_content.encode('utf-8')).decode('ascii'),
            "message": "Add test file",
            "branch": new_branch_name,
        })

        # Now create the PR
        data = {
            "head": new_branch_name,  # Use the newly created branch as head
            "base": base_branch,
            "title": title,
            "body": "Test Pull Request",
        }
        response = self._request("POST", url, json=data)
        return response.json()

    def modify_gitea_pr(self, repo_full_name: str, pr_number: int, diff_content: str, message: str):
        """Push a new commit onto a PR's head branch.

        ``diff_content`` is currently unused (kept for interface
        compatibility); the commit adds a generated test file.
        """
        owner, repo = repo_full_name.split("/")

        # Get PR details to find the head branch
        pr_details = self._request("GET", f"repos/{owner}/{repo}/pulls/{pr_number}").json()
        head_branch = pr_details["head"]["ref"]

        file_path = f"modified-file-{int(time.time())}.txt"
        file_content = "This is a modified test file for the PR."

        self._request("POST", f"repos/{owner}/{repo}/contents/{file_path}", json={
            "content": base64.b64encode(file_content.encode('utf-8')).decode('ascii'),
            "message": message,
            "branch": head_branch,
        })

    def update_gitea_pr_properties(self, repo_full_name: str, pr_number: int, **kwargs):
        """PATCH arbitrary PR properties (title, state, ...) and return the PR."""
        owner, repo = repo_full_name.split("/")
        url = f"repos/{owner}/{repo}/pulls/{pr_number}"
        response = self._request("PATCH", url, json=kwargs)
        return response.json()

    def get_timeline_events(self, repo_full_name: str, pr_number: int):
        """Fetch a PR's timeline, retrying while it is empty or 404.

        Raises:
            Exception: if the timeline is still unavailable after 10 tries.
        """
        owner, repo = repo_full_name.split("/")
        url = f"repos/{owner}/{repo}/issues/{pr_number}/timeline"

        # Retry logic for timeline events
        attempts = 10
        for i in range(attempts):
            try:
                timeline_events = self._request("GET", url).json()
                if timeline_events:  # non-empty list -> done
                    return timeline_events
                print(f"Attempt {i+1}: Timeline for PR {pr_number} is empty. Retrying in 3 seconds...")
            except requests.exceptions.HTTPError as e:
                if e.response.status_code != 404:
                    raise  # Re-raise other HTTP errors
                print(f"Attempt {i+1}: Timeline for PR {pr_number} not found yet. Retrying in 3 seconds...")
            # Don't sleep after the final attempt — the old code wasted
            # 3 seconds before raising.
            if i < attempts - 1:
                time.sleep(3)
        raise Exception(f"Failed to retrieve timeline for PR {pr_number} after multiple retries.")

    def get_comments(self, repo_full_name: str, pr_number: int):
        """Fetch a PR's comments, retrying while they are empty or 404.

        Raises:
            Exception: if no comments appear after 10 tries.
        """
        owner, repo = repo_full_name.split("/")
        url = f"repos/{owner}/{repo}/issues/{pr_number}/comments"

        # Retry logic for comments
        attempts = 10
        for i in range(attempts):
            try:
                comments = self._request("GET", url).json()
                print(f"Attempt {i+1}: Comments for PR {pr_number} received: {comments}")  # Added debug print
                if comments:  # non-empty list -> done
                    return comments
                print(f"Attempt {i+1}: Comments for PR {pr_number} are empty. Retrying in 3 seconds...")
            except requests.exceptions.HTTPError as e:
                if e.response.status_code != 404:
                    raise  # Re-raise other HTTP errors
                print(f"Attempt {i+1}: Comments for PR {pr_number} not found yet. Retrying in 3 seconds...")
            # Don't sleep after the final attempt — the old code wasted
            # 3 seconds before raising.
            if i < attempts - 1:
                time.sleep(3)
        raise Exception(f"Failed to retrieve comments for PR {pr_number} after multiple retries.")

    def get_pr_details(self, repo_full_name: str, pr_number: int):
        """Return the pull request object for ``pr_number`` as a dict."""
        owner, repo = repo_full_name.split("/")
        url = f"repos/{owner}/{repo}/pulls/{pr_number}"
        response = self._request("GET", url)
        return response.json()
|
|
|