mirror of
https://github.com/openSUSE/osc.git
synced 2025-09-07 05:38:43 +02:00
commandline: New class-based commands
This is based on a prototype we've worked on together with Laurin Fäller <laurin.faeller@suse.com>.
This commit is contained in:
@@ -57,7 +57,8 @@ def run(prg, argv=None):
|
||||
if "--debugger" in (argv or sys.argv[1:]):
|
||||
pdb.set_trace()
|
||||
# here we actually run the program
|
||||
return prg.main(argv)
|
||||
prg.main(argv)
|
||||
return 0
|
||||
except:
|
||||
# If any of these was set via the command-line options,
|
||||
# the config values are expected to be changed accordingly.
|
||||
@@ -207,6 +208,6 @@ def main():
|
||||
sys.stdout = os.fdopen(sys.stdout.fileno(), sys.stdout.mode, 1)
|
||||
sys.stderr = os.fdopen(sys.stderr.fileno(), sys.stderr.mode, 1)
|
||||
|
||||
sys.exit(run(commandline.Osc()))
|
||||
sys.exit(run(commandline.OscMainCommand()))
|
||||
|
||||
# vim: sw=4 et
|
||||
|
@@ -6,9 +6,11 @@
|
||||
import argparse
|
||||
import getpass
|
||||
import glob
|
||||
import importlib
|
||||
import importlib.util
|
||||
import inspect
|
||||
import os
|
||||
import pkgutil
|
||||
import re
|
||||
import subprocess
|
||||
import sys
|
||||
@@ -26,6 +28,7 @@ from urllib.error import HTTPError
|
||||
from . import _private
|
||||
from . import build as osc_build
|
||||
from . import cmdln
|
||||
from . import commands as osc_commands
|
||||
from . import conf
|
||||
from . import oscerr
|
||||
from . import store as osc_store
|
||||
@@ -36,6 +39,489 @@ from .util import cpio, rpmquery, safewriter
|
||||
from .util.helper import _html_escape, format_table
|
||||
|
||||
|
||||
class Command:
|
||||
#: Name of the command as used in the argument parser.
|
||||
name: str = None
|
||||
|
||||
#: Optional aliases to the command.
|
||||
aliases: List[str] = []
|
||||
|
||||
#: Whether the command is hidden from help.
|
||||
#: Defaults to ``False``.
|
||||
hidden: bool = False
|
||||
|
||||
#: Name of the parent command class.
|
||||
#: Can be prefixed if the parent comes from a different location,
|
||||
#: for example ``osc.commands.<ClassName>`` when extending osc command with a plugin.
|
||||
#: See ``OscMainCommand.MODULES`` for available prefixes.
|
||||
parent: str = None
|
||||
|
||||
def __init__(self, full_name, parent=None):
|
||||
self.full_name = full_name
|
||||
self.parent = parent
|
||||
self.subparsers = None
|
||||
|
||||
if not self.name:
|
||||
raise ValueError(f"Command '{self.full_name}' has no 'name' set")
|
||||
|
||||
if parent:
|
||||
self.parser = self.parent.subparsers.add_parser(
|
||||
self.name,
|
||||
aliases=self.aliases,
|
||||
help=self.get_help(),
|
||||
description=self.get_description(),
|
||||
formatter_class=cmdln.HelpFormatter,
|
||||
conflict_handler="resolve",
|
||||
prog=f"{self.main_command.name} [global opts] {self.name}",
|
||||
)
|
||||
self.parser.set_defaults(_selected_command=self)
|
||||
else:
|
||||
self.parser = argparse.ArgumentParser(
|
||||
prog=self.name,
|
||||
description=self.get_description(),
|
||||
formatter_class=cmdln.HelpFormatter,
|
||||
usage="%(prog)s [global opts] <command> [--help] [opts] [args]",
|
||||
)
|
||||
|
||||
# traverse the parent commands and add their options to the current command
|
||||
cmd = self
|
||||
while cmd:
|
||||
cmd.init_arguments()
|
||||
cmd = cmd.parent
|
||||
|
||||
def __repr__(self):
|
||||
return f"<osc plugin {self.full_name} at {self.__hash__():#x}>"
|
||||
|
||||
def get_help(self):
|
||||
"""
|
||||
Return the help text of the command.
|
||||
The first line of the docstring is returned by default.
|
||||
"""
|
||||
if self.hidden:
|
||||
return argparse.SUPPRESS
|
||||
|
||||
if not self.__doc__:
|
||||
return ""
|
||||
|
||||
help_lines = self.__doc__.strip().splitlines()
|
||||
|
||||
if not help_lines:
|
||||
return ""
|
||||
|
||||
return help_lines[0]
|
||||
|
||||
def get_description(self):
|
||||
"""
|
||||
Return the description of the command.
|
||||
The docstring without the first line is returned by default.
|
||||
"""
|
||||
if not self.__doc__:
|
||||
return ""
|
||||
|
||||
help_lines = self.__doc__.strip().splitlines()
|
||||
|
||||
if not help_lines:
|
||||
return ""
|
||||
|
||||
# skip the first line that contains help text
|
||||
help_lines.pop(0)
|
||||
|
||||
# remove any leading empty lines
|
||||
while help_lines and not help_lines[0]:
|
||||
help_lines.pop(0)
|
||||
|
||||
result = "\n".join(help_lines)
|
||||
result = textwrap.dedent(result)
|
||||
return result
|
||||
|
||||
@property
|
||||
def main_command(self):
|
||||
"""
|
||||
Return reference to the main command that represents the executable
|
||||
and contains the main instance of ArgumentParser.
|
||||
"""
|
||||
if not self.parent:
|
||||
return self
|
||||
return self.parent.main_command
|
||||
|
||||
def add_argument(self, *args, **kwargs):
|
||||
"""
|
||||
Add a new argument to the command's argument parser.
|
||||
See `argparse <https://docs.python.org/3/library/argparse.html>`_ documentation for allowed parameters.
|
||||
"""
|
||||
cmd = self
|
||||
|
||||
# Let's inspect if the caller was init_arguments() method.
|
||||
# In such case use the "parser" argument if specified.
|
||||
frame_1 = inspect.currentframe().f_back
|
||||
frame_1_info = inspect.getframeinfo(frame_1)
|
||||
frame_2 = frame_1.f_back
|
||||
frame_2_info = inspect.getframeinfo(frame_2)
|
||||
if (frame_1_info.function, frame_2_info.function) == ("init_arguments", "__init__"):
|
||||
# this method was called from init_arguments() that was called from __init__
|
||||
# let's extract the command class from the 2nd frame and ad arguments there
|
||||
cmd = frame_2.f_locals["self"]
|
||||
|
||||
# suppress global options from command help
|
||||
if cmd != self and not self.parent:
|
||||
kwargs["help"] = argparse.SUPPRESS
|
||||
|
||||
# We're adding hidden options from parent commands to their subcommands to allow
|
||||
# option intermixing. For all such added hidden options we need to suppress their
|
||||
# defaults because they would override any option set in the parent command.
|
||||
if cmd != self:
|
||||
kwargs["default"] = argparse.SUPPRESS
|
||||
|
||||
cmd.parser.add_argument(*args, **kwargs)
|
||||
|
||||
def init_arguments(self):
|
||||
"""
|
||||
Override to add arguments to the argument parser.
|
||||
|
||||
.. note::
|
||||
Make sure you're adding arguments only by calling ``self.add_argument()``.
|
||||
Using ``self.parser.add_argument()`` directly is not recommended
|
||||
because it disables argument intermixing.
|
||||
"""
|
||||
|
||||
def run(self, args):
|
||||
"""
|
||||
Override to implement the command functionality.
|
||||
|
||||
.. note::
|
||||
``args.positional_args`` is a list containing any unknown (unparsed) positional arguments.
|
||||
|
||||
.. note::
|
||||
Consider moving any reusable code into a library,
|
||||
leaving the command-line code only a thin wrapper on top of it.
|
||||
|
||||
If the code is generic enough, it should be added to osc directly.
|
||||
In such case don't hesitate to open an `issue <https://github.com/openSUSE/osc/issues>`_.
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
def register(self, command_class, command_full_name):
|
||||
if not self.subparsers:
|
||||
# instantiate subparsers on first use
|
||||
self.subparsers = self.parser.add_subparsers(dest="command", title="commands")
|
||||
|
||||
# Check for parser conflicts.
|
||||
# This is how Python 3.11+ behaves by default.
|
||||
if command_class.name in self.subparsers._name_parser_map:
|
||||
raise argparse.ArgumentError(self.subparsers, f"conflicting subparser: {command_class.name}")
|
||||
for alias in command_class.aliases:
|
||||
if alias in self.subparsers._name_parser_map:
|
||||
raise argparse.ArgumentError(self.subparsers, f"conflicting subparser alias: {alias}")
|
||||
|
||||
command = command_class(command_full_name, parent=self)
|
||||
return command
|
||||
|
||||
|
||||
class MainCommand(Command):
|
||||
MODULES = ()
|
||||
|
||||
def __init__(self):
|
||||
super().__init__(self.__class__.__name__)
|
||||
self.command_classes = {}
|
||||
self.download_progress = None
|
||||
|
||||
def post_parse_args(self, args):
|
||||
pass
|
||||
|
||||
def run(self, args):
|
||||
cmd = getattr(args, "_selected_command", None)
|
||||
if not cmd:
|
||||
self.parser.error("Please specify a command")
|
||||
self.post_parse_args(args)
|
||||
cmd.run(args)
|
||||
|
||||
def load_command(self, cls, module_prefix):
|
||||
mod_cls_name = f"{module_prefix}.{cls.__name__}"
|
||||
parent_name = getattr(cls, "parent", None)
|
||||
if parent_name:
|
||||
# allow relative references to classes in the the same module/directory
|
||||
if "." not in parent_name:
|
||||
parent_name = f"{module_prefix}.{parent_name}"
|
||||
try:
|
||||
parent = self.main_command.command_classes[parent_name]
|
||||
except KeyError:
|
||||
msg = f"Failed to load command class '{mod_cls_name}' because it references parent '{parent_name}' that doesn't exist"
|
||||
print(msg, file=sys.stderr)
|
||||
return None
|
||||
cmd = parent.register(cls, mod_cls_name)
|
||||
else:
|
||||
cmd = self.main_command.register(cls, mod_cls_name)
|
||||
|
||||
cmd.full_name = mod_cls_name
|
||||
self.main_command.command_classes[mod_cls_name] = cmd
|
||||
return cmd
|
||||
|
||||
def load_commands(self):
|
||||
for module_prefix, module_path in self.MODULES:
|
||||
module_path = os.path.expanduser(module_path)
|
||||
for loader, module_name, _ in pkgutil.walk_packages(path=[module_path]):
|
||||
full_name = f"{module_prefix}.{module_name}"
|
||||
spec = loader.find_spec(full_name)
|
||||
mod = importlib.util.module_from_spec(spec)
|
||||
try:
|
||||
spec.loader.exec_module(mod)
|
||||
except Exception as e: # pylint: disable=broad-except
|
||||
msg = f"Failed to load commands from module '{full_name}': {e}"
|
||||
print(msg, file=sys.stderr)
|
||||
continue
|
||||
for name in dir(mod):
|
||||
if name.startswith("_"):
|
||||
continue
|
||||
cls = getattr(mod, name)
|
||||
if not inspect.isclass(cls):
|
||||
continue
|
||||
if not issubclass(cls, Command):
|
||||
continue
|
||||
if cls.__module__ != full_name:
|
||||
# skip classes that weren't defined directly in the loaded plugin module
|
||||
continue
|
||||
self.load_command(cls, module_prefix)
|
||||
|
||||
def parse_args(self, *args, **kwargs):
|
||||
namespace, unknown_args = self.parser.parse_known_args(*args, **kwargs)
|
||||
|
||||
unrecognized = [i for i in unknown_args if i.startswith("-")]
|
||||
if unrecognized:
|
||||
self.parser.error(f"unrecognized arguments: " + " ".join(unrecognized))
|
||||
|
||||
namespace.positional_args = list(unknown_args)
|
||||
return namespace
|
||||
|
||||
|
||||
class OscCommand(Command):
|
||||
"""
|
||||
Inherit from this class to create new commands.
|
||||
|
||||
The first line of the docstring becomes the help text,
|
||||
the remaining lines become the command description.
|
||||
"""
|
||||
|
||||
|
||||
class OscMainCommand(MainCommand):
|
||||
name = "osc"
|
||||
|
||||
MODULES = (
|
||||
("osc.commands", osc_commands.__path__[0]),
|
||||
("osc.commands.usr_lib", "/usr/lib/osc-plugins"),
|
||||
("osc.commands.usr_local_lib", "/usr/local/lib/osc-plugins"),
|
||||
("osc.commands.home_local_lib", "~/.local/lib/osc-plugins"),
|
||||
("osc.commands.home", "~/.osc-plugins"),
|
||||
)
|
||||
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self.args = None
|
||||
self.download_progress = None
|
||||
|
||||
def init_arguments(self):
|
||||
self.add_argument(
|
||||
"-v",
|
||||
"--verbose",
|
||||
action="store_true",
|
||||
help="increase verbosity",
|
||||
)
|
||||
self.add_argument(
|
||||
"-q",
|
||||
"--quiet",
|
||||
action="store_true",
|
||||
help="be quiet, not verbose",
|
||||
)
|
||||
self.add_argument(
|
||||
"--debug",
|
||||
action="store_true",
|
||||
help="print info useful for debugging",
|
||||
)
|
||||
self.add_argument(
|
||||
"--debugger",
|
||||
action="store_true",
|
||||
help="jump into the debugger before executing anything",
|
||||
)
|
||||
self.add_argument(
|
||||
"--post-mortem",
|
||||
action="store_true",
|
||||
help="jump into the debugger in case of errors",
|
||||
)
|
||||
self.add_argument(
|
||||
"--traceback",
|
||||
action="store_true",
|
||||
help="print call trace in case of errors",
|
||||
)
|
||||
self.add_argument(
|
||||
"-H",
|
||||
"--http-debug",
|
||||
action="store_true",
|
||||
help="debug HTTP traffic (filters some headers)",
|
||||
)
|
||||
self.add_argument(
|
||||
"--http-full-debug",
|
||||
action="store_true",
|
||||
help="debug HTTP traffic (filters no headers)",
|
||||
)
|
||||
self.add_argument(
|
||||
"-A",
|
||||
"--apiurl",
|
||||
metavar="URL",
|
||||
help="Open Build Service API URL or a configured alias",
|
||||
)
|
||||
self.add_argument(
|
||||
"--config",
|
||||
dest="conffile",
|
||||
metavar="FILE",
|
||||
help="specify alternate configuration file",
|
||||
)
|
||||
self.add_argument(
|
||||
"--no-keyring",
|
||||
action="store_true",
|
||||
help="disable usage of desktop keyring system",
|
||||
)
|
||||
|
||||
def post_parse_args(self, args):
|
||||
# apiurl hasn't been specified by the user
|
||||
# we need to set it here because the 'default' option of an argument doesn't support lazy evaluation
|
||||
if args.apiurl is None:
|
||||
try:
|
||||
# try reading the apiurl from the working copy
|
||||
args.apiurl = osc_store.Store(Path.cwd()).apiurl
|
||||
except oscerr.NoWorkingCopy:
|
||||
# use the default apiurl from conf (if it was configured already)
|
||||
args.apiurl = conf.config["apiurl"]
|
||||
|
||||
if not args.apiurl:
|
||||
self.parser.error("Could not determine apiurl, use -A/--apiurl to specify one")
|
||||
|
||||
conf.get_config(
|
||||
override_apiurl=args.apiurl,
|
||||
override_conffile=args.conffile,
|
||||
override_debug=args.debug,
|
||||
override_http_debug=args.http_debug,
|
||||
override_http_full_debug=args.http_full_debug,
|
||||
override_no_keyring=args.no_keyring,
|
||||
override_post_mortem=args.post_mortem,
|
||||
override_traceback=args.traceback,
|
||||
override_verbose=args.verbose,
|
||||
)
|
||||
|
||||
# write config values back to args
|
||||
# this is crucial mainly for apiurl to resolve an alias to full url
|
||||
for i in ["apiurl", "debug", "http_debug", "http_full_debug", "post_mortem", "traceback", "verbose"]:
|
||||
setattr(args, i, conf.config[i])
|
||||
args.no_keyring = not conf.config["use_keyring"]
|
||||
|
||||
if conf.config["show_download_progress"]:
|
||||
self.download_progress = create_text_meter()
|
||||
|
||||
# needed for LegacyOsc class
|
||||
self.args = args
|
||||
|
||||
def _wrap_legacy_command(self, func_):
|
||||
class LegacyCommandWrapper(Command):
|
||||
func = func_
|
||||
__doc__ = getattr(func_, "__doc__", "")
|
||||
aliases = getattr(func_, "aliases", [])
|
||||
hidden = getattr(func_, "hidden", False)
|
||||
name = getattr(func_, "name", func_.__name__[3:])
|
||||
|
||||
def __repr__(self):
|
||||
result = super().__repr__()
|
||||
result += f"({self.func.__name__})"
|
||||
return result
|
||||
|
||||
def init_arguments(self):
|
||||
options = getattr(self.func, "options", [])
|
||||
for option_args, option_kwargs in options:
|
||||
self.add_argument(*option_args, **option_kwargs)
|
||||
|
||||
def run(self, args):
|
||||
sig = inspect.signature(self.func)
|
||||
arg_names = list(sig.parameters.keys())
|
||||
if arg_names == ["subcmd", "opts"]:
|
||||
# handler doesn't take positional args via *args
|
||||
if args.positional_args:
|
||||
self.parser.error(f"unrecognized arguments: " + " ".join(args.positional_args))
|
||||
self.func(args.command, args)
|
||||
else:
|
||||
# handler takes positional args via *args
|
||||
self.func(args.command, args, *args.positional_args)
|
||||
|
||||
return LegacyCommandWrapper
|
||||
|
||||
def load_legacy_commands(self):
|
||||
# lazy links of attributes that would normally be initialized in the instance of Osc class
|
||||
class LegacyOsc(Osc): # pylint: disable=used-before-assignment
|
||||
# pylint: disable=no-self-argument
|
||||
@property
|
||||
def argparser(self_):
|
||||
return self.parser
|
||||
|
||||
# pylint: disable=no-self-argument
|
||||
@property
|
||||
def download_progress(self_):
|
||||
return self.download_progress
|
||||
|
||||
# pylint: disable=no-self-argument
|
||||
@property
|
||||
def options(self_):
|
||||
return self.args
|
||||
|
||||
# pylint: disable=no-self-argument
|
||||
@options.setter
|
||||
def options(self_, value):
|
||||
pass
|
||||
|
||||
# pylint: disable=no-self-argument
|
||||
@property
|
||||
def subparsers(self_):
|
||||
return self.subparsers
|
||||
|
||||
osc_instance = LegacyOsc()
|
||||
|
||||
for name in dir(osc_instance):
|
||||
if not name.startswith("do_"):
|
||||
continue
|
||||
|
||||
func = getattr(osc_instance, name)
|
||||
|
||||
if not inspect.ismethod(func) and not inspect.isfunction(func):
|
||||
continue
|
||||
|
||||
cls = self._wrap_legacy_command(func)
|
||||
self.load_command(cls, "osc.commands.old")
|
||||
|
||||
@classmethod
|
||||
def main(cls, argv=None, run=True):
|
||||
"""
|
||||
Initialize OscMainCommand, load all commands and run the selected command.
|
||||
"""
|
||||
cmd = cls()
|
||||
cmd.load_commands()
|
||||
cmd.load_legacy_commands()
|
||||
if run:
|
||||
args = cmd.parse_args(args=argv)
|
||||
cmd.run(args)
|
||||
else:
|
||||
args = None
|
||||
return cmd, args
|
||||
|
||||
|
||||
def get_parser():
|
||||
"""
|
||||
Needed by argparse-manpage to generate man pages from the argument parser.
|
||||
"""
|
||||
main, _ = OscMainCommand.main(run=False)
|
||||
return main.parser
|
||||
|
||||
|
||||
# ================================================================================
|
||||
# The legacy code follows.
|
||||
# Please do not use it if possible.
|
||||
# ================================================================================
|
||||
|
||||
|
||||
HELP_MULTIBUILD_MANY = """Only work with the specified flavors of a multibuild package.
|
||||
Globs are resolved according to _multibuild file from server.
|
||||
Empty string is resolved to a package without a flavor."""
|
||||
@@ -43,12 +529,6 @@ Empty string is resolved to a package without a flavor."""
|
||||
HELP_MULTIBUILD_ONE = "Only work with the specified flavor of a multibuild package."
|
||||
|
||||
|
||||
def get_parser():
|
||||
osc = Osc()
|
||||
osc.create_argparser()
|
||||
return osc.argparser
|
||||
|
||||
|
||||
def pop_args(
|
||||
args,
|
||||
arg1_name: str = None,
|
||||
@@ -435,7 +915,6 @@ class Osc(cmdln.Cmdln):
|
||||
* http://en.opensuse.org/openSUSE:OSC_plugins
|
||||
"""
|
||||
name = 'osc'
|
||||
conf = None
|
||||
|
||||
def __init__(self):
|
||||
self.options = None
|
||||
|
0
osc/commands/__init__.py
Normal file
0
osc/commands/__init__.py
Normal file
Reference in New Issue
Block a user