
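Preface

One day, while you are quietly slacking off, the product manager suddenly shows up.

PM: "We need to add an aggregated search feature. When a user looks up a product on our site, we query sites A, B, and C for it and then return the combined results to the user."

You: "Oh, so I write a crawler that scrapes data from three sites?"

PM: "Hush, crawling is illegal. This is called data analysis. So, can you build it?"

You: "Sure."

PM: "Great, it goes live tomorrow."

You: "..."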

Code 1.0

You finish the development quickly. The code looks like this:

    /*
     *
     *  *  *
     *  *  * blog.coder4j.cn
     *  *  * Copyright (C) 2016-2020 All Rights Reserved.
     *  *
     *
     */
    package cn.coder4j.study.example.thread;

    import cn.hutool.core.thread.ThreadUtil;
    import com.google.common.collect.Lists;

    import java.util.List;

    /**
     * @author buhao
     * @version TestCompletionService.java, v 0.1 2020-02-18 19:03 buhao
     */
    public class TestCompletionService {

        public static void main(String[] args) {
            // The search keyword
            String queryName = "java";
            // Call the query method and time it
            long startTime = System.currentTimeMillis();
            List<String> result = queryInfoCode1(queryName);
            System.out.println("Elapsed (ms): " + (System.currentTimeMillis() - startTime));
            System.out.println(result);
        }

        /**
         * Aggregated query, code 1: calls the three sites one after another.
         *
         * @param queryName the keyword to search for
         * @return the results of sites A, B, and C, in that order
         */
        private static List<String> queryInfoCode1(String queryName) {
            List<String> resultList = Lists.newArrayList();
            String webA = searchWebA(queryName);
            resultList.add(webA);
            String webB = searchWebB(queryName);
            resultList.add(webB);
            String webC = searchWebC(queryName);
            resultList.add(webC);
            return resultList;
        }

        /**
         * Queries site A.
         *
         * @param name the keyword to search for
         * @return the result from site A
         */
        public static String searchWebA(String name) {
            ThreadUtil.sleep(5000);
            return "webA";
        }

        /**
         * Queries site B.
         *
         * @param name the keyword to search for
         * @return the result from site B
         */
        public static String searchWebB(String name) {
            ThreadUtil.sleep(3000);
            return "webB";
        }

        /**
         * Queries site C.
         *
         * @param name the keyword to search for
         * @return the result from site C
         */
        public static String searchWebC(String name) {
            ThreadUtil.sleep(500);
            return "webC";
        }
    }

You run the code and get the following result:

    Elapsed (ms): 8512
    [webA, webB, webC]

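Whoa, more than 8 seconds for a single request? If this goes live, the PM will kill me.

After some debugging, it turns out the problem lies with the sites being queried: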

    /**
     * Queries site A.
     *
     * @param name the keyword to search for
     * @return the result from site A
     */
    public static String searchWebA(String name) {
        ThreadUtil.sleep(5000);
        return "webA";
    }

    /**
     * Queries site B.
     *
     * @param name the keyword to search for
     * @return the result from site B
     */
    public static String searchWebB(String name) {
        ThreadUtil.sleep(3000);
        return "webB";
    }

    /**
     * Queries site C.
     *
     * @param name the keyword to search for
     * @return the result from site C
     */
    public static String searchWebC(String name) {
        ThreadUtil.sleep(500);
        return "webC";
    }

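Sites A and B are old and unmaintained, so their APIs respond slowly: the average response time is 5 seconds for A and 3 seconds for B (simulated here with sleep). Site C performs reasonably well, with an average response time of 0.5 seconds. The execution time of our program is therefore the response time of site A + site B + site C, that is 5 + 3 + 0.5 = 8.5 seconds.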

Code 2.0

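So now we know the problem: the sites we query are too slow. How do we fix that? We can hardly phone them and ask them to optimize their sites so that we can scrape them. The books teach us to look for problems on our own side first, so let's see what can be optimized in our own code.

A quick look shows that our code is entirely serial: it queries site A, then site B once A is done, then site C once B is done. That brings to mind the first rule of improving efficiency: increase parallelism. Why query the sites one by one instead of querying A, B, and C at the same time? Java multithreading makes this easy. The code is as follows: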

    /*
     *
     *  *  *
     *  *  * blog.coder4j.cn
     *  *  * Copyright (C) 2016-2020 All Rights Reserved.
     *  *
     *
     */
    package cn.coder4j.study.example.thread;

    import cn.hutool.core.thread.ThreadUtil;
    import com.google.common.collect.Lists;

    import java.util.List;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    /**
     * @author buhao
     * @version TestCompletionService.java, v 0.1 2020-02-18 19:03 buhao
     */
    public class TestCompletionService {

        public static void main(String[] args) throws ExecutionException, InterruptedException {
            // The search keyword
            String queryName = "java";
            // Call the query method and time it
            long startTime = System.currentTimeMillis();
            List<String> result = queryInfoCode2(queryName);
            System.out.println("Elapsed (ms): " + (System.currentTimeMillis() - startTime));
            System.out.println(result);
        }

        /**
         * Aggregated query, code 1: calls the three sites one after another.
         *
         * @param queryName the keyword to search for
         * @return the results of sites A, B, and C, in that order
         */
        private static List<String> queryInfoCode1(String queryName) {
            List<String> resultList = Lists.newArrayList();
            String webA = searchWebA(queryName);
            resultList.add(webA);
            String webB = searchWebB(queryName);
            resultList.add(webB);
            String webC = searchWebC(queryName);
            resultList.add(webC);
            return resultList;
        }

        /**
         * Aggregated query, code 2: calls the three sites in parallel.
         *
         * @param queryName the keyword to search for
         * @return the results of sites A, B, and C, in that order
         */
        private static List<String> queryInfoCode2(String queryName) throws ExecutionException, InterruptedException {
            List<String> resultList = Lists.newArrayList();
            // Create a thread pool with 3 threads
            ExecutorService pool = Executors.newFixedThreadPool(3);
            try {
                // Submit the tasks and keep their futures
                Future<String> webAFuture = pool.submit(() -> searchWebA(queryName));
                Future<String> webBFuture = pool.submit(() -> searchWebB(queryName));
                Future<String> webCFuture = pool.submit(() -> searchWebC(queryName));
                // Collect the results; each get() blocks until that task is done
                resultList.add(webAFuture.get());
                resultList.add(webBFuture.get());
                resultList.add(webCFuture.get());
            } finally {
                // Shut down the thread pool
                pool.shutdown();
            }
            return resultList;
        }

        /**
         * Queries site A.
         *
         * @param name the keyword to search for
         * @return the result from site A
         */
        public static String searchWebA(String name) {
            ThreadUtil.sleep(5000);
            return "webA";
        }

        /**
         * Queries site B.
         *
         * @param name the keyword to search for
         * @return the result from site B
         */
        public static String searchWebB(String name) {
            ThreadUtil.sleep(3000);
            return "webB";
        }

        /**
         * Queries site C.
         *
         * @param name the keyword to search for
         * @return the result from site C
         */
        public static String searchWebC(String name) {
            ThreadUtil.sleep(500);
            return "webC";
        }
    }

The key part is this:

    /**
     * Aggregated query, code 2: calls the three sites in parallel.
     *
     * @param queryName the keyword to search for
     * @return the results of sites A, B, and C, in that order
     */
    private static List<String> queryInfoCode2(String queryName) throws ExecutionException, InterruptedException {
        List<String> resultList = Lists.newArrayList();
        // Create a thread pool with 3 threads
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            // Submit the tasks and keep their futures
            Future<String> webAFuture = pool.submit(() -> searchWebA(queryName));
            Future<String> webBFuture = pool.submit(() -> searchWebB(queryName));
            Future<String> webCFuture = pool.submit(() -> searchWebC(queryName));
            // Collect the results; each get() blocks until that task is done
            resultList.add(webAFuture.get());
            resultList.add(webBFuture.get());
            resultList.add(webCFuture.get());
        } finally {
            // Shut down the thread pool
            pool.shutdown();
        }
        return resultList;
    }

The code that queries each site has not changed by a single line. What changed is the place where we call it: the serial calls have become multithreaded. And not plain threads either: because the main thread needs each task's result, we have to use Future. (See the earlier post "【并发那些事】创建线程的三种方式", on three ways to create a thread.)

Now let's run the code and see the effect. The result:

    Elapsed (ms): 5058
    [webA, webB, webC]

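The effect is obvious: the time dropped from over 8 seconds to just over 5. But that is still long, unacceptably long. As a programmer with standards, you keep optimizing. Let's analyze. The code started out serial, with the flow below, so the total time is the sum of the three requests.

[Figure: serial request flow]

Then we parallelized the serial requests, and the flow became:

[Figure: parallel request flow]

Because the requests now run in parallel, this resembles the barrel effect: the total time is determined by the slowest operation among the requests, here the 5-second request to site A.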
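The arithmetic behind the two flows can be written down as a tiny model. This is only an illustrative sketch: the class and method names (`LatencyModel`, `serialMillis`, `parallelMillis`) are made up for this example, and it ignores thread scheduling overhead, which is why the measured values above (8512 ms and 5058 ms) are slightly higher.

```java
import java.util.Arrays;

public class LatencyModel {

    /** Serial execution: each request waits for the previous one, so the latencies add up. */
    static long serialMillis(long... latencies) {
        return Arrays.stream(latencies).sum();
    }

    /** Parallel execution: all requests start together, so the slowest one dominates. */
    static long parallelMillis(long... latencies) {
        return Arrays.stream(latencies).max().orElse(0L);
    }

    public static void main(String[] args) {
        long a = 5000, b = 3000, c = 500; // simulated latencies of sites A, B, C
        System.out.println("serial:   " + serialMillis(a, b, c));   // 8500
        System.out.println("parallel: " + parallelMillis(a, b, c)); // 5000
    }
}
```

The model also shows why Code 2.0 cannot get below 5 seconds: as long as site A is one of the inputs, the maximum stays at 5000 ms.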

Code 3.0

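Honestly, having analyzed this far, there is little left to optimize as long as we cannot speed up sites A and B. But there are always more solutions than problems. We really cannot squeeze the total request time any further, but we can make the user experience better. This takes two technologies: WebSocket and CompletionService. WebSocket can be understood simply as a server push technology: the client does not have to ask, the server pushes messages on its own initiative. (WebSocket is not the focus of this article and is only touched on in passing; for implementation details see the earlier post "【websocket】spring boot 集成 websocket 的四种方式", on four ways to integrate WebSocket with Spring Boot.) Let's go straight to the code: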

    /*
     *
     *  *  *
     *  *  * blog.coder4j.cn
     *  *  * Copyright (C) 2016-2020 All Rights Reserved.
     *  *
     *
     */
    package cn.coder4j.study.example.thread;

    import cn.hutool.core.thread.ThreadUtil;
    import com.google.common.collect.Lists;

    import java.util.List;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorCompletionService;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    /**
     * @author buhao
     * @version TestCompletionService.java, v 0.1 2020-02-18 19:03 buhao
     */
    public class TestCompletionService {

        public static void main(String[] args) throws ExecutionException, InterruptedException {
            // The search keyword
            String queryName = "java";
            // Call the query method and time it
            long startTime = System.currentTimeMillis();
            queryInfoCode3(queryName);
            System.out.println("Elapsed (ms): " + (System.currentTimeMillis() - startTime));
        }

        /**
         * Aggregated query, code 1: calls the three sites one after another.
         *
         * @param queryName the keyword to search for
         * @return the results of sites A, B, and C, in that order
         */
        private static List<String> queryInfoCode1(String queryName) {
            List<String> resultList = Lists.newArrayList();
            String webA = searchWebA(queryName);
            resultList.add(webA);
            String webB = searchWebB(queryName);
            resultList.add(webB);
            String webC = searchWebC(queryName);
            resultList.add(webC);
            return resultList;
        }

        /**
         * Aggregated query, code 2: calls the three sites in parallel.
         *
         * @param queryName the keyword to search for
         * @return the results of sites A, B, and C, in that order
         */
        private static List<String> queryInfoCode2(String queryName) throws ExecutionException, InterruptedException {
            List<String> resultList = Lists.newArrayList();
            // Create a thread pool with 3 threads
            ExecutorService pool = Executors.newFixedThreadPool(3);
            try {
                // Submit the tasks and keep their futures
                Future<String> webAFuture = pool.submit(() -> searchWebA(queryName));
                Future<String> webBFuture = pool.submit(() -> searchWebB(queryName));
                Future<String> webCFuture = pool.submit(() -> searchWebC(queryName));
                // Collect the results; each get() blocks until that task is done
                resultList.add(webAFuture.get());
                resultList.add(webBFuture.get());
                resultList.add(webCFuture.get());
            } finally {
                // Shut down the thread pool
                pool.shutdown();
            }
            return resultList;
        }

        /**
         * Aggregated query, code 3: handles each result as soon as it is ready.
         *
         * @param queryName the keyword to search for
         */
        private static void queryInfoCode3(String queryName) throws ExecutionException, InterruptedException {
            // Start time
            long startTime = System.currentTimeMillis();
            // Create the thread pool and wrap it in a CompletionService
            ExecutorService pool = Executors.newFixedThreadPool(3);
            try {
                ExecutorCompletionService<String> executorCompletionService = new ExecutorCompletionService<>(pool);
                // Submit the tasks
                executorCompletionService.submit(() -> searchWebA(queryName));
                executorCompletionService.submit(() -> searchWebB(queryName));
                executorCompletionService.submit(() -> searchWebC(queryName));
                for (int i = 0; i < 3; i++) {
                    // take() blocks until the next task has finished
                    Future<String> take = executorCompletionService.take();
                    System.out.println("Got result -> " + take.get());
                    System.out.println("Pushed to the client via ws, total elapsed (ms): " + (System.currentTimeMillis() - startTime));
                }
            } finally {
                // Shut down the pool so its worker threads do not keep the JVM alive
                pool.shutdown();
            }
        }

        /**
         * Queries site A.
         *
         * @param name the keyword to search for
         * @return the result from site A
         */
        public static String searchWebA(String name) {
            ThreadUtil.sleep(5000);
            return "webA";
        }

        /**
         * Queries site B.
         *
         * @param name the keyword to search for
         * @return the result from site B
         */
        public static String searchWebB(String name) {
            ThreadUtil.sleep(3000);
            return "webB";
        }

        /**
         * Queries site C.
         *
         * @param name the keyword to search for
         * @return the result from site C
         */
        public static String searchWebC(String name) {
            ThreadUtil.sleep(500);
            return "webC";
        }
    }

The core code is this:

    /**
     * Aggregated query, code 3: handles each result as soon as it is ready.
     *
     * @param queryName the keyword to search for
     */
    private static void queryInfoCode3(String queryName) throws ExecutionException, InterruptedException {
        // Start time
        long startTime = System.currentTimeMillis();
        // Create the thread pool and wrap it in a CompletionService
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            ExecutorCompletionService<String> executorCompletionService = new ExecutorCompletionService<>(pool);
            // Submit the tasks
            executorCompletionService.submit(() -> searchWebA(queryName));
            executorCompletionService.submit(() -> searchWebB(queryName));
            executorCompletionService.submit(() -> searchWebC(queryName));
            for (int i = 0; i < 3; i++) {
                // take() blocks until the next task has finished
                Future<String> take = executorCompletionService.take();
                System.out.println("Got result -> " + take.get());
                System.out.println("Pushed to the client via ws, total elapsed (ms): " + (System.currentTimeMillis() - startTime));
            }
        } finally {
            // Shut down the pool so its worker threads do not keep the JVM alive
            pool.shutdown();
        }
    }

First, the output:

    Got result -> webC
    Pushed to the client via ws, total elapsed (ms): 561
    Got result -> webB
    Pushed to the client via ws, total elapsed (ms): 3055
    Got result -> webA
    Pushed to the client via ws, total elapsed (ms): 5060
    Elapsed (ms): 5060

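Let's analyze the output. The total time is still a little over 5 seconds; that has not changed. But instead of waiting for everything to finish before pushing to the client, we push each result as soon as it is done. There is also a pattern: the fastest request is pushed first, then the second fastest, and the slowest last. In other words, the results arrive ordered by completion time. For the user, this means that after clicking the button, site C's data appears within 1 second, site B's data is added roughly 2.5 seconds later, and site A's data follows about 2 seconds after that. This is a much better experience than staring at a blank screen for 5 seconds and then getting all the data at once.

Neat, isn't it? The hero behind the scenes is CompletionService. Its source is as follows: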
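The "push each result as soon as it is ready" idea can be separated from the concrete sites. The following is a minimal sketch under stated assumptions, not the article's production code: `PushAsCompleted`, `runAndPush`, and `fakeSearch` are hypothetical names, the `Consumer` callback stands in for the WebSocket push, and the latencies are shortened so the example runs quickly.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

public class PushAsCompleted {

    /** Runs all tasks in parallel and hands each result to push as soon as it is ready. */
    static <T> void runAndPush(List<Callable<T>> tasks, Consumer<T> push)
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, tasks.size()));
        try {
            CompletionService<T> service = new ExecutorCompletionService<>(pool);
            for (Callable<T> task : tasks) {
                service.submit(task);
            }
            // One take() per submitted task; results arrive in completion order.
            for (int i = 0; i < tasks.size(); i++) {
                push.accept(service.take().get());
            }
        } finally {
            pool.shutdown();
        }
    }

    /** Hypothetical stand-in for a slow remote search. */
    static Callable<String> fakeSearch(String site, long millis) {
        return () -> {
            Thread.sleep(millis);
            return site;
        };
    }

    public static void main(String[] args) throws Exception {
        List<Callable<String>> tasks = new ArrayList<>();
        tasks.add(fakeSearch("webA", 500));
        tasks.add(fakeSearch("webB", 300));
        tasks.add(fakeSearch("webC", 50));
        // In a real service the callback would write to the WebSocket session instead.
        runAndPush(tasks, result -> System.out.println("push -> " + result));
    }
}
```

Passing the push step in as a callback keeps the concurrency logic free of any WebSocket dependency, which also makes it easy to test with a plain list.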

    package java.util.concurrent;

    /**
     * A service that decouples the production of new asynchronous tasks
     * from the consumption of the results of completed tasks.  Producers
     * {@code submit} tasks for execution. Consumers {@code take}
     * completed tasks and process their results in the order they
     * complete.  A {@code CompletionService} can for example be used to
     * manage asynchronous I/O, in which tasks that perform reads are
     * submitted in one part of a program or system, and then acted upon
     * in a different part of the program when the reads complete,
     * possibly in a different order than they were requested.
     *
     * <p>Typically, a {@code CompletionService} relies on a separate
     * {@link Executor} to actually execute the tasks, in which case the
     * {@code CompletionService} only manages an internal completion
     * queue. The {@link ExecutorCompletionService} class provides an
     * implementation of this approach.
     *
     * <p>Memory consistency effects: Actions in a thread prior to
     * submitting a task to a {@code CompletionService}
     * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
     * actions taken by that task, which in turn <i>happen-before</i>
     * actions following a successful return from the corresponding {@code take()}.
     */
    public interface CompletionService<V> {
        /**
         * Submits a value-returning task for execution and returns a Future
         * representing the pending results of the task.  Upon completion,
         * this task may be taken or polled.
         *
         * @param task the task to submit
         * @return a Future representing pending completion of the task
         * @throws RejectedExecutionException if the task cannot be
         *         scheduled for execution
         * @throws NullPointerException if the task is null
         */
        Future<V> submit(Callable<V> task);

        /**
         * Submits a Runnable task for execution and returns a Future
         * representing that task.  Upon completion, this task may be
         * taken or polled.
         *
         * @param task the task to submit
         * @param result the result to return upon successful completion
         * @return a Future representing pending completion of the task,
         *         and whose {@code get()} method will return the given
         *         result value upon completion
         * @throws RejectedExecutionException if the task cannot be
         *         scheduled for execution
         * @throws NullPointerException if the task is null
         */
        Future<V> submit(Runnable task, V result);

        /**
         * Retrieves and removes the Future representing the next
         * completed task, waiting if none are yet present.
         *
         * @return the Future representing the next completed task
         * @throws InterruptedException if interrupted while waiting
         */
        Future<V> take() throws InterruptedException;

        /**
         * Retrieves and removes the Future representing the next
         * completed task, or {@code null} if none are present.
         *
         * @return the Future representing the next completed task, or
         *         {@code null} if none are present
         */
        Future<V> poll();

        /**
         * Retrieves and removes the Future representing the next
         * completed task, waiting if necessary up to the specified wait
         * time if none are yet present.
         *
         * @param timeout how long to wait before giving up, in units of
         *        {@code unit}
         * @param unit a {@code TimeUnit} determining how to interpret the
         *        {@code timeout} parameter
         * @return the Future representing the next completed task or
         *         {@code null} if the specified waiting time elapses
         *         before one is present
         * @throws InterruptedException if interrupted while waiting
         */
        Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
    }

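As you can see, CompletionService declares the following methods:

    Future<V> submit(Callable<V> task);

submit takes a Callable, that is, a task whose result can be retrieved.

    Future<V> submit(Runnable task, V result);

This submit takes a Runnable plus a result object. It is similar to the submit above, but because a Runnable returns void and cannot produce a result itself, the result parameter is supplied as the value the Future will return once the task completes.

    Future<V> take() throws InterruptedException;

take retrieves and removes the next completed task. Note that it blocks until one is available.

    Future<V> poll();

poll retrieves and removes the next completed task without blocking. If no task has completed yet, it returns null.

    Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;

Same as above, except that it waits up to the given timeout before returning null.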
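To make the difference between the method variants concrete, here is a small sketch that uses `submit(Runnable, result)` and the timed `poll` together. It is an illustration under assumptions rather than a recommended pattern: `PollWithDeadline` and `collectWithin` are hypothetical names, and the task latencies are arbitrary values chosen for the demo.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class PollWithDeadline {

    /** Collects whatever finishes within budgetMillis; slower tasks are left out. */
    static List<String> collectWithin(long budgetMillis) throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        List<String> results = new ArrayList<>();
        try {
            CompletionService<String> service = new ExecutorCompletionService<>(pool);
            // Callable form: the task's return value becomes the Future's result.
            service.submit(() -> { Thread.sleep(50); return "fast"; });
            // Runnable form: a Runnable returns nothing, so get() yields the fixed second argument.
            service.submit(() -> { }, "runnable-done");
            // This task is slower than the budgets used below.
            service.submit(() -> { Thread.sleep(2000); return "slow"; });

            long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(budgetMillis);
            for (int i = 0; i < 3; i++) {
                long remaining = deadline - System.nanoTime();
                // The timed poll returns null if nothing completes before the timeout expires.
                Future<String> done = service.poll(Math.max(0, remaining), TimeUnit.NANOSECONDS);
                if (done == null) {
                    break;
                }
                results.add(done.get());
            }
        } finally {
            // Interrupt tasks that missed the deadline.
            pool.shutdownNow();
        }
        return results;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(collectWithin(500));
    }
}
```

With a 500 ms budget the list should contain the two quick results and omit the slow one. In the aggregated search scenario, the same loop would let the server stop waiting for a site that is having a bad day.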

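Also, CompletionService is an interface and cannot be used directly. You normally use its implementation class ExecutorCompletionService, as in the demo above.

At this point you may be curious how ExecutorCompletionService works. The principle is simple: internally it maintains a blocking queue, and each submitted task is put into that queue when it finishes, so tasks that finish first enter the queue first. What you get from poll or take is therefore always the earliest-finished task that has not been retrieved yet.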
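The mechanism described above can be sketched in a few lines. This is a simplified illustration, not the JDK source: `MiniCompletionService` is a hypothetical class that only supports `submit(Callable)` and `take()`, and it relies on the `done()` hook that `FutureTask` invokes once a task has completed.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;

public class MiniCompletionService<V> {
    private final Executor executor;
    // Finished tasks are queued here in the order they complete.
    private final BlockingQueue<Future<V>> completionQueue = new LinkedBlockingQueue<>();

    public MiniCompletionService(Executor executor) {
        this.executor = executor;
    }

    public Future<V> submit(Callable<V> task) {
        FutureTask<V> future = new FutureTask<V>(task) {
            @Override
            protected void done() {
                // FutureTask calls done() when the task completes, fails, or is cancelled.
                completionQueue.add(this);
            }
        };
        executor.execute(future);
        return future;
    }

    /** Blocks until some submitted task has finished, then returns its Future. */
    public Future<V> take() throws InterruptedException {
        return completionQueue.take();
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            MiniCompletionService<String> service = new MiniCompletionService<>(pool);
            service.submit(() -> { Thread.sleep(300); return "slow"; });
            service.submit(() -> { Thread.sleep(50); return "fast"; });
            System.out.println(service.take().get());
            System.out.println(service.take().get());
        } finally {
            pool.shutdown();
        }
    }
}
```

Because the queue is filled by the tasks themselves at completion time, the consumer never has to guess which Future to wait on, which is exactly what Code 2.0 had to do with its fixed order of `get()` calls.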

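Miscellaneous

1. Project code

Space is limited, so not all of the code could be included here. If you run into problems, you can check the source on GitHub.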

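About

You are welcome to follow my personal WeChat official account, KIWI的碎碎念. After following, reply with 福利 to get a large collection of learning material for free!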
