If found the staging project for the request then it should quit the loop, continue the loop makes no sense.
190 lines
6.4 KiB
Python
190 lines
6.4 KiB
Python
import urllib2
|
|
from xml.etree import cElementTree as ET
|
|
|
|
from osc import oscerr
|
|
from osc.core import makeurl
|
|
from osc.core import http_GET
|
|
|
|
|
|
def _is_int(x):
|
|
return isinstance(x, int) or x.isdigit()
|
|
|
|
|
|
class RequestFinder(object):
|
|
|
|
def __init__(self, api):
|
|
"""
|
|
Store the list of submit request together with the source project
|
|
|
|
Example:
|
|
{
|
|
212454: {
|
|
'project': 'openSUSE:Factory',
|
|
},
|
|
223870: {
|
|
'project': 'openSUSE:Factory',
|
|
'staging': 'openSUSE:Factory:Staging:A',
|
|
}
|
|
}
|
|
"""
|
|
self.api = api
|
|
self.srs = {}
|
|
|
|
def find_request_id(self, request_id):
|
|
"""
|
|
Look up the request by ID to verify if it is correct
|
|
:param request_id: ID of the added request
|
|
"""
|
|
|
|
if not _is_int(request_id):
|
|
return False
|
|
|
|
url = makeurl(self.api.apiurl, ['request', str(request_id)])
|
|
try:
|
|
f = http_GET(url)
|
|
except urllib2.HTTPError:
|
|
return None
|
|
|
|
root = ET.parse(f).getroot()
|
|
|
|
if root.get('id', None) != str(request_id):
|
|
return None
|
|
|
|
project = root.find('action').find('target').get('project')
|
|
if (project != self.api.project and not project.startswith(self.api.cstaging)):
|
|
msg = 'Request {} is not for {}, but for {}'
|
|
msg = msg.format(request_id, self.api.project, project)
|
|
raise oscerr.WrongArgs(msg)
|
|
self.srs[int(request_id)] = {'project': project}
|
|
|
|
return True
|
|
|
|
def find_request_package(self, package):
|
|
"""
|
|
Look up the package by its name and return the SR#
|
|
:param package: name of the package
|
|
"""
|
|
|
|
query = 'states=new,review&project={}&view=collection&package={}'
|
|
query = query.format(self.api.project, urllib2.quote(package))
|
|
url = makeurl(self.api.apiurl, ['request'], query)
|
|
f = http_GET(url)
|
|
|
|
root = ET.parse(f).getroot()
|
|
|
|
requests = []
|
|
for sr in root.findall('request'):
|
|
# Check the target matches - OBS query is case insensitive, but OBS is not
|
|
rq_target = sr.find('action').find('target')
|
|
if package != rq_target.get('package') or self.api.project != rq_target.get('project'):
|
|
continue
|
|
|
|
request = sr.get('id')
|
|
state = sr.find('state').get('name')
|
|
|
|
self.srs[int(request)] = {'project': self.api.project, 'state': state}
|
|
requests.append(request)
|
|
|
|
if len(requests) > 1:
|
|
msg = 'There are multiple requests for package "{}": {}'
|
|
msg = msg.format(package, ', '.join(requests))
|
|
raise oscerr.WrongArgs(msg)
|
|
|
|
request = int(requests[0]) if requests else None
|
|
return request
|
|
|
|
def find_request_project(self, source_project, newcand):
|
|
"""
|
|
Look up the source project by its name and return the SR#(s)
|
|
:param source_project: name of the source project
|
|
:param newcand: the review state of staging-group must be new
|
|
"""
|
|
|
|
query = 'states=new,review&project={}&view=collection'.format(self.api.project)
|
|
url = makeurl(self.api.apiurl, ['request'], query)
|
|
f = http_GET(url)
|
|
root = ET.parse(f).getroot()
|
|
|
|
ret = None
|
|
for sr in root.findall('request'):
|
|
# ensure staging tool don't picks the processed request again
|
|
if newcand:
|
|
staging_group_states = [review.get('state') for review in sr.findall('review') if review.get('by_group') == self.api.cstaging_group]
|
|
if 'new' not in staging_group_states:
|
|
continue
|
|
for act in sr.findall('action'):
|
|
src = act.find('source')
|
|
if src is not None and src.get('project') == source_project:
|
|
request = int(sr.attrib['id'])
|
|
state = sr.find('state').get('name')
|
|
self.srs[request] = {'project': self.api.project, 'state': state}
|
|
ret = True
|
|
|
|
return ret
|
|
|
|
def find(self, pkgs, newcand):
|
|
"""
|
|
Search for all various mutations and return list of SR#s
|
|
:param pkgs: mesh of argumets to search for
|
|
:param newcand: the review state of staging-group must be new
|
|
|
|
This function is only called for its side effect.
|
|
"""
|
|
for p in pkgs:
|
|
if self.find_request_package(p):
|
|
continue
|
|
if self.find_request_id(p):
|
|
continue
|
|
if self.find_request_project(p, newcand):
|
|
continue
|
|
raise oscerr.WrongArgs('No SR# found for: {}'.format(p))
|
|
|
|
def find_via_stagingapi(self, pkgs):
|
|
"""
|
|
Search for all various mutations and return list of SR#s. Use
|
|
and instance of StagingAPI to direct the search, this makes
|
|
sure that the SR# are inside a staging project.
|
|
:param pkgs: mesh of argumets to search for
|
|
|
|
This function is only called for its side effect.
|
|
"""
|
|
|
|
for p in pkgs:
|
|
found = False
|
|
for staging in self.api.get_staging_projects():
|
|
if _is_int(p) and self.api.get_package_for_request_id(staging, p):
|
|
self.srs[int(p)] = {'staging': staging}
|
|
found = True
|
|
break
|
|
else:
|
|
rq = self.api.get_request_id_for_package(staging, p)
|
|
if rq:
|
|
self.srs[rq] = {'staging': staging}
|
|
found = True
|
|
break
|
|
if not found:
|
|
raise oscerr.WrongArgs('No SR# found for: {}'.format(p))
|
|
|
|
@classmethod
|
|
def find_sr(cls, pkgs, api, newcand=False):
|
|
"""
|
|
Search for all various mutations and return list of SR#s
|
|
:param pkgs: mesh of argumets to search for
|
|
:param api: StagingAPI instance
|
|
:param newcand: the review state of staging-group must be new
|
|
"""
|
|
finder = cls(api)
|
|
finder.find(pkgs, newcand)
|
|
return finder.srs
|
|
|
|
@classmethod
|
|
def find_staged_sr(cls, pkgs, api):
|
|
"""
|
|
Search for all various mutations and return a single SR#s.
|
|
:param pkgs: mesh of argumets to search for (SR#|package name)
|
|
:param api: StagingAPI instance
|
|
"""
|
|
finder = cls(api)
|
|
finder.find_via_stagingapi(pkgs)
|
|
return finder.srs
|