Merge pull request #1720 from coolo/add_openqa_sync
Add helper bot to listen to rabbit bus and feed OBS with openQA Status
This commit is contained in:
commit
9b1618e7c5
@ -59,7 +59,7 @@ matrix:
|
|||||||
install:
|
install:
|
||||||
# urlgrabber needed to install osc from git in requirements.txt
|
# urlgrabber needed to install osc from git in requirements.txt
|
||||||
# m2crypto for osc to be runable as used in docker-compose-obs
|
# m2crypto for osc to be runable as used in docker-compose-obs
|
||||||
- pip install pycurl urlgrabber m2crypto
|
- pip install pycurl urlgrabber m2crypto pika
|
||||||
- pip install -r requirements.txt
|
- pip install -r requirements.txt
|
||||||
- pip install python-coveralls
|
- pip install python-coveralls
|
||||||
- pip install nose-exclude
|
- pip install nose-exclude
|
||||||
|
333
PubSubConsumer.py
Normal file
333
PubSubConsumer.py
Normal file
@ -0,0 +1,333 @@
|
|||||||
|
import logging
|
||||||
|
import pika
|
||||||
|
|
||||||
|
|
||||||
|
class PubSubConsumer(object):
|
||||||
|
"""This is an example consumer that will handle unexpected interactions
|
||||||
|
with RabbitMQ such as channel and connection closures.
|
||||||
|
|
||||||
|
If RabbitMQ closes the connection, it will reopen it. You should
|
||||||
|
look at the output, as there are limited reasons why the connection may
|
||||||
|
be closed, which usually are tied to permission related issues or
|
||||||
|
socket timeouts.
|
||||||
|
|
||||||
|
If the channel is closed, it will indicate a problem with one of the
|
||||||
|
commands that were issued and that should surface in the output as well.
|
||||||
|
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, amqp_url, logger):
|
||||||
|
"""Create a new instance of the consumer class, passing in the AMQP
|
||||||
|
URL used to connect to RabbitMQ.
|
||||||
|
|
||||||
|
:param str amqp_url: The AMQP url to connect with
|
||||||
|
|
||||||
|
"""
|
||||||
|
self._connection = None
|
||||||
|
self._channel = None
|
||||||
|
self._closing = False
|
||||||
|
self._consumer_tag = None
|
||||||
|
self._url = amqp_url
|
||||||
|
self.logger = logger
|
||||||
|
|
||||||
|
def connect(self):
|
||||||
|
"""This method connects to RabbitMQ, returning the connection handle.
|
||||||
|
When the connection is established, the on_connection_open method
|
||||||
|
will be invoked by pika.
|
||||||
|
|
||||||
|
:rtype: pika.SelectConnection
|
||||||
|
|
||||||
|
"""
|
||||||
|
self.logger.info('Connecting to %s', self._url)
|
||||||
|
return pika.SelectConnection(pika.URLParameters(self._url),
|
||||||
|
self.on_connection_open,
|
||||||
|
stop_ioloop_on_close=False)
|
||||||
|
|
||||||
|
def close_connection(self):
|
||||||
|
"""This method closes the connection to RabbitMQ."""
|
||||||
|
self.logger.info('Closing connection')
|
||||||
|
self._connection.close()
|
||||||
|
|
||||||
|
def add_on_connection_close_callback(self):
|
||||||
|
"""This method adds an on close callback that will be invoked by pika
|
||||||
|
when RabbitMQ closes the connection to the publisher unexpectedly.
|
||||||
|
|
||||||
|
"""
|
||||||
|
self.logger.debug('Adding connection close callback')
|
||||||
|
self._connection.add_on_close_callback(self.on_connection_closed)
|
||||||
|
|
||||||
|
def on_connection_closed(self, connection, reply_code, reply_text):
|
||||||
|
"""This method is invoked by pika when the connection to RabbitMQ is
|
||||||
|
closed unexpectedly. Since it is unexpected, we will reconnect to
|
||||||
|
RabbitMQ if it disconnects.
|
||||||
|
|
||||||
|
:param pika.connection.Connection connection: The closed connection obj
|
||||||
|
:param int reply_code: The server provided reply_code if given
|
||||||
|
:param str reply_text: The server provided reply_text if given
|
||||||
|
|
||||||
|
"""
|
||||||
|
self._channel = None
|
||||||
|
if self._closing:
|
||||||
|
self._connection.ioloop.stop()
|
||||||
|
else:
|
||||||
|
self.logger.warning('Connection closed, reopening in 5 seconds: (%s) %s',
|
||||||
|
reply_code, reply_text)
|
||||||
|
self._connection.add_timeout(5, self.reconnect)
|
||||||
|
|
||||||
|
def on_connection_open(self, unused_connection):
|
||||||
|
"""This method is called by pika once the connection to RabbitMQ has
|
||||||
|
been established. It passes the handle to the connection object in
|
||||||
|
case we need it, but in this case, we'll just mark it unused.
|
||||||
|
|
||||||
|
:type unused_connection: pika.SelectConnection
|
||||||
|
|
||||||
|
"""
|
||||||
|
self.logger.info('Connection opened')
|
||||||
|
self.add_on_connection_close_callback()
|
||||||
|
self.open_channel()
|
||||||
|
|
||||||
|
def reconnect(self):
|
||||||
|
"""Will be invoked by the IOLoop timer if the connection is
|
||||||
|
closed. See the on_connection_closed method.
|
||||||
|
|
||||||
|
"""
|
||||||
|
# This is the old connection IOLoop instance, stop its ioloop
|
||||||
|
self._connection.ioloop.stop()
|
||||||
|
|
||||||
|
if not self._closing:
|
||||||
|
|
||||||
|
# Create a new connection
|
||||||
|
self._connection = self.connect()
|
||||||
|
|
||||||
|
# There is now a new connection, needs a new ioloop to run
|
||||||
|
self._connection.ioloop.start()
|
||||||
|
|
||||||
|
def add_on_channel_close_callback(self):
|
||||||
|
"""This method tells pika to call the on_channel_closed method if
|
||||||
|
RabbitMQ unexpectedly closes the channel.
|
||||||
|
|
||||||
|
"""
|
||||||
|
self.logger.debug('Adding channel close callback')
|
||||||
|
self._channel.add_on_close_callback(self.on_channel_closed)
|
||||||
|
|
||||||
|
def on_channel_closed(self, channel, reply_code, reply_text):
|
||||||
|
"""Invoked by pika when RabbitMQ unexpectedly closes the channel.
|
||||||
|
Channels are usually closed if you attempt to do something that
|
||||||
|
violates the protocol, such as re-declare an exchange or queue with
|
||||||
|
different parameters. In this case, we'll close the connection
|
||||||
|
to shutdown the object.
|
||||||
|
|
||||||
|
:param pika.channel.Channel: The closed channel
|
||||||
|
:param int reply_code: The numeric reason the channel was closed
|
||||||
|
:param str reply_text: The text reason the channel was closed
|
||||||
|
|
||||||
|
"""
|
||||||
|
self.logger.warning('Channel %i was closed: (%s) %s',
|
||||||
|
channel, reply_code, reply_text)
|
||||||
|
self._connection.close()
|
||||||
|
|
||||||
|
def on_channel_open(self, channel):
|
||||||
|
"""This method is invoked by pika when the channel has been opened.
|
||||||
|
The channel object is passed in so we can make use of it.
|
||||||
|
|
||||||
|
Since the channel is now open, we'll declare the exchange to use.
|
||||||
|
|
||||||
|
:param pika.channel.Channel channel: The channel object
|
||||||
|
|
||||||
|
"""
|
||||||
|
self.logger.debug('Channel opened')
|
||||||
|
self._channel = channel
|
||||||
|
self.add_on_channel_close_callback()
|
||||||
|
self.setup_exchange('pubsub')
|
||||||
|
|
||||||
|
def setup_exchange(self, exchange_name):
|
||||||
|
"""Setup the exchange on RabbitMQ by invoking the Exchange.Declare RPC
|
||||||
|
command. When it is complete, the on_exchange_declareok method will
|
||||||
|
be invoked by pika.
|
||||||
|
|
||||||
|
:param str|unicode exchange_name: The name of the exchange to declare
|
||||||
|
|
||||||
|
"""
|
||||||
|
self.logger.debug('Declaring exchange %s', exchange_name)
|
||||||
|
self._channel.exchange_declare(self.on_exchange_declareok,
|
||||||
|
exchange=exchange_name,
|
||||||
|
exchange_type='topic',
|
||||||
|
passive=True, durable=True)
|
||||||
|
|
||||||
|
def on_exchange_declareok(self, unused_frame):
|
||||||
|
"""Invoked by pika when RabbitMQ has finished the Exchange.Declare RPC
|
||||||
|
command.
|
||||||
|
|
||||||
|
:param pika.Frame.Method unused_frame: Exchange.DeclareOk response frame
|
||||||
|
|
||||||
|
"""
|
||||||
|
self.logger.debug('Exchange declared')
|
||||||
|
self._channel.queue_declare(self.on_queue_declareok, exclusive=True)
|
||||||
|
|
||||||
|
def on_queue_declareok(self, method_frame):
|
||||||
|
"""Method invoked by pika when the Queue.Declare RPC call made in
|
||||||
|
setup_queue has completed. In this method we will bind the queue
|
||||||
|
and exchange together with the routing key by issuing the Queue.Bind
|
||||||
|
RPC command. When this command is complete, the on_bindok method will
|
||||||
|
be invoked by pika.
|
||||||
|
|
||||||
|
:param pika.frame.Method method_frame: The Queue.DeclareOk frame
|
||||||
|
|
||||||
|
"""
|
||||||
|
self.queue_name = method_frame.method.queue
|
||||||
|
self.routing_keys_to_bind = self.routing_keys()
|
||||||
|
self.bind_queue_to_routing_key(self.routing_keys_to_bind.pop())
|
||||||
|
|
||||||
|
def routing_keys(self):
|
||||||
|
return ['#']
|
||||||
|
|
||||||
|
def bind_queue_to_routing_key(self, key):
|
||||||
|
self.logger.info('Binding %s to %s', key, self.queue_name)
|
||||||
|
self._channel.queue_bind(self.on_bindok, self.queue_name, 'pubsub', key)
|
||||||
|
|
||||||
|
def add_on_cancel_callback(self):
|
||||||
|
"""Add a callback that will be invoked if RabbitMQ cancels the consumer
|
||||||
|
for some reason. If RabbitMQ does cancel the consumer,
|
||||||
|
on_consumer_cancelled will be invoked by pika.
|
||||||
|
|
||||||
|
"""
|
||||||
|
self.logger.debug('Adding consumer cancellation callback')
|
||||||
|
self._channel.add_on_cancel_callback(self.on_consumer_cancelled)
|
||||||
|
|
||||||
|
def on_consumer_cancelled(self, method_frame):
|
||||||
|
"""Invoked by pika when RabbitMQ sends a Basic.Cancel for a consumer
|
||||||
|
receiving messages.
|
||||||
|
|
||||||
|
:param pika.frame.Method method_frame: The Basic.Cancel frame
|
||||||
|
|
||||||
|
"""
|
||||||
|
self.logger.info('Consumer was cancelled remotely, shutting down: %r',
|
||||||
|
method_frame)
|
||||||
|
if self._channel:
|
||||||
|
self._channel.close()
|
||||||
|
|
||||||
|
def on_message(self, unused_channel, basic_deliver, properties, body):
|
||||||
|
"""Invoked by pika when a message is delivered from RabbitMQ. The
|
||||||
|
channel is passed for your convenience. The basic_deliver object that
|
||||||
|
is passed in carries the exchange, routing key, delivery tag and
|
||||||
|
a redelivered flag for the message. The properties passed in is an
|
||||||
|
instance of BasicProperties with the message properties and the body
|
||||||
|
is the message that was sent.
|
||||||
|
|
||||||
|
:param pika.channel.Channel unused_channel: The channel object
|
||||||
|
:param pika.Spec.Basic.Deliver: basic_deliver method
|
||||||
|
:param pika.Spec.BasicProperties: properties
|
||||||
|
:param str|unicode body: The message body
|
||||||
|
|
||||||
|
"""
|
||||||
|
self.logger.info('Received message # %s: %s %s',
|
||||||
|
basic_deliver.delivery_tag, basic_deliver.routing_key, body)
|
||||||
|
|
||||||
|
def on_cancelok(self, unused_frame):
|
||||||
|
"""This method is invoked by pika when RabbitMQ acknowledges the
|
||||||
|
cancellation of a consumer. At this point we will close the channel.
|
||||||
|
This will invoke the on_channel_closed method once the channel has been
|
||||||
|
closed, which will in-turn close the connection.
|
||||||
|
|
||||||
|
:param pika.frame.Method unused_frame: The Basic.CancelOk frame
|
||||||
|
|
||||||
|
"""
|
||||||
|
self.logger.debug('RabbitMQ acknowledged the cancellation of the consumer')
|
||||||
|
self.close_channel()
|
||||||
|
|
||||||
|
def stop_consuming(self):
|
||||||
|
"""Tell RabbitMQ that you would like to stop consuming by sending the
|
||||||
|
Basic.Cancel RPC command.
|
||||||
|
|
||||||
|
"""
|
||||||
|
if self._channel:
|
||||||
|
self.logger.debug('Sending a Basic.Cancel RPC command to RabbitMQ')
|
||||||
|
self._channel.basic_cancel(self.on_cancelok, self._consumer_tag)
|
||||||
|
|
||||||
|
def start_consuming(self):
|
||||||
|
"""This method sets up the consumer by first calling
|
||||||
|
add_on_cancel_callback so that the object is notified if RabbitMQ
|
||||||
|
cancels the consumer. It then issues the Basic.Consume RPC command
|
||||||
|
which returns the consumer tag that is used to uniquely identify the
|
||||||
|
consumer with RabbitMQ. We keep the value to use it when we want to
|
||||||
|
cancel consuming. The on_message method is passed in as a callback pika
|
||||||
|
will invoke when a message is fully received.
|
||||||
|
|
||||||
|
"""
|
||||||
|
self.logger.debug('Issuing consumer related RPC commands')
|
||||||
|
self.add_on_cancel_callback()
|
||||||
|
self._consumer_tag = self._channel.basic_consume(self.on_message,
|
||||||
|
self.queue_name, no_ack=True)
|
||||||
|
|
||||||
|
def on_bindok(self, unused_frame):
|
||||||
|
"""Invoked by pika when the Queue.Bind method has completed. At this
|
||||||
|
point we will start consuming messages by calling start_consuming
|
||||||
|
which will invoke the needed RPC commands to start the process.
|
||||||
|
|
||||||
|
:param pika.frame.Method unused_frame: The Queue.BindOk response frame
|
||||||
|
|
||||||
|
"""
|
||||||
|
self.logger.debug('Queue bound')
|
||||||
|
if len(self.routing_keys_to_bind):
|
||||||
|
self.bind_queue_to_routing_key(self.routing_keys_to_bind.pop())
|
||||||
|
else:
|
||||||
|
self.start_consuming()
|
||||||
|
|
||||||
|
def close_channel(self):
|
||||||
|
"""Call to close the channel with RabbitMQ cleanly by issuing the
|
||||||
|
Channel.Close RPC command.
|
||||||
|
|
||||||
|
"""
|
||||||
|
self.logger.debug('Closing the channel')
|
||||||
|
self._channel.close()
|
||||||
|
|
||||||
|
def open_channel(self):
|
||||||
|
"""Open a new channel with RabbitMQ by issuing the Channel.Open RPC
|
||||||
|
command. When RabbitMQ responds that the channel is open, the
|
||||||
|
on_channel_open callback will be invoked by pika.
|
||||||
|
|
||||||
|
"""
|
||||||
|
self.logger.debug('Creating a new channel')
|
||||||
|
self._connection.channel(on_open_callback=self.on_channel_open)
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
"""Run the example consumer by connecting to RabbitMQ and then
|
||||||
|
starting the IOLoop to block and allow the SelectConnection to operate.
|
||||||
|
|
||||||
|
"""
|
||||||
|
self._connection = self.connect()
|
||||||
|
self._connection.ioloop.start()
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
"""Cleanly shutdown the connection to RabbitMQ by stopping the consumer
|
||||||
|
with RabbitMQ. When RabbitMQ confirms the cancellation, on_cancelok
|
||||||
|
will be invoked by pika, which will then closing the channel and
|
||||||
|
connection. The IOLoop is started again because this method is invoked
|
||||||
|
when CTRL-C is pressed raising a KeyboardInterrupt exception. This
|
||||||
|
exception stops the IOLoop which needs to be running for pika to
|
||||||
|
communicate with RabbitMQ. All of the commands issued prior to starting
|
||||||
|
the IOLoop will be buffered but not processed.
|
||||||
|
|
||||||
|
"""
|
||||||
|
self.logger.debug('Stopping')
|
||||||
|
self._closing = True
|
||||||
|
self.stop_consuming()
|
||||||
|
self._connection.ioloop.start()
|
||||||
|
self.logger.debug('Stopped')
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
LOG_FORMAT = ('%(levelname) -10s %(asctime)s %(name) -30s %(funcName) '
|
||||||
|
'-35s %(lineno) -5d: %(message)s')
|
||||||
|
|
||||||
|
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)
|
||||||
|
example = PubSubConsumer('amqps://opensuse:opensuse@rabbit.opensuse.org',
|
||||||
|
logging.getLogger(__name__))
|
||||||
|
try:
|
||||||
|
example.run()
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
example.stop()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
main()
|
25
dist/package/openSUSE-release-tools.spec
vendored
25
dist/package/openSUSE-release-tools.spec
vendored
@ -293,6 +293,16 @@ Requires: osc >= 0.159.0
|
|||||||
%description -n osc-plugin-vdelreq
|
%description -n osc-plugin-vdelreq
|
||||||
OSC plugin to check for virtually accepted request, see `osc vdelreq --help`.
|
OSC plugin to check for virtually accepted request, see `osc vdelreq --help`.
|
||||||
|
|
||||||
|
%package rabbit-openqa
|
||||||
|
Summary: Sync openQA Status Into OBS
|
||||||
|
Group: Development/Tools/Other
|
||||||
|
BuildArch: noarch
|
||||||
|
Requires: osc >= 0.159.0
|
||||||
|
|
||||||
|
%description rabbit-openqa
|
||||||
|
Bot listening to AMQP bus and syncs openQA job status into OBS for
|
||||||
|
staging projects
|
||||||
|
|
||||||
%prep
|
%prep
|
||||||
%setup -q
|
%setup -q
|
||||||
|
|
||||||
@ -388,6 +398,14 @@ fi
|
|||||||
%postun pkglistgen
|
%postun pkglistgen
|
||||||
%systemd_postun
|
%systemd_postun
|
||||||
|
|
||||||
|
%pre rabbit-openqa
|
||||||
|
getent passwd osrt-rabit-openqa > /dev/null || \
|
||||||
|
useradd -r -m -s /sbin/nologin -c "user for openSUSE-release-tools-rabbit-openqa" osrt-rabit-openqa
|
||||||
|
exit 0
|
||||||
|
|
||||||
|
%postun rabbit-openqa
|
||||||
|
%systemd_postun
|
||||||
|
|
||||||
%files
|
%files
|
||||||
%defattr(-,root,root,-)
|
%defattr(-,root,root,-)
|
||||||
%doc README.md
|
%doc README.md
|
||||||
@ -436,6 +454,7 @@ fi
|
|||||||
%exclude %{_datadir}/%{source_dir}/osc-staging.py
|
%exclude %{_datadir}/%{source_dir}/osc-staging.py
|
||||||
%exclude %{_datadir}/%{source_dir}/osc-vdelreq.py
|
%exclude %{_datadir}/%{source_dir}/osc-vdelreq.py
|
||||||
%exclude %{_datadir}/%{source_dir}/update_crawler.py
|
%exclude %{_datadir}/%{source_dir}/update_crawler.py
|
||||||
|
%exclude %{_datadir}/%{source_dir}/rabbit-openqa.py
|
||||||
%dir %{_sysconfdir}/openSUSE-release-tools
|
%dir %{_sysconfdir}/openSUSE-release-tools
|
||||||
|
|
||||||
%files devel
|
%files devel
|
||||||
@ -571,6 +590,12 @@ fi
|
|||||||
%{_unitdir}/osrt-pkglistgen@.service
|
%{_unitdir}/osrt-pkglistgen@.service
|
||||||
%{_unitdir}/osrt-pkglistgen@.timer
|
%{_unitdir}/osrt-pkglistgen@.timer
|
||||||
|
|
||||||
|
%files rabbit-openqa
|
||||||
|
%defattr(-,root,root,-)
|
||||||
|
%{_bindir}/osrt-rabbit-openqa
|
||||||
|
%{_datadir}/%{source_dir}/rabbit-openqa.py
|
||||||
|
%{_unitdir}/osrt-rabbit-openqa.service
|
||||||
|
|
||||||
%files -n osclib
|
%files -n osclib
|
||||||
%defattr(-,root,root,-)
|
%defattr(-,root,root,-)
|
||||||
%{_datadir}/%{source_dir}/osclib
|
%{_datadir}/%{source_dir}/osclib
|
||||||
|
272
rabbit-openqa.py
Executable file
272
rabbit-openqa.py
Executable file
@ -0,0 +1,272 @@
|
|||||||
|
#!/usr/bin/python
|
||||||
|
|
||||||
|
import argparse
|
||||||
|
import logging
|
||||||
|
import pika
|
||||||
|
import sys
|
||||||
|
import json
|
||||||
|
import osc
|
||||||
|
import re
|
||||||
|
from time import sleep
|
||||||
|
from osc.core import http_GET, http_POST, makeurl
|
||||||
|
from M2Crypto.SSL import SSLError as SSLError
|
||||||
|
from osclib.conf import Config
|
||||||
|
from osclib.stagingapi import StagingAPI
|
||||||
|
from lxml import etree as ET
|
||||||
|
from openqa_client.client import OpenQA_Client
|
||||||
|
from openqa_client.exceptions import ConnectionError
|
||||||
|
from urllib import quote_plus
|
||||||
|
import requests
|
||||||
|
try:
|
||||||
|
from urllib.error import HTTPError, URLError
|
||||||
|
except ImportError:
|
||||||
|
# python 2.x
|
||||||
|
from urllib2 import HTTPError, URLError
|
||||||
|
from PubSubConsumer import PubSubConsumer
|
||||||
|
|
||||||
|
|
||||||
|
class Project(object):
|
||||||
|
def __init__(self, name):
|
||||||
|
self.name = name
|
||||||
|
Config(apiurl, name)
|
||||||
|
self.api = StagingAPI(apiurl, name)
|
||||||
|
self.staging_projects = dict()
|
||||||
|
self.listener = None
|
||||||
|
self.replace_string = self.api.attribute_value_load('OpenQAMapping')
|
||||||
|
|
||||||
|
def init(self):
|
||||||
|
for p in self.api.get_staging_projects():
|
||||||
|
if self.api.is_adi_project(p):
|
||||||
|
continue
|
||||||
|
self.staging_projects[p] = self.initial_staging_state(p)
|
||||||
|
self.update_staging_status(p)
|
||||||
|
|
||||||
|
def staging_letter(self, name):
|
||||||
|
return name.split(':')[-1]
|
||||||
|
|
||||||
|
def map_iso(self, staging_project, iso):
|
||||||
|
parts = self.replace_string.split('/')
|
||||||
|
if parts[0] != 's':
|
||||||
|
raise Exception("{}'s iso_replace_string does not start with s/".format(self.name))
|
||||||
|
old = parts[1]
|
||||||
|
new = parts[2]
|
||||||
|
new = new.replace('$LETTER', self.staging_letter(staging_project))
|
||||||
|
return re.compile(old).sub(new, iso)
|
||||||
|
|
||||||
|
def gather_isos(self, name, repository):
|
||||||
|
url = self.api.makeurl(['published', name, repository, 'iso'])
|
||||||
|
f = self.api.retried_GET(url)
|
||||||
|
root = ET.parse(f).getroot()
|
||||||
|
ret = []
|
||||||
|
for entry in root.findall('entry'):
|
||||||
|
if entry.get('name').endswith('iso'):
|
||||||
|
ret.append(self.map_iso(name, entry.get('name')))
|
||||||
|
return ret
|
||||||
|
|
||||||
|
def gather_buildid(self, name, repository):
|
||||||
|
url = self.api.makeurl(['published', name, repository], {'view': 'status'})
|
||||||
|
f = self.api.retried_GET(url)
|
||||||
|
id = ET.parse(f).getroot().find('buildid')
|
||||||
|
if id is not None:
|
||||||
|
return id.text
|
||||||
|
|
||||||
|
def initial_staging_state(self, name):
|
||||||
|
return {'isos': self.gather_isos(name, 'images'),
|
||||||
|
'id': self.gather_buildid(name, 'images')}
|
||||||
|
|
||||||
|
def fetch_openqa_jobs(self, staging, iso):
|
||||||
|
buildid = self.staging_projects[staging].get('id')
|
||||||
|
if not buildid:
|
||||||
|
self.logger.info("I don't know the build id of " + staging)
|
||||||
|
return
|
||||||
|
# all openQA jobs are created at the same URL
|
||||||
|
url = self.api.makeurl(['status_reports', 'published', staging, 'images', 'reports', buildid])
|
||||||
|
openqa = self.listener.jobs_for_iso(iso)
|
||||||
|
# collect job infos to pick names
|
||||||
|
openqa_infos = dict()
|
||||||
|
for job in openqa:
|
||||||
|
print(staging, iso, job['id'], job['state'], job['result'],
|
||||||
|
job['settings']['MACHINE'], job['settings']['TEST'])
|
||||||
|
openqa_infos[job['id']] = {'url': self.listener.test_url(job)}
|
||||||
|
openqa_infos[job['id']]['state'] = self.map_openqa_result(job)
|
||||||
|
openqa_infos[job['id']]['name'] = job['settings']['TEST']
|
||||||
|
openqa_infos[job['id']]['machine'] = job['settings']['MACHINE']
|
||||||
|
|
||||||
|
# make sure the names are unique
|
||||||
|
taken_names = dict()
|
||||||
|
for id in openqa_infos:
|
||||||
|
name = openqa_infos[id]['name']
|
||||||
|
if name in taken_names:
|
||||||
|
openqa_infos[id]['name'] = openqa_infos[id]['name'] + "@" + openqa_infos[id]['machine']
|
||||||
|
# the other id
|
||||||
|
id = taken_names[name]
|
||||||
|
openqa_infos[id]['name'] = openqa_infos[id]['name'] + "@" + openqa_infos[id]['machine']
|
||||||
|
taken_names[name] = id
|
||||||
|
|
||||||
|
for info in openqa_infos.values():
|
||||||
|
xml = self.openqa_check_xml(info['url'], info['state'], info['name'])
|
||||||
|
try:
|
||||||
|
http_POST(url, data=xml)
|
||||||
|
except HTTPError:
|
||||||
|
self.logger.error('failed to post status to ' + url)
|
||||||
|
|
||||||
|
def update_staging_status(self, project):
|
||||||
|
for iso in self.staging_projects[project]['isos']:
|
||||||
|
self.fetch_openqa_jobs(project, iso)
|
||||||
|
|
||||||
|
def update_staging_buildid(self, project, repository, buildid):
|
||||||
|
self.staging_projects[project]['id'] = buildid
|
||||||
|
self.staging_projects[project]['isos'] = self.gather_isos(project, repository)
|
||||||
|
self.update_staging_status(project)
|
||||||
|
|
||||||
|
def check_published_repo(self, project, repository, buildid):
|
||||||
|
if repository != 'images':
|
||||||
|
return
|
||||||
|
for p in self.staging_projects:
|
||||||
|
if project == p:
|
||||||
|
self.update_staging_buildid(project, repository, buildid)
|
||||||
|
|
||||||
|
def matching_project(self, iso):
|
||||||
|
for p in self.staging_projects:
|
||||||
|
if iso in self.staging_projects[p]['isos']:
|
||||||
|
return p
|
||||||
|
|
||||||
|
def map_openqa_result(self, job):
|
||||||
|
if job['result'] in ['passed', 'softfailed']:
|
||||||
|
return 'success'
|
||||||
|
if job['result'] == 'none':
|
||||||
|
return 'pending'
|
||||||
|
return 'failure'
|
||||||
|
|
||||||
|
def openqa_job_change(self, iso):
|
||||||
|
staging = self.matching_project(iso)
|
||||||
|
if not staging:
|
||||||
|
return
|
||||||
|
# we fetch all openqa jobs so we can avoid long job names
|
||||||
|
self.fetch_openqa_jobs(staging, iso)
|
||||||
|
|
||||||
|
def openqa_check_xml(self, url, state, name):
|
||||||
|
check = ET.Element('check')
|
||||||
|
se = ET.SubElement(check, 'url')
|
||||||
|
se.text = url
|
||||||
|
se = ET.SubElement(check, 'state')
|
||||||
|
se.text = state
|
||||||
|
se = ET.SubElement(check, 'name')
|
||||||
|
se.text = name
|
||||||
|
return ET.tostring(check)
|
||||||
|
|
||||||
|
|
||||||
|
class Listener(PubSubConsumer):
|
||||||
|
def __init__(self, amqp_prefix, amqp_url, openqa_url):
|
||||||
|
super(Listener, self).__init__(amqp_url, logging.getLogger(__name__))
|
||||||
|
self.projects = []
|
||||||
|
self.amqp_prefix = amqp_prefix
|
||||||
|
self.openqa_url = openqa_url
|
||||||
|
self.openqa = OpenQA_Client(server=openqa_url)
|
||||||
|
|
||||||
|
def routing_keys(self):
|
||||||
|
ret = []
|
||||||
|
for suffix in ['.obs.repo.published', '.openqa.job.done',
|
||||||
|
'.openqa.job.create', '.openqa.job.restart']:
|
||||||
|
ret.append(self.amqp_prefix + suffix)
|
||||||
|
return ret
|
||||||
|
|
||||||
|
def add(self, project):
|
||||||
|
project.listener = self
|
||||||
|
self.projects.append(project)
|
||||||
|
|
||||||
|
def start_consuming(self):
|
||||||
|
# now we are (re-)connected to the bus and need to fetch the
|
||||||
|
# initial state
|
||||||
|
for project in self.projects:
|
||||||
|
self.logger.info('Fetching ISOs of %s', project.name)
|
||||||
|
project.init()
|
||||||
|
self.logger.info('Finished fetching initial ISOs, listening')
|
||||||
|
super(Listener, self).start_consuming()
|
||||||
|
|
||||||
|
def jobs_for_iso(self, iso):
|
||||||
|
values = {
|
||||||
|
'iso': iso,
|
||||||
|
'scope': 'current',
|
||||||
|
'latest': '1',
|
||||||
|
}
|
||||||
|
return self.openqa.openqa_request('GET', 'jobs', values)['jobs']
|
||||||
|
|
||||||
|
def get_step_url(self, testurl, modulename):
|
||||||
|
failurl = testurl + '/modules/{!s}/fails'.format(quote_plus(modulename))
|
||||||
|
fails = requests.get(failurl).json()
|
||||||
|
failed_step = fails.get('first_failed_step', 1)
|
||||||
|
return "{!s}#step/{!s}/{:d}".format(testurl, modulename, failed_step)
|
||||||
|
|
||||||
|
def test_url(self, job):
|
||||||
|
url = self.openqa_url + ("/tests/%d" % job['id'])
|
||||||
|
if job['result'] == 'failed':
|
||||||
|
for module in job['modules']:
|
||||||
|
if module['result'] == 'failed':
|
||||||
|
return self.get_step_url(url, module['name'])
|
||||||
|
return url
|
||||||
|
|
||||||
|
def on_published_repo(self, payload):
|
||||||
|
for p in self.projects:
|
||||||
|
p.check_published_repo(str(payload['project']), str(payload['repo']), str(payload['buildid']))
|
||||||
|
|
||||||
|
def on_openqa_job(self, iso):
|
||||||
|
self.logger.debug('openqa_job_change', iso)
|
||||||
|
for p in self.projects:
|
||||||
|
p.openqa_job_change(iso)
|
||||||
|
|
||||||
|
def on_message(self, unused_channel, method, properties, body):
|
||||||
|
if method.routing_key == '{}.obs.repo.published'.format(amqp_prefix):
|
||||||
|
self.on_published_repo(json.loads(body))
|
||||||
|
elif re.search(r'.openqa.', method.routing_key):
|
||||||
|
self.on_openqa_job(json.loads(body).get('ISO'))
|
||||||
|
else:
|
||||||
|
self.logger.warning("unknown rabbitmq message {}".format(method.routing_key))
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
parser = argparse.ArgumentParser(
|
||||||
|
description='Bot to sync openQA status to OBS')
|
||||||
|
parser.add_argument("--apiurl", '-A', type=str, default='https://api.opensuse.org', help='API URL of OBS')
|
||||||
|
parser.add_argument('-s', '--staging', type=str, default=None,
|
||||||
|
help='staging project letter')
|
||||||
|
parser.add_argument('-f', '--force', action='store_true', default=False,
|
||||||
|
help='force the write of the comment')
|
||||||
|
parser.add_argument('-p', '--project', type=str, default='Factory',
|
||||||
|
help='openSUSE version to make the check (Factory, 13.2)')
|
||||||
|
parser.add_argument('-d', '--debug', action='store_true', default=False,
|
||||||
|
help='enable debug information')
|
||||||
|
|
||||||
|
args = parser.parse_args()
|
||||||
|
|
||||||
|
osc.conf.get_config()
|
||||||
|
osc.conf.config['debug'] = args.debug
|
||||||
|
|
||||||
|
apiurl = args.apiurl
|
||||||
|
|
||||||
|
if apiurl.endswith('suse.de'):
|
||||||
|
amqp_prefix = 'suse'
|
||||||
|
amqp_url = "amqps://suse:suse@rabbit.suse.de"
|
||||||
|
openqa_url = 'https://openqa.suse.de'
|
||||||
|
else:
|
||||||
|
amqp_prefix = 'opensuse'
|
||||||
|
amqp_url = "amqps://opensuse:opensuse@rabbit.opensuse.org"
|
||||||
|
openqa_url = 'https://openqa.opensuse.org'
|
||||||
|
|
||||||
|
logging.basicConfig(level=logging.INFO)
|
||||||
|
|
||||||
|
l = Listener(amqp_prefix, amqp_url, openqa_url)
|
||||||
|
url = makeurl(apiurl, ['search', 'project', 'id'], {'match': 'attribute/@name="OSRT:OpenQAMapping"'})
|
||||||
|
f = http_GET(url)
|
||||||
|
root = ET.parse(f).getroot()
|
||||||
|
for entry in root.findall('project'):
|
||||||
|
l.add(Project(entry.get('name')))
|
||||||
|
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
l.run()
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
l.stop()
|
||||||
|
except (HTTPError, URLError, ConnectionError, SSLError):
|
||||||
|
# OBS/openQA hickup
|
||||||
|
sleep(10)
|
9
systemd/osrt-rabbit-openqa.service
Normal file
9
systemd/osrt-rabbit-openqa.service
Normal file
@ -0,0 +1,9 @@
|
|||||||
|
[Unit]
|
||||||
|
Description=openSUSE Release Tools: Sync openQA status
|
||||||
|
|
||||||
|
[Service]
|
||||||
|
User=osrt-rabbit-openqa
|
||||||
|
ExecStart=/usr/bin/osrt-rabbit-openqa
|
||||||
|
|
||||||
|
[Install]
|
||||||
|
WantedBy=multi-user.target
|
Loading…
x
Reference in New Issue
Block a user