1、Future,Runnable和FutureTask

FutureTask实现了Future和Runnable接口

Future

  1. public interface Future<V> {
  2. boolean cancel(boolean mayInterruptIfRunning);
  3. boolean isCancelled();
  4. boolean isDone();
  5. V get() throws InterruptedException, ExecutionException;
  6. V get(long timeout, TimeUnit unit)
  7. throws InterruptedException, ExecutionException, TimeoutException;
  8. }

FutureTask

  1. public class FutureTask<V> implements RunnableFuture<V> {
  2. private volatile int state;
  3. private static final int NEW = 0;
  4. private static final int COMPLETING = 1;
  5. private static final int NORMAL = 2;
  6. private static final int EXCEPTIONAL = 3;
  7. private static final int CANCELLED = 4;
  8. private static final int INTERRUPTING = 5;
  9. private static final int INTERRUPTED = 6;
  10. /** The underlying callable; nulled out after running */
  11. private Callable<V> callable;
  12. /** The result to return or exception to throw from get() */
  13. private Object outcome; // non-volatile, protected by state reads/writes
  14. /** The thread running the callable; CASed during run() */
  15. private volatile Thread runner;
  16. /** Treiber stack of waiting threads */
  17. private volatile WaitNode waiters;
  18. }

RunnableFuture

  1. public interface RunnableFuture<V> extends Runnable, Future<V> {
  2. /**
  3. * Sets this Future to the result of its computation
  4. * unless it has been cancelled.
  5. */
  6. void run();
  7. }

2、FutureTask 分析

2.1、FutureTask属性分析

FutureTask属性

属性名 含义 说明
state 当前任务状态 详情见FutureTask状态
callable 当前任务主体
outcome 任务执行结果,任务正常执行结束 outcome保存执行结果,callable抛出异常 outcome保存异常
runner 执行当前任务的线程 任务运行时会先通过CAS将任务的runner属性设置为当前执行线程
waiters 等待当前任务执行结果的线程,线程对象被封装为WaitNode对象,多个线程节点组成链表 1、任务执行完毕后或者任务取消后唤醒阻塞线程就需要通过该链表获取所有等待线程进行唤醒
2、当前等待任务结果线程被interrupted中断后也会,从当前任务的等待线程链路中找到当前线程唤醒并移出链表

FutureTask状态

状态编码 状态 说明
NEW 表示当前任务处于运行中或者线程池任务队列中 调用构造方法后任务就会被设置为NEW状态,直到任务运行完毕或者抛出异常才会改变状态,所以NEW状态可能在未运行也可能正在运行
COMPLETING 当前任务正在结束,但未完全结束,属于临界状态 任务运行完毕或者抛出异常时,使用set方法设置运行结果或者使用setException设置异常时,先将任务状态设置为COMPLETING,然后将结果或者异常设置给outcome,随后更改任务状态为NORMAL
NORMAL 当前任务正常结束 任务正常运行完毕调用set方法设置运行结果,将结果或者异常设置给outcome后更改任务状态为NORMAL
EXCEPTIONAL 当前任务执行发生异常 任务运行异常,使用 setException设置异常时,先将任务状态设置为COMPLETING,然后将异常设置给outcome,随后更改任务状态为
EXCEPTIONAL
CANCELLED 当前任务被取消 如果当前任务处于NEW状态,外部线程调用cancel方法,并且传入mayInterruptIfRunning参数为false,会将状态改为CANCELLED,然后唤醒所有阻塞线程
INTERRUPTING 当前任务中断中 如果当前任务处于NEW状态,此时外部线程调用cancel方法,并且传入mayInterruptIfRunning参数为true,会先将状态改为INTERRUPTING,然后获取当期任务的执行线程调用中断方法,然后将状态改为INTERRUPTED
INTERRUPTED 当前任务已中断

  1. public class FutureTask<V> implements RunnableFuture<V> {
  2. //表示当前任务状态
  3. private volatile int state;
  4. //当前任务尚未执行
  5. private static final int NEW = 0;
  6. //当前任务正在结束,但未完全结束,属于临界状态
  7. private static final int COMPLETING = 1;
  8. //当前任务正常结束
  9. private static final int NORMAL = 2;
  10. //当前任务执行发生异常
  11. private static final int EXCEPTIONAL = 3;
  12. //当前任务被取消
  13. private static final int CANCELLED = 4;
  14. //当前任务中断中
  15. private static final int INTERRUPTING = 5;
  16. //当前任务已中断
  17. private static final int INTERRUPTED = 6;
  18. //装饰者模式装饰为callable
  19. private Callable<V> callable;
  20. /**
  21. * 正常情况:任务正常执行结束 outcome保存执行结果
  22. * 非正常情况: callable抛出异常 outcome保存异常
  23. */
  24. private Object outcome; // non-volatile, protected by state reads/writes
  25. //当前任务被执行期间 保存当前执行任务的线程对象引用
  26. private volatile Thread runner;
  27. //因为有很多线程去get当前任务的结果,所以此处使用了一种头插头取的队列
  28. private volatile WaitNode waiters;
  29. }

2.2、FutureTask 构造方法

构造方法可传入Callable或者Runnable,传入Runnable时通过适配器模式将runable转换为callable,同时将当前FutureTask 设置为NEW状态

  1. public FutureTask(Callable<V> callable) {
  2. if (callable == null)
  3. throw new NullPointerException();
  4. this.callable = callable;
  5. this.state = NEW;
  6. }
  7. public FutureTask(Runnable runnable, V result) {
  8. //适配器模式 将runable转换为callable
  9. this.callable = Executors.callable(runnable, result);
  10. this.state = NEW;
  11. }

2.3、run方法

run方法是FutureTask 的执行逻辑,其中包含了CAS获取任务,执行业务代码,设置执行结果等工作

  1. public void run() {
  2. /**
  3. * 条件1:state != NEW
  4. * true: 当前task被执行过了或者取消了,非NEW状态 线程不再处理
  5. * false: 当前task为NEW 说明当前任务未被执行或者正在被其它线程执行
  6. * 条件2:!RUNNER.compareAndSet(this, null, Thread.currentThread())
  7. * 前置条件:state == NEW
  8. * 通过CAS把当前线程设置到RUNNER属性,表明该任务由当前线程处理
  9. * true: 当前线程抢占任务失败,该任务已被其它线程执行
  10. * false: 当前线程抢占任务成功
  11. *
  12. * 只要满足条件1或者2,即当前任务不是NEW状态或者当前任务被其他线程处理则返回,不做任务操作
  13. */
  14. if (state != NEW ||
  15. !RUNNER.compareAndSet(this, null, Thread.currentThread()))
  16. return;
  17. try {
  18. //表明当前任务为NEW状态且当前线程抢占到任务
  19. Callable<V> c = callable;
  20. /**
  21. * 条件1:防止提交空任务导致空指针
  22. * 条件2:防止外部线程在第一次判断任务状态为NEW后到任务执行期间取消掉任务
  23. */
  24. if (c != null && state == NEW) {
  25. //结果引用
  26. V result;
  27. /**
  28. * ran为true: callable.run代码块执行成功 未发生异常
  29. * ran为false: callable.run代码块执行失败 发生异常
  30. */
  31. boolean ran;
  32. try {
  33. //调用任务本身的逻辑
  34. result = c.call();
  35. ran = true;
  36. } catch (Throwable ex) {
  37. result = null;
  38. ran = false;
  39. //设置异常到outcom
  40. setException(ex);
  41. }
  42. //执行正常 设置结果到outcom
  43. if (ran)
  44. set(result);
  45. }
  46. } finally {
  47. // 状态修改后将runner设置为null
  48. runner = null;
  49. //判断当前任务状态是否是中断中状态,此时当前线程调用Thread.yield()释放cpu时间片
  50. // 防止调用cancel(true)的线程在将任务状态从NEW改为INTERRUPTING后因为cpu调度挂起了,没有来的及调用 t.interrupt()方法
  51. int s = state;
  52. if (s >= INTERRUPTING)
  53. handlePossibleCancellationInterrupt(s);
  54. }
  55. }

2.4、awaitDone方法

此方法是提交任务到线程池后,外部线程等待返回结果的方法,结果返回前线程会被阻塞,任务执行完毕或者任务被取消后,将其唤醒。

  1. /**
  2. *
  3. * @param timed 是否设置等待超时时间
  4. * @param nanos 等待超时时间
  5. * @return
  6. * @throws InterruptedException
  7. */
  8. private int awaitDone(boolean timed, long nanos)
  9. throws InterruptedException {
  10. long startTime = 0L; // Special value 0L means not yet parked
  11. //引用当前线程封装成WaitNode对象
  12. WaitNode q = null;
  13. //表示当前WaitNode对象没有放入等待者队列
  14. boolean queued = false;
  15. for (;;) {
  16. //获取最新状态
  17. int s = state;
  18. //任务执行完毕返回结果
  19. if (s > COMPLETING) {
  20. //如果调用awaitDone方法时 任务已经执行完毕 则直接返回结果 不用将WaitNode放入等待着队列了
  21. if (q != null)
  22. q.thread = null;
  23. return s;
  24. }
  25. else if (s == COMPLETING)
  26. // 线程已经执行完毕但是还未设置结果或者异常 处于完成的临界状态 让当前线程暂时释放cpu
  27. Thread.yield();
  28. else if (Thread.interrupted()) {
  29. //当前等待线程被中断,当前线程不在等待执行结果,移出等待队列,抛出中断异常
  30. removeWaiter(q);
  31. throw new InterruptedException();
  32. }
  33. else if (q == null) {
  34. /**
  35. * 第一次自旋: 当前线程还未创建WaitNode对象,此时为当前线程创建WaitNode对象
  36. * 前置条件: 当前任务处于NEW状态 当前等待线程没有被中断
  37. */
  38. if (timed && nanos <= 0L)
  39. return s;
  40. q = new WaitNode();
  41. }
  42. else if (!queued){
  43. /**
  44. * 第二次自旋: 将当前线程创建的WaitNode加入等待队列,设置为头节点
  45. * 前置条件:当前任务处于NEW状态 当前等待线程没有被中断,当前线程创建WaitNode对象 但是还未入队
  46. */
  47. // 设置当前节点的next为WAITERS,然后把WAITERS设置为当前节点,保证WAITERS一直指向头节点
  48. queued = WAITERS.weakCompareAndSet(this, q.next = waiters, q);
  49. } else if (timed) {
  50. /**
  51. * 超时等待:设置了等待超时时间
  52. * 前置条件:当前线程创建WaitNode对象已经放入等待队列
  53. */
  54. final long parkNanos;
  55. if (startTime == 0L) {
  56. //第一次执行该代码块 记录开始等待结果的时间
  57. startTime = System.nanoTime();
  58. if (startTime == 0L)
  59. startTime = 1L;
  60. parkNanos = nanos;
  61. } else {
  62. /**
  63. * 第一次执行该代码块会设置阻塞时间
  64. * 第二次执行该代码块就会 将当前线程移出等待队列并唤醒
  65. */
  66. long elapsed = System.nanoTime() - startTime;
  67. if (elapsed >= nanos) {
  68. //如果超时 将当前线程移出等待队列并唤醒
  69. removeWaiter(q);
  70. return state;
  71. }
  72. //计算新的阻塞时间
  73. parkNanos = nanos - elapsed;
  74. }
  75. // nanoTime may be slow; recheck before parking
  76. if (state < COMPLETING){
  77. //设置阻塞时间
  78. LockSupport.parkNanos(this, parkNanos);
  79. }
  80. }
  81. /**
  82. *
  83. * 阻塞等待:
  84. * 前置条件:当前线程创建WaitNode对象已经放入等待队列,并且没有设置等待超时时间
  85. */
  86. else{
  87. //阻塞住当前get操作线程 当前线程变为waiting状态 除非有其他线程将其唤醒或者中断
  88. LockSupport.park(this);
  89. }
  90. }
  91. }

2.5、cancel方法

cancel方法是外部线程取消任务的api,可以对已经执行的任务进行中断

  1. /**
  2. *
  3. * @param mayInterruptIfRunning 如果任务正在运行 是否进行中断 true : 中断运行的任务 false:不中断运行的任务
  4. * @return
  5. */
  6. public boolean cancel(boolean mayInterruptIfRunning) {
  7. /**
  8. * 条件1:state == NEW 表示当前任务处于运行中或者线程池任务队列中
  9. * 条件2:STATE.compareAndSet(this, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED) CAS改变任务状态
  10. * 如果条件1和2有任何一项不满足 则取消任务失败
  11. */
  12. if (!(state == NEW && STATE.compareAndSet
  13. (this, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
  14. return false;
  15. try {
  16. //如果要中断已运行任务
  17. if (mayInterruptIfRunning) {
  18. try {
  19. Thread t = runner;
  20. /**
  21. * t !=null
  22. * fasle 说明当前任务在线程池队列中还没有线程获取它
  23. * true 说明当前任务已经被runner线程领取 可能已处于运行状态 调用中断接口
  24. */
  25. if (t != null)
  26. t.interrupt();
  27. } finally { // final state
  28. //设置任务状态为中断完成
  29. STATE.setRelease(this, INTERRUPTED);
  30. }
  31. }
  32. } finally {
  33. //唤醒所有阻塞的线程
  34. finishCompletion();
  35. }
  36. return true;
  37. }

2.6、removeWaiter方法

当等待任务结果的外部线程等待超时或者被中断时,会调用removeWaiter方法将等待线程移出任务的等待队列,并将其唤醒。

  1. /**
  2. * 把当前线程封装的节点从等待结果线程链表中移除
  3. * @param node
  4. */
  5. private void removeWaiter(WaitNode node) {
  6. if (node != null) {
  7. node.thread = null; //把要移除node的thread设置为null,遍历链表时通过thread判断是否要移除
  8. retry:
  9. for (;;) {
  10. /**
  11. * q 当前节点
  12. * pred 当前节点的前一个节点
  13. * s 当前节点的下一个节点
  14. * 循环条件 当前节点不为空
  15. * 循环结束后 当前节点指向下一个节点
  16. */
  17. for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
  18. s = q.next;
  19. if (q.thread != null)
  20. pred = q;
  21. /**
  22. * pred != null 当前节点需要移除并且当前节点不是头节点
  23. * 前置条件: q.thread == null 说明当前节点需要移除
  24. * 让当前节点的前一个节点指向当前节点的下一节点 即可移除当前节点
  25. */
  26. else if (pred != null) {
  27. pred.next = s;
  28. if (pred.thread == null) // check for race
  29. continue retry;
  30. }
  31. /**
  32. * !WAITERS.compareAndSet(this, q, s) CAS替换头节点失败
  33. * 前置条件: q.thread == null 说明当前节点需要移除
  34. * pred == null 说明当前节点为头节点
  35. * 即需要把头节点进行移除,通过CAS将头节点替换为其下一个节点
  36. */
  37. else if (!WAITERS.compareAndSet(this, q, s))
  38. continue retry;
  39. }
  40. //说明本次要清除的等待线程对象全部清除完毕
  41. break;
  42. }
  43. }
  44. }

2.7、get方法

get方法是供外部线程获取执行结果的方法,如果任务未执行或者正在执行会调用awaitDone方法将等待线程阻塞。

  1. public V get() throws InterruptedException, ExecutionException {
  2. int s = state;
  3. //判断任务处于未执行或者正在执行状态 调用get的外部线程会被阻塞
  4. if (s <= COMPLETING){
  5. //最核心的方法 阻塞等待任务执行完毕
  6. s = awaitDone(false, 0L);
  7. }
  8. return report(s);
  9. }

2.8、report方法

返回结果

  1. private V report(int s) throws ExecutionException {
  2. //正常情况下outcome为任务运行结果 非正常情况下 outcome为抛出异常
  3. Object x = outcome;
  4. if (s == NORMAL){
  5. //任务正常结束 返回结果
  6. return (V)x;
  7. }
  8. if (s >= CANCELLED){
  9. //被取消了
  10. throw new CancellationException();
  11. }
  12. //说明callable实现有问题
  13. throw new ExecutionException((Throwable)x);
  14. }

2.9、finishCompletion方法

收尾方法

  1. /**
  2. * 唤醒等待线程
  3. */
  4. private void finishCompletion() {
  5. // q指向waiters链表的头节点
  6. for (WaitNode q; (q = waiters) != null;) {
  7. //通过CAS将waiters设置为null 可能会有竞争,外部线程调用cancel时也会触发finishCompletion方法
  8. if (WAITERS.weakCompareAndSet(this, q, null)) {
  9. for (;;) {
  10. Thread t = q.thread;
  11. if (t != null) {
  12. q.thread = null;
  13. //唤醒等待结果的线程
  14. LockSupport.unpark(t);
  15. }
  16. //继续遍历链表 如果链表遍历完毕 则退出
  17. WaitNode next = q.next;
  18. if (next == null)
  19. break;
  20. q.next = null; // unlink to help gc
  21. q = next;
  22. }
  23. break;
  24. }
  25. }
  26. //扩展点
  27. done();
  28. //help gc
  29. callable = null; // to reduce footprint
  30. }