首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >多处理池与过程

多处理池与过程
EN

Stack Overflow用户
提问于 2022-11-29 17:35:17
回答 1查看 30关注 0票数 0

我正在查看一些代码,并注意到一些可能是冗余的代码:

代码语言:javascript
复制
def tasker(val):
   do stuff

def multiprocessor (func, vals):
   chunks = np.array_split(vals, os.cpu_count())
   with multiprocessing.Pool() as pool:
      pool.map(partial(func,vals), chunksize=chunks)

if __name__ == '__main__':
   values = foobar
   p = multiprocessing.Process(target=multiprocessor(tasker,values))
   p.start()
   p.close()
   p.join()

只是为了检查一下--在multiprocessing.Process函数上运行multiprocessing.Pool不是多余的吗?一开始不需要对multiprocessing.Pool进行功能化,对吗?这样做有什么好处吗?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2022-11-29 17:39:55

碰巧,Process调用实际上从未做任何有用的事情;target=multiprocessor(tasker,values)在主进程中运行multiprocessor,然后将其返回值(None,因为它没有显式返回)作为Processtarget传递。

所以是的,确切地说,这是完全没有意义的;您在父进程中使Pool运行到完成,然后创建一个没有操作的Process,启动它,它什么也不做,然后当无用的Process退出时,主进程继续。除非创建这样一个非操作过程有一些好处,否则代码将执行相同的操作,如果被保护的块是公正的:

代码语言:javascript
复制
if __name__ == '__main__':
   values = foobar
   multiprocessor(tasker, values)

如果正确创建了Process,则使用:

代码语言:javascript
复制
p = multiprocessing.Process(target=multiprocessor, args=(tasker, values))

而且代码更复杂,这可能会有一些好处,如果Process需要被杀死(无论出于什么原因,您可以很容易地杀死它,例如因为某个截止日期已经过去),或者它会分配大量的内存,这些内存必须完全返回到操作系统(而不仅仅是释放给用户模式的空闲池以供重用),或者您试图避免主进程的全局变量的任何突变(如果Process's target对它们进行了变异,这些更改只会在该子进程和在更改后被fork编辑的任何进程中看到,父母不会看到他们被改变)。

如前所述,这些条件似乎都不适用(除了可能存在内存增长问题,特别是由于使用了partial, which has issues when used as the mapper function with Pool's various map-like methods),但不知道tasker的内容(更具体地说,它返回什么,Pool.map将收集和处理哪些内容,消耗的内存不是严格需要的,最终只能大量释放),我无法确定。

旁白:

我会注意到您编写的代码没有任何意义:

代码语言:javascript
复制
def multiprocessor (func, vals):
   chunks = np.array_split(vals, os.cpu_count())
   with multiprocessing.Pool() as pool:
      pool.map(partial(func,vals), chunksize=chunks)

不提供对pool.map的可迭代性,而是将chunks ( numpy子数组的list )作为chunksize传递,这应该是一个int

下面的补充意见假定它实际上是作为:

代码语言:javascript
复制
def multiprocessor (func, vals):
   chunks = np.array_split(vals, os.cpu_count())
   with multiprocessing.Pool() as pool:
      pool.map(func, chunks, chunksize=1)

或者:

代码语言:javascript
复制
def multiprocessor (func, vals):
   chunk_size = -(-len(vals) // os.cpu_count())  # Trick to get ceiling division out of floor division operator
   with multiprocessing.Pool() as pool:
      pool.map(func, vals, chunksize=chunk_size)

尽管如此,当所有结果明显被丢弃时,Pool.map存储所有结果时可能出现的内存问题可以通过使用Pool.imap_unordered来改善,只需强制结果迭代器高效地运行即可。例如,您可以将pool.map(func, chunks, chunksize=1)替换为consume(pool.imap_unordered(func, chunks)),将pool.map(func, vals, chunksize=chunk_size)替换为consume(pool.imap_unordered(func, vals, chunksize=chunk_size)) (其中consume是同名的the itertools recipe )。

在这两种情况下,与其为所有结果分配一个list,不如在工作人员完成任务时将每个结果存储在其中(分配越来越多您不需要的东西),imap_unordered在返回时生成每个结果,consume立即抓取每个结果并丢弃它(必须为每个结果分配内存,但必须立即释放它,因此进程的峰值内存消耗(因此堆的大小)保持最小)。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/74618056

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档