首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >星图与tqdm结合?

星图与tqdm结合?
EN

Stack Overflow用户
提问于 2019-08-05 08:20:44
回答 4查看 15.2K关注 0票数 41

我正在做一些并行处理,如下:

代码语言:javascript
复制
with mp.Pool(8) as tmpPool:
        results = tmpPool.starmap(my_function, inputs)

输入的情况如下:(1,0.2312),(5,0.52) .即,整数和浮点数的元组。

代码运行良好,但我似乎无法将其封装在加载栏(tqdm)周围,例如,可以使用imap方法执行如下操作:

代码语言:javascript
复制
tqdm.tqdm(mp.imap(some_function,some_inputs))

这也能为星图做吗?

谢谢!

EN

回答 4

Stack Overflow用户

回答已采纳

发布于 2019-08-05 18:44:21

starmap()是不可能的,但是添加Pool.istarmap()的补丁是可能的。它基于imap()的代码。您所要做的就是创建istarmap.py-file并导入模块,以便在进行常规的多进程导入之前应用修补程序。

Python <3.8

代码语言:javascript
复制
# istarmap.py for Python <3.8
import multiprocessing.pool as mpp


def istarmap(self, func, iterable, chunksize=1):
    """starmap-version of imap
    """
    if self._state != mpp.RUN:
        raise ValueError("Pool not running")

    if chunksize < 1:
        raise ValueError(
            "Chunksize must be 1+, not {0:n}".format(
                chunksize))

    task_batches = mpp.Pool._get_tasks(func, iterable, chunksize)
    result = mpp.IMapIterator(self._cache)
    self._taskqueue.put(
        (
            self._guarded_task_generation(result._job,
                                          mpp.starmapstar,
                                          task_batches),
            result._set_length
        ))
    return (item for chunk in result for item in chunk)


mpp.Pool.istarmap = istarmap

Python 3.8+

代码语言:javascript
复制
# istarmap.py for Python 3.8+
import multiprocessing.pool as mpp


def istarmap(self, func, iterable, chunksize=1):
    """starmap-version of imap
    """
    self._check_running()
    if chunksize < 1:
        raise ValueError(
            "Chunksize must be 1+, not {0:n}".format(
                chunksize))

    task_batches = mpp.Pool._get_tasks(func, iterable, chunksize)
    result = mpp.IMapIterator(self)
    self._taskqueue.put(
        (
            self._guarded_task_generation(result._job,
                                          mpp.starmapstar,
                                          task_batches),
            result._set_length
        ))
    return (item for chunk in result for item in chunk)


mpp.Pool.istarmap = istarmap

然后在你的剧本中:

代码语言:javascript
复制
import istarmap  # import to apply patch
from multiprocessing import Pool
import tqdm    


def foo(a, b):
    for _ in range(int(50e6)):
        pass
    return a, b    


if __name__ == '__main__':

    with Pool(4) as pool:
        iterable = [(i, 'x') for i in range(10)]
        for _ in tqdm.tqdm(pool.istarmap(foo, iterable),
                           total=len(iterable)):
            pass
票数 42
EN

Stack Overflow用户

发布于 2021-01-23 01:56:00

最简单的方法可能是将tqdm()应用于输入,而不是映射函数。例如:

代码语言:javascript
复制
inputs = zip(param1, param2, param3)
with mp.Pool(8) as pool:
    results = pool.starmap(my_function, tqdm.tqdm(inputs, total=len(param1)))
票数 40
EN

Stack Overflow用户

发布于 2021-06-04 23:47:01

正如“黑暗者”所提到的,到本文撰写之时,还没有现成的istarmap。如果您想避免修补,可以添加一个简单的*_star函数作为解决办法。(此解决方案受本教程。启发)

代码语言:javascript
复制
import tqdm
import multiprocessing

def my_function(arg1, arg2, arg3):
  return arg1 + arg2 + arg3

def my_function_star(args):
    return my_function(*args)

jobs = 4
with multiprocessing.Pool(jobs) as pool:
    args = [(i, i, i) for i in range(10000)]
    results = list(tqdm.tqdm(pool.imap(my_function_star, args), total=len(args))

一些注意事项:

我也很喜欢科里的回答。它更干净,虽然进度条似乎没有我的答案那么顺利地更新。请注意,corey的答案比我上面用chunksize=1发布的代码快几个数量级(默认)。我猜想这是由于多进程序列化,因为增加chunksize (或拥有更昂贵的my_function)使它们的运行时具有可比性。

由于我的序列化/功能成本比很低,所以我对我的应用程序采用了我的回答。

票数 13
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/57354700

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档