首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Python多进程que重复

Python多进程que重复
EN

Stack Overflow用户
提问于 2019-03-24 02:14:40
回答 1查看 808关注 0票数 0

我正在创建子进程作为通过inputoutput队列获取和返回数据的worker对象。然而,当代码运行时,似乎每个子进程都获得了input队列的完整副本,而主线程没有通过output队列获得任何东西,这导致了死锁。为什么会这样呢?

代码语言:javascript
复制
import threading, queue, multiprocessing
class worker(multiprocessing.Process):
     def __init__(self,inp,out):
         super().__init__()
         self.input=inp
         self.output=out
         #real program has external subprocess initialization here
     def run(self):
         name=self.name
         while True:
             inp=self.input.get()
             if (inp is 'stop'):
                 break
             print('Process {} got task word: {}'.format(name,inp))
             print('queue size is: {}'.format(self.input.qsize()))
             self.output.put(inp.count('a'))
             print('Process {} input processed'.format(name))
         print('exiting {}'.format(name))

if __name__ == "__main__":
    inp=queue.Queue()
    out=queue.Queue(maxsize=4)
    strings = ['asd', 'assa','aaa','as','aa','aaaq']
    for x in strings:
        inp.put(x)
    print(inp.qsize())
    workers=[worker(inp,out) for x in range(2)]
    for w in workers:
        w.start()
    res=[]
    for x in strings:
        res.append(out.get())
        print("intermediate result is {}".format(res))
    for _ in workers:
        inp.put('stop')
    for w in workers:
        w.join()

    print(res)

在实际问题中,每个工作者都会将自己的接口初始化到另一个外部程序。我希望避免对每个项目重复初始化,因此不能简单地将worker转换为函数并将其与map一起使用。

如果将multiprocessing.Process替换为具有相同接口的threading.Thread,则程序可以正常工作,但在单核上运行,这对于任务是不可接受的。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2019-03-24 02:39:04

不同的进程不共享进程内存,因此您不能在不同的进程之间共享队列。每个进程都有自己的副本。

Python multiple process share the same object or not?

您必须使用IPC机制来同步多个进程。

然而,线程共享进程内存,因此它将为线程工作,但不为进程工作。

多进程解决方案

使用多处理队列

代码语言:javascript
复制
import threading, multiprocessing
class worker(multiprocessing.Process):
     def __init__(self,inp,out):
         super().__init__()
         self.input=inp
         self.output=out
         #real program has external subprocess initialization here
     def run(self):
         name=self.name
         while True:
             inp=self.input.get()
             if (inp == 'stop'):
                 break
             print('Process {} got task word: {}'.format(name,inp))
             #print('queue size is: {}'.format(self.input.qsize()))
             self.output.put((inp, inp.count('a')))
             print('Process {} input processed'.format(name))
         print('exiting {}'.format(name))

if __name__ == "__main__":
    inp=multiprocessing.Queue()
    out=multiprocessing.Queue()
    strings = ['asd', 'assa','aaa','as','aa','aaaq']
    for x in strings:
        inp.put(x)

    workers=[worker(inp,out) for x in range(2)]
    for w in workers:
        w.start()

    for _ in workers:
        inp.put('stop')
    #print (inp.qsize())


    inp.close()
    inp.join_thread()

    for w in workers:
        w.join()

    print (out.empty())
    while not out.empty():
        print (out.get())
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/55316923

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档