From a73054b189fb3cb5d05cd1599b32cf361c977a7e Mon Sep 17 00:00:00 2001 From: Stephan Kulow Date: Mon, 20 May 2019 11:43:04 +0200 Subject: [PATCH] Fix rabbitmq acknowledge: first ACK then check the content --- PubSubConsumer.py | 2 +- gocd/rabbit-openqa.py | 2 +- gocd/rabbit-repoid.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/PubSubConsumer.py b/PubSubConsumer.py index 73d04653..3d86398f 100644 --- a/PubSubConsumer.py +++ b/PubSubConsumer.py @@ -271,7 +271,7 @@ class PubSubConsumer(object): :param int delivery_tag: The delivery tag from the Basic.Deliver frame """ - self.logger.info('Acknowledging message %s', delivery_tag) + self.logger.debug('Acknowledging message %s', delivery_tag) self._channel.basic_ack(delivery_tag) def on_cancelok(self, _unused_frame, userdata): diff --git a/gocd/rabbit-openqa.py b/gocd/rabbit-openqa.py index e17bfbfb..0ea519f7 100755 --- a/gocd/rabbit-openqa.py +++ b/gocd/rabbit-openqa.py @@ -213,13 +213,13 @@ class Listener(PubSubConsumer): p.openqa_job_change(iso) def on_message(self, unused_channel, method, properties, body): + self.acknowledge_message(method.delivery_tag) if method.routing_key == '{}.obs.repo.published'.format(amqp_prefix): self.on_published_repo(json.loads(body)) elif re.search(r'.openqa.', method.routing_key): self.on_openqa_job(json.loads(body).get('ISO')) else: self.logger.warning("unknown rabbitmq message {}".format(method.routing_key)) - self.acknowledge_message(method.delivery_tag) if __name__ == '__main__': parser = argparse.ArgumentParser( diff --git a/gocd/rabbit-repoid.py b/gocd/rabbit-repoid.py index 7219be60..e3a3095e 100755 --- a/gocd/rabbit-repoid.py +++ b/gocd/rabbit-repoid.py @@ -86,6 +86,7 @@ class Listener(PubSubConsumer): f.write('{}: {}\n'.format(arch, ids[arch])) def on_message(self, unused_channel, method, properties, body): + self.acknowledge_message(method.delivery_tag) try: body = json.loads(body) except ValueError: @@ -100,7 +101,6 @@ class Listener(PubSubConsumer): else: self.logger.warning( 'unknown rabbitmq message {}'.format(method.routing_key)) - self.acknowledge_message(method.delivery_tag) if __name__ == '__main__': parser = argparse.ArgumentParser(