概述
FutureTask
是接口 Future
的实现类,而 Future
表示一个异步计算的结果。Future
提供比如检查计算是否已完成、等待计算完成、获取计算结果待方法,让另一个线程可以同步阻塞获取其它线程的计算结果。一旦计算执行结束,任务就不能被重启或取消。
示例
下面代码展示了将 search
搜索过程交给线程池异步执行,稍后主线程通过调用 Future#get()
方法阻塞等待异步线程完成计算,当计算得到结果后会唤醒主线程,主线程就可以得到结果并返回。
interface ArchiveSearcher { String search(String target); }
class App {
// 线程池,用于异步执行search任务
ExecutorService executor = Executors.newFixedThreadPool(8);
ArchiveSearcher searcher = new ArchiveSearcherImpl();
// 搜索目录
void showSearch(final String target) throws InterruptedException {
// #1 交给线程池异常执行,并返回一个Future凭证,当前线程可以通过这个凭证获取计算结果
Future<String> future = executor.submit(new Callable<String>() {
public String call() {
return searcher.search(target);
}});
// #2 当前线程可以继续执行自己逻辑,不会被阻塞。搜索过程在线程池中异步执行
displayOtherThings();
try {
// #3 阻塞并等待Future返回结果
displayText(future.get()); // use future
} catch (ExecutionException ex) {
cleanup();
return;
}
}
}
Future 接口
Future 是一个接口,定义了比如检查计算是否已经完成,获取计算结果等 API。
API | 描述 |
---|---|
boolean cancel(boolean) | 尝试取消执行任务。如果遇到 ① 计算已完成 ② 任务已经取消 ③ 由于某些特殊原因无法被取消,那么这个操作会失败。如果方法执行成功,并且调用时任务还未执行,那么就不会继续执行任务。如果任务已开始执行,那么是否应该中断应取决于入参 boolean 值。 |
boolean isCancelled() | true:任务在完成之前被取消。 false:任务没有被取消 |
boolean isDone | true:任务已执行完成。以下情况会返回 rue:① 任务正常执行完毕,② 出现异常,③ 被取消。 false: |
V get() throws InterruptedException, ExecutionException; | 当前线程阻塞并等待计算完成。这个方法可能会出现三种异常:① CancellationException:任务被取消了 ExecutionException:计算过程出现异常 InterruptedException:当前阻塞的线程被中断 |
V get(long TimeUnit) | 带超时的阻塞并获取计算结果 |
可以看到,Future 定义的 API 比较简洁,其中一个 API 用于取消任务、两个 API 用于检查判断、另有两个 API 用于计算结果的获取。另外提一点,get()
是可以响应线程中断的。
实现类:FutureTask
继承体系
FutureTask
实现了 RunnableFuture
接口,而 RunnableFuture
继承了 Future
和 Runnable
接口,所以 FutureTask 可以当作一个 Runnable 直接被 Thread 执行,此外,FutureTask 内部还有一个 Callable
变量,可以包装一个实现 Callable
接口的对象。
状态定义
每个任务都是拥有状态的,不同的任务状态对应不同的执行逻辑:
任务状态 | 描述 |
---|---|
NEW(0) | 初始状态,刚被创建,还未开始执行任务。 |
COMPLETING(1) | 执行完成。任务已正常执行结束,或执行过程出现异常。但是执行结果还未保存到 outcome 字段。这个状态持续时间较短。 |
NORMAL(2) | 执行完成,并且结果已成功写入 outcome 字段。属于最终态之一。 |
EXCEPTIONAL(3) | 任务执行发生异常,且异常原因已成功写入 outcome 字段。属于最终态之一。 |
CANCELLED(4) | 任务还未开始执行或在执行中被取消,用户调用了 cancel(false) 方法取消任务但不中断线程。属于最终态之一。 |
INTERRUPTING(5) | 任务还未开始执行或在执行中被取消,用户调用了 cancel(true) 方法取消任务,但还未中断线程前的状态。属于一个中间态。 |
INTERRUPTED(6) | 调用 interrupt() 中断任务执行线程之后状态从 (5) -> (6)。属于最终态之一。 |
值得一提的是,任务的中间态的持续时间非常短,你可以大致理解为 state > 0
就意味着任务已执行完成。
内部类:WaitNode
WaitNode
是 FutureTask
的内部类,实现 FutureTask 的等待队列。队列的实现是一个意向链表,FutureTask 使用 WaitNode
包装所有等待任务执行完毕的线程,它们之间构成单向链表。
WaitNode 内部结构如下:
static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}
与 AQS 的双向队列相比,WaitNode 的实现就特别简单,节点没有复杂的状态标志位,只是一个简单的单向链表结构。
我们在查看 JDK 文档时会看到 Treiber Stack 关键字,意为无锁并发栈,通过 CAS 实现线程安全的并发栈。FutureTask
中由 WaitNode
构成的单向链表本质就是一个 Treiber Stack。在代码中只需要保存一个指向栈顶节点的指针就可以使用这个栈。
核心变量
// java.util.concurrent.FutureTask
public class FutureTask<V> implements RunnableFuture<V> {
// 任务状态
private volatile int state;
// 任务本体,因为只有实现 Callable 的类才有返回值
private Callable<V> callable;
// 任务计算所得到的结果,也是 get() 方法返回的数据
private Object outcome; // non-volatile, protected by state reads/writes
// 执行 callable 任务的线程
private volatile Thread runner;
// 并发栈,底层是单向链表 + CAS 操作实现的并发栈(Treiber stack)
// 用于存放等待计算结果的线程
private volatile WaitNode waiters;
// ...
}
总共有 5 个核心变量,这 5 个变量共同承载 FutureTask 核心逻辑。
构造函数
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
FutureTask
共有两个构造函数:
- 直接传入 Callable 接口的实现类,这个接口是有返回值的。
- 对于已经继承 Runnable 接口的实现类来说,它们无法改变了。于是,使用一个适配器
Executors.callable(Runnable, V)
将 Runnable 和 结果 V 进行绑定。FutureTask 会把计算好的结果写入 result。
源码解析
执行任务:run
FutureTask 实现 RunnableFuture 接口,它可以被提交到线程池中执行。线程池调用 FutureTask#run()
方法,在这个方法中执行任务。
- 首先检查 state 状态是否正常,如果不正常,直接 return。如果状态正常,通过 CAS 设置 runner 变量,如果设置失败,说明有其它线程执行当前的任务,也直接 return。
- 这一步就是执行任务了,一般是通过线程池执行任务。
- 如果在执行任务的过程中出现异常,进入
#5
代码分支:会将 state 状态设置为EXCEPTIONAL
最终状态,并调用finishCompletion()
唤醒所有阻塞在get()
方法的线程。 - 如果任务正常执行完毕,进入
#6
代码分支:会将 state 状态设置为NORMAL
最终状态,并调用finishCompletion()
唤醒所有阻塞在get()
方法的线程。 最后进入
finally
语句,主要是检查是否有遗漏的中断。如果发现s >= INTERRUPTING
,意味着执行任务的线程很可能被中断了。// java.util.concurrent.FutureTask#run
public void run() {
// #1 检查 state 状态是否为 NEW,且 CAS 设置 runner 为当前线程
// 只要其中一个返回 false,则立即返回。说明任务已被其它线程修改了,当前线程来晚了
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
// #2 更新成功,当前线程可以执行任务了
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
// #3 执行任务,并获取结果
result = c.call();
// #4 表示任务正常完成
ran = true;
} catch (Throwable ex) {
// #5 出现异常,CAS 保留异常信息
result = null;
ran = false;
setException(ex);
}
// #6 CAS 设置结果。因为有其它线程可能中断任务,所以需要CAS操作。set()操作流程如下:
// ① CAS更新状态为 COMPLETING
// ② 将 outcome 指向 result
// ③ putOrderInt(state, NORMAL) 更新 state 为 NORMAL,并立即回写到主存中
// ④ 移除并唤醒所有等待线程
if (ran)
set(result);
}
} finally {
// #7 将 runner 置为 null
runner = null;
// #8 检查中断
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
任务正常完成:set(result)
如果任务正常完成,就调用
set(V result)
方法设置结果并通知所有等待结果获取的线程。// java.util.concurrent.FutureTask#set protected void set(V v) { // #1 CAS 更新 state 状态为 COMPLETING if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { // #2 更新 result outcome = v; // #3 将 state 状态设置成 NORMAL,属于最终态之一 UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // #4 唤醒所有阻塞在 get() 方法调用的线程 finishCompletion(); } }
唤醒线程:finishCompletion
这一步代码也比较简单,就是遍历由
WaitNode
构成的 Treiber Stack,并唤醒线程。在所有线程都被唤醒后,调用扩展方法done()
以完成任务结束前最后额外工作,并清理 callable 。// java.util.concurrent.FutureTask#finishCompletion private void finishCompletion() { // ① 遍历由 WaitNode 所构成的单向链表,这里的 for 目的是确保 waiters == null for (WaitNode q; (q = waiters) != null;) { // #2 CAS 修改 waiters = null,相当于清空了 Treiber Stack if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { // #3 遍历链表,挨个唤醒线程 for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; // #4 唤醒线程 LockSupport.unpark(t); } // #5 获取下一个节点 WaitNode next = q.next; if (next == null) break; // #6 help GC q.next = null; q = next; } break; } } // #7 这是一个扩展点,用于实现一些任务执行结束前的额外操作 done(); // #8 清理 callable 任务 callable = null; // to reduce footprint }
任务出现异常:setException(ex)
修改 state 状态。
- 将 outcome 设置为异常原因。
唤醒所有阻塞线程。
protected void setException(Throwable t) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = t; UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state finishCompletion(); } }
finally 语句
finally { // #7 将 runner 置为 null runner = null; // #8 检查中断 int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); }
这里着重讲一下 finally 语句。在 finally 块中,我们先将 runner 属性置为 null,然后检查是否有遗漏的中断。如果发现
s >= INTERRUPTING
,意味着执行任务的线程可能被中断了。
那么,这里就有问题了:前面我们已经将 state 的状态设置为NORMAL
或EXCEPTIONAL
,怎么会在 finally 语句中出现INTERRUPTING
和INTERRUPTED
状态呢? 这是因为,在多线程环境中,线程 A 正在执行任务,但是线程 B 取消了任务。此时其它线程有可能修改 state 的状态。这也是我们在设置终止状态时使用Unsafe#putOrderInt
方法,而非 CAS 操作的原因:因为我们无法确保在修改 state 前是处于COMPLETING
中间态还是INTERRUPTING
中间态。
我们不能对已完成任务的线程执行中断。handlePossibleCancellationInterrupt
private void handlePossibleCancellationInterrupt(int s) { // It is possible for our interrupter to stall before getting a // chance to interrupt us. Let's spin-wait patiently. if (s == INTERRUPTING) while (state == INTERRUPTING) Thread.yield(); // wait out pending interrupt // assert state == INTERRUPTED; // We want to clear any interrupt we may have received from // cancel(true). However, it is permissible to use interrupts // as an independent mechanism for a task to communicate with // its caller, and there is no way to clear only the // cancellation interrupt. // // Thread.interrupted(); }
该方法是一个自旋操作,如果当前 state 状态是
INTERRUPTING
,原地自旋直到 state 变为终止态。run 方法总结
run 方法重点做了以下事情:
更新 runner 属性值:将它设置为当前线程。
- 调用
Callable#call
方法执行任务。 - 任务正常执行完毕。将 outcome 指向计算结果。任务执行过程中出现异常,将 outcome 执行异常栈。设置结果前,先将 state 设置中相关的中间态。
- 唤醒所有阻塞在
Future#get()
方法上的线程。 - 将 FutureTask 相关成员置为 null。
检查是否有遗漏的中断。如果有,线程自旋,直到中断状态变更为
INTERRUPTED
才退出。取消任务:cancel
另一个线程可以调用
cancel
方法取消任务的执行。参数mayInterruptIfRunning
表示如果能取消任务,是否中断执行任务的线程。需要注意的是,方法cancel
是有概率失败的,即返回 false,情况如下:任务已经执行完成了。无论正常完成还是出现异常。只要
state > NEW
就可以表示任务已完成执行。- 任务已被取消过。
任务因某种原因无法被取消。
// java.util.concurrent.FutureTask#cancel public boolean cancel(boolean mayInterruptIfRunning) { // #1 如果 state > NEW,可以说明任务已执行完毕,此刻就不允许中断任务 if (!(state == NEW && // #2 CAS 更新 state 状态为 INTERRUPTING 或 CANCELLED UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) return false; try { // in case call to interrupt throws exception // #3 mayInterruptIfRunning = true 表示需要中断执行任务的中断 if (mayInterruptIfRunning) { try { Thread t = runner; if (t != null) // #4 调用 Thread.interrupt() 中断线程 t.interrupt(); } finally { // #5 设置 state 为 INTERRUPTED,并马上写回主存 UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); } } } finally { // #6 唤醒所有线程 finishCompletion(); } // #7 return return true; }
从源码可以看到,方法
cancel()
也十分简单:检查 state 是否为 NEW。并且 CAS 设置 state 状态为
INTERRUPTING
或CANCELLED
。如果任一条件不满足,返回 false。- 状态设置成功,再判断入参参数
mayInterruptIfRunning
,如果为 true,意味着需要对任务执行线程调用Thread.interrupt()
方法,并将 state 状态更改为INTERRUPTED
。 -
获取任务结果:get
Future
提供两个版本的 get,一个是一直阻塞的get()
,另一个是带超时时间的get(long, TimeUnit)
。我们就只讲get()
的实现。get()
方法比较简单,判断 state 是否<= COMPLETING
,如果是,则调用awaitDone
入队。public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); }
awaitDone
// java.util.concurrent.FutureTask#awaitDone private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; // #1 死循环 for (;;) { // #2 判断当前线程是否被中断 if (Thread.interrupted()) { // #3 如果被中断,需要从队列中移除当前线程 removeWaiter(q); // #4 抛出异常 throw new InterruptedException(); } // #5 获取任务状态 int s = state; // #6 s > COMPLETING 说明任务已完成,可以直接返回了 if (s > COMPLETING) { if (q != null) q.thread = null; return s; } // #7 任务即将完成,先放弃一段CPU时间片,只是给CPU一个提示 // 有没有用还是需要大量的测试来决定的 else if (s == COMPLETING) Thread.yield(); // #8 创建 WaitNode 对象,创建过程可能任务已经完成, // 这在下一轮 for 循环再判断吧 else if (q == null) q = new WaitNode(); // #9 判断是否入队没有 else if (!queued) // 没有入队,则CAS更新栈顶指针 queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); // #10 判断是否已经超时了 else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { // #11 已经超时所设定的时间,移除WaiterNode节点,可能比较耗时 removeWaiter(q); return state; } // #12 阻塞线程 LockSupport.parkNanos(this, nanos); } else // #13 阻塞线程 LockSupport.park(this); } }
removeWaiter
private void removeWaiter(WaitNode node) { if (node != null) { // #1 将 thread 变量置为 null node.thread = null; retry: // 死循环 for (;;) { // restart on removeWaiter race // #2 pred 保存前驱节点,这是用来修改指针引用的:想要删除某个节点,只需将前驱节点的next指针指向下一个节点即可 for (WaitNode pred = null, q = waiters, s; q != null; q = s) { s = q.next; if (q.thread != null) pred = q; else if (pred != null) { pred.next = s; if (pred.thread == null) // check for race continue retry; } // 栈顶结点出栈操作:将栈顶元素指向下一个节点。如果 CAS 失败,还会重来 // 即便 CAS 成功,还是会呆在 for 循环中,遍历栈中所有节点,删除 node.thread == null 的节点 else if (!UNSAFE.compareAndSwapObject(this, waitersOffset, q, s)) continue retry; } break; } } }
总结
FutureTask 实现 Runnable 和 Future 接口,可以把 FutureTask 交给线程池执行。同时,由于实现 Runnable 接口的类没有返回值,FutureTask 也提供了包装类。
FutureTask 内部实现 Treiber Stack 用来存储阻塞于get()
方法调用的线程,一旦任务完成,就会唤醒所有的阻塞线程。遗憾的是,在获取任务结果时,如果任务还没有执行完成,则当前线程会自旋或挂起等待。