为什么我不能把process放在Pool中,变成Queue
在这里,我的代码在使用Pool时工作,可以获得Test实例属性。
from multiprocessing import Pool
from multiprocessing import Queue
class Test(object):
def __init__(self, num):
self.num = num
if __name__ == '__main__':
p = Pool()
procs = []
for i in range(5):
proc = p.apply_async(Test, args=(i,))
procs.append(proc)
p.close()
for each in procs:
test = each.get(10)
print(test.num)
p.join()当我尝试使用Queue而不是python list来存储进程时,这是行不通的。
我的代码:
from multiprocessing import Pool
from multiprocessing import Queue
class Test(object):
def __init__(self, num):
self.num = num
if __name__ == '__main__':
p = Pool()
q = Queue()
for i in range(5):
proc = p.apply_async(Test, args=(i,))
q.put(proc)
p.close()
while not q.empty():
q.get()
p.join()错误信息:
Traceback (most recent call last):
File "C:\Users\laich\AppData\Local\Programs\Python\Python36-
32\lib\multiprocessing\queues.py", line 234, in _feed
obj = _ForkingPickler.dumps(obj)
File "C:\Users\laich\AppData\Local\Programs\Python\Python36-
32\lib\multiprocessing\reduction.py", line 51, in dumps
cls(buf, protocol).dump(obj)
TypeError: can't pickle _thread.lock objects我去看多处理文档:
class multiprocessing.Queue([maxsize])返回一个使用管道和几个锁/信号量实现的进程共享队列。当进程第一次将项放到队列中时,就会启动一个馈线线程,该线程将对象从缓冲区传输到管道中。 标准库的队列模块中通常出现的queue.Empty和queue.Full异常会引发超时信号。 Queue实现了queue.Queue的所有方法,但task_done()和join()除外。
这里写着“放置一个项目”,这个项目不可能是任何东西(python对象)?在我的例子中,我尝试将process中的Pool()放入Queue中。
发布于 2018-12-24 10:55:12
Queue-based代码至少有两个问题。Pool.apply_async方法返回AsyncResult对象,而不是进程。您可以对此对象调用get以获得相应进程的结果。考虑到这一点,让我们看看您的代码:
proc = p.apply_async(Test, args=(i,)) # Returns an AsyncResult object
q.put(proc) # won't work在你的情况下,第二行总是失败的。您放入队列中的任何内容都必须是可选择的,因为multiprocess.Queue使用序列化。这还不是很好的文档,而且在Python的问题跟踪器中有一个公开发行来更新文档。问题是AsyncResult是不可挑选的。你可以自己试试:
import pickle
import multiprocessing as mp
with mp.Pool() as p:
result = p.apply_async(lambda x: x, (1,))
pickle.dumps(result) # ErrorAsyncResult内部包含一些锁对象,它们是不可序列化的。让我们转到下一个问题:
while not q.empty():
q.get()如果我没有错,请在上面的代码中调用AsyncResult.get而不是Queue.get。在这种情况下,您必须首先从队列中获取对象,然后调用对象上的相应方法。但是,在您的代码中,情况并非如此,因为AsyncResult是不可序列化的。
发布于 2018-12-24 11:12:21
作为@Mehdi 解释,不能对AsyncResult对象进行腌制,这是multiprocessing.Queue所需要的。但是,这里不需要队列,因为队列不在进程之间共享。这意味着您可以只使用常规的Queue。
from multiprocessing import Pool
#from multiprocessing import Queue
from queue import Queue
class Test(object):
def __init__(self, num):
self.num = num
print('Test({!r}) created'.format(num))
if __name__ == '__main__':
p = Pool()
q = Queue()
for i in range(5):
proc = p.apply_async(Test, args=(i,))
q.put(proc)
p.close()
while not q.empty():
q.get()
p.join()
print('done')输出:
Test(0)
Test(1)
Test(2)
Test(3)
Test(4)
donehttps://stackoverflow.com/questions/53910898
复制相似问题