首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >在使用multiprocessing.Queue时如何理解multiprocessing.Pool?

在使用multiprocessing.Queue时如何理解multiprocessing.Pool?
EN

Stack Overflow用户
提问于 2018-12-24 08:18:52
回答 2查看 767关注 0票数 0

为什么我不能把process放在Pool中,变成Queue

在这里,我的代码在使用Pool时工作,可以获得Test实例属性。

代码语言:javascript
复制
from multiprocessing import Pool
from multiprocessing import Queue


class Test(object):
    def __init__(self, num):
        self.num = num


if __name__ == '__main__':
    p = Pool()
    procs = []
    for i in range(5):
        proc = p.apply_async(Test, args=(i,))
        procs.append(proc)
    p.close()
    for each in procs:
        test = each.get(10)
        print(test.num)
    p.join()

当我尝试使用Queue而不是python list来存储进程时,这是行不通的。

我的代码:

代码语言:javascript
复制
from multiprocessing import Pool
from multiprocessing import Queue


class Test(object):
    def __init__(self, num):
        self.num = num


if __name__ == '__main__':
    p = Pool()
    q = Queue()
    for i in range(5):
        proc = p.apply_async(Test, args=(i,))
        q.put(proc)
    p.close()
    while not q.empty():
        q.get()
    p.join()

错误信息:

代码语言:javascript
复制
Traceback (most recent call last):
  File "C:\Users\laich\AppData\Local\Programs\Python\Python36- 
32\lib\multiprocessing\queues.py", line 234, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "C:\Users\laich\AppData\Local\Programs\Python\Python36- 
32\lib\multiprocessing\reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
TypeError: can't pickle _thread.lock objects

我去看多处理文档:

class multiprocessing.Queue([maxsize])返回一个使用管道和几个锁/信号量实现的进程共享队列。当进程第一次将项放到队列中时,就会启动一个馈线线程,该线程将对象从缓冲区传输到管道中。 标准库的队列模块中通常出现的queue.Emptyqueue.Full异常会引发超时信号。 Queue实现了queue.Queue的所有方法,但task_done()join()除外。

这里写着“放置一个项目”,这个项目不可能是任何东西(python对象)?在我的例子中,我尝试将process中的Pool()放入Queue中。

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2018-12-24 10:55:12

Queue-based代码至少有两个问题。Pool.apply_async方法返回AsyncResult对象,而不是进程。您可以对此对象调用get以获得相应进程的结果。考虑到这一点,让我们看看您的代码:

代码语言:javascript
复制
proc = p.apply_async(Test, args=(i,)) # Returns an AsyncResult object
q.put(proc) # won't work

在你的情况下,第二行总是失败的。您放入队列中的任何内容都必须是可选择的,因为multiprocess.Queue使用序列化。这还不是很好的文档,而且在Python的问题跟踪器中有一个公开发行来更新文档。问题是AsyncResult是不可挑选的。你可以自己试试:

代码语言:javascript
复制
import pickle
import multiprocessing as mp

with mp.Pool() as p:
    result = p.apply_async(lambda x: x, (1,))

pickle.dumps(result) # Error

AsyncResult内部包含一些锁对象,它们是不可序列化的。让我们转到下一个问题:

代码语言:javascript
复制
while not q.empty():
    q.get()

如果我没有错,请在上面的代码中调用AsyncResult.get而不是Queue.get。在这种情况下,您必须首先从队列中获取对象,然后调用对象上的相应方法。但是,在您的代码中,情况并非如此,因为AsyncResult是不可序列化的。

票数 0
EN

Stack Overflow用户

发布于 2018-12-24 11:12:21

作为@Mehdi 解释,不能对AsyncResult对象进行腌制,这是multiprocessing.Queue所需要的。但是,这里不需要队列,因为队列不在进程之间共享。这意味着您可以只使用常规的Queue

代码语言:javascript
复制
from multiprocessing import Pool
#from multiprocessing import Queue
from queue import Queue


class Test(object):
    def __init__(self, num):
        self.num = num
        print('Test({!r}) created'.format(num))


if __name__ == '__main__':
    p = Pool()
    q = Queue()
    for i in range(5):
        proc = p.apply_async(Test, args=(i,))
        q.put(proc)
    p.close()
    while not q.empty():
        q.get()
    p.join()

    print('done')

输出:

代码语言:javascript
复制
Test(0)
Test(1)
Test(2)
Test(3)
Test(4)
done
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/53910898

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档