首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何向Pykka演员发送RabbitMQ消息?

如何向Pykka演员发送RabbitMQ消息?
EN

Stack Overflow用户
提问于 2015-02-08 17:52:15
回答 1查看 803关注 0票数 4

2015年8月更新:对于想要使用消息传递的人,我现在建议使用zeromq。可用于除比卡之外,或作为一种完全替代比卡。

如何侦听消息的RabbitMQ队列,然后将它们转发给Pykka中的参与者?

目前,当我尝试这样做时,我会有奇怪的行为,系统会停止运行。

下面是我的演员是如何实现的:

代码语言:javascript
复制
class EventListener(eventlet.EventletActor):
    def __init__(self, target):
        """
        :param pykka.ActorRef target: Where to send the queue messages.
        """
        super(EventListener, self).__init__()

        self.target = target

    def on_start(self):
        ApplicationService.listen_for_events(self.actor_ref)

下面是我在ApplicationService类中的方法,它应该检查队列中的新消息:

代码语言:javascript
复制
@classmethod
def listen_for_events(cls, actor):
    """
    Subscribe to messages and forward them to the given actor.
    """    
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
    channel.queue_declare(queue='test')
    def callback(ch, method, properties, body):
        message = pickle.loads(body)
        actor.tell(message)

    channel.basic_consume(callback, queue='test', no_ack=True)
    channel.start_consuming()            

start_consuming似乎正在无限期地阻塞。有没有办法让我自己定期“投票”这个队列?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2015-02-17 19:37:24

在我看来你所有的代码都是正确的。如果要检查每个参与者使用的队列,可以检查从Actor#start返回的参与者引用上可用的它们的Actor#start属性。

我在从EventletActor继承时遇到了类似的问题,所以为了测试,我使用EventletActorThreadingActor尝试了相同的代码。据我所知,从源代码中可以看出,他们都在使用eventlet进行工作。ThreadingActor对我来说很好,但是EventletActor不适用于ActorRef#tell,它确实适用于ActorRef#ask

我从同一个目录中的两个文件开始,如下所示。

my_actors.py:初始化两个参与者,它们将通过打印以类名开头的消息内容来响应消息。

代码语言:javascript
复制
from pykka.eventlet import EventletActor
import pykka


class MyThreadingActor(pykka.ThreadingActor):
    def __init__(self):
        super(MyThreadingActor, self).__init__()

    def on_receive(self, message):
        print(
            "MyThreadingActor Received: {message}".format(
                message=message)
        )


class MyEventletActor(EventletActor):
    def __init__(self):
        super(MyEventletActor, self).__init__()

    def on_receive(self, message):
        print(
            "MyEventletActor Received: {message}".format(
                message=message)
        )


my_threading_actor_ref = MyThreadingActor.start()
my_eventlet_actor_ref = MyEventletActor.start()

my_queue.py:在pika中设置一个队列,向队列发送一条消息,该消息被转发给前面设置的两个参与者。在每个参与者被告知消息后,将检查它们当前的参与者收件箱中的任何内容。

代码语言:javascript
复制
from my_actors import my_threading_actor_ref, my_eventlet_actor_ref
import pika


def on_message(channel, method_frame, header_frame, body):
    print "Received Message", body
    my_threading_actor_ref.tell({"msg": body})
    my_eventlet_actor_ref.tell({"msg": body})

    print "ThreadingActor Inbox", my_threading_actor_ref.actor_inbox
    print "EventletActor Inbox", my_eventlet_actor_ref.actor_inbox

    channel.basic_ack(delivery_tag=method_frame.delivery_tag)


queue_name = 'test'
connection = pika.BlockingConnection()

channel = connection.channel()
channel.queue_declare(queue=queue_name)
channel.basic_consume(on_message, queue_name)
channel.basic_publish(exchange='', routing_key=queue_name, body='A Message')

try:
    channel.start_consuming()
except KeyboardInterrupt:
    channel.stop_consuming()

    # It is very important to stop these actors, otherwise you may lockup
    my_threading_actor_ref.stop()
    my_eventlet_actor_ref.stop()
connection.close()

当我运行my_queue.py时,输出如下:

收到的消息 ThreadingActor收件箱<Queue.Queue instance at 0x10bf55878> MyThreadingActor接收:{'msg': 'A Message'} EventletActor收件箱<Queue maxsize=None queue=deque([{'msg': 'A Message'}]) tasks=1 _cond=<Event at 0x10bf53b50 result=NOT_USED _exc=None _waiters[0]>>

当我点击CTRL+C停止队列时,我注意到EventletActor最终接收到消息并打印出来:

接收^CMyEventletActor:{'msg': 'A Message'}

所有这些让我相信,EventletActor中可能有一个bug,我认为您的代码很好,并且存在一个bug,这是我在第一次检查时无法在代码中找到的。

我希望这些信息能有所帮助。

票数 3
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/28397269

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档