CompletableFuture:异步编程没那么难
异步化,是并行方案得以实施的基础,更深入地讲其实就是:利用多线程优化性能这个核心方案得以实施的基础。
CompletableFuture 的核心优势
1.无需手工维护线程
2.语义更清晰
3.代码更简练并且专注于业务逻辑
示例, 实现烧水泡茶程序
//任务1:洗水壶->烧开水CompletableFuture<Void> f1 =CompletableFuture.runAsync(()->{System.out.println("T1:洗水壶...");sleep(1, TimeUnit.SECONDS);System.out.println("T1:烧开水...");sleep(15, TimeUnit.SECONDS);});//任务2:洗茶壶->洗茶杯->拿茶叶CompletableFuture<String> f2 =CompletableFuture.supplyAsync(()->{System.out.println("T2:洗茶壶...");sleep(1, TimeUnit.SECONDS);System.out.println("T2:洗茶杯...");sleep(2, TimeUnit.SECONDS);System.out.println("T2:拿茶叶...");sleep(1, TimeUnit.SECONDS);return "龙井";});//任务3:任务1和任务2完成后执行:泡茶CompletableFuture<String> f3 =f1.thenCombine(f2, (__, tf)->{System.out.println("T1:拿到茶叶:" + tf);System.out.println("T1:泡茶...");return "上茶:" + tf;});//等待任务3执行结果System.out.println(f3.join());void sleep(int t, TimeUnit u) {try {u.sleep(t);}catch(InterruptedException e){}}// 一次执行结果:T1:洗水壶...T2:洗茶壶...T1:烧开水...T2:洗茶杯...T2:拿茶叶...T1:拿到茶叶:龙井T1:泡茶...上茶:龙井
创建 CompletableFuture 对象
//使用默认线程池static CompletableFuture<Void>runAsync(Runnable runnable)static <U> CompletableFuture<U>supplyAsync(Supplier<U> supplier)//可以指定线程池static CompletableFuture<Void>runAsync(Runnable runnable, Executor executor)static <U> CompletableFuture<U>supplyAsync(Supplier<U> supplier, Executor executor)
- runAsync(Runnable runnable)和supplyAsync(Supplier supplier),它们之间的区别是:Runnable 接口的 run() 方法没有返回值,而 Supplier 接口的 get() 方法是有返回值的。
- 默认情况下 CompletableFuture 会使用公共的 ForkJoinPool 线程池,这个线程池默认创建的线程数是 CPU 的核数(也可以通过 JVM option:-Djava.util.concurrent.ForkJoinPool.common.parallelism 来设置 ForkJoinPool 线程池的线程数)。如果所有 CompletableFuture 共享一个线程池,那么一旦有任务执行一些很慢的 I/O 操作,就会导致线程池中所有线程都阻塞在 I/O 操作上,从而造成线程饥饿,进而影响整个系统的性能。所以,强烈建议根据不同的业务类型创建不同的线程池,以避免互相干扰。
- 创建完 CompletableFuture 对象之后,会自动地异步执行 runnable.run() 方法或者 supplier.get() 方法,对于一个异步操作,你需要关注两个问题:一个是异步操作什么时候结束,另一个是如何获取异步操作的执行结果。因为 CompletableFuture 类实现了 Future 接口,所以这两个问题你都可以通过 Future 接口来解决。另外,CompletableFuture 类还实现了 CompletionStage 接口.
如何理解 CompletionStage 接口
任务是有时序关系的,比如有串行关系、并行关系、汇聚关系等。CompletionStage 接口可以清晰地描述任务之间的这种时序关系.
串行关系
并行关系
汇聚关系
1.描述串行关系
CompletionStage 接口里面描述串行关系,主要是 thenApply、thenAccept、thenRun 和 thenCompose 这四个系列的接口。
thenApply 系列函数里参数 fn 的类型是接口 Function,这个接口里与 CompletionStage 相关的方法是 R apply(T t),这个方法既能接收参数也支持返回值,所以 thenApply 系列方法返回的是CompletionStage。
而 thenAccept 系列方法里参数 consumer 的类型是接口Consumer,这个接口里与 CompletionStage 相关的方法是 void accept(T t),这个方法虽然支持参数,但却不支持回值,所以 thenAccept 系列方法返回的是CompletionStage。
thenRun 系列方法里 action 的参数是 Runnable,所以 action 既不能接收参数也不支持返回值,所以 thenRun 系列方法返回的也是CompletionStage。
这些方法里面 Async 代表的是异步执行 fn、consumer 或者 action。其中,需要你注意的是 thenCompose 系列方法,这个系列的方法会新创建出一个子流程,最终结果和 thenApply 系列是相同的。 ```java
CompletionStage
示例:(首先通过 supplyAsync() 启动一个异步流程,之后是两个串行操作,整体看起来还是挺简单的。不过,虽然这是一个异步流程,但任务①②③却是串行执行的,②依赖①的执行结果,③依赖②的执行结果。)```javaCompletableFuture<String> f0 =CompletableFuture.supplyAsync(() -> "Hello World") //①.thenApply(s -> s + " QQ") //②.thenApply(String::toUpperCase);//③System.out.println(f0.join());//输出结果HELLO WORLD QQ
2.描述AND汇聚关系
CompletionStage 接口里面描述 AND 汇聚关系,主要是 thenCombine、thenAcceptBoth 和 runAfterBoth 系列的接口,这些接口的区别也是源自 fn、consumer、action 这三个核心参数不同。
CompletionStage<R> thenCombine(other, fn);CompletionStage<R> thenCombineAsync(other, fn);CompletionStage<Void> thenAcceptBoth(other, consumer);CompletionStage<Void> thenAcceptBothAsync(other, consumer);CompletionStage<Void> runAfterBoth(other, action);CompletionStage<Void> runAfterBothAsync(other, action);
3.描述OR汇聚关系
CompletionStage 接口里面描述 OR 汇聚关系,主要是 applyToEither、acceptEither 和 runAfterEither 系列的接口,这些接口的区别也是源自 fn、consumer、action 这三个核心参数不同。
CompletionStage applyToEither(other, fn);CompletionStage applyToEitherAsync(other, fn);CompletionStage acceptEither(other, consumer);CompletionStage acceptEitherAsync(other, consumer);CompletionStage runAfterEither(other, action);CompletionStage runAfterEitherAsync(other, action);
示例:
CompletableFuture<String> f1 =CompletableFuture.supplyAsync(()->{int t = getRandom(5, 10);sleep(t, TimeUnit.SECONDS);return String.valueOf(t);});CompletableFuture<String> f2 =CompletableFuture.supplyAsync(()->{int t = getRandom(5, 10);sleep(t, TimeUnit.SECONDS);return String.valueOf(t);});CompletableFuture<String> f3 =f1.applyToEither(f2,s -> s);System.out.println(f3.join());
4.异常处理
虽然上面我们提到的 fn、consumer、action 它们的核心方法都不允许抛出可检查异常,但是却无法限制它们抛出运行时异常, CompletionStage 接口给我们提供的方案非常简单,比 try{}catch{}还要简单,下面是相关的方法,使用这些方法进行异常处理和串行操作是一样的,都支持链式编程方式。
CompletionStage exceptionally(fn);CompletionStage<R> whenComplete(consumer);CompletionStage<R> whenCompleteAsync(consumer);CompletionStage<R> handle(fn);CompletionStage<R> handleAsync(fn);
示例:
exceptionally() 的使用非常类似于 try{}catch{}中的 catch{},但是由于支持链式编程方式,所以相对更简单。既然有 try{}catch{},那就一定还有 try{}finally{},whenComplete() 和 handle() 系列方法就类似于 try{}finally{}中的 finally{},无论是否发生异常都会执行 whenComplete() 中的回调函数 consumer 和 handle() 中的回调函数 fn。whenComplete() 和 handle() 的区别在于 whenComplete() 不支持返回结果,而 handle() 是支持返回结果的。
CompletableFuture<Integer>f0 = CompletableFuture.supplyAsync(()->7/0)).thenApply(r->r*10).exceptionally(e->0);System.out.println(f0.join());
CompletionService:如何批量执行异步任务?
场景:
“ThreadPoolExecutor+Future”的方案:用三个线程异步执行询价,通过三次调用 Future 的 get() 方法获取询价结果,之后将询价结果保存在数据库中。
// 创建线程池ExecutorService executor =Executors.newFixedThreadPool(3);// 异步向电商S1询价Future<Integer> f1 =executor.submit(()->getPriceByS1());// 异步向电商S2询价Future<Integer> f2 =executor.submit(()->getPriceByS2());// 异步向电商S3询价Future<Integer> f3 =executor.submit(()->getPriceByS3());// 获取电商S1报价并保存r=f1.get();executor.execute(()->save(r));// 获取电商S2报价并保存r=f2.get();executor.execute(()->save(r));// 获取电商S3报价并保存r=f3.get();executor.execute(()->save(r));
上面的这个方案本身没有太大问题,但是有个地方的处理需要你注意,那就是如果获取电商 S1 报价的耗时很长,那么即便获取电商 S2 报价的耗时很短,也无法让保存 S2 报价的操作先执行,因为这个主线程都阻塞在了 f1.get() 操作上.
解决方案: 增加一个阻塞队列,获取到 S1、S2、S3 的报价都进入阻塞队列,然后在主线程中消费阻塞队列,这样就能保证先获取到的报价先保存到数据库了。
// 创建阻塞队列BlockingQueue<Integer> bq =new LinkedBlockingQueue<>();//电商S1报价异步进入阻塞队列executor.execute(()->bq.put(f1.get()));//电商S2报价异步进入阻塞队列executor.execute(()->bq.put(f2.get()));//电商S3报价异步进入阻塞队列executor.execute(()->bq.put(f3.get()));//异步保存所有报价for (int i=0; i<3; i++) {Integer r = bq.take();executor.execute(()->save(r));}
利用 CompletionService 实现询价系统
利用 CompletionService 不但能帮你解决先获取到的报价先保存到数据库的问题,而且还能让代码更简练。
CompletionService 的实现原理也是内部维护了一个阻塞队列,当任务执行结束就把任务的执行结果加入到阻塞队列中,不同的是 CompletionService 是把任务执行结果的 Future 对象加入到阻塞队列中,而上面的示例代码是把任务最终的执行结果放入了阻塞队列中。
CompletionService 接口的实现类是 ExecutorCompletionService,这个实现类的构造方法有两个,分别是:ExecutorCompletionService(Executor executor);ExecutorCompletionService(Executor executor, BlockingQueue> completionQueue)。这两个构造方法都需要传入一个线程池,如果不指定 completionQueue,那么默认会使用无界的 LinkedBlockingQueue。任务执行结果的 Future 对象就是加入到 completionQueue 中。
示例: (通过 CompletionService 接口提供的 submit() 方法提交了三个询价操作,这三个询价操作将会被 CompletionService 异步执行。最后,我们通过 CompletionService 接口提供的 take() 方法获取一个 Future 对象)
// 创建线程池ExecutorService executor =Executors.newFixedThreadPool(3);// 创建CompletionServiceCompletionService<Integer> cs = newExecutorCompletionService<>(executor);// 异步向电商S1询价cs.submit(()->getPriceByS1());// 异步向电商S2询价cs.submit(()->getPriceByS2());// 异步向电商S3询价cs.submit(()->getPriceByS3());// 将询价结果异步保存到数据库for (int i=0; i<3; i++) {Integer r = cs.take().get();executor.execute(()->save(r));}
CompletionService 接口说明
CompletionService 接口提供的方法有 5 个,这 5 个方法的方法签名如下所示
take()、poll() 都是从阻塞队列中获取并移除一个元素;它们的区别在于如果阻塞队列是空的,那么调用 take() 方法的线程会被阻塞,而 poll() 方法会返回 null 值。 poll(long timeout, TimeUnit unit) 方法支持以超时的方式获取并移除阻塞队列头部的一个元素,如果等待了 timeout unit 时间,阻塞队列还是空的,那么该方法会返回 null 值。
Future<V> submit(Callable<V> task);Future<V> submit(Runnable task, V result);Future<V> take()throws InterruptedException;Future<V> poll();Future<V> poll(long timeout, TimeUnit unit)throws InterruptedException;
利用 CompletionService 实现 Dubbo 中的 Forking Cluster
Dubbo 中有一种叫做 Forking 的集群模式,这种集群模式下,支持并行地调用多个查询服务,只要有一个成功返回结果,整个服务就可以返回了。例如你需要提供一个地址转坐标的服务,为了保证该服务的高可用和性能,你可以并行地调用 3 个地图服务商的 API,然后只要有 1 个正确返回了结果 r,那么地址转坐标这个服务就可以直接返回 r 了。这种集群模式可以容忍 2 个地图服务商服务异常,但缺点是消耗的资源偏多。
geocoder(addr) {//并行执行以下3个查询服务,r1=geocoderByS1(addr);r2=geocoderByS2(addr);r3=geocoderByS3(addr);//只要r1,r2,r3有一个返回//则返回return r1|r2|r3;}
利用 CompletionService 可以快速实现 Forking 这种集群模式,比如下面的示例代码就展示了具体是如何实现的。首先我们创建了一个线程池 executor 、一个 CompletionService 对象 cs 和一个Future类型的列表 futures,每次通过调用 CompletionService 的 submit() 方法提交一个异步任务,会返回一个 Future 对象,我们把这些 Future 对象保存在列表 futures 中。通过调用 cs.take().get(),我们能够拿到最快返回的任务执行结果,只要我们拿到一个正确返回的结果,就可以取消所有任务并且返回最终结果了。
// 创建线程池ExecutorService executor =Executors.newFixedThreadPool(3);// 创建CompletionServiceCompletionService<Integer> cs =new ExecutorCompletionService<>(executor);// 用于保存Future对象List<Future<Integer>> futures =new ArrayList<>(3);//提交异步任务,并保存future到futuresfutures.add(cs.submit(()->geocoderByS1()));futures.add(cs.submit(()->geocoderByS2()));futures.add(cs.submit(()->geocoderByS3()));// 获取最快返回的任务执行结果Integer r = 0;try {// 只要有一个成功返回,则breakfor (int i = 0; i < 3; ++i) {r = cs.take().get();//简单地通过判空来检查是否成功返回if (r != null) {break;}}} finally {//取消所有任务for(Future<Integer> f : futures)f.cancel(true);}// 返回结果return r;
总结:
当需要批量提交异步任务的时候建议你使用 CompletionService。CompletionService 将线程池 Executor 和阻塞队列 BlockingQueue 的功能融合在了一起,能够让批量异步任务的管理更简单。除此之外,CompletionService 能够让异步任务的执行结果有序化,先执行完的先进入阻塞队列,利用这个特性,你可以轻松实现后续处理的有序性,避免无谓的等待,同时还可以快速实现诸如 Forking Cluster 这样的需求。
Fork/Join:单机版的MapReduce
对于简单的并行任务,你可以通过“线程池 +Future”的方案来解决;如果任务之间有聚合关系,无论是 AND 聚合还是 OR 聚合,都可以通过 CompletableFuture 来解决;而批量的并行任务,则可以通过 CompletionService 来解决。
并发编程可以分为三个层面的问题,分别是分工、协作和互斥,当你关注于任务的时候,你会发现你的视角已经从并发编程的细节中跳出来了,你应用的更多的是现实世界的思维模式,类比的往往是现实世界里的分工,所以我把线程池、Future、CompletableFuture 和 CompletionService 都列到了分工里面。
从上到下,依次为简单并行任务、聚合任务和批量并行任务示意图
上面提到的简单并行、聚合、批量并行这三种任务模型,基本上能够覆盖日常工作中的并发场景了,但还是不够全面,因为还有一种“分治”的任务模型没有覆盖到。分治,顾名思义,即分而治之,是一种解决复杂问题的思维方法和模式;具体来讲,指的是把一个复杂的问题分解成多个相似的子问题,然后再把子问题分解成更小的子问题,直到子问题简单到可以直接求解。理论上来讲,解决每一个问题都对应着一个任务,所以对于问题的分治,实际上就是对于任务的分治。
分治思想在很多领域都有广泛的应用,例如算法领域有分治算法(归并排序、快速排序都属于分治算法,二分法查找也是一种分治算法);大数据领域知名的计算框架 MapReduce 背后的思想也是分治。既然分治这种任务模型如此普遍,那 Java 显然也需要支持,Java 并发包里提供了一种叫做 Fork/Join 的并行计算框架,就是用来支持分治这种任务模型的。
分治任务模型
分治任务模型可分为两个阶段:一个阶段是任务分解,也就是将任务迭代地分解为子任务,直至子任务可以直接计算出结果;另一个阶段是结果合并,即逐层合并子任务的执行结果,直至获得最终结果。下图是一个简化的分治任务模型图,你可以对照着理解。

在这个分治任务模型里,任务和分解后的子任务具有相似性,这种相似性往往体现在任务和子任务的算法是相同的,但是计算的数据规模是不同的。具备这种相似性的问题,我们往往都采用递归算法。
Fork/Join 的使用
Fork/Join 是一个并行计算的框架,主要就是用来支持分治任务模型的,这个计算框架里的 Fork 对应的是分治任务模型里的任务分解,Join 对应的是结果合并。Fork/Join 计算框架主要包含两部分,一部分是分治任务的线程池 ForkJoinPool,另一部分是分治任务 ForkJoinTask。这两部分的关系类似于 ThreadPoolExecutor 和 Runnable 的关系,都可以理解为提交任务到线程池,只不过分治任务有自己独特类型 ForkJoinTask。
ForkJoinTask 是一个抽象类,它的方法有很多,最核心的是 fork() 方法和 join() 方法,其中 fork() 方法会异步地执行一个子任务,而 join() 方法则会阻塞当前线程来等待子任务的执行结果。ForkJoinTask 有两个子类——RecursiveAction 和 RecursiveTask,通过名字你就应该能知道,它们都是用递归的方式来处理分治任务的。这两个子类都定义了抽象方法 compute(),不过区别是 RecursiveAction 定义的 compute() 没有返回值,而 RecursiveTask 定义的 compute() 方法是有返回值的。这两个子类也是抽象类,在使用的时候,需要你定义子类去扩展。
接下来我们就来实现一下,看看如何用 Fork/Join 这个并行计算框架计算斐波那契数列(下面的代码源自 Java 官方示例)。首先我们需要创建一个分治任务线程池以及计算斐波那契数列的分治任务,之后通过调用分治任务线程池的 invoke() 方法来启动分治任务。由于计算斐波那契数列需要有返回值,所以 Fibonacci 继承自 RecursiveTask。分治任务 Fibonacci 需要实现 compute() 方法,这个方法里面的逻辑和普通计算斐波那契数列非常类似,区别之处在于计算 Fibonacci(n - 1) 使用了异步子任务,这是通过 f1.fork() 这条语句实现的。
static void main(String[] args){//创建分治任务线程池ForkJoinPool fjp =new ForkJoinPool(4);//创建分治任务Fibonacci fib =new Fibonacci(30);//启动分治任务Integer result =fjp.invoke(fib);//输出结果System.out.println(result);}//递归任务static class Fibonacci extendsRecursiveTask<Integer>{final int n;Fibonacci(int n){this.n = n;}protected Integer compute(){if (n <= 1)return n;Fibonacci f1 =new Fibonacci(n - 1);//创建子任务f1.fork();Fibonacci f2 =new Fibonacci(n - 2);//等待子任务结果,并合并结果return f2.compute() + f1.join();}}
ForkJoinPool 工作原理
Fork/Join 并行计算的核心组件是 ForkJoinPool
ThreadPoolExecutor 本质上是一个生产者 - 消费者模式的实现,内部有一个任务队列,这个任务队列是生产者和消费者通信的媒介;ThreadPoolExecutor 可以有多个工作线程,但是这些工作线程都共享一个任务队列。
ForkJoinPool 本质上也是一个生产者 - 消费者的实现,但是更加智能,你可以参考下面的 ForkJoinPool 工作原理图来理解其原理。ThreadPoolExecutor 内部只有一个任务队列,而 ForkJoinPool 内部有多个任务队列,当我们通过 ForkJoinPool 的 invoke() 或者 submit() 方法提交任务时,ForkJoinPool 根据一定的路由规则把任务提交到一个任务队列中,如果任务在执行过程中会创建出子任务,那么子任务会提交到工作线程对应的任务队列中。
如果工作线程对应的任务队列空了,是不是就没活儿干了呢?不是的,ForkJoinPool 支持一种叫做“任务窃取”的机制,如果工作线程空闲了,那它可以“窃取”其他工作任务队列里的任务,例如下图中,线程 T2 对应的任务队列已经空了,它可以“窃取”线程 T1 对应的任务队列的任务。如此一来,所有的工作线程都不会闲下来了。
ForkJoinPool 中的任务队列采用的是双端队列,工作线程正常获取任务和“窃取任务”分别是从任务队列不同的端消费,这样能避免很多不必要的数据竞争。我们这里介绍的仅仅是简化后的原理,ForkJoinPool 的实现远比我们这里介绍的复杂,如果你感兴趣,建议去看它的源码。
模拟 MapReduce 统计单词数量
学习 MapReduce 有一个入门程序,统计一个文件里面每个单词的数量,下面我们来看看如何用 Fork/Join 并行计算框架来实现。我们可以先用二分法递归地将一个文件拆分成更小的文件,直到文件里只有一行数据,然后统计这一行数据里单词的数量,最后再逐级汇总结果,你可以对照前面的简版分治任务模型图来理解这个过程。思路有了,我们马上来实现。下面的示例程序用一个字符串数组 String[] fc 来模拟文件内容,fc 里面的元素与文件里面的行数据一一对应。关键的代码在 compute() 这个方法里面,这是一个递归方法,前半部分数据 fork 一个递归任务去处理(关键代码 mr1.fork()),后半部分数据则在当前任务中递归处理(mr2.compute())。
static void main(String[] args){String[] fc = {"hello world","hello me","hello fork","hello join","fork join in world"};//创建ForkJoin线程池ForkJoinPool fjp =new ForkJoinPool(3);//创建任务MR mr = new MR(fc, 0, fc.length);//启动任务Map<String, Long> result =fjp.invoke(mr);//输出结果result.forEach((k, v)->System.out.println(k+":"+v));}//MR模拟类static class MR extendsRecursiveTask<Map<String, Long>> {private String[] fc;private int start, end;//构造函数MR(String[] fc, int fr, int to){this.fc = fc;this.start = fr;this.end = to;}@Override protectedMap<String, Long> compute(){if (end - start == 1) {return calc(fc[start]);} else {int mid = (start+end)/2;MR mr1 = new MR(fc, start, mid);mr1.fork();MR mr2 = new MR(fc, mid, end);//计算子任务,并返回合并的结果return merge(mr2.compute(),mr1.join());}}//合并结果private Map<String, Long> merge(Map<String, Long> r1,Map<String, Long> r2) {Map<String, Long> result =new HashMap<>();result.putAll(r1);//合并结果r2.forEach((k, v) -> {Long c = result.get(k);if (c != null)result.put(k, c+v);elseresult.put(k, v);});return result;}//统计单词数量private Map<String, Long>calc(String line) {Map<String, Long> result =new HashMap<>();//分割单词String [] words =line.split("\\s+");//统计单词数量for (String w : words) {Long v = result.get(w);if (v != null)result.put(w, v+1);elseresult.put(w, 1L);}return result;}}
总结:
Fork/Join 并行计算框架主要解决的是分治任务。分治的核心思想是“分而治之”:将一个大的任务拆分成小的子任务去解决,然后再把子任务的结果聚合起来从而得到最终结果。这个过程非常类似于大数据处理中的 MapReduce,所以你可以把 Fork/Join 看作单机版的 MapReduce。
Fork/Join 并行计算框架的核心组件是 ForkJoinPool。ForkJoinPool 支持任务窃取机制,能够让所有线程的工作量基本均衡,不会出现有的线程很忙,而有的线程很闲的状况,所以性能很好。Java 1.8 提供的 Stream API 里面并行流也是以 ForkJoinPool 为基础的。不过需要你注意的是,默认情况下所有的并行流计算都共享一个 ForkJoinPool,这个共享的 ForkJoinPool 默认的线程数是 CPU 的核数;如果所有的并行流计算都是 CPU 密集型计算的话,完全没有问题,但是如果存在 I/O 密集型的并行流计算,那么很可能会因为一个很慢的 I/O 计算而拖慢整个系统的性能。所以建议用不同的 ForkJoinPool 执行不同类型的计算任务。
