线程池的参数动态调整 总结: 线程池类 ThreadPoolExecutor 中已经提供了对应的方法,允许动态修改线程池参数: 1、动态修改核心数 当 allowCoreThreadTimeOut 参数设置为 true 的时候, 核心线程在空闲了 keepAliveTime 的时间后也会被回收的, 相当于线程池自动给你动态修改了 public void setCorePoolSize(int corePoolSize (null, true)) { if (workQueue.isEmpty()) break; } } } 2、 动态修改最大线程数 public void setMaximumPoolSize(int maximumPoolSize) { if (maximumPoolSize <= 0 || this.keepAliveTime; this.keepAliveTime = keepAliveTime; if (delta < 0) interruptIdleWorkers(); } 4、动态修改线程工厂
接上文线程池原理(1) 线程池的创建 通过ThreadPoolExecutor构造函数实现(推荐) ? 线程池执行流程 任务缓冲 任务缓冲模块是线程池能够管理任务的核心部分。线程池的本质是对任务和线程的管理,而做到这一点最关键的思想就是将任务和线程两者解耦,不让两者直接关联,才可以做后续的分配工作。 线程池大小确定 线程池数量的确定一直是困扰着程序员的一个难题,大部分程序员在设定线程池大小的时候就是随心而定。 很多人甚至可能都会觉得把线程池配置过大一点比较好!我觉得这明显是有问题的。 I/O 密集型任务(2N):这种任务应用起来,系统会用大部分的时间来处理 I/O 交互,而线程在处理 I/O 的时间段内不会占用 CPU 来处理,这时就可以将 CPU 交出给其它线程使用。 因此在 I/O 密集型任务的应用中,我们可以多配置一些线程,具体的计算方法是 2N。 如何判断是 CPU 密集任务还是 IO 密集任务?
在使用Java线程池实现各种的需求过程中,很是能体会线程池的好处。但是随着需求增加,发现Java线程池自带的集中模式也有点不太够用。所以又想自己根据现有的API进行拓展的想法。 Java线程池执行task的流程图如下: Java线程池执行流程图 在是否创建新的线程池逻辑中,只有当核心线程数未满和任务队列已经满了两种情况,但是在性能测试过程中,经常会批量初始化很多数据,这个时候如果使用异步进行初始化 ,就需要一个相当大的等待队列长度,而通常线程池使用核心线程数和最大线程数来控制线程池的活跃线程数量。 无法实现动态根据等待队列中的数量多少灵活增加活跃线程数来提升异步任务的处理能力,也无法动态减低,减少线程池活跃线程,降低资源消耗。 * * 2.
废话不多说,开始我们的线程池源码的第二轮阅读。 回顾 简单回顾下上一篇线程池源码中涉及的两个方法,一个是execute() 执行任务的入口,还有一个是addWorker() 最通俗地理解就是是否需要添加新线程。 开始是一个循环,要么执行worker自带的第一个任务(firstTask),要么通过getTask() 获取任务 有任务首先得保证线程池是正常的,以下两种情况均调用wt.interrupt() 给「线程设置中断标志位 」 线程池处于STOP状态,也就是不接受新任务,也不执行队列中的任务 如果线程的标志位已经为true,那么清楚标志位,此时的线程池状态为STOP状态,这里看起来可能比较别扭,有了第一种情况为什么还要第二种 一进来也是一个死循环,可以先聚焦「什么时候会退出循环」,肯定是「不正常的情况」下会退出 当线程池状态不处于RUNNING或者SHUTDOWN的时候,或者是当线程处于SHUTDOWN但是工作队列中没有任务
一、如果要设计一个动态线程池,如何实现? 1)如果要实现一个动态线程池,首先需要考虑的是将线程池的相关配置信息外置。这样出现问题的时候,能够基于配置修改,实现热部署。修改配置后,就能生效。 2)如果线程池出现问题或者完成修改后,能够基于监控的信息,进行通知和告警。这样就需要考虑通知和告警的方式的多样性:比如基于钉钉、微信、飞书、电子邮件等渠道进行通知和告警。 二、 dynamic-tp动态线程池的思想思路 1.事件发布 根据引入的dynamic-tp-spring-cloud-starter-nacos或者dynamic-tp-spring-boot-starter-nacos 方便后续对线程池的操作。 task"); }, "task-" + i)); } } 由此可以看到实现了两个最为主要的功能:对线程池进行动态变更和对线程池的监控告警。
不弄动态加载库。 头文件:#include<c_pthread_pool.h> 初始化传参:E_PThread_Pool(int max_count,int min_count,int wait_sec); 传入最大线程数 、最低线程数、线程等待时间。 调度线程:addTask(Task *t); 使用方式:Task是个抽象基类,只有一个默认构造函数、一个析构函数, 以及一个纯虚函数virtual int run()=0; 使用时用一个子类继承,继承时可在子类构造中添加自己的东西
前言 线程池实现原理-1 addWorker实现 在看addWorker方法之前,我们先看一个例子,了解一下retry的使用 break retry 跳到retry处,且不再进入循环 continue = null || workQueue.isEmpty) * 1.如果当前线程池的状态>SHUTDOWN,addWorker返回false,添加任务失败 * 2.如果当前线程池的状态 workerStarted) addWorkerFailed(w); } return workerStarted; } 仔细理解一下这段代码,其实就能理解,当线程池处于 // 1.核心线程允许被销毁 // 2.核心线程数 > corePoolSize boolean timed = allowCoreThreadTimeOut || wc 2.wc > maximumPoolSize肯定要删除线程了 // 3.workQueue为空可以销毁线程,此时有可能所有线程都被销毁了 // 4.workQueue不为空
,运行情况不能及时感知到,直到出现问题 如果你有以上痛点,动态可监控线程池(DynamicTp)或许能帮助到你。 那么我们是否可以结合配置中心来做运行时线程池参数的动态调整呢? ThreadPoolExecutor 做一些扩展增强,主要实现以下目标」 1.实现对运行中线程池参数的动态修改,实时生效 2.实时监控线程池的运行状态,触发设置的报警策略时报警,报警信息推送办公平台 1.配置变更监听模块 2.服务内部线程池管理模块 3.三方组件线程池管理模块 4.监控模块 5.通知告警模块 代码结构 「adapter 模块」:主要是适配一些第三方组件的线程池管理,目前已经实现的有 Spring 容器中 2.接受配置监听模块的刷新事件,实现线程池参数的刷新 3.代码中通过依赖注入(推荐)或者 DtpRegistry.getDtpExecutor() 方法根据线程池名称来获取线程池实例
那么我们是否可以结合配置中心来做运行时线程池参数的动态调整呢? ThreadPoolExecutor 做一些扩展增强,主要实现以下目标」 ❝ 1.实现对运行中线程池参数的动态修改,实时生效 2.实时监控线程池的运行状态,触发设置的报警策略时报警,报警信息推送办公平台 ❝ 1.配置变更监听模块 2.服务内部线程池管理模块 3.三方组件线程池管理模块 4.监控模块 5.通知告警模块 ❞ 代码结构 ❝ 「adapter 模块」:主要是适配一些第三方组件的线程池管理,目前已经实现的有 ,生成线程池实例注册到内部线程池注册中心以及 Spring 容器中 2.接受配置监听模块的刷新事件,实现线程池参数的刷新 3.代码中通过依赖注入(推荐)或者 DtpRegistry.getDtpExecutor () 方法根据线程池名称来获取线程池实例 三方组件线程池管理 1.服务启动获取第三方中间件的线程池,被框架管理起来 2.接受参数刷新、指标收集、通知报警事件,进行相应的处理 监控模块 实现监控指标采集以及输出
2 线程池的创建 Executors中提供了一系列静态方法创建线程池: newSingleThreadExecutor:一个单线程的线程池。如果因异常结束,会再创建一个新的,保证按照提交顺序执行。 newWorkStealingPool:JDK8新增,根据所需的并行层次来动态创建和关闭线程,通过使用多个队列减少竞争,底层使用ForkJoinPool来实现。 通过 ctl.get() 得到线程池的当前线程数,如果线程数小于corePoolSize,则调用 **addWorker(commond,true)** 方法创建新的线程执行任务,否则执行步骤2; 2. ,但再也不会接受新的任务 shutdownNow():立即终止线程池,并尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务 3.7 线程池容量的动态调整 ThreadPoolExecutor 提供了动态调整线程池容量大小的方法:setCorePoolSize()和setMaximumPoolSize(), setCorePoolSize:设置核心池大小 setMaximumPoolSize:
大家好,这篇文章我们来介绍下动态线程池框架(DynamicTp)的adapter模块,上篇文章也大概介绍过了,该模块主要是用来适配一些第三方组件的线程池管理,让第三方组件内置的线程池也能享受到动态参数调整 gitee地址:https://gitee.com/yanhom/dynamic-tp github地址:https://github.com/lyh200/dynamic-tp *** 系列文章 美团动态线程池实践思路 #execute()方法的执行流程 1.判断如果当前线程数小于核心线程池,则新建一个线程来处理提交的任务2.如果当前线程数大于核心线程数且队列没满,则将任务放入任务队列等待执行 3.如果当前当前线程池数大于核心线程池 JUC线程池的执行流程,改写后Tomcat线程池执行流程如下: 1.判断如果当前线程数小于核心线程池,则新建一个线程来处理提交的任务 2.如果当前当前线程池数大于核心线程池,小于最大线程数,则创建新的线程执行提交的任务 同时也介绍了基于DynamicTp怎么动态调整线程池的参数,当我们做WebServer性能调优时,能动态调整参数真的是非常好用的。 再次欢迎大家使用DynamicTp框架,一起完善项目。
vs submit() execute()方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功与否; submit()方法用于提交需要返回值的任务。 线程池会返回一个 Future 类型的对象,通过这个 Future 对象可以判断任务是否执行成功 ,并且可以通过 Future 的 get()方法来获取返回值,get()方法会阻塞当前线程直到任务完成, ,线程池的状态变为 SHUTDOWN。 线程池不再接受新任务了,但是队列里的任务得执行完毕。 shutdownNow() :关闭线程池,线程的状态变为 STOP。 线程池会终止当前正在运行的任务,并停止处理排队的任务并返回正在等待执行的 List。
主线程: 相当于生产者,只管向线程池提交任务。并不关心线程池是如何执行任务的。因此,并不关心是哪一个线程执行的这个任务。 线程池: 相当于消费者,负责接收任务,并将任务分配到一个空闲的线程中去执行。 python内置进程池 ? >>>执行结果 ? # 必须要有一个 main 测试 >>> if __name__ == "__main__": # Pool 的实例化必须在 main 测试之下 >>>pool = Pool(2) 池的其他操作 操作一 操作二: terminate - 中止进程池,中止所有任务。 # 会阻塞,知道结果产生了 >>> result = a_result.get() 使用线程池来实现并发服务器 ? >>>客户端 ? >>>执行结果 ?
而如果系统中使用 hippo4j,能够在控制台查看当前应用已有线程池,是否存在相同语义且业务可复用线程池实例,避免线程资源过度浪费。 2. 大家都知道,如果要修改运行中应用线程池参数,需要停止线上应用,调整成功后再发布,而这个过程异常的繁琐,如果能在运行中动态调整线程池的参数多好。 美团技术团队基于这些痛点,推出了动态线程池的概念,催生了一批动态线程池框架,hippo4j 也是其一。 再比如,压测时使用 hippo4j 动态调整线程池参数,对于开发测试来说,也是个不错的选择。 3. 因为如果线程池任务长时间执行,会影响整个应用的停止,进行了折中处理。 7. 三方框架中间件线程池适配 hippo4j 的目标是兼容所有框架的线程池,并可以提供监控和动态修改的能力。
线程池不是运行状态时,addWorker内部会判断线程池状态 // 2. addWorker第2个参数表示是否创建核心线程 // 3. addWorker返回false,则说明任务执行失败 线程池状态大于SHUTDOWN时,直接返回false // 2. 线程池状态等于SHUTDOWN,且firstTask不为null,直接返回false // 3. processWorkerExit(w, completedAbruptly); } } 线程池参数动态化 现有的解决方案的痛点。 如果是 CPU 密集型的,可以把核心线程数设置为核心数+1; 如果是包含 IO 操作的任务 但是往往一台服务器是部署了多个应用,一个应用也会有多个线程池,所以很难配置一个完美的参数 动态更新的工作原理是什么 其实可以把二者设置为相同的值,然后设置allowCoreThreadTimeOut 参数设置为 true ,核心线程在空闲了 keepAliveTime 的时间后也会被回收的,相当于线程池自动给你动态修改
前几天和一个大佬聊天的时候他说自己最近在做线程池的监控,刚刚把动态调整的功能开发完成。 想起我之前写过这方面的文章,就找出来看了一下:《如何设置线程池参数?美团给出了一个让面试官虎躯一震的回答。》 为了更好的描述这个坑,我先给大家回顾一下线程池动态调整的几个关键点。 首先,为什么需要对线程池的参数进行动态调整呢? 因为随着业务的发展,有可能出现一个线程池开始够用,但是渐渐的被塞满的情况。 带入一个实际的场景,也就是前面的示例代码,只是调整一下参数: 这个线程池核心线程数是 1,最大线程数是 2,队列长度是 5,最多能容纳的任务数是 7。 另外有一个线程在执行把核心线程池从 1 修改为 2 的操作。 假设我们记线程池 submit 提交了 6 个任务,正在提交第 7 个任务的时间点为 T1。 为什么是要强调这个时间点呢? corePoolSize : maximumPoolSize): wc 就是当前线程池,正在工作的线程数。 把我们前面的条件带进去,就是这样的 wc >=(false?2:2)。 即 wc=2。
那么我们是否可以结合配置中心来做运行时线程池参数的动态调整呢?答案是肯定的,而且配置中心相对都是高可用的,使用它也不用过于担心配置推送出现问题这类事儿,而且也能减少研发动态线程池组件的难度和工作量。 但线程池的参数并不好确定;需要有套机制在运行过程中动态去调整参数 无感知性,线程池运行过程中的各项指标一般感知不到;需要有套监控报警机制在事前、事中就能让开发人员感知到线程池的运行状况,及时处理 高可用性 做一些扩展,实现对运行中线程池参数的动态修改,实时生效;以及实时监控线程池的运行状态,触发设置的报警策略时报警,报警信息会推送办公平台(钉钉、企微等)。 ,对线程池参数动态化管理,增加监控、报警功能 基于Spring框架,现只支持SpringBoot项目使用,轻量级,引入starter即可食用 基于配置中心实现线程池参数动态调整,实时生效;集成主流配置中心 ,生成线程池实例注册到内部线程池注册中心中 2.监听模块监听到配置变更时,将变更信息传递给管理模块,实现线程池参数的刷新 3.代码中通过getExecutor()方法根据线程池名称来获取线程池对象实例
位表示工作线程数,最左边3位表示线程池状态。 ); } 第2处 发生拒绝的原因有两个(1)线程池状态非Runing (2)等待队列已满。 /** * 根据当前线程池状态,检查是否可以添加新的任务线程,如果可以则创建并启动任务 * 如果一切正常则返回true。 返回false 的可能如下: * 1.线程池没有处于RUNNING状态 * 2.线程工程创建新的任务线程失败 * @param firstTask 外部启动线程池时需要构造的第一个线程 corePoolSize : maximumPoolSize)) // 最大线程数不能超过2^29,否则影响左边3位的线程池状态值 return false;
所以需要通过线程池协调多个线程,并实现类似主次线程隔离、定时执行、周期执行等任务。线程池的作用包括: 利用线程池管理并复用线程、控制最大并发数等。 实现任务线程队列缓存策略和拒绝机制。 在了解线程池的基本作用后,我们学习一下线程池是如何创建线程的。 这个值的设置非常关键,设置过大会浪费资源,设置的过小会导致线程频繁地创建或销毁。 第2个参数:maximumPoolSize 表示线程池能够容纳同时执行的最大线程数。 从代码第2处来看,队列、线程工程、拒绝处理服务都必须有实例对象,但在实际编码中,很少有程序员对着三者进行实例化,而通过Executors这个线程池静态工厂提供默认实现,那么Executors与ThreadPoolExecutor execute(task1); } } } 当任务被拒绝的时候,拒绝策略会打印出当前线程池的大小以及达到了maximumPoolSize=2 ,且队列已满。
在对线程池配置参数进行调整时,一般需要对服务进行重启,这样修改的成本就会偏高。一种解决办法就是,将线程池的配置放到平台侧,运行开发同学根据系统运行情况对核心参数进行动态配置。 本文以Nacos作为服务配置中心,以修改线程池核心线程数、最大线程数为例,实现一个简单的动态化线程池。 5.controller 为了观察线程池动态变更的效果,增加Controller类。 这时,打印具体的线程状态,发现线程池参数修改成功。 总结 这里,只是简单实现了一个可以调整核心线程数和最大线程数的动态线程池。 具体的线程池实现原理可以参考美团的这篇文章:https://tech.meituan.com/2020/04/02/java-pooling-pratice-in-meituan.html,结合监控告警等实现一个完善的动态线程池产品