前言
在flink排查一例问题的时候,偶然注意到一个CompletetabeFuture使用上对顺序依赖的case,在之前接触使用CompletetableFuture的时候没有考虑过其回调函数的执行顺序问题,写此文记录一下。
简介
首先因为我对CompletetableFuture的使用也不是很熟悉,所以先介绍下什么是CompletetableFuture。在介绍CompletetableFuture我们先了解下为什么会有CompletetableFuture。通常我们再提交一个异步任务之后会得到一个Future,来表示一个异步计算的结果。但是要获取异步任务的执行结果,你只能通过while循环通过isDone检测,或者通过get方法阻塞等待。为了提升异步编程的体验,很多类库都多Future做了扩展,例如netty扩展Future提供addListener,guava也提供了ListenableFuture接口,在到jdk8,jdk也原生提供了CompletetableFuture接口,增强了Future函数异步编程的能力。
ListenableFuture
这里先介绍guava中的ListenableFuture的使用和实现
使用
@Testpublic void test() {ListeningExecutorService executorService =MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2));ListenableFuture lf =executorService.submit(() -> {// System.out.println("t1");throw new RuntimeException("Exception");});lf.addListener(() -> {System.out.println("t1 completed callback1 " + Thread.currentThread().getName());},executorService);lf.addListener(() -> {System.out.println("t1 completed callback2 " + Thread.currentThread().getName());},executorService);// 使用FutureCallback 处理成功与失败FutureCallback callback =new FutureCallback() {@Overridepublic void onSuccess(@Nullable Object o) {System.out.println("success callback");}@Overridepublic void onFailure(Throwable throwable) {System.out.println(throwable.getMessage());}};Futures.addCallback(lf, callback, executorService);}
执行结果
t1 completed callback1 pool-1-thread-1t1 completed callback2 pool-1-thread-2Exception
这里面使用了两种接口addListener和addCallback,addCallback可以设置成功和失败的回调。
实现原理
在执行executorService.submit时,实际上是还是执行的父类ThreadExecutorPool的submit方法,但是其覆盖了newTaskFor方法,返回的是一个TrustedListenableFutureTask对象。
public abstract class AbstractListeningExecutorService extends AbstractExecutorService implements ListeningExecutorService {public AbstractListeningExecutorService() {}protected final <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {return TrustedListenableFutureTask.create(runnable, value);}protected final <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {return TrustedListenableFutureTask.create(callable);}public ListenableFuture<?> submit(Runnable task) {return (ListenableFuture)super.submit(task);}public <T> ListenableFuture<T> submit(Runnable task, @Nullable T result) {return (ListenableFuture)super.submit(task, result);}public <T> ListenableFuture<T> submit(Callable<T> task) {return (ListenableFuture)super.submit(task);}}
而submit函数返回的对象就是通过newTaskFor创建的TrustedListenableFutureTask
public <T> Future<T> submit(Callable<T> task) {if (task == null) throw new NullPointerException();RunnableFuture<T> ftask = newTaskFor(task);execute(ftask);return ftask;}
因此addListener实际上是调用的TrustedListenableFutureTask的父类TrustedFuture
public void addListener(Runnable listener, Executor executor) {Preconditions.checkNotNull(listener, "Runnable was null.");Preconditions.checkNotNull(executor, "Executor was null.");if (!this.isDone()) {AbstractFuture.Listener oldHead = this.listeners;if (oldHead != AbstractFuture.Listener.TOMBSTONE) {AbstractFuture.Listener newNode = new AbstractFuture.Listener(listener, executor);do {newNode.next = oldHead;if (ATOMIC_HELPER.casListeners(this, oldHead, newNode)) {return;}oldHead = this.listeners;} while(oldHead != AbstractFuture.Listener.TOMBSTONE);}}executeListener(listener, executor);}
如果异步任务已经完成那么就直接执行listener,如果没有,就添加到链表中,链表中保存了所有待执行的listener。而上面例子中Futures.addCallback实际上也是调用addListener的方法,只是对listener进行了封装,
public static <V> void addCallback(ListenableFuture<V> future, FutureCallback<? super V> callback, Executor executor) {Preconditions.checkNotNull(callback);future.addListener(new Futures.CallbackListener(future, callback), executor);}
private static final class CallbackListener<V> implements Runnable {final Future<V> future;final FutureCallback<? super V> callback;CallbackListener(Future<V> future, FutureCallback<? super V> callback) {this.future = future;this.callback = callback;}public void run() {Object value;try {value = Futures.getDone(this.future);} catch (ExecutionException var3) {this.callback.onFailure(var3.getCause());return;} catch (Error | RuntimeException var4) {this.callback.onFailure(var4);return;}this.callback.onSuccess(value);}}
注册完成了,那么回调函数是在哪里被执行的呢?回到刚刚的TrustedListenableFutureTask,在其执行完run方法之后会在finally块中执行回调函数的逻辑
void afterRanInterruptibly(V result, Throwable error) {if (error == null) {TrustedListenableFutureTask.this.set(result);} else {TrustedListenableFutureTask.this.setException(error);}}
遍历链表,完成执行,但是提交任务都是通过executor来执行的不保障执行顺序
while(next != null) {AbstractFuture.Listener curr = next;next = next.next;Runnable task = curr.task;if (task instanceof AbstractFuture.SetFuture) {AbstractFuture.SetFuture<?> setFuture = (AbstractFuture.SetFuture)task;future = setFuture.owner;if (future.value == setFuture) {Object valueToSet = getFutureValue(setFuture.future);if (ATOMIC_HELPER.casValue(future, setFuture, valueToSet)) {continue label23;}}} else {executeListener(task, curr.executor);}}
CompletableFuture
completable future和listener future很类似,但是提供了更丰富的接口以及链式调用的能力,在Listener future中要实现链式的调用需要嵌套内部类来写。
使用
使用介绍这篇文档介绍的很详细可以参照,以及可以参考下java.util.concurrent.CompletableFuture的官方注释,比较详细
https://colobu.com/2016/02/29/Java-CompletableFuture/
注意点
一个CompletableFuture注册多个回调函数执行顺序
@Testpublic void testCompletableFuture() {CompletableFuture<String> c = new CompletableFuture<>();c.whenComplete((String value, Throwable e) -> {System.out.println("whenComplete" + Thread.currentThread().getName());});c.thenAccept((String value) -> {System.out.println("thenAccept" + Thread.currentThread().getName());});c.complete("1");}
结果
thenAcceptmainwhenCompletemain
把注册顺序颠倒结果就会颠倒,也就是在没有异步函数的情况下,回调函数的执行是LIFO的,和ListenableFuture有差异,主要原因是CompletableFuture中存储回调函数是通过栈来保存的。 ```java
- A CompletableFuture may have dependent completion actions,
- collected in a linked stack. It atomically completes by CASing
- a result field, and then pops off and runs those actions. This
- applies across normal vs exceptional outcomes, sync vs async
- actions, binary triggers, and various forms of completions. ```
参考
https://blog.csdn.net/PROGRAM_anywhere/article/details/83552126 ListenableFuture 实现原理
