我有大量的对象需要迭代,我认为多处理将大大加快工作的速度。然而,这个简单的例子似乎挂起,一旦我增加核心计数。
它挂在p.join()行上,如果我终止和检查,q_in.empty()返回True,输出队列有适当数量的项。
是什么导致它挂起来的?
from multiprocessing import Process, Queue
import time
def worker_func(q_in, q_out, w):
time.sleep(1)
while not q_in.empty():
# Simple code standing in for more complex operation
q_out.put(str(w) + '_' + str(q_in.get()))
def setup_func(x):
q_in = Queue()
for w in range(x):
q_in.put(w)
q_out = Queue()
return((q_in, q_out))
def test_func(num_cores, q_in, q_out):
processes = []
for w in range(num_cores):
p = Process(target=worker_func, args=(q_in, q_out, w))
processes.append(p)
p.start()
for p in processes:
p.join()
output_ls = []
while not q_out.empty():
output_ls.append(q_out.get())
return(output_ls)
q_in, q_out = setup_func(1000)
test_func(1, q_in, q_out) # This returns without issue for num_cores = 1 or 2
q_in, q_out = setup_func(1000)
test_func(5, q_in, q_out) # This hangs for num_cores = 5发布于 2020-07-26 18:48:51
有多个进程从队列中提取。队列可能有数据,但是当您有时间获取它时,另一个进程已经消耗了它。multiprocessing.Queue.empty说,由于多线程/多处理语义,这是不可靠的。
另一种方法是将一个进程结束哨兵放在队列的末尾,每个进程一个。当进程看到哨兵时,它就会退出。在您的例子中,None是一个很好的选择。
from multiprocessing import Process, Queue
import time
def worker_func(q_in, q_out, w):
time.sleep(1)
while True:
msg = q_in.get()
if msg is None:
break
q_out.put(str(w) + '_' + str(msg))
def setup_func(x):
q_in = Queue()
for w in range(x):
q_in.put(w)
q_out = Queue()
return((q_in, q_out))
def test_func(num_cores, q_in, q_out):
processes = []
for w in range(num_cores):
q_in.put(None)
p = Process(target=worker_func, args=(q_in, q_out, w))
processes.append(p)
p.start()
for p in processes:
p.join()
output_ls = []
while not q_out.empty():
output_ls.append(q_out.get())
return(output_ls)
q_in, q_out = setup_func(1000)
test_func(1, q_in, q_out) # This returns without issue for num_cores = 1 or 2
q_in, q_out = setup_func(1000)
test_func(5, q_in, q_out) # This hangs for num_cores = 5https://stackoverflow.com/questions/63104018
复制相似问题