parent
ccca194f84
commit
e0a2d7e653
@ -7,7 +7,10 @@ import time
|
||||
from datetime import datetime
|
||||
|
||||
class PubSubConsumer(object):
|
||||
"""This is an example consumer that will handle unexpected interactions
|
||||
"""
|
||||
Based on https://github.com/pika/pika/blob/master/examples/asynchronous_consumer_example.py
|
||||
|
||||
This is an example consumer that will handle unexpected interactions
|
||||
with RabbitMQ such as channel and connection closures.
|
||||
|
||||
If RabbitMQ closes the connection, it will reopen it. You should
|
||||
@ -30,6 +33,7 @@ class PubSubConsumer(object):
|
||||
self._connection = None
|
||||
self._channel = None
|
||||
self._closing = False
|
||||
self._consuming = False
|
||||
self._consumer_tag = None
|
||||
self._prefix = amqp_prefix
|
||||
self._timer_id = None
|
||||
@ -77,6 +81,10 @@ class PubSubConsumer(object):
|
||||
|
||||
def close_connection(self):
|
||||
"""This method closes the connection to RabbitMQ."""
|
||||
self._consuming = False
|
||||
if self._connection.is_closing or self._connection.is_closed:
|
||||
self.logger.info('Connection is closing or already closed')
|
||||
else:
|
||||
self.logger.info('Closing connection')
|
||||
self._connection.close()
|
||||
|
||||
@ -302,6 +310,7 @@ class PubSubConsumer(object):
|
||||
self._consumer_tag = self._channel.basic_consume(self.queue_name,
|
||||
self.on_message,
|
||||
auto_ack=False)
|
||||
self._consuming = True
|
||||
|
||||
def on_bindok(self, unused_frame):
|
||||
"""Invoked by pika when the Queue.Bind method has completed. At this
|
||||
@ -348,12 +357,16 @@ class PubSubConsumer(object):
|
||||
|
||||
"""
|
||||
self.logger.debug('Stopping')
|
||||
if not self._closing:
|
||||
self._closing = True
|
||||
self.logger.debug('Stopping')
|
||||
if self._consuming:
|
||||
self.stop_consuming()
|
||||
self._connection.ioloop.start()
|
||||
#self._connection.ioloop.start()
|
||||
else:
|
||||
self._connection.ioloop.stop()
|
||||
self.logger.debug('Stopped')
|
||||
|
||||
|
||||
def main():
|
||||
LOG_FORMAT = ('%(levelname) -10s %(asctime)s %(name) -30s %(funcName) '
|
||||
'-35s %(lineno) -5d: %(message)s')
|
||||
|
Loading…
x
Reference in New Issue
Block a user