首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Python MultiProcessing

Python MultiProcessing
EN

Stack Overflow用户
提问于 2016-06-03 10:55:35
回答 2查看 1.2K关注 0票数 0

对于RabbitMQ使用者,我使用。在应用程序启动时,我创建了4个WorkerProcesses。

代码语言:javascript
复制
def start_workers(num=4):
    for i in xrange(num):
        process = WorkerProcess()
        process.start()

下面你可以找到我的WorkerClass。到目前为止,逻辑还在工作,我创建了4个并行的消费者过程。但问题是在一个过程被杀之后。我想要创造一个新的过程。下面的逻辑中的问题是,新进程是从旧进程创建的子进程,过了一段时间,内存就用完了。process是否有可能启动一个新进程并正确地杀死旧进程?

代码语言:javascript
复制
class WorkerProcess(multiprocessing.Process):

def ___init__(self):
    app.logger.info('%s: Starting new Thread!', self.name)
    super(multiprocessing.Process, self).__init__()

def shutdown(self):
    process = WorkerProcess()
    process.start()
    return True

def kill(self):
    start_workers(1)
    self.terminate()

def run(self):
    try:
        # Connect to RabbitMQ
        credentials = pika.PlainCredentials(app.config.get('RABBIT_USER'), app.config.get('RABBIT_PASS'))
        connection = pika.BlockingConnection(
            pika.ConnectionParameters(host=app.config.get('RABBITMQ_SERVER'), port=5672, credentials=credentials))
        channel = connection.channel()

        # Declare the Queue
        channel.queue_declare(queue='screenshotlayer',
                              auto_delete=False,
                              durable=True)

        app.logger.info('%s: Start to consume from RabbitMQ.', self.name)
        channel.basic_qos(prefetch_count=1)
        channel.basic_consume(callback, queue='screenshotlayer')
        channel.start_consuming()
        app.logger.info('%s: Thread is going to sleep!', self.name)

        # do what channel.start_consuming() does but with stoppping signal
        #while self.stop_working.is_set():
        #    channel.transport.connection.process_data_events()

        channel.stop_consuming()
        connection.close()
    except Exception as e:
               self.shutdown()
    return 0

谢谢

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2016-06-03 11:57:31

在主进程中,跟踪您的子进程(在list中)并使用.join(timeout=50) (https://docs.python.org/2/library/multiprocessing.html#multiprocessing.Process.join)循环它们。

然后检查他是否还活着(alive)。

如果他不是,用一个新的代替他。

代码语言:javascript
复制
def start_workers(n):
    wks = []
    for _ in range(n):
        wks.append(WorkerProcess())
        wks[-1].start()
    while True:
        #Remove all terminated process
        wks = [p for p in wks if p.is_alive()]

        #Start new process
        for i in range(n-len(wks)):
            wks.append(WorkerProcess())
            wks[-1].start()
票数 1
EN

Stack Overflow用户

发布于 2016-06-03 12:10:29

我不会亲自处理流程池管理。相反,我将使用来自concurrent.future模块的concurrent.future

不需要继承WorkerProcess来继承Process类。只需在类中编写实际代码,然后将其提交给进程池执行器。执行器将有一个进程池随时准备执行您的任务。

这样你就可以让事情变得简单,少让你头疼。

你可以在我的博客文章中读到更多关于http://masnun.com/2016/03/29/python-a-quick-introduction-to-the-concurrent-futures-module.html的信息。

示例代码:

代码语言:javascript
复制
from concurrent.futures import ProcessPoolExecutor
from time import sleep

def return_after_5_secs(message):
    sleep(5)
    return message

pool = ProcessPoolExecutor(3)

future = pool.submit(return_after_5_secs, ("hello"))
print(future.done())
sleep(5)
print(future.done())
print("Result: " + future.result())
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/37612535

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档