简介及用法

CompletableFuture是什么?

  1. 可以明确完成的future,可以set value和status。
  2. 实现了CompletionStage接口支持任务依赖。可以用于java异步编程。

基本用法:https://www.callicoder.com/java-8-completablefuture-tutorial/

CompletableFuture源码分析

CompletableFuture成员变量:

  1. //通过result为不为null判断future是否完成,AltResult用来封装null结果和捕获异常
  2. volatile Object result; // Either the result or boxed AltResult
  3. //无锁并发栈,使用cas实现,用来实现任务依赖
  4. volatile Completion stack; // Top of Treiber stack of dependent actions
  5. final boolean internalComplete(Object r) { // CAS from null to r
  6. return RESULT.compareAndSet(this, null, r);
  7. }
  8. /** Returns true if successfully pushed c onto stack. */
  9. final boolean tryPushStack(Completion c) {
  10. Completion h = stack;
  11. //把当前栈顶设为c的next,NEXT是Completion next字段的VarHandler
  12. NEXT.set(c, h); // CAS piggyback
  13. //cas栈顶
  14. return STACK.compareAndSet(this, h, c);
  15. }

stack节点类Completion及其子类:
image.png
CompletableFuture默认使用ForkJoinPool._commonPool_()
CompletableFuture内部任务依赖用Completion表示,Completion继承自ForkJoinTask,实现了异步处理。并且对应不同的场景有很多子类实现。
Completion分类:
- single-input (UniCompletion),
- two-input (BiCompletion),
- projected (BiCompletions using exactly one of two inputs),
- shared (CoCompletion, used by the second of two sources),
- zero-input source actions,
- Signallers that unblock waiters.
具体逻辑jdk源码注释写比较清楚。

下面我们结合一段demo代码,简单过一下源码。

  1. CompletableFuture.supplyAsync(() -> {
  2. try {
  3. TimeUnit.SECONDS.sleep(1);
  4. } catch (InterruptedException e) {
  5. throw new IllegalStateException(e);
  6. }
  7. System.out.println("Some Result");
  8. return "Some Result";
  9. }).thenApplyAsync(result -> {
  10. System.out.println("Processed Result" + result);
  11. return "Processed Result" + result;
  12. });

从任务定义的角度分析:

supplyAsync->asyncSupplyStage,主要是生成一个AsyncSupply丢到线程池里执行。

  1. public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
  2. return asyncSupplyStage(ASYNC_POOL, supplier);
  3. }
  4. static <U> CompletableFuture<U> asyncSupplyStage(Executor e,
  5. Supplier<U> f) {
  6. if (f == null) throw new NullPointerException();
  7. CompletableFuture<U> d = new CompletableFuture<U>();
  8. //直接生成AsyncSupply执行
  9. e.execute(new AsyncSupply<U>(d, f));
  10. return d;
  11. }
  12. static final class AsyncSupply<T> extends ForkJoinTask<Void>
  13. implements Runnable, AsynchronousCompletionTask {
  14. CompletableFuture<T> dep; Supplier<? extends T> fn;
  15. AsyncSupply(CompletableFuture<T> dep, Supplier<? extends T> fn) {
  16. this.dep = dep; this.fn = fn;
  17. }
  18. public final Void getRawResult() { return null; }
  19. public final void setRawResult(Void v) {}
  20. public final boolean exec() { run(); return false; }
  21. public void run() {
  22. CompletableFuture<T> d; Supplier<? extends T> f;
  23. if ((d = dep) != null && (f = fn) != null) {
  24. dep = null; fn = null;
  25. if (d.result == null) {
  26. //set value
  27. try {
  28. d.completeValue(f.get());
  29. } catch (Throwable ex) {
  30. d.completeThrowable(ex);
  31. }
  32. }
  33. //触发依赖任务
  34. d.postComplete();
  35. }
  36. }
  37. }

thenApplyAsync-》uniApplyStage,生成一个

  1. public <U> CompletableFuture<U> thenApplyAsync(
  2. Function<? super T,? extends U> fn) {
  3. return uniApplyStage(defaultExecutor(), fn);
  4. }
  5. private <V> CompletableFuture<V> uniApplyStage(
  6. Executor e, Function<? super T,? extends V> f) {
  7. if (f == null) throw new NullPointerException();
  8. Object r;
  9. if ((r = result) != null)
  10. return uniApplyNow(r, e, f);
  11. //生成一个新的CompletableFuture
  12. CompletableFuture<V> d = newIncompleteFuture();
  13. //封装UniApply类型的completion,push到栈
  14. unipush(new UniApply<T,V>(e, d, this, f));
  15. return d;
  16. }

从任务执行的角度分析:

AsyncSupply task执行逻辑简单,如上面所示。这里主要看一下postComplete,主要作用是递归触发依赖任务。
代码注释1、2、3、4、5为主流程

  1. final void postComplete() {
  2. /*
  3. * On each step, variable f holds current dependents to pop
  4. * and run. It is extended along only one path at a time,
  5. * pushing others to avoid unbounded recursion.
  6. */
  7. CompletableFuture<?> f = this; Completion h;
  8. while ((h = f.stack) != null ||//1.h当前future栈头
  9. (f != this && (h = (f = this).stack) != null)) { //5.如果this.stack!=null,跳出循环。
  10. CompletableFuture<?> d; Completion t;
  11. if (STACK.compareAndSet(f, h, t = h.next)) {//2.cas栈到next节点
  12. if (t != null) {
  13. if (f != this) { //把嵌套依赖任务放入当前stack
  14. pushStack(h);
  15. continue;
  16. }
  17. NEXT.compareAndSet(h, t, null); // try to detach 3.cas 栈头h的next为null
  18. }
  19. f = (d = h.tryFire(NESTED)) == null ? this : d;//4.触发栈头h,放入线程池带执行
  20. }
  21. }
  22. }

Completion.tryFire()方法:
usages:
image.png
参数mode
static final int SYNC = 0; //当定义任务时,如果前置任务已经完成,则立即触发
static final int ASYNC = 1;//只在Completion run和exec中调用,也就是任务执行时使用
static final int NESTED = -1;//只在postComplete方法中调用,用于任务触发
tryFire会调用两次:
1任务的触发,最终会调用UniCompletion.claim()方法
2任务的执行。

  1. abstract static class Completion extends ForkJoinTask<Void>
  2. implements Runnable, AsynchronousCompletionTask {
  3. volatile Completion next; // Treiber stack link
  4. /**
  5. * Performs completion action if triggered, returning a
  6. * dependent that may need propagation, if one exists.
  7. *
  8. * @param mode SYNC, ASYNC, or NESTED
  9. */
  10. abstract CompletableFuture<?> tryFire(int mode);
  11. /** Returns true if possibly still triggerable. Used by cleanStack. */
  12. abstract boolean isLive();
  13. public final void run() { tryFire(ASYNC); }
  14. public final boolean exec() { tryFire(ASYNC); return false; }
  15. public final Void getRawResult() { return null; }
  16. public final void setRawResult(Void v) {}
  17. }
  18. abstract static class UniCompletion<T,V> extends Completion {
  19. Executor executor; // executor to use (null if none)
  20. CompletableFuture<V> dep; // the dependent to complete
  21. CompletableFuture<T> src; // source for action
  22. UniCompletion(Executor executor, CompletableFuture<V> dep,
  23. CompletableFuture<T> src) {
  24. this.executor = executor; this.dep = dep; this.src = src;
  25. }
  26. /**
  27. * Returns true if action can be run. Call only when known to
  28. * be triggerable. Uses FJ tag bit to ensure that only one
  29. * thread claims ownership. If async, starts as task -- a
  30. * later call to tryFire will run action.
  31. */
  32. final boolean claim() {
  33. Executor e = executor;
  34. if (compareAndSetForkJoinTaskTag((short)0, (short)1)) {
  35. if (e == null)
  36. return true;
  37. executor = null; // disable
  38. e.execute(this);
  39. }
  40. return false;
  41. }
  42. final boolean isLive() { return dep != null; }
  43. }