我一直试图使用concurrent.futures.ProcessPoolExecutor并行化一些代码,但是一直存在一些在ThreadPoolExecutor中没有出现的奇怪的死锁。一个最小的例子是:
from concurrent import futures
def test():
pass
with futures.ProcessPoolExecutor(4) as executor:
for i in range(100):
print('submitting {}'.format(i))
executor.submit(test)在python 3.2.2中(在64位Ubuntu上),在提交所有作业之后,这个问题似乎一直挂起--而且每当提交的作业数量大于工人数量时,就会出现这种情况。如果我用ProcessPoolExecutor替换ThreadPoolExecutor,它的工作原理是完美无缺的。
作为调查的尝试,我给了每个未来的回调,以打印i的值。
from concurrent import futures
def test():
pass
with futures.ProcessPoolExecutor(4) as executor:
for i in range(100):
print('submitting {}'.format(i))
future = executor.submit(test)
def callback(f):
print('callback {}'.format(i))
future.add_done_callback(callback)这更让我感到困惑--由callback打印出来的callback值是调用时的值,而不是定义时的值(所以我从未见过callback 0,但得到了大量的callback 99s)。同样,ThreadPoolExecutor输出期望值。
想知道这是否是一个bug,我尝试了一个最新的python开发版本。现在,代码至少看起来是终止的,但是我仍然得到了打印出来的i的错误值。
有谁能解释一下:
ProcessPoolExecutor在python3.2和当前的dev版本之间发生了什么,该版本显然修复了这个死锁,i的“错误”值编辑:正如jukiewicz所指出的,当然,在调用回调时,打印i会打印值,我不知道我在想什么……如果我传递一个以i的值作为其属性之一的可调用对象,它将按预期的方式工作。
编辑:更多的信息:所有的回调都被执行,所以看起来是executor.shutdown (由executor.__exit__调用)无法判断进程是否已经完成。在当前的python3.3中,这似乎是完全固定的,但是似乎对multiprocessing和concurrent.futures做了很多修改,所以我不知道是什么解决了这个问题。由于我不能使用3.3 (它似乎与numpy的发行版或dev版本都不兼容),所以我尝试将它的多处理和并发包复制到我的3.2安装中,这似乎很好。不过,据我所见,ProcessPoolExecutor在最新的发行版中完全崩溃了,但没有其他人受到影响,这似乎有点奇怪。
发布于 2012-03-04 08:34:53
我对代码进行了如下修改,解决了这两个问题。callback函数被定义为闭包,因此每次都使用i的更新值。至于死锁,这很可能是在所有任务完成之前关闭执行器的原因之一。等待未来的完成也解决了这个问题。
from concurrent import futures
def test(i):
return i
def callback(f):
print('callback {}'.format(f.result()))
with futures.ProcessPoolExecutor(4) as executor:
fs = []
for i in range(100):
print('submitting {}'.format(i))
future = executor.submit(test, i)
future.add_done_callback(callback)
fs.append(future)
for _ in futures.as_completed(fs): pass更新:哦,对不起,我还没看过你的更新,这似乎已经解决了。
https://stackoverflow.com/questions/9214214
复制相似问题