前言
很久以前看过JDK线程池实现源码,但最近的面试被问到线程实现原理的时候还说不清楚,甚至和连接池搞混。所以有必要重温一遍并记录下来以便日后翻阅。
线程池类图
类层次结构
Executor接口
Executor接口只有一个接受Runnable参数的execute方法,用来执行任务。
1 | package java.util.concurrent; |
###ExecutorService接口
ExecutorService接口主要定义线程池内部管理的接口
1 | package java.util.concurrent; |
AbstractExecutorService抽象类
AbstractExecutorService抽象类实现了大部分方法,但大部分方法都依赖Executor接口声明的execute方法,这里并没有实现该方法,而是把这个方法交由子类(java.util.concurrent.ThreadPoolExecutor)实现。
1 | package java.util.concurrent; |
#线程池原理
线程池的状态
- RUNNING:正在处理任务和接受队列中的任务。
- SHUTDOWN:不再接受新的任务,但是会继续处理完队列中的任务。
- STOP:不再接受新任务,也不继续处理队列中的任务,且会中止正在处理的任务。
- TIDYING:所有任务都处理结束,目前worker数为0,当线程池进入这个状态的时候,会调用terminated()方法。
- TERMINATED:线程池已经全部结束,并且terminated()方法执行完成。
1 | // 线程池状态计数器 |
##线程任务执行
线程任务执行涉及属性
1 | private final BlockingQueue<Runnable> workQueue;//任务队列 |
- workQueue
任务队列。用来存放任务,在任务量比较大,核心线程在执行任务的时候,会把新提交的任务放到任务队列中,如果任务队列再满了的话,会尝试创建临时线程来处理任务,如果临时线程全部满了的话,就会启动拒绝策略。 - mainLock
线程池主锁。对线程池本身的一些状态操作的时候,必须使用这个锁。 - handler
拒绝策略处理器。这拒绝策略处理器可以由调用着传入。已实现的拒绝策略有java.util.concurrent.ThreadPoolExecutor.AbortPolicy、java.util.concurrent.ThreadPoolExecutor.DiscardPolicy、java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy和java.util.concurrent.ThreadPoolExecutor.DiscardOldestPolicy - keepAliveTime
临时线程等待超时时间。这个属性表示创建的临时线程在空闲的时候最长的等待时间,如果超过这个时间,线程就会结束掉。 - corePoolSize
核心线程数。表示的是整个线程池中最少存活的线程数。实际上,在任务数比较多的情况下,线程池会创建一些新的临时线程来执行任务,等到任务数降下来之后,临时线程就会逐渐减少,总的线程数会回落到核心线程数。值得注意的是,当线程池任务队列中没有任务的时候,所有线程都会处于等待状态,但是临时线程等待会超时后就会结束,而核心线程是不允许等待超时的,因此核心线程不会减少,这也保证了线程池的线程数量不会少于核心线程数。
执行execute
1 | public void execute(Runnable command) { |
addWorker方法
1 | /** |
Worker类
每一个Worker类对象代表一个线程
1 | private final class Worker extends AbstractQueuedSynchronizer implements Runnable { |
###runWorker方法1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 获取待执行的任务
while (task != null || (task = getTask()) != null) {
// 获取当前线程锁
w.lock();
// 检查当前线程状态是否允许运行
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 执行前处理。默认不处理,留给扩展实现
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 执行后处理。默认不处理,留给扩展实现
afterExecute(task, thrown);
}
} finally {
task = null;
// 计数器++
w.completedTasks++;
// 释放锁
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
getTask方法
1 | /** |
processWorkerExit方法
1 | private void processWorkerExit(Worker w, boolean completedAbruptly) { |
总结
- 线程池(Thread Pool)是限制程序同一时刻运行线程数量的一种实现。线程池最多不会超过maximumPoolSize个线程在运行
- 提交给线程池的任务也是一个线程,有线程池工作线程负责调度执行。当线程池的活跃线程未超过corePoolSize个时,线程池创建核心线程调度执行任务;当线程的活跃线程数超过corePoolSize,且小于maximumPoolSize,线程池创建临时线程调度执行;当线程池活跃线程等于maximumPoolSize时,提交任务到任务队列;任务队列已满,则转入拒绝策略
- 线程池中每个活跃线程在调度执行完一个任务后,会从任务队列阻塞获取新的任务调度执行。非核心工作线程阻塞到超时,核心工作线程默认阻塞到队列有任务
- 临时工作线程(非核心)从任务队列获取新任务超时后进入回收逻辑