前言
在flink排查一例问题的时候,偶然注意到一个CompletetabeFuture使用上对顺序依赖的case,在之前接触使用CompletetableFuture的时候没有考虑过其回调函数的执行顺序问题,写此文记录一下。
简介
首先因为我对CompletetableFuture的使用也不是很熟悉,所以先介绍下什么是CompletetableFuture。在介绍CompletetableFuture我们先了解下为什么会有CompletetableFuture。通常我们再提交一个异步任务之后会得到一个Future,来表示一个异步计算的结果。但是要获取异步任务的执行结果,你只能通过while循环通过isDone检测,或者通过get方法阻塞等待。为了提升异步编程的体验,很多类库都多Future做了扩展,例如netty扩展Future提供addListener,guava也提供了ListenableFuture接口,在到jdk8,jdk也原生提供了CompletetableFuture接口,增强了Future函数异步编程的能力。
ListenableFuture
这里先介绍guava中的ListenableFuture的使用和实现
使用
@Test
public void test() {
ListeningExecutorService executorService =
MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2));
ListenableFuture lf =
executorService.submit(
() -> {
// System.out.println("t1");
throw new RuntimeException("Exception");
});
lf.addListener(
() -> {
System.out.println(
"t1 completed callback1 " + Thread.currentThread().getName());
},
executorService);
lf.addListener(
() -> {
System.out.println(
"t1 completed callback2 " + Thread.currentThread().getName());
},
executorService);
// 使用FutureCallback 处理成功与失败
FutureCallback callback =
new FutureCallback() {
@Override
public void onSuccess(@Nullable Object o) {
System.out.println("success callback");
}
@Override
public void onFailure(Throwable throwable) {
System.out.println(throwable.getMessage());
}
};
Futures.addCallback(lf, callback, executorService);
}
执行结果
t1 completed callback1 pool-1-thread-1
t1 completed callback2 pool-1-thread-2
Exception
这里面使用了两种接口addListener和addCallback,addCallback可以设置成功和失败的回调。
实现原理
在执行executorService.submit时,实际上是还是执行的父类ThreadExecutorPool的submit方法,但是其覆盖了newTaskFor方法,返回的是一个TrustedListenableFutureTask对象。
public abstract class AbstractListeningExecutorService extends AbstractExecutorService implements ListeningExecutorService {
public AbstractListeningExecutorService() {
}
protected final <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return TrustedListenableFutureTask.create(runnable, value);
}
protected final <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return TrustedListenableFutureTask.create(callable);
}
public ListenableFuture<?> submit(Runnable task) {
return (ListenableFuture)super.submit(task);
}
public <T> ListenableFuture<T> submit(Runnable task, @Nullable T result) {
return (ListenableFuture)super.submit(task, result);
}
public <T> ListenableFuture<T> submit(Callable<T> task) {
return (ListenableFuture)super.submit(task);
}
}
而submit函数返回的对象就是通过newTaskFor创建的TrustedListenableFutureTask
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
因此addListener实际上是调用的TrustedListenableFutureTask的父类TrustedFuture
public void addListener(Runnable listener, Executor executor) {
Preconditions.checkNotNull(listener, "Runnable was null.");
Preconditions.checkNotNull(executor, "Executor was null.");
if (!this.isDone()) {
AbstractFuture.Listener oldHead = this.listeners;
if (oldHead != AbstractFuture.Listener.TOMBSTONE) {
AbstractFuture.Listener newNode = new AbstractFuture.Listener(listener, executor);
do {
newNode.next = oldHead;
if (ATOMIC_HELPER.casListeners(this, oldHead, newNode)) {
return;
}
oldHead = this.listeners;
} while(oldHead != AbstractFuture.Listener.TOMBSTONE);
}
}
executeListener(listener, executor);
}
如果异步任务已经完成那么就直接执行listener,如果没有,就添加到链表中,链表中保存了所有待执行的listener。而上面例子中Futures.addCallback实际上也是调用addListener的方法,只是对listener进行了封装,
public static <V> void addCallback(ListenableFuture<V> future, FutureCallback<? super V> callback, Executor executor) {
Preconditions.checkNotNull(callback);
future.addListener(new Futures.CallbackListener(future, callback), executor);
}
private static final class CallbackListener<V> implements Runnable {
final Future<V> future;
final FutureCallback<? super V> callback;
CallbackListener(Future<V> future, FutureCallback<? super V> callback) {
this.future = future;
this.callback = callback;
}
public void run() {
Object value;
try {
value = Futures.getDone(this.future);
} catch (ExecutionException var3) {
this.callback.onFailure(var3.getCause());
return;
} catch (Error | RuntimeException var4) {
this.callback.onFailure(var4);
return;
}
this.callback.onSuccess(value);
}
}
注册完成了,那么回调函数是在哪里被执行的呢?回到刚刚的TrustedListenableFutureTask,在其执行完run方法之后会在finally块中执行回调函数的逻辑
void afterRanInterruptibly(V result, Throwable error) {
if (error == null) {
TrustedListenableFutureTask.this.set(result);
} else {
TrustedListenableFutureTask.this.setException(error);
}
}
遍历链表,完成执行,但是提交任务都是通过executor来执行的不保障执行顺序
while(next != null) {
AbstractFuture.Listener curr = next;
next = next.next;
Runnable task = curr.task;
if (task instanceof AbstractFuture.SetFuture) {
AbstractFuture.SetFuture<?> setFuture = (AbstractFuture.SetFuture)task;
future = setFuture.owner;
if (future.value == setFuture) {
Object valueToSet = getFutureValue(setFuture.future);
if (ATOMIC_HELPER.casValue(future, setFuture, valueToSet)) {
continue label23;
}
}
} else {
executeListener(task, curr.executor);
}
}
CompletableFuture
completable future和listener future很类似,但是提供了更丰富的接口以及链式调用的能力,在Listener future中要实现链式的调用需要嵌套内部类来写。
使用
使用介绍这篇文档介绍的很详细可以参照,以及可以参考下java.util.concurrent.CompletableFuture的官方注释,比较详细
https://colobu.com/2016/02/29/Java-CompletableFuture/
注意点
一个CompletableFuture注册多个回调函数执行顺序
@Test
public void testCompletableFuture() {
CompletableFuture<String> c = new CompletableFuture<>();
c.whenComplete((String value, Throwable e) -> {
System.out.println("whenComplete" + Thread.currentThread().getName());
});
c.thenAccept((String value) -> {
System.out.println("thenAccept" + Thread.currentThread().getName());
});
c.complete("1");
}
结果
thenAcceptmain
whenCompletemain
把注册顺序颠倒结果就会颠倒,也就是在没有异步函数的情况下,回调函数的执行是LIFO的,和ListenableFuture有差异,主要原因是CompletableFuture中存储回调函数是通过栈来保存的。 ```java
- A CompletableFuture may have dependent completion actions,
- collected in a linked stack. It atomically completes by CASing
- a result field, and then pops off and runs those actions. This
- applies across normal vs exceptional outcomes, sync vs async
- actions, binary triggers, and various forms of completions. ```
参考
https://blog.csdn.net/PROGRAM_anywhere/article/details/83552126 ListenableFuture 实现原理