Fetch all openQA jobs for the ISO every time

This is ugly on first look, but has several advantages:
- we can more easily support a cold start
- as such we don't need to have a persistant queue and
  can directly bind the routing keys we want
- we do the same on all openqa events, simplifying the code
- we can cope support short names for the checks

The last is the most significant benefit (not yet implemented though).
We can name the openqa jobs RAID1 and gnome and only have to append
the machine name (or other settings) if they conflict
This commit is contained in:
Stephan Kulow 2018-10-08 14:05:25 +02:00
parent dab6850133
commit ecdc60065a

View File

@ -10,12 +10,15 @@ from osc.core import http_POST
from osclib.conf import Config
from osclib.stagingapi import StagingAPI
from lxml import etree as ET
from openqa_client.client import OpenQA_Client
class Project(object):
def __init__(self, name):
Config(apiurl, name)
self.api = StagingAPI(apiurl, name)
self.staging_projects = dict()
self.listener = None
for p in self.api.get_staging_projects():
if self.api.is_adi_project(p):
continue
@ -49,10 +52,36 @@ class Project(object):
return {'isos': self.gather_isos(name, 'images'),
'id': self.gather_buildid(name, 'images')}
# once the project is added to a listener, it's calling back
def fetch_initial_openqa(self, listener):
self.listener = listener
for project in self.staging_projects:
self.update_staging_status(project)
def fetch_openqa_jobs(self, staging, iso):
buildid = self.staging_projects[staging].get('id')
if not buildid:
print("I don't know the build id of " + staging)
return
openqa = self.listener.jobs_for_iso(iso)
for job in openqa:
print(staging, iso, job['id'], job['state'], job['result'],
job['settings']['MACHINE'], job['settings']['TEST'])
xml = self.openqa_check_xml(self.listener.test_url(job),
self.map_openqa_result(job),
job['settings']['TEST'] + '@' + job['settings']['MACHINE'])
url = self.api.makeurl(['status_reports', 'published', staging, 'images', 'reports', buildid])
http_POST(url, data=xml)
print(url, xml)
def update_staging_status(self, project):
for iso in self.staging_projects[project]['isos']:
self.fetch_openqa_jobs(project, iso)
def update_staging_buildid(self, project, repository, buildid):
self.staging_projects[project]['id'] = buildid
self.staging_projects[project]['isos'] = self.gather_isos(project, repository)
print('UPDATE', project, self.staging_projects[project])
self.update_staging_status(project)
def check_published_repo(self, project, repository, buildid):
if repository != 'images':
@ -66,45 +95,42 @@ class Project(object):
if iso in self.staging_projects[p]['isos']:
return p
def map_openqa_result(self, result):
if result in ['passed', 'softfailed']:
def map_openqa_result(self, job):
if job['result'] in ['passed', 'softfailed']:
return 'success'
if job['result'] == 'none':
return 'pending'
return 'failure'
def openqa_done(self, iso, test, machine, id, result):
print('openqa_done', iso, test, machine, id, result)
def openqa_job_change(self, iso):
staging = self.matching_project(iso)
if not staging:
return
buildid = self.staging_projects[staging].get('id')
if not buildid:
print("I don't know the build id of " + staging)
return
xml = self.openqa_check_xml(id,
self.map_openqa_result(result),
test + '@' + machine)
url = self.api.makeurl(['status_reports', 'published', staging, 'images', 'reports', buildid])
http_POST(url, data=xml)
# we fetch all openqa jobs so we can avoid long job names
self.fetch_openqa_jobs(staging, iso)
def openqa_create(self, iso, test, machine, id):
print('openqa_create', iso, test, machine, id)
def openqa_check_xml(self, id, state, name):
def openqa_check_xml(self, url, state, name):
check = ET.Element('check')
se = ET.SubElement(check, 'url')
se.text = "https://openqa.suse.de/tests/{}".format(id)
se.text = url
se = ET.SubElement(check, 'state')
se.text = state
se = ET.SubElement(check, 'name')
se.text = name
return ET.tostring(check)
class Listener(object):
def __init__(self, amqp_prefix, amqp_url):
def __init__(self, amqp_prefix, amqp_url, openqa_url):
self.projects = []
self.amqp_prefix = amqp_prefix
self.amqp_url = amqp_url
connection = pika.BlockingConnection(pika.URLParameters(amqp_url))
self.openqa_url = openqa_url
self.openqa = OpenQA_Client(server=openqa_url)
self.setup_rabbitmq()
def setup_rabbitmq(self):
connection = pika.BlockingConnection(pika.URLParameters(self.amqp_url))
self.channel = connection.channel()
self.channel.exchange_declare(exchange='pubsub', exchange_type='topic', passive=True, durable=True)
@ -112,46 +138,56 @@ class Listener(object):
result = self.channel.queue_declare(exclusive=True)
queue_name = result.method.queue
self.channel.queue_bind(exchange='pubsub',
queue=queue_name,routing_key='#')
for event in ['.obs.repo.published', '.openqa.job.done',
'.openqa.job.create', '.openqa.job.restart']:
self.channel.queue_bind(exchange='pubsub',
queue=queue_name,
routing_key=self.amqp_prefix + event)
self.channel.basic_consume(self.on_message,
queue=queue_name,
no_ack=True)
print(' [*] Waiting for logs. To exit press CTRL+C')
no_ack=True)
def add(self, project):
project.fetch_initial_openqa(self)
self.projects.append(project)
def jobs_for_iso(self, iso):
values = {
'iso': iso,
'scope': 'current',
'latest': '1',
}
return self.openqa.openqa_request('GET', 'jobs', values)['jobs']
def test_url(self, job):
# TODO: link to failing module if failed
return self.openqa_url + ("/tests/%d" % job['id'])
def on_published_repo(self, payload):
for p in self.projects:
p.check_published_repo(str(payload['project']), str(payload['repo']), str(payload['buildid']))
def on_openqa_create(self, payload):
def on_openqa_job(self, iso):
print('openqa_job_change', iso)
for p in self.projects:
p.openqa_create(str(payload.get('ISO', '')), str(payload['TEST']), str(payload['MACHINE']), str(payload['id']))
def on_openqa_restart(self, payload):
print(payload)
def on_openqa_done(self, payload):
for p in self.projects:
p.openqa_done(str(payload.get('ISO', '')), payload['TEST'], payload['MACHINE'], payload['id'], payload['result'])
p.openqa_job_change(iso)
def on_message(self, unused_channel, method, properties, body):
if method.routing_key == '{}.obs.repo.published'.format(amqp_prefix):
self.on_published_repo(json.loads(body))
if method.routing_key == '{}.openqa.job.done'.format(amqp_prefix):
self.on_openqa_done(json.loads(body))
self.on_openqa_job(json.loads(body).get('ISO'))
if method.routing_key == '{}.openqa.job.create'.format(amqp_prefix):
self.on_openqa_create(json.loads(body))
self.on_openqa_job(json.loads(body).get('ISO'))
if method.routing_key == '{}.openqa.job.restart'.format(amqp_prefix):
self.on_openqa_restart(json.loads(body))
self.on_openqa_job(json.loads(body).get('ISO'))
def listen(self):
print(' [*] Waiting for logs. To exit press CTRL+C')
self.channel.start_consuming()
if __name__ == '__main__':
parser = argparse.ArgumentParser(
description='Bot to sync openQA status to OBS')
@ -175,11 +211,14 @@ if __name__ == '__main__':
if apiurl.endswith('suse.de'):
amqp_prefix = 'suse'
amqp_url = "amqps://suse:suse@rabbit.suse.de?heartbeat_interval=15"
openqa_url = 'https://openqa.suse.de'
else:
amqp_prefix = 'opensuse'
amqp_url = "amqps://opensuse:opensuse@rabbit.opensuse.org?heartbeat_interval=15"
openqa_url = 'https://openqa.opensuse.org'
l = Listener(amqp_prefix, amqp_url, openqa_url)
l = Listener(amqp_prefix, amqp_url)
if amqp_prefix == 'opensuse':
class Leap15(Project):