本文将继续整理CompletableFuture的特性。
3.3 转换
我们可以通过CompletableFuture来异步获取一组数据,并对数据进行一些转换,类似RxJava、Scala的map、flatMap操作。

3.3.1 map

方法名 描述
thenApply(Function<? super T,? extends U> fn) 接受一个Function<? super T,? extends U>参数用来转换CompletableFuture
thenApplyAsync(Function<? super T,? extends U> fn) 接受一个Function<? super T,? extends U>参数用来转换CompletableFuture,使用ForkJoinPool
thenApplyAsync(Function<? super T,? extends U> fn, Executor executor) 接受一个Function<? super T,? extends U>参数用来转换CompletableFuture,使用指定的线程池

thenApply的功能相当于将CompletableFuture转换成CompletableFuture

  1. CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello");
  2. future = future.thenApply(new Function<String, String>() {
  3. @Override
  4. public String apply(String s) {
  5. return s + " World";
  6. }
  7. }).thenApply(new Function<String, String>() {
  8. @Override
  9. public String apply(String s) {
  10. return s.toUpperCase();
  11. }
  12. });
  13. try {
  14. System.out.println(future.get());
  15. } catch (InterruptedException e) {
  16. e.printStackTrace();
  17. } catch (ExecutionException e) {
  18. e.printStackTrace();
  19. }

再用lambda表达式简化一下

  1. CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello")
  2. .thenApply(s -> s + " World").thenApply(String::toUpperCase);
  3. try {
  4. System.out.println(future.get());
  5. } catch (InterruptedException e) {
  6. e.printStackTrace();
  7. } catch (ExecutionException e) {
  8. e.printStackTrace();
  9. }

执行结果:

  1. HELLO WORLD

下面的例子,展示了数据流的类型经历了如下的转换:String -> Integer -> Double。

  1. CompletableFuture<Double> future = CompletableFuture.supplyAsync(() -> "10")
  2. .thenApply(Integer::parseInt)
  3. .thenApply(i->i*10.0);
  4. try {
  5. System.out.println(future.get());
  6. } catch (InterruptedException e) {
  7. e.printStackTrace();
  8. } catch (ExecutionException e) {
  9. e.printStackTrace();
  10. }

执行结果:

  1. 100.0

3.3.2 flatMap

方法名 描述
thenCompose(Function<? super T, ? extends CompletionStage> fn) 在异步操作完成的时候对异步操作的结果进行一些操作,并且仍然返回CompletableFuture类型。
thenComposeAsync(Function<? super T, ? extends CompletionStage> fn) 在异步操作完成的时候对异步操作的结果进行一些操作,并且仍然返回CompletableFuture类型。使用ForkJoinPool。
thenComposeAsync(Function<? super T, ? extends CompletionStage> fn,Executor executor) 在异步操作完成的时候对异步操作的结果进行一些操作,并且仍然返回CompletableFuture类型。使用指定的线程池。

thenCompose可以用于组合多个CompletableFuture,将前一个结果作为下一个计算的参数,它们之间存在着先后顺序。

  1. CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello")
  2. .thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World"));
  3. try {
  4. System.out.println(future.get());
  5. } catch (InterruptedException e) {
  6. e.printStackTrace();
  7. } catch (ExecutionException e) {
  8. e.printStackTrace();
  9. }

执行结果:

  1. Hello World

下面的例子展示了多次调用thenCompose()

  1. CompletableFuture<Double> future = CompletableFuture.supplyAsync(() -> "100")
  2. .thenCompose(s -> CompletableFuture.supplyAsync(() -> s + "100"))
  3. .thenCompose(s -> CompletableFuture.supplyAsync(() -> Double.parseDouble(s)));
  4. try {
  5. System.out.println(future.get());
  6. } catch (InterruptedException e) {
  7. e.printStackTrace();
  8. } catch (ExecutionException e) {
  9. e.printStackTrace();
  10. }

执行结果:

  1. 100100.0

3.4 组合

方法名 描述
thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn) 当两个CompletableFuture都正常完成后,执行提供的fn,用它来组合另外一个CompletableFuture的结果。
thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn) 当两个CompletableFuture都正常完成后,执行提供的fn,用它来组合另外一个CompletableFuture的结果。使用ForkJoinPool。
thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor) 当两个CompletableFuture都正常完成后,执行提供的fn,用它来组合另外一个CompletableFuture的结果。使用指定的线程池。

现在有CompletableFuture、CompletableFuture和一个函数(T,U)->V,thenCompose就是将CompletableFuture和CompletableFuture变为CompletableFuture

  1. CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "100");
  2. CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> 100);
  3. CompletableFuture<Double> future = future1.thenCombine(future2, (s, i) -> Double.parseDouble(s + i));
  4. try {
  5. System.out.println(future.get());
  6. } catch (InterruptedException e) {
  7. e.printStackTrace();
  8. } catch (ExecutionException e) {
  9. e.printStackTrace();
  10. }

执行结果:

  1. 100100.0

使用thenCombine()之后future1、future2之间是并行执行的,最后再将结果汇总。这一点跟thenCompose()不同。

thenAcceptBoth跟thenCombine类似,但是返回CompletableFuture类型。

方法名 描述
thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action) 当两个CompletableFuture都正常完成后,执行提供的action,用它来组合另外一个CompletableFuture的结果。
thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action) 当两个CompletableFuture都正常完成后,执行提供的action,用它来组合另外一个CompletableFuture的结果。使用ForkJoinPool。
thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action, Executor executor) 当两个CompletableFuture都正常完成后,执行提供的action,用它来组合另外一个CompletableFuture的结果。使用指定的线程池。
  1. CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "100");
  2. CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> 100);
  3. CompletableFuture<Void> future = future1.thenAcceptBoth(future2, (s, i) -> System.out.println(Double.parseDouble(s + i)));
  4. try {
  5. future.get();
  6. } catch (InterruptedException e) {
  7. e.printStackTrace();
  8. } catch (ExecutionException e) {
  9. e.printStackTrace();
  10. }

执行结果:

  1. 100100.0

3.5 计算结果完成时的处理
当CompletableFuture完成计算结果后,我们可能需要对结果进行一些处理。

3.5.1 执行特定的Action

方法名 描述
whenComplete(BiConsumer<? super T,? super Throwable> action) 当CompletableFuture完成计算结果时对结果进行处理,或者当CompletableFuture产生异常的时候对异常进行处理。
whenCompleteAsync(BiConsumer<? super T,? super Throwable> action) 当CompletableFuture完成计算结果时对结果进行处理,或者当CompletableFuture产生异常的时候对异常进行处理。使用ForkJoinPool。
whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor) 当CompletableFuture完成计算结果时对结果进行处理,或者当CompletableFuture产生异常的时候对异常进行处理。使用指定的线程池。
  1. CompletableFuture.supplyAsync(() -> "Hello")
  2. .thenApply(s->s+" World")
  3. .thenApply(s->s+ "\nThis is CompletableFuture demo")
  4. .thenApply(String::toLowerCase)
  5. .whenComplete((result, throwable) -> System.out.println(result));

执行结果:

  1. hello world
  2. this is completablefuture demo

3.5.2 执行完Action可以做转换

方法名 描述
handle(BiFunction<? super T, Throwable, ? extends U> fn) 当CompletableFuture完成计算结果或者抛出异常的时候,执行提供的fn
handleAsync(BiFunction<? super T, Throwable, ? extends U> fn) 当CompletableFuture完成计算结果或者抛出异常的时候,执行提供的fn,使用ForkJoinPool。
handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) 当CompletableFuture完成计算结果或者抛出异常的时候,执行提供的fn,使用指定的线程池。
  1. CompletableFuture<Double> future = CompletableFuture.supplyAsync(() -> "100")
  2. .thenApply(s->s+"100")
  3. .handle((s, t) -> s != null ? Double.parseDouble(s) : 0);
  4. try {
  5. System.out.println(future.get());
  6. } catch (InterruptedException e) {
  7. e.printStackTrace();
  8. } catch (ExecutionException e) {
  9. e.printStackTrace();
  10. }

执行结果:

  1. 100100.0

在这里,handle()的参数是BiFunction,apply()方法返回R,相当于转换的操作。

  1. @FunctionalInterface
  2. public interface BiFunction<T, U, R> {
  3. /**
  4. * Applies this function to the given arguments.
  5. *
  6. * @param t the first function argument
  7. * @param u the second function argument
  8. * @return the function result
  9. */
  10. R apply(T t, U u);
  11. /**
  12. * Returns a composed function that first applies this function to
  13. * its input, and then applies the {@code after} function to the result.
  14. * If evaluation of either function throws an exception, it is relayed to
  15. * the caller of the composed function.
  16. *
  17. * @param <V> the type of output of the {@code after} function, and of the
  18. * composed function
  19. * @param after the function to apply after this function is applied
  20. * @return a composed function that first applies this function and then
  21. * applies the {@code after} function
  22. * @throws NullPointerException if after is null
  23. */
  24. default <V> BiFunction<T, U, V> andThen(Function<? super R, ? extends V> after) {
  25. Objects.requireNonNull(after);
  26. return (T t, U u) -> after.apply(apply(t, u));
  27. }
  28. }

而whenComplete()的参数是BiConsumer,accept()方法返回void。

  1. @FunctionalInterface
  2. public interface BiConsumer<T, U> {
  3. /**
  4. * Performs this operation on the given arguments.
  5. *
  6. * @param t the first input argument
  7. * @param u the second input argument
  8. */
  9. void accept(T t, U u);
  10. /**
  11. * Returns a composed {@code BiConsumer} that performs, in sequence, this
  12. * operation followed by the {@code after} operation. If performing either
  13. * operation throws an exception, it is relayed to the caller of the
  14. * composed operation. If performing this operation throws an exception,
  15. * the {@code after} operation will not be performed.
  16. *
  17. * @param after the operation to perform after this operation
  18. * @return a composed {@code BiConsumer} that performs in sequence this
  19. * operation followed by the {@code after} operation
  20. * @throws NullPointerException if {@code after} is null
  21. */
  22. default BiConsumer<T, U> andThen(BiConsumer<? super T, ? super U> after) {
  23. Objects.requireNonNull(after);
  24. return (l, r) -> {
  25. accept(l, r);
  26. after.accept(l, r);
  27. };
  28. }
  29. }

所以,handle()相当于whenComplete()+转换。

3.5.3 纯消费(执行Action)

方法名 描述
thenAccept(Consumer<? super T> action) 当CompletableFuture完成计算结果,只对结果执行Action,而不返回新的计算值
thenAcceptAsync(Consumer<? super T> action) 当CompletableFuture完成计算结果,只对结果执行Action,而不返回新的计算值,使用ForkJoinPool。
thenAcceptAsync(Consumer<? super T> action, Executor executor) 当CompletableFuture完成计算结果,只对结果执行Action,而不返回新的计算值

thenAccept()是只会对计算结果进行消费而不会返回任何结果的方法。

  1. CompletableFuture.supplyAsync(() -> "Hello")
  2. .thenApply(s->s+" World")
  3. .thenApply(s->s+ "\nThis is CompletableFuture demo")
  4. .thenApply(String::toLowerCase)
  5. .thenAccept(System.out::print);

执行结果:

  1. hello world
  2. this is completablefuture demo