首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >没有在空队列上退出Multiprocess.Process

没有在空队列上退出Multiprocess.Process
EN

Stack Overflow用户
提问于 2020-07-26 18:20:06
回答 1查看 743关注 0票数 1

我有大量的对象需要迭代,我认为多处理将大大加快工作的速度。然而,这个简单的例子似乎挂起,一旦我增加核心计数。

它挂在p.join()行上,如果我终止和检查,q_in.empty()返回True,输出队列有适当数量的项。

是什么导致它挂起来的?

代码语言:javascript
复制
from multiprocessing import Process, Queue
import time

def worker_func(q_in, q_out, w):
    time.sleep(1)
    while not q_in.empty():
        # Simple code standing in for more complex operation
         q_out.put(str(w) + '_' + str(q_in.get()))

def setup_func(x):
    q_in = Queue()
    for w in range(x):
        q_in.put(w)

    q_out = Queue()
    return((q_in, q_out))

def test_func(num_cores, q_in, q_out):
    processes = []

    for w in range(num_cores):
        p = Process(target=worker_func, args=(q_in, q_out, w))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()

    output_ls = []
    while not q_out.empty():
        output_ls.append(q_out.get())

    return(output_ls)

q_in, q_out = setup_func(1000)
test_func(1, q_in, q_out) # This returns without issue for num_cores = 1 or 2

q_in, q_out = setup_func(1000)
test_func(5, q_in, q_out) # This hangs for num_cores = 5
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2020-07-26 18:48:51

有多个进程从队列中提取。队列可能有数据,但是当您有时间获取它时,另一个进程已经消耗了它。multiprocessing.Queue.empty说,由于多线程/多处理语义,这是不可靠的。

另一种方法是将一个进程结束哨兵放在队列的末尾,每个进程一个。当进程看到哨兵时,它就会退出。在您的例子中,None是一个很好的选择。

代码语言:javascript
复制
from multiprocessing import Process, Queue
import time

def worker_func(q_in, q_out, w):
    time.sleep(1)
    while True:
        msg = q_in.get()
        if msg is None:
            break
        q_out.put(str(w) + '_' + str(msg))

def setup_func(x):
    q_in = Queue()
    for w in range(x):
        q_in.put(w)

    q_out = Queue()
    return((q_in, q_out))

def test_func(num_cores, q_in, q_out):
    processes = []

    for w in range(num_cores):
        q_in.put(None)
        p = Process(target=worker_func, args=(q_in, q_out, w))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()

    output_ls = []
    while not q_out.empty():
        output_ls.append(q_out.get())

    return(output_ls)

q_in, q_out = setup_func(1000)
test_func(1, q_in, q_out) # This returns without issue for num_cores = 1 or 2

q_in, q_out = setup_func(1000)
test_func(5, q_in, q_out) # This hangs for num_cores = 5
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/63104018

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档