我有两台服务器,分别叫做A和B。B运行RabbitMQ,而A通过Kombu连接到RabbitMQ。如果我在B上重新启动RabbitMQ,则kombu连接断开,消息不再传递。然后,我必须重置A上的进程以重新建立连接。有没有更好的方法,也就是说,有没有一种方法可以让Kombu自动重新连接,即使RabbitMQ进程重新启动了?
我的基本代码实现如下,提前感谢!:)
def start_consumer(routing_key, incoming_exchange_name, outgoing_exchange_name):
global rabbitmq_producer
incoming_exchange = kombu.Exchange(name=incoming_exchange_name, type='direct')
incoming_queue = kombu.Queue(name=routing_key+'_'+incoming_exchange_name, exchange=incoming_exchange, routing_key=routing_key)#, auto_delete=True)
outgoing_exchange = kombu.Exchange(name=outgoing_exchange_name, type='direct')
rabbitmq_producer = kombu.Producer(settings.rabbitmq_connection0, exchange=outgoing_exchange, serializer='json', compression=None, auto_declare=True)
settings.rabbitmq_connection0.connect()
if settings.rabbitmq_connection0.connected:
callbacks=[]
queues=[]
callbacks.append(callback)
# if push_queue:
# callbacks.append(push_message_callback)
queues.append(incoming_queue)
print 'opening a new *incoming* rabbitmq connection to the %s exchange for the %s queue' % (incoming_exchange.name, incoming_queue.name)
incoming_exchange(settings.rabbitmq_connection0).declare()
incoming_queue(settings.rabbitmq_connection0).declare()
print 'opening a new *outgoing* rabbitmq connection to the %s exchange' % outgoing_exchange.name
outgoing_exchange(settings.rabbitmq_connection0).declare()
with settings.rabbitmq_connection0.Consumer(queues=queues, callbacks=callbacks) as consumer:
while True:
settings.rabbitmq_connection0.drain_events()发布于 2016-08-09 22:17:48
在消费者端,当连接断开时,kombu.mixins.ConsumerMixin处理重新连接(还处理心跳等,并允许您编写更少的代码)。不幸的是,似乎没有ProducerMixin,但您可以深入研究代码并对其进行调整...?
https://stackoverflow.com/questions/37578967
复制相似问题