
Multiprocessing program does not terminate

Stack Overflow user
Asked on 2022-07-20 06:59:39
Answers: 1, Views: 49, Followers: 0, Votes: 1

I have a program that uses multiprocessing.

Here is the relevant core:

"""Multiprocessing worker."""

from argparse import Namespace
from datetime import datetime
from logging import INFO, Logger, getLogger
from multiprocessing import Process, Queue
from queue import Empty
from signal import SIGUSR1, SIGUSR2, signal
from typing import Any, Iterator, Sequence, Type

from setproctitle import setproctitle

from homeinfotools.exceptions import SSHConnectionError
from homeinfotools.logging import syslogger


__all__ = ['BaseWorker', 'multiprocess']


class BaseWorker:
    """Stored args and manager to process systems."""

    __slots__ = ('index', 'systems', 'results', 'running', 'current_system')

    def __init__(self, index: int, systems: Queue, results: Queue):
        """Sets the command line arguments."""
        self.index = index
        self.systems = systems
        self.results = results
        self.running = True
        self.current_system = None

    def __call__(self, args: Namespace) -> None:
        """Runs the worker on the given system."""
        setproctitle(self.name)
        signal(SIGUSR1, self.signal)
        signal(SIGUSR2, self.signal)

        while self.running:
            try:
                self.current_system = system = self.systems.get(timeout=1)
            except Empty:
                self.logger.info('Finished')
                return

            result = self.process_system(system, args)
            self.results.put_nowait((system, result))

        self.logger.info('Aborted')

    @property
    def info(self) -> str:
        """Returns information about the state of the worker."""
        if self.current_system is None:
            return 'idle'

        return f'Processing system #{self.current_system}'

    @property
    def logger(self) -> Logger:
        """Returns the worker's logger."""
        logger = getLogger(self.name)
        logger.setLevel(INFO)
        return logger

    @property
    def name(self) -> str:
        """Returns the worker's name."""
        return f'hidsltools-worker-{self.index}'

    def signal(self, signal_number: int, _: Any) -> None:
        """Handles the given signal."""
        if signal_number == SIGUSR1:
            self.logger.info(self.info)
        elif signal_number == SIGUSR2:
            self.running = False
        else:
            self.logger.error('Received invalid signal: %i', signal_number)

    def process_system(self, system: int, args: Namespace) -> dict:
        """Processes a single system."""
        result = {'start': (start := datetime.now()).isoformat()}

        try:
            result['result'] = self.run(system, args)
        except SSHConnectionError:
            syslogger(system).error('Could not establish SSH connection.')
            result['online'] = False
        else:
            result['online'] = True

        result['end'] = (end := datetime.now()).isoformat()
        result['duration'] = str(end - start)
        return result

    @staticmethod
    def run(system: int, args: Namespace) -> dict:
        """Runs the respective processes."""
        raise NotImplementedError()


def multiprocess(
        worker_cls: Type[BaseWorker],
        systems: list[int],
        processes: int,
        args: Namespace
) -> dict:
    """Spawns workers and waits for them to finish."""

    wait_for_processes(list(spawn_workers(
        worker_cls,
        processes,
        sequence_to_queue(systems),
        results := Queue(),
        args
    )))
    return dict(iter_queue(results))


def sequence_to_queue(sequence: Sequence[Any]) -> Queue:
    """Returns a queue with items from the given sequence."""

    queue = Queue(len(sequence))

    for item in sequence:
        queue.put(item)

    return queue


def spawn_workers(
        worker_cls: Type[BaseWorker],
        amount: int,
        systems: Queue,
        results: Queue,
        args: Namespace
) -> Iterator[Process]:
    """Spawns worker processes."""

    for index in range(amount):
        worker = worker_cls(index, systems, results)
        process = Process(target=worker, args=(args,))
        process.start()
        yield process


def wait_for_processes(processes: list[Process]) -> None:
    """Wait for the given processes."""

    try:
        for process in processes:
            process.join()
    except KeyboardInterrupt:
        for process in processes:
            process.kill()

        raise


def iter_queue(queue: Queue) -> Iterator[Any]:
    """Yield queue items."""

    while not queue.empty():
        yield queue.get()

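You can find the complete program here.

The problem is that the program does not terminate even after all child processes have finished, especially when there are many child processes: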

$ sysrpc -Rp 8 {201..900}
[ERROR] sysrpc.201: Could not establish SSH connection.
[ERROR] sysrpc.222: Could not establish SSH connection.
[ERROR] sysrpc.208: Could not establish SSH connection.
[ERROR] sysrpc.209: Could not establish SSH connection.
[ERROR] sysrpc.210: Could not establish SSH connection.
[ERROR] sysrpc.217: Could not establish SSH connection.
[ERROR] sysrpc.223: Could not establish SSH connection.
[ERROR] sysrpc.224: Could not establish SSH connection.
[ERROR] sysrpc.225: Could not establish SSH connection.
[ERROR] sysrpc.226: Could not establish SSH connection.
[ERROR] sysrpc.228: Could not establish SSH connection.
[ERROR] sysrpc.229: Could not establish SSH connection.
[ERROR] sysrpc.230: Could not establish SSH connection.
[ERROR] sysrpc.231: Could not establish SSH connection.
[ERROR] sysrpc.232: Could not establish SSH connection.
[ERROR] sysrpc.233: Could not establish SSH connection.
[ERROR] sysrpc.234: Could not establish SSH connection.
[ERROR] sysrpc.247: Could not establish SSH connection.
[ERROR] sysrpc.235: Could not establish SSH connection.
[ERROR] sysrpc.256: Could not establish SSH connection.
[ERROR] sysrpc.258: Could not establish SSH connection.
[ERROR] sysrpc.241: Could not establish SSH connection.
[ERROR] sysrpc.246: Could not establish SSH connection.
[ERROR] sysrpc.278: Could not establish SSH connection.
[ERROR] sysrpc.262: Could not establish SSH connection.
[ERROR] sysrpc.287: Could not establish SSH connection.
[ERROR] sysrpc.276: Could not establish SSH connection.
[ERROR] sysrpc.299: Could not establish SSH connection.
[ERROR] sysrpc.301: Could not establish SSH connection.
[ERROR] sysrpc.263: Could not establish SSH connection.
[ERROR] sysrpc.268: Could not establish SSH connection.
[ERROR] sysrpc.315: Could not establish SSH connection.
[ERROR] sysrpc.274: Could not establish SSH connection.
[ERROR] sysrpc.320: Could not establish SSH connection.
[ERROR] sysrpc.318: Could not establish SSH connection.
[ERROR] sysrpc.302: Could not establish SSH connection.
[ERROR] sysrpc.330: Could not establish SSH connection.
[ERROR] sysrpc.311: Could not establish SSH connection.
[ERROR] sysrpc.312: Could not establish SSH connection.
[ERROR] sysrpc.335: Could not establish SSH connection.
[ERROR] sysrpc.294: Could not establish SSH connection.
[ERROR] sysrpc.358: Could not establish SSH connection.
[ERROR] sysrpc.360: Could not establish SSH connection.
[ERROR] sysrpc.362: Could not establish SSH connection.
[ERROR] sysrpc.308: Could not establish SSH connection.
[ERROR] sysrpc.336: Could not establish SSH connection.
[ERROR] sysrpc.380: Could not establish SSH connection.
[ERROR] sysrpc.386: Could not establish SSH connection.
[ERROR] sysrpc.349: Could not establish SSH connection.
[ERROR] sysrpc.357: Could not establish SSH connection.
[ERROR] sysrpc.417: Could not establish SSH connection.
[ERROR] sysrpc.422: Could not establish SSH connection.
[ERROR] sysrpc.435: Could not establish SSH connection.
[ERROR] sysrpc.408: Could not establish SSH connection.
[ERROR] sysrpc.412: Could not establish SSH connection.
[ERROR] sysrpc.420: Could not establish SSH connection.
[ERROR] sysrpc.436: Could not establish SSH connection.
[ERROR] sysrpc.390: Could not establish SSH connection.
[ERROR] sysrpc.439: Could not establish SSH connection.
[ERROR] sysrpc.440: Could not establish SSH connection.
[ERROR] sysrpc.441: Could not establish SSH connection.
[ERROR] sysrpc.443: Could not establish SSH connection.
[ERROR] sysrpc.446: Could not establish SSH connection.
[ERROR] sysrpc.433: Could not establish SSH connection.
[ERROR] sysrpc.448: Could not establish SSH connection.
[ERROR] sysrpc.449: Could not establish SSH connection.
[ERROR] sysrpc.450: Could not establish SSH connection.
[ERROR] sysrpc.451: Could not establish SSH connection.
[ERROR] sysrpc.452: Could not establish SSH connection.
[ERROR] sysrpc.453: Could not establish SSH connection.
[ERROR] sysrpc.454: Could not establish SSH connection.
[ERROR] sysrpc.455: Could not establish SSH connection.
[ERROR] sysrpc.456: Could not establish SSH connection.
[ERROR] sysrpc.457: Could not establish SSH connection.
[ERROR] sysrpc.464: Could not establish SSH connection.
[ERROR] sysrpc.458: Could not establish SSH connection.
[ERROR] sysrpc.459: Could not establish SSH connection.
[ERROR] sysrpc.460: Could not establish SSH connection.
[ERROR] sysrpc.461: Could not establish SSH connection.
[ERROR] sysrpc.462: Could not establish SSH connection.
[ERROR] sysrpc.463: Could not establish SSH connection.
[ERROR] sysrpc.465: Could not establish SSH connection.
[ERROR] sysrpc.469: Could not establish SSH connection.
[ERROR] sysrpc.471: Could not establish SSH connection.
[ERROR] sysrpc.473: Could not establish SSH connection.
[ERROR] sysrpc.474: Could not establish SSH connection.
[ERROR] sysrpc.479: Could not establish SSH connection.
[ERROR] sysrpc.475: Could not establish SSH connection.
[ERROR] sysrpc.476: Could not establish SSH connection.
[ERROR] sysrpc.477: Could not establish SSH connection.
[ERROR] sysrpc.480: Could not establish SSH connection.
[ERROR] sysrpc.481: Could not establish SSH connection.
[ERROR] sysrpc.482: Could not establish SSH connection.
[ERROR] sysrpc.483: Could not establish SSH connection.
[ERROR] sysrpc.484: Could not establish SSH connection.
[ERROR] sysrpc.489: Could not establish SSH connection.
[ERROR] sysrpc.491: Could not establish SSH connection.
[ERROR] sysrpc.492: Could not establish SSH connection.
[ERROR] sysrpc.497: Could not establish SSH connection.
[ERROR] sysrpc.528: Could not establish SSH connection.
[ERROR] sysrpc.498: Could not establish SSH connection.
[ERROR] sysrpc.532: Could not establish SSH connection.
[ERROR] sysrpc.533: Could not establish SSH connection.
[ERROR] sysrpc.505: Could not establish SSH connection.
[ERROR] sysrpc.541: Could not establish SSH connection.
[ERROR] sysrpc.544: Could not establish SSH connection.
[ERROR] sysrpc.546: Could not establish SSH connection.
[ERROR] sysrpc.511: Could not establish SSH connection.
[ERROR] sysrpc.521: Could not establish SSH connection.
[ERROR] sysrpc.569: Could not establish SSH connection.
[ERROR] sysrpc.573: Could not establish SSH connection.
[ERROR] sysrpc.579: Could not establish SSH connection.
[ERROR] sysrpc.586: Could not establish SSH connection.
[ERROR] sysrpc.543: Could not establish SSH connection.
[ERROR] sysrpc.559: Could not establish SSH connection.
[ERROR] sysrpc.563: Could not establish SSH connection.
[ERROR] sysrpc.572: Could not establish SSH connection.
[ERROR] sysrpc.602: Could not establish SSH connection.
[ERROR] sysrpc.606: Could not establish SSH connection.
[ERROR] sysrpc.581: Could not establish SSH connection.
[ERROR] sysrpc.614: Could not establish SSH connection.
[ERROR] sysrpc.584: Could not establish SSH connection.
[ERROR] sysrpc.616: Could not establish SSH connection.
[ERROR] sysrpc.618: Could not establish SSH connection.
[ERROR] sysrpc.588: Could not establish SSH connection.
[ERROR] sysrpc.632: Could not establish SSH connection.
[ERROR] sysrpc.642: Could not establish SSH connection.
[ERROR] sysrpc.646: Could not establish SSH connection.
[ERROR] sysrpc.626: Could not establish SSH connection.
[ERROR] sysrpc.627: Could not establish SSH connection.
[ERROR] sysrpc.657: Could not establish SSH connection.
[ERROR] sysrpc.647: Could not establish SSH connection.
[ERROR] sysrpc.650: Could not establish SSH connection.
[ERROR] sysrpc.611: Could not establish SSH connection.
[ERROR] sysrpc.623: Could not establish SSH connection.
[ERROR] sysrpc.668: Could not establish SSH connection.
[ERROR] sysrpc.669: Could not establish SSH connection.
[ERROR] sysrpc.670: Could not establish SSH connection.
[ERROR] sysrpc.656: Could not establish SSH connection.
[ERROR] sysrpc.663: Could not establish SSH connection.
[ERROR] sysrpc.687: Could not establish SSH connection.
[ERROR] sysrpc.688: Could not establish SSH connection.
[ERROR] sysrpc.689: Could not establish SSH connection.
[ERROR] sysrpc.690: Could not establish SSH connection.
[ERROR] sysrpc.691: Could not establish SSH connection.
[ERROR] sysrpc.693: Could not establish SSH connection.
[ERROR] sysrpc.702: Could not establish SSH connection.
[ERROR] sysrpc.694: Could not establish SSH connection.
[ERROR] sysrpc.710: Could not establish SSH connection.
[ERROR] sysrpc.715: Could not establish SSH connection.
[ERROR] sysrpc.717: Could not establish SSH connection.
[ERROR] sysrpc.719: Could not establish SSH connection.
[ERROR] sysrpc.696: Could not establish SSH connection.
[ERROR] sysrpc.697: Could not establish SSH connection.
[ERROR] sysrpc.705: Could not establish SSH connection.
[ERROR] sysrpc.742: Could not establish SSH connection.
[ERROR] sysrpc.743: Could not establish SSH connection.
[ERROR] sysrpc.744: Could not establish SSH connection.
[ERROR] sysrpc.747: Could not establish SSH connection.
[ERROR] sysrpc.748: Could not establish SSH connection.
[ERROR] sysrpc.722: Could not establish SSH connection.
[ERROR] sysrpc.695: Could not establish SSH connection.
[ERROR] sysrpc.773: Could not establish SSH connection.
[ERROR] sysrpc.776: Could not establish SSH connection.
[ERROR] sysrpc.783: Could not establish SSH connection.
[ERROR] sysrpc.798: Could not establish SSH connection.
[ERROR] sysrpc.803: Could not establish SSH connection.
[ERROR] sysrpc.812: Could not establish SSH connection.
[ERROR] sysrpc.818: Could not establish SSH connection.
[ERROR] sysrpc.819: Could not establish SSH connection.
[ERROR] sysrpc.822: Could not establish SSH connection.
[ERROR] sysrpc.821: Could not establish SSH connection.
[ERROR] sysrpc.830: Could not establish SSH connection.
[ERROR] sysrpc.831: Could not establish SSH connection.
[ERROR] sysrpc.835: Could not establish SSH connection.
[ERROR] sysrpc.789: Could not establish SSH connection.
[ERROR] sysrpc.790: Could not establish SSH connection.
[ERROR] sysrpc.841: Could not establish SSH connection.
[ERROR] sysrpc.826: Could not establish SSH connection.
[ERROR] sysrpc.842: Could not establish SSH connection.
[ERROR] sysrpc.848: Could not establish SSH connection.
[ERROR] sysrpc.849: Could not establish SSH connection.
[ERROR] sysrpc.850: Could not establish SSH connection.
[ERROR] sysrpc.851: Could not establish SSH connection.
[ERROR] sysrpc.853: Could not establish SSH connection.
[ERROR] sysrpc.854: Could not establish SSH connection.
[ERROR] sysrpc.860: Could not establish SSH connection.
[ERROR] sysrpc.866: Could not establish SSH connection.
[ERROR] sysrpc.867: Could not establish SSH connection.
[ERROR] sysrpc.868: Could not establish SSH connection.
[ERROR] sysrpc.872: Could not establish SSH connection.
[ERROR] sysrpc.856: Could not establish SSH connection.
[ERROR] sysrpc.883: Could not establish SSH connection.
[ERROR] sysrpc.881: Could not establish SSH connection.
[ERROR] sysrpc.877: Could not establish SSH connection.
[ERROR] sysrpc.876: Could not establish SSH connection.
[ERROR] sysrpc.882: Could not establish SSH connection.
[ERROR] sysrpc.887: Could not establish SSH connection.
[ERROR] sysrpc.884: Could not establish SSH connection.
[ERROR] sysrpc.888: Could not establish SSH connection.
[INFO] hidsltools-worker-2: Finished
[ERROR] sysrpc.885: Could not establish SSH connection.
[INFO] hidsltools-worker-0: Finished
[ERROR] sysrpc.896: Could not establish SSH connection.
[ERROR] sysrpc.886: Could not establish SSH connection.
[INFO] hidsltools-worker-5: Finished
[ERROR] sysrpc.897: Could not establish SSH connection.
[INFO] hidsltools-worker-1: Finished
[INFO] hidsltools-worker-4: Finished
[ERROR] sysrpc.898: Could not establish SSH connection.
[ERROR] sysrpc.899: Could not establish SSH connection.
[INFO] hidsltools-worker-7: Finished
[INFO] hidsltools-worker-6: Finished
[ERROR] sysrpc.900: Could not establish SSH connection.
[INFO] hidsltools-worker-3: Finished

As you can see, all eight workers have finished, but the program does not terminate. Pressing Enter does not terminate the main process either:

...
[ERROR] sysrpc.900: Could not establish SSH connection.
[INFO] hidsltools-worker-3: Finished

The main process exits only when I press Ctrl+C:

[INFO] hidsltools-worker-3: Finished







^CProcess Process-7:
1 ✗ neumann@ThinkCentre ~ $ 

Why does the main process not terminate even though all child processes have terminated?

1 Answer

Stack Overflow user

Accepted answer

Posted on 2022-07-20 16:03:57

If you read the documentation for multiprocessing.Queue, in particular the warning, you will see:

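Warning: As mentioned above, if a child process has put items on a queue (and it has not used JoinableQueue.cancel_join_thread), then that process will not terminate until all buffered items have been flushed to the pipe. This means that if you try joining that process you may get a deadlock unless you are sure that all items which have been put on the queue have been consumed. Similarly, if the child process is non-daemonic then the parent process may hang on exit when it tries to join all its non-daemonic children. Note that a queue created using a manager does not have this issue. See Programming guidelines.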
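The last sentence of the quoted warning can be illustrated with a small sketch. It assumes a Unix-like system where the `fork` start method is available; the names `produce` and `join_then_drain` are hypothetical and do not appear in the question. Because a manager queue lives in the manager's server process, the children can exit as soon as their `put` calls return, so joining before draining does not hang here:

```python
import multiprocessing as mp


def produce(results, start, count):
    """Child: put a run of consecutive integers on the shared queue."""
    for n in range(start, start + count):
        results.put(n)


def join_then_drain(workers=2, per_worker=500):
    """Join the children first, then read everything from a manager queue."""
    ctx = mp.get_context("fork")  # assumption: fork is available (Unix)
    with ctx.Manager() as manager:
        results = manager.Queue()  # proxy to a queue held by the manager process
        procs = [
            ctx.Process(target=produce, args=(results, i * per_worker, per_worker))
            for i in range(workers)
        ]
        for proc in procs:
            proc.start()
        # Joining first is safe with a manager queue: the items are already
        # stored in the manager process, not in a per-child buffer.
        for proc in procs:
            proc.join()
        items = []
        # No producer is left running, so polling empty() is not racy here.
        while not results.empty():
            items.append(results.get())
    return sorted(items)
```

The trade-off is that every `put` and `get` on a manager queue is a round trip to the manager process, so it is usually slower than a plain `multiprocessing.Queue`.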

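Specifically, you call wait_for_processes, which attempts to join the child processes, and only afterwards call iter_queue, which calls get on the queue that the child processes have put items on. (By the way, the documentation also makes clear that calls to queue.empty() are not reliable for multiprocessing.Queue instances.)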

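Clearly, you need to reverse the order of operations: the main process must get all items from the queue before it attempts to join the child processes, and it should do so without using queue.empty.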
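The reversed order can be sketched in isolation. This is a minimal illustration, not the asker's program: it assumes a Unix-like system with the `fork` start method, assumes each input item produces exactly one result (so the parent knows how many `get` calls to make), and uses the hypothetical names `square_chunk` and `squares_in_parallel`:

```python
import multiprocessing as mp


def square_chunk(chunk, results):
    """Child: put one (n, n * n) pair on the results queue per input number."""
    for n in chunk:
        results.put((n, n * n))


def squares_in_parallel(numbers, workers=2):
    """Drain the results queue first, join the children afterwards."""
    ctx = mp.get_context("fork")  # assumption: fork is available (Unix)
    results = ctx.Queue()
    procs = [
        ctx.Process(target=square_chunk, args=(numbers[i::workers], results))
        for i in range(workers)
    ]
    for proc in procs:
        proc.start()
    # One blocking get() per expected result: no reliance on empty().
    collected = dict(results.get() for _ in range(len(numbers)))
    # Every item has been consumed, so the children can flush and exit.
    for proc in procs:
        proc.join()
    return collected


if __name__ == "__main__":
    print(squares_in_parallel([1, 2, 3, 4]))
```

Swapping the two final steps (join first, then get) is what risks the hang described in the warning once the buffered results exceed what the underlying pipe can hold.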

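One solution is to use a multiprocessing.JoinableQueue for the input queue that the child processes read from, and to put all the items to be processed on this queue before creating the child processes. Each child process then retrieves an item from that queue, puts its response on the output queue, and calls task_done on the input queue to indicate that the processing of that item is complete. The main process only needs to call join on the input queue to make sure that all input tasks have been processed and their responses put on the output queue. After that, it simply calls get_nowait on the result queue until it raises an Empty exception.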

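This is the general (but untested) idea. You may need to add additional code to handle the case where a signal is sent before the child processes have processed the entire input queue: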

"""Multiprocessing worker."""

from argparse import Namespace
from datetime import datetime
from logging import INFO, Logger, getLogger
from multiprocessing import Process, Queue, JoinableQueue
from queue import Empty
from signal import SIGUSR1, SIGUSR2, signal
from typing import Any, Iterator, Sequence, Type

from setproctitle import setproctitle

from homeinfotools.exceptions import SSHConnectionError
from homeinfotools.logging import syslogger


__all__ = ['BaseWorker', 'multiprocess']


class BaseWorker:
    """Stored args and manager to process systems."""

    __slots__ = ('index', 'systems', 'results', 'running', 'current_system')

    def __init__(self, index: int, systems: JoinableQueue, results: Queue):
        """Sets the command line arguments."""
        self.index = index
        self.systems = systems
        self.results = results
        self.running = True
        self.current_system = None

    def __call__(self, args: Namespace) -> None:
        """Runs the worker on the given system."""
        setproctitle(self.name)
        signal(SIGUSR1, self.signal)
        signal(SIGUSR2, self.signal)

        # We must consume all the input even when self.running is False,
        # else the main process's call to join() on the input queue will not return
        while True:
            try:
                # Items are already on the queue so use get_nowait()
                self.current_system = self.systems.get_nowait()
            except Empty:
                # The loop only exits here, so report an abort here as well
                self.logger.info('Finished' if self.running else 'Aborted')
                return

            if self.running:
                # then process the input for real:
                result = self.process_system(self.current_system, args)
                self.results.put_nowait((self.current_system, result))
            # Show we are done with this item and any result has been put:
            self.systems.task_done()

    @property
    def info(self) -> str:
        """Returns information about the state of the worker."""
        if self.current_system is None:
            return 'idle'

        return f'Processing system #{self.current_system}'

    @property
    def logger(self) -> Logger:
        """Returns the worker's logger."""
        logger = getLogger(self.name)
        logger.setLevel(INFO)
        return logger

    @property
    def name(self) -> str:
        """Returns the worker's name."""
        return f'hidsltools-worker-{self.index}'

    def signal(self, signal_number: int, _: Any) -> None:
        """Handles the given signal."""
        if signal_number == SIGUSR1:
            self.logger.info(self.info)
        elif signal_number == SIGUSR2:
            self.running = False
        else:
            self.logger.error('Received invalid signal: %i', signal_number)

    def process_system(self, system: int, args: Namespace) -> dict:
        """Processes a single system."""
        result = {'start': (start := datetime.now()).isoformat()}

        try:
            result['result'] = self.run(system, args)
        except SSHConnectionError:
            syslogger(system).error('Could not establish SSH connection.')
            result['online'] = False
        else:
            result['online'] = True

        result['end'] = (end := datetime.now()).isoformat()
        result['duration'] = str(end - start)
        return result

    @staticmethod
    def run(system: int, args: Namespace) -> dict:
        """Runs the respective processes."""
        raise NotImplementedError()


def multiprocess(
        worker_cls: Type[BaseWorker],
        systems: list[int],
        processes: int,
        args: Namespace
) -> dict:
    """Spawns workers and waits for them to finish."""

    # First put all items on the input queue:
    systems_queue = sequence_to_queue(systems)
    # Then start the children that will process the input queue
    process_list = list(spawn_workers(
        worker_cls,
        processes,
        systems_queue,
        results := Queue(),
        args
    ))
    # Wait for all work to be completed:
    systems_queue.join()
    # Now iterate the output queue
    d = dict(iter_queue(results))
    # Finally wait for the processes to complete:
    wait_for_processes(process_list)
    return d


def sequence_to_queue(sequence: Sequence[Any]) -> JoinableQueue:
    """Returns a queue with items from the given sequence."""

    queue = JoinableQueue(len(sequence))

    for item in sequence:
        queue.put(item)

    return queue


def spawn_workers(
        worker_cls: Type[BaseWorker],
        amount: int,
        systems: JoinableQueue,
        results: Queue,
        args: Namespace
) -> Iterator[Process]:
    """Spawns worker processes."""

    for index in range(amount):
        worker = worker_cls(index, systems, results)
        process = Process(target=worker, args=(args,))
        process.start()
        yield process


def wait_for_processes(processes: list[Process]) -> None:
    """Wait for the given processes."""

    try:
        for process in processes:
            process.join()
    except KeyboardInterrupt:
        for process in processes:
            process.kill()

        raise


def iter_queue(queue: Queue) -> Iterator[Any]:
    """Yield queue items."""

    try:
        while True:
            yield queue.get_nowait()
    except Empty:
        pass

Note

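In the code above I chose to put all the input items on the input queue before creating the child processes. This allows the child processes to call get_nowait on the input queue; when it raises an Empty exception, the child knows that all input has been processed and can return, which allows the final join of that process to complete.

Alternatively, the main process could put the items on the input queue after starting the child processes. In that case, however, the child processes would have to issue blocking get calls on the queue and would never know whether or when any further items will arrive. The child processes would then never terminate (unless the main process puts N sentinel items, e.g. None, on the queue and each child terminates when it sees one of them). Without such sentinels, these child processes would need to be daemon processes that the main process never joins.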

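A second solution is for each child process, once it detects that there are no more items on the input queue, to put a None item on the result queue (or some other value that cannot be confused with an actual response and that can serve as an indicator that this child process will write no more output). The main process then simply calls get on the result queue repeatedly until it has seen N None items, where N is the number of child processes handling items from the input queue.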
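A minimal sketch of this second approach follows. It is not the asker's code: it assumes a Unix-like system with the `fork` start method, uses None as the sentinel on both queues (the input-side sentinels are an assumption borrowed from the note above, so that children can use a blocking get), and the names `doubling_worker` and `collect` are hypothetical:

```python
import multiprocessing as mp

DONE = None  # sentinel; must never be a legitimate item or result


def doubling_worker(tasks, results):
    """Child: process items until the sentinel, then announce completion."""
    while True:
        item = tasks.get()  # blocking get; the sentinel ends the loop
        if item is DONE:
            results.put(DONE)  # "this child will write no more output"
            return
        results.put((item, item * 2))


def collect(items, workers=3):
    """Read results until every worker has sent its sentinel, then join."""
    ctx = mp.get_context("fork")  # assumption: fork is available (Unix)
    tasks, results = ctx.Queue(), ctx.Queue()
    procs = [
        ctx.Process(target=doubling_worker, args=(tasks, results))
        for _ in range(workers)
    ]
    for proc in procs:
        proc.start()
    for item in items:
        tasks.put(item)
    for _ in procs:
        tasks.put(DONE)  # one input sentinel per worker

    collected, finished = {}, 0
    while finished < workers:
        message = results.get()
        if message is DONE:
            finished += 1
        else:
            key, value = message
            collected[key] = value

    # All output has been consumed, so joining cannot deadlock on the queue.
    for proc in procs:
        proc.join()
    return collected
```

Because the main process counts sentinels instead of polling, it needs neither queue.empty() nor get_nowait(), and it does not need to know in advance how many results each child will produce.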

As an aside, it is best not to have a local variable named system, which happens to share its name with a standard Python module.

Votes: 3
Original page content provided by Stack Overflow.
Original link:

https://stackoverflow.com/questions/73047215
