一,Future和FutureTask的关系

1.Future

  1. /**
  2. * @author 二十
  3. * @since 2021/8/28 3:50 下午
  4. */
  5. public class FutureTaskTest {
  6. public static void main(String[] args) {
  7. ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 5, 5, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
  8. Future<String> future = executor.submit(() -> "callable");
  9. Future<?> future1 = executor.submit(() -> System.out.println("runnable"));
  10. FutureTask<String> futureTask = new FutureTask<>(() -> "hahah");
  11. }
  12. }

当我们调用线程池的submit()方法的时候,会给我们返回一个Future类型的对象。那么这个Future又是什么?

  1. public interface Future<V> {
  2. //取消任务
  3. boolean cancel(boolean mayInterruptIfRunning);
  4. //是否取消了任务
  5. boolean isCancelled();
  6. //是否执行完了任务
  7. boolean isDone();
  8. //获取线程的执行结果
  9. V get() throws InterruptedException, ExecutionException;
  10. //获取结果超时
  11. V get(long timeout, TimeUnit unit)
  12. throws InterruptedException, ExecutionException, TimeoutException;
  13. }

这个接口主要的功能就是可以获取到线程执行完成后的结果。

2.两者的关系

image.png

FutureTask可以看做是FutureRunnable 两个接口的实现类。

二,FutureTask源码

1.属性

  1. //表示当前任务的状态
  2. private volatile int state;
  3. //任务尚未执行
  4. private static final int NEW = 0;
  5. //任务还未结束
  6. private static final int COMPLETING = 1;
  7. //任务正常执行结束
  8. private static final int NORMAL = 2;
  9. //任务发生了异常,由callable.call()向上抛出
  10. private static final int EXCEPTIONAL = 3;
  11. //任务被取消
  12. private static final int CANCELLED = 4;
  13. //任务正在中断
  14. private static final int INTERRUPTING = 5;
  15. //任务已经被中断
  16. private static final int INTERRUPTED = 6;
  17. //submit(runnable/callable) 使用装饰者模式将runnable装饰成callable
  18. private Callable<V> callable;
  19. //正常情况下:用来保存call的执行结果,异常情况下:用来保存call抛出的异常
  20. private Object outcome; // non-volatile, protected by state reads/writes
  21. //执行当前任务的线程
  22. private volatile Thread runner;
  23. //因为会有很多线程去get当前任务的结果,所以 这里使用了一种数据结构 stack 头插 头取 的一个队列。
  24. private volatile WaitNode waiters;

2.WaitNode内部类

这个类里面封装着执行任务的线程和指向下一个线程的指针。

  1. static final class WaitNode {
  2. volatile Thread thread;
  3. volatile WaitNode next;
  4. WaitNode() { thread = Thread.currentThread(); }
  5. }

3.构造器

当我们传入一个runnable接口的时候:

  1. public FutureTask(Runnable runnable, V result) {
  2. this.callable = Executors.callable(runnable, result);
  3. this.state = NEW; // ensure visibility of callable
  4. }

调用了Executors.callable(runnable, result);

  1. public static <T> Callable<T> callable(Runnable task, T result) {
  2. if (task == null)
  3. throw new NullPointerException();
  4. return new RunnableAdapter<T>(task, result);
  5. }

当任务不为空,实际上返回的是一个RunnableAdapter对象。

  1. static final class RunnableAdapter<T> implements Callable<T> {
  2. final Runnable task;
  3. final T result;
  4. RunnableAdapter(Runnable task, T result) {
  5. this.task = task;
  6. this.result = result;
  7. }
  8. public T call() {
  9. task.run();
  10. return result;
  11. }
  12. }

在这里,将runnable装饰成了callable。

4.run()

  1. public void run() {
  2. if(任务不是刚创建||任务没有获取到执行权){
  3. return
  4. }
  5. try{
  6. if(如果任务不为空&&任务没有被其他线程执行){
  7. 声明 result
  8. 声明 ran = true
  9. try{
  10. 调用call方法执行任务,并将结果赋值给result
  11. ran = true
  12. }catch(Exception e){
  13. result=null
  14. ran =false
  15. 调用setException(ex)
  16. }
  17. if(ran){
  18. 执行set(result)
  19. }
  20. }
  21. }catch(Exception e){
  22. }finally{
  23. 释放任务执行者
  24. if(任务状态为被打断中或者已经被打断){
  25. 执行 handlePossibleCancellationInterrupt(s)
  26. }
  27. }
  28. }

5.set()

  1. protected void set(V v){
  2. if(cas的方式设置当前任务的状态为完成中成功){
  3. 将任务结果赋值给outcome
  4. 设置当前任务状态为正常结束
  5. 调用finishCompletion()
  6. }
  7. }

6.setException()

  1. protected void setException(Throwable t) {
  2. if (cas的方式设置当前任务的状态为完成中成功) {
  3. 将异常赋值给outcome
  4. 设置当前任务状态为异常
  5. finishCompletion();
  6. }
  7. }

7.handlePossibleCancellationInterrupt()

  1. private void handlePossibleCancellationInterrupt(int s) {
  2. if (如果当前任务状态为被打断)
  3. while (当前任务状态为被打断)
  4. Thread.yield(); // 释放当前线程cpu的执行权
  5. }

8.finishCompletion()

  1. private void finishCompletion() {
  2. for循环让q指向当前链表的头节点
  3. //这里的操作是为了可能任务还没开始执行就被其他线程取消了,这个时候将等待结果的线程全部唤醒,小概率事件
  4. if(使用cas设置waitersnull成功){
  5. for(;;){
  6. 获取q包装的线程
  7. if(当前节点包装的线程不为空){
  8. 不让q在继续包装这个线程
  9. 唤醒q之前包装的线程
  10. }
  11. q指向链表的下一个节点
  12. if(下一个节点为空){
  13. break
  14. }
  15. }
  16. break
  17. }
  18. done()
  19. callable设置为null帮助gc
  20. }

9.done()

  1. protected void done() { }

10.get()

  1. public V get() throws InterruptedException, ExecutionException {
  2. 获取当前任务执行状态
  3. if (当前任务尚未执行完)
  4. s = awaitDone(false, 0L);
  5. return report(s);
  6. }

11.awaitDone()

  1. //参数说明:是否设置了超时时间 超时时间
  2. private int awaitDone(boolean timed, long nanos) throws InterruptedException {
  3. final long deadline = timed ? System.nanoTime() + nanos : 0L;
  4. 声明 当前线程是否进入获取结果的等待队列 =false
  5. for (;;) {
  6. if (如果当前线程被其他线程用中断的方式唤醒) {
  7. 当前节点出队列
  8. 并抛出中断异常
  9. }
  10. //假设当前线程是被其他线程以unpark的方式正常唤醒,那么就会走下面的逻辑
  11. 获取当前任务状态
  12. //大于完成中有可能正常结束,也有可能异常结束
  13. if (如果当前任务状态大于完成中) {
  14. if (如果当前线程创建过node)
  15. 释放node
  16. return 当前任务状态;
  17. }
  18. else if (当前任务状态是完成中)
  19. 释放线程的cpu执行权,接着等
  20. else if (等待节点为空,说明是第一次自旋,还没有创建节点)
  21. 创建一个新的节点
  22. else if (创建好了对象但是还没有入队)
  23. cas的方式加入队列
  24. else if (有超时时间) {
  25. 走超时逻辑
  26. }
  27. else
  28. 阻塞当前线程
  29. }
  30. }

12.report(int s)

  1. //参数说明:当前任务状态
  2. private V report(int s) throws ExecutionException {
  3. if (任务正常结束)
  4. return 任务结果;
  5. if (任务被取消或中断)
  6. 抛异常
  7. 否则就是任务发生异常,将异常向上抛出
  8. }

13.cancel()

  1. public boolean cancel(boolean mayInterruptIfRunning) {
  2. //说明此时任务不能取消了,直接返回false
  3. if (!(任务状态==刚创建 &&
  4. cas修改任务状态为取消或中断成功)))
  5. return false;
  6. try {
  7. if (尝试打断成功) {
  8. try {
  9. 获取执行任务的线程
  10. if (执行任务的线程!=null)
  11. 给执行任务的线程设置中断标识
  12. } finally {
  13. cas的方式设置任务的状态为被打断
  14. }
  15. }
  16. } finally {
  17. finishCompletion();
  18. }
  19. return true;
  20. }

三,流程梳理

  1. 首先明确一点,不管我们传入的是Runnable还是Callable接口,他最终用的都是Callable,Runnable接口会被他通过装饰者模式封装为Callable。

  2. 一个任务执行的入口其实就是run方法。

  3. 进入run方法,首先他会判断当前任务是否已经被执行过或者当前线程通过cas的方式并没有抢到执行当前任务的机会。如果是的话,说明已经有线程正在执行或者执行完了当前的任务,直接返回即可。

  4. 接下来他会判断当前任务是否为空或者当前任务的状态,其实判断状态就是为了判断当前任务是不是被取消了。如果任务不为空并且没有被取消,他会执行call方法(实际上就是我们自己的业务代码)并将结果设置到result将任务的是否被执行状态改成true表示顺利执行完。

  5. call方法执行过程中如果发生异常了,会将结果设置为null,是否被顺利执行的状态为设置为false,表示执行过程发生了异常。

  6. 接下来,它会将异常信息封装到outcome,然后设置但该你任务的状态为异常结束,并自旋的方式唤醒所有等待队列中等待获取结果的线程。

  7. 如果call正常执行完了,他会用cas的方式设置当前任务的完成状态为完成中,如果设置成功,outcome来接收任务的结果,设置当前任务的状态为正常完成状态,并且唤醒所有等待结果的线程。

  8. 最终它会将执行当前任务的线程设置为null,并判断,如果当前任务的状态是被打断中或者已经被打断,他就会自旋判断如果当前线程的状态为被打断,让执行当前任务的线程释放cpu。

  9. get方法是获取当前任务的结果。首先他会获取当前任务的状态,如果状态小于等于未完成首先他会通过自旋的方式,第一次自旋尚未给当前线程创建waitNode对象,此时就需要位当前线程创建waitNode对象。第二次自旋,创建好了对象还没有入队,cas的方式入队。第三次自旋,判断是否设置超时时间,如果没设置超时时间,当前get操作的线程就会被park了。 线程状态会变为 WAITING状态,相当于休眠了..除非有其它线程将你唤醒 或者 将当前线程 中断。他会获取当前线程的任务状态,如果任务还没有完成,释放cpu接着等。如果任务已经完成,此时需要将node设置位null help GC,直接返回当前的状态。除此之外还要判断,如果当前线程被其他线程用中断的方式唤醒,这种唤醒方式会将Thread的中断标记位设置为false,当前线程出队,get方法抛出中断异常。

  10. 如果状态表示已经有结果,会执行report方法。

  11. report方法,如果任务正常执行结束,返回结果,如果任务被取消或者中断了,抛出异常,如果任务执行过程中发生异常结束了,返回异常。

  12. 最后就是任务的取消方法 cancel。他会先判断state == NEW 成立 表示当前任务处于运行中 或者 处于线程池 任务队列中..并且cas修改状态成功,他就会尝试取打断。

  13. 如果尝试打断成功,给runner线程一个中断信号..如果你的程序是响应中断 会走中断逻辑..假设你程序不是响应中断的..啥也不会发生。最后,设置线程状态为中断,唤醒获取结果的线程。