首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >ZMQ设备、ioloop和多处理

ZMQ设备、ioloop和多处理
EN

Stack Overflow用户
提问于 2015-06-05 09:24:40
回答 1查看 536关注 0票数 0

我试图应用推/拉模式,如下图所示:

代码语言:javascript
复制
                            | PULL  ---> Send via HTTP
                            | PULL  ---> Send via HTTP
---- PUSH ----- DEVICE ---- | PULL  ---> Send via HTTP
                            | PULL  ---> Send via HTTP
                            | PULL  ---> Send via HTTP

PUSH套接字连接到ZeroMQ设备并发出消息,然后将消息传播到所有连接的PULL套接字。我想要实现的是一种对流水线中多个节点的并行处理。当PULL套接字完成处理后,它应该通过HTTP将消息转发到远程端点。

以下是代码:

代码语言:javascript
复制
from multiprocessing import Process
import random
import time
import zmq
from zmq.devices import ProcessDevice

from zmq.eventloop import ioloop
from zmq.eventloop.zmqstream import ZMQStream

ioloop.install()


bind_in_port = 5559
bind_out_port = 5560

dev = ProcessDevice(zmq.STREAMER, zmq.PULL, zmq.PUSH)
dev.bind_in("tcp://127.0.0.1:%d" % bind_in_port)
dev.bind_out("tcp://127.0.0.1:%d" % bind_out_port)
dev.setsockopt_in(zmq.IDENTITY, b'PULL')
dev.setsockopt_out(zmq.IDENTITY, b'PUSH')
dev.start()
time.sleep(2)


def push():
    context = zmq.Context()
    socket = context.socket(zmq.PUSH)
    socket.connect("tcp://127.0.0.1:%s" % bind_in_port)
    server_id = random.randrange(1,10005)
    for i in range(5):
        print("Message %d sent" % i)
        socket.send_string("Push from %s" % server_id)


def pull():
    context = zmq.Context()
    socket = context.socket(zmq.PULL)
    socket.connect("tcp://127.0.0.1:%s" % bind_out_port)
    loop = ioloop.IOLoop.instance()

    pull_stream = ZMQStream(socket, loop)

    def on_recv(message):
        print(message)
    pull_stream.on_recv(on_recv)

    loop.start()

Process(target=push).start()

time.sleep(2)

for i in range(2):
    Process(target=pull).start()

虽然消息被正确地发送到ZeroMQ设备,但我看不到收到任何消息-- on_recv回调从未被调用。

任何帮助都是非常感谢的。

谢谢

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2015-07-15 18:17:00

设备init中缺少上面的代码来提供完整的答案。什么是dev和*port ?1可以是在.connect之后添加睡眠(1),以便端口稳定下来:无需在push/pull上设置标识。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/30662920

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档