首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Kombu ConsumerMixin关闭队列,防止扇出数据接收

Kombu ConsumerMixin关闭队列,防止扇出数据接收
EN

Stack Overflow用户
提问于 2015-03-09 12:31:22
回答 1查看 1K关注 0票数 0

我如何关闭连接到扇出交换的Kombu ConsumerMixin队列,以便在我的消费者处于非活动状态时不会从发布者累积数据?

我在Python2.7中使用Kombu 3.0.24 (带RabbitMQ)。

下面是这两个类的代码。我希望这些类是相当泛型的类,这样我就可以在直接队列和类似RPC的查询/回复中重用它们。

问题是,如果我停止并重新启动消费者,旧数据就会在消费者队列中等待我。我假设这是因为当我停止消费者时,我需要删除队列,但我不知道如何删除。谢谢。

MessageConsumer.py

代码语言:javascript
复制
from kombu.mixins import ConsumerMixin
from kombu import Queue, Exchange, Connection
import logging

class MessageConsumer(ConsumerMixin):

    def __init__(self,
                 broker='amqp://',
                 exchange='mExchange',
                 queue = 'mQueue',
                 type='direct',
                 no_ack=False):
        self.connection = Connection(broker)
        self.mExchange = Exchange(exchange, type=type)
        self.mQueue = Queue(queue, self.mExchange)
        self.mQueue.no_ack = no_ack

    def get_consumers(self, Consumer, channel):
        return [Consumer(queues=self.mQueue,
                         accept=['json'],
                         callbacks=[self.process_task])]

    def process_task(self, body, message):
        logging.debug('RECEIVED: {}'.format(body))

    def stop(self):
        self.should_stop = True
        self.connection.release()


if __name__ == '__main__':

    mMessageConsumer = MessageConsumer(exchange='sensor_data',
                                       queue='rx1_queue',
                                       type='fanout',
                                       no_ack=True)
    try:
        mMessageConsumer.run()
    except KeyboardInterrupt:
        mMessageConsumer.stop()

MessagePublisher.py

代码语言:javascript
复制
from kombu import Queue, Exchange, Connection
from kombu.pools import producers
import logging

class MessagePublisher(object):

    def __init__(self,
                 broker='amqp://',
                 exchange='mExchange',
                 type='direct',
                 no_ack=False):
        self.connection = Connection(broker)
        self.mExchange = Exchange(exchange, type=type)

    def publish(self, message, serializer='json', compression=None):
        with producers[self.connection].acquire(block=True) as producer:
            producer.publish(message,
                             serializer=serializer,
                             compression=compression,
                             exchange=self.mExchange,
                             declare=[self.mExchange]
                             )

    def close(self):
        self.connection.release()

if __name__ == '__main__':
    mMessagePublisher = MessagePublisher(type='fanout',exchange='sensor_data')
    x=0
    while True:
        x += 1
        mMessagePublisher.publish(x)
    mMessagePublisher.close()

如果有更有效的方式让我编码,请建议它。我通过Googling找到的大多数示例都使用较旧版本的Kombu,所以我很难找出3.0.24的最佳实现。

EN

回答 1

Stack Overflow用户

发布于 2015-03-14 00:39:04

找到了解决方案。我需要用"exclusive=True“创建我的队列。那么这个队列只有在我的程序使用它的时候才存在。具体地说:

代码语言:javascript
复制
self.mQueue = Queue(queue, self.mExchange, exclusive=True)
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/28935286

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档