首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >zmq流水线模式不能在多进程中工作?

zmq流水线模式不能在多进程中工作?
EN

Stack Overflow用户
提问于 2015-12-09 13:18:35
回答 1查看 318关注 0票数 0

我认为我没有在正确的模式中使用zmq,我想做的是:

  1. zmq在多进程中发送消息
  2. 在多个客户端中接受消息,但一条消息应该只接受一次

根据第二个需求,我认为pipeline应该是安全的( PUSH/PULL ),但是这种模式不能在多进程中工作:

代码语言:javascript
复制
def foo(i):
    return i

def producer():
    context = zmq.Context()
    zmq_socket = context.socket(zmq.PUSH)
    zmq_socket.bind("tcp://127.0.0.1:5559")

    with concurrent.futures.ProcessPoolExecutor(max_workers=10) as executor:
        futs = [executor.submit(foo, i) for i in range(10)]
        for fut in concurrent.futures.as_completed(futs):
            work_message = { 'num' : fut.result() }
            zmq_socket.send("test")

producer()

所以,也许我应该使用PUB/SUB模式,但这不能满足第二个需求。

事实上我想要的是这样的东西:

代码语言:javascript
复制
PUSH|-----|                  | PULL
PUSH|-----|                  | PULL
PUSH|-----|----- DEVICE -----| PULL
PUSH|-----|                  | PULL
PUSH|-----|                  | PULL
EN

回答 1

Stack Overflow用户

发布于 2015-12-09 13:39:56

不完全是,rogerZeroMQ ZeroMQ模式有效

它只是做了一些你不想做的事情。

ZeroMQ是一个非常棒的预焙行为工具箱,它有巨大的潜力来组装更复杂的行为模型,在需要的地方使用

从理解原始参与者开始,而不是设计您的功能需求。

PUSH/PULL正式通信场景有两个参与者:

第一:PUSH选择电话和电话。是谁?连通的PULL-side.一旦PUSH决定从语音邮件中挑选语音邮件,它就会留下一条语音邮件,供PULL监听。

第二:PULL侧(在某个时间点,不一定就在1上)。听到铃声响了,然后拿起电话。

3:如果接到指示,PULL将处理从PUSH收到的消息

仅此而已,仅此而已。

ZeroMQ原语组件的程序集:

是的,有一条路可以朝着你的目标迈进:

只需通过适当的ZeroMQ原语相互连接来补充您的功能需求,以满足您的额外需求。

最琐碎的-一个,从“连接的代码,卷I”第1章-一个基于循环的消息转发到一个池-“工人”-processes。

更专业化的程序集可能会为智能分布式复杂行为模型创建额外的功能:

通常,“控制+信令”SIG_PLANE是与主功能处理并行实现的。

自诊断服务、用于不间断处理场景的心搏/自愈信号、远程无阻塞日志记录服务、前端MVC/GUI平面是最典型的分层设计目标。

你的任务?

如果对更多特性不感兴趣,只需.connect()您的未来计算PUSH-ers到PULL-ing端点即可。

这个中间步骤(性能/失败分析奇点)可以在它的PULL-ing入口端和右侧进行收集,在它的另一个端点上收集PUSH,在这里--在循环的基础上,向实际工作人员池(他们是人群--用他们的.connect()-ed PULL-er入口端点等待来自“接收器”-collector实体的传入任务)--一个任务,从所收集的FIFO (小心缓冲能力和性能开销),只需进入"next" one,就行了。

代码语言:javascript
复制
________________________________                                                      _____________________________
process(0)|....[PUSH].connect(A)|-         ________________________________         -|.connect(B)[PULL] process(-1)
process(1)|....[PUSH].connect(A)|--       |                                |       --|.connect(B)[PULL] process(-2)
process(2)|....[PUSH].connect(A)|---   ---.bind(A)[PULL]:NOP:[PUSH].bind(B)|---   ---|.connect(B)[PULL] process(-3)
process(3)|....[PUSH].connect(A)|--       |________________________________|       --|.connect(B)[PULL] process(-4)
...                                                                                                     ...
process(n)|....[PUSH].connect(A)|-                                                  -|.connect(B)[PULL] process(-m)

就这么简单。

享受凉爽的ZeroMQ工具包。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/34179834

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档