我有一些代码,我试图在一个Pool中创建4个进程。
一旦我得到任何异常(例如,它试图连接到的数据库关闭),我想杀死池,休眠10秒,然后创建一个包含4个进程的新池。
但是,该池似乎永远不会被终止,因为进程名称每次都会递增。池中是否有缓存来保存名称计数?
def connect_db()
pass
while True:
p = Pool(4)
for process in multiprocessing.active_children():
print(process.name) #why is the name incremented by 1 each time while loop iterates?
try:
r = p.map(connect_db, ())
except Exception as e:
pool.close()
pool.join()
time.sleep(10)前四个进程是SpawnPoolWorker-1到4,接下来4个是SpawnPoolWorker-5到8。它怎么知道我之前已经创建了4个进程?我每次都会创建一个新的Pool实例,还是我做错了什么?
发布于 2020-06-30 13:33:22
您没有看到预期结果的主要原因是下面这行代码:
r = p.map(connect_db, ())您正在使用一个空的iterable调用multiprocess.map,所以connect_db根本不会被调用,并且您没有到达代码的except部分,也没有关闭池等。
下面是一个可以工作的框架,其中包含一组用于调试的print语句。我附上了下面的输出,正如你所看到的,每一轮恰好有四个子进程。
import multiprocessing
import time
import random
def connect_db(i):
print(f"Trying to connect {i}")
time.sleep(random.random() * 2)
raise Exception("Failed to connect")
while True:
p = multiprocessing.Pool(4)
print("active children are:")
for idx, process in enumerate(multiprocessing.active_children()):
print(f"Child number {idx} is {process.name}") #why is the name incremented by 1 each time while loop iterates?
try:
print("About to create a pool")
r = p.map(connect_db, range(4))
print("Created a pool")
except Exception as e:
print(e)
print("terminating threads")
p.terminate()
p.close()
p.join()
time.sleep(5)输出:
active children are:
Child number 0 is ForkPoolWorker-2
Child number 1 is ForkPoolWorker-1
Child number 2 is ForkPoolWorker-4
Child number 3 is ForkPoolWorker-3
About to create a pool
Trying to connect 0
Trying to connect 1
Trying to connect 2
Trying to connect 3
Failed to connect
terminating threads
active children are:
Child number 0 is ForkPoolWorker-5
Child number 1 is ForkPoolWorker-6
Child number 2 is ForkPoolWorker-8
Child number 3 is ForkPoolWorker-7
About to create a pool
Trying to connect 0
Trying to connect 1
...最后要注意的是,如果用例确实是数据库连接,则有现成的连接池,您可能应该使用其中的一个。另外,我不确定是否可以跨进程共享数据库连接。
控制池中的进程名称
如果出于某种原因,您希望控制池中的进程名称,您可以通过创建自己的池上下文来实现:
import multiprocessing
from multiprocessing import context
import time
import random
process_counter = 0
class MyForkProcess(multiprocessing.context.ForkProcess):
def __init__(self, *args, **kwargs):
global process_counter
name = f"MyForkProcess-{process_counter}"
process_counter += 1
super(MyForkProcess, self).__init__(*args, name = name, **kwargs)
class MyContext(multiprocessing.context.ForkContext):
_name = 'MyForkContext'
Process = MyForkProcess
def connect_db(i):
print(f"Trying to connect {i}")
cp = multiprocessing.current_process()
print(f"The name of the child process is {cp.name}")
time.sleep(random.random() * 2)
raise Exception("Failed to connect")
context = MyContext()
while True:
p = context.Pool(4)
print("active children are:")
for idx, process in enumerate(multiprocessing.active_children()):
print(f"Child number {idx} is {process.name}") #why is the name incremented by 1 each time while loop iterates?
try:
print("About to create a pool")
r = p.map(connect_db, range(4))
print("Created a pool")
except Exception as e:
print(e)
print("terminating threads")
p.terminate()
process_counter = 0
p.close()
p.join()
time.sleep(5)现在的输出是:
active children are:
Child number 0 is MyForkPoolWorker-2
Child number 1 is MyForkPoolWorker-0
Child number 2 is MyForkPoolWorker-3
Child number 3 is MyForkPoolWorker-1
About to create a pool
Trying to connect 0
The name of the child process is MyForkPoolWorker-0
Trying to connect 1
The name of the child process is MyForkPoolWorker-1
Trying to connect 2
The name of the child process is MyForkPoolWorker-2
Trying to connect 3
The name of the child process is MyForkPoolWorker-3
Failed to connect
terminating threads
active children are:
Child number 0 is MyForkPoolWorker-2
Child number 1 is MyForkPoolWorker-0
Child number 2 is MyForkPoolWorker-1
Child number 3 is MyForkPoolWorker-3
About to create a pool
...https://stackoverflow.com/questions/62648217
复制相似问题