自己的使用场景

DPS项目中对于某个组件的查询逻辑,会有今日的数据、同比的数据、环比的数据等,每类数据是异步请求的,需要等各种数据都查询结束后,再执行后续的逻辑。

  1. Optional<CompletableFuture<DataHolder>> selfFuture = querySelf(queryElement);
  2. Optional<CompletableFuture<DataHolder>> chainFuture = queryChainIf(queryContext, componentConfig, virtualTable);
  3. Optional<CompletableFuture<DataHolder>> yoyFuture = queryYoyIf(queryContext, componentConfig, virtualTable);
  4. // 每类数据是异步请求,等待10s,等待所有数据都返回后再执行其他逻辑
  5. CompletableFuture.allOf(Stream.of(selfFuture, chainFuture, yoyFuture).filter(Optional::isPresent)
  6. .map(Optional::get).toArray(CompletableFuture[]::new)).get(10, TimeUnit.SECONDS);
  7. // 执行其他逻辑
  8. xxxxxx

为什么有CompletableFuture?

使用Future获取异步执行结果时,一般使用其get()方法,或isDone()方法判断是否结束,但这种方法会导致主线程阻塞。

而CompletableFuture可以传递回调函数,当异步任务完成或异常时,可以执行回调函数,主线程不必等待。

  1. public class Main {
  2. public static void main(String[] args) throws Exception {
  3. // 创建异步执行任务: 这里会使用默认的线程池。
  4. CompletableFuture<Double> cf = CompletableFuture.supplyAsync(Main::fetchPrice);
  5. // 如果执行成功:
  6. cf.thenAccept((result) -> {
  7. System.out.println("price: " + result);
  8. });
  9. // 如果执行异常:
  10. cf.exceptionally((e) -> {
  11. e.printStackTrace();
  12. return null;
  13. });
  14. // 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
  15. Thread.sleep(200);
  16. }
  17. static Double fetchPrice() {
  18. try {
  19. Thread.sleep(100);
  20. } catch (InterruptedException e) {
  21. }
  22. if (Math.random() < 0.3) {
  23. throw new RuntimeException("fetch price failed!");
  24. }
  25. return 5 + Math.random() * 20;
  26. }
  27. }

可见CompletableFuture的优点是:

  • 异步任务结束时,会自动回调某个对象的方法;
  • 异步任务出错时,会自动回调某个对象的方法;
  • 主线程设置好回调后,不再关心异步任务的执行。

    如何提交一个返回CompletableFuture的任务到线程池

    一般我们提交任务到线程池,是调用线程池的submit方法,然后会返回Future,再通过Future get结果。

而CompletableFuture提供了相反的创建逻辑,CompletableFuture可以作为主对象提交任务到线程池。

  1. CompletableFuture.supplyAsync(() -> {
  2. return queryCode("中国石油");
  3. }, xxxExecutor);

这种写法无需将真正执行的代码类 实现 Runnable/Callable接口,非常简单,只关注业务逻辑

thenApplyAsync - 多个CompletableFuture串行执行

如果只是实现了异步回调机制,我们还看不出CompletableFuture相比Future的优势。CompletableFuture更强大的功能是,多个CompletableFuture可以串行执行,例如,定义两个CompletableFuture,第一个CompletableFuture根据证券名称查询证券代码,第二个CompletableFuture根据证券代码查询证券价格,这两个CompletableFuture实现串行操作如下:
// CompletableFuture
import java.util.concurrent.CompletableFuture;

  1. public class Main {
  2. public static void main(String[] args) throws Exception {
  3. // 第一个任务:
  4. CompletableFuture<String> cfQuery = CompletableFuture.supplyAsync(() -> {
  5. return queryCode("中国石油");
  6. });
  7. // cfQuery成功后继续执行下一个任务:
  8. CompletableFuture<Double> cfFetch = cfQuery.thenApplyAsync((code) -> {
  9. return fetchPrice(code);
  10. });
  11. // cfFetch成功后打印结果:
  12. cfFetch.thenAccept((result) -> {
  13. System.out.println("price: " + result);
  14. });
  15. // 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
  16. Thread.sleep(2000);
  17. }
  18. static String queryCode(String name) {
  19. try {
  20. Thread.sleep(100);
  21. } catch (InterruptedException e) {
  22. }
  23. return "601857";
  24. }
  25. static Double fetchPrice(String code) {
  26. try {
  27. Thread.sleep(100);
  28. } catch (InterruptedException e) {
  29. }
  30. return 5 + Math.random() * 20;
  31. }
  32. }

Run

anyOf - 多个 CompletableFuture并行执行

除了串行执行外,多个CompletableFuture还可以并行执行。例如,我们考虑这样的场景:
同时从新浪和网易查询证券代码,只要任意一个返回结果,就进行下一步查询价格,查询价格也同时从新浪和网易查询,只要任意一个返回结果,就完成操作:
// CompletableFuture
import java.util.concurrent.CompletableFuture;

  1. public class Main {
  2. public static void main(String[] args) throws Exception {
  3. // 两个CompletableFuture执行异步查询:
  4. CompletableFuture<String> cfQueryFromSina = CompletableFuture.supplyAsync(() -> {
  5. return queryCode("中国石油", "https://finance.sina.com.cn/code/");
  6. });
  7. CompletableFuture<String> cfQueryFrom163 = CompletableFuture.supplyAsync(() -> {
  8. return queryCode("中国石油", "https://money.163.com/code/");
  9. });
  10. // 用anyOf合并为一个新的CompletableFuture:
  11. CompletableFuture<Object> cfQuery = CompletableFuture.anyOf(cfQueryFromSina, cfQueryFrom163);
  12. // 两个CompletableFuture执行异步查询:
  13. CompletableFuture<Double> cfFetchFromSina = cfQuery.thenApplyAsync((code) -> {
  14. return fetchPrice((String) code, "https://finance.sina.com.cn/price/");
  15. });
  16. CompletableFuture<Double> cfFetchFrom163 = cfQuery.thenApplyAsync((code) -> {
  17. return fetchPrice((String) code, "https://money.163.com/price/");
  18. });
  19. // 用anyOf合并为一个新的CompletableFuture:
  20. CompletableFuture<Object> cfFetch = CompletableFuture.anyOf(cfFetchFromSina, cfFetchFrom163);
  21. // 最终结果:
  22. cfFetch.thenAccept((result) -> {
  23. System.out.println("price: " + result);
  24. });
  25. // 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
  26. Thread.sleep(200);
  27. }
  28. static String queryCode(String name, String url) {
  29. System.out.println("query code from " + url + "...");
  30. try {
  31. Thread.sleep((long) (Math.random() * 100));
  32. } catch (InterruptedException e) {
  33. }
  34. return "601857";
  35. }
  36. static Double fetchPrice(String code, String url) {
  37. System.out.println("query price from " + url + "...");
  38. try {
  39. Thread.sleep((long) (Math.random() * 100));
  40. } catch (InterruptedException e) {
  41. }
  42. return 5 + Math.random() * 20;
  43. }
  44. }

Run
上述逻辑实现的异步查询规则实际上是:

  1. ┌─────────────┐ ┌─────────────┐
  2. Query Code Query Code
  3. from sina from 163
  4. └─────────────┘ └─────────────┘
  5. └───────┬───────┘
  6. ┌─────────────┐
  7. anyOf
  8. └─────────────┘
  9. ┌───────┴────────┐
  10. ┌─────────────┐ ┌─────────────┐
  11. Query Price Query Price
  12. from sina from 163
  13. └─────────────┘ └─────────────┘
  14. └────────┬───────┘
  15. ┌─────────────┐
  16. anyOf
  17. └─────────────┘
  18. ┌─────────────┐
  19. Display Price
  20. └─────────────┘

allOf - 实现所有CompletableFuture都必须成功

除了anyOf()可以实现“任意个CompletableFuture只要一个成功”,allOf()可以实现“所有CompletableFuture都必须成功”,这些组合操作可以实现非常复杂的异步流程控制。

使用场景

有三个线程,必须等待三个线程全部结束时,主线程才继续往下走。并且设置各子线程等待时间最长10s

  1. CompletableFuture.allOf(completableFuture1, completableFuture2, completableFuture3)
  2. .get(10, TimeUnit.SECOND);

以上写法,
(1)allOf会返回一个新的CompletableFuture,只有当其下每个CompletableFuture都结束时,它才结束。
(2)不需要在每个CompletableFuture上都写.get(10, TimeUnit.SECOND),写法上也简便了很多。

最后我们注意CompletableFuture的命名规则:

  • xxx():表示该方法将继续在已有的线程中执行;
  • xxxAsync():表示将异步在线程池中执行。