差不多就是这个标题。我正在使用pool.map_async运行一个对我来说是个黑匣子的程序。它基本上是分析文件并输出结果。
有时,我需要分析同一文件两次。当我多进程时,黑盒程序会生气,因为两个进程试图同时访问同一个文件。
我不能调试或更改这个黑盒程序,但是从命令行执行多处理,不同调用之间的3-5秒等待(转到不同的内核)解决了这个问题。
有没有一种方法可以告诉map_async不要尽可能快地排队,而是在两次调用之间等待一段指定的时间?
发布于 2021-11-23 23:03:34
正确的处理方式应该是使用locks,请参阅https://docs.python.org/3/library/multiprocessing.html#synchronization-between-processes
但这需要更改被调用的进程函数,使它们遵守作为arg提供给它们的锁。如果没有,我几乎看不到让进程等待的替代方案,而且您的解决方案足够好(尽管它在某种程度上使多处理的想法过时了……)
编辑:
这里介绍了如何使用锁将黑盒程序包装在包装器中,并由多处理池执行。该工作被分成块,以便包装器函数被逐步并行地执行。锁保证了当一个进程正在执行black_box时,没有其他的black_box在同一时间运行。
如果您知道某些工作块没有冲突,那么您也可以在锁之外执行这些black_box实例。
import multiprocessing as mp
import time
from functools import partial
# define 4 chunks of work
work = []
work.append(range(1, 5))
work.append(range(6, 10))
work.append(range(11, 15))
work.append(range(16, 20))
def black_box(i: int):
print(i)
time.sleep(1)
def wrapper(lock, work_chunk: list):
for w in work_chunk:
lock.acquire()
black_box(w)
lock.release()
return f"chunk {work_chunk} done"
if __name__ == '__main__':
m = mp.Manager()
lock = m.Lock()
func = partial(wrapper, lock)
with mp.Pool(processes=4) as pool:
print(pool.map(func, work))另一个版本中,每个工作项都获得了是否需要锁的信息:
import multiprocessing as mp
import time
from functools import partial
# define work with lock information
work = [(1, True), (2, False), (3, True), (4, False), (5, True), (6, False), (7, False)]
def black_box(i: int):
print(i)
time.sleep(1)
def wrapper(lock, work_item: tuple):
if work_item[1] is True:
lock.acquire()
black_box(work_item[0])
lock.release()
else:
black_box(work_item[0])
return f"chunk {work_item[0]} done"
if __name__ == '__main__':
m = mp.Manager()
lock = m.Lock()
func = partial(wrapper, lock)
with mp.Pool(processes=4) as pool:
print(pool.map(func, work))最后是一个没有锁的版本,其中每个工作项都有延迟信息。这里唯一的机制是一些进程被延迟了指定的秒数(并且希望没有冲突……)
import multiprocessing as mp
import time
# define work with delay information
work = [(1, 1), (2, 2), (3, 4), (4, 0), (5, 0), (6, 0), (7, 0)]
def black_box(i: int):
print(i)
time.sleep(1)
def wrapper(work_item: tuple):
time.sleep(work_item[1])
black_box(work_item[0])
return f"chunk {work_item[0]} done"
if __name__ == '__main__':
m = mp.Manager()
with mp.Pool(processes=4) as pool:
print(pool.map(wrapper, work))https://stackoverflow.com/questions/70088144
复制相似问题