我希望限制我的ZeroMQ消息队列在Python应用程序中消耗的内存量。我知道设置高水标记会限制发送方排队的数量,但是有办法控制接收方排队的数量吗?Python绑定似乎将其设置为无限。
我的测试场景:我有两个用于测试的python终端。一个是接受者:
Python 2.5.1 (r251:54863, Aug 25 2008, 20:50:04)
[GCC 4.1.2 20071124 (Red Hat 4.1.2-42)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> import zmq
>>> context = zmq.Context()
>>> socket = context.socket(zmq.PULL)
>>> socket.setsockopt(zmq.RCVBUF, 256)
>>> socket.bind("tcp://127.0.0.1:12345")另一个是发送者:
Python 2.5.1 (r251:54863, Aug 25 2008, 20:50:04)
[GCC 4.1.2 20071124 (Red Hat 4.1.2-42)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> import zmq
>>> context=zmq.Context()
>>> socket = context.socket(zmq.PUSH)
>>> socket.setsockopt(zmq.SNDBUF, 2048)
>>> socket.setsockopt(zmq.HWM, 1)
>>> socket.connect("tcp://127.0.0.1:12345")
>>> num = 0
>>> while True:
... print num
... socket.send(str(num))
... num = num + 1
... 我在接收端运行了几次socket.recv(),以确保队列正常工作,但除此之外,让两个终端坐在那里。send循环似乎从不阻塞,而接收提示符似乎具有越来越大的内存占用空间。
发布于 2012-02-22 16:45:31
与ZeroMQ文档相矛盾的是,需要在PUSH侧和PULL侧设置高水标。一旦我改变了PULL,效果就更好了。新的PULL代码是:
Python 2.5.1 (r251:54863, Aug 25 2008, 20:50:04)
[GCC 4.1.2 20071124 (Red Hat 4.1.2-42)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> import zmq
>>> context=zmq.Context()
>>> socket = context.socket(zmq.PULL)
>>> socket.setsockopt(zmq.RCVBUF, 256)
>>> socket.setsockopt(zmq.HWM, 1)
>>> socket.bind("tcp://127.0.0.1:12345")发布于 2018-09-16 11:33:26
通过zmq.SNDBUF和zmq.RCVBUF选项,您可以使用set a limit on buffer size。
此外,我在接收方使用zmq.CONFLATE选项将ZeroMQ队列大小限制为一个:
下面是ZMQ PUSH/PULL的一个示例
发件人(zmq.PUSH):
def create_pub_socket(ip, port):
try:
context = zmq.Context()
socket = context.socket(zmq.PUSH)
socket.setsockopt(zmq.SNDHWM, 1)
zmq_address = "tcp://{}:{}".format(ip, port)
socket.connect(zmq_address)
return socket
except zmq.ZMQError as exp:
print(exp)
return False
sock = create_push_socket('127.0.0.1', 5558)
if sock:
sock.send_json({'a': 1})Getter (zmq.PULL):
def listen(self):
sock = None
try:
context = zmq.Context()
sock = context.socket(zmq.PULL)
sock.setsockopt(zmq.RCVHWM, 1)
sock.setsockopt(zmq.CONFLATE, 1) # last msg only.
sock.bind("tcp://*:5558")
except zmq.ZMQError:
logger.captureException()
configs = None
while configs is None:
if sock:
configs = sock.recv_json()
time.sleep(1e-1)
else:
time.sleep(5)
listen() # Recursive.
listen()发布于 2015-03-25 17:19:02
实际上,文件上说:
“当ZMQ_PUSH套接字由于达到所有下游节点的高水位标志而进入异常状态时,或者如果根本没有下游节点,则套接字上的任何zmq_send(3)操作将被阻塞,直到异常状态结束或至少一个下游节点可用发送为止;消息不会被丢弃。”
http://api.zeromq.org/2-1:zmq-socket
其中直接声明您可以(而且应该)为下游节点(也称为“拉”)设置高水位标记,并且可能意味着将其设置在推送侧不会产生任何效果(尽管我怀疑这不是真的,因为仍然存在下游节点可用的情况,但消息传入的速度比发送的速度快)。
https://stackoverflow.com/questions/9385249
复制相似问题