一,Future和FutureTask的关系
1.Future
/**
* @author 二十
* @since 2021/8/28 3:50 下午
*/
public class FutureTaskTest {
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 5, 5, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
Future<String> future = executor.submit(() -> "callable");
Future<?> future1 = executor.submit(() -> System.out.println("runnable"));
FutureTask<String> futureTask = new FutureTask<>(() -> "hahah");
}
}
当我们调用线程池的submit()方法的时候,会给我们返回一个Future类型的对象。那么这个Future又是什么?
public interface Future<V> {
//取消任务
boolean cancel(boolean mayInterruptIfRunning);
//是否取消了任务
boolean isCancelled();
//是否执行完了任务
boolean isDone();
//获取线程的执行结果
V get() throws InterruptedException, ExecutionException;
//获取结果超时
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
这个接口主要的功能就是可以获取到线程执行完成后的结果。
2.两者的关系
FutureTask
可以看做是Future
和 Runnable
两个接口的实现类。
二,FutureTask源码
1.属性
//表示当前任务的状态
private volatile int state;
//任务尚未执行
private static final int NEW = 0;
//任务还未结束
private static final int COMPLETING = 1;
//任务正常执行结束
private static final int NORMAL = 2;
//任务发生了异常,由callable.call()向上抛出
private static final int EXCEPTIONAL = 3;
//任务被取消
private static final int CANCELLED = 4;
//任务正在中断
private static final int INTERRUPTING = 5;
//任务已经被中断
private static final int INTERRUPTED = 6;
//submit(runnable/callable) 使用装饰者模式将runnable装饰成callable
private Callable<V> callable;
//正常情况下:用来保存call的执行结果,异常情况下:用来保存call抛出的异常
private Object outcome; // non-volatile, protected by state reads/writes
//执行当前任务的线程
private volatile Thread runner;
//因为会有很多线程去get当前任务的结果,所以 这里使用了一种数据结构 stack 头插 头取 的一个队列。
private volatile WaitNode waiters;
2.WaitNode内部类
这个类里面封装着执行任务的线程和指向下一个线程的指针。
static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}
3.构造器
当我们传入一个runnable接口的时候:
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
调用了Executors.callable(runnable, result);
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}
当任务不为空,实际上返回的是一个RunnableAdapter对象。
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
}
在这里,将runnable装饰成了callable。
4.run()
public void run() {
if(任务不是刚创建||任务没有获取到执行权){
return
}
try{
if(如果任务不为空&&任务没有被其他线程执行){
声明 result
声明 ran = true
try{
调用call方法执行任务,并将结果赋值给result
ran = true
}catch(Exception e){
result=null
ran =false
调用setException(ex)
}
if(ran){
执行set(result)
}
}
}catch(Exception e){
}finally{
释放任务执行者
if(任务状态为被打断中或者已经被打断){
执行 handlePossibleCancellationInterrupt(s)
}
}
}
5.set()
protected void set(V v){
if(cas的方式设置当前任务的状态为完成中成功){
将任务结果赋值给outcome
设置当前任务状态为正常结束
调用finishCompletion()
}
}
6.setException()
protected void setException(Throwable t) {
if (cas的方式设置当前任务的状态为完成中成功) {
将异常赋值给outcome
设置当前任务状态为异常
finishCompletion();
}
}
7.handlePossibleCancellationInterrupt()
private void handlePossibleCancellationInterrupt(int s) {
if (如果当前任务状态为被打断)
while (当前任务状态为被打断)
Thread.yield(); // 释放当前线程cpu的执行权
}
8.finishCompletion()
private void finishCompletion() {
for循环让q指向当前链表的头节点
//这里的操作是为了可能任务还没开始执行就被其他线程取消了,这个时候将等待结果的线程全部唤醒,小概率事件
if(使用cas设置waiters为null成功){
for(;;){
获取q包装的线程
if(当前节点包装的线程不为空){
不让q在继续包装这个线程
唤醒q之前包装的线程
}
q指向链表的下一个节点
if(下一个节点为空){
break
}
}
break
}
done()
将callable设置为null帮助gc
}
9.done()
protected void done() { }
10.get()
public V get() throws InterruptedException, ExecutionException {
获取当前任务执行状态
if (当前任务尚未执行完)
s = awaitDone(false, 0L);
return report(s);
}
11.awaitDone()
//参数说明:是否设置了超时时间 超时时间
private int awaitDone(boolean timed, long nanos) throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
声明 当前线程是否进入获取结果的等待队列 =false
for (;;) {
if (如果当前线程被其他线程用中断的方式唤醒) {
当前节点出队列
并抛出中断异常
}
//假设当前线程是被其他线程以unpark的方式正常唤醒,那么就会走下面的逻辑
获取当前任务状态
//大于完成中有可能正常结束,也有可能异常结束
if (如果当前任务状态大于完成中) {
if (如果当前线程创建过node)
释放node
return 当前任务状态;
}
else if (当前任务状态是完成中)
释放线程的cpu执行权,接着等
else if (等待节点为空,说明是第一次自旋,还没有创建节点)
创建一个新的节点
else if (创建好了对象但是还没有入队)
cas的方式加入队列
else if (有超时时间) {
走超时逻辑
}
else
阻塞当前线程
}
}
12.report(int s)
//参数说明:当前任务状态
private V report(int s) throws ExecutionException {
if (任务正常结束)
return 任务结果;
if (任务被取消或中断)
抛异常
否则就是任务发生异常,将异常向上抛出
}
13.cancel()
public boolean cancel(boolean mayInterruptIfRunning) {
//说明此时任务不能取消了,直接返回false
if (!(任务状态==刚创建 &&
cas修改任务状态为取消或中断成功)))
return false;
try {
if (尝试打断成功) {
try {
获取执行任务的线程
if (执行任务的线程!=null)
给执行任务的线程设置中断标识
} finally {
cas的方式设置任务的状态为被打断
}
}
} finally {
finishCompletion();
}
return true;
}
三,流程梳理
首先明确一点,不管我们传入的是Runnable还是Callable接口,他最终用的都是Callable,Runnable接口会被他通过装饰者模式封装为Callable。
一个任务执行的入口其实就是run方法。
进入run方法,首先他会判断当前任务是否已经被执行过或者当前线程通过cas的方式并没有抢到执行当前任务的机会。如果是的话,说明已经有线程正在执行或者执行完了当前的任务,直接返回即可。
接下来他会判断当前任务是否为空或者当前任务的状态,其实判断状态就是为了判断当前任务是不是被取消了。如果任务不为空并且没有被取消,他会执行call方法(实际上就是我们自己的业务代码)并将结果设置到result将任务的是否被执行状态改成true表示顺利执行完。
call方法执行过程中如果发生异常了,会将结果设置为null,是否被顺利执行的状态为设置为false,表示执行过程发生了异常。
接下来,它会将异常信息封装到outcome,然后设置但该你任务的状态为异常结束,并自旋的方式唤醒所有等待队列中等待获取结果的线程。
如果call正常执行完了,他会用cas的方式设置当前任务的完成状态为完成中,如果设置成功,outcome来接收任务的结果,设置当前任务的状态为正常完成状态,并且唤醒所有等待结果的线程。
最终它会将执行当前任务的线程设置为null,并判断,如果当前任务的状态是被打断中或者已经被打断,他就会自旋判断如果当前线程的状态为被打断,让执行当前任务的线程释放cpu。
get方法是获取当前任务的结果。首先他会获取当前任务的状态,如果状态小于等于未完成首先他会通过自旋的方式,第一次自旋尚未给当前线程创建waitNode对象,此时就需要位当前线程创建waitNode对象。第二次自旋,创建好了对象还没有入队,cas的方式入队。第三次自旋,判断是否设置超时时间,如果没设置超时时间,当前get操作的线程就会被park了。 线程状态会变为 WAITING状态,相当于休眠了..除非有其它线程将你唤醒 或者 将当前线程 中断。他会获取当前线程的任务状态,如果任务还没有完成,释放cpu接着等。如果任务已经完成,此时需要将node设置位null help GC,直接返回当前的状态。除此之外还要判断,如果当前线程被其他线程用中断的方式唤醒,这种唤醒方式会将Thread的中断标记位设置为false,当前线程出队,get方法抛出中断异常。
如果状态表示已经有结果,会执行report方法。
report方法,如果任务正常执行结束,返回结果,如果任务被取消或者中断了,抛出异常,如果任务执行过程中发生异常结束了,返回异常。
最后就是任务的取消方法 cancel。他会先判断state == NEW 成立 表示当前任务处于运行中 或者 处于线程池 任务队列中..并且cas修改状态成功,他就会尝试取打断。
如果尝试打断成功,给runner线程一个中断信号..如果你的程序是响应中断 会走中断逻辑..假设你程序不是响应中断的..啥也不会发生。最后,设置线程状态为中断,唤醒获取结果的线程。