1
0
mirror of https://github.com/openSUSE/osc.git synced 2025-01-31 00:36:17 +01:00

Merge pull request #1504 from dmach/xmlmodel-Person

Introduce obs_api.Person, migrate some code to it
This commit is contained in:
Daniel Mach 2024-03-05 17:04:04 +01:00 committed by GitHub
commit d40d68f8b4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
18 changed files with 247 additions and 47 deletions

View File

@ -1,7 +1,7 @@
from . import api
from .common import format_msg_project_package_options
from .common import print_msg
from .. import oscerr
from ..output import print_msg
def add_channels(apiurl, project, package=None, enable_all=False, skip_disabled=False, print_to="debug"):

View File

@ -1,27 +1,4 @@
import sys
def print_msg(*args, print_to="debug"):
from .. import conf
if print_to is None:
return
elif print_to == "debug":
# print a debug message to stderr if config["debug"] is set
if conf.config["debug"]:
print("DEBUG:", *args, file=sys.stderr)
elif print_to == "verbose":
# print a verbose message to stdout if config["verbose"] or config["debug"] is set
if conf.config["verbose"] or conf.config["debug"]:
print(*args)
elif print_to == "stdout":
# print the message to stdout
print(*args)
elif print_to == "stderr":
# print the message to stderr
print(*args, file=sys.stderr)
else:
raise ValueError(f"Invalid value of the 'print_to' option: {print_to}")
from ..output.output import print_msg
def format_msg_project_package_options(

View File

@ -21,6 +21,7 @@ from . import commandline
from . import conf as osc_conf
from . import core as osc_core
from . import oscerr
from . import output
from .OscConfigParser import configparser
from .oscssl import CertVerificationError
from .util.cpio import CpioError
@ -108,8 +109,8 @@ def run(prg, argv=None):
except AttributeError:
body = ''
_private.print_msg(e.hdrs, print_to="debug")
_private.print_msg(body, print_to="debug")
output.print_msg(e.hdrs, print_to="debug")
output.print_msg(body, print_to="debug")
if e.code in [400, 403, 404, 500]:
if b'<summary>' in body:
@ -153,7 +154,7 @@ def run(prg, argv=None):
print(e.message, file=sys.stderr)
except oscerr.OscIOError as e:
print(e.msg, file=sys.stderr)
_private.print_msg(e.e, print_to="debug")
output.print_msg(e.e, print_to="debug")
except (oscerr.WrongOptions, oscerr.WrongArgs) as e:
print(e, file=sys.stderr)
return 2

View File

@ -33,6 +33,7 @@ from . import commands as osc_commands
from . import conf
from . import git_scm
from . import oscerr
from . import output
from . import store as osc_store
from .core import *
from .grabber import OscFileGrabber
@ -269,7 +270,7 @@ class MainCommand(Command):
def load_commands(self):
if IN_VENV:
_private.print_msg("Running in virtual environment, skipping loading plugins installed outside the virtual environment.", print_to="stderr")
output.print_msg("Running in virtual environment, skipping loading plugins installed outside the virtual environment.", print_to="stderr")
for module_prefix, module_path in self.MODULES:
module_path = os.path.expanduser(module_path)
@ -4315,7 +4316,7 @@ Please submit there instead, or use --nodevelproject to force direct submission.
except:
print('Error while checkout package:\n', package, file=sys.stderr)
_private.print_msg('Note: You can use "osc delete" or "osc submitpac" when done.\n', print_to="verbose")
output.print_msg('Note: You can use "osc delete" or "osc submitpac" when done.\n', print_to="verbose")
@cmdln.alias('branchco')
@cmdln.alias('bco')
@ -4466,7 +4467,7 @@ Please submit there instead, or use --nodevelproject to force direct submission.
if opts.checkout:
checkout_package(apiurl, targetprj, package, server_service_files=False,
expand_link=True, prj_dir=Path(targetprj))
_private.print_msg('Note: You can use "osc delete" or "osc submitpac" when done.\n', print_to="verbose")
output.print_msg('Note: You can use "osc delete" or "osc submitpac" when done.\n', print_to="verbose")
else:
apiopt = ''
if conf.get_configParser().get("general", "apiurl", fallback=None) != apiurl:
@ -8888,7 +8889,7 @@ Please submit there instead, or use --nodevelproject to force direct submission.
@cmdln.option('-S', '--set-bugowner-request', metavar='user',
help='Set the bugowner to specified person via a request (or group via group: prefix)')
@cmdln.option('-U', '--user', metavar='USER',
help='All official maintained instances for the specified USER')
help='All official maintained instances for the specified USER (specified by the username or email)')
@cmdln.option('-G', '--group', metavar='GROUP',
help='All official maintained instances for the specified GROUP')
@cmdln.option('-d', '--delete', metavar='user',
@ -9012,6 +9013,12 @@ Please submit there instead, or use --nodevelproject to force direct submission.
if repl.lower() != 'y':
searchresult = None
elif opts.user:
if "@" in opts.user:
# resolve email address to login
from . import obs_api
users = obs_api.Person.search(apiurl, email=opts.user)
if users:
opts.user = users[0].login
searchresult = owner(apiurl, opts.user, "user", usefilter=filterroles, devel=None)
elif opts.group:
searchresult = owner(apiurl, opts.group, "group", usefilter=filterroles, devel=None)
@ -9175,16 +9182,26 @@ Please submit there instead, or use --nodevelproject to force direct submission.
"""
Show fullname and email of a buildservice user
"""
from . import obs_api
from .output import print_msg
apiurl = self.get_api_url()
usernames = opts.user
if not usernames:
usernames = [conf.config["api_host_options"][apiurl]["user"]]
for name in usernames:
user = get_user_data(apiurl, name, 'login', 'realname', 'email')
if len(user) == 3:
print(f"{user[0]}: \"{user[1]}\" <{user[2]}>")
# remove duplicates
usernames = list(set(usernames))
users = obs_api.Person.search(apiurl, login=usernames)
users_by_login = {i.login: i for i in users}
for username in usernames:
user = users_by_login.get(username, None)
if not user:
print_msg(f"User '{username}' does not exist", print_to="warning")
continue
print(f'{user.login}: "{user.realname}" <{user.email}>')
@cmdln.name("create-pbuild-config")
@cmdln.alias('cpc')
@ -10119,7 +10136,7 @@ Please submit there instead, or use --nodevelproject to force direct submission.
def _load_plugins(self):
if IN_VENV:
_private.print_msg("Running in virtual environment, skipping loading legacy plugins.", print_to="stderr")
output.print_msg("Running in virtual environment, skipping loading legacy plugins.", print_to="stderr")
return
plugin_dirs = [

View File

@ -21,10 +21,10 @@ import urllib3.response
import urllib3.util
from . import __version__
from . import _private
from . import conf
from . import oscerr
from . import oscssl
from . import output
from .util.helper import decode_it
@ -690,7 +690,7 @@ class SignatureAuthHandler(AuthHandlerBase):
return False
if not self.ssh_keygen_path:
_private.print_msg("Skipping signature auth because ssh-keygen is not available", print_to="debug")
output.print_msg("Skipping signature auth because ssh-keygen is not available", print_to="debug")
return False
if not self.sshkey_known():

View File

@ -50,6 +50,7 @@ from . import _private
from . import conf
from . import meter
from . import oscerr
from . import output
from . import store as osc_store
from .connection import http_request, http_GET, http_POST, http_PUT, http_DELETE
from .store import Store
@ -524,7 +525,7 @@ class Serviceinfo:
raise oscerr.PackageNotInstalled(f"obs-service-{cmd[0]}")
cmd[0] = "/usr/lib/obs/service/" + cmd[0]
cmd = cmd + ["--outdir", temp_dir]
_private.print_msg("Run source service:", " ".join(cmd), print_to="verbose")
output.print_msg("Run source service:", " ".join(cmd), print_to="verbose")
r = run_external(*cmd)
if r != 0:
@ -3893,7 +3894,7 @@ def set_devel_project(apiurl, prj, pac, devprj=None, devpac=None, print_to="debu
devprj,
devpac,
)
_private.print_msg(msg, print_to=print_to)
output.print_msg(msg, print_to=print_to)
package_obj = obs_api.Package.from_api(apiurl, prj, pac)
@ -4811,7 +4812,7 @@ def get_review_list(
xpath_base = xpath_join(xpath_base, 'action/source/@%(kind)s=\'%(val)s\'', op='or', inner=True)
xpath = xpath_join(xpath, xpath_base % {'kind': kind, 'val': val}, op='and', nexpr_parentheses=True)
_private.print_msg(f"[ {xpath} ]", print_to="debug")
output.print_msg(f"[ {xpath} ]", print_to="debug")
res = search(apiurl, request=xpath)
collection = res['request']
requests = []
@ -4952,7 +4953,7 @@ def get_exact_request_list(
if req_type:
xpath += f" and action/@type='{req_type}'"
_private.print_msg(f"[ {xpath} ]", print_to="debug")
output.print_msg(f"[ {xpath} ]", print_to="debug")
res = search(apiurl, request=xpath)
collection = res['request']
@ -5625,7 +5626,7 @@ def checkout_package(
oldproj = None
if conf.config['checkout_rooted']:
if prj_dir.stem == '/':
_private.print_msg(f"checkout_rooted ignored for {prj_dir}", print_to="verbose")
output.print_msg(f"checkout_rooted ignored for {prj_dir}", print_to="verbose")
# ?? should we complain if not is_project_dir(prj_dir) ??
else:
# if we are inside a project or package dir, ascend to parent
@ -5652,7 +5653,7 @@ def checkout_package(
root_dots = root_dots / ("../" * n)
if str(root_dots) != '.':
_private.print_msg(f"{prj_dir} is project dir of {oldproj}. Root found at {os.path.abspath(root_dots)}", print_to="verbose")
output.print_msg(f"{prj_dir} is project dir of {oldproj}. Root found at {os.path.abspath(root_dots)}", print_to="verbose")
prj_dir = root_dots / prj_dir
if not pathname:

View File

@ -1,4 +1,5 @@
from .package import Package
from .package_sources import PackageSources
from .person import Person
from .project import Project
from .request import Request

View File

@ -7,6 +7,11 @@ class BlockModes(str, Enum):
NEVER = "never"
class BoolString(str, Enum):
TRUE = "true"
FALSE = "false"
class BuildArch(str, Enum):
NOARCH = "noarch"
AARCH64 = "aarch64"

71
osc/obs_api/person.py Normal file
View File

@ -0,0 +1,71 @@
from ..util.models import * # pylint: disable=wildcard-import,unused-wildcard-import
from .enums import BoolString
from .person_owner import PersonOwner
from .person_watchlist import PersonWatchlist
class Person(XmlModel):
XML_TAG = "person"
login: str = Field(
)
email: Optional[str] = Field(
)
realname: Optional[str] = Field(
)
owner: Optional[PersonOwner] = Field(
)
state: Optional[str] = Field(
)
globalrole_list: Optional[List[str]] = Field(
xml_name="globalrole",
)
watchlist: Optional[PersonWatchlist] = Field(
)
ignore_auth_services: Optional[BoolString] = Field(
)
@classmethod
def from_api(cls, apiurl: str, username: str):
url_path = ["person", username]
url_query = {}
response = cls.xml_request("GET", apiurl, url_path, url_query)
return cls.from_file(response, apiurl=apiurl)
@classmethod
def search(
cls,
apiurl: str,
login: Optional[str] = None,
email: Optional[str] = None,
realname: Optional[str] = None,
state: Optional[str] = None,
**kwargs,
) -> List["Person"]:
from xml.etree import ElementTree as ET
from ..util.xpath import XPathQuery as Q
url_path = ["search", "person"]
url_query = {
"match": Q(
login=login,
email=email,
realname=realname,
state=state,
**kwargs,
),
}
response = cls.xml_request("GET", apiurl, url_path, url_query)
root = ET.parse(response).getroot()
assert root.tag == "collection"
result = []
for node in root:
result.append(cls.from_xml(node, apiurl=apiurl))
return result

View File

@ -0,0 +1,9 @@
from ..util.models import * # pylint: disable=wildcard-import,unused-wildcard-import
class PersonOwner(XmlModel):
XML_TAG = "owner"
userid: str = Field(
xml_attribute=True,
)

View File

@ -0,0 +1,20 @@
from ..util.models import * # pylint: disable=wildcard-import,unused-wildcard-import
from .person_watchlist_package import PersonWatchlistPackage
from .person_watchlist_project import PersonWatchlistProject
from .person_watchlist_request import PersonWatchlistRequest
class PersonWatchlist(XmlModel):
XML_TAG = "watchlist"
project_list: Optional[List[PersonWatchlistProject]] = Field(
xml_name="project",
)
package_list: Optional[List[PersonWatchlistPackage]] = Field(
xml_name="package",
)
request_list: Optional[List[PersonWatchlistRequest]] = Field(
xml_name="request",
)

View File

@ -0,0 +1,13 @@
from ..util.models import * # pylint: disable=wildcard-import,unused-wildcard-import
class PersonWatchlistPackage(XmlModel):
XML_TAG = "package"
name: str = Field(
xml_attribute=True,
)
project: str = Field(
xml_attribute=True,
)

View File

@ -0,0 +1,9 @@
from ..util.models import * # pylint: disable=wildcard-import,unused-wildcard-import
class PersonWatchlistProject(XmlModel):
XML_TAG = "project"
name: str = Field(
xml_attribute=True,
)

View File

@ -0,0 +1,9 @@
from ..util.models import * # pylint: disable=wildcard-import,unused-wildcard-import
class PersonWatchlistRequest(XmlModel):
XML_TAG = "request"
number: str = Field(
xml_attribute=True,
)

View File

@ -1,5 +1,6 @@
from .key_value_table import KeyValueTable
from .input import get_user_input
from .output import print_msg
from .tty import colorize
from .widechar import wc_ljust
from .widechar import wc_width

41
osc/output/output.py Normal file
View File

@ -0,0 +1,41 @@
import sys
from typing import Optional
from . import tty
def print_msg(*args, print_to: Optional[str] = "debug"):
"""
Print ``*args`` to the ``print_to`` target:
- None: print nothing
- debug: print() to stderr with "DEBUG:" prefix if config["debug"] is set
- verbose: print() to stdout if config["verbose"] or config["debug"] is set
- error: print() to stderr with red "ERROR:" prefix
- warning: print() to stderr with yellow "WARNING:" prefix
- stdout: print() to stdout
- stderr: print() to stderr
"""
from .. import conf
if print_to is None:
return
elif print_to == "debug":
# print a debug message to stderr if config["debug"] is set
if conf.config["debug"]:
print("DEBUG:", *args, file=sys.stderr)
elif print_to == "verbose":
# print a verbose message to stdout if config["verbose"] or config["debug"] is set
if conf.config["verbose"] or conf.config["debug"]:
print(*args)
elif print_to == "error":
print(tty.colorize("ERROR:", "red,bold"), *args, file=sys.stderr)
elif print_to == "warning":
print(tty.colorize("WARNING:", "yellow,bold"), *args, file=sys.stderr)
elif print_to == "stdout":
# print the message to stdout
print(*args)
elif print_to == "stderr":
# print the message to stderr
print(*args, file=sys.stderr)
else:
raise ValueError(f"Invalid value of the 'print_to' option: {print_to}")

View File

@ -443,7 +443,7 @@ class BaseModel(metaclass=ModelMeta):
value = getattr(self, name)
if value is not None and field.is_model:
result[name] = value.dict()
if value is not None and field.is_model_list:
elif value is not None and field.is_model_list:
result[name] = [i.dict() for i in value]
else:
result[name] = value

View File

@ -3,8 +3,9 @@ import io
import unittest
import osc.conf
from osc._private import print_msg
from osc.output import KeyValueTable
from osc.output import print_msg
from osc.output import tty
class TestKeyValueTable(unittest.TestCase):
@ -118,6 +119,22 @@ class TestPrintMsg(unittest.TestCase):
self.assertEqual("foo bar\n", stdout.getvalue())
self.assertEqual("", stderr.getvalue())
def test_error(self):
stdout = io.StringIO()
stderr = io.StringIO()
with contextlib.redirect_stdout(stdout), contextlib.redirect_stderr(stderr):
print_msg("foo", "bar", print_to="error")
self.assertEqual("", stdout.getvalue())
self.assertEqual(f"{tty.colorize('ERROR:', 'red,bold')} foo bar\n", stderr.getvalue())
def test_warning(self):
stdout = io.StringIO()
stderr = io.StringIO()
with contextlib.redirect_stdout(stdout), contextlib.redirect_stderr(stderr):
print_msg("foo", "bar", print_to="warning")
self.assertEqual("", stdout.getvalue())
self.assertEqual(f"{tty.colorize('WARNING:', 'yellow,bold')} foo bar\n", stderr.getvalue())
def test_none(self):
stdout = io.StringIO()
stderr = io.StringIO()
@ -134,6 +151,14 @@ class TestPrintMsg(unittest.TestCase):
self.assertEqual("foo bar\n", stdout.getvalue())
self.assertEqual("", stderr.getvalue())
def test_stderr(self):
stdout = io.StringIO()
stderr = io.StringIO()
with contextlib.redirect_stdout(stdout), contextlib.redirect_stderr(stderr):
print_msg("foo", "bar", print_to="stderr")
self.assertEqual("", stdout.getvalue())
self.assertEqual("foo bar\n", stderr.getvalue())
if __name__ == "__main__":
unittest.main()