From ecdc60065a52ad8658cdcbbddfa2a02256a6d12c Mon Sep 17 00:00:00 2001 From: Stephan Kulow Date: Mon, 8 Oct 2018 14:05:25 +0200 Subject: [PATCH] Fetch all openQA jobs for the ISO every time This is ugly on first look, but has several advantages: - we can more easily support a cold start - as such we don't need to have a persistant queue and can directly bind the routing keys we want - we do the same on all openqa events, simplifying the code - we can cope support short names for the checks The last is the most significant benefit (not yet implemented though). We can name the openqa jobs RAID1 and gnome and only have to append the machine name (or other settings) if they conflict --- rabbit-openqa.py | 117 +++++++++++++++++++++++++++++++---------------- 1 file changed, 78 insertions(+), 39 deletions(-) diff --git a/rabbit-openqa.py b/rabbit-openqa.py index 990c1ba1..9e538f62 100755 --- a/rabbit-openqa.py +++ b/rabbit-openqa.py @@ -10,12 +10,15 @@ from osc.core import http_POST from osclib.conf import Config from osclib.stagingapi import StagingAPI from lxml import etree as ET +from openqa_client.client import OpenQA_Client + class Project(object): def __init__(self, name): Config(apiurl, name) self.api = StagingAPI(apiurl, name) self.staging_projects = dict() + self.listener = None for p in self.api.get_staging_projects(): if self.api.is_adi_project(p): continue @@ -49,10 +52,36 @@ class Project(object): return {'isos': self.gather_isos(name, 'images'), 'id': self.gather_buildid(name, 'images')} + # once the project is added to a listener, it's calling back + def fetch_initial_openqa(self, listener): + self.listener = listener + for project in self.staging_projects: + self.update_staging_status(project) + + def fetch_openqa_jobs(self, staging, iso): + buildid = self.staging_projects[staging].get('id') + if not buildid: + print("I don't know the build id of " + staging) + return + openqa = self.listener.jobs_for_iso(iso) + for job in openqa: + print(staging, iso, job['id'], job['state'], job['result'], + job['settings']['MACHINE'], job['settings']['TEST']) + xml = self.openqa_check_xml(self.listener.test_url(job), + self.map_openqa_result(job), + job['settings']['TEST'] + '@' + job['settings']['MACHINE']) + url = self.api.makeurl(['status_reports', 'published', staging, 'images', 'reports', buildid]) + http_POST(url, data=xml) + print(url, xml) + + def update_staging_status(self, project): + for iso in self.staging_projects[project]['isos']: + self.fetch_openqa_jobs(project, iso) + def update_staging_buildid(self, project, repository, buildid): self.staging_projects[project]['id'] = buildid self.staging_projects[project]['isos'] = self.gather_isos(project, repository) - print('UPDATE', project, self.staging_projects[project]) + self.update_staging_status(project) def check_published_repo(self, project, repository, buildid): if repository != 'images': @@ -66,45 +95,42 @@ class Project(object): if iso in self.staging_projects[p]['isos']: return p - def map_openqa_result(self, result): - if result in ['passed', 'softfailed']: + def map_openqa_result(self, job): + if job['result'] in ['passed', 'softfailed']: return 'success' + if job['result'] == 'none': + return 'pending' return 'failure' - def openqa_done(self, iso, test, machine, id, result): - print('openqa_done', iso, test, machine, id, result) + def openqa_job_change(self, iso): staging = self.matching_project(iso) if not staging: return - buildid = self.staging_projects[staging].get('id') - if not buildid: - print("I don't know the build id of " + staging) - return - xml = self.openqa_check_xml(id, - self.map_openqa_result(result), - test + '@' + machine) - url = self.api.makeurl(['status_reports', 'published', staging, 'images', 'reports', buildid]) - http_POST(url, data=xml) + # we fetch all openqa jobs so we can avoid long job names + self.fetch_openqa_jobs(staging, iso) - def openqa_create(self, iso, test, machine, id): - print('openqa_create', iso, test, machine, id) - - def openqa_check_xml(self, id, state, name): + def openqa_check_xml(self, url, state, name): check = ET.Element('check') se = ET.SubElement(check, 'url') - se.text = "https://openqa.suse.de/tests/{}".format(id) + se.text = url se = ET.SubElement(check, 'state') se.text = state se = ET.SubElement(check, 'name') se.text = name return ET.tostring(check) + class Listener(object): - def __init__(self, amqp_prefix, amqp_url): + def __init__(self, amqp_prefix, amqp_url, openqa_url): self.projects = [] self.amqp_prefix = amqp_prefix self.amqp_url = amqp_url - connection = pika.BlockingConnection(pika.URLParameters(amqp_url)) + self.openqa_url = openqa_url + self.openqa = OpenQA_Client(server=openqa_url) + self.setup_rabbitmq() + + def setup_rabbitmq(self): + connection = pika.BlockingConnection(pika.URLParameters(self.amqp_url)) self.channel = connection.channel() self.channel.exchange_declare(exchange='pubsub', exchange_type='topic', passive=True, durable=True) @@ -112,46 +138,56 @@ class Listener(object): result = self.channel.queue_declare(exclusive=True) queue_name = result.method.queue - self.channel.queue_bind(exchange='pubsub', - queue=queue_name,routing_key='#') + for event in ['.obs.repo.published', '.openqa.job.done', + '.openqa.job.create', '.openqa.job.restart']: + self.channel.queue_bind(exchange='pubsub', + queue=queue_name, + routing_key=self.amqp_prefix + event) self.channel.basic_consume(self.on_message, queue=queue_name, - no_ack=True) - - print(' [*] Waiting for logs. To exit press CTRL+C') + no_ack=True) def add(self, project): + project.fetch_initial_openqa(self) self.projects.append(project) + def jobs_for_iso(self, iso): + values = { + 'iso': iso, + 'scope': 'current', + 'latest': '1', + } + return self.openqa.openqa_request('GET', 'jobs', values)['jobs'] + + def test_url(self, job): + # TODO: link to failing module if failed + return self.openqa_url + ("/tests/%d" % job['id']) + def on_published_repo(self, payload): for p in self.projects: p.check_published_repo(str(payload['project']), str(payload['repo']), str(payload['buildid'])) - def on_openqa_create(self, payload): + def on_openqa_job(self, iso): + print('openqa_job_change', iso) for p in self.projects: - p.openqa_create(str(payload.get('ISO', '')), str(payload['TEST']), str(payload['MACHINE']), str(payload['id'])) - - def on_openqa_restart(self, payload): - print(payload) - - def on_openqa_done(self, payload): - for p in self.projects: - p.openqa_done(str(payload.get('ISO', '')), payload['TEST'], payload['MACHINE'], payload['id'], payload['result']) + p.openqa_job_change(iso) def on_message(self, unused_channel, method, properties, body): if method.routing_key == '{}.obs.repo.published'.format(amqp_prefix): self.on_published_repo(json.loads(body)) if method.routing_key == '{}.openqa.job.done'.format(amqp_prefix): - self.on_openqa_done(json.loads(body)) + self.on_openqa_job(json.loads(body).get('ISO')) if method.routing_key == '{}.openqa.job.create'.format(amqp_prefix): - self.on_openqa_create(json.loads(body)) + self.on_openqa_job(json.loads(body).get('ISO')) if method.routing_key == '{}.openqa.job.restart'.format(amqp_prefix): - self.on_openqa_restart(json.loads(body)) + self.on_openqa_job(json.loads(body).get('ISO')) def listen(self): + print(' [*] Waiting for logs. To exit press CTRL+C') self.channel.start_consuming() + if __name__ == '__main__': parser = argparse.ArgumentParser( description='Bot to sync openQA status to OBS') @@ -175,11 +211,14 @@ if __name__ == '__main__': if apiurl.endswith('suse.de'): amqp_prefix = 'suse' amqp_url = "amqps://suse:suse@rabbit.suse.de?heartbeat_interval=15" + openqa_url = 'https://openqa.suse.de' else: amqp_prefix = 'opensuse' amqp_url = "amqps://opensuse:opensuse@rabbit.opensuse.org?heartbeat_interval=15" + openqa_url = 'https://openqa.opensuse.org' + + l = Listener(amqp_prefix, amqp_url, openqa_url) - l = Listener(amqp_prefix, amqp_url) if amqp_prefix == 'opensuse': class Leap15(Project):