官方文档:https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html

1 示例

Demo1:验证CompletableFuture的阻塞和异步

  1. public class CompletableFutureTest {
  2. public static void main(String[] args) {
  3. Long start = System.currentTimeMillis();
  4. // new CompletableFutureTest().futureBlock();
  5. Long end = System.currentTimeMillis();
  6. // System.out.println("futhre阻塞耗时:" + (end - start));
  7. Long start2 = System.currentTimeMillis();
  8. new CompletableFutureTest().futhreSync();
  9. Long end2 = System.currentTimeMillis();
  10. System.out.println("futhre异步耗时:" + (end2 - start2));
  11. }
  12. public void futureBlock() {
  13. ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10,
  14. 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(10000),
  15. new ThreadFactoryBuilder().setNameFormat("checkExcelImportProductInfoList-pool-%d").build(),
  16. new ThreadPoolExecutor.CallerRunsPolicy());
  17. List<CompletableFuture> list = new ArrayList();
  18. for (int i = 0; i < 10; i++) {
  19. int finalI = i;
  20. CompletableFuture future = CompletableFuture.supplyAsync(() -> this.toDo(finalI), threadPoolExecutor).exceptionally(e -> {
  21. e.printStackTrace();
  22. return null;
  23. });
  24. list.add(future);
  25. }
  26. list.stream().forEach(m -> {
  27. try {
  28. Map map = (Map) m.get();//阻塞
  29. next(map);
  30. } catch (InterruptedException e) {
  31. e.printStackTrace();
  32. } catch (ExecutionException e) {
  33. e.printStackTrace();
  34. }
  35. });
  36. threadPoolExecutor.shutdown();
  37. }
  38. public void futhreSync() {
  39. ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10,
  40. 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(10000),
  41. new ThreadFactoryBuilder().setNameFormat("checkExcelImportProductInfoList-pool-%d").build(),
  42. new ThreadPoolExecutor.CallerRunsPolicy());
  43. for (int i = 0; i < 10; i++) {
  44. int finalI = i;
  45. CompletableFuture future = CompletableFuture.supplyAsync(() -> this.toDo(finalI), threadPoolExecutor).thenApply(map -> {
  46. next(map);
  47. return null;
  48. }).exceptionally(e -> {
  49. e.printStackTrace();
  50. return null;
  51. });
  52. }
  53. threadPoolExecutor.shutdown();
  54. }
  55. public Map toDo(int i) {
  56. if (i == 3 ) {
  57. try {
  58. Thread.sleep(10000);
  59. } catch (InterruptedException e) {
  60. e.printStackTrace();
  61. }
  62. }
  63. if (i == 6 ) {
  64. try {
  65. Thread.sleep(5000);
  66. } catch (InterruptedException e) {
  67. e.printStackTrace();
  68. }
  69. }
  70. Map map = new HashMap();
  71. map.put(i, Thread.currentThread().getName());
  72. return map;
  73. }
  74. public void next(Map map) {
  75. Iterator i = map.keySet().iterator();
  76. while (i.hasNext()) {
  77. Integer integer = (Integer) i.next();
  78. System.out.println("key:" + integer + " ;value:" + map.get(integer));
  79. }
  80. }
  81. }

运行阻塞方法futureBlock时,输出:

  1. key:0 ;value:checkExcelImportProductInfoList-pool-0
  2. key:1 ;value:checkExcelImportProductInfoList-pool-1
  3. key:2 ;value:checkExcelImportProductInfoList-pool-2
  4. key:3 ;value:checkExcelImportProductInfoList-pool-3
  5. key:4 ;value:checkExcelImportProductInfoList-pool-4
  6. key:5 ;value:checkExcelImportProductInfoList-pool-0
  7. key:6 ;value:checkExcelImportProductInfoList-pool-0
  8. key:7 ;value:checkExcelImportProductInfoList-pool-1
  9. key:8 ;value:checkExcelImportProductInfoList-pool-1
  10. key:9 ;value:checkExcelImportProductInfoList-pool-1
  11. futhre阻塞耗时:10082

主线程按提交顺序依次调用get():在等待i=3的任务时阻塞了10s,排在它后面的任务即使早已执行完成,其结果也要等到i=3返回后才能进入下一步处理
image.png

运行异步futhreSync方法时,输出:

  1. key:0 ;value:checkExcelImportProductInfoList-pool-0
  2. key:1 ;value:checkExcelImportProductInfoList-pool-1
  3. futhre异步耗时:80
  4. key:5 ;value:checkExcelImportProductInfoList-pool-0
  5. key:7 ;value:checkExcelImportProductInfoList-pool-0
  6. key:8 ;value:checkExcelImportProductInfoList-pool-0
  7. key:9 ;value:checkExcelImportProductInfoList-pool-0
  8. key:2 ;value:checkExcelImportProductInfoList-pool-2
  9. key:4 ;value:checkExcelImportProductInfoList-pool-4
  10. key:6 ;value:checkExcelImportProductInfoList-pool-1
  11. key:3 ;value:checkExcelImportProductInfoList-pool-3

只有i=3的任务自身阻塞10s、i=6的任务自身阻塞5s;其余任务一完成就立即执行下一步,主线程也不会被阻塞(耗时仅80ms)。
image.png

Demo2:验证CompletableFuture异步执行,并统计所有线程执行的结果

  1. package com.tools.threadDemo.CompletableFutureTest;
  2. import com.google.common.util.concurrent.ThreadFactoryBuilder;
  3. import java.util.*;
  4. import java.util.concurrent.*;
  5. public class CompletableFutureTest2 {
  6. private ExecutorService threadPoolExecutor;
  7. public static void main(String[] args) {
  8. List<CompletableFuture> list = new ArrayList<>();
  9. List<Map> resultList = new ArrayList<>();//模拟统计所有线程的结果
  10. for (int i = 0; i < 10; i++) {
  11. list.add(new CompletableFutureTest2().futhreSync(i, resultList));
  12. }
  13. CompletableFuture.allOf(list.toArray(new CompletableFuture[list.size()])).join();//待所有线程执行完成后再执行
  14. System.out.println("result>>>>>>>>>>>>" + resultList);
  15. }
  16. private synchronized void initClientInfoExecutor(int corePoolSize, int maximumPoolSize) {
  17. threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize,
  18. 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(10000),
  19. new ThreadFactoryBuilder().setNameFormat("initClientInfoExecutor-pool-%d").build(),
  20. new ThreadPoolExecutor.CallerRunsPolicy());
  21. }
  22. public CompletableFuture futhreSync(int i, List<Map> mapList) {
  23. initClientInfoExecutor(8, 10);
  24. List<CompletableFuture> futureList = new ArrayList<>();
  25. int finalI = i;
  26. CompletableFuture future = CompletableFuture.supplyAsync(() -> this.toDo(finalI, mapList), threadPoolExecutor).thenApply(map -> {
  27. next(map);
  28. return map;
  29. }).exceptionally(e -> {
  30. e.printStackTrace();
  31. return null;
  32. });
  33. threadPoolExecutor.shutdown();
  34. return future;
  35. }
  36. /**
  37. * 多线程执行的任务
  38. */
  39. public Map toDo(int i, List<Map> mapList) {
  40. if (i == 3) {
  41. try {
  42. Thread.sleep(10000);
  43. } catch (InterruptedException e) {
  44. e.printStackTrace();
  45. }
  46. }
  47. if (i == 6) {
  48. try {
  49. Thread.sleep(5000);
  50. } catch (InterruptedException e) {
  51. e.printStackTrace();
  52. }
  53. }
  54. Map map = new HashMap();
  55. map.put(i, Thread.currentThread().getName());
  56. if (i % 2 == 0) {
  57. Map map1 = new HashMap();
  58. map1.put(i, Thread.currentThread().getName());
  59. mapList.add(map1);
  60. }
  61. return map;
  62. }
  63. /**
  64. * 必须依赖上一个线程执行完成后再执行
  65. */
  66. public void next(Map map) {
  67. Iterator i = map.keySet().iterator();
  68. while (i.hasNext()) {
  69. Integer integer = (Integer) i.next();
  70. System.out.println("key:" + integer + " ;value:" + map.get(integer));
  71. }
  72. }
  73. }

输出:

  1. key:0 ;value:initClientInfoExecutor-pool-0
  2. key:1 ;value:initClientInfoExecutor-pool-0
  3. key:2 ;value:initClientInfoExecutor-pool-0
  4. key:4 ;value:initClientInfoExecutor-pool-0
  5. key:5 ;value:initClientInfoExecutor-pool-0
  6. key:7 ;value:initClientInfoExecutor-pool-0
  7. key:8 ;value:initClientInfoExecutor-pool-0
  8. key:9 ;value:initClientInfoExecutor-pool-0
  9. key:6 ;value:initClientInfoExecutor-pool-0
  10. key:3 ;value:initClientInfoExecutor-pool-0
  11. result>>>>>>>>>>>>[{0=initClientInfoExecutor-pool-0}, {2=initClientInfoExecutor-pool-0}, {4=initClientInfoExecutor-pool-0}, {8=initClientInfoExecutor-pool-0}, {6=initClientInfoExecutor-pool-0}]

2 源码分析

2.1 andTree

andTree是allOf方法的底层实现。传入andTree的多个任务各自独立并行执行;andTree通过递归的方式将这些任务两两组队,任意一个任务执行完成时,都会判断与它组队的另一个任务是否也已执行完成,如果是则触发上层的判断逻辑,直到最终所有任务都执行完成。

  1. public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) { // entry point: combines all given futures into one
  2. return andTree(cfs, 0, cfs.length - 1); // delegate to andTree over the whole array, indices 0 .. length-1
  3. }
  1. // lo is the first array index to cover, hi is the last one (inclusive)
  2. static CompletableFuture<Void> andTree(CompletableFuture<?>[] cfs,
  3. int lo, int hi) {
  4. CompletableFuture<Void> d = new CompletableFuture<Void>();
  5. if (lo > hi) // empty range
  6. d.result = NIL;
  7. else {
  8. CompletableFuture<?> a, b;
  9. int mid = (lo + hi) >>> 1;
  10. // Recursively group the futures into pairs; when a bottom-level pair completes it triggers the level above.
  11. // "Triggering" here only propagates the is-it-complete check; the tasks themselves run independently and in parallel.
  12. if ((a = (lo == mid ? cfs[lo] :
  13. andTree(cfs, lo, mid))) == null ||
  14. (b = (lo == hi ? a : (hi == mid+1) ? cfs[hi] :
  15. andTree(cfs, mid+1, hi))) == null)
  16. throw new NullPointerException();
  17. // biRelay returns false if either a or b has not completed yet
  18. if (!d.biRelay(a, b)) {
  19. // Completion condition not met: create a relay, push it onto the stack, then retry completing synchronously. If it still cannot complete, whichever of a/b finishes later will indirectly invoke its tryFire.
  20. BiRelay<?,?> c = new BiRelay<>(d, a, b);
  21. // Unless both a and b are already complete, bipush pushes c onto the stacks of both.
  22. a.bipush(b, c);
  23. c.tryFire(SYNC);
  24. }
  25. }
  26. return d;
  27. }