一.使用
public class Test {public static void main(String[] args) throws ExecutionException, InterruptedException {Callable<Integer> myCallable = new MyCallable();FutureTask<Integer> futureTask = new FutureTask<>(myCallable);Thread thread = new Thread(futureTask);thread.start();Integer integer = futureTask.get();System.out.println(integer);}}class MyCallable implements Callable<Integer> {@Overridepublic Integer call() throws InterruptedException {System.out.println("call");Thread.sleep(5000);return 1;}}
二.源码
Callable
@FunctionalInterfacepublic interface Callable<V> {V call() throws Exception;}
FutureTask
public class FutureTask<V> implements RunnableFuture<V>public interface RunnableFuture<V> extends Runnable, Future<V>public FutureTask(Callable<V> callable) {this.callable = callable;this.state = NEW;}
FutureTask.run
public void run() {// private volatile Thread runner; runner属性设置为当前线程if (state != NEW ||!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))return;try {Callable<V> c = callable;if (c != null && state == NEW) {V result;boolean ran;try {result = c.call();ran = true;} catch (Throwable ex) {result = null;ran = false;setException(ex);}if (ran)set(result);}} finally {runner = null;int s = state;if (s >= INTERRUPTING)handlePossibleCancellationInterrupt(s);}}protected void set(V v) {if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {outcome = v;UNSAFE.putOrderedInt(this, stateOffset, NORMAL);// 使用LockSupport.unpark唤醒所有等待的线程finishCompletion();}}protected void setException(Throwable t) {if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {outcome = t;UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL);// 使用LockSupport.unpark唤醒所有等待的线程finishCompletion();}}
FutureTask.get
public V get() throws InterruptedException, ExecutionException {int s = state;// 没有执行完毕或者正在设置执行结果时等待if (s <= COMPLETING)s = awaitDone(false, 0L);return report(s);}private int awaitDone(boolean timed, long nanos)throws InterruptedException {WaitNode q = null;boolean queued = false;for (;;) {if (q == null)// 第一次循环q = new WaitNode();else if (!queued)// 第二次循环入队,后进先出queued = UNSAFE.compareAndSwapObject(this, waitersOffset,q.next = waiters, q);else// 第三次循环LockSupport.park(this);}}static final class WaitNode {volatile Thread thread;volatile WaitNode next;WaitNode() { thread = Thread.currentThread(); }}private V report(int s) throws ExecutionException {Object x = outcome;if (s == NORMAL)return (V)x;if (s >= CANCELLED)throw new CancellationException();throw new ExecutionException((Throwable)x);}
