I am using multiprocessing to process my records.
queue = Queue()

def produce(i, item):
    data = process(i, item)
    queue.put(data)

def process(i, item):
    data = do_processing(i, item)
    return data

if __name__ == '__main__':
    records = load_records()
    with ProcessPoolExecutor(max_workers=os.cpu_count()) as executor:
        print('produce items')
        for i, item in enumerate(records.items()):
            executor.submit(produce, i, item)

    print('queue size:{}'.format(queue.qsize()))
    while not queue.empty():
        save(queue.get())

Here I put the records coming out of produce onto a queue, because that step is very time-consuming. After the records have been processed, I save them. Since the consuming step is not time-consuming, I don't bother running it in a separate thread.
But after I run this code, the queue is still empty. What is going on here?
Posted on 2021-09-17 01:27:24
I think this is how to do what you want. As mentioned in a comment, each process runs in its own memory space, so you cannot simply share a global variable like the queue, nor can you pass it to each task as an ordinary argument.

When using a ProcessPoolExecutor you can effectively do what you want (share the queue) by defining an initializer function that is called when each worker process starts; it creates a global variable in that process, with the queue passed to it as an argument.

Here is something very similar to your code, but actually runnable, to illustrate what I mean:
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Queue
import os

MAX_RECORDS = 10

def load_records():
    return dict.fromkeys(range(MAX_RECORDS), 0)

def do_processing(item):
    return item

def init_queue(queue):
    globals()['queue'] = queue  # Makes queue a global in each process.

def produce(i, item):
    data = process(i, item)
    queue.put(data)

def process(i, item):
    data = do_processing(item)
    return data

if __name__ == '__main__':
    records = load_records()
    queue = Queue()
    with ProcessPoolExecutor(max_workers=os.cpu_count(),
                             initializer=init_queue, initargs=(queue,)) as executor:
        print('producing items')
        for i, item in enumerate(records.items()):
            future = executor.submit(produce, i, item)
        print('done producing items')

    print('queue size: {}'.format(queue.qsize()))
    while not queue.empty():
        print(queue.get())

Output:
producing items
done producing items
queue size: 10
(0, 0)
(1, 0)
(2, 0)
(3, 0)
(4, 0)
(5, 0)
(6, 0)
(7, 0)
(8, 0)
(9, 0)
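A note on why the queue is handed to the workers through the initializer/initargs arguments rather than simply passed to executor.submit: arguments to submit are pickled before being sent to the worker processes, and a multiprocessing.Queue deliberately refuses to be pickled outside of process start-up. A minimal sketch (not part of the original answer) of the error you would hit if you tried:

import pickle
from multiprocessing import Queue

if __name__ == '__main__':
    try:
        # A multiprocessing.Queue cannot be pickled, so it cannot be shipped
        # to pool workers as an ordinary task argument.
        pickle.dumps(Queue())
    except RuntimeError as exc:
        print(exc)  # "Queue objects should only be shared between processes through inheritance"

Because of that, the queue has to reach each worker by inheritance at start-up, which is exactly what initializer/initargs arranges. Note also that the queue size above is only read after the with block has exited; ProcessPoolExecutor waits for all submitted tasks to finish before the with block returns, so all ten results are in the queue by that point.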
Posted on 2021-09-16 23:37:13

Use multiprocessing.Queue() to handle multiple processes.
queue = multiprocessing.Queue()

def produce(item):
    data = process(item)
    queue.put(data)

def process(item):
    data = do_processing(item)
    return data

if __name__ == '__main__':
    records = load_records()
    with ProcessPoolExecutor(max_workers=os.cpu_count()) as executor:
        print('produce items')
        for item in records.items():
            executor.submit(produce, item)

    print('queue size:{}'.format(queue.qsize()))
    while not queue.empty():
        save(queue.get())
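One caveat worth adding (not part of the original answer): this variant relies on the module-level queue being inherited by the worker processes, which is what happens with the fork start method (the default on Linux). Under the spawn start method (always on Windows, and the default on macOS since Python 3.8), each worker re-imports the module, so multiprocessing.Queue() creates a fresh, unrelated queue in every worker and the parent's queue stays empty, the same symptom as in the question. A quick way to check which start method is in effect:

import multiprocessing

if __name__ == '__main__':
    # With 'fork' the workers inherit the global queue; with 'spawn' they do not,
    # and the initializer/initargs approach from the other answer is needed.
    print(multiprocessing.get_start_method())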
Posted on 2021-09-17 00:15:09

def process(item):
    data = do_processing(item)
    return data

queue = Queue()  # not a multiprocessing queue

with multiprocessing.Pool(processes=os.cpu_count()) as pool:
    for result in pool.imap(process, records):
        queue.put(result)

while not queue.empty():
    save(queue.get())
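Because pool.imap hands each result back to the parent process as it completes, the intermediate queue is not strictly needed here; the results could just as well be saved as they arrive. A small self-contained sketch of that simplification (the stub process, save and records below stand in for the real ones from the question):

import multiprocessing
import os

def process(item):
    # Stand-in for do_processing from the question.
    return item

def save(result):
    # Stand-in for the real save step.
    print(result)

if __name__ == '__main__':
    records = dict.fromkeys(range(10), 0)
    with multiprocessing.Pool(processes=os.cpu_count()) as pool:
        # imap yields results in the parent process, in submission order,
        # so they can be saved directly without any shared queue.
        for result in pool.imap(process, records):
            save(result)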
https://stackoverflow.com/questions/69215848