首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >作为节俭服务的Kombu消费者

作为节俭服务的Kombu消费者
EN

Stack Overflow用户
提问于 2014-06-21 07:00:20
回答 1查看 175关注 0票数 1

我需要建立一个Kombu消费者,可以通过编程控制。我看到的所有示例都是告诉您使用ctrl-c来停止程序的微不足道的示例。

我的主要应用程序是作为Twisted Thrift服务运行的,我想我可以以某种方式使用Twisted reactor来处理我的使用者内部的eventloop,但我不知道如何处理。

这是我的consumer类。start_consuming()部分很好,除了它是阻塞的,并且我不能从外部调用stop_consuming()。

代码语言:javascript
复制
from kombu import BrokerConnection, Exchange, eventloop, Queue, Consumer


class DMS():
    __routing_key = None
    __is_consuming = None
    __message_counter = 0

    def __init__(self, routing_key):
        print 'server: __init__()'
        self.__routing_key = routing_key

    def __handle_message(self, body, message):
        self.__message_counter += 1

        # Print count every 10,000 messsages.
        if (self.__message_counter % 10000) == 0:
            print self.__message_counter

    def start_consuming(self):
        print 'server: start_consuming()'
        self.__is_consuming = True
        exchange = Exchange('raven-exchange', type='topic', durable=False)
        queue = Queue(self.__routing_key, exchange, routing_key=self.__routing_key)

        with BrokerConnection('amqp://guest:guest@10.1.1.121:5672//') as connection:
            with Consumer(connection, queue, callbacks=[self.__handle_message]) as consumer:
                for _ in eventloop(connection):

                    if self.__is_consuming:
                        pass
                    else:
                        break

                consumer.cancel()
            connection.close()

    def stop_consuming(self):
        print 'server: stop_consuming()'
        self.__is_consuming = False
EN

回答 1

Stack Overflow用户

发布于 2014-06-21 09:33:08

通过MQ系统路由Thrift服务调用的推荐方式是通过oneway调用,因为这是通过MQ和MessageBus系统进行通信的最自然的方式。

代码语言:javascript
复制
struct Foo {
  1: string whoa
  2: i32 counter
}

service Whatever {
    oneway void FooBar(1: Foo someData, 2:i32 moreData)
}

oneway调用是Thrift RPC调用的一种特殊形式:顾名思义,调用只有一个方向。返回值和异常(实际上是返回值)都不能与oneway一起使用。该调用只发送输入参数,不等待任何值返回。

为了建立bi-di通信,客户端需要实现类似的服务,该服务被设计为接收传入的应答消息。在Thrift /contrib folder中有一些示例,包括0MQ、Rebus和Stomp。虽然它们没有专门处理Python,但主要思想应该变得清晰起来。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/24336839

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档