CompletableFuture可以指定异步处理流程:

runAsync&supplyAsync

创建CompletableFuture对象可以通过CompletableFuture类的静态方法创建一个CompletableFuture对象。常用的两个方法:

  • CompletableFuture.runAsync():创建一个CompletableFuture对象,该对象会在一个新线程中执行指定的任务,并且不返回结果。
  • CompletableFuture.supplyAsync():创建一个CompletableFuture对象,该对象会在一个新线程中执行指定的任务,并且返回一个结果。

获取任务结果方法

  1. // 如果完成则返回结果,否则就抛出具体的异常
  2. public T get() throws InterruptedException, ExecutionException
  3. // 最大时间等待返回结果,否则就抛出具体异常
  4. public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
  5. // 完成时返回结果值,否则抛出unchecked异常。为了更好地符合通用函数形式的使用,如果完成此 CompletableFuture所涉及的计算引发异常,则此方法将引发unchecked异常并将底层异常作为其原因
  6. public T join()
  7. // 如果完成则返回结果值(或抛出任何遇到的异常),否则返回给定的 valueIfAbsent。
  8. public T getNow(T valueIfAbsent)
  9. // 如果任务没有完成,返回的值设置为给定值
  10. public boolean complete(T value)
  11. // 如果任务没有完成,就抛出给定异常
  12. public boolean completeExceptionally(Throwable ex)

thenAccept&thenAcceptAsync

thenAccept&thenAcceptAsync表示某个任务执行完成后执行的动作,即回调方法。会将该任务的执行结果即方法返回值作为入参传递到回调方法中,无返回值

区别在于,使用thenAccep方法时子任务与父任务使用的是同一个线程,而thenAccepAsync在子任务中可能是另起一个线程执行任务,并且thenAccepAsync可以自定义线程池,默认的使用ForkJoinPool.commonPool()线程池。

  1. public static void main(String[] args) throws ExecutionException, InterruptedException {
  2. CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {
  3. System.out.println(Thread.currentThread() + " cf1 do something....");
  4. return 1;
  5. });
  6. CompletableFuture<Void> cf2 = cf1.thenAccept((result) -> {
  7. System.out.println(Thread.currentThread() + " cf2 do something....");
  8. });
  9. //等待任务1执行完成
  10. System.out.println("cf1结果->" + cf1.get());
  11. //等待任务2执行完成
  12. System.out.println("cf2结果->" + cf2.get());
  13. //------------------------------------------------
  14. CompletableFuture<Integer> cf3 = CompletableFuture.supplyAsync(() -> {
  15. System.out.println(Thread.currentThread() + " cf3 do something....");
  16. return 1;
  17. });
  18. CompletableFuture<Void> cf4 = cf3.thenAcceptAsync((result) -> {
  19. System.out.println(Thread.currentThread() + " cf4 do something....");
  20. });
  21. //等待任务1执行完成
  22. System.out.println("cf3结果->" + cf3.get());
  23. //等待任务2执行完成
  24. System.out.println("cf4结果->" + cf4.get());
  25. }

thenApply&thenApplyAsync

表示某个任务执行完成后执行的动作,即回调方法,会将该任务的执行结果即方法返回值作为入参传递到回调方法中,带有返回值

区别在于,使用thenApply方法时子任务与父任务使用的是同一个线程,而thenApplyAsync在子任务中是另起一个线程执行任务,并且thenApplyAsync可以自定义线程池,默认的使用ForkJoinPool.commonPool()线程池。

  1. public static void main(String[] args) throws ExecutionException, InterruptedException {
  2. CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {
  3. System.out.println(Thread.currentThread() + " cf1 do something....");
  4. return 1;
  5. });
  6. CompletableFuture<Integer> cf2 = cf1.thenApplyAsync((result) -> {
  7. System.out.println(Thread.currentThread() + " cf2 do something....");
  8. result += 2;
  9. return result;
  10. });
  11. //等待任务1执行完成
  12. System.out.println("cf1结果->" + cf1.get());
  13. //等待任务2执行完成
  14. System.out.println("cf2结果->" + cf2.get());
  15. //------------------------------------------------
  16. CompletableFuture<Integer> cf3 = CompletableFuture.supplyAsync(() -> {
  17. System.out.println(Thread.currentThread() + " cf3 do something....");
  18. return 1;
  19. });
  20. CompletableFuture<Integer> cf4 = cf3.thenApply((result) -> {
  21. System.out.println(Thread.currentThread() + " cf4 do something....");
  22. result += 2;
  23. return result;
  24. });
  25. //等待任务1执行完成
  26. System.out.println("cf3结果->" + cf3.get());
  27. //等待任务2执行完成
  28. System.out.println("cf4结果->" + cf4.get());
  29. }

thenRun&thenRunAsync

该方法同 thenAccept 方法类似。区别thenRun 不关心任务的处理结果。只要上面的任务执行完成,就开始执行 thenAccept 。

不同的是上个任务处理完成后,并不会把计算的结果传给 thenRun 方法。只是处理玩任务后,执行 thenAccept 的后续操作。

区别在于,使用thenRun方法时子任务与父任务使用的是同一个线程,而thenRunAsync在子任务中可能是另起一个线程执行任务,并且thenRunAsync可以自定义线程池,默认的使用ForkJoinPool.commonPool()线程池。

  1. public static void main(String[] args) throws ExecutionException, InterruptedException {
  2. CompletableFuture<Void> future1 = CompletableFuture.supplyAsync(() -> {
  3. return new Random().nextInt(10);
  4. }).thenAccept(r -> { //区别在这
  5. System.out.println("thenRun r:" + r);
  6. });
  7. future1.get();
  8. //thenAccept VS thenRun
  9. CompletableFuture<Void> future2 = CompletableFuture.supplyAsync(() -> {
  10. return new Random().nextInt(10);
  11. }).thenRun(() -> {
  12. System.out.println("thenRun ...");
  13. });
  14. future2.get();
  15. }

whenComplete&whenCompleteAsync

whenComplete是当某个任务执行完成后执行的回调方法,会将执行结果或者执行期间抛出的异常传递给回调方法,如果是正常执行则异常为null,回调方法对应的CompletableFuture的result和该任务一致,如果该任务正常执行,则get方法返回执行结果,如果是执行异常,则get方法抛出异常。

whenCompleteAsync和whenComplete区别也是whenCompleteAsync可能会另起一个线程执行任务,并且thenRunAsync可以自定义线程池,默认的使用ForkJoinPool.commonPool()线程池。

  1. public static void main(String[] args) throws ExecutionException, InterruptedException {
  2. CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {
  3. System.out.println(Thread.currentThread() + " cf1 do something....");
  4. int a = 1/0;
  5. return 1;
  6. });
  7. CompletableFuture<Integer> cf2 = cf1.whenComplete((result, e) -> {
  8. System.out.println("上个任务结果:" + result);
  9. System.out.println("上个任务抛出异常:" + e);
  10. System.out.println(Thread.currentThread() + " cf2 do something....");
  11. });
  12. //等待任务1执行完成
  13. //System.out.println("cf1结果->" + cf1.get());
  14. //等待任务2执行完成
  15. System.out.println("cf2结果->" + cf2.get());
  16. }

handle&handleAsync

跟whenComplete基本一致,区别在于handle的回调方法有返回值

exceptional

exceptionally()方法来处理任务中发生的异常。该方法会在任务发生异常时,执行一个指定的函数并附带默认值。

  1. public static void main(String[] args) {
  2. CompletableFuture.supplyAsync(() -> {
  3. System.out.println("测试【thenAccept&exceptionally】创建方式!!!");
  4. //int i = 1/0;
  5. return "有返回值";
  6. }).thenAccept(x -> {
  7. System.out.println("x:" + x); //正常执行完毕进入此函数
  8. }).exceptionally(e -> {
  9. System.out.println("e:" + e); //正常异常进入此函数
  10. return null;
  11. });
  12. }

CompletableFuture的组合

在实际开发中,我们经常需要将多个CompletableFuture对象组合在一起,以便于并行执行多个任务并等待所有任务完成后处理结果。在这种情况下,我们可以使用CompletableFuture的组合方法

thenCombine&thenCompose

  • thenCombine():将两个CompletableFuture对象的结果合并为一个结果。
  1. public static void main(String[] args) {
  2. CompletableFuture<List<String>> painting = CompletableFuture.supplyAsync(() -> {
  3. // 第一个任务获取美术课需要带的东西,返回一个list
  4. List<String> stuff = new ArrayList<>();
  5. stuff.add("画笔");
  6. stuff.add("颜料");
  7. return stuff;
  8. });
  9. CompletableFuture<List<String>> handWork = CompletableFuture.supplyAsync(() -> {
  10. // 第二个任务获取劳技课需要带的东西,返回一个list
  11. List<String> stuff = new ArrayList<>();
  12. stuff.add("剪刀");
  13. stuff.add("折纸");
  14. return stuff;
  15. });
  16. CompletableFuture<List<String>> total = painting
  17. // 传入handWork列表,然后得到两个CompletableFuture的参数Stuff1和2
  18. .thenCombine(handWork, (stuff1, stuff2) -> {
  19. // 合并成新的list
  20. List<String> totalStuff = Stream.of(stuff1, stuff1).flatMap(Collection::stream).collect(Collectors.toList());
  21. return totalStuff;
  22. });
  23. System.out.println(JSON.toJSONString(total.join()));
  24. }
  • thenCompose():将一个CompletableFuture对象的结果作为另一个CompletableFuture对象的输入。
  1. public static void main(String[] args) {
  2. CompletableFuture<List<String>> total = CompletableFuture.supplyAsync(() -> {
  3. // 第一个任务获取美术课需要带的东西,返回一个list
  4. List<String> stuff = new ArrayList<>();
  5. stuff.add("画笔");
  6. stuff.add("颜料");
  7. return stuff;
  8. }).thenCompose(list -> {
  9. // 向第二个任务传递参数list(上一个任务美术课所需的东西list)
  10. CompletableFuture<List<String>> insideFuture = CompletableFuture.supplyAsync(() -> {
  11. List<String> stuff = new ArrayList<>();
  12. // 第二个任务获取劳技课所需的工具
  13. stuff.add("剪刀");
  14. stuff.add("折纸");
  15. // 合并两个list,获取课程所需所有工具
  16. List<String> allStuff = Stream.of(list, stuff).flatMap(Collection::stream).collect(Collectors.toList());
  17. return allStuff;
  18. });
  19. return insideFuture;
  20. });
  21. System.out.println(JSON.toJSONString(total.join()));
  22. }

anyOf&allOf

anyOf() 任意一个执行完成,就可以进行下一步动作

allOf() 全部完成所有任务,才可以进行下一步任务

  1. public static void main(String[] args) throws ExecutionException, InterruptedException {
  2. // 异步任务,无返回值,采用内部的forkjoin线程池
  3. CompletableFuture c1 = CompletableFuture.runAsync(() -> {
  4. System.out.println("打开开关,开始制作,就不用管了");
  5. });
  6. // 异步任务,无返回值,使用自定义的线程池
  7. CompletableFuture c11 = CompletableFuture.runAsync(() -> {
  8. System.out.println("打开开关,开始制作,就不用管了【自定义线程池版】");
  9. }, newSingleThreadExecutor());
  10. // 异步任务,有返回值,使用内部默认的线程池
  11. CompletableFuture<String> c2 = CompletableFuture.supplyAsync(() -> {
  12. System.out.println("清洗米饭");
  13. int i = 1 / 0;
  14. return "干净的米饭";
  15. });
  16. //anyOf:任意一个执行完成,就可以进行下一步动作。即使一个报异常也不影响继续执行
  17. System.out.println("start");
  18. CompletableFuture<Object> anyOf = CompletableFuture.anyOf(c1, c2);
  19. anyOf.thenAccept(r -> {
  20. System.out.println("r:" + r);
  21. }).exceptionally(e -> {
  22. //e:java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
  23. System.out.println("e:" + e);
  24. return null;
  25. });
  26. //allOf:全部完成所有任务,并且不能有异常,才可以进行下一步任务
  27. CompletableFuture<Void> allOf = CompletableFuture.allOf(c1, c2);
  28. allOf.thenAccept(r -> {
  29. System.out.println("r:" + r);
  30. }).exceptionally(e -> {
  31. System.out.println("e:" + e); //e:java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
  32. return null;
  33. });
  34. System.out.println("end");
  35. }

小结

常用语法

  • thenAccept()处理正常结果;
  • exceptional()处理异常结果;
  • thenApplyAsync()用于串行化另一个CompletableFuture
  • anyOf()allOf()用于并行化多个CompletableFuture

练习代码

boot-asyn.zip