我正在寻找更多关于Python中队列实现的见解,而不是我在文档中所能找到的。
据我所知,如果我错了,请原谅我的无知:
queue.Queue():是通过内存中的基本数组实现的,因此不能在多个进程之间共享,而是可以在线程之间共享。到现在为止还好。
multiprocessing.Queue():是通过具有大小限制的管道(man 2 pipes)实现的(很小:在Linux上,man 7 pipe说65536没有调整):
由于Linux2.6.35,默认管道容量为65536字节,但是可以使用
fcntl(2)F_GETPIPE_SZ和F_SETPIPE_SZ操作查询和设置容量
但是,在Python中,每当我试图将大于65536字节的数据写入管道时,它毫无例外地工作--我可以以这种方式淹没我的内存:
import multiprocessing
from time import sleep
def big():
result = ""
for i in range(1,70000):
result += ","+str(i)
return result # 408888 bytes string
def writequeue(q):
while True:
q.put(big())
sleep(0.1)
if __name__ == '__main__':
q = multiprocessing.Queue()
p = multiprocessing.Process(target=writequeue, args=(q,))
p.start()
while True:
sleep(1) # No pipe consumption, we just want to flood the pipe以下是我的问题:
发布于 2017-07-19 22:52:09
为什么q.put()没有阻塞??
如果管道已经满了,mutiprocessing.Queue会创建一个阻塞的管道。当然,超过管道容量的写入将导致write调用阻塞,直到读取端清除足够的数据为止。好的,如果管道在达到容量时阻塞,为什么q.put()在管道满了之后也不阻塞?即使是示例中对q.put()的第一次调用也应该填满管道,并且所有的东西都应该阻塞在那里,不是吗?
不,它不阻塞,因为multiprocessing.Queue 实现将 .put() 方法从写入管道中分离出来。 .put()方法将传递给它的数据排在内部缓冲区中,并且有一个单独的线程负责从该缓冲区读取数据并写入管道。当管道已满时,此线程将阻塞,但它不会阻止.put()将更多数据排队到内部缓冲区。
.put()的实现将数据保存到self._buffer,并注意如果没有一个线程正在运行,它是如何启动线程的:
def put(self, obj, block=True, timeout=None):
assert not self._closed
if not self._sem.acquire(block, timeout):
raise Full
with self._notempty:
if self._thread is None:
self._start_thread()
self._buffer.append(obj)
self._notempty.notify()._feed()方法是从self._buffer读取数据并将数据输入管道的方法。._start_thread()是用来设置运行._feed()的线程的。
如何限制队列大小?
如果您想限制将多少数据写入队列中,我看不出有一种方法可以通过指定字节数来实现,但是可以通过将一个数字传递给multiprocessing.Queue来限制存储在内部缓冲区中的项的数量。
q = multiprocessing.Queue(2)当我使用上述参数并使用您的代码时,q.put()将对两个项进行排队,并在第三次尝试时阻塞。
Python管道通信是否可与其他非Python进程互操作?
那得看情况。multiprocessing模块提供的工具很难与其他语言进行互操作。我希望multiprocessing能够与其他语言进行互操作,但实现这一目标将是一项重要的事业。编写该模块时期望所涉及的进程正在运行Python代码。
如果你看一下更一般的方法,那么答案是肯定的。您可以使用套接字作为两个不同进程之间的通信管道。例如,从命名套接字读取的JavaScript进程:
var net = require("net");
var fs = require("fs");
sockPath = "/tmp/test.sock"
try {
fs.unlinkSync(sockPath);
}
catch (ex) {
// Don't care if the path does not exist, but rethrow if we get
// another error.
if (ex.code !== "ENOENT") {
throw ex;
}
}
var server = net.createServer(function(stream) {
stream.on("data", function(c) {
console.log("received:", c.toString());
});
stream.on("end", function() {
server.close();
});
});
server.listen(sockPath);和写到它的Python进程:
import socket
import time
sockfile = "/tmp/test.sock"
conn = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
conn.connect(sockfile)
count = 0
while True:
count += 1
conn.sendall(bytes(str(count), "utf-8"))
time.sleep(1)如果您想要尝试上面的内容,您需要首先启动JavaScript端,以便Python有一些要写入的内容。这是一个概念的证明。一个完整的解决方案需要更多的润色。
为了将复杂的结构从Python传递到其他语言,您必须找到一种方法,以一种可以在两边读取的格式序列化您的数据。不幸的是,Pickles是Python特有的。每当我需要在语言之间进行序列化时,我通常都会选择JSON,如果JSON不愿意使用即席格式,我就会选择JSON。
https://stackoverflow.com/questions/45148271
复制相似问题