问题来源是研究flink-runtime源码中发现很多注册在terminateRuture和resultFuture上的回调,模式如下

  1. CompletableFuture terminateFuture = new CompletableFuture();
  2. terminateFuture.thenAccept(function1);
  3. terminateFuture.thenAccept(function2);

并且实际上function1 和 function2之间还有依赖关系,因此好奇回调函数执行顺序的具体机制,本文就以几个例子来说明其内部原理

例子一

  1. @Test
  2. public void testCompleteOrder1() throws InterruptedException {
  3. CompletableFuture<String> base = CompletableFuture.supplyAsync(() -> {
  4. LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(2));
  5. return "source";
  6. });
  7. base.thenAccept(new Function1());
  8. base.thenAccept(new Function2());
  9. LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(20));
  10. }
  11. static class Function1 implements Consumer {
  12. @Override
  13. public void accept(Object o) {
  14. System.out.println("entered first consumer in " + Thread.currentThread());
  15. LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
  16. System.out.println("leaving first consumer");
  17. }
  18. }
  19. static class Function2 implements Consumer {
  20. @Override
  21. public void accept(Object o) {
  22. System.out.println("entered second consumer in " + Thread.currentThread());
  23. LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(10));
  24. System.out.println("leaving second consumer");
  25. }
  26. }

测试用例, 用静态内部类的方式测试,debug时比较好区分执行的是哪个fn。

输出

  1. entered second consumer in Thread[ForkJoinPool.commonPool-worker-1,5,main]
  2. leaving second consumer
  3. entered first consumer in Thread[ForkJoinPool.commonPool-worker-1,5,main]
  4. leaving first consumer

我们可以看到输出结果是按照LIFO的方式输出的,即使second中block了10s,也是顺序输出的。

为什么是按照LIFO的方式输出的呢?先来看thenAccept调用会做什么?

  1. private CompletableFuture<Void> uniAcceptStage(Executor e,
  2. Consumer<? super T> f) {
  3. if (f == null) throw new NullPointerException();
  4. // 创建一个新的CompletableFuture
  5. CompletableFuture<Void> d = new CompletableFuture<Void>();
  6. if (e != null || !d.uniAccept(this, f, null)) {
  7. // d => dep this => src f => consumer function
  8. UniAccept<T> c = new UniAccept<T>(e, d, this, f);
  9. // 将UniAccept 保存到栈
  10. push(c);
  11. c.tryFire(SYNC);
  12. }
  13. return d;
  14. }

push stack实现

  1. /** Pushes the given completion (if it exists) unless done. */
  2. final void push(UniCompletion<?,?> c) {
  3. if (c != null) {
  4. // 该future没有完成,尝试push stack
  5. while (result == null && !tryPushStack(c))
  6. lazySetNext(c, null); // clear on failure
  7. }
  8. }
  9. /** Returns true if successfully pushed c onto stack. */
  10. final boolean tryPushStack(Completion c) {
  11. Completion h = stack; // 获取当前栈顶
  12. lazySetNext(c, h); // 将c.next 指向h
  13. return UNSAFE.compareAndSwapObject(this, STACK, h, c); // 将this的STACK的字段从h修改成新的 c
  14. }
  15. static void lazySetNext(Completion c, Completion next) {
  16. // NEXT = u.objectFieldOffset(Completion.class.getDeclaredField("next"));
  17. // 利用Unsafe工具,修改c 对象的next变量的值,NEXT offset值在初始化时获取
  18. UNSAFE.putOrderedObject(c, NEXT, next);
  19. }

例子二

  1. @Test
  2. public void testCompleteOrder1() throws InterruptedException {
  3. CompletableFuture<String> base = CompletableFuture.supplyAsync(() -> {
  4. LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(2));
  5. return "source";
  6. });
  7. base.thenAccept(new Function1());
  8. LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(2));
  9. base.thenAccept(new Function2());
  10. LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(10));
  11. }
  12. static class Function1 implements Consumer {
  13. @Override
  14. public void accept(Object o) {
  15. System.out.println("entered first consumer in " + Thread.currentThread());
  16. LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(2));
  17. System.out.println("leaving first consumer");
  18. }
  19. }
  20. static class Function2 implements Consumer {
  21. @Override
  22. public void accept(Object o) {
  23. System.out.println("entered second consumer in " + Thread.currentThread());
  24. LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
  25. System.out.println("leaving second consumer");
  26. }
  27. }
  1. entered first consumer in Thread[ForkJoinPool.commonPool-worker-1,5,main]
  2. entered second consumer in Thread[main,5,main]
  3. leaving second consumer
  4. leaving first consumer

和例子1不一样的地方在于在注册第二个thenAccept()回调之前先block了2s, 输出结果便不再按照LIFO的方式触发,并且执行线程也发生了变化,那么其原因是什么呢?
再测试将两次thenAccept之间的park时间改到1s,发现结果又和例子一一致了,那么我们也许能够猜出一些原因。
实际上上因为在执行thenAccept时,首先会检查这个future的结果是否已经完成,如果没有完成将completion保存到栈上,如果已经完成,那么就会在当前线程 ,也就是main线程中直接触发,那么此时就是两个线程并行执行回调,无法保序

例子三

为了保证Function2在Function1之后执行需要通过以下的方式

  1. @Test
  2. public void test3() {
  3. CompletableFuture<String> base = CompletableFuture.supplyAsync(() -> {
  4. LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(5));
  5. return "source";
  6. });
  7. CompletableFuture<Void> next = base.thenAccept(new Function3());
  8. base.thenAcceptBoth(next, new Function4()).join();
  9. }
  10. static class Function3 implements Consumer {
  11. @Override
  12. public void accept(Object o) {
  13. System.out.println("entered first consumer in " + Thread.currentThread());
  14. LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(2));
  15. System.out.println("leaving first consumer");
  16. }
  17. }
  18. static class Function4 implements BiConsumer {
  19. @Override
  20. public void accept(Object first, Object second) {
  21. System.out.println("entered second consumer in " + Thread.currentThread());
  22. LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
  23. System.out.println("leaving second consumer");
  24. }
  25. }@Test
  26. public void test3() {
  27. CompletableFuture<String> base = CompletableFuture.supplyAsync(() -> {
  28. LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
  29. return "source";
  30. });
  31. CompletableFuture<Void> next = base.thenAccept(s -> {
  32. System.out.println("entered first consumer in " + Thread.currentThread());
  33. LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(2));
  34. System.out.println("leaving first consumer");
  35. });
  36. LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(4));
  37. base.thenAcceptBoth(next, (s, ignored) -> {
  38. System.out.println("entered second consumer in " + Thread.currentThread());
  39. LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
  40. System.out.println("leaving second consumer");
  41. })
  42. .join();
  43. }

输出结果

  1. entered first consumer in Thread[ForkJoinPool.commonPool-worker-1,5,main]
  2. leaving first consumer
  3. entered second consumer in Thread[ForkJoinPool.commonPool-worker-1,5,main]
  4. leaving second consumer

这个比较好理解,Function4的执行已经依赖Function3执行的future完成后才会触发,因此变成了按照编码顺序执行了

总结

  1. 对同一个completableFuture通过thenApply注册两个回调,其执行顺序是LIFO,但是最终两个Function的执行结果可能并行,也可能串行,没有严格保障
  2. 如果需要保序,可以显示通过thenAcceptBoth的方式串行起来执行

例子来源 https://stackoverflow.com/questions/37918062/what-is-the-order-in-which-multiple-thenaccept-blocks-of-a-completablefuture-are