1
0
mirror of https://github.com/openSUSE/osc.git synced 2024-11-14 00:06:15 +01:00

Implement git-obs command with several subcommands

This commit is contained in:
Daniel Mach 2024-08-02 15:10:32 +02:00
parent 0d28997595
commit 7e52a4a050
20 changed files with 545 additions and 12 deletions

13
git-obs.py Executable file
View File

@ -0,0 +1,13 @@
#!/usr/bin/env python3
"""
This wrapper allows git-obs to be called from the source directory during development.
"""
import osc.commandline_git
if __name__ == "__main__":
osc.commandline_git.main()

135
osc/commandline_git.py Normal file
View File

@ -0,0 +1,135 @@
import os
import sys
import osc.commandline
import osc.commands_git
from . import gitea_api
from . import oscerr
from .output import print_msg
class GitObsCommand(osc.commandline.Command):
@property
def gitea_conf(self):
return self.main_command.gitea_conf
@property
def gitea_login(self):
return self.main_command.gitea_login
@property
def gitea_conn(self):
return self.main_command.gitea_conn
def print_gitea_settings(self):
print(f"Using the following Gitea settings:", file=sys.stderr)
print(f" * Config path: {self.gitea_conf.path}", file=sys.stderr)
print(f" * Login (name of the entry in the config file): {self.gitea_login.name}", file=sys.stderr)
print(f" * URL: {self.gitea_login.url}", file=sys.stderr)
print(f" * User: {self.gitea_login.user}", file=sys.stderr)
print("", file=sys.stderr)
def add_argument_owner(self):
self.add_argument(
"owner",
help="Name of the repository owner (login, org)",
)
def add_argument_repo(self):
self.add_argument(
"repo",
help="Name of the repository",
)
def add_argument_new_repo_name(self):
self.add_argument(
"--new-repo-name",
help="Name of the newly forked repo",
)
class GitObsMainCommand(osc.commandline.MainCommand):
name = "git-obs"
MODULES = (
("osc.commands_git", osc.commands_git.__path__[0]),
)
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._args = None
self._gitea_conf = None
self._gitea_login = None
self._gitea_conn = None
def init_arguments(self):
self.add_argument(
"--gitea-config",
help="Path to gitea config. Default: $GIT_OBS_CONFIG or ~/.config/tea/config.yml.",
)
self.add_argument(
"-G",
"--gitea-login",
help="Name of the login entry in the config file. Default: $GIT_OBS_LOGIN or the default entry from the config file.",
)
def post_parse_args(self, args):
if not args.gitea_config:
value = os.getenv("GIT_OBS_CONFIG", "").strip()
if value:
args.gitea_config = value
if not args.gitea_login:
value = os.getenv("GIT_OBS_LOGIN", "").strip()
if value:
args.gitea_login = value
self._args = args
@classmethod
def main(cls, argv=None, run=True):
"""
Initialize OscMainCommand, load all commands and run the selected command.
"""
cmd = cls()
cmd.load_commands()
if run:
args = cmd.parse_args(args=argv)
exit_code = cmd.run(args)
sys.exit(exit_code)
else:
args = None
return cmd, args
@property
def gitea_conf(self):
if self._gitea_conf is None:
self._gitea_conf = gitea_api.Config(self._args.gitea_config)
return self._gitea_conf
@property
def gitea_login(self):
if self._gitea_login is None:
self._gitea_login = self.gitea_conf.get_login(name=self._args.gitea_login)
return self._gitea_login
@property
def gitea_conn(self):
if self._gitea_conn is None:
self._gitea_conn = gitea_api.Connection(self.gitea_login)
assert self._gitea_login is not None
return self._gitea_conn
def main():
try:
GitObsMainCommand.main()
except oscerr.OscBaseError as e:
print_msg(str(e), print_to="error")
sys.exit(1)
if __name__ == "__main__":
main()

View File

15
osc/commands_git/login.py Normal file
View File

@ -0,0 +1,15 @@
import osc.commandline_git
class LoginCommand(osc.commandline_git.GitObsCommand):
"""
Manage configured credentials to Gitea servers
"""
name = "login"
def init_arguments(self):
pass
def run(self, args):
self.parser.print_help()

View File

@ -0,0 +1,35 @@
import sys
import osc.commandline_git
class LoginAddCommand(osc.commandline_git.GitObsCommand):
"""
Add a Gitea credentials entry
"""
name = "add"
parent = "LoginCommand"
def init_arguments(self):
self.parser.add_argument("name")
self.parser.add_argument("--url", required=True)
self.parser.add_argument("--user", required=True)
self.parser.add_argument("--token", required=True)
self.parser.add_argument("--ssh-key")
self.parser.add_argument("--set-as-default", action="store_true", default=None)
def run(self, args):
from osc import gitea_api
print(f"Adding a Gitea credentials entry with name '{args.name}' ...", file=sys.stderr)
print(f" * Config path: {self.gitea_conf.path}", file=sys.stderr)
print("", file=sys.stderr)
# TODO: try to authenticate to verify that the new entry works
login = gitea_api.Login(name=args.name, url=args.url, user=args.user, token=args.token, ssh_key=args.ssh_key, default=args.set_as_default)
self.gitea_conf.add_login(login)
print("Added entry:")
print(login.to_human_readable_string())

View File

@ -0,0 +1,18 @@
import osc.commandline_git
class LoginListCommand(osc.commandline_git.GitObsCommand):
"""
List Gitea credentials entries
"""
name = "list"
parent = "LoginCommand"
def init_arguments(self):
self.parser.add_argument("--show-tokens", action="store_true", help="Show tokens in the output")
def run(self, args):
for login in self.gitea_conf.list_logins():
print(login.to_human_readable_string(show_token=args.show_tokens))
print()

View File

@ -0,0 +1,25 @@
import sys
import osc.commandline_git
class LoginRemoveCommand(osc.commandline_git.GitObsCommand):
"""
Remove a Gitea credentials entry
"""
name = "remove"
parent = "LoginCommand"
def init_arguments(self):
self.parser.add_argument("name")
def run(self, args):
print(f"Removing a Gitea credentials entry with name '{args.name}' ...", file=sys.stderr)
print(f" * Config path: {self.gitea_conf.path}", file=sys.stderr)
print("", file=sys.stderr)
login = self.gitea_conf.remove_login(args.name)
print("Removed entry:")
print(login.to_human_readable_string())

View File

@ -0,0 +1,45 @@
import sys
import osc.commandline_git
class LoginUpdateCommand(osc.commandline_git.GitObsCommand):
"""
Update a Gitea credentials entry
"""
name = "update"
parent = "LoginCommand"
def init_arguments(self):
self.parser.add_argument("name")
self.parser.add_argument("--new-name")
self.parser.add_argument("--new-url")
self.parser.add_argument("--new-user")
self.parser.add_argument("--new-token")
self.parser.add_argument("--new-ssh-key")
self.parser.add_argument("--set-as-default", action="store_true")
def run(self, args):
print(f"Updating a Gitea credentials entry with name '{args.name}' ...", file=sys.stderr)
print(f" * Config path: {self.gitea_conf.path}", file=sys.stderr)
print("", file=sys.stderr)
# TODO: try to authenticate to verify that the updated entry works
original_login = self.gitea_conf.get_login(args.name)
print("Original entry:")
print(original_login.to_human_readable_string())
updated_login = self.gitea_conf.update_login(
args.name,
new_name=args.new_name,
new_url=args.new_url,
new_user=args.new_user,
new_token=args.new_token,
new_ssh_key=args.new_ssh_key,
set_as_default=args.set_as_default,
)
print("")
print("Updated entry:")
print(updated_login.to_human_readable_string())

15
osc/commands_git/repo.py Normal file
View File

@ -0,0 +1,15 @@
import osc.commandline_git
class RepoCommand(osc.commandline_git.GitObsCommand):
"""
Manage git repos
"""
name = "repo"
def init_arguments(self):
pass
def run(self, args):
self.parser.print_help()

View File

@ -0,0 +1,56 @@
import osc.commandline_git
class RepoCloneCommand(osc.commandline_git.GitObsCommand):
"""
Clone a git repo
"""
name = "clone"
parent = "RepoCommand"
def init_arguments(self):
self.add_argument_owner()
self.add_argument_repo()
self.add_argument(
"-a",
"--anonymous",
action="store_true",
default=None,
help="Clone anonymously via the http protocol",
)
self.add_argument(
"-i",
"--ssh-key",
help="Path to a private SSH key (identity file)",
)
self.add_argument(
"--no-ssh-strict-host-key-checking",
action="store_true",
help="Set 'StrictHostKeyChecking no' ssh option",
)
# TODO: replace with an optional argument to get closer to the `git clone` command?
self.add_argument(
"--directory",
help="Clone into the given directory",
)
def run(self, args):
from osc import gitea_api
self.print_gitea_settings()
gitea_api.Repo.clone(
self.gitea_conn,
args.owner,
args.repo,
directory=args.directory,
anonymous=args.anonymous,
add_remotes=True,
ssh_private_key_path=self.gitea_login.ssh_key or args.ssh_key,
ssh_strict_host_key_checking=not(args.no_ssh_strict_host_key_checking),
)

View File

@ -0,0 +1,36 @@
import sys
import osc.commandline_git
class RepoForkCommand(osc.commandline_git.GitObsCommand):
"""
Fork a git repo
"""
name = "fork"
parent = "RepoCommand"
def init_arguments(self):
self.add_argument_owner()
self.add_argument_repo()
self.add_argument_new_repo_name()
def run(self, args):
from osc import gitea_api
from osc.output import tty
self.print_gitea_settings()
print(f"Forking git repo {args.owner}/{args.repo} ...", file=sys.stderr)
try:
response = gitea_api.Fork.create(self.gitea_conn, args.owner, args.repo, new_repo_name=args.new_repo_name)
repo = response.json()
fork_owner = repo["owner"]["login"]
fork_repo = repo["name"]
print(f" * Fork created: {fork_owner}/{fork_repo}", file=sys.stderr)
except gitea_api.ForkExists as e:
fork_owner = e.fork_owner
fork_repo = e.fork_repo
print(f" * Fork already exists: {fork_owner}/{fork_repo}", file=sys.stderr)
print(f" * {tty.colorize('WARNING', 'yellow,bold')}: Using an existing fork with a different name than requested", file=sys.stderr)

View File

@ -0,0 +1,15 @@
import osc.commandline_git
class SSHKeyCommand(osc.commandline_git.GitObsCommand):
"""
Manage public SSH keys
"""
name = "ssh-key"
def init_arguments(self):
pass
def run(self, args):
self.parser.print_help()

View File

@ -0,0 +1,38 @@
import os
import osc.commandline_git
class SSHKeyAddCommand(osc.commandline_git.GitObsCommand):
"""
"""
name = "add"
parent = "SSHKeyCommand"
def init_arguments(self):
group = self.parser.add_mutually_exclusive_group(required=True)
group.add_argument(
"--key",
help="SSH public key",
)
group.add_argument(
"--key-path",
metavar="PATH",
help="Path to the SSH public key",
)
def run(self, args):
from osc import gitea_api
self.print_gitea_settings()
if args.key:
key = args.key
else:
with open(os.path.expanduser(args.key_path)) as f:
key = f.read().strip()
response = gitea_api.SSHKey.create(self.gitea_conn, key)
print("Added entry:")
print(gitea_api.SSHKey.to_human_readable_string(response.json()))

View File

@ -0,0 +1,21 @@
import osc.commandline_git
class SSHKeyListCommand(osc.commandline_git.GitObsCommand):
"""
"""
name = "list"
parent = "SSHKeyCommand"
def init_arguments(self):
pass
def run(self, args):
from osc import gitea_api
self.print_gitea_settings()
for i in gitea_api.SSHKey.list(self.gitea_conn).json():
print(gitea_api.SSHKey.to_human_readable_string(i))
print()

View File

@ -0,0 +1,30 @@
import sys
import osc.commandline_git
class SSHKeyRemoveCommand(osc.commandline_git.GitObsCommand):
"""
"""
name = "remove"
parent = "SSHKeyCommand"
def init_arguments(self):
self.parser.add_argument(
"id",
type=int,
help="Id of the SSH public key",
)
def run(self, args):
from osc import gitea_api
self.print_gitea_settings()
print(f"Removing ssh key with id='{args.id}' ...", file=sys.stderr)
response = gitea_api.SSHKey.get(self.gitea_conn, args.id)
gitea_api.SSHKey.delete(self.gitea_conn, args.id)
print("Removed entry:")
print(gitea_api.SSHKey.to_human_readable_string(response.json()))

View File

@ -128,11 +128,15 @@ class Config:
def add_login(self, login: Login): def add_login(self, login: Login):
data = self._read() data = self._read()
# print("DDD", data)
data.setdefault("logins", []) data.setdefault("logins", [])
for i in data["logins"]:
if i.get("name", None) == login.name: for entry in data["logins"]:
if entry.get("name", None) == login.name:
raise Login.AlreadyExists(login.name) raise Login.AlreadyExists(login.name)
else:
if login.default:
entry.pop("default", None)
data["logins"].append(login.dict()) data["logins"].append(login.dict())
self._write(data) self._write(data)
@ -169,15 +173,19 @@ class Config:
login.token = new_token login.token = new_token
if new_ssh_key is not None: if new_ssh_key is not None:
login.ssh_key = new_ssh_key login.ssh_key = new_ssh_key
if set_as_default:
login.default = True
if not login.has_changed(): if not login.has_changed():
return login return login
data = self._read() data = self._read()
for num, entry in enumerate(data["logins"]): for entry in data["logins"]:
if entry.get("name", None) == name: if entry.get("name", None) == name:
data["logins"][num].update(login.dict()) entry.update(login.dict())
self._write(data) else:
if set_as_default:
entry.pop("default", None)
self._write(data)
return login return login
# TODO: set_as_default

View File

@ -62,3 +62,8 @@ class ForkExists(GiteaException):
def __str__(self): def __str__(self):
result = f"Repo '{self.owner}/{self.repo}' is already forked as '{self.fork_owner}/{self.fork_repo}'" result = f"Repo '{self.owner}/{self.repo}' is already forked as '{self.fork_owner}/{self.fork_repo}'"
return result return result
class InvalidSshPublicKey(oscerr.OscBaseError):
def __str__(self):
return "Invalid public ssh key"

View File

@ -4,10 +4,6 @@ from typing import Optional
from .connection import Connection from .connection import Connection
from .connection import GiteaHTTPResponse from .connection import GiteaHTTPResponse
from .exceptions import BranchDoesNotExist
from .exceptions import BranchExists
from .exceptions import ForkExists
from .exceptions import GiteaException
from .user import User from .user import User

View File

@ -31,6 +31,32 @@ class SSHKey:
import re import re
return re.split(" +", key, maxsplit=2) return re.split(" +", key, maxsplit=2)
@classmethod
def _validate_key_format(cls, key):
"""
Check that the public ssh key has the correct format:
- must be a single line of text
- it is possible to split it into <type> <key> <comment> parts
- the <key> part is base64 encoded
"""
import base64
import binascii
from .exceptions import InvalidSshPublicKey
key = key.strip()
if len(key.splitlines()) != 1:
raise InvalidSshPublicKey()
try:
key_type, key_base64, key_comment = cls._split_key(key)
except ValueError:
raise InvalidSshPublicKey()
try:
base64.b64decode(key_base64)
except binascii.Error:
raise InvalidSshPublicKey()
@classmethod @classmethod
def create(cls, conn: Connection, key: str, title: Optional[str] = None) -> GiteaHTTPResponse: def create(cls, conn: Connection, key: str, title: Optional[str] = None) -> GiteaHTTPResponse:
""" """
@ -42,7 +68,7 @@ class SSHKey:
""" """
url = conn.makeurl("user", "keys") url = conn.makeurl("user", "keys")
# TODO: validate that we're sending a public ssh key cls._validate_key_format(key)
if not title: if not title:
title = cls._split_key(key)[2] title = cls._split_key(key)[2]

View File

@ -59,6 +59,7 @@ osc =
[options.entry_points] [options.entry_points]
console_scripts = console_scripts =
obs-git = osc.commandline_git:main
osc = osc.babysitter:main osc = osc.babysitter:main
[flake8] [flake8]