首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何告诉Python multiprocessing.pool.map_async在两次函数调用之间等待?

如何告诉Python multiprocessing.pool.map_async在两次函数调用之间等待?
EN

Stack Overflow用户
提问于 2021-11-23 21:37:39
回答 1查看 36关注 0票数 0

差不多就是这个标题。我正在使用pool.map_async运行一个对我来说是个黑匣子的程序。它基本上是分析文件并输出结果。

有时,我需要分析同一文件两次。当我多进程时,黑盒程序会生气,因为两个进程试图同时访问同一个文件。

我不能调试或更改这个黑盒程序,但是从命令行执行多处理,不同调用之间的3-5秒等待(转到不同的内核)解决了这个问题。

有没有一种方法可以告诉map_async不要尽可能快地排队,而是在两次调用之间等待一段指定的时间?

EN

回答 1

Stack Overflow用户

发布于 2021-11-23 23:03:34

正确的处理方式应该是使用locks,请参阅https://docs.python.org/3/library/multiprocessing.html#synchronization-between-processes

但这需要更改被调用的进程函数,使它们遵守作为arg提供给它们的锁。如果没有,我几乎看不到让进程等待的替代方案,而且您的解决方案足够好(尽管它在某种程度上使多处理的想法过时了……)

编辑:

这里介绍了如何使用锁将黑盒程序包装在包装器中,并由多处理池执行。该工作被分成块,以便包装器函数被逐步并行地执行。锁保证了当一个进程正在执行black_box时,没有其他的black_box在同一时间运行。

如果您知道某些工作块没有冲突,那么您也可以在锁之外执行这些black_box实例。

代码语言:javascript
复制
import multiprocessing as mp
import time
from functools import partial

# define 4 chunks of work
work = []
work.append(range(1, 5))
work.append(range(6, 10))
work.append(range(11, 15))
work.append(range(16, 20))


def black_box(i: int):
    print(i)
    time.sleep(1)



def wrapper(lock, work_chunk: list):
    for w in work_chunk:
        lock.acquire()
        black_box(w)
        lock.release()
    return f"chunk {work_chunk} done"



if __name__ == '__main__':
    m = mp.Manager()
    lock = m.Lock()
    func = partial(wrapper, lock)
    with mp.Pool(processes=4) as pool:
        print(pool.map(func, work))

另一个版本中,每个工作项都获得了是否需要锁的信息:

代码语言:javascript
复制
import multiprocessing as mp
import time
from functools import partial

# define work with lock information
work = [(1, True), (2, False), (3, True), (4, False), (5, True), (6, False), (7, False)]


def black_box(i: int):
    print(i)
    time.sleep(1)



def wrapper(lock, work_item: tuple):
    if work_item[1] is True:
        lock.acquire()
        black_box(work_item[0])
        lock.release()
    else:
        black_box(work_item[0])
    return f"chunk {work_item[0]} done"



if __name__ == '__main__':
    m = mp.Manager()
    lock = m.Lock()
    func = partial(wrapper, lock)
    with mp.Pool(processes=4) as pool:
        print(pool.map(func, work))

最后是一个没有锁的版本,其中每个工作项都有延迟信息。这里唯一的机制是一些进程被延迟了指定的秒数(并且希望没有冲突……)

代码语言:javascript
复制
import multiprocessing as mp
import time

# define work with delay information
work = [(1, 1), (2, 2), (3, 4), (4, 0), (5, 0), (6, 0), (7, 0)]


def black_box(i: int):
    print(i)
    time.sleep(1)


def wrapper(work_item: tuple):
    time.sleep(work_item[1])
    black_box(work_item[0])
    return f"chunk {work_item[0]} done"


if __name__ == '__main__':
    m = mp.Manager()
    with mp.Pool(processes=4) as pool:
        print(pool.map(wrapper, work))
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/70088144

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档