# Forked from git-workflow/autogits.
# Also addresses a potential race condition between requested reviews and sending approvals in TC-MERGE-002.
# 533 lines, 23 KiB, Python.
import os
|
|
import time
|
|
import pytest
|
|
import requests
|
|
import json
|
|
import xml.etree.ElementTree as ET
|
|
from pathlib import Path
|
|
import base64
|
|
import subprocess
|
|
|
|
# Static test data lives in "data", two directory levels above this module.
TEST_DATA_DIR = Path(__file__).parent.parent.joinpath("data")
# XML template that the mock build results are rendered from.
BUILD_RESULT_TEMPLATE = TEST_DATA_DIR.joinpath("build_result.xml.template")
# Response directory of the mock OBS, three directory levels above this module.
MOCK_RESPONSES_DIR = Path(__file__).parent.parent.parent.joinpath("mock-obs", "responses")
# Mock response file for the build result of the PullRequest sub-projects
# (the "*" is part of the file name).
MOCK_BUILD_RESULT_FILE = MOCK_RESPONSES_DIR.joinpath(
    "GET_build_openSUSE:Leap:16.0:PullRequest:*__result"
)
# Mock response file for the build result of the base project.
MOCK_BUILD_RESULT_FILE1 = MOCK_RESPONSES_DIR.joinpath("GET_build_openSUSE:Leap:16.0__result")
|
|
|
|
|
|
@pytest.fixture
def mock_build_result():
    """
    Fixture to create the mock build result files from the template.

    Yields a factory function ``_create_result_file(package_name, code)`` that
    the test can call with parameters. After the test, every generated file
    that exists is removed again.
    """

    def _create_result_file(package_name: str, code: str):
        """Render the template for *package_name* / *code* and write both mock files.

        Every ``<status>`` element of the template gets its ``package`` and
        ``code`` attributes overwritten. Returns the path of
        ``MOCK_BUILD_RESULT_FILE`` as a string.
        """
        tree = ET.parse(BUILD_RESULT_TEMPLATE)
        root = tree.getroot()
        for status_tag in root.findall(".//status"):
            status_tag.set("package", package_name)
            status_tag.set("code", code)

        MOCK_RESPONSES_DIR.mkdir(exist_ok=True)
        tree.write(MOCK_BUILD_RESULT_FILE)
        tree.write(MOCK_BUILD_RESULT_FILE1)
        return str(MOCK_BUILD_RESULT_FILE)

    yield _create_result_file

    # Teardown: handle each file on its own so that a missing file neither
    # raises FileNotFoundError nor prevents the other one from being removed.
    for result_file in (MOCK_BUILD_RESULT_FILE, MOCK_BUILD_RESULT_FILE1):
        if result_file.exists():
            result_file.unlink()
|
|
|
|
|
|
class GiteaAPIClient:
    """Small client for the Gitea REST API rooted at ``{base_url}/api/v1``.

    Requests are authenticated with an API token. When *sudo* is given, a
    ``Sudo`` header is sent with every request so that the call is executed
    on behalf of that user.
    """

    # Default per-request timeout in seconds, so that a stuck server cannot
    # hang the whole test run. Callers may override it via ``timeout=...``.
    DEFAULT_TIMEOUT = 60

    def __init__(self, base_url, token, sudo=None):
        """Store the base URL and build the headers shared by all requests."""
        self.base_url = base_url
        self.headers = {"Authorization": f"token {token}", "Content-Type": "application/json"}
        if sudo:
            self.headers["Sudo"] = sudo

    def _request(self, method, path, **kwargs):
        """Send *method* to ``/api/v1/{path}`` and return the response.

        Extra keyword arguments are forwarded to ``requests.request``.
        On an HTTP error status the error and the response body are printed
        and the ``requests.exceptions.HTTPError`` is re-raised.
        """
        url = f"{self.base_url}/api/v1/{path}"
        kwargs.setdefault("timeout", self.DEFAULT_TIMEOUT)
        response = requests.request(method, url, headers=self.headers, **kwargs)
        try:
            response.raise_for_status()
        except requests.exceptions.HTTPError as e:
            print(f"HTTPError in _request: {e}")
            print(f"Response Content: {e.response.text}")
            raise
        return response

    def get_file_info(self, owner: str, repo: str, file_path: str, branch: str = "main"):
        """Return the contents-API JSON for *file_path*, or ``None`` on 404.

        A ``ref`` query parameter is only sent for branches other than
        ``"main"``. It is passed via ``params`` so that the branch name is
        URL-encoded properly.
        """
        url = f"repos/{owner}/{repo}/contents/{file_path}"
        extra = {}
        if branch and branch != "main":
            extra["params"] = {"ref": branch}
        try:
            response = self._request("GET", url, **extra)
            return response.json()
        except requests.exceptions.HTTPError as e:
            if e.response.status_code == 404:
                return None
            raise

    def create_user(self, username, password, email):
        """Create a user through the admin API; reset the password if it already exists."""
        print(f"--- Creating user: {username} ---")
        data = {
            "username": username,
            "password": password,
            "email": email,
            "must_change_password": False,
            "send_notify": False
        }
        try:
            self._request("POST", "admin/users", json=data)
            print(f"User '{username}' created.")
        except requests.exceptions.HTTPError as e:
            if e.response.status_code == 422:  # Already exists
                print(f"User '{username}' already exists. Updating password...")
                # Update password to be sure it matches our expectation
                self._request("PATCH", f"admin/users/{username}", json={"password": password, "login_name": username})
            else:
                raise

    def get_user_token(self, username, password, token_name="test-token"):
        """Create an access token for *username* using Basic Auth and return its value.

        Raises ``requests.exceptions.HTTPError`` on an error status and a
        plain ``Exception`` on any other non-201 status, instead of silently
        returning ``None``.
        """
        print(f"--- Getting token for user: {username} ---")
        url = f"{self.base_url}/api/v1/users/{username}/tokens"

        # Create new token using Basic Auth (the client's token headers are not used here)
        response = requests.post(
            url,
            auth=(username, password),
            json={"name": token_name},
            timeout=self.DEFAULT_TIMEOUT,
        )
        response.raise_for_status()
        if response.status_code != 201:
            raise Exception(
                f"Unexpected status {response.status_code} while creating token for user {username}"
            )
        return response.json()["sha1"]

    def create_org(self, org_name):
        """Create the organization *org_name* unless it already exists."""
        print(f"--- Checking organization: {org_name} ---")
        try:
            self._request("GET", f"orgs/{org_name}")
            print(f"Organization '{org_name}' already exists.")
        except requests.exceptions.HTTPError as e:
            if e.response.status_code == 404:
                print(f"Creating organization '{org_name}'...")
                data = {"username": org_name, "full_name": org_name}
                self._request("POST", "orgs", json=data)
                print(f"Organization '{org_name}' created.")
            else:
                raise

    def create_repo(self, org_name, repo_name):
        """Create an auto-initialized repository in *org_name* unless it already exists."""
        print(f"--- Checking repository: {org_name}/{repo_name} ---")
        try:
            self._request("GET", f"repos/{org_name}/{repo_name}")
            print(f"Repository '{org_name}/{repo_name}' already exists.")
        except requests.exceptions.HTTPError as e:
            if e.response.status_code == 404:
                print(f"Creating repository '{org_name}/{repo_name}'...")
                data = {
                    "name": repo_name,
                    "auto_init": True,
                    "default_branch": "main",
                    "gitignores": "Go",
                    "license": "MIT",
                    "private": False,
                    "readme": "Default"
                }
                self._request("POST", f"orgs/{org_name}/repos", json=data)
                print(f"Repository '{org_name}/{repo_name}' created with a README.")
                time.sleep(0.1)  # Added delay to allow Git operations to become available
            else:
                raise

    def add_collaborator(self, org_name, repo_name, collaborator_name, permission="write"):
        """Add *collaborator_name* to the repository with the given permission."""
        print(f"--- Adding {collaborator_name} as a collaborator to {org_name}/{repo_name} with '{permission}' permission ---")
        data = {"permission": permission}
        # Gitea API returns 204 No Content on success and doesn't fail if already present.
        self._request("PUT", f"repos/{org_name}/{repo_name}/collaborators/{collaborator_name}", json=data)
        print(f"Attempted to add {collaborator_name} to {org_name}/{repo_name}.")

    def add_submodules(self, org_name, repo_name):
        """Add ``mypool/pkgA`` and ``mypool/pkgB`` as submodules via the diffpatch endpoint.

        Does nothing when the repository already contains a ``.gitmodules``
        file. The submodules are pinned to the current head commits of the
        ``main`` branches of the two package repositories.
        """
        print(f"--- Adding submodules to {org_name}/{repo_name} using diffpatch ---")
        parent_repo_path = f"repos/{org_name}/{repo_name}"

        try:
            self._request("GET", f"{parent_repo_path}/contents/.gitmodules")
            print("Submodules appear to be already added. Skipping.")
            return
        except requests.exceptions.HTTPError as e:
            if e.response.status_code != 404:
                raise

        # Get latest commit SHAs for the submodules
        pkg_a_sha = self._request("GET", "repos/mypool/pkgA/branches/main").json()["commit"]["id"]
        pkg_b_sha = self._request("GET", "repos/mypool/pkgB/branches/main").json()["commit"]["id"]

        if not pkg_a_sha or not pkg_b_sha:
            raise Exception("Error: Could not get submodule commit SHAs. Cannot apply patch.")

        # The patch text must start at column 0, hence the unindented literal.
        diff_content = f"""diff --git a/.gitmodules b/.gitmodules
new file mode 100644
index 0000000..f1838bd
--- /dev/null
+++ b/.gitmodules
@@ -0,0 +1,6 @@
+[submodule "pkgA"]
+ path = pkgA
+ url = ../../mypool/pkgA.git
+[submodule "pkgB"]
+ path = pkgB
+ url = ../../mypool/pkgB.git
diff --git a/pkgA b/pkgA
new file mode 160000
index 0000000..{pkg_a_sha}
--- /dev/null
+++ b/pkgA
@@ -0,0 +1 @@
+Subproject commit {pkg_a_sha}
diff --git a/pkgB b/pkgB
new file mode 160000
index 0000000..{pkg_b_sha}
--- /dev/null
+++ b/pkgB
@@ -0,0 +1 @@
+Subproject commit {pkg_b_sha}
"""
        message = "Add pkgA and pkgB as submodules"
        data = {
            "branch": "main",
            "content": diff_content,
            "message": message
        }
        print(f"Applying submodule patch to {org_name}/{repo_name}...")
        self._request("POST", f"{parent_repo_path}/diffpatch", json=data)
        print("Submodule patch applied.")

    def update_repo_settings(self, org_name, repo_name):
        """Enable manual merge and manual-merge autodetection for the repository.

        The current repository object is fetched, the two flags are set and
        the whole object is sent back with PATCH.
        """
        print(f"--- Updating repository settings for: {org_name}/{repo_name} ---")
        repo_data = self._request("GET", f"repos/{org_name}/{repo_name}").json()

        # Ensure these are boolean values, not string
        repo_data["allow_manual_merge"] = True
        repo_data["autodetect_manual_merge"] = True

        self._request("PATCH", f"repos/{org_name}/{repo_name}", json=repo_data)
        print(f"Repository settings for '{org_name}/{repo_name}' updated.")

    def create_label(self, owner: str, repo: str, name: str, color: str = "#abcdef"):
        """Create a label in the repository; a 422 response is treated as "already exists"."""
        print(f"--- Creating label '{name}' in {owner}/{repo} ---")
        url = f"repos/{owner}/{repo}/labels"
        data = {
            "name": name,
            "color": color
        }
        try:
            self._request("POST", url, json=data)
            print(f"Label '{name}' created.")
        except requests.exceptions.HTTPError as e:
            if e.response.status_code == 422:  # Already exists
                print(f"Label '{name}' already exists.")
            else:
                raise

    def create_file(self, owner: str, repo: str, file_path: str, content: str, branch: str = "main", message: str = "Add file"):
        """Create *file_path* with *content*, or update it when it already exists.

        The content is sent base64-encoded. On update, the file's current
        ``sha`` is sent along and the commit message is replaced by
        ``"Update {file_path}"``.
        """
        file_info = self.get_file_info(owner, repo, file_path, branch=branch)

        data = {
            "content": base64.b64encode(content.encode('utf-8')).decode('ascii'),
            "branch": branch,
            "message": message
        }

        if file_info:
            print(f"--- Updating file {file_path} in {owner}/{repo} ---")
            # The SHA fetched just above is current; a second lookup adds nothing.
            data["sha"] = file_info["sha"]
            data["message"] = f"Update {file_path}"
            method = "PUT"
        else:
            print(f"--- Creating file {file_path} in {owner}/{repo} ---")
            method = "POST"

        url = f"repos/{owner}/{repo}/contents/{file_path}"
        self._request(method, url, json=data)
        print(f"File {file_path} {'updated' if file_info else 'created'} in {owner}/{repo}.")

    def create_gitea_pr(self, repo_full_name: str, diff_content: str, title: str, use_fork: bool, base_branch: str = "main", body: str = ""):
        """Create a pull request against *repo_full_name* from a fresh branch.

        A uniquely named branch is created from the head of *base_branch*,
        *diff_content* is applied to it through diffpatch, and the PR is
        opened in the original repository. With *use_fork* the branch lives
        in a fork owned by the ``Sudo`` user, otherwise in the original
        repository itself.

        Returns the JSON description of the created pull request.
        Raises ``ValueError`` when *use_fork* is requested but the client has
        no ``Sudo`` user (there would be no owner for the fork).
        """
        owner, repo = repo_full_name.split("/")

        head_owner, head_repo = owner, repo
        new_branch_name = f"pr-branch-{int(time.time()*1000)}"

        if use_fork:
            sudo_user = self.headers.get("Sudo")
            if not sudo_user:
                raise ValueError("use_fork=True requires a client created with a sudo user")
            head_owner = sudo_user

            print(f"--- Forking {repo_full_name} ---")
            try:
                self._request("POST", f"repos/{owner}/{repo}/forks", json={})
                print(f"--- Forked to {head_owner}/{head_repo} ---")
                time.sleep(0.5)  # Give more time for fork to be ready
            except requests.exceptions.HTTPError as e:
                if e.response.status_code == 409:  # Already forked
                    print(f"--- Already forked to {head_owner}/{head_repo} ---")
                else:
                    raise

        # Get the latest commit SHA of the base branch from the ORIGINAL repo
        base_commit_sha = self._request("GET", f"repos/{owner}/{repo}/branches/{base_branch}").json()["commit"]["id"]

        # Create the unique branch in the head repo (the fork, or the original repo)
        if use_fork:
            print(f"--- Creating branch {new_branch_name} in {head_owner}/{head_repo} from {base_branch} ({base_commit_sha}) ---")
        else:
            print(f"--- Creating branch {new_branch_name} in {repo_full_name} ---")
        self._request("POST", f"repos/{head_owner}/{head_repo}/branches", json={
            "new_branch_name": new_branch_name,
            "old_ref": base_commit_sha
        })

        # Apply the diff using diffpatch in the branch (wherever it is)
        print(f"--- Applying diff to {head_owner}/{head_repo} branch {new_branch_name} ---")
        self._request("POST", f"repos/{head_owner}/{head_repo}/diffpatch", json={
            "branch": new_branch_name,
            "content": diff_content,
            "message": title
        })

        # Now create the PR in the ORIGINAL repo
        data = {
            "head": f"{head_owner}:{new_branch_name}" if head_owner != owner else new_branch_name,
            "base": base_branch,
            "title": title,
            "body": body,
            "allow_maintainer_edit": True
        }
        print(f"--- Creating PR in {repo_full_name} from {data['head']} ---")
        response = self._request("POST", f"repos/{owner}/{repo}/pulls", json=data)
        return response.json()

    def create_branch(self, owner: str, repo: str, new_branch_name: str, old_ref: str):
        """Create *new_branch_name* from *old_ref* unless the branch already exists."""
        print(f"--- Checking branch '{new_branch_name}' in {owner}/{repo} ---")
        try:
            self._request("GET", f"repos/{owner}/{repo}/branches/{new_branch_name}")
            print(f"Branch '{new_branch_name}' already exists.")
            return
        except requests.exceptions.HTTPError as e:
            if e.response.status_code != 404:
                raise  # Re-raise other HTTP errors

        print(f"--- Creating branch '{new_branch_name}' in {owner}/{repo} from {old_ref} ---")
        url = f"repos/{owner}/{repo}/branches"
        data = {
            "new_branch_name": new_branch_name,
            "old_ref": old_ref
        }
        self._request("POST", url, json=data)
        print(f"Branch '{new_branch_name}' created in {owner}/{repo}.")

    def ensure_branch_exists(self, owner: str, repo: str, branch: str = "main", timeout: int = 10):
        """Poll once per second until *branch* exists; raise after *timeout* seconds."""
        print(f"--- Ensuring branch '{branch}' exists in {owner}/{repo} ---")
        # A monotonic clock keeps the timeout unaffected by system clock adjustments.
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            try:
                self._request("GET", f"repos/{owner}/{repo}/branches/{branch}")
                print(f"Branch '{branch}' confirmed in {owner}/{repo}.")
                return
            except requests.exceptions.HTTPError as e:
                if e.response.status_code == 404:
                    print(f"Branch '{branch}' not found yet in {owner}/{repo}. Retrying...")
                    time.sleep(1)
                    continue
                raise
        raise Exception(f"Timeout waiting for branch {branch} in {owner}/{repo}")

    def modify_gitea_pr(self, repo_full_name: str, pr_number: int, diff_content: str, message: str):
        """Apply *diff_content* as a new commit on the head branch of an existing PR."""
        owner, repo = repo_full_name.split("/")

        # Get PR details to find the head branch AND head repo
        pr_details = self._request("GET", f"repos/{owner}/{repo}/pulls/{pr_number}").json()
        head_branch = pr_details["head"]["ref"]
        head_repo_owner = pr_details["head"]["repo"]["owner"]["login"]
        head_repo_name = pr_details["head"]["repo"]["name"]

        # Apply the diff using diffpatch
        print(f"--- Modifying PR #{pr_number} in {head_repo_owner}/{head_repo_name} branch {head_branch} ---")
        self._request("POST", f"repos/{head_repo_owner}/{head_repo_name}/diffpatch", json={
            "branch": head_branch,
            "content": diff_content,
            "message": message
        })

    def update_gitea_pr_properties(self, repo_full_name: str, pr_number: int, **kwargs):
        """PATCH the pull request with *kwargs* as JSON body and return the response JSON."""
        owner, repo = repo_full_name.split("/")
        url = f"repos/{owner}/{repo}/pulls/{pr_number}"
        response = self._request("PATCH", url, json=kwargs)
        return response.json()

    def create_issue_comment(self, repo_full_name: str, issue_number: int, body: str):
        """Post a comment on the issue / pull request and return the response JSON."""
        owner, repo = repo_full_name.split("/")
        url = f"repos/{owner}/{repo}/issues/{issue_number}/comments"
        data = {"body": body}
        print(f"--- Creating comment on {repo_full_name} issue #{issue_number} ---")
        response = self._request("POST", url, json=data)
        return response.json()

    def get_timeline_events(self, repo_full_name: str, pr_number: int):
        """Return the non-empty timeline of the PR, retrying up to 10 times.

        An empty timeline or a 404 is retried after one second; any other
        HTTP error is re-raised. Raises ``Exception`` when all attempts fail.
        """
        owner, repo = repo_full_name.split("/")
        url = f"repos/{owner}/{repo}/issues/{pr_number}/timeline"

        # Retry logic for timeline events
        for i in range(10):  # Try up to 10 times
            try:
                response = self._request("GET", url)
                timeline_events = response.json()
                if timeline_events:  # Check if timeline_events list is not empty
                    return timeline_events
                print(f"Attempt {i+1}: Timeline for PR {pr_number} is empty. Retrying in 1 seconds...")
                time.sleep(1)
            except requests.exceptions.HTTPError as e:
                if e.response.status_code == 404:
                    print(f"Attempt {i+1}: Timeline for PR {pr_number} not found yet. Retrying in 1 seconds...")
                    time.sleep(1)
                else:
                    raise  # Re-raise other HTTP errors
        raise Exception(f"Failed to retrieve timeline for PR {pr_number} after multiple retries.")

    def get_comments(self, repo_full_name: str, pr_number: int):
        """Return the non-empty comment list of the PR, retrying up to 10 times.

        An empty list or a 404 is retried after one second; any other HTTP
        error is re-raised. Raises ``Exception`` when all attempts fail.
        """
        owner, repo = repo_full_name.split("/")
        url = f"repos/{owner}/{repo}/issues/{pr_number}/comments"

        # Retry logic for comments
        for i in range(10):  # Try up to 10 times
            try:
                response = self._request("GET", url)
                comments = response.json()
                print(f"Attempt {i+1}: Comments for PR {pr_number} received: {comments}")  # Added debug print
                if comments:  # Check if comments list is not empty
                    return comments
                print(f"Attempt {i+1}: Comments for PR {pr_number} are empty. Retrying in 1 seconds...")
                time.sleep(1)
            except requests.exceptions.HTTPError as e:
                if e.response.status_code == 404:
                    print(f"Attempt {i+1}: Comments for PR {pr_number} not found yet. Retrying in 1 seconds...")
                    time.sleep(1)
                else:
                    raise  # Re-raise other HTTP errors
        raise Exception(f"Failed to retrieve comments for PR {pr_number} after multiple retries.")

    def get_pr_details(self, repo_full_name: str, pr_number: int):
        """Return the JSON description of the pull request."""
        owner, repo = repo_full_name.split("/")
        url = f"repos/{owner}/{repo}/pulls/{pr_number}"
        response = self._request("GET", url)
        return response.json()

    def create_review(self, repo_full_name: str, pr_number: int, event: str = "APPROVED", body: str = "LGTM"):
        """Create (and, if needed, submit) a review on the pull request.

        When the acting user already has an APPROVED review and *event* is
        ``"APPROVED"``, that existing review is returned and nothing is sent.
        If creating the review fails, an existing PENDING review of the same
        user is picked up instead; without one the error is re-raised.
        A review that is in state PENDING is then submitted with *event*.

        Returns the review JSON (as created or as found).
        """
        owner, repo = repo_full_name.split("/")

        # Check if this user already has an APPROVED review to avoid 422
        current_user = self.headers.get("Sudo") or "admin"  # simplified
        existing_reviews = self.list_reviews(repo_full_name, pr_number)
        for r in existing_reviews:
            if r["user"]["login"] == current_user and r["state"] == "APPROVED" and event == "APPROVED":
                print(f"User {current_user} already has an APPROVED review for {repo_full_name} PR #{pr_number}")
                return r

        url = f"repos/{owner}/{repo}/pulls/{pr_number}/reviews"
        data = {
            "event": event,
            "body": body
        }
        print(f"--- Creating and submitting review ({event}) for {repo_full_name} PR #{pr_number} as {current_user} ---")
        try:
            response = self._request("POST", url, json=data)
            review = response.json()
        except requests.exceptions.HTTPError as e:
            # If it fails with 422, it might be because a review is already pending or something else
            print(f"Failed to create review: {e.response.text}")
            # Try to find a pending review to submit
            existing_reviews = self.list_reviews(repo_full_name, pr_number)
            pending_review = next((r for r in existing_reviews if r["user"]["login"] == current_user and r["state"] == "PENDING"), None)
            if pending_review:
                review = pending_review
            else:
                raise

        # If the state is PENDING, we submit it.
        if review.get("state") == "PENDING":
            review_id = review["id"]
            submit_url = f"repos/{owner}/{repo}/pulls/{pr_number}/reviews/{review_id}"
            submit_data = {
                "event": event,
                "body": body
            }
            try:
                self._request("POST", submit_url, json=submit_data)
                print(f"--- Review {review_id} submitted ---")
            except requests.exceptions.HTTPError as e:
                # These two error texts are tolerated; everything else is a real failure.
                if "already" in e.response.text.lower() or "stay pending" in e.response.text.lower():
                    print(f"Review {review_id} could not be submitted further: {e.response.text}")
                else:
                    raise

        return review

    def list_reviews(self, repo_full_name: str, pr_number: int):
        """Return the list of reviews of the pull request."""
        owner, repo = repo_full_name.split("/")
        url = f"repos/{owner}/{repo}/pulls/{pr_number}/reviews"
        response = self._request("GET", url)
        return response.json()

    def approve_requested_reviews(self, repo_full_name: str, pr_number: int):
        """Approve every review that is in state REQUEST_REVIEW, acting as the requested reviewer.

        For each requested reviewer a new client is created that reuses this
        client's token together with a ``Sudo`` header for that reviewer.
        """
        print(f"--- Checking for REQUEST_REVIEW state in {repo_full_name} PR #{pr_number} ---")
        reviews = self.list_reviews(repo_full_name, pr_number)

        requested_reviews = [r for r in reviews if r["state"] == "REQUEST_REVIEW"]
        if not requested_reviews:
            print(f"No reviews in REQUEST_REVIEW state found for {repo_full_name} PR #{pr_number}")
            return

        # The header has the form "token <value>"; reuse the value for the per-reviewer clients.
        admin_token = self.headers["Authorization"].split(" ")[1]
        for r in requested_reviews:
            reviewer_username = r["user"]["login"]
            print(f"Reacting on REQUEST_REVIEW for user {reviewer_username} by approving...")

            reviewer_client = GiteaAPIClient(base_url=self.base_url, token=admin_token, sudo=reviewer_username)
            time.sleep(1)  # give a chance to avoid possible concurrency issues with reviews request/approval
            reviewer_client.create_review(repo_full_name, pr_number, event="APPROVED", body="Approving requested review")

    def restart_service(self, service_name: str):
        """Restart *service_name* with ``podman-compose restart``.

        The command runs in the directory two levels above this file's
        directory. A failing command is reported and the
        ``subprocess.CalledProcessError`` is re-raised.
        """
        print(f"--- Restarting service: {service_name} ---")
        try:
            # Assumes podman-compose.yml is in the parent directory of tests/lib
            subprocess.run(["podman-compose", "restart", service_name], check=True, cwd=os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir, os.pardir)))
            print(f"Service {service_name} restarted successfully.")
        except subprocess.CalledProcessError as e:
            print(f"Error restarting service {service_name}: {e}")
            raise
|
|
|