我正在尝试使用Python中的ZeroMQ将订阅者的确认返回给发行者。
我尝试了几个使用zmq.PUSH和zmq.PULL代码序列的代码示例,但都没有效果。
我的代码是:
PUB_SERVER.PY
import zmq
import random
import sys
import time
port = "5556"
if len( sys.argv ) > 1:
port = sys.argv[1]
int( port )
context = zmq.Context()
socket = context.socket( zmq.PUB )
socket.bind( "tcp://*:%s" % port )
while True:
# topic = random.randrange( 9999, 10005 )
topic = 10000
messagedata = random.randrange( 1, 215 ) - 80
print "%d %d" % ( topic, messagedata )
socket.send( "%d %d" % ( topic, messagedata ) )
time.sleep( 1 )SUB_CLIENT.PY
import sys
import zmq
port = "5556"
if len( sys.argv ) > 1:
port = sys.argv[1]
int( port )
if len( sys.argv ) > 2:
port1 = sys.argv[2]
int( port1 )
# Socket to talk to server
context = zmq.Context()
socket = context.socket( zmq.SUB )
print "Collecting updates from weather server..."
socket.connect( "tcp://192.168.0.21:%s" % port )
if len( sys.argv ) > 2:
socket.connect( "tcp://192.168.0.21:%s" % port1 )
# Subscribe to zipcode, default is NYC, 10001
topicfilter = "10000"
socket.setsockopt( zmq.SUBSCRIBE, topicfilter )
# Process 5 updates
total_value = 0
for update_nbr in range( 5 ):
string = socket.recv()
topic, messagedata = string.split()
total_value += int( messagedata )
print topic, messagedata
print "Average messagedata value for topic '%s' was %dF" % ( topicfilter, total_value / update_nbr )该代码提供了服务器在一个SSH窗口中的输出(在Parallella上),以及在另一个SSH窗口(在RaspberryPi上)接收到的过滤后的客户机消息,这两个窗口运行得很好。
我迷路的地方是,一旦客户端从服务器获得了经过过滤的消息,将如何确认接收到过滤消息的,然后让服务器记录这些已确认的消息?
最后,我想做一些明智的决策,将文件发送给确认的订阅者。
发布于 2014-12-07 13:16:18
如何承认?
可以创建用于该目的的软信令的并行消息传递结构。
使用PUB_SERVER.PY Rx访问点扩展.SUB:
anAckRxSOCKET = context.socket( zmq.SUB ) # create SUB side
anAckRxSOCKET.bind( "tcp://*:%s" % aServerAckPORT ) ) # .bind()
anAckRxSOCKET.setsockopt( zmq.SUBSCRIBE, "" ) # SUB to *-anything
# ...
anAckRxSTRING = anAckRxSOCKET.recv() # .recv()使用SUB_CLIENT.PY Tx套接字将.PUB扩展到服务器端访问点:
anAckTxSOCKET = context.socket( zmq.PUB ) # create PUB side(s)
anAckTxSOCKET.connect( "tcp://192.168.0.21:%s" % aServerAckPORT ) )并为您可能需要或需要的任何服务器端处理发送带有“a-代理ID”的ACK(s)。
anAckTxSOCKET.send( topicfilter ) # ACK with an "identity"-proxyhttps://stackoverflow.com/questions/27308768
复制相似问题