Future的不足

在JDK1.8以前,通过调用线程池的submit方法可以让任务以异步的方式运行,该方法会返回一个Future对象.

Future是Java 5添加的类,用来描述一个异步计算的结果。你可以使用isDone方法检查计算是否完成,或者使用get阻塞住调用线程,直到计算完成返回结果,你也可以使用cancel方法停止任务的执行。

虽然Future以及相关使用方法提供了异步执行任务的能力,但是对于结果的获取却是很不方便,只能通过阻塞或者轮询的方式得到任务的结果。阻塞的方式显然和我们的异步编程的初衷相违背,轮询的方式又会耗费无谓的CPU资源,而且也不能及时地得到计算结果,为什么不能用观察者设计模式当计算结果完成及时通知监听者呢?

如果想获取Future的结果,可以通过get()方法获取,但是该方法会阻塞当前线程,我们可以在做完剩下的某些工作的时候调用get()方法试图去获取结果。
也可以调用非阻塞的方法isDone来确定操作是否完成,isDone这种方式有点儿类似下面的过程:
CompletableFuture概念和使用 - 图1

Future缺点

1.结果的获取不方便(要么调用get方法进入阻塞,要不轮询获取任务的结果.)
2.很难直接表述多个Future结果的依赖性(之前都是用CompletionService )

CompletableFuture类图

CompletableFuture概念和使用 - 图2
JDK1.8新加入的CompletableFuture 实现了 Future 和 CompletionStage 两个接口。实现 Future 接口是为了关注异步任务什么时候结束,和获取异步任务执行的结果,意味着可以像以前一样通过阻塞或者轮询的方式获得结果,实现 CompletionStage 接口,其提供了非常丰富的功能,实现了串行关系、并行关系、汇聚关系等。

CompletableFuture介绍

JDK1.8才新加入的一个实现类CompletableFuture,实现了Future, CompletionStage两个接口。实现了Future接口,意味着可以像以前一样通过阻塞或者轮询的方式获得结果。

CompletableFuture的优势

1)无需手工维护线程,给任务分配线程的工作无需开发人员关注;
2)在使用上,语义更加清晰明确;
例如:t3 = t1.thenCombine(t2, () -> { // doSomething … } 能够明确的表述任务 3 要等任务 2 和 任务 1完成后才会开始执行。
3)代码更加简练,支持链式调用,让你更专注业务逻辑。
4)方便的处理异常情况

CompletableFuture的应用场景

存在IO密集型的任务可以选择CompletableFuture,IO部分交由另外一个线程去执行。Logback、Log4j2异步日志记录的实现原理就是新起了一个线程去执行IO操作,这部分可以以CompletableFuture.runAsync(()->{ioOperation();})的方式去调用,有关Logback异步日志记录的原理可以参考这篇文章Logback异步日志记录。如果是CPU密集型就不推荐使用了推荐使用并行流.

API介绍


除了直接new出一个CompletableFuture的实例,还可以通过工厂方法创建CompletableFuture的实例

工厂方法:

static CompletableFuture[**Void**](http://www.matools.com/file/manual/jdk_api_1.8_google/java/lang/Void.html) runAsync(Runnable runnable)
返回一个新的CompletableFuture,它在运行给定操作后由运行在 ForkJoinPool.commonPool()中的任务 异步完成。
static CompletableFuture[**Void**](http://www.matools.com/file/manual/jdk_api_1.8_google/java/lang/Void.html) runAsync(Runnable runnable, Executor executor)
返回一个新的CompletableFuture,它在运行给定操作之后由在给定执行程序中运行的任务异步完成。
static CompletableFuture supplyAsync(Supplier supplier)
返回一个新的CompletableFuture,它通过在 ForkJoinPool.commonPool()中运行的任务与通过调用给定的供应商获得的值 异步完成。
static CompletableFuture supplyAsync(Supplier supplier, Executor executor)
返回一个新的CompletableFuture,由给定执行器中运行的任务异步完成,并通过调用给定的供应商获得的值。


Asynsc表示异步,而supplyAsync与runAsync不同在与前者异步返回一个结果,后者是void.第二个函数第二个参数表示是用我们自己创建的线程池,否则采用默认的ForkJoinPool.commonPool()作为它的线程池。


获得结果的方法
public T get()
public T get(long timeout, TimeUnit unit)
public T getNow(T valueIfAbsent)
public T join()
getNow有点特殊,如果结果已经计算完则返回结果或者抛出异常,否则返回给定的valueIfAbsent值。
join返回计算的结果或者抛出一个unchecked异常(CompletionException),它和get对抛出的异常的处理有些细微的区别。

辅助方法

static CompletableFuture[**Void**](http://www.matools.com/file/manual/jdk_api_1.8_google/java/lang/Void.html) allOf(CompletableFuture<?>… cfs)
返回一个新的CompletableFuture,当所有给定的CompletableFutures完成时,完成。
static CompletableFuture[**Object**](http://www.matools.com/file/manual/jdk_api_1.8_google/java/lang/Object.html) anyOf(CompletableFuture<?>… cfs)
返回一个新的CompletableFuture,当任何一个给定的CompletableFutures完成时,完成相同的结果。


allOf方法是当所有的CompletableFuture都执行完后执行计算。
anyOf方法是当任意一个CompletableFuture执行完后就会执行计算,计算的结果相同。

zjj_parent_2019/09/20_14:51:16_zqzz9pyp7gnal4bztiobvmn5uksbhg


CompletionStage是一个接口,从命名上看得知是一个完成的阶段,它代表了一个特定的计算的阶段,可以同步或者异步的被完成。你可以把它看成一个计算流水线上的一个单元,并最终会产生一个最终结果,这意味着几个CompletionStage可以串联起来,一个完成的阶段可以触发下一阶段的执行,接着触发下一次,再接着触发下一次,……….。
总结CompletableFuture几个关键点:
1、计算可以由 Future ,Consumer 或者 Runnable 接口中的 apply,accept 或者 run等方法表示。
2、计算的执行主要有以下
a. 默认执行
b. 使用默认的CompletionStage的异步执行提供者异步执行。这些方法名使用someActionAsync这种格式表示。
c. 使用 Executor 提供者异步执行。这些方法同样也是someActionAsync这种格式,但是会增加一个Executor 参数。
CompletableFuture里大约有五十种方法,但是可以进行归类,

变换类 thenApply:

CompletableFuture thenApply(Function<? super T,? extends U> fn)
返回一个新的CompletionStage,当此阶段正常完成时,将以该阶段的结果作为所提供函数的参数执行。
CompletableFuture thenApplyAsync(Function<? super T,? extends U> fn)
返回一个新的CompletionStage,当该阶段正常完成时,将使用此阶段的默认异步执行工具执行此阶段的结果作为所提供函数的参数。
CompletableFuture thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
返回一个新的CompletionStage,当此阶段正常完成时,将使用提供的执行程序执行此阶段的结果作为提供函数的参数。


关键入参是函数式接口Function。它的入参是上一个阶段计算后的结果,返回值是经过转化后结果。
消费类 thenAccept:

CompletableFuture[**Void**](http://www.matools.com/file/manual/jdk_api_1.8_google/java/lang/Void.html) thenAccept(Consumer<? super T> action)
返回一个新的CompletionStage,当此阶段正常完成时,将以该阶段的结果作为提供的操作的参数执行。
CompletableFuture[**Void**](http://www.matools.com/file/manual/jdk_api_1.8_google/java/lang/Void.html) thenAcceptAsync(Consumer<? super T> action)
返回一个新的CompletionStage,当此阶段正常完成时,将使用此阶段的默认异步执行工具执行,此阶段的结果作为提供的操作的参数。
CompletableFuture[**Void**](http://www.matools.com/file/manual/jdk_api_1.8_google/java/lang/Void.html) thenAcceptAsync(Consumer<? super T> action, Executor executor)
返回一个新的CompletionStage,当此阶段正常完成时,将使用提供的执行程序执行此阶段的结果作为提供的操作的参数。


关键入参是函数式接口Consumer。它的入参是上一个阶段计算后的结果, 没有返回值。

执行操作类 thenRun:

CompletableFuture[Void](http://www.matools.com/file/manual/jdk_api_1.8_google/java/lang/Void.html) thenRun(Runnable action)
返回一个新的CompletionStage,当此阶段正常完成时,执行给定的操作。
CompletableFuture[Void](http://www.matools.com/file/manual/jdk_api_1.8_google/java/lang/Void.html) thenRunAsync(Runnable action)
返回一个新的CompletionStage,当此阶段正常完成时,使用此阶段的默认异步执行工具执行给定的操作。
CompletableFuture[Void](http://www.matools.com/file/manual/jdk_api_1.8_google/java/lang/Void.html) thenRunAsync(Runnable action, Executor executor)
返回一个新的CompletionStage,当此阶段正常完成时,使用提供的执行程序执行给定的操作。


对上一步的计算结果不关心,执行下一个操作,入参是一个Runnable的实例,表示上一步完成后执行的操作。
结合转化类:

CompletableFuture thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
返回一个新的CompletionStage,当这个和另一个给定的阶段都正常完成时,两个结果作为提供函数的参数执行。
CompletableFuture thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
返回一个新的CompletionStage,当这个和另一个给定阶段正常完成时,将使用此阶段的默认异步执行工具执行,其中两个结果作为提供函数的参数。
CompletableFuture thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn,Executor executor)
返回一个新的CompletionStage,当这个和另一个给定阶段正常完成时,使用提供的执行器执行,其中两个结果作为提供的函数的参数


需要上一步的处理返回值,并且other代表的CompletionStage 有返回值之后,利用这两个返回值,进行转换后返回指定类型的值。
两个CompletionStage是并行执行的,它们之间并没有先后依赖顺序,other并不会等待先前的CompletableFuture执行完毕后再执行。
结合转化类

CompletableFuture thenCompose(Function<? super T,? extends CompletionStage> fn)
返回一个新的CompletionStage,当这个阶段正常完成时,这个阶段将作为提供函数的参数执行。
CompletableFuture thenComposeAsync(Function<? super T,? extends CompletionStage> fn)
返回一个新的CompletionStage,当此阶段正常完成时,将使用此阶段的默认异步执行工具执行,此阶段作为提供的函数的参数。
CompletableFuture thenComposeAsync(Function<? super T,? extends CompletionStage> fn, Executor executor)
返回一个新的CompletionStage,当此阶段正常完成时,将使用提供的执行程序执行此阶段的结果作为提供函数的参数。

对于Compose可以连接两个CompletableFuture,其内部处理逻辑是当第一个CompletableFuture处理没有完成时会合并成一个CompletableFuture,如果处理完成,第二个future会紧接上一个CompletableFuture进行处理。
第一个CompletableFuture 的处理结果是第二个future需要的输入参数。

结合消费类:

CompletableFuture[Void](http://www.matools.com/file/manual/jdk_api_1.8_google/java/lang/Void.html) thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action)
返回一个新的CompletionStage,当这个和另一个给定的阶段都正常完成时,两个结果作为提供的操作的参数被执行。
CompletableFuture[Void](http://www.matools.com/file/manual/jdk_api_1.8_google/java/lang/Void.html) thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action)
返回一个新的CompletionStage,当这个和另一个给定阶段正常完成时,将使用此阶段的默认异步执行工具执行,其中两个结果作为提供的操作的参数。
CompletableFuture[Void](http://www.matools.com/file/manual/jdk_api_1.8_google/java/lang/Void.html) thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action, Executor executor)
返回一个新的CompletionStage,当这个和另一个给定阶段正常完成时,使用提供的执行器执行,其中两个结果作为提供的函数的参数。

需要上一步的处理返回值,并且other代表的CompletionStage 有返回值之后,利用这两个返回值,进行消费

运行后执行类:

CompletableFuture[Void](http://www.matools.com/file/manual/jdk_api_1.8_google/java/lang/Void.html) runAfterBoth(CompletionStage<?> other, Runnable action)
返回一个新的CompletionStage,当这个和另一个给定的阶段都正常完成时,执行给定的动作。
CompletableFuture[Void](http://www.matools.com/file/manual/jdk_api_1.8_google/java/lang/Void.html) runAfterBothAsync(CompletionStage<?> other, Runnable action)
返回一个新的CompletionStage,当这个和另一个给定阶段正常完成时,使用此阶段的默认异步执行工具执行给定的操作。
CompletableFuture[Void](http://www.matools.com/file/manual/jdk_api_1.8_google/java/lang/Void.html) runAfterBothAsync(CompletionStage<?> other, Runnable action, Executor executor)
返回一个新CompletionStage,当这和其他特定阶段正常完成,使用附带的执行见执行给定的动作CompletionStage覆盖特殊的完成规则的文档。

不关心这两个CompletionStage的结果,只关心这两个CompletionStage都执行完毕,之后再进行操作(Runnable)。
取最快转换类:

CompletableFuture applyToEither(CompletionStage<? extends T> other, Function<? super T,U> fn)
返回一个新的CompletionStage,当这个或另一个给定阶段正常完成时,执行相应的结果作为提供的函数的参数。
CompletableFuture applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn)
返回一个新的CompletionStage,当这个或另一个给定阶段正常完成时,将使用此阶段的默认异步执行工具执行,其中相应的结果作为提供函数的参数。
CompletableFuture applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn, Executor executor)
返回一个新的CompletionStage,当这个或另一个给定阶段正常完成时,将使用提供的执行器执行,其中相应的结果作为参数提供给函数。


两个CompletionStage,谁计算的快,我就用那个CompletionStage的结果进行下一步的转化操作。现实开发场景中,总会碰到有两种渠道完成同一个事情,所以就可以调用这个方法,找一个最快的结果进行处理。

取最快消费类:

CompletableFuture[Void](http://www.matools.com/file/manual/jdk_api_1.8_google/java/lang/Void.html) acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action)
返回一个新的CompletionStage,当这个或另一个给定阶段正常完成时,执行相应的结果作为提供的操作的参数。
CompletableFuture[Void](http://www.matools.com/file/manual/jdk_api_1.8_google/java/lang/Void.html) acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action)
返回一个新的CompletionStage,当这个或另一个给定阶段正常完成时,将使用此阶段的默认异步执行工具执行,其中相应的结果作为提供的操作的参数。
CompletableFuture[Void](http://www.matools.com/file/manual/jdk_api_1.8_google/java/lang/Void.html) acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor)
返回一个新的CompletionStage,当这个或另一个给定阶段正常完成时,将使用提供的执行器执行,其中相应的结果作为参数提供给函数。

两个CompletionStage,谁计算的快,我就用那个CompletionStage的结果进行下一步的消费操作。

取最快运行后执行类:

CompletableFuture[Void](http://www.matools.com/file/manual/jdk_api_1.8_google/java/lang/Void.html) runAfterEither(CompletionStage<?> other, Runnable action)
返回一个新的CompletionStage,当这个或另一个给定阶段正常完成时,执行给定的操作。
CompletableFuture[Void](http://www.matools.com/file/manual/jdk_api_1.8_google/java/lang/Void.html) runAfterEitherAsync(CompletionStage<?> other, Runnable action)
返回一个新的CompletionStage,当这个或另一个给定阶段正常完成时,使用此阶段的默认异步执行工具执行给定的操作。
CompletableFuture[Void](http://www.matools.com/file/manual/jdk_api_1.8_google/java/lang/Void.html) runAfterEitherAsync(CompletionStage<?> other, Runnable action, Executor executor)
返回一个新的CompletionStage,当这个或另一个给定阶段正常完成时,使用提供的执行器执行给定的操作。


两个CompletionStage,任何一个完成了都会执行下一步的操作(Runnable)。

异常补偿类:
CompletableFuture概念和使用 - 图3
当运行时出现了异常,可以通过exceptionally进行补偿。

运行后记录结果类:

CompletableFuture[T](http://www.matools.com/file/manual/jdk_api_1.8_google/java/util/concurrent/CompletableFuture.html) whenComplete(BiConsumer<? super T,? super Throwable> action)
返回与此阶段相同的结果或异常的新的CompletionStage,当此阶段完成时,使用结果(或 null如果没有))和此阶段的异常(或 null如果没有))执行给定的操作。
CompletableFuture[T](http://www.matools.com/file/manual/jdk_api_1.8_google/java/util/concurrent/CompletableFuture.html) whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
返回一个与此阶段相同结果或异常的新CompletionStage,当此阶段完成时,执行给定操作将使用此阶段的默认异步执行工具执行给定操作,结果(或 null如果没有))和异常(或 null如果没有)这个阶段作为参数。
CompletableFuture[T](http://www.matools.com/file/manual/jdk_api_1.8_google/java/util/concurrent/CompletableFuture.html) whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
返回与此阶段相同的结果或异常的新的CompletionStage,当此阶段完成时,使用提供的执行者执行给定的操作,如果没有,则使用结果(或 null如果没有))和异常(或 null如果没有))作为论据。


action执行完毕后它的结果返回原始的CompletableFuture的计算结果或者返回异常。所以不会对结果产生任何的作用。
运行后处理结果类:

CompletableFuture handle(BiFunction<? super T,Throwable,? extends U> fn)
返回一个新的CompletionStage,当此阶段正常或异常完成时,将使用此阶段的结果和异常作为所提供函数的参数执行。
CompletableFuture handleAsync(BiFunction<? super T,Throwable,? extends U> fn)
返回一个新的CompletionStage,当该阶段完成正常或异常时,将使用此阶段的默认异步执行工具执行,此阶段的结果和异常作为提供函数的参数。
CompletableFuture handleAsync(BiFunction<? super T,Throwable,? extends U> fn, Executor executor)
返回一个新的CompletionStage,当此阶段完成正常或异常时,将使用提供的执行程序执行此阶段的结果和异常作为提供的函数的参数。


运行完成时,对结果的处理。这里的完成时有两种情况,一种是正常执行,返回值。另外一种是遇到异常抛出造成程序的中断。

代码演示

  1. import java.util.concurrent.CompletableFuture;
  2. import java.util.concurrent.TimeUnit;
  3. public class Test03 {
  4. public static void main(String[] args) throws Exception {
  5. long l = System.currentTimeMillis();
  6. // 暂存数据
  7. // 任务 1:调用推荐接口获取数据
  8. CompletableFuture<String> recommendTask =
  9. CompletableFuture.supplyAsync(() -> {
  10. System.out.println("recommendTask开始工作: 获取推荐接口数据...");
  11. sleepSeconds(5);//睡眠五秒
  12. return "[recommendTask 板块数据]";
  13. });
  14. // 任务 2:调用搜索接口获取数据
  15. CompletableFuture<String> searchTask =
  16. CompletableFuture.supplyAsync(() -> {
  17. System.out.println("searchTask开始工作: 调用搜索接口获取数据...");
  18. sleepSeconds(3); ////睡眠五秒
  19. return " [searchTask 板块数据] ";
  20. });
  21. // 任务 3:任务 1 和任务 2 完成后执行,聚合结果
  22. CompletableFuture<String> polymerizationTask =
  23. recommendTask.thenCombine(searchTask, (t1Result, t2Result) -> {
  24. System.out.println("polymerizationTask开始工作" + t1Result + " 与 " + t2Result + "实现去重逻辑处理");
  25. return "[recommendTask 和 searchTask 板块数据聚合结果]";
  26. });
  27. // 等待任务 3 执行结果
  28. String result = polymerizationTask.get(6, TimeUnit.SECONDS);
  29. System.out.println("主线程获取polymerizationTask的结果:" + result);
  30. System.out.println("执行任务需要的毫秒值: " + (System.currentTimeMillis() - l) + "毫秒");
  31. }
  32. // 睡眠工具类
  33. static void sleepSeconds(int timeout) {
  34. try {
  35. TimeUnit.SECONDS.sleep(timeout);
  36. } catch (InterruptedException e) {
  37. e.printStackTrace();
  38. }
  39. }
  40. }

控制台打印结果
可以看到 recommendTask 和 searchTask 异步去执行, 然后polymerizationTask等待recommendTask 和searchTask 执行完之后汇总他们的结果,

  1. recommendTask开始工作: 获取推荐接口数据...
  2. searchTask开始工作: 调用搜索接口获取数据...
  3. polymerizationTask开始工作[recommendTask 板块数据] [searchTask 板块数据] 实现去重逻辑处理
  4. 主线程获取polymerizationTask的结果:[recommendTask searchTask 板块数据聚合结果]
  5. 执行任务需要的毫秒值: 5058毫秒