对于RabbitMQ使用者,我使用。在应用程序启动时,我创建了4个WorkerProcesses。
def start_workers(num=4):
for i in xrange(num):
process = WorkerProcess()
process.start()下面你可以找到我的WorkerClass。到目前为止,逻辑还在工作,我创建了4个并行的消费者过程。但问题是在一个过程被杀之后。我想要创造一个新的过程。下面的逻辑中的问题是,新进程是从旧进程创建的子进程,过了一段时间,内存就用完了。process是否有可能启动一个新进程并正确地杀死旧进程?
class WorkerProcess(multiprocessing.Process):
def ___init__(self):
app.logger.info('%s: Starting new Thread!', self.name)
super(multiprocessing.Process, self).__init__()
def shutdown(self):
process = WorkerProcess()
process.start()
return True
def kill(self):
start_workers(1)
self.terminate()
def run(self):
try:
# Connect to RabbitMQ
credentials = pika.PlainCredentials(app.config.get('RABBIT_USER'), app.config.get('RABBIT_PASS'))
connection = pika.BlockingConnection(
pika.ConnectionParameters(host=app.config.get('RABBITMQ_SERVER'), port=5672, credentials=credentials))
channel = connection.channel()
# Declare the Queue
channel.queue_declare(queue='screenshotlayer',
auto_delete=False,
durable=True)
app.logger.info('%s: Start to consume from RabbitMQ.', self.name)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback, queue='screenshotlayer')
channel.start_consuming()
app.logger.info('%s: Thread is going to sleep!', self.name)
# do what channel.start_consuming() does but with stoppping signal
#while self.stop_working.is_set():
# channel.transport.connection.process_data_events()
channel.stop_consuming()
connection.close()
except Exception as e:
self.shutdown()
return 0谢谢
发布于 2016-06-03 11:57:31
在主进程中,跟踪您的子进程(在list中)并使用.join(timeout=50) (https://docs.python.org/2/library/multiprocessing.html#multiprocessing.Process.join)循环它们。
然后检查他是否还活着(alive)。
如果他不是,用一个新的代替他。
def start_workers(n):
wks = []
for _ in range(n):
wks.append(WorkerProcess())
wks[-1].start()
while True:
#Remove all terminated process
wks = [p for p in wks if p.is_alive()]
#Start new process
for i in range(n-len(wks)):
wks.append(WorkerProcess())
wks[-1].start()发布于 2016-06-03 12:10:29
我不会亲自处理流程池管理。相反,我将使用来自concurrent.future模块的concurrent.future。
不需要继承WorkerProcess来继承Process类。只需在类中编写实际代码,然后将其提交给进程池执行器。执行器将有一个进程池随时准备执行您的任务。
这样你就可以让事情变得简单,少让你头疼。
你可以在我的博客文章中读到更多关于http://masnun.com/2016/03/29/python-a-quick-introduction-to-the-concurrent-futures-module.html的信息。
示例代码:
from concurrent.futures import ProcessPoolExecutor
from time import sleep
def return_after_5_secs(message):
sleep(5)
return message
pool = ProcessPoolExecutor(3)
future = pool.submit(return_after_5_secs, ("hello"))
print(future.done())
sleep(5)
print(future.done())
print("Result: " + future.result())https://stackoverflow.com/questions/37612535
复制相似问题