From 6297dacf8ba8ef04063327493720d5ecf9ce9631 Mon Sep 17 00:00:00 2001 From: Stephan Kulow Date: Sat, 18 May 2019 20:57:05 +0200 Subject: [PATCH] Acknowledge the rabbitmq message We need to ACK the rabbitmq message for the server to erase it in the queue - otherwise they are piling up endlessly. As we quit the services every hour the queue is reset at that point, but that still makes monitoring the server quite hard as the queues tend to be full all the time --- PubSubConsumer.py | 11 +++++++++++ gocd/rabbit-openqa.py | 2 +- gocd/rabbit-repoid.py | 2 +- 3 files changed, 13 insertions(+), 2 deletions(-) diff --git a/PubSubConsumer.py b/PubSubConsumer.py index b5323bf6..73d04653 100644 --- a/PubSubConsumer.py +++ b/PubSubConsumer.py @@ -262,6 +262,17 @@ class PubSubConsumer(object): """ self.logger.info('Received message # %s: %s %s', basic_deliver.delivery_tag, basic_deliver.routing_key, body) + self.acknowledge_message(basic_deliver.delivery_tag) + + def acknowledge_message(self, delivery_tag): + """Acknowledge the message delivery from RabbitMQ by sending a + Basic.Ack RPC method for the delivery tag. + + :param int delivery_tag: The delivery tag from the Basic.Deliver frame + + """ + self.logger.info('Acknowledging message %s', delivery_tag) + self._channel.basic_ack(delivery_tag) def on_cancelok(self, _unused_frame, userdata): """This method is invoked by pika when RabbitMQ acknowledges the diff --git a/gocd/rabbit-openqa.py b/gocd/rabbit-openqa.py index 8af79912..e17bfbfb 100755 --- a/gocd/rabbit-openqa.py +++ b/gocd/rabbit-openqa.py @@ -219,7 +219,7 @@ class Listener(PubSubConsumer): self.on_openqa_job(json.loads(body).get('ISO')) else: self.logger.warning("unknown rabbitmq message {}".format(method.routing_key)) - + self.acknowledge_message(method.delivery_tag) if __name__ == '__main__': parser = argparse.ArgumentParser( diff --git a/gocd/rabbit-repoid.py b/gocd/rabbit-repoid.py index f6bf4865..7219be60 100755 --- a/gocd/rabbit-repoid.py +++ b/gocd/rabbit-repoid.py @@ -100,7 +100,7 @@ class Listener(PubSubConsumer): else: self.logger.warning( 'unknown rabbitmq message {}'.format(method.routing_key)) - + self.acknowledge_message(method.delivery_tag) if __name__ == '__main__': parser = argparse.ArgumentParser(