我看了一下各种ZMQ消息传递模式,我不确定哪种模式可以用于我的项目。我所要做的就是能够连接到服务器并发送命令(客户端永远不会收到任何信息)。在服务器端,我希望能够检查是否有消息,如果有消息,处理它,否则继续执行其他内容而不阻塞。这样,即使没有客户端连接,服务器也可以继续工作。
#client.py
while(True):
select = raw_input()
if select == "1":
socket.send(msg1)
elif select == "2":
socket.send(msg2)
...
#server.py
while(True):
msg = socket.recv() #should not block
if msg == ...
#do stuff
#do other stuff那么,我应该在ZMQ中使用哪种模式来做到这一点呢?示例代码将不胜感激。
发布于 2014-02-22 08:24:46
首先,由于您希望使用一个接收消息的套接字进行单向通信,这通常意味着推拉。下面是客户端的一个版本:
import zmq
ctx = zmq.Context.instance()
s = ctx.socket(zmq.PUSH)
url = 'tcp://127.0.0.1:5555'
s.connect(url)
while True:
msg = raw_input("msg > ")
s.send(msg)
if msg == 'quit':
break因此,推送套接字发送我们从raw_input获得的消息。应该清楚的是,如何更改该逻辑以生成所需的消息。有一点好处是,如果你输入“退出”,客户端和服务器都会退出。
根据应用程序的复杂性,有多种方法可以完成非阻塞服务器。我将展示几个示例,从最基本的示例到最强大/可扩展的示例。
所有这些服务器示例都假设在顶部设置服务器的拉套接字:
import time
import zmq
ctx = zmq.Context.instance()
s = ctx.socket(zmq.PULL)
url = 'tcp://127.0.0.1:5555'
s.bind(url)第一个例子是简单的非阻塞的recv,如果没有可以接收到的消息,它将引发zmq.Again异常:
# server0.py
while True:
try:
msg = s.recv(zmq.NOBLOCK) # note NOBLOCK here
except zmq.Again:
# no message to recv, do other things
time.sleep(1)
else:
print("received %r" % msg)
if msg == 'quit':
break但这种模式很难超越非常简单的情况。第二个示例使用Poller检查套接字上的事件:
# server1.py
poller = zmq.Poller()
poller.register(s)
while True:
events = dict(poller.poll(0))
if s in events:
msg = s.recv()
print("received %r" % msg)
if msg == 'quit':
break
else:
# no message to recv, do other things
time.sleep(1)在这个玩具例子中,这和第一个非常相似。但是,与第一个不同的是,通过对poller.register的进一步调用或向poller.poll传递除零以外的超时,很容易扩展到许多套接字或事件。
最后一个示例使用eventloop循环,并实际为消息到达时注册回调。您可以使用这种模式构建非常复杂的应用程序,这是一种非常简单的方法,只在有工作需要完成时才编写代码。
# server2.py
from zmq.eventloop import ioloop
from zmq.eventloop.zmqstream import ZMQStream
def print_msg(msg):
print("received %r" % ' '.join(msg))
if msg[0] == 'quit':
ioloop.IOLoop.instance().stop()
# register the print_msg callback to be fired
# whenever there is a message on our socket
stream = ZMQStream(s)
stream.on_recv(print_msg)
# do other things in the meantime
tic = time.time()
def do_other_things():
print("%.3f" % (time.time() - tic))
pc = ioloop.PeriodicCallback(do_other_things, 1000)
pc.start()
# start the eventloop
ioloop.IOLoop.instance().start()因此,这是不阻塞地处理zmq消息的几种基本方法。您可以获取这些示例,合二为一。
https://stackoverflow.com/questions/21945864
复制相似问题