我有一个multiprocessing.pool.Pool,它由一个不和谐的机器人管理。由于discord.py是异步的,所以我使用pool.starmap_async()、pool.apply_async()和AsyncResult.get()来管理任务。机器人在启动时启动一个池(看起来很奇怪,但由于时间长,它是我所做工作中最有效的方法)。
是否有一种方法可以检查当前在给定时刻池中有多少进程在排队/正在执行?检查函数可以访问Pool本身,但不能访问任何AsyncResult。
我还可以使用其他方法,也就是池中活动/排队进程的#。
发布于 2021-01-30 10:15:00
由于
是异步的,所以我使用
pool.starmap_async()、pool.apply_async()和AsyncResult.get()来管理任务。
除非采取特殊的预防措施,否则这看起来不正确,因为AsyncResult.get()会阻塞事件循环。apply_async等方法的名称中的异步不是异步部署的那种异步。多处理使用同步代码与子进程通信,在后台线程中这样做可以让代码继续处理其他事情。
结合异步和多处理的一种更安全的方法是concurrent.futures模块,它提供内部使用多处理的ProcessPoolExecutor类,其执行器异步通过run_in_executor支持。
,是否有一种方法可以检查当前在给定时刻池中有多少进程在排队/执行?
我认为没有公共API来查询这些信息,但是如果您拥有这个池,您可以很容易地维护所需的统计数据。例如(未经测试):
class Pool:
def __init__(self, nworkers):
self._executor = concurrent.futures.ProcessPoolExecutor(nworkers)
self._nworkers = nworkers
self._submitted = 0
async def submit(self, fn, *args):
self._submitted += 1
loop = asyncio.get_event_loop()
fut = loop.run_in_executor(self._executor, fn, *args)
try:
return await fut
finally:
self._submitted -= 1
def stats(self):
queued = max(0, self._submitted - self._nworkers)
executing = min(self._submitted, self._nworkers)
return queued, executing您可以通过调用submit()来使用它,您可以等待它立即得到结果,或者传递给create_task()以获得一个您以后可以等待的未来,或者gather()以及其他的未来等等。
https://stackoverflow.com/questions/65962126
复制相似问题