我正在创建子进程作为通过input和output队列获取和返回数据的worker对象。然而,当代码运行时,似乎每个子进程都获得了input队列的完整副本,而主线程没有通过output队列获得任何东西,这导致了死锁。为什么会这样呢?
import threading, queue, multiprocessing
class worker(multiprocessing.Process):
def __init__(self,inp,out):
super().__init__()
self.input=inp
self.output=out
#real program has external subprocess initialization here
def run(self):
name=self.name
while True:
inp=self.input.get()
if (inp is 'stop'):
break
print('Process {} got task word: {}'.format(name,inp))
print('queue size is: {}'.format(self.input.qsize()))
self.output.put(inp.count('a'))
print('Process {} input processed'.format(name))
print('exiting {}'.format(name))
if __name__ == "__main__":
inp=queue.Queue()
out=queue.Queue(maxsize=4)
strings = ['asd', 'assa','aaa','as','aa','aaaq']
for x in strings:
inp.put(x)
print(inp.qsize())
workers=[worker(inp,out) for x in range(2)]
for w in workers:
w.start()
res=[]
for x in strings:
res.append(out.get())
print("intermediate result is {}".format(res))
for _ in workers:
inp.put('stop')
for w in workers:
w.join()
print(res)在实际问题中,每个工作者都会将自己的接口初始化到另一个外部程序。我希望避免对每个项目重复初始化,因此不能简单地将worker转换为函数并将其与map一起使用。
如果将multiprocessing.Process替换为具有相同接口的threading.Thread,则程序可以正常工作,但在单核上运行,这对于任务是不可接受的。
发布于 2019-03-24 02:39:04
不同的进程不共享进程内存,因此您不能在不同的进程之间共享队列。每个进程都有自己的副本。
Python multiple process share the same object or not?
您必须使用IPC机制来同步多个进程。
然而,线程共享进程内存,因此它将为线程工作,但不为进程工作。
多进程解决方案
使用多处理队列
import threading, multiprocessing
class worker(multiprocessing.Process):
def __init__(self,inp,out):
super().__init__()
self.input=inp
self.output=out
#real program has external subprocess initialization here
def run(self):
name=self.name
while True:
inp=self.input.get()
if (inp == 'stop'):
break
print('Process {} got task word: {}'.format(name,inp))
#print('queue size is: {}'.format(self.input.qsize()))
self.output.put((inp, inp.count('a')))
print('Process {} input processed'.format(name))
print('exiting {}'.format(name))
if __name__ == "__main__":
inp=multiprocessing.Queue()
out=multiprocessing.Queue()
strings = ['asd', 'assa','aaa','as','aa','aaaq']
for x in strings:
inp.put(x)
workers=[worker(inp,out) for x in range(2)]
for w in workers:
w.start()
for _ in workers:
inp.put('stop')
#print (inp.qsize())
inp.close()
inp.join_thread()
for w in workers:
w.join()
print (out.empty())
while not out.empty():
print (out.get())https://stackoverflow.com/questions/55316923
复制相似问题