一.使用

  1. // Demo: run a Callable on a worker thread through FutureTask and print what it returns.
  2. public class Test {
  3. public static void main(String[] args) throws ExecutionException, InterruptedException {
  4. // FutureTask is both the Runnable handed to the thread and the handle used to fetch the result.
  5. FutureTask<Integer> task = new FutureTask<>(new MyCallable());
  6. new Thread(task).start();
  7. // get() waits for call() to finish and hands back its return value.
  8. Integer result = task.get();
  9. System.out.println(result);
  10. }
  11. }
  12. // Task used by the demo: prints "call", sleeps for 5000 ms, then yields 1.
  13. class MyCallable implements Callable<Integer> {
  14. @Override
  15. public Integer call() throws InterruptedException {
  16. System.out.println("call");
  17. Thread.sleep(5000);
  18. return 1;
  19. }
  20. }

二.源码

Callable

  1. @FunctionalInterface // exactly one abstract method, so a lambda or method reference can implement it
  2. public interface Callable<V> { // a task that produces a result of type V
  3. V call() throws Exception; // computes the result; unlike Runnable.run() it returns a value and may throw a checked exception
  4. }

FutureTask

  1. public class FutureTask<V> implements RunnableFuture<V> // class header only; the body is not reproduced here
  2. public interface RunnableFuture<V> extends Runnable, Future<V> // both a Runnable and a Future, which is why a FutureTask can be passed to new Thread(...)
  3. public FutureTask(Callable<V> callable) { // wraps the given Callable
  4. this.callable = callable; // kept so that run() can invoke it later
  5. this.state = NEW; // a freshly constructed task starts in the NEW state
  6. }

FutureTask.run

  1. public void run() { // executes the wrapped Callable once and records its result or its exception
  2. // private volatile Thread runner; the runner field is set (by CAS from null) to the current thread
  3. if (state != NEW ||
  4. !UNSAFE.compareAndSwapObject(this, runnerOffset,
  5. null, Thread.currentThread()))
  6. return; // state is not NEW, or the CAS failed because runner was already set: do nothing
  7. try {
  8. Callable<V> c = callable;
  9. if (c != null && state == NEW) { // state is checked again after the CAS succeeded
  10. V result;
  11. boolean ran;
  12. try {
  13. result = c.call();
  14. ran = true;
  15. } catch (Throwable ex) {
  16. result = null;
  17. ran = false;
  18. setException(ex); // record the failure as the task's outcome
  19. }
  20. if (ran)
  21. set(result); // record the normal result as the task's outcome
  22. }
  23. } finally {
  24. runner = null; // cleared on every path, including when call() threw
  25. int s = state; // state is re-read after runner has been cleared
  26. if (s >= INTERRUPTING)
  27. handlePossibleCancellationInterrupt(s); // helper is not shown in this excerpt
  28. }
  29. }
  30. protected void set(V v) { // stores v as the task's result; does nothing unless state is still NEW
  31. if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { // NEW -> COMPLETING; only the thread that wins this CAS proceeds
  32. outcome = v;
  33. UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // COMPLETING -> NORMAL, written after outcome is assigned
  34. // wakes all waiting threads using LockSupport.unpark
  35. finishCompletion();
  36. }
  37. }
  38. protected void setException(Throwable t) { // stores t as the task's outcome; does nothing unless state is still NEW
  39. if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { // NEW -> COMPLETING; only the thread that wins this CAS proceeds
  40. outcome = t;
  41. UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // COMPLETING -> EXCEPTIONAL, written after outcome is assigned
  42. // wakes all waiting threads using LockSupport.unpark
  43. finishCompletion();
  44. }
  45. }

FutureTask.get

  1. // Returns the task's result, waiting for it if necessary; throws if the task failed or was cancelled.
  2. public V get() throws InterruptedException, ExecutionException {
  3. int s = state;
  4. // wait while the task has not finished or its result is still being set
  5. if (s <= COMPLETING)
  6. s = awaitDone(false, 0L);
  7. return report(s);
  8. }
  9. // Waits until the task completes, the caller is interrupted, or (when timed) nanos elapse.
  10. // Returns the state seen on exit; throws InterruptedException if interrupted while waiting.
  11. private int awaitDone(boolean timed, long nanos)
  12. throws InterruptedException {
  13. final long deadline = timed ? System.nanoTime() + nanos : 0L;
  14. WaitNode q = null;
  15. boolean queued = false;
  16. for (;;) {
  17. if (Thread.interrupted()) {
  18. // interrupted while waiting: unlink this node and give up
  19. removeWaiter(q);
  20. throw new InterruptedException();
  21. }
  22. int s = state;
  23. if (s > COMPLETING) {
  24. // task is done: this is the exit path that lets get() return
  25. if (q != null)
  26. q.thread = null;
  27. return s;
  28. }
  29. else if (s == COMPLETING)
  30. // the outcome is being stored right now; yield instead of parking
  31. Thread.yield();
  32. else if (q == null)
  33. // first pass: create the wait node
  34. q = new WaitNode();
  35. else if (!queued)
  36. // second pass: enqueue; the new node becomes the head, so the list is last-in-first-out
  37. queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
  38. q.next = waiters, q);
  39. else if (timed) {
  40. nanos = deadline - System.nanoTime();
  41. if (nanos <= 0L) {
  42. // timed out: unlink this node and return whatever state the task is in
  43. removeWaiter(q);
  44. return state;
  45. }
  46. LockSupport.parkNanos(this, nanos);
  47. }
  48. else
  49. // third pass: park until woken, then loop to re-check state
  50. LockSupport.park(this);
  51. }
  52. }
  53. // One node per waiting thread; nodes are linked through next.
  54. static final class WaitNode {
  55. volatile Thread thread;
  56. volatile WaitNode next;
  57. WaitNode() { thread = Thread.currentThread(); }
  58. }
  59. // Turns a completed state into get()'s outcome: the stored value, or an exception.
  60. @SuppressWarnings("unchecked")
  61. private V report(int s) throws ExecutionException {
  62. Object x = outcome;
  63. if (s == NORMAL)
  64. return (V)x;
  65. if (s >= CANCELLED)
  66. throw new CancellationException();
  67. // remaining case: outcome holds the Throwable stored by setException
  68. throw new ExecutionException((Throwable)x);
  69. }