首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Python Processpoolexecutor - kill queue?

Python Processpoolexecutor - kill queue?
EN

Stack Overflow用户
提问于 2017-12-31 23:10:50
回答 2查看 4.3K关注 0票数 3

这是我关于stackoverflow的第一个问题。我基本上可以在这里找到我需要知道的东西。顺便说一句,非常感谢你。

然而。如果我试图杀死我的ProcessPoolExecutor,它将只在生成的整个队列中工作(..我想是吗?)。有什么简单的方法可以立即清除Processpoolexecutor的队列吗?

代码语言:javascript
复制
from concurrent.futures import ProcessPoolExecutor
from time import sleep
from random import randint


def something_fancy():
    sleep(randint(0, 5))
    return 'im back!'


class Work:
    def __init__(self):
        self.exe = ProcessPoolExecutor(4)

    def start_procs(self):
        for i in range(300):
            t = self.exe.submit(something_fancy)
            t.add_done_callback(self.done)

    def done(self, f):
        print f.result()

    def kill(self):
        self.exe.shutdown()


if __name__ == '__main__':
    work_obj = Work()
    work_obj.start_procs()
    sleep(5)
    work_obj.kill()

所以我想要做的是生成一个300的队列,该队列由4个进程完成。5秒后,它应该会退出。

我需要使用进程,因为gil btw。

EN

回答 2

Stack Overflow用户

发布于 2017-12-31 23:18:09

使用shutdown(wait=False),它会返回得更快。wait的默认值是True,否则它还提供一个.Cancel(),如果不能取消,则返回False。

link to the docu

尽管如此,它仍将完成所有处理:

如果waitTrue,则此方法直到所有挂起的未来完成执行并且与executor关联的资源被释放后才会返回。

如果waitFalse,则此方法将立即返回,并且当所有挂起的未来完成执行时,与executor关联的资源将被释放。无论wait的值是多少,整个Python程序都不会退出,直到所有挂起的期货执行完毕。

如果您有固定的时间,则应提供超时:

代码语言:javascript
复制
map(func, *iterables, timeout=None, chunksize=1)

,可以是以秒为单位指定的浮点型或整型

票数 1
EN

Stack Overflow用户

发布于 2018-01-01 00:01:48

谢谢帕特里克

有了提示,我可以通过将Futures添加到列表并手动调整队列大小来取消每个进程。如果没有它,仍然有太多的进程正在启动。

似乎没有api来调整队列大小、暂停执行或删除进程队列。

然而,实现这一点的唯一方法是在线程中运行主对象,这样主脚本就可以随时杀死它。我仍然在努力赶上"CancelledError“。

看起来很“脏”,对我来说不是蟒蛇。我会采纳任何其他的建议。非常感谢。

代码语言:javascript
复制
from concurrent.futures import ProcessPoolExecutor, CancelledError
from time import sleep
from random import randint
from threading import Thread


def something_fancy():
    sleep(randint(0, 5))
    return 'im back!'


class Work:
    def __init__(self):
        self.exe = ProcessPoolExecutor(4)
        self.futures = []
        self.max_queue = 50
        self.killed = False

    def start_procs(self):
        for i in range(200000):
            while not self.killed:
                if len(self.futures) <= self.max_queue:
                    t = self.exe.submit(something_fancy)
                    t.add_done_callback(self.done)
                    self.futures.append(t)
                    break

    def done(self, f):
        print f.result()
        self.futures.remove(f)

    def kill(self):
        self.killed = True
        for future in self.futures:
            try:
                future.cancel()
            except CancelledError, e:
                print e


if __name__ == '__main__':
    work_obj = Work()
    Thread(target=work_obj.start_procs).start()
    sleep(5)
    work_obj.kill()

edit

代码语言:javascript
复制
from concurrent.futures import ProcessPoolExecutor, CancelledError
from time import sleep
from random import randint
from threading import Thread


def something_fancy():
    sleep(0.5)
    return 'Hello World, Future was running!'


class Work:
    def __init__(self):
        cpu_usage = 4
        self.exe = ProcessPoolExecutor(cpu_usage)
        self.futures = []
        self.max_queue = cpu_usage*3
        self.stop = False
        self.paused = False

    def start_procs(self):
        for i in range(200000):
            while not self.stop:
                if len(self.futures) <= self.max_queue:
                    if not self.paused:
                        t = self.exe.submit(something_fancy)
                        t.add_done_callback(self._done)
                        self.futures.append(t)
                        break

    def _done(self, f):
        print f.result()
        self.futures.remove(f)

    def pause(self):
        self.paused = False if self.paused else True

    def shutdown(self):
        self.stop = True
        for future in self.futures:
            try:
                future.cancel()
            except CancelledError, e:
                print e


if __name__ == '__main__':
    work_obj = Work()
    Thread(target=work_obj.start_procs).start()
    print 'Started'
    sleep(5)
    work_obj.pause()
    print 'Paused'
    sleep(5)
    work_obj.pause()
    print 'Continue'
    sleep(5)
    work_obj.shutdown()
    print 'Shutdown'

这是有效的-仍然不能捕获CancelledError,并且仍然相当脏。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/48043555

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档