创建异步对象方式
CompletableFuture提供了四个静态方法来创建一个异步操作
public static CompletableFuture<Void> runAsync(Runnable runnable)public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
没有指定Executor的方法会使用ForkJoinPool.commonPool() 作为它的线程池执行异步代码。如果指定线程池,则使用指定的线程池运行。以下所有的方法都雷同。
- runAsync()不支持返回值。
- supplyAsync()可以支持返回值。
示例:
//无返回值public static void runAsync() throws Exception {CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {}System.out.println("run end ...");});System.out.println(future.get());}//有返回值public static void supplyAsync() throws Exception {CompletableFuture<Long> future = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {}System.out.println("run end ...");return System.currentTimeMillis();});long time = future.get();System.out.println("time = "+time);}
计算完成时回调方法
当CompletableFuture的计算结果完成,或者抛出异常的时候,可以执行特定的Action。
CompletableFuture 有以下几个方法可以在计算完成时回调
public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)
可以看到Action的类型是BiConsumer<? super T,? super Throwable>它可以处理正常的计算结果,或者异常情况。
whenComplete 和 whenCompleteAsync 的区别:
- whenComplete:是把当前任务的线程执行完后,继续执行 whenComplete 的任务,是同一个线程
- whenCompleteAsync:是把当前任务的线程执行完后,把 whenCompleteAsync 这个任务继续提交给线程池来进行执行
- exceptionally:可以得到异常,同时返回默认值
示例:
// 方法完成后的感知CompletableFuture<Integer> supplyAsync = CompletableFuture.supplyAsync(() -> {log.info("当前线程" + Thread.currentThread().getId());int i = 10 / 0;log.info("运行结果" + i);return i;}, exector).whenComplete((result, exception) -> {// 可以得到异常信息,没有返回值log.info("异步任务完成了....结果是:" + result + ", 异常是:" + exception);}).exceptionally((throwable)->{// 可以得到异常,同时返回默认值return 10;});log.info("supplyAsync=" + supplyAsync.get());
Handler()方法
handle()方法是执行任务完成时对结果的处理,handle() 方法和 thenApply()方法处理方式基本一样。不同的是 handle() 是在任务完成后再执行,还可以处理异常的任务。thenApply()只可以执行正常的任务,任务出现异常则不执行 thenApply方法
示例:
// 方法执行完成后的处理CompletableFuture<Integer> supplyAsyncHandler = CompletableFuture.supplyAsync(() -> {log.info("当前线程" + Thread.currentThread().getId());int i = 10 / 5;log.info("运行结果" + i);return i;}, exector).handle((result, thr) -> {if (result != null) {return result * 2;}if (thr != null) {return 0;}return 0;});log.info("supplyAsyncHandler=" + supplyAsyncHandler.get());
从示例中可以看出,在 handle 中可以根据任务是否有异常来进行做相应的后续处理操作。而 thenApply方法,如果上个任务出现错误,则不会执行 thenApply方法
串行化执行
所谓串行化的意思是多个线程执行时,他们之间存在先后执行顺序,按照编排顺序执行依次这些线程,并且下一个线程可以可以获取上一个线程的执行结果。
实现线程的串行化方式可以使用 thenApply()、thenAccept()、thenRun()方法来实现
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)public CompletionStage<Void> thenAccept(Consumer<? super T> action);public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);public CompletionStage<Void> thenRun(Runnable action);public CompletionStage<Void> thenRunAsync(Runnable action);public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor);
Function<? super T,? extends U> 泛型T:表示上一个任务返回结果的类型,泛型U:表示当前任务的返回值类型
示例:
/*** 线程串行化* 1.thenRun 不能获取到上一步的执行结果,无返回值*/CompletableFuture.supplyAsync(() -> {log.info("当前线程" + Thread.currentThread().getId());int i = 10 / 5;log.info("运行结果" + i);return i;}, exector).thenRunAsync(()->{log.info("任务二启动了");});/*** 2.thenAcceptAsync 能接受上一步结果,但无返回值*/CompletableFuture.supplyAsync(() -> {log.info("当前线程" + Thread.currentThread().getId());int i = 10 / 5;log.info("运行结果" + i);return i;},exector).thenAcceptAsync((result)->{log.info("任务三启动了...我没有返回值" + result );});/*** 3.thenApplyAsync 能接受上一步结果,有返回值*/CompletableFuture<String> thenApplyAsync = CompletableFuture.supplyAsync(() -> {log.info("当前线程" + Thread.currentThread().getId());int i = 10 / 5;log.info("运行结果" + i);return i;}, exector).thenApplyAsync(result -> {log.info("任务四启动了...我可以有返回值");return "hello" + result;}, exector);log.info("任务四的返回值" + thenApplyAsync.get());
两个任务组合执行
所有任务都要完成
两组任务执行完成,交给第三个任务来处理这两个任务的结果
thenCombine
thenCombine 会把两个 CompletionStage 的任务都执行完成后,把两个任务的结果一块交给 thenCombine 来处理,有返回值
public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn,Executor executor);
thenAcceptBoth
当两个CompletionStage都执行完成后,把结果一块交给thenAcceptBoth来进行处理,无法回执
public <U> CompletionStage<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action, Executor executor);
示例:
CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() -> {log.info("任务一线程开始" + Thread.currentThread().getId());int i = 10 / 5;log.info("任务一线程结束" + i);return i;}, exector);CompletableFuture<String> future02 = CompletableFuture.supplyAsync(() -> {return "Hello";}, exector);future01.runAfterBothAsync(future02, ()->{log.info("任务三开始...");},exector);// 当两个CompletionStage都执行完成后,把结果一块交给thenAcceptBoth来进行消耗,无返回值future01.thenAcceptBothAsync(future02, (f1, f2) -> {log.info("任务三开始,之前结果:" + f1 + "---->" + f2);}, exector);// 当两个CompletionStage的任务都执行完成后,把两个任务的结果一块交给thenCombine来处理,有返回值CompletableFuture<String> completableFuture = future01.thenCombineAsync(future02, (f1, f2) -> {log.info("任务三开始,之前结果:" + f1 + "---->" + f2);return f1 + ":" + f2 + "->Hello";}, exector);log.info("任务三开始,返回值" + completableFuture.get());
其中一个任务完成
两组任务执行时,谁先返回结果(只要一个任务执行完毕),第三组任务就开始执行
applyToEither
两组任务,谁执行返回的结果快,我就用那个任务的结果进行下一步的转化操作,有返回值
public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other,Function<? super T, U> fn);public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn);public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn,Executor executor);
acceptEither
两组任务,谁执行返回的结果快,我就用那个任务的结果进行下一步的消耗操作 ,无返回值
public CompletionStage<Void> runAfterEither(CompletionStage<?> other,Runnable action);public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action);public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action,Executor executor);
示例:
/*** 两个任务只要一个完成就执行任务三* runAfterEitherAsync 不感知结果,自己业务返回值*/CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {log.info("两个任务,完成一个。future1");return "两个任务,完成一个。future1";}, exector);CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}log.info("两个任务,完成一个,future2");return "两个任务,完成一个,future2";}, exector);//不感知结果,自己没有返回值future1.runAfterEitherAsync(future2, () -> {log.info("两个任务任务完成一个即可,任务三开始....");}, exector);// 感知结果,自己没有返回值future1.acceptEitherAsync(future2, (result) -> {log.info("两个任务任务完成一个即可,任务三开始....之前的值:" + result);}, exector);// 自己感知结果,自己有返回值CompletableFuture<String> applyToEitherAsync = future1.applyToEitherAsync(future2, (result) -> {log.info("两个任务任务完成一个即可,任务三开始....之前的值:" + result);return "两个任务任务完成一个即可,任务三开始....之前的值:" + result + ",返回值->哈哈";}, exector);log.info("两个任务任务完成一个即可,任务三开始....之前的值" + applyToEitherAsync.get());
多组任务执行
多组任务一起执行,当所有任务或者其中一个任务完成后获取任务结果
CompletableFuture<String> futureImage = CompletableFuture.supplyAsync(() -> {log.info("查询商品图片信息");return "hello.jpg";});CompletableFuture<String> futureAttr = CompletableFuture.supplyAsync(() -> {log.info("查询商品属性");return "黑色256g";});CompletableFuture<String> futureDesc = CompletableFuture.supplyAsync(() -> {log.info("查询商品介绍");return "华为";});//futureImage.get();futureAttr.get();futureDesc.get();// 所有任务完成CompletableFuture<Void> allOf = CompletableFuture.allOf(futureImage, futureAttr, futureDesc);allOf.get(); // 等待所有结果完成log.info("查询商品图片信息"+futureImage.get()+",查询商品属性"+futureAttr.get()+",查询商品介绍"+futureDesc.get());// 任何一个任务完成CompletableFuture<Object> anyOf = CompletableFuture.anyOf(futureImage, futureAttr, futureDesc);anyOf.get(); // 等待任何一个任务完成log.info("查询商品图片信息" + futureImage.get() + ",查询商品属性" + futureAttr.get() + ",查询商品介绍" + futureDesc.get());
