1
0
mirror of https://github.com/openSUSE/osc.git synced 2025-02-21 01:32:10 +01:00

Add 'git-obs pr' command

This commit is contained in:
Daniel Mach 2025-01-14 16:16:20 +01:00
parent 166cadb31b
commit 124528e68b
13 changed files with 795 additions and 1 deletions

View File

@ -146,6 +146,11 @@ jobs:
run: |
podman pull ghcr.io/suse-autobuild/obs-server:latest
- name: "Configure git"
run: |
git config --global user.email "admin@example.com"
git config --global user.name "Admin"
- name: "Run tests"
run: |
cd behave

View File

@ -0,0 +1,115 @@
Feature: `git-obs pr` command
Background:
Given I set working directory to "{context.osc.temp}"
And I execute git-obs with args "repo fork pool test-GitPkgA"
And I execute git-obs with args "repo clone Admin test-GitPkgA --no-ssh-strict-host-key-checking"
And I set working directory to "{context.osc.temp}/test-GitPkgA"
And I execute "sed -i 's@^\(Version.*\)@\1.1@' *.spec"
And I execute "git commit -m 'Change version' -a"
And I execute "git push"
And I execute git-obs with args "pr create --title 'Change version' --description='some text'"
@destructive
Scenario: List pull requests
When I execute git-obs with args "pr list pool test-GitPkgA"
Then the exit code is 0
And stdout matches
"""
ID : pool/test-GitPkgA#1
URL : http://localhost:{context.podman.container.ports[gitea_http]}/pool/test-GitPkgA/pulls/1
Title : Change version
State : open
Draft : no
Merged : no
Author : Admin \(admin@example.com\)
Source : Admin/test-GitPkgA, branch: factory, commit: .*
Description : some text
"""
And stderr is
"""
Using the following Gitea settings:
* Config path: {context.git_obs.config}
* Login (name of the entry in the config file): admin
* URL: http://localhost:{context.podman.container.ports[gitea_http]}
* User: Admin
Total entries: 1
"""
@destructive
Scenario: Search pull requests
When I execute git-obs with args "pr search"
Then the exit code is 0
And stdout matches
"""
ID : pool/test-GitPkgA#1
URL : http://localhost:{context.podman.container.ports[gitea_http]}/pool/test-GitPkgA/pulls/1
Title : Change version
State : open
Author : Admin \(admin@example.com\)
Description : some text
"""
And stderr is
"""
Using the following Gitea settings:
* Config path: {context.git_obs.config}
* Login (name of the entry in the config file): admin
* URL: http://localhost:{context.podman.container.ports[gitea_http]}
* User: Admin
Total entries: 1
"""
@destructive
Scenario: Get a pull request
When I execute git-obs with args "pr get pool/test-GitPkgA#1"
Then the exit code is 0
And stdout matches
"""
ID : pool/test-GitPkgA#1
URL : http://localhost:{context.podman.container.ports[gitea_http]}/pool/test-GitPkgA/pulls/1
Title : Change version
State : open
Draft : no
Merged : no
Author : Admin \(admin@example.com\)
Source : Admin/test-GitPkgA, branch: factory, commit: .*
Description : some text
"""
And stderr is
"""
Using the following Gitea settings:
* Config path: {context.git_obs.config}
* Login (name of the entry in the config file): admin
* URL: http://localhost:{context.podman.container.ports[gitea_http]}
* User: Admin
Total entries: 1
"""
@destructive
Scenario: Get a pull request that doesn't exist
When I execute git-obs with args "pr get does-not/exist#1"
Then the exit code is 1
And stdout matches
"""
"""
And stderr is
"""
Using the following Gitea settings:
* Config path: {context.git_obs.config}
* Login (name of the entry in the config file): admin
* URL: http://localhost:{context.podman.container.ports[gitea_http]}
* User: Admin
Total entries: 0
ERROR: Couldn't retrieve the following pull requests: does-not/exist#1
"""

View File

@ -83,6 +83,12 @@ def run_in_context(context, cmd, can_fail=False, **run_args):
raise AssertionError('Running command "%s" failed: %s' % (cmd, context.cmd_exitcode))
@behave.step("I execute \"{command}\"")
def step_impl(context, command):
command = command.format(context=context)
run_in_context(context, command, can_fail=True)
@behave.step("stdout contains \"{text}\"")
def step_impl(context, text):
if re.search(text.format(context=context), context.cmd_stdout):

View File

@ -1,4 +1,5 @@
import os
import subprocess
import sys
import osc.commandline_common
@ -132,7 +133,9 @@ def main():
except oscerr.OscBaseError as e:
print_msg(str(e), print_to="error")
sys.exit(1)
except subprocess.CalledProcessError as e:
print_msg(str(e), print_to="error")
sys.exit(1)
if __name__ == "__main__":
main()

19
osc/commands_git/pr.py Normal file
View File

@ -0,0 +1,19 @@
import osc.commandline_git
# we decided not to use the command name 'pull' because that could be confused
# with the completely unrelated 'git pull' command
class PullRequestCommand(osc.commandline_git.GitObsCommand):
"""
Manage pull requests
"""
name = "pr"
def init_arguments(self):
pass
def run(self, args):
self.parser.print_help()

View File

@ -0,0 +1,212 @@
import os
import re
import subprocess
import sys
from typing import Optional
import osc.commandline_git
def get_editor() -> str:
import shutil
editor = os.getenv("EDITOR", None)
if editor:
candidates = [editor]
else:
candidates = ["vim", "vi"]
editor_path = None
for i in candidates:
editor_path = shutil.which(i)
if editor_path:
break
if not editor_path:
raise RuntimeError(f"Unable to start editor '{candidates[0]}'")
return editor_path
def run_editor(file_path: str):
cmd = [get_editor(), file_path]
subprocess.run(cmd)
def edit_message(template: Optional[str] = None) -> str:
import tempfile
with tempfile.NamedTemporaryFile(mode="w+", encoding="utf-8", prefix="git_obs_message_") as f:
if template:
f.write(template)
f.flush()
run_editor(f.name)
f.seek(0)
return f.read()
NEW_PULL_REQUEST_TEMPLATE = """
# title
{title}
# description
{description}
#
# Please enter pull request title and description in the following format:
# <title>
# <blank line>
# <description>
#
# Lines starting with '#' will be ignored, and an empty message aborts the operation.
#
# Creating {source_owner}/{source_repo}#{source_branch} -> {target_owner}/{target_repo}#{target_branch}
#
""".lstrip()
class PullRequestCreateCommand(osc.commandline_git.GitObsCommand):
"""
Create a pull request
"""
name = "create"
parent = "PullRequestCommand"
def init_arguments(self):
self.add_argument(
"--title",
metavar="TEXT",
help="Pull request title",
)
self.add_argument(
"--description",
metavar="TEXT",
help="Pull request description (body)",
)
self.add_argument(
"--source-owner",
metavar="OWNER",
help="Owner of the source repo (default: derived from remote URL in local git repo)",
)
self.add_argument(
"--source-repo",
metavar="REPO",
help="Name of the source repo (default: derived from remote URL in local git repo)",
)
self.add_argument(
"--source-branch",
metavar="BRANCH",
help="Source branch (default: the current branch in local git repo)",
)
self.add_argument(
"--target-branch",
metavar="BRANCH",
help="Target branch (default: derived from the current branch in local git repo)",
)
def run(self, args):
from osc import gitea_api
# the source args are optional, but if one of them is used, the others must be used too
source_args = (args.source_owner, args.source_repo, args.source_branch)
if sum((int(i is not None) for i in source_args)) not in (0, len(source_args)):
self.parser.error("All of the following options must be used together: --source-owner, --source-repo, --source-branch")
self.print_gitea_settings()
use_local_git = args.source_owner is None
if use_local_git:
# local git repo
git = gitea_api.Git(".")
local_owner, local_repo = git.get_owner_repo()
local_branch = git.current_branch
local_rev = git.get_branch_head(local_branch)
# remote git repo - source
if use_local_git:
source_owner = local_owner
source_repo = local_repo
source_branch = local_branch
else:
source_owner = args.source_owner
source_repo = args.source_repo
source_branch = args.source_branch
source_repo_data = gitea_api.Repo.get(self.gitea_conn, source_owner, source_repo).json()
source_branch_data = gitea_api.Branch.get(self.gitea_conn, source_owner, source_repo, source_branch).json()
source_rev = source_branch_data["commit"]["id"]
# remote git repo - target
target_owner, target_repo = source_repo_data["parent"]["full_name"].split("/")
if source_branch.startswith("for/"):
# source branch name format: for/<target-branch>/<what-the-branch-name-would-normally-be>
target_branch = source_branch.split("/")[1]
else:
target_branch = source_branch
target_branch_data = gitea_api.Branch.get(self.gitea_conn, target_owner, target_repo, target_branch).json()
target_rev = target_branch_data["commit"]["id"]
print("Creating a pull request ...", file=sys.stderr)
if use_local_git:
print(f" * Local git: branch: {local_branch}, rev: {local_rev}", file=sys.stderr)
print(f" * Source: {source_owner}/{source_repo}, branch: {source_branch}, rev: {source_rev}", file=sys.stderr)
print(f" * Target: {target_owner}/{target_repo}, branch: {target_branch}, rev: {target_rev}", file=sys.stderr)
if use_local_git and local_rev != source_rev:
from osc.output import tty
print(f"{tty.colorize('ERROR', 'red,bold')}: Local commit doesn't correspond with the latest commit in the remote source branch")
sys.exit(1)
if source_rev == target_rev:
from osc.output import tty
print(f"{tty.colorize('ERROR', 'red,bold')}: Source and target are identical, make and push changes to the remote source repo first")
sys.exit(1)
title = args.title or ""
description = args.description or ""
if not title or not description:
# TODO: add list of commits and list of changed files to the template; requires local git repo
message = edit_message(template=NEW_PULL_REQUEST_TEMPLATE.format(**locals()))
# remove comments
message = "\n".join([i for i in message.splitlines() if not i.startswith("#")])
# strip leading and trailing spaces
message = message.strip()
if not message:
raise RuntimeError("Aborting operation due to empty title and description.")
parts = re.split(r"\n\n", message, 1)
if len(parts) == 1:
# empty description
title = parts[0]
description = ""
else:
title = parts[0]
description = parts[1]
title = title.strip()
description = description.strip()
pull = gitea_api.PullRequest.create(
self.gitea_conn,
target_owner=target_owner,
target_repo=target_repo,
target_branch=target_branch,
source_owner=source_owner,
# source_repo is not required because the information lives in Gitea database
source_branch=source_branch,
title=title,
description=description,
).json()
print("", file=sys.stderr)
print("Pull request created:", file=sys.stderr)
print(gitea_api.PullRequest.to_human_readable_string(pull))

View File

@ -0,0 +1,62 @@
import sys
import osc.commandline_git
class PullRequestGetCommand(osc.commandline_git.GitObsCommand):
"""
Get details about the specified pull requests
"""
name = "get"
aliases = ["show"] # for compatibility with osc
parent = "PullRequestCommand"
def init_arguments(self):
self.add_argument(
"id",
nargs="+",
help="Pull request ID in <owner>/<repo>#<number> format",
)
self.add_argument(
"-p",
"--patch",
action="store_true",
help="Show patches associated with the pull requests",
)
def run(self, args):
from osc import gitea_api
from osc.core import highlight_diff
from osc.output import tty
self.print_gitea_settings()
num_entries = 0
failed_entries = []
for pr_id in args.id:
owner, repo, number = gitea_api.PullRequest.split_id(pr_id)
try:
pr = gitea_api.PullRequest.get(self.gitea_conn, owner, repo, number).json()
num_entries += 1
except gitea_api.GiteaException as e:
if e.status == 404:
failed_entries.append(pr_id)
continue
raise
print(gitea_api.PullRequest.to_human_readable_string(pr))
if args.patch:
print("")
print(tty.colorize("Patch:", "bold"))
patch = gitea_api.PullRequest.get_patch(self.gitea_conn, owner, repo, number).data
patch = highlight_diff(patch)
print(patch.decode("utf-8"))
print(f"Total entries: {num_entries}", file=sys.stderr)
if failed_entries:
print(
f"{tty.colorize('ERROR', 'red,bold')}: Couldn't retrieve the following pull requests: {', '.join(failed_entries)}",
file=sys.stderr,
)
sys.exit(1)

View File

@ -0,0 +1,36 @@
import sys
import osc.commandline_git
class PullRequestListCommand(osc.commandline_git.GitObsCommand):
"""
List pull requests in a repository
"""
name = "list"
parent = "PullRequestCommand"
def init_arguments(self):
self.add_argument_owner()
self.add_argument_repo()
self.add_argument(
"--state",
choices=["open", "closed", "all"],
default="open",
help="State of the pull requests (default: open)",
)
def run(self, args):
from osc import gitea_api
self.print_gitea_settings()
data = gitea_api.PullRequest.list(self.gitea_conn, args.owner, args.repo, state=args.state).json()
text = gitea_api.PullRequest.list_to_human_readable_string(data, sort=True)
if text:
print(text)
print("", file=sys.stderr)
print(f"Total entries: {len(data)}", file=sys.stderr)

View File

@ -0,0 +1,79 @@
import sys
import osc.commandline_git
class PullRequestSearchCommand(osc.commandline_git.GitObsCommand):
"""
Search pull requests in the whole gitea instance
"""
name = "search"
parent = "PullRequestCommand"
def init_arguments(self):
self.add_argument(
"--state",
choices=["open", "closed"],
default="open",
help="Filter by state: open, closed (default: open)",
)
self.add_argument(
"--title",
help="Filter by substring in title",
)
self.add_argument(
"--owner",
help="Filter by owner of the repository associated with the pull requests",
)
self.add_argument(
"--label",
dest="labels",
metavar="LABEL",
action="append",
help="Filter by associated labels. Non existent labels are discarded. Can be specified multiple times.",
)
self.add_argument(
"--assigned",
action="store_true",
help="Filter pull requests assigned to you",
)
self.add_argument(
"--created",
action="store_true",
help="Filter pull requests created by you",
)
self.add_argument(
"--mentioned",
action="store_true",
help="Filter pull requests mentioning you",
)
self.add_argument(
"--review-requested",
action="store_true",
help="Filter pull requests requesting your review",
)
def run(self, args):
from osc import gitea_api
self.print_gitea_settings()
data = gitea_api.PullRequest.search(
self.gitea_conn,
state=args.state,
title=args.title,
owner=args.owner,
labels=args.labels,
assigned=args.assigned,
created=args.created,
mentioned=args.mentioned,
review_requested=args.review_requested,
).json()
text = gitea_api.PullRequest.list_to_human_readable_string(data, sort=True)
if text:
print(text)
print("", file=sys.stderr)
print(f"Total entries: {len(data)}", file=sys.stderr)

View File

@ -7,6 +7,8 @@ from .branch import Branch
from .conf import Config
from .conf import Login
from .fork import Fork
from .git import Git
from .pr import PullRequest
from .ssh_key import SSHKey
from .repo import Repo
from .user import User

31
osc/gitea_api/git.py Normal file
View File

@ -0,0 +1,31 @@
import os
import subprocess
import urllib
from typing import Tuple
class Git:
def __init__(self, workdir):
self.abspath = os.path.abspath(workdir)
def _run_git(self, args) -> str:
return subprocess.check_output(["git"] + args, encoding="utf-8", cwd=self.abspath).strip()
@property
def current_branch(self) -> str:
return self._run_git(["branch", "--show-current"])
def get_branch_head(self, branch: str) -> str:
return self._run_git(["rev-parse", branch])
def get_remote_url(self, name: str = "origin") -> str:
return self._run_git(["remote", "get-url", name])
def get_owner_repo(self, remote: str = "origin") -> Tuple[str, str]:
remote_url = self.get_remote_url(name=remote)
parsed_remote_url = urllib.parse.urlparse(remote_url)
path = parsed_remote_url.path
if path.endswith(".git"):
path = path[:-4]
owner, repo = path.strip("/").split("/")[:2]
return owner, repo

212
osc/gitea_api/pr.py Normal file
View File

@ -0,0 +1,212 @@
import re
from typing import List
from typing import Optional
from typing import Tuple
from .connection import Connection
from .connection import GiteaHTTPResponse
class PullRequest:
@classmethod
def cmp(cls, entry: dict):
if "base" in entry:
# a proper pull request
return entry["base"]["repo"]["full_name"], entry["number"]
else:
# an issue without pull request details
return entry["repository"]["full_name"], entry["number"]
@classmethod
def split_id(cls, pr_id: str) -> Tuple[str, str, str]:
"""
Split <owner>/<repo>#<number> into individual components and return them in a tuple.
"""
match = re.match(r"(.+)/(.+)#(.+)", pr_id)
if not match:
raise ValueError(f"Invalid pull request id: {pr_id}")
return match.groups()
@classmethod
def to_human_readable_string(cls, entry: dict):
from osc.output import KeyValueTable
from . import User
def yes_no(value):
return "yes" if value else "no"
if "base" in entry:
# a proper pull request
entry_id = f"{entry['base']['repo']['full_name']}#{entry['number']}"
is_pull_request = True
else:
# an issue without pull request details
entry_id = f"{entry['repository']['full_name']}#{entry['number']}"
is_pull_request = False
# HACK: search API returns issues, the URL needs to be transformed to a pull request URL
entry_url = entry["url"]
entry_url = re.sub(r"^(.*)/api/v1/repos/(.+/.+)/issues/([0-9]+)$", r"\1/\2/pulls/\3", entry_url)
table = KeyValueTable()
table.add("ID", entry_id, color="bold")
table.add("URL", f"{entry_url}")
table.add("Title", f"{entry['title']}")
table.add("State", entry["state"])
if is_pull_request:
table.add("Draft", yes_no(entry["draft"]))
table.add("Merged", yes_no(entry["merged"]))
table.add("Author", f"{User.to_login_full_name_email_string(entry['user'])}")
if is_pull_request:
table.add("Source", f"{entry['head']['repo']['full_name']}, branch: {entry['head']['ref']}, commit: {entry['head']['sha']}")
table.add("Description", entry["body"])
return str(table)
@classmethod
def list_to_human_readable_string(cls, entries: List, sort: bool = False):
if sort:
entries = sorted(entries, key=cls.cmp)
result = []
for entry in entries:
result.append(cls.to_human_readable_string(entry))
return "\n\n".join(result)
@classmethod
def create(
cls,
conn: Connection,
*,
target_owner: str,
target_repo: str,
target_branch: str,
source_owner: str,
source_branch: str,
title: str,
description: Optional[str] = None,
) -> GiteaHTTPResponse:
"""
Create a pull request to ``owner``/``repo`` to the ``base`` branch.
The pull request comes from a fork. The fork repo name is determined from gitea database.
:param conn: Gitea ``Connection`` instance.
:param target_owner: Owner of the target repo.
:param target_repo: Name of the target repo.
:param target_branch: Name of the target branch in the target repo.
:param source_owner: Owner of the source (forked) repo.
:param source_branch: Name of the source branch in the source (forked) repo.
:param title: Pull request title.
:param description: Pull request description.
"""
url = conn.makeurl("repos", target_owner, target_repo, "pulls")
data = {
"base": target_branch,
"head": f"{source_owner}:{source_branch}",
"title": title,
"body": description,
}
return conn.request("POST", url, json_data=data)
@classmethod
def get(
cls,
conn: Connection,
owner: str,
repo: str,
number: str,
) -> GiteaHTTPResponse:
"""
Get a pull request.
:param conn: Gitea ``Connection`` instance.
:param owner: Owner of the repo.
:param repo: Name of the repo.
:param number: Number of the pull request in the repo.
"""
url = conn.makeurl("repos", owner, repo, "pulls", number)
return conn.request("GET", url)
@classmethod
def list(
cls,
conn: Connection,
owner: str,
repo: str,
*,
state: Optional[str] = "open",
) -> GiteaHTTPResponse:
"""
List pull requests in a repo.
:param conn: Gitea ``Connection`` instance.
:param owner: Owner of the repo.
:param repo: Name of the repo.
:param state: Filter by state: open, closed, all. Defaults to open.
"""
if state == "all":
state = None
q = {
"state": state,
}
url = conn.makeurl("repos", owner, repo, "pulls", query=q)
return conn.request("GET", url)
@classmethod
def search(
cls,
conn: Connection,
*,
state: str = "open",
title: Optional[str] = None,
owner: Optional[str] = None,
labels: Optional[List[str]] = None,
assigned: bool = False,
created: bool = False,
mentioned: bool = False,
review_requested: bool = False,
) -> GiteaHTTPResponse:
"""
Search pull requests.
:param conn: Gitea ``Connection`` instance.
:param state: Filter by state: open, closed. Defaults to open.
:param title: Filter by substring in title.
:param owner: Filter by owner of the repository associated with the pull requests.
:param labels: Filter by associated labels. Non existent labels are discarded.
:param assigned: Filter pull requests assigned to you.
:param created: Filter pull requests created by you.
:param mentioned: Filter pull requests mentioning you.
:param review_requested: Filter pull requests requesting your review.
"""
q = {
"type": "pulls",
"state": state,
"q": title,
"owner": owner,
"labels": ",".join(labels) if labels else None,
"assigned": assigned,
"created": created,
"mentioned": mentioned,
"review_requested": review_requested,
}
url = conn.makeurl("repos", "issues", "search", query=q)
return conn.request("GET", url)
@classmethod
def get_patch(
cls,
conn: Connection,
owner: str,
repo: str,
number: str,
) -> GiteaHTTPResponse:
"""
Get a patch associated with a pull request.
:param conn: Gitea ``Connection`` instance.
:param owner: Owner of the repo.
:param repo: Name of the repo.
:param number: Number of the pull request in the repo.
"""
url = conn.makeurl("repos", owner, repo, "pulls", f"{number}.patch")
return conn.request("GET", url)

View File

@ -3,6 +3,18 @@ from .connection import GiteaHTTPResponse
class User:
@classmethod
def to_full_name_email_string(cls, data):
full_name = data["full_name"]
email = data["email"]
if full_name:
return f"{full_name} <{email}>"
return email
@classmethod
def to_login_full_name_email_string(cls, data):
return f"{data['login']} ({cls.to_full_name_email_string(data)})"
@classmethod
def get(
cls,