Fix rabbitmq acknowledge: first ACK then check the content
This commit is contained in:
parent
fd3bbccfa4
commit
a73054b189
@ -271,7 +271,7 @@ class PubSubConsumer(object):
|
|||||||
:param int delivery_tag: The delivery tag from the Basic.Deliver frame
|
:param int delivery_tag: The delivery tag from the Basic.Deliver frame
|
||||||
|
|
||||||
"""
|
"""
|
||||||
self.logger.info('Acknowledging message %s', delivery_tag)
|
self.logger.debug('Acknowledging message %s', delivery_tag)
|
||||||
self._channel.basic_ack(delivery_tag)
|
self._channel.basic_ack(delivery_tag)
|
||||||
|
|
||||||
def on_cancelok(self, _unused_frame, userdata):
|
def on_cancelok(self, _unused_frame, userdata):
|
||||||
|
@ -213,13 +213,13 @@ class Listener(PubSubConsumer):
|
|||||||
p.openqa_job_change(iso)
|
p.openqa_job_change(iso)
|
||||||
|
|
||||||
def on_message(self, unused_channel, method, properties, body):
|
def on_message(self, unused_channel, method, properties, body):
|
||||||
|
self.acknowledge_message(method.delivery_tag)
|
||||||
if method.routing_key == '{}.obs.repo.published'.format(amqp_prefix):
|
if method.routing_key == '{}.obs.repo.published'.format(amqp_prefix):
|
||||||
self.on_published_repo(json.loads(body))
|
self.on_published_repo(json.loads(body))
|
||||||
elif re.search(r'.openqa.', method.routing_key):
|
elif re.search(r'.openqa.', method.routing_key):
|
||||||
self.on_openqa_job(json.loads(body).get('ISO'))
|
self.on_openqa_job(json.loads(body).get('ISO'))
|
||||||
else:
|
else:
|
||||||
self.logger.warning("unknown rabbitmq message {}".format(method.routing_key))
|
self.logger.warning("unknown rabbitmq message {}".format(method.routing_key))
|
||||||
self.acknowledge_message(method.delivery_tag)
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
parser = argparse.ArgumentParser(
|
parser = argparse.ArgumentParser(
|
||||||
|
@ -86,6 +86,7 @@ class Listener(PubSubConsumer):
|
|||||||
f.write('{}: {}\n'.format(arch, ids[arch]))
|
f.write('{}: {}\n'.format(arch, ids[arch]))
|
||||||
|
|
||||||
def on_message(self, unused_channel, method, properties, body):
|
def on_message(self, unused_channel, method, properties, body):
|
||||||
|
self.acknowledge_message(method.delivery_tag)
|
||||||
try:
|
try:
|
||||||
body = json.loads(body)
|
body = json.loads(body)
|
||||||
except ValueError:
|
except ValueError:
|
||||||
@ -100,7 +101,6 @@ class Listener(PubSubConsumer):
|
|||||||
else:
|
else:
|
||||||
self.logger.warning(
|
self.logger.warning(
|
||||||
'unknown rabbitmq message {}'.format(method.routing_key))
|
'unknown rabbitmq message {}'.format(method.routing_key))
|
||||||
self.acknowledge_message(method.delivery_tag)
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
parser = argparse.ArgumentParser(
|
parser = argparse.ArgumentParser(
|
||||||
|
Loading…
x
Reference in New Issue
Block a user