好吧,因为目前还没有答案,我不觉得这么做太糟了。虽然我仍然对幕后究竟发生了什么导致这个问题感兴趣,但我最迫切的问题是更新2中的问题。
JoinableQueue和Manager().Queue()之间有什么区别(什么时候应该使用一种而另一种?)更重要的是,在这个例子中,用一个代替另一个安全吗?
在下面的代码中,我有一个简单的进程池。向每个进程传递进程队列(pq)以从其中提取要处理的数据,并传递返回值队列(rq),将处理的返回值传递回主线程。如果我不追加返回值队列,它就能工作,但一旦我工作了,由于某种原因,进程就会被阻止停止。在这两种情况下,进程run方法都会返回,所以返回队列阻塞时不是put,而是在第二种情况下进程本身不会终止,所以当我在进程上join时,程序会死锁。为什么会这样?
更新:
至少在我的机器上,我可以在队列中有多达6570个条目,它实际上可以工作,但是比这个和它的死锁更多。
Manager().Queue()**.** 一起工作无论是JoinableQueue的限制,还是我误解了这两个对象之间的区别,我都发现,如果用Manager().Queue()替换返回队列,它就会像预期的那样工作。它们之间有什么区别,你什么时候应该使用其中一个而另一个呢?
rq消费,错误不会发生哎哟。这里有一个回答,当我评论的时候,它消失了。不管怎么说,它说的一件事是质疑是否,如果我添加了一个消费者,这个错误仍然发生。我试过了,答案是,不,没有。
它提到的另一件事是引用多处理文档的话作为解决问题的一个可能的关键。提到JoinableQueue的,它说:
..。用于计算未完成任务数量的信号量最终可能溢出,引发异常。
import multiprocessing
class _ProcSTOP:
pass
class Proc(multiprocessing.Process):
def __init__(self, pq, rq):
self._pq = pq
self._rq = rq
super().__init__()
print('++', self.name)
def run(self):
dat = self._pq.get()
while not dat is _ProcSTOP:
# self._rq.put(dat) # uncomment me for deadlock
self._pq.task_done()
dat = self._pq.get()
self._pq.task_done()
print('==', self.name)
def __del__(self):
print('--', self.name)
if __name__ == '__main__':
pq = multiprocessing.JoinableQueue()
rq = multiprocessing.JoinableQueue()
pool = []
for i in range(4):
p = Proc(pq, rq)
p.start()
pool.append(p)
for i in range(10000):
pq.put(i)
pq.join()
for i in range(4):
pq.put(_ProcSTOP)
pq.join()
while len(pool) > 0:
print('??', pool)
pool.pop().join() # hangs here (if using rq)
print('** complete')示例输出,不使用返回队列:
++ Proc-1
++ Proc-2
++ Proc-3
++ Proc-4
== Proc-4
== Proc-3
== Proc-1
?? [<Proc(Proc-1, started)>, <Proc(Proc-2, started)>, <Proc(Proc-3, started)>, <Proc(Proc-4, started)>]
== Proc-2
?? [<Proc(Proc-1, stopped)>, <Proc(Proc-2, started)>, <Proc(Proc-3, stopped)>]
-- Proc-3
?? [<Proc(Proc-1, stopped)>, <Proc(Proc-2, started)>]
-- Proc-2
?? [<Proc(Proc-1, stopped)>]
-- Proc-1
** complete
-- Proc-4示例输出,使用返回队列:
++ Proc-1
++ Proc-2
++ Proc-3
++ Proc-4
== Proc-2
== Proc-4
== Proc-1
?? [<Proc(Proc-1, started)>, <Proc(Proc-2, started)>, <Proc(Proc-3, started)>, <Proc(Proc-4, started)>]
== Proc-3
# here it hangs发布于 2011-11-06 18:10:22
来自文档
警告 如前所述,如果子进程已将项放入队列(且未使用JoinableQueue.cancel_join_thread()),则在所有缓冲项都已刷新到管道之前,该进程不会终止。 这意味着,如果尝试加入该进程,则可能会出现死锁,除非您确信队列中的所有项都已被消耗。类似地,如果子进程是非守护进程,则父进程在尝试连接其所有非守护进程子进程时可能挂起退出。 请注意,使用管理器创建的队列没有此问题。见程序设计指导方针。
因此,JoinableQueue()使用一个管道,直到它能够刷新所有数据才能关闭。
另一方面,Manager.Queue()对象使用完全不同的方法。管理人员正在运行一个单独的进程,该进程立即接收所有数据(并将其存储在内存中)。
管理人员提供了一种创建数据的方法,这些数据可以在不同的进程之间共享。管理器对象控制管理共享对象的服务器进程。其他进程可以使用代理访问共享对象。 ..。 Queue(maxsize)创建一个共享的Queue.Queue对象并返回它的代理。
https://stackoverflow.com/questions/8026050
复制相似问题