import os
import subprocess
import sys
from typing import Generator
from typing import Optional
from typing import TYPE_CHECKING

import osc.commandline_git

if TYPE_CHECKING:
    # only needed to resolve the forward references in the annotations below
    from osc.gitea_api import PullRequest


NEW_COMMENT_TEMPLATE = """
#
# Lines starting with '#' will be ignored.
#
# Adding a comment to pull request {owner}/{repo}#{number}
#
"""


DECLINE_REVIEW_TEMPLATE = """
#
# Lines starting with '#' will be ignored.
#
# Requesting changes for pull request {owner}/{repo}#{number}
#
"""


def _strip_comment_lines(message: str) -> str:
    """
    Drop all lines starting with '#' and strip leading and trailing whitespace from the rest.
    """
    kept_lines = [line for line in message.splitlines() if not line.startswith("#")]
    return "\n".join(kept_lines).strip()


class PullRequestReviewInteractiveCommand(osc.commandline_git.GitObsCommand):
    """
    Interactive review of pull requests
    """

    name = "interactive"
    parent = "PullRequestReviewCommand"

    def init_arguments(self):
        """
        Register the optional positional list of pull request IDs.
        """
        self.add_argument(
            "id",
            nargs="*",
            help="Pull request ID in owner/repo#number format",
        )

    def run(self, args):
        """
        Walk through the pull requests one by one and ask the user for a review action.

        If no IDs are given on the command line, the pull requests with a requested review
        are looked up via the search API. Drafts are skipped and counted; the count is
        printed to stderr after the last pull request has been processed.
        """
        from osc import gitea_api
        from osc.output import get_user_input

        if args.id:
            # drop duplicate IDs while keeping the order given on the command line
            # TODO: skip those that do not require a review (print to stderr)
            pull_request_ids = list(dict.fromkeys(args.id))
        else:
            # keep only the list of pull request IDs, throw search results away
            # because the search returns issues instead of pull requests
            pr_obj_list = gitea_api.PullRequest.search(self.gitea_conn, review_requested=True)
            pr_obj_list.sort()
            pull_request_ids = [pr_obj.id for pr_obj in pr_obj_list]
            del pr_obj_list

        pr_count = len(pull_request_ids)
        skipped_drafts = 0

        for pr_index, pr_id in enumerate(pull_request_ids):
            self.print_gitea_settings()

            owner, repo, number = gitea_api.PullRequest.split_id(pr_id)
            pr_obj = gitea_api.PullRequest.get(self.gitea_conn, owner, repo, number)

            if pr_obj.draft:
                # we don't want to review drafts, they will change
                skipped_drafts += 1
                continue

            self.clone_git(owner, repo, number)
            self.view(owner, repo, number, pr_index=pr_index, pr_count=pr_count, pr_obj=pr_obj)

            # keep asking until an action that finishes this pull request is selected;
            # "view again" is the only action that stays in the loop
            while True:
                # TODO: print at least some context because the PR details disappear after closing less
                reply = get_user_input(
                    f"Select a review action for '{pr_id}':",
                    answers={
                        "a": "approve",
                        "A": "approve and schedule for merging",
                        "d": "decline",
                        "m": "comment",
                        "v": "view again",
                        "s": "skip",
                        "x": "exit",
                    },
                    default_answer="s",
                )
                if reply == "a":
                    self.approve(owner, repo, number, commit=pr_obj.head_commit)
                    break
                elif reply == "A":
                    self.approve(owner, repo, number, commit=pr_obj.head_commit)
                    gitea_api.PullRequest.merge(self.gitea_conn, owner, repo, number, merge_when_checks_succeed=True)
                    break
                elif reply == "d":
                    self.decline(owner, repo, number)
                    break
                elif reply == "m":
                    self.comment(owner, repo, number)
                    break
                elif reply == "v":
                    self.view(owner, repo, number, pr_index=pr_index, pr_count=pr_count, pr_obj=pr_obj)
                elif reply == "s":
                    break
                elif reply == "x":
                    return
                else:
                    raise RuntimeError(f"Unhandled reply: {reply}")

        if skipped_drafts:
            print(file=sys.stderr)
            print(f"Skipped drafts: {skipped_drafts}", file=sys.stderr)

    def approve(self, owner: str, repo: str, number: int, *, commit: str):
        """
        Approve the pull request review for the given commit.
        """
        from osc import gitea_api

        gitea_api.PullRequest.approve_review(self.gitea_conn, owner, repo, number, commit=commit)

    def _edit_message(self, template: str, owner: str, repo: str, number: int) -> str:
        """
        Let the user edit a message pre-filled with ``template`` and return it without comment lines.

        Only ``owner``, ``repo`` and ``number`` are substituted into the template.
        """
        from .pr_create import edit_message

        message = edit_message(template=template.format(owner=owner, repo=repo, number=number))
        return _strip_comment_lines(message)

    def decline(self, owner: str, repo: str, number: int):
        """
        Ask the user for a message and decline the pull request review with it.
        """
        from osc import gitea_api

        message = self._edit_message(DECLINE_REVIEW_TEMPLATE, owner, repo, number)
        gitea_api.PullRequest.decline_review(self.gitea_conn, owner, repo, number, msg=message)

    def comment(self, owner: str, repo: str, number: int):
        """
        Ask the user for a message and add it as a comment to the pull request.
        """
        from osc import gitea_api

        message = self._edit_message(NEW_COMMENT_TEMPLATE, owner, repo, number)
        gitea_api.PullRequest.add_comment(self.gitea_conn, owner, repo, number, msg=message)

    def get_git_repo_path(self, owner: str, repo: str, number: int):
        """
        Return the path of the per-login, per-pull-request git checkout under ~/.cache/git-obs/reviews.
        """
        path = os.path.join("~", ".cache", "git-obs", "reviews", self.gitea_login.name, f"{owner}_{repo}_{number}")
        path = os.path.expanduser(path)
        return path

    def clone_git(self, owner: str, repo: str, number: int):
        """
        Clone the repository into the review cache (or fetch if the directory exists) and fetch the pull request.
        """
        from osc import gitea_api

        repo_obj = gitea_api.Repo.get(self.gitea_conn, owner, repo)
        clone_url = repo_obj.ssh_url

        # TODO: it might be good to have a central cache for the git repos to speed cloning up
        path = self.get_git_repo_path(owner, repo, number)
        git = gitea_api.Git(path)
        if os.path.isdir(path):
            git.fetch()
        else:
            os.makedirs(path, exist_ok=True)
            git.clone(clone_url, directory=path, quiet=False)
        git.fetch_pull_request(number, force=True)

    def view(
        self,
        owner: str,
        repo: str,
        number: int,
        *,
        pr_index: int,
        pr_count: int,
        pr_obj: Optional["PullRequest"] = None,
    ):
        """
        Show the pull request details, its patch and the archive diffs in ``less``.

        :param pr_index: Zero-based position of the pull request in the reviewed list; displayed as ``pr_index + 1``.
        :param pr_count: Total number of pull requests in the reviewed list.
        :param pr_obj: Already retrieved pull request object; it is retrieved from the server if omitted.
        """
        from osc import gitea_api
        from osc.core import highlight_diff
        from osc.output import sanitize_text
        from osc.output import tty

        if pr_obj is None:
            pr_obj = gitea_api.PullRequest.get(self.gitea_conn, owner, repo, number)

        # the process works with bytes rather than with strings
        # because the diffs may contain character sequences that cannot be decoded as utf-8 strings
        proc = subprocess.Popen(["less"], stdin=subprocess.PIPE)
        assert proc.stdin is not None

        try:
            # heading
            heading = tty.colorize(f"[{pr_index + 1}/{pr_count}] Reviewing pull request '{owner}/{repo}#{number}'...\n", "yellow,bold")
            proc.stdin.write(heading.encode("utf-8"))
            proc.stdin.write(b"\n")

            # pr details
            proc.stdin.write(pr_obj.to_human_readable_string().encode("utf-8"))
            proc.stdin.write(b"\n")
            proc.stdin.write(b"\n")

            # patch
            proc.stdin.write(tty.colorize("Patch:\n", "bold").encode("utf-8"))
            patch = gitea_api.PullRequest.get_patch(self.gitea_conn, owner, repo, number)
            patch = sanitize_text(patch)
            patch = highlight_diff(patch)
            proc.stdin.write(patch)
            proc.stdin.write(b"\n")

            # tardiff
            proc.stdin.write(tty.colorize("Archive diffs:\n", "bold").encode("utf-8"))
            for chunk in self.tardiff(owner, repo, number, pr_obj=pr_obj):
                chunk = sanitize_text(chunk)
                chunk = highlight_diff(chunk)
                proc.stdin.write(chunk)
        except BrokenPipeError:
            # the user exited less before all data was written;
            # this may happen during any of the writes above, not only while streaming the archive diffs
            pass
        finally:
            # always close the pipe and wait for the pager, even if producing the output failed
            proc.communicate()

    def get_tardiff_path(self):
        """
        Return the path of the tardiff cache directory under ~/.cache/git-obs.
        """
        path = os.path.join("~", ".cache", "git-obs", "tardiff")
        path = os.path.expanduser(path)
        return path

    def tardiff(self, owner: str, repo: str, number: int, *, pr_obj: "PullRequest") -> Generator[bytes, None, None]:
        """
        Yield diffs of the LFS-tracked archives between the base and the head commit of the pull request.

        The archives from both commits are paired by their name prefix. An archive that exists
        only on one side is passed to the diff with ``None`` values for the missing side.
        """
        from osc import gitea_api

        repo_path = self.get_git_repo_path(owner, repo, number)
        git = gitea_api.Git(repo_path)

        # the repo might be outdated, make sure the commits are available
        git.fetch()

        src_archives = git.lfs_ls_files(ref=pr_obj.head_commit)
        dst_archives = git.lfs_ls_files(ref=pr_obj.base_commit)

        def map_archives_by_name(archives: list):
            result = {}
            for fn, sha in archives:
                # everything after the last dash is dropped;
                # presumably that is the version and the file extension - TODO confirm
                name = fn.rsplit("-", 1)[0]
                # two archives mapping to the same name cannot be paired unambiguously
                assert name not in result
                result[name] = (fn, sha)
            return result

        src_archives_by_name = map_archives_by_name(src_archives)
        dst_archives_by_name = map_archives_by_name(dst_archives)
        all_names = sorted(set(src_archives_by_name) | set(dst_archives_by_name))

        tardiff_path = self.get_tardiff_path()
        td = gitea_api.TarDiff(tardiff_path)

        for name in all_names:
            src_archive = src_archives_by_name.get(name, (None, None))
            dst_archive = dst_archives_by_name.get(name, (None, None))

            if src_archive[0]:
                td.add_archive(src_archive[0], src_archive[1], git.lfs_cat_file(src_archive[0], ref=pr_obj.head_commit))

            if dst_archive[0]:
                td.add_archive(dst_archive[0], dst_archive[1], git.lfs_cat_file(dst_archive[0], ref=pr_obj.base_commit))

            # TODO: max output length / max lines; in such case, it would be great to list all the changed files at least
            yield from td.diff_archives(*dst_archive, *src_archive)