我如何关闭连接到扇出交换的Kombu ConsumerMixin队列,以便在我的消费者处于非活动状态时不会从发布者累积数据?
我在Python2.7中使用Kombu 3.0.24 (带RabbitMQ)。
下面是这两个类的代码。我希望这些类是相当泛型的类,这样我就可以在直接队列和类似RPC的查询/回复中重用它们。
问题是,如果我停止并重新启动消费者,旧数据就会在消费者队列中等待我。我假设这是因为当我停止消费者时,我需要删除队列,但我不知道如何删除。谢谢。
MessageConsumer.py
from kombu.mixins import ConsumerMixin
from kombu import Queue, Exchange, Connection
import logging
class MessageConsumer(ConsumerMixin):
def __init__(self,
broker='amqp://',
exchange='mExchange',
queue = 'mQueue',
type='direct',
no_ack=False):
self.connection = Connection(broker)
self.mExchange = Exchange(exchange, type=type)
self.mQueue = Queue(queue, self.mExchange)
self.mQueue.no_ack = no_ack
def get_consumers(self, Consumer, channel):
return [Consumer(queues=self.mQueue,
accept=['json'],
callbacks=[self.process_task])]
def process_task(self, body, message):
logging.debug('RECEIVED: {}'.format(body))
def stop(self):
self.should_stop = True
self.connection.release()
if __name__ == '__main__':
mMessageConsumer = MessageConsumer(exchange='sensor_data',
queue='rx1_queue',
type='fanout',
no_ack=True)
try:
mMessageConsumer.run()
except KeyboardInterrupt:
mMessageConsumer.stop()MessagePublisher.py
from kombu import Queue, Exchange, Connection
from kombu.pools import producers
import logging
class MessagePublisher(object):
def __init__(self,
broker='amqp://',
exchange='mExchange',
type='direct',
no_ack=False):
self.connection = Connection(broker)
self.mExchange = Exchange(exchange, type=type)
def publish(self, message, serializer='json', compression=None):
with producers[self.connection].acquire(block=True) as producer:
producer.publish(message,
serializer=serializer,
compression=compression,
exchange=self.mExchange,
declare=[self.mExchange]
)
def close(self):
self.connection.release()
if __name__ == '__main__':
mMessagePublisher = MessagePublisher(type='fanout',exchange='sensor_data')
x=0
while True:
x += 1
mMessagePublisher.publish(x)
mMessagePublisher.close()如果有更有效的方式让我编码,请建议它。我通过Googling找到的大多数示例都使用较旧版本的Kombu,所以我很难找出3.0.24的最佳实现。
发布于 2015-03-14 00:39:04
找到了解决方案。我需要用"exclusive=True“创建我的队列。那么这个队列只有在我的程序使用它的时候才存在。具体地说:
self.mQueue = Queue(queue, self.mExchange, exclusive=True)https://stackoverflow.com/questions/28935286
复制相似问题