一.使用
public class Test {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Callable<Integer> myCallable = new MyCallable();
FutureTask<Integer> futureTask = new FutureTask<>(myCallable);
Thread thread = new Thread(futureTask);
thread.start();
Integer integer = futureTask.get();
System.out.println(integer);
}
}
class MyCallable implements Callable<Integer> {
@Override
public Integer call() throws InterruptedException {
System.out.println("call");
Thread.sleep(5000);
return 1;
}
}
二.源码
Callable
@FunctionalInterface
public interface Callable<V> {
V call() throws Exception;
}
FutureTask
public class FutureTask<V> implements RunnableFuture<V>
public interface RunnableFuture<V> extends Runnable, Future<V>
public FutureTask(Callable<V> callable) {
this.callable = callable;
this.state = NEW;
}
FutureTask.run
public void run() {
// private volatile Thread runner; runner属性设置为当前线程
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
runner = null;
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL);
// 使用LockSupport.unpark唤醒所有等待的线程
finishCompletion();
}
}
protected void setException(Throwable t) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = t;
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL);
// 使用LockSupport.unpark唤醒所有等待的线程
finishCompletion();
}
}
FutureTask.get
public V get() throws InterruptedException, ExecutionException {
int s = state;
// 没有执行完毕或者正在设置执行结果时等待
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
WaitNode q = null;
boolean queued = false;
for (;;) {
if (q == null)
// 第一次循环
q = new WaitNode();
else if (!queued)
// 第二次循环入队,后进先出
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else
// 第三次循环
LockSupport.park(this);
}
}
static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}