Merge pull request #2010 from coolo/more_pika

Close the rabbit connection more carefully
This commit is contained in:
Stephan Kulow 2019-05-13 11:55:17 +02:00 committed by GitHub
commit 03a548021a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -7,7 +7,10 @@ import time
from datetime import datetime from datetime import datetime
class PubSubConsumer(object): class PubSubConsumer(object):
"""This is an example consumer that will handle unexpected interactions """
Based on https://github.com/pika/pika/blob/master/examples/asynchronous_consumer_example.py
This is an example consumer that will handle unexpected interactions
with RabbitMQ such as channel and connection closures. with RabbitMQ such as channel and connection closures.
If RabbitMQ closes the connection, it will reopen it. You should If RabbitMQ closes the connection, it will reopen it. You should
@ -30,6 +33,7 @@ class PubSubConsumer(object):
self._connection = None self._connection = None
self._channel = None self._channel = None
self._closing = False self._closing = False
self._consuming = False
self._consumer_tag = None self._consumer_tag = None
self._prefix = amqp_prefix self._prefix = amqp_prefix
self._timer_id = None self._timer_id = None
@ -77,8 +81,12 @@ class PubSubConsumer(object):
def close_connection(self): def close_connection(self):
"""This method closes the connection to RabbitMQ.""" """This method closes the connection to RabbitMQ."""
self.logger.info('Closing connection') self._consuming = False
self._connection.close() if self._connection.is_closing or self._connection.is_closed:
self.logger.info('Connection is closing or already closed')
else:
self.logger.info('Closing connection')
self._connection.close()
def add_on_connection_close_callback(self): def add_on_connection_close_callback(self):
"""This method adds an on close callback that will be invoked by pika """This method adds an on close callback that will be invoked by pika
@ -302,6 +310,7 @@ class PubSubConsumer(object):
self._consumer_tag = self._channel.basic_consume(self.queue_name, self._consumer_tag = self._channel.basic_consume(self.queue_name,
self.on_message, self.on_message,
auto_ack=False) auto_ack=False)
self._consuming = True
def on_bindok(self, unused_frame): def on_bindok(self, unused_frame):
"""Invoked by pika when the Queue.Bind method has completed. At this """Invoked by pika when the Queue.Bind method has completed. At this
@ -348,11 +357,15 @@ class PubSubConsumer(object):
""" """
self.logger.debug('Stopping') self.logger.debug('Stopping')
self._closing = True if not self._closing:
self.stop_consuming() self._closing = True
self._connection.ioloop.start() self.logger.debug('Stopping')
self.logger.debug('Stopped') if self._consuming:
self.stop_consuming()
#self._connection.ioloop.start()
else:
self._connection.ioloop.stop()
self.logger.debug('Stopped')
def main(): def main():
LOG_FORMAT = ('%(levelname) -10s %(asctime)s %(name) -30s %(funcName) ' LOG_FORMAT = ('%(levelname) -10s %(asctime)s %(name) -30s %(funcName) '