1、Future,Runnable和FutureTask
FutureTask实现了Future和Runnable接口
Future
public interface Future<V> {boolean cancel(boolean mayInterruptIfRunning);boolean isCancelled();boolean isDone();V get() throws InterruptedException, ExecutionException;V get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException;}
FutureTask
public class FutureTask<V> implements RunnableFuture<V> {private volatile int state;private static final int NEW = 0;private static final int COMPLETING = 1;private static final int NORMAL = 2;private static final int EXCEPTIONAL = 3;private static final int CANCELLED = 4;private static final int INTERRUPTING = 5;private static final int INTERRUPTED = 6;/** The underlying callable; nulled out after running */private Callable<V> callable;/** The result to return or exception to throw from get() */private Object outcome; // non-volatile, protected by state reads/writes/** The thread running the callable; CASed during run() */private volatile Thread runner;/** Treiber stack of waiting threads */private volatile WaitNode waiters;}
RunnableFuture
public interface RunnableFuture<V> extends Runnable, Future<V> {/*** Sets this Future to the result of its computation* unless it has been cancelled.*/void run();}
2、FutureTask 分析
2.1、FutureTask属性分析
FutureTask属性
| 属性名 | 含义 | 说明 |
|---|---|---|
| state | 当前任务状态 | 详情见FutureTask状态 |
| callable | 当前任务主体 | |
| outcome | 任务执行结果,任务正常执行结束 outcome保存执行结果,callable抛出异常 outcome保存异常 | |
| runner | 执行当前任务的线程 | 任务运行时会先通过CAS将任务的runner属性设置为当前执行线程 |
| waiters | 等待当前任务执行结果的线程,线程对象被封装为WaitNode对象,多个线程节点组成链表 | 1、任务执行完毕后或者任务取消后唤醒阻塞线程就需要通过该链表获取所有等待线程进行唤醒 2、当前等待任务结果线程被interrupted中断后也会,从当前任务的等待线程链路中找到当前线程唤醒并移出链表 |
FutureTask状态
| 状态编码 | 状态 | 说明 |
|---|---|---|
| NEW | 表示当前任务处于运行中或者线程池任务队列中 | 调用构造方法后任务就会被设置为NEW状态,直到任务运行完毕或者抛出异常才会改变状态,所以NEW状态可能在未运行也可能正在运行 |
| COMPLETING | 当前任务正在结束,但未完全结束,属于临界状态 | 任务运行完毕或者抛出异常时,使用set方法设置运行结果或者使用setException设置异常时,先将任务状态设置为COMPLETING,然后将结果或者异常设置给outcome,随后更改任务状态为NORMAL |
| NORMAL | 当前任务正常结束 | 任务正常运行完毕调用set方法设置运行结果,将结果或者异常设置给outcome后更改任务状态为NORMAL |
| EXCEPTIONAL | 当前任务执行发生异常 | 任务运行异常,使用 setException设置异常时,先将任务状态设置为COMPLETING,然后将异常设置给outcome,随后更改任务状态为 EXCEPTIONAL |
| CANCELLED | 当前任务被取消 | 如果当前任务处于NEW状态,外部线程调用cancel方法,并且传入mayInterruptIfRunning参数为false,会将状态改为CANCELLED,然后唤醒所有阻塞线程 |
| INTERRUPTING | 当前任务中断中 | 如果当前任务处于NEW状态,此时外部线程调用cancel方法,并且传入mayInterruptIfRunning参数为true,会先将状态改为INTERRUPTING,然后获取当期任务的执行线程调用中断方法,然后将状态改为INTERRUPTED |
| INTERRUPTED | 当前任务已中断 |
public class FutureTask<V> implements RunnableFuture<V> {//表示当前任务状态private volatile int state;//当前任务尚未执行private static final int NEW = 0;//当前任务正在结束,但未完全结束,属于临界状态private static final int COMPLETING = 1;//当前任务正常结束private static final int NORMAL = 2;//当前任务执行发生异常private static final int EXCEPTIONAL = 3;//当前任务被取消private static final int CANCELLED = 4;//当前任务中断中private static final int INTERRUPTING = 5;//当前任务已中断private static final int INTERRUPTED = 6;//装饰者模式装饰为callableprivate Callable<V> callable;/*** 正常情况:任务正常执行结束 outcome保存执行结果* 非正常情况: callable抛出异常 outcome保存异常*/private Object outcome; // non-volatile, protected by state reads/writes//当前任务被执行期间 保存当前执行任务的线程对象引用private volatile Thread runner;//因为有很多线程去get当前任务的结果,所以此处使用了一种头插头取的队列private volatile WaitNode waiters;}
2.2、FutureTask 构造方法
构造方法可传入Callable或者Runnable,传入Runnable时通过适配器模式将runable转换为callable,同时将当前FutureTask 设置为NEW状态
public FutureTask(Callable<V> callable) {if (callable == null)throw new NullPointerException();this.callable = callable;this.state = NEW;}public FutureTask(Runnable runnable, V result) {//适配器模式 将runable转换为callablethis.callable = Executors.callable(runnable, result);this.state = NEW;}
2.3、run方法
run方法是FutureTask 的执行逻辑,其中包含了CAS获取任务,执行业务代码,设置执行结果等工作
public void run() {/*** 条件1:state != NEW* true: 当前task被执行过了或者取消了,非NEW状态 线程不再处理* false: 当前task为NEW 说明当前任务未被执行或者正在被其它线程执行* 条件2:!RUNNER.compareAndSet(this, null, Thread.currentThread())* 前置条件:state == NEW* 通过CAS把当前线程设置到RUNNER属性,表明该任务由当前线程处理* true: 当前线程抢占任务失败,该任务已被其它线程执行* false: 当前线程抢占任务成功** 只要满足条件1或者2,即当前任务不是NEW状态或者当前任务被其他线程处理则返回,不做任务操作*/if (state != NEW ||!RUNNER.compareAndSet(this, null, Thread.currentThread()))return;try {//表明当前任务为NEW状态且当前线程抢占到任务Callable<V> c = callable;/*** 条件1:防止提交空任务导致空指针* 条件2:防止外部线程在第一次判断任务状态为NEW后到任务执行期间取消掉任务*/if (c != null && state == NEW) {//结果引用V result;/*** ran为true: callable.run代码块执行成功 未发生异常* ran为false: callable.run代码块执行失败 发生异常*/boolean ran;try {//调用任务本身的逻辑result = c.call();ran = true;} catch (Throwable ex) {result = null;ran = false;//设置异常到outcomsetException(ex);}//执行正常 设置结果到outcomif (ran)set(result);}} finally {// 状态修改后将runner设置为nullrunner = null;//判断当前任务状态是否是中断中状态,此时当前线程调用Thread.yield()释放cpu时间片// 防止调用cancel(true)的线程在将任务状态从NEW改为INTERRUPTING后因为cpu调度挂起了,没有来的及调用 t.interrupt()方法int s = state;if (s >= INTERRUPTING)handlePossibleCancellationInterrupt(s);}}
2.4、awaitDone方法
此方法是提交任务到线程池后,外部线程等待返回结果的方法,结果返回前线程会被阻塞,任务执行完毕或者任务被取消后,将其唤醒。
/**** @param timed 是否设置等待超时时间* @param nanos 等待超时时间* @return* @throws InterruptedException*/private int awaitDone(boolean timed, long nanos)throws InterruptedException {long startTime = 0L; // Special value 0L means not yet parked//引用当前线程封装成WaitNode对象WaitNode q = null;//表示当前WaitNode对象没有放入等待者队列boolean queued = false;for (;;) {//获取最新状态int s = state;//任务执行完毕返回结果if (s > COMPLETING) {//如果调用awaitDone方法时 任务已经执行完毕 则直接返回结果 不用将WaitNode放入等待着队列了if (q != null)q.thread = null;return s;}else if (s == COMPLETING)// 线程已经执行完毕但是还未设置结果或者异常 处于完成的临界状态 让当前线程暂时释放cpuThread.yield();else if (Thread.interrupted()) {//当前等待线程被中断,当前线程不在等待执行结果,移出等待队列,抛出中断异常removeWaiter(q);throw new InterruptedException();}else if (q == null) {/*** 第一次自旋: 当前线程还未创建WaitNode对象,此时为当前线程创建WaitNode对象* 前置条件: 当前任务处于NEW状态 当前等待线程没有被中断*/if (timed && nanos <= 0L)return s;q = new WaitNode();}else if (!queued){/*** 第二次自旋: 将当前线程创建的WaitNode加入等待队列,设置为头节点* 前置条件:当前任务处于NEW状态 当前等待线程没有被中断,当前线程创建WaitNode对象 但是还未入队*/// 设置当前节点的next为WAITERS,然后把WAITERS设置为当前节点,保证WAITERS一直指向头节点queued = WAITERS.weakCompareAndSet(this, q.next = waiters, q);} else if (timed) {/*** 超时等待:设置了等待超时时间* 前置条件:当前线程创建WaitNode对象已经放入等待队列*/final long parkNanos;if (startTime == 0L) {//第一次执行该代码块 记录开始等待结果的时间startTime = System.nanoTime();if (startTime == 0L)startTime = 1L;parkNanos = nanos;} else {/*** 第一次执行该代码块会设置阻塞时间* 第二次执行该代码块就会 将当前线程移出等待队列并唤醒*/long elapsed = System.nanoTime() - startTime;if (elapsed >= nanos) {//如果超时 将当前线程移出等待队列并唤醒removeWaiter(q);return state;}//计算新的阻塞时间parkNanos = nanos - elapsed;}// nanoTime may be slow; recheck before parkingif (state < COMPLETING){//设置阻塞时间LockSupport.parkNanos(this, parkNanos);}}/**** 阻塞等待:* 前置条件:当前线程创建WaitNode对象已经放入等待队列,并且没有设置等待超时时间*/else{//阻塞住当前get操作线程 当前线程变为waiting状态 除非有其他线程将其唤醒或者中断LockSupport.park(this);}}}
2.5、cancel方法
cancel方法是外部线程取消任务的api,可以对已经执行的任务进行中断
/**** @param mayInterruptIfRunning 如果任务正在运行 是否进行中断 true : 中断运行的任务 false:不中断运行的任务* @return*/public boolean cancel(boolean mayInterruptIfRunning) {/*** 条件1:state == NEW 表示当前任务处于运行中或者线程池任务队列中* 条件2:STATE.compareAndSet(this, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED) CAS改变任务状态* 如果条件1和2有任何一项不满足 则取消任务失败*/if (!(state == NEW && STATE.compareAndSet(this, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))return false;try {//如果要中断已运行任务if (mayInterruptIfRunning) {try {Thread t = runner;/*** t !=null* fasle 说明当前任务在线程池队列中还没有线程获取它* true 说明当前任务已经被runner线程领取 可能已处于运行状态 调用中断接口*/if (t != null)t.interrupt();} finally { // final state//设置任务状态为中断完成STATE.setRelease(this, INTERRUPTED);}}} finally {//唤醒所有阻塞的线程finishCompletion();}return true;}
2.6、removeWaiter方法
当等待任务结果的外部线程等待超时或者被中断时,会调用removeWaiter方法将等待线程移出任务的等待队列,并将其唤醒。
/*** 把当前线程封装的节点从等待结果线程链表中移除* @param node*/private void removeWaiter(WaitNode node) {if (node != null) {node.thread = null; //把要移除node的thread设置为null,遍历链表时通过thread判断是否要移除retry:for (;;) {/*** q 当前节点* pred 当前节点的前一个节点* s 当前节点的下一个节点* 循环条件 当前节点不为空* 循环结束后 当前节点指向下一个节点*/for (WaitNode pred = null, q = waiters, s; q != null; q = s) {s = q.next;if (q.thread != null)pred = q;/*** pred != null 当前节点需要移除并且当前节点不是头节点* 前置条件: q.thread == null 说明当前节点需要移除* 让当前节点的前一个节点指向当前节点的下一节点 即可移除当前节点*/else if (pred != null) {pred.next = s;if (pred.thread == null) // check for racecontinue retry;}/*** !WAITERS.compareAndSet(this, q, s) CAS替换头节点失败* 前置条件: q.thread == null 说明当前节点需要移除* pred == null 说明当前节点为头节点* 即需要把头节点进行移除,通过CAS将头节点替换为其下一个节点*/else if (!WAITERS.compareAndSet(this, q, s))continue retry;}//说明本次要清除的等待线程对象全部清除完毕break;}}}
2.7、get方法
get方法是供外部线程获取执行结果的方法,如果任务未执行或者正在执行会调用awaitDone方法将等待线程阻塞。
public V get() throws InterruptedException, ExecutionException {int s = state;//判断任务处于未执行或者正在执行状态 调用get的外部线程会被阻塞if (s <= COMPLETING){//最核心的方法 阻塞等待任务执行完毕s = awaitDone(false, 0L);}return report(s);}
2.8、report方法
返回结果
private V report(int s) throws ExecutionException {//正常情况下outcome为任务运行结果 非正常情况下 outcome为抛出异常Object x = outcome;if (s == NORMAL){//任务正常结束 返回结果return (V)x;}if (s >= CANCELLED){//被取消了throw new CancellationException();}//说明callable实现有问题throw new ExecutionException((Throwable)x);}
2.9、finishCompletion方法
收尾方法
/*** 唤醒等待线程*/private void finishCompletion() {// q指向waiters链表的头节点for (WaitNode q; (q = waiters) != null;) {//通过CAS将waiters设置为null 可能会有竞争,外部线程调用cancel时也会触发finishCompletion方法if (WAITERS.weakCompareAndSet(this, q, null)) {for (;;) {Thread t = q.thread;if (t != null) {q.thread = null;//唤醒等待结果的线程LockSupport.unpark(t);}//继续遍历链表 如果链表遍历完毕 则退出WaitNode next = q.next;if (next == null)break;q.next = null; // unlink to help gcq = next;}break;}}//扩展点done();//help gccallable = null; // to reduce footprint}
