240 lines
9.5 KiB
Python
Executable File
240 lines
9.5 KiB
Python
Executable File
import os
|
|
import pytest
|
|
import requests
|
|
import re
|
|
import time
|
|
import random
|
|
import string
|
|
import subprocess
|
|
import xml.etree.ElementTree as ET
|
|
from pathlib import Path
|
|
|
|
# =============================================================================
|
|
# Constants
|
|
# =============================================================================
|
|
|
|
GITEA_URL = os.environ.get("GITEA_URL", "http://localhost:3000")
|
|
TOKEN_FILE = Path(__file__).parent.parent / "gitea-data" / "admin.token"
|
|
if TOKEN_FILE.exists():
|
|
with open(TOKEN_FILE) as f:
|
|
GITEA_TOKEN = f.read().strip()
|
|
else:
|
|
GITEA_TOKEN = os.environ.get("GITEA_TOKEN")
|
|
|
|
TEST_DATA_DIR = Path(__file__).parent / "data"
|
|
BUILD_RESULT_TEMPLATE = TEST_DATA_DIR / "build_result.xml.template"
|
|
MOCK_RESPONSES_DIR = Path(__file__).parent.parent / "mock-obs" / "responses"
|
|
MOCK_BUILD_RESULT_FILE = MOCK_RESPONSES_DIR / "GET_build_openSUSE:Leap:16.0:PullRequest:*__result"
|
|
MOCK_BUILD_RESULT_FILE1 = MOCK_RESPONSES_DIR / "GET_build_openSUSE:Leap:16.0__result"
|
|
|
|
|
|
@pytest.fixture
|
|
def mock_build_result():
|
|
"""
|
|
Fixture to create a mock build result file from the template.
|
|
Returns a factory function that the test can call with parameters.
|
|
"""
|
|
def _create_result_file(package_name: str, code: str):
|
|
tree = ET.parse(BUILD_RESULT_TEMPLATE)
|
|
root = tree.getroot()
|
|
for status_tag in root.findall(".//status"):
|
|
status_tag.set("package", package_name)
|
|
status_tag.set("code", code)
|
|
|
|
MOCK_RESPONSES_DIR.mkdir(exist_ok=True)
|
|
tree.write(MOCK_BUILD_RESULT_FILE)
|
|
tree.write(MOCK_BUILD_RESULT_FILE1)
|
|
return str(MOCK_BUILD_RESULT_FILE)
|
|
|
|
yield _create_result_file
|
|
|
|
if MOCK_BUILD_RESULT_FILE.exists():
|
|
MOCK_BUILD_RESULT_FILE.unlink()
|
|
MOCK_BUILD_RESULT_FILE1.unlink()
|
|
|
|
|
|
# =============================================================================
|
|
# HELPER FUNCTIONS
|
|
# =============================================================================
|
|
|
|
def create_gitea_pr(repo: str, diff_content: str, message: str):
|
|
"""Creates a Gitea pull request using the two-step diffpatch and pulls API."""
|
|
if not GITEA_TOKEN:
|
|
pytest.fail("GITEA_TOKEN not set or token file not found.")
|
|
|
|
unique_id = ''.join(random.choices(string.ascii_lowercase + string.digits, k=8))
|
|
new_branch_name = f"pr_test_{unique_id}"
|
|
|
|
headers = {
|
|
"Authorization": f"token {GITEA_TOKEN}",
|
|
"Content-Type": "application/json",
|
|
}
|
|
|
|
diffpatch_url = f"{GITEA_URL}/api/v1/repos/{repo}/diffpatch"
|
|
diffpatch_payload = {
|
|
"branch": "main",
|
|
"new_branch": new_branch_name,
|
|
"content": diff_content,
|
|
"message": message,
|
|
}
|
|
patch_response = requests.post(diffpatch_url, headers=headers, json=diffpatch_payload)
|
|
patch_response.raise_for_status()
|
|
|
|
pulls_url = f"{GITEA_URL}/api/v1/repos/{repo}/pulls"
|
|
pulls_payload = {
|
|
"base": "main",
|
|
"head": new_branch_name,
|
|
"title": message,
|
|
"body": message,
|
|
}
|
|
pr_response = requests.post(pulls_url, headers=headers, json=pulls_payload)
|
|
pr_response.raise_for_status()
|
|
return pr_response.json()
|
|
|
|
|
|
def _get_timeline_events(repo, pr_number):
|
|
"""Helper to fetch timeline events for a given PR."""
|
|
url = f"{GITEA_URL}/api/v1/repos/{repo}/issues/{pr_number}/timeline"
|
|
headers = {"Authorization": f"token {GITEA_TOKEN}"}
|
|
response = requests.get(url, headers=headers)
|
|
response.raise_for_status()
|
|
return response.json()
|
|
|
|
|
|
def _get_comments(repo, pr_number):
|
|
"""Helper to fetch comments for a given PR."""
|
|
url = f"{GITEA_URL}/api/v1/repos/{repo}/issues/{pr_number}/comments"
|
|
headers = {"Authorization": f"token {GITEA_TOKEN}"}
|
|
response = requests.get(url, headers=headers)
|
|
response.raise_for_status()
|
|
return response.json()
|
|
|
|
|
|
def _get_pr_details(repo, pr_number):
|
|
"""Helper to fetch PR details."""
|
|
url = f"{GITEA_URL}/api/v1/repos/{repo}/pulls/{pr_number}"
|
|
headers = {"Authorization": f"token {GITEA_TOKEN}"}
|
|
response = requests.get(url, headers=headers)
|
|
response.raise_for_status()
|
|
return response.json()
|
|
|
|
# =============================================================================
|
|
# TEST CASES
|
|
# =============================================================================
|
|
|
|
@pytest.mark.skipif(not all([GITEA_URL, GITEA_TOKEN]), reason="GITEA_URL and GITEA_TOKEN must be set")
|
|
def test_pr_workflow_succeeded(mock_build_result):
|
|
"""End-to-end test for a successful PR workflow."""
|
|
diff = "diff --git a/test.txt b/test.txt\nnew file mode 100644\nindex 0000000..e69de29\n"
|
|
pr = create_gitea_pr("pool/pkgA", diff, "Test PR - should succeed")
|
|
initial_pr_number = pr["number"]
|
|
|
|
compose_dir = Path(__file__).parent.parent
|
|
|
|
forwarded_pr_number = None
|
|
print(f"Polling pool/pkgA PR #{initial_pr_number} timeline for forwarded PR event...")
|
|
for _ in range(20):
|
|
time.sleep(1)
|
|
timeline_events = _get_timeline_events("pool/pkgA", initial_pr_number)
|
|
for event in timeline_events:
|
|
if event.get("type") == "pull_ref":
|
|
if not (ref_issue := event.get("ref_issue")):
|
|
continue
|
|
url_to_check = ref_issue.get("html_url", "")
|
|
match = re.search(r'products/SLFO/pulls/(\d+)', url_to_check)
|
|
if match:
|
|
forwarded_pr_number = match.group(1)
|
|
break
|
|
if forwarded_pr_number:
|
|
break
|
|
assert forwarded_pr_number is not None, "Workflow bot did not create a pull_ref event on the timeline."
|
|
print(f"Found forwarded PR: products/SLFO #{forwarded_pr_number}")
|
|
|
|
print(f"Polling products/SLFO PR #{forwarded_pr_number} for reviewer assignment...")
|
|
reviewer_added = False
|
|
for _ in range(15):
|
|
time.sleep(1)
|
|
pr_details = _get_pr_details("products/SLFO", forwarded_pr_number)
|
|
if any(r.get('login') == 'autogits_obs_staging_bot' for r in pr_details.get('requested_reviewers', [])):
|
|
reviewer_added = True
|
|
break
|
|
assert reviewer_added, "Staging bot was not added as a reviewer."
|
|
print("Staging bot has been added as a reviewer.")
|
|
|
|
mock_build_result(package_name="pkgA", code="succeeded")
|
|
|
|
print("Restarting obs-staging-bot...")
|
|
subprocess.run(["podman-compose", "restart", "obs-staging-bot"], cwd=compose_dir, check=True, capture_output=True)
|
|
|
|
print(f"Polling products/SLFO PR #{forwarded_pr_number} for final status...")
|
|
status_comment_found = False
|
|
for _ in range(20):
|
|
time.sleep(1)
|
|
timeline_events = _get_timeline_events("products/SLFO", forwarded_pr_number)
|
|
for event in timeline_events:
|
|
print(event.get("body", "not a body"))
|
|
if event.get("body") and "successful" in event["body"]:
|
|
status_comment_found = True
|
|
break
|
|
if status_comment_found:
|
|
break
|
|
assert status_comment_found, "Staging bot did not post a 'successful' comment."
|
|
|
|
|
|
@pytest.mark.skipif(not all([GITEA_URL, GITEA_TOKEN]), reason="GITEA_URL and GITEA_TOKEN must be set")
|
|
def test_pr_workflow_failed(mock_build_result):
|
|
"""End-to-end test for a failed PR workflow."""
|
|
diff = "diff --git a/another_test.txt b/another_test.txt\nnew file mode 100644\nindex 0000000..e69de29\n"
|
|
pr = create_gitea_pr("pool/pkgA", diff, "Test PR - should fail")
|
|
initial_pr_number = pr["number"]
|
|
|
|
compose_dir = Path(__file__).parent.parent
|
|
|
|
forwarded_pr_number = None
|
|
print(f"Polling pool/pkgA PR #{initial_pr_number} timeline for forwarded PR event...")
|
|
for _ in range(20):
|
|
time.sleep(1)
|
|
timeline_events = _get_timeline_events("pool/pkgA", initial_pr_number)
|
|
for event in timeline_events:
|
|
if event.get("type") == "pull_ref":
|
|
if not (ref_issue := event.get("ref_issue")):
|
|
continue
|
|
url_to_check = ref_issue.get("html_url", "")
|
|
match = re.search(r'products/SLFO/pulls/(\d+)', url_to_check)
|
|
if match:
|
|
forwarded_pr_number = match.group(1)
|
|
break
|
|
if forwarded_pr_number:
|
|
break
|
|
assert forwarded_pr_number is not None, "Workflow bot did not create a pull_ref event on the timeline."
|
|
print(f"Found forwarded PR: products/SLFO #{forwarded_pr_number}")
|
|
|
|
print(f"Polling products/SLFO PR #{forwarded_pr_number} for reviewer assignment...")
|
|
reviewer_added = False
|
|
for _ in range(15):
|
|
time.sleep(1)
|
|
pr_details = _get_pr_details("products/SLFO", forwarded_pr_number)
|
|
if any(r.get('login') == 'autogits_obs_staging_bot' for r in pr_details.get('requested_reviewers', [])):
|
|
reviewer_added = True
|
|
break
|
|
assert reviewer_added, "Staging bot was not added as a reviewer."
|
|
print("Staging bot has been added as a reviewer.")
|
|
|
|
mock_build_result(package_name="pkgA", code="failed")
|
|
|
|
print("Restarting obs-staging-bot...")
|
|
subprocess.run(["podman-compose", "restart", "obs-staging-bot"], cwd=compose_dir, check=True, capture_output=True)
|
|
|
|
print(f"Polling products/SLFO PR #{forwarded_pr_number} for final status...")
|
|
status_comment_found = False
|
|
for _ in range(20):
|
|
time.sleep(1)
|
|
timeline_events = _get_timeline_events("products/SLFO", forwarded_pr_number)
|
|
for event in timeline_events:
|
|
if event.get("body") and "failed" in event["body"]:
|
|
status_comment_found = True
|
|
break
|
|
if status_comment_found:
|
|
break
|
|
assert status_comment_found, "Staging bot did not post a 'failed' comment."
|