首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >使用Kombu ConsumerMixin,如何声明多个绑定?

使用Kombu ConsumerMixin,如何声明多个绑定?
EN

Stack Overflow用户
提问于 2016-05-13 15:53:58
回答 2查看 2.9K关注 0票数 4

我有一个RabbitMQ主题交换,名为experiment。我正在构建一个使用者,在这里我希望接收路由密钥以"foo“开头的所有消息,以及其路由密钥以"bar”开头的所有消息。

根据RabbitMQ文档,根据我自己在管理UI中的实验,应该可以有一个交换、一个队列和两个绑定(foo.#bar.#)来连接它们。

我不知道如何用Kombu的ConsumerMixin来表达这一点。我觉得我应该能做到:

代码语言:javascript
复制
q = Queue(exchange=exchange, routing_key=['foo.#', 'bar.#'])

...but --一点也不喜欢这样。我也试过:

代码语言:javascript
复制
q.bind_to(exchange=exchange, routing_key='foo.#')
q.bind_to(exchange=exchange, routing_key='bar.#')

...but每次我试着得到:

代码语言:javascript
复制
kombu.exceptions.NotBoundError: Can't call method on Queue not bound to a channel

...which,我想我觉得她很理智。然而,我看不到任何地方,在混音的接口,我可以很容易地钩到队列,一旦他们被绑定到通道。下面是基本(工作)代码:

代码语言:javascript
复制
from kombu import Connection, Exchange, Queue
from kombu.mixins import ConsumerMixin


class Worker(ConsumerMixin):
    exchange = Exchange('experiment', type='topic')
    q = Queue(exchange=exchange, routing_key='foo.#', exclusive=True)

    def __init__(self, connection):
        self.connection = connection

    def get_consumers(self, Consumer, channel):
        return [Consumer(queues=[self.q], callbacks=[self.on_task])]

    def on_task(self, body, message):
        print body
        message.ack()


if __name__ == '__main__':
    with Connection('amqp://guest:guest@localhost:5672//') as conn:
        worker = Worker(conn)
        worker.run()

...which可以工作,但只给我foo消息。除了为我感兴趣的每个路由密钥创建一个新队列并将它们全部传递给使用者之外,还有一个干净的方法吗?

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2016-05-13 16:14:47

在挖掘了一点之后,我找到了一种方法来完成这个任务,这与我的第一个想法相当接近。与其向队列传递routing_key字符串,不如传递一个bindings列表。列表中的每个元素都是指定交换和路由键的binding对象的实例。

一个例子胜过千言万语:

代码语言:javascript
复制
from kombu import Exchange, Queue, binding

exchange = Exchange('experiment', type='topic')
q = Queue(exchange=exchange, bindings=[
    binding(exchange, routing_key='foo.#'),
    binding(exchange, routing_key='bar.#')
], exclusive=True)

而且效果很好!

票数 10
EN

Stack Overflow用户

发布于 2022-05-24 08:52:33

下面是smitelli对答案的一个小调整。当bindings参数用于定义绑定时,exchange参数将被忽略。

调整后的例子:

代码语言:javascript
复制
from kombu import Exchange, Queue, binding

exchange = Exchange('experiment', type='topic')
q = Queue(bindings=[
    binding(exchange, routing_key='foo.#'),
    binding(exchange, routing_key='bar.#'),
])

exchange参数在队列init期间被丢弃:

代码语言:javascript
复制
if self.bindings:
    self.exchange = None
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/37214404

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档