线程池源码分析,带你一起上分
前言
多线程系列我们前面已经更新过七个章节了,强烈建议小伙伴按照顺序学习:
无论是Java小白还是高级大佬,线程池基本是面试中的必考题,这个知识点虽然涉及的东西较多,但在理解上却不是很难,非常适合在面试上拿分。在上一篇文章《蹲坑也能进大厂》多线程系列-线程池精讲(必看)中,小伙伴们对线程池应该有了一个基本的认识,对线程池不是很了解的同学,建议先看上一篇。
本篇花Gie对「如何用好线程池以及它的原理」进行探讨,在面试中想要拿到更理想的工资,以及在日常工作中的问题排查,了解它的内部结构是必不可少的。
ps:本文使用JDK8环境讲解
正文
「我:狗哥狗哥,学完了上一章,可以帮我总结一下线程池的有哪几个重要部分组成吗?」
问题不大,线程池的组成部分主要有四个:
-
「线程池管理器」:用于管理线程池,如停止线程池、创建线程池等; -
「工作线程」:用于从队列中读取并执行任务; -
「任务队列」:存放来不及执行的任务; -
「任务接口」:一个一个被用来执行的任务,未执行时存放在任务队列中。
「我:线程池的家族史可以介绍一下吗?那么多类我都快乱死了」
线程池涉及的类是比较多,但区分下来还是不难理解的,我们先来看这个结构图,这几种是我们经常看到的:
-
「Executor」:是一个顶级接口,内部只包含一个 execute()
方法; -
「ExecutorService」:也是一个接口,它继承了Executor接口,并新增了 shutdown()
、submit()
等方法; -
「Executors」:是一个工具类,它提供了我们常用的创建线程方法,例如: newSingleThreadExecutor
、newFixedThreadPool
等。 -
「ThreadPoolExecutor」:是真正意义的线程池。
「我:搜得思耐,也不是很难嘛,那如何向线程池中提交任务呢?」
花Gie,你居然在我面前装X,看我教你做人。
提交任务方式有两种,其实本质上还是一种,因为submit
最终调用的还是execute()
方法:
-
「execute()」:用于提交不需要返回值的任务,所以也就意味着无法判断是否执行成功。
ScheduledExecutorService service = Executors.newScheduledThreadPool(1);
service.execute(new Runnable() {
@Override
public void run() {}
});
-
「submit」:线程池会返回一个future类型的对象,通过这个future对象可以判读是否执行成功,并且还可以通过get()方法来获取返回值。
ScheduledExecutorService service = Executors.newScheduledThreadPool(1);
Future<Object> future =(Future<Object>) service.submit(new Runnable() {
@Override
public void run() {
System.out.println(1);
}
});
try {
future.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}finally {
//关闭线程池
service.shutdown();
}
「我:so easy嘛,狗子,有没有刺激点的」
嘚瑟,你再给我嘚瑟,既然基础知识点掌握完了,那我们就来深入了解一下源码吧,用源码的方式了解线程池的一生。
「这么突然吗,我甚至还不知道线程池的生命周期有哪几种......」
线程池的生命周期有五个:
-
「RUNNING」:此时能够接受新任务,并处理排队任务; -
「SHUTDOWN」:不再接受新任务,但是会处理排队任务; -
「STOP」:不接受新任务,也不处理排队任务,并且会中断正在执行的任务; -
「TIDYING」:所有任务都已终止,workworkerCount为零时,线程就会转换到此状态,并且运行 terminated()
函数; -
「TERMINATED」:terminated()函数运行完成。
「我:阿里嘎多欧卡桑,嘤嘤嘤~」
花Gie,你这么浪真的好么。接下来正式介绍线程池的一些重要源码吧,首先要看的是上面提到过的execute
方法:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
//步骤一
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//步骤二
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//步骤三
else if (!addWorker(command, false))
reject(command);
}
我们可以根据条件分为三个大的步骤来分析:
-
步骤一分析
代码第三局有一个ctl
,它是用于记录线程池状态和运行线程数。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
这里会判断正在运行的线程是否达到核心线程数,如果为true
,就会调用addWorker
新增一个工作线程,并运行当前任务(command
),如果新增线程失败,就会重新获取ctl。
//运行线程数是否小于核心线程数
if (workerCountOf(c) < corePoolSize) {
//新增线程到线程池,并将当前任务添加到新增的线程中
if (addWorker(command, true))
return;
//创建线程失败,重新获取clt。
c = ctl.get();
}
-
步骤二
isRunning:判断线程池的是否为运行状态
如果运行线程数不小于核心线程数,就会执行以下6个子步骤:
//1.线程池是运行状态并且运行线程大于核心线程数时,把任务放入队列中。
if (isRunning(c) && workQueue.offer(command)) {
//2.获取线程池状态
int recheck = ctl.get();
//3.如果线程池不是运行状态,把任务移除队列
if (! isRunning(recheck) && remove(command))
//4.执行拒绝策略
reject(command);
//5.判断当前运行线程数是否为0
else if (workerCountOf(recheck) == 0)
//6.创建线程并加入到线程池
addWorker(null, false);
}
//移除任务
public boolean remove(Runnable task) {
boolean removed = workQueue.remove(task);
tryTerminate(); // In case SHUTDOWN and now empty
return removed;
}
-
步骤三
如果前几个条件都不满足,也就是运行线程大于核心线程数时并且队列已满时,就会调用addWorker新建线程执行当前任务,如果新建失败,则表示运行线程已达到最大线程数,不能再次创建新的线程,此时就会执行拒绝策略。
//创建线程放入线程池中,并且运行当前任务。
else if (!addWorker(command, false))
//运行线程大于最大线程数时,失败则拒绝该任务
reject(command);
上面多次用到addWorker
方法,简单看下它的实现逻辑。
「这里做一个总结并附上部分源码注释」,小伙伴们啃起来,略长:
-
「addWorker(command, true)」:当线程数小于corePoolSize时,创建核心线程并且运行task。 -
「addWorker(command, false)」:当核心线程数已满,阻塞队列已满,并且线程数小于 maximumPoolSize
时,创建非核心线程并且运行task。 -
「addWorker(null, false)」:如果工作线程为0是,创建一个核心线程但是不运行task。(主要是避免工作队列中还有任务,但是工作线程为0,导致工作队列中的任务一直没有执行)
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
//获取线程池状态和运行线程数。
int c = ctl.get();
//获取线程池的运行状态
int rs = runStateOf(c);
//线程池处于关闭状态、当前任务为null、队列不为空,斗直接返回失败
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
//获取线程池中的线程数
int wc = workerCountOf(c);
//线程数超过CAPACITY,直接返回false;
//如果core为true,则运行线程数与核心线程数进行比较,为false则与最大线程数进行比较。
//并且运行线程数大于等于core时,返回false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//尝试增加线程数,如果成功,则跳出第一个for循环
if (compareAndIncrementWorkerCount(c))
break retry;
//如果增加线程数失败,则重新获取ctl
c = ctl.get();
//如果当前的运行状态不等于rs,说明状态已被改变,
//返回第一个for循环继续执行
if (runStateOf(c) != rs)
continue retry;
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//根据当前任务来创建Worker对象
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//获得锁以后,重新检查线程池状态
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive())
throw new IllegalThreadStateException();
//把刚刚创建的线程加入到线程池中
workers.add(w);
int s = workers.size();
//记录线程池中出现过的最大线程数量
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
//启动线程,开始运行任务
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
「我:这一波...我可能要啃一个周末了,那线程池最后应该怎样关闭呢?」
有两种方式可以关闭正在运行的线程池:
-
「shutdown:」 将线程池的状态设置成SHUTDOWN状态,然后将没有执行任务的所有线程停止。 -
「shutdownNow:」 通过遍历线程池中的工作线程,并逐一调用线程的 interrupt
方法来中断线程,对于无法响应中断的任务可能会永远无法终止。shutdownNow
会首先将线程池的状态设置成STOP
,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表。
无论调用哪一种方式去停止线程,再次调用isShutdown
方法都会返回true,当所有的任务都已关闭后,才表示线程池关闭成功,这时调用isTerminaed
方法会返回true
。
至于我们应该调用哪一种方法来关闭线程池,取决于我们添加到线程池中任务的特性来决定,如果任务不要求执行完整,可以调用shutdownNow
,但通常会使用shutdown
来关闭线程池。
总结
以上就是线程池的全部内容了,确实有点长,建议小伙伴能够静下心慢慢吭,切不可囫囵吞枣,不然浪费时间还没有学到东西,有疑问的小伙伴可以在下方留言。
点关注,防走丢
以上就是本期全部内容,「如有纰漏之处,请留言指教,非常感谢」。我是花GieGie ,有问题大家随时留言讨论 ,我们下期见🦮。
「原创不易,切勿白嫖」,如果你觉得这篇文章对你有点用的话,感谢老铁为本文点个赞、评论或转发一下
,因为这将是我输出更多优质文章的动力,感谢!