Merge pull request #1548 from asdil12/ttm_amqp

Add AMQP support to TTM
This commit is contained in:
Stephan Kulow 2018-05-18 09:02:54 +02:00 committed by GitHub
commit ef92c2fcec
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -6,6 +6,7 @@
# (C) 2014 aplanas@suse.de, openSUSE.org # (C) 2014 aplanas@suse.de, openSUSE.org
# (C) 2014 coolo@suse.de, openSUSE.org # (C) 2014 coolo@suse.de, openSUSE.org
# (C) 2017 okurz@suse.de, openSUSE.org # (C) 2017 okurz@suse.de, openSUSE.org
# (C) 2018 dheidler@suse.de, openSUSE.org
# Distribute under GPLv2 or GPLv3 # Distribute under GPLv2 or GPLv3
from __future__ import print_function from __future__ import print_function
@ -21,6 +22,7 @@ import logging
import signal import signal
import time import time
import yaml import yaml
import pika
from xml.etree import cElementTree as ET from xml.etree import cElementTree as ET
from openqa_client.client import OpenQA_Client from openqa_client.client import OpenQA_Client
@ -66,6 +68,7 @@ class ToTestBase(object):
self.load_issues_to_ignore() self.load_issues_to_ignore()
self.project_base = project.split(':')[0] self.project_base = project.split(':')[0]
self.update_pinned_descr = False self.update_pinned_descr = False
self.amqp_url = osc.conf.config.get('ttm_amqp_url')
def load_issues_to_ignore(self): def load_issues_to_ignore(self):
url = self.api.makeurl(['source', self.project, '_attribute', 'OSRT:IgnoredIssues']) url = self.api.makeurl(['source', self.project, '_attribute', 'OSRT:IgnoredIssues'])
@ -247,16 +250,17 @@ class ToTestBase(object):
jobs = self.find_openqa_results(snapshot) jobs = self.find_openqa_results(snapshot)
self.failed_relevant_jobs = []
self.failed_ignored_jobs = []
if len(jobs) < self.jobs_num(): # not yet scheduled if len(jobs) < self.jobs_num(): # not yet scheduled
logger.warning('we have only %s jobs' % len(jobs)) logger.warning('we have only %s jobs' % len(jobs))
return QA_INPROGRESS return QA_INPROGRESS
number_of_fails = 0
in_progress = False in_progress = False
for job in jobs: for job in jobs:
# print json.dumps(job, sort_keys=True, indent=4) # print json.dumps(job, sort_keys=True, indent=4)
if job['result'] in ('failed', 'incomplete', 'skipped', 'user_cancelled', 'obsoleted', 'parallel_failed'): if job['result'] in ('failed', 'incomplete', 'skipped', 'user_cancelled', 'obsoleted', 'parallel_failed'):
jobname = job['name']
# print json.dumps(job, sort_keys=True, indent=4), jobname # print json.dumps(job, sort_keys=True, indent=4), jobname
url = makeurl(self.openqa_server, url = makeurl(self.openqa_server,
['api', 'v1', 'jobs', str(job['id']), 'comments']) ['api', 'v1', 'jobs', str(job['id']), 'comments'])
@ -287,6 +291,7 @@ class ToTestBase(object):
self.issues_to_ignore[ref] = build_nr self.issues_to_ignore[ref] = build_nr
if ignored: if ignored:
self.failed_ignored_jobs.append(job['id'])
if labeled: if labeled:
text = 'Ignored issue' if len(refs) > 0 else 'Ignored failure' text = 'Ignored issue' if len(refs) > 0 else 'Ignored failure'
# remove flag - unfortunately can't delete comment unless admin # remove flag - unfortunately can't delete comment unless admin
@ -294,9 +299,9 @@ class ToTestBase(object):
self.openqa.openqa_request( self.openqa.openqa_request(
'PUT', 'jobs/%s/comments/%d' % (job['id'], labeled), data=data) 'PUT', 'jobs/%s/comments/%d' % (job['id'], labeled), data=data)
logger.info("job %s failed, but was ignored", jobname) logger.info("job %s failed, but was ignored", job['name'])
else: else:
number_of_fails += 1 self.failed_relevant_jobs.append(job['id'])
if not labeled and len(refs) > 0: if not labeled and len(refs) > 0:
data = {'text': 'label:unknown_failure'} data = {'text': 'label:unknown_failure'}
if self.dryrun: if self.dryrun:
@ -306,7 +311,7 @@ class ToTestBase(object):
'POST', 'jobs/%s/comments' % job['id'], data=data) 'POST', 'jobs/%s/comments' % job['id'], data=data)
joburl = '%s/tests/%s' % (self.openqa_server, job['id']) joburl = '%s/tests/%s' % (self.openqa_server, job['id'])
logger.info("job %s failed, see %s", jobname, joburl) logger.info("job %s failed, see %s", job['name'], joburl)
elif job['result'] == 'passed' or job['result'] == 'softfailed': elif job['result'] == 'passed' or job['result'] == 'softfailed':
continue continue
@ -318,7 +323,7 @@ class ToTestBase(object):
self.save_issues_to_ignore() self.save_issues_to_ignore()
if number_of_fails > 0: if len(self.failed_relevant_jobs) > 0:
return QA_FAILED return QA_FAILED
if in_progress: if in_progress:
@ -541,6 +546,8 @@ class ToTestBase(object):
# with a new release # with a new release
can_release = False can_release = False
self.send_amqp_event(current_snapshot, current_result)
can_publish = (current_result == QA_PASSED) can_publish = (current_result == QA_PASSED)
# already published # already published
@ -575,6 +582,38 @@ class ToTestBase(object):
self.update_totest(new_snapshot) self.update_totest(new_snapshot)
self.write_version_to_dashboard("totest", new_snapshot) self.write_version_to_dashboard("totest", new_snapshot)
def send_amqp_event(self, current_snapshot, current_result):
if not self.amqp_url:
logger.debug('No ttm_amqp_url configured in oscrc - skipping amqp event emission')
return
logger.debug('Sending AMQP message')
inf = re.sub(r"ed$", '', self._result2str(current_result))
msg_topic = '%s.ttm.build.%s' % (self.project_base.lower(), inf)
msg_body = json.dumps({
'build': current_snapshot,
'project': self.project,
'failed_jobs': {
'relevant': self.failed_relevant_jobs,
'ignored': self.failed_ignored_jobs,
}
})
# send amqp event
tries = 7 # arbitrary
for t in range(tries):
try:
notify_connection = pika.BlockingConnection(pika.URLParameters(self.amqp_url))
notify_channel = notify_connection.channel()
notify_channel.exchange_declare(exchange='pubsub', exchange_type='topic', passive=True, durable=True)
notify_channel.basic_publish(exchange='pubsub', routing_key=msg_topic, body=msg_body)
notify_connection.close()
break
except pika.exceptions.ConnectionClosed as e:
logger.warn('Sending AMQP event did not work: %s. Retrying try %s out of %s' % (e, t, tries))
else:
logger.error('Could not send out AMQP event for %s tries, aborting.' % tries)
def release(self): def release(self):
new_snapshot = self.current_version() new_snapshot = self.current_version()
self.update_totest(new_snapshot) self.update_totest(new_snapshot)