问题来源是研究flink-runtime源码中发现很多注册在terminateRuture和resultFuture上的回调,模式如下
CompletableFuture terminateFuture = new CompletableFuture();
terminateFuture.thenAccept(function1);
terminateFuture.thenAccept(function2);
并且实际上function1 和 function2之间还有依赖关系,因此好奇回调函数执行顺序的具体机制,本文就以几个例子来说明其内部原理
例子一
@Test
public void testCompleteOrder1() throws InterruptedException {
CompletableFuture<String> base = CompletableFuture.supplyAsync(() -> {
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(2));
return "source";
});
base.thenAccept(new Function1());
base.thenAccept(new Function2());
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(20));
}
static class Function1 implements Consumer {
@Override
public void accept(Object o) {
System.out.println("entered first consumer in " + Thread.currentThread());
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
System.out.println("leaving first consumer");
}
}
static class Function2 implements Consumer {
@Override
public void accept(Object o) {
System.out.println("entered second consumer in " + Thread.currentThread());
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(10));
System.out.println("leaving second consumer");
}
}
测试用例, 用静态内部类的方式测试,debug时比较好区分执行的是哪个fn。
输出
entered second consumer in Thread[ForkJoinPool.commonPool-worker-1,5,main]
leaving second consumer
entered first consumer in Thread[ForkJoinPool.commonPool-worker-1,5,main]
leaving first consumer
我们可以看到输出结果是按照LIFO的方式输出的,即使second中block了10s,也是顺序输出的。
为什么是按照LIFO的方式输出的呢?先来看thenAccept调用会做什么?
private CompletableFuture<Void> uniAcceptStage(Executor e,
Consumer<? super T> f) {
if (f == null) throw new NullPointerException();
// 创建一个新的CompletableFuture
CompletableFuture<Void> d = new CompletableFuture<Void>();
if (e != null || !d.uniAccept(this, f, null)) {
// d => dep this => src f => consumer function
UniAccept<T> c = new UniAccept<T>(e, d, this, f);
// 将UniAccept 保存到栈
push(c);
c.tryFire(SYNC);
}
return d;
}
push stack实现
/** Pushes the given completion (if it exists) unless done. */
final void push(UniCompletion<?,?> c) {
if (c != null) {
// 该future没有完成,尝试push stack
while (result == null && !tryPushStack(c))
lazySetNext(c, null); // clear on failure
}
}
/** Returns true if successfully pushed c onto stack. */
final boolean tryPushStack(Completion c) {
Completion h = stack; // 获取当前栈顶
lazySetNext(c, h); // 将c.next 指向h
return UNSAFE.compareAndSwapObject(this, STACK, h, c); // 将this的STACK的字段从h修改成新的 c
}
static void lazySetNext(Completion c, Completion next) {
// NEXT = u.objectFieldOffset(Completion.class.getDeclaredField("next"));
// 利用Unsafe工具,修改c 对象的next变量的值,NEXT offset值在初始化时获取
UNSAFE.putOrderedObject(c, NEXT, next);
}
例子二
@Test
public void testCompleteOrder1() throws InterruptedException {
CompletableFuture<String> base = CompletableFuture.supplyAsync(() -> {
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(2));
return "source";
});
base.thenAccept(new Function1());
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(2));
base.thenAccept(new Function2());
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(10));
}
static class Function1 implements Consumer {
@Override
public void accept(Object o) {
System.out.println("entered first consumer in " + Thread.currentThread());
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(2));
System.out.println("leaving first consumer");
}
}
static class Function2 implements Consumer {
@Override
public void accept(Object o) {
System.out.println("entered second consumer in " + Thread.currentThread());
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
System.out.println("leaving second consumer");
}
}
entered first consumer in Thread[ForkJoinPool.commonPool-worker-1,5,main]
entered second consumer in Thread[main,5,main]
leaving second consumer
leaving first consumer
和例子1不一样的地方在于在注册第二个thenAccept()回调之前先block了2s, 输出结果便不再按照LIFO的方式触发,并且执行线程也发生了变化,那么其原因是什么呢?
再测试将两次thenAccept之间的park时间改到1s,发现结果又和例子一一致了,那么我们也许能够猜出一些原因。
实际上上因为在执行thenAccept时,首先会检查这个future的结果是否已经完成,如果没有完成将completion保存到栈上,如果已经完成,那么就会在当前线程 ,也就是main线程中直接触发,那么此时就是两个线程并行执行回调,无法保序
例子三
为了保证Function2在Function1之后执行需要通过以下的方式
@Test
public void test3() {
CompletableFuture<String> base = CompletableFuture.supplyAsync(() -> {
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(5));
return "source";
});
CompletableFuture<Void> next = base.thenAccept(new Function3());
base.thenAcceptBoth(next, new Function4()).join();
}
static class Function3 implements Consumer {
@Override
public void accept(Object o) {
System.out.println("entered first consumer in " + Thread.currentThread());
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(2));
System.out.println("leaving first consumer");
}
}
static class Function4 implements BiConsumer {
@Override
public void accept(Object first, Object second) {
System.out.println("entered second consumer in " + Thread.currentThread());
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
System.out.println("leaving second consumer");
}
}@Test
public void test3() {
CompletableFuture<String> base = CompletableFuture.supplyAsync(() -> {
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
return "source";
});
CompletableFuture<Void> next = base.thenAccept(s -> {
System.out.println("entered first consumer in " + Thread.currentThread());
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(2));
System.out.println("leaving first consumer");
});
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(4));
base.thenAcceptBoth(next, (s, ignored) -> {
System.out.println("entered second consumer in " + Thread.currentThread());
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
System.out.println("leaving second consumer");
})
.join();
}
输出结果
entered first consumer in Thread[ForkJoinPool.commonPool-worker-1,5,main]
leaving first consumer
entered second consumer in Thread[ForkJoinPool.commonPool-worker-1,5,main]
leaving second consumer
这个比较好理解,Function4的执行已经依赖Function3执行的future完成后才会触发,因此变成了按照编码顺序执行了
总结
- 对同一个completableFuture通过thenApply注册两个回调,其执行顺序是LIFO,但是最终两个Function的执行结果可能并行,也可能串行,没有严格保障
- 如果需要保序,可以显示通过thenAcceptBoth的方式串行起来执行