一,Future和FutureTask的关系
1.Future
/*** @author 二十* @since 2021/8/28 3:50 下午*/public class FutureTaskTest {public static void main(String[] args) {ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 5, 5, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));Future<String> future = executor.submit(() -> "callable");Future<?> future1 = executor.submit(() -> System.out.println("runnable"));FutureTask<String> futureTask = new FutureTask<>(() -> "hahah");}}
当我们调用线程池的submit()方法的时候,会给我们返回一个Future类型的对象。那么这个Future又是什么?
public interface Future<V> {//取消任务boolean cancel(boolean mayInterruptIfRunning);//是否取消了任务boolean isCancelled();//是否执行完了任务boolean isDone();//获取线程的执行结果V get() throws InterruptedException, ExecutionException;//获取结果超时V get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException;}
这个接口主要的功能就是可以获取到线程执行完成后的结果。
2.两者的关系

FutureTask可以看做是Future 和 Runnable 两个接口的实现类。
二,FutureTask源码
1.属性
//表示当前任务的状态private volatile int state;//任务尚未执行private static final int NEW = 0;//任务还未结束private static final int COMPLETING = 1;//任务正常执行结束private static final int NORMAL = 2;//任务发生了异常,由callable.call()向上抛出private static final int EXCEPTIONAL = 3;//任务被取消private static final int CANCELLED = 4;//任务正在中断private static final int INTERRUPTING = 5;//任务已经被中断private static final int INTERRUPTED = 6;//submit(runnable/callable) 使用装饰者模式将runnable装饰成callableprivate Callable<V> callable;//正常情况下:用来保存call的执行结果,异常情况下:用来保存call抛出的异常private Object outcome; // non-volatile, protected by state reads/writes//执行当前任务的线程private volatile Thread runner;//因为会有很多线程去get当前任务的结果,所以 这里使用了一种数据结构 stack 头插 头取 的一个队列。private volatile WaitNode waiters;
2.WaitNode内部类
这个类里面封装着执行任务的线程和指向下一个线程的指针。
static final class WaitNode {volatile Thread thread;volatile WaitNode next;WaitNode() { thread = Thread.currentThread(); }}
3.构造器
当我们传入一个runnable接口的时候:
public FutureTask(Runnable runnable, V result) {this.callable = Executors.callable(runnable, result);this.state = NEW; // ensure visibility of callable}
调用了Executors.callable(runnable, result);
public static <T> Callable<T> callable(Runnable task, T result) {if (task == null)throw new NullPointerException();return new RunnableAdapter<T>(task, result);}
当任务不为空,实际上返回的是一个RunnableAdapter对象。
static final class RunnableAdapter<T> implements Callable<T> {final Runnable task;final T result;RunnableAdapter(Runnable task, T result) {this.task = task;this.result = result;}public T call() {task.run();return result;}}
在这里,将runnable装饰成了callable。
4.run()
public void run() {if(任务不是刚创建||任务没有获取到执行权){return}try{if(如果任务不为空&&任务没有被其他线程执行){声明 result声明 ran = truetry{调用call方法执行任务,并将结果赋值给resultran = true}catch(Exception e){result=nullran =false调用setException(ex)}if(ran){执行set(result)}}}catch(Exception e){}finally{释放任务执行者if(任务状态为被打断中或者已经被打断){执行 handlePossibleCancellationInterrupt(s)}}}
5.set()
protected void set(V v){if(cas的方式设置当前任务的状态为完成中成功){将任务结果赋值给outcome设置当前任务状态为正常结束调用finishCompletion()}}
6.setException()
protected void setException(Throwable t) {if (cas的方式设置当前任务的状态为完成中成功) {将异常赋值给outcome设置当前任务状态为异常finishCompletion();}}
7.handlePossibleCancellationInterrupt()
private void handlePossibleCancellationInterrupt(int s) {if (如果当前任务状态为被打断)while (当前任务状态为被打断)Thread.yield(); // 释放当前线程cpu的执行权}
8.finishCompletion()
private void finishCompletion() {for循环让q指向当前链表的头节点//这里的操作是为了可能任务还没开始执行就被其他线程取消了,这个时候将等待结果的线程全部唤醒,小概率事件if(使用cas设置waiters为null成功){for(;;){获取q包装的线程if(当前节点包装的线程不为空){不让q在继续包装这个线程唤醒q之前包装的线程}q指向链表的下一个节点if(下一个节点为空){break}}break}done()将callable设置为null帮助gc}
9.done()
protected void done() { }
10.get()
public V get() throws InterruptedException, ExecutionException {获取当前任务执行状态if (当前任务尚未执行完)s = awaitDone(false, 0L);return report(s);}
11.awaitDone()
//参数说明:是否设置了超时时间 超时时间private int awaitDone(boolean timed, long nanos) throws InterruptedException {final long deadline = timed ? System.nanoTime() + nanos : 0L;声明 当前线程是否进入获取结果的等待队列 =falsefor (;;) {if (如果当前线程被其他线程用中断的方式唤醒) {当前节点出队列并抛出中断异常}//假设当前线程是被其他线程以unpark的方式正常唤醒,那么就会走下面的逻辑获取当前任务状态//大于完成中有可能正常结束,也有可能异常结束if (如果当前任务状态大于完成中) {if (如果当前线程创建过node)释放nodereturn 当前任务状态;}else if (当前任务状态是完成中)释放线程的cpu执行权,接着等else if (等待节点为空,说明是第一次自旋,还没有创建节点)创建一个新的节点else if (创建好了对象但是还没有入队)cas的方式加入队列else if (有超时时间) {走超时逻辑}else阻塞当前线程}}
12.report(int s)
//参数说明:当前任务状态private V report(int s) throws ExecutionException {if (任务正常结束)return 任务结果;if (任务被取消或中断)抛异常否则就是任务发生异常,将异常向上抛出}
13.cancel()
public boolean cancel(boolean mayInterruptIfRunning) {//说明此时任务不能取消了,直接返回falseif (!(任务状态==刚创建 &&cas修改任务状态为取消或中断成功)))return false;try {if (尝试打断成功) {try {获取执行任务的线程if (执行任务的线程!=null)给执行任务的线程设置中断标识} finally {cas的方式设置任务的状态为被打断}}} finally {finishCompletion();}return true;}
三,流程梳理
首先明确一点,不管我们传入的是Runnable还是Callable接口,他最终用的都是Callable,Runnable接口会被他通过装饰者模式封装为Callable。
一个任务执行的入口其实就是run方法。
进入run方法,首先他会判断当前任务是否已经被执行过或者当前线程通过cas的方式并没有抢到执行当前任务的机会。如果是的话,说明已经有线程正在执行或者执行完了当前的任务,直接返回即可。
接下来他会判断当前任务是否为空或者当前任务的状态,其实判断状态就是为了判断当前任务是不是被取消了。如果任务不为空并且没有被取消,他会执行call方法(实际上就是我们自己的业务代码)并将结果设置到result将任务的是否被执行状态改成true表示顺利执行完。
call方法执行过程中如果发生异常了,会将结果设置为null,是否被顺利执行的状态为设置为false,表示执行过程发生了异常。
接下来,它会将异常信息封装到outcome,然后设置但该你任务的状态为异常结束,并自旋的方式唤醒所有等待队列中等待获取结果的线程。
如果call正常执行完了,他会用cas的方式设置当前任务的完成状态为完成中,如果设置成功,outcome来接收任务的结果,设置当前任务的状态为正常完成状态,并且唤醒所有等待结果的线程。
最终它会将执行当前任务的线程设置为null,并判断,如果当前任务的状态是被打断中或者已经被打断,他就会自旋判断如果当前线程的状态为被打断,让执行当前任务的线程释放cpu。
get方法是获取当前任务的结果。首先他会获取当前任务的状态,如果状态小于等于未完成首先他会通过自旋的方式,第一次自旋尚未给当前线程创建waitNode对象,此时就需要位当前线程创建waitNode对象。第二次自旋,创建好了对象还没有入队,cas的方式入队。第三次自旋,判断是否设置超时时间,如果没设置超时时间,当前get操作的线程就会被park了。 线程状态会变为 WAITING状态,相当于休眠了..除非有其它线程将你唤醒 或者 将当前线程 中断。他会获取当前线程的任务状态,如果任务还没有完成,释放cpu接着等。如果任务已经完成,此时需要将node设置位null help GC,直接返回当前的状态。除此之外还要判断,如果当前线程被其他线程用中断的方式唤醒,这种唤醒方式会将Thread的中断标记位设置为false,当前线程出队,get方法抛出中断异常。
如果状态表示已经有结果,会执行report方法。
report方法,如果任务正常执行结束,返回结果,如果任务被取消或者中断了,抛出异常,如果任务执行过程中发生异常结束了,返回异常。
最后就是任务的取消方法 cancel。他会先判断state == NEW 成立 表示当前任务处于运行中 或者 处于线程池 任务队列中..并且cas修改状态成功,他就会尝试取打断。
如果尝试打断成功,给runner线程一个中断信号..如果你的程序是响应中断 会走中断逻辑..假设你程序不是响应中断的..啥也不会发生。最后,设置线程状态为中断,唤醒获取结果的线程。
