不知道大家有没有注意到,在线程池的源码中,有个addWorker()方法。
这个方法的作用是给线程池添加一个任务,以便线程池去调度线程,去执行添加的任务。
那么查看这个方法的调用,发现线程池内部的源码中,出现了许多addWorker(null, true/false)这样的调用

这是为啥呢?这不是一个添加任务的方法么,添加一个null任务,对线程池运行整体情况有什么用?
想要知道线程池为什么要这样调用,就要清楚如果添加一个为null的任务会出现什么情况把。
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}上面是完整的addWorker()方法的源码
首先是第一个for循环,标记为retry的这个,里面就时检查了一下线程池的状态,与运行状态
falsefalsecas的方式进行添加retry这个循环ctl,与刚刚cas添加的是否发生变化retry这个循环,继续上面的步骤cas添加流程图如下

如果ctl添加成功了,那么接下来就是添加firstTask,来看下一段
首先就是将传入的firstTask,封装成为一个Worker,至于这个类,后面会讲解到
Worker里面有个thread线程
thread线程不为空mainLock进行加锁,然后重新检查线程状态 // 这中间代码省略了,第一部分是判断状态和添加任务数,第二部分是判断状态和启动任务
// 如果线程池的状态是RUNNING,那么一个任务是大概率都是可以添加成功的
private boolean addWorker(Runnable firstTask, boolean core) {
// ... 省略了,关心下面t.start();做了什么即可
if (workerAdded) {
t.start();
workerStarted = true;
}
}上面这个方法还是不清楚,添加了null到底有什么用,实际上的问题核心指向了Worker
// 主要还是要看启动Worker做了什么
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
// 首先构造方法
Worker(Runnable firstTask) {
setState(-1);
// 传入一个任务后,作为自己的属性
this.firstTask = firstTask;
// 将自己作为任务构建了一个线程作为自己的属性。他自己也实现了Runnable接口
this.thread = getThreadFactory().newThread(this);
}
// 当上面t.start();启动的是Worker的run方法
public void run() {
runWorker(this);
}
// 上面run();方法调用过来的
final void runWorker(Worker w) {
// 线程池里面的,当前的线程
Thread wt = Thread.currentThread();
// 当前真正要执行的任务,可能为null,本小节直接定义null
Runnable task = w.firstTask;
// 将属性变为null
w.firstTask = null;
w.unlock();
boolean completedAbruptly = true;
try {
// 重点在这里,当task==null时,它会去getTask();方法中去获取task进行判断
// 如果getTask();方法返回的是null,那么说明本次循环结束,任务运行完成
// 如果getTask();方法返回的是队列中的任务,那么进入循环体,执行任务
while (task != null || (task = getTask()) != null) {
w.lock();
// 下面就是一些判断状态和执行任务的代码了
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 这里才是真正执行我们任务的地方
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
// 这是从上面runWorker();方法调用而来
private Runnable getTask() {
// 最后poll()方法是否超时
boolean timedOut = false;
for (;;) {
// 获取线程池状态
int c = ctl.get();
int rs = runStateOf(c);
// 检查阻塞队列是否为空
// 当状态是RUNNING时,false
// 当状态是SHUTDOWN时,判断队列是否为空,如果有值,false
// 如果状态是后面几种状态时,无论队列是否有值,true
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
// 这里返回null,就代表task=null了
return null;
}
// 当前线程池运行的线程数
int wc = workerCountOf(c);
// allowCoreThreadTimeOut,这个讲一个,这个布尔值代表,核心线程数是否也可以被回收
// 如果为true,空闲时会保证keepAliveTime的时候,过期销毁
// 如果为false(默认),那么在空闲时也会保持活动
// 这里主要判断是否允许超时保留核心线程,用来确定下面阻塞队列的阻塞时间
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 加这个判断,主要是想留一个线程在这循环阻塞,加快从队列中取任务的流程步骤
if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 获取任务
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
// 返回
if (r != null)
return r;
// 没有就一直处在循环之中,并配合上面的107行判断使用
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
}看上面的注释,主要就是看这个判断了
while (task != null || (task = getTask()) != null)
getTask()方法中去获取taskgetTask()获取到的task是null,则退出while循环了getTask()获取到的task是有具体的任务的,则进行while循环,进行执行具体的任务而getTask()方法是去线程池的阻塞队列中获取任务,本身会有阻塞效果,时间到了,自然会获取到队列中的task
所以,这就解释了为什么要添加一个
null进去 丢一个null进去后,就可以理解为激活了线程池的一个线程,执行了一个包装了null的一个worker而这个worker,会去调起task,但由于这个task是空的,那么就直接调用getTask()方法,从队列中获取一个task所以答案就是这样调起的效率更高,少了很多的步骤 比如说,前面线程池状态的检查,新增线程,将task封装worker等
多阅读源码,有助于自己编码水平的上升,尤其是某些思想,别人优秀的代码虽然复杂,但其中的思想真让人拍手惊叹。
上面是线程池的部分源码解读,希望对您有帮助。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。