SHA256
1
0
Files
autogits/integration/tests/lib/common_test_utils.py
Andrii Nikitin e1ed2e78e0 t: create the repos using sha256 format
- add "object_format_name": "sha256" to api in create_repo()
- update add_submodules() and diff to use sha256 style
2026-03-09 13:27:34 +01:00

593 lines
27 KiB
Python

import os
import time
import pytest
import requests
import json
import re
import xml.etree.ElementTree as ET
from pathlib import Path
import base64
IS_TEST_RUN = False


def vprint(*args, **kwargs):
    """Print like the built-in ``print``, but only when verbose output is on.

    Output is enabled when the module-level ``IS_TEST_RUN`` flag is truthy or
    when the ``AUTOGITS_PRINT_FIXTURES`` environment variable equals ``"1"``.
    All positional and keyword arguments are forwarded to ``print`` unchanged.
    """
    verbose = IS_TEST_RUN or os.environ.get("AUTOGITS_PRINT_FIXTURES") == "1"
    if not verbose:
        return
    print(*args, **kwargs)
class GiteaAPIClient:
    """Small helper client for the Gitea REST API (``{base_url}/api/v1``)."""

    def __init__(self, base_url, token, sudo=None):
        """Store the connection settings and build the default request headers.

        Args:
            base_url: Root URL of the Gitea instance (without ``/api/v1``).
            token: API token sent as ``Authorization: token <token>``.
            sudo: Optional user name; when truthy it is sent in a ``Sudo``
                header on every request.
        """
        default_headers = {
            "Authorization": f"token {token}",
            "Content-Type": "application/json",
        }
        if sudo:
            default_headers["Sudo"] = sudo
        self.base_url = base_url
        self.headers = default_headers
        # GET responses are stored here by _request, but only while
        # use_cache is switched on (it is off by default).
        self._cache = {}
        self.use_cache = False
def _request(self, method, path, **kwargs):
    """Send one request to the Gitea API and measure how long it took.

    Args:
        method: HTTP verb such as ``"GET"`` or ``"POST"``.
        path: API path relative to ``{base_url}/api/v1/``.
        **kwargs: Forwarded unchanged to ``requests.request``.

    Returns:
        A ``(response, duration_seconds)`` tuple. A response served from the
        cache is returned with a duration of ``0.0``.

    Raises:
        requests.exceptions.HTTPError: ``raise_for_status()`` rejected the
            response; the error is logged through ``vprint`` and re-raised.
        requests.exceptions.RequestException: Any other request failure,
            also logged and re-raised.
    """
    # Very basic cache for GET requests to speed up setup. The key is only
    # built when it can actually be used, so calls with the cache disabled
    # (or non-GET calls) pay no serialisation cost; default=repr keeps
    # kwargs that are not JSON-serialisable from raising TypeError.
    cacheable = self.use_cache and method == "GET"
    cache_key = None
    if cacheable:
        cache_key = (method, path, json.dumps(kwargs, sort_keys=True, default=repr))
        if cache_key in self._cache:
            return self._cache[cache_key], 0.0

    url = f"{self.base_url}/api/v1/{path}"
    # A monotonic clock cannot jump backwards, unlike the wall clock.
    start_time = time.monotonic()
    try:
        response = requests.request(method, url, headers=self.headers, **kwargs)
        duration = time.monotonic() - start_time
        response.raise_for_status()
    except requests.exceptions.HTTPError as e:
        duration = time.monotonic() - start_time
        vprint(f"[{duration:.3f}s] HTTPError in _request: {e}")
        if e.response is not None:
            vprint(f"Response Content: {e.response.text}")
        raise
    except requests.exceptions.RequestException as e:
        duration = time.monotonic() - start_time
        vprint(f"[{duration:.3f}s] Request failed: {e}")
        raise

    if self.use_cache:
        if cacheable:
            self._cache[cache_key] = response
        else:
            # Any successful non-GET request may have changed server state,
            # so every cached GET response is dropped.
            self._cache.clear()
    return response, duration
def get_file_info(self, owner: str, repo: str, file_path: str, branch: str = "main"):
    """Fetch the contents-API entry for a file in a repository.

    Args:
        owner: Repository owner (user or organization).
        repo: Repository name.
        file_path: Path of the file inside the repository.
        branch: Ref to read from. For ``"main"`` (or an empty value) no
            ``ref`` parameter is sent at all, as before.

    Returns:
        The decoded JSON body, or ``None`` when the API answers 404.

    Raises:
        requests.exceptions.HTTPError: For any HTTP error other than 404.
    """
    url = f"repos/{owner}/{repo}/contents/{file_path}"
    request_kwargs = {}
    if branch and branch != "main":
        # Hand the ref over as a query parameter so that it gets
        # percent-encoded; splicing it into the URL by hand corrupts the
        # query string for names containing '&', '#', '=' or spaces.
        request_kwargs["params"] = {"ref": branch}
    try:
        response, _ = self._request("GET", url, **request_kwargs)
    except requests.exceptions.HTTPError as e:
        if e.response.status_code == 404:
            return None
        raise
    return response.json()
def create_user(self, username, password, email):
    """Create a user through the admin API, or reset the password if it exists.

    Returns:
        ``True`` when the user was created, ``False`` when the create call
        was rejected with HTTP 422 (treated as "already exists") and the
        password was updated instead.

    Raises:
        requests.exceptions.HTTPError: For any create failure other than 422,
            or when the follow-up password update fails.
    """
    vprint(f"--- Creating user: {username} ---")
    new_user = dict(
        username=username,
        password=password,
        email=email,
        must_change_password=False,
        send_notify=False,
    )
    try:
        _, elapsed = self._request("POST", "admin/users", json=new_user)
    except requests.exceptions.HTTPError as err:
        if err.response.status_code != 422:
            raise
        # 422 is taken to mean the account already exists.
        vprint(f"User '{username}' already exists. Updating password...")
        # Update password to be sure it matches our expectation
        credentials = {"password": password, "login_name": username}
        self._request("PATCH", f"admin/users/{username}", json=credentials)
        return False
    vprint(f"[{elapsed:.3f}s] User '{username}' created.")
    return True
def get_user_token(self, username, password, token_name="test-token"):
    """Create an API token for a user, authenticating with Basic Auth.

    This call bypasses ``_request`` and does not send the client's own
    headers; it posts directly with the user's name and password.

    Returns:
        The ``sha1`` field of the response when the server answers 201.
        For any other status that ``raise_for_status()`` does not reject,
        ``None`` is returned.

    Raises:
        requests.exceptions.HTTPError: When ``raise_for_status()`` rejects
            the response.
    """
    vprint(f"--- Getting token for user: {username} ---")
    endpoint = f"{self.base_url}/api/v1/users/{username}/tokens"
    # Create new token using Basic Auth
    reply = requests.post(endpoint, auth=(username, password), json={"name": token_name})
    if reply.status_code != 201:
        reply.raise_for_status()
        return None
    return reply.json()["sha1"]
def create_org(self, org_name):
    """Create an organization unless it already exists.

    Returns:
        ``False`` when the organization was found, ``True`` when the lookup
        returned 404 and the organization was created.

    Raises:
        requests.exceptions.HTTPError: For a lookup error other than 404, or
            when the create call fails.
    """
    vprint(f"--- Checking organization: {org_name} ---")
    try:
        _, lookup_time = self._request("GET", f"orgs/{org_name}")
    except requests.exceptions.HTTPError as err:
        if err.response.status_code != 404:
            raise
    else:
        vprint(f"[{lookup_time:.3f}s] Organization '{org_name}' already exists.")
        return False

    # Only reached after a 404 from the lookup above.
    vprint(f"Creating organization '{org_name}'...")
    org_payload = {"username": org_name, "full_name": org_name}
    _, create_time = self._request("POST", "orgs", json=org_payload)
    vprint(f"[{create_time:.3f}s] Organization '{org_name}' created.")
    return True
def create_repo(self, org_name, repo_name):
    """Create an auto-initialised repository in an organization if missing.

    The repository is requested with ``main`` as default branch, a Go
    gitignore, an MIT license, a default README and the ``sha256`` object
    format.

    Returns:
        ``False`` when the repository already exists, ``True`` after it has
        been created.

    Raises:
        requests.exceptions.HTTPError: For a lookup error other than 404, or
            when the create call fails.
    """
    full_name = f"{org_name}/{repo_name}"
    vprint(f"--- Checking repository: {full_name} ---")
    try:
        _, lookup_time = self._request("GET", f"repos/{full_name}")
    except requests.exceptions.HTTPError as err:
        if err.response.status_code != 404:
            raise
    else:
        vprint(f"[{lookup_time:.3f}s] Repository '{full_name}' already exists.")
        return False

    # Only reached after a 404 from the lookup above.
    vprint(f"Creating repository '{full_name}'...")
    repo_options = dict(
        name=repo_name,
        auto_init=True,
        default_branch="main",
        gitignores="Go",
        license="MIT",
        private=False,
        readme="Default",
        object_format_name="sha256",
    )
    _, create_time = self._request("POST", f"orgs/{org_name}/repos", json=repo_options)
    vprint(f"[{create_time:.3f}s] Repository '{full_name}' created with a README.")
    # Short pause so that Git operations on the new repository become available.
    time.sleep(0.1)
    return True
def add_collaborator(self, org_name, repo_name, collaborator_name, permission="write"):
    """Add a user as collaborator of a repository unless already one.

    Returns:
        ``False`` when the user is already a collaborator, ``True`` after
        the user has been added with the given permission.

    Raises:
        requests.exceptions.HTTPError: For a lookup error other than 404, or
            when the add call fails.
    """
    vprint(f"--- Adding {collaborator_name} as a collaborator to {org_name}/{repo_name} with '{permission}' permission ---")
    endpoint = f"repos/{org_name}/{repo_name}/collaborators/{collaborator_name}"
    # Look the collaborator up first so the return value tells callers
    # whether anything was actually added.
    try:
        self._request("GET", endpoint)
    except requests.exceptions.HTTPError as err:
        if err.response.status_code != 404:
            raise
    else:
        vprint(f"{collaborator_name} is already a collaborator of {org_name}/{repo_name}.")
        return False

    # Gitea API returns 204 No Content on success and doesn't fail if already present.
    _, elapsed = self._request("PUT", endpoint, json={"permission": permission})
    vprint(f"[{elapsed:.3f}s] Added {collaborator_name} to {org_name}/{repo_name}.")
    return True
def add_submodules(self, org_name, repo_name):
    """Add ``pkgA`` and ``pkgB`` as submodules of a repository via diffpatch.

    Does nothing if the repository already has a ``.gitmodules`` file.
    Otherwise the current commit ids of the ``main`` branches of
    ``mypool/pkgA`` and ``mypool/pkgB`` are looked up and a patch is posted
    to the repository's ``diffpatch`` endpoint on branch ``main``. The patch
    creates ``.gitmodules`` plus one mode-160000 entry per package pointing
    at those commits.

    Raises:
        requests.exceptions.HTTPError: For any failing API call, except a
            404 on the ``.gitmodules`` lookup.
        Exception: When either submodule commit id comes back empty.
    """
    vprint(f"--- Adding submodules to {org_name}/{repo_name} using diffpatch ---")
    parent_repo_path = f"repos/{org_name}/{repo_name}"
    # An existing .gitmodules is taken as proof the submodules are in place.
    try:
        response, duration = self._request("GET", f"{parent_repo_path}/contents/.gitmodules")
        vprint(f"[{duration:.3f}s] Submodules appear to be already added. Skipping.")
        return
    except requests.exceptions.HTTPError as e:
        # 404 means "no .gitmodules yet" and falls through to the patch below.
        if e.response.status_code != 404:
            raise
    # Get latest commit SHAs for the submodules
    response_a, duration_a = self._request("GET", "repos/mypool/pkgA/branches/main")
    pkg_a_sha = response_a.json()["commit"]["id"]
    response_b, duration_b = self._request("GET", "repos/mypool/pkgB/branches/main")
    pkg_b_sha = response_b.json()["commit"]["id"]
    if not pkg_a_sha or not pkg_b_sha:
        raise Exception("Error: Could not get submodule commit SHAs. Cannot apply patch.")
    # The patch text is sent verbatim to the server, so the literal below
    # must stay at column 0 and must not be re-indented or reflowed.
    diff_content = f"""diff --git a/.gitmodules b/.gitmodules
new file mode 100644
index 00000000..f1838bd9
--- /dev/null
+++ b/.gitmodules
@@ -0,0 +1,6 @@
+[submodule "pkgA"]
+ path = pkgA
+ url = ../../mypool/pkgA.git
+[submodule "pkgB"]
+ path = pkgB
+ url = ../../mypool/pkgB.git
diff --git a/pkgA b/pkgA
new file mode 160000
index 00000000..{pkg_a_sha}
--- /dev/null
+++ b/pkgA
@@ -0,0 +1 @@
+Subproject commit {pkg_a_sha}
diff --git a/pkgB b/pkgB
new file mode 160000
index 00000000..{pkg_b_sha}
--- /dev/null
+++ b/pkgB
@@ -0,0 +1 @@
+Subproject commit {pkg_b_sha}
"""
    message = "Add pkgA and pkgB as submodules"
    data = {
        "branch": "main",
        "content": diff_content,
        "message": message
    }
    vprint(f"Applying submodule patch to {org_name}/{repo_name}...")
    response, duration = self._request("POST", f"{parent_repo_path}/diffpatch", json=data)
    vprint(f"[{duration:.3f}s] Submodule patch applied.")
def update_repo_settings(self, org_name, repo_name):
    """Enable manual merge and its auto-detection on a repository.

    The current repository object is fetched, the two flags are set to
    ``True`` and the whole object is sent back in a PATCH request.

    Raises:
        requests.exceptions.HTTPError: When the GET or the PATCH fails.
    """
    vprint(f"--- Updating repository settings for: {org_name}/{repo_name} ---")
    repo_path = f"repos/{org_name}/{repo_name}"
    current, _ = self._request("GET", repo_path)
    # Ensure these are boolean values, not string
    settings = {
        **current.json(),
        "allow_manual_merge": True,
        "autodetect_manual_merge": True,
    }
    _, elapsed = self._request("PATCH", repo_path, json=settings)
    vprint(f"[{elapsed:.3f}s] Repository settings for '{org_name}/{repo_name}' updated.")
def create_webhook(self, owner: str, repo: str, target_url: str):
    """Ensure a Gitea webhook pointing at ``target_url`` exists on a repo.

    Existing hooks are scanned in order. A hook with exactly ``target_url``
    ends the call early; hooks whose URL contains ``gitea-publisher`` or
    ``10.89.0.`` are deleted as outdated. An HTTP error while listing or
    deleting is ignored and the webhook is created anyway.

    Returns:
        ``False`` when a matching webhook already exists, ``True`` after a
        new one has been created.

    Raises:
        requests.exceptions.HTTPError: When the create call fails.
    """
    vprint(f"--- Checking webhook for {owner}/{repo} -> {target_url} ---")
    hooks_path = f"repos/{owner}/{repo}/hooks"
    try:
        listing, _ = self._request("GET", hooks_path)
        for existing in listing.json():
            existing_url = existing["config"]["url"]
            if existing_url == target_url:
                vprint(f"Webhook for {owner}/{repo} already exists with correct URL.")
                return False
            outdated = "gitea-publisher" in existing_url or "10.89.0." in existing_url
            if outdated:
                vprint(f"Found old webhook {existing['id']} with URL {existing_url}. Deleting...")
                self._request("DELETE", f"{hooks_path}/{existing['id']}")
    except requests.exceptions.HTTPError:
        # Best effort: fall through and try to create the hook regardless.
        pass

    vprint(f"--- Creating webhook for {owner}/{repo} -> {target_url} ---")
    hook_spec = {
        "type": "gitea",
        "config": {"url": target_url, "content_type": "json"},
        "events": ["push", "pull_request", "pull_request_review", "issue_comment"],
        "active": True,
    }
    _, elapsed = self._request("POST", hooks_path, json=hook_spec)
    vprint(f"[{elapsed:.3f}s] Webhook created for {owner}/{repo}.")
    return True
def create_label(self, owner: str, repo: str, name: str, color: str = "#abcdef"):
    """Create a label in a repository unless one with that name exists.

    Returns:
        ``True`` when the label was created; ``False`` when it was found in
        the label listing or the create call answered 422.

    Raises:
        requests.exceptions.HTTPError: When the create call fails with a
            status other than 422. Errors while listing are ignored.
    """
    vprint(f"--- Checking label '{name}' in {owner}/{repo} ---")
    labels_path = f"repos/{owner}/{repo}/labels"
    # Check if label exists first
    try:
        listing, _ = self._request("GET", labels_path)
        if any(entry["name"] == name for entry in listing.json()):
            vprint(f"Label '{name}' already exists in {owner}/{repo}.")
            return False
    except requests.exceptions.HTTPError:
        # Listing is best effort; creation below handles the duplicate case.
        pass

    vprint(f"--- Creating label '{name}' in {owner}/{repo} ---")
    try:
        _, elapsed = self._request("POST", labels_path, json={"name": name, "color": color})
    except requests.exceptions.HTTPError as err:
        if err.response.status_code != 422:
            raise
        # Already exists (race condition or other reason)
        vprint(f"Label '{name}' already exists.")
        return False
    vprint(f"[{elapsed:.3f}s] Label '{name}' created.")
    return True
def create_file(self, owner: str, repo: str, file_path: str, content: str, branch: str = "main", message: str = "Add file"):
    """Create a file in a repository, or update it when it already exists.

    Args:
        owner: Repository owner.
        repo: Repository name.
        file_path: Path of the file inside the repository.
        content: Text content; sent base64-encoded (UTF-8).
        branch: Branch to commit to.
        message: Commit message used when the file is created. On update
            the message is always ``"Update <file_path>"``.

    Raises:
        requests.exceptions.HTTPError: When a lookup (other than 404) or the
            write request fails.
    """
    # One lookup is enough: it says whether the file exists and supplies
    # the blob SHA an update needs. The earlier second fetch immediately
    # after the first only added a network round trip.
    file_info = self.get_file_info(owner, repo, file_path, branch=branch)
    data = {
        "content": base64.b64encode(content.encode('utf-8')).decode('ascii'),
        "branch": branch,
        "message": message
    }
    if file_info:
        vprint(f"--- Updating file {file_path} in {owner}/{repo} ---")
        data["sha"] = file_info["sha"]
        data["message"] = f"Update {file_path}"
        method = "PUT"
    else:
        vprint(f"--- Creating file {file_path} in {owner}/{repo} ---")
        method = "POST"
    url = f"repos/{owner}/{repo}/contents/{file_path}"
    _, duration = self._request(method, url, json=data)
    vprint(f"[{duration:.3f}s] File {file_path} {'updated' if file_info else 'created'} in {owner}/{repo}.")
def create_gitea_pr(self, repo_full_name: str, diff_content: str, title: str, use_fork: bool, base_branch: str = "main", body: str = ""):
    """Apply a diff on a fresh branch and open a pull request for it.

    Args:
        repo_full_name: Target repository as ``"owner/repo"``.
        diff_content: Patch text posted to the ``diffpatch`` endpoint.
        title: PR title; also used as the commit message.
        use_fork: When true, the branch is created in a fork owned by the
            user named in this client's ``Sudo`` header.
        base_branch: Branch the diff is applied on and the PR targets.
        body: PR description.

    Returns:
        The decoded JSON of the created pull request.

    Raises:
        ValueError: ``use_fork`` is true but the client has no ``Sudo`` user.
        requests.exceptions.HTTPError: When an API call fails (a 409 on
            forking is treated as "already forked").
    """
    owner, repo = repo_full_name.split("/")
    head_owner, head_repo = owner, repo
    # Millisecond timestamp keeps branch names distinct between calls.
    new_branch_name = f"pr-branch-{int(time.time()*1000)}"
    if use_fork:
        sudo_user = self.headers.get("Sudo")
        if not sudo_user:
            # Without a Sudo user the fork owner is unknown; continuing
            # would create a fork and then post to "repos/None/...".
            raise ValueError(
                "create_gitea_pr(use_fork=True) needs a client created with sudo=<user>; "
                "the fork owner is taken from the Sudo header"
            )
        head_owner = sudo_user
        head_repo = repo
        vprint(f"--- Forking {repo_full_name} ---")
        try:
            _, duration = self._request("POST", f"repos/{owner}/{repo}/forks", json={})
            vprint(f"[{duration:.3f}s] --- Forked to {head_owner}/{head_repo} ---")
            time.sleep(0.5) # Give more time for fork to be ready
        except requests.exceptions.HTTPError as e:
            if e.response.status_code == 409: # Already forked
                vprint(f"--- Already forked to {head_owner}/{head_repo} ---")
            else:
                raise
    # Apply the diff using diffpatch and create the new branch automatically
    vprint(f"--- Applying diff to {head_owner}/{head_repo} from {base_branch} to new branch {new_branch_name} ---")
    self._request("POST", f"repos/{head_owner}/{head_repo}/diffpatch", json={
        "branch": base_branch,
        "new_branch": new_branch_name,
        "content": diff_content,
        "message": title
    })
    # Now create the PR in the ORIGINAL repo
    data = {
        # A head branch in another owner's fork is addressed as "owner:branch".
        "head": f"{head_owner}:{new_branch_name}" if head_owner != owner else new_branch_name,
        "base": base_branch,
        "title": title,
        "body": body,
        "allow_maintainer_edit": True
    }
    vprint(f"--- Creating PR in {repo_full_name} from {data['head']} ---")
    response, _ = self._request("POST", f"repos/{owner}/{repo}/pulls", json=data)
    return response.json()
def create_branch(self, owner: str, repo: str, new_branch_name: str, old_ref: str):
    """Create a branch from ``old_ref`` unless the branch already exists.

    Returns:
        ``False`` when the branch was found, ``True`` after it was created.

    Raises:
        requests.exceptions.HTTPError: For a lookup error other than 404, or
            when the create call fails.
    """
    vprint(f"--- Checking branch '{new_branch_name}' in {owner}/{repo} ---")
    branches_path = f"repos/{owner}/{repo}/branches"
    try:
        _, lookup_time = self._request("GET", f"{branches_path}/{new_branch_name}")
    except requests.exceptions.HTTPError as err:
        if err.response.status_code != 404:
            raise  # Re-raise other HTTP errors
    else:
        vprint(f"[{lookup_time:.3f}s] Branch '{new_branch_name}' already exists.")
        return False

    vprint(f"--- Creating branch '{new_branch_name}' in {owner}/{repo} from {old_ref} ---")
    branch_spec = {"new_branch_name": new_branch_name, "old_ref": old_ref}
    _, create_time = self._request("POST", branches_path, json=branch_spec)
    vprint(f"[{create_time:.3f}s] Branch '{new_branch_name}' created in {owner}/{repo}.")
    return True
def ensure_branch_exists(self, owner: str, repo: str, branch: str = "main", timeout: int = 10):
    """Poll until a branch is visible through the API.

    The branch endpoint is queried repeatedly, sleeping one second after
    each 404, until it answers successfully or ``timeout`` seconds have
    elapsed since the call started.

    Raises:
        requests.exceptions.HTTPError: For an HTTP error other than 404.
        Exception: When the branch did not appear within ``timeout``.
    """
    vprint(f"--- Ensuring branch '{branch}' exists in {owner}/{repo} ---")
    branch_path = f"repos/{owner}/{repo}/branches/{branch}"
    began = time.time()
    while time.time() - began < timeout:
        try:
            _, elapsed = self._request("GET", branch_path)
        except requests.exceptions.HTTPError as err:
            if err.response.status_code != 404:
                raise
            vprint(f"Branch '{branch}' not found yet in {owner}/{repo}. Retrying...")
            time.sleep(1)
        else:
            vprint(f"[{elapsed:.3f}s] Branch '{branch}' confirmed in {owner}/{repo}.")
            return
    raise Exception(f"Timeout waiting for branch {branch} in {owner}/{repo}")
def modify_gitea_pr(self, repo_full_name: str, pr_number: int, diff_content: str, message: str):
    """Add a commit to an existing pull request by patching its head branch.

    The PR is looked up in ``repo_full_name`` to find the head branch and
    the repository that holds it; the diff is then posted to that
    repository's ``diffpatch`` endpoint.

    Raises:
        requests.exceptions.HTTPError: When the lookup or the patch fails.
    """
    owner, repo = repo_full_name.split("/")
    # Get PR details to find the head branch AND head repo
    pr_reply, _ = self._request("GET", f"repos/{owner}/{repo}/pulls/{pr_number}")
    head = pr_reply.json()["head"]
    head_branch = head["ref"]
    head_repo_owner = head["repo"]["owner"]["login"]
    head_repo_name = head["repo"]["name"]
    # Apply the diff using diffpatch
    vprint(f"--- Modifying PR #{pr_number} in {head_repo_owner}/{head_repo_name} branch {head_branch} ---")
    patch_request = {
        "branch": head_branch,
        "content": diff_content,
        "message": message,
    }
    self._request("POST", f"repos/{head_repo_owner}/{head_repo_name}/diffpatch", json=patch_request)
def update_gitea_pr_properties(self, repo_full_name: str, pr_number: int, **kwargs):
    """PATCH a pull request with the given fields and return the JSON reply.

    Args:
        repo_full_name: Repository as ``"owner/repo"``.
        pr_number: Pull request number.
        **kwargs: Fields sent as the JSON body of the PATCH request.
    """
    owner, repo = repo_full_name.split("/")
    reply, _ = self._request("PATCH", f"repos/{owner}/{repo}/pulls/{pr_number}", json=kwargs)
    return reply.json()
def create_issue_comment(self, repo_full_name: str, issue_number: int, body: str):
    """Post a comment on an issue and return the created comment as JSON.

    Raises:
        requests.exceptions.HTTPError: When the API call fails.
    """
    owner, repo = repo_full_name.split("/")
    comments_path = f"repos/{owner}/{repo}/issues/{issue_number}/comments"
    vprint(f"--- Creating comment on {repo_full_name} issue #{issue_number} ---")
    reply, _ = self._request("POST", comments_path, json={"body": body})
    return reply.json()
def get_timeline_events(self, repo_full_name: str, pr_number: int):
    """Return the timeline of a PR, retrying while it is empty or missing.

    Up to ten attempts are made, one second apart. An attempt counts as
    failed when the timeline is empty or the endpoint answers 404.

    Returns:
        The first non-empty timeline (decoded JSON).

    Raises:
        requests.exceptions.HTTPError: For an HTTP error other than 404.
        Exception: When all attempts failed.
    """
    owner, repo = repo_full_name.split("/")
    timeline_path = f"repos/{owner}/{repo}/issues/{pr_number}/timeline"
    # Retry logic for timeline events
    for attempt in range(1, 11):  # Try up to 10 times
        try:
            reply, _ = self._request("GET", timeline_path)
        except requests.exceptions.HTTPError as err:
            if err.response.status_code != 404:
                raise  # Re-raise other HTTP errors
            vprint(f"Attempt {attempt}: Timeline for PR {pr_number} not found yet. Retrying in 1 seconds...")
        else:
            events = reply.json()
            if events:
                return events
            vprint(f"Attempt {attempt}: Timeline for PR {pr_number} is empty. Retrying in 1 seconds...")
        time.sleep(1)
    raise Exception(f"Failed to retrieve timeline for PR {pr_number} after multiple retries.")
def get_comments(self, repo_full_name: str, pr_number: int):
    """Return the comments of a PR, retrying while there are none.

    Up to ten attempts are made, one second apart. An attempt counts as
    failed when the comment list is empty or the endpoint answers 404.

    Returns:
        The first non-empty comment list (decoded JSON).

    Raises:
        requests.exceptions.HTTPError: For an HTTP error other than 404.
        Exception: When all attempts failed.
    """
    owner, repo = repo_full_name.split("/")
    comments_path = f"repos/{owner}/{repo}/issues/{pr_number}/comments"
    # Retry logic for comments
    for attempt in range(1, 11):  # Try up to 10 times
        try:
            reply, elapsed = self._request("GET", comments_path)
        except requests.exceptions.HTTPError as err:
            if err.response.status_code != 404:
                raise  # Re-raise other HTTP errors
            vprint(f"Attempt {attempt}: Comments for PR {pr_number} not found yet. Retrying in 1 seconds...")
        else:
            comments = reply.json()
            # Debug output of whatever came back, even when empty.
            vprint(f"[{elapsed:.3f}s] Attempt {attempt}: Comments for PR {pr_number} received: {comments}")
            if comments:
                return comments
            vprint(f"Attempt {attempt}: Comments for PR {pr_number} are empty. Retrying in 1 seconds...")
        time.sleep(1)
    raise Exception(f"Failed to retrieve comments for PR {pr_number} after multiple retries.")
def get_pr_details(self, repo_full_name: str, pr_number: int):
    """Fetch a pull request and return its decoded JSON representation.

    Raises:
        requests.exceptions.HTTPError: When the API call fails.
    """
    owner, repo = repo_full_name.split("/")
    reply, _ = self._request("GET", f"repos/{owner}/{repo}/pulls/{pr_number}")
    return reply.json()
def create_review(self, repo_full_name: str, pr_number: int, event: str = "APPROVED", body: str = "LGTM"):
    """Create and submit a review on a pull request.

    Args:
        repo_full_name: Repository as ``"owner/repo"``.
        pr_number: Pull request number.
        event: Review event sent to the API (default ``"APPROVED"``).
        body: Review text.

    Returns:
        The existing review when the acting user already approved and
        ``event`` is ``"APPROVED"``. Otherwise the review dict from the
        create call, or the user's pending review if creation failed. The
        dict is not refreshed after a pending review has been submitted.

    Raises:
        requests.exceptions.HTTPError: When creation fails and the user has
            no pending review, or when submitting a pending review fails
            with a message containing neither "already" nor "stay pending".
    """
    owner, repo = repo_full_name.split("/")
    # Check if this user already has an APPROVED review to avoid 422
    # The acting user is the Sudo user if set; otherwise "admin" is assumed.
    current_user = self.headers.get("Sudo") or "admin" # simplified
    existing_reviews = self.list_reviews(repo_full_name, pr_number)
    for r in existing_reviews:
        if r["user"]["login"] == current_user and r["state"] == "APPROVED" and event == "APPROVED":
            vprint(f"User {current_user} already has an APPROVED review for {repo_full_name} PR #{pr_number}")
            return r
    url = f"repos/{owner}/{repo}/pulls/{pr_number}/reviews"
    data = {
        "event": event,
        "body": body
    }
    vprint(f"--- Creating and submitting review ({event}) for {repo_full_name} PR #{pr_number} as {current_user} ---")
    try:
        response, duration = self._request("POST", url, json=data)
        review = response.json()
    except requests.exceptions.HTTPError as e:
        # If it fails with 422, it might be because a review is already pending or something else
        # NOTE: every HTTPError lands here, not only 422.
        vprint(f"Failed to create review: {e.response.text}")
        # Try to find a pending review to submit
        existing_reviews = self.list_reviews(repo_full_name, pr_number)
        pending_review = next((r for r in existing_reviews if r["user"]["login"] == current_user and r["state"] == "PENDING"), None)
        if pending_review:
            review = pending_review
        else:
            # No pending review to fall back on: re-raise the create error.
            raise
    # If the state is PENDING, we submit it.
    if review.get("state") == "PENDING":
        review_id = review["id"]
        submit_url = f"repos/{owner}/{repo}/pulls/{pr_number}/reviews/{review_id}"
        submit_data = {
            "event": event,
            "body": body
        }
        try:
            response, duration = self._request("POST", submit_url, json=submit_data)
            vprint(f"[{duration:.3f}s] --- Review {review_id} submitted ---")
        except requests.exceptions.HTTPError as e:
            # These two message fragments are tolerated and only logged;
            # any other submit failure propagates.
            if "already" in e.response.text.lower() or "stay pending" in e.response.text.lower():
                vprint(f"Review {review_id} could not be submitted further: {e.response.text}")
            else:
                raise
    return review
def list_reviews(self, repo_full_name: str, pr_number: int):
    """Return the reviews of a pull request as decoded JSON.

    Raises:
        requests.exceptions.HTTPError: When the API call fails.
    """
    owner, repo = repo_full_name.split("/")
    reply, _ = self._request("GET", f"repos/{owner}/{repo}/pulls/{pr_number}/reviews")
    return reply.json()
def approve_requested_reviews(self, repo_full_name: str, pr_number: int):
    """Approve a PR on behalf of every user whose review is still requested.

    For each review in state ``REQUEST_REVIEW`` a new client is built with
    this client's token and the reviewer as ``Sudo`` user, and an approving
    review is created through it. Returns ``None`` in all cases.
    """
    vprint(f"--- Checking for REQUEST_REVIEW state in {repo_full_name} PR #{pr_number} ---")
    outstanding = []
    for review in self.list_reviews(repo_full_name, pr_number):
        if review["state"] == "REQUEST_REVIEW":
            outstanding.append(review)
    if not outstanding:
        vprint(f"No reviews in REQUEST_REVIEW state found for {repo_full_name} PR #{pr_number}")
        return

    # The Authorization header has the form "token <value>"; reuse the value.
    admin_token = self.headers["Authorization"].split(" ")[1]
    for review in outstanding:
        reviewer_username = review["user"]["login"]
        vprint(f"Reacting on REQUEST_REVIEW for user {reviewer_username} by approving...")
        impersonating_client = GiteaAPIClient(base_url=self.base_url, token=admin_token, sudo=reviewer_username)
        # give a chance to avoid possible concurrency issues with reviews request/approval
        time.sleep(1)
        impersonating_client.create_review(repo_full_name, pr_number, event="APPROVED", body="Approving requested review")
def wait_for_project_pr(self, package_pr_repo, package_pr_number, project_pr_repo="myproducts/mySLFO", timeout=60):
    """Wait for a package PR to be referenced by a PR in the project repo.

    Once per second, for ``timeout`` rounds, the package PR's timeline is
    scanned for ``pull_ref`` events whose referenced issue URL contains
    ``<project_pr_repo>/pulls/<number>``.

    Args:
        package_pr_repo: Repository of the package PR as ``"owner/repo"``.
        package_pr_number: Number of the package PR.
        project_pr_repo: Repository expected to hold the forwarded PR.
        timeout: Number of polling rounds (each preceded by a 1 s sleep).

    Returns:
        The number of the referenced project PR, or ``None`` when none
        showed up within ``timeout`` rounds.

    Raises:
        Exception: Propagated from ``get_timeline_events`` when the timeline
            cannot be retrieved.
    """
    vprint(f"Polling {package_pr_repo} PR #{package_pr_number} timeline for forwarded PR event in {project_pr_repo}...")
    # Escape the repository name so that characters such as '.' or '+'
    # match literally, and compile the pattern once instead of per event.
    project_pr_url = re.compile(re.escape(project_pr_repo) + r"/pulls/(\d+)")
    for _ in range(timeout):
        time.sleep(1)
        for event in self.get_timeline_events(package_pr_repo, package_pr_number):
            if event.get("type") != "pull_ref":
                continue
            ref_issue = event.get("ref_issue")
            if not ref_issue:
                continue
            # html_url may be absent or null; fall back to an empty string.
            match = project_pr_url.search(ref_issue.get("html_url") or "")
            if match:
                return int(match.group(1))
    return None
def approve_and_wait_merge(self, package_pr_repo, package_pr_number, project_pr_number, project_pr_repo="myproducts/mySLFO", timeout=30):
    """Approve requested reviews on both PRs until both report as merged.

    Each round approves outstanding review requests on the package PR and
    the project PR, then checks the ``merged`` flag of whichever PR has not
    been seen merged yet. Rounds are one second apart, at most ``timeout``
    of them.

    Returns:
        A ``(package_merged, project_merged)`` tuple of booleans;
        ``(True, True)`` as soon as both PRs are merged.
    """
    vprint(f"Approving reviews and verifying both PRs are merged ({package_pr_repo}#{package_pr_number} and {project_pr_repo}#{project_pr_number})...")
    package_done = False
    project_done = False
    for _ in range(timeout):
        self.approve_requested_reviews(package_pr_repo, package_pr_number)
        self.approve_requested_reviews(project_pr_repo, project_pr_number)

        # A PR already seen as merged is not queried again.
        if not package_done and self.get_pr_details(package_pr_repo, package_pr_number).get("merged"):
            package_done = True
            vprint(f"Package PR {package_pr_repo}#{package_pr_number} merged.")
        if not project_done and self.get_pr_details(project_pr_repo, project_pr_number).get("merged"):
            project_done = True
            vprint(f"Project PR {project_pr_repo}#{project_pr_number} merged.")

        if package_done and project_done:
            return True, True
        time.sleep(1)
    return package_done, project_done