首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >为什么在线程池中会把null作为任务添加

为什么在线程池中会把null作为任务添加

原创
作者头像
半月无霜
修改2025-02-18 14:15:02
修改2025-02-18 14:15:02
2850
举报
文章被收录于专栏:半月无霜半月无霜

一、前言

不知道大家有没有注意到,在线程池的源码中,有个addWorker()方法。

这个方法的作用是给线程池添加一个任务,以便线程池去调度线程,去执行添加的任务。

那么查看这个方法的调用,发现线程池内部的源码中,出现了许多addWorker(null, true/false)这样的调用

image-20230530225334992
image-20230530225334992

这是为啥呢?这不是一个添加任务的方法么,添加一个null任务,对线程池运行整体情况有什么用?

二、分析

想要知道线程池为什么要这样调用,就要清楚如果添加一个为null的任务会出现什么情况把。

代码语言:javascript
复制
     private boolean addWorker(Runnable firstTask, boolean core) {
         retry:
         for (;;) {
             int c = ctl.get();
             int rs = runStateOf(c);
 ​
             // Check if queue empty only if necessary.
             if (rs >= SHUTDOWN &&
                 ! (rs == SHUTDOWN &&
                    firstTask == null &&
                    ! workQueue.isEmpty()))
                 return false;
 ​
             for (;;) {
                 int wc = workerCountOf(c);
                 if (wc >= CAPACITY ||
                     wc >= (core ? corePoolSize : maximumPoolSize))
                     return false;
                 if (compareAndIncrementWorkerCount(c))
                     break retry;
                 c = ctl.get();  // Re-read ctl
                 if (runStateOf(c) != rs)
                     continue retry;
                 // else CAS failed due to workerCount change; retry inner loop
             }
         }
 ​
         boolean workerStarted = false;
         boolean workerAdded = false;
         Worker w = null;
         try {
             w = new Worker(firstTask);
             final Thread t = w.thread;
             if (t != null) {
                 final ReentrantLock mainLock = this.mainLock;
                 mainLock.lock();
                 try {
                     // Recheck while holding lock.
                     // Back out on ThreadFactory failure or if
                     // shut down before lock acquired.
                     int rs = runStateOf(ctl.get());
 ​
                     if (rs < SHUTDOWN ||
                         (rs == SHUTDOWN && firstTask == null)) {
                         if (t.isAlive()) // precheck that t is startable
                             throw new IllegalThreadStateException();
                         workers.add(w);
                         int s = workers.size();
                         if (s > largestPoolSize)
                             largestPoolSize = s;
                         workerAdded = true;
                     }
                 } finally {
                     mainLock.unlock();
                 }
                 if (workerAdded) {
                     t.start();
                     workerStarted = true;
                 }
             }
         } finally {
             if (! workerStarted)
                 addWorkerFailed(w);
         }
         return workerStarted;
     }

上面是完整的addWorker()方法的源码

首先是第一个for循环,标记为retry的这个,里面就时检查了一下线程池的状态,与运行状态

  • 如果线程池已关闭或正在关闭,那么久返回false
  • 检查线程池当前工作线程,是否超过最大线程数,如果超过,则返回false
  • 如果可以添加工作线程,那就通过cas的方式进行添加
    • 添加成功,就跳出retry这个循环
    • 添加失败,则需要继续往后判断,重新获取ctl,与刚刚cas添加的是否发生变化
      • 如果发生了变化,则回到retry这个循环,继续上面的步骤
      • 如果没有发生变化,则回到内部的循环重新进行cas添加

    流程图如下


如果ctl添加成功了,那么接下来就是添加firstTask,来看下一段

首先就是将传入的firstTask,封装成为一个Worker,至于这个类,后面会讲解到

Worker里面有个thread线程

  • 判断这个thread线程不为空
    • 不为空之后就对mainLock进行加锁,然后重新检查线程状态
    • 一但程池状态允许添加新线程,则将新线程加入线程池并更新最大线程数
    • 启动新线程
代码语言:javascript
复制
     // 这中间代码省略了,第一部分是判断状态和添加任务数,第二部分是判断状态和启动任务
     // 如果线程池的状态是RUNNING,那么一个任务是大概率都是可以添加成功的
     private boolean addWorker(Runnable firstTask, boolean core) {
         // ... 省略了,关心下面t.start();做了什么即可
         if (workerAdded) {
             t.start();
             workerStarted = true;
         }
     }

三、Worker

上面这个方法还是不清楚,添加了null到底有什么用,实际上的问题核心指向了Worker

代码语言:javascript
复制
     // 主要还是要看启动Worker做了什么
     private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
         
         // 首先构造方法
         Worker(Runnable firstTask) {
             setState(-1);
             // 传入一个任务后,作为自己的属性
             this.firstTask = firstTask;
             // 将自己作为任务构建了一个线程作为自己的属性。他自己也实现了Runnable接口
             this.thread = getThreadFactory().newThread(this);
         }
         
         // 当上面t.start();启动的是Worker的run方法
         public void run() {
             runWorker(this);
         }
 ​
         // 上面run();方法调用过来的
         final void runWorker(Worker w) {
             // 线程池里面的,当前的线程
             Thread wt = Thread.currentThread();
             // 当前真正要执行的任务,可能为null,本小节直接定义null
             Runnable task = w.firstTask;
             // 将属性变为null
             w.firstTask = null;
             w.unlock();
             boolean completedAbruptly = true;
             try {
                 // 重点在这里,当task==null时,它会去getTask();方法中去获取task进行判断
                 // 如果getTask();方法返回的是null,那么说明本次循环结束,任务运行完成
                 // 如果getTask();方法返回的是队列中的任务,那么进入循环体,执行任务
                 while (task != null || (task = getTask()) != null) {
                     w.lock();
                     // 下面就是一些判断状态和执行任务的代码了
                     if ((runStateAtLeast(ctl.get(), STOP) ||
                          (Thread.interrupted() &&
                           runStateAtLeast(ctl.get(), STOP))) &&
                         !wt.isInterrupted())
                         wt.interrupt();
                     try {
                         beforeExecute(wt, task);
                         Throwable thrown = null;
                         try {
                             // 这里才是真正执行我们任务的地方
                             task.run();
                         } catch (RuntimeException x) {
                             thrown = x; throw x;
                         } catch (Error x) {
                             thrown = x; throw x;
                         } catch (Throwable x) {
                             thrown = x; throw new Error(x);
                         } finally {
                             afterExecute(task, thrown);
                         }
                     } finally {
                         task = null;
                         w.completedTasks++;
                         w.unlock();
                     }
                 }
                 completedAbruptly = false;
             } finally {
                 processWorkerExit(w, completedAbruptly);
             }
         }
 ​
         // 这是从上面runWorker();方法调用而来
         private Runnable getTask() {
             // 最后poll()方法是否超时
             boolean timedOut = false;
 ​
             for (;;) {
                 // 获取线程池状态
                 int c = ctl.get();
                 int rs = runStateOf(c);
 ​
                 // 检查阻塞队列是否为空
                 // 当状态是RUNNING时,false
                 // 当状态是SHUTDOWN时,判断队列是否为空,如果有值,false
                 // 如果状态是后面几种状态时,无论队列是否有值,true
                 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                     decrementWorkerCount();
                     // 这里返回null,就代表task=null了
                     return null;
                 }
 ​
                 // 当前线程池运行的线程数
                 int wc = workerCountOf(c);
 ​
                 // allowCoreThreadTimeOut,这个讲一个,这个布尔值代表,核心线程数是否也可以被回收
                 // 如果为true,空闲时会保证keepAliveTime的时候,过期销毁
                 // 如果为false(默认),那么在空闲时也会保持活动
                 // 这里主要判断是否允许超时保留核心线程,用来确定下面阻塞队列的阻塞时间
                 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
 ​
                 // 加这个判断,主要是想留一个线程在这循环阻塞,加快从队列中取任务的流程步骤
                 if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {
                     if (compareAndDecrementWorkerCount(c))
                         return null;
                     continue;
                 }
 ​
                 try {
                     // 获取任务
                     Runnable r = timed ?
                         workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                         workQueue.take();
                     // 返回
                     if (r != null)
                         return r;
                     // 没有就一直处在循环之中,并配合上面的107行判断使用
                     timedOut = true;
                 } catch (InterruptedException retry) {
                     timedOut = false;
                 }
             }
         }
 ​
     }

看上面的注释,主要就是看这个判断了

while (task != null || (task = getTask()) != null)

  1. 当task==null时,它会去getTask()方法中去获取task
    1. getTask()获取到的tasknull,则退出while循环了
    2. getTask()获取到的task是有具体的任务的,则进行while循环,进行执行具体的任务

getTask()方法是去线程池的阻塞队列中获取任务,本身会有阻塞效果,时间到了,自然会获取到队列中的task

所以,这就解释了为什么要添加一个null进去 丢一个null进去后,就可以理解为激活了线程池的一个线程,执行了一个包装了null的一个worker 而这个worker,会去调起task,但由于这个task是空的,那么就直接调用getTask()方法,从队列中获取一个task 所以答案就是这样调起的效率更高,少了很多的步骤 比如说,前面线程池状态的检查,新增线程,将task封装worker

四、最后

多阅读源码,有助于自己编码水平的上升,尤其是某些思想,别人优秀的代码虽然复杂,但其中的思想真让人拍手惊叹。

上面是线程池的部分源码解读,希望对您有帮助。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、前言
  • 二、分析
  • 三、Worker
  • 四、最后
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档