你好,我有一个多处理程序,如
#this is pseudocode-ish
def worker(queue, context):
set_context(context) #set global context within worker
while queue.qsize() > 0:
process(queue.get(False))
pool = multiprocessing.Pool(20, worker, (queue, global_context))
pool.close()
pool.join()问题是,global context是一个非常重的对象,因此产生每个单独的进程(酸洗/不酸洗)需要一段时间。所以我一直在发现,对于较短的队列,整个队列由前两个产生的进程处理,然后程序的其余部分被卡住,产生其余的进程,这不可避免地什么都不做,因为队列中什么都没有了。例如,每个进程需要1秒来生成,但是队列在2秒内被处理--因此前两个进程在2-3秒内完成队列,然后程序的其余部分需要17秒才能生成其余的队列。
当队列为空时,是否有一种方法可以杀死其余的进程?或者一种更灵活的方法来设置进程池数量--例如,只在需要时生成另一个进程?
谢谢
发布于 2017-07-31 15:35:22
没有办法在动态的过程中生成multiprocessing.Pool。如果你想要这种行为,你需要自己去修改它。
对于关闭,一种方法是使用multiprocessing.Pool.terminate方法。但是它可能会等待所有的worker完成它们的初始化。
当你的工作完成后,你也可以直接杀死所有的工人。我认为它们是一个_pool字段,它包含可以强制终止的所有worker Process。请注意,这可能会导致一些奇怪的行为,这是不打算被外部处理的。您必须确保正确地清理了所有的管理thread,这可能很棘手。
你的设计选择是很不寻常的。你在复制call_queue。实际上,Pool本身应该负责通信,您不需要额外的queue。如果所有的task_list都在process_task中,并且需要由process_task进行处理,那么您可以执行以下操作
#this is pseudocode-ish
def init(queue, context):
set_context(context) # set global context within worker
pool = multiprocessing.Pool(20, init, (global_context,))
res = pool.map(process_task, task_list)
pool.terminate()
pool.join()这样可以避免破坏Pool设置,而且可能更有效。
最后,如果您打算重复使用您的池几次,并且您的global_context没有改变,您可以考虑使用loky。(免责声明:我是这个项目的维护者之一)。这允许您在程序中多次重用一个工作人员池,而不必重新设置所有内容。一个问题是没有initializer,因为它遵循concurrent.futures的API,但是initializer可以使用multiprocessing.Barrier和提交max_workers初始化作业来完成。这将确保initializer的每个作业都由一个工作人员运行,并且所有工作人员都运行initializer。
https://stackoverflow.com/questions/45379578
复制相似问题