首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >multiprocessing.Queue和queue.Queue的实现

multiprocessing.Queue和queue.Queue的实现
EN

Stack Overflow用户
提问于 2017-07-17 15:38:04
回答 1查看 3.5K关注 0票数 19

我正在寻找更多关于Python中队列实现的见解,而不是我在文档中所能找到的。

据我所知,如果我错了,请原谅我的无知:

queue.Queue():是通过内存中的基本数组实现的,因此不能在多个进程之间共享,而是可以在线程之间共享。到现在为止还好。

multiprocessing.Queue():是通过具有大小限制的管道(man 2 pipes)实现的(很小:在Linux上,man 7 pipe说65536没有调整):

由于Linux2.6.35,默认管道容量为65536字节,但是可以使用fcntl(2) F_GETPIPE_SZF_SETPIPE_SZ操作查询和设置容量

但是,在Python中,每当我试图将大于65536字节的数据写入管道时,它毫无例外地工作--我可以以这种方式淹没我的内存:

代码语言:javascript
复制
import multiprocessing
from time import sleep

def big():
    result = ""
    for i in range(1,70000):
        result += ","+str(i)
    return result # 408888 bytes string

def writequeue(q):
    while True:
        q.put(big())
        sleep(0.1)

if __name__ == '__main__':
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=writequeue, args=(q,))
    p.start()
    while True:
        sleep(1) # No pipe consumption, we just want to flood the pipe

以下是我的问题:

  • Python是否调整了管道限制?如果是,多少钱?欢迎Python源代码。
  • Python管道通信是否可与其他非Python进程互操作?如果是,欢迎工作实例(最好是JS)和资源链接。
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2017-07-19 22:52:09

为什么q.put()没有阻塞??

如果管道已经满了,mutiprocessing.Queue会创建一个阻塞的管道。当然,超过管道容量的写入将导致write调用阻塞,直到读取端清除足够的数据为止。好的,如果管道在达到容量时阻塞,为什么q.put()在管道满了之后也不阻塞?即使是示例中对q.put()的第一次调用也应该填满管道,并且所有的东西都应该阻塞在那里,不是吗?

不,它不阻塞,因为multiprocessing.Queue 实现将 .put() 方法从写入管道中分离出来。 .put()方法将传递给它的数据排在内部缓冲区中,并且有一个单独的线程负责从该缓冲区读取数据并写入管道。当管道已满时,此线程将阻塞,但它不会阻止.put()将更多数据排队到内部缓冲区。

.put()的实现将数据保存到self._buffer,并注意如果没有一个线程正在运行,它是如何启动线程的:

代码语言:javascript
复制
def put(self, obj, block=True, timeout=None):
    assert not self._closed
    if not self._sem.acquire(block, timeout):
        raise Full

    with self._notempty:
        if self._thread is None:
            self._start_thread()
        self._buffer.append(obj)
        self._notempty.notify()

._feed()方法是从self._buffer读取数据并将数据输入管道的方法。._start_thread()是用来设置运行._feed()的线程的。

如何限制队列大小?

如果您想限制将多少数据写入队列中,我看不出有一种方法可以通过指定字节数来实现,但是可以通过将一个数字传递给multiprocessing.Queue来限制存储在内部缓冲区中的项的数量。

代码语言:javascript
复制
q = multiprocessing.Queue(2)

当我使用上述参数并使用您的代码时,q.put()将对两个项进行排队,并在第三次尝试时阻塞。

Python管道通信是否可与其他非Python进程互操作?

那得看情况。multiprocessing模块提供的工具很难与其他语言进行互操作。我希望multiprocessing能够与其他语言进行互操作,但实现这一目标将是一项重要的事业。编写该模块时期望所涉及的进程正在运行Python代码。

如果你看一下更一般的方法,那么答案是肯定的。您可以使用套接字作为两个不同进程之间的通信管道。例如,从命名套接字读取的JavaScript进程:

代码语言:javascript
复制
var net = require("net");
var fs = require("fs");

sockPath = "/tmp/test.sock"
try {
    fs.unlinkSync(sockPath);
}
catch (ex) {
    // Don't care if the path does not exist, but rethrow if we get
    // another error.
    if (ex.code !== "ENOENT") {
        throw ex;
    }
}

var server = net.createServer(function(stream) {
  stream.on("data", function(c) {
    console.log("received:", c.toString());
  });

  stream.on("end", function() {
    server.close();
  });
});

server.listen(sockPath);

和写到它的Python进程:

代码语言:javascript
复制
import socket
import time

sockfile = "/tmp/test.sock"

conn = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
conn.connect(sockfile)

count = 0
while True:
    count += 1
    conn.sendall(bytes(str(count), "utf-8"))
    time.sleep(1)

如果您想要尝试上面的内容,您需要首先启动JavaScript端,以便Python有一些要写入的内容。这是一个概念的证明。一个完整的解决方案需要更多的润色。

为了将复杂的结构从Python传递到其他语言,您必须找到一种方法,以一种可以在两边读取的格式序列化您的数据。不幸的是,Pickles是Python特有的。每当我需要在语言之间进行序列化时,我通常都会选择JSON,如果JSON不愿意使用即席格式,我就会选择JSON。

票数 20
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/45148271

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档