1. CompletableFuture 简介
CompletableFuture 在Java 里面被用于异步编程,异步通常意味着非阻塞,可以使得我们的任务单独运行在与主线程分离的其他线程中,并且通过回调可以在主线程中得到异步任务的执行状态,是否完成,和是否异常等信息。
CompletableFuture 实现了Future, CompletionStage 接口,实现了Future接口就可以兼容现在有线程池框架,而CompletionStage 接口才是异步编程的接口抽象,里面定义多种异步方法,通过这两者集合,从而打造出了强大的CompletableFuture 类。
2. Future 与CompletableFuture
Futrue 在Java 里面,通常用来表示一个异步任务的引用,比如我们将任务提交到线程池里面,然后我们会得到一个Futrue,在Future 里面有isDone 方法来 判断任务是否处理结束,还有get 方法可以一直阻塞直到任务结束然后获取结果,但整体来说这种方式,还是同步的,因为需要客户端不断阻塞等待或者不断轮询才能知道任务是否完成。
Future 的主要缺点如下:
(1)不支持手动完成
我提交了一个任务,但是执行太慢了,我通过其他路径已经获取到了任务结果,现在没法把这个任务结果通知到正在执行的线程,所以必须主动取消或者一直等待它执行完成
(2)不支持进一步的非阻塞调用
通过Future 的get 方法会一直阻塞到任务完成,但是想在获取任务之后执行额外的任务,因为Future 不支持回调函数,所以无法实现这个功能
(3)不支持链式调用
对于Future 的执行结果,我们想继续传到下一个Future 处理使用,从而形成一个链式的pipline 调用,这在Future 中是没法实现的。
(4)不支持多个Future 合并
比如我们有10 个Future 并行执行,我们想在所有的Future 运行完毕之后,执行某些函数,是没法通过Future 实现的。
(5)不支持异常处理
Future 的API 没有任何的异常处理的api,所以在异步运行时,如果出了问题是不好定位的。
3. CompletableFuture 入门
1.使用CompletableFuture
场景:主线程里面创建一个CompletableFuture,然后主线程调用get 方法会阻塞,最后我们在一个子线程中使其终止。
2. 没有返回值的异步任务
CompletableFuture.runAsync();
public static void main(String[] args) throws ExecutionException, InterruptedException {//没有返回值的 异步调用CompletableFuture<Void> completableFuture1 = CompletableFuture.runAsync(() -> {System.out.println(Thread.currentThread().getName() + " completableFuture1");});completableFuture1.get();}
3. 有返回值的异步任务
CompletableFuture.supplyAsync();
public static void main(String[] args) throws ExecutionException, InterruptedException {//有返回值的 异步调用CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(()->{System.out.println(Thread.currentThread().getName() + " completableFuture2");return 1024;});/*** 异步完成后的回调* t 返回值* u 异常信息 无异常返回null*/completableFuture2.whenComplete((t,u)->{System.out.println("t: "+ t);System.out.println("u: "+ u);});}
4. 线程依赖
当一个线程依赖另一个线程时,可以使用 thenApply 方法来把这两个线程串行化。
private static Integer num = 10;/*** 先对一个数加10,然后取平方* @param args*/public static void main(String[] args) throws Exception{System.out.println("主线程开始");CompletableFuture<Integer> future =CompletableFuture.supplyAsync(() -> {try {System.out.println("加10 任务开始");num += 10;} catch (Exception e) {e.printStackTrace();}return num;}).thenApply(integer -> {return num * num;});Integer integer = future.get();System.out.println("主线程结束, 子线程的结果为:" + integer);}
5. 消费处理结果
thenAccept 消费处理结果, 接收任务的处理结果,并消费处理,无返回结果。
public static void main(String[] args) throws Exception{System.out.println("主线程开始");CompletableFuture.supplyAsync(() -> {try {System.out.println("加10 任务开始");num += 10;} catch (Exception e) {e.printStackTrace();}return num;}).thenApply(integer -> {return num * num;}).thenAccept(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) {System.out.println("子线程全部处理完成,最后调用了accept,结果为:" + integer);}});}
6. 异常处理
exceptionally 异常处理,出现异常时触发.
public static void main(String[] args) throws Exception{System.out.println("主线程开始");CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {int i= 1/0;System.out.println("加10 任务开始");num += 10;return num;}).exceptionally(ex -> {System.out.println(ex.getMessage());return -1;});System.out.println(future.get());}
handle 类似于thenAccept/thenRun 方法,是最后一步的处理调用,但是同时可以处理异常
public static void main(String[] args) throws Exception{System.out.println("主线程开始");CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {System.out.println("加10 任务开始");num += 10;return num;}).handle((i,ex) ->{System.out.println("进入handle 方法");if(ex != null){System.out.println("发生了异常,内容为:" + ex.getMessage());return -1;}else{System.out.println("正常完成,内容为: " + i);return i;}});System.out.println(future.get());}
7. 结果合并
thenCompose 合并两个有依赖关系的CompletableFutures 的执行结果
public static void main(String[] args) throws Exception{System.out.println("主线程开始");//第一步加10CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {System.out.println("加10 任务开始");num += 10;return num;});//合并CompletableFuture<Integer> future1 = future.thenCompose(i ->//再来一个CompletableFutureCompletableFuture.supplyAsync(() -> {return i + 1;}));System.out.println(future.get());System.out.println(future1.get());}
thenCombine 合并两个没有依赖关系的CompletableFutures 任务
public static void main(String[] args) throws Exception{System.out.println("主线程开始");CompletableFuture<Integer> job1 = CompletableFuture.supplyAsync(() -> {System.out.println("加10 任务开始");num += 10;return num;});CompletableFuture<Integer> job2 = CompletableFuture.supplyAsync(() -> {System.out.println("乘以10 任务开始");num = num * 10;return num;});//合并两个结果CompletableFuture<Object> future = job1.thenCombine(job2, new BiFunction<Integer, Integer, List<Integer>>() {@Overridepublic List<Integer> apply(Integer a, Integer b) {List<Integer> list = new ArrayList<>();list.add(a);list.add(b);return list;}});System.out.println("合并结果为:" + future.get());}
8. 合并多个任务的结果allOf 与anyOf
allOf: 一系列独立的future 任务,等其所有的任务执行完后做一些事情
List<CompletableFuture> list = new ArrayList<>();//...//多任务合并List<Integer> collect = list.stream().map(CompletableFuture<Integer>::join).collect(Collectors.toList());System.out.println(collect);
anyOf: 只要在多个future 里面有一个返回,整个任务就可以结束,而不需要等到每一个future 结束
CompletableFuture<Integer>[] futures = new CompletableFuture[4];//...CompletableFuture<Object> future = CompletableFuture.anyOf(futures);System.out.println(future.get());
