我使用Python2.7的multiprocessing.Pool来管理3名工人。每个工作人员都相当复杂,在一些第三方代码中存在资源泄漏(大概),在连续运行6-8小时后会导致问题。因此,我希望使用maxtasksperchild定期刷新员工。
我还希望每个工作人员都将自己的日志文件写入自己的日志文件中。如果没有maxtasksperchild,我将使用共享multiprocessing.Value为每个工作人员分配一个整数(0、1或2),然后使用该整数来命名日志文件。
使用maxtasksperchild,我想在完成工作后重用日志文件。所以,如果整个过程持续一个月,我只想要三个日志文件,而不是每个生成的工人一个日志文件。
如果我能够传递一个回调(例如,与当前支持的finalizer一起使用的initializer ),这将是非常简单的。没有它,我就看不出一种健壮而简单的方法来实现它。
发布于 2020-01-15 22:07:19
这是AFAIK没有文档的,但是multiprocessing有一个Finalizer类,“它支持使用弱引用完成对象”。您可以使用它在initializer中注册终结器。
不过,在这种情况下,我不认为multiprocessing.Value是一个有用的同步选择。多个工作人员可以同时退出,指示哪个文件-整数是免费的,这是一个(锁定)计数器所不能提供的。
我建议使用多个裸multiprocessing.Lock,每个文件一个,而不是:
from multiprocessing import Pool, Lock, current_process
from multiprocessing.util import Finalize
def f(n):
global fileno
for _ in range(int(n)): # xrange for Python 2
pass
return fileno
def init_fileno(file_locks):
for i, lock in enumerate(file_locks):
if lock.acquire(False): # non-blocking attempt
globals()['fileno'] = i
print("{} using fileno: {}".format(current_process().name, i))
Finalize(lock, lock.release, exitpriority=15)
break
if __name__ == '__main__':
n_proc = 3
file_locks = [Lock() for _ in range(n_proc)]
pool = Pool(
n_proc, initializer=init_fileno, initargs=(file_locks,),
maxtasksperchild=2
)
print(pool.map(func=f, iterable=[50e6] * 18))
pool.close()
pool.join()
# all locks should be available if all finalizers did run
assert all(lock.acquire(False) for lock in file_locks)输出:
ForkPoolWorker-1 using fileno: 0
ForkPoolWorker-2 using fileno: 1
ForkPoolWorker-3 using fileno: 2
ForkPoolWorker-4 using fileno: 0
ForkPoolWorker-5 using fileno: 1
ForkPoolWorker-6 using fileno: 2
[0, 0, 1, 1, 2, 2, 0, 0, 1, 1, 2, 2, 0, 0, 1, 1, 2, 2]
Process finished with exit code 0请注意,使用Python 3时,不能可靠地使用Pool的上下文管理器,而不能使用上面所示的旧方法。池的上下文管理器(不幸地)调用terminate(),这可能会在终结器有机会运行之前杀死工人进程。
发布于 2020-01-16 18:14:14
最后我做了以下几件事。它假设PID不会很快被回收(对于我来说,Ubuntu是正确的,但在Unix上一般不是这样)。我不认为它有任何其他的假设,但我真的对Ubuntu很感兴趣,所以我没有仔细地研究其他平台,比如Windows。
代码使用一个数组来跟踪哪个PID声明了哪个索引。然后,当一个新工作人员启动时,它将查看是否不再使用任何PID。如果它找到了一个,它假设这是因为工人已经完成了它的工作(或者因为另一个原因而被终止)。如果它找不到,那我们就倒霉了!因此,这并不完美,但我认为这比我迄今所见或考虑过的任何事情都简单。
def run_pool():
child_pids = Array('i', 3)
pool = Pool(3, initializser=init_worker, initargs=(child_pids,), maxtasksperchild=1000)
def init_worker(child_pids):
with child_pids.get_lock():
available_index = None
for index, pid in enumerate(child_pids):
# PID 0 means unallocated (this happens when our pool is started), we reclaim PIDs
# which are no longer in use. We also reclaim the lucky case where a PID was recycled
# but assigned to one of our workers again, so we know we can take it over
if not pid or not _is_pid_in_use(pid) or pid == os.getpid():
available_index = index
break
if available_index is not None:
child_pids[available_index] = os.getpid()
else:
# This is unexpected - it means all of the PIDs are in use so we have a logical error
# or a PID was recycled before we could notice and reclaim its index
pass
def _is_pid_in_use(pid):
try:
os.kill(pid, 0)
return True
except OSError:
return Falsehttps://stackoverflow.com/questions/59757697
复制相似问题