首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >工作人员刷新的python多处理池通知

工作人员刷新的python多处理池通知
EN

Stack Overflow用户
提问于 2020-01-15 18:37:18
回答 2查看 1K关注 0票数 1

我使用Python2.7的multiprocessing.Pool来管理3名工人。每个工作人员都相当复杂,在一些第三方代码中存在资源泄漏(大概),在连续运行6-8小时后会导致问题。因此,我希望使用maxtasksperchild定期刷新员工。

我还希望每个工作人员都将自己的日志文件写入自己的日志文件中。如果没有maxtasksperchild,我将使用共享multiprocessing.Value为每个工作人员分配一个整数(0、1或2),然后使用该整数来命名日志文件。

使用maxtasksperchild,我想在完成工作后重用日志文件。所以,如果整个过程持续一个月,我只想要三个日志文件,而不是每个生成的工人一个日志文件。

如果我能够传递一个回调(例如,与当前支持的finalizer一起使用的initializer ),这将是非常简单的。没有它,我就看不出一种健壮而简单的方法来实现它。

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2020-01-15 22:07:19

这是AFAIK没有文档的,但是multiprocessing有一个Finalizer类,“它支持使用弱引用完成对象”。您可以使用它在initializer中注册终结器。

不过,在这种情况下,我不认为multiprocessing.Value是一个有用的同步选择。多个工作人员可以同时退出,指示哪个文件-整数是免费的,这是一个(锁定)计数器所不能提供的。

我建议使用多个裸multiprocessing.Lock,每个文件一个,而不是:

代码语言:javascript
复制
from multiprocessing import Pool, Lock, current_process
from multiprocessing.util import Finalize


def f(n):
    global fileno
    for _ in range(int(n)):  # xrange for Python 2
        pass
    return fileno


def init_fileno(file_locks):
    for i, lock in enumerate(file_locks):
        if lock.acquire(False):  # non-blocking attempt
            globals()['fileno'] = i
            print("{} using fileno: {}".format(current_process().name, i))
            Finalize(lock, lock.release, exitpriority=15)
            break


if __name__ == '__main__':

    n_proc = 3
    file_locks = [Lock() for _ in range(n_proc)]

    pool = Pool(
        n_proc, initializer=init_fileno, initargs=(file_locks,),
        maxtasksperchild=2
    )

    print(pool.map(func=f, iterable=[50e6] * 18))
    pool.close()
    pool.join()
    # all locks should be available if all finalizers did run
    assert all(lock.acquire(False) for lock in file_locks)

输出:

代码语言:javascript
复制
ForkPoolWorker-1 using fileno: 0
ForkPoolWorker-2 using fileno: 1
ForkPoolWorker-3 using fileno: 2
ForkPoolWorker-4 using fileno: 0
ForkPoolWorker-5 using fileno: 1
ForkPoolWorker-6 using fileno: 2
[0, 0, 1, 1, 2, 2, 0, 0, 1, 1, 2, 2, 0, 0, 1, 1, 2, 2]

Process finished with exit code 0

请注意,使用Python 3时,不能可靠地使用Pool的上下文管理器,而不能使用上面所示的旧方法。池的上下文管理器(不幸地)调用terminate(),这可能会在终结器有机会运行之前杀死工人进程。

票数 1
EN

Stack Overflow用户

发布于 2020-01-16 18:14:14

最后我做了以下几件事。它假设PID不会很快被回收(对于我来说,Ubuntu是正确的,但在Unix上一般不是这样)。我不认为它有任何其他的假设,但我真的对Ubuntu很感兴趣,所以我没有仔细地研究其他平台,比如Windows。

代码使用一个数组来跟踪哪个PID声明了哪个索引。然后,当一个新工作人员启动时,它将查看是否不再使用任何PID。如果它找到了一个,它假设这是因为工人已经完成了它的工作(或者因为另一个原因而被终止)。如果它找不到,那我们就倒霉了!因此,这并不完美,但我认为这比我迄今所见或考虑过的任何事情都简单。

代码语言:javascript
复制
def run_pool():
    child_pids = Array('i', 3)
    pool = Pool(3, initializser=init_worker, initargs=(child_pids,), maxtasksperchild=1000)

def init_worker(child_pids):
    with child_pids.get_lock():
        available_index = None
        for index, pid in enumerate(child_pids):
            # PID 0 means unallocated (this happens when our pool is started), we reclaim PIDs
            # which are no longer in use. We also reclaim the lucky case where a PID was recycled
            # but assigned to one of our workers again, so we know we can take it over
            if not pid or not _is_pid_in_use(pid) or pid == os.getpid():
                available_index = index
                break

        if available_index is not None:
            child_pids[available_index] = os.getpid()
        else:
            # This is unexpected - it means all of the PIDs are in use so we have a logical error
            # or a PID was recycled before we could notice and reclaim its index
            pass

def _is_pid_in_use(pid):
    try:
        os.kill(pid, 0)
        return True
    except OSError:
        return False
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/59757697

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档