内部状态
线程有五种状态:新建,就绪,运行,阻塞,死亡,线程池同样有五种状态:Running, SHUTDOWN, STOP, TIDYING, TERMINATED。\
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
变量ctl定义为AtomicInteger ,其功能非常强大,记录了“线程池中的任务数量”和“线程池的状态”两个信息。共32位,其中高3位表示”线程池状态”,低29位表示”线程池中的任务数量”。
RUNNING -- 对应的高3位值是111。
SHUTDOWN -- 对应的高3位值是000。
STOP -- 对应的高3位值是001。
TIDYING -- 对应的高3位值是010。
TERMINATED -- 对应的高3位值是011。
RUNNING:处于RUNNING状态的线程池能够接受新任务,以及对新添加的任务进行处理。
SHUTDOWN:处于SHUTDOWN状态的线程池不可以接受新任务,但是可以对已添加的任务进行处理。
STOP:处于STOP状态的线程池不接收新任务,不处理已添加的任务,并且会中断正在处理的任务。
TIDYING:当所有的任务已终止,ctl记录的”任务数量”为0,线程池会变为TIDYING状态。当线程池变为TIDYING状态时,会执行钩子函数terminated()。terminated()在ThreadPoolExecutor类中是空的,若用户想在线程池变为TIDYING时,进行相应的处理;可以通过重载terminated()函数来实现。
TERMINATED:线程池彻底终止的状态。
各个状态的转换如下:
ThreadPoolExecutor
核心参数
/**
* 用给定的初始参数创建一个新的ThreadPoolExecutor。
*/
public ThreadPoolExecutor(int corePoolSize,//线程池的核心线程数量
int maximumPoolSize,//线程池的最大线程数
long keepAliveTime,//当线程数大于核心线程数时,多余的空闲线程存活的最长时间
TimeUnit unit,//时间单位
BlockingQueue<Runnable> workQueue,//任务队列,用来储存等待执行任务的队列
ThreadFactory threadFactory,//线程工厂,用来创建线程,一般默认即可
RejectedExecutionHandler handler//拒绝策略,当提交的任务过多而不能及时处理时,我们可以定制策略来处理任务
) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
**ThreadPoolExecutor**
3 个最重要的参数:
**corePoolSize**
: 核心线程数线程数定义了最小可以同时运行的线程数量。**maximumPoolSize**
: 当队列中存放的任务达到队列容量的时候,当前可以同时运行的线程数量变为最大线程数。**workQueue**
: 当新任务来的时候会先判断当前运行的线程数量是否达到核心线程数,如果达到的话,新任务就会被存放在队列中。
ThreadPoolExecutor
其他常见参数:
**keepAliveTime**
:当线程池中的线程数量大于corePoolSize
的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了keepAliveTime
才会被回收销毁;**unit**
:keepAliveTime
参数的时间单位。**threadFactory**
:executor 创建新线程的时候会用到。
用于设置创建线程的工厂。该对象可以通过Executors.defaultThreadFactory(),如下:
public static ThreadFactory defaultThreadFactory() {
return new DefaultThreadFactory();
}
返回的是DefaultThreadFactory对象,源码如下:
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
ThreadFactory的左右就是提供创建线程的功能的线程工厂。他是通过newThread()方法提供创建线程的功能,newThread()方法创建的线程都是“非守护线程”而且“线程优先级都是Thread.NORM_PRIORITY”。
**handler**
:拒绝策略。
线程池提供了四种拒绝策略:
- AbortPolicy:直接抛出异常,默认策略;
- CallerRunsPolicy:用调用者所在的线程来执行任务;
- DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
- DiscardPolicy:直接丢弃任务;
当然我们也可以实现自己的拒绝策略,例如记录日志等等,实现RejectedExecutionHandler接口即可。
常见的workQueue
1.SynchronousQueue::不存储元素的阻塞队列,这个队列接收到任务的时候,会直接提交给线程处理,而不保留它,如果所有线程都在工作怎么办?那就新建一个线程来处理这个任务!所以为了保证不出现<线程数达到了maximumPoolSize而不能新建线程>的错误,使用这个类型队列的时候,maximumPoolSize一般指定成Integer.MAX_VALUE,即无限大(CacheThreadPool使用)
2.LinkedBlockingQueue:这个队列接收到任务的时候,如果当前已经创建的核心线程数小于线程池的核心线程数上限,则新建线程(核心线程)处理任务;如果当前已经创建的核心线程数等于核心线程数上限,则进入队列等待。由于这个队列没有最大值限制,即所有超过核心线程数的任务都将被添加到队列中,这也就导致了maximumPoolSize的设定失效,因为总线程数永远不会超过corePoolSize(FixedThreadPool 与 SingleThreadExecutor)
3.ArrayBlockingQueue:可以限定队列的长度,接收到任务的时候,如果没有达到corePoolSize的值,则新建线程(核心线程)执行任务,如果达到了,则入队等候,如果队列已满,则新建线程(非核心线程)执行任务,又如果总线程数到了maximumPoolSize,并且队列也满了,则发生错误,或是执行实现定义好的饱和策略
4.DelayQueue:队列内元素必须实现Delayed接口,这就意味着你传进去的任务必须先实现Delayed接口。这个队列接收到任务时,首先先入队,只有达到了指定的延时时间,才会执行任务(ScheduledThreadPoolExecutor)
下面这张图可以加深你对线程池中各个参数的相互关系的理解(图片来源:《Java 性能调优实战》):
阿里巴巴推荐使用ThreadPoolExecutor创建线程池的原因
在《阿里巴巴 Java 开发手册》“并发处理”这一章节,明确指出线程资源必须通过线程池提供,不允许在应用中自行显示创建线程。
为什么呢?
使用线程池的好处是减少在创建和销毁线程上所消耗的时间以及系统资源开销,解决资源不足的问题。如果不使用线程池,有可能会造成系统创建大量同类线程而导致消耗完内存或者“过度切换”的问题。
另外《阿里巴巴 Java 开发手册》中强制线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 构造函数的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险
Executors 返回线程池对象的弊端如下:
**FixedThreadPool**
和**SingleThreadExecutor**
: 允许请求的队列长度为 Integer.MAX_VALUE,可能堆积大量的请求,从而导致 OOM。- CachedThreadPool 和 ScheduledThreadPool : 允许创建的线程数量为 Integer.MAX_VALUE ,可能会创建大量线程,从而导致 OOM。
方式一:通过**ThreadPoolExecutor**
构造函数实现(推荐) 方式二:通过 Executor 框架的工具类 Executors 来实现 我们可以创建三种类型的 ThreadPoolExecutor:
- FixedThreadPool
- SingleThreadExecutor
- CachedThreadPool
- WorkStealingPool
- ScheduledThreadPool
内部其实都是通过ThreadPoolExecutor实现。
线程池原理
线程首先会先执行 5 个任务,然后这些任务有任务被执行完的话,就会去拿新的任务执行。
// 存放线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static int workerCountOf(int c) {
return c & CAPACITY;
}
//任务队列
private final BlockingQueue<Runnable> workQueue;
public void execute(Runnable command) {
// 如果任务为null,则抛出异常。
if (command == null)
throw new NullPointerException();
// ctl 中保存的线程池当前的一些状态信息
int c = ctl.get();
// 下面会涉及到 3 步 操作
// 1.首先判断当前线程池中之行的任务数量是否小于 corePoolSize
// 如果小于的话,通过addWorker(command, true)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 2.如果当前之行的任务数量大于等于 corePoolSize 的时候就会走到这里
// 通过 isRunning 方法判断线程池状态,线程池处于 RUNNING 状态才会被并且队列可以加入任务,该任务才会被加入进去
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 再次获取线程池状态,如果线程池状态不是 RUNNING 状态就需要从任务队列中移除任务,并尝试判断线程是否全部执行完毕。同时执行拒绝策略。
if (!isRunning(recheck) && remove(command))
reject(command);
// 如果当前线程池为空就新创建一个线程并执行。
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//3. 通过addWorker(command, false)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。
//如果addWorker(command, false)执行失败,则通过reject()执行相应的拒绝策略的内容。
else if (!addWorker(command, false))
reject(command);
}
常用线程池
FixedThreadPool
/**
* 创建一个可重用固定数量线程的线程池
*/
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
从上面源代码可以看出新创建的 **FixedThreadPool**
的 **corePoolSize**
和 **maximumPoolSize**
都被设置为 nThreads,这个 nThreads 参数是我们使用的时候自己传递的。
执行任务过程介绍
FixedThreadPool
的 execute()
方法运行示意图(该图片来源:《Java 并发编程的艺术》):
上图说明:
- 如果当前运行的线程数小于 corePoolSize, 如果再来新任务的话,就创建新的线程来执行任务;
- 当前运行的线程数等于 corePoolSize 后, 如果再来新任务的话,会将任务加入
LinkedBlockingQueue
; 线程池中的线程执行完 手头的任务后,会在循环中反复从
LinkedBlockingQueue
中获取任务来执行;为什么不推荐使用?
FixedThreadPool
**FixedThreadPool**
使用无界队列**LinkedBlockingQueue**
(队列的容量为 Integer.MAX_VALUE)作为线程池的工作队列会对线程池带来如下影响 :当线程池中的线程数达到
corePoolSize
后,新任务将在无界队列中等待,因此线程池中的线程数不会超过 corePoolSize;- 由于使用无界队列时
maximumPoolSize
将是一个无效参数,因为不可能存在任务队列满的情况。所以,通过创建FixedThreadPool
的源码可以看出创建的FixedThreadPool
的corePoolSize
和maximumPoolSize
被设置为同一个值。 - 由于 1 和 2,使用无界队列时
keepAliveTime
将是一个无效参数; - 运行中的
FixedThreadPool
(未执行shutdown()
或shutdownNow()
)不会拒绝任务,在任务比较多的时候会导致 OOM(内存溢出)。
SingleThreadExecutor
SingleThreadExecutor
是只有一个线程的线程池。下面看看SingleThreadExecutor 的实现:
/**
*返回只有一个线程的线程池
*/
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
从上面源代码可以看出新创建的 SingleThreadExecutor
的 corePoolSize
和 maximumPoolSize
都被设置为 1.其他参数和 FixedThreadPool
相同。
执行任务过程介绍
**SingleThreadExecutor**
的运行示意图(该图片来源:《Java 并发编程的艺术》):
上图说明;
- 如果当前运行的线程数少于 corePoolSize,则创建一个新的线程执行任务;
- 当前线程池中有一个运行的线程后,将任务加入
LinkedBlockingQueue
- 线程执行完当前的任务后,会在循环中反复从
LinkedBlockingQueue
中获取任务来执行;
为什么不推荐使用?SingleThreadExecutor
SingleThreadExecutor
使用无界队列 LinkedBlockingQueue
作为线程池的工作队列(队列的容量为 Intger.MAX_VALUE)。SingleThreadExecutor
使用无界队列作为线程池的工作队列会对线程池带来的影响与 FixedThreadPool
相同。说简单点就是可能会导致 OOM。
CachedThreadPool
CachedThreadPool
是一个会根据需要创建新线程的线程池。下面通过源码来看看 CachedThreadPool
的实现:
/**
* 创建一个线程池,根据需要创建新线程,但会在先前构建的线程可用时重用它。
*/
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
CachedThreadPool
的corePoolSize
被设置为空(0),maximumPoolSize
被设置为 Integer.MAX.VALUE,即它是无界的,这也就意味着如果主线程提交任务的速度高于 maximumPool
中线程处理任务的速度时,CachedThreadPool
会不断创建新的线程。极端情况下,这样会导致耗尽 cpu 和内存资源。
执行任务过程介绍
CachedThreadPool 的 execute()方法的执行示意图(该图片来源:《Java 并发编程的艺术》):
上图说明:
- 首先执行
SynchronousQueue.offer(Runnable task)
提交任务到任务队列。如果当前maximumPool
中有闲线程正在执行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)
,那么主线程执行 offer 操作与空闲线程执行的poll
操作配对成功,主线程把任务交给空闲线程执行,execute()
方法执行完成,否则执行下面的步骤 2; - 当初始
maximumPool
为空,或者maximumPool
中没有空闲线程时,将没有线程执行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)
。这种情况下,步骤 1 将失败,此时CachedThreadPool
会创建新线程执行任务,execute 方法执行完成;
为什么不推荐使用?CachedThreadPool
CachedThreadPool
允许创建的线程数量为 Integer.MAX_VALUE ,可能会创建大量线程,从而导致 OOM。
ScheduledThreadPoolExecutor
**ScheduledThreadPoolExecutor**
主要用来在给定的延迟后运行任务,或者定期执行任务。 这个在实际项目中基本不会被用到,因为有其他方案选择比如quartz
。大家只需要简单了解一下它的思想。关于如何在 Spring Boot 中 实现定时任务,可以查看这篇文章《5 分钟搞懂如何在 Spring Boot 中 Schedule Tasks》。**ScheduledThreadPoolExecutor**
使用的任务队列 **DelayQueue**
封装了一个 **PriorityQueue**
,**PriorityQueue**
会对队列中的任务进行排序,执行所需时间短的放在前面先被执行(**ScheduledFutureTask**
的 **time**
变量小的先执行),如果执行所需时间相同则先提交的任务将被先执行(**ScheduledFutureTask**
的 **squenceNumber**
变量小的先执行)。**ScheduledThreadPoolExecutor**
和 **Timer**
的比较:
Timer
对系统时钟的变化敏感,ScheduledThreadPoolExecutor
不是;Timer
只有一个执行线程,因此长时间运行的任务可以延迟其他任务。ScheduledThreadPoolExecutor
可以配置任意数量的线程。 此外,如果你想(通过提供 ThreadFactory),你可以完全控制创建的线程;- 在
TimerTask
中抛出的运行时异常会杀死一个线程,从而导致Timer
死机:-( …即计划任务将不再运行。ScheduledThreadExecutor
不仅捕获运行时异常,还允许您在需要时处理它们(通过重写afterExecute
方法ThreadPoolExecutor
)。抛出异常的任务将被取消,但其他任务将继续运行。
综上,在 JDK1.5 之后,你没有理由再使用 Timer 进行任务调度了。
备注: Quartz 是一个由 java 编写的任务调度库,由 OpenSymphony 组织开源出来。在实际项目开发中使用 Quartz 的还是居多,比较推荐使用 Quartz。因为 Quartz 理论上能够同时对上万个任务进行调度,拥有丰富的功能特性,包括任务调度、任务持久化、可集群化、插件等等。
运行机制
**ScheduledThreadPoolExecutor**
的执行主要分为两大部分:
- 当调用
ScheduledThreadPoolExecutor
的**scheduleAtFixedRate()**
方法或者**scheduleWithFixedDelay()**
方法时,会向ScheduledThreadPoolExecutor
的**DelayQueue**
添加一个实现了**RunnableScheduledFuture**
接口的**ScheduledFutureTask**
。 - 线程池中的线程从
DelayQueue
中获取ScheduledFutureTask
,然后执行任务。
**ScheduledThreadPoolExecutor**
为了实现周期性的执行任务,对 **ThreadPoolExecutor**
做了如下修改:
- 使用
**DelayQueue**
作为任务队列; - 获取任务的方不同
- 执行周期任务后,增加了额外的处理
ScheduledThreadPoolExecutor 执行周期任务的步骤
- 线程 1 从
DelayQueue
中获取已到期的ScheduledFutureTask(DelayQueue.take())
。到期任务是指ScheduledFutureTask
的 time 大于等于当前系统的时间; - 线程 1 执行这个
ScheduledFutureTask
; - 线程 1 修改
ScheduledFutureTask
的 time 变量为下次将要被执行的时间; 线程 1 把这个修改 time 之后的
ScheduledFutureTask
放回DelayQueue
中(DelayQueue.add()
)。任务提交
线程池根据业务不同的需求提供了两种方式提交任务:Executor.execute()、ExecutorService.submit()。其中ExecutorService.submit()可以获取该任务执行的Future。
我们以Executor.execute()为例,来看看线程池的任务提交经历了那些过程。
定义:public interface Executor { void execute(Runnable command); }
execute
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
执行流程如下:
如果线程池当前线程数小于corePoolSize,则调用addWorker创建新线程执行任务,成功返回true,失败执行步骤2。
- 如果线程池处于RUNNING状态,则尝试加入阻塞队列,如果加入阻塞队列成功,则尝试进行Double Check,如果加入失败,则执行步骤3。
- 如果线程池不是RUNNING状态或者加入阻塞队列失败,则尝试创建新线程直到maxPoolSize,如果失败,则调用reject()方法运行相应的拒绝策略。
在步骤2中如果加入阻塞队列成功了,则会进行一个Double Check的过程。Double Check过程的主要目的是判断加入到阻塞队里中的线程是否可以被执行。如果线程池不是RUNNING状态,则调用remove()方法从阻塞队列中删除该任务,然后调用reject()方法处理任务。否则需要确保还有线程执行。
addWorker
当线程中的当前线程数小于corePoolSize,则调用addWorker()创建新线程执行任务,当前线程数则是根据ctl变量来获取的,调用workerCountOf(ctl)获取低29位即可:
private static int workerCountOf(int c) { return c & CAPACITY; }
addWorker(Runnable firstTask, boolean core)方法用于创建线程执行任务,源码如下:
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
// 获取当前线程状态
int rs = runStateOf(c);
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
// 内层循环,worker + 1
for (;;) {
// 线程数量
int wc = workerCountOf(c);
// 如果当前线程数大于线程最大上限CAPACITY return false
// 若core == true,则与corePoolSize 比较,否则与maximumPoolSize ,大于 return false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// worker + 1,成功跳出retry循环
if (compareAndIncrementWorkerCount(c))
break retry;
// CAS add worker 失败,再次读取ctl
c = ctl.get();
// 如果状态不等于之前获取的state,跳出内层循环,继续去外层循环判断
if (runStateOf(c) != rs)
continue retry;
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 新建线程:Worker
w = new Worker(firstTask);
// 当前线程
final Thread t = w.thread;
if (t != null) {
// 获取主锁:mainLock
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 线程状态
int rs = runStateOf(ctl.get());
// rs < SHUTDOWN ==> 线程处于RUNNING状态
// 或者线程处于SHUTDOWN状态,且firstTask == null(可能是workQueue中仍有未执行完成的任务,创建没有初始任务的worker线程执行)
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 当前线程已经启动,抛出异常
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// workers是一个HashSet<Worker>
workers.add(w);
// 设置最大的池大小largestPoolSize,workerAdded设置为true
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
// 释放锁
mainLock.unlock();
}
// 启动线程
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
// 线程启动失败
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
- 判断当前线程是否可以添加任务,如果可以则进行下一步,否则return false;
- rs >= SHUTDOWN ,表示当前线程处于SHUTDOWN ,STOP、TIDYING、TERMINATED状态
- rs == SHUTDOWN , firstTask != null时不允许添加线程,因为线程处于SHUTDOWN 状态,不允许添加任务
- rs == SHUTDOWN , firstTask == null,但workQueue.isEmpty() == true,不允许添加线程,因为firstTask == null是为了添加一个没有任务的线程然后再从workQueue中获取任务的,如果workQueue == null,则说明添加的任务没有任何意义。
- 内嵌循环,通过CAS worker + 1
- 获取主锁mailLock,如果线程池处于RUNNING状态获取处于SHUTDOWN状态且 firstTask == null,则将任务添加到workers Queue中,然后释放主锁mainLock,然后启动线程,然后return true,如果中途失败导致workerStarted= false,则调用addWorkerFailed()方法进行处理。
在这里需要好好理论addWorker中的参数,在execute()方法中,有三处调用了该方法:
第一次:workerCountOf(c) < corePoolSize ==> addWorker(command, true),这个很好理解,当然线程池的线程数量小于 corePoolSize ,则新建线程执行任务即可,在执行过程core == true,内部与corePoolSize比较即可。
第二次:加入阻塞队列进行Double Check时,else if (workerCountOf(recheck) == 0) ==>addWorker(null, false)。如果线程池中的线程==0,按照道理应该该任务应该新建线程执行任务,但是由于已经该任务已经添加到了阻塞队列,那么就在线程池中新建一个空线程,然后从阻塞队列中取线程即可。
第三次:线程池不是RUNNING状态或者加入阻塞队列失败:else if (!addWorker(command, false)),这里core == fase,则意味着是与maximumPoolSize比较。
在新建线程执行任务时,将讲Runnable包装成一个Worker,Woker为ThreadPoolExecutor的内部类
Woker内部类
Woker的源码如下:
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
从Worker的源码中我们可以看到Woker继承AQS,实现Runnable接口,所以可以认为Worker既是一个可以执行的任务,也可以达到获取锁释放锁的效果。这里继承AQS主要是为了方便线程的中断处理。这里注意两个地方:构造函数、run()。构造函数主要是做三件事:1.设置同步状态state为-1,同步状态大于0表示就已经获取了锁,2.设置将当前任务task设置为firstTask,3.利用Worker本身对象this和ThreadFactory创建线程对象。
当线程thread启动(调用start()方法)时,其实就是执行Worker的run()方法,内部调用runWorker()。
runWorker
final void runWorker(Worker w) {
// 当前线程
Thread wt = Thread.currentThread();
// 要执行的任务
Runnable task = w.firstTask;
w.firstTask = null;
// 释放锁,运行中断
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
// worker 获取锁
w.lock();
// 确保只有当线程是stoping时,才会被设置为中断,否则清楚中断标示
// 如果线程池状态 >= STOP ,且当前线程没有设置中断状态,则wt.interrupt()
// 如果线程池状态 < STOP,但是线程已经中断了,再次判断线程池是否 >= STOP,如果是 wt.interrupt()
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 自定义方法
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 执行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
// 完成任务数 + 1
w.completedTasks++;
// 释放锁
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
运行流程
- 根据worker获取要执行的任务task,然后调用unlock()方法释放锁,这里释放锁的主要目的在于中断,因为在new Worker时,设置的state为-1,调用unlock()方法可以将state设置为0,这里主要原因就在于interruptWorkers()方法只有在state >= 0时才会执行;
- 通过getTask()获取执行的任务,调用task.run()执行,当然在执行之前会调用worker.lock()上锁,执行之后调用worker.unlock()放锁;
- 在任务执行前后,可以根据业务场景自定义beforeExecute() 和 afterExecute()方法,则两个方法在ThreadPoolExecutor中是空实现;
- 如果线程执行完成,则会调用getTask()方法从阻塞队列中获取新任务,如果阻塞队列为空,则根据是否超时来判断是否需要阻塞;
task == null或者抛出异常(beforeExecute()、task.run()、afterExecute()均有可能)导致worker线程终止,则调用processWorkerExit()方法处理worker退出流程。
getTask()
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { // 线程池状态 int c = ctl.get(); int rs = runStateOf(c); // 线程池中状态 >= STOP 或者 线程池状态 == SHUTDOWN且阻塞队列为空,则worker - 1,return null if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // 判断是否需要超时控制 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { // 从阻塞队列中获取task // 如果需要超时控制,则调用poll(),否则调用take() Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
timed == true,调用poll()方法,如果在keepAliveTime时间内还没有获取task的话,则返回null,继续循环。timed == false,则调用take()方法,该方法为一个阻塞方法,没有任务时会一直阻塞挂起,直到有任务加入时对该线程唤醒,返回任务。
processWorkerExit()
在runWorker()方法中,无论最终结果如何,都会执行processWorkerExit()方法对worker进行退出处理。
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// true:用户线程运行异常,需要扣减
// false:getTask方法中扣减线程数量
if (completedAbruptly)
decrementWorkerCount();
// 获取主锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
// 从HashSet中移出worker
workers.remove(w);
} finally {
mainLock.unlock();
}
// 有worker线程移除,可能是最后一个线程退出需要尝试终止线程池
tryTerminate();
int c = ctl.get();
// 如果线程为running或shutdown状态,即tryTerminate()没有成功终止线程池,则判断是否有必要一个worker
if (runStateLessThan(c, STOP)) {
// 正常退出,计算min:需要维护的最小线程数量
if (!completedAbruptly) {
// allowCoreThreadTimeOut 默认false:是否需要维持核心线程的数量
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
// 如果min ==0 或者workerQueue为空,min = 1
if (min == 0 && ! workQueue.isEmpty())
min = 1;
// 如果线程数量大于最少数量min,直接返回,不需要新增线程
if (workerCountOf(c) >= min)
return; // replacement not needed
}
// 添加一个没有firstTask的worker
addWorker(null, false);
}
}
首先completedAbruptly的值来判断是否需要对线程数-1处理,如果completedAbruptly == true,说明在任务运行过程中出现了异常,那么需要进行减1处理,否则不需要,因为减1处理在getTask()方法中处理了。然后从HashSet中移出该worker,过程需要获取mainlock。然后调用tryTerminate()方法处理,该方法是对最后一个线程退出做终止线程池动作。如果线程池没有终止,那么线程池需要保持一定数量的线程,则通过addWorker(null,false)新增一个空的线程。
addWorkerFailed()
在addWorker()方法中,如果线程t==null,或者在add过程出现异常,会导致workerStarted == false,那么在最后会调用addWorkerFailed()方法:
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 从HashSet中移除该worker
if (w != null)
workers.remove(w);
// 线程数 - 1
decrementWorkerCount();
// 尝试终止线程
tryTerminate();
} finally {
mainLock.unlock();
}
}
线程终止
线程池ThreadPoolExecutor提供了shutdown()和shutDownNow()用于关闭线程池。
shutdown():按过去执行已提交任务的顺序发起一个有序的关闭,但是不接受新任务。
shutdownNow() :尝试停止所有的活动执行任务、暂停等待任务的处理,并返回等待执行的任务列表。
shutdown
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 推进线程状态
advanceRunState(SHUTDOWN);
// 中断空闲的线程
interruptIdleWorkers();
// 交给子类实现
onShutdown();
} finally {
mainLock.unlock();
}
tryTerminate();
}
shutdownNow
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
// 中断所有线程
interruptWorkers();
// 返回等待执行的任务列表
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
与shutdown不同,shutdownNow会调用interruptWorkers()方法中断所有线程。
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
同时会调用drainQueue()方法返回等待执行到任务列表。
private List<Runnable> drainQueue() {
BlockingQueue<Runnable> q = workQueue;
ArrayList<Runnable> taskList = new ArrayList<Runnable>();
q.drainTo(taskList);
if (!q.isEmpty()) {
for (Runnable r : q.toArray(new Runnable[0])) {
if (q.remove(r))
taskList.add(r);
}
}
return taskList;
}
线程池处理异常方法
线程池执行流程
java中的线程池用的是ThreadPoolExecutor,真正执行代码的部分是runWorker方法:final void runWorker(Worker w)
// 省略无关部分
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
可以看到,程序会捕获包括Error在内的所有异常,并且在程序最后,将出现过的异常和当前任务传递给afterExecute方法。
而ThreadPoolExecutor中的afterExecute方法是没有任何实现的。
protected void afterExecute(Runnable r, Throwable t) { }
对于线程池、包括线程的异常处理推荐一下方式:
1 直接try/catch,个人 基本都是用这种方式
2 线程直接重写UncaughtExceptionHandler()方法:
Thread t = new Thread();
t.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
public void uncaughtException(Thread t, Throwable e) {
LOGGER.error(t + " throws exception: " + e);
}
});
//如果是线程池的模式:
ExecutorService threadPool = Executors.newFixedThreadPool(1, r -> {
Thread t = new Thread(r);
t.setUncaughtExceptionHandler(
(t1, e) -> LOGGER.error(t1 + " throws exception: " + e));
return t;
});
//这种方式submit依然不会被设置的setUncaughtExceptionHandler方法捕获的 而execute是可以的
//如果使用submit,要么用get判断 要么自己写try catch
3 也可以直接重写protected void afterExecute(Runnable r, Throwable t) { }方法
线程池大小确定
线程池数量的确定一直是困扰着程序员的一个难题,大部分程序员在设定线程池大小的时候就是随心而定。
很多人甚至可能都会觉得把线程池配置过大一点比较好!我觉得这明显是有问题的。就拿我们生活中非常常见的一例子来说:并不是人多就能把事情做好,增加了沟通交流成本。你本来一件事情只需要 3 个人做,你硬是拉来了 6 个人,会提升做事效率嘛?我想并不会。 线程数量过多的影响也是和我们分配多少人做事情一样,对于多线程这个场景来说主要是增加了上下文切换成本。不清楚什么是上下文切换的话,可以看我下面的介绍。
上下文切换: 多线程编程中一般线程的个数都大于 CPU 核心的个数,而一个 CPU 核心在任意时刻只能被一个线程使用,为了让这些线程都能得到有效执行,CPU 采取的策略是为每个线程分配时间片并轮转的形式。当一个线程的时间片用完的时候就会重新处于就绪状态让给其他线程使用,这个过程就属于一次上下文切换。概括来说就是:当前任务在执行完 CPU 时间片切换到另一个任务之前会先保存自己的状态,以便下次再切换回这个任务时,可以再加载这个任务的状态。任务从保存到再加载的过程就是一次上下文切换。 上下文切换通常是计算密集型的。也就是说,它需要相当可观的处理器时间,在每秒几十上百次的切换中,每次切换都需要纳秒量级的时间。所以,上下文切换对系统来说意味着消耗大量的 CPU 时间,事实上,可能是操作系统中时间消耗最大的操作。 Linux 相比与其他操作系统(包括其他类 Unix 系统)有很多的优点,其中有一项就是,其上下文切换和模式切换的时间消耗非常少。
类比于实现世界中的人类通过合作做某件事情,我们可以肯定的一点是线程池大小设置过大或者过小都会有问题,合适的才是最好。
如果我们设置的线程池数量太小的话,如果同一时间有大量任务/请求需要处理,可能会导致大量的请求/任务在任务队列中排队等待执行,甚至会出现任务队列满了之后任务/请求无法处理的情况,或者大量任务堆积在任务队列导致 OOM。这样很明显是有问题的! CPU 根本没有得到充分利用。
但是,如果我们设置线程数量太大,大量线程可能会同时在争取 CPU 资源,这样会导致大量的上下文切换,从而增加线程的执行时间,影响了整体执行效率。
有一个简单并且适用面比较广的公式:
- CPU 密集型任务(N+1): 这种任务消耗的主要是 CPU 资源,可以将线程数设置为 N(CPU 核心数)+1,比 CPU 核心数多出来的一个线程是为了防止线程偶发的缺页中断,或者其它原因导致的任务暂停而带来的影响。一旦任务暂停,CPU 就会处于空闲状态,而在这种情况下多出来的一个线程就可以充分利用 CPU 的空闲时间。
- I/O 密集型任务(2N): 这种任务应用起来,系统会用大部分的时间来处理 I/O 交互,而线程在处理 I/O 的时间段内不会占用 CPU 来处理,这时就可以将 CPU 交出给其它线程使用。因此在 I/O 密集型任务的应用中,我们可以多配置一些线程,具体的计算方法是 2N。
如何判断是 CPU 密集任务还是 IO 密集任务?
CPU 密集型简单理解就是利用 CPU 计算能力的任务比如你在内存中对大量数据进行排序。单凡涉及到网络读取,文件读取这类都是 IO 密集型,这类任务的特点是 CPU 计算耗费时间相比于等待 IO 操作完成的时间来说很少,大部分时间都花在了等待 IO 操作完成上。
Executors
主要用来创建线程池,代理了线程池的创建,使得你的创建入口参数变得简单
重要方法
- newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
- newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
- newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。
- newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
常见对比
vs
Runnable``Callable
Runnable
自 Java 1.0 以来一直存在,但Callable
仅在 Java 1.5 中引入,目的就是为了来处理Runnable
不支持的用例。Runnable
接口不会返回结果或抛出检查异常,但是Callable
接口可以。所以,如果任务不需要返回结果或抛出异常推荐使用 **`Runnable` 接口,这样代码看起来会更加简洁。
工具类Executors
可以实现Runnable
对象和Callable
对象之间的相互转换。(Executors.callable(Runnable task
)或Executors.callable(Runnable task,Object resule)
)。 ```java @FunctionalInterface public interface Runnable { /- 被线程执行,没有返回值也无法抛出异常 */ public abstract void run(); }
@FunctionalInterface
public interface Callable
* 计算结果,或在无法这样做时抛出异常。
* @return 计算得出的结果
* @throws 如果无法计算结果,则抛出异常
*/
V call() throws Exception;
}
<a name="aGiKw"></a>
#### [ vs ](https://snailclimb.gitee.io/javaguide/#/./docs/java/multi-thread/java%E7%BA%BF%E7%A8%8B%E6%B1%A0%E5%AD%A6%E4%B9%A0%E6%80%BB%E7%BB%93?id=_432-execute-vs-submit)`execute()``submit()`
1. `**execute()**`**方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功与否;**
1. `**submit()**`**方法用于提交需要返回值的任务。线程池会返回一个 **`**Future**`** 类型的对象,通过这个 **`**Future**`** 对象可以判断任务是否执行成功** ,并且可以通过 `Future` 的 `get()`方法来获取返回值,`get()`方法会阻塞当前线程直到任务完成,而使用 `get(long timeout,TimeUnit unit)`方法则会阻塞当前线程一段时间后立即返回,这时候有可能任务没有执行完。
我们以**`AbstractExecutorService`**接口中的一个 `submit` 方法为例子来看看源代码:
```java
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
上面方法调用的 newTaskFor
方法返回了一个 FutureTask
对象。
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
我们再来看看execute()
方法:
public void execute(Runnable command) {
...
}
VSshutdown()``shutdownNow()
**shutdown()**
:关闭线程池,线程池的状态变为SHUTDOWN
。线程池不再接受新任务了,但是队列里的任务得执行完毕。**shutdownNow()**
:关闭线程池,线程的状态变为STOP
。线程池会终止当前正在运行的任务,并停止处理排队的任务并返回正在等待执行的 List。VS
isTerminated()``isShutdown()
**isShutDown**
当调用shutdown()
方法后返回为 true。**isTerminated**
当调用shutdown()
方法后,并且所有提交的任务完成后返回为 true