From d1c3e212d78cf32aaa07c30f4f259523effe3e7f Mon Sep 17 00:00:00 2001 From: Stephan Kulow Date: Fri, 5 Oct 2018 15:26:10 +0200 Subject: [PATCH 01/14] Add helper bot to listen to rabbit bus and feed OBS with openQA Status There is an API gap on OBS side atm, so this won't complete: https://github.com/openSUSE/open-build-service/issues/6035 --- rabbit-openqa.py | 209 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 209 insertions(+) create mode 100644 rabbit-openqa.py diff --git a/rabbit-openqa.py b/rabbit-openqa.py new file mode 100644 index 00000000..5ebf79ff --- /dev/null +++ b/rabbit-openqa.py @@ -0,0 +1,209 @@ +#!/usr/bin/python + +import argparse +import pika +import sys +import json +import osc +import re +from osc.core import http_POST +from osclib.conf import Config +from osclib.stagingapi import StagingAPI +from lxml import etree as ET + +class Project(object): + def __init__(self, name): + Config(apiurl, name) + self.api = StagingAPI(apiurl, name) + self.staging_projects = dict() + for p in self.api.get_staging_projects(): + if self.api.is_adi_project(p): + continue + self.staging_projects[p] = self.initial_staging_state(p) + print(self.staging_projects) + + def staging_letter(self, name): + return name.split(':')[-1] + + def map_iso(self, staging_project, iso): + raise 'Unimplemented' + + def gather_isos(self, name, repository): + url = self.api.makeurl(['published', name, repository, 'iso']) + f = self.api.retried_GET(url) + root = ET.parse(f).getroot() + ret = [] + for entry in root.findall('entry'): + if entry.get('name').endswith('iso'): + ret.append(self.map_iso(name, entry.get('name'))) + return ret + + def initial_staging_state(self, name): + ret = {'isos': self.gather_isos(name, 'images')} + # missing API for initial repo id + return ret + + def update_staging_buildid(self, project, repository, buildid): + self.staging_projects[project]['id'] = buildid + self.staging_projects[project]['isos'] = self.gather_isos(project, repository) + print('UPDATE', project, self.staging_projects[project]) + + def check_published_repo(self, project, repository, buildid): + if repository != 'images': + return + for p in self.staging_projects: + if project == p: + self.update_staging_buildid(project, repository, buildid) + + def matching_project(self, iso): + for p in self.staging_projects: + if iso in self.staging_projects[p]['isos']: + return p + + def map_openqa_result(self, result): + if result in ['passed', 'softfailed']: + return 'success' + return 'failure' + + def openqa_done(self, iso, test, machine, id, result): + print('openqa_done', iso, test, machine, id, result) + staging = self.matching_project(iso) + if not staging: + return + buildid = self.staging_projects[staging].get('id') + if not buildid: + print("I don't know the build id of " + staging) + return + xml = self.openqa_check_xml(id, + self.map_openqa_result(result), + test + '@' + machine) + url = self.api.makeurl(['status_reports', 'published', staging, 'images', 'reports', buildid]) + http_POST(url, data=xml) + + def openqa_create(self, iso, test, machine, id): + print('openqa_create', iso, test, machine, id) + + def openqa_check_xml(self, id, state, name): + check = ET.Element('check') + se = ET.SubElement(check, 'url') + se.text = "https://openqa.suse.de/tests/{}".format(id) + se = ET.SubElement(check, 'state') + se.text = state + se = ET.SubElement(check, 'name') + se.text = name + return ET.tostring(check) + +class Listener(object): + def __init__(self, amqp_prefix, amqp_url): + self.projects = [] + self.amqp_prefix = amqp_prefix + self.amqp_url = amqp_url + connection = pika.BlockingConnection(pika.URLParameters(amqp_url)) + self.channel = connection.channel() + + self.channel.exchange_declare(exchange='pubsub', exchange_type='topic', passive=True, durable=True) + + result = self.channel.queue_declare(exclusive=True) + queue_name = result.method.queue + + self.channel.queue_bind(exchange='pubsub', + queue=queue_name,routing_key='#') + self.channel.basic_consume(self.on_message, + queue=queue_name, + no_ack=True) + + print(' [*] Waiting for logs. To exit press CTRL+C') + + def add(self, project): + self.projects.append(project) + + def on_published_repo(self, payload): + for p in self.projects: + p.check_published_repo(str(payload['project']), str(payload['repo']), str(payload['buildid'])) + + def on_openqa_create(self, payload): + for p in self.projects: + p.openqa_create(str(payload.get('ISO', '')), str(payload['TEST']), str(payload['MACHINE']), str(payload['id'])) + + def on_openqa_restart(self, payload): + print(payload) + + def on_openqa_done(self, payload): + for p in self.projects: + p.openqa_done(str(payload.get('ISO', '')), payload['TEST'], payload['MACHINE'], payload['id'], payload['result']) + + def on_message(self, unused_channel, method, properties, body): + if method.routing_key == '{}.obs.repo.published'.format(amqp_prefix): + self.on_published_repo(json.loads(body)) + + if method.routing_key == '{}.openqa.job.done'.format(amqp_prefix): + self.on_openqa_done(json.loads(body)) + if method.routing_key == '{}.openqa.job.create'.format(amqp_prefix): + self.on_openqa_create(json.loads(body)) + if method.routing_key == '{}.openqa.job.restart'.format(amqp_prefix): + self.on_openqa_restart(json.loads(body)) + + def listen(self): + self.channel.start_consuming() + +if __name__ == '__main__': + parser = argparse.ArgumentParser( + description='Bot to sync openQA status to OBS') + parser.add_argument("--apiurl", '-A', type=str, default='https://api.opensuse.org', help='API URL of OBS') + parser.add_argument('-s', '--staging', type=str, default=None, + help='staging project letter') + parser.add_argument('-f', '--force', action='store_true', default=False, + help='force the write of the comment') + parser.add_argument('-p', '--project', type=str, default='Factory', + help='openSUSE version to make the check (Factory, 13.2)') + parser.add_argument('-d', '--debug', action='store_true', default=False, + help='enable debug information') + + args = parser.parse_args() + + osc.conf.get_config() + osc.conf.config['debug'] = args.debug + + apiurl = args.apiurl + + if apiurl.endswith('suse.de'): + amqp_prefix = 'suse' + amqp_url = "amqps://suse:suse@rabbit.suse.de?heartbeat_interval=15" + else: + amqp_prefix = 'opensuse' + amqp_url = "amqps://opensuse:opensuse@rabbit.opensuse.org?heartbeat_interval=15" + + l = Listener(amqp_prefix, amqp_url) + if amqp_prefix == 'opensuse': + + class Leap15(Project): + def __init__(self, name): + super(name) + + l.add(Leap15('openSUSE:Leap:15.1')) + else: + class Sle15(Project): + def map_iso(self, project, iso): + # B: SLE-15-SP1-Installer-DVD-x86_64-Build67.2-Media1.iso + # A: SLE-15-SP1-Staging:D-Installer-DVD-x86_64-BuildD.67.2-Media1.iso + letter = self.staging_letter(project) + begin = re.sub(r'^(.*)-Installer.*', r'\1', iso) + middle = re.sub(r'^.*-(Installer.*-Build).*', r'\1', iso) + ending = re.sub(r'.*-Build', '', iso) + return "%s-Staging:%s-%s%s.%s" % (begin, letter, middle, letter, ending) + + l.add(Sle15('SUSE:SLE-15-SP1:GA')) + + class Sle12(Project): + def map_iso(self, project, iso): + # B: Test-Server-DVD-x86_64-Build42.1-Media.iso + # A: SLE12-SP4-Staging:Y-Test-Server-DVD-x86_64-BuildY.42.1-Media.iso + letter = self.staging_letter(project) + begin = re.sub(r'SUSE:SLE-(.*):GA.*', r'SLE\1', project) + middle = re.sub(r'^(.*-Build).*', r'\1', iso) + ending = re.sub(r'.*-Build', '', iso) + return "%s-Staging:%s-%s%s.%s" % (begin, letter, middle, letter, ending) + + l.add(Sle12('SUSE:SLE-12-SP4:GA')) + + l.listen() From dab685013352e7f21f7eecb941ada09379cb44a9 Mon Sep 17 00:00:00 2001 From: Stephan Kulow Date: Mon, 8 Oct 2018 10:37:28 +0200 Subject: [PATCH 02/14] Gather initial buildid of repositories --- rabbit-openqa.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) mode change 100644 => 100755 rabbit-openqa.py diff --git a/rabbit-openqa.py b/rabbit-openqa.py old mode 100644 new mode 100755 index 5ebf79ff..990c1ba1 --- a/rabbit-openqa.py +++ b/rabbit-openqa.py @@ -38,10 +38,16 @@ class Project(object): ret.append(self.map_iso(name, entry.get('name'))) return ret + def gather_buildid(self, name, repository): + url = self.api.makeurl(['published', name, repository], {'view': 'status'}) + f = self.api.retried_GET(url) + id = ET.parse(f).getroot().find('buildid') + if id is not None: + return id.text + def initial_staging_state(self, name): - ret = {'isos': self.gather_isos(name, 'images')} - # missing API for initial repo id - return ret + return {'isos': self.gather_isos(name, 'images'), + 'id': self.gather_buildid(name, 'images')} def update_staging_buildid(self, project, repository, buildid): self.staging_projects[project]['id'] = buildid @@ -79,7 +85,7 @@ class Project(object): test + '@' + machine) url = self.api.makeurl(['status_reports', 'published', staging, 'images', 'reports', buildid]) http_POST(url, data=xml) - + def openqa_create(self, iso, test, machine, id): print('openqa_create', iso, test, machine, id) From ecdc60065a52ad8658cdcbbddfa2a02256a6d12c Mon Sep 17 00:00:00 2001 From: Stephan Kulow Date: Mon, 8 Oct 2018 14:05:25 +0200 Subject: [PATCH 03/14] Fetch all openQA jobs for the ISO every time This is ugly on first look, but has several advantages: - we can more easily support a cold start - as such we don't need to have a persistant queue and can directly bind the routing keys we want - we do the same on all openqa events, simplifying the code - we can cope support short names for the checks The last is the most significant benefit (not yet implemented though). We can name the openqa jobs RAID1 and gnome and only have to append the machine name (or other settings) if they conflict --- rabbit-openqa.py | 117 +++++++++++++++++++++++++++++++---------------- 1 file changed, 78 insertions(+), 39 deletions(-) diff --git a/rabbit-openqa.py b/rabbit-openqa.py index 990c1ba1..9e538f62 100755 --- a/rabbit-openqa.py +++ b/rabbit-openqa.py @@ -10,12 +10,15 @@ from osc.core import http_POST from osclib.conf import Config from osclib.stagingapi import StagingAPI from lxml import etree as ET +from openqa_client.client import OpenQA_Client + class Project(object): def __init__(self, name): Config(apiurl, name) self.api = StagingAPI(apiurl, name) self.staging_projects = dict() + self.listener = None for p in self.api.get_staging_projects(): if self.api.is_adi_project(p): continue @@ -49,10 +52,36 @@ class Project(object): return {'isos': self.gather_isos(name, 'images'), 'id': self.gather_buildid(name, 'images')} + # once the project is added to a listener, it's calling back + def fetch_initial_openqa(self, listener): + self.listener = listener + for project in self.staging_projects: + self.update_staging_status(project) + + def fetch_openqa_jobs(self, staging, iso): + buildid = self.staging_projects[staging].get('id') + if not buildid: + print("I don't know the build id of " + staging) + return + openqa = self.listener.jobs_for_iso(iso) + for job in openqa: + print(staging, iso, job['id'], job['state'], job['result'], + job['settings']['MACHINE'], job['settings']['TEST']) + xml = self.openqa_check_xml(self.listener.test_url(job), + self.map_openqa_result(job), + job['settings']['TEST'] + '@' + job['settings']['MACHINE']) + url = self.api.makeurl(['status_reports', 'published', staging, 'images', 'reports', buildid]) + http_POST(url, data=xml) + print(url, xml) + + def update_staging_status(self, project): + for iso in self.staging_projects[project]['isos']: + self.fetch_openqa_jobs(project, iso) + def update_staging_buildid(self, project, repository, buildid): self.staging_projects[project]['id'] = buildid self.staging_projects[project]['isos'] = self.gather_isos(project, repository) - print('UPDATE', project, self.staging_projects[project]) + self.update_staging_status(project) def check_published_repo(self, project, repository, buildid): if repository != 'images': @@ -66,45 +95,42 @@ class Project(object): if iso in self.staging_projects[p]['isos']: return p - def map_openqa_result(self, result): - if result in ['passed', 'softfailed']: + def map_openqa_result(self, job): + if job['result'] in ['passed', 'softfailed']: return 'success' + if job['result'] == 'none': + return 'pending' return 'failure' - def openqa_done(self, iso, test, machine, id, result): - print('openqa_done', iso, test, machine, id, result) + def openqa_job_change(self, iso): staging = self.matching_project(iso) if not staging: return - buildid = self.staging_projects[staging].get('id') - if not buildid: - print("I don't know the build id of " + staging) - return - xml = self.openqa_check_xml(id, - self.map_openqa_result(result), - test + '@' + machine) - url = self.api.makeurl(['status_reports', 'published', staging, 'images', 'reports', buildid]) - http_POST(url, data=xml) + # we fetch all openqa jobs so we can avoid long job names + self.fetch_openqa_jobs(staging, iso) - def openqa_create(self, iso, test, machine, id): - print('openqa_create', iso, test, machine, id) - - def openqa_check_xml(self, id, state, name): + def openqa_check_xml(self, url, state, name): check = ET.Element('check') se = ET.SubElement(check, 'url') - se.text = "https://openqa.suse.de/tests/{}".format(id) + se.text = url se = ET.SubElement(check, 'state') se.text = state se = ET.SubElement(check, 'name') se.text = name return ET.tostring(check) + class Listener(object): - def __init__(self, amqp_prefix, amqp_url): + def __init__(self, amqp_prefix, amqp_url, openqa_url): self.projects = [] self.amqp_prefix = amqp_prefix self.amqp_url = amqp_url - connection = pika.BlockingConnection(pika.URLParameters(amqp_url)) + self.openqa_url = openqa_url + self.openqa = OpenQA_Client(server=openqa_url) + self.setup_rabbitmq() + + def setup_rabbitmq(self): + connection = pika.BlockingConnection(pika.URLParameters(self.amqp_url)) self.channel = connection.channel() self.channel.exchange_declare(exchange='pubsub', exchange_type='topic', passive=True, durable=True) @@ -112,46 +138,56 @@ class Listener(object): result = self.channel.queue_declare(exclusive=True) queue_name = result.method.queue - self.channel.queue_bind(exchange='pubsub', - queue=queue_name,routing_key='#') + for event in ['.obs.repo.published', '.openqa.job.done', + '.openqa.job.create', '.openqa.job.restart']: + self.channel.queue_bind(exchange='pubsub', + queue=queue_name, + routing_key=self.amqp_prefix + event) self.channel.basic_consume(self.on_message, queue=queue_name, - no_ack=True) - - print(' [*] Waiting for logs. To exit press CTRL+C') + no_ack=True) def add(self, project): + project.fetch_initial_openqa(self) self.projects.append(project) + def jobs_for_iso(self, iso): + values = { + 'iso': iso, + 'scope': 'current', + 'latest': '1', + } + return self.openqa.openqa_request('GET', 'jobs', values)['jobs'] + + def test_url(self, job): + # TODO: link to failing module if failed + return self.openqa_url + ("/tests/%d" % job['id']) + def on_published_repo(self, payload): for p in self.projects: p.check_published_repo(str(payload['project']), str(payload['repo']), str(payload['buildid'])) - def on_openqa_create(self, payload): + def on_openqa_job(self, iso): + print('openqa_job_change', iso) for p in self.projects: - p.openqa_create(str(payload.get('ISO', '')), str(payload['TEST']), str(payload['MACHINE']), str(payload['id'])) - - def on_openqa_restart(self, payload): - print(payload) - - def on_openqa_done(self, payload): - for p in self.projects: - p.openqa_done(str(payload.get('ISO', '')), payload['TEST'], payload['MACHINE'], payload['id'], payload['result']) + p.openqa_job_change(iso) def on_message(self, unused_channel, method, properties, body): if method.routing_key == '{}.obs.repo.published'.format(amqp_prefix): self.on_published_repo(json.loads(body)) if method.routing_key == '{}.openqa.job.done'.format(amqp_prefix): - self.on_openqa_done(json.loads(body)) + self.on_openqa_job(json.loads(body).get('ISO')) if method.routing_key == '{}.openqa.job.create'.format(amqp_prefix): - self.on_openqa_create(json.loads(body)) + self.on_openqa_job(json.loads(body).get('ISO')) if method.routing_key == '{}.openqa.job.restart'.format(amqp_prefix): - self.on_openqa_restart(json.loads(body)) + self.on_openqa_job(json.loads(body).get('ISO')) def listen(self): + print(' [*] Waiting for logs. To exit press CTRL+C') self.channel.start_consuming() + if __name__ == '__main__': parser = argparse.ArgumentParser( description='Bot to sync openQA status to OBS') @@ -175,11 +211,14 @@ if __name__ == '__main__': if apiurl.endswith('suse.de'): amqp_prefix = 'suse' amqp_url = "amqps://suse:suse@rabbit.suse.de?heartbeat_interval=15" + openqa_url = 'https://openqa.suse.de' else: amqp_prefix = 'opensuse' amqp_url = "amqps://opensuse:opensuse@rabbit.opensuse.org?heartbeat_interval=15" + openqa_url = 'https://openqa.opensuse.org' + + l = Listener(amqp_prefix, amqp_url, openqa_url) - l = Listener(amqp_prefix, amqp_url) if amqp_prefix == 'opensuse': class Leap15(Project): From 6f5978cab6e0920104619bb104a0661432d06a1e Mon Sep 17 00:00:00 2001 From: Stephan Kulow Date: Mon, 8 Oct 2018 15:43:45 +0200 Subject: [PATCH 04/14] Link to failed test modules SLE unfortunately does not fail atm, so I had to implement leap :) --- rabbit-openqa.py | 37 +++++++++++++++++++++++++++++-------- 1 file changed, 29 insertions(+), 8 deletions(-) diff --git a/rabbit-openqa.py b/rabbit-openqa.py index 9e538f62..6cfeccfa 100755 --- a/rabbit-openqa.py +++ b/rabbit-openqa.py @@ -11,7 +11,13 @@ from osclib.conf import Config from osclib.stagingapi import StagingAPI from lxml import etree as ET from openqa_client.client import OpenQA_Client - +from urllib import quote_plus +import requests +try: + from urllib.error import HTTPError +except ImportError: + #python 2.x + from urllib2 import HTTPError class Project(object): def __init__(self, name): @@ -71,8 +77,12 @@ class Project(object): self.map_openqa_result(job), job['settings']['TEST'] + '@' + job['settings']['MACHINE']) url = self.api.makeurl(['status_reports', 'published', staging, 'images', 'reports', buildid]) - http_POST(url, data=xml) - print(url, xml) + try: + http_POST(url, data=xml) + except HTTPError: + # https://github.com/openSUSE/open-build-service/issues/6051 + print('failed to post status to ' + url) + print(xml) def update_staging_status(self, project): for iso in self.staging_projects[project]['isos']: @@ -119,7 +129,6 @@ class Project(object): se.text = name return ET.tostring(check) - class Listener(object): def __init__(self, amqp_prefix, amqp_url, openqa_url): self.projects = [] @@ -159,9 +168,19 @@ class Listener(object): } return self.openqa.openqa_request('GET', 'jobs', values)['jobs'] + def get_step_url(self, testurl, modulename): + failurl = testurl + '/modules/{!s}/fails'.format(quote_plus(modulename)) + fails = requests.get(failurl).json() + failed_step = fails.get('first_failed_step', 1) + return "{!s}#step/{!s}/{:d}".format(testurl, modulename, failed_step) + def test_url(self, job): - # TODO: link to failing module if failed - return self.openqa_url + ("/tests/%d" % job['id']) + url = self.openqa_url + ("/tests/%d" % job['id']) + if job['result'] == 'failed': + for module in job['modules']: + if module['result'] == 'failed': + return self.get_step_url(url, module['name']) + return url def on_published_repo(self, payload): for p in self.projects: @@ -222,8 +241,10 @@ if __name__ == '__main__': if amqp_prefix == 'opensuse': class Leap15(Project): - def __init__(self, name): - super(name) + def map_iso(self, project, iso): + # B: openSUSE-Leap-15.1-DVD-x86_64-Build21.3-Media.iso + # A: openSUSE-Leap:15.1-Staging:D-Staging-DVD-x86_64-Build21.3-Media.iso + return re.sub(r'Leap-(.*)-DVD', r'Leap:\1-Staging:' + self.staging_letter(project) + '-Staging-DVD', iso) l.add(Leap15('openSUSE:Leap:15.1')) else: From b886e28db337e31981984ad4a4f8edf3f59f2aa9 Mon Sep 17 00:00:00 2001 From: Stephan Kulow Date: Mon, 8 Oct 2018 19:47:16 +0200 Subject: [PATCH 05/14] No need to if loop - we do the same for all of openqa --- rabbit-openqa.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/rabbit-openqa.py b/rabbit-openqa.py index 6cfeccfa..49762544 100755 --- a/rabbit-openqa.py +++ b/rabbit-openqa.py @@ -194,13 +194,10 @@ class Listener(object): def on_message(self, unused_channel, method, properties, body): if method.routing_key == '{}.obs.repo.published'.format(amqp_prefix): self.on_published_repo(json.loads(body)) - - if method.routing_key == '{}.openqa.job.done'.format(amqp_prefix): - self.on_openqa_job(json.loads(body).get('ISO')) - if method.routing_key == '{}.openqa.job.create'.format(amqp_prefix): - self.on_openqa_job(json.loads(body).get('ISO')) - if method.routing_key == '{}.openqa.job.restart'.format(amqp_prefix): + elif re.search(r'.openqa.', method.routing_key): self.on_openqa_job(json.loads(body).get('ISO')) + else: + print("unknown rabbitmq message {}".format(method.routing_key)) def listen(self): print(' [*] Waiting for logs. To exit press CTRL+C') From b38a1edc3489f733b7d206cdaf1e0088c74e788b Mon Sep 17 00:00:00 2001 From: Stephan Kulow Date: Mon, 8 Oct 2018 20:09:10 +0200 Subject: [PATCH 06/14] Make the names of the openQA checks unique Prefer 'minimal_x' unless there are 2 (the code doesn't support more than 2) --- rabbit-openqa.py | 30 +++++++++++++++++++++++++----- 1 file changed, 25 insertions(+), 5 deletions(-) diff --git a/rabbit-openqa.py b/rabbit-openqa.py index 49762544..7fc82384 100755 --- a/rabbit-openqa.py +++ b/rabbit-openqa.py @@ -16,9 +16,10 @@ import requests try: from urllib.error import HTTPError except ImportError: - #python 2.x + # python 2.x from urllib2 import HTTPError + class Project(object): def __init__(self, name): Config(apiurl, name) @@ -69,14 +70,32 @@ class Project(object): if not buildid: print("I don't know the build id of " + staging) return + # all openQA jobs are created at the same URL + url = self.api.makeurl(['status_reports', 'published', staging, 'images', 'reports', buildid]) openqa = self.listener.jobs_for_iso(iso) + # collect job infos to pick names + openqa_infos = dict() for job in openqa: print(staging, iso, job['id'], job['state'], job['result'], job['settings']['MACHINE'], job['settings']['TEST']) - xml = self.openqa_check_xml(self.listener.test_url(job), - self.map_openqa_result(job), - job['settings']['TEST'] + '@' + job['settings']['MACHINE']) - url = self.api.makeurl(['status_reports', 'published', staging, 'images', 'reports', buildid]) + openqa_infos[job['id']] = {'url': self.listener.test_url(job)} + openqa_infos[job['id']]['state'] = self.map_openqa_result(job) + openqa_infos[job['id']]['name'] = job['settings']['TEST'] + openqa_infos[job['id']]['machine'] = job['settings']['MACHINE'] + + # make sure the names are unique + taken_names = dict() + for id in openqa_infos: + name = openqa_infos[id]['name'] + if name in taken_names: + openqa_infos[id]['name'] = openqa_infos[id]['name'] + "@" + openqa_infos[id]['machine'] + # the other id + id = taken_names[name] + openqa_infos[id]['name'] = openqa_infos[id]['name'] + "@" + openqa_infos[id]['machine'] + taken_names[name] = id + + for info in openqa_infos.values(): + xml = self.openqa_check_xml(info['url'], info['state'], info['name']) try: http_POST(url, data=xml) except HTTPError: @@ -129,6 +148,7 @@ class Project(object): se.text = name return ET.tostring(check) + class Listener(object): def __init__(self, amqp_prefix, amqp_url, openqa_url): self.projects = [] From 3933e4e30222ae155ec5c81cd110c47af85f82c4 Mon Sep 17 00:00:00 2001 From: Stephan Kulow Date: Tue, 9 Oct 2018 08:38:23 +0200 Subject: [PATCH 07/14] Package rabbit-openqa I don't really like the name, but lack alternative ideas --- dist/package/openSUSE-release-tools.spec | 25 ++++++++++++++++++++++++ systemd/osrt-rabbit-openqa.service | 9 +++++++++ 2 files changed, 34 insertions(+) create mode 100644 systemd/osrt-rabbit-openqa.service diff --git a/dist/package/openSUSE-release-tools.spec b/dist/package/openSUSE-release-tools.spec index 4efb0184..2e58f4a6 100644 --- a/dist/package/openSUSE-release-tools.spec +++ b/dist/package/openSUSE-release-tools.spec @@ -293,6 +293,16 @@ Requires: osc >= 0.159.0 %description -n osc-plugin-vdelreq OSC plugin to check for virtually accepted request, see `osc vdelreq --help`. +%package rabbit-openqa +Summary: Sync openQA Status Into OBS +Group: Development/Tools/Other +BuildArch: noarch +Requires: osc >= 0.159.0 + +%description rabbit-openqa +Bot listening to AMQP bus and syncs openQA job status into OBS for +staging projects + %prep %setup -q @@ -388,6 +398,14 @@ fi %postun pkglistgen %systemd_postun +%pre rabbit-openqa +getent passwd osrt-rabit-openqa > /dev/null || \ + useradd -r -m -s /sbin/nologin -c "user for openSUSE-release-tools-rabbit-openqa" osrt-rabit-openqa +exit 0 + +%postun rabbit-openqa +%systemd_postun + %files %defattr(-,root,root,-) %doc README.md @@ -436,6 +454,7 @@ fi %exclude %{_datadir}/%{source_dir}/osc-staging.py %exclude %{_datadir}/%{source_dir}/osc-vdelreq.py %exclude %{_datadir}/%{source_dir}/update_crawler.py +%exclude %{_datadir}/%{source_dir}/rabbit-openqa.py %dir %{_sysconfdir}/openSUSE-release-tools %files devel @@ -571,6 +590,12 @@ fi %{_unitdir}/osrt-pkglistgen@.service %{_unitdir}/osrt-pkglistgen@.timer +%files rabbit-openqa +%defattr(-,root,root,-) +%{_bindir}/osrt-rabbit-openqa +%{_datadir}/%{source_dir}/rabbit-openqa.py +%{_unitdir}/osrt-rabbit-openqa.service + %files -n osclib %defattr(-,root,root,-) %{_datadir}/%{source_dir}/osclib diff --git a/systemd/osrt-rabbit-openqa.service b/systemd/osrt-rabbit-openqa.service new file mode 100644 index 00000000..8c83097f --- /dev/null +++ b/systemd/osrt-rabbit-openqa.service @@ -0,0 +1,9 @@ +[Unit] +Description=openSUSE Release Tools: Sync openQA status + +[Service] +User=osrt-rabbit-openqa +ExecStart=/usr/bin/osrt-rabbit-openqa + +[Install] +WantedBy=multi-user.target From a8d7b769c1b64b1373920422539b5b3e38c2fea1 Mon Sep 17 00:00:00 2001 From: Stephan Kulow Date: Tue, 9 Oct 2018 09:07:34 +0200 Subject: [PATCH 08/14] Find a sed like syntax for Staging ISOs This can be moved to remote config --- rabbit-openqa.py | 32 +++++++++++++++----------------- 1 file changed, 15 insertions(+), 17 deletions(-) diff --git a/rabbit-openqa.py b/rabbit-openqa.py index 7fc82384..1baab6d3 100755 --- a/rabbit-openqa.py +++ b/rabbit-openqa.py @@ -36,7 +36,13 @@ class Project(object): return name.split(':')[-1] def map_iso(self, staging_project, iso): - raise 'Unimplemented' + parts = self.replace_string().split('/') + if parts[0] != 's': + raise Exception("{}'s iso_replace_string does not start with s/".format(self.name)) + old = parts[1] + new = parts[2] + new = new.replace('$LETTER', self.staging_letter(staging_project)) + return re.compile(old).sub(new, iso) def gather_isos(self, name, repository): url = self.api.makeurl(['published', name, repository, 'iso']) @@ -258,35 +264,27 @@ if __name__ == '__main__': if amqp_prefix == 'opensuse': class Leap15(Project): - def map_iso(self, project, iso): + def replace_string(self): # B: openSUSE-Leap-15.1-DVD-x86_64-Build21.3-Media.iso # A: openSUSE-Leap:15.1-Staging:D-Staging-DVD-x86_64-Build21.3-Media.iso - return re.sub(r'Leap-(.*)-DVD', r'Leap:\1-Staging:' + self.staging_letter(project) + '-Staging-DVD', iso) + return 's/Leap-(.*)-DVD/Leap:\g<1>-Staging:$LETTER-Staging-DVD/' l.add(Leap15('openSUSE:Leap:15.1')) else: class Sle15(Project): - def map_iso(self, project, iso): + def replace_string(self): # B: SLE-15-SP1-Installer-DVD-x86_64-Build67.2-Media1.iso # A: SLE-15-SP1-Staging:D-Installer-DVD-x86_64-BuildD.67.2-Media1.iso - letter = self.staging_letter(project) - begin = re.sub(r'^(.*)-Installer.*', r'\1', iso) - middle = re.sub(r'^.*-(Installer.*-Build).*', r'\1', iso) - ending = re.sub(r'.*-Build', '', iso) - return "%s-Staging:%s-%s%s.%s" % (begin, letter, middle, letter, ending) + return 's/^(SLE-.*)-Installer-(.*)Build/\g<1>-Staging:$LETTER-Installer-\g<2>Build$LETTER./' l.add(Sle15('SUSE:SLE-15-SP1:GA')) - class Sle12(Project): - def map_iso(self, project, iso): + class Sle12sp4(Project): + def replace_string(self): # B: Test-Server-DVD-x86_64-Build42.1-Media.iso # A: SLE12-SP4-Staging:Y-Test-Server-DVD-x86_64-BuildY.42.1-Media.iso - letter = self.staging_letter(project) - begin = re.sub(r'SUSE:SLE-(.*):GA.*', r'SLE\1', project) - middle = re.sub(r'^(.*-Build).*', r'\1', iso) - ending = re.sub(r'.*-Build', '', iso) - return "%s-Staging:%s-%s%s.%s" % (begin, letter, middle, letter, ending) + return 's/^(Test.*Build)/SLE12-SP4-Staging:$LETTER-\g<1>$LETTER/' - l.add(Sle12('SUSE:SLE-12-SP4:GA')) + l.add(Sle12sp4('SUSE:SLE-12-SP4:GA')) l.listen() From ceaef3ea8856edefbcceacab8361c57ed148627a Mon Sep 17 00:00:00 2001 From: Stephan Kulow Date: Wed, 10 Oct 2018 20:59:16 +0200 Subject: [PATCH 09/14] Moved the ISO replace_string into OBS attributes --- rabbit-openqa.py | 37 ++++++++----------------------------- 1 file changed, 8 insertions(+), 29 deletions(-) diff --git a/rabbit-openqa.py b/rabbit-openqa.py index 1baab6d3..339e05d7 100755 --- a/rabbit-openqa.py +++ b/rabbit-openqa.py @@ -6,7 +6,7 @@ import sys import json import osc import re -from osc.core import http_POST +from osc.core import http_GET, http_POST, makeurl from osclib.conf import Config from osclib.stagingapi import StagingAPI from lxml import etree as ET @@ -26,17 +26,17 @@ class Project(object): self.api = StagingAPI(apiurl, name) self.staging_projects = dict() self.listener = None + self.replace_string = self.api.attribute_value_load('OpenQAMapping') for p in self.api.get_staging_projects(): if self.api.is_adi_project(p): continue self.staging_projects[p] = self.initial_staging_state(p) - print(self.staging_projects) def staging_letter(self, name): return name.split(':')[-1] def map_iso(self, staging_project, iso): - parts = self.replace_string().split('/') + parts = self.replace_string.split('/') if parts[0] != 's': raise Exception("{}'s iso_replace_string does not start with s/".format(self.name)) old = parts[1] @@ -260,31 +260,10 @@ if __name__ == '__main__': openqa_url = 'https://openqa.opensuse.org' l = Listener(amqp_prefix, amqp_url, openqa_url) - - if amqp_prefix == 'opensuse': - - class Leap15(Project): - def replace_string(self): - # B: openSUSE-Leap-15.1-DVD-x86_64-Build21.3-Media.iso - # A: openSUSE-Leap:15.1-Staging:D-Staging-DVD-x86_64-Build21.3-Media.iso - return 's/Leap-(.*)-DVD/Leap:\g<1>-Staging:$LETTER-Staging-DVD/' - - l.add(Leap15('openSUSE:Leap:15.1')) - else: - class Sle15(Project): - def replace_string(self): - # B: SLE-15-SP1-Installer-DVD-x86_64-Build67.2-Media1.iso - # A: SLE-15-SP1-Staging:D-Installer-DVD-x86_64-BuildD.67.2-Media1.iso - return 's/^(SLE-.*)-Installer-(.*)Build/\g<1>-Staging:$LETTER-Installer-\g<2>Build$LETTER./' - - l.add(Sle15('SUSE:SLE-15-SP1:GA')) - - class Sle12sp4(Project): - def replace_string(self): - # B: Test-Server-DVD-x86_64-Build42.1-Media.iso - # A: SLE12-SP4-Staging:Y-Test-Server-DVD-x86_64-BuildY.42.1-Media.iso - return 's/^(Test.*Build)/SLE12-SP4-Staging:$LETTER-\g<1>$LETTER/' - - l.add(Sle12sp4('SUSE:SLE-12-SP4:GA')) + url = makeurl(apiurl, ['search', 'project', 'id'], {'match': 'attribute/@name="OSRT:OpenQAMapping"'}) + f = http_GET(url) + root = ET.parse(f).getroot() + for entry in root.findall('project'): + l.add(Project(entry.get('name'))) l.listen() From ae7f35c7e261c34b75ce721ae9737b779e45278f Mon Sep 17 00:00:00 2001 From: Stephan Kulow Date: Thu, 11 Oct 2018 15:10:41 +0200 Subject: [PATCH 10/14] Adopt pika example on async --- consumer.py | 333 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 333 insertions(+) create mode 100644 consumer.py diff --git a/consumer.py b/consumer.py new file mode 100644 index 00000000..7523ae13 --- /dev/null +++ b/consumer.py @@ -0,0 +1,333 @@ +import logging +import pika + +LOG_FORMAT = ('%(levelname) -10s %(asctime)s %(name) -30s %(funcName) ' + '-35s %(lineno) -5d: %(message)s') +LOGGER = logging.getLogger(__name__) + + +class ExampleConsumer(object): + """This is an example consumer that will handle unexpected interactions + with RabbitMQ such as channel and connection closures. + + If RabbitMQ closes the connection, it will reopen it. You should + look at the output, as there are limited reasons why the connection may + be closed, which usually are tied to permission related issues or + socket timeouts. + + If the channel is closed, it will indicate a problem with one of the + commands that were issued and that should surface in the output as well. + + """ + ROUTING_KEY = '#' + + def __init__(self, amqp_url): + """Create a new instance of the consumer class, passing in the AMQP + URL used to connect to RabbitMQ. + + :param str amqp_url: The AMQP url to connect with + + """ + self._connection = None + self._channel = None + self._closing = False + self._consumer_tag = None + self._url = amqp_url + + def connect(self): + """This method connects to RabbitMQ, returning the connection handle. + When the connection is established, the on_connection_open method + will be invoked by pika. + + :rtype: pika.SelectConnection + + """ + LOGGER.info('Connecting to %s', self._url) + return pika.SelectConnection(pika.URLParameters(self._url), + self.on_connection_open, + stop_ioloop_on_close=False) + + def close_connection(self): + """This method closes the connection to RabbitMQ.""" + LOGGER.info('Closing connection') + self._connection.close() + + def add_on_connection_close_callback(self): + """This method adds an on close callback that will be invoked by pika + when RabbitMQ closes the connection to the publisher unexpectedly. + + """ + LOGGER.info('Adding connection close callback') + self._connection.add_on_close_callback(self.on_connection_closed) + + def on_connection_closed(self, connection, reply_code, reply_text): + """This method is invoked by pika when the connection to RabbitMQ is + closed unexpectedly. Since it is unexpected, we will reconnect to + RabbitMQ if it disconnects. + + :param pika.connection.Connection connection: The closed connection obj + :param int reply_code: The server provided reply_code if given + :param str reply_text: The server provided reply_text if given + + """ + self._channel = None + if self._closing: + self._connection.ioloop.stop() + else: + LOGGER.warning('Connection closed, reopening in 5 seconds: (%s) %s', + reply_code, reply_text) + self._connection.add_timeout(5, self.reconnect) + + def on_connection_open(self, unused_connection): + """This method is called by pika once the connection to RabbitMQ has + been established. It passes the handle to the connection object in + case we need it, but in this case, we'll just mark it unused. + + :type unused_connection: pika.SelectConnection + + """ + LOGGER.info('Connection opened') + self.add_on_connection_close_callback() + self.open_channel() + + def reconnect(self): + """Will be invoked by the IOLoop timer if the connection is + closed. See the on_connection_closed method. + + """ + # This is the old connection IOLoop instance, stop its ioloop + self._connection.ioloop.stop() + + if not self._closing: + + # Create a new connection + self._connection = self.connect() + + # There is now a new connection, needs a new ioloop to run + self._connection.ioloop.start() + + def add_on_channel_close_callback(self): + """This method tells pika to call the on_channel_closed method if + RabbitMQ unexpectedly closes the channel. + + """ + LOGGER.info('Adding channel close callback') + self._channel.add_on_close_callback(self.on_channel_closed) + + def on_channel_closed(self, channel, reply_code, reply_text): + """Invoked by pika when RabbitMQ unexpectedly closes the channel. + Channels are usually closed if you attempt to do something that + violates the protocol, such as re-declare an exchange or queue with + different parameters. In this case, we'll close the connection + to shutdown the object. + + :param pika.channel.Channel: The closed channel + :param int reply_code: The numeric reason the channel was closed + :param str reply_text: The text reason the channel was closed + + """ + LOGGER.warning('Channel %i was closed: (%s) %s', + channel, reply_code, reply_text) + self._connection.close() + + def on_channel_open(self, channel): + """This method is invoked by pika when the channel has been opened. + The channel object is passed in so we can make use of it. + + Since the channel is now open, we'll declare the exchange to use. + + :param pika.channel.Channel channel: The channel object + + """ + LOGGER.info('Channel opened') + self._channel = channel + self.add_on_channel_close_callback() + self.setup_exchange('pubsub') + + def setup_exchange(self, exchange_name): + """Setup the exchange on RabbitMQ by invoking the Exchange.Declare RPC + command. When it is complete, the on_exchange_declareok method will + be invoked by pika. + + :param str|unicode exchange_name: The name of the exchange to declare + + """ + LOGGER.info('Declaring exchange %s', exchange_name) + self._channel.exchange_declare(self.on_exchange_declareok, + exchange=exchange_name, + exchange_type='topic', + passive=True, durable=True) + + def on_exchange_declareok(self, unused_frame): + """Invoked by pika when RabbitMQ has finished the Exchange.Declare RPC + command. + + :param pika.Frame.Method unused_frame: Exchange.DeclareOk response frame + + """ + LOGGER.info('Exchange declared') + self._channel.queue_declare(self.on_queue_declareok, exclusive=True) + + def on_queue_declareok(self, method_frame): + """Method invoked by pika when the Queue.Declare RPC call made in + setup_queue has completed. In this method we will bind the queue + and exchange together with the routing key by issuing the Queue.Bind + RPC command. When this command is complete, the on_bindok method will + be invoked by pika. + + :param pika.frame.Method method_frame: The Queue.DeclareOk frame + + """ + self.queue_name = method_frame.method.queue + self.routing_keys_to_bind = self.routing_keys() + self.bind_queue_to_routing_key(self.routing_keys_to_bind.pop()) + + def routing_keys(self): + return ['opensuse.obs.repo.published', 'opensuse.openqa.job.done'] + + def bind_queue_to_routing_key(self, key): + LOGGER.info('Binding %s to %s', key, self.queue_name) + self._channel.queue_bind(self.on_bindok, self.queue_name, 'pubsub', key) + + def add_on_cancel_callback(self): + """Add a callback that will be invoked if RabbitMQ cancels the consumer + for some reason. If RabbitMQ does cancel the consumer, + on_consumer_cancelled will be invoked by pika. + + """ + LOGGER.info('Adding consumer cancellation callback') + self._channel.add_on_cancel_callback(self.on_consumer_cancelled) + + def on_consumer_cancelled(self, method_frame): + """Invoked by pika when RabbitMQ sends a Basic.Cancel for a consumer + receiving messages. + + :param pika.frame.Method method_frame: The Basic.Cancel frame + + """ + LOGGER.info('Consumer was cancelled remotely, shutting down: %r', + method_frame) + if self._channel: + self._channel.close() + + def on_message(self, unused_channel, basic_deliver, properties, body): + """Invoked by pika when a message is delivered from RabbitMQ. The + channel is passed for your convenience. The basic_deliver object that + is passed in carries the exchange, routing key, delivery tag and + a redelivered flag for the message. The properties passed in is an + instance of BasicProperties with the message properties and the body + is the message that was sent. + + :param pika.channel.Channel unused_channel: The channel object + :param pika.Spec.Basic.Deliver: basic_deliver method + :param pika.Spec.BasicProperties: properties + :param str|unicode body: The message body + + """ + LOGGER.info('Received message # %s: %s %s', + basic_deliver.delivery_tag, basic_deliver.routing_key, body) + + def on_cancelok(self, unused_frame): + """This method is invoked by pika when RabbitMQ acknowledges the + cancellation of a consumer. At this point we will close the channel. + This will invoke the on_channel_closed method once the channel has been + closed, which will in-turn close the connection. + + :param pika.frame.Method unused_frame: The Basic.CancelOk frame + + """ + LOGGER.info('RabbitMQ acknowledged the cancellation of the consumer') + self.close_channel() + + def stop_consuming(self): + """Tell RabbitMQ that you would like to stop consuming by sending the + Basic.Cancel RPC command. + + """ + if self._channel: + LOGGER.info('Sending a Basic.Cancel RPC command to RabbitMQ') + self._channel.basic_cancel(self.on_cancelok, self._consumer_tag) + + def start_consuming(self): + """This method sets up the consumer by first calling + add_on_cancel_callback so that the object is notified if RabbitMQ + cancels the consumer. It then issues the Basic.Consume RPC command + which returns the consumer tag that is used to uniquely identify the + consumer with RabbitMQ. We keep the value to use it when we want to + cancel consuming. The on_message method is passed in as a callback pika + will invoke when a message is fully received. + + """ + LOGGER.info('Issuing consumer related RPC commands') + self.add_on_cancel_callback() + self._consumer_tag = self._channel.basic_consume(self.on_message, + self.queue_name, no_ack=True) + + def on_bindok(self, unused_frame): + """Invoked by pika when the Queue.Bind method has completed. At this + point we will start consuming messages by calling start_consuming + which will invoke the needed RPC commands to start the process. + + :param pika.frame.Method unused_frame: The Queue.BindOk response frame + + """ + LOGGER.info('Queue bound') + if len(self.routing_keys_to_bind): + self.bind_queue_to_routing_key(self.routing_keys_to_bind.pop()) + else: + self.start_consuming() + + def close_channel(self): + """Call to close the channel with RabbitMQ cleanly by issuing the + Channel.Close RPC command. + + """ + LOGGER.info('Closing the channel') + self._channel.close() + + def open_channel(self): + """Open a new channel with RabbitMQ by issuing the Channel.Open RPC + command. When RabbitMQ responds that the channel is open, the + on_channel_open callback will be invoked by pika. + + """ + LOGGER.info('Creating a new channel') + self._connection.channel(on_open_callback=self.on_channel_open) + + def run(self): + """Run the example consumer by connecting to RabbitMQ and then + starting the IOLoop to block and allow the SelectConnection to operate. + + """ + self._connection = self.connect() + self._connection.ioloop.start() + + def stop(self): + """Cleanly shutdown the connection to RabbitMQ by stopping the consumer + with RabbitMQ. When RabbitMQ confirms the cancellation, on_cancelok + will be invoked by pika, which will then closing the channel and + connection. The IOLoop is started again because this method is invoked + when CTRL-C is pressed raising a KeyboardInterrupt exception. This + exception stops the IOLoop which needs to be running for pika to + communicate with RabbitMQ. All of the commands issued prior to starting + the IOLoop will be buffered but not processed. + + """ + LOGGER.info('Stopping') + self._closing = True + self.stop_consuming() + self._connection.ioloop.start() + LOGGER.info('Stopped') + + +def main(): + logging.basicConfig(level=logging.INFO, format=LOG_FORMAT) + example = ExampleConsumer('amqps://opensuse:opensuse@rabbit.opensuse.org') + try: + example.run() + except KeyboardInterrupt: + example.stop() + + +if __name__ == '__main__': + main() From d5947f5c193d03b239223ee02cb10c34031d7e5a Mon Sep 17 00:00:00 2001 From: Stephan Kulow Date: Fri, 12 Oct 2018 11:06:54 +0200 Subject: [PATCH 11/14] Refactored rabbit-openqa to be based on PubSubConsumer The base class listens to openSUSE pubsub messages (and can be used standalone) and rabbit-openqa just overwrites the keys and the on_message handler --- consumer.py => PubSubConsumer.py | 70 ++++++++++++++++---------------- rabbit-openqa.py | 51 +++++++++-------------- 2 files changed, 55 insertions(+), 66 deletions(-) rename consumer.py => PubSubConsumer.py (85%) diff --git a/consumer.py b/PubSubConsumer.py similarity index 85% rename from consumer.py rename to PubSubConsumer.py index 7523ae13..d73e3aa3 100644 --- a/consumer.py +++ b/PubSubConsumer.py @@ -1,12 +1,8 @@ import logging import pika -LOG_FORMAT = ('%(levelname) -10s %(asctime)s %(name) -30s %(funcName) ' - '-35s %(lineno) -5d: %(message)s') -LOGGER = logging.getLogger(__name__) - -class ExampleConsumer(object): +class PubSubConsumer(object): """This is an example consumer that will handle unexpected interactions with RabbitMQ such as channel and connection closures. @@ -19,9 +15,8 @@ class ExampleConsumer(object): commands that were issued and that should surface in the output as well. """ - ROUTING_KEY = '#' - def __init__(self, amqp_url): + def __init__(self, amqp_url, logger): """Create a new instance of the consumer class, passing in the AMQP URL used to connect to RabbitMQ. @@ -33,6 +28,7 @@ class ExampleConsumer(object): self._closing = False self._consumer_tag = None self._url = amqp_url + self.logger = logger def connect(self): """This method connects to RabbitMQ, returning the connection handle. @@ -42,14 +38,14 @@ class ExampleConsumer(object): :rtype: pika.SelectConnection """ - LOGGER.info('Connecting to %s', self._url) + self.logger.info('Connecting to %s', self._url) return pika.SelectConnection(pika.URLParameters(self._url), self.on_connection_open, stop_ioloop_on_close=False) def close_connection(self): """This method closes the connection to RabbitMQ.""" - LOGGER.info('Closing connection') + self.logger.info('Closing connection') self._connection.close() def add_on_connection_close_callback(self): @@ -57,7 +53,7 @@ class ExampleConsumer(object): when RabbitMQ closes the connection to the publisher unexpectedly. """ - LOGGER.info('Adding connection close callback') + self.logger.debug('Adding connection close callback') self._connection.add_on_close_callback(self.on_connection_closed) def on_connection_closed(self, connection, reply_code, reply_text): @@ -74,8 +70,8 @@ class ExampleConsumer(object): if self._closing: self._connection.ioloop.stop() else: - LOGGER.warning('Connection closed, reopening in 5 seconds: (%s) %s', - reply_code, reply_text) + self.logger.warning('Connection closed, reopening in 5 seconds: (%s) %s', + reply_code, reply_text) self._connection.add_timeout(5, self.reconnect) def on_connection_open(self, unused_connection): @@ -86,7 +82,7 @@ class ExampleConsumer(object): :type unused_connection: pika.SelectConnection """ - LOGGER.info('Connection opened') + self.logger.info('Connection opened') self.add_on_connection_close_callback() self.open_channel() @@ -111,7 +107,7 @@ class ExampleConsumer(object): RabbitMQ unexpectedly closes the channel. """ - LOGGER.info('Adding channel close callback') + self.logger.debug('Adding channel close callback') self._channel.add_on_close_callback(self.on_channel_closed) def on_channel_closed(self, channel, reply_code, reply_text): @@ -126,8 +122,8 @@ class ExampleConsumer(object): :param str reply_text: The text reason the channel was closed """ - LOGGER.warning('Channel %i was closed: (%s) %s', - channel, reply_code, reply_text) + self.logger.warning('Channel %i was closed: (%s) %s', + channel, reply_code, reply_text) self._connection.close() def on_channel_open(self, channel): @@ -139,7 +135,7 @@ class ExampleConsumer(object): :param pika.channel.Channel channel: The channel object """ - LOGGER.info('Channel opened') + self.logger.debug('Channel opened') self._channel = channel self.add_on_channel_close_callback() self.setup_exchange('pubsub') @@ -152,7 +148,7 @@ class ExampleConsumer(object): :param str|unicode exchange_name: The name of the exchange to declare """ - LOGGER.info('Declaring exchange %s', exchange_name) + self.logger.debug('Declaring exchange %s', exchange_name) self._channel.exchange_declare(self.on_exchange_declareok, exchange=exchange_name, exchange_type='topic', @@ -165,7 +161,7 @@ class ExampleConsumer(object): :param pika.Frame.Method unused_frame: Exchange.DeclareOk response frame """ - LOGGER.info('Exchange declared') + self.logger.debug('Exchange declared') self._channel.queue_declare(self.on_queue_declareok, exclusive=True) def on_queue_declareok(self, method_frame): @@ -183,10 +179,10 @@ class ExampleConsumer(object): self.bind_queue_to_routing_key(self.routing_keys_to_bind.pop()) def routing_keys(self): - return ['opensuse.obs.repo.published', 'opensuse.openqa.job.done'] + return ['#'] def bind_queue_to_routing_key(self, key): - LOGGER.info('Binding %s to %s', key, self.queue_name) + self.logger.info('Binding %s to %s', key, self.queue_name) self._channel.queue_bind(self.on_bindok, self.queue_name, 'pubsub', key) def add_on_cancel_callback(self): @@ -195,7 +191,7 @@ class ExampleConsumer(object): on_consumer_cancelled will be invoked by pika. """ - LOGGER.info('Adding consumer cancellation callback') + self.logger.debug('Adding consumer cancellation callback') self._channel.add_on_cancel_callback(self.on_consumer_cancelled) def on_consumer_cancelled(self, method_frame): @@ -205,8 +201,8 @@ class ExampleConsumer(object): :param pika.frame.Method method_frame: The Basic.Cancel frame """ - LOGGER.info('Consumer was cancelled remotely, shutting down: %r', - method_frame) + self.logger.info('Consumer was cancelled remotely, shutting down: %r', + method_frame) if self._channel: self._channel.close() @@ -224,8 +220,8 @@ class ExampleConsumer(object): :param str|unicode body: The message body """ - LOGGER.info('Received message # %s: %s %s', - basic_deliver.delivery_tag, basic_deliver.routing_key, body) + self.logger.info('Received message # %s: %s %s', + basic_deliver.delivery_tag, basic_deliver.routing_key, body) def on_cancelok(self, unused_frame): """This method is invoked by pika when RabbitMQ acknowledges the @@ -236,7 +232,7 @@ class ExampleConsumer(object): :param pika.frame.Method unused_frame: The Basic.CancelOk frame """ - LOGGER.info('RabbitMQ acknowledged the cancellation of the consumer') + self.logger.debug('RabbitMQ acknowledged the cancellation of the consumer') self.close_channel() def stop_consuming(self): @@ -245,7 +241,7 @@ class ExampleConsumer(object): """ if self._channel: - LOGGER.info('Sending a Basic.Cancel RPC command to RabbitMQ') + self.logger.debug('Sending a Basic.Cancel RPC command to RabbitMQ') self._channel.basic_cancel(self.on_cancelok, self._consumer_tag) def start_consuming(self): @@ -258,7 +254,7 @@ class ExampleConsumer(object): will invoke when a message is fully received. """ - LOGGER.info('Issuing consumer related RPC commands') + self.logger.debug('Issuing consumer related RPC commands') self.add_on_cancel_callback() self._consumer_tag = self._channel.basic_consume(self.on_message, self.queue_name, no_ack=True) @@ -271,7 +267,7 @@ class ExampleConsumer(object): :param pika.frame.Method unused_frame: The Queue.BindOk response frame """ - LOGGER.info('Queue bound') + self.logger.debug('Queue bound') if len(self.routing_keys_to_bind): self.bind_queue_to_routing_key(self.routing_keys_to_bind.pop()) else: @@ -282,7 +278,7 @@ class ExampleConsumer(object): Channel.Close RPC command. """ - LOGGER.info('Closing the channel') + self.logger.debug('Closing the channel') self._channel.close() def open_channel(self): @@ -291,7 +287,7 @@ class ExampleConsumer(object): on_channel_open callback will be invoked by pika. """ - LOGGER.info('Creating a new channel') + self.logger.debug('Creating a new channel') self._connection.channel(on_open_callback=self.on_channel_open) def run(self): @@ -313,16 +309,20 @@ class ExampleConsumer(object): the IOLoop will be buffered but not processed. """ - LOGGER.info('Stopping') + self.logger.debug('Stopping') self._closing = True self.stop_consuming() self._connection.ioloop.start() - LOGGER.info('Stopped') + self.logger.debug('Stopped') def main(): + LOG_FORMAT = ('%(levelname) -10s %(asctime)s %(name) -30s %(funcName) ' + '-35s %(lineno) -5d: %(message)s') + logging.basicConfig(level=logging.INFO, format=LOG_FORMAT) - example = ExampleConsumer('amqps://opensuse:opensuse@rabbit.opensuse.org') + example = PubSubConsumer('amqps://opensuse:opensuse@rabbit.opensuse.org', + logging.getLogger(__name__)) try: example.run() except KeyboardInterrupt: diff --git a/rabbit-openqa.py b/rabbit-openqa.py index 339e05d7..dfbdc301 100755 --- a/rabbit-openqa.py +++ b/rabbit-openqa.py @@ -1,6 +1,7 @@ #!/usr/bin/python import argparse +import logging import pika import sys import json @@ -18,6 +19,7 @@ try: except ImportError: # python 2.x from urllib2 import HTTPError +from PubSubConsumer import PubSubConsumer class Project(object): @@ -74,7 +76,7 @@ class Project(object): def fetch_openqa_jobs(self, staging, iso): buildid = self.staging_projects[staging].get('id') if not buildid: - print("I don't know the build id of " + staging) + self.logger.info("I don't know the build id of " + staging) return # all openQA jobs are created at the same URL url = self.api.makeurl(['status_reports', 'published', staging, 'images', 'reports', buildid]) @@ -105,9 +107,7 @@ class Project(object): try: http_POST(url, data=xml) except HTTPError: - # https://github.com/openSUSE/open-build-service/issues/6051 - print('failed to post status to ' + url) - print(xml) + self.logger.error('failed to post status to ' + url) def update_staging_status(self, project): for iso in self.staging_projects[project]['isos']: @@ -155,32 +155,20 @@ class Project(object): return ET.tostring(check) -class Listener(object): +class Listener(PubSubConsumer): def __init__(self, amqp_prefix, amqp_url, openqa_url): + super(Listener, self).__init__(amqp_url, logging.getLogger(__name__)) self.projects = [] self.amqp_prefix = amqp_prefix - self.amqp_url = amqp_url self.openqa_url = openqa_url self.openqa = OpenQA_Client(server=openqa_url) - self.setup_rabbitmq() - def setup_rabbitmq(self): - connection = pika.BlockingConnection(pika.URLParameters(self.amqp_url)) - self.channel = connection.channel() - - self.channel.exchange_declare(exchange='pubsub', exchange_type='topic', passive=True, durable=True) - - result = self.channel.queue_declare(exclusive=True) - queue_name = result.method.queue - - for event in ['.obs.repo.published', '.openqa.job.done', - '.openqa.job.create', '.openqa.job.restart']: - self.channel.queue_bind(exchange='pubsub', - queue=queue_name, - routing_key=self.amqp_prefix + event) - self.channel.basic_consume(self.on_message, - queue=queue_name, - no_ack=True) + def routing_keys(self): + ret = [] + for suffix in ['.obs.repo.published', '.openqa.job.done', + '.openqa.job.create', '.openqa.job.restart']: + ret.append(self.amqp_prefix + suffix) + return ret def add(self, project): project.fetch_initial_openqa(self) @@ -213,7 +201,7 @@ class Listener(object): p.check_published_repo(str(payload['project']), str(payload['repo']), str(payload['buildid'])) def on_openqa_job(self, iso): - print('openqa_job_change', iso) + self.logger.debug('openqa_job_change', iso) for p in self.projects: p.openqa_job_change(iso) @@ -223,11 +211,7 @@ class Listener(object): elif re.search(r'.openqa.', method.routing_key): self.on_openqa_job(json.loads(body).get('ISO')) else: - print("unknown rabbitmq message {}".format(method.routing_key)) - - def listen(self): - print(' [*] Waiting for logs. To exit press CTRL+C') - self.channel.start_consuming() + self.logger.warning("unknown rabbitmq message {}".format(method.routing_key)) if __name__ == '__main__': @@ -259,6 +243,8 @@ if __name__ == '__main__': amqp_url = "amqps://opensuse:opensuse@rabbit.opensuse.org?heartbeat_interval=15" openqa_url = 'https://openqa.opensuse.org' + logging.basicConfig(level=logging.INFO) + l = Listener(amqp_prefix, amqp_url, openqa_url) url = makeurl(apiurl, ['search', 'project', 'id'], {'match': 'attribute/@name="OSRT:OpenQAMapping"'}) f = http_GET(url) @@ -266,4 +252,7 @@ if __name__ == '__main__': for entry in root.findall('project'): l.add(Project(entry.get('name'))) - l.listen() + try: + l.run() + except KeyboardInterrupt: + l.stop() From 8b6b498a0f51abf228f5950a245fb21354d59e88 Mon Sep 17 00:00:00 2001 From: Stephan Kulow Date: Fri, 12 Oct 2018 11:28:42 +0200 Subject: [PATCH 12/14] First connect to AMQP then fetch initial state This way we don't miss anything - we just have to make sure we're done within heartbeart interval, or the server will close the connection. But that's 60 seconds, so we're safe for this bot (and if we fail once, we have to reconnect) --- rabbit-openqa.py | 25 ++++++++++++++++--------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/rabbit-openqa.py b/rabbit-openqa.py index dfbdc301..e47206f0 100755 --- a/rabbit-openqa.py +++ b/rabbit-openqa.py @@ -24,15 +24,19 @@ from PubSubConsumer import PubSubConsumer class Project(object): def __init__(self, name): + self.name = name Config(apiurl, name) self.api = StagingAPI(apiurl, name) self.staging_projects = dict() self.listener = None self.replace_string = self.api.attribute_value_load('OpenQAMapping') + + def init(self): for p in self.api.get_staging_projects(): if self.api.is_adi_project(p): continue self.staging_projects[p] = self.initial_staging_state(p) + self.update_staging_status(p) def staging_letter(self, name): return name.split(':')[-1] @@ -67,12 +71,6 @@ class Project(object): return {'isos': self.gather_isos(name, 'images'), 'id': self.gather_buildid(name, 'images')} - # once the project is added to a listener, it's calling back - def fetch_initial_openqa(self, listener): - self.listener = listener - for project in self.staging_projects: - self.update_staging_status(project) - def fetch_openqa_jobs(self, staging, iso): buildid = self.staging_projects[staging].get('id') if not buildid: @@ -171,9 +169,18 @@ class Listener(PubSubConsumer): return ret def add(self, project): - project.fetch_initial_openqa(self) + project.listener = self self.projects.append(project) + def start_consuming(self): + # now we are (re-)connected to the bus and need to fetch the + # initial state + for project in self.projects: + self.logger.info('Fetching ISOs of %s', project.name) + project.init() + self.logger.info('Finished fetching initial ISOs, listening') + super(Listener, self).start_consuming() + def jobs_for_iso(self, iso): values = { 'iso': iso, @@ -236,11 +243,11 @@ if __name__ == '__main__': if apiurl.endswith('suse.de'): amqp_prefix = 'suse' - amqp_url = "amqps://suse:suse@rabbit.suse.de?heartbeat_interval=15" + amqp_url = "amqps://suse:suse@rabbit.suse.de" openqa_url = 'https://openqa.suse.de' else: amqp_prefix = 'opensuse' - amqp_url = "amqps://opensuse:opensuse@rabbit.opensuse.org?heartbeat_interval=15" + amqp_url = "amqps://opensuse:opensuse@rabbit.opensuse.org" openqa_url = 'https://openqa.opensuse.org' logging.basicConfig(level=logging.INFO) From 5b9efe757d4cc18efc97188aeb436f1ed5c566f4 Mon Sep 17 00:00:00 2001 From: Stephan Kulow Date: Fri, 12 Oct 2018 15:04:41 +0200 Subject: [PATCH 13/14] Require python-pika in CI --- .travis.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index 8a2970a5..96edb418 100644 --- a/.travis.yml +++ b/.travis.yml @@ -59,7 +59,7 @@ matrix: install: # urlgrabber needed to install osc from git in requirements.txt # m2crypto for osc to be runable as used in docker-compose-obs - - pip install pycurl urlgrabber m2crypto + - pip install pycurl urlgrabber m2crypto pika - pip install -r requirements.txt - pip install python-coveralls - pip install nose-exclude From 75306457da7889968c25188c8d5fd3a81660b4af Mon Sep 17 00:00:00 2001 From: Stephan Kulow Date: Sun, 21 Oct 2018 20:53:37 +0200 Subject: [PATCH 14/14] If OBS or openQA raise an exception, restart --- rabbit-openqa.py | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/rabbit-openqa.py b/rabbit-openqa.py index e47206f0..e81d1406 100755 --- a/rabbit-openqa.py +++ b/rabbit-openqa.py @@ -7,18 +7,21 @@ import sys import json import osc import re +from time import sleep from osc.core import http_GET, http_POST, makeurl +from M2Crypto.SSL import SSLError as SSLError from osclib.conf import Config from osclib.stagingapi import StagingAPI from lxml import etree as ET from openqa_client.client import OpenQA_Client +from openqa_client.exceptions import ConnectionError from urllib import quote_plus import requests try: - from urllib.error import HTTPError + from urllib.error import HTTPError, URLError except ImportError: # python 2.x - from urllib2 import HTTPError + from urllib2 import HTTPError, URLError from PubSubConsumer import PubSubConsumer @@ -259,7 +262,11 @@ if __name__ == '__main__': for entry in root.findall('project'): l.add(Project(entry.get('name'))) - try: - l.run() - except KeyboardInterrupt: - l.stop() + while True: + try: + l.run() + except KeyboardInterrupt: + l.stop() + except (HTTPError, URLError, ConnectionError, SSLError): + # OBS/openQA hickup + sleep(10)