Gustavo Pichorim Boiko 9866e35bd7 Ignore patchinfo packages when checking for duplicated binaries
In the staging areas used by maintenance, the patchinfo packages are staged together
with the source packages, and because of the way they work, they collect binaries
from the source packages around.

This causes binaries to be reported as duplicated, so ignore patchinfos.
2024-07-05 13:39:35 +00:00

1401 lines
49 KiB
Python

from collections import namedtuple
from datetime import datetime, timezone
from dateutil.parser import parse as date_parse
import re
import socket
import logging
from typing import List, Optional, Tuple, Union
try:
from typing import Literal
except ImportError:
from typing_extensions import Literal
from lxml import etree as ET
from urllib.error import HTTPError
from osc.core import ReviewState, create_submit_request
from osc.core import get_binarylist
from osc.core import get_commitlog
from osc.core import get_dependson
from osc.connection import http_DELETE
from osc.connection import http_GET
from osc.connection import http_POST
from osc.connection import http_PUT
from osc.core import makeurl
from osc.core import owner
from osc.core import search as osc_core_search
from osc.core import Request
from osc.core import Action
from osc.core import show_package_meta
from osc.core import show_project_meta
from osc.core import show_results_meta
from osc.core import xpath_join
from osc.util.helper import decode_it
from osc import conf
from osclib.conf import Config
from osclib.memoize import memoize
import traceback
# Matches a binary file name of the form ``name-version-release.arch`` with an
# optional ``<prefix>::`` in front. Named groups: filename, name, version,
# release and arch.
BINARY_REGEX = r'(?:.*::)?(?P<filename>(?P<name>.*)-(?P<version>[^-]+)-(?P<release>[^-]+)\.(?P<arch>[^-\.]+))'
# Same pattern as BINARY_REGEX, followed by the ``.rpm`` extension.
RPM_REGEX = BINARY_REGEX + r'\.rpm'
# A parsed binary: the package it was listed under plus file name, name and arch.
BinaryParsed = namedtuple('BinaryParsed', ('package', 'filename', 'name', 'arch'))
# Request state names, leaving out 'accepted'.
REQUEST_STATES_MINUS_ACCEPTED = ['new', 'review', 'declined', 'revoked', 'superseded']
@memoize(session=True)
def group_members(apiurl, group, maintainers=False):
    """Return the user ids listed in the definition of ``group``.

    With ``maintainers`` set the ``maintainer/@userid`` entries are returned,
    otherwise the ``person/person/@userid`` entries.
    """
    group_url = makeurl(apiurl, ['group', group])
    group_xml = ET.parse(http_GET(group_url)).getroot()
    selector = 'maintainer/@userid' if maintainers else 'person/person/@userid'
    return group_xml.xpath(selector)
def groups_members(apiurl, groups):
    """Return the members of every group in ``groups`` as one flat list."""
    return [member for name in groups for member in group_members(apiurl, name)]
def get_request_list_with_history(
        apiurl, project='', package='', req_who='', req_state=('new', 'review', 'declined'),
        req_type=None, exclude_target_projects=[]):
    """Search for requests via xpath and return them including their full history.

    Deprecated copy of old code from osc 0.x; a ``DeprecationWarning`` is
    emitted on every call.

    :param apiurl: API URL handed to the osc search call.
    :param project: when non-empty, only requests with an action targeting this
        project (or, if osc's ``include_request_from_project`` option is set,
        originating from it) are matched.
    :param package: same as ``project`` but for the package attribute.
    :param req_who: when non-empty, only requests whose state or history
        carries this user are matched.
    :param req_state: state names to match; the state filter is skipped when it
        contains ``'all'``.
    :param req_type: when set, only requests with an action of this type.
    :param exclude_target_projects: target projects to exclude (only iterated,
        never modified).
    :return: list of ``Request`` objects.
    """
    import warnings
    # The two literals are concatenated implicitly, so the first one needs the
    # trailing space to keep the words apart.
    warnings.warn(
        "get_request_list_with_history() uses an xpath search, which is slow. consider porting to "
        "use osc.core.get_request_collection() instead.",
        DeprecationWarning,
        stacklevel=2
    )
    xpath = ''
    if 'all' not in req_state:
        for state in req_state:
            xpath = xpath_join(xpath, f'state/@name=\'{state}\'', inner=True)
    if req_who:
        xpath = xpath_join(xpath, '(state/@who=\'%(who)s\' or history/@who=\'%(who)s\')' % {'who': req_who}, op='and')

    # XXX: we cannot use the '|' in the xpath expression because it is not supported
    # in the backend
    todo = {}
    if project:
        todo['project'] = project
    if package:
        todo['package'] = package
    for kind, val in todo.items():
        xpath_base = 'action/target/@%(kind)s=\'%(val)s\''

        if conf.config['include_request_from_project']:
            xpath_base = xpath_join(xpath_base, 'action/source/@%(kind)s=\'%(val)s\'', op='or', inner=True)
        xpath = xpath_join(xpath, xpath_base % {'kind': kind, 'val': val}, op='and', nexpr_parentheses=True)

    if req_type:
        xpath = xpath_join(xpath, f'action/@type=\'{req_type}\'', op='and')
    for i in exclude_target_projects:
        xpath = xpath_join(xpath, f'(not(action/target/@project=\'{i}\'))', op='and')

    if conf.config['verbose'] > 1:
        print(f'[ {xpath} ]')
    queries = {'request': {'withfullhistory': '1'}}
    res = osc_core_search(apiurl, queries=queries, request=xpath)
    collection = res['request']
    requests = []
    for root in collection.findall('request'):
        r = Request()
        r.read(root)
        requests.append(r)
    return requests
def convert_from_osc_et(xml):
    """Convert an ``xml.etree`` element (as used by osc) into an lxml element.

    The element is serialized with the standard library and parsed again with
    lxml.
    """
    from xml.etree import ElementTree as stdlib_et

    serialized = stdlib_et.tostring(xml)
    return ET.fromstring(serialized)
@memoize(session=True)
def owner_fallback(apiurl, project, package):
    """Return the owner information of ``package`` as an lxml element.

    The lookup is first restricted to ``project``. When that yields no usable
    ``owner`` entry, or ``project`` starts with the project named by the
    entry, the lookup is repeated without the project restriction.
    """
    result = owner(apiurl, package, project=project)
    entry = None
    if result:
        entry = result.find('owner')

    if not entry or project.startswith(entry.get('project')):
        # Fallback to global (ex Factory) maintainer.
        result = owner(apiurl, package)

    return convert_from_osc_et(result)
@memoize(session=True)
def maintainers_get(apiurl, project, package=None):
    """Return maintainer user ids for a project or, if given, a package.

    Without ``package`` the project meta is read; with ``package`` the result
    of ``owner_fallback()`` is used. In both cases the members of maintainer
    groups are appended to the directly listed maintainers.
    """
    if package is not None:
        source = owner_fallback(apiurl, project, package)
        users = source.xpath('//person[@role="maintainer"]/@name')
        group_names = source.xpath('//group[@role="maintainer"]/@name')
    else:
        source = ET.fromstringlist(show_project_meta(apiurl, project))
        users = source.xpath('//person[@role="maintainer"]/@userid')
        group_names = source.xpath('//group[@role="maintainer"]/@groupid')

    users.extend(groups_members(apiurl, group_names))
    return users
@memoize(session=True)
def package_role_expand(apiurl, project, package, role='maintainer', inherit=True):
    """
    Users holding ``role`` on a package, either directly or through a group
    with that role. With ``inherit`` set, users holding the role on the project
    are appended as well.
    """
    package_meta = ET.fromstringlist(show_package_meta(apiurl, project, package))
    holders = meta_role_expand(apiurl, package_meta, role)
    if not inherit:
        return holders

    holders.extend(project_role_expand(apiurl, project, role))
    return holders
@memoize(session=True)
def project_role_expand(apiurl, project, role='maintainer'):
    """
    Users holding ``role`` on a project, either directly or through a group
    with that role.
    """
    project_meta = ET.fromstringlist(show_project_meta(apiurl, project))
    return meta_role_expand(apiurl, project_meta, role)
def meta_role_expand(apiurl, meta, role='maintainer'):
    """Return the users with ``role`` in ``meta``, followed by the members of
    every group that has the role."""
    holders = meta.xpath(f'//person[@role="{role}"]/@userid')
    group_ids = meta.xpath(f'//group[@role="{role}"]/@groupid')
    holders += groups_members(apiurl, group_ids)
    return holders
def package_list(apiurl, project, expand=True):
    """Return the sorted names of the source entries of ``project``.

    ``expand`` adds ``expand=1`` to the request.
    """
    query = {'expand': 1} if expand else {}
    listing = ET.parse(http_GET(makeurl(apiurl, ['source', project], query))).getroot()
    return sorted(entry.get('name') for entry in listing.findall('entry'))
@memoize(session=True)
def target_archs(apiurl, project, repository='standard'):
    """Return the architectures listed for ``repository`` in the project meta."""
    project_meta = ET.fromstringlist(show_project_meta(apiurl, project))
    selector = f'repository[@name="{repository}"]/arch/text()'
    return project_meta.xpath(selector)
@memoize(session=True)
def depends_on(apiurl, project, repository, packages=None, reverse=None):
    """Return the set of ``pkgdep`` names reported by ``get_dependson()``,
    collected over every architecture of the repository."""
    collected = set()
    for arch in target_archs(apiurl, project, repository):
        answer = get_dependson(apiurl, project, repository, arch, packages, reverse)
        for pkgdep in ET.fromstring(answer).findall('.//pkgdep'):
            collected.add(pkgdep.text)
    return collected
def request_when_staged(request, project, first=False):
    """Return when ``request`` was staged into ``project`` as a parsed date.

    The state history of the request is scanned for entries whose comment
    mentions ``project``.

    :param request: object exposing ``statehistory`` entries with ``comment``
        and ``when`` attributes.
    :param project: project name to look for in the history comments.
    :param first: when true the earliest matching entry is used, otherwise the
        latest one.
    :return: the result of parsing the ``when`` value of the chosen entry. When
        no entry matches, ``None`` is handed to the date parser unchanged, as
        before.
    """
    when = None
    for history in request.statehistory:
        # Entries without a comment cannot mention the project.
        if history.comment and project in history.comment:
            when = history.when
            if first:
                break

    return date_parse(when)
def binary_list(apiurl, project, repository, arch, package=None):
    """Return ``BinaryParsed`` entries for the RPMs reported by ``get_binarylist()``.

    Files that do not match ``RPM_REGEX``, debuginfo/debugsource packages and
    source RPMs are left out.
    """
    skipped_suffixes = ('-debuginfo', '-debuginfo-32bit', '-debugsource')
    entries = []
    for filename in get_binarylist(apiurl, project, repository, arch, package):
        match = re.match(RPM_REGEX, filename)
        if not match:
            continue

        name = match.group('name')
        binary_arch = match.group('arch')
        if name.endswith(skipped_suffixes) or binary_arch == 'src':
            continue

        entries.append(BinaryParsed(package, match.group('filename'), name, binary_arch))

    return entries
@memoize(session=True)
def package_binary_list(apiurl, project, repository, arch, package=None, strip_multibuild=True, exclude_src_debug=False,
                        exclude_patchinfo=False):
    """Return the binaries of a repository/arch (optionally of one package).

    The ``binaryversions`` view is queried and every RPM in it is parsed.

    :param strip_multibuild: cut the package name at the first ``:``.
    :param exclude_src_debug: drop binaries for which ``binary_src_debug()``
        is true.
    :param exclude_patchinfo: skip packages whose name starts with
        ``patchinfo.``.
    :return: tuple of the list of ``BinaryParsed`` entries and a dict mapping
        file name to package name (the last duplicate wins).
    """
    segments = ['build', project, repository, arch]
    if package:
        segments.append(package)
    listing = ET.parse(http_GET(makeurl(apiurl, segments, {'view': 'binaryversions'}))).getroot()

    parsed_binaries = []
    filename_to_package = {}  # last duplicate wins
    for package_node in listing:
        owner_package = package_node.get('package')
        if strip_multibuild:
            owner_package = owner_package.split(':', 1)[0]
        if exclude_patchinfo and owner_package.startswith("patchinfo."):
            continue

        for binary_node in package_node:
            match = re.match(RPM_REGEX, binary_node.get('name'))
            if not match:
                continue

            parsed = BinaryParsed(owner_package, match.group('filename'),
                                  match.group('name'), match.group('arch'))
            if exclude_src_debug and binary_src_debug(parsed):
                continue

            parsed_binaries.append(parsed)
            filename_to_package[match.group('filename')] = owner_package

    return parsed_binaries, filename_to_package
def binary_src_debug(binary):
    """Tell whether ``binary`` is a source package (arch ``src``/``nosrc``)
    or a debuginfo/debugsource package (judged by its name suffix)."""
    if binary.arch in ('src', 'nosrc'):
        return True
    return binary.name.endswith(('-debuginfo', '-debugsource'))
@memoize(session=True)
def devel_project_get(apiurl: str, target_project: str, target_package: str) -> Union[Tuple[str, str], Tuple[Literal[None], Literal[None]]]:
    """Look up the devel project and devel package of ``target_package`` in
    ``target_project``.

    Returns them as a tuple, or ``None, None`` when the package meta has no
    ``devel`` element or the package meta request answers with HTTP 404. Other
    HTTP errors are re-raised.
    """
    try:
        package_meta = ET.fromstringlist(show_package_meta(apiurl, target_project, target_package))
    except HTTPError as e:
        if e.code != 404:
            raise e
        return None, None

    devel = package_meta.find('devel')
    if devel is None:
        return None, None
    return devel.get('project'), devel.get('package')
@memoize(session=True)
def devel_project_fallback(apiurl, target_project, target_package):
    """Return the devel project and package, falling back to Factory.

    When the target has no devel project and is not ``openSUSE:Factory``
    itself, ``openSUSE:*`` targets are looked up in ``openSUSE:Factory`` and
    ``SUSE:*`` targets in ``openSUSE.org:openSUSE:Factory``.
    """
    project, package = devel_project_get(apiurl, target_project, target_package)
    if project is not None or target_project == 'openSUSE:Factory':
        return project, package

    if target_project.startswith('openSUSE:'):
        project, package = devel_project_get(apiurl, 'openSUSE:Factory', target_package)
    elif target_project.startswith('SUSE:'):
        # For SLE (assume IBS), fallback to openSUSE:Factory devel projects.
        project, package = devel_project_get(apiurl, 'openSUSE.org:openSUSE:Factory', target_package)
        if project:
            # Drop everything up to the first ':' since the prefix is not
            # used for lookup.
            _, project = project.split(':', 1)[0], project.split(':', 1)[1]

    return project, package
@memoize(session=True)
def devel_projects(apiurl, project):
    """Return the sorted devel projects referenced by packages of ``project``,
    leaving out ``project`` itself."""
    found = search(apiurl, 'package', f"@project='{project}' and devel/@project!=''")
    names = {name for name in found.xpath('package/devel/@project') if name != project}
    return sorted(names)
def request_created(request):
    """Return the parsed creation date of ``request``.

    For a ``Request`` object the first state history entry is used; any other
    value is treated as an XML element whose ``history`` child carries the
    ``when`` attribute.
    """
    if not isinstance(request, Request):
        return date_parse(request.find('history').get('when'))
    return date_parse(request.statehistory[0].when)
def request_age(request):
    """Return the time elapsed since ``request`` was created as a timedelta.

    The current time is taken in UTC and made naive before subtracting the
    value of ``request_created()``, replacing the deprecated
    ``datetime.utcnow()`` with an equivalent value.
    """
    now = datetime.now(timezone.utc).replace(tzinfo=None)
    return now - request_created(request)
def project_list_prefix(apiurl, prefix):
    """Return the names of all projects whose name starts with ``prefix``."""
    match = f'starts-with(@name, "{prefix}")'
    search_url = makeurl(apiurl, ['search', 'project', 'id'], {'match': match})
    found = ET.parse(http_GET(search_url)).getroot()
    return found.xpath('project/@name')
def project_locked(apiurl, project):
    """Tell whether the project meta contains a ``lock/enable`` element."""
    project_meta = ET.fromstringlist(show_project_meta(apiurl, project))
    lock = project_meta.find('lock/enable')
    return lock is not None
#
# Dependency helpers
#
def fileinfo_ext_all(apiurl, project, repo, arch, package):
    """Yield the ``fileinfo_ext`` view of every binary RPM of ``package``.

    Non-RPM files, source RPMs and debuginfo/debugsource packages are skipped.
    """
    listing = ET.parse(http_GET(makeurl(apiurl, ['build', project, repo, arch, package]))).getroot()
    for entry in listing.findall('binary'):
        filename = entry.get('filename')
        wanted = (
            filename.endswith('.rpm')
            and not filename.endswith('.src.rpm')
            and '-debuginfo-' not in filename
            and '-debugsource-' not in filename
        )
        if wanted:
            yield fileinfo_ext(apiurl, project, repo, arch, package, filename)
def fileinfo_ext(apiurl, project, repo, arch, package, filename):
    """Return the parsed ``fileinfo_ext`` view of one binary file."""
    segments = ['build', project, repo, arch, package, filename]
    view_url = makeurl(apiurl, segments, {'view': 'fileinfo_ext'})
    return ET.parse(http_GET(view_url)).getroot()
def builddepinfo(apiurl, project, repo, arch, order=False):
    """Return the parsed ``_builddepinfo`` of a repository/arch; ``order``
    requests the ``order`` view."""
    query = {'view': 'order'} if order else {}
    info_url = makeurl(apiurl, ['build', project, repo, arch, '_builddepinfo'], query)
    return ET.parse(http_GET(info_url)).getroot()
def entity_email(apiurl, key, entity_type='person', include_name=False):
    """Return the email address of an entity, or ``None`` if it has none.

    With ``include_name`` set and a ``realname`` element present the result is
    formatted as ``realname <email>``.
    """
    entity = ET.parse(http_GET(makeurl(apiurl, [entity_type, key]))).getroot()

    email_node = entity.find('email')
    if email_node is None:
        return None
    address = email_node.text

    realname = entity.find('realname')
    if not include_name or realname is None:
        return address
    return f'{realname.text} <{address}>'
def source_file_load(apiurl, project, package, filename, revision=None):
    """Return the decoded content of a source file, or ``None`` when the
    request fails with an ``HTTPError``.

    The file is requested with ``expand=1`` and, if given, at ``revision``.
    """
    params = {'expand': 1}
    if revision:
        params['rev'] = revision
    file_url = makeurl(apiurl, ['source', project, package, filename], params)

    try:
        raw = http_GET(file_url).read()
        return decode_it(raw)
    except HTTPError:
        return None
def source_file_save(apiurl, project, package, filename, content, comment=None):
    """Upload ``content`` as a source file; the commit comment is built by
    ``message_suffix('updated', comment)``."""
    commit_message = message_suffix('updated', comment)
    target_url = makeurl(apiurl, ['source', project, package, filename], {'comment': commit_message})
    http_PUT(target_url, data=content)
def source_file_ensure(apiurl, project, package, filename, content, comment=None):
    """Save ``content`` only when it differs from what is currently stored."""
    current = source_file_load(apiurl, project, package, filename)
    if content == current:
        return
    source_file_save(apiurl, project, package, filename, content, comment)
def project_pseudometa_package(apiurl, project):
    """Return the project and package holding the pseudometa files.

    The ``pseudometa_package`` configuration value (default ``00Meta``) may be
    of the form ``project/package``, in which case the project part replaces
    ``project``.
    """
    configured = Config.get(apiurl, project).get('pseudometa_package', '00Meta')
    if '/' not in configured:
        return project, configured

    project, package = configured.split('/', 2)
    return project, package
def project_pseudometa_file_load(apiurl, project, filename, revision=None):
    """Load a file from the pseudometa package with trailing whitespace
    stripped; ``None`` is passed through when the file could not be loaded."""
    meta_project, meta_package = project_pseudometa_package(apiurl, project)
    content = source_file_load(apiurl, meta_project, meta_package, filename, revision)
    if content is None:
        return content
    return content.rstrip()
def project_pseudometa_file_save(apiurl, project, filename, content, comment=None):
    """Save ``content`` as ``filename`` in the pseudometa package."""
    meta_project, meta_package = project_pseudometa_package(apiurl, project)
    source_file_save(apiurl, meta_project, meta_package, filename, content, comment)
def project_pseudometa_file_ensure(apiurl, project, filename, content, comment=None):
    """Save the pseudometa file only when ``content`` differs from the loaded
    (right-stripped) version."""
    current = project_pseudometa_file_load(apiurl, project, filename)
    if content == current:
        return
    project_pseudometa_file_save(apiurl, project, filename, content, comment)
# There should be an API call that returns only "real" packages: one that does
# not include multibuild entries, linked packages, or maintenance update
# packages, but does include inherited packages from project layering.
# Unfortunately, no such call provides either server-side filtering or enough
# information to filter client-side. As such extra calls must be made for each
# package to handle the various different cases that can exist between products.
# For a more detailed write-up see the opensuse-buildservice mailing list thread:
# https://lists.opensuse.org/opensuse-buildservice/2019-05/msg00020.html.
def package_list_kind_filtered(apiurl, project, kinds_allowed=['source']):
    """Yield the packages of ``project`` whose ``package_kind()`` is one of
    ``kinds_allowed``.

    The package names are taken from the ``info`` view of the project source
    listing; one ``package_kind()`` call is made per package.
    """
    info_url = makeurl(apiurl, ['source', project], {'view': 'info', 'nofilename': '1'})
    info = ET.parse(http_GET(info_url)).getroot()
    for name in info.xpath('sourceinfo/@package'):
        if package_kind(apiurl, project, name) in kinds_allowed:
            yield name
def attribute_value_load(
        apiurl: str,
        project: str,
        name: str,
        namespace: str = 'OSRT',
        package: Optional[str] = None) -> Optional[Union[str, Literal[True]]]:
    """Read the attribute ``$namespace:$name`` of ``project`` (or of
    ``package`` within it, when a package name is given) from the OBS instance
    at ``apiurl``.

    Returns the first value as a string, ``True`` when the attribute exists
    but carries no value, and ``None`` when it is absent or the request
    answers with HTTP 404. Other HTTP errors are re-raised.
    """
    segments = [part for part in ('source', project, package, '_attribute', namespace + ':' + name) if part]
    try:
        root = ET.parse(http_GET(makeurl(apiurl, segments))).getroot()
    except HTTPError as e:
        if e.code == 404:
            return None
        raise e

    selector = f'./attribute[@namespace="{namespace}" and @name="{name}"]'
    values = root.xpath(f'{selector}/value/text()')
    if len(values):
        return str(values[0])

    # Boolean attributes are present, but have no value.
    if root.xpath(selector):
        return True
    return None
# New attributes must be defined manually before they can be used. Example:
# `osc api /attribute/OSRT/IgnoredIssues/_meta outputs`
#
# The new attribute can be created via:
# `api -T $xml /attribute/OSRT/$NEWATTRIBUTE/_meta`
#
# Remember to create for both OBS and IBS as necessary.
def attribute_value_save(
    apiurl: str,
    project: str,
    name: str,
    value: str,
    namespace='OSRT',
    package: Optional[str] = None
) -> None:
    """Store ``value`` in the attribute ``$namespace:$name`` of ``project``
    (or of ``package`` within it).

    An HTTP 404 answer is logged with a hint about the missing attribute
    type; every ``HTTPError`` is re-raised.
    """
    document = ET.Element('attributes')
    node = ET.SubElement(document, 'attribute')
    node.set('namespace', namespace)
    node.set('name', name)
    value_node = ET.SubElement(node, 'value')
    value_node.text = value

    # The OBS API of attributes is super strange, POST to update.
    segments = [part for part in ('source', project, package, '_attribute') if part]
    target_url = makeurl(apiurl, segments)
    try:
        http_POST(target_url, data=ET.tostring(document))
    except HTTPError as e:
        if e.code == 404:
            logging.error(f"Saving attribute {namespace}:{name} to {project} failed. You may need to create the type on your instance.")
        raise e
def attribute_value_delete(apiurl: str, project: str, name: str, namespace='OSRT', package: Optional[str] = None) -> None:
    """Delete the attribute ``$namespace:$name`` of ``project`` (or of
    ``package`` within it)."""
    segments = [part for part in ('source', project, package, '_attribute', namespace + ':' + name) if part]
    http_DELETE(makeurl(apiurl, segments))
@memoize(session=True)
def repository_path_expand(apiurl: str, project: str, repo: str, visited_repos: Optional[set] = None):
    """Recursively list the ``[project, repository]`` pairs underlying a
    repository, starting with the repository itself.

    ``visited_repos`` records the pairs already expanded so that a pair is
    not followed twice.
    """
    if visited_repos is None:
        visited_repos = set()

    expanded = [[project, repo]]
    project_meta = ET.fromstringlist(show_project_meta(apiurl, project))
    paths = project_meta.findall(f'.//repository[@name="{repo}"]/path')
    if len(paths) == 0:
        return expanded

    # All but the last path are taken as-is...
    for path in paths[:-1]:
        expanded.append([path.get('project', project), path.get('repository')])

    # ...while the last one is expanded recursively.
    last = paths[-1]
    target = (last.get('project', project), last.get('repository'))
    if target not in visited_repos:
        visited_repos.add(target)
        expanded.extend(repository_path_expand(apiurl, target[0], target[1], visited_repos))

    return expanded
@memoize(session=True)
def repository_path_search(apiurl, project, search_project, search_repository):
    """Find the repository of ``project`` whose path chain reaches a target.

    A breadth first search follows the ``path`` elements of each repository.

    :param apiurl: API URL used to fetch project metas.
    :param project: project whose repositories are the starting points.
    :param search_project: project of the repository searched for.
    :param search_repository: name of the repository searched for.
    :return: name of the first top-level repository of ``project`` from which
        a chain of paths leads to the searched repository, or ``None``.
    """
    queue = []

    # Initialize breadth first search queue with repositories from top project.
    root = ET.fromstringlist(show_project_meta(apiurl, project))
    for repository in root.xpath('repository[path[@project and @repository]]/@name'):
        queue.append((repository, project, repository))

    # Each (project, repository) pair is examined once. The earliest queue
    # entry for a pair always wins, so skipping later ones does not change the
    # result, but it keeps cyclic or shared paths from being walked forever.
    visited = set()

    # Perform a breadth first search and return the first repository chain with
    # a series of path elements targeting search project and repository.
    for repository_top, project, repository in queue:
        if (project, repository) in visited:
            continue
        visited.add((project, repository))

        if root.get('name') != project:
            # Repositories for a single project are in a row so cache parsing.
            root = ET.fromstringlist(show_project_meta(apiurl, project))

        paths = root.findall(f'repository[@name="{repository}"]/path')
        for path in paths:
            if path.get('project') == search_project and path.get('repository') == search_repository:
                return repository_top

            queue.append((repository_top, path.get('project'), path.get('repository')))

    return None
def repository_arch_state(apiurl, project, repository, arch):
    """Return a short sha1 over the ``_repository`` listing of one arch, or
    ``None`` when the listing request answers with HTTP 404."""
    from osclib.util import sha1_short

    # just checking the mtimes of the repository's binaries
    listing_url = makeurl(apiurl, ['build', project, repository, arch, '_repository'])
    try:
        return sha1_short(http_GET(listing_url).read())
    except HTTPError as e:
        # e.g. staging projects inherit the project config from 'ports' repository.
        # but that repository does not contain the archs we want, as such it has no state
        if e.code == 404:
            return None
        raise e
def repository_state(apiurl, project, repository, archs=[]):
    """Return a short sha1 combining the per-arch states of a repository.

    When ``archs`` is empty the architectures of the repository are used.
    Archs without a state are left out of the hash.
    """
    if len(archs) == 0:
        archs = target_archs(apiurl, project, repository)

    # Unfortunately, the state hash reflects the published state and not the
    # binaries published in repository. As such request binary list and hash.
    arch_states = (repository_arch_state(apiurl, project, repository, arch) for arch in archs)
    combined_state = [state for state in arch_states if state]

    from osclib.util import sha1_short
    return sha1_short(combined_state)
def repositories_states(apiurl, repository_pairs, archs=[]):
    """Return the truthy ``repository_state()`` of every (project,
    repository) pair, in order."""
    candidates = (
        repository_state(apiurl, project, repository, archs)
        for project, repository in repository_pairs
    )
    return [state for state in candidates if state]
def repository_published(apiurl, project, repository, archs=[]):
    """Tell whether every result of the repository is in state ``published``
    or ``unpublished``.

    ``archs`` filters the results that are considered; when it contains
    ``x86_64`` but not ``i586`` the latter is added.
    """
    # In a perfect world this would check for the existence of imports from i586
    # into x86_64, but in an even more perfect world OBS would show archs that
    # depend on another arch for imports as not completed until the dependent
    # arch completes. This is a simplified check that ensures x86_64 repos are
    # not indicated as published when i586 has not finished which is primarily
    # useful for repo_checker when only checking x86_64. The API treats archs as
    # a filter on what to return and thus non-existent archs do not cause an
    # issue nor alter the result.
    if 'x86_64' in archs and 'i586' not in archs:
        # Build a new list so the caller's list stays untouched.
        archs = [*archs, 'i586']

    results = ET.fromstringlist(show_results_meta(
        apiurl, project, multibuild=True, repository=[repository], arch=archs))
    pending = results.xpath('result[@state!="published" and @state!="unpublished"]')
    return len(pending) == 0
def repositories_published(apiurl, repository_pairs, archs=[]):
    """Return ``True`` when every pair is published, otherwise the first
    ``(project, repository)`` tuple that is not."""
    for pair in repository_pairs:
        project, repository = pair
        if repository_published(apiurl, project, repository, archs):
            continue
        return (project, repository)

    return True
def project_meta_revision(apiurl, project):
    """Return the revision number of the first log entry of the project's
    meta commit log as an int."""
    commit_log = get_commitlog(apiurl, project, '_project', None, format='xml', meta=True)
    first_entry = ET.fromstringlist(commit_log).find('logentry')
    return int(first_entry.get('revision'))
def package_source_changed(apiurl, project, package):
    """Return the time of the revision in the package's ``_history``
    (requested with ``limit=1``) as a naive UTC datetime."""
    history_url = makeurl(apiurl, ['source', project, package, '_history'], {'limit': 1})
    history = ET.parse(http_GET(history_url)).getroot()
    timestamp = int(history.find('revision/time').text)
    changed = datetime.fromtimestamp(timestamp, timezone.utc)
    return changed.replace(tzinfo=None)
def package_source_age(apiurl, project, package):
    """Return the time elapsed since the package source changed as a timedelta.

    The current time is taken in UTC and made naive, matching the naive UTC
    value returned by ``package_source_changed()`` and replacing the
    deprecated ``datetime.utcnow()`` with an equivalent value.
    """
    now = datetime.now(timezone.utc).replace(tzinfo=None)
    return now - package_source_changed(apiurl, project, package)
def entity_exists(apiurl: str, project: str, package: Optional[str] = None) -> bool:
    """Tell whether the project, or the package inside the project, exists on
    the OBS instance behind ``apiurl``.

    The ``_meta`` of the entity is requested; HTTP 404 means it does not
    exist, any other HTTP error is re-raised.
    """
    segments = [part for part in ('source', project, package) if part] + ['_meta']
    try:
        http_GET(makeurl(apiurl, segments))
    except HTTPError as e:
        if e.code != 404:
            raise e
        return False

    return True
def package_kind(
    apiurl, project, package
) -> Optional[Literal['meta', 'multibuild_subpackage', 'patchinfo', 'maintenance_update', 'multispec_subpackage', 'source']]:
    """Classify ``package``.

    The name alone decides ``meta``, ``multibuild_subpackage`` and
    ``patchinfo``. Otherwise the package meta and source link are consulted;
    ``None`` is returned when the package meta request answers with HTTP 404.
    """
    if package.startswith(('00', '_')):
        return 'meta'
    if ':' in package:
        return 'multibuild_subpackage'
    if package.startswith('patchinfo.'):
        return 'patchinfo'

    try:
        meta_url = makeurl(apiurl, ['source', project, package, '_meta'])
        package_meta = ET.parse(http_GET(meta_url)).getroot()
    except HTTPError as e:
        if e.code == 404:
            return None
        raise e

    releasename = package_meta.find('releasename')
    if releasename is not None and releasename.text != package:
        return 'maintenance_update'

    # Some multispec subpackages do not have bcntsynctag, so check link.
    link = entity_source_link(apiurl, project, package)
    is_copy_link = link is not None and link.get('cicount') == 'copy'
    # If a multispec subpackage was updated via a maintenance update the
    # proper link information is lost and it will be considered source.
    if is_copy_link and package_kind(apiurl, project, link.get('package')) != 'maintenance_update':
        return 'multispec_subpackage'

    return 'source'
def entity_source_link(apiurl, project, package=None):
    """Return the link information of a package or a project.

    For a package the parsed ``_link`` file is returned; for a project the
    ``link`` element of its meta (``None`` if there is none). HTTP 404 yields
    ``None``; other HTTP errors are re-raised.
    """
    try:
        segments = ['source', project, package, '_link'] if package else ['source', project, '_meta']
        root = ET.parse(http_GET(makeurl(apiurl, segments))).getroot()
    except HTTPError as e:
        if e.code == 404:
            return None
        raise e

    if package:
        return root
    return root.find('link')
@memoize(session=True)
def package_source_link_copy(apiurl, project, package):
    """Tell whether the package has a source link with ``cicount="copy"``."""
    link = entity_source_link(apiurl, project, package)
    if link is None:
        return False
    return link.get('cicount') == 'copy'
# Ideally, all package_source_hash* functions would operate on srcmd5, but
# unfortunately that is not practical for real use-cases. The srcmd5 includes
# service run information in addition to the presence of a link even if the
# expanded sources are identical. The verifymd5 sum excludes such information
# and only covers the sources (as should be the point), but looks at the link
# sources which means for projects like devel which link to the head revision of
# downstream all the verifymd5 sums are the same. This makes the summary md5s
# provided by OBS useless for comparing source and really anything. Instead the
# individual file md5s are used to generate a sha1 which is used for comparison.
# In the case of maintenance projects they are structured such that the updates
# are suffixed packages and the unsuffixed package is empty and only links to
# a specific suffixed package each revision. As such for maintenance projects
# the link must be expanded and is safe to do so. Additionally, projects that
# inherit packages need the same treatment (ie. expanding) until they are
# overridden within the project.
@memoize(session=True)
def package_source_hash(apiurl, project, package, revision=None):
    """Return a short sha1 over the md5 sums of the package's source files
    (``_link`` excluded), optionally at ``revision``.

    ``None`` is returned when the request answers with HTTP 400 or 404, or
    when a revision was requested and the answer contains an ``error``
    element.
    """
    query = {'rev': revision} if revision else {}

    # Packages that previously had a link, but no longer do, are not caught.
    if package_source_link_copy(apiurl, project, package):
        query['expand'] = 1

    try:
        listing_url = makeurl(apiurl, ['source', project, package], query)
        listing = ET.parse(http_GET(listing_url)).getroot()
    except HTTPError as e:
        # 400: revision not found, 404: package not found.
        if e.code in (400, 404):
            return None
        raise e

    # OBS returns XML error instead of HTTP 404 if revision not found.
    if revision and listing.find('error') is not None:
        return None

    from osclib.util import sha1_short
    return sha1_short(listing.xpath('entry[@name!="_link"]/@md5'))
def package_source_hash_history(apiurl, project, package, limit=5, include_project_link=False):
    """Yield the source hashes of the package's revisions, newest first.

    At most ``limit`` revisions of the package itself are considered. With
    ``include_project_link`` set and the limit not yet reached (or no limit
    given), the history continues in the project the current project links
    to, skipping hashes that were already yielded here. Nothing is yielded
    when the commit log request answers with HTTP 404.
    """
    try:
        # get_commitlog() reverses the order so newest revisions are first.
        log = ET.fromstringlist(
            get_commitlog(apiurl, project, package, None, format='xml'))
    except HTTPError as e:
        if e.code == 404:
            return
        raise e

    seen_hashes = [] if include_project_link else None
    revisions = log.xpath('logentry/@srcmd5')
    for srcmd5 in revisions[:limit]:
        source_hash = package_source_hash(apiurl, project, package, srcmd5)
        yield source_hash

        if include_project_link:
            seen_hashes.append(source_hash)

    if not include_project_link:
        return
    if limit and len(revisions) >= limit:
        return

    link = entity_source_link(apiurl, project)
    if link is None:
        return
    linked_project = link.get('project')

    if limit:
        limit_remaining = limit - len(revisions)

    for source_hash in package_source_hash_history(apiurl, linked_project, package, None, True):
        if source_hash in seen_hashes:
            continue

        yield source_hash

        if limit:
            limit_remaining -= 1
            if limit_remaining == 0:
                break
def package_version(apiurl, project, package):
    """Return the text of the last ``version`` element in the package's
    ``_history`` (requested with ``limit=1``), or ``False`` on HTTP 404."""
    try:
        history_url = makeurl(apiurl, ['source', project, package, '_history'], {'limit': 1})
        history = ET.parse(http_GET(history_url)).getroot()
    except HTTPError as e:
        if e.code != 404:
            raise e
        return False

    versions = history.xpath('(//version)[last()]/text()')
    return str(versions[0])
def project_attribute_list(apiurl, attribute, locked=None):
    """Yield the names of projects carrying ``attribute``.

    When ``locked`` is not ``None`` only projects whose lock state equals it
    are yielded.
    """
    found = search(apiurl, 'project', f'attribute/@name="{attribute}"')
    for name in found.xpath('project/@name'):
        # Locked not exposed via OBS xpath engine.
        if locked is None or project_locked(apiurl, name) == locked:
            yield name
# OBS xpath engine does not support multiple attribute queries nor negation. As
# such both must be done client-side.
def project_attributes_list(apiurl, attributes, attributes_not=None, locked=None):
    """Return the projects carrying any of ``attributes`` and none of
    ``attributes_not``.

    :param apiurl: API URL used for the attribute searches.
    :param attributes: attribute names; projects having at least one of them
        are collected.
    :param attributes_not: attribute names whose projects are removed again;
        ``None`` (the default) means nothing is removed.
    :param locked: passed on to ``project_attribute_list()``.
    :return: list of project names in no particular order.
    """
    projects = set()

    for attribute in attributes:
        projects.update(project_attribute_list(apiurl, attribute, locked))

    # The default of None must not be iterated.
    for attribute in attributes_not or ():
        projects.difference_update(project_attribute_list(apiurl, attribute, locked))

    return list(projects)
@memoize(session=True)
def project_remote_list(apiurl):
    """Return a dict mapping the name of each project whose ``remoteurl``
    starts with ``http`` to that URL, with a trailing ``/public`` removed."""
    found = search(apiurl, 'project', 'starts-with(remoteurl, "http")')

    remotes = {}
    for node in found.findall('project'):
        # Strip ending /public as the only use-cases for manually checking
        # remote projects is to query them directly to use an API that does not
        # work over the interconnect. As such /public will have same problem.
        remote_url = re.sub('/public$', '', node.find('remoteurl').text)
        remotes[node.get('name')] = remote_url

    return remotes
def project_remote_apiurl(apiurl, project):
    """Return the API URL and project name to use for ``project``.

    When ``project`` is prefixed with the name of a remote project (plus
    ``:``), the remote's URL and the unprefixed name are returned; otherwise
    the arguments are returned unchanged.
    """
    for remote, remote_url in project_remote_list(apiurl).items():
        prefix = remote + ':'
        if project.startswith(prefix):
            return remote_url, project[len(remote) + 1:]

    return apiurl, project
def project_remote_prefixed(apiurl, apiurl_remote, project):
    """Return ``project`` as it is named when seen from ``apiurl``.

    :param apiurl: API URL of the local instance.
    :param apiurl_remote: API URL of the instance the project lives on.
    :param project: project name on ``apiurl_remote``.
    :return: ``project`` unchanged when both URLs are equal, otherwise the
        name prefixed with the remote project configured for
        ``apiurl_remote``.
    :raises Exception: when no remote project is configured for
        ``apiurl_remote``.
    """
    if apiurl_remote == apiurl:
        return project

    remotes = project_remote_list(apiurl)
    for remote, remote_apiurl in remotes.items():
        if remote_apiurl == apiurl_remote:
            return remote + ':' + project

    # The message previously ran "for" and the URL together.
    raise Exception(f'remote APIURL interconnect not configured for {apiurl_remote}')
def review_find_last(request, user, states=['all']):
    """Return the last review of ``request`` assigned to ``user`` whose state
    is in ``states`` (any state when ``states`` contains ``'all'``), or
    ``None`` if there is none."""
    any_state = 'all' in states
    candidates = (
        review for review in reversed(request.reviews)
        if review.by_user == user and (any_state or review.state in states)
    )
    return next(candidates, None)
def reviews_remaining(request: Request, incident_psuedo=False) -> List[ReviewState]:
    """Return the short form of every review of ``request`` that is not
    accepted.

    With ``incident_psuedo`` set, ``'maintenance_incident'`` is appended once
    if the request has an action of that type.
    """
    pending = [review_short(review) for review in request.reviews if review.state != 'accepted']

    # Add review in the same style as the staging review used for non
    # maintenance projects to allow for the same wait on review.
    if incident_psuedo and any(action.type == 'maintenance_incident' for action in request.actions):
        pending.append('maintenance_incident')

    return pending
def review_short(review):
    """Return a short identifier of who a review is assigned to.

    The user wins over the group, which wins over the project; a project
    review with a package is rendered as ``project/package``. ``None`` is
    returned when none of them is set.
    """
    direct = review.by_user or review.by_group
    if direct:
        return direct

    if not review.by_project:
        return None
    if review.by_package:
        return '/'.join([review.by_project, review.by_package])
    return review.by_project
def issue_trackers(apiurl):
    """Return a dict mapping the name of every issue tracker to its label."""
    listing = ET.parse(http_GET(makeurl(apiurl, ['issue_trackers']))).getroot()
    return {
        tracker.find('name').text: tracker.find('label').text
        for tracker in listing.findall('issue-tracker')
    }
def issue_tracker_by_url(apiurl: str, tracker_url: str) -> Optional[str]:
    """Return the first ``issue-tracker`` element whose ``url`` equals
    ``tracker_url`` (with a trailing slash appended if missing), or ``None``."""
    listing = ET.parse(http_GET(makeurl(apiurl, ['issue_trackers']))).getroot()

    # All trackers are formatted with trailing slash.
    if not tracker_url.endswith('/'):
        tracker_url = tracker_url + '/'

    matches = listing.xpath(f'issue-tracker[url[text()="{tracker_url}"]]')
    return matches[0] if matches else None
def issue_tracker_label_apply(tracker, identifier):
    """Return the tracker's label with every ``@@@`` replaced by
    ``identifier``."""
    label_template = tracker.find('label').text
    return label_template.replace('@@@', identifier)
def request_remote_identifier(apiurl: str, apiurl_remote: str, request_id: str) -> str:
    """Return an identifier for a request living on ``apiurl_remote``.

    For the local instance this is ``request#<id>``. Otherwise the label of
    the issue tracker matching the remote URL is applied, with the bare id as
    the fallback when no tracker matches.
    """
    if apiurl_remote == apiurl:
        return f'request#{request_id}'

    # The URL differences make this rather convoluted.
    tracker_url = apiurl_remote.replace('api.', 'build.')
    tracker = issue_tracker_by_url(apiurl, tracker_url)
    if tracker is None:
        return request_id
    return issue_tracker_label_apply(tracker, request_id)
def duplicated_binaries_in_repo(apiurl, project, repository):
    """Return the binaries that more than one package provides per arch.

    The result maps arch to a dict of binary name to the list of packages
    providing it. Source/debug binaries and patchinfo packages are ignored,
    and only binaries of the arch itself or ``noarch`` are considered.
    """
    duplicates = {}
    for arch in sorted(target_archs(apiurl, project, repository), reverse=True):
        package_binaries, _ = package_binary_list(
            apiurl, project, repository, arch,
            strip_multibuild=False, exclude_src_debug=True,
            exclude_patchinfo=True)

        # Binary name -> first package seen providing it, for this arch.
        first_provider = {}
        for parsed in package_binaries:
            if parsed.arch not in ('noarch', arch):
                continue

            if parsed.name not in first_provider:
                first_provider[parsed.name] = parsed.package
                continue

            arch_duplicates = duplicates.setdefault(str(arch), {})
            providers = arch_duplicates.setdefault(parsed.name, set())
            providers.add(parsed.package)
            providers.add(first_provider[parsed.name])

    # convert sets to lists for readable yaml
    for arch_duplicates in duplicates.values():
        for name in arch_duplicates:
            arch_duplicates[name] = list(arch_duplicates[name])

    return duplicates
# osc.core.search() is over-complicated and does not return lxml element.
def search(apiurl, path, xpath, query=None):
    """Run a search request and return the parsed root element.

    path -- appended to the ``search`` route (e.g. 'request').
    xpath -- expression sent as the ``match`` query parameter.
    query -- optional dict of additional query parameters; it is not modified.
    """
    # Work on a copy: writing 'match' into the argument would leak into the
    # caller's dict (or into a default shared by all calls).
    params = dict(query) if query else {}
    params['match'] = xpath
    url = makeurl(apiurl, ['search', path], params)
    return ET.parse(http_GET(url)).getroot()
def action_is_patchinfo(action):
    """Tell whether the action is a maintenance_incident whose source package is a patchinfo.

    A source package counts as patchinfo when it is named ``patchinfo`` or
    its name starts with ``patchinfo.``.
    """
    if action.type != 'maintenance_incident':
        return False

    source_package = action.src_package
    return source_package == 'patchinfo' or source_package.startswith('patchinfo.')
def request_action_key(action: Action) -> str:
    """Build a key describing what a request action is aimed at.

    The key is ``<part>/<part>/...::<action type>``. Which parts are used
    depends on the action type:

    - add_role, change_devel, maintenance_release, set_bugowner, submit:
      target project and, when set, target package; add_role and set_bugowner
      add the person (or, without a person, the group) and add_role also adds
      the corresponding role.
    - delete: target project followed by the target package or, if there is no
      package, the target repository when set.
    - maintenance_incident: the release project (skipped for patchinfo
      actions) and the source package.

    Any other type results in an empty part list.
    """
    kind = action.type
    parts = []

    if kind in ('add_role', 'change_devel', 'maintenance_release', 'set_bugowner', 'submit'):
        parts.append(action.tgt_project)
        if action.tgt_package is not None:
            parts.append(action.tgt_package)

        if kind in ('add_role', 'set_bugowner'):
            with_role = kind == 'add_role'
            if action.person_name is not None:
                parts.append(action.person_name)
                if with_role:
                    parts.append(action.person_role)
            else:
                parts.append(action.group_name)
                if with_role:
                    parts.append(action.group_role)
    elif kind == 'delete':
        parts.append(action.tgt_project)
        # The package wins over the repository; at most one of them is added.
        if action.tgt_package is not None:
            scope = action.tgt_package
        else:
            scope = action.tgt_repository
        if scope is not None:
            parts.append(scope)
    elif kind == 'maintenance_incident':
        if not action_is_patchinfo(action):
            parts.append(action.tgt_releaseproject)
        parts.append(action.src_package)

    target = '/'.join(parts)
    return '::'.join([target, kind])
def request_action_list_maintenance_incident(apiurl, project, package, states=['new', 'review']):
    """Yield (request, action) pairs of maintenance_incident requests for a package.

    Finding incidents for a target project takes several steps:

    - Requests have to be searched in two shapes: before and after they were
      assigned to an incident project.
    - The search runs against the "maintenance projects" denoted by the
      OBS:MaintenanceProject attribute rather than the actual target project.
    - The actual target (action/target/releaseproject) is not accessible via
      search, so it is checked client side.
    - A request may carry several actions, so they are walked again to find
      the one that made the request show up in the search result.

    states -- request states to include; no state filter when it contains 'all'.

    At most one action is yielded per request.
    """
    package_repository = f"{package}.{project.replace(':', '_')}"

    # Source package may be referenced by its plain name or with the project suffix.
    source_clause = xpath_join('', f'action/source/@package="{package}"')
    source_clause = xpath_join(
        source_clause, f'action/source/@package="{package_repository}"', op='or', inner=True)

    # Loop over all maintenance projects and create selectors for the two
    # request states for the given project.
    query = ''
    for maintenance_project in project_attribute_list(apiurl, 'OBS:MaintenanceProject'):
        # Before being assigned to an incident.
        unassigned = xpath_join('', f'action/target/@project="{maintenance_project}"')
        unassigned = xpath_join(unassigned, f'({source_clause})', op='and', inner=True)
        query = xpath_join(query, unassigned, op='or', nexpr_parentheses=True)

        # After being assigned to an incident.
        assigned = xpath_join(
            '', f'starts-with(action/target/@project,"{maintenance_project}:")')
        assigned = xpath_join(
            assigned, f'action/target/@package="{package_repository}"', op='and', inner=True)
        query = xpath_join(query, assigned, op='or', nexpr_parentheses=True)

    query = f'({query})'

    if 'all' not in states:
        state_clause = ''
        for state in states:
            state_clause = xpath_join(state_clause, f'state/@name="{state}"', inner=True)
        query = xpath_join(query, state_clause, op='and', nexpr_parentheses=True)

    query = xpath_join(query, 'action/@type="maintenance_incident"', op='and')

    for request_element in search(apiurl, 'request', query).findall('request'):
        request = Request()
        request.read(request_element)

        for action in request.actions:
            if action.type != 'maintenance_incident' or action.tgt_releaseproject != project:
                continue

            if action.tgt_package is None:
                # Not assigned to an incident yet: identified by the source package.
                matched = action.src_package in (package, package_repository)
            else:
                matched = action.tgt_package == package_repository

            if matched:
                yield request, action
                break
def request_action_list_maintenance_release(apiurl, project, package, states=['new', 'review']):
    """Yield (request, action) pairs of maintenance_release requests for a package.

    Requests are searched by target project and by the source package named
    ``<package>.<project with ':' replaced by '_'>``; the actions of every hit
    are checked again and the first matching one is yielded.

    states -- request states to include; no state filter when it contains 'all'.
    """
    package_repository = f"{package}.{project.replace(':', '_')}"

    query = xpath_join(
        f'action/target/@project="{project}"',
        f'action/source/@package="{package_repository}"', op='and', inner=True)
    query = f'({query})'

    if 'all' not in states:
        state_clause = ''
        for state in states:
            state_clause = xpath_join(state_clause, f'state/@name="{state}"', inner=True)
        query = xpath_join(query, state_clause, op='and', nexpr_parentheses=True)

    query = xpath_join(query, 'action/@type="maintenance_release"', op='and')

    for request_element in search(apiurl, 'request', query).findall('request'):
        request = Request()
        request.read(request_element)

        for action in request.actions:
            if action.type != 'maintenance_release':
                continue
            if action.tgt_project == project and action.src_package == package_repository:
                yield request, action
                break
def request_action_simple_list(apiurl, project, package, states, request_type):
    """Yield (request, action) pairs of requests targeting project/package.

    Requests are loaded through get_request_list_with_history() while the osc
    ``include_request_from_project`` setting is switched off. For every request
    the first action whose target project and package match is yielded.
    """
    # Disable including source project in get_request_list() query.
    before = conf.config['include_request_from_project']
    conf.config['include_request_from_project'] = False
    try:
        requests = get_request_list_with_history(apiurl, project, package, None, states, request_type)
    finally:
        # Put the global setting back even when the query raises.
        conf.config['include_request_from_project'] = before

    for request in requests:
        for action in request.actions:
            if action.tgt_project == project and action.tgt_package == package:
                yield request, action
                break
def request_action_list(apiurl, project, package, states=['new', 'review'], types=['submit']):
    """Yield (request, action) pairs for project/package for each requested type.

    maintenance_incident and maintenance_release are handled by their dedicated
    listing functions; every other type goes through the simple listing.
    """
    for request_type in types:
        if request_type == 'maintenance_incident':
            yield from request_action_list_maintenance_incident(apiurl, project, package, states)
        # elif: a maintenance_incident must not fall through to the simple
        # listing in the else branch as well.
        elif request_type == 'maintenance_release':
            yield from request_action_list_maintenance_release(apiurl, project, package, states)
        else:
            yield from request_action_simple_list(apiurl, project, package, states, request_type)
def request_action_list_source(apiurl, project, package, states=['new', 'review'], include_release=False):
    """Yield (request, action) pairs using the request types that fit the project.

    When the project has a value for the OBS:Maintained attribute,
    maintenance_incident requests are listed (plus maintenance_release ones
    if include_release is set); otherwise submit requests are listed.
    """
    if not attribute_value_load(apiurl, project, 'Maintained', 'OBS'):
        types = ['submit']
    elif include_release:
        types = ['maintenance_incident', 'maintenance_release']
    else:
        types = ['maintenance_incident']

    yield from request_action_list(apiurl, project, package, states, types)
def request_create_submit(apiurl, source_project, source_package,
                          target_project, target_package=None, message=None, revision=None,
                          ignore_if_any_request=False, supersede=True, frequency=None):
    """Prepare a submit request unless it would be pointless.

    target_package: defaults to source_package
    ignore_if_any_request: ignore source changes and do not submit if any prior requests
    supersede: when false, do not submit while a request in state new or review exists
    frequency: do not submit if a prior request is younger than this many seconds

    Returns False when nothing should be submitted (identical sources in the
    target or in a prior request, or one of the options above applies),
    otherwise a RequestFuture that creates the request.
    """
    target_package = target_package or source_package

    target_hash = package_source_hash(apiurl, target_project, target_package)
    candidate_hash = package_source_hash(apiurl, source_project, source_package, revision)
    if candidate_hash == target_hash:
        # No sense submitting identical sources.
        return False

    prior_requests = request_action_list(
        apiurl, target_project, target_package, REQUEST_STATES_MINUS_ACCEPTED, ['submit'])
    for request, action in prior_requests:
        if (ignore_if_any_request or
                (not supersede and request.state.name in ('new', 'review')) or
                (frequency and request_age(request).total_seconds() < frequency)):
            return False

        pending_hash = package_source_hash(
            apiurl, action.src_project, action.src_package, action.src_rev)
        if pending_hash == candidate_hash:
            # Pending request with identical sources.
            return False

    message = message_suffix('created', message)

    def create_function():
        return create_submit_request(apiurl, source_project, source_package,
                                     target_project, target_package,
                                     message=message, orev=revision)

    description = f'submit {source_project}/{source_package} -> {target_project}/{target_package}'
    return RequestFuture(description, create_function)
def request_create_delete(apiurl, target_project, target_package, message=None):
    """Prepare a delete request for a package.

    Returns False if any delete request in a state other than accepted already
    exists for the target, otherwise a RequestFuture that creates the request.
    """
    prior_requests = request_action_list(
        apiurl, target_project, target_package, REQUEST_STATES_MINUS_ACCEPTED, ['delete'])
    if any(True for _request, _action in prior_requests):
        return False

    # No proper API function to perform the same operation.
    message = message_suffix('created', message)

    def create_function():
        return create_delete_request(apiurl, target_project, target_package, message)

    description = f'delete {target_project}/{target_package}'
    return RequestFuture(description, create_function)
def request_create_change_devel(apiurl, source_project, source_package,
                                target_project, target_package=None, message=None):
    """Prepare a change_devel request.

    target_package defaults to source_package. Returns False if any
    change_devel request in a state other than accepted already exists for the
    target, otherwise a RequestFuture that creates the request.
    """
    target_package = target_package or source_package

    prior_requests = request_action_list(
        apiurl, target_project, target_package, REQUEST_STATES_MINUS_ACCEPTED, ['change_devel'])
    if any(True for _request, _action in prior_requests):
        return False

    message = message_suffix('created', message)

    def create_function():
        return create_change_devel_request(apiurl, source_project, source_package,
                                           target_project, target_package, message)

    description = (
        f'change_devel {source_project}/{source_package} -> {target_project}/{target_package}')
    return RequestFuture(description, create_function)
def create_delete_request(apiurl, target_project, target_package=None, message=None):
    """Create a delete request for the target and return what create_request() returns"""
    return create_request(
        apiurl,
        Action('delete', tgt_project=target_project, tgt_package=target_package),
        message)
def create_change_devel_request(apiurl, source_project, source_package,
                                target_project, target_package=None, message=None):
    """Create a change_devel request from source to target and return what create_request() returns"""
    devel_action = Action(
        'change_devel',
        src_project=source_project, src_package=source_package,
        tgt_project=target_project, tgt_package=target_package)
    return create_request(apiurl, devel_action, message)
def create_add_role_request(apiurl, target_project, user, role, target_package=None, message=None):
    """Create an add_role request

    user -- user or group name. If it is a group, it has to start with 'group:'.
    role -- role to request for the user or group.

    Returns what create_request() returns.
    """
    group_prefix = 'group:'
    if user.startswith(group_prefix):
        # Cut off the leading marker only; str.replace() would also remove any
        # later 'group:' occurrence inside the name.
        kargs = dict(group_name=user[len(group_prefix):], group_role=role)
    else:
        kargs = dict(person_name=user, person_role=role)

    action = Action('add_role', tgt_project=target_project, tgt_package=target_package, **kargs)
    return create_request(apiurl, action, message)
def create_set_bugowner_request(apiurl, target_project, user, target_package=None, message=None):
    """Create a set_bugowner request

    user -- user or group name. If it is a group, it has to start with 'group:'.

    Returns what create_request() returns.
    """
    group_prefix = 'group:'
    if user.startswith(group_prefix):
        # Cut off the leading marker only; str.replace() would also remove any
        # later 'group:' occurrence inside the name.
        kargs = dict(group_name=user[len(group_prefix):])
    else:
        kargs = dict(person_name=user)

    action = Action('set_bugowner', tgt_project=target_project, tgt_package=target_package, **kargs)
    return create_request(apiurl, action, message)
def create_request(apiurl, action, message=None):
    """Create a request holding the single given action and return its ID (reqid)

    Without a message a generic description is used (see add_description)
    """
    new_request = Request()
    new_request.actions.append(action)
    add_description(new_request, message)

    new_request.create(apiurl)
    return new_request.reqid
class RequestFuture:
    """A described request whose creation is deferred until asked for."""

    def __init__(self, description, create_function):
        # description: text returned by str(); create_function: callable
        # without arguments that performs the creation.
        self.description = description
        self.create_function = create_function

    def __str__(self):
        return self.description

    def create(self):
        """Call the create function and return its result."""
        return self.create_function()

    def create_tolerant(self):
        """Like create(), but print the traceback and return False on HTTPError."""
        try:
            result = self.create()
        except HTTPError:
            traceback.print_exc()
            return False
        return result

    def print_and_create(self, dry=False):
        """Print the description and, unless dry, create tolerantly.

        Returns None for a dry run, otherwise the result of create_tolerant().
        """
        if not dry:
            outcome = self.create_tolerant()
            print(f'{self} = {outcome}')
            return outcome

        print(self)
        return None
def add_description(request, text=None):
    """Set the description of the given request.

    A non-empty text is used as is. Otherwise a generic text is generated.
    """
    request.description = text if text else message_suffix('created')
def message_suffix(action, message=None):
    """Return the message with the host name appended.

    Without a message, ``<action> by OSRT tools`` is used as the base text.
    """
    base = message if message else f'{action} by OSRT tools'
    return base + f' (host {socket.gethostname()})'
def request_state_change(apiurl, request_id, state):
    """POST a changestate command for the request and return the response's ``code`` attribute."""
    url = makeurl(
        apiurl, ['request', request_id],
        {'newstate': state, 'cmd': 'changestate'})
    response_root = ET.parse(http_POST(url)).getroot()
    return response_root.get('code')