问题来源是研究flink-runtime源码中发现很多注册在terminateRuture和resultFuture上的回调,模式如下
CompletableFuture terminateFuture = new CompletableFuture();terminateFuture.thenAccept(function1);terminateFuture.thenAccept(function2);
并且实际上function1 和 function2之间还有依赖关系,因此好奇回调函数执行顺序的具体机制,本文就以几个例子来说明其内部原理
例子一
@Testpublic void testCompleteOrder1() throws InterruptedException {CompletableFuture<String> base = CompletableFuture.supplyAsync(() -> {LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(2));return "source";});base.thenAccept(new Function1());base.thenAccept(new Function2());LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(20));}static class Function1 implements Consumer {@Overridepublic void accept(Object o) {System.out.println("entered first consumer in " + Thread.currentThread());LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));System.out.println("leaving first consumer");}}static class Function2 implements Consumer {@Overridepublic void accept(Object o) {System.out.println("entered second consumer in " + Thread.currentThread());LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(10));System.out.println("leaving second consumer");}}
测试用例, 用静态内部类的方式测试,debug时比较好区分执行的是哪个fn。
输出
entered second consumer in Thread[ForkJoinPool.commonPool-worker-1,5,main]leaving second consumerentered first consumer in Thread[ForkJoinPool.commonPool-worker-1,5,main]leaving first consumer
我们可以看到输出结果是按照LIFO的方式输出的,即使second中block了10s,也是顺序输出的。
为什么是按照LIFO的方式输出的呢?先来看thenAccept调用会做什么?
private CompletableFuture<Void> uniAcceptStage(Executor e,Consumer<? super T> f) {if (f == null) throw new NullPointerException();// 创建一个新的CompletableFutureCompletableFuture<Void> d = new CompletableFuture<Void>();if (e != null || !d.uniAccept(this, f, null)) {// d => dep this => src f => consumer functionUniAccept<T> c = new UniAccept<T>(e, d, this, f);// 将UniAccept 保存到栈push(c);c.tryFire(SYNC);}return d;}
push stack实现
/** Pushes the given completion (if it exists) unless done. */final void push(UniCompletion<?,?> c) {if (c != null) {// 该future没有完成,尝试push stackwhile (result == null && !tryPushStack(c))lazySetNext(c, null); // clear on failure}}/** Returns true if successfully pushed c onto stack. */final boolean tryPushStack(Completion c) {Completion h = stack; // 获取当前栈顶lazySetNext(c, h); // 将c.next 指向hreturn UNSAFE.compareAndSwapObject(this, STACK, h, c); // 将this的STACK的字段从h修改成新的 c}static void lazySetNext(Completion c, Completion next) {// NEXT = u.objectFieldOffset(Completion.class.getDeclaredField("next"));// 利用Unsafe工具,修改c 对象的next变量的值,NEXT offset值在初始化时获取UNSAFE.putOrderedObject(c, NEXT, next);}
例子二
@Testpublic void testCompleteOrder1() throws InterruptedException {CompletableFuture<String> base = CompletableFuture.supplyAsync(() -> {LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(2));return "source";});base.thenAccept(new Function1());LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(2));base.thenAccept(new Function2());LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(10));}static class Function1 implements Consumer {@Overridepublic void accept(Object o) {System.out.println("entered first consumer in " + Thread.currentThread());LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(2));System.out.println("leaving first consumer");}}static class Function2 implements Consumer {@Overridepublic void accept(Object o) {System.out.println("entered second consumer in " + Thread.currentThread());LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));System.out.println("leaving second consumer");}}
entered first consumer in Thread[ForkJoinPool.commonPool-worker-1,5,main]entered second consumer in Thread[main,5,main]leaving second consumerleaving first consumer
和例子1不一样的地方在于在注册第二个thenAccept()回调之前先block了2s, 输出结果便不再按照LIFO的方式触发,并且执行线程也发生了变化,那么其原因是什么呢?
再测试将两次thenAccept之间的park时间改到1s,发现结果又和例子一一致了,那么我们也许能够猜出一些原因。
实际上上因为在执行thenAccept时,首先会检查这个future的结果是否已经完成,如果没有完成将completion保存到栈上,如果已经完成,那么就会在当前线程 ,也就是main线程中直接触发,那么此时就是两个线程并行执行回调,无法保序
例子三
为了保证Function2在Function1之后执行需要通过以下的方式
@Testpublic void test3() {CompletableFuture<String> base = CompletableFuture.supplyAsync(() -> {LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(5));return "source";});CompletableFuture<Void> next = base.thenAccept(new Function3());base.thenAcceptBoth(next, new Function4()).join();}static class Function3 implements Consumer {@Overridepublic void accept(Object o) {System.out.println("entered first consumer in " + Thread.currentThread());LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(2));System.out.println("leaving first consumer");}}static class Function4 implements BiConsumer {@Overridepublic void accept(Object first, Object second) {System.out.println("entered second consumer in " + Thread.currentThread());LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));System.out.println("leaving second consumer");}}@Testpublic void test3() {CompletableFuture<String> base = CompletableFuture.supplyAsync(() -> {LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));return "source";});CompletableFuture<Void> next = base.thenAccept(s -> {System.out.println("entered first consumer in " + Thread.currentThread());LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(2));System.out.println("leaving first consumer");});LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(4));base.thenAcceptBoth(next, (s, ignored) -> {System.out.println("entered second consumer in " + Thread.currentThread());LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));System.out.println("leaving second consumer");}).join();}
输出结果
entered first consumer in Thread[ForkJoinPool.commonPool-worker-1,5,main]leaving first consumerentered second consumer in Thread[ForkJoinPool.commonPool-worker-1,5,main]leaving second consumer
这个比较好理解,Function4的执行已经依赖Function3执行的future完成后才会触发,因此变成了按照编码顺序执行了
总结
- 对同一个completableFuture通过thenApply注册两个回调,其执行顺序是LIFO,但是最终两个Function的执行结果可能并行,也可能串行,没有严格保障
- 如果需要保序,可以显示通过thenAcceptBoth的方式串行起来执行
