一、 Executor框架
Executor 框架是 Java5 之后引进的,在 Java 5 之后,通过 Executor 来启动线程比使用 Thread 的 start 方法更好,除了更易管理,效率更好(用线程池实现,节约开销)外,还有关键的一点:有助于避免 this 逃逸问题。
补充:this 逃逸是指在构造函数返回之前其他线程就持有该对象的引用. 调用尚未构造完全的对象的方法可能引发令人疑惑的错误。
Executor 框架不仅包括了线程池的管理,还提供了线程工厂、队列以及拒绝策略等,Executor 框架让并发编程变得更加简单。
1.1 Executor框架的两级调度模型
在HotSpot VM的线程模型中,Java线程被一对一映射为本地操作系统线程。Java线程启动时会创建一个本地操作系统线程;当该Java线程终止时,这个操作系统线程也会被回收。os会调度所有线程并将它们分配给可用的CPU。
- 在上层,Java多线程程序通常把应用分解成为若干任务,然后使用用户级的调度器(Executor框架)将这些任务映射为固定数量的线程。
- 在底层,操作系统内核将这些线程映射到硬件处理器上。
1.2 Executor框架的结构与成员
1.Executor框架的结构
Executor框架主要由3大部分组成:① 任务
执行任务需要实现的 Runnable 接口 或 Callable接口。Runnable 接口或 Callable 接口 实现类都可以被 ThreadPoolExecutor 或 ScheduledThreadPoolExecutor 执行。② 任务的执行
包括任务执行机制的核心接口 Executor ,以及继承自 Executor 接口的 ExecutorService 接口。ThreadPoolExecutor 和 ScheduledThreadPoolExecutor 这两个关键类实现了 ExecutorService 接口。
这里提了很多底层的类关系,但是,实际上我们需要更多关注的是 ThreadPoolExecutor 这个类,这个类在我们实际使用线程池的过程中,使用频率还是非常高的。注意: 通过查看 ScheduledThreadPoolExecutor 源代码我们发现 ScheduledThreadPoolExecutor 实际上是继承了 ThreadPoolExecutor 并实现了 ScheduledExecutorService ,而 ScheduledExecutorService 又实现了 ExecutorService,正如我们下面给出的类关系图显示的一样。
ThreadPoolExecutor 类描述:
//AbstractExecutorService实现了ExecutorService接口
public class ThreadPoolExecutor extends AbstractExecutorService
ScheduledThreadPoolExecutor 类描述:
//ScheduledExecutorService继承ExecutorService接口
public class ScheduledThreadPoolExecutor
extends ThreadPoolExecutor
implements ScheduledExecutorService
③ 异步计算的结果
Future 接口以及 Future 接口的实现类 FutureTask 类都可以代表异步计算的结果。
当我们把 Runnable接口 或 Callable 接口 的实现类提交给 ThreadPoolExecutor 或 ScheduledThreadPoolExecutor 执行。(调用 submit() 方法时会返回一个 FutureTask 对象)
2. Executor框架的使用示意图
- 主线程首先要创建实现 Runnable 或者 Callable 接口的任务对象。工具类Executors可以通过调用方法(Executors.callable(Runnable task)或Execuotrs.callable(Runnable task,Object result))把一个Runnable对象封装成一个Callable对象。
- 把创建完成的实现 Runnable对象直接交给 ExecutorService 执行: ExecutorService.execute(Runnable command))或者也可以把 Runnable 对象或Callable 对象提交给 ExecutorService 执行(ExecutorService.submit(Runnable task)或 ExecutorService.submit(Callable
task))。 - 如果执行 ExecutorService.submit(…),ExecutorService 将返回一个实现Future接口的对象(我们刚刚也提到过了执行 execute()方法和 submit()方法的区别,submit()会返回一个 FutureTask 对象)。由于 FutureTask 实现了 Runnable,我们也可以创建 FutureTask,然后直接交给 ExecutorService 执行。
- 最后,主线程可以执行 FutureTask.get()方法来等待任务执行完成。主线程也可以执行 FutureTask.cancel(boolean mayInterruptIfRunning)来取消此任务的执行。
3. Executor框架的成员
① ThreadPoolExecutor
ThreadPoolExecutor通常使用工厂类Executors来创建。Executors可以创建3种类型的ThreadPoolExecutor。
- FixedThreadPool——创建使用固定线程数的FixedThreadPool。 ```java public static ExecutorService newFixedThreadPool(int nThreads)
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory)
- SingleThreadExecutor——创建使用单个线程的SingleThreadExecutor
```java
public static ExecutorService newSingleThreadExecutor()
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory)
- CachedThreadPool——创建一个会根据需要创建新线程的。 ```java public static ExecutorService newCachedThreadPool()
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory)
<a name="J0eh7"></a>
#### ② ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor通常使用工厂类Executors来创建。Executors可以创建2种类型的ScheduledThreadPoolExecutor。
- ScheduledThreadPoolExecutor——包含若干个线程的ScheduledThreadPoolExecutor
```java
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory)
- SingleThreadScheduledExecutor——只包含一个线程的ScheduledThreadPoolExecutor ```java public static ScheduledExecutorService newSingleThreadScheduledExecutor()
public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory)
<a name="gEvkY"></a>
#### ③ Future接口
Future接口和实现Future接口的FutureTask类用来表示异步计算的结果。当我们把Runnable接口或Callable接口的实现类提交(submit)给ThreadPoolExecutor或ScheduledThreadPoolExecutor时,ThreadPoolExecutor或ScheduledThreadPoolExecutor会向我们返回一个FutureTask对象。
<a name="YEWBi"></a>
#### ④ Runnable接口和Callable接口
Runnable接口和Callable接口的实现类都可以被ThreadPoolExecutor或ScheduledThreadPoolExecutor执行。它们之间的区别在于Runnable不会返回结果,而Callable可以返回结果。
<a name="ahzs9"></a>
# 二、ThreadPoolExecutor 详解
ThreadPoolExecutor 类中提供的四个构造方法。我们来看最长的那个,其余三个都是在这个构造方法的基础上产生(其他几个构造方法说白点都是给定某些默认参数的构造方法比如默认制定拒绝策略是什么),这里就不贴代码讲了,比较简单。
```java
/**
* 用给定的初始参数创建一个新的ThreadPoolExecutor。
*/
public ThreadPoolExecutor(
int corePoolSize,//线程池的核心线程数量
int maximumPoolSize,//线程池的最大线程数
long keepAliveTime,//当线程数大于核心线程数时,多余的空闲线程存活的最长时间
TimeUnit unit,//时间单位
BlockingQueue<Runnable> workQueue,//任务队列,用来储存等待执行任务的队列
ThreadFactory threadFactory,//线程工厂,用来创建线程,一般默认即可
RejectedExecutionHandler handler//拒绝策略,当提交的任务过多而不能及时处理时,我们可以定制策略来处理任务
) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
ThreadPoolExecutor 3 个最重要的参数(第六章单独讲解):
- corePoolSize : 核心线程数线程数定义了最小可以同时运行的线程数量。
- maximumPoolSize : 当队列中存放的任务达到队列容量的时候,当前可以同时运行的线程数量变为最大线程数。
- workQueue: 当新任务来的时候会先判断当前运行的线程数量是否达到核心线程数,如果达到的话,新任务就会被存放在队列中。
ThreadPoolExecutor其他常见参数:
- keepAliveTime:当线程池中的线程数量大于 corePoolSize 的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了 keepAliveTime才会被回收销毁;
- unit : keepAliveTime 参数的时间单位。
- threadFactory :executor 创建新线程的时候会用到。
- handler :饱和策略。关于饱和策略下面单独介绍一下。
下面这张图可以加深你对线程池中各个参数的相互关系的理解:
ThreadPoolExecutor 饱和策略定义:
如果当前同时运行的线程数量达到最大线程数量并且队列也已经被放满了任务时,ThreadPoolTaskExecutor 定义一些策略:
- ThreadPoolExecutor.AbortPolicy :抛出 RejectedExecutionException来拒绝新任务的处理。
- ThreadPoolExecutor.CallerRunsPolicy :调用执行自己的线程运行任务,也就是直接在调用execute方法的线程中运行(run)被拒绝的任务,如果执行程序已关闭,则会丢弃该任务。因此这种策略会降低对于新任务提交速度,影响程序的整体性能。如果您的应用程序可以承受此延迟并且你要求任何一个任务请求都要被执行的话,你可以选择这个策略。
- ThreadPoolExecutor.DiscardPolicy :不处理新任务,直接丢弃掉。
- ThreadPoolExecutor.DiscardOldestPolicy : 此策略将丢弃最早的未处理的任务请求。
举个例子:
Spring 通过 ThreadPoolTaskExecutor 或者我们直接通过 ThreadPoolExecutor 的构造函数创建线程池的时候,当我们不指定 RejectedExecutionHandler 饱和策略的话来配置线程池的时候默认使用的是 ThreadPoolExecutor.AbortPolicy。在默认情况下,ThreadPoolExecutor 将抛出 RejectedExecutionException 来拒绝新来的任务 ,这代表你将丢失对这个任务的处理。 对于可伸缩的应用程序,建议使用 ThreadPoolExecutor.CallerRunsPolicy。当最大池被填满时,此策略为我们提供可伸缩队列。(这个直接查看 ThreadPoolExecutor 的构造函数源码就可以看出,比较简单的原因,这里就不贴代码了。)
2.1 FixedThreadPool详解
1. FixedThreadPool介绍
FixedThreadPool被称为可重用固定线程数的线程池。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
FixedThreadPool
的corePoolSize
和maximumPoolSize
都被设置成为创建FixedThreadPool
时指定的参数nThreads。
当线程池中的线程数大于corePoolSize
时,keepAliveTime
为多余的空闲线程设置等待新任务的最长时间,超过这个时间后多余的线程将被终止。这里把keepAliveTime
设置成0L,意味着多余的空闲线程会被立即终止
- 如果当前运行的线程数小于 corePoolSize, 如果再来新任务的话,就创建新的线程来执行任务;
- 当前运行的线程数等于 corePoolSize 后, 如果再来新任务的话,会将任务加入 LinkedBlockingQueue;
线程池中的线程执行完 手头的任务后,会在循环中反复从 LinkedBlockingQueue 中获取任务来执行;
2. 为什么不推荐使用FixedThreadPool?
FixedThreadPool 使用无界队列 LinkedBlockingQueue(队列的容量为 Integer.MAX_VALUE)作为线程池的工作队列会对线程池带来如下影响 :
当线程池中的线程数达到 corePoolSize 后,新任务将在无界队列中等待,因此线程池中的线程数不会超过 corePoolSize;
- 由于使用无界队列时 maximumPoolSize 将是一个无效参数,因为不可能存在任务队列满的情况。所以,通过创建 FixedThreadPool的源码可以看出创建的 FixedThreadPool 的 corePoolSize 和 maximumPoolSize 被设置为同一个值。
- 由于 1 和 2,使用无界队列时 keepAliveTime 将是一个无效参数;
- 运行中的 FixedThreadPool(未执行 shutdown()或 shutdownNow())不会拒绝任务,在任务比较多的时候会导致 OOM(内存溢出)。
2.2 SingleThreadExecutor详解
1. SingleThreadPool介绍
SingleThreadExecutor是使用单个worker线程的Executor。 ```java /* 返回只有一个线程的线程池 */ public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) { return new FinalizableDelegatedExecutorService
}(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue
从上面源代码可以看出新创建的 SingleThreadExecutor 的 corePoolSize 和 maximumPoolSize 都被设置为 1.其他参数和 FixedThreadPool 相同。SingleThreadExecutor使用无界队列LinkedBlockingQueue作为线程池的工作队列(队列的容量是Interger.MAX_VALUE)。
<a name="UxCgU"></a>
### 2. SingleThreadExecutor执行过程
![](https://cdn.nlark.com/yuque/0/2021/png/21408367/1630027065104-9233dd9b-7a2e-40ae-8529-1600c3447c71.png#clientId=u2d6c5e5f-6196-4&from=paste&id=uf222c0d3&margin=%5Bobject%20Object%5D&originHeight=456&originWidth=770&originalType=url&ratio=1&status=done&style=none&taskId=u04cab18a-eb60-4d2e-b201-ff82df7c2d8)
1. 如果当前运行的线程数少于 corePoolSize,则创建一个新的线程执行任务;
1. 当前线程池中有一个运行的线程后,将任务加入 LinkedBlockingQueue
1. 线程执行完当前的任务后,会在循环中反复从LinkedBlockingQueue 中获取任务来执行;
<a name="eWp5n"></a>
### 3. 为什么不推荐使用SingleThreadExecutor?
SingleThreadExecutor 使用无界队列 LinkedBlockingQueue 作为线程池的工作队列(队列的容量为 Intger.MAX_VALUE)。SingleThreadExecutor 使用无界队列作为线程池的工作队列会对线程池带来的影响与 FixedThreadPool 相同。说简单点就是可能会导致 OOM。
<a name="md13y"></a>
## 2.3 CachedThreadPool详解
<a name="UnxMN"></a>
### 1. CachedThreadPool介绍
CachedThreadPool 是一个会根据需要创建新线程的线程池。
```java
/**
* 创建一个线程池,根据需要创建新线程,但会在先前构建的线程可用时重用它。
*/
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
CachedThreadPool 的corePoolSize 被设置为空(0),maximumPoolSize被设置为 Integer.MAX.VALUE,即它是无界的,这也就意味着如果主线程提交任务的速度高于 maximumPool 中线程处理任务的速度时,CachedThreadPool 会不断创建新的线程。极端情况下,这样会导致耗尽 cpu 和内存资源。这里把keep Alive Time设置成60L,意味着CachedThreadPool中的空闲线程等待新任务的最长时间为60s,空闲线程超过60s后将会被终止。
2. CachedThreadPool的执行过程
- 首先执行 SynchronousQueue.offer(Runnable task) 提交任务到任务队列。如果当前 maximumPool 中有闲线程正在执行 SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS),那么主线程执行 offer 操作与空闲线程执行的 poll 操作配对成功,主线程把任务交给空闲线程执行,execute()方法执行完成,否则执行下面的步骤 2;
- 当初始 maximumPool 为空,或者 maximumPool 中没有空闲线程时,将没有线程执行 SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)。这种情况下,步骤 1 将失败,此时 CachedThreadPool 会创建新线程执行任务,execute 方法执行完成;
- 在步骤2中创建的线程将任务执行完成后,会执行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)。这个poll操作会让空闲线程最多在中SynchronousQueue等待60s。如果60s内主线程提交了一个新任务(主线程执行步骤1),那么这个空闲线程将执行主线程提交的任务;否则这个空闲线程将终止。由于空闲60s的空闲线程会被终止,因此长时间保持空闲的CachedThreadPool不会使用任何资源。
SynchronousQueue是一个没有容量的阻塞队列。每个插入操作必须等待另一个线程的对应移除操作。CachedThreadPool使用SynchronousQueue,把主线程提交的任务传递给空闲线程执行。CachedThreadPool中任务传递的示意图如图:
三、ScheduledThreadPoolExecutor详解
3.1 ScheduledThreadPoolExecutor介绍
ScheduledThreadPoolExecutor继承自ThreadPoolExecutor。它主要用来在给定的延迟之后运行,或者定期执行任务。ScheduledThreadPoolExecutor 使用的任务队列 DelayQueue 封装了一个 PriorityQueue,PriorityQueue 会对队列中的任务进行排序,执行所需时间短的放在前面先被执行(ScheduledFutureTask 的 time 变量小的先执行),如果执行所需时间相同则先提交的任务将被先执行(ScheduledFutureTask 的 squenceNumber 变量小的先执行)。
3.2 ScheduledThreadPoolExecutor运行机制
DelayQueue是一个无界队列,所以ThreadPoolExecutor的maximumPoolSize在ScheduledThreadPoolExecutor中无意义。
ScheduledThreadPoolExecutor 的执行主要分为两大部分:
- 当调用 ScheduledThreadPoolExecutor 的 scheduleAtFixedRate() 方法或者 scheduleWithFixedDelay() 方法时,会向 ScheduledThreadPoolExecutor 的 DelayQueue 添加一个实现了 RunnableScheduledFuture 接口的 ScheduledFutureTask 。
- 线程池中的线程从 DelayQueue 中获取 ScheduledFutureTask,然后执行任务。
ScheduledThreadPoolExecutor 为了实现周期性的执行任务,对 ThreadPoolExecutor做了如下修改:
- 使用 DelayQueue 作为任务队列;
- 获取任务的方不同
- 执行周期任务后,增加了额外的处理
3.3 ScheduledThreadPoolExecutor 的实现
ScheduledThreadPoolExecutor会把待调度的任务(ScheduledFutureTask)放到一个DelayQueue中。
ScheduledFutureTask主要包含3个成员变量:
- long型成员变量time,表示这个任务将要被执行的具体时间
- long型成员变量sequenceNumber,表示这个任务被添加到ScheduledThreadPoolExecutor中的序号
- long型成员变量period,表示任务执行的时间间隔
PriorityQueue 会对队列中的任务进行排序,执行所需时间短的放在前面先被执行(ScheduledFutureTask 的 time 变量小的先执行),如果执行所需时间相同则先提交的任务将被先执行(ScheduledFutureTask 的 squenceNumber 变量小的先执行)。
- 线程 1 从 DelayQueue 中获取已到期的 ScheduledFutureTask(DelayQueue.take())。到期任务是指 ScheduledFutureTask的 time 大于等于当前系统的时间;
- 线程 1 执行这个 ScheduledFutureTask;
- 线程 1 修改 ScheduledFutureTask 的 time 变量为下次将要被执行的时间;
线程 1 把这个修改 time 之后的 ScheduledFutureTask 放回 DelayQueue 中(DelayQueue.add())。
四、FutureTask详解
4.1 FutureTask简介
Future接口和实现Future接口的FutureTask类,代表异步计算的结果。FutureTask除了实现Future接口外,还实现了Runnable接口。因此,FutureTask可以交给Executor执行,也可以由调用线程直接执行(FutureTask.run())。根据FutureTask.run()方法被执行的时机,FutureTask可以处于下面3中状态:
未启动。FutureTask.run()方法还没有被执行之前,FutureTask处于未启动状态。当创建一个FutureTask,且没有执行FutureTask.run()方法之前,这个FutureTask处于未启动状态。
- 已启动。FutureTask.run()方法被执行的过程中,FutureTask处于已启动状态。
- 已完成。FutureTask.run()方法执行完成后正常结束,或被取消(FutureTask.cancel(…)),或执行FutureTask.run()方法时抛出异常而异常结束,FutureTask处于已完成状态。
FutureTask的状态迁移图:
- 当FutureTask处于未启动状态时:
- 执行FutureTask.get()方法将导致调用线程阻塞。
- 执行FutureTask.cancel()方法将导致此任务永远不会被执行。
- 当FutureTask处于已启动状态时:
- 执行FutureTask.get()方法将导致调用线程阻塞。
- 执行FutureTask.cancel(true)方法将以中断执行此任务线程的方式来试图停止任务。
- 执行FutureTask.cancel(false)方法将不会对正在执行此任务的线程产生影响(让正在执行的任务运行完成)。
当FutureTask处于已完成状态时:
- 执行FutureTask.get()方法将导致调用线程立即返回结果或抛出异常。
- 执行FutureTask.cancel(…)方法将返回false。
4.2 FutureTask的使用
可以把FutureTask交给Executor执行;也可以通过ExecutorService.submit(…)方法返回一个FutureTask,然后执行FutureTask.get()或FutureTask.cancel()。除此之外还可以单独使用FutureTask。
当一个线程需要等待另一个线程把某个任务执行完后他才能继续执行,此时可以使用FutureTask。假设有多个线程执行若干任务,每个任务最多只能被执行一次。当多个线程试图同时执行同一个任务时,只允许一个线程执行任务,其他线程需要等待这个任务执行完后才能继续执行。4.3 FutureTask的实现
FutureTask的实现基于AQS。每一个基于AQS实现的同步器都会包含两种类型的操作。如下:
至少一个acquire操作。这个操作阻塞调用线程,除非/直到AQS的状态允许这个线程继续执行。FutureTask的acquire操作为get()/get(long timeout,TimeUnit unit)方法调用
- 至少一个release操作。这个操作改变AQS的状态,改变后的状态可允许一个或多个阻塞线程被解除阻塞。FutureTask的release操作包括run()方法和cancel(…)方法。
基于“复合优先于继承”的原则,FutureTask声明了一个内部私有的继承于AQS的子类Sync,对FutureTask所有公共方法的调用都会委托给这个内部子类。
AQS被作为“模板方法模式”的基础类提供给FutureTask的内部子类Sync,这个内部子类只需要实现状态检查和状态更新的方法即可,这些方法将控制FutureTask的获取和释放操作。具体来说,Sync实现了AQS的tryAcquireShared(int arg)
方法和tryReleaseShared(int arg)
方法,Sync通过这两个方法来检查和更新哦同步状态。
FutureTask的设计示意图如图所示:
如图所示,Sync是FutureTask的内部私有类,它继承自AQS。创建FutureTask时会创建内部私有的成员对象Sync,FutureTask所有的公有方法都直接委托给了内部私有的Sync。
1. FutureTask.get()
FutureTask.get()方法会调用AQS.acquireSharedInterruptibly(int arg)方法,这个方法的执行过程如下:
- 调用AQS.acquireSharedInterruptibly(int arg)方法,这个方法首先会回调在子类Sync中实现的tryAcquireShared()方法来判断acquire操作是否可以成功。判断成功的条件是:state为执行完成状态RAN或已取消状态CANCELLED,且runner不为null
- 如果成功则get()方法立即返回,如果失败则到线程等待队列中去等待其他线程执行release操作。
- 当其他线程执行release操作(比如FutureTask.run()或FutureTask.cancel(…))唤醒当前线程后,当前线程再次执行tryAcquireShared()将返回正值1,当前线程将离开线程等待队列并唤醒它的后继线程(这里会产生级联唤醒的效果)。
-
2. FutureTask.run()
执行在构造函数中指定的任务(Callable.call())
- 以原子方式来更新同步状态(调用AQS.compareAndSetState(int expect,int update),设置state 为执行完成状态RAN)。如果这个原子操作成功,就设置代表计算结果的变量result的值为Callable.call()的返回值,然后调用AQS.releaseShared(int arg)。
- AQS.releaseShared(int arg)首先会回调在子类Sync中是实现的tryReleaseShared(arg)来执行release操作(设置运行任务的线程runner为nul,然后返回true);AQS.releaseShared(int arg),然后唤醒线程等待队列中的第一个线程。
- 调用FutureTask.done()
3. 总结
当执行FutureTask.get()方法时,如果FutureTask不是处于执行完成状态RAN或已取消状态CANCELLED,当前执行线程将到AQS的线程等待队列中等待。当某个线程执行FutureTask.run()方法或FutureTask.cancel(..)方法时,会唤醒线程等待队列的第一个线程。
五、线程池的实现原理
当向线程池提交一个任务之后,线程池的处理流程主要如图所示:
- 线程池判断核心线程池里的线程是否在执行任务。
- 如果不是,则创建一个新的工作线程来执行任务。
- 如果核心线程池里的线程都在执行任务,则进入下一个流程。
- 线程池判断工作队列是否与满。
- 如果工作队列没有满,则将提交的任务存储在这个个工作队列里。
- 如果工作队列满了,则进入下个流程
- 线程池判断线程池的线程是否都处于工作状态
- 如果没有,则创建一个新的工作线程来执行任务
- 如果满了,则交给饱和策略来处理这个任务
ThreadPoolExecutor执行execute()的示意图如图所示:
ThreadPoolExecutor执行execute方法分为4种情况:
- 如果当前运行的线程少于corePoolSize,则创建新线程来执行任务(执行这一步骤需要获取全局锁)。
- 如果运行的线程等于或多于corePoolSize,则将任务加入BlockingQueue。
- 如果无法将任务加入BlockingQueue(队列已满),则创建新的线程来处理任务(执行这一步骤需要获取全局锁)。
- 如果创建新线程将使当前运行的线程超出maximumPoolSize,任务将被拒绝,并调用RejectedExecutionHandler.rejectedExecution()方法。
ThreadPoolExecutor采用这种思路,是为了在执行execute()方法时,尽可能地避免获取全局锁。在ThreadPoolExecutor完成预热之后(当前运行的线程数大于或等于corePoolSize),几乎所有的execute()方法调用都是执行步骤2,也就避免了获取全局锁。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// 如果线程数小于基本线程数,则创建线程并执行当前任务
if(poolSize>=corePoolSize || !addIfUnderCorePoolSize(command)){
// 如果线程数大于等于基本线程数或创建线程失败,则将当前任务放到工作队列中
if(runState==RUNNING && workQueue.offer(command)){
if(runState!= RUNNING || poolSize==0)
ensureQueuedTaskHandled(command);
}
//如果线程池不处于运行中或任务无法放入队列,并且当前线程数量小于最大允许的线程数量,
//则创建一个线程执行任务
else if(!addIfUnderMaximumPoolSize(command))
//抛出RejectedExecutionException异常
reject(command);
}
}
线程池创建线程时,会将线程封装成工作线程worker,worker在执行完任务后,还会循环获取工作队列里的任务来执行。
public void run(){
try{
Runnable task=firstTask;
firstTask=null;
while(task!=null || (task=getTask()!=null)){
runTask(task);
task=null;
}
}finally{
workerDone(this);
}
}
线程池中的线程执行任务分为两种情况:
- 在execute()方法中创建一个线程,会让这个线程执行当前任务。
- 在这个线程执行完任务后,会反复从BlockingQueue获取任务来执行。
六、线程池的使用
6.1 线程池的各个参数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
- corePoolSize:线程池的基本大小,当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任务数大于线程池基本大小时就不再创建。如果调用了现成的prestartAllCoreThreads()方法,线程池将会提前创建并启动所有基本线程。
- runnableTaskQueue:任务队列,用于保存等待执行的任务的阻塞队列。可以选择以下几个阻塞队列:
- ArrayBlockingQueue——基于数组的有界阻塞队列,按FIFO原则对元素进行排序。
- LinkedBlockingQueue——基于链表的阻塞队列,按FIFO原则排序元素
- SynchronousQueue——不存储元素的阻塞队列。
- PriorityBlockingQueue——具有优先级的无限阻塞队列
- maximumPoolSize:线程池最大数量,线程池运行创建的最大线程数。如果队列满了,并且以创建的线程数小于最大线程数,则线程池会在创建新的线程执行任务。
- ThreadFactory:用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程设置更有意义的名字。
- RejectedExecutionHandler:饱和策略,当队列和线程池都满了,说明线程池处于饱和状态,那么必须采用一种策略处理提交的任务。这个策略默认情况是AbortPolicy,表示无法处理新任务时抛出异常。
- AbortPolicy:直接抛出异常
- CallerRunsPolicy:只用调用者所在线程来运行任务
- DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务
- DiscardPolicy:不处理直接丢弃掉
- 实现RejectedExecutionHandler接口来自定义策略
- keepAliveTime:线程活动保持持剑,线程池的工作线程空闲后,保持存活的时间。
-
6.2 向线程池提交任务
可以使用两个方法向线程池提交任务:
execute():用于提交不需要返回值的任务
- submit():用于提交需要返回值的任务,线程池会返回一个future类型的对象,通过这个future对象可以判断任务是否执行成功,并且可以通过future的get()方法来获取返回值。get()会阻塞当前线程直到任务完成。
6.3 关闭线程池
可以调用线程池的shutdown或shutdownNow方法来关闭线程池,他们的原理是遍历线程池中的工作线程,然后逐个调用线程的interrupt方法来中断线程,所以无法响应中断的任务可能永远无法终止。
但是shutdownNow首先将线程池的状态设置为STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表。而shutdown方法只是将线程池的状态设置成SHUTDOWN状态,然后中断所有没有正在执行任务的线程。
七、线程池大小确定
线程池数量的确定一直是困扰着程序员的一个难题,大部分程序员在设定线程池大小的时候就是随心而定。
很多人甚至可能都会觉得把线程池配置过大一点比较好!我觉得这明显是有问题的。就拿我们生活中非常常见的一例子来说:并不是人多就能把事情做好,增加了沟通交流成本。你本来一件事情只需要 3 个人做,你硬是拉来了 6 个人,会提升做事效率嘛?我想并不会。 线程数量过多的影响也是和我们分配多少人做事情一样,对于多线程这个场景来说主要是增加了上下文切换成本。不清楚什么是上下文切换的话,可以看我下面的介绍。
上下文切换: 多线程编程中一般线程的个数都大于 CPU 核心的个数,而一个 CPU 核心在任意时刻只能被一个线程使用,为了让这些线程都能得到有效执行,CPU 采取的策略是为每个线程分配时间片并轮转的形式。当一个线程的时间片用完的时候就会重新处于就绪状态让给其他线程使用,这个过程就属于一次上下文切换。概括来说就是:当前任务在执行完 CPU 时间片切换到另一个任务之前会先保存自己的状态,以便下次再切换回这个任务时,可以再加载这个任务的状态。任务从保存到再加载的过程就是一次上下文切换。 上下文切换通常是计算密集型的。也就是说,它需要相当可观的处理器时间,在每秒几十上百次的切换中,每次切换都需要纳秒量级的时间。所以,上下文切换对系统来说意味着消耗大量的 CPU 时间,事实上,可能是操作系统中时间消耗最大的操作。 Linux 相比与其他操作系统(包括其他类 Unix 系统)有很多的优点,其中有一项就是,其上下文切换和模式切换的时间消耗非常少。
类比于实现世界中的人类通过合作做某件事情,我们可以肯定的一点是线程池大小设置过大或者过小都会有问题,合适的才是最好。
如果我们设置的线程池数量太小的话,如果同一时间有大量任务/请求需要处理,可能会导致大量的请求/任务在任务队列中排队等待执行,甚至会出现任务队列满了之后任务/请求无法处理的情况,或者大量任务堆积在任务队列导致 OOM。这样很明显是有问题的! CPU 根本没有得到充分利用。
但是,如果我们设置线程数量太大,大量线程可能会同时在争取 CPU 资源,这样会导致大量的上下文切换,从而增加线程的执行时间,影响了整体执行效率。
有一个简单并且适用面比较广的公式:
- CPU 密集型任务(N+1): 这种任务消耗的主要是 CPU 资源,可以将线程数设置为 N(CPU 核心数)+1,比 CPU 核心数多出来的一个线程是为了防止线程偶发的缺页中断,或者其它原因导致的任务暂停而带来的影响。一旦任务暂停,CPU 就会处于空闲状态,而在这种情况下多出来的一个线程就可以充分利用 CPU 的空闲时间。
- I/O 密集型任务(2N): 这种任务应用起来,系统会用大部分的时间来处理 I/O 交互,而线程在处理 I/O 的时间段内不会占用 CPU 来处理,这时就可以将 CPU 交出给其它线程使用。因此在 I/O 密集型任务的应用中,我们可以多配置一些线程,具体的计算方法是 2N。
如何判断是 CPU 密集任务还是 IO 密集任务?
CPU 密集型简单理解就是利用 CPU 计算能力的任务比如你在内存中对大量数据进行排序。单凡涉及到网络读取,文件读取这类都是 IO 密集型,这类任务的特点是 CPU 计算耗费时间相比于等待 IO 操作完成的时间来说很少,大部分时间都花在了等待 IO 操作完成上。