Interrupting a ThreadPoolExecutor

Stack Overflow user
Asked 2021-10-23 18:53:13
Answers: 1 · Views: 68 · Followers: 0 · Votes: 0

How can I stop a concurrent.futures.ThreadPoolExecutor immediately and discard all pending operations when the script is interrupted, as in the example below?

import requests
from concurrent.futures import ThreadPoolExecutor, as_completed


def get_parallel(arguments_list: list[dict]):
    try:
        with ThreadPoolExecutor(max_workers=25) as executor:
            futures_buffer = [executor.submit(requests.get, **kwargs) for kwargs in arguments_list]

            for future in as_completed(futures_buffer):
                try:
                    response = future.result()
                    print(response.url)
                    yield response.url, response.status_code, response.json()['args']
                except KeyboardInterrupt:
                    executor.shutdown(wait=False, cancel_futures=True)
                    yield 'KeyboardInterrupt 1'
                    return
                except Exception as exception:
                    yield exception
    except KeyboardInterrupt:
        yield 'KeyboardInterrupt 2'
        return


if __name__ == '__main__':
    arguments = [dict(url=f'https://httpbin.org/get?q={i}') for i in range(200)]
    for t in get_parallel(arguments):
        print(t)

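As the code stands, when I run it from a terminal and press ^C, it stops printing results but then hangs for as long as it would have taken had it not been interrupted, and only at the end prints KeyboardInterrupt 2.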
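
For context, here is a minimal sketch (not part of the original post) of what `shutdown(wait=False, cancel_futures=True)` does and does not do. It assumes Python 3.9 or later, where `cancel_futures` was added; `slow_task` and `run_and_cancel` are hypothetical names, and `time.sleep` stands in for a blocking network call. Futures still waiting in the queue are cancelled, but tasks that a worker thread has already started keep running to completion, which is consistent with the hang described above.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_task(i: int) -> int:
    """Stand-in for a blocking network call (hypothetical helper)."""
    time.sleep(0.2)
    return i


def run_and_cancel() -> tuple[int, int]:
    """Submit 10 tasks to 2 workers, then cancel whatever has not started."""
    executor = ThreadPoolExecutor(max_workers=2)
    futures = [executor.submit(slow_task, i) for i in range(10)]
    time.sleep(0.05)  # give both workers time to pick up one task each
    # Cancels queued futures only; tasks already running are not interrupted.
    executor.shutdown(wait=False, cancel_futures=True)
    # Block until the tasks that were already running have finished.
    for future in futures:
        if not future.cancelled():
            future.result()
    cancelled = sum(f.cancelled() for f in futures)
    finished = len(futures) - cancelled
    return cancelled, finished


if __name__ == '__main__':
    print(run_and_cancel())
```

The running tasks cannot be stopped from outside the thread, so the program still waits for them before it can exit.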


1 Answer

Stack Overflow user

Accepted answer

Posted 2021-10-23 19:48:33

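You can't signal individual threads in Python, and even if you could, there is no guarantee that requests hasn't created its own threads as workers. You can kill processes, though, so you could delegate the requests to subprocesses and kill them on keyboard interrupt. The cleanest way to do this is to manage your own subprocesses and a queue of work items. But if you don't mind a bit of a hack, concurrent.futures.ProcessPoolExecutor keeps a list of its pool processes, and you can hijack that. It is a hack, though, and it may break at some point in the future. Since the response is pickled and sent back to the parent process, it's best to have an intermediate worker function that extracts the useful data from the response object, rather than returning the response object itself.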
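
The last point, returning plain data rather than the response object, can be sketched roughly as follows. This is an illustration under assumptions, not the answer's own code: `extract_summary` is a hypothetical helper, it accepts any object exposing `.url`, `.status_code` and `.json()`, and treating a non-JSON body as "no args" is just one possible policy.

```python
import pickle
from typing import Any, Optional


def extract_summary(resp: Any) -> tuple[str, int, Optional[dict]]:
    """Reduce a response-like object to plain, picklable data.

    `resp` only needs `.url`, `.status_code` and `.json()` (an assumption
    made so the sketch runs without a network connection).
    """
    try:
        args = resp.json().get('args')
    except (ValueError, AttributeError):
        # Body was not JSON, or the JSON was not an object: report no args.
        args = None
    return resp.url, resp.status_code, args


class FakeResponse:
    """Hypothetical stand-in for a requests response, for demonstration."""

    def __init__(self, url: str, status_code: int, payload: Any):
        self.url = url
        self.status_code = status_code
        self._payload = payload

    def json(self) -> Any:
        if self._payload is None:
            raise ValueError('no JSON body')
        return self._payload


if __name__ == '__main__':
    summary = extract_summary(FakeResponse('https://example.invalid/', 200, {'args': {'q': '1'}}))
    # A tuple of str, int and dict survives the pickle round trip unchanged.
    assert pickle.loads(pickle.dumps(summary)) == summary
    print(summary)
```

A small tuple like this is cheap to send between processes and does not depend on the response class being importable or picklable in the parent.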

import concurrent.futures
import os
import requests
import signal

def worker(arguments_list: dict):
    """use requests to get web page and return url, status, json args"""
    resp = requests.get(**arguments_list)
    # todo: when status_code not 200 and json decode fails, do...???
    return resp.url, resp.status_code, resp.json()['args']

def get_parallel(arguments_list: list[dict]):
    with concurrent.futures.ProcessPoolExecutor(max_workers=25) as executor:
        try:
            futures_buffer = [executor.submit(worker, kwargs) for kwargs in arguments_list]

            for future in concurrent.futures.as_completed(futures_buffer):
                url, status_code, args = future.result()
                print(url)
                yield url, status_code, args
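        except KeyboardInterrupt:
            # The hack: _processes is a private dict keyed by worker pid.
            for pid in executor._processes:
                # SIGKILL is only available on POSIX systems.
                os.kill(pid, signal.SIGKILL)
            yield 'KeyboardInterrupt 2'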

if __name__ == '__main__':
    arguments = [dict(url=f'https://httpbin.org/get?q={i}') for i in range(200)]
    for t in get_parallel(arguments):
        print(t)
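
The "manage your own subprocesses and a queue of work items" route mentioned above could look roughly like the sketch below. It is an illustration under stated assumptions rather than the answer's code: it assumes a POSIX system and the `fork` start method (so `func` is inherited instead of pickled), and `start_workers`, `stop_workers` and this `get_parallel` are hypothetical names. Workers ignore SIGINT so that only the parent reacts to ^C and then terminates them through the public `Process.terminate()` API.

```python
import multiprocessing as mp
import signal

# Assumption: POSIX. "fork" lets workers inherit `func` without pickling it.
_CTX = mp.get_context("fork")


def _worker_loop(tasks, results, func):
    """Run func on queued items until a None sentinel arrives."""
    # Let only the parent react to ^C; it will terminate the workers.
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    for item in iter(tasks.get, None):
        results.put(func(item))


def start_workers(func, items, n_workers):
    """Queue every item, then start n_workers processes to consume them."""
    tasks, results = _CTX.Queue(), _CTX.Queue()
    for item in items:
        tasks.put(item)
    for _ in range(n_workers):
        tasks.put(None)  # one stop sentinel per worker
    procs = [
        _CTX.Process(target=_worker_loop, args=(tasks, results, func), daemon=True)
        for _ in range(n_workers)
    ]
    for proc in procs:
        proc.start()
    return procs, tasks, results


def stop_workers(procs, tasks):
    """Terminate workers that are still running and drop unconsumed work."""
    tasks.cancel_join_thread()  # do not block on items nobody will read
    for proc in procs:
        if proc.is_alive():
            proc.terminate()
    for proc in procs:
        proc.join()


def get_parallel(func, items, n_workers=4):
    """Yield results as they arrive; on ^C kill the workers and stop."""
    items = list(items)
    procs, tasks, results = start_workers(func, items, n_workers)
    try:
        for _ in items:
            yield results.get()
    except KeyboardInterrupt:
        yield 'KeyboardInterrupt'
    finally:
        stop_workers(procs, tasks)
```

With a worker function like the answer's `worker`, this would be driven as `for t in get_parallel(worker, arguments): print(t)`. The sketch does not handle exceptions raised inside `func`; a worker that dies that way would leave the parent waiting on `results.get()`, so a real version would need to report errors back through the results queue.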
Votes: 1
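Original page content provided by Stack Overflow. Translation support for the page was provided by Tencent Cloud Xiaowei's IT-domain engine.
Original link: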

https://stackoverflow.com/questions/69691136
