mirror of
https://github.com/openSUSE/osc.git
synced 2024-11-11 07:06:16 +01:00
Merge pull request #1271 from dmach/meta-attribute-add
meta attribute: Add --add option to append values to the existing list
This commit is contained in:
commit
3828cec5c2
@ -4,6 +4,10 @@ and work with related XML data.
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
import xml.sax.saxutils
|
||||||
|
from xml.etree import ElementTree as ET
|
||||||
|
|
||||||
|
|
||||||
def get(apiurl, path, query=None):
|
def get(apiurl, path, query=None):
|
||||||
"""
|
"""
|
||||||
Send a GET request to OBS.
|
Send a GET request to OBS.
|
||||||
@ -28,7 +32,7 @@ def get(apiurl, path, query=None):
|
|||||||
|
|
||||||
url = osc_core.makeurl(apiurl, path, query)
|
url = osc_core.makeurl(apiurl, path, query)
|
||||||
with osc_connection.http_GET(url) as f:
|
with osc_connection.http_GET(url) as f:
|
||||||
root = osc_core.ET.parse(f).getroot()
|
root = ET.parse(f).getroot()
|
||||||
return root
|
return root
|
||||||
|
|
||||||
|
|
||||||
@ -56,11 +60,44 @@ def post(apiurl, path, query=None):
|
|||||||
|
|
||||||
url = osc_core.makeurl(apiurl, path, query)
|
url = osc_core.makeurl(apiurl, path, query)
|
||||||
with osc_connection.http_POST(url) as f:
|
with osc_connection.http_POST(url) as f:
|
||||||
root = osc_core.ET.parse(f).getroot()
|
root = ET.parse(f).getroot()
|
||||||
return root
|
return root
|
||||||
|
|
||||||
|
|
||||||
def find_nodes(root, root_name, node_name):
|
def _to_xpath(*args):
|
||||||
|
"""
|
||||||
|
Convert strings and dictionaries to xpath:
|
||||||
|
string gets translated to a node name
|
||||||
|
dictionary gets translated to [@key='value'] predicate
|
||||||
|
|
||||||
|
All values are properly escaped.
|
||||||
|
|
||||||
|
Examples:
|
||||||
|
args: ["directory", "entry", {"name": "osc"}]
|
||||||
|
result: "directory/entry[@name='osc']"
|
||||||
|
|
||||||
|
args: ["attributes", "attribute", {"namespace": "OBS", "name": "BranchSkipRepositories"}, "value"]
|
||||||
|
result: "attributes/attribute[@namespace='OBS'][@name='BranchSkipRepositories']/value"
|
||||||
|
"""
|
||||||
|
xpath = ""
|
||||||
|
for arg in args:
|
||||||
|
if isinstance(arg, str):
|
||||||
|
arg = xml.sax.saxutils.escape(arg)
|
||||||
|
xpath += f"/{arg}"
|
||||||
|
elif isinstance(arg, dict):
|
||||||
|
for key, value in arg.items():
|
||||||
|
key = xml.sax.saxutils.escape(key)
|
||||||
|
value = xml.sax.saxutils.escape(value)
|
||||||
|
xpath += f"[@{key}='{value}']"
|
||||||
|
else:
|
||||||
|
raise TypeError(f"Argument '{arg}' has invalid type '{type(arg).__name__}'. Expected types: str, dict")
|
||||||
|
|
||||||
|
# strip the leading slash because we're making a relative search
|
||||||
|
xpath = xpath.lstrip("/")
|
||||||
|
return xpath
|
||||||
|
|
||||||
|
|
||||||
|
def find_nodes(root, root_name, *args):
|
||||||
"""
|
"""
|
||||||
Find nodes with given `node_name`.
|
Find nodes with given `node_name`.
|
||||||
Also, verify that the root tag matches the `root_name`.
|
Also, verify that the root tag matches the `root_name`.
|
||||||
@ -69,16 +106,16 @@ def find_nodes(root, root_name, node_name):
|
|||||||
:type root: xml.etree.ElementTree.Element
|
:type root: xml.etree.ElementTree.Element
|
||||||
:param root_name: Expected (tag) name of the root node.
|
:param root_name: Expected (tag) name of the root node.
|
||||||
:type root_name: str
|
:type root_name: str
|
||||||
:param node_name: Name of the nodes we're looking for.
|
:param *args: Simplified xpath notation: strings are node names, dictionaries translate to [@key='value'] predicates.
|
||||||
:type node_name: str
|
:type *args: list[str, dict]
|
||||||
:returns: List of nodes that match the given `node_name`.
|
:returns: List of nodes that match xpath based on the given `args`.
|
||||||
:rtype: list(xml.etree.ElementTree.Element)
|
:rtype: list(xml.etree.ElementTree.Element)
|
||||||
"""
|
"""
|
||||||
assert root.tag == root_name
|
assert root.tag == root_name
|
||||||
return root.findall(node_name)
|
return root.findall(_to_xpath(*args))
|
||||||
|
|
||||||
|
|
||||||
def find_node(root, root_name, node_name=None):
|
def find_node(root, root_name, *args):
|
||||||
"""
|
"""
|
||||||
Find a single node with given `node_name`.
|
Find a single node with given `node_name`.
|
||||||
If `node_name` is not specified, the root node is returned.
|
If `node_name` is not specified, the root node is returned.
|
||||||
@ -88,17 +125,18 @@ def find_node(root, root_name, node_name=None):
|
|||||||
:type root: xml.etree.ElementTree.Element
|
:type root: xml.etree.ElementTree.Element
|
||||||
:param root_name: Expected (tag) name of the root node.
|
:param root_name: Expected (tag) name of the root node.
|
||||||
:type root_name: str
|
:type root_name: str
|
||||||
:param node_name: Name of the nodes we're looking for.
|
:param *args: Simplified xpath notation: strings are node names, dictionaries translate to [@key='value'] predicates.
|
||||||
:type node_name: str
|
:type *args: list[str, dict]
|
||||||
:returns: The node that matches the given `node_name`
|
:returns: The node that matches xpath based on the given `args`
|
||||||
or the root node if `node_name` is not specified.
|
or the root node if `args` are not specified.
|
||||||
:rtype: xml.etree.ElementTree.Element
|
:rtype: xml.etree.ElementTree.Element
|
||||||
"""
|
"""
|
||||||
|
|
||||||
assert root.tag == root_name
|
assert root.tag == root_name
|
||||||
if node_name:
|
if not args:
|
||||||
return root.find(node_name)
|
# only verify the root tag
|
||||||
return root
|
return root
|
||||||
|
return root.find(_to_xpath(*args))
|
||||||
|
|
||||||
|
|
||||||
def write_xml_node_to_file(node, path, indent=True):
|
def write_xml_node_to_file(node, path, indent=True):
|
||||||
@ -112,8 +150,31 @@ def write_xml_node_to_file(node, path, indent=True):
|
|||||||
:param indent: Whether to indent (pretty-print) the written XML.
|
:param indent: Whether to indent (pretty-print) the written XML.
|
||||||
:type indent: bool
|
:type indent: bool
|
||||||
"""
|
"""
|
||||||
from .. import core as osc_core
|
|
||||||
|
|
||||||
if indent:
|
if indent:
|
||||||
osc_core.xmlindent(node)
|
xml_indent(node)
|
||||||
osc_core.ET.ElementTree(node).write(path)
|
ET.ElementTree(node).write(path)
|
||||||
|
|
||||||
|
|
||||||
|
def xml_escape(string):
|
||||||
|
"""
|
||||||
|
Escape the string so it's safe to use in XML and xpath.
|
||||||
|
"""
|
||||||
|
entities = {
|
||||||
|
"\"": """,
|
||||||
|
"'": "'",
|
||||||
|
}
|
||||||
|
if isinstance(string, bytes):
|
||||||
|
return xml.sax.saxutils.escape(string.decode("utf-8"), entities=entities).encode("utf-8")
|
||||||
|
return xml.sax.saxutils.escape(string, entities=entities)
|
||||||
|
|
||||||
|
|
||||||
|
def xml_indent(root):
|
||||||
|
"""
|
||||||
|
Indent XML so it looks pretty after printing or saving to file.
|
||||||
|
"""
|
||||||
|
if hasattr(ET, "indent"):
|
||||||
|
# ElementTree supports indent() in Python 3.9 and newer
|
||||||
|
ET.indent(root)
|
||||||
|
else:
|
||||||
|
from .. import core as osc_core
|
||||||
|
osc_core.xmlindent(root)
|
||||||
|
@ -16,6 +16,7 @@ from urllib.error import URLError, HTTPError
|
|||||||
|
|
||||||
import urllib3.exceptions
|
import urllib3.exceptions
|
||||||
|
|
||||||
|
from . import _private
|
||||||
from . import commandline
|
from . import commandline
|
||||||
from . import oscerr
|
from . import oscerr
|
||||||
from .OscConfigParser import configparser
|
from .OscConfigParser import configparser
|
||||||
@ -112,7 +113,7 @@ def run(prg, argv=None):
|
|||||||
if b'<summary>' in body:
|
if b'<summary>' in body:
|
||||||
msg = body.split(b'<summary>')[1]
|
msg = body.split(b'<summary>')[1]
|
||||||
msg = msg.split(b'</summary>')[0]
|
msg = msg.split(b'</summary>')[0]
|
||||||
msg = msg.replace(b'<', b'<').replace(b'>', b'>').replace(b'&', b'&')
|
msg = _private.api.xml_escape(msg)
|
||||||
print(decode_it(msg), file=sys.stderr)
|
print(decode_it(msg), file=sys.stderr)
|
||||||
if e.code >= 500 and e.code <= 599:
|
if e.code >= 500 and e.code <= 599:
|
||||||
print('\nRequest: %s' % e.filename)
|
print('\nRequest: %s' % e.filename)
|
||||||
|
@ -1235,6 +1235,8 @@ class Osc(cmdln.Cmdln):
|
|||||||
help='Try to remove also all repositories building against remove ones.')
|
help='Try to remove also all repositories building against remove ones.')
|
||||||
@cmdln.option('-s', '--set', metavar='ATTRIBUTE_VALUES',
|
@cmdln.option('-s', '--set', metavar='ATTRIBUTE_VALUES',
|
||||||
help='set attribute values')
|
help='set attribute values')
|
||||||
|
@cmdln.option('--add', metavar='ATTRIBUTE_VALUES',
|
||||||
|
help='add to the existing attribute values')
|
||||||
@cmdln.option('--delete', action='store_true',
|
@cmdln.option('--delete', action='store_true',
|
||||||
help='delete a pattern or attribute')
|
help='delete a pattern or attribute')
|
||||||
def do_meta(self, subcmd, opts, *args):
|
def do_meta(self, subcmd, opts, *args):
|
||||||
@ -1306,6 +1308,9 @@ class Osc(cmdln.Cmdln):
|
|||||||
if len(args) > max_args:
|
if len(args) > max_args:
|
||||||
raise oscerr.WrongArgs('Too many arguments.')
|
raise oscerr.WrongArgs('Too many arguments.')
|
||||||
|
|
||||||
|
if opts.add and opts.set:
|
||||||
|
self.argparse_error("Options --add and --set are mutually exclusive")
|
||||||
|
|
||||||
apiurl = self.get_api_url()
|
apiurl = self.get_api_url()
|
||||||
|
|
||||||
# Specific arguments
|
# Specific arguments
|
||||||
@ -1369,7 +1374,7 @@ class Osc(cmdln.Cmdln):
|
|||||||
raise oscerr.WrongOptions('options --revision and --message are only supported for the prj or prjconf subcommand')
|
raise oscerr.WrongOptions('options --revision and --message are only supported for the prj or prjconf subcommand')
|
||||||
|
|
||||||
# show
|
# show
|
||||||
if not opts.edit and not opts.file and not opts.delete and not opts.create and not opts.set:
|
if not opts.edit and not opts.file and not opts.delete and not opts.create and not opts.set and not opts.add:
|
||||||
if cmd == 'prj':
|
if cmd == 'prj':
|
||||||
sys.stdout.write(decode_it(b''.join(show_project_meta(apiurl, project, rev=opts.revision, blame=opts.blame))))
|
sys.stdout.write(decode_it(b''.join(show_project_meta(apiurl, project, rev=opts.revision, blame=opts.blame))))
|
||||||
elif cmd == 'pkg':
|
elif cmd == 'pkg':
|
||||||
@ -1445,17 +1450,33 @@ class Osc(cmdln.Cmdln):
|
|||||||
template_args=None)
|
template_args=None)
|
||||||
|
|
||||||
# create attribute entry
|
# create attribute entry
|
||||||
if (opts.create or opts.set) and cmd == 'attribute':
|
if (opts.create or opts.set or opts.add) and cmd == 'attribute':
|
||||||
if not opts.attribute:
|
if not opts.attribute:
|
||||||
raise oscerr.WrongOptions('no attribute given to create')
|
raise oscerr.WrongOptions('no attribute given to create')
|
||||||
values = ''
|
|
||||||
if opts.set:
|
|
||||||
opts.set = opts.set.replace('&', '&').replace('<', '<').replace('>', '>')
|
|
||||||
for i in opts.set.split(','):
|
|
||||||
values += '<value>%s</value>' % i
|
|
||||||
aname = opts.attribute.split(":")
|
aname = opts.attribute.split(":")
|
||||||
if len(aname) != 2:
|
if len(aname) != 2:
|
||||||
raise oscerr.WrongOptions('Given attribute is not in "NAMESPACE:NAME" style')
|
raise oscerr.WrongOptions('Given attribute is not in "NAMESPACE:NAME" style')
|
||||||
|
|
||||||
|
values = ''
|
||||||
|
|
||||||
|
if opts.add:
|
||||||
|
# read the existing values from server
|
||||||
|
root = _private.api.get(apiurl, attributepath)
|
||||||
|
nodes = _private.api.find_nodes(root, "attributes", "attribute", {"namespace": aname[0], "name": aname[1]}, "value")
|
||||||
|
for node in nodes:
|
||||||
|
# append the existing values
|
||||||
|
value = _private.api.xml_escape(node.text)
|
||||||
|
values += f"<value>{value}</value>"
|
||||||
|
|
||||||
|
# pretend we're setting values in order to append the values we have specified on the command-line,
|
||||||
|
# because OBS API doesn't support extending the value list directly
|
||||||
|
opts.set = opts.add
|
||||||
|
|
||||||
|
if opts.set:
|
||||||
|
for i in opts.set.split(','):
|
||||||
|
values += '<value>%s</value>' % _private.api.xml_escape(i)
|
||||||
|
|
||||||
d = '<attributes><attribute namespace=\'%s\' name=\'%s\' >%s</attribute></attributes>' % (aname[0], aname[1], values)
|
d = '<attributes><attribute namespace=\'%s\' name=\'%s\' >%s</attribute></attributes>' % (aname[0], aname[1], values)
|
||||||
url = makeurl(apiurl, attributepath)
|
url = makeurl(apiurl, attributepath)
|
||||||
for data in streamfile(url, http_POST, data=d):
|
for data in streamfile(url, http_POST, data=d):
|
||||||
|
@ -7089,8 +7089,7 @@ def get_commitlog(
|
|||||||
r.append('<author>%s</author>' % user)
|
r.append('<author>%s</author>' % user)
|
||||||
r.append('<date>%s</date>' % t)
|
r.append('<date>%s</date>' % t)
|
||||||
r.append('<requestid>%s</requestid>' % requestid)
|
r.append('<requestid>%s</requestid>' % requestid)
|
||||||
r.append('<msg>%s</msg>' %
|
r.append('<msg>%s</msg>' % _private.api.xml_escape(decode_it(comment)))
|
||||||
decode_it(comment).replace('&', '&').replace('<', '>').replace('>', '<'))
|
|
||||||
r.append('</logentry>')
|
r.append('</logentry>')
|
||||||
else:
|
else:
|
||||||
if requestid:
|
if requestid:
|
||||||
|
34
tests/test__private_api.py
Normal file
34
tests/test__private_api.py
Normal file
@ -0,0 +1,34 @@
|
|||||||
|
import unittest
|
||||||
|
|
||||||
|
from osc._private.api import xml_escape
|
||||||
|
|
||||||
|
|
||||||
|
class TestXmlEscape(unittest.TestCase):
|
||||||
|
def test_lt(self):
|
||||||
|
actual = xml_escape("<")
|
||||||
|
expected = "<"
|
||||||
|
self.assertEqual(actual, expected)
|
||||||
|
|
||||||
|
def test_gt(self):
|
||||||
|
actual = xml_escape(">")
|
||||||
|
expected = ">"
|
||||||
|
self.assertEqual(actual, expected)
|
||||||
|
|
||||||
|
def test_apos(self):
|
||||||
|
actual = xml_escape("'")
|
||||||
|
expected = "'"
|
||||||
|
self.assertEqual(actual, expected)
|
||||||
|
|
||||||
|
def test_quot(self):
|
||||||
|
actual = xml_escape("\"")
|
||||||
|
expected = """
|
||||||
|
self.assertEqual(actual, expected)
|
||||||
|
|
||||||
|
def test_amp(self):
|
||||||
|
actual = xml_escape("&")
|
||||||
|
expected = "&"
|
||||||
|
self.assertEqual(actual, expected)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
unittest.main()
|
Loading…
Reference in New Issue
Block a user