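Question
I'm new to multiprocessing, and nothing I have tried has worked. Every time I think I have figured something out, I hit a new obstacle. My goal is to load a queue using multiple processes, and then use multiple processes to pull data from the queue and process it. I tried going back to basic queue handling, but as soon as I introduce multiple processes I can't get anything out of the queue. What am I missing?
Code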
from multiprocessing import Process, Lock
from queue import Queue
import os

q = Queue(5)

def get_from_q():
    print('trying to get')
    print(q.get())

if __name__ == '__main__':
    # put items at the end of the queue
    for x in range(6):
        print('adding ', x)
        q.put(x)

    PROCESSOR_COUNT = os.cpu_count()
    processes = []
    for p in range(PROCESSOR_COUNT):
        print('spawning process')
        p = Process(target=get_from_q)
        processes.append(p)
    for p in processes:
        print('starting')
        p.start()
    for p in processes:
        print('joining')
        p.join()

Result:
adding 0
adding 1
adding 2
adding 3
adding 4
adding 5

Expected result:
adding 0
adding 1
adding 2
adding 3
adding 4
adding 5
spawning process
spawning process
spawning process
spawning process
starting
starting
starting
starting
trying to get
0
trying to get
1
trying to get
2
trying to get
3
trying to get
4
trying to get
5
joining
joining
joining
joining

Answered on 2021-04-16 01:16:32
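If you are running on a platform that uses spawn to create new processes, a new process does not inherit the main process's address space. Instead, its address space is initialized by re-executing the program's code from the top (everything outside the if __name__ == '__main__': block). That means anything you define at global scope is executed again, for example this line in your code:

q = Queue(5)

Every process you create therefore executes this statement, so each process ends up with its own copy of q. That cannot work. You need to create q once and pass it as an argument. The queue also has to be a multiprocessing.Queue; queue.Queue only works between threads of a single process. I also added flush=True to the print calls to reduce the chance of output from different processes being interleaved.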
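One further detail accounts for the output stopping right after `adding 5` in the original program: `Queue(5)` holds at most five items and the loop puts six, so the sixth `put` blocks in the main process before any child is created. The sketch below reproduces this with `put_nowait`, which raises `queue.Full` instead of blocking. The helper name `fill_bounded_queue` is made up for illustration.

```python
import queue


def fill_bounded_queue(maxsize, n_items):
    """Try to put n_items into a queue bounded at maxsize without blocking."""
    q = queue.Queue(maxsize)
    accepted, rejected = [], []
    for x in range(n_items):
        try:
            # put_nowait() raises queue.Full where a plain put() would block
            q.put_nowait(x)
            accepted.append(x)
        except queue.Full:
            rejected.append(x)
    return accepted, rejected


if __name__ == '__main__':
    accepted, rejected = fill_bounded_queue(5, 6)
    print('accepted:', accepted)
    print('would block:', rejected)
```

In the corrected program that follows, the queue is sized to match the number of items put into it, so this blocking does not occur.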
from multiprocessing import Process, Lock, Queue
import os

def get_from_q(q):
    print('trying to get', q.get(), flush=True)

if __name__ == '__main__':
    PROCESSOR_COUNT = os.cpu_count()
    q = Queue(PROCESSOR_COUNT)  # or put no size limitation on this
    # put items at the end of the queue
    for x in range(PROCESSOR_COUNT):
        print('adding ', x)
        q.put(x)

    processes = []
    for p in range(PROCESSOR_COUNT):
        print('spawning process')
        # pass the single shared queue to every process
        p = Process(target=get_from_q, args=(q,))
        processes.append(p)
    for p in processes:
        print('starting', flush=True)
        p.start()
    for p in processes:
        print('joining', flush=True)
        p.join()

Prints:
adding 0
adding 1
adding 2
adding 3
adding 4
adding 5
adding 6
adding 7
spawning process
spawning process
spawning process
spawning process
spawning process
spawning process
spawning process
spawning process
starting
starting
starting
starting
starting
starting
starting
starting
joining
trying to get 0
trying to get 1
trying to get 2
trying to get 3
trying to get 4
trying to get 5
trying to get 6
joining
joining
joining
trying to get 7
joining
joining
joining
joining

Using a process pool
Here the queue is hidden by the pool implementation:
from multiprocessing import Pool, cpu_count

def worker(x):
    print('x =', x, flush=True)
    return x ** 2

if __name__ == '__main__':
    PROCESSOR_COUNT = cpu_count()
    # the with block shuts the pool down once map() has returned
    with Pool(PROCESSOR_COUNT) as pool:
        print(pool.map(worker, range(PROCESSOR_COUNT)))

Prints:
x = 0
x = 1
x = 2
x = 3
x = 4
x = 5
x = 6
x = 7
[0, 1, 4, 9, 16, 25, 36, 49]

https://stackoverflow.com/questions/67102004
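For the goal stated in the question (several processes loading a queue and several others draining it), one possible arrangement is sketched below. It is not part of the original answer: the names `producer`, `consumer` and `run_pipeline`, the squaring step, and the use of `None` as an end-of-work sentinel are all assumptions chosen for illustration.

```python
from multiprocessing import Process, Queue

SENTINEL = None  # hypothetical "no more work" marker, one per consumer


def producer(work_q, start, stop, step):
    # each producer loads its own slice of the input range
    for x in range(start, stop, step):
        work_q.put(x)


def consumer(work_q, result_q):
    # keep processing until this consumer receives a sentinel
    while True:
        item = work_q.get()
        if item is SENTINEL:
            break
        result_q.put(item * item)


def run_pipeline(n_items=8, n_producers=2, n_consumers=3):
    # both queues are created once and passed to the processes as arguments
    work_q, result_q = Queue(), Queue()

    consumers = [Process(target=consumer, args=(work_q, result_q))
                 for _ in range(n_consumers)]
    producers = [Process(target=producer, args=(work_q, i, n_items, n_producers))
                 for i in range(n_producers)]
    for p in consumers + producers:
        p.start()

    # once every producer has exited, all work items are in the queue
    for p in producers:
        p.join()
    for _ in consumers:
        work_q.put(SENTINEL)

    # drain the results before joining the consumers
    results = [result_q.get() for _ in range(n_items)]
    for p in consumers:
        p.join()
    return sorted(results)


if __name__ == '__main__':
    print(run_pipeline())
```

The results are sorted before being returned because the order in which the consumers finish is not deterministic.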