一.使用

  1. public class Test {
  2. public static void main(String[] args) throws InterruptedException {
  3. ExecutorService executorService = Executors.newFixedThreadPool(2);
  4. CompletableFuture.supplyAsync(() -> {
  5. try {
  6. Thread.sleep(2000);
  7. } catch (Exception e) {
  8. e.printStackTrace();
  9. }
  10. System.out.println("AAA " + Thread.currentThread().getName());
  11. return "hello ";
  12. }, executorService).thenAccept(s -> {
  13. System.out.println(s + "world");
  14. System.out.println("BBB " + Thread.currentThread().getName());
  15. });
  16. System.out.println(Thread.currentThread().getName());
  17. Thread.sleep(40000);
  18. }
  19. }
  20. 输出:
  21. main
  22. AAA pool-1-thread-1
  23. hello world
  24. BBB pool-1-thread-1

二.源码

  1. public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
  2. Executor executor) {
  3. return asyncSupplyStage(screenExecutor(executor), supplier);
  4. }
  5. static <U> CompletableFuture<U> asyncSupplyStage(Executor e,
  6. Supplier<U> f) {
  7. if (f == null) throw new NullPointerException();
  8. CompletableFuture<U> d = new CompletableFuture<U>();
  9. e.execute(new AsyncSupply<U>(d, f));
  10. return d;
  11. }
  12. static final class AsyncSupply<T> extends ForkJoinTask<Void>
  13. implements Runnable, AsynchronousCompletionTask {
  14. CompletableFuture<T> dep; Supplier<T> fn;
  15. AsyncSupply(CompletableFuture<T> dep, Supplier<T> fn) {
  16. this.dep = dep; this.fn = fn;
  17. }
  18. public void run() {
  19. CompletableFuture<T> d; Supplier<T> f;
  20. if ((d = dep) != null && (f = fn) != null) {
  21. dep = null; fn = null;
  22. if (d.result == null) {
  23. try {
  24. // 执行到这Thread.sleep(2000);
  25. d.completeValue(f.get());
  26. } catch (Throwable ex) {
  27. d.completeThrowable(ex);
  28. }
  29. }
  30. d.postComplete();
  31. }
  32. }
  33. }
  34. public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
  35. return uniAcceptStage(null, action);
  36. }
  37. private CompletableFuture<Void> uniAcceptStage(Executor e,
  38. Consumer<? super T> f) {
  39. if (f == null) throw new NullPointerException();
  40. CompletableFuture<Void> d = new CompletableFuture<Void>();
  41. if (e != null || !d.uniAccept(this, f, null)) {
  42. UniAccept<T> c = new UniAccept<T>(e, d, this, f);
  43. // 入栈volatile Completion stack;
  44. push(c);
  45. c.tryFire(SYNC);
  46. }
  47. return d;
  48. }
  49. final <S> boolean uniAccept(CompletableFuture<S> a,
  50. Consumer<? super S> f, UniAccept<S> c) {
  51. Object r; Throwable x;
  52. // a.result == null
  53. if (a == null || (r = a.result) == null || f == null)
  54. return false;
  55. }
  56. static final class UniAccept<T> extends UniCompletion<T,Void> {
  57. Consumer<? super T> fn;
  58. UniAccept(Executor executor, CompletableFuture<Void> dep,
  59. CompletableFuture<T> src, Consumer<? super T> fn) {
  60. super(executor, dep, src); this.fn = fn;
  61. }
  62. final CompletableFuture<Void> tryFire(int mode) {
  63. CompletableFuture<Void> d; CompletableFuture<T> a;
  64. if ((d = dep) == null ||
  65. !d.uniAccept(a = src, fn, mode > 0 ? null : this))
  66. // 执行到这
  67. return null;
  68. dep = null; src = null; fn = null;
  69. return d.postFire(a, mode);
  70. }
  71. }
  72. final void postComplete() {
  73. CompletableFuture<?> f = this; Completion h;
  74. // f.stack != null
  75. while ((h = f.stack) != null ||
  76. (f != this && (h = (f = this).stack) != null)) {
  77. CompletableFuture<?> d; Completion t;
  78. // stack = h.next
  79. if (f.casStack(h, t = h.next)) {
  80. if (t != null) {
  81. if (f != this) {
  82. pushStack(h);
  83. continue;
  84. }
  85. h.next = null; // detach
  86. }
  87. f = (d = h.tryFire(NESTED)) == null ? this : d;
  88. }
  89. }
  90. }
  91. final CompletableFuture<Void> tryFire(int mode) {
  92. CompletableFuture<Void> d; CompletableFuture<T> a;
  93. if ((d = dep) == null ||
  94. !d.uniAccept(a = src, fn, mode > 0 ? null : this))
  95. return null;
  96. dep = null; src = null; fn = null;
  97. return d.postFire(a, mode);
  98. }
  99. final <S> boolean uniAccept(CompletableFuture<S> a,
  100. Consumer<? super S> f, UniAccept<S> c) {
  101. Object r; Throwable x;
  102. // a.result:hello
  103. if (a == null || (r = a.result) == null || f == null)
  104. return false;
  105. tryComplete: if (result == null) {
  106. if (r instanceof AltResult) {
  107. if ((x = ((AltResult)r).ex) != null) {
  108. completeThrowable(x, r);
  109. break tryComplete;
  110. }
  111. r = null;
  112. }
  113. try {
  114. if (c != null && !c.claim())
  115. return false;
  116. @SuppressWarnings("unchecked") S s = (S) r;
  117. // s:hello
  118. f.accept(s);
  119. completeNull();
  120. } catch (Throwable ex) {
  121. completeThrowable(ex);
  122. }
  123. }
  124. return true;
  125. }