import os
import random
import re
import string
import subprocess
import time
import xml.etree.ElementTree as ET
from pathlib import Path

import pytest
import requests

# =============================================================================
# Constants
# =============================================================================

GITEA_URL = os.environ.get("GITEA_URL", "http://localhost:3000")

# The admin token is read from the token file when it exists; otherwise the
# GITEA_TOKEN environment variable is used (which may leave it as None).
TOKEN_FILE = Path(__file__).parent.parent / "gitea-data" / "admin.token"
if TOKEN_FILE.exists():
    GITEA_TOKEN = TOKEN_FILE.read_text().strip()
else:
    GITEA_TOKEN = os.environ.get("GITEA_TOKEN")

TEST_DATA_DIR = Path(__file__).parent / "data"
BUILD_RESULT_TEMPLATE = TEST_DATA_DIR / "build_result.xml.template"
MOCK_RESPONSES_DIR = Path(__file__).parent.parent / "mock-obs" / "responses"
MOCK_BUILD_RESULT_FILE = MOCK_RESPONSES_DIR / "GET_build_openSUSE:Leap:16.0:PullRequest:*__result"
MOCK_BUILD_RESULT_FILE1 = MOCK_RESPONSES_DIR / "GET_build_openSUSE:Leap:16.0__result"

# Upper bound (seconds) for every HTTP call to Gitea. `requests` has no default
# timeout, so without this a stalled server would block the test run forever.
REQUEST_TIMEOUT = 30


@pytest.fixture
def mock_build_result():
    """Yield a factory that writes mock build result files from the template.

    The factory takes ``package_name`` and ``code``, sets them as the
    ``package`` and ``code`` attributes of every ``<status>`` element found in
    ``BUILD_RESULT_TEMPLATE``, writes the resulting XML to both mock result
    paths, and returns the first path as a string.

    On teardown both mock result files are removed if they exist.
    """

    def _create_result_file(package_name: str, code: str) -> str:
        tree = ET.parse(BUILD_RESULT_TEMPLATE)
        for status_tag in tree.getroot().findall(".//status"):
            status_tag.set("package", package_name)
            status_tag.set("code", code)

        # parents=True so a missing parent directory does not abort the test.
        MOCK_RESPONSES_DIR.mkdir(parents=True, exist_ok=True)
        for target in (MOCK_BUILD_RESULT_FILE, MOCK_BUILD_RESULT_FILE1):
            tree.write(target)
        return str(MOCK_BUILD_RESULT_FILE)

    yield _create_result_file

    # Remove each file independently: one of them missing must neither raise
    # nor prevent the other from being cleaned up.
    for target in (MOCK_BUILD_RESULT_FILE, MOCK_BUILD_RESULT_FILE1):
        target.unlink(missing_ok=True)


# =============================================================================
# HELPER FUNCTIONS
# =============================================================================

def create_gitea_pr(repo: str, diff_content: str, message: str):
    """Create a Gitea pull request using the two-step diffpatch and pulls API.

    First posts ``diff_content`` to the repository's ``diffpatch`` endpoint,
    which is asked to put the change on a new, randomly named branch based on
    ``main``. Then opens a pull request from that branch into ``main`` using
    ``message`` as both title and body.

    Args:
        repo: Repository in ``owner/name`` form, as used in the API path.
        diff_content: The patch text to apply.
        message: Commit message, also used as PR title and body.

    Returns:
        The decoded JSON body of the pull request creation response.

    Raises:
        requests.HTTPError: If either API call returns an error status.
        requests.Timeout: If either API call exceeds ``REQUEST_TIMEOUT``.
    """
    if not GITEA_TOKEN:
        pytest.fail("GITEA_TOKEN not set or token file not found.")

    # A random suffix keeps branch names from colliding between test runs.
    unique_id = ''.join(random.choices(string.ascii_lowercase + string.digits, k=8))
    new_branch_name = f"pr_test_{unique_id}"

    headers = {
        "Authorization": f"token {GITEA_TOKEN}",
        "Content-Type": "application/json",
    }

    diffpatch_url = f"{GITEA_URL}/api/v1/repos/{repo}/diffpatch"
    diffpatch_payload = {
        "branch": "main",
        "new_branch": new_branch_name,
        "content": diff_content,
        "message": message,
    }
    patch_response = requests.post(
        diffpatch_url, headers=headers, json=diffpatch_payload, timeout=REQUEST_TIMEOUT
    )
    patch_response.raise_for_status()

    pulls_url = f"{GITEA_URL}/api/v1/repos/{repo}/pulls"
    pulls_payload = {
        "base": "main",
        "head": new_branch_name,
        "title": message,
        "body": message,
    }
    pr_response = requests.post(
        pulls_url, headers=headers, json=pulls_payload, timeout=REQUEST_TIMEOUT
    )
    pr_response.raise_for_status()
    return pr_response.json()


def _api_get(path: str):
    """GET ``{GITEA_URL}/api/v1/{path}`` with the token and return decoded JSON.

    Raises ``requests.HTTPError`` on an error status and ``requests.Timeout``
    when the call exceeds ``REQUEST_TIMEOUT``.
    """
    response = requests.get(
        f"{GITEA_URL}/api/v1/{path}",
        headers={"Authorization": f"token {GITEA_TOKEN}"},
        timeout=REQUEST_TIMEOUT,
    )
    response.raise_for_status()
    return response.json()


def _get_timeline_events(repo, pr_number):
    """Helper to fetch timeline events for a given PR."""
    return _api_get(f"repos/{repo}/issues/{pr_number}/timeline")


def _get_comments(repo, pr_number):
    """Helper to fetch comments for a given PR."""
    return _api_get(f"repos/{repo}/issues/{pr_number}/comments")


def _get_pr_details(repo, pr_number):
    """Helper to fetch PR details."""
    return _api_get(f"repos/{repo}/pulls/{pr_number}")


# =============================================================================
# TEST CASES
# =============================================================================

@pytest.mark.skipif(not all([GITEA_URL, GITEA_TOKEN]), reason="GITEA_URL and GITEA_TOKEN must be set")
def test_pr_workflow_succeeded(mock_build_result):
    """End-to-end test for a successful PR workflow.

    Steps:
      1. Open a PR against ``pool/pkgA``.
      2. Poll its timeline (up to 20 tries, 1 s apart) for a ``pull_ref``
         event whose referenced issue URL points at a ``products/SLFO`` PR.
      3. Poll that PR (up to 15 tries) until ``autogits_obs_staging_bot`` is
         listed among the requested reviewers.
      4. Write a mock build result with code ``succeeded`` and restart the
         ``obs-staging-bot`` service through ``podman-compose``.
      5. Poll the forwarded PR's timeline (up to 20 tries) for an event whose
         body contains ``successful``.
    """
    diff = "diff --git a/test.txt b/test.txt\nnew file mode 100644\nindex 0000000..e69de29\n"
    pr = create_gitea_pr("pool/pkgA", diff, "Test PR - should succeed")
    initial_pr_number = pr["number"]

    # podman-compose is run from the directory above this test directory.
    compose_dir = Path(__file__).parent.parent

    forwarded_pr_number = None
    print(f"Polling pool/pkgA PR #{initial_pr_number} timeline for forwarded PR event...")
    for _ in range(20):
        time.sleep(1)
        timeline_events = _get_timeline_events("pool/pkgA", initial_pr_number)
        for event in timeline_events:
            if event.get("type") != "pull_ref":
                continue
            if not (ref_issue := event.get("ref_issue")):
                continue
            # `or ""` also covers an explicit null value, which a plain
            # .get(key, "") default would pass straight to re.search.
            url_to_check = ref_issue.get("html_url") or ""
            match = re.search(r'products/SLFO/pulls/(\d+)', url_to_check)
            if match:
                forwarded_pr_number = match.group(1)
                break
        if forwarded_pr_number:
            break
    assert forwarded_pr_number is not None, "Workflow bot did not create a pull_ref event on the timeline."
    print(f"Found forwarded PR: products/SLFO #{forwarded_pr_number}")

    print(f"Polling products/SLFO PR #{forwarded_pr_number} for reviewer assignment...")
    reviewer_added = False
    for _ in range(15):
        time.sleep(1)
        pr_details = _get_pr_details("products/SLFO", forwarded_pr_number)
        # `or []` guards against the field being present but null, which a
        # .get(key, []) default does not cover and would raise on iteration.
        reviewers = pr_details.get('requested_reviewers') or []
        if any(r.get('login') == 'autogits_obs_staging_bot' for r in reviewers):
            reviewer_added = True
            break
    assert reviewer_added, "Staging bot was not added as a reviewer."
    print("Staging bot has been added as a reviewer.")

    mock_build_result(package_name="pkgA", code="succeeded")

    print("Restarting obs-staging-bot...")
    subprocess.run(["podman-compose", "restart", "obs-staging-bot"], cwd=compose_dir, check=True, capture_output=True)

    print(f"Polling products/SLFO PR #{forwarded_pr_number} for final status...")
    status_comment_found = False
    for _ in range(20):
        time.sleep(1)
        timeline_events = _get_timeline_events("products/SLFO", forwarded_pr_number)
        for event in timeline_events:
            # Echo every event body so the run output shows what was posted.
            print(event.get("body", "not a body"))
            if event.get("body") and "successful" in event["body"]:
                status_comment_found = True
                break
        if status_comment_found:
            break
    assert status_comment_found, "Staging bot did not post a 'successful' comment."

@pytest.mark.skipif(not all([GITEA_URL, GITEA_TOKEN]), reason="GITEA_URL and GITEA_TOKEN must be set")
def test_pr_workflow_failed(mock_build_result):
    """End-to-end test for a failed PR workflow.

    Steps:
      1. Open a PR against ``pool/pkgA``.
      2. Poll its timeline (up to 20 tries, 1 s apart) for a ``pull_ref``
         event whose referenced issue URL points at a ``products/SLFO`` PR.
      3. Poll that PR (up to 15 tries) until ``autogits_obs_staging_bot`` is
         listed among the requested reviewers.
      4. Write a mock build result with code ``failed`` and restart the
         ``obs-staging-bot`` service through ``podman-compose``.
      5. Poll the forwarded PR's timeline (up to 20 tries) for an event whose
         body contains ``failed``.
    """
    diff = "diff --git a/another_test.txt b/another_test.txt\nnew file mode 100644\nindex 0000000..e69de29\n"
    pr = create_gitea_pr("pool/pkgA", diff, "Test PR - should fail")
    initial_pr_number = pr["number"]

    # podman-compose is run from the directory above this test directory.
    compose_dir = Path(__file__).parent.parent

    forwarded_pr_number = None
    print(f"Polling pool/pkgA PR #{initial_pr_number} timeline for forwarded PR event...")
    for _ in range(20):
        time.sleep(1)
        timeline_events = _get_timeline_events("pool/pkgA", initial_pr_number)
        for event in timeline_events:
            if event.get("type") != "pull_ref":
                continue
            if not (ref_issue := event.get("ref_issue")):
                continue
            # `or ""` also covers an explicit null value, which a plain
            # .get(key, "") default would pass straight to re.search.
            url_to_check = ref_issue.get("html_url") or ""
            match = re.search(r'products/SLFO/pulls/(\d+)', url_to_check)
            if match:
                forwarded_pr_number = match.group(1)
                break
        if forwarded_pr_number:
            break
    assert forwarded_pr_number is not None, "Workflow bot did not create a pull_ref event on the timeline."
    print(f"Found forwarded PR: products/SLFO #{forwarded_pr_number}")

    print(f"Polling products/SLFO PR #{forwarded_pr_number} for reviewer assignment...")
    reviewer_added = False
    for _ in range(15):
        time.sleep(1)
        pr_details = _get_pr_details("products/SLFO", forwarded_pr_number)
        # `or []` guards against the field being present but null, which a
        # .get(key, []) default does not cover and would raise on iteration.
        reviewers = pr_details.get('requested_reviewers') or []
        if any(r.get('login') == 'autogits_obs_staging_bot' for r in reviewers):
            reviewer_added = True
            break
    assert reviewer_added, "Staging bot was not added as a reviewer."
    print("Staging bot has been added as a reviewer.")

    mock_build_result(package_name="pkgA", code="failed")

    print("Restarting obs-staging-bot...")
    subprocess.run(["podman-compose", "restart", "obs-staging-bot"], cwd=compose_dir, check=True, capture_output=True)

    print(f"Polling products/SLFO PR #{forwarded_pr_number} for final status...")
    status_comment_found = False
    for _ in range(20):
        time.sleep(1)
        timeline_events = _get_timeline_events("products/SLFO", forwarded_pr_number)
        for event in timeline_events:
            if event.get("body") and "failed" in event["body"]:
                status_comment_found = True
                break
        if status_comment_found:
            break
    assert status_comment_found, "Staging bot did not post a 'failed' comment."