1
0
mirror of https://github.com/openSUSE/osc.git synced 2025-11-02 04:22:15 +01:00

Implement 'git-obs pr dump' command to store pull request information on disk

This commit is contained in:
2025-05-13 13:50:07 +02:00
parent 7a9113953b
commit f288795e58
4 changed files with 289 additions and 10 deletions

182
osc/commands_git/pr_dump.py Normal file
View File

@@ -0,0 +1,182 @@
import os
from typing import Optional
import osc.commandline_git
class PullRequestDumpCommand(osc.commandline_git.GitObsCommand):
"""
Dump a pull request to disk
"""
name = "dump"
parent = "PullRequestCommand"
def init_arguments(self):
from osc.commandline_git import complete_checkout_pr
self.add_argument(
"id",
nargs="+",
help="Pull request ID in <owner>/<repo>#<number> format",
).completer = complete_checkout_pr
def clone_or_update(
self,
owner: str,
repo: str,
*,
pr_number: Optional[int] = None,
branch: Optional[str] = None,
commit: str,
directory: str,
reference: Optional[str] = None,
):
from osc import gitea_api
if not pr_number and not branch:
raise ValueError("Either 'pr_number' or 'branch' must be specified")
if not os.path.isdir(os.path.join(directory, ".git")):
gitea_api.Repo.clone(
self.gitea_conn,
owner,
repo,
directory=directory,
add_remotes=True,
reference=reference,
)
git = gitea_api.Git(directory)
git_owner, git_repo = git.get_owner_repo()
assert git_owner == owner, f"owner does not match: {git_owner} != {owner}"
assert git_repo == repo, f"repo does not match: {git_repo} != {repo}"
git.fetch()
if pr_number:
# checkout the pull request and check if HEAD matches head/sha from Gitea
pr_branch = git.fetch_pull_request(pr_number, force=True)
git.switch(pr_branch)
head_commit = git.get_branch_head()
assert head_commit == commit, f"HEAD of the current branch '{pr_branch}' is '{head_commit}' but the Gitea pull request points to '{commit}'"
elif branch:
git.switch(branch)
if not git.branch_contains_commit(commit=commit):
raise RuntimeError(f"Branch '{branch}' doesn't contain commit '{commit}'")
git.reset(commit, hard=True)
else:
raise ValueError("Either 'pr_number' or 'branch' must be specified")
def run(self, args):
import json
from osc import gitea_api
from osc import obs_api
from osc.util.xml import xml_indent
from osc.util.xml import ET
self.print_gitea_settings()
pull_request_ids = args.id
for pr_id in pull_request_ids:
owner, repo, number = gitea_api.PullRequest.split_id(pr_id)
pr_obj = gitea_api.PullRequest.get(self.gitea_conn, owner, repo, number)
path = os.path.join(owner, repo, str(number))
review_obj_list = pr_obj.get_reviews(self.gitea_conn)
# see https://github.com/go-gitea/gitea/blob/main/modules/structs/pull_review.go - look for "type ReviewStateType string"
state_map = {
"APPROVED": "accepted",
"REQUEST_CHANGES": "declined",
"REQUEST_REVIEW": "new", # review hasn't started
"PENDING": "review", # review is in progress
"COMMENT": "deleted", # just to make XML validation happy, we'll replace it with "comment" later
}
xml_review_list = []
for review_obj in review_obj_list:
xml_review_list.append(
{
"state": state_map[review_obj.state],
"who": review_obj.who,
"created": review_obj.submitted_at,
"when": review_obj.updated_at,
"comment": review_obj.body,
}
)
req = obs_api.Request(
id=pr_id,
title=pr_obj.title,
description=pr_obj.body,
creator=pr_obj.user,
# each pull request maps to only one action
action_list=[
{
"type": "submit",
"source": {
"project": pr_obj.head_owner,
"package": pr_obj.head_repo,
"rev": pr_obj.head_commit,
},
"target": {
"project": pr_obj.base_owner,
"package": pr_obj.base_repo,
},
},
],
review_list=xml_review_list,
)
# HACK: changes to request XML that are not compatible with OBS
req_xml = req.to_xml()
req_xml_action = req_xml.find("action")
assert req_xml_action is not None
req_xml_action.attrib["type"] = "gitea-pull-request"
req_xml_action.insert(0, ET.Comment("The type='gitea-pull-request' attribute value is a custom extension to the OBS XML schema."))
req_xml_action_source = req_xml_action.find("source")
assert req_xml_action_source is not None
req_xml_action_source.append(ET.Comment("The 'branch' attribute is a custom extension to the OBS XML schema."))
req_xml_action_source.attrib["branch"] = pr_obj.head_branch
req_xml_action_target = req_xml_action.find("target")
assert req_xml_action_target is not None
req_xml_action_target.append(ET.Comment("The 'rev' and 'branch' attributes are custom extensions to the OBS XML schema."))
req_xml_action_target.attrib["rev"] = pr_obj.base_commit
req_xml_action_target.attrib["branch"] = pr_obj.base_branch
req_xml_review_list = req_xml.findall("review")
for req_xml_review in req_xml_review_list:
if req_xml_review.attrib["state"] == "deleted":
req_xml_review.attrib["state"] = "comment"
req_xml_review.insert(0, ET.Comment("The state='comment' attribute value is a custom extension to the OBS XML schema."))
metadata_dir = os.path.join(path, "metadata")
os.makedirs(metadata_dir, exist_ok=True)
with open(os.path.join(metadata_dir, "obs-request.xml"), "wb") as f:
xml_indent(req_xml)
ET.ElementTree(req_xml).write(f, encoding="utf-8")
with open(os.path.join(metadata_dir, "pr.json"), "w", encoding="utf-8") as f:
json.dump(pr_obj._data, f, indent=4, sort_keys=True)
with open(os.path.join(metadata_dir, "base.json"), "w", encoding="utf-8") as f:
json.dump(pr_obj._data["base"], f, indent=4, sort_keys=True)
with open(os.path.join(metadata_dir, "head.json"), "w", encoding="utf-8") as f:
json.dump(pr_obj._data["head"], f, indent=4, sort_keys=True)
with open(os.path.join(metadata_dir, "reviews.json"), "w", encoding="utf-8") as f:
json.dump([i._data for i in review_obj_list], f, indent=4, sort_keys=True)
base_dir = os.path.join(path, "base")
# we must use the `merge_base` instead of `head_commit`, because the latter changes after merging the PR and the `base` directory would contain incorrect data
self.clone_or_update(owner, repo, branch=pr_obj.base_branch, commit=pr_obj.merge_base, directory=base_dir)
head_dir = os.path.join(path, "head")
self.clone_or_update(owner, repo, pr_number=pr_obj.number, commit=pr_obj.head_commit, directory=head_dir, reference=base_dir)

View File

@@ -77,10 +77,21 @@ class Git:
cmd += ["-q"] cmd += ["-q"]
self._run_git(cmd, mute_stderr=mute_stderr) self._run_git(cmd, mute_stderr=mute_stderr)
def clone(self, url, directory: Optional[str] = None, quiet: bool = True): def clone(self,
url: str,
*,
directory: Optional[str] = None,
reference: Optional[str] = None,
reference_if_able: Optional[str] = None,
quiet: bool = True
):
cmd = ["clone", url] cmd = ["clone", url]
if directory: if directory:
cmd += [directory] cmd += [directory]
if reference:
cmd += ["--reference", reference]
if reference_if_able:
cmd += ["--reference-if-able", reference_if_able]
if quiet: if quiet:
cmd += ["-q"] cmd += ["-q"]
self._run_git(cmd) self._run_git(cmd)
@@ -94,7 +105,20 @@ class Git:
except subprocess.CalledProcessError: except subprocess.CalledProcessError:
return None return None
def get_branch_head(self, branch: str) -> str: def branch_contains_commit(self, commit: str, branch: Optional[str] = None) -> bool:
if not branch:
branch = self.current_branch
try:
stdout = self._run_git(["branch", branch, "--contains", commit, "--format", "%(objectname) %(objecttype) %(refname)"])
return stdout.strip() == f"{commit} commit refs/heads/{branch}"
except subprocess.CalledProcessError:
return False
def get_branch_head(self, branch: Optional[str] = None) -> str:
if not branch:
branch = self.current_branch
return self._run_git(["rev-parse", f"refs/heads/{branch}"]) return self._run_git(["rev-parse", f"refs/heads/{branch}"])
def branch_exists(self, branch: str) -> bool: def branch_exists(self, branch: str) -> bool:

View File

@@ -10,6 +10,68 @@ from .connection import GiteaHTTPResponse
from .user import User from .user import User
class PullRequestReview:
def __init__(self, data: dict, *, response: Optional[GiteaHTTPResponse] = None):
self._data = data
self._response = response
@property
def state(self) -> str:
return self._data["state"]
@property
def user(self) -> Optional[str]:
if not self._data["user"]:
return None
return self._data["user"]["login"]
@property
def team(self) -> Optional[str]:
if not self._data["team"]:
return None
return self._data["team"]["name"]
@property
def who(self) -> str:
return self.user if self.user else f"@{self.team}"
@property
def submitted_at(self) -> str:
return self._data["submitted_at"]
@property
def updated_at(self) -> str:
return self._data["updated_at"]
@property
def body(self) -> str:
return self._data["body"]
@classmethod
def list(
cls,
conn: Connection,
owner: str,
repo: str,
number: int,
) -> List["PullRequestReview"]:
"""
List reviews associated with a pull request.
:param conn: Gitea ``Connection`` instance.
:param owner: Owner of the repo.
:param repo: Name of the repo.
:param number: Number of the pull request in owner/repo.
"""
q = {
"limit": -1,
}
url = conn.makeurl("repos", owner, repo, "pulls", str(number), "reviews", query=q)
response = conn.request("GET", url)
obj_list = [cls(i, response=response) for i in response.json()]
return obj_list
@functools.total_ordering @functools.total_ordering
class PullRequest(GiteaModel): class PullRequest(GiteaModel):
def __eq__(self, other): def __eq__(self, other):
@@ -123,6 +185,12 @@ class PullRequest(GiteaModel):
return None return None
return self._data["base"]["repo"]["ssh_url"] return self._data["base"]["repo"]["ssh_url"]
@property
def merge_base(self) -> Optional[str]:
if not self.is_pull_request:
return None
return self._data["merge_base"]
@property @property
def head_owner(self) -> Optional[str]: def head_owner(self) -> Optional[str]:
if not self.is_pull_request: if not self.is_pull_request:
@@ -432,16 +500,11 @@ class PullRequest(GiteaModel):
} }
return conn.request("POST", url, json_data=json_data) return conn.request("POST", url, json_data=json_data)
@classmethod
def get_reviews( def get_reviews(
cls, self,
conn: Connection, conn: Connection,
owner: str, ) -> List[PullRequestReview]:
repo: str, return PullRequestReview.list(conn, self.base_owner, self.base_repo, self.number)
number: int,
):
url = conn.makeurl("repos", owner, repo, "pulls", str(number), "reviews")
return conn.request("GET", url)
@classmethod @classmethod
def approve_review( def approve_review(

View File

@@ -92,6 +92,8 @@ class Repo(GiteaModel):
cwd: Optional[str] = None, cwd: Optional[str] = None,
use_http: bool = False, use_http: bool = False,
add_remotes: bool = False, add_remotes: bool = False,
reference: Optional[str] = None,
reference_if_able: Optional[str] = None,
ssh_private_key_path: Optional[str] = None, ssh_private_key_path: Optional[str] = None,
ssh_strict_host_key_checking: bool = True, ssh_strict_host_key_checking: bool = True,
) -> str: ) -> str:
@@ -105,6 +107,8 @@ class Repo(GiteaModel):
:param cwd: Working directory. Defaults to the current working directory. :param cwd: Working directory. Defaults to the current working directory.
:param use_http: Whether to use``clone_url`` for cloning over http(s) instead of ``ssh_url`` for cloning over SSH. :param use_http: Whether to use``clone_url`` for cloning over http(s) instead of ``ssh_url`` for cloning over SSH.
:param add_remotes: Determine and add 'parent' or 'fork' remotes to the cloned repo. :param add_remotes: Determine and add 'parent' or 'fork' remotes to the cloned repo.
:param reference: Reuse objects from the specified local repository, error out if the repository doesn't exist.
:param reference_if_able: Reuse objects from the specified local repository, only print warning if the repository doesn't exist.
""" """
import shlex import shlex
@@ -161,6 +165,12 @@ class Repo(GiteaModel):
if branch: if branch:
cmd += ["--branch", branch] cmd += ["--branch", branch]
if reference:
cmd += ["--reference", reference]
if reference_if_able:
cmd += ["--reference-if-able", reference_if_able]
if quiet: if quiet:
cmd += ["--quiet"] cmd += ["--quiet"]