线程池的意义:

  1. 降低资源消耗,复用已创建的线程,降低开销、控制最大并发数;
  2. 隔离线程环境,可以配置独立线程池,将较慢的线程与较快的隔离开,避免相互影响;
  3. 实现任务线程队列缓冲策略和拒绝策略;
  4. 实现某些与时间相关的功能,如定时执行和周期执行等。

6.1 自定义线程池

image-20210430145117537.png

  1. import lombok.extern.slf4j.Slf4j;
  2. import java.util.ArrayDeque;
  3. import java.util.Deque;
  4. import java.util.HashSet;
  5. import java.util.concurrent.TimeUnit;
  6. import java.util.concurrent.locks.Condition;
  7. import java.util.concurrent.locks.ReentrantLock;
  8. /**
  9. * @author KHighness
  10. * @since 2021-04-30
  11. */
  12. @Slf4j(topic = "ParaKThreadPoolDemo")
  13. public class ParaKThreadPoolDemo {
  14. public static void main(String[] args) {
  15. ParaKThreadPool threadPool = new ParaKThreadPool(
  16. 1, 1000, TimeUnit.MILLISECONDS, 1,
  17. // (1) 一直等待
  18. // BlockingQueue::put
  19. // (2) 超时等待
  20. (queue, task) -> {
  21. queue.offer(task, 1500, TimeUnit.MILLISECONDS);
  22. }
  23. // (3) 放弃执行
  24. // (queue, task) -> {
  25. // log.debug("放弃执行: {}", task);
  26. // }
  27. // (4) 抛出异常
  28. // (queue, task) -> {
  29. // throw new RuntimeException("任务执行失败: " + task);
  30. // }
  31. // (5) 自己执行
  32. // (queue, task) -> {
  33. // task.run();
  34. // }
  35. );
  36. for (int i = 0; i < 3; i++) {
  37. int k = i;
  38. threadPool.execute(() -> {
  39. try {
  40. TimeUnit.SECONDS.sleep(1);
  41. } catch (InterruptedException e) {
  42. e.printStackTrace();
  43. }
  44. log.debug("{}", k);
  45. });
  46. }
  47. }
  48. }
  49. @Slf4j(topic = "ParaKThreadPool")
  50. class ParaKThreadPool {
  51. // 任务队列
  52. private BlockingQueue<Runnable> taskQueue;
  53. // 线程集合
  54. private final HashSet<Worker> workers = new HashSet<>();
  55. // 核心线程数
  56. private final int coreSize;
  57. // 获取任务的超时时间
  58. private final long timeout;
  59. // 时间单位
  60. private final TimeUnit timeUnit;
  61. // 拒绝策略
  62. private RejectPolicy<Runnable> rejectPolicy;
  63. // 构造函数
  64. public ParaKThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapacity, RejectPolicy<Runnable> rejectPolicy) {
  65. this.coreSize = coreSize;
  66. this.timeout = timeout;
  67. this.timeUnit = timeUnit;
  68. this.taskQueue = new BlockingQueue<>(queueCapacity);
  69. this.rejectPolicy = rejectPolicy;
  70. }
  71. // 执行任务
  72. public void execute(Runnable task) {
  73. // 当任务数没有超过coreSize时,直接交给worker对象执行
  74. // 如果任务数超过coreSize时,加入任务队列暂存
  75. synchronized (workers) {
  76. if (workers.size() < coreSize) {
  77. Worker worker = new Worker(task);
  78. log.debug("执行线程新增: {}", worker);
  79. workers.add(worker);
  80. worker.start();
  81. } else {
  82. // (1) 一直等待
  83. // (2) 超时等待
  84. // (3) 放弃执行
  85. // (4) 抛出异常
  86. // (5) 让调用者自己执行任务
  87. taskQueue.tryPut(rejectPolicy, task);
  88. }
  89. }
  90. }
  91. class Worker extends Thread {
  92. private Runnable task;
  93. public Worker(Runnable task) {
  94. this.task = task;
  95. }
  96. @Override
  97. public void run() {
  98. // 执行任务
  99. // (1) 当task不为空,执行任务
  100. // (2) 当task执行完毕,再接着从任务队列获取任务并执行
  101. // while (task != null || (task = taskQueue.take()) != null) {
  102. while (task != null || (task = taskQueue.poll(timeout, timeUnit)) != null) {
  103. try {
  104. log.debug("正在执行 => {}", task);
  105. task.run();
  106. } catch (Exception e) {
  107. e.printStackTrace();
  108. } finally {
  109. task = null;
  110. }
  111. }
  112. synchronized (workers) {
  113. log.debug("执行线程移除: {}", this);
  114. workers.remove(this);
  115. }
  116. }
  117. }
  118. public static void main(String[] args) {
  119. }
  120. }
  121. @Slf4j(topic = "BlockingQueue")
  122. class BlockingQueue<T> {
  123. // 任务队列
  124. private final Deque<T> queue = new ArrayDeque<>();
  125. // 锁
  126. private final ReentrantLock lock = new ReentrantLock();
  127. // 最大容量
  128. private final int capacity;
  129. // 生产者条件变量
  130. private final Condition fullWaitSet = lock.newCondition();
  131. // 消费者条件变量
  132. private final Condition emptyWaitSet = lock.newCondition();
  133. public BlockingQueue(int capacity) {
  134. this.capacity = capacity;
  135. }
  136. // 带超时的阻塞获取
  137. public T poll(long timeout, TimeUnit unit) {
  138. lock.lock();
  139. try {
  140. // 将超时时间统一转换为纳秒
  141. long nanos = unit.toNanos(timeout);
  142. while (queue.isEmpty()) {
  143. try {
  144. // 返回剩余时间
  145. if (nanos <= 0) {
  146. return null;
  147. }
  148. nanos = emptyWaitSet.awaitNanos(nanos);
  149. } catch (InterruptedException e) {
  150. e.printStackTrace();
  151. }
  152. }
  153. T t = queue.removeFirst();
  154. fullWaitSet.signal();
  155. return t;
  156. } finally {
  157. lock.unlock();
  158. }
  159. }
  160. // 阻塞获取
  161. public T take() {
  162. lock.lock();
  163. try {
  164. // 队列为空,阻塞
  165. while (queue.isEmpty()) {
  166. try {
  167. emptyWaitSet.await();
  168. } catch (InterruptedException e) {
  169. e.printStackTrace();
  170. }
  171. }
  172. T t = queue.removeFirst();
  173. fullWaitSet.signal();
  174. return t;
  175. } finally {
  176. lock.unlock();
  177. }
  178. }
  179. // 阻塞添加
  180. public void put(T element) {
  181. lock.lock();
  182. try {
  183. // 队列已满,阻塞
  184. while (queue.size() == capacity) {
  185. log.debug("等待加入任务队列: {}...", element);
  186. fullWaitSet.await();
  187. }
  188. log.debug("加入任务队列: {}", element);
  189. queue.addLast(element);
  190. emptyWaitSet.signal();
  191. } catch (InterruptedException e) {
  192. e.printStackTrace();
  193. } finally {
  194. lock.unlock();
  195. }
  196. }
  197. // 带超时时间添加
  198. public boolean offer(T element, long timeout, TimeUnit timeUnit) {
  199. lock.lock();
  200. try {
  201. long nanos = timeUnit.toNanos(timeout);
  202. // 队列已满,阻塞
  203. while (queue.size() == capacity) {
  204. if (nanos <= 0) {
  205. return false;
  206. }
  207. log.debug("等待加入任务队列: {}...", element);
  208. try {
  209. nanos = fullWaitSet.awaitNanos(nanos);
  210. } catch (InterruptedException e) {
  211. e.printStackTrace();
  212. }
  213. }
  214. log.debug("加入任务队列: {}", element);
  215. queue.addLast(element);
  216. emptyWaitSet.signal();
  217. return true;
  218. }finally {
  219. lock.unlock();
  220. }
  221. }
  222. // 获取大小
  223. public int size() {
  224. lock.lock();
  225. try {
  226. return queue.size();
  227. } finally {
  228. lock.unlock();
  229. }
  230. }
  231. public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
  232. lock.lock();
  233. try {
  234. // 判断队列是否已满
  235. if (queue.size() == capacity) {
  236. rejectPolicy.reject(this, task);
  237. } else { // 有空闲
  238. log.debug("加入任务队列 {}", task);
  239. queue.addLast(task);
  240. emptyWaitSet.signal();
  241. }
  242. } finally {
  243. lock.unlock();
  244. }
  245. }
  246. }
  247. @FunctionalInterface
  248. interface RejectPolicy<T> {
  249. void reject(BlockingQueue<T> queue, T task);
  250. }

6.2 ThreadPoolExecutor

6.2.1 线程池的继承关系

image.png

6.2.2 Executor框架结构

三大部分:

  • 任务类(Runnable / Callable):执行任务需要实现的Runnable或者Callable接口。
  • 任何的执行(Executor):任务执行机制的核心接口Executor,以及继承自Executor接口的ExecutorService接口。
  • 异步计算结果(Future):Future接口以及FutureTask类都可以代表异步计算的结果。

使用示意:

image-20210430184722889.png

6.2.3 线程池状态

ThreadPoolExecutor使用int的高3位来表示线程池状态,低29位表示线程数量。

状态名 高3位 接收新任务 处理阻塞队列任务 说明
RUNNING 111 Y Y 接收新任务,同时处理任务队列中的任务
SHUTDOWN 000 N Y 不会接收新任务,但会处理阻塞队列剩余任务
STOP 001 N N 会中断正在执行的任务,并抛弃阻塞队列任务
TIDYING 010 - - 任务全执行完毕,活动线程为0即将进入终结
TERMINATED 011 - - 终结状态

从数字上比较(第一位是符号位),TERMINATED > TIDYING > STOP > SHUTDOWN > RUNNING。

这些信息存储在一个原子变量ctl中,目的是将线程池状态与线程个数合二为一,这样就可以用一次CAS原子操作进行赋值。

  1. // c为旧值,ctlOf返回结果为新值
  2. ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c)));
  3. // rs为高三位代表线程池状态,wc为低29位代表线程个数,ctl是合并它们
  4. private static int ctlOf(int rs, int wc) { return rs | wc; }

6.2.4 线程池属性

  1. /** 工作线程 */
  2. private final class Worker extends AbstractQueuedSynchronizer implements Runnable
  3. {
  4. /** 正在运行的线程 */
  5. final Thread thread;
  6. /** 初始化任务 */
  7. Runnable firstTask;
  8. /** 已完成的任务数量 */
  9. volatile long completedTasks;
  10. }
  11. /** 阻塞队列 */
  12. private final BlockingQueue<Runnable> workQueue;
  13. /** 锁 */
  14. private final ReentrantLock mainLock = new ReentrantLock();
  15. /** 线程容器 */
  16. private final HashSet<Worker> workers = new HashSet<Worker>();

6.2.5 构造函数和参数

最全构造函数:

  1. public ThreadPoolExecutor(int corePoolSize,
  2. int maximumPoolSize,
  3. long keepAliveTime,
  4. TimeUnit unit,
  5. BlockingQueue<Runnable> workQueue,
  6. ThreadFactory threadFactory,
  7. RejectedExecutionHandler handler)

参数解释:

  • corePoolSize:核心线程数量(一直存在,不会被销毁)
  • maximumPoolSize:最大线程数量
    • maximumPool - corePoolSize:救急线程数量(存活时间为keepAliveTime
    • 在没有空闲的核心线程和任务队列满了的情况下才使用救急线程
  • keepAliveTime:最大生存时间(对于救急线程)
  • unit:时间单位
  • workQueue:阻塞队列(存放任务)
  • threadFactory:线程工厂(用于命名)
  • handler:拒绝策略

工作流程:

image-20210430203563881.png

  1. 如果核心线程池未满(workCount < corePoolSize),创建一个新的线程执行任务;
  2. 如果核心线程池已满,工作队列未满(workCount >= corePoolSize),将线程存储在工作队列;
  3. 如果工作队列已满,线程数小于最大线程数(workCount < maximumPoolSize),创建一个新线程处理任务;
  4. 如果超过最大线程数量(workCount >= maximumPoolSize),按照拒绝策略来处理任务。

6.2.6 拒绝策略和场景

如果线程数量达到maximumPoolSize仍然有新人物时会执行拒绝策略,JDK提供了四种实现。

image-20210430215158450.png

具体:

(1)AbortPolicy终止策略:丢弃任务并抛出RejectedExecutionException异常,这是默认策略。

  • 详述:这是线程池默认的拒绝策略,在任务不能再提交的时候,抛出异常,及时反馈程序运行状态。如果是比较关键的业务,推荐使用此拒绝策略,这样子在系统不能承载更大的并发量的时候,能够及时的通过异常发现。
  • 功能:当触发拒绝策略时,直接抛出拒绝执行的异常,中止策略的意思也就是打断当前执行流程。
  • 使用场景:这个就没有特殊的场景了,但是有一点要正确处理抛出的异常。ThreadPoolExecutor中默认的策略就是AbortPolicy,ExecutorService接口的系列ThreadPoolExecutor因为都没有显示的设置拒绝策略,所以默认的都是这个。但是请注意,ExecutorService中的线程池实例队列都是无界的,也就是说把内存撑爆了都不会触发拒绝策略。当自己自定义线程池实例时,使用这个策略一定要处理好触发策略时抛的异常,因为他会打断当前的执行流程。

(2)DiscardPolicy丢弃策略:丢弃任务,但是不抛出异常。如果线程队列已满,则后续提交的任务都会被丢弃,且静默丢弃。

  • 详述:使用此策略,可能使我们无法发现系统的异常状态。建议是一些无关紧要的业务采用此策略。
  • 功能:直接静静地丢弃这个任务,不触发任何动作。
  • 使用场景:如果你提交的任务无关紧要,你就可以使用它 。因为它就是个空实现,会悄无声息的吞噬你的的任务。所以这个策略基本上不用了。

(3)DiscardOldestPolicy弃老策略:丢弃队列最前面的任务,然后重新提交被拒绝的任务。

  • 详述:喜新厌旧的拒绝策略。是否采用还是得根据实际业务是否允许丢弃老任务来认真衡量。
  • 功能:如果线程池未关闭,就弹出队列头部的元素,然后尝试执行。
  • 使用场景:这个策略还是会丢弃任务,丢弃时也是毫无声息,但是特点是丢弃的是老的未执行的任务,而且是待执行优先级较高的任务。基于这个特性,想到的场景就是,发布消息和修改消息,当消息发布出去后,还未执行,此时更新的消息又来了,这个时候未执行的消息的版本比现在提交的消息版本要低就可以被丢弃了。因为队列中还有可能存在消息版本更低的消息会排队执行,所以在真正处理消息的时候一定要做好消息的版本比较。

(4)CallerRunsPolicy调用者运行策略:由调用线程处理该任务。

  • 功能:当触发拒绝策略时,只要线程没有关闭,就由提交任务的当前线程处理。
  • 使用场景:一般在不允许失败的、对性能要求不高、并发量较小的场景下使用,因为线程池一般情况下不会关闭,也就是提交的任务一定会被运行,但是由于是调用者线程自己执行的,当多次提交任务时,就会阻塞后续任务执行,性能和效率自然就慢了。

其他框架实现

  • Dubbo:在抛出RejectedExecutionException异常之前会记录日志和dump线程栈信息,方便定位问题
  • Netty:创建一个新的线程来执行任务
  • ActiveMQ:超时等待(60s),尝试放入队列
  • PinPoint:使用了一种拒绝策略链,会逐一尝试策略链中每种拒绝策略

口诀:拒终丢老调(线程池拒绝策略:终止策略、丢弃策略、弃老策略、调用者运行策略)

场景:

  • 终止策略:无特殊场景
  • 丢弃策略:无关紧要的任务
  • 弃老策略:发布消息
  • 调用者运行策略:不允许失败

过程:

  1. 线程池中刚开始没有线程,当一个任务提交给线程池后,线程池会创建一个新线程来执行任务。
  2. 当线程数达到corePoolSize并没有线程空闲,这时再加入任务,新加的任务会被加入workQueue队列,直到有空闲的线程。
  3. 如果队列选择了有界队列,那么任务超过了队列大小时,会创建maximumPoolSize-corePoolSize数目的线程来救急。
  4. 如果线程达到maximumPoolSize仍然有新任务时会执行拒绝策略。
  5. 当高峰过去后,超过corePoolSize的救急线程如果一段时间没有任务做,需要结束节省资源,这个时间kepAliveTimeunit来控制。

6.2.7 固定大小线程池

newFixedThreadPool

构造方法的参数:

  • 核心线程数:nThreads
  • 线程工厂:threadFactory
  1. public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
  2. return new ThreadPoolExecutor(nThreads, nThreads,
  3. 0L, TimeUnit.MILLISECONDS,
  4. new LinkedBlockingQueue<Runnable>(),
  5. threadFactory);
  6. }

特点:

  • 核心线程数 = 最大线程数,没有救急线程,无需设置超时时间。
  • 阻塞队列是无界的,可以放任意数量的任务。

场景:适用于任务量已知,相对耗时的任务。

6.2.8 无界限制线程池

newCachedThreadPool

构造方法的参数:

  • 线程工厂:threadFactory
  1. public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
  2. return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
  3. 60L, TimeUnit.SECONDS,
  4. new SynchronousQueue<Runnable>(),
  5. threadFactory);
  6. }

特点:

  • 核心线程是0,最大线程数是Integer.MAX_VALUE,救急线程的空闲生存时间是60S,意味着
    • 全部都是救急线程(空闲60S后可以回收)
    • 救急线程可以无限创建
  • 任务队列采用SyncheonousQueue实现特点是,它没有容量,没有线程来取是放不进去的

场景:

  • 整个线程池表现为线程数会根据任务量不断增长,没有上限,当任务执行完毕,空闲1分钟后释放线程。
  • 适合任务数比较密集,但每个任务执行时间较短的情况。

SyncheonousQueue示例:

  1. @Slf4j(topic = "SynchronousQueue")
  2. public class SynchronousQueueDemo {
  3. public static void main(String[] args) throws InterruptedException {
  4. SynchronousQueue<Integer> integers = new SynchronousQueue<>();
  5. new Thread(() -> {
  6. try {
  7. log.debug("putting {}", 1);
  8. integers.put(1);
  9. log.debug("{} finish", 1);
  10. log.debug("putting {}", 2);
  11. integers.put(2);
  12. log.debug("{} finish", 2);
  13. } catch (InterruptedException e) {
  14. e.printStackTrace();
  15. }
  16. }, "t1").start();
  17. TimeUnit.SECONDS.sleep(1);
  18. new Thread(() -> {
  19. try {
  20. log.debug("taking {}", 1);
  21. integers.take();
  22. } catch (InterruptedException e) {
  23. e.printStackTrace();
  24. }
  25. }, "t2").start();
  26. TimeUnit.SECONDS.sleep(1);
  27. new Thread(() -> {
  28. try {
  29. log.debug("taking {}", 2);
  30. integers.take();
  31. } catch (InterruptedException e) {
  32. e.printStackTrace();
  33. }
  34. }, "t3").start();
  35. }
  36. }

输出:

  1. 2021-05-01 15:19:10.611 [t1] DEBUG SynchronousQueue - putting 1
  2. 2021-05-01 15:19:11.621 [t2] DEBUG SynchronousQueue - taking 1
  3. 2021-05-01 15:19:11.621 [t1] DEBUG SynchronousQueue - 1 finish
  4. 2021-05-01 15:19:11.621 [t1] DEBUG SynchronousQueue - putting 2
  5. 2021-05-01 15:19:12.632 [t3] DEBUG SynchronousQueue - taking 2
  6. 2021-05-01 15:19:12.633 [t1] DEBUG SynchronousQueue - 2 finish

6.2.9 始终如一线程池

newSingleThreadExecutor

构造方法的参数:

  • 线程工厂:threadFactory
  1. public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
  2. return new FinalizableDelegatedExecutorService
  3. (new ThreadPoolExecutor(1, 1,
  4. 0L, TimeUnit.MILLISECONDS,
  5. new LinkedBlockingQueue<Runnable>(),
  6. threadFactory));
  7. }

场景:希望多个任务排队执行。线程数固定为1,任务数多于1时,会放入无界队列排队。任务执行完毕,这唯一的线程也不会被释放。

区别:

  • 对比:创建一个单线程串行执行任务
    • 单线程如果任务执行失败而终止那么没有任何补救措施。
    • newSingleThreadExecutor线程池还会新建一个线程,保证线程池的正常工作。
  • 对比:Executors.newFixedThreadPool(1)
    • newFixedThreadPool对外暴露的是ThreadPoolExecutor对象,可以强转后调用setCoreSize等方法进行修改。
    • FinalizableDelegatedExecutorService应用的是装饰器模式,只对外暴露了ExecutorService接口,因此不能ThreadPoolExecutor中特有的方法。

6.2.10 执行/提交任务

  1. // 执行任务
  2. void execute(Runnable command);
  3. // 提交任务task,用返回值Future获得任务执行结果
  4. <T> Future<T> submit(Callable<T> task);
  5. // 提交tasks中所有任务
  6. <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;
  7. // 提交taks中所有任务,带超时时间
  8. <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException;
  9. // 提交tasks中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其他任务取消
  10. <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;
  11. // 提交tasks中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其他任务取消
  12. <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) sthrows InterruptedException, ExecutionException, TimeoutException;

6.2.11 关闭线程池

shutdown

  • 将线程池的状态改为SHUTDOWN
  • 不再接收新任务,但是会将阻塞队列中的任务执行完
  1. public void shutdown() {
  2. final ReentrantLock mainLock = this.mainLock;
  3. mainLock.lock();
  4. try {
  5. checkShutdownAccess();
  6. // 修改线程池状态
  7. advanceRunState(SHUTDOWN);
  8. // 仅打断空闲线程
  9. interruptIdleWorkers();
  10. onShutdown(); // hook for ScheduledThreadPoolExecutor
  11. } finally {
  12. mainLock.unlock();
  13. }
  14. // 尝试终结
  15. tryTerminate();
  16. }

shutdownNow

  • 将线程池的状态改为STOP
  • 不再接收新任务,也不会再执行阻塞队列中的任务
  • 会将阻塞队列中未执行的任务作为方法返回值
  • 并用interrupt的方式中断正在执行的任务
  1. public List<Runnable> shutdownNow() {
  2. List<Runnable> tasks;
  3. final ReentrantLock mainLock = this.mainLock;
  4. mainLock.lock();
  5. try {
  6. checkShutdownAccess();
  7. // 修改线程池状态
  8. advanceRunState(STOP);
  9. // 打断所有线程
  10. interruptWorkers();
  11. // 获取队列中剩余任务
  12. tasks = drainQueue();
  13. } finally {
  14. mainLock.unlock();
  15. }
  16. // 尝试终结
  17. tryTerminate();
  18. return tasks;
  19. }

其他方法:

  1. // 判断线程池状态是否是非RUNNING
  2. public boolean isShutdown();
  3. // 判断线程池状态是否是TERMINATED
  4. public boolean isTerminated();
  5. // 调用shutdown后,由于调用线程并不会等待所有任务运行结束,因此如果它想在线程池TERMINATED后做些事情,可以利用此方法等待
  6. public boolean awaitTermination(long timeout, TimeUnit unit);

6.3 异步模式之工作线程

6.3.1 概述

定义:让有限的工作线程(Worker Thread)来轮流移除处理无限多的任务。也可以将其归类为分工模式,它的典型实现就是线程池,也体现了经典设计模式中的享元模式。

例如:海底捞的服务员(线程),轮流处理每位客人的点餐(任务),如果为每位客人都配一名专属的服务员,那么成本就太高了。(对比另一种多线程设计模式:Thread-Per-Message)

注意:不同任务类型应该使用不同的线程池,这样能够避免饥饿,并能提升效率。

例如:如果一个餐馆的工人既要招呼客人(任务类型A),又要到后厨做菜(任务类型B)显然效率不咋地,分成服务员(线程池A)与厨师(线程池B)更为合理,当然还有更细的分工。

饥饿:固定大小线程池会有饥饿现象

  • 两个工人是同一个线程池中的两个线程
  • 他们要做的事情是:为客人点餐和到后厨做菜,这是两个阶段的工作
    • 客人点餐:必须先点完餐,等菜做好,上菜,在此期间处理点餐的工人必须等待
    • 后厨做菜:做就完了
  • 如果只有一个客人,工人A处理了点餐任务,接下来它要等着工人B去把菜做好,然后上菜
  • 如果同时来了两个客人,这个时候工人A和工人B都去处理点餐了,这时没人做饭了,饥饿

解决

可以增加线程池的大小,但是不是根本解决之道。不同的任务类型,创建不同的线程池。

  1. @Slf4j(topic = "Starvation")
  2. public class StarvationDemo {
  3. /**
  4. * 菜单
  5. */
  6. static final List<String> MENU = Arrays.asList("鱼香肉丝", "宫保鸡丁", "红烧肉", "烤鸡翅");
  7. static final Random RANDOM = new Random();
  8. /**
  9. * 随机做个菜
  10. */
  11. static String cooking() {
  12. return MENU.get(RANDOM.nextInt(MENU.size()));
  13. }
  14. public static void main(String[] args) {
  15. // 服务员
  16. ExecutorService waiterPool = Executors.newFixedThreadPool(1);
  17. // 厨师
  18. ExecutorService cookPool = Executors.newFixedThreadPool(1);
  19. // 10位客人
  20. for (int i = 0; i < 10; i++) {
  21. waiterPool.execute(() -> {
  22. log.debug("处理点餐");
  23. Future<String> future = cookPool.submit(() -> {
  24. log.debug("做菜");
  25. return cooking();
  26. });
  27. try {
  28. log.debug("上菜: {}", future.get());
  29. } catch (InterruptedException | ExecutionException e) {
  30. e.printStackTrace();
  31. }
  32. });
  33. }
  34. }
  35. }

6.3.2 创建多少线程池合适

  • 过小会导致程序不能充分地利用系统资源、容易导致饥饿
  • 过大后导致更多的线程上下文切换,占用更多内存

CPU密集型运算

例如:加密、解密和计算等。

一般:CPU核数 + 1

+1是保证当线程由于页缺失故障(操作系统)或其他原因导致暂停时,额外的这个线程就能顶上去,保证CPU时钟周期不被浪费。

I/O密集型运算

例如:文件读写、网络通信。

一般:CPU核数 * 2

经验公式:线程数 = 核数 * 期望CPU利用率 * 总时间(CPU计算时间 + 等待时间) / CPU计算时间

CPU不总是处于繁忙状态,例如,当你执行业务计算时,这时候会使用CPU资源,但当你执行IO操作时、远程RPC调用时,包括进行数据操作时,这时候CPU就闲下来了,你可以利用多线程提高它的利用率。

6.3.3 任务调度线程池

在『任务调度线程池』功能加入之前,可以使用java.util.Timer来实现定时功能,Timer的优点在于简单易用,但由于所有任务都是由同一个线程来调度,因此所有任务都是串行执行的,同一时间只能有一个任务再执行,前一个任务的延迟或异常都将会影响到之后的任务。

例如:

  1. @Slf4j(topic = "Timer")
  2. public class TimerDemo {
  3. public static void main(String[] args) {
  4. Timer timer = new Timer();
  5. TimerTask task1 = new TimerTask() {
  6. @Override
  7. public void run() {
  8. log.debug("task1");
  9. try {
  10. TimeUnit.SECONDS.sleep(2);
  11. } catch (InterruptedException e) {
  12. e.printStackTrace();
  13. }
  14. }
  15. };
  16. TimerTask task2 = new TimerTask() {
  17. @Override
  18. public void run() {
  19. log.debug("task2");
  20. }
  21. };
  22. log.debug("start...");
  23. timer.schedule(task1, 1000);
  24. timer.schedule(task2, 1000);
  25. }
  26. }

输出:

  1. 2021-05-01 22:34:08.514 [main] DEBUG Timer - start...
  2. 2021-05-01 22:34:09.528 [Timer-0] DEBUG Timer - task1
  3. 2021-05-01 22:34:11.541 [Timer-0] DEBUG Timer - task2

由于task1的原因导致task2被延迟执行。

使用ScheduledThreadPool

  1. @Slf4j(topic = "NewScheduledThreadPool")
  2. public class NewScheduledThreadPoolDemo {
  3. public static void main(String[] args) {
  4. ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);
  5. pool.schedule(() -> {
  6. log.debug("task1");
  7. try {
  8. TimeUnit.SECONDS.sleep(2);
  9. } catch (InterruptedException e) {
  10. e.printStackTrace();
  11. }
  12. int i = 1 / 0;
  13. }, 1, TimeUnit.SECONDS);
  14. pool.schedule(() -> {
  15. log.debug("task2");
  16. }, 1, TimeUnit.SECONDS);
  17. }
  18. }

结果:

  1. 2021-05-01 22:46:23.053 [pool-1-thread-2] DEBUG NewScheduledThreadPool - task2
  2. 2021-05-01 22:46:23.053 [pool-1-thread-1] DEBUG NewScheduledThreadPool - task1

task1出现延迟或者抛出异常都不会影响到task2的执行。

scheduleAtFixedRate:创建并执行周期性操作。该操作在给定的初始延迟initialDelay后首先启用,随后在给定的周期period内启用。

作用:给一个执行的开始和下一个执行的开始加上给定的延迟period。

异常:如果任务的任何执行遇到异常,则会抑制后续执行。否则,任务将仅通过取消或终止执行程序来终止。如果此任务的任何执行时间超过其周期,则后续执行可能会较晚开始,但不会并发执行。『任务的异常被catch则不影响后续执行』

  1. // 延迟一秒执行,间隔时间为Max{设置间隔时间,线程执行时间}
  2. pool.scheduleAtFixedRate(() -> {
  3. log.debug("running");
  4. try {
  5. TimeUnit.SECONDS.sleep(2);
  6. } catch (InterruptedException e) {
  7. e.printStackTrace();
  8. }
  9. }, 1, 1, TimeUnit.SECONDS);

scheduleWithFixedDelay:创建并执行周期性操作。该操作在给定的初始延迟initialDelay后首先启用,随后在给定的周期period内启用。

作用:给一个执行的终止和下一个执行的开始加上给定的延迟period。

异常:如果任务的任何执行遇到异常,则会抑制后续执行。否则,任务将仅通过取消或终止执行程序来终止。『任务的异常被catch则不影响后续执行』

  1. // 延迟一秒执行,间隔时间为设置时间+线程睡眠时间
  2. pool.scheduleWithFixedDelay(() -> {
  3. log.debug("running");
  4. try {
  5. TimeUnit.SECONDS.sleep(2);
  6. } catch (InterruptedException e) {
  7. e.printStackTrace();
  8. }
  9. }, 1, 1, TimeUnit.SECONDS);

6.3.4 正确处理异常

线程池中的线程执行任务时,如果任务抛出了异常,默认是中断该任务而不是抛出异常或者打印异常信息。

  • 主动捕获异常
  • 使用Future

6.3.5 实现定时任务

  1. @Slf4j(topic = "Schedule")
  2. public class ScheduleDemo {
  3. private static final ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
  4. public static void main(String[] args) {
  5. // 每周六15:16:00打印KHighness
  6. solution(DayOfWeek.SUNDAY, 15, 16, 00, () -> { log.debug("KHighness"); });
  7. }
  8. /**
  9. * 指定时间执行任务
  10. *
  11. * @param day 周几
  12. * @param hour 几时
  13. * @param minute 几分
  14. * @param second 几秒
  15. * @param task 任务
  16. */
  17. public static void solution(DayOfWeek day, int hour, int minute, int second, Runnable task) {
  18. // 当前时间
  19. LocalDateTime now = LocalDateTime.now();
  20. // 目标时间
  21. LocalDateTime tar = now.withHour(hour).withMinute(minute).withSecond(second).with(day);
  22. // 如果当前时间 > 目标时间,则为下周
  23. if (now.compareTo(tar) > 0) {
  24. tar.plusWeeks(1);
  25. }
  26. // 间隔时间
  27. long initialDelay = Duration.between(now, tar).toMillis();
  28. // 循环周期
  29. long period = 7 * 24 * 3600 * 1000;
  30. // 执行任务
  31. pool.scheduleAtFixedRate(task, initialDelay, period, TimeUnit.MILLISECONDS);
  32. }
  33. }

6.4 Tomcat线程池

6.4.1 总体结构

image-20210502151900797.png

  • LimitLatch用来限流,可以控制最大连接个数,类似JUC中的Semaphore
  • Acceptor只负责接收新的socket连接
  • Poller只负责监听socket channel是否有新的可读的I/O事件
  • 一旦可读,封装一个任务对象socketProcessor,提交给Executor线程池处理
  • Executor线程池中的工作线程最终负责处理请求

6.4.2 不同之处

Tomcat线程池扩展了ThreadPoolExecutor,行为稍有不同

  • 如果总线程数达到maximumPoolSize

    • 这时不会立刻抛RejectedExecutionException
    • 而是尝试将任务放入队列,如果还失败,才抛出RejectedExecutionException


    tomcat-7.0.42源码

  1. public void execute(Runnable command, long timeout, TimeUnit unit) {
  2. submittedCount.incrementAndGet();
  3. try {
  4. super.execute(command);
  5. } catch (RejectedExecutionException rx) {
  6. if (super.getQueue() instanceof TaskQueue) {
  7. final TaskQueue queue = (TaskQueue)super.getQueue();
  8. try {
  9. // 尝试将任务重新加入阻塞队列
  10. if (!queue.force(command, timeout, unit)) {
  11. submittedCount.decrementAndGet();
  12. throw new RejectedExecutionException("Queue capacity is full.");
  13. }
  14. } catch (InterruptedException x) {
  15. // 依然失败,抛出异常
  16. submittedCount.decrementAndGet();
  17. Thread.interrupted();
  18. throw new RejectedExecutionException(x);
  19. }
  20. } else {
  21. submittedCount.decrementAndGet();
  22. throw rx;
  23. }
  24. }
  25. }
  1. public boolean force(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {
  2. if ( parent.isShutdown() )
  3. throw new RejectedExecutionException(
  4. "Executor not running, can't force a command into the queue"
  5. );
  6. return super.offer(o,timeout,unit); //forces the item onto the queue, to be used if the task is rejected
  7. }

6.4.3 可配置项

Connector配置

配置项 默认值 说明
acceptorThreadCount 1 accter线程数量
pollerThreadCount 1 poller线程数量
minSpareThreads 10 核心线程数,即corePoolSize
maxThreads 200 最大线程数,即maximumPoolSize
executor - Executor名称,用来引用下面的Executor

Executor配置

配置项 默认值 说明
threadPriority 5 线程优先级
daemon true 是否守护线程
minSpareThreads 25 核心线程数,即corePoolSize
maxThreads 200 最大线程数,即maximumPoolSize
maxIdleTime 60000 线程生存时间,单位是毫秒,默认值即1分钟
maxQueueSize Integer.MAX_VALUE 队列长度
prestartminSpareThreads false 核心线程是否在服务器启动时启动

image-20210502155938527.png

6.5 Fork/Join

6.5.1 概述

Fork/Join是JDK1.7加入的新的线程池实现,它体现的是一种分治思想,适用于能够进行任务拆分的CPU密集型运算。

所谓的任务拆分,是将一个大任务拆分成算法上相同的小人物,直至不能拆分成可以直接求解。跟递归相关的一些计算,如归并排序、斐波那契数列,都可以用分治思想进行求解。

Fork/Join在分治的基础上加入了多线程,可以把每个任务的分解和合并交给不同的线程来完成,进一步提升了运算效率。

Fork/Join默认会创建与CPU核心数大小相同的线程池。

6.5.2 使用

提交给Fork/Join线程池的任务需要继承RecursiveTask(有返回值)或RecuisiveAction(没有返回值),例如下面定义了一个对1~n之间的整数求和的任务。

如下面定义了一个对1~n之间的整数求和的任务

  1. @Slf4j(topic = "ForkJoin")
  2. public class ForkJoinDemo {
  3. public static void main(String[] args) {
  4. ForkJoinPool pool = new ForkJoinPool(4);
  5. System.out.println(pool.invoke(new Task(5)));
  6. }
  7. }
  8. @Slf4j(topic = "Task")
  9. class Task extends RecursiveTask<Integer> {
  10. private final int n;
  11. public Task(int n) {
  12. this.n = n;
  13. }
  14. @Override
  15. public String toString() {
  16. return "『" + n + "』";
  17. }
  18. @Override
  19. protected Integer compute() {
  20. if (n == 1) return 1;
  21. Task tas = new Task(n - 1);
  22. tas.fork();
  23. log.debug("fork: {} {}", n, tas);
  24. int res = n + tas.join();
  25. log.debug("join: {} + {} = {}", n, tas, res);
  26. return res;
  27. }
  28. }

结果:

  1. 2021-05-02 17:04:41.631 [ForkJoinPool-1-worker-2] DEBUG Task - fork: 4 3
  2. 2021-05-02 17:04:41.631 [ForkJoinPool-1-worker-1] DEBUG Task - fork: 5 4
  3. 2021-05-02 17:04:41.631 [ForkJoinPool-1-worker-0] DEBUG Task - fork: 2 1
  4. 2021-05-02 17:04:41.631 [ForkJoinPool-1-worker-3] DEBUG Task - fork: 3 2
  5. 2021-05-02 17:04:41.634 [ForkJoinPool-1-worker-0] DEBUG Task - join: 2 + 1 = 3
  6. 2021-05-02 17:04:41.634 [ForkJoinPool-1-worker-3] DEBUG Task - join: 3 + 2 = 6
  7. 2021-05-02 17:04:41.634 [ForkJoinPool-1-worker-2] DEBUG Task - join: 4 + 3 = 10
  8. 2021-05-02 17:04:41.634 [ForkJoinPool-1-worker-1] DEBUG Task - join: 5 + 4 = 15
  9. 15

流程:

image-20210502171007613.png

改进:任务拆分

  1. @Slf4j(topic = "AddTask")
  2. class AddTask extends RecursiveTask<Integer> {
  3. int begin;
  4. int end;
  5. public AddTask(int begin, int end) {
  6. this.begin = begin;
  7. this.end = end;
  8. }
  9. @Override
  10. public String toString() {
  11. return "『" + begin + ", " + end + "』";
  12. }
  13. @Override
  14. protected Integer compute() {
  15. if (begin == end) {
  16. log.debug("join: {}", begin);
  17. return begin;
  18. }
  19. if (end - begin == 1) {
  20. log.debug("join: {} + {} = {}", begin, end, begin + end);
  21. return begin + end;
  22. }
  23. int mid = begin + (end - begin >> 1);
  24. AddTask task1 = new AddTask(begin, mid);
  25. task1.fork();
  26. AddTask task2 = new AddTask(mid + 1, end);
  27. task2.fork();
  28. log.debug("fork: {} + {} = ?", task1, task2);
  29. int res = task1.join() + task2.join();
  30. log.debug("join: {} + {} = {}", task1, task2, res);
  31. return res;
  32. }
  33. }

流程:

  1. 2021-05-02 17:39:00.721 [ForkJoinPool-1-worker-0] DEBUG AddTask - join: 1 + 2 = 3
  2. 2021-05-02 17:39:00.721 [ForkJoinPool-1-worker-2] DEBUG AddTask - fork: 1, 2 + 3, 3 = ?
  3. 2021-05-02 17:39:00.721 [ForkJoinPool-1-worker-3] DEBUG AddTask - join: 4 + 5 = 9
  4. 2021-05-02 17:39:00.721 [ForkJoinPool-1-worker-1] DEBUG AddTask - fork: 1, 3 + 4, 5 = ?
  5. 2021-05-02 17:39:00.724 [ForkJoinPool-1-worker-0] DEBUG AddTask - join: 3
  6. 2021-05-02 17:39:00.724 [ForkJoinPool-1-worker-2] DEBUG AddTask - join: 1, 2 + 3, 3 = 6
  7. 2021-05-02 17:39:00.724 [ForkJoinPool-1-worker-1] DEBUG AddTask - join: 1, 3 + 4, 5 = 15
  8. 15

流程:

image-20210502174452155.png