1. 线程池
线程池是一种多线程处理形式,是线程的集合,线程池在系统启动时即创建大量空闲的线程,程序将一个任务传给线程池,线程池就会启动一条线程来执行这个任务,执行结束以后再次将线程返回线程池中成为空闲状态,等待执行下一个任务。
线程池状态
private static final int RUNNING = -1 << COUNT_BITS;private static final int SHUTDOWN = 0 << COUNT_BITS;private static final int STOP = 1 << COUNT_BITS;private static final int TIDYING = 2 << COUNT_BITS;private static final int TERMINATED = 3 << COUNT_BITS;
RUNNING
- 该状态的线程池会接收新任务,并处理阻塞队列中的任务
- 调用线程池的shutdown()方法,可以切换到SHUTDOWN状态
- 调用线程池的shutdownNow()方法,可以切换到STOP状态
SHUTDOWN
- 该状态的线程池不会接收新任务,但会处理阻塞队列中的任务
- 队列为空,并且线程池中执行的任务也为空,进入TIDYING状态
STOP
- 该状态的线程不会接收新任务,也不会处理阻塞队列中的任务,而且会中断正在运行的任务
- 线程池中执行的任务为空,进入TIDYING状态
TIDYING
- 该状态表明所有的任务已经运行终止,记录的任务数量为0
- terminated()执行完毕,进入TERMINATED状态
TERMINATED
- 该状态表示线程池彻底终止
线程池优点
- 统一分配、管理和监控线程
- 可以复用池中的线程,避免频繁创建和销毁,降低资源的消耗,提高响应速度
- 控制最大并发数
ThreadPoolExecutor常见方法

execute是Executor中声明的方法,适用于不需要关注返回值的场景,只需要将线程丢到线程池中去执行就可以了。
submit是ExecutorService中声明的方法,适用于需要关注返回值的场景。底层还是调用execute()方法,它利用了Future来获取任务执行结果。
shutdownNow:对正在执行的任务全部发出Thread.interrupt()终止所有线程,对还未开始执行的任务全部取消,并且返回还没开始的任务列表。
shutdown:当我们调用shutdown后,线程池将不再接受新的任务,但也不会去强制终止已经提交或者正在执行中的任务。
awaitTermination:判断线程池中是否有继续运行的线程。waitTermination方法接收timeout和TimeUnit两个参数,用于设定超时时间及单位。当等待超过设定时间时,会监测ExecutorService是否已经关闭,若关闭则返回true,否则返回false。一般情况下会和shutdown方法组合使用。
2. 常见线程池及使用场景
newFixedThreadPool 固定数量线程池
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
- 核心线程数和最大线程数大小一样
keepAliveTime为0, 意味着一旦有多余的空闲线程,就会被立即停- 阻塞队列是
LinkedBlockingQueue(队列容量Integer.MAX_VALUE) - 实际线程数量将永远维持着在nthreads maximumPoolSize和keepAliveTime将无效
场景:用于已知并发压力的情况下,对线程数做限制,适用于处理CPU密集型的任务。
newSingleThreadExecutor 单个线程线程池
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
- 核心线程数和最大线程数大小一样且都是1
keepAliveTime为0- 阻塞队列是
LinkedBlockingQueue
newCachedThreadPool 可缓存线程池
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
- 核心线程数为0,且最大线程数为
Integer.MAX_VALUE - 阻塞队列是
SynchronousQueue - 非核心线程空闲存活时间为60秒
- 当线程池大小超过了处理任务所需的线程,那么就会回收部分空闲(一般是60秒无执行)的线程,当有任务来时,又智能添加新线程来执行
newScheduledThreadPool 定时线程池
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
- 最大线程数为
Integer.MAX_VALUE - 阻塞队列是
DelayedWorkQueue keepAliveTime为0scheduleAtFixedRate():按某种速率周期执行scheduleWithFixedDelay():在某个延迟后执行
newWorkStealingPool 抢占式线程池
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
- Java8中新增的线程池,实际上是ForkJoinPool
- 采用fork join算法。将一个任务可以分解为多个小任务 ( 任务不能十分简单 )
不建议使用 Executors静态工厂构建线程池
不允许使用Executors静态工厂构建线程池,要通过ThreadPoolExecutor这样的方式,会更加明确线程池的运行规则,规避资源耗尽的风险。
1)FixedThreadPool 和 SingleThreadPool
允许的请求队列(底层实现是LinkedBlockingQueue)长度为Integer.MAX_VALUE,可能会堆积大量的请求,从而导致OOM。
2)CachedThreadPool 和 ScheduledThreadPool
允许的创建线程数量为Integer.MAX_VALUE,可能会创建大量的线程,从而导致OOM。3. 线程池参数
corePoolSize
核心线程数量,会一直存在除非allowCoreThreadTimeOut设置为true。默认情况下线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法。
CPU密集是该任务需要大量的运算,而没有阻塞,CPU一直全速运行。
IO密集型是该任务需要大量的IO,即大量的阻塞。
对于 CPU 密集型的计算场景,理论上线程的数量 =CPU 核数+1;
对于 I/O 密集型计算场景,最佳线程数 =CPU 核数 [ 1 +(I/O 耗时 / CPU 耗时)],经验值=**2 CPU 的核数 + 1**
1)提交到线程池中的任务,IO耗时占比是90%,计算耗时占比10%,忽略提交到线程池中的任务数量,在4C8G的机器上,理想情况下线程池中创建多少个线程是最优的(40)。
2)有一类cpu密集性的任务,没有IO操作,日常的时候只有1个任务,流量高峰会有50个任务,4C8G的机器上,使用的线程池,如何设置corePoolSize, maxPoolSize以及BlockingQueue的大小。
- 提交第一个任务,创建出core,1个线程
- 提交第二个到第47个任务时,这些任务进入到队列中,此时队列已满
- 提交第48个任务到第50个任务时,创建出max,此时一共有4个线程
- 4个线程同时将队列里的46个任务消费完
一段时间后,max - core数量的线程销毁,即销毁3个线程,还剩下一个线程
maximumPoolSize
线程池允许的最大线程池数量,表示在线程池中最多能创建多少个线程。
keepAliveTime、unit
keepAliveTime表示线程没有任务执行时最多保持多久时间会终止。默认情况下当线程数量超过corePoolSize,keepAliveTime才会起作用。即空闲线程的最大超时时间,直到线程池中的线程数不超过corePoolSize。 值为0表示,线程不会被回收。
unit:超时时间的单位。//TimeUnit类中有7种静态属性 TimeUnit.DAYS; //天 TimeUnit.HOURS; //小时 TimeUnit.MINUTES; //分钟 TimeUnit.SECONDS; //秒 TimeUnit.MILLISECONDS; //毫秒 TimeUnit.MICROSECONDS; //微妙 TimeUnit.NANOSECONDS; //纳秒workQueue
工作队列,保存未执行的Runnable 任务。
**BlockingQueue一个基于数组的有界阻塞队列
- 按 FIFO(先进先出)原则对元素进行排序。
LinkedBlockingQueue
- 一个基于链表结构的阻塞队列
- 此队列按FIFO (先进先出) 排序元素,容量可以选择进行设置,不设置的话,将是一个无边界的阻塞队列,最大长度为Integer.MAX_VALUE
- 吞吐量通常要高ArrayBlockingQueue
- 静态工厂方法Executors.newFixedThreadPool()使用了这个队列
DelayQueue
- 任务定时周期的延迟执行的队列
- 根据指定的执行时间从小到大排序,否则根据插入到队列的先后排序
- newScheduledThreadPool线程池使用了这个队列
SynchronousQueue
- 一个不存储元素的阻塞队列
- 每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态
- 吞吐量通常要高于LinkedBlockingQueue
- 静态工厂方法Executors.newCachedThreadPool使用了这个队列
- 使用SynchronoousQueue队列,通常要设置很大的maximumPoolSize值,否则很容易执行拒绝策略
PriorityBlockingQueue
- 具有优先级的无限阻塞队列
按优先级对元素进行排序,元素的优先级是通过自然顺序或Comparator来定义的
threadFactory
创建线程的工厂类。 JDK提供了线程工厂的默认实现DefaultThreadFactory,但还是建议自定义实现,这样可以自定义线程创建的过程,例如线程分组、自定义线程名称等。
handler
拒绝策略, 当任务队列存满并且线程池个数达到maximunPoolSize后采取的策略。ThreadPoolExecutor中已经包含四种处理策略。
AbortPolicy策略:默认拒绝策略,该策略会直接抛出 RejectedExecutionException 异常
- CallerRunsPolicy 策略:只要线程池未关闭,该策略直接在调用者线程中运行当前的被丢弃的任务
- DiscardPolicy策略:丢弃无法处理的任务, 不抛异常
DiscardOleddestPolicy策略: 丢弃最老的一个请求,即把最早进入工作队列的任务丢弃,然后把新任务加入到工作队列。
除了JDK默认提供的四种拒绝策略,我们可以根据自己的业务需求去自定义拒绝策略,自定义时直接实现RejectedExecutionHandler接口即可。 ```java public static class CallerRunsPolicy implements RejectedExecutionHandler { public CallerRunsPolicy() { } /**
*在调用任务的线程中执行 */public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) { //在调用者线程者线程中,运行当前被丢弃的任务。 //显然这样做不会真的丢弃任务,但是,任务提交线程的性能极有可能会急剧下降 r.run(); }} }
public static class AbortPolicy implements RejectedExecutionHandler { public AbortPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
/**
* 队列满了,丢掉任务,不会抛出异常,其实就是什么也不做
* 该策略陌陌的丢弃无法处理的任务,不进行任何处理
*/
public static class DiscardPolicy implements RejectedExecutionHandler { public DiscardPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { } }
/**
* 该策略将丢弃最老的一个请求,也就是即将被执行的一个任务,并尝试再次提交当前任务。
*/
public static class DiscardOldestPolicy implements RejectedExecutionHandler { public DiscardOldestPolicy() { }
/**
* 将队列的首部任务直接丢弃,然后执行当前的任务
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
<a name="ybpvc"></a>
### 工作原理
```java
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
- 1)execute方法提交一个任务,线程池里存活的核心线程数小于线程数corePoolSize时,线程池会创建一个核心线程去处理提交的任务, 即使此时线程池中存在空闲线程。

- 2)如果线程池核心线程数已满,即线程数已经等于corePoolSize,一个新提交的任务,会被放进任务队列workQueue排队等待执行。

- 3)当线程池里面存活的线程数已经等于corePoolSize了,并且任务队列workQueue也满,判断线程数是否达到maximumPoolSize,即最大线程数是否已满,如果没到达,创建一个非核心线程执行提交的任务。

- 4)如果当前的线程数达到了maximumPoolSize,还有新的任务过来的话,直接采用拒绝策略处理。

- 5)当线程池中的线程执行完任务空闲时,会尝试从workQueue中取头结点任务执行。

- 6) 当线程池中线程数超过corePoolSize,并且未配置allowCoreThreadTimeOut=true,空闲时间超过keepAliveTime的线程会被销毁,保持线程池中线程数为corePoolSize。 ;当设置allowCoreThreadTimeOut=true时,任何空闲时间超过keepAliveTime的线程都会被销毁。

4.底层原理
变量说明

ctl用于表示线程池的状态和线程数,在ThreadPoolExecutor中使用32位二进制数来表示线程池的状态和线程池中线程数量,其中前3位表示线程池状态,后29位表示线程池中线程数。

RUNNING、SHUTDOWN、STOP、TIDYING和TERMINATED分别表示线程池的不同状态,转算后的结果如下。 
细节说明
submit()、execute()提交线程任务
- 如果当前核心线程数量没达到最大值corePoolSize,创建新线程来执行此任务
- 如果当前核心线程到达最大,向阻塞队列添加任务
- 如果核心线程已满,阻塞队列已满,尝试开启非核心线程来执行任务
- 如果线程池不处于RUNNABLE状态,或者处于饱和状态,执行任务拒绝策略
addWorker方法
第一部分是检查线程池的状态,如果不满足条件会直接返回false,然后进入死循环等待成功增加线程,如果增加线程成功,那么就可以进入第二部分,真正新增线程,执行任务。
将线程和任务封装到了Worker中,然后将Worker添加到HashSet集合中,添加成功后通过线程对象的start方法启动线程执行任务。
- 线程池处于 RUNNBALE 或者处于 SHUTDOWN 并在阻塞队列里还有任务时,需要添加新线程。自旋确保 CAS 成功,然后添加新线程
- 线程存于Worker,线程池存有Worker信息,就能访问线程
- 线程启动失败,则移除Worker,销毁线程
内部类Worker
Worker继承了AbstractQueuedSynchronizer,并且实现了Runnable接口。
runWorker方法执行任务
线程首个任务为firstTask,之后通过getTask()就从阻塞队列里任务。 使用循环通过getTask方法不断从阻塞队列中获取任务执行,如果任务不为空则执行任务,这里实现了线程的复用,不断的获取任务执行,不用重新创建线程;队列中获取的任务为null,则将Worker从HashSet集合中清除,注意这个清除就是空闲线程的回收。
线程池提供了beforeExecute()和afterExecute()通知子类任务执行前后的回调,让子类有时机能执行自己的事情。
getTask方法
线程池里的线程从阻塞队列里拿任务,如果存在非核心线程,假设阻塞队列里没有任务,那么非核心线程也要在等到keepAliveTime时间后才会释放。如果当前仅有核心线程存在,如果允许释放核心线程的话,也就和非核线程的处理方式一样,反之,则通过take()一直阻塞直到拿到任务,这也就是线程池里的核心线程为什么不死的原因。
processWorkerExit方法释放线程
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 到这里说明线程中断,先通过decrementWorkerCount()减少线程数值
// 否则,说明是线程没有从阻塞队列获取到线程
if (completedAbruptly)
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// completedTaskCount记录线程池总共完成的任务
// w.completedTasks则是线程完成的任务数
completedTaskCount += w.completedTasks;
// 移除Worker
workers.remove(w);
} finally {
mainLock.unlock();
}
// 线程池状态改变,尝试中止线程池
tryTerminate();
int c = ctl.get();
// 检查线程池状态,线程池处于RUNNABLE或者SHUTDOWN则进入
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
// 线程池最小数量,取决于是否能释放核心线程
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
// 如果任务队列还有线程,最起码都要有一个线程来处理任务
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return;
}
// 因为线程中断,可能导致没有线程来执行阻塞队列里的任务
// 因此尝试创建线程去执行任务
addWorker(null, false);
}
}
5.线程池一定比使用单线程高效?
Redis就是单线程的,但它却非常高效,基本操作都能达到十万量级/s。从线程这个角度来看,部分原因在于:多线程带来线程上下文切换开销,单线程就没有这种开销。
总结





