diff --git a/totest-manager.py b/totest-manager.py index c01f67ed..0e5aa38a 100755 --- a/totest-manager.py +++ b/totest-manager.py @@ -6,6 +6,7 @@ # (C) 2014 aplanas@suse.de, openSUSE.org # (C) 2014 coolo@suse.de, openSUSE.org # (C) 2017 okurz@suse.de, openSUSE.org +# (C) 2018 dheidler@suse.de, openSUSE.org # Distribute under GPLv2 or GPLv3 from __future__ import print_function @@ -21,6 +22,7 @@ import logging import signal import time import yaml +import pika from xml.etree import cElementTree as ET from openqa_client.client import OpenQA_Client @@ -66,6 +68,7 @@ class ToTestBase(object): self.load_issues_to_ignore() self.project_base = project.split(':')[0] self.update_pinned_descr = False + self.amqp_url = osc.conf.config.get('ttm_amqp_url') def load_issues_to_ignore(self): url = self.api.makeurl(['source', self.project, '_attribute', 'OSRT:IgnoredIssues']) @@ -247,16 +250,17 @@ class ToTestBase(object): jobs = self.find_openqa_results(snapshot) + self.failed_relevant_jobs = [] + self.failed_ignored_jobs = [] + if len(jobs) < self.jobs_num(): # not yet scheduled logger.warning('we have only %s jobs' % len(jobs)) return QA_INPROGRESS - number_of_fails = 0 in_progress = False for job in jobs: # print json.dumps(job, sort_keys=True, indent=4) if job['result'] in ('failed', 'incomplete', 'skipped', 'user_cancelled', 'obsoleted', 'parallel_failed'): - jobname = job['name'] # print json.dumps(job, sort_keys=True, indent=4), jobname url = makeurl(self.openqa_server, ['api', 'v1', 'jobs', str(job['id']), 'comments']) @@ -287,6 +291,7 @@ class ToTestBase(object): self.issues_to_ignore[ref] = build_nr if ignored: + self.failed_ignored_jobs.append(job['id']) if labeled: text = 'Ignored issue' if len(refs) > 0 else 'Ignored failure' # remove flag - unfortunately can't delete comment unless admin @@ -294,9 +299,9 @@ class ToTestBase(object): self.openqa.openqa_request( 'PUT', 'jobs/%s/comments/%d' % (job['id'], labeled), data=data) - logger.info("job %s failed, but was ignored", jobname) + logger.info("job %s failed, but was ignored", job['name']) else: - number_of_fails += 1 + self.failed_relevant_jobs.append(job['id']) if not labeled and len(refs) > 0: data = {'text': 'label:unknown_failure'} if self.dryrun: @@ -306,7 +311,7 @@ class ToTestBase(object): 'POST', 'jobs/%s/comments' % job['id'], data=data) joburl = '%s/tests/%s' % (self.openqa_server, job['id']) - logger.info("job %s failed, see %s", jobname, joburl) + logger.info("job %s failed, see %s", job['name'], joburl) elif job['result'] == 'passed' or job['result'] == 'softfailed': continue @@ -318,7 +323,7 @@ class ToTestBase(object): self.save_issues_to_ignore() - if number_of_fails > 0: + if len(self.failed_relevant_jobs) > 0: return QA_FAILED if in_progress: @@ -541,6 +546,8 @@ class ToTestBase(object): # with a new release can_release = False + self.send_amqp_event(current_snapshot, current_result) + can_publish = (current_result == QA_PASSED) # already published @@ -575,6 +582,38 @@ class ToTestBase(object): self.update_totest(new_snapshot) self.write_version_to_dashboard("totest", new_snapshot) + def send_amqp_event(self, current_snapshot, current_result): + if not self.amqp_url: + logger.debug('No ttm_amqp_url configured in oscrc - skipping amqp event emission') + return + + logger.debug('Sending AMQP message') + inf = re.sub(r"ed$", '', self._result2str(current_result)) + msg_topic = '%s.ttm.build.%s' % (self.project_base.lower(), inf) + msg_body = json.dumps({ + 'build': current_snapshot, + 'project': self.project, + 'failed_jobs': { + 'relevant': self.failed_relevant_jobs, + 'ignored': self.failed_ignored_jobs, + } + }) + + # send amqp event + tries = 7 # arbitrary + for t in range(tries): + try: + notify_connection = pika.BlockingConnection(pika.URLParameters(self.amqp_url)) + notify_channel = notify_connection.channel() + notify_channel.exchange_declare(exchange='pubsub', exchange_type='topic', passive=True, durable=True) + notify_channel.basic_publish(exchange='pubsub', routing_key=msg_topic, body=msg_body) + notify_connection.close() + break + except pika.exceptions.ConnectionClosed as e: + logger.warn('Sending AMQP event did not work: %s. Retrying try %s out of %s' % (e, t, tries)) + else: + logger.error('Could not send out AMQP event for %s tries, aborting.' % tries) + def release(self): new_snapshot = self.current_version() self.update_totest(new_snapshot)