我正在查看一些代码,并注意到一些可能是冗余的代码:
def tasker(val):
do stuff
def multiprocessor (func, vals):
chunks = np.array_split(vals, os.cpu_count())
with multiprocessing.Pool() as pool:
pool.map(partial(func,vals), chunksize=chunks)
if __name__ == '__main__':
values = foobar
p = multiprocessing.Process(target=multiprocessor(tasker,values))
p.start()
p.close()
p.join()只是为了检查一下--在multiprocessing.Process函数上运行multiprocessing.Pool不是多余的吗?一开始不需要对multiprocessing.Pool进行功能化,对吗?这样做有什么好处吗?
发布于 2022-11-29 17:39:55
碰巧,Process调用实际上从未做任何有用的事情;target=multiprocessor(tasker,values)在主进程中运行multiprocessor,然后将其返回值(None,因为它没有显式返回)作为Process的target传递。
所以是的,确切地说,这是完全没有意义的;您在父进程中使Pool运行到完成,然后创建一个没有操作的Process,启动它,它什么也不做,然后当无用的Process退出时,主进程继续。除非创建这样一个非操作过程有一些好处,否则代码将执行相同的操作,如果被保护的块是公正的:
if __name__ == '__main__':
values = foobar
multiprocessor(tasker, values)如果正确创建了Process,则使用:
p = multiprocessing.Process(target=multiprocessor, args=(tasker, values))而且代码更复杂,这可能会有一些好处,如果Process需要被杀死(无论出于什么原因,您可以很容易地杀死它,例如因为某个截止日期已经过去),或者它会分配大量的内存,这些内存必须完全返回到操作系统(而不仅仅是释放给用户模式的空闲池以供重用),或者您试图避免主进程的全局变量的任何突变(如果Process's target对它们进行了变异,这些更改只会在该子进程和在更改后被fork编辑的任何进程中看到,父母不会看到他们被改变)。
如前所述,这些条件似乎都不适用(除了可能存在内存增长问题,特别是由于使用了partial, which has issues when used as the mapper function with Pool's various map-like methods),但不知道tasker的内容(更具体地说,它返回什么,Pool.map将收集和处理哪些内容,消耗的内存不是严格需要的,最终只能大量释放),我无法确定。
旁白:
我会注意到您编写的代码没有任何意义:
def multiprocessor (func, vals):
chunks = np.array_split(vals, os.cpu_count())
with multiprocessing.Pool() as pool:
pool.map(partial(func,vals), chunksize=chunks)不提供对pool.map的可迭代性,而是将chunks ( numpy子数组的list )作为chunksize传递,这应该是一个int。
下面的补充意见假定它实际上是作为:
def multiprocessor (func, vals):
chunks = np.array_split(vals, os.cpu_count())
with multiprocessing.Pool() as pool:
pool.map(func, chunks, chunksize=1)或者:
def multiprocessor (func, vals):
chunk_size = -(-len(vals) // os.cpu_count()) # Trick to get ceiling division out of floor division operator
with multiprocessing.Pool() as pool:
pool.map(func, vals, chunksize=chunk_size)尽管如此,当所有结果明显被丢弃时,Pool.map存储所有结果时可能出现的内存问题可以通过使用Pool.imap_unordered来改善,只需强制结果迭代器高效地运行即可。例如,您可以将pool.map(func, chunks, chunksize=1)替换为consume(pool.imap_unordered(func, chunks)),将pool.map(func, vals, chunksize=chunk_size)替换为consume(pool.imap_unordered(func, vals, chunksize=chunk_size)) (其中consume是同名的the itertools recipe )。
在这两种情况下,与其为所有结果分配一个list,不如在工作人员完成任务时将每个结果存储在其中(分配越来越多您不需要的东西),imap_unordered在返回时生成每个结果,consume立即抓取每个结果并丢弃它(必须为每个结果分配内存,但必须立即释放它,因此进程的峰值内存消耗(因此堆的大小)保持最小)。
https://stackoverflow.com/questions/74618056
复制相似问题