概述

FutureTask 是接口 Future 的实现类,而 Future 表示一个异步计算的结果。Future 提供比如检查计算是否已完成、等待计算完成、获取计算结果待方法,让另一个线程可以同步阻塞获取其它线程的计算结果。一旦计算执行结束,任务就不能被重启或取消。

示例

下面代码展示了将 search 搜索过程交给线程池异步执行,稍后主线程通过调用 Future#get() 方法阻塞等待异步线程完成计算,当计算得到结果后会唤醒主线程,主线程就可以得到结果并返回。

  1. interface ArchiveSearcher { String search(String target); }
  2. class App {
  3. // 线程池,用于异步执行search任务
  4. ExecutorService executor = Executors.newFixedThreadPool(8);
  5. ArchiveSearcher searcher = new ArchiveSearcherImpl();
  6. // 搜索目录
  7. void showSearch(final String target) throws InterruptedException {
  8. // #1 交给线程池异常执行,并返回一个Future凭证,当前线程可以通过这个凭证获取计算结果
  9. Future<String> future = executor.submit(new Callable<String>() {
  10. public String call() {
  11. return searcher.search(target);
  12. }});
  13. // #2 当前线程可以继续执行自己逻辑,不会被阻塞。搜索过程在线程池中异步执行
  14. displayOtherThings();
  15. try {
  16. // #3 阻塞并等待Future返回结果
  17. displayText(future.get()); // use future
  18. } catch (ExecutionException ex) {
  19. cleanup();
  20. return;
  21. }
  22. }
  23. }

Future 接口

Future 是一个接口,定义了比如检查计算是否已经完成,获取计算结果等 API。

API 描述
boolean cancel(boolean) 尝试取消执行任务。如果遇到 ① 计算已完成 ② 任务已经取消 ③ 由于某些特殊原因无法被取消,那么这个操作会失败。如果方法执行成功,并且调用时任务还未执行,那么就不会继续执行任务。如果任务已开始执行,那么是否应该中断应取决于入参 boolean 值。
boolean isCancelled() true:任务在完成之前被取消。
false:任务没有被取消
boolean isDone true:任务已执行完成。以下情况会返回 rue:① 任务正常执行完毕,② 出现异常,③ 被取消。
false:
V get() throws InterruptedException, ExecutionException; 当前线程阻塞并等待计算完成。这个方法可能会出现三种异常:① CancellationException:任务被取消了
ExecutionException:计算过程出现异常
InterruptedException:当前阻塞的线程被中断
V get(long TimeUnit) 带超时的阻塞并获取计算结果

可以看到,Future 定义的 API 比较简洁,其中一个 API 用于取消任务、两个 API 用于检查判断、另有两个 API 用于计算结果的获取。另外提一点,get() 是可以响应线程中断的。

实现类:FutureTask

继承体系

FutureTask.png
FutureTask 实现了 RunnableFuture 接口,而 RunnableFuture 继承了 FutureRunnable 接口,所以 FutureTask 可以当作一个 Runnable 直接被 Thread 执行,此外,FutureTask 内部还有一个 Callable 变量,可以包装一个实现 Callable 接口的对象。

状态定义

每个任务都是拥有状态的,不同的任务状态对应不同的执行逻辑:

任务状态 描述
NEW(0) 初始状态,刚被创建,还未开始执行任务。
COMPLETING(1) 执行完成。任务已正常执行结束,或执行过程出现异常。但是执行结果还未保存到 outcome 字段。这个状态持续时间较短。
NORMAL(2) 执行完成,并且结果已成功写入 outcome 字段。属于最终态之一。
EXCEPTIONAL(3) 任务执行发生异常,且异常原因已成功写入 outcome 字段。属于最终态之一。
CANCELLED(4) 任务还未开始执行或在执行中被取消,用户调用了 cancel(false) 方法取消任务但不中断线程。属于最终态之一。
INTERRUPTING(5) 任务还未开始执行或在执行中被取消,用户调用了 cancel(true) 方法取消任务,但还未中断线程前的状态。属于一个中间态。
INTERRUPTED(6) 调用 interrupt() 中断任务执行线程之后状态从 (5) -> (6)。属于最终态之一。

FutureTask 状态转换.png
值得一提的是,任务的中间态的持续时间非常短,你可以大致理解为 state > 0 就意味着任务已执行完成。

内部类:WaitNode

WaitNodeFutureTask 的内部类,实现 FutureTask 的等待队列。队列的实现是一个意向链表,FutureTask 使用 WaitNode 包装所有等待任务执行完毕的线程,它们之间构成单向链表。
WaitNode 内部结构如下:

  1. static final class WaitNode {
  2. volatile Thread thread;
  3. volatile WaitNode next;
  4. WaitNode() { thread = Thread.currentThread(); }
  5. }

与 AQS 的双向队列相比,WaitNode 的实现就特别简单,节点没有复杂的状态标志位,只是一个简单的单向链表结构。
我们在查看 JDK 文档时会看到 Treiber Stack 关键字,意为无锁并发栈,通过 CAS 实现线程安全的并发栈。FutureTask 中由 WaitNode 构成的单向链表本质就是一个 Treiber Stack。在代码中只需要保存一个指向栈顶节点的指针就可以使用这个栈。
FutureTask 并发栈.png

核心变量

  1. // java.util.concurrent.FutureTask
  2. public class FutureTask<V> implements RunnableFuture<V> {
  3. // 任务状态
  4. private volatile int state;
  5. // 任务本体,因为只有实现 Callable 的类才有返回值
  6. private Callable<V> callable;
  7. // 任务计算所得到的结果,也是 get() 方法返回的数据
  8. private Object outcome; // non-volatile, protected by state reads/writes
  9. // 执行 callable 任务的线程
  10. private volatile Thread runner;
  11. // 并发栈,底层是单向链表 + CAS 操作实现的并发栈(Treiber stack)
  12. // 用于存放等待计算结果的线程
  13. private volatile WaitNode waiters;
  14. // ...
  15. }

总共有 5 个核心变量,这 5 个变量共同承载 FutureTask 核心逻辑。

构造函数

  1. public FutureTask(Callable<V> callable) {
  2. if (callable == null)
  3. throw new NullPointerException();
  4. this.callable = callable;
  5. this.state = NEW; // ensure visibility of callable
  6. }
  7. public FutureTask(Runnable runnable, V result) {
  8. this.callable = Executors.callable(runnable, result);
  9. this.state = NEW; // ensure visibility of callable
  10. }

FutureTask 共有两个构造函数:

  1. 直接传入 Callable 接口的实现类,这个接口是有返回值的。
  2. 对于已经继承 Runnable 接口的实现类来说,它们无法改变了。于是,使用一个适配器 Executors.callable(Runnable, V) 将 Runnable 和 结果 V 进行绑定。FutureTask 会把计算好的结果写入 result。

构造函数将任务状态设置为初始值 NEW。

源码解析

执行任务:run

FutureTask 实现 RunnableFuture 接口,它可以被提交到线程池中执行。线程池调用 FutureTask#run() 方法,在这个方法中执行任务。

  1. 首先检查 state 状态是否正常,如果不正常,直接 return。如果状态正常,通过 CAS 设置 runner 变量,如果设置失败,说明有其它线程执行当前的任务,也直接 return。
  2. 这一步就是执行任务了,一般是通过线程池执行任务。
  3. 如果在执行任务的过程中出现异常,进入 #5 代码分支:会将 state 状态设置为 EXCEPTIONAL 最终状态,并调用 finishCompletion() 唤醒所有阻塞在 get() 方法的线程。
  4. 如果任务正常执行完毕,进入 #6 代码分支:会将 state 状态设置为 NORMAL 最终状态,并调用 finishCompletion() 唤醒所有阻塞在 get() 方法的线程。
  5. 最后进入 finally 语句,主要是检查是否有遗漏的中断。如果发现 s >= INTERRUPTING,意味着执行任务的线程很可能被中断了。

    1. // java.util.concurrent.FutureTask#run
    2. public void run() {
    3. // #1 检查 state 状态是否为 NEW,且 CAS 设置 runner 为当前线程
    4. // 只要其中一个返回 false,则立即返回。说明任务已被其它线程修改了,当前线程来晚了
    5. if (state != NEW ||
    6. !UNSAFE.compareAndSwapObject(this, runnerOffset,
    7. null, Thread.currentThread()))
    8. return;
    9. try {
    10. // #2 更新成功,当前线程可以执行任务了
    11. Callable<V> c = callable;
    12. if (c != null && state == NEW) {
    13. V result;
    14. boolean ran;
    15. try {
    16. // #3 执行任务,并获取结果
    17. result = c.call();
    18. // #4 表示任务正常完成
    19. ran = true;
    20. } catch (Throwable ex) {
    21. // #5 出现异常,CAS 保留异常信息
    22. result = null;
    23. ran = false;
    24. setException(ex);
    25. }
    26. // #6 CAS 设置结果。因为有其它线程可能中断任务,所以需要CAS操作。set()操作流程如下:
    27. // ① CAS更新状态为 COMPLETING
    28. // ② 将 outcome 指向 result
    29. // ③ putOrderInt(state, NORMAL) 更新 state 为 NORMAL,并立即回写到主存中
    30. // ④ 移除并唤醒所有等待线程
    31. if (ran)
    32. set(result);
    33. }
    34. } finally {
    35. // #7 将 runner 置为 null
    36. runner = null;
    37. // #8 检查中断
    38. int s = state;
    39. if (s >= INTERRUPTING)
    40. handlePossibleCancellationInterrupt(s);
    41. }
    42. }

    任务正常完成:set(result)

    如果任务正常完成,就调用 set(V result) 方法设置结果并通知所有等待结果获取的线程。

    // java.util.concurrent.FutureTask#set
    protected void set(V v) {
     // #1 CAS 更新 state 状态为 COMPLETING
     if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
         // #2 更新 result
         outcome = v;
    
         // #3 将 state 状态设置成 NORMAL,属于最终态之一
         UNSAFE.putOrderedInt(this, stateOffset, NORMAL);
    
         // #4 唤醒所有阻塞在 get() 方法调用的线程
         finishCompletion();
     }
    }
    

    唤醒线程:finishCompletion

    这一步代码也比较简单,就是遍历由 WaitNode 构成的 Treiber Stack,并唤醒线程。在所有线程都被唤醒后,调用扩展方法 done() 以完成任务结束前最后额外工作,并清理 callable 。

    // java.util.concurrent.FutureTask#finishCompletion
    private void finishCompletion() {
     // ① 遍历由 WaitNode 所构成的单向链表,这里的 for 目的是确保 waiters == null
     for (WaitNode q; (q = waiters) != null;) {
         // #2 CAS 修改 waiters = null,相当于清空了 Treiber Stack
         if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
             // #3 遍历链表,挨个唤醒线程
             for (;;) {
                 Thread t = q.thread;
                 if (t != null) {
                     q.thread = null;
                     // #4 唤醒线程
                     LockSupport.unpark(t);
                 }
    
                 // #5 获取下一个节点
                 WaitNode next = q.next;
                 if (next == null)
                     break;
    
                 // #6 help GC
                 q.next = null;
                 q = next;
             }
             break;
         }
     }
    
     // #7 这是一个扩展点,用于实现一些任务执行结束前的额外操作
     done();
    
     // #8 清理 callable 任务
     callable = null;        // to reduce footprint
    }
    

    任务出现异常:setException(ex)

  6. 修改 state 状态。

  7. 将 outcome 设置为异常原因。
  8. 唤醒所有阻塞线程。

    protected void setException(Throwable t) {
     if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
         outcome = t;
         UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
         finishCompletion();
     }
    }
    

    finally 语句

    finally {
     // #7 将 runner 置为 null
     runner = null;
    
     // #8 检查中断
     int s = state;
     if (s >= INTERRUPTING)
         handlePossibleCancellationInterrupt(s);
    }
    

    这里着重讲一下 finally 语句。在 finally 块中,我们先将 runner 属性置为 null,然后检查是否有遗漏的中断。如果发现 s >= INTERRUPTING,意味着执行任务的线程可能被中断了。
    那么,这里就有问题了:前面我们已经将 state 的状态设置为 NORMALEXCEPTIONAL,怎么会在 finally 语句中出现 INTERRUPTINGINTERRUPTED 状态呢? 这是因为,在多线程环境中,线程 A 正在执行任务,但是线程 B 取消了任务。此时其它线程有可能修改 state 的状态。这也是我们在设置终止状态时使用 Unsafe#putOrderInt 方法,而非 CAS 操作的原因:因为我们无法确保在修改 state 前是处于 COMPLETING 中间态还是 INTERRUPTING 中间态。
    我们不能对已完成任务的线程执行中断。

    handlePossibleCancellationInterrupt

    private void handlePossibleCancellationInterrupt(int s) {
     // It is possible for our interrupter to stall before getting a
     // chance to interrupt us.  Let's spin-wait patiently.
     if (s == INTERRUPTING)
         while (state == INTERRUPTING)
             Thread.yield(); // wait out pending interrupt
    
     // assert state == INTERRUPTED;
    
     // We want to clear any interrupt we may have received from
     // cancel(true).  However, it is permissible to use interrupts
     // as an independent mechanism for a task to communicate with
     // its caller, and there is no way to clear only the
     // cancellation interrupt.
     //
     // Thread.interrupted();
    }
    

    该方法是一个自旋操作,如果当前 state 状态是 INTERRUPTING,原地自旋直到 state 变为终止态。

    run 方法总结

    run 方法重点做了以下事情:

  9. 更新 runner 属性值:将它设置为当前线程。

  10. 调用 Callable#call 方法执行任务。
  11. 任务正常执行完毕。将 outcome 指向计算结果。任务执行过程中出现异常,将 outcome 执行异常栈。设置结果前,先将 state 设置中相关的中间态。
  12. 唤醒所有阻塞在 Future#get() 方法上的线程。
  13. 将 FutureTask 相关成员置为 null。
  14. 检查是否有遗漏的中断。如果有,线程自旋,直到中断状态变更为 INTERRUPTED 才退出。

    取消任务:cancel

    另一个线程可以调用 cancel 方法取消任务的执行。参数 mayInterruptIfRunning 表示如果能取消任务,是否中断执行任务的线程。需要注意的是,方法 cancel 是有概率失败的,即返回 false,情况如下:

  15. 任务已经执行完成了。无论正常完成还是出现异常。只要 state > NEW 就可以表示任务已完成执行。

  16. 任务已被取消过。
  17. 任务因某种原因无法被取消。

    // java.util.concurrent.FutureTask#cancel
    public boolean cancel(boolean mayInterruptIfRunning) {
     // #1 如果 state > NEW,可以说明任务已执行完毕,此刻就不允许中断任务
     if (!(state == NEW &&
           // #2 CAS 更新 state 状态为 INTERRUPTING 或 CANCELLED
           UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                                    mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
         return false;
     try {    // in case call to interrupt throws exception
    
         // #3 mayInterruptIfRunning = true 表示需要中断执行任务的中断
         if (mayInterruptIfRunning) {
             try {
                 Thread t = runner;
                 if (t != null)
                     // #4 调用 Thread.interrupt() 中断线程
                     t.interrupt();
             } finally {
                 // #5 设置 state 为 INTERRUPTED,并马上写回主存
                 UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
             }
         }
     } finally {
         // #6 唤醒所有线程
         finishCompletion();
     }
    
     // #7 return
     return true;
    }
    

    从源码可以看到,方法 cancel() 也十分简单:

  18. 检查 state 是否为 NEW。并且 CAS 设置 state 状态为 INTERRUPTINGCANCELLED。如果任一条件不满足,返回 false。

  19. 状态设置成功,再判断入参参数 mayInterruptIfRunning ,如果为 true,意味着需要对任务执行线程调用 Thread.interrupt() 方法,并将 state 状态更改为 INTERRUPTED
  20. 唤醒所有正在阻塞的线程。

    获取任务结果:get

    Future 提供两个版本的 get,一个是一直阻塞的 get(),另一个是带超时时间的 get(long, TimeUnit)。我们就只讲 get() 的实现。
    get() 方法比较简单,判断 state 是否 <= COMPLETING,如果是,则调用 awaitDone 入队。

    public V get() throws InterruptedException, ExecutionException {
     int s = state;
     if (s <= COMPLETING)
         s = awaitDone(false, 0L);
     return report(s);
    }
    

    awaitDone

    // java.util.concurrent.FutureTask#awaitDone
    private int awaitDone(boolean timed, long nanos) throws InterruptedException {
     final long deadline = timed ? System.nanoTime() + nanos : 0L;
     WaitNode q = null;
     boolean queued = false;
     // #1 死循环
     for (;;) {
         // #2 判断当前线程是否被中断
         if (Thread.interrupted()) {
             // #3 如果被中断,需要从队列中移除当前线程
             removeWaiter(q);
             // #4 抛出异常
             throw new InterruptedException();
         }
    
         // #5 获取任务状态
         int s = state;
    
         // #6 s > COMPLETING 说明任务已完成,可以直接返回了
         if (s > COMPLETING) {
             if (q != null)
                 q.thread = null;
             return s;
         }
    
         // #7 任务即将完成,先放弃一段CPU时间片,只是给CPU一个提示
         // 有没有用还是需要大量的测试来决定的
         else if (s == COMPLETING) 
             Thread.yield();
    
         // #8 创建 WaitNode 对象,创建过程可能任务已经完成,
         // 这在下一轮 for 循环再判断吧
         else if (q == null)
             q = new WaitNode();
    
         // #9 判断是否入队没有
         else if (!queued)
             // 没有入队,则CAS更新栈顶指针
             queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                  q.next = waiters, q);
    
         // #10 判断是否已经超时了
         else if (timed) {
             nanos = deadline - System.nanoTime();
             if (nanos <= 0L) {
                 // #11 已经超时所设定的时间,移除WaiterNode节点,可能比较耗时
                 removeWaiter(q);
                 return state;
             }
             // #12 阻塞线程
             LockSupport.parkNanos(this, nanos);
         }
         else
             // #13 阻塞线程
             LockSupport.park(this);
     }
    }
    

    removeWaiter

    private void removeWaiter(WaitNode node) {
     if (node != null) {
         // #1 将 thread 变量置为 null
         node.thread = null;
         retry:
         // 死循环
         for (;;) {          // restart on removeWaiter race
    
             // #2 pred 保存前驱节点,这是用来修改指针引用的:想要删除某个节点,只需将前驱节点的next指针指向下一个节点即可
             for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                 s = q.next;
                 if (q.thread != null)
                     pred = q;
                 else if (pred != null) {
                     pred.next = s;
                     if (pred.thread == null) // check for race
                         continue retry;
                 }
                 // 栈顶结点出栈操作:将栈顶元素指向下一个节点。如果 CAS 失败,还会重来
                 // 即便 CAS 成功,还是会呆在 for 循环中,遍历栈中所有节点,删除 node.thread == null 的节点
                 else if (!UNSAFE.compareAndSwapObject(this, waitersOffset, q, s))
                     continue retry;
             }
             break;
         }
     }
    }
    

    总结

    FutureTask 实现 Runnable 和 Future 接口,可以把 FutureTask 交给线程池执行。同时,由于实现 Runnable 接口的类没有返回值,FutureTask 也提供了包装类。
    FutureTask 内部实现 Treiber Stack 用来存储阻塞于 get() 方法调用的线程,一旦任务完成,就会唤醒所有的阻塞线程。遗憾的是,在获取任务结果时,如果任务还没有执行完成,则当前线程会自旋或挂起等待。