使用 Future 获得异步执行结果时,要么调用阻塞方法 get(),要么轮询看 isDone() 是否为 true,这两种方法都不是很好,因为主线程也会被迫等待。
从 Java 8 开始引入了 CompletableFuture,它针对 Future 做了改进,可以传入回调对象,当异步任务完成或者发生异常时,自动调用回调对象的回调方法
例如,获取股票价格:

  1. import java.util.concurrent.CompletableFuture;
  2. public class Main {
  3. public static void main(String[] args) throws Exception {
  4. // 创建异步执行任务:
  5. CompletableFuture<Double> cf = CompletableFuture.supplyAsync(Main::fetchPrice);
  6. // 如果执行成功:
  7. cf.thenAccept((result) -> {
  8. System.out.println("price: " + result);
  9. });
  10. // 如果执行异常:
  11. cf.exceptionally((e) -> {
  12. e.printStackTrace();
  13. return null;
  14. });
  15. // 主线程不要立刻结束,否则 CompletableFuture 默认使用的线程池会立刻关闭:
  16. Thread.sleep(200);
  17. }
  18. static Double fetchPrice() {
  19. try {
  20. Thread.sleep(100);
  21. } catch (InterruptedException e) {
  22. }
  23. if (Math.random() < 0.3) {
  24. throw new RuntimeException("fetch price failed!");
  25. }
  26. return 5 + Math.random() * 20;
  27. }
  28. }

创建一个 CompletableFuture 是通过 CompletableFuture.supplyAsync() 实现的,它需要一个实现了 Supplier 接口的对象(与 Comparable 类似):

  1. public interface Supplier<T> {
  2. T get();
  3. }

上面的写法用 lambda 语法简化了一下,直接传入 Main::fetchPrice,因为 Main.fetchPrice() 静态方法的签名符合 Supplier 接口的定义(除了方法名外),都没有传入参数。
紧接着,CompletableFuture 已经被提交给默认的线程池执行了,我们需要定义的是 CompletableFuture 完成时和异常时需要回调的实例。
完成时,CompletableFuture 会调用 Consumer 对象:

  1. public interface Consumer<T> {
  2. void accept(T t);
  3. }

异常时,CompletableFuture 会调用 Function 对象:

  1. public interface Function<T, R> {
  2. R apply(T t);
  3. }

在上述的例子中都用 lambda 语法简化了代码。
可见 CompletableFuture 的优点是:

  • 异步任务结束时,会自动回调某个对象的方法;
  • 异步任务出错时,会自动回调某个对象的方法;
  • 主线程设置好回调后,不再关心异步任务的执行。

如果只是实现了异步回调机制,我们还看不出 CompletableFuture 相比 Future 的优势。CompletableFuture 更强大的功能是,多个 CompletableFuture 可以串行执行,例如,定义两个 CompletableFuture,第一个 CompletableFuture 根据证券名称查询证券代码,第二个 CompletableFuture 根据证券代码查询证券价格,这两个 CompletableFuture 实现串行操作如下:
这里的串行是指,第一个任务完成之后,才能开始第二个任务。

  1. import java.util.concurrent.CompletableFuture;
  2. public class Main {
  3. public static void main(String[] args) throws Exception {
  4. // 第一个任务:
  5. CompletableFuture<String> cfQuery = CompletableFuture.supplyAsync(() -> {
  6. return queryCode("中国石油");
  7. });
  8. // cfQuery 成功后继续执行下一个任务:
  9. CompletableFuture<Double> cfFetch = cfQuery.thenApplyAsync((code) -> {
  10. return fetchPrice(code);
  11. });
  12. // cfFetch 成功后打印结果:
  13. cfFetch.thenAccept((result) -> {
  14. System.out.println("price: " + result);
  15. });
  16. // 主线程不要立刻结束,否则 CompletableFuture 默认使用的线程池会立刻关闭:
  17. Thread.sleep(1000);
  18. }
  19. static String queryCode(String name) {
  20. try {
  21. Thread.sleep(100);
  22. } catch (InterruptedException e) {
  23. }
  24. return "601857";
  25. }
  26. static Double fetchPrice(String code) {
  27. try {
  28. Thread.sleep(100);
  29. } catch (InterruptedException e) {
  30. }
  31. return 5 + Math.random() * 20;
  32. }
  33. }

除了串行执行外,多个 CompletableFuture 还可以并行执行。例如,我们考虑这样的场景:
同时从新浪和网易查询证券代码,只要任意一个返回结果,就进行下一步查询价格,查询价格也同时从新浪和网易查询,只要任意一个返回结果,就完成操作:

  1. import java.util.concurrent.CompletableFuture;
  2. public class Main {
  3. public static void main(String[] args) throws Exception {
  4. // 两个CompletableFuture执行异步查询:
  5. CompletableFuture<String> cfQueryFromSina = CompletableFuture.supplyAsync(() -> {
  6. return queryCode("中国石油", "https://finance.sina.com.cn/code/");
  7. });
  8. CompletableFuture<String> cfQueryFrom163 = CompletableFuture.supplyAsync(() -> {
  9. return queryCode("中国石油", "https://money.163.com/code/");
  10. });
  11. // 用anyOf合并为一个新的CompletableFuture:
  12. CompletableFuture<Object> cfQuery = CompletableFuture.anyOf(cfQueryFromSina, cfQueryFrom163);
  13. // 两个CompletableFuture执行异步查询:
  14. CompletableFuture<String> cfFetchFromSina = cfQuery.thenApplyAsync((code) -> {
  15. return fetchPrice((String) code, "https://finance.sina.com.cn/price/");
  16. });
  17. CompletableFuture<String> cfFetchFrom163 = cfQuery.thenApplyAsync((code) -> {
  18. return fetchPrice((String) code, "https://money.163.com/price/");
  19. });
  20. // 用anyOf合并为一个新的CompletableFuture:
  21. CompletableFuture<Object> cfFetch = CompletableFuture.anyOf(cfFetchFromSina, cfFetchFrom163);
  22. // 最终结果:
  23. cfFetch.thenAccept((result) -> {
  24. System.out.println(result);
  25. });
  26. // 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
  27. Thread.sleep(200);
  28. }
  29. static String queryCode(String name, String url) {
  30. System.out.println("query code from " + url + "...");
  31. try {
  32. Thread.sleep((long) (Math.random() * 100));
  33. } catch (InterruptedException e) {
  34. }
  35. return "601857";
  36. }
  37. static String fetchPrice(String code, String url) {
  38. System.out.println("query price from " + url + "...");
  39. try {
  40. Thread.sleep((long) (Math.random() * 100));
  41. } catch (InterruptedException e) {
  42. }
  43. return "from: " + url + " to: " + (5 + Math.random() * 20);
  44. }
  45. }

上述逻辑实现的异步查询规则实际上是:

  1. ┌─────────────┐ ┌─────────────┐
  2. Query Code Query Code
  3. from sina from 163
  4. └─────────────┘ └─────────────┘
  5. └───────┬───────┘
  6. ┌─────────────┐
  7. anyOf
  8. └─────────────┘
  9. ┌───────┴────────┐
  10. ┌─────────────┐ ┌─────────────┐
  11. Query Price Query Price
  12. from sina from 163
  13. └─────────────┘ └─────────────┘
  14. └────────┬───────┘
  15. ┌─────────────┐
  16. anyOf
  17. └─────────────┘
  18. ┌─────────────┐
  19. Display Price
  20. └─────────────┘

上述的例子中,使用 anyOf() 实现「任意个 CompletableFuture 只要一个成功」。还可以使用 allOf() 实现「所有 CompletableFuture 都必须成功」。
注意 CompletableFuture 的命名规则:

  • xxx() :表示该方法将继续在已有的线程中执行;
  • xxxAsync() :表示将异步在线程池中执行。

    小结

    CompletableFuture 可以指定异步处理流程:

  • thenAccept() 处理正常结果;

  • exceptional() 处理异常结果;
  • thenApplyAsync() 用于串行化另一个 CompletableFuture
  • anyOf()allOf() 用于并行化多个 CompletableFuture