前言

在flink排查一例问题的时候,偶然注意到一个CompletetabeFuture使用上对顺序依赖的case,在之前接触使用CompletetableFuture的时候没有考虑过其回调函数的执行顺序问题,写此文记录一下。

简介

首先因为我对CompletetableFuture的使用也不是很熟悉,所以先介绍下什么是CompletetableFuture。在介绍CompletetableFuture我们先了解下为什么会有CompletetableFuture。通常我们再提交一个异步任务之后会得到一个Future,来表示一个异步计算的结果。但是要获取异步任务的执行结果,你只能通过while循环通过isDone检测,或者通过get方法阻塞等待。为了提升异步编程的体验,很多类库都多Future做了扩展,例如netty扩展Future提供addListener,guava也提供了ListenableFuture接口,在到jdk8,jdk也原生提供了CompletetableFuture接口,增强了Future函数异步编程的能力。

ListenableFuture

这里先介绍guava中的ListenableFuture的使用和实现

使用

  1. @Test
  2. public void test() {
  3. ListeningExecutorService executorService =
  4. MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2));
  5. ListenableFuture lf =
  6. executorService.submit(
  7. () -> {
  8. // System.out.println("t1");
  9. throw new RuntimeException("Exception");
  10. });
  11. lf.addListener(
  12. () -> {
  13. System.out.println(
  14. "t1 completed callback1 " + Thread.currentThread().getName());
  15. },
  16. executorService);
  17. lf.addListener(
  18. () -> {
  19. System.out.println(
  20. "t1 completed callback2 " + Thread.currentThread().getName());
  21. },
  22. executorService);
  23. // 使用FutureCallback 处理成功与失败
  24. FutureCallback callback =
  25. new FutureCallback() {
  26. @Override
  27. public void onSuccess(@Nullable Object o) {
  28. System.out.println("success callback");
  29. }
  30. @Override
  31. public void onFailure(Throwable throwable) {
  32. System.out.println(throwable.getMessage());
  33. }
  34. };
  35. Futures.addCallback(lf, callback, executorService);
  36. }

执行结果

  1. t1 completed callback1 pool-1-thread-1
  2. t1 completed callback2 pool-1-thread-2
  3. Exception

这里面使用了两种接口addListener和addCallback,addCallback可以设置成功和失败的回调。

实现原理

在执行executorService.submit时,实际上是还是执行的父类ThreadExecutorPool的submit方法,但是其覆盖了newTaskFor方法,返回的是一个TrustedListenableFutureTask对象。

  1. public abstract class AbstractListeningExecutorService extends AbstractExecutorService implements ListeningExecutorService {
  2. public AbstractListeningExecutorService() {
  3. }
  4. protected final <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
  5. return TrustedListenableFutureTask.create(runnable, value);
  6. }
  7. protected final <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
  8. return TrustedListenableFutureTask.create(callable);
  9. }
  10. public ListenableFuture<?> submit(Runnable task) {
  11. return (ListenableFuture)super.submit(task);
  12. }
  13. public <T> ListenableFuture<T> submit(Runnable task, @Nullable T result) {
  14. return (ListenableFuture)super.submit(task, result);
  15. }
  16. public <T> ListenableFuture<T> submit(Callable<T> task) {
  17. return (ListenableFuture)super.submit(task);
  18. }
  19. }

而submit函数返回的对象就是通过newTaskFor创建的TrustedListenableFutureTask

  1. public <T> Future<T> submit(Callable<T> task) {
  2. if (task == null) throw new NullPointerException();
  3. RunnableFuture<T> ftask = newTaskFor(task);
  4. execute(ftask);
  5. return ftask;
  6. }

因此addListener实际上是调用的TrustedListenableFutureTask的父类TrustedFuture

  1. public void addListener(Runnable listener, Executor executor) {
  2. Preconditions.checkNotNull(listener, "Runnable was null.");
  3. Preconditions.checkNotNull(executor, "Executor was null.");
  4. if (!this.isDone()) {
  5. AbstractFuture.Listener oldHead = this.listeners;
  6. if (oldHead != AbstractFuture.Listener.TOMBSTONE) {
  7. AbstractFuture.Listener newNode = new AbstractFuture.Listener(listener, executor);
  8. do {
  9. newNode.next = oldHead;
  10. if (ATOMIC_HELPER.casListeners(this, oldHead, newNode)) {
  11. return;
  12. }
  13. oldHead = this.listeners;
  14. } while(oldHead != AbstractFuture.Listener.TOMBSTONE);
  15. }
  16. }
  17. executeListener(listener, executor);
  18. }

如果异步任务已经完成那么就直接执行listener,如果没有,就添加到链表中,链表中保存了所有待执行的listener。而上面例子中Futures.addCallback实际上也是调用addListener的方法,只是对listener进行了封装,

  1. public static <V> void addCallback(ListenableFuture<V> future, FutureCallback<? super V> callback, Executor executor) {
  2. Preconditions.checkNotNull(callback);
  3. future.addListener(new Futures.CallbackListener(future, callback), executor);
  4. }
  1. private static final class CallbackListener<V> implements Runnable {
  2. final Future<V> future;
  3. final FutureCallback<? super V> callback;
  4. CallbackListener(Future<V> future, FutureCallback<? super V> callback) {
  5. this.future = future;
  6. this.callback = callback;
  7. }
  8. public void run() {
  9. Object value;
  10. try {
  11. value = Futures.getDone(this.future);
  12. } catch (ExecutionException var3) {
  13. this.callback.onFailure(var3.getCause());
  14. return;
  15. } catch (Error | RuntimeException var4) {
  16. this.callback.onFailure(var4);
  17. return;
  18. }
  19. this.callback.onSuccess(value);
  20. }
  21. }

注册完成了,那么回调函数是在哪里被执行的呢?回到刚刚的TrustedListenableFutureTask,在其执行完run方法之后会在finally块中执行回调函数的逻辑

  1. void afterRanInterruptibly(V result, Throwable error) {
  2. if (error == null) {
  3. TrustedListenableFutureTask.this.set(result);
  4. } else {
  5. TrustedListenableFutureTask.this.setException(error);
  6. }
  7. }

遍历链表,完成执行,但是提交任务都是通过executor来执行的不保障执行顺序

  1. while(next != null) {
  2. AbstractFuture.Listener curr = next;
  3. next = next.next;
  4. Runnable task = curr.task;
  5. if (task instanceof AbstractFuture.SetFuture) {
  6. AbstractFuture.SetFuture<?> setFuture = (AbstractFuture.SetFuture)task;
  7. future = setFuture.owner;
  8. if (future.value == setFuture) {
  9. Object valueToSet = getFutureValue(setFuture.future);
  10. if (ATOMIC_HELPER.casValue(future, setFuture, valueToSet)) {
  11. continue label23;
  12. }
  13. }
  14. } else {
  15. executeListener(task, curr.executor);
  16. }
  17. }

CompletableFuture

completable future和listener future很类似,但是提供了更丰富的接口以及链式调用的能力,在Listener future中要实现链式的调用需要嵌套内部类来写。

使用

使用介绍这篇文档介绍的很详细可以参照,以及可以参考下java.util.concurrent.CompletableFuture的官方注释,比较详细
https://colobu.com/2016/02/29/Java-CompletableFuture/

注意点

  1. 一个CompletableFuture注册多个回调函数执行顺序

    1. @Test
    2. public void testCompletableFuture() {
    3. CompletableFuture<String> c = new CompletableFuture<>();
    4. c.whenComplete((String value, Throwable e) -> {
    5. System.out.println("whenComplete" + Thread.currentThread().getName());
    6. });
    7. c.thenAccept((String value) -> {
    8. System.out.println("thenAccept" + Thread.currentThread().getName());
    9. });
    10. c.complete("1");
    11. }

    结果

    1. thenAcceptmain
    2. whenCompletemain

    把注册顺序颠倒结果就会颠倒,也就是在没有异步函数的情况下,回调函数的执行是LIFO的,和ListenableFuture有差异,主要原因是CompletableFuture中存储回调函数是通过栈来保存的。 ```java

  • A CompletableFuture may have dependent completion actions,
  • collected in a linked stack. It atomically completes by CASing
  • a result field, and then pops off and runs those actions. This
  • applies across normal vs exceptional outcomes, sync vs async
  • actions, binary triggers, and various forms of completions. ```

参考

https://blog.csdn.net/PROGRAM_anywhere/article/details/83552126 ListenableFuture 实现原理