一、前言

本文参考:https://blog.csdn.net/qq_31865983/article/details/106137777
CompletableFuture实现了CompletionStage接口和Future接口,前者是对后者的一个扩展,增加了异步回调、流式处理、多个Future组合处理的能力,使Java在处理多任务的协同工作时更加顺畅便利。
多线程的基本编程参考:
7-多线程

二、创建异步任务

1、Future

通常的线程池接口类ExecutorService,其中execute方法的返回值是void,即无法获取异步任务的执行状态,3个重载的submit方法的返回值是Future,可以据此获取任务执行的状态和结果,示例如下:

  1. public class callableTest {
  2. public static void main(String[] args) throws ExecutionException, InterruptedException {
  3. ExecutorService executorService = Executors.newFixedThreadPool(3);
  4. Future<String> future = executorService.submit(new Callable<String>() {
  5. @Override
  6. public String call() throws Exception {
  7. Thread.sleep(2000);
  8. int a = 1 / 0;
  9. return Thread.currentThread().getName();
  10. }
  11. });
  12. // 每次执行一个线程
  13. System.out.println("main thread start,time->" + System.currentTimeMillis());
  14. //等待子任务执行完成,如果已完成则直接返回结果
  15. //如果执行任务异常,则get方法会把之前捕获的异常重新抛出
  16. System.out.println(future.get());
  17. System.out.println("main thread end,time->" + System.currentTimeMillis());
  18. executorService.shutdown();
  19. }
  20. }

执行结果如下(get方法抛出异常导致主线程异常终止),若无异常则正常输出
image.png
子线程是异步执行的,主线程休眠等待子线程执行完成,子线程执行完成后唤醒主线程,主线程获取任务执行结果后退出。

2、supplyAsync/runAsync

supplyAsync表示创建带返回值的异步任务的,相当于ExecutorService submit(Callable task) 方法,runAsync表示创建无返回值的异步任务,相当于ExecutorService submit(Runnable task)方法,这两方法的效果跟submit是一样的,测试用例如下:

(1)runAsync

  1. public class Test01 {
  2. public static ExecutorService executorService = Executors.newFixedThreadPool(10);
  3. public static void main(String[] args) throws ExecutionException, InterruptedException {
  4. System.out.println("main....start.....");
  5. CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
  6. System.out.println("当前线程 :" + Thread.currentThread().getId());
  7. int i = 10 / 2;
  8. System.out.println("运行结果:" + i);
  9. }, executorService);
  10. executorService.shutdown();
  11. }
  12. }
  13. main....start.....
  14. 当前线程 13
  15. 运行结果:5

(2)supplyAsync

  1. CompletableFuture<Object> supplyAsyncFuture = CompletableFuture.supplyAsync(() -> {
  2. System.out.println("当前线程 :" + Thread.currentThread().getId());
  3. int i = 10 / 2;
  4. System.out.println("运行结果:" + i);
  5. return i;
  6. }, executorService);
  7. System.out.println(supplyAsyncFuture.get());
  8. main....start.....
  9. 当前线程 13
  10. 运行结果:5
  11. 5

(3)默认线程池

2-ForkJoinPool
上述两方法各有一个重载版本,可以指定执行异步任务的Executor实现,如果不指定,默认使用ForkJoinPool.commonPool(),如果机器是单核的,则默认使用ThreadPerTaskExecutor,该类是一个内部类,每次执行execute都会创建一个新线程。测试用例如下:
image.png

  1. CompletableFuture<Object> supplyAsyncFuture = CompletableFuture.supplyAsync(() -> {
  2. System.out.println("当前线程 :" + Thread.currentThread().getId());
  3. int i = 10 / 2;
  4. System.out.println("运行结果:" + i);
  5. return i;
  6. });
  7. System.out.println(supplyAsyncFuture.get());

三、异步回调函数

1、thenApply / thenApplyAsync

thenApply 表示某个任务执行完成后执行的动作,即回调方法,会将该任务的执行结果即方法返回值作为入参传递到回调方法中,当使用 ExecutorService 线程池的时候,这两个方法没有什么区别,当使用 ForkJoinPool 线程池的使用,thenApply的方法使用的是和上一个方法相同的线程,而 thenApplyAsync 使用的是新的线程。

(1)thenApply

ExecutorService
  1. public class Test03 {
  2. public static ExecutorService executorService = Executors.newFixedThreadPool(10);
  3. public static void main(String[] args) throws ExecutionException, InterruptedException {
  4. System.out.println("main....start.....");
  5. CompletableFuture<String> feature = CompletableFuture.supplyAsync(() -> {
  6. System.out.println("a、当前线程 :" + Thread.currentThread().getId());
  7. int i = 10 / 2;
  8. System.out.println("运行结果:" + i);
  9. return i;
  10. }, executorService).thenApply(res -> {
  11. System.out.println("b、当前线程 :" + Thread.currentThread().getId());
  12. System.out.println("任务四启动了。。。" + res);
  13. return "hello" + res;
  14. });
  15. System.out.println("main....end...." + feature.get());
  16. }
  17. }
  18. main....start.....
  19. a、当前线程 13
  20. 运行结果:5
  21. b、当前线程 14
  22. 任务四启动了。。。5
  23. main....end....hello5

ForkJoinPool
  1. public class Test03 {
  2. private static final ForkJoinPool pool = new ForkJoinPool();
  3. public static void main(String[] args) throws ExecutionException, InterruptedException {
  4. System.out.println("main....start.....");
  5. CompletableFuture<String> feature = CompletableFuture.supplyAsync(() -> {
  6. System.out.println("a、当前线程 :" + Thread.currentThread().getId());
  7. int i = 10 / 2;
  8. System.out.println("运行结果:" + i);
  9. return i;
  10. }, executorService).thenApply(res -> {
  11. System.out.println("b、当前线程 :" + Thread.currentThread().getId());
  12. System.out.println("任务四启动了。。。" + res);
  13. return "hello" + res;
  14. });
  15. System.out.println("main....end...." + feature.get());
  16. }
  17. }
  18. main....start.....
  19. a、当前线程 13
  20. 运行结果:5
  21. b、当前线程 13
  22. 任务四启动了。。。5
  23. main....end....hello5

(2)thenApplyAsync

ExecutorService
  1. public class Test03 {
  2. public static ExecutorService executorService = Executors.newFixedThreadPool(10);
  3. public static void main(String[] args) throws ExecutionException, InterruptedException {
  4. System.out.println("main....start.....");
  5. CompletableFuture<String> feature = CompletableFuture.supplyAsync(() -> {
  6. System.out.println("a、当前线程 :" + Thread.currentThread().getId());
  7. int i = 10 / 2;
  8. System.out.println("运行结果:" + i);
  9. return i;
  10. }, executorService).thenApplyAsync(res -> {
  11. System.out.println("b、当前线程 :" + Thread.currentThread().getId());
  12. System.out.println("任务四启动了。。。" + res);
  13. return "hello" + res;
  14. }, executorService);
  15. System.out.println("main....end...." + feature.get());
  16. }
  17. }
  18. main....start.....
  19. a、当前线程 13
  20. 运行结果:5
  21. b、当前线程 14
  22. 任务四启动了。。。5
  23. main....end....hello5

ForkJoinPool
  1. public class Test03 {
  2. private static final ForkJoinPool pool = new ForkJoinPool();
  3. public static void main(String[] args) throws ExecutionException, InterruptedException {
  4. System.out.println("main....start.....");
  5. CompletableFuture<String> feature = CompletableFuture.supplyAsync(() -> {
  6. System.out.println("a、当前线程 :" + Thread.currentThread().getId());
  7. int i = 10 / 2;
  8. System.out.println("运行结果:" + i);
  9. return i;
  10. }, executorService).thenApplyAsync(res -> {
  11. System.out.println("b、当前线程 :" + Thread.currentThread().getId());
  12. System.out.println("任务四启动了。。。" + res);
  13. return "hello" + res;
  14. }, executorService);
  15. System.out.println("main....end...." + feature.get());
  16. }
  17. }
  18. main....start.....
  19. a、当前线程 13
  20. 运行结果:5
  21. b、当前线程 14
  22. 任务四启动了。。。5
  23. main....end....hello5

2、thenAccept/thenApplyAsync

和 �thenApply / thenApplyAsync 的用法一致,只是,没有返回值,下面只举一个例子。

  1. public class Test03 {
  2. public static ExecutorService executorService = Executors.newFixedThreadPool(10);
  3. public static void main(String[] args) throws ExecutionException, InterruptedException {
  4. System.out.println("main....start.....");
  5. CompletableFuture<Void> feature = CompletableFuture.supplyAsync(() -> {
  6. System.out.println("当前线程 :" + Thread.currentThread().getId());
  7. int i = 10 / 2;
  8. System.out.println("运行结果:" + i);
  9. return i;
  10. }, executorService).thenAcceptAsync(res -> {
  11. System.out.println("任务三执行。。。。" + res);
  12. }, executorService);
  13. System.out.println("main....end...." + feature.get());
  14. }
  15. }
  16. main....start.....
  17. 当前线程 13
  18. 运行结果:5
  19. 任务三执行。。。。5
  20. main....end....null

3、exceptionally

exceptionally方法指定某个任务执行异常时执行的回调方法,会将抛出异常作为参数传递到回调方法中,如果该任务正常执行则会exceptionally方法返回的CompletionStage的result就是该任务正常执行的结果,测试用例如下:

  1. public class Test03 {
  2. private static final ForkJoinPool pool = new ForkJoinPool();
  3. public static void main(String[] args) throws ExecutionException, InterruptedException {
  4. System.out.println("main....start.....");
  5. CompletableFuture<Integer> supplyAsyncFuture = CompletableFuture.supplyAsync(() -> {
  6. System.out.println("当前线程 :" + Thread.currentThread().getId());
  7. int i = 10 / 0;
  8. System.out.println("运行结果:" + i);
  9. return i;
  10. }).exceptionally(throwable -> {
  11. throwable.printStackTrace();
  12. // 抛出异常
  13. throw new RuntimeException(throwable);
  14. // 异常时候的处理
  15. // return Integer.parseInt("1");
  16. });
  17. System.out.println(supplyAsyncFuture.get());
  18. }
  19. }
  20. main....start.....
  21. 当前线程 13
  22. java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
  23. at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314)
  24. at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319)
  25. at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1702)
  26. at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1692)
  27. at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
  28. at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
  29. at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
  30. at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
  31. at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
  32. Caused by: java.lang.ArithmeticException: / by zero
  33. at demo08_completableFuture.Test01.lambda$main$0(Test01.java:26)
  34. at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
  35. ... 6 more
  36. 。。。。。。。。

4、whenComplete

whenComplete是当某个任务执行完成后执行的回调方法,会将执行结果或者执行期间抛出的异常传递给回调方法,如果是正常执行则异常为null,回调方法对应的CompletableFuture的result和该任务一致,如果该任务正常执行,则get方法返回执行结果,如果是执行异常,则get方法抛出异常。测试用例如下:

  1. public class Test03 {
  2. private static final ForkJoinPool pool = new ForkJoinPool();
  3. public static void main(String[] args) throws ExecutionException, InterruptedException {
  4. System.out.println("main....start.....");
  5. /**
  6. * 方法完成后的感知
  7. */
  8. CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
  9. System.out.println("当前线程 :" + Thread.currentThread().getId());
  10. int i = 10 / 0;
  11. System.out.println("运行结果:" + i);
  12. return i;
  13. }, executorService).whenComplete((res, exception) -> {
  14. // 虽然能得到异常信息,但是没法修改结果
  15. System.out.println("异步任务成功完成了....结果是:" + res + "异常是:" + exception);
  16. }).exceptionally(throwable -> {
  17. // 感知异常,同时返回默认值
  18. return 10;
  19. });
  20. Integer integer = future.get();
  21. System.out.println("main....end...." + integer);
  22. }
  23. }
  24. 结果:
  25. main....start.....
  26. 当前线程 13
  27. 异步任务成功完成了....结果是:null异常是:java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
  28. main....end....10

5、handle

跟whenComplete基本一致,区别在于handle的回调方法有返回值,且handle方法返回的CompletableFuture的result是回调方法的执行结果或者回调方法执行期间抛出的异常,与原始CompletableFuture的result无关了。测试用例如下:

  1. public class Test03 {
  2. private static final ForkJoinPool pool = new ForkJoinPool();
  3. public static void main(String[] args) throws ExecutionException, InterruptedException {
  4. System.out.println("main....start.....");
  5. ExecutorService executorService = Executors.newFixedThreadPool(3);
  6. CompletableFuture<Integer> feature = CompletableFuture.supplyAsync(() -> {
  7. System.out.println("当前线程 :" + Thread.currentThread().getId());
  8. int i = 10 / 2;
  9. System.out.println("运行结果:" + i);
  10. return i;
  11. }, executorService).handle((a, b) -> {
  12. if (b != null) {
  13. // 删除异常
  14. b.printStackTrace();
  15. }
  16. return a;
  17. });
  18. System.out.println(feature.get());
  19. }
  20. }

四、组合处理

1、thenCombine/thenAcceptBoth/runAfterBoth

这三个方法都是将两个CompletableFuture组合起来,只有这两个都正常执行完了才会执行某个任务,区别在于,thenCombine会将两个任务的执行结果作为方法入参传递到指定方法中,且该方法有返回值;thenAcceptBoth同样将两个任务的执行结果作为方法入参,但是无返回值;runAfterBoth没有入参,也没有返回值。注意两个任务中只要有一个执行异常,则将该异常信息作为指定任务的执行结果。测试用例如下:

  1. public class Test07 {
  2. public static void main(String[] args) throws ExecutionException, InterruptedException {
  3. // 创建异步执行任务:
  4. CompletableFuture<Double> cf = CompletableFuture.supplyAsync(() -> {
  5. System.out.println(Thread.currentThread() + "start cf time->" + System.currentTimeMillis());
  6. try {
  7. Thread.sleep(20000);
  8. } catch (InterruptedException e) {
  9. }
  10. System.out.println(Thread.currentThread() + "end cf time->" + System.currentTimeMillis());
  11. return 1.2;
  12. });
  13. CompletableFuture<Double> cf2 = CompletableFuture.supplyAsync(() -> {
  14. System.out.println(Thread.currentThread() + " start job2,time->" + System.currentTimeMillis());
  15. try {
  16. Thread.sleep(15000);
  17. } catch (InterruptedException e) {
  18. }
  19. System.out.println(Thread.currentThread() + " exit job2,time->" + System.currentTimeMillis());
  20. return 3.2;
  21. });
  22. //cf和cf2的异步任务都执行完成后,会将其执行结果作为方法入参传递给cf3,且有返回值
  23. CompletableFuture<Double> cf3 = cf.thenCombine(cf2, (a, b) -> {
  24. System.out.println(Thread.currentThread() + " start job3,time->" + System.currentTimeMillis());
  25. System.out.println("job3 param a->" + a + ",b->" + b);
  26. try {
  27. Thread.sleep(20000);
  28. } catch (InterruptedException e) {
  29. }
  30. System.out.println(Thread.currentThread() + " exit job3,time->" + System.currentTimeMillis());
  31. return a + b;
  32. });
  33. //cf和cf2的异步任务都执行完成后,会将其执行结果作为方法入参传递给cf3,无返回值
  34. CompletableFuture cf4 = cf.thenAcceptBoth(cf2, (a, b) -> {
  35. System.out.println("job4 param a->" + a + ",b->" + b);
  36. try {
  37. Thread.sleep(15000);
  38. } catch (InterruptedException e) {
  39. }
  40. });
  41. //cf4和cf3都执行完成后,执行cf5,无入参,无返回值
  42. CompletableFuture cf5 = cf4.runAfterBoth(cf3, () -> {
  43. try {
  44. Thread.sleep(10000);
  45. } catch (InterruptedException e) {
  46. }
  47. System.out.println("cf5 do something");
  48. });
  49. //等待子任务执行完成
  50. System.out.println("cf run result->" + cf.get());
  51. System.out.println("cf3 run result->" + cf3.get());
  52. System.out.println("cf5 run result->" + cf5.get());
  53. System.out.println("main thread exit,time->" + System.currentTimeMillis());
  54. }
  55. }
  56. 执行结果:
  57. Thread[ForkJoinPool.commonPool-worker-5,5,main] start job2,time->1639575226010
  58. Thread[ForkJoinPool.commonPool-worker-19,5,main]start cf time->1639575226010
  59. Thread[ForkJoinPool.commonPool-worker-5,5,main] exit job2,time->1639575241012
  60. Thread[ForkJoinPool.commonPool-worker-19,5,main]end cf time->1639575246012
  61. Thread[main,5,main] start job3,time->1639575246013
  62. job4 param a->1.2,b->3.2
  63. job3 param a->1.2,b->3.2
  64. Thread[main,5,main] exit job3,time->1639575266020
  65. cf5 do something
  66. cf run result->1.2
  67. cf3 run result->4.4
  68. cf5 run result->null
  69. main thread exit,time->1639575276022

2、applyToEither / acceptEither / runAfterEither

这三个方法都是将两个CompletableFuture组合起来,只要其中一个执行完了就会执行某个任务,其区别在于applyToEither会将已经执行完成的任务的执行结果作为方法入参,并有返回值;acceptEither同样将已经执行完成的任务的执行结果作为方法入参,但是没有返回值;runAfterEither没有方法入参,也没有返回值。注意两个任务中只要有一个执行异常,则将该异常信息作为指定任务的执行结果。测试用例如下:

  1. public class Test08 {
  2. public static void main(String[] args) throws ExecutionException, InterruptedException {
  3. // 创建异步执行任务:
  4. CompletableFuture<Double> cf = CompletableFuture.supplyAsync(() -> {
  5. try {
  6. Thread.sleep(2000);
  7. } catch (InterruptedException e) {
  8. }
  9. return 1.2;
  10. });
  11. CompletableFuture<Double> cf2 = CompletableFuture.supplyAsync(() -> {
  12. try {
  13. Thread.sleep(1500);
  14. } catch (InterruptedException e) {
  15. }
  16. return 3.2;
  17. });
  18. //cf和cf2的异步任务都执行完成后,会将其执行结果作为方法入参传递给cf3,且有返回值
  19. CompletableFuture<Double> cf3 = cf.applyToEither(cf2, (result) -> {
  20. System.out.println("job3 param result->" + result);
  21. return result;
  22. });
  23. //cf和cf2的异步任务都执行完成后,会将其执行结果作为方法入参传递给cf3,无返回值
  24. CompletableFuture cf4 = cf.acceptEither(cf2, (result) -> {
  25. System.out.println("job4 param result->"+result);
  26. });
  27. //cf4和cf3都执行完成后,执行cf5,无入参,无返回值
  28. CompletableFuture cf5 = cf4.runAfterEither(cf3, () -> {
  29. System.out.println("cf5 do something");
  30. });
  31. //等待子任务执行完成
  32. System.out.println("cf run result->" + cf.get());
  33. System.out.println("cf3 run result->" + cf3.get());
  34. System.out.println("cf5 run result->" + cf5.get());
  35. }
  36. }
  37. // 结果
  38. job4 param result->3.2
  39. cf5 do something
  40. job3 param result->3.2
  41. cf run result->1.2
  42. cf3 run result->3.2
  43. cf5 run result->null

3、thenCompose

thenCompose方法会在某个任务执行完成后,将该任务的执行结果作为方法入参然后执行指定的方法,该方法会返回一个新的CompletableFuture实例,如果该CompletableFuture实例的result不为null,则返回一个基于该result的新的CompletableFuture实例;如果该CompletableFuture实例为null,则,然后执行这个新任务,测试用例如下:

  1. public class Test09 {
  2. public static void main(String[] args) throws ExecutionException, InterruptedException {
  3. // 创建异步执行任务:
  4. CompletableFuture<Double> cf = CompletableFuture.supplyAsync(() -> {
  5. try {
  6. Thread.sleep(2000);
  7. } catch (InterruptedException e) {
  8. }
  9. return 1.2;
  10. });
  11. CompletableFuture<String> cf2 = cf.thenCompose((param) -> {
  12. System.out.println("job3 .....param:" + param);
  13. try {
  14. Thread.sleep(2000);
  15. } catch (InterruptedException e) {
  16. }
  17. return CompletableFuture.supplyAsync(() -> {
  18. try {
  19. Thread.sleep(2000);
  20. } catch (InterruptedException e) {
  21. }
  22. return "job3 test";
  23. });
  24. });
  25. //等待子任务执行完成
  26. System.out.println("cf run result->" + cf.get());
  27. System.out.println("main thread start cf2.get(),time->" + System.currentTimeMillis());
  28. System.out.println("cf2 run result->" + cf2.get());
  29. }
  30. }
  31. // 结果
  32. cf run result->1.2
  33. job3 .....param:1.2
  34. main thread start cf2.get(),time->1640015940253
  35. cf2 run result->job3 test

4、allOf

allOf返回的CompletableFuture是多个任务都执行完成后才会执行,只要有一个任务执行异常,则返回的CompletableFuture执行get方法时会抛出异常,如果都是正常执行,则get返回null。

  1. public class Test10 {
  2. public static void main(String[] args) throws ExecutionException, InterruptedException {
  3. CompletableFuture<Double> cf = CompletableFuture.supplyAsync(() -> {
  4. try {
  5. Thread.sleep(2000);
  6. } catch (InterruptedException e) {
  7. }
  8. return 1.2;
  9. });
  10. CompletableFuture<Double> cf2 = CompletableFuture.supplyAsync(() -> {
  11. try {
  12. Thread.sleep(1500);
  13. } catch (InterruptedException e) {
  14. }
  15. return 2.2;
  16. });
  17. CompletableFuture<Double> cf3 = CompletableFuture.supplyAsync(() -> {
  18. int a = 1/0;
  19. try {
  20. Thread.sleep(1500);
  21. } catch (InterruptedException e) {
  22. }
  23. return 3.2;
  24. });
  25. // 当三个线程都执行成功之后才执行 ,a 一般都是 null,b是输出异常
  26. CompletableFuture<Void> cf4 = CompletableFuture.allOf(cf, cf2, cf3).whenComplete((a, b) -> {
  27. System.out.println("cf4 a-->" + a);
  28. System.out.println("cf4 b-->" + b);
  29. });
  30. //等待子任务执行完成
  31. System.out.println("cf run result->" + cf.get());
  32. System.out.println("cf2 run result->" + cf2.get());
  33. System.out.println("cf3 run result->" + cf3.get());
  34. System.out.println("cf4 run result->" + cf4.get());
  35. }
  36. }
  37. // 结果
  38. cf4 a-->null
  39. cf4 b-->java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
  40. cf run result->1.2
  41. cf2 run result->2.2
  42. Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.ArithmeticException: / by zero
  43. at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
  44. at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
  45. at demo08_completableFuture.Test10.main(Test10.java:45)
  46. Caused by: java.lang.ArithmeticException: / by zero
  47. at demo08_completableFuture.Test10.lambda$main$2(Test10.java:28)
  48. ...........

5、anyof

anyOf返回的CompletableFuture是多个任务只要其中一个执行完成就会执行,其get返回的是已经执行完成的任务的执行结果,如果该任务执行异常,则抛出异常。

  1. public class Test11 {
  2. public static void main(String[] args) throws ExecutionException, InterruptedException {
  3. CompletableFuture<Double> cf = CompletableFuture.supplyAsync(() -> {
  4. try {
  5. Thread.sleep(2000);
  6. } catch (InterruptedException e) {
  7. }
  8. return 1.2;
  9. });
  10. CompletableFuture<Double> cf2 = CompletableFuture.supplyAsync(() -> {
  11. try {
  12. Thread.sleep(1500);
  13. } catch (InterruptedException e) {
  14. }
  15. return 2.2;
  16. });
  17. CompletableFuture<Double> cf3 = CompletableFuture.supplyAsync(() -> {
  18. try {
  19. Thread.sleep(1500);
  20. } catch (InterruptedException e) {
  21. }
  22. return 3.2;
  23. });
  24. // 当三个线程都执行成功之后才执行 ,a 一般都是 null,b是输出异常
  25. CompletableFuture<Object> cf4 = CompletableFuture.anyOf(cf, cf2, cf3).whenComplete((a, b) -> {
  26. System.out.println("cf4 a-->" + a);
  27. System.out.println("cf4 b-->" + b);
  28. });
  29. //等待子任务执行完成
  30. System.out.println("cf run result->" + cf.get());
  31. System.out.println("cf2 run result->" + cf2.get());
  32. System.out.println("cf3 run result->" + cf3.get());
  33. System.out.println("cf4 run result->" + cf4.get());
  34. }
  35. }
  36. 结果
  37. cf4 a-->3.2
  38. cf4 b-->null
  39. cf run result->1.2
  40. cf2 run result->2.2
  41. cf3 run result->3.2
  42. cf4 run result->3.2