我有一个可迭代的Python类,它环绕着一个多处理生成器。在一些用例中,生成的东西只有一个子集是必需的,因此它被封装在孤岛中。
但是,当使用孤岛时,调用会挂起,我猜是因为底层的多处理过程没有意识到事情已经结束了。
一个功能最低的例子如下:
from itertools import islice
import multiprocessing as mp
STOP_MSG = 'STOP!'
def generator(queue, max_val):
for i in range(max_val):
queue.put(i)
queue.put(STOP_MSG)
class GeneratorMPProc:
def __init__(self, max_val):
self.max_val = max_val
def __iter__(self):
queue = mp.Queue()
feeder_process = mp.Process(
target=generator,
args=(
queue,
self.max_val,
))
feeder_process.start()
msg = queue.get()
while msg != STOP_MSG:
yield msg
msg = queue.get()
feeder_process.join()
if __name__ == '__main__':
max_val = 0xFFFFFFFFF
end_val = 10
psm = GeneratorMPProc(max_val)
rsm = [i for i in islice(psm, end_val)]如何解决这个问题,以便即使在使用孤岛或任何子集选择器时,它也会正确终止?
发布于 2021-03-27 12:47:40
您的isllice调用在GeneratorMPProc.iter返回之前不会返回,如果max_val设置为0xFFFFFFFFF (写入队列不是最快的操作,这也会消耗一些资源),这将需要相当长的时间。换句话说,“事情还没有结束”直到您的生成器函数结束,因此您的multiprocess.Process实际上结束并可以加入。
max_val 将设置为20这样的值,您的程序就可以很容易地终止。
if __name__ == '__main__':
#max_val = 0xFFFFFFFFF
max_val = 20
end_val = 10
psm = GeneratorMPProc(max_val)
rsm = [i for i in islice(psm, end_val)]
print(rsm)指纹:
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]更新
您可能需要考虑使用额外的参数daemon=True启动“生成器”进程,使其成为守护进程,然后从方法__iter__中完全删除feeder_process.join()。这样,即使您的原始max_val,代码也可以工作。
def __iter__(self):
queue = mp.Queue()
feeder_process = mp.Process(
target=generator,
args=(
queue,
self.max_val,
),
daemon=True
)
feeder_process.start()
msg = queue.get()
while msg != STOP_MSG:
yield msg
msg = queue.get()https://stackoverflow.com/questions/66806433
复制相似问题