我正在尝试在Windows 10上设置多进程池。
基本上,一些cpus (在我的例子中是12个)应该从Qin读取数据并将结果写入Qout。在Qin中编写'end'时,该进程应该会停止。
由于某些原因,进程挂起。
我开发了一个简单的版本:
from multiprocessing import Pool, Queue, Event
import os,time
def worker( Qin, Qout, event):
time.sleep(5)
while True:
item = Qin.get()
if item == 'end':
event.set()
else:
Qout.put(item)
time.sleep(1)
def manager():
Qin,Qout,event= Queue(), Queue(), Event()
processes = os.cpu_count()
pool = Pool(processes=processes)
for _ in range(processes):
pool.apply_async(worker,args= (Qin,Qout,event,))
for i in range(100):
print(i)
Qin.put(i)
Qin.put('end')
pool.close()
event.wait()
pool.terminate()
return Qout
Qout = manager()发布于 2018-09-28 22:40:50
您需要了解在python中异步编程是如何正确工作的。当你调用apply_async时,你会得到未来的对象。python中的队列实现依赖于一个系统管道来将数据从一个进程传输到另一个进程,并依赖一些信号量来保护该管道上的读写。
from multiprocessing import Pool, Queue, Event
import os
import time
import multiprocessing
def worker( Qin, Qout, event):
print('worker')
time.sleep(1)
event.set()
def manager():
processes = multiprocessing.cpu_count()
m = multiprocessing.Manager()
Qin = m.Queue()
Qout = m.Queue()
event = m.Event()
pool = Pool(processes=processes)
result = pool.apply_async(worker, (Qin, Qout, event))
result.get()
pool.close()
event.wait()
return Qout
if __name__ == '__main__':
Qout = manager()发布于 2018-09-29 05:40:09
我认为你的代码挂起的原因是因为所有的工作者任务最终都在等待一些东西出现在输入队列中,同时带有item = Qin.get()行,因为get()“阻塞”等待一些东西被放入队列中。避免这种情况的一种方法是使用非阻塞的get_nowait()方法。这样做需要代码来处理它可能引发的任何Empty异常,但它避免了该进程中的任何进一步执行在该点上有效地停止。
此外,您还需要创建和使用一个multiprocessing.Manager,它创建一个服务器进程,该进程保存Python对象,并允许其他进程通过代理来操作它们。请参阅文档的部分的“服务器进程”部分。
此外,在Windows上使用multiprocessing时,通过将主进程的代码放在if __name__ == '__main__':语句中来确保主进程的代码有条件地执行,这一点非常重要。这是因为模块是如何在该平台上实现的-否则,每次启动另一个并发任务时,该代码都会再次执行(这涉及到它们正在执行import)。
下面是您的代码所需的修改,以便它使用multiprocessing.Manager。注意,我更改了manager()函数的名称,以避免与它用来创建共享对象的multiprocessing.Manager混淆。
import multiprocessing
from queue import Empty as QueueEmpty
import os
import time
END_MARKER = 'end'
def worker(id, Qin, Qout, event):
while True:
try:
item = Qin.get_nowait() # Non-blocking.
except QueueEmpty:
if event.is_set(): # Last item seen?
break
continue # Keep polling.
if item == END_MARKER: # Last item?
event.set()
break # Quit.
Qout.put('{} via worker {}'.format(item, id))
time.sleep(.25)
def pool_manager():
processes = os.cpu_count()
pool = multiprocessing.Pool(processes=processes)
manager = multiprocessing.Manager()
Qin, Qout, event = manager.Queue(), manager.Queue(), manager.Event()
for i in range(100):
Qin.put(i)
Qin.put(END_MARKER)
for id in range(processes):
pool.apply_async(worker, (id, Qin, Qout, event))
pool.close() # Done adding tasks.
pool.join() # Wait for all tasks to complete.
return Qout
if __name__ == '__main__':
print('Processing')
Qout = pool_manager()
print('Contents of Qout:')
while not Qout.empty():
item = Qout.get()
print(' ', item)
print('End of script')https://stackoverflow.com/questions/52556941
复制相似问题