CompletableFuture是 java8 中新增的一个类,算是对 Future 的一种增强,用起来很方便,也是会经常用到的一个工具类,熟悉一下。
目录
- CompletionStage 接口
- CompletableFuture 类
- CompletableFuture 常见的方法- runAsync 和 supplyAsync 方法
- 计算结果完成时的回调方法
- thenApply 方法
- handle 方法
- thenAccept 消费处理结果
- thenRun 方法
- thenCombine 合并任务
- thenAcceptBoth
- applyToEither 方法
- acceptEither 方法
- runAfterEither 方法
- runAfterBoth
- thenCompose 方法
 
- 
CompletionStage 接口
- CompletionStage 代表异步计算过程中的某一个阶段,一个阶段完成以后可能会触发另外一个阶段 
- 一个阶段的计算执行可以是一个 Function,Consumer 或者 Runnable。比如:stage.thenApply(x -> square(x)).thenAccept(x -> System.out.print(x)).thenRun(() -> System.out.println())
- 一个阶段的执行可能是被单个阶段的完成触发,也可能是由多个阶段一起触发 - CompletableFuture 类
- 在 Java8 中,CompletableFuture 提供了非常强大的 Future 的扩展功能,可以帮助我们简化异步编程的复杂性,并且提供了函数式编程的能力,可以通过回调的方式处理计算结果,也提供了转换和组合 CompletableFuture 的方法。 
- 它可能代表一个明确完成的 Future,也有可能代表一个完成阶段( CompletionStage ),它支持在计算完成以后触发一些函数或执行某些动作。
- 它实现了 Future 和 CompletionStage 接口
CompletableFuture 常见的方法runAsync 和 supplyAsync 方法CompletableFuture 提供了四个静态方法来创建一个异步操作。
 public static CompletableFuturerunAsync(Runnable runnable) 
 public static CompletableFuturerunAsync(Runnable runnable, Executor executor) 
 public static CompletableFuture supplyAsync(Supplier supplier)
 public static CompletableFuture supplyAsync(Supplier supplier, Executor executor)
没有指定 Executor 的方法会使用 ForkJoinPool.commonPool() 作为它的线程池执行异步代码。如果指定线程池,则使用指定的线程池运行。以下所有的方法都类同。
- runAsync 方法不支持返回值。
- supplyAsync 可以支持返回值。
示例代码
//无返回值
public static void runAsync() throws Exception {
    CompletableFuture
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
        }
        System.out.println(“run end …”);
    });
future.get();<br />}
//有返回值
public static void supplyAsync() throws Exception {
    CompletableFuture
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
        }
        System.out.println(“run end …”);
        return System.currentTimeMillis();
    });
**long** time = future.get();<br /> System.out.println("time = "+time);<br />}
计算结果完成时的回调方法
当 CompletableFuture 的计算结果完成,或者抛出异常的时候,可以执行特定的 Action。主要是下面的方法:
public CompletableFuture
public CompletableFuture
public CompletableFuture
public CompletableFuture
可以看到 Action 的类型是 BiConsumer<? super T,? super Throwable>它可以处理正常的计算结果,或者异常情况。
whenComplete 和 whenCompleteAsync 的区别:
- whenComplete:是执行当前任务的线程执行继续执行 whenComplete 的任务。
- whenCompleteAsync:是执行把 whenCompleteAsync 这个任务继续提交给线程池来进行执行。
示例代码
public static void whenComplete() throws Exception {
    CompletableFuture
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
        }
        if(new Random().nextInt()%2>=0) {
            int i = 12/0;
        }
        System.out.println(“run end …”);
    });
future.whenComplete(**new** BiConsumer<Void, Throwable>() {<br /> @Override<br /> **public** **void** **accept**(Void t, Throwable action) {<br /> System.out.println("执行完成!");<br /> }
});<br /> future.exceptionally(**new** Function<Throwable, Void>() {<br /> @Override<br /> **public** Void **apply**(Throwable t) {<br /> System.out.println("执行失败!"+t.getMessage());<br /> **return** **null**;<br /> }<br /> });
TimeUnit.SECONDS.sleep(2);<br />}
thenApply 方法
当一个线程依赖另一个线程时,可以使用 thenApply 方法来把这两个线程串行化。
public  CompletableFuture thenApply(Function<? super T,? extends U> fn)
public  CompletableFuture thenApplyAsync(Function<? super T,? extends U> fn)
public  CompletableFuture thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
Function<? super T,? extends U> T:上一个任务返回结果的类型 U:当前任务的返回值类型
示例代码
private static void thenApply() throws Exception {
    CompletableFuture
        @Override
        public Long get() {
            long result = new Random().nextInt(100);
            System.out.println(“result1=”+result);
            return result;
        }
    }).thenApply(new Function
        @Override
        public Long apply(Long t) {
            long result = t5;
            System.out.println(“result2=”+result);
            *return result;
        }
    });
**long** result = future.get();<br /> System.out.println(result);<br />}
handle 方法
handle 是执行任务完成时对结果的处理。handle 方法和 thenApply 方法处理方式基本一样。不同的是 handle 是在任务完成后再执行,还可以处理异常的任务。thenApply 只可以执行正常的任务,任务出现异常则不执行 thenApply 方法。
public  CompletionStage handle(BiFunction<? super T, Throwable, ? extends U> fn);
public  CompletionStage handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);
public  CompletionStage handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,Executor executor);
示例代码
public static void handle() throws Exception{
    CompletableFuture
@Override<br /> **public** Integer **get**() {<br /> **int** i= 10/0;<br /> **return** **new** Random().nextInt(10);<br /> }<br /> }).handle(**new** BiFunction<Integer, Throwable, Integer>() {<br /> @Override<br /> **public** Integer **apply**(Integer param, Throwable throwable) {<br /> **int** result = -1;<br /> **if**(throwable==**null**){<br /> result = param * 2;<br /> }**else**{<br /> System.out.println(throwable.getMessage());<br /> }<br /> **return** result;<br /> }<br /> });<br /> System.out.println(future.get());<br />}
从示例中可以看出,在 handle 中可以根据任务是否有异常来进行做相应的后续处理操作。而 thenApply 方法,如果上个任务出现错误,则不会执行 thenApply 方法。
thenAccept 消费处理结果
接收任务的处理结果,并消费处理,无返回结果。
public CompletionStage
public CompletionStage
public CompletionStage
示例代码
public static void thenAccept() throws Exception{
    CompletableFuture
        @Override
        public Integer get() {
            return new Random().nextInt(10);
        }
    }).thenAccept(integer -> {
        System.out.println(integer);
    });
    future.get();
}
从示例代码中可以看出,该方法只是消费执行完成的任务,并可以根据上面的任务返回的结果进行处理。并没有后续的输错操作。
thenRun 方法
跟 thenAccept 方法不一样的是,不关心任务的处理结果。只要上面的任务执行完成,就开始执行 thenAccept 。
public CompletionStage
public CompletionStage
public CompletionStage
示例代码
public static void thenRun() throws Exception{
    CompletableFuture
        @Override
        public Integer get() {
            return new Random().nextInt(10);
        }
    }).thenRun(() -> {
        System.out.println(“thenRun …”);
    });
    future.get();
}
该方法同 thenAccept 方法类似。不同的是上个任务处理完成后,并不会把计算的结果传给 thenRun 方法。只是处理完任务后,执行 thenAccept 的后续操作。
thenCombine 合并任务
thenCombine 会把 两个 CompletionStage 的任务都执行完成后,把两个任务的结果一块交给 thenCombine 来处理。
public  CompletionStage
public  CompletionStage
public  CompletionStage
示例代码
private static void thenCombine() throws Exception {
    CompletableFuture
        @Override
        public String get() {
            return “hello”;
        }
    });
    CompletableFuture
        @Override
        public String get() {
            return “hello”;
        }
    });
    CompletableFuture
        @Override
        public String apply(String t, String u) {
            return t+” “+u;
        }
    });
    System.out.println(result.get());
}
thenAcceptBoth
当两个 CompletionStage 都执行完成后,把结果一块交给 thenAcceptBoth 来进行消耗
public  CompletionStage
public  CompletionStage
public  CompletionStage
示例代码
private static void thenAcceptBoth() throws Exception {
    CompletableFuture
        @Override
        public Integer get() {
            int t = new Random().nextInt(3);
            try {
                TimeUnit.SECONDS.sleep(t);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(“f1=”+t);
            return t;
        }
    });
CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(**new** Supplier<Integer>() {<br /> @Override<br /> **public** Integer **get**() {<br /> **int** t = **new** Random().nextInt(3);<br /> **try** {<br /> TimeUnit.SECONDS.sleep(t);<br /> } **catch** (InterruptedException e) {<br /> e.printStackTrace();<br /> }<br /> System.out.println("f2="+t);<br /> **return** t;<br /> }<br /> });<br /> f1.thenAcceptBoth(f2, **new** BiConsumer<Integer, Integer>() {<br /> @Override<br /> **public** **void** **accept**(Integer t, Integer u) {<br /> System.out.println("f1="+t+";f2="+u+";");<br /> }<br /> });<br />}
applyToEither 方法
两个 CompletionStage,谁执行返回的结果快,我就用那个 CompletionStage 的结果进行下一步的转化操作。
public  CompletionStage applyToEither(CompletionStage<? extends T> other,Function<? super T, U> fn);
public  CompletionStage applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn);
public  CompletionStage applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn,Executor executor);
示例代码
private static void applyToEither() throws Exception {
    CompletableFuture
        @Override
        public Integer get() {
            int t = new Random().nextInt(3);
            try {
                TimeUnit.SECONDS.sleep(t);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(“f1=”+t);
            return t;
        }
    });
    CompletableFuture
        @Override
        public Integer get() {
            int t = new Random().nextInt(3);
            try {
                TimeUnit.SECONDS.sleep(t);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(“f2=”+t);
            return t;
        }
    });
CompletableFuture<Integer> result = f1.applyToEither(f2, **new** Function<Integer, Integer>() {<br /> @Override<br /> **public** Integer **apply**(Integer t) {<br /> System.out.println(t);<br /> **return** t * 2;<br /> }<br /> });
System.out.println(result.get());<br />}
acceptEither 方法
两个 CompletionStage,谁执行返回的结果快,我就用那个 CompletionStage 的结果进行下一步的消耗操作。
public CompletionStage
public CompletionStage
public CompletionStage
示例代码
private static void acceptEither() throws Exception {
    CompletableFuture
        @Override
        public Integer get() {
            int t = new Random().nextInt(3);
            try {
                TimeUnit.SECONDS.sleep(t);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(“f1=”+t);
            return t;
        }
    });
CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(**new** Supplier<Integer>() {<br /> @Override<br /> **public** Integer **get**() {<br /> **int** t = **new** Random().nextInt(3);<br /> **try** {<br /> TimeUnit.SECONDS.sleep(t);<br /> } **catch** (InterruptedException e) {<br /> e.printStackTrace();<br /> }<br /> System.out.println("f2="+t);<br /> **return** t;<br /> }<br /> });<br /> f1.acceptEither(f2, **new** Consumer<Integer>() {<br /> @Override<br /> **public** **void** **accept**(Integer t) {<br /> System.out.println(t);<br /> }<br /> });<br />}
runAfterEither 方法
两个 CompletionStage,任何一个完成了都会执行下一步的操作(Runnable)
public CompletionStage
public CompletionStage
public CompletionStage
示例代码
private static void runAfterEither() throws Exception {
    CompletableFuture
        @Override
        public Integer get() {
            int t = new Random().nextInt(3);
            try {
                TimeUnit.SECONDS.sleep(t);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(“f1=”+t);
            return t;
        }
    });
CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(**new** Supplier<Integer>() {<br /> @Override<br /> **public** Integer **get**() {<br /> **int** t = **new** Random().nextInt(3);<br /> **try** {<br /> TimeUnit.SECONDS.sleep(t);<br /> } **catch** (InterruptedException e) {<br /> e.printStackTrace();<br /> }<br /> System.out.println("f2="+t);<br /> **return** t;<br /> }<br /> });<br /> f1.runAfterEither(f2, **new** Runnable() {
@Override<br /> **public** **void** **run**() {<br /> System.out.println("上面有一个已经完成了。");<br /> }<br /> });<br />}
runAfterBoth
两个 CompletionStage,都完成了计算才会执行下一步的操作(Runnable)
public CompletionStage
public CompletionStage
public CompletionStage
示例代码
private static void runAfterBoth() throws Exception {
    CompletableFuture
        @Override
        public Integer get() {
            int t = new Random().nextInt(3);
            try {
                TimeUnit.SECONDS.sleep(t);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(“f1=”+t);
            return t;
        }
    });
CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(**new** Supplier<Integer>() {<br /> @Override<br /> **public** Integer **get**() {<br /> **int** t = **new** Random().nextInt(3);<br /> **try** {<br /> TimeUnit.SECONDS.sleep(t);<br /> } **catch** (InterruptedException e) {<br /> e.printStackTrace();<br /> }<br /> System.out.println("f2="+t);<br /> **return** t;<br /> }<br /> });<br /> f1.runAfterBoth(f2, **new** Runnable() {
@Override<br /> **public** **void** **run**() {<br /> System.out.println("上面两个任务都执行完成了。");<br /> }<br /> });<br />}
thenCompose 方法
thenCompose 方法允许你对两个 CompletionStage 进行流水线操作,第一个操作完成时,将其结果作为参数传递给第二个操作。
public  CompletableFuture thenCompose(Function<? super T, ? extends CompletionStage> fn);
public  CompletableFuture thenComposeAsync(Function<? super T, ? extends CompletionStage> fn) ;
public  CompletableFuture thenComposeAsync(Function<? super T, ? extends CompletionStage> fn, Executor executor) ;
示例代码
private static void thenCompose() throws Exception {
        CompletableFuture
            @Override
            public Integer get() {
                int t = new Random().nextInt(3);
                System.out.println(“t1=”+t);
                return t;
            }
        }).thenCompose(new Function
            @Override
            public CompletionStage
                return CompletableFuture.supplyAsync(new Supplier
                    @Override
                    public Integer get() {
                        int t = param 2;
                        System.out.println(“t2=”+t);
                        *return t;
                    }
                });
            }
});<br /> System.out.println("thenCompose result : "+f.get());<br /> }
 
                         
                                

