1
0
mirror of https://github.com/openSUSE/osc.git synced 2025-02-19 08:42:11 +01:00

Extend xml.etree.ElementTree.ParseError output with a snippet of broken XML

This commit is contained in:
Daniel Mach 2025-01-02 14:22:54 +01:00
parent ffca873272
commit 6fc5654aca
16 changed files with 246 additions and 167 deletions

View File

@ -10,6 +10,7 @@ from xml.etree import ElementTree as ET
from ..util.xml import xml_escape
from ..util.xml import xml_indent
from ..util.xml import xml_unescape
from ..util.xml import xml_parse
def get(apiurl, path, query=None):
@ -36,7 +37,7 @@ def get(apiurl, path, query=None):
url = osc_core.makeurl(apiurl, path, query)
with osc_connection.http_GET(url) as f:
root = ET.parse(f).getroot()
root = xml_parse(f).getroot()
return root
@ -64,7 +65,7 @@ def post(apiurl, path, query=None):
url = osc_core.makeurl(apiurl, path, query)
with osc_connection.http_POST(url) as f:
root = ET.parse(f).getroot()
root = xml_parse(f).getroot()
return root
@ -92,7 +93,7 @@ def put(apiurl, path, query=None, data=None):
url = osc_core.makeurl(apiurl, path, query)
with osc_connection.http_PUT(url, data=data) as f:
root = osc_core.ET.parse(f).getroot()
root = xml_parse(f).getroot()
return root

View File

@ -30,6 +30,7 @@ from .util import cpio
from .util import archquery, debquery, packagequery, rpmquery
from .util import repodata
from .util.helper import decode_it
from .util.xml import xml_parse
change_personality = {
@ -79,7 +80,7 @@ class Buildinfo:
def __init__(self, filename, apiurl, buildtype='spec', localpkgs=None, binarytype='rpm'):
localpkgs = localpkgs or []
try:
tree = ET.parse(filename)
tree = xml_parse(filename)
except ET.ParseError:
print('could not parse the buildinfo:', file=sys.stderr)
print(open(filename).read(), file=sys.stderr)
@ -1351,7 +1352,7 @@ def main(apiurl, store, opts, argv):
if build_type == 'kiwi':
# Is a obsrepositories tag used?
try:
tree = ET.parse(build_descr)
tree = xml_parse(build_descr)
except:
print('could not parse the kiwi file:', file=sys.stderr)
print(open(build_descr).read(), file=sys.stderr)

View File

@ -30,6 +30,8 @@ from urllib.error import HTTPError
from . import commands as osc_commands
from . import oscerr
from .commandline_common import *
from .util.xml import xml_fromstring
from .util.xml import xml_parse
class OscCommand(Command):
@ -1165,7 +1167,7 @@ class Osc(cmdln.Cmdln):
break
m = show_files_meta(apiurl, project, package)
li = Linkinfo()
root = ET.fromstring(m)
root = xml_fromstring(m)
li.read(root.find('linkinfo'))
if li.haserror():
raise oscerr.LinkExpandError(project, package, li.error)
@ -2095,7 +2097,7 @@ class Osc(cmdln.Cmdln):
# get _link info from server, that knows about the local state ...
u = makeurl(apiurl, ['source', project, p])
f = http_GET(u)
root = ET.parse(f).getroot()
root = xml_parse(f).getroot()
_check_service(root)
linkinfo = root.find('linkinfo')
if linkinfo is None:
@ -2140,7 +2142,7 @@ class Osc(cmdln.Cmdln):
u = makeurl(apiurl, ['request'], query='cmd=create&addrevision=1')
f = http_POST(u, data=xml)
root = ET.parse(f).getroot()
root = xml_parse(f).getroot()
sr_ids.append(root.get('id'))
print("Request(s) created: ", end=' ')
@ -2150,7 +2152,7 @@ class Osc(cmdln.Cmdln):
# was this project created by clone request ?
u = makeurl(apiurl, ['source', project, '_attribute', 'OBS:RequestCloned'])
f = http_GET(u)
root = ET.parse(f).getroot()
root = xml_parse(f).getroot()
value = root.findtext('attribute/value')
if value and not opts.yes:
repl = ''
@ -2214,7 +2216,7 @@ class Osc(cmdln.Cmdln):
# check for failed source service
u = makeurl(apiurl, ['source', src_project, src_package])
f = http_GET(u)
root = ET.parse(f).getroot()
root = xml_parse(f).getroot()
_check_service(root)
if not opts.nodevelproject:
@ -2242,7 +2244,7 @@ Please submit there instead, or use --nodevelproject to force direct submission.
# get _link info from server, that knows about the local state ...
u = makeurl(apiurl, ['source', src_project, src_package], query="expand=1")
f = http_GET(u)
root = ET.parse(f).getroot()
root = xml_parse(f).getroot()
linkinfo = root.find('linkinfo')
if linkinfo is None:
rev = root.get('rev')
@ -2358,7 +2360,7 @@ Please submit there instead, or use --nodevelproject to force direct submission.
# get _link info from server, that knows about the local state ...
u = makeurl(apiurl, ['source', project, p])
f = http_GET(u)
root = ET.parse(f).getroot()
root = xml_parse(f).getroot()
linkinfo = root.find('linkinfo')
if linkinfo is None:
print("Package ", p, " is not a source link.")
@ -2751,7 +2753,7 @@ Please submit there instead, or use --nodevelproject to force direct submission.
u = makeurl(apiurl, ['request'], query='cmd=create')
f = http_POST(u, data=xml)
root = ET.parse(f).getroot()
root = xml_parse(f).getroot()
rid = root.get('id')
print(f"Request {rid} created")
for srid in supersede:
@ -3249,21 +3251,21 @@ Please submit there instead, or use --nodevelproject to force direct submission.
query = {'cmd': cmd}
url = makeurl(apiurl, ['request', reqid], query)
r = http_POST(url, data=opts.message)
print(ET.parse(r).getroot().get('code'))
print(xml_parse(r).getroot().get('code'))
# change incidents
elif cmd == 'setincident':
query = {'cmd': 'setincident', 'incident': incident}
url = makeurl(apiurl, ['request', reqid], query)
r = http_POST(url, data=opts.message)
print(ET.parse(r).getroot().get('code'))
print(xml_parse(r).getroot().get('code'))
# change priority
elif cmd in ['prioritize', 'priorize']:
query = {'cmd': 'setpriority', 'priority': priority}
url = makeurl(apiurl, ['request', reqid], query)
r = http_POST(url, data=opts.message)
print(ET.parse(r).getroot().get('code'))
print(xml_parse(r).getroot().get('code'))
# add new reviewer to existing request
elif cmd in ['add'] and subcmd == 'review':
@ -3280,7 +3282,7 @@ Please submit there instead, or use --nodevelproject to force direct submission.
if not opts.message:
opts.message = edit_message()
r = http_POST(url, data=opts.message)
print(ET.parse(r).getroot().get('code'))
print(xml_parse(r).getroot().get('code'))
# list and approvenew
elif cmd == 'list' or cmd == 'approvenew':
@ -3436,7 +3438,7 @@ Please submit there instead, or use --nodevelproject to force direct submission.
except HTTPError as e:
if e.code == 404:
# Any referenced object does not exist, eg. the superseded request
root = ET.fromstring(e.read())
root = xml_fromstring(e.read())
summary = root.find('summary')
print(summary.text, file=sys.stderr)
raise oscerr.WrongOptions("Object does not exist")
@ -3521,7 +3523,7 @@ Please submit there instead, or use --nodevelproject to force direct submission.
details = e.hdrs.get('X-Opensuse-Errorcode')
if details:
print(details, file=sys.stderr)
root = ET.fromstring(e.read())
root = xml_fromstring(e.read())
summary = root.find('summary')
if summary is not None:
print(summary.text)
@ -3544,7 +3546,7 @@ Please submit there instead, or use --nodevelproject to force direct submission.
'match': f"([devel[@project='{action.tgt_project}' and @package='{action.tgt_package}']])"
})
f = http_GET(u)
root = ET.parse(f).getroot()
root = xml_parse(f).getroot()
if root.findall('package') and not opts.no_devel:
for node in root.findall('package'):
project = node.get('project')
@ -3554,7 +3556,7 @@ Please submit there instead, or use --nodevelproject to force direct submission.
links_to_project = links_to_package = None
try:
file = http_GET(link_url)
root = ET.parse(file).getroot()
root = xml_parse(file).getroot()
link_node = root.find('linkinfo')
if link_node is not None:
links_to_project = link_node.get('project') or project
@ -3713,7 +3715,7 @@ Please submit there instead, or use --nodevelproject to force direct submission.
try:
copy_pac(apiurl, project, package, apiurl, project, package, expand=True, comment=opts.message)
except HTTPError as e:
root = ET.fromstring(show_files_meta(apiurl, project, package, 'latest', expand=False))
root = xml_fromstring(show_files_meta(apiurl, project, package, 'latest', expand=False))
li = Linkinfo()
li.read(root.find('linkinfo'))
if li.islink() and li.haserror():
@ -4040,7 +4042,7 @@ Please submit there instead, or use --nodevelproject to force direct submission.
source_project = self._process_project_name(args[0])
f = show_project_meta(apiurl, source_project)
root = ET.fromstring(b''.join(f))
root = xml_fromstring(b''.join(f))
if not opts.message:
opts.message = edit_message()
@ -4116,14 +4118,14 @@ Please submit there instead, or use --nodevelproject to force direct submission.
url = makeurl(apiurl, ['source', target_project], query=query)
r = http_POST(url, data=opts.message)
project = None
for i in ET.fromstring(r.read()).findall('data'):
for i in xml_fromstring(r.read()).findall('data'):
if i.get('name') == 'targetproject':
project = i.text.strip()
if project:
print("Incident project created: ", project)
else:
print(ET.parse(r).getroot().get('code'))
print(ET.parse(r).getroot().get('error'))
print(xml_parse(r).getroot().get('code'))
print(xml_parse(r).getroot().get('error'))
@cmdln.option('-a', '--attribute', metavar='ATTRIBUTE',
help='Use this attribute to find default maintenance project (default is OBS:MaintenanceProject)')
@ -4524,7 +4526,7 @@ Please submit there instead, or use --nodevelproject to force direct submission.
devloc = None
if not exists and (srcprj != self._process_project_name(args[0]) or srcpkg != args[1]):
try:
root = ET.fromstring(b''.join(show_attribute_meta(apiurl, args[0], None, None,
root = xml_fromstring(b''.join(show_attribute_meta(apiurl, args[0], None, None,
conf.config['maintained_update_project_attribute'], None, None)))
# this might raise an AttributeError
uproject = root.find('attribute').find('value').text
@ -4781,7 +4783,7 @@ Please submit there instead, or use --nodevelproject to force direct submission.
u = makeurl(apiurl, ['source', project, package], query=query)
f = http_GET(u)
root = ET.parse(f).getroot()
root = xml_parse(f).getroot()
linkinfo = root.find('linkinfo')
if linkinfo is None:
raise oscerr.APIError('package is not a source link')
@ -4973,7 +4975,7 @@ Please submit there instead, or use --nodevelproject to force direct submission.
try:
file = http_GET(link_url)
root = ET.parse(file).getroot()
root = xml_parse(file).getroot()
except HTTPError as e:
return (None, None)
except SyntaxError as e:
@ -4996,7 +4998,7 @@ Please submit there instead, or use --nodevelproject to force direct submission.
link_url = makeurl(apiurl, ['source', project, package])
try:
file = http_GET(link_url)
root = ET.parse(file).getroot()
root = xml_parse(file).getroot()
except HTTPError as e:
if e.code != 404:
print(f'Cannot get list of files for {project}/{package}: {e}', file=sys.stderr)
@ -5527,7 +5529,7 @@ Please submit there instead, or use --nodevelproject to force direct submission.
try:
m = show_files_meta(apiurl, project, package)
li = Linkinfo()
li.read(ET.fromstring(''.join(m)).find('linkinfo'))
li.read(xml_fromstring(''.join(m)).find('linkinfo'))
if not li.haserror():
if li.project == project:
print(statfrmt('S', package + " link to package " + li.package))
@ -6028,7 +6030,7 @@ Please submit there instead, or use --nodevelproject to force direct submission.
pacs[0].name, revision=rev,
linkrev=opts.linkrev,
expand=opts.server_side_source_service_files)
directory = ET.fromstring(meta)
directory = xml_fromstring(meta)
li_node = directory.find('linkinfo')
if li_node is None:
print(f'Revision \'{rev}\' is no link', file=sys.stderr)
@ -6609,7 +6611,7 @@ Please submit there instead, or use --nodevelproject to force direct submission.
query['lastsucceeded'] = 1
u = makeurl(self.get_api_url(), ['build', project, repository, arch, package, '_log'], query=query)
f = http_GET(u)
root = ET.parse(f).getroot()
root = xml_parse(f).getroot()
offset = int(root.find('entry').get('size'))
if opts.offset:
offset = offset - int(opts.offset)
@ -6714,7 +6716,7 @@ Please submit there instead, or use --nodevelproject to force direct submission.
query['lastsucceeded'] = 1
u = makeurl(self.get_api_url(), ['build', project, repository, arch, package, '_log'], query=query)
f = http_GET(u)
root = ET.parse(f).getroot()
root = xml_parse(f).getroot()
offset = int(root.find('entry').get('size'))
if opts.offset:
offset = offset - int(opts.offset)
@ -6744,7 +6746,7 @@ Please submit there instead, or use --nodevelproject to force direct submission.
for f in files[1:]:
if os.stat(f).st_atime > os.stat(cfg).st_atime:
cfg = f
root = ET.parse(cfg).getroot()
root = xml_parse(cfg).getroot()
repo = root.get("repository")
arch = root.findtext("arch")
return repo, arch
@ -6870,7 +6872,7 @@ Please submit there instead, or use --nodevelproject to force direct submission.
print(apiurl, project, package, repository, arch)
xml = show_package_trigger_reason(apiurl, project, package, repository, arch)
root = ET.fromstring(xml)
root = xml_fromstring(xml)
if root.find('explain') is None:
reason = "No triggerreason found"
print(reason)
@ -6984,7 +6986,7 @@ Please submit there instead, or use --nodevelproject to force direct submission.
project_packages = meta_get_packagelist(apiurl, project, deleted=False, expand=False)
xml = get_dependson(apiurl, project, repository, arch, packages, reverse)
root = ET.fromstring(xml)
root = xml_fromstring(xml)
for package in root.findall('package'):
print(package.get('name'), ":")
for dep in package.findall('pkgdep'):
@ -8829,7 +8831,7 @@ Please submit there instead, or use --nodevelproject to force direct submission.
'match': f"([kind='patchinfo' and issue[@state='OPEN' and owner/@login='{user}']])"
})
f = http_GET(u)
root = ET.parse(f).getroot()
root = xml_parse(f).getroot()
if root.findall('package'):
print("Patchinfos with open bugs assigned to you:\n")
for node in root.findall('package'):
@ -8838,7 +8840,7 @@ Please submit there instead, or use --nodevelproject to force direct submission.
print(project, "/", package, '\n')
p = makeurl(apiurl, ['source', project, package], {'view': 'issues'})
fp = http_GET(p)
issues = ET.parse(fp).findall('issue')
issues = xml_parse(fp).findall('issue')
for issue in issues:
if issue.find('state') is None or issue.find('state').text != "OPEN":
continue
@ -8865,7 +8867,7 @@ Please submit there instead, or use --nodevelproject to force direct submission.
'user': user,
})
f = http_GET(u)
root = ET.parse(f).getroot()
root = xml_parse(f).getroot()
if root.findall('request'):
print("Requests which request a review by you:\n")
for node in root.findall('request'):
@ -8881,7 +8883,7 @@ Please submit there instead, or use --nodevelproject to force direct submission.
'user': user,
})
f = http_GET(u)
root = ET.parse(f).getroot()
root = xml_parse(f).getroot()
if root.findall('request'):
print("Requests for your packages:\n")
for node in root.findall('request'):
@ -8897,7 +8899,7 @@ Please submit there instead, or use --nodevelproject to force direct submission.
'user': user,
})
f = http_GET(u)
root = ET.parse(f).getroot()
root = xml_parse(f).getroot()
if root.findall('request'):
print("Declined requests created by you (revoke, reopen or supersede):\n")
for node in root.findall('request'):
@ -9326,7 +9328,7 @@ Please submit there instead, or use --nodevelproject to force direct submission.
'name': pac,
'user': user}), apiurl=apiurl)
if data:
data = ET.fromstring(parse_meta_to_string(data))
data = xml_fromstring(parse_meta_to_string(data))
data.find('title').text = ''.join(title)
data.find('description').text = ''.join(descr)
data.find('url').text = url
@ -9657,7 +9659,7 @@ Please submit there instead, or use --nodevelproject to force direct submission.
u = makeurl(apiurl, ['request'], query='cmd=create')
f = http_POST(u, data=xml)
root = ET.parse(f).getroot()
root = xml_parse(f).getroot()
print("Request ID:", root.get('id'))
elif opts.delete:
@ -9678,7 +9680,7 @@ Please submit there instead, or use --nodevelproject to force direct submission.
else:
if pac:
m = show_package_meta(apiurl, prj, pac)
metaroot = ET.fromstring(b''.join(m))
metaroot = xml_fromstring(b''.join(m))
if not opts.nodevelproject:
while metaroot.findall('devel'):
d = metaroot.find('devel')
@ -9687,18 +9689,18 @@ Please submit there instead, or use --nodevelproject to force direct submission.
if opts.verbose:
print(f"Following to the development space: {prj}/{pac}")
m = show_package_meta(apiurl, prj, pac)
metaroot = ET.fromstring(b''.join(m))
metaroot = xml_fromstring(b''.join(m))
if not metaroot.findall('person') and not metaroot.findall('group'):
if opts.verbose:
print("No dedicated persons in package defined, showing the project persons.")
pac = None
m = show_project_meta(apiurl, prj)
metaroot = ET.fromstring(b''.join(m))
metaroot = xml_fromstring(b''.join(m))
else:
# fallback to project lookup for old servers
if prj and not searchresult:
m = show_project_meta(apiurl, prj)
metaroot = ET.fromstring(b''.join(m))
metaroot = xml_fromstring(b''.join(m))
# extract the maintainers
projects = []
@ -9993,7 +9995,7 @@ Please submit there instead, or use --nodevelproject to force direct submission.
query = {'rev': 'latest'}
u = makeurl(apiurl, ['source', prj, package], query=query)
f = http_GET(u)
root = ET.parse(f).getroot()
root = xml_parse(f).getroot()
linkinfo = root.find('linkinfo')
if linkinfo is None:
raise oscerr.APIError('package is not a source link')
@ -10005,7 +10007,7 @@ Please submit there instead, or use --nodevelproject to force direct submission.
query = {'rev': 'latest', 'linkrev': 'base'}
u = makeurl(apiurl, ['source', prj, package], query=query)
f = http_GET(u)
root = ET.parse(f).getroot()
root = xml_parse(f).getroot()
linkinfo = root.find('linkinfo')
if linkinfo.get('error') is None:
workingrev = linkinfo.get('xsrcmd5')
@ -10014,7 +10016,7 @@ Please submit there instead, or use --nodevelproject to force direct submission.
query = {'lastworking': 1}
u = makeurl(apiurl, ['source', prj, package], query=query)
f = http_GET(u)
root = ET.parse(f).getroot()
root = xml_parse(f).getroot()
linkinfo = root.find('linkinfo')
if linkinfo is None:
raise oscerr.APIError('package is not a source link')
@ -10031,7 +10033,7 @@ Please submit there instead, or use --nodevelproject to force direct submission.
u = makeurl(apiurl, ['source', prj, package], query=query)
f = http_GET(u)
meta = f.readlines()
root_new = ET.fromstring(b''.join(meta))
root_new = xml_fromstring(b''.join(meta))
dir_new = {'apiurl': apiurl, 'project': prj, 'package': package}
dir_new['srcmd5'] = root_new.get('srcmd5')
dir_new['entries'] = [[n.get('name'), n.get('md5')] for n in root_new.findall('entry')]
@ -10039,7 +10041,7 @@ Please submit there instead, or use --nodevelproject to force direct submission.
query = {'rev': workingrev}
u = makeurl(apiurl, ['source', prj, package], query=query)
f = http_GET(u)
root_oldpatched = ET.parse(f).getroot()
root_oldpatched = xml_parse(f).getroot()
linkinfo_oldpatched = root_oldpatched.find('linkinfo')
if linkinfo_oldpatched is None:
raise oscerr.APIError('working rev is not a source link?')
@ -10053,7 +10055,7 @@ Please submit there instead, or use --nodevelproject to force direct submission.
query['rev'] = linkinfo_oldpatched.get('srcmd5')
u = makeurl(apiurl, ['source', linkinfo_oldpatched.get('project'), linkinfo_oldpatched.get('package')], query=query)
f = http_GET(u)
root_old = ET.parse(f).getroot()
root_old = xml_parse(f).getroot()
dir_old = {'apiurl': apiurl}
dir_old['project'] = linkinfo_oldpatched.get('project')
dir_old['package'] = linkinfo_oldpatched.get('package')
@ -10187,7 +10189,7 @@ Please submit there instead, or use --nodevelproject to force direct submission.
u = makeurl(p.apiurl, ['source', p.prjname, p.name], query=query)
f = http_GET(u)
meta = f.readlines()
root_new = ET.fromstring(b''.join(meta))
root_new = xml_fromstring(b''.join(meta))
linkinfo_new = root_new.find('linkinfo')
if linkinfo_new is None:
raise oscerr.APIError('link is not a really a link?')
@ -10207,7 +10209,7 @@ Please submit there instead, or use --nodevelproject to force direct submission.
query = {'rev': linkinfo.srcmd5}
u = makeurl(p.apiurl, ['source', linkinfo.project, linkinfo.package], query=query)
f = http_GET(u)
root_old = ET.parse(f).getroot()
root_old = xml_parse(f).getroot()
dir_old = {'apiurl': p.apiurl, 'project': linkinfo.project, 'package': linkinfo.package, 'srcmd5': linkinfo.srcmd5}
dir_old['entries'] = [[n.get('name'), n.get('md5')] for n in root_old.findall('entry')]

View File

@ -83,7 +83,9 @@ from .output import run_pager
from .output import sanitize_text
from .util import xdg
from .util.helper import decode_list, decode_it, raw_input, _html_escape
from .util.xml import xml_fromstring
from .util.xml import xml_indent_compat as xmlindent
from .util.xml import xml_parse
ET_ENCODING = "unicode"
@ -993,7 +995,7 @@ class Request:
query['enforce_branching'] = "1"
u = makeurl(apiurl, ['request'], query=query)
f = http_POST(u, data=self.to_str())
root = ET.fromstring(f.read())
root = xml_fromstring(f.read())
self.read(root)
@ -1220,7 +1222,7 @@ def meta_get_packagelist(apiurl: str, prj, deleted=None, expand=False):
u = makeurl(apiurl, ['source', prj], query)
f = http_GET(u)
root = ET.parse(f).getroot()
root = xml_parse(f).getroot()
return [node.get('name') for node in root.findall('entry')]
@ -1244,7 +1246,7 @@ def meta_get_filelist(
u = makeurl(apiurl, ['source', prj, package], query=query)
f = http_GET(u)
root = ET.parse(f).getroot()
root = xml_parse(f).getroot()
if not verbose:
return [node.get('name') for node in root.findall('entry')]
@ -1270,7 +1272,7 @@ def meta_get_project_list(apiurl: str, deleted=False):
u = makeurl(apiurl, ['source'], query)
f = http_GET(u)
root = ET.parse(f).getroot()
root = xml_parse(f).getroot()
return sorted(node.get('name') for node in root if node.get('name'))
@ -1443,7 +1445,7 @@ def show_pattern_metalist(apiurl: str, prj: str):
url = makeurl(apiurl, ['source', prj, '_pattern'])
try:
f = http_GET(url)
tree = ET.parse(f)
tree = xml_parse(f)
except HTTPError as e:
e.osc_msg = f'show_pattern_metalist: Error getting pattern list for project \'{prj}\''
raise
@ -1543,7 +1545,7 @@ class metafile:
print('BuildService API error:', error_help, file=sys.stderr)
# examine the error - we can't raise an exception because we might want
# to try again
root = ET.fromstring(e.read())
root = xml_fromstring(e.read())
summary = root.find('summary')
if summary is not None:
print(summary.text, file=sys.stderr)
@ -1695,7 +1697,7 @@ def edit_meta(
if metatype == 'pkg':
# check if the package is a link to a different project
project, package = path_args
orgprj = ET.fromstring(parse_meta_to_string(data)).get('project')
orgprj = xml_fromstring(parse_meta_to_string(data)).get('project')
if orgprj is not None and unquote(project) != orgprj:
print('The package is linked from a different project.')
@ -1752,7 +1754,7 @@ def show_upstream_srcmd5(
apiurl: str, prj: str, pac: str, expand=False, revision=None, meta=False, include_service_files=False, deleted=False
):
m = show_files_meta(apiurl, prj, pac, expand=expand, revision=revision, meta=meta, deleted=deleted)
et = ET.fromstring(m)
et = xml_fromstring(m)
if include_service_files:
try:
sinfo = et.find('serviceinfo')
@ -1776,7 +1778,7 @@ def show_upstream_xsrcmd5(
meta=meta,
expand=include_service_files,
)
et = ET.fromstring(m)
et = xml_fromstring(m)
if include_service_files:
return et.get('srcmd5')
@ -1820,7 +1822,7 @@ def get_project_sourceinfo(apiurl: str, project: str, nofilename: bool, *package
pkgs = packages[n:]
res.update(get_project_sourceinfo(apiurl, project, nofilename, *pkgs))
return res
root = ET.fromstring(si)
root = xml_fromstring(si)
res = {}
for sinfo in root.findall('sourceinfo'):
res[sinfo.get('package')] = sinfo
@ -1829,7 +1831,7 @@ def get_project_sourceinfo(apiurl: str, project: str, nofilename: bool, *package
def show_upstream_rev_vrev(apiurl: str, prj, pac, revision=None, expand=False, meta=False):
m = show_files_meta(apiurl, prj, pac, revision=revision, expand=expand, meta=meta)
et = ET.fromstring(m)
et = xml_fromstring(m)
rev = et.get("rev") or None
vrev = et.get("vrev") or None
return rev, vrev
@ -1839,7 +1841,7 @@ def show_upstream_rev(
apiurl: str, prj, pac, revision=None, expand=False, linkrev=None, meta=False, include_service_files=False
):
m = show_files_meta(apiurl, prj, pac, revision=revision, expand=expand, linkrev=linkrev, meta=meta)
et = ET.fromstring(m)
et = xml_fromstring(m)
if include_service_files:
try:
sinfo = et.find('serviceinfo')
@ -2071,7 +2073,7 @@ def clone_request(apiurl: str, reqid, msg=None):
query = {'cmd': 'branch', 'request': reqid}
url = makeurl(apiurl, ['source'], query)
r = http_POST(url, data=msg)
root = ET.fromstring(r.read())
root = xml_fromstring(r.read())
project = None
for i in root.findall('data'):
if i.get('name') == 'targetproject':
@ -2182,7 +2184,7 @@ def create_submit_request(
def get_request(apiurl: str, reqid):
u = makeurl(apiurl, ['request', reqid], {'withfullhistory': '1'})
f = http_GET(u)
root = ET.parse(f).getroot()
root = xml_parse(f).getroot()
r = Request()
r.read(root, apiurl=apiurl)
@ -2205,7 +2207,7 @@ def change_review_state(
query['superseded_by'] = supersed
u = makeurl(apiurl, ['request', reqid], query=query)
f = http_POST(u, data=message)
root = ET.parse(f).getroot()
root = xml_parse(f).getroot()
return root.get('code')
@ -2221,7 +2223,7 @@ def change_request_state(apiurl: str, reqid, newstate, message="", supersed=None
['request', reqid], query=query)
f = http_POST(u, data=message)
root = ET.parse(f).getroot()
root = xml_parse(f).getroot()
return root.get('code', 'unknown')
@ -2370,7 +2372,7 @@ def get_request_collection(
u = makeurl(apiurl, ['request'], query)
f = http_GET(u)
res = ET.parse(f).getroot()
res = xml_parse(f).getroot()
requests = []
for root in res.findall('request'):
@ -2648,7 +2650,7 @@ def get_user_meta(apiurl: str, user: str):
def _get_xml_data(meta, *tags):
data = []
if meta is not None:
root = ET.fromstring(meta)
root = xml_fromstring(meta)
for tag in tags:
elm = root.find(tag)
if elm is None or elm.text is None:
@ -2911,7 +2913,7 @@ def server_diff(
del_issue_list = []
add_issue_list = []
chn_issue_list = []
root = ET.fromstring(f.read())
root = xml_fromstring(f.read())
node = root.find('issues')
for issuenode in node.findall('issue'):
if issuenode.get('state') == 'deleted':
@ -2966,7 +2968,7 @@ def server_diff_noex(
new_project, new_package, new_revision,
unified, missingok, meta, False, files=files)
except:
elm = ET.fromstring(body).find('summary')
elm = xml_fromstring(body).find('summary')
summary = ''
if elm is not None and elm.text is not None:
summary = elm.text
@ -2992,14 +2994,14 @@ def get_request_issues(apiurl: str, reqid):
"""
u = makeurl(apiurl, ['request', reqid], query={'cmd': 'diff', 'view': 'xml', 'withissues': '1'})
f = http_POST(u)
request_tree = ET.parse(f).getroot()
request_tree = xml_parse(f).getroot()
issue_list = []
for elem in request_tree.iterfind('action/sourcediff/issues/issue'):
issue_id = elem.get('name')
encode_search = f'@name=\'{issue_id}\''
u = makeurl(apiurl, ['search/issue'], query={'match': encode_search})
f = http_GET(u)
collection = ET.parse(f).getroot()
collection = xml_parse(f).getroot()
for cissue in collection:
issue = {}
for issue_detail in cissue.iter():
@ -3023,10 +3025,10 @@ def submit_action_diff(apiurl: str, action: Action):
except HTTPError as e:
if e.code != 404:
raise e
root = ET.fromstring(e.read())
root = xml_fromstring(e.read())
return b'error: \'%s\' does not exist' % root.findtext("summary").encode()
elif e.code == 404:
root = ET.fromstring(e.read())
root = xml_fromstring(e.read())
return b'error: \'%s\' does not exist' % root.findtext("summary").encode()
raise e
@ -3158,7 +3160,7 @@ def checkout_package(
# before we create directories and stuff, check if the package actually
# exists
meta_data = b''.join(show_package_meta(apiurl, project, package))
root = ET.fromstring(meta_data)
root = xml_fromstring(meta_data)
scmsync_element = root.find("scmsync")
if not native_obs_package and scmsync_element is not None and scmsync_element.text is not None:
directory = make_dir(apiurl, project, package, pathname, prj_dir, conf.config['do_package_tracking'], outdir)
@ -3222,7 +3224,7 @@ def replace_pkg_meta(
only maintainer (unless keep_maintainers is set). Additionally remove the
develproject entry (<devel />) unless keep_develproject is true.
"""
root = ET.fromstring(b''.join(pkgmeta))
root = xml_fromstring(b''.join(pkgmeta))
root.set('name', new_name)
root.set('project', new_prj)
# never take releasename, it needs to be explicit
@ -3411,7 +3413,7 @@ def aggregate_pac(
path_args=(dst_project, dst_package_meta),
template_args=None,
create_new=False, apiurl=apiurl)
root = ET.fromstring(parse_meta_to_string(dst_meta))
root = xml_fromstring(parse_meta_to_string(dst_meta))
if root.get('project') != dst_project:
# The source comes from a different project via a project link, we need to create this instance
meta_change = True
@ -3455,7 +3457,7 @@ def aggregate_pac(
if disable_publish:
meta_change = True
root = ET.fromstring(''.join(dst_meta))
root = xml_fromstring(''.join(dst_meta))
elm = root.find('publish')
if not elm:
elm = ET.SubElement(root, 'publish')
@ -3543,7 +3545,7 @@ def attribute_branch_pkg(
try:
f = http_POST(u)
except HTTPError as e:
root = ET.fromstring(e.read())
root = xml_fromstring(e.read())
summary = root.find('summary')
if summary is not None and summary.text is not None:
raise oscerr.APIError(summary.text)
@ -3552,7 +3554,7 @@ def attribute_branch_pkg(
r = None
root = ET.fromstring(f.read())
root = xml_fromstring(f.read())
if dryrun:
return root
# TODO: change api here and return parsed XML as class
@ -3597,7 +3599,7 @@ def branch_pkg(
# read src_package meta
try:
m = b"".join(show_package_meta(apiurl, src_project, src_package))
root = ET.fromstring(m)
root = xml_fromstring(m)
except HTTPError as e:
if e.code == 404 and missingok:
root = None
@ -3614,7 +3616,7 @@ def branch_pkg(
if devel_project:
# replace src_package meta with devel_package meta because we're about branch from devel
m = b"".join(show_package_meta(apiurl, devel_project, devel_package))
root = ET.fromstring(m)
root = xml_fromstring(m)
# error out if we're branching a scmsync package (we'd end up with garbage anyway)
if root is not None and root.find("scmsync") is not None:
@ -3660,7 +3662,7 @@ def branch_pkg(
try:
f = http_POST(u)
except HTTPError as e:
root = ET.fromstring(e.read())
root = xml_fromstring(e.read())
if missingok:
if root and root.get('code') == "not_missing":
raise oscerr.NotMissing("Package exists already via project link, but link will point to given project")
@ -3675,7 +3677,7 @@ def branch_pkg(
raise
return (True, m.group(1), m.group(2), None, None)
root = ET.fromstring(f.read())
root = xml_fromstring(f.read())
if conf.config['http_debug']:
print(ET.tostring(root, encoding=ET_ENCODING), file=sys.stderr)
data = {}
@ -3684,7 +3686,7 @@ def branch_pkg(
if disable_build:
target_meta = show_package_meta(apiurl, data["targetproject"], data["targetpackage"])
root = ET.fromstring(b''.join(target_meta))
root = xml_fromstring(b''.join(target_meta))
elm = root.find('build')
if not elm:
@ -3751,7 +3753,7 @@ def copy_pac(
if meta is None:
meta = show_files_meta(dst_apiurl, dst_project, dst_package)
root = ET.fromstring(meta)
root = xml_fromstring(meta)
if root.find("scmsync") is not None:
print("Note: package source is managed via SCM")
return
@ -3776,7 +3778,7 @@ def copy_pac(
query = {'rev': 'upload'}
xml = show_files_meta(src_apiurl, src_project, src_package,
expand=expand, revision=revision)
filelist = ET.fromstring(xml)
filelist = xml_fromstring(xml)
revision = filelist.get('srcmd5')
# filter out _service: files
for entry in filelist.findall('entry'):
@ -3905,7 +3907,7 @@ def get_platforms(apiurl: str):
def get_repositories(apiurl: str):
f = http_GET(makeurl(apiurl, ['platform']))
tree = ET.parse(f)
tree = xml_parse(f)
r = sorted(node.get('name') for node in tree.getroot())
return r
@ -3915,7 +3917,7 @@ def get_distributions(apiurl: str):
'distribution', 'project', 'repository', 'reponame'"""
f = http_GET(makeurl(apiurl, ['distributions']))
root = ET.fromstring(b''.join(f))
root = xml_fromstring(b''.join(f))
distlist = []
for node in root.findall('distribution'):
@ -3994,7 +3996,7 @@ def get_binarylist(
query['withccache'] = 1
u = makeurl(apiurl, ['build', prj, repo, arch, what], query=query)
f = http_GET(u)
tree = ET.parse(f)
tree = xml_parse(f)
if not verbose:
return [node.get('filename') for node in tree.findall('binary')]
else:
@ -4011,7 +4013,7 @@ def get_binarylist(
def get_binarylist_published(apiurl: str, prj: str, repo: str, arch: str):
u = makeurl(apiurl, ['published', prj, repo, arch])
f = http_GET(u)
tree = ET.parse(f)
tree = xml_parse(f)
r = [node.get('name') for node in tree.findall('entry')]
return r
@ -4058,7 +4060,7 @@ def show_prj_results_meta(
def result_xml_to_dicts(xml):
# assumption: xml contains at most one status element (maybe we should
# generalize this to arbitrary status element)
root = ET.fromstring(xml)
root = xml_fromstring(xml)
for node in root.findall('result'):
rmap = {}
rmap['project'] = rmap['prj'] = node.get('project')
@ -4196,13 +4198,13 @@ def get_package_results(apiurl: str, project: str, package: Optional[str] = None
if e.code == 502 or e.code == 504:
# re-try result request
continue
root = ET.fromstring(e.read())
root = xml_fromstring(e.read())
if e.code == 400 and kwargs.get('multibuild') and re.search('multibuild', getattr(root.find('summary'), 'text', '')):
kwargs['multibuild'] = None
kwargs['locallink'] = None
continue
raise
root = ET.fromstring(xml)
root = xml_fromstring(xml)
kwargs['oldstate'] = root.get('state')
for result in root.findall('result'):
if result.get('dirty') is not None:
@ -4270,7 +4272,7 @@ def get_prj_results(
r = []
f = show_prj_results_meta(apiurl, prj)
root = ET.fromstring(b''.join(f))
root = xml_fromstring(b''.join(f))
if name_filter is not None:
name_filter = re.compile(name_filter)
@ -4632,11 +4634,11 @@ def create_pbuild_config(apiurl: str, project: str, repository: str, arch: str,
f.write(decode_it(bc))
# create the _pbuild file based on expanded repository path informations
pb = ET.fromstring('<pbuild></pbuild>')
pb = xml_fromstring('<pbuild></pbuild>')
tree = ET.ElementTree(pb)
preset = ET.SubElement(pb, 'preset', name=repository, default="") # default should be empty, but ET crashes
bi_text = decode_it(get_buildinfo(apiurl, project, '_repository', repository, arch, specfile="Name: dummy"))
root = ET.fromstring(bi_text)
root = xml_fromstring(bi_text)
# cross compile setups are not yet supported
# for path in root.findall('hostsystem'):
@ -4661,7 +4663,7 @@ def check_constraints(apiurl: str, prj: str, repository: str, arch: str, package
query = {"cmd": "checkconstraints", "project": prj, "package": package, "repository": repository, "arch": arch}
u = makeurl(apiurl, ["worker"], query)
f = http_POST(u, data=constraintsfile)
root = ET.fromstring(b''.join(f))
root = xml_fromstring(b''.join(f))
return [node.get('name') for node in root.findall('entry')]
@ -4675,7 +4677,7 @@ def get_source_rev(apiurl: str, project: str, package: str, revision=None):
else:
url = makeurl(apiurl, ['source', project, package, '_history'])
f = http_GET(url)
xml = ET.parse(f)
xml = xml_parse(f)
ent = None
for new in xml.findall('revision'):
# remember the newest one.
@ -4701,7 +4703,7 @@ def print_jobhistory(apiurl: str, prj: str, current_package: str, repository: st
query['limit'] = int(limit)
u = makeurl(apiurl, ['build', prj, repository, arch, '_jobhistory'], query)
f = http_GET(u)
root = ET.parse(f).getroot()
root = xml_parse(f).getroot()
if format == 'text':
print("time package reason code build time worker")
@ -4833,7 +4835,7 @@ def runservice(apiurl: str, prj: str, package: str):
e.osc_msg = f'could not trigger service run for project \'{prj}\' package \'{package}\''
raise
root = ET.parse(f).getroot()
root = xml_parse(f).getroot()
return root.get('code')
@ -4846,7 +4848,7 @@ def waitservice(apiurl: str, prj: str, package: str):
e.osc_msg = f'The service for project \'{prj}\' package \'{package}\' failed'
raise
root = ET.parse(f).getroot()
root = xml_parse(f).getroot()
return root.get('code')
@ -4863,7 +4865,7 @@ def mergeservice(apiurl: str, prj: str, package: str):
e.osc_msg = f'could not merge service files in project \'{prj}\' package \'{package}\''
raise
root = ET.parse(f).getroot()
root = xml_parse(f).getroot()
return root.get('code')
@ -4885,7 +4887,7 @@ def rebuild(apiurl: str, prj: str, package: str, repo: str, arch: str, code=None
e.osc_msg = f'could not trigger rebuild for project \'{prj}\' package \'{package}\''
raise
root = ET.parse(f).getroot()
root = xml_parse(f).getroot()
return root.get('code')
@ -4941,7 +4943,7 @@ def cmdbuild(
e.osc_msg += f' sysrq={code}'
raise
root = ET.parse(f).getroot()
root = xml_parse(f).getroot()
return root.get('code')
@ -5106,7 +5108,7 @@ def search(apiurl: str, queries=None, **kwargs):
query['match'] = xpath
u = makeurl(apiurl, path, query)
f = http_GET(u)
res[urlpath] = ET.parse(f).getroot()
res[urlpath] = xml_parse(f).getroot()
return res
@ -5148,7 +5150,7 @@ def owner(
res = None
try:
f = http_GET(u)
res = ET.parse(f).getroot()
res = xml_parse(f).getroot()
except HTTPError as e:
# old server not supporting this search
pass
@ -5159,7 +5161,7 @@ def set_link_rev(apiurl: str, project: str, package: str, revision="", expand=Fa
url = makeurl(apiurl, ["source", project, package, "_link"])
try:
f = http_GET(url)
root = ET.parse(f).getroot()
root = xml_parse(f).getroot()
except HTTPError as e:
e.osc_msg = f'Unable to get _link file in package \'{package}\' for project \'{project}\''
raise
@ -5307,7 +5309,7 @@ def addPerson(apiurl: str, prj: str, pac: str, user: str, role="maintainer"):
create_new=False)
if data and get_user_meta(apiurl, user) is not None:
root = ET.fromstring(parse_meta_to_string(data))
root = xml_fromstring(parse_meta_to_string(data))
found = False
for person in root.iter('person'):
if person.get('userid') == user and person.get('role') == role:
@ -5342,7 +5344,7 @@ def delPerson(apiurl: str, prj: str, pac: str, user: str, role="maintainer"):
template_args=None,
create_new=False)
if data and get_user_meta(apiurl, user) is not None:
root = ET.fromstring(parse_meta_to_string(data))
root = xml_fromstring(parse_meta_to_string(data))
found = False
for person in root.iter('person'):
if person.get('userid') == user and person.get('role') == role:
@ -5374,7 +5376,7 @@ def setBugowner(apiurl: str, prj: str, pac: str, user=None, group=None):
group = user.replace('group:', '')
user = None
if data:
root = ET.fromstring(parse_meta_to_string(data))
root = xml_fromstring(parse_meta_to_string(data))
for group_element in root.iter('group'):
if group_element.get('role') == "bugowner":
root.remove(group_element)
@ -5426,9 +5428,9 @@ def addGitSource(url):
service_file = os.path.join(os.getcwd(), '_service')
addfile = False
if os.path.exists(service_file):
services = ET.parse(os.path.join(os.getcwd(), '_service')).getroot()
services = xml_parse(os.path.join(os.getcwd(), '_service')).getroot()
else:
services = ET.fromstring("<services />")
services = xml_fromstring("<services />")
addfile = True
stripETxml(services)
si = Serviceinfo()
@ -5451,9 +5453,9 @@ def addDownloadUrlService(url):
service_file = os.path.join(os.getcwd(), '_service')
addfile = False
if os.path.exists(service_file):
services = ET.parse(os.path.join(os.getcwd(), '_service')).getroot()
services = xml_parse(os.path.join(os.getcwd(), '_service')).getroot()
else:
services = ET.fromstring("<services />")
services = xml_fromstring("<services />")
addfile = True
stripETxml(services)
si = Serviceinfo()
@ -5708,7 +5710,7 @@ def request_interactive_review(apiurl, request, initial_cmd='', group=None,
details = e.hdrs.get('X-Opensuse-Errorcode')
if details:
print(details, file=sys.stderr)
root = ET.fromstring(e.read())
root = xml_fromstring(e.read())
summary = root.find('summary')
if summary is not None:
print(summary.text, file=sys.stderr)
@ -6230,7 +6232,7 @@ def which(name: str):
def get_comments(apiurl: str, kind, *args):
url = makeurl(apiurl, ["comments", kind] + list(args))
f = http_GET(url)
return ET.parse(f).getroot()
return xml_parse(f).getroot()
def print_comments(apiurl: str, kind, *args):
@ -6254,7 +6256,7 @@ def create_comment(apiurl: str, kind, comment, *args, **kwargs) -> Optional[str]
query["parent_id"] = kwargs.get("parent", None)
u = makeurl(apiurl, ["comments", kind] + list(args), query=query)
f = http_POST(u, data=comment)
ret = ET.fromstring(f.read()).find('summary')
ret = xml_fromstring(f.read()).find('summary')
if ret is None:
return None
return ret.text
@ -6263,7 +6265,7 @@ def create_comment(apiurl: str, kind, comment, *args, **kwargs) -> Optional[str]
def delete_comment(apiurl: str, cid: str) -> Optional[str]:
u = makeurl(apiurl, ['comment', cid])
f = http_DELETE(u)
ret = ET.fromstring(f.read()).find('summary')
ret = xml_fromstring(f.read()).find('summary')
if ret is None:
return None
return ret.text
@ -6378,7 +6380,7 @@ class MultibuildFlavorResolver:
if not s:
return result
root = ET.fromstring(s)
root = xml_fromstring(s)
for node in root.findall("flavor"):
result.add(node.text)
return result

View File

@ -159,7 +159,7 @@ class Package(XmlModel):
@classmethod
def get_revision_list(cls, apiurl: str, project: str, package: str, deleted: Optional[bool] = None, meta: Optional[bool] = None):
from xml.etree import ElementTree as ET
from ..util.xml import xml_parse
url_path = ["source", project, package, "_history"]
url_query = {
@ -167,7 +167,7 @@ class Package(XmlModel):
"deleted": deleted,
}
response = cls.xml_request("GET", apiurl, url_path, url_query)
root = ET.parse(response).getroot()
root = xml_parse(response).getroot()
assert root.tag == "revisionlist"
result = []
for node in root:

View File

@ -63,7 +63,7 @@ class Person(XmlModel):
state: Optional[str] = None,
**kwargs,
) -> List["Person"]:
from xml.etree import ElementTree as ET
from ..util.xml import xml_parse
from ..util.xpath import XPathQuery as Q
url_path = ["search", "person"]
@ -77,7 +77,7 @@ class Person(XmlModel):
),
}
response = cls.xml_request("GET", apiurl, url_path, url_query)
root = ET.parse(response).getroot()
root = xml_parse(response).getroot()
assert root.tag == "collection"
result = []
for node in root:

View File

@ -108,12 +108,12 @@ class Token(XmlModel):
@classmethod
def do_list(cls, apiurl: str, user: str):
from ..util.xml import ET
from ..util.xml import xml_parse
url_path = ["person", user, "token"]
url_query = {}
response = cls.xml_request("GET", apiurl, url_path, url_query)
root = ET.parse(response).getroot()
root = xml_parse(response).getroot()
assert root.tag == "directory"
result = []
for node in root:

View File

@ -11,6 +11,8 @@ from typing import Optional
from .. import conf
from .. import oscerr
from ..util.xml import ET
from ..util.xml import xml_fromstring
from ..util.xml import xml_parse
from .file import File
from .linkinfo import Linkinfo
from .serviceinfo import Serviceinfo
@ -413,7 +415,7 @@ class Package:
query.update({'cmd': 'commitfilelist', 'user': user, 'comment': msg})
u = makeurl(apiurl, ['source', project, package], query=query)
f = http_POST(u, data=ET.tostring(filelist, encoding=ET_ENCODING))
root = ET.parse(f).getroot()
root = xml_parse(f).getroot()
return root
@staticmethod
@ -616,7 +618,7 @@ class Package:
li.read(sfilelist.find('linkinfo'))
if li.xsrcmd5 is None:
raise oscerr.APIError(f'linkinfo has no xsrcmd5 attr:\n{ET.tostring(sfilelist, encoding=ET_ENCODING)}\n')
sfilelist = ET.fromstring(self.get_files_meta(revision=li.xsrcmd5))
sfilelist = xml_fromstring(self.get_files_meta(revision=li.xsrcmd5))
for i in sfilelist.findall('entry'):
if i.get('name') in self.skipped:
i.set('skipped', 'true')
@ -639,7 +641,7 @@ class Package:
sys.stdout.write('.')
sys.stdout.flush()
# does it make sense to add some delay?
sfilelist = ET.fromstring(http_GET(u).read())
sfilelist = xml_fromstring(http_GET(u).read())
# if sinfo is None another commit might have occured in the "meantime"
sinfo = sfilelist.find('serviceinfo')
print('')
@ -754,7 +756,7 @@ class Package:
fm = show_files_meta(self.apiurl, self.prjname, self.name, revision=revision, meta=self.meta)
# look for "too large" files according to size limit and mark them
root = ET.fromstring(fm)
root = xml_fromstring(fm)
for e in root.findall('entry'):
size = e.get('size')
if size and self.size_limit and int(size) > self.size_limit \
@ -797,7 +799,7 @@ class Package:
meta = self.get_local_meta()
if meta is None:
return self.prjname
root = ET.fromstring(meta)
root = xml_fromstring(meta)
return root.get('project')
def is_link_to_different_project(self):
@ -1125,7 +1127,7 @@ class Package:
raise oscerr.OscIOError(None, f'file \'{fname}\' is not under version control')
else:
fm = self.get_files_meta(revision=revision)
root = ET.fromstring(fm)
root = xml_fromstring(fm)
rfiles = self.__get_files(root)
# swap added and deleted
kept, deleted, added, services = self.__get_rev_changes(rfiles)
@ -1391,7 +1393,7 @@ rev: %s
in_update_files_path = os.path.join(self.storedir, "_in_update", "_files")
if os.path.isfile(in_update_files_path) and os.path.getsize(in_update_files_path) != 0:
print('resuming broken update...')
root = ET.parse(os.path.join(self.storedir, '_in_update', '_files')).getroot()
root = xml_parse(os.path.join(self.storedir, '_in_update', '_files')).getroot()
rfiles = self.__get_files(root)
kept, added, deleted, services = self.__get_rev_changes(rfiles)
# check if we aborted in the middle of a file update
@ -1445,7 +1447,7 @@ rev: %s
os.rmdir(os.path.join(self.storedir, '_in_update'))
# ok everything is ok (hopefully)...
fm = self.get_files_meta(revision=rev)
root = ET.fromstring(fm)
root = xml_fromstring(fm)
rfiles = self.__get_files(root)
store_write_string(self.absdir, '_files', fm + '\n', subdir='_in_update')
kept, added, deleted, services = self.__get_rev_changes(rfiles)
@ -1546,7 +1548,7 @@ rev: %s
si = Serviceinfo()
if os.path.exists('_service'):
try:
service = ET.parse(os.path.join(self.absdir, '_service')).getroot()
service = xml_parse(os.path.join(self.absdir, '_service')).getroot()
except ET.ParseError as v:
line, column = v.position
print(f'XML error in _service file on line {line}, column {column}')

View File

@ -6,6 +6,7 @@ from typing import Optional
from .. import conf
from .. import oscerr
from ..util.xml import ET
from ..util.xml import xml_parse
from .store import Store
from .store import delete_storedir
from .store import store
@ -277,7 +278,7 @@ class Project:
packages_file = os.path.join(self.absdir, store, '_packages')
if os.path.isfile(packages_file) and os.path.getsize(packages_file):
try:
result = ET.parse(packages_file)
result = xml_parse(packages_file)
except:
msg = f'Cannot read package file \'{packages_file}\'. '
msg += 'You can try to remove it and then run osc repairwc.'
@ -294,7 +295,7 @@ class Project:
and Package(pac_dir).name == data:
cur_pacs.append(ET.Element('package', name=data, state=' '))
store_write_initial_packages(self.absdir, self.name, cur_pacs)
return ET.parse(os.path.join(self.absdir, store, '_packages'))
return xml_parse(os.path.join(self.absdir, store, '_packages'))
def write_packages(self):
from ..core import ET_ENCODING

View File

@ -64,13 +64,14 @@ class Serviceinfo:
def getProjectGlobalServices(self, apiurl: str, project: str, package: str):
from ..core import http_POST
from ..core import makeurl
from ..util.xml import xml_parse
self.apiurl = apiurl
# get all project wide services in one file, we don't store it yet
u = makeurl(apiurl, ["source", project, package], query={"cmd": "getprojectservices"})
try:
f = http_POST(u)
root = ET.parse(f).getroot()
root = xml_parse(f).getroot()
self.read(root, True)
self.project = project
self.package = package

View File

@ -172,9 +172,11 @@ class Store:
self.write_string(fn, value, subdir=subdir)
def read_xml_node(self, fn, node_name, subdir=None):
from ..util.xml import xml_parse
path = self.get_path(fn, subdir=subdir)
try:
tree = ET.parse(path)
tree = xml_parse(path)
except SyntaxError as e:
msg = f"Unable to parse '{path}': {e}"
raise oscerr.NoWorkingCopy(msg)
@ -463,6 +465,8 @@ def is_package_dir(d):
def read_filemeta(dir):
from ..util.xml import xml_parse
global store
msg = f'\'{dir}\' is not a valid working copy.'
@ -475,7 +479,7 @@ def read_filemeta(dir):
raise oscerr.NoWorkingCopy(f'{msg} ({filesmeta} does not exist)')
try:
r = ET.parse(filesmeta)
r = xml_parse(filesmeta)
except SyntaxError as e:
raise oscerr.NoWorkingCopy(f'{msg}\nWhen parsing .osc/_files, the following error was encountered:\n{e}')
return r

View File

@ -567,7 +567,7 @@ class XmlModel(BaseModel):
"""
Instantiate model from string.
"""
root = ET.fromstring(string)
root = xml.xml_fromstring(string)
return cls.from_xml(root, apiurl=apiurl)
@classmethod
@ -575,7 +575,7 @@ class XmlModel(BaseModel):
"""
Instantiate model from file.
"""
root = ET.parse(file).getroot()
root = xml.xml_parse(file).getroot()
return cls.from_xml(root, apiurl=apiurl)
def to_bytes(self, *, with_comments: bool = False) -> bytes:

View File

@ -31,8 +31,10 @@ def primaryPath(directory):
:rtype: str
:raise IOError: if repomd.xml contains no primary location
"""
from .xml import xml_parse
metaDataPath = os.path.join(directory, "repodata", "repomd.xml")
elementTree = ET.parse(metaDataPath)
elementTree = xml_parse(metaDataPath)
root = elementTree.getroot()
for dataElement in root:
@ -56,10 +58,12 @@ def queries(directory):
:return: list of RepoDataQueryResult instances
:raise IOError: if repomd.xml contains no primary location
"""
from .xml import xml_parse
path = primaryPath(directory)
gunzippedPrimary = gzip.GzipFile(path)
elementTree = ET.parse(gunzippedPrimary)
elementTree = xml_parse(gunzippedPrimary)
root = elementTree.getroot()
packageQueries = []

View File

@ -2,8 +2,9 @@
Functions that manipulate with XML.
"""
import io
import xml.sax.saxutils
from typing import Union
from xml.etree import ElementTree as ET
@ -79,3 +80,61 @@ def xml_indent(root):
ET.indent(root)
else:
xml_indent_compat(root)
def _extend_parser_error_msg(e: ET.ParseError, text: Union[str, bytes]):
from ..output import tty
y, x = e.position
text = text.splitlines()[y-1][x-1:]
if isinstance(text, bytes):
text = text.decode("utf-8")
new_text = ""
for char in text:
if char >= " ":
new_text += char
continue
byte = ord(char)
char = f"0x{byte:0>2X}"
char = tty.colorize(char, "bg_red")
new_text += char
e.msg += ": " + new_text
def xml_fromstring(text: str):
"""
xml.etree.ElementTree.fromstring() wrapper that extends error message in ParseError
exceptions with a snippet of the broken XML.
"""
try:
return ET.fromstring(text)
except ET.ParseError as e:
_extend_parser_error_msg(e, text)
raise
def xml_parse(source):
"""
xml.etree.ElementTree.parse() wrapper that extends error message in ParseError
exceptions with a snippet of the broken XML.
"""
if isinstance(source, str):
# source is a file name
with open(source, "rb") as f:
data = f.read()
else:
# source is an IO object
data = source.read()
if isinstance(data, bytes):
f = io.BytesIO(data)
else:
f = io.StringIO(data)
try:
return ET.parse(f)
except ET.ParseError as e:
_extend_parser_error_msg(e, data)
raise

View File

@ -13,6 +13,7 @@ import urllib3.response
import osc.conf
import osc.core
from osc.util.xml import xml_fromstring
def urlcompare(url, *args):
@ -41,8 +42,8 @@ def urlcompare(url, *args):
def xml_equal(actual, exp):
try:
actual_xml = ET.fromstring(actual)
exp_xml = ET.fromstring(exp)
actual_xml = xml_fromstring(actual)
exp_xml = xml_fromstring(exp)
except ET.ParseError:
return False
todo = [(actual_xml, exp_xml)]
@ -257,7 +258,7 @@ class OscTestCase(unittest.TestCase):
with open(fname) as f:
files_exp = f.read()
self.assertXMLEqual(files_act, files_exp)
root = ET.fromstring(files_act)
root = xml_fromstring(files_act)
for i in root.findall('entry'):
if i.get('name') in skipfiles:
continue

View File

@ -4,6 +4,7 @@ from xml.etree import ElementTree as ET
import osc.core
import osc.oscerr
from osc.util.xml import xml_fromstring
from .common import OscTestCase
@ -263,7 +264,7 @@ class TestRequest(OscTestCase):
<person name="user" role="reader" />
<group name="group" role="reviewer" />
</action>"""
action = osc.core.Action.from_xml(ET.fromstring(xml))
action = osc.core.Action.from_xml(xml_fromstring(xml))
self.assertEqual(action.type, 'add_role')
self.assertEqual(action.tgt_project, 'foo')
self.assertEqual(action.tgt_package, 'bar')
@ -283,7 +284,7 @@ class TestRequest(OscTestCase):
<updatelink>1</updatelink>
</options>
</action>"""
action = osc.core.Action.from_xml(ET.fromstring(xml))
action = osc.core.Action.from_xml(xml_fromstring(xml))
self.assertEqual(action.type, 'submit')
self.assertEqual(action.src_project, 'foo')
self.assertEqual(action.src_package, 'bar')
@ -301,7 +302,7 @@ class TestRequest(OscTestCase):
<target package="baz" project="foobar" />
<acceptinfo rev="5" srcmd5="aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" xsrcmd5="ffffffffffffffffffffffffffffffff" />
</action>"""
action = osc.core.Action.from_xml(ET.fromstring(xml))
action = osc.core.Action.from_xml(xml_fromstring(xml))
self.assertEqual(action.type, 'submit')
self.assertEqual(action.src_project, 'testprj')
self.assertEqual(action.src_package, 'bar')
@ -320,13 +321,13 @@ class TestRequest(OscTestCase):
def test_action_from_xml_unknown_type(self):
"""try to create action from xml with unknown type"""
xml = '<action type="foo"><source package="bar" project="foo" /></action>'
self.assertRaises(osc.oscerr.WrongArgs, osc.core.Action.from_xml, ET.fromstring(xml))
self.assertRaises(osc.oscerr.WrongArgs, osc.core.Action.from_xml, xml_fromstring(xml))
def test_read_request1(self):
"""read in a request"""
xml = self._get_fixture('test_read_request1.xml')
r = osc.core.Request()
r.read(ET.fromstring(xml))
r.read(xml_fromstring(xml))
self.assertEqual(r.reqid, '42')
self.assertEqual(r.actions[0].type, 'submit')
self.assertEqual(r.actions[0].src_project, 'foo')
@ -357,7 +358,7 @@ class TestRequest(OscTestCase):
"""read in a request (with reviews)"""
xml = self._get_fixture('test_read_request2.xml')
r = osc.core.Request()
r.read(ET.fromstring(xml))
r.read(xml_fromstring(xml))
self.assertEqual(r.reqid, '123')
self.assertEqual(r.actions[0].type, 'submit')
self.assertEqual(r.actions[0].src_project, 'xyz')
@ -404,7 +405,7 @@ class TestRequest(OscTestCase):
<description></description>
</request>"""
r = osc.core.Request()
r.read(ET.fromstring(xml))
r.read(xml_fromstring(xml))
self.assertEqual(r.reqid, '2')
self.assertEqual(r.actions[0].type, 'set_bugowner')
self.assertEqual(r.actions[0].tgt_project, 'foo')
@ -442,14 +443,14 @@ class TestRequest(OscTestCase):
delete: deleteme
delete: foo/bar\n"""
r = osc.core.Request()
r.read(ET.fromstring(xml))
r.read(xml_fromstring(xml))
self.assertEqual(exp, r.list_view())
def test_request_list_view2(self):
"""test the list_view method (with history elements and description)"""
xml = self._get_fixture('test_request_list_view2.xml')
r = osc.core.Request()
r.read(ET.fromstring(xml))
r.read(xml_fromstring(xml))
exp = """\
21 State:accepted By:foobar When:2010-12-29T16:37:45
Created by: foobar
@ -465,7 +466,7 @@ class TestRequest(OscTestCase):
xml = self._get_fixture('test_request_str1.xml')
r = osc.core.Request()
r = osc.core.Request()
r.read(ET.fromstring(xml))
r.read(xml_fromstring(xml))
self.assertEqual(r.creator, 'creator')
exp = """\
Request: 123
@ -510,7 +511,7 @@ History:
<state name="new" when="2010-12-29T00:11:22" who="creator" />
</request>"""
r = osc.core.Request()
r.read(ET.fromstring(xml))
r.read(xml_fromstring(xml))
self.assertEqual(r.creator, 'creator')
exp = """\
Request: 98765
@ -538,7 +539,7 @@ State:
<state name="new" when="2010-12-30T02:11:22" who="olduser" />
</request>"""
r = osc.core.Request()
r.read(ET.fromstring(xml))
r.read(xml_fromstring(xml))
self.assertEqual(r.reqid, '1234')
self.assertEqual(r.actions[0].type, 'submit')
self.assertEqual(r.actions[0].src_project, 'foobar')
@ -566,7 +567,7 @@ State:
"""test get_actions method"""
xml = self._get_fixture('test_request_list_view1.xml')
r = osc.core.Request()
r.read(ET.fromstring(xml))
r.read(xml_fromstring(xml))
sr_actions = r.get_actions('submit')
self.assertTrue(len(sr_actions) == 2)
for i in sr_actions: