这是我关于stackoverflow的第一个问题。我基本上可以在这里找到我需要知道的东西。顺便说一句,非常感谢你。
然而。如果我试图杀死我的ProcessPoolExecutor,它将只在生成的整个队列中工作(..我想是吗?)。有什么简单的方法可以立即清除Processpoolexecutor的队列吗?
from concurrent.futures import ProcessPoolExecutor
from time import sleep
from random import randint
def something_fancy():
sleep(randint(0, 5))
return 'im back!'
class Work:
def __init__(self):
self.exe = ProcessPoolExecutor(4)
def start_procs(self):
for i in range(300):
t = self.exe.submit(something_fancy)
t.add_done_callback(self.done)
def done(self, f):
print f.result()
def kill(self):
self.exe.shutdown()
if __name__ == '__main__':
work_obj = Work()
work_obj.start_procs()
sleep(5)
work_obj.kill()所以我想要做的是生成一个300的队列,该队列由4个进程完成。5秒后,它应该会退出。
我需要使用进程,因为gil btw。
发布于 2017-12-31 23:18:09
使用shutdown(wait=False),它会返回得更快。wait的默认值是True,否则它还提供一个.Cancel(),如果不能取消,则返回False。
尽管如此,它仍将完成所有处理:
如果
wait为True,则此方法直到所有挂起的未来完成执行并且与executor关联的资源被释放后才会返回。
如果wait为False,则此方法将立即返回,并且当所有挂起的未来完成执行时,与executor关联的资源将被释放。无论wait的值是多少,整个Python程序都不会退出,直到所有挂起的期货执行完毕。
如果您有固定的时间,则应提供超时:
map(func, *iterables, timeout=None, chunksize=1),可以是以秒为单位指定的浮点型或整型
发布于 2018-01-01 00:01:48
谢谢帕特里克
有了提示,我可以通过将Futures添加到列表并手动调整队列大小来取消每个进程。如果没有它,仍然有太多的进程正在启动。
似乎没有api来调整队列大小、暂停执行或删除进程队列。
然而,实现这一点的唯一方法是在线程中运行主对象,这样主脚本就可以随时杀死它。我仍然在努力赶上"CancelledError“。
看起来很“脏”,对我来说不是蟒蛇。我会采纳任何其他的建议。非常感谢。
from concurrent.futures import ProcessPoolExecutor, CancelledError
from time import sleep
from random import randint
from threading import Thread
def something_fancy():
sleep(randint(0, 5))
return 'im back!'
class Work:
def __init__(self):
self.exe = ProcessPoolExecutor(4)
self.futures = []
self.max_queue = 50
self.killed = False
def start_procs(self):
for i in range(200000):
while not self.killed:
if len(self.futures) <= self.max_queue:
t = self.exe.submit(something_fancy)
t.add_done_callback(self.done)
self.futures.append(t)
break
def done(self, f):
print f.result()
self.futures.remove(f)
def kill(self):
self.killed = True
for future in self.futures:
try:
future.cancel()
except CancelledError, e:
print e
if __name__ == '__main__':
work_obj = Work()
Thread(target=work_obj.start_procs).start()
sleep(5)
work_obj.kill()edit
from concurrent.futures import ProcessPoolExecutor, CancelledError
from time import sleep
from random import randint
from threading import Thread
def something_fancy():
sleep(0.5)
return 'Hello World, Future was running!'
class Work:
def __init__(self):
cpu_usage = 4
self.exe = ProcessPoolExecutor(cpu_usage)
self.futures = []
self.max_queue = cpu_usage*3
self.stop = False
self.paused = False
def start_procs(self):
for i in range(200000):
while not self.stop:
if len(self.futures) <= self.max_queue:
if not self.paused:
t = self.exe.submit(something_fancy)
t.add_done_callback(self._done)
self.futures.append(t)
break
def _done(self, f):
print f.result()
self.futures.remove(f)
def pause(self):
self.paused = False if self.paused else True
def shutdown(self):
self.stop = True
for future in self.futures:
try:
future.cancel()
except CancelledError, e:
print e
if __name__ == '__main__':
work_obj = Work()
Thread(target=work_obj.start_procs).start()
print 'Started'
sleep(5)
work_obj.pause()
print 'Paused'
sleep(5)
work_obj.pause()
print 'Continue'
sleep(5)
work_obj.shutdown()
print 'Shutdown'这是有效的-仍然不能捕获CancelledError,并且仍然相当脏。
https://stackoverflow.com/questions/48043555
复制相似问题