首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Python Kombu -阻塞

Python Kombu -阻塞
EN

Stack Overflow用户
提问于 2014-05-30 04:36:56
回答 2查看 3.1K关注 0票数 1

通过生产者/消费者模型,我使用kombu来管理RabbitMQ。我启动了我的生产者,它将100个作业放在一个队列中(我只有一个队列,一个交换)。我想推出多个消费者,同时,并让每个消费者过程一次一个工作。不幸的是,消费者之间相互阻碍(即,当一个消费者从队列中抢夺一份工作时,另一个消费者只是坐在那里闲着)。如果我杀死了工作中的消费者,那么其他消费者中的一个就会开始工作。是否有一种方法可以让所有的使用者同时运行,每一个处理来自队列的不同的作业?我的消费者代码如下:

代码语言:javascript
复制
def start_consumer(self, incoming_exchange_name):
    if self.rabbitmq_connection.connected:
        callbacks=[]
        queues=[]

        callbacks.append(self._callback)
        queues.append(self.incoming_queue)

        print 'opening a new *incoming* rabbitmq connection to the %s exchange for the %s queue' % (self.incoming_exchange.name, self.incoming_queue.name)
        self.incoming_exchange(settings.rabbitmq_connection).declare()
        self.incoming_queue(settings.rabbitmq_connection).declare()

        with settings.rabbitmq_connection.Consumer(queues=queues, callbacks=callbacks) as consumer:
            while True:
                try:
                    self.rabbitmq_connection.drain_events()
                except Exception as e:
                    print 'Error -> %s' % e.message 
EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2014-09-19 02:48:00

您需要将使用者预取设置为1 (qos),这样每个使用者只需获取1条消息,其余的信息就会留在队列中,状态就绪,因此,如果将2个QOS设置为1的消费者设置为1,并且您有100条消息,那么您将同时处理两个任务。

我将缺失的部分添加到代码中,以设置预取计数

代码语言:javascript
复制
def start_consumer(self, incoming_exchange_name):
if self.rabbitmq_connection.connected:
    callbacks=[]
    queues=[]

    callbacks.append(self._callback)
    queues.append(self.incoming_queue)

    print 'opening a new *incoming* rabbitmq connection to the %s exchange for the %s queue' % (self.incoming_exchange.name, self.incoming_queue.name)
    self.incoming_exchange(settings.rabbitmq_connection).declare()
    self.incoming_queue(settings.rabbitmq_connection).declare()

    channel = self.rabbitmq_connection.channel()
    channel.basic_qos(prefetch_size=0, prefetch_count=1, a_global=False)

    with settings.rabbitmq_connection.Consumer(queues=queues, callbacks=callbacks, channel=channel) as consumer:
        while True:
            try:
                self.rabbitmq_connection.drain_events()
            except Exception as e:
                print 'Error -> %s' % e.message 
票数 5
EN

Stack Overflow用户

发布于 2014-05-30 08:50:30

我认为你实际上是想自己重写芹菜:

http://www.celeryproject.org/

除非你这么做纯粹是为了学习的目的,否则不要再痛苦,而要使用芹菜。顺便说一句,kombuRabbitMQ正是芹菜所用的后端(更别提Redis后端了,这为我在某些应用程序中节省了数小时的精力)。

票数 -1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/23947119

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档