我创建了一个带有多进程池的进程池。我有很多任务要处理,但要获得任务的qps并不容易。因此,我希望获得池的活动进程数,以便设置适当的池大小。下面是完整的代码:
import time
from multiprocessing import Pool
def do_work(msg):
# do some work
if __name__ == '__main__':
consumer = KafkaConsumer(
group_id=worker_config.kafka_group_id,
bootstrap_servers=kafka_url,
auto_offset_reset=worker_config.kafka_reset,
enable_auto_commit=True)
consumer.subscribe(topics=worker_config.kafka_topics)
for message in consumer:
logging.info('topic=%s, partition=%d, msg=%s' % (message.topic, message.partition, msg))
pool.apply_async(do_work, (message,))
process_count = number_of_active_process_of_pool
logging.info("number_of_active_process_number is %d", process_count)
pool.close()
pool.join()发布于 2017-10-02 00:22:27
apply_async会给你一个AsyncResult:https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.AsyncResult
您可以使用.ready() on来查看是否已完成。通过这种方式,您可以获得已完成的任务量,进而得到剩余待完成的任务量。只要这个数字超过poolsize,您就可以假设poolsize许多进程正在运行,如果没有,那么剩余的任务数量就是正在运行的进程的数量。
替代方案:
如果不使用apply_async,而是使用队列,例如this one,则可以使用.qsize()获得近似的队列大小
还有一个multiprocessing.active_children,但它只有在这些进程结束时才有效,但是池不起作用;除非您将它排序为.join(),所以在您的情况下它可以工作。
https://stackoverflow.com/questions/46514272
复制相似问题