首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >为什么队列大小保持为零?

为什么队列大小保持为零?
EN

Stack Overflow用户
提问于 2021-09-16 22:26:07
回答 3查看 142关注 0票数 0

我正在使用多处理来处理我的记录。

代码语言:javascript
复制
queue = Queue()

def produce(i, item):
    data = process(i, item)
    queue.put(data)

def process(i, item):
    data = do_processing(i, item)
    return data

if __name__ == '__main__':
    records = load_records()

    with ProcessPoolExecutor(max_workers=os.cpu_count()) as executor:
        print('produce items')
        for i, item in ennumerate(records.items()):
            executor.submit(produce, i, item)

    print('queue size:{}'.format(queue.qsize()))
    while not queue.empty():
        save(queue.get())

在这里,我将来自produce记录放在一个队列中,因为该步骤非常耗时。在处理记录之后,我保存它们。由于消费步骤并不耗时,因此我不会费心在单独的线程中运行它。

在这里,在我执行代码之后,队列仍然是空的。这里发生什么事情?

EN

回答 3

Stack Overflow用户

发布于 2021-09-17 01:27:24

我想这就是如何做你想做的事情。正如在comment中提到的,每个进程都在自己的内存空间中运行,因此不能简单地共享像队列这样的全局变量,也不能将其作为参数传递给每个进程。

在使用ProcessPoolExecutor时,您可以有效地执行所需的操作-共享队列-通过定义一个初始化器函数,该函数将在每个进程开始时被调用,该函数将为该进程创建一个全局进程,并将队列作为参数传递给该进程。

下面是一些与你的代码非常相似的东西,实际上是可运行的,用来说明我的意思:

代码语言:javascript
复制
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Queue
import os


MAX_RECORDS = 10

def load_records():
    return dict.fromkeys(range(MAX_RECORDS), 0)

def do_processing(item):
    return item

def init_queue(queue):
    globals()['queue'] = queue  # Makes queue a global in each process.

def produce(i, item):
    data = process(i, item)
    queue.put(data)

def process(i, item):
    data = do_processing(item)
    return data


if __name__ == '__main__':
    records = load_records()

    queue = Queue()
    with ProcessPoolExecutor(max_workers=os.cpu_count(),
                             initializer=init_queue, initargs=(queue,)) as executor:
        print('producing items')
        for i, item in enumerate(records.items()):
            future = executor.submit(produce, i, item)
        print('done producing items')

    print('queue size: {}'.format(queue.qsize()))
    while not queue.empty():
        print(queue.get())

输出:

代码语言:javascript
复制
producing items
done producing items
queue size: 10
(0, 0)
(1, 0)
(2, 0)
(3, 0)
(4, 0)
(5, 0)
(6, 0)
(7, 0)
(8, 0)
(9, 0)
票数 1
EN

Stack Overflow用户

发布于 2021-09-16 23:37:13

使用multiprocessing.Queue()处理多进程。

代码语言:javascript
复制
queue = multiprocessing.Queue()

def produce(item):
    data = process(item)
    queue.put(data)

def process(item):
    data = do_processing(item)
    return data

if __name__ == '__main__':
    records = load_records()

    with ProcessPoolExecutor(max_workers=os.cpu_count()) as executor:
        print('produce items')
        for item in records.items():
            executor.submit(produce, item)

    print('queue size:{}'.format(queue.qsize()))
    while not queue.empty():
        save(queue.get())
票数 0
EN

Stack Overflow用户

发布于 2021-09-17 00:15:09

代码语言:javascript
复制
def process(item):
    data = do_processing(item)
    return data

queue = Queue() # not a multiprocessing queue

with multiprocessing.Pool(processes=os.cpu_count()) as pool:
    for result in pool.imap(process, records):
        queue.put(result)

while not queue.empty():
    save(queue.get())
票数 -2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/69215848

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档