我正在做一些并行处理,如下:
with mp.Pool(8) as tmpPool:
results = tmpPool.starmap(my_function, inputs)输入的情况如下:(1,0.2312),(5,0.52) .即,整数和浮点数的元组。
代码运行良好,但我似乎无法将其封装在加载栏(tqdm)周围,例如,可以使用imap方法执行如下操作:
tqdm.tqdm(mp.imap(some_function,some_inputs))这也能为星图做吗?
谢谢!
发布于 2019-08-05 18:44:21
starmap()是不可能的,但是添加Pool.istarmap()的补丁是可能的。它基于imap()的代码。您所要做的就是创建istarmap.py-file并导入模块,以便在进行常规的多进程导入之前应用修补程序。
Python <3.8
# istarmap.py for Python <3.8
import multiprocessing.pool as mpp
def istarmap(self, func, iterable, chunksize=1):
"""starmap-version of imap
"""
if self._state != mpp.RUN:
raise ValueError("Pool not running")
if chunksize < 1:
raise ValueError(
"Chunksize must be 1+, not {0:n}".format(
chunksize))
task_batches = mpp.Pool._get_tasks(func, iterable, chunksize)
result = mpp.IMapIterator(self._cache)
self._taskqueue.put(
(
self._guarded_task_generation(result._job,
mpp.starmapstar,
task_batches),
result._set_length
))
return (item for chunk in result for item in chunk)
mpp.Pool.istarmap = istarmapPython 3.8+
# istarmap.py for Python 3.8+
import multiprocessing.pool as mpp
def istarmap(self, func, iterable, chunksize=1):
"""starmap-version of imap
"""
self._check_running()
if chunksize < 1:
raise ValueError(
"Chunksize must be 1+, not {0:n}".format(
chunksize))
task_batches = mpp.Pool._get_tasks(func, iterable, chunksize)
result = mpp.IMapIterator(self)
self._taskqueue.put(
(
self._guarded_task_generation(result._job,
mpp.starmapstar,
task_batches),
result._set_length
))
return (item for chunk in result for item in chunk)
mpp.Pool.istarmap = istarmap然后在你的剧本中:
import istarmap # import to apply patch
from multiprocessing import Pool
import tqdm
def foo(a, b):
for _ in range(int(50e6)):
pass
return a, b
if __name__ == '__main__':
with Pool(4) as pool:
iterable = [(i, 'x') for i in range(10)]
for _ in tqdm.tqdm(pool.istarmap(foo, iterable),
total=len(iterable)):
pass发布于 2021-01-23 01:56:00
最简单的方法可能是将tqdm()应用于输入,而不是映射函数。例如:
inputs = zip(param1, param2, param3)
with mp.Pool(8) as pool:
results = pool.starmap(my_function, tqdm.tqdm(inputs, total=len(param1)))发布于 2021-06-04 23:47:01
正如“黑暗者”所提到的,到本文撰写之时,还没有现成的istarmap。如果您想避免修补,可以添加一个简单的*_star函数作为解决办法。(此解决方案受本教程。启发)
import tqdm
import multiprocessing
def my_function(arg1, arg2, arg3):
return arg1 + arg2 + arg3
def my_function_star(args):
return my_function(*args)
jobs = 4
with multiprocessing.Pool(jobs) as pool:
args = [(i, i, i) for i in range(10000)]
results = list(tqdm.tqdm(pool.imap(my_function_star, args), total=len(args))一些注意事项:
我也很喜欢科里的回答。它更干净,虽然进度条似乎没有我的答案那么顺利地更新。请注意,corey的答案比我上面用chunksize=1发布的代码快几个数量级(默认)。我猜想这是由于多进程序列化,因为增加chunksize (或拥有更昂贵的my_function)使它们的运行时具有可比性。
由于我的序列化/功能成本比很低,所以我对我的应用程序采用了我的回答。
https://stackoverflow.com/questions/57354700
复制相似问题