我有一个使用multiprocessing的程序。
以下是相关的核心代码:
"""Multiprocessing worker."""
from argparse import Namespace
from datetime import datetime
from logging import INFO, Logger, getLogger
from multiprocessing import Process, Queue
from queue import Empty
from signal import SIGUSR1, SIGUSR2, signal
from typing import Any, Iterator, Sequence, Type
from setproctitle import setproctitle
from homeinfotools.exceptions import SSHConnectionError
from homeinfotools.logging import syslogger
__all__ = ['BaseWorker', 'multiprocess']
class BaseWorker:
    """Callable worker that takes systems from a queue and processes them.

    An instance is meant to be used as the ``target`` of a
    ``multiprocessing.Process``.  Subclasses supply the actual work by
    overriding :meth:`run`.
    """

    __slots__ = ('index', 'systems', 'results', 'running', 'current_system')

    def __init__(self, index: int, systems: Queue, results: Queue):
        """Store the worker index and the input and output queues."""
        self.index = index
        self.systems = systems
        self.results = results
        self.running, self.current_system = True, None

    def __call__(self, args: Namespace) -> None:
        """Process queued systems until the queue is empty or an abort is
        requested via SIGUSR2.
        """
        setproctitle(self.name)

        # Both user signals are routed to the same handler method.
        for signum in (SIGUSR1, SIGUSR2):
            signal(signum, self.signal)

        while self.running:
            try:
                job = self.systems.get(timeout=1)
            except Empty:
                # Nothing arrived within one second: treat the input as done.
                self.logger.info('Finished')
                return

            self.current_system = job
            self.results.put_nowait((job, self.process_system(job, args)))

        # Only reached when the SIGUSR2 handler cleared ``running``.
        self.logger.info('Aborted')

    @property
    def info(self) -> str:
        """Return a short description of what the worker is doing."""
        current = self.current_system
        return 'idle' if current is None else f'Processing system #{current}'

    @property
    def logger(self) -> Logger:
        """Return the logger named after this worker, set to level INFO."""
        worker_logger = getLogger(self.name)
        worker_logger.setLevel(INFO)
        return worker_logger

    @property
    def name(self) -> str:
        """Return the worker's name, which includes its index."""
        return f'hidsltools-worker-{self.index}'

    def signal(self, signal_number: int, _: Any) -> None:
        """Handle a signal: SIGUSR1 logs the state, SIGUSR2 requests a stop."""
        if signal_number == SIGUSR1:
            self.logger.info(self.info)
            return

        if signal_number == SIGUSR2:
            self.running = False
            return

        self.logger.error('Received invalid signal: %i', signal_number)

    def process_system(self, system: int, args: Namespace) -> dict:
        """Run :meth:`run` for one system and return a timing/status report.

        The report always has the keys ``start``, ``online``, ``end`` and
        ``duration``; ``result`` is present only if :meth:`run` did not raise
        ``SSHConnectionError``.
        """
        began = datetime.now()
        report = {'start': began.isoformat()}

        try:
            outcome = self.run(system, args)
        except SSHConnectionError:
            syslogger(system).error('Could not establish SSH connection.')
            report['online'] = False
        else:
            report['result'] = outcome
            report['online'] = True

        ended = datetime.now()
        report['end'] = ended.isoformat()
        report['duration'] = str(ended - began)
        return report

    @staticmethod
    def run(system: int, args: Namespace) -> dict:
        """Do the actual work for one system; must be overridden."""
        raise NotImplementedError()
def multiprocess(
        worker_cls: Type[BaseWorker],
        systems: list[int],
        processes: int,
        args: Namespace
) -> dict:
    """Spawn workers, wait for all of them to exit and return the results.

    The returned dict is built from the ``(system, result)`` pairs found on
    the results queue.  The workers are joined *before* the results queue is
    read.
    """
    pending = sequence_to_queue(systems)
    results = Queue()
    workers = list(spawn_workers(worker_cls, processes, pending, results, args))
    wait_for_processes(workers)
    return dict(iter_queue(results))
def sequence_to_queue(sequence: Sequence[Any]) -> Queue:
    """Build a queue sized to the sequence and pre-filled with its items."""
    filled = Queue(maxsize=len(sequence))

    for entry in sequence:
        filled.put(entry)

    return filled
def spawn_workers(
        worker_cls: Type[BaseWorker],
        amount: int,
        systems: Queue,
        results: Queue,
        args: Namespace
) -> Iterator[Process]:
    """Lazily start ``amount`` worker processes, yielding each once started.

    Every process runs a fresh ``worker_cls`` instance (indexed from 0) that
    is called with ``args``.
    """
    for index in range(amount):
        child = Process(
            target=worker_cls(index, systems, results),
            args=(args,)
        )
        child.start()
        yield child
def wait_for_processes(processes: list[Process]) -> None:
    """Join every process in order; on Ctrl+C kill them all and re-raise."""
    try:
        for child in processes:
            child.join()
    except KeyboardInterrupt:
        # Kill every process in the list, then let the interrupt propagate.
        for child in processes:
            child.kill()

        raise
def iter_queue(queue: Queue) -> Iterator[Any]:
    """Yield items from the queue for as long as it reports being non-empty."""
    while True:
        if queue.empty():
            return

        yield queue.get()


# From the post: "You can find the complete program here." (no link present)
现在存在的问题是,即使在所有子进程都完成之后,程序也不会终止,特别是当有许多子进程时:
$ sysrpc -Rp 8 {201..900}
[ERROR] sysrpc.201: Could not establish SSH connection.
[ERROR] sysrpc.222: Could not establish SSH connection.
[ERROR] sysrpc.208: Could not establish SSH connection.
[ERROR] sysrpc.209: Could not establish SSH connection.
[ERROR] sysrpc.210: Could not establish SSH connection.
[ERROR] sysrpc.217: Could not establish SSH connection.
[ERROR] sysrpc.223: Could not establish SSH connection.
[ERROR] sysrpc.224: Could not establish SSH connection.
[ERROR] sysrpc.225: Could not establish SSH connection.
[ERROR] sysrpc.226: Could not establish SSH connection.
[ERROR] sysrpc.228: Could not establish SSH connection.
[ERROR] sysrpc.229: Could not establish SSH connection.
[ERROR] sysrpc.230: Could not establish SSH connection.
[ERROR] sysrpc.231: Could not establish SSH connection.
[ERROR] sysrpc.232: Could not establish SSH connection.
[ERROR] sysrpc.233: Could not establish SSH connection.
[ERROR] sysrpc.234: Could not establish SSH connection.
[ERROR] sysrpc.247: Could not establish SSH connection.
[ERROR] sysrpc.235: Could not establish SSH connection.
[ERROR] sysrpc.256: Could not establish SSH connection.
[ERROR] sysrpc.258: Could not establish SSH connection.
[ERROR] sysrpc.241: Could not establish SSH connection.
[ERROR] sysrpc.246: Could not establish SSH connection.
[ERROR] sysrpc.278: Could not establish SSH connection.
[ERROR] sysrpc.262: Could not establish SSH connection.
[ERROR] sysrpc.287: Could not establish SSH connection.
[ERROR] sysrpc.276: Could not establish SSH connection.
[ERROR] sysrpc.299: Could not establish SSH connection.
[ERROR] sysrpc.301: Could not establish SSH connection.
[ERROR] sysrpc.263: Could not establish SSH connection.
[ERROR] sysrpc.268: Could not establish SSH connection.
[ERROR] sysrpc.315: Could not establish SSH connection.
[ERROR] sysrpc.274: Could not establish SSH connection.
[ERROR] sysrpc.320: Could not establish SSH connection.
[ERROR] sysrpc.318: Could not establish SSH connection.
[ERROR] sysrpc.302: Could not establish SSH connection.
[ERROR] sysrpc.330: Could not establish SSH connection.
[ERROR] sysrpc.311: Could not establish SSH connection.
[ERROR] sysrpc.312: Could not establish SSH connection.
[ERROR] sysrpc.335: Could not establish SSH connection.
[ERROR] sysrpc.294: Could not establish SSH connection.
[ERROR] sysrpc.358: Could not establish SSH connection.
[ERROR] sysrpc.360: Could not establish SSH connection.
[ERROR] sysrpc.362: Could not establish SSH connection.
[ERROR] sysrpc.308: Could not establish SSH connection.
[ERROR] sysrpc.336: Could not establish SSH connection.
[ERROR] sysrpc.380: Could not establish SSH connection.
[ERROR] sysrpc.386: Could not establish SSH connection.
[ERROR] sysrpc.349: Could not establish SSH connection.
[ERROR] sysrpc.357: Could not establish SSH connection.
[ERROR] sysrpc.417: Could not establish SSH connection.
[ERROR] sysrpc.422: Could not establish SSH connection.
[ERROR] sysrpc.435: Could not establish SSH connection.
[ERROR] sysrpc.408: Could not establish SSH connection.
[ERROR] sysrpc.412: Could not establish SSH connection.
[ERROR] sysrpc.420: Could not establish SSH connection.
[ERROR] sysrpc.436: Could not establish SSH connection.
[ERROR] sysrpc.390: Could not establish SSH connection.
[ERROR] sysrpc.439: Could not establish SSH connection.
[ERROR] sysrpc.440: Could not establish SSH connection.
[ERROR] sysrpc.441: Could not establish SSH connection.
[ERROR] sysrpc.443: Could not establish SSH connection.
[ERROR] sysrpc.446: Could not establish SSH connection.
[ERROR] sysrpc.433: Could not establish SSH connection.
[ERROR] sysrpc.448: Could not establish SSH connection.
[ERROR] sysrpc.449: Could not establish SSH connection.
[ERROR] sysrpc.450: Could not establish SSH connection.
[ERROR] sysrpc.451: Could not establish SSH connection.
[ERROR] sysrpc.452: Could not establish SSH connection.
[ERROR] sysrpc.453: Could not establish SSH connection.
[ERROR] sysrpc.454: Could not establish SSH connection.
[ERROR] sysrpc.455: Could not establish SSH connection.
[ERROR] sysrpc.456: Could not establish SSH connection.
[ERROR] sysrpc.457: Could not establish SSH connection.
[ERROR] sysrpc.464: Could not establish SSH connection.
[ERROR] sysrpc.458: Could not establish SSH connection.
[ERROR] sysrpc.459: Could not establish SSH connection.
[ERROR] sysrpc.460: Could not establish SSH connection.
[ERROR] sysrpc.461: Could not establish SSH connection.
[ERROR] sysrpc.462: Could not establish SSH connection.
[ERROR] sysrpc.463: Could not establish SSH connection.
[ERROR] sysrpc.465: Could not establish SSH connection.
[ERROR] sysrpc.469: Could not establish SSH connection.
[ERROR] sysrpc.471: Could not establish SSH connection.
[ERROR] sysrpc.473: Could not establish SSH connection.
[ERROR] sysrpc.474: Could not establish SSH connection.
[ERROR] sysrpc.479: Could not establish SSH connection.
[ERROR] sysrpc.475: Could not establish SSH connection.
[ERROR] sysrpc.476: Could not establish SSH connection.
[ERROR] sysrpc.477: Could not establish SSH connection.
[ERROR] sysrpc.480: Could not establish SSH connection.
[ERROR] sysrpc.481: Could not establish SSH connection.
[ERROR] sysrpc.482: Could not establish SSH connection.
[ERROR] sysrpc.483: Could not establish SSH connection.
[ERROR] sysrpc.484: Could not establish SSH connection.
[ERROR] sysrpc.489: Could not establish SSH connection.
[ERROR] sysrpc.491: Could not establish SSH connection.
[ERROR] sysrpc.492: Could not establish SSH connection.
[ERROR] sysrpc.497: Could not establish SSH connection.
[ERROR] sysrpc.528: Could not establish SSH connection.
[ERROR] sysrpc.498: Could not establish SSH connection.
[ERROR] sysrpc.532: Could not establish SSH connection.
[ERROR] sysrpc.533: Could not establish SSH connection.
[ERROR] sysrpc.505: Could not establish SSH connection.
[ERROR] sysrpc.541: Could not establish SSH connection.
[ERROR] sysrpc.544: Could not establish SSH connection.
[ERROR] sysrpc.546: Could not establish SSH connection.
[ERROR] sysrpc.511: Could not establish SSH connection.
[ERROR] sysrpc.521: Could not establish SSH connection.
[ERROR] sysrpc.569: Could not establish SSH connection.
[ERROR] sysrpc.573: Could not establish SSH connection.
[ERROR] sysrpc.579: Could not establish SSH connection.
[ERROR] sysrpc.586: Could not establish SSH connection.
[ERROR] sysrpc.543: Could not establish SSH connection.
[ERROR] sysrpc.559: Could not establish SSH connection.
[ERROR] sysrpc.563: Could not establish SSH connection.
[ERROR] sysrpc.572: Could not establish SSH connection.
[ERROR] sysrpc.602: Could not establish SSH connection.
[ERROR] sysrpc.606: Could not establish SSH connection.
[ERROR] sysrpc.581: Could not establish SSH connection.
[ERROR] sysrpc.614: Could not establish SSH connection.
[ERROR] sysrpc.584: Could not establish SSH connection.
[ERROR] sysrpc.616: Could not establish SSH connection.
[ERROR] sysrpc.618: Could not establish SSH connection.
[ERROR] sysrpc.588: Could not establish SSH connection.
[ERROR] sysrpc.632: Could not establish SSH connection.
[ERROR] sysrpc.642: Could not establish SSH connection.
[ERROR] sysrpc.646: Could not establish SSH connection.
[ERROR] sysrpc.626: Could not establish SSH connection.
[ERROR] sysrpc.627: Could not establish SSH connection.
[ERROR] sysrpc.657: Could not establish SSH connection.
[ERROR] sysrpc.647: Could not establish SSH connection.
[ERROR] sysrpc.650: Could not establish SSH connection.
[ERROR] sysrpc.611: Could not establish SSH connection.
[ERROR] sysrpc.623: Could not establish SSH connection.
[ERROR] sysrpc.668: Could not establish SSH connection.
[ERROR] sysrpc.669: Could not establish SSH connection.
[ERROR] sysrpc.670: Could not establish SSH connection.
[ERROR] sysrpc.656: Could not establish SSH connection.
[ERROR] sysrpc.663: Could not establish SSH connection.
[ERROR] sysrpc.687: Could not establish SSH connection.
[ERROR] sysrpc.688: Could not establish SSH connection.
[ERROR] sysrpc.689: Could not establish SSH connection.
[ERROR] sysrpc.690: Could not establish SSH connection.
[ERROR] sysrpc.691: Could not establish SSH connection.
[ERROR] sysrpc.693: Could not establish SSH connection.
[ERROR] sysrpc.702: Could not establish SSH connection.
[ERROR] sysrpc.694: Could not establish SSH connection.
[ERROR] sysrpc.710: Could not establish SSH connection.
[ERROR] sysrpc.715: Could not establish SSH connection.
[ERROR] sysrpc.717: Could not establish SSH connection.
[ERROR] sysrpc.719: Could not establish SSH connection.
[ERROR] sysrpc.696: Could not establish SSH connection.
[ERROR] sysrpc.697: Could not establish SSH connection.
[ERROR] sysrpc.705: Could not establish SSH connection.
[ERROR] sysrpc.742: Could not establish SSH connection.
[ERROR] sysrpc.743: Could not establish SSH connection.
[ERROR] sysrpc.744: Could not establish SSH connection.
[ERROR] sysrpc.747: Could not establish SSH connection.
[ERROR] sysrpc.748: Could not establish SSH connection.
[ERROR] sysrpc.722: Could not establish SSH connection.
[ERROR] sysrpc.695: Could not establish SSH connection.
[ERROR] sysrpc.773: Could not establish SSH connection.
[ERROR] sysrpc.776: Could not establish SSH connection.
[ERROR] sysrpc.783: Could not establish SSH connection.
[ERROR] sysrpc.798: Could not establish SSH connection.
[ERROR] sysrpc.803: Could not establish SSH connection.
[ERROR] sysrpc.812: Could not establish SSH connection.
[ERROR] sysrpc.818: Could not establish SSH connection.
[ERROR] sysrpc.819: Could not establish SSH connection.
[ERROR] sysrpc.822: Could not establish SSH connection.
[ERROR] sysrpc.821: Could not establish SSH connection.
[ERROR] sysrpc.830: Could not establish SSH connection.
[ERROR] sysrpc.831: Could not establish SSH connection.
[ERROR] sysrpc.835: Could not establish SSH connection.
[ERROR] sysrpc.789: Could not establish SSH connection.
[ERROR] sysrpc.790: Could not establish SSH connection.
[ERROR] sysrpc.841: Could not establish SSH connection.
[ERROR] sysrpc.826: Could not establish SSH connection.
[ERROR] sysrpc.842: Could not establish SSH connection.
[ERROR] sysrpc.848: Could not establish SSH connection.
[ERROR] sysrpc.849: Could not establish SSH connection.
[ERROR] sysrpc.850: Could not establish SSH connection.
[ERROR] sysrpc.851: Could not establish SSH connection.
[ERROR] sysrpc.853: Could not establish SSH connection.
[ERROR] sysrpc.854: Could not establish SSH connection.
[ERROR] sysrpc.860: Could not establish SSH connection.
[ERROR] sysrpc.866: Could not establish SSH connection.
[ERROR] sysrpc.867: Could not establish SSH connection.
[ERROR] sysrpc.868: Could not establish SSH connection.
[ERROR] sysrpc.872: Could not establish SSH connection.
[ERROR] sysrpc.856: Could not establish SSH connection.
[ERROR] sysrpc.883: Could not establish SSH connection.
[ERROR] sysrpc.881: Could not establish SSH connection.
[ERROR] sysrpc.877: Could not establish SSH connection.
[ERROR] sysrpc.876: Could not establish SSH connection.
[ERROR] sysrpc.882: Could not establish SSH connection.
[ERROR] sysrpc.887: Could not establish SSH connection.
[ERROR] sysrpc.884: Could not establish SSH connection.
[ERROR] sysrpc.888: Could not establish SSH connection.
[INFO] hidsltools-worker-2: Finished
[ERROR] sysrpc.885: Could not establish SSH connection.
[INFO] hidsltools-worker-0: Finished
[ERROR] sysrpc.896: Could not establish SSH connection.
[ERROR] sysrpc.886: Could not establish SSH connection.
[INFO] hidsltools-worker-5: Finished
[ERROR] sysrpc.897: Could not establish SSH connection.
[INFO] hidsltools-worker-1: Finished
[INFO] hidsltools-worker-4: Finished
[ERROR] sysrpc.898: Could not establish SSH connection.
[ERROR] sysrpc.899: Could not establish SSH connection.
[INFO] hidsltools-worker-7: Finished
[INFO] hidsltools-worker-6: Finished
[ERROR] sysrpc.900: Could not establish SSH connection.
[INFO] hidsltools-worker-3: Finished
正如您所看到的,所有八个工作进程(worker)都已完成,但是程序没有终止。即使我按回车,主进程也不会终止:
...
[ERROR] sysrpc.900: Could not establish SSH connection.
[INFO] hidsltools-worker-3: Finished
只有当我按Ctrl+C时,主进程才会退出:
[INFO] hidsltools-worker-3: Finished
^CProcess Process-7:
1 ✗ neumann@ThinkCentre ~ $
为什么即使所有子进程都已终止,主进程仍然不终止?
发布于 2022-07-20 16:03:57
如果您阅读了multiprocessing.Queue的文档,特别是其中的警告,您将看到:
警告:如上所述,如果子进程已经将项放到队列中(并且它没有使用JoinableQueue.cancel_join_thread),那么在所有缓冲项都被刷新到管道之前,该进程不会终止。
这意味着,如果尝试join该进程,则可能会出现死锁,除非您确信放入队列的所有项都已被消耗。类似地,如果子进程是非守护进程,则父进程在退出时尝试join其所有非守护子进程时可能会挂起。请注意,使用管理器创建的队列没有此问题。参见编程指南(Programming guidelines)。
具体来说,您调用的是wait_for_processes,它尝试对子进程进行join,然后才调用iter_queue,后者对子进程已放入项的队列调用get(顺便提一句,文档还清楚地表明,对queue.empty()的调用对于multiprocessing.Queue实例是不可靠的)。
显然,您需要反转操作顺序,即主进程在尝试join子进程之前需要将所有项从队列中取出,而且这不应依赖queue.empty。
一种解决方案是对子进程读取的输入队列使用multiprocessing.JoinableQueue,并在创建从该队列读取的子进程之前将要处理的所有项放入此队列。然后,子进程从该队列中取出一个项,将响应放入输出队列,再在输入队列上调用方法task_done,表示该项的处理已经完成。这样,主进程只需调用输入队列上的方法join,即可确保所有输入任务都已被处理,并且它们的响应已放到输出队列中。然后,只需对结果队列反复调用get_nowait,直到它引发Empty异常。
这是一般(然而,未经测试的)的想法。当收到信号而子进程尚未处理完整个输入队列时,您可能需要添加额外的代码来处理这种情况:
"""Multiprocessing worker."""
from argparse import Namespace
from datetime import datetime
from logging import INFO, Logger, getLogger
from multiprocessing import Process, Queue, JoinableQueue
from queue import Empty
from signal import SIGUSR1, SIGUSR2, signal
from typing import Any, Iterator, Sequence, Type
from setproctitle import setproctitle
from homeinfotools.exceptions import SSHConnectionError
from homeinfotools.logging import syslogger
__all__ = ['BaseWorker', 'multiprocess']
class BaseWorker:
    """Callable worker that takes systems from a joinable queue.

    An instance is meant to be used as the ``target`` of a
    ``multiprocessing.Process``.  Every item taken from the input queue is
    acknowledged with ``task_done()`` so that the parent process can wait
    for completion with ``JoinableQueue.join()``.  Subclasses supply the
    actual work by overriding :meth:`run`.
    """

    __slots__ = ('index', 'systems', 'results', 'running', 'current_system')

    def __init__(self, index: int, systems: JoinableQueue, results: Queue):
        """Store the worker index and the input and output queues."""
        self.index = index
        self.systems = systems
        self.results = results
        self.running = True
        self.current_system = None

    def __call__(self, args: Namespace) -> None:
        """Drain the input queue, processing items unless an abort was
        requested via SIGUSR2.
        """
        setproctitle(self.name)
        signal(SIGUSR1, self.signal)
        signal(SIGUSR2, self.signal)

        # Every queued item must be acknowledged even when self.running is
        # False, else the main process's call to join() on the input queue
        # will never return.
        while True:
            try:
                # Items are already on the queue so use get_nowait().
                # NOTE(review): the multiprocessing docs mention a short delay
                # before a freshly put item becomes visible to get_nowait();
                # confirm this cannot make a worker give up too early.
                self.current_system = self.systems.get_nowait()
            except Empty:
                # The loop can only be left here, so the final status has to
                # be reported here as well.
                self.logger.info('Finished' if self.running else 'Aborted')
                return

            try:
                if self.running:
                    # Then process the input for real:
                    result = self.process_system(self.current_system, args)
                    self.results.put_nowait((self.current_system, result))
            finally:
                # Acknowledge the item even if processing raised, so that the
                # parent's join() on the input queue cannot hang on it.
                self.systems.task_done()

    @property
    def info(self) -> str:
        """Return information about the state of the worker."""
        if self.current_system is None:
            return 'idle'

        return f'Processing system #{self.current_system}'

    @property
    def logger(self) -> Logger:
        """Return the logger named after this worker, set to level INFO."""
        logger = getLogger(self.name)
        logger.setLevel(INFO)
        return logger

    @property
    def name(self) -> str:
        """Return the worker's name, which includes its index."""
        return f'hidsltools-worker-{self.index}'

    def signal(self, signal_number: int, _: Any) -> None:
        """Handle a signal: SIGUSR1 logs the state, SIGUSR2 requests a stop."""
        if signal_number == SIGUSR1:
            self.logger.info(self.info)
        elif signal_number == SIGUSR2:
            self.running = False
        else:
            self.logger.error('Received invalid signal: %i', signal_number)

    def process_system(self, system: int, args: Namespace) -> dict:
        """Run :meth:`run` for one system and return a timing/status report.

        The report always has the keys ``start``, ``online``, ``end`` and
        ``duration``; ``result`` is present only if :meth:`run` did not raise
        ``SSHConnectionError``.
        """
        result = {'start': (start := datetime.now()).isoformat()}

        try:
            result['result'] = self.run(system, args)
        except SSHConnectionError:
            syslogger(system).error('Could not establish SSH connection.')
            result['online'] = False
        else:
            result['online'] = True

        result['end'] = (end := datetime.now()).isoformat()
        result['duration'] = str(end - start)
        return result

    @staticmethod
    def run(system: int, args: Namespace) -> dict:
        """Do the actual work for one system; must be overridden."""
        raise NotImplementedError()
def multiprocess(
        worker_cls: Type[BaseWorker],
        systems: list[int],
        processes: int,
        args: Namespace
) -> dict:
    """Spawn workers, wait for all work to be done and return the results.

    The returned dict is built from the ``(system, result)`` pairs found on
    the results queue.  On Ctrl+C the workers are killed and the
    ``KeyboardInterrupt`` is re-raised.
    """
    # First put all items on the input queue:
    systems_queue = sequence_to_queue(systems)
    results = Queue()
    # Then start the children that will process the input queue:
    process_list = list(spawn_workers(
        worker_cls,
        processes,
        systems_queue,
        results,
        args
    ))

    # Wait for all work to be acknowledged by the workers.
    try:
        systems_queue.join()
    except KeyboardInterrupt:
        # Same clean-up that wait_for_processes() does for its own wait.
        for process in process_list:
            process.kill()

        raise

    # Read the results *before* joining the workers: a child that still has
    # buffered queue items does not terminate until they have been flushed.
    collected = dict(iter_queue(results))
    # Finally wait for the processes to complete:
    wait_for_processes(process_list)
    # A worker acknowledges an item right after put_nowait(), which may be
    # before its feeder thread has written the result to the pipe.  Such late
    # results are flushed by the time the worker has exited; collect them now.
    collected.update(iter_queue(results))
    return collected
def sequence_to_queue(sequence: Sequence[Any]) -> JoinableQueue:
    """Build a joinable queue sized to the sequence and holding its items."""
    work = JoinableQueue(maxsize=len(sequence))

    for entry in sequence:
        work.put(entry)

    return work
def spawn_workers(
        worker_cls: Type[BaseWorker],
        amount: int,
        systems: JoinableQueue,
        results: Queue,
        args: Namespace
) -> Iterator[Process]:
    """Lazily start ``amount`` worker processes, yielding each once started.

    Every process runs a fresh ``worker_cls`` instance (indexed from 0) that
    is called with ``args``.
    """
    for index in range(amount):
        child = Process(
            target=worker_cls(index, systems, results),
            args=(args,)
        )
        child.start()
        yield child
def wait_for_processes(processes: list[Process]) -> None:
    """Join every process in order; on Ctrl+C kill them all and re-raise."""
    try:
        for child in processes:
            child.join()
    except KeyboardInterrupt:
        # Kill every process in the list, then let the interrupt propagate.
        for child in processes:
            child.kill()

        raise
def iter_queue(queue: Queue) -> Iterator[Any]:
    """Yield items without blocking until the queue raises ``Empty``."""
    while True:
        try:
            yield queue.get_nowait()
        except Empty:
            return


# Note:
在上面的代码中,我选择在创建子进程之前将所有输入项放在输入队列中。这允许子进程在输入队列上执行get_nowait方法,当它引发Empty异常时,子进程知道所有输入都已处理,并且可以返回,从而允许对该进程的最终join完成。或者,主进程可以在启动子进程之后将项放到输入队列中。但是在这种情况下,子进程必须在队列上发出阻塞的get调用,并且永远不知道队列中是否以及何时会有任何未来的项。在这种情况下,子进程永远不会终止(除非主进程将N个哨兵项(例如None)放置在队列中,而子进程则在看到其中一个哨兵后终止)。因此,这些子进程将需要是永远不会被主进程join的守护进程。
第二种解决方案是,每个子进程在检测到输入队列中没有更多的项之后,将一个None项放到结果队列中(或者一些不能与实际响应值混淆的值,并且可以充当“此子进程不会写入更多输出”的指示符)。然后,主进程只是在结果队列上重复调用get,直到它检测到N个None项,其中N是在输入队列中处理项的子进程的数量。
顺便说一句,最好没有一个名为system的局部变量,它恰好与标准的Python模块同名。
https://stackoverflow.com/questions/73047215
复制相似问题