1
0
mirror of https://github.com/openSUSE/osc.git synced 2025-09-13 08:24:31 +02:00
Files
github.com_openSUSE_osc/osc/commands_git/pr_review_interactive.py

270 lines
9.5 KiB
Python

import os
import subprocess
import sys
from typing import Generator
from typing import Optional
import osc.commandline_git
NEW_COMMENT_TEMPLATE = """
#
# Lines starting with '#' will be ignored.
#
# Adding a comment to pull request {owner}/{repo}#{number}
#
"""
DECLINE_REVIEW_TEMPLATE = """
#
# Lines starting with '#' will be ignored.
#
# Requesting changes for pull request {owner}/{repo}#{number}
#
"""
class PullRequestReviewInteractiveCommand(osc.commandline_git.GitObsCommand):
"""
Interactive review of pull requests
"""
name = "interactive"
parent = "PullRequestReviewCommand"
def init_arguments(self):
self.add_argument(
"id",
nargs="*",
help="Pull request ID in <owner>/<repo>#<number> format",
)
def run(self, args):
from osc import gitea_api
from osc.output import get_user_input
if args.id:
# TODO: deduplicate, skip those that do not require a review (print to stderr)
pull_request_ids = args.id
else:
# keep only the list of pull request IDs, throw search results away
# because the search returns issues instead of pull requests
pr_obj_list = gitea_api.PullRequest.search(self.gitea_conn, review_requested=True)
pr_obj_list.sort()
pull_request_ids = [pr_obj.id for pr_obj in pr_obj_list]
del pr_obj_list
skipped_drafts = 0
for pr_index, pr_id in enumerate(pull_request_ids):
self.print_gitea_settings()
owner, repo, number = gitea_api.PullRequest.split_id(pr_id)
pr_obj = gitea_api.PullRequest.get(self.gitea_conn, owner, repo, number)
if pr_obj.draft:
# we don't want to review drafts, they will change
skipped_drafts += 1
continue
self.clone_git(owner, repo, number)
self.view(owner, repo, number, pr_index=pr_index, pr_count=len(pull_request_ids), pr_obj=pr_obj)
while True:
# TODO: print at least some context because the PR details disappear after closing less
reply = get_user_input(
f"Select a review action for '{pr_id}':",
answers={
"a": "approve",
"A": "approve and schedule for merging",
"d": "decline",
"m": "comment",
"v": "view again",
"s": "skip",
"x": "exit",
},
default_answer="s",
)
if reply == "a":
self.approve(owner, repo, number, commit=pr_obj.head_commit)
break
if reply == "A":
self.approve(owner, repo, number, commit=pr_obj.head_commit)
gitea_api.PullRequest.merge(self.gitea_conn, owner, repo, number, merge_when_checks_succeed=True)
break
elif reply == "d":
self.decline(owner, repo, number)
break
elif reply == "m":
self.comment(owner, repo, number)
break
elif reply == "v":
self.view(owner, repo, number, pr_index=pr_index, pr_count=len(pull_request_ids), pr_obj=pr_obj)
elif reply == "s":
break
elif reply == "x":
return
else:
raise RuntimeError(f"Unhandled reply: {reply}")
if skipped_drafts:
print(file=sys.stderr)
print(f"Skipped drafts: {skipped_drafts}", file=sys.stderr)
def approve(self, owner: str, repo: str, number: int, *, commit: str):
from osc import gitea_api
gitea_api.PullRequest.approve_review(self.gitea_conn, owner, repo, number, commit=commit)
def decline(self, owner: str, repo: str, number: int):
from osc import gitea_api
from .pr_create import edit_message
message = edit_message(template=DECLINE_REVIEW_TEMPLATE.format(**locals()))
# remove comments
message = "\n".join([i for i in message.splitlines() if not i.startswith("#")])
# strip leading and trailing spaces
message = message.strip()
gitea_api.PullRequest.decline_review(self.gitea_conn, owner, repo, number, msg=message)
def comment(self, owner: str, repo: str, number: int):
from osc import gitea_api
from .pr_create import edit_message
message = edit_message(template=NEW_COMMENT_TEMPLATE.format(**locals()))
# remove comments
message = "\n".join([i for i in message.splitlines() if not i.startswith("#")])
# strip leading and trailing spaces
message = message.strip()
gitea_api.PullRequest.add_comment(self.gitea_conn, owner, repo, number, msg=message)
def get_git_repo_path(self, owner: str, repo: str, number: int):
path = os.path.join("~", ".cache", "git-obs", "reviews", self.gitea_login.name, f"{owner}_{repo}_{number}")
path = os.path.expanduser(path)
return path
def clone_git(self, owner: str, repo: str, number: int):
from osc import gitea_api
repo_obj = gitea_api.Repo.get(self.gitea_conn, owner, repo)
clone_url = repo_obj.ssh_url
# TODO: it might be good to have a central cache for the git repos to speed cloning up
path = self.get_git_repo_path(owner, repo, number)
git = gitea_api.Git(path)
if os.path.isdir(path):
git.fetch()
else:
os.makedirs(path, exist_ok=True)
git.clone(clone_url, directory=path, quiet=False)
git.fetch_pull_request(number, force=True)
def view(
self,
owner: str,
repo: str,
number: int,
*,
pr_index: int,
pr_count: int,
pr_obj: Optional["PullRequest"] = None,
):
from osc import gitea_api
from osc.core import highlight_diff
from osc.output import sanitize_text
from osc.output import tty
if pr_obj is None:
pr_obj = gitea_api.PullRequest.get(self.gitea_conn, owner, repo, number)
# the process works with bytes rather than with strings
# because the diffs may contain character sequences that cannot be decoded as utf-8 strings
proc = subprocess.Popen(["less"], stdin=subprocess.PIPE)
assert proc.stdin is not None
# heading
heading = tty.colorize(f"[{pr_index + 1}/{pr_count}] Reviewing pull request '{owner}/{repo}#{number}'...\n", "yellow,bold")
proc.stdin.write(heading.encode("utf-8"))
proc.stdin.write(b"\n")
# pr details
proc.stdin.write(pr_obj.to_human_readable_string().encode("utf-8"))
proc.stdin.write(b"\n")
proc.stdin.write(b"\n")
# patch
proc.stdin.write(tty.colorize("Patch:\n", "bold").encode("utf-8"))
patch = gitea_api.PullRequest.get_patch(self.gitea_conn, owner, repo, number)
patch = sanitize_text(patch)
patch = highlight_diff(patch)
proc.stdin.write(patch)
proc.stdin.write(b"\n")
# tardiff
proc.stdin.write(tty.colorize("Archive diffs:\n", "bold").encode("utf-8"))
tardiff_chunks = self.tardiff(owner, repo, number, pr_obj=pr_obj)
for chunk in tardiff_chunks:
chunk = sanitize_text(chunk)
chunk = highlight_diff(chunk)
try:
proc.stdin.write(chunk)
except BrokenPipeError:
# user exits less before all data is written
break
proc.communicate()
def get_tardiff_path(self):
path = os.path.join("~", ".cache", "git-obs", "tardiff")
path = os.path.expanduser(path)
return path
def tardiff(self, owner: str, repo: str, number: int, *, pr_obj: ".PullRequest") -> Generator[bytes, None, None]:
from osc import gitea_api
path = self.get_git_repo_path(owner, repo, number)
git = gitea_api.Git(path)
# the repo might be outdated, make sure the commits are available
git.fetch()
src_archives = git.lfs_ls_files(ref=pr_obj.head_commit)
dst_archives = git.lfs_ls_files(ref=pr_obj.base_commit)
def map_archives_by_name(archives: list):
result = {}
for fn, sha in archives:
name = fn.rsplit("-", 1)[0]
assert name not in result
result[name] = (fn, sha)
return result
src_archives_by_name = map_archives_by_name(src_archives)
dst_archives_by_name = map_archives_by_name(dst_archives)
all_names = sorted(set(src_archives_by_name) | set(dst_archives_by_name))
path = self.get_tardiff_path()
td = gitea_api.TarDiff(path)
for name in all_names:
src_archive = src_archives_by_name.get(name, (None, None))
dst_archive = dst_archives_by_name.get(name, (None, None))
if src_archive[0]:
td.add_archive(src_archive[0], src_archive[1], git.lfs_cat_file(src_archive[0], ref=pr_obj.head_commit))
if dst_archive[0]:
td.add_archive(dst_archive[0], dst_archive[1], git.lfs_cat_file(dst_archive[0], ref=pr_obj.base_commit))
# TODO: max output length / max lines; in such case, it would be great to list all the changed files at least
yield from td.diff_archives(*dst_archive, *src_archive)