首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何管理进程池?

如何管理进程池?
EN

Stack Overflow用户
提问于 2018-09-28 21:56:16
回答 2查看 520关注 0票数 2

我正在尝试在Windows 10上设置多进程池。

基本上,一些cpus (在我的例子中是12个)应该从Qin读取数据并将结果写入Qout。在Qin中编写'end'时,该进程应该会停止。

由于某些原因,进程挂起。

我开发了一个简单的版本:

代码语言:javascript
复制
from multiprocessing import Pool, Queue, Event
import os,time


def worker( Qin, Qout, event):
    time.sleep(5)
    while True:
        item = Qin.get()
        if item == 'end':
            event.set()
        else:
            Qout.put(item)
        time.sleep(1)

def manager():
    Qin,Qout,event= Queue(), Queue(), Event()
    processes = os.cpu_count()
    pool = Pool(processes=processes)
    for _ in range(processes):
        pool.apply_async(worker,args= (Qin,Qout,event,))
    for i in range(100):
        print(i)
        Qin.put(i)

    Qin.put('end')

    pool.close()
    event.wait()
    pool.terminate()
    return Qout

Qout = manager()
EN

回答 2

Stack Overflow用户

发布于 2018-09-28 22:40:50

您需要了解在python中异步编程是如何正确工作的。当你调用apply_async时,你会得到未来的对象。python中的队列实现依赖于一个系统管道来将数据从一个进程传输到另一个进程,并依赖一些信号量来保护该管道上的读写。

代码语言:javascript
复制
from multiprocessing import Pool, Queue, Event
import os
import time
import multiprocessing

def worker( Qin, Qout, event):
    print('worker')
    time.sleep(1)
    event.set()

def manager():
    processes = multiprocessing.cpu_count()
    m = multiprocessing.Manager()
    Qin = m.Queue()
    Qout = m.Queue()
    event = m.Event()
    pool = Pool(processes=processes)
    result = pool.apply_async(worker, (Qin, Qout, event))
    result.get()
    pool.close()
    event.wait()
    return Qout

if __name__ == '__main__':
    Qout = manager()
票数 0
EN

Stack Overflow用户

发布于 2018-09-29 05:40:09

我认为你的代码挂起的原因是因为所有的工作者任务最终都在等待一些东西出现在输入队列中,同时带有item = Qin.get()行,因为get()“阻塞”等待一些东西被放入队列中。避免这种情况的一种方法是使用非阻塞的get_nowait()方法。这样做需要代码来处理它可能引发的任何Empty异常,但它避免了该进程中的任何进一步执行在该点上有效地停止。

此外,您还需要创建和使用一个multiprocessing.Manager,它创建一个服务器进程,该进程保存Python对象,并允许其他进程通过代理来操作它们。请参阅文档的部分的“服务器进程”部分。

此外,在Windows上使用multiprocessing时,通过将主进程的代码放在if __name__ == '__main__':语句中来确保主进程的代码有条件地执行,这一点非常重要。这是因为模块是如何在该平台上实现的-否则,每次启动另一个并发任务时,该代码都会再次执行(这涉及到它们正在执行import)。

下面是您的代码所需的修改,以便它使用multiprocessing.Manager。注意,我更改了manager()函数的名称,以避免与它用来创建共享对象的multiprocessing.Manager混淆。

代码语言:javascript
复制
import multiprocessing
from queue import Empty as QueueEmpty
import os
import time

END_MARKER = 'end'


def worker(id, Qin, Qout, event):
    while True:
        try:
            item = Qin.get_nowait()  # Non-blocking.
        except QueueEmpty:
            if event.is_set():  # Last item seen?
               break
            continue # Keep polling.

        if item == END_MARKER:  # Last item?
            event.set()
            break  # Quit.

        Qout.put('{} via worker {}'.format(item, id))
        time.sleep(.25)


def pool_manager():
    processes = os.cpu_count()
    pool = multiprocessing.Pool(processes=processes)
    manager = multiprocessing.Manager()
    Qin, Qout, event = manager.Queue(), manager.Queue(), manager.Event()

    for i in range(100):
        Qin.put(i)

    Qin.put(END_MARKER)

    for id in range(processes):
        pool.apply_async(worker, (id, Qin, Qout, event))

    pool.close()  # Done adding tasks.
    pool.join()  # Wait for all tasks to complete.

    return Qout


if __name__ == '__main__':
    print('Processing')
    Qout = pool_manager()

    print('Contents of Qout:')
    while not Qout.empty():
        item = Qout.get()
        print(' ', item)

    print('End of script')
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/52556941

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档