创建异步对象方式

CompletableFuture提供了四个静态方法来创建一个异步操作

  1. public static CompletableFuture<Void> runAsync(Runnable runnable)
  2. public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
  3. public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
  4. public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

没有指定Executor的方法会使用ForkJoinPool.commonPool() 作为它的线程池执行异步代码。如果指定线程池,则使用指定的线程池运行。以下所有的方法都雷同。

  • runAsync()不支持返回值。
  • supplyAsync()可以支持返回值。

示例:

  1. //无返回值
  2. public static void runAsync() throws Exception {
  3. CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
  4. try {
  5. TimeUnit.SECONDS.sleep(1);
  6. } catch (InterruptedException e) {
  7. }
  8. System.out.println("run end ...");
  9. });
  10. System.out.println(future.get());
  11. }
  12. //有返回值
  13. public static void supplyAsync() throws Exception {
  14. CompletableFuture<Long> future = CompletableFuture.supplyAsync(() -> {
  15. try {
  16. TimeUnit.SECONDS.sleep(1);
  17. } catch (InterruptedException e) {
  18. }
  19. System.out.println("run end ...");
  20. return System.currentTimeMillis();
  21. });
  22. long time = future.get();
  23. System.out.println("time = "+time);
  24. }

计算完成时回调方法

当CompletableFuture的计算结果完成,或者抛出异常的时候,可以执行特定的Action。

CompletableFuture 有以下几个方法可以在计算完成时回调

  1. public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
  2. public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
  3. public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
  4. public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)

可以看到Action的类型是BiConsumer<? super T,? super Throwable>它可以处理正常的计算结果,或者异常情况。
whenComplete 和 whenCompleteAsync 的区别:

  1. whenComplete:是把当前任务的线程执行完后,继续执行 whenComplete 的任务,是同一个线程
  2. whenCompleteAsync:是把当前任务的线程执行完后,把 whenCompleteAsync 这个任务继续提交给线程池来进行执行
  3. exceptionally:可以得到异常,同时返回默认值

示例:

  1. // 方法完成后的感知
  2. CompletableFuture<Integer> supplyAsync = CompletableFuture.supplyAsync(() -> {
  3. log.info("当前线程" + Thread.currentThread().getId());
  4. int i = 10 / 0;
  5. log.info("运行结果" + i);
  6. return i;
  7. }, exector).whenComplete((result, exception) -> {
  8. // 可以得到异常信息,没有返回值
  9. log.info("异步任务完成了....结果是:" + result + ", 异常是:" + exception);
  10. }).exceptionally((throwable)->{
  11. // 可以得到异常,同时返回默认值
  12. return 10;
  13. });
  14. log.info("supplyAsync=" + supplyAsync.get());

Handler()方法

handle()方法是执行任务完成时对结果的处理,handle() 方法和 thenApply()方法处理方式基本一样。不同的是 handle() 是在任务完成后再执行,还可以处理异常的任务。thenApply()只可以执行正常的任务,任务出现异常则不执行 thenApply方法

示例:

  1. // 方法执行完成后的处理
  2. CompletableFuture<Integer> supplyAsyncHandler = CompletableFuture.supplyAsync(() -> {
  3. log.info("当前线程" + Thread.currentThread().getId());
  4. int i = 10 / 5;
  5. log.info("运行结果" + i);
  6. return i;
  7. }, exector).handle((result, thr) -> {
  8. if (result != null) {
  9. return result * 2;
  10. }
  11. if (thr != null) {
  12. return 0;
  13. }
  14. return 0;
  15. });
  16. log.info("supplyAsyncHandler=" + supplyAsyncHandler.get());

从示例中可以看出,在 handle 中可以根据任务是否有异常来进行做相应的后续处理操作。而 thenApply方法,如果上个任务出现错误,则不会执行 thenApply方法

串行化执行

所谓串行化的意思是多个线程执行时,他们之间存在先后执行顺序,按照编排顺序执行依次这些线程,并且下一个线程可以可以获取上一个线程的执行结果。
实现线程的串行化方式可以使用 thenApply()、thenAccept()、thenRun()方法来实现

  1. public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
  2. public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
  3. public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
  4. public CompletionStage<Void> thenAccept(Consumer<? super T> action);
  5. public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
  6. public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);
  7. public CompletionStage<Void> thenRun(Runnable action);
  8. public CompletionStage<Void> thenRunAsync(Runnable action);
  9. public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor);

Function<? super T,? extends U> 泛型T:表示上一个任务返回结果的类型,泛型U:表示当前任务的返回值类型

示例:

  1. /**
  2. * 线程串行化
  3. * 1.thenRun 不能获取到上一步的执行结果,无返回值
  4. */
  5. CompletableFuture.supplyAsync(() -> {
  6. log.info("当前线程" + Thread.currentThread().getId());
  7. int i = 10 / 5;
  8. log.info("运行结果" + i);
  9. return i;
  10. }, exector).thenRunAsync(()->{
  11. log.info("任务二启动了");
  12. });
  13. /**
  14. * 2.thenAcceptAsync 能接受上一步结果,但无返回值
  15. */
  16. CompletableFuture.supplyAsync(() -> {
  17. log.info("当前线程" + Thread.currentThread().getId());
  18. int i = 10 / 5;
  19. log.info("运行结果" + i);
  20. return i;
  21. },exector).thenAcceptAsync((result)->{
  22. log.info("任务三启动了...我没有返回值" + result );
  23. });
  24. /**
  25. * 3.thenApplyAsync 能接受上一步结果,有返回值
  26. */
  27. CompletableFuture<String> thenApplyAsync = CompletableFuture.supplyAsync(() -> {
  28. log.info("当前线程" + Thread.currentThread().getId());
  29. int i = 10 / 5;
  30. log.info("运行结果" + i);
  31. return i;
  32. }, exector).thenApplyAsync(result -> {
  33. log.info("任务四启动了...我可以有返回值");
  34. return "hello" + result;
  35. }, exector);
  36. log.info("任务四的返回值" + thenApplyAsync.get());

两个任务组合执行

所有任务都要完成

两组任务执行完成,交给第三个任务来处理这两个任务的结果

thenCombine

thenCombine 会把两个 CompletionStage 的任务都执行完成后,把两个任务的结果一块交给 thenCombine 来处理,有返回值

  1. public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
  2. public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
  3. public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn,Executor executor);

thenAcceptBoth

当两个CompletionStage都执行完成后,把结果一块交给thenAcceptBoth来进行处理,无法回执

  1. public <U> CompletionStage<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
  2. public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
  3. public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action, Executor executor);

示例:

  1. CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() -> {
  2. log.info("任务一线程开始" + Thread.currentThread().getId());
  3. int i = 10 / 5;
  4. log.info("任务一线程结束" + i);
  5. return i;
  6. }, exector);
  7. CompletableFuture<String> future02 = CompletableFuture.supplyAsync(() -> {
  8. return "Hello";
  9. }, exector);
  10. future01.runAfterBothAsync(future02, ()->{
  11. log.info("任务三开始...");
  12. },exector);
  13. // 当两个CompletionStage都执行完成后,把结果一块交给thenAcceptBoth来进行消耗,无返回值
  14. future01.thenAcceptBothAsync(future02, (f1, f2) -> {
  15. log.info("任务三开始,之前结果:" + f1 + "---->" + f2);
  16. }, exector);
  17. // 当两个CompletionStage的任务都执行完成后,把两个任务的结果一块交给thenCombine来处理,有返回值
  18. CompletableFuture<String> completableFuture = future01.thenCombineAsync(future02, (f1, f2) -> {
  19. log.info("任务三开始,之前结果:" + f1 + "---->" + f2);
  20. return f1 + ":" + f2 + "->Hello";
  21. }, exector);
  22. log.info("任务三开始,返回值" + completableFuture.get());

其中一个任务完成

两组任务执行时,谁先返回结果(只要一个任务执行完毕),第三组任务就开始执行

applyToEither

两组任务,谁执行返回的结果快,我就用那个任务的结果进行下一步的转化操作,有返回值

  1. public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other,Function<? super T, U> fn);
  2. public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn);
  3. public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn,Executor executor);

acceptEither

两组任务,谁执行返回的结果快,我就用那个任务的结果进行下一步的消耗操作 ,无返回值

  1. public CompletionStage<Void> runAfterEither(CompletionStage<?> other,Runnable action);
  2. public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action);
  3. public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action,Executor executor);

示例:

  1. /**
  2. * 两个任务只要一个完成就执行任务三
  3. * runAfterEitherAsync 不感知结果,自己业务返回值
  4. */
  5. CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
  6. log.info("两个任务,完成一个。future1");
  7. return "两个任务,完成一个。future1";
  8. }, exector);
  9. CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
  10. try {
  11. Thread.sleep(3000);
  12. } catch (InterruptedException e) {
  13. e.printStackTrace();
  14. }
  15. log.info("两个任务,完成一个,future2");
  16. return "两个任务,完成一个,future2";
  17. }, exector);
  18. //不感知结果,自己没有返回值
  19. future1.runAfterEitherAsync(future2, () -> {
  20. log.info("两个任务任务完成一个即可,任务三开始....");
  21. }, exector);
  22. // 感知结果,自己没有返回值
  23. future1.acceptEitherAsync(future2, (result) -> {
  24. log.info("两个任务任务完成一个即可,任务三开始....之前的值:" + result);
  25. }, exector);
  26. // 自己感知结果,自己有返回值
  27. CompletableFuture<String> applyToEitherAsync = future1.applyToEitherAsync(future2, (result) -> {
  28. log.info("两个任务任务完成一个即可,任务三开始....之前的值:" + result);
  29. return "两个任务任务完成一个即可,任务三开始....之前的值:" + result + ",返回值->哈哈";
  30. }, exector);
  31. log.info("两个任务任务完成一个即可,任务三开始....之前的值" + applyToEitherAsync.get());

多组任务执行

多组任务一起执行,当所有任务或者其中一个任务完成后获取任务结果

  1. CompletableFuture<String> futureImage = CompletableFuture.supplyAsync(() -> {
  2. log.info("查询商品图片信息");
  3. return "hello.jpg";
  4. });
  5. CompletableFuture<String> futureAttr = CompletableFuture.supplyAsync(() -> {
  6. log.info("查询商品属性");
  7. return "黑色256g";
  8. });
  9. CompletableFuture<String> futureDesc = CompletableFuture.supplyAsync(() -> {
  10. log.info("查询商品介绍");
  11. return "华为";
  12. });
  13. //futureImage.get();futureAttr.get();futureDesc.get();
  14. // 所有任务完成
  15. CompletableFuture<Void> allOf = CompletableFuture.allOf(futureImage, futureAttr, futureDesc);
  16. allOf.get(); // 等待所有结果完成
  17. log.info("查询商品图片信息"+futureImage.get()+",查询商品属性"+futureAttr.get()+",查询商品介绍"+futureDesc.get());
  18. // 任何一个任务完成
  19. CompletableFuture<Object> anyOf = CompletableFuture.anyOf(futureImage, futureAttr, futureDesc);
  20. anyOf.get(); // 等待任何一个任务完成
  21. log.info("查询商品图片信息" + futureImage.get() + ",查询商品属性" + futureAttr.get() + ",查询商品介绍" + futureDesc.get());