import os import time import pytest import requests import json import xml.etree.ElementTree as ET from pathlib import Path import base64 TEST_DATA_DIR = Path(__file__).parent.parent / "data" BUILD_RESULT_TEMPLATE = TEST_DATA_DIR / "build_result.xml.template" MOCK_RESPONSES_DIR = Path(__file__).parent.parent.parent / "mock-obs" / "responses" MOCK_BUILD_RESULT_FILE = ( MOCK_RESPONSES_DIR / "GET_build_openSUSE:Leap:16.0:PullRequest:*__result" ) MOCK_BUILD_RESULT_FILE1 = MOCK_RESPONSES_DIR / "GET_build_openSUSE:Leap:16.0__result" @pytest.fixture def mock_build_result(): """ Fixture to create a mock build result file from the template. Returns a factory function that the test can call with parameters. """ def _create_result_file(package_name: str, code: str): tree = ET.parse(BUILD_RESULT_TEMPLATE) root = tree.getroot() for status_tag in root.findall(".//status"): status_tag.set("package", package_name) status_tag.set("code", code) MOCK_RESPONSES_DIR.mkdir(exist_ok=True) tree.write(MOCK_BUILD_RESULT_FILE) tree.write(MOCK_BUILD_RESULT_FILE1) return str(MOCK_BUILD_RESULT_FILE) yield _create_result_file if MOCK_BUILD_RESULT_FILE.exists(): MOCK_BUILD_RESULT_FILE.unlink() MOCK_BUILD_RESULT_FILE1.unlink() class GiteaAPIClient: def __init__(self, base_url, token): self.base_url = base_url self.headers = {"Authorization": f"token {token}", "Content-Type": "application/json"} def _request(self, method, path, **kwargs): url = f"{self.base_url}/api/v1/{path}" response = requests.request(method, url, headers=self.headers, **kwargs) try: response.raise_for_status() except requests.exceptions.HTTPError as e: print(f"HTTPError in _request: {e}") print(f"Response Content: {e.response.text}") raise return response def create_org(self, org_name): print(f"--- Checking organization: {org_name} ---") try: self._request("GET", f"orgs/{org_name}") print(f"Organization '{org_name}' already exists.") except requests.exceptions.HTTPError as e: if e.response.status_code == 404: print(f"Creating organization '{org_name}'...") data = {"username": org_name, "full_name": org_name} self._request("POST", "orgs", json=data) print(f"Organization '{org_name}' created.") else: raise def create_repo(self, org_name, repo_name): print(f"--- Checking repository: {org_name}/{repo_name} ---") try: self._request("GET", f"repos/{org_name}/{repo_name}") print(f"Repository '{org_name}/{repo_name}' already exists.") except requests.exceptions.HTTPError as e: if e.response.status_code == 404: print(f"Creating repository '{org_name}/{repo_name}'...") data = { "name": repo_name, "auto_init": True, "default_branch": "main", "gitignores": "Go", "license": "MIT", "private": False, "readme": "Default" } self._request("POST", f"orgs/{org_name}/repos", json=data) print(f"Repository '{org_name}/{repo_name}' created with a README.") time.sleep(1) # Added delay to allow Git operations to become available else: raise def add_collaborator(self, org_name, repo_name, collaborator_name, permission="write"): print(f"--- Adding {collaborator_name} as a collaborator to {org_name}/{repo_name} with '{permission}' permission ---") data = {"permission": permission} # Gitea API returns 204 No Content on success and doesn't fail if already present. self._request("PUT", f"repos/{org_name}/{repo_name}/collaborators/{collaborator_name}", json=data) print(f"Attempted to add {collaborator_name} to {org_name}/{repo_name}.") def add_submodules(self, org_name, repo_name): print(f"--- Adding submodules to {org_name}/{repo_name} using diffpatch ---") parent_repo_path = f"repos/{org_name}/{repo_name}" try: self._request("GET", f"{parent_repo_path}/contents/.gitmodules") print("Submodules appear to be already added. Skipping.") return except requests.exceptions.HTTPError as e: if e.response.status_code != 404: raise # Get latest commit SHAs for the submodules pkg_a_sha = self._request("GET", "repos/pool/pkgA/branches/main").json()["commit"]["id"] pkg_b_sha = self._request("GET", "repos/pool/pkgB/branches/main").json()["commit"]["id"] if not pkg_a_sha or not pkg_b_sha: raise Exception("Error: Could not get submodule commit SHAs. Cannot apply patch.") diff_content = f"""diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 0000000..f1838bd --- /dev/null +++ b/.gitmodules @@ -0,0 +1,6 @@ +[submodule "pkgA"] + path = pkgA + url = ../../pool/pkgA.git +[submodule "pkgB"] + path = pkgB + url = ../../pool/pkgB.git diff --git a/pkgA b/pkgA new file mode 160000 index 0000000..{pkg_a_sha} --- /dev/null +++ b/pkgA @@ -0,0 +1 @@ +Subproject commit {pkg_a_sha} diff --git a/pkgB b/pkgB new file mode 160000 index 0000000..{pkg_b_sha} --- /dev/null +++ b/pkgB @@ -0,0 +1 @@ +Subproject commit {pkg_b_sha} diff --git a/workflow.config b/workflow.config new file mode 100644 --- /dev/null +++ b/workflow.config @@ -0,0 +7 @@ +{{ + "Workflows": ["pr"], + "GitProjectName": "products/SLFO#main", + "Organization": "pool", + "Branch": "main", + "ManualMergeProject": true, + "Reviewers": [ "-autogits_obs_staging_bot" ] +}} diff --git a/staging.config b/staging.config new file mode 100644 --- /dev/null +++ b/staging.config @@ -0,0 +3 @@ +{{ + "ObsProject": "openSUSE:Leap:16.0", + "StagingProject": "openSUSE:Leap:16.0:PullRequest" +}} """ message = "Add pkgA and pkgB as submodules and config files" data = { "branch": "main", "content": diff_content, "message": message } print(f"Applying submodule patch to {org_name}/{repo_name}...") self._request("POST", f"{parent_repo_path}/diffpatch", json=data) print("Submodule patch applied.") def update_repo_settings(self, org_name, repo_name): print(f"--- Updating repository settings for: {org_name}/{repo_name} ---") repo_data = self._request("GET", f"repos/{org_name}/{repo_name}").json() # Ensure these are boolean values, not string repo_data["allow_manual_merge"] = True repo_data["autodetect_manual_merge"] = True self._request("PATCH", f"repos/{org_name}/{repo_name}", json=repo_data) print(f"Repository settings for '{org_name}/{repo_name}' updated.") def create_gitea_pr(self, repo_full_name: str, diff_content: str, title: str): owner, repo = repo_full_name.split("/") url = f"repos/{owner}/{repo}/pulls" base_branch = "main" # Create a new branch for the PR new_branch_name = f"pr-branch-{int(time.time())}" # Get the latest commit SHA of the base branch base_commit_sha = self._request("GET", f"repos/{owner}/{repo}/branches/{base_branch}").json()["commit"]["id"] # Create the new branch self._request("POST", f"repos/{owner}/{repo}/branches", json={ "new_branch_name": new_branch_name, "old_ref": base_commit_sha # Use the commit SHA directly }) # Create a new file or modify an existing one in the new branch file_path = f"test-file-{int(time.time())}.txt" file_content = "This is a test file for the PR." self._request("POST", f"repos/{owner}/{repo}/contents/{file_path}", json={ "content": base64.b64encode(file_content.encode('utf-8')).decode('ascii'), "message": "Add test file", "branch": new_branch_name }) # Now create the PR data = { "head": new_branch_name, # Use the newly created branch as head "base": base_branch, "title": title, "body": "Test Pull Request" } response = self._request("POST", url, json=data) return response.json() def modify_gitea_pr(self, repo_full_name: str, pr_number: int, diff_content: str, message: str): owner, repo = repo_full_name.split("/") # Get PR details to find the head branch pr_details = self._request("GET", f"repos/{owner}/{repo}/pulls/{pr_number}").json() head_branch = pr_details["head"]["ref"] file_path = f"modified-file-{int(time.time())}.txt" file_content = "This is a modified test file for the PR." self._request("POST", f"repos/{owner}/{repo}/contents/{file_path}", json={ "content": base64.b64encode(file_content.encode('utf-8')).decode('ascii'), "message": message, "branch": head_branch }) def update_gitea_pr_properties(self, repo_full_name: str, pr_number: int, **kwargs): owner, repo = repo_full_name.split("/") url = f"repos/{owner}/{repo}/pulls/{pr_number}" response = self._request("PATCH", url, json=kwargs) return response.json() def get_timeline_events(self, repo_full_name: str, pr_number: int): owner, repo = repo_full_name.split("/") url = f"repos/{owner}/{repo}/issues/{pr_number}/timeline" # Retry logic for timeline events for i in range(10): # Try up to 10 times try: response = self._request("GET", url) timeline_events = response.json() if timeline_events: # Check if timeline_events list is not empty return timeline_events print(f"Attempt {i+1}: Timeline for PR {pr_number} is empty. Retrying in 3 seconds...") time.sleep(3) except requests.exceptions.HTTPError as e: if e.response.status_code == 404: print(f"Attempt {i+1}: Timeline for PR {pr_number} not found yet. Retrying in 3 seconds...") time.sleep(3) else: raise # Re-raise other HTTP errors raise Exception(f"Failed to retrieve timeline for PR {pr_number} after multiple retries.") def get_comments(self, repo_full_name: str, pr_number: int): owner, repo = repo_full_name.split("/") url = f"repos/{owner}/{repo}/issues/{pr_number}/comments" # Retry logic for comments for i in range(10): # Try up to 10 times try: response = self._request("GET", url) comments = response.json() print(f"Attempt {i+1}: Comments for PR {pr_number} received: {comments}") # Added debug print if comments: # Check if comments list is not empty return comments print(f"Attempt {i+1}: Comments for PR {pr_number} are empty. Retrying in 3 seconds...") time.sleep(3) except requests.exceptions.HTTPError as e: if e.response.status_code == 404: print(f"Attempt {i+1}: Comments for PR {pr_number} not found yet. Retrying in 3 seconds...") time.sleep(3) else: raise # Re-raise other HTTP errors raise Exception(f"Failed to retrieve comments for PR {pr_number} after multiple retries.") def get_pr_details(self, repo_full_name: str, pr_number: int): owner, repo = repo_full_name.split("/") url = f"repos/{owner}/{repo}/pulls/{pr_number}" response = self._request("GET", url) return response.json()