diff --git a/.travis.yml b/.travis.yml index 8a2970a5..96edb418 100644 --- a/.travis.yml +++ b/.travis.yml @@ -59,7 +59,7 @@ matrix: install: # urlgrabber needed to install osc from git in requirements.txt # m2crypto for osc to be runable as used in docker-compose-obs - - pip install pycurl urlgrabber m2crypto + - pip install pycurl urlgrabber m2crypto pika - pip install -r requirements.txt - pip install python-coveralls - pip install nose-exclude diff --git a/PubSubConsumer.py b/PubSubConsumer.py new file mode 100644 index 00000000..d73e3aa3 --- /dev/null +++ b/PubSubConsumer.py @@ -0,0 +1,333 @@ +import logging +import pika + + +class PubSubConsumer(object): + """This is an example consumer that will handle unexpected interactions + with RabbitMQ such as channel and connection closures. + + If RabbitMQ closes the connection, it will reopen it. You should + look at the output, as there are limited reasons why the connection may + be closed, which usually are tied to permission related issues or + socket timeouts. + + If the channel is closed, it will indicate a problem with one of the + commands that were issued and that should surface in the output as well. + + """ + + def __init__(self, amqp_url, logger): + """Create a new instance of the consumer class, passing in the AMQP + URL used to connect to RabbitMQ. + + :param str amqp_url: The AMQP url to connect with + + """ + self._connection = None + self._channel = None + self._closing = False + self._consumer_tag = None + self._url = amqp_url + self.logger = logger + + def connect(self): + """This method connects to RabbitMQ, returning the connection handle. + When the connection is established, the on_connection_open method + will be invoked by pika. + + :rtype: pika.SelectConnection + + """ + self.logger.info('Connecting to %s', self._url) + return pika.SelectConnection(pika.URLParameters(self._url), + self.on_connection_open, + stop_ioloop_on_close=False) + + def close_connection(self): + """This method closes the connection to RabbitMQ.""" + self.logger.info('Closing connection') + self._connection.close() + + def add_on_connection_close_callback(self): + """This method adds an on close callback that will be invoked by pika + when RabbitMQ closes the connection to the publisher unexpectedly. + + """ + self.logger.debug('Adding connection close callback') + self._connection.add_on_close_callback(self.on_connection_closed) + + def on_connection_closed(self, connection, reply_code, reply_text): + """This method is invoked by pika when the connection to RabbitMQ is + closed unexpectedly. Since it is unexpected, we will reconnect to + RabbitMQ if it disconnects. + + :param pika.connection.Connection connection: The closed connection obj + :param int reply_code: The server provided reply_code if given + :param str reply_text: The server provided reply_text if given + + """ + self._channel = None + if self._closing: + self._connection.ioloop.stop() + else: + self.logger.warning('Connection closed, reopening in 5 seconds: (%s) %s', + reply_code, reply_text) + self._connection.add_timeout(5, self.reconnect) + + def on_connection_open(self, unused_connection): + """This method is called by pika once the connection to RabbitMQ has + been established. It passes the handle to the connection object in + case we need it, but in this case, we'll just mark it unused. + + :type unused_connection: pika.SelectConnection + + """ + self.logger.info('Connection opened') + self.add_on_connection_close_callback() + self.open_channel() + + def reconnect(self): + """Will be invoked by the IOLoop timer if the connection is + closed. See the on_connection_closed method. + + """ + # This is the old connection IOLoop instance, stop its ioloop + self._connection.ioloop.stop() + + if not self._closing: + + # Create a new connection + self._connection = self.connect() + + # There is now a new connection, needs a new ioloop to run + self._connection.ioloop.start() + + def add_on_channel_close_callback(self): + """This method tells pika to call the on_channel_closed method if + RabbitMQ unexpectedly closes the channel. + + """ + self.logger.debug('Adding channel close callback') + self._channel.add_on_close_callback(self.on_channel_closed) + + def on_channel_closed(self, channel, reply_code, reply_text): + """Invoked by pika when RabbitMQ unexpectedly closes the channel. + Channels are usually closed if you attempt to do something that + violates the protocol, such as re-declare an exchange or queue with + different parameters. In this case, we'll close the connection + to shutdown the object. + + :param pika.channel.Channel: The closed channel + :param int reply_code: The numeric reason the channel was closed + :param str reply_text: The text reason the channel was closed + + """ + self.logger.warning('Channel %i was closed: (%s) %s', + channel, reply_code, reply_text) + self._connection.close() + + def on_channel_open(self, channel): + """This method is invoked by pika when the channel has been opened. + The channel object is passed in so we can make use of it. + + Since the channel is now open, we'll declare the exchange to use. + + :param pika.channel.Channel channel: The channel object + + """ + self.logger.debug('Channel opened') + self._channel = channel + self.add_on_channel_close_callback() + self.setup_exchange('pubsub') + + def setup_exchange(self, exchange_name): + """Setup the exchange on RabbitMQ by invoking the Exchange.Declare RPC + command. When it is complete, the on_exchange_declareok method will + be invoked by pika. + + :param str|unicode exchange_name: The name of the exchange to declare + + """ + self.logger.debug('Declaring exchange %s', exchange_name) + self._channel.exchange_declare(self.on_exchange_declareok, + exchange=exchange_name, + exchange_type='topic', + passive=True, durable=True) + + def on_exchange_declareok(self, unused_frame): + """Invoked by pika when RabbitMQ has finished the Exchange.Declare RPC + command. + + :param pika.Frame.Method unused_frame: Exchange.DeclareOk response frame + + """ + self.logger.debug('Exchange declared') + self._channel.queue_declare(self.on_queue_declareok, exclusive=True) + + def on_queue_declareok(self, method_frame): + """Method invoked by pika when the Queue.Declare RPC call made in + setup_queue has completed. In this method we will bind the queue + and exchange together with the routing key by issuing the Queue.Bind + RPC command. When this command is complete, the on_bindok method will + be invoked by pika. + + :param pika.frame.Method method_frame: The Queue.DeclareOk frame + + """ + self.queue_name = method_frame.method.queue + self.routing_keys_to_bind = self.routing_keys() + self.bind_queue_to_routing_key(self.routing_keys_to_bind.pop()) + + def routing_keys(self): + return ['#'] + + def bind_queue_to_routing_key(self, key): + self.logger.info('Binding %s to %s', key, self.queue_name) + self._channel.queue_bind(self.on_bindok, self.queue_name, 'pubsub', key) + + def add_on_cancel_callback(self): + """Add a callback that will be invoked if RabbitMQ cancels the consumer + for some reason. If RabbitMQ does cancel the consumer, + on_consumer_cancelled will be invoked by pika. + + """ + self.logger.debug('Adding consumer cancellation callback') + self._channel.add_on_cancel_callback(self.on_consumer_cancelled) + + def on_consumer_cancelled(self, method_frame): + """Invoked by pika when RabbitMQ sends a Basic.Cancel for a consumer + receiving messages. + + :param pika.frame.Method method_frame: The Basic.Cancel frame + + """ + self.logger.info('Consumer was cancelled remotely, shutting down: %r', + method_frame) + if self._channel: + self._channel.close() + + def on_message(self, unused_channel, basic_deliver, properties, body): + """Invoked by pika when a message is delivered from RabbitMQ. The + channel is passed for your convenience. The basic_deliver object that + is passed in carries the exchange, routing key, delivery tag and + a redelivered flag for the message. The properties passed in is an + instance of BasicProperties with the message properties and the body + is the message that was sent. + + :param pika.channel.Channel unused_channel: The channel object + :param pika.Spec.Basic.Deliver: basic_deliver method + :param pika.Spec.BasicProperties: properties + :param str|unicode body: The message body + + """ + self.logger.info('Received message # %s: %s %s', + basic_deliver.delivery_tag, basic_deliver.routing_key, body) + + def on_cancelok(self, unused_frame): + """This method is invoked by pika when RabbitMQ acknowledges the + cancellation of a consumer. At this point we will close the channel. + This will invoke the on_channel_closed method once the channel has been + closed, which will in-turn close the connection. + + :param pika.frame.Method unused_frame: The Basic.CancelOk frame + + """ + self.logger.debug('RabbitMQ acknowledged the cancellation of the consumer') + self.close_channel() + + def stop_consuming(self): + """Tell RabbitMQ that you would like to stop consuming by sending the + Basic.Cancel RPC command. + + """ + if self._channel: + self.logger.debug('Sending a Basic.Cancel RPC command to RabbitMQ') + self._channel.basic_cancel(self.on_cancelok, self._consumer_tag) + + def start_consuming(self): + """This method sets up the consumer by first calling + add_on_cancel_callback so that the object is notified if RabbitMQ + cancels the consumer. It then issues the Basic.Consume RPC command + which returns the consumer tag that is used to uniquely identify the + consumer with RabbitMQ. We keep the value to use it when we want to + cancel consuming. The on_message method is passed in as a callback pika + will invoke when a message is fully received. + + """ + self.logger.debug('Issuing consumer related RPC commands') + self.add_on_cancel_callback() + self._consumer_tag = self._channel.basic_consume(self.on_message, + self.queue_name, no_ack=True) + + def on_bindok(self, unused_frame): + """Invoked by pika when the Queue.Bind method has completed. At this + point we will start consuming messages by calling start_consuming + which will invoke the needed RPC commands to start the process. + + :param pika.frame.Method unused_frame: The Queue.BindOk response frame + + """ + self.logger.debug('Queue bound') + if len(self.routing_keys_to_bind): + self.bind_queue_to_routing_key(self.routing_keys_to_bind.pop()) + else: + self.start_consuming() + + def close_channel(self): + """Call to close the channel with RabbitMQ cleanly by issuing the + Channel.Close RPC command. + + """ + self.logger.debug('Closing the channel') + self._channel.close() + + def open_channel(self): + """Open a new channel with RabbitMQ by issuing the Channel.Open RPC + command. When RabbitMQ responds that the channel is open, the + on_channel_open callback will be invoked by pika. + + """ + self.logger.debug('Creating a new channel') + self._connection.channel(on_open_callback=self.on_channel_open) + + def run(self): + """Run the example consumer by connecting to RabbitMQ and then + starting the IOLoop to block and allow the SelectConnection to operate. + + """ + self._connection = self.connect() + self._connection.ioloop.start() + + def stop(self): + """Cleanly shutdown the connection to RabbitMQ by stopping the consumer + with RabbitMQ. When RabbitMQ confirms the cancellation, on_cancelok + will be invoked by pika, which will then closing the channel and + connection. The IOLoop is started again because this method is invoked + when CTRL-C is pressed raising a KeyboardInterrupt exception. This + exception stops the IOLoop which needs to be running for pika to + communicate with RabbitMQ. All of the commands issued prior to starting + the IOLoop will be buffered but not processed. + + """ + self.logger.debug('Stopping') + self._closing = True + self.stop_consuming() + self._connection.ioloop.start() + self.logger.debug('Stopped') + + +def main(): + LOG_FORMAT = ('%(levelname) -10s %(asctime)s %(name) -30s %(funcName) ' + '-35s %(lineno) -5d: %(message)s') + + logging.basicConfig(level=logging.INFO, format=LOG_FORMAT) + example = PubSubConsumer('amqps://opensuse:opensuse@rabbit.opensuse.org', + logging.getLogger(__name__)) + try: + example.run() + except KeyboardInterrupt: + example.stop() + + +if __name__ == '__main__': + main() diff --git a/dist/package/openSUSE-release-tools.spec b/dist/package/openSUSE-release-tools.spec index 4efb0184..2e58f4a6 100644 --- a/dist/package/openSUSE-release-tools.spec +++ b/dist/package/openSUSE-release-tools.spec @@ -293,6 +293,16 @@ Requires: osc >= 0.159.0 %description -n osc-plugin-vdelreq OSC plugin to check for virtually accepted request, see `osc vdelreq --help`. +%package rabbit-openqa +Summary: Sync openQA Status Into OBS +Group: Development/Tools/Other +BuildArch: noarch +Requires: osc >= 0.159.0 + +%description rabbit-openqa +Bot listening to AMQP bus and syncs openQA job status into OBS for +staging projects + %prep %setup -q @@ -388,6 +398,14 @@ fi %postun pkglistgen %systemd_postun +%pre rabbit-openqa +getent passwd osrt-rabit-openqa > /dev/null || \ + useradd -r -m -s /sbin/nologin -c "user for openSUSE-release-tools-rabbit-openqa" osrt-rabit-openqa +exit 0 + +%postun rabbit-openqa +%systemd_postun + %files %defattr(-,root,root,-) %doc README.md @@ -436,6 +454,7 @@ fi %exclude %{_datadir}/%{source_dir}/osc-staging.py %exclude %{_datadir}/%{source_dir}/osc-vdelreq.py %exclude %{_datadir}/%{source_dir}/update_crawler.py +%exclude %{_datadir}/%{source_dir}/rabbit-openqa.py %dir %{_sysconfdir}/openSUSE-release-tools %files devel @@ -571,6 +590,12 @@ fi %{_unitdir}/osrt-pkglistgen@.service %{_unitdir}/osrt-pkglistgen@.timer +%files rabbit-openqa +%defattr(-,root,root,-) +%{_bindir}/osrt-rabbit-openqa +%{_datadir}/%{source_dir}/rabbit-openqa.py +%{_unitdir}/osrt-rabbit-openqa.service + %files -n osclib %defattr(-,root,root,-) %{_datadir}/%{source_dir}/osclib diff --git a/rabbit-openqa.py b/rabbit-openqa.py new file mode 100755 index 00000000..e81d1406 --- /dev/null +++ b/rabbit-openqa.py @@ -0,0 +1,272 @@ +#!/usr/bin/python + +import argparse +import logging +import pika +import sys +import json +import osc +import re +from time import sleep +from osc.core import http_GET, http_POST, makeurl +from M2Crypto.SSL import SSLError as SSLError +from osclib.conf import Config +from osclib.stagingapi import StagingAPI +from lxml import etree as ET +from openqa_client.client import OpenQA_Client +from openqa_client.exceptions import ConnectionError +from urllib import quote_plus +import requests +try: + from urllib.error import HTTPError, URLError +except ImportError: + # python 2.x + from urllib2 import HTTPError, URLError +from PubSubConsumer import PubSubConsumer + + +class Project(object): + def __init__(self, name): + self.name = name + Config(apiurl, name) + self.api = StagingAPI(apiurl, name) + self.staging_projects = dict() + self.listener = None + self.replace_string = self.api.attribute_value_load('OpenQAMapping') + + def init(self): + for p in self.api.get_staging_projects(): + if self.api.is_adi_project(p): + continue + self.staging_projects[p] = self.initial_staging_state(p) + self.update_staging_status(p) + + def staging_letter(self, name): + return name.split(':')[-1] + + def map_iso(self, staging_project, iso): + parts = self.replace_string.split('/') + if parts[0] != 's': + raise Exception("{}'s iso_replace_string does not start with s/".format(self.name)) + old = parts[1] + new = parts[2] + new = new.replace('$LETTER', self.staging_letter(staging_project)) + return re.compile(old).sub(new, iso) + + def gather_isos(self, name, repository): + url = self.api.makeurl(['published', name, repository, 'iso']) + f = self.api.retried_GET(url) + root = ET.parse(f).getroot() + ret = [] + for entry in root.findall('entry'): + if entry.get('name').endswith('iso'): + ret.append(self.map_iso(name, entry.get('name'))) + return ret + + def gather_buildid(self, name, repository): + url = self.api.makeurl(['published', name, repository], {'view': 'status'}) + f = self.api.retried_GET(url) + id = ET.parse(f).getroot().find('buildid') + if id is not None: + return id.text + + def initial_staging_state(self, name): + return {'isos': self.gather_isos(name, 'images'), + 'id': self.gather_buildid(name, 'images')} + + def fetch_openqa_jobs(self, staging, iso): + buildid = self.staging_projects[staging].get('id') + if not buildid: + self.logger.info("I don't know the build id of " + staging) + return + # all openQA jobs are created at the same URL + url = self.api.makeurl(['status_reports', 'published', staging, 'images', 'reports', buildid]) + openqa = self.listener.jobs_for_iso(iso) + # collect job infos to pick names + openqa_infos = dict() + for job in openqa: + print(staging, iso, job['id'], job['state'], job['result'], + job['settings']['MACHINE'], job['settings']['TEST']) + openqa_infos[job['id']] = {'url': self.listener.test_url(job)} + openqa_infos[job['id']]['state'] = self.map_openqa_result(job) + openqa_infos[job['id']]['name'] = job['settings']['TEST'] + openqa_infos[job['id']]['machine'] = job['settings']['MACHINE'] + + # make sure the names are unique + taken_names = dict() + for id in openqa_infos: + name = openqa_infos[id]['name'] + if name in taken_names: + openqa_infos[id]['name'] = openqa_infos[id]['name'] + "@" + openqa_infos[id]['machine'] + # the other id + id = taken_names[name] + openqa_infos[id]['name'] = openqa_infos[id]['name'] + "@" + openqa_infos[id]['machine'] + taken_names[name] = id + + for info in openqa_infos.values(): + xml = self.openqa_check_xml(info['url'], info['state'], info['name']) + try: + http_POST(url, data=xml) + except HTTPError: + self.logger.error('failed to post status to ' + url) + + def update_staging_status(self, project): + for iso in self.staging_projects[project]['isos']: + self.fetch_openqa_jobs(project, iso) + + def update_staging_buildid(self, project, repository, buildid): + self.staging_projects[project]['id'] = buildid + self.staging_projects[project]['isos'] = self.gather_isos(project, repository) + self.update_staging_status(project) + + def check_published_repo(self, project, repository, buildid): + if repository != 'images': + return + for p in self.staging_projects: + if project == p: + self.update_staging_buildid(project, repository, buildid) + + def matching_project(self, iso): + for p in self.staging_projects: + if iso in self.staging_projects[p]['isos']: + return p + + def map_openqa_result(self, job): + if job['result'] in ['passed', 'softfailed']: + return 'success' + if job['result'] == 'none': + return 'pending' + return 'failure' + + def openqa_job_change(self, iso): + staging = self.matching_project(iso) + if not staging: + return + # we fetch all openqa jobs so we can avoid long job names + self.fetch_openqa_jobs(staging, iso) + + def openqa_check_xml(self, url, state, name): + check = ET.Element('check') + se = ET.SubElement(check, 'url') + se.text = url + se = ET.SubElement(check, 'state') + se.text = state + se = ET.SubElement(check, 'name') + se.text = name + return ET.tostring(check) + + +class Listener(PubSubConsumer): + def __init__(self, amqp_prefix, amqp_url, openqa_url): + super(Listener, self).__init__(amqp_url, logging.getLogger(__name__)) + self.projects = [] + self.amqp_prefix = amqp_prefix + self.openqa_url = openqa_url + self.openqa = OpenQA_Client(server=openqa_url) + + def routing_keys(self): + ret = [] + for suffix in ['.obs.repo.published', '.openqa.job.done', + '.openqa.job.create', '.openqa.job.restart']: + ret.append(self.amqp_prefix + suffix) + return ret + + def add(self, project): + project.listener = self + self.projects.append(project) + + def start_consuming(self): + # now we are (re-)connected to the bus and need to fetch the + # initial state + for project in self.projects: + self.logger.info('Fetching ISOs of %s', project.name) + project.init() + self.logger.info('Finished fetching initial ISOs, listening') + super(Listener, self).start_consuming() + + def jobs_for_iso(self, iso): + values = { + 'iso': iso, + 'scope': 'current', + 'latest': '1', + } + return self.openqa.openqa_request('GET', 'jobs', values)['jobs'] + + def get_step_url(self, testurl, modulename): + failurl = testurl + '/modules/{!s}/fails'.format(quote_plus(modulename)) + fails = requests.get(failurl).json() + failed_step = fails.get('first_failed_step', 1) + return "{!s}#step/{!s}/{:d}".format(testurl, modulename, failed_step) + + def test_url(self, job): + url = self.openqa_url + ("/tests/%d" % job['id']) + if job['result'] == 'failed': + for module in job['modules']: + if module['result'] == 'failed': + return self.get_step_url(url, module['name']) + return url + + def on_published_repo(self, payload): + for p in self.projects: + p.check_published_repo(str(payload['project']), str(payload['repo']), str(payload['buildid'])) + + def on_openqa_job(self, iso): + self.logger.debug('openqa_job_change', iso) + for p in self.projects: + p.openqa_job_change(iso) + + def on_message(self, unused_channel, method, properties, body): + if method.routing_key == '{}.obs.repo.published'.format(amqp_prefix): + self.on_published_repo(json.loads(body)) + elif re.search(r'.openqa.', method.routing_key): + self.on_openqa_job(json.loads(body).get('ISO')) + else: + self.logger.warning("unknown rabbitmq message {}".format(method.routing_key)) + + +if __name__ == '__main__': + parser = argparse.ArgumentParser( + description='Bot to sync openQA status to OBS') + parser.add_argument("--apiurl", '-A', type=str, default='https://api.opensuse.org', help='API URL of OBS') + parser.add_argument('-s', '--staging', type=str, default=None, + help='staging project letter') + parser.add_argument('-f', '--force', action='store_true', default=False, + help='force the write of the comment') + parser.add_argument('-p', '--project', type=str, default='Factory', + help='openSUSE version to make the check (Factory, 13.2)') + parser.add_argument('-d', '--debug', action='store_true', default=False, + help='enable debug information') + + args = parser.parse_args() + + osc.conf.get_config() + osc.conf.config['debug'] = args.debug + + apiurl = args.apiurl + + if apiurl.endswith('suse.de'): + amqp_prefix = 'suse' + amqp_url = "amqps://suse:suse@rabbit.suse.de" + openqa_url = 'https://openqa.suse.de' + else: + amqp_prefix = 'opensuse' + amqp_url = "amqps://opensuse:opensuse@rabbit.opensuse.org" + openqa_url = 'https://openqa.opensuse.org' + + logging.basicConfig(level=logging.INFO) + + l = Listener(amqp_prefix, amqp_url, openqa_url) + url = makeurl(apiurl, ['search', 'project', 'id'], {'match': 'attribute/@name="OSRT:OpenQAMapping"'}) + f = http_GET(url) + root = ET.parse(f).getroot() + for entry in root.findall('project'): + l.add(Project(entry.get('name'))) + + while True: + try: + l.run() + except KeyboardInterrupt: + l.stop() + except (HTTPError, URLError, ConnectionError, SSLError): + # OBS/openQA hickup + sleep(10) diff --git a/systemd/osrt-rabbit-openqa.service b/systemd/osrt-rabbit-openqa.service new file mode 100644 index 00000000..8c83097f --- /dev/null +++ b/systemd/osrt-rabbit-openqa.service @@ -0,0 +1,9 @@ +[Unit] +Description=openSUSE Release Tools: Sync openQA status + +[Service] +User=osrt-rabbit-openqa +ExecStart=/usr/bin/osrt-rabbit-openqa + +[Install] +WantedBy=multi-user.target