线程池

自定义线程池

image.png

步骤1:自定义拒绝策略接口

  1. @FunctionalInterface // 拒绝策略
  2. interface RejectPolicy<T> {
  3. void reject(BlockingQueue<T> queue, T task);
  4. }

步骤2:自定义任务队列

  1. class BlockingQueue<T> {
  2. // 1. 任务队列
  3. private Deque<T> queue = new ArrayDeque<>();
  4. // 2. 锁
  5. private ReentrantLock lock = new ReentrantLock();
  6. // 3. 生产者条件变量
  7. private Condition fullWaitSet = lock.newCondition();
  8. // 4. 消费者条件变量
  9. private Condition emptyWaitSet = lock.newCondition();
  10. // 5. 容量
  11. private int capcity;
  12. public BlockingQueue(int capcity) {
  13. this.capcity = capcity;
  14. }
  15. // 带超时阻塞获取
  16. public T poll(long timeout, TimeUnit unit) {
  17. lock.lock();
  18. try {
  19. // 将 timeout 统一转换为 纳秒
  20. long nanos = unit.toNanos(timeout);
  21. while (queue.isEmpty()) {
  22. try {
  23. // 返回值是剩余时间
  24. if (nanos <= 0) {
  25. return null;
  26. }
  27. nanos = emptyWaitSet.awaitNanos(nanos);
  28. } catch (InterruptedException e) {
  29. e.printStackTrace();
  30. }
  31. }
  32. T t = queue.removeFirst();
  33. fullWaitSet.signal();
  34. return t;
  35. } finally {
  36. lock.unlock();
  37. }
  38. }
  39. // 阻塞获取
  40. public T take() {
  41. lock.lock();
  42. try {
  43. while (queue.isEmpty()) {
  44. try {
  45. emptyWaitSet.await();
  46. } catch (InterruptedException e) {
  47. e.printStackTrace();
  48. }
  49. }
  50. T t = queue.removeFirst();
  51. fullWaitSet.signal();
  52. return t;
  53. } finally {
  54. lock.unlock();
  55. }
  56. }
  57. // 阻塞添加
  58. public void put(T task) {
  59. lock.lock();
  60. try {
  61. while (queue.size() == capcity) {
  62. try {
  63. log.debug("等待加入任务队列 {} ...", task);
  64. fullWaitSet.await();
  65. } catch (InterruptedException e) {
  66. e.printStackTrace();
  67. }
  68. }
  69. log.debug("加入任务队列 {}", task);
  70. queue.addLast(task);
  71. emptyWaitSet.signal();
  72. } finally {
  73. lock.unlock();
  74. }
  75. }
  76. // 带超时时间阻塞添加
  77. public boolean offer(T task, long timeout, TimeUnit timeUnit) {
  78. lock.lock();
  79. try {
  80. long nanos = timeUnit.toNanos(timeout);
  81. while (queue.size() == capcity) {
  82. try {
  83. if(nanos <= 0) {
  84. return false;
  85. }
  86. log.debug("等待加入任务队列 {} ...", task);
  87. nanos = fullWaitSet.awaitNanos(nanos);
  88. } catch (InterruptedException e) {
  89. e.printStackTrace();
  90. }
  91. }
  92. log.debug("加入任务队列 {}", task);
  93. queue.addLast(task);
  94. emptyWaitSet.signal();
  95. return true;
  96. } finally {
  97. lock.unlock();
  98. }
  99. }
  100. public int size() {
  101. lock.lock();
  102. try {
  103. return queue.size();
  104. } finally {
  105. lock.unlock();
  106. }
  107. }
  108. public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
  109. lock.lock();
  110. try {
  111. // 判断队列是否满
  112. if(queue.size() == capcity) {
  113. rejectPolicy.reject(this, task);
  114. } else { // 有空闲
  115. log.debug("加入任务队列 {}", task);
  116. queue.addLast(task);
  117. emptyWaitSet.signal();
  118. }
  119. } finally {
  120. lock.unlock();
  121. }
  122. }
  123. }

步骤3:自定义线程池

  1. class ThreadPool {
  2. // 任务队列
  3. private BlockingQueue<Runnable> taskQueue;
  4. // 线程集合
  5. private HashSet<Worker> workers = new HashSet<>();
  6. // 核心线程数
  7. private int coreSize;
  8. // 获取任务时的超时时间
  9. private long timeout;
  10. private TimeUnit timeUnit;
  11. private RejectPolicy<Runnable> rejectPolicy;
  12. // 执行任务
  13. public void execute(Runnable task) {
  14. // 当任务数没有超过 coreSize 时,直接交给 worker 对象执行
  15. // 如果任务数超过 coreSize 时,加入任务队列暂存
  16. synchronized (workers) {
  17. if (workers.size() < coreSize) {
  18. Worker worker = new Worker(task);
  19. log.debug("新增 worker{}, {}", worker, task);
  20. workers.add(worker);
  21. worker.start();
  22. } else {
  23. // taskQueue.put(task);
  24. // 1) 死等
  25. // 2) 带超时等待
  26. // 3) 让调用者放弃任务执行
  27. // 4) 让调用者抛出异常
  28. // 5) 让调用者自己执行任务
  29. taskQueue.tryPut(rejectPolicy, task);
  30. }
  31. }
  32. }
  33. public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapcity,
  34. RejectPolicy<Runnable> rejectPolicy) {
  35. this.coreSize = coreSize;
  36. this.timeout = timeout;
  37. this.timeUnit = timeUnit;
  38. this.taskQueue = new BlockingQueue<>(queueCapcity);
  39. this.rejectPolicy = rejectPolicy;
  40. }
  41. class Worker extends Thread {
  42. private Runnable task;
  43. public Worker(Runnable task) {
  44. this.task = task;
  45. }
  46. @Override
  47. public void run() {
  48. // 执行任务
  49. // 1) 当 task 不为空,执行任务
  50. // 2) 当 task 执行完毕,再接着从任务队列获取任务并执行
  51. // while(task != null || (task = taskQueue.take()) != null) {
  52. while (task != null || (task = taskQueue.poll(timeout, timeUnit)) != null) {
  53. try {
  54. log.debug("正在执行...{}", task);
  55. task.run();
  56. } catch (Exception e) {
  57. e.printStackTrace();
  58. } finally {
  59. task = null;
  60. }
  61. }
  62. synchronized (workers) {
  63. log.debug("worker 被移除{}", this);
  64. workers.remove(this);
  65. }
  66. }
  67. }
  68. }

步骤四:测试

  1. public static void main(String[] args) {
  2. ThreadPool threadPool = new ThreadPool(1,
  3. 1000, TimeUnit.MILLISECONDS, 1, (queue, task)->{
  4. // 1. 死等
  5. // queue.put(task);
  6. // 2) 带超时等待
  7. // queue.offer(task, 1500, TimeUnit.MILLISECONDS);
  8. // 3) 让调用者放弃任务执行
  9. // log.debug("放弃{}", task);
  10. // 4) 让调用者抛出异常
  11. // throw new RuntimeException("任务执行失败 " + task);
  12. // 5) 让调用者自己执行任务
  13. task.run();
  14. });
  15. for (int i = 0; i < 4; i++) {
  16. int j = i;
  17. threadPool.execute(() -> {
  18. try {
  19. Thread.sleep(1000L);
  20. } catch (InterruptedException e) {
  21. e.printStackTrace();
  22. }
  23. log.debug("{}", j);
  24. });
  25. }
  26. }

自己实现的线程池

  1. @Slf4j(topic = "c.PoolTest1")
  2. public class PoolTest1 {
  3. public static void main(String[] args) {
  4. Pool pool = new Pool(2, 1000, TimeUnit.MILLISECONDS, 1, (queue, task) -> {
  5. // 死等
  6. // queue.push(task);
  7. // 带超时等待
  8. // queue.offer(task, 2000,TimeUnit.MILLISECONDS);
  9. // 让调用者放弃任务执行
  10. // log.info("啥都不写,就是放弃{}!!!", task);
  11. // 让调用者抛出异常
  12. // throw new RuntimeException("抛出异常,后面的都不能运行");
  13. // 让调用者自己操作
  14. task.run();
  15. });
  16. for (int i = 0; i < 4; i++) {
  17. int j = i;
  18. pool.execute(() -> {
  19. try {
  20. Thread.sleep(1000);
  21. } catch (InterruptedException e) {
  22. e.printStackTrace();
  23. }
  24. log.info("{}", j);
  25. });
  26. }
  27. }
  28. }
  29. @Slf4j(topic = "c.Pool")
  30. class Pool {
  31. // 任务队列
  32. private BlockingQueues<Runnable> taskQueue;
  33. // 线程集合
  34. private HashSet<Work> workers = new HashSet<>();
  35. // 核心线程数
  36. private int coreSize;
  37. // 获取任务的超时时间
  38. private long timeout;
  39. // 队列的容量
  40. private TimeUnit timeUnit;
  41. // 拒绝策略的接口
  42. private RejectPolicy<Runnable> rejectPolicy;
  43. public Pool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapicity, RejectPolicy<Runnable> rejectPolicy) {
  44. this.taskQueue = new BlockingQueues<>(queueCapicity);
  45. this.coreSize = coreSize;
  46. this.timeout = timeout;
  47. this.timeUnit = timeUnit;
  48. this.rejectPolicy = rejectPolicy;
  49. }
  50. // 执行任务
  51. public void execute(Runnable task) {
  52. // 锁住workers
  53. synchronized (workers) {
  54. if (workers.size() < coreSize) {
  55. // 创建新的工作线程
  56. Work work = new Work(task);
  57. log.info("新增worker:{}", work);
  58. // 放入工作线程集合
  59. workers.add(work);
  60. work.start();
  61. } else {
  62. // 如果工作线程不够用,放入阻塞队列中
  63. // 死等
  64. // taskQueue.push(task);
  65. taskQueue.tryPut(rejectPolicy, task);
  66. }
  67. }
  68. }
  69. class Work extends Thread {
  70. // 任务
  71. private Runnable task;
  72. public Work(Runnable task) {
  73. this.task = task;
  74. }
  75. @Override
  76. public void run() {
  77. // 执行任务
  78. // 1.如果任务不为空,直接执行
  79. // 2.如果任务为空,那么从任务队列中获取
  80. // while (task != null || (task = taskQueue.take()) != null) { // 如果没加超时时间,线程会一直等待任务
  81. while (task != null || (task = taskQueue.poll(timeout, timeUnit)) != null) {
  82. try {
  83. log.info("正在执行...{}", task);
  84. task.run();
  85. } catch (Exception exception) {
  86. exception.printStackTrace();
  87. } finally {
  88. // 执行完毕,任务为空
  89. task = null;
  90. }
  91. }
  92. synchronized (workers) {
  93. log.info("一直没任务,移除线程:{}", this);
  94. workers.remove(this);
  95. }
  96. }
  97. }
  98. }
  99. /**
  100. * 阻塞队列
  101. *
  102. * @param <T>
  103. */
  104. @Slf4j(topic = "c.BlockingQueues")
  105. class BlockingQueues<T> {
  106. // 线程队列
  107. private Deque<T> quque = new ArrayDeque<>();
  108. // 锁
  109. private ReentrantLock lock = new ReentrantLock();
  110. // 生产者条件变量
  111. private Condition proWaitSet = lock.newCondition();
  112. // 消费者条件变量
  113. private Condition conWaitSet = lock.newCondition();
  114. // 容量
  115. private int capicity;
  116. public BlockingQueues(int capicity) {
  117. this.capicity = capicity;
  118. }
  119. // 带超时的阻塞获取
  120. public T poll(long waitTime, TimeUnit timeUnit) {
  121. // 先加锁
  122. lock.lock();
  123. try {
  124. // 将time统一转化为纳秒
  125. long nanos = timeUnit.toNanos(waitTime);
  126. while (quque.isEmpty()) {
  127. try {
  128. // 等待完毕
  129. if (nanos <= 0) {
  130. return null;
  131. }
  132. // 没有就等待
  133. // 此时重新赋值是为了防止虚假唤醒,唤醒之后,时间就变少了
  134. // awaitNanos方法会返回剩余时间
  135. nanos = conWaitSet.awaitNanos(nanos);
  136. } catch (InterruptedException e) {
  137. e.printStackTrace();
  138. }
  139. }
  140. return quque.poll();
  141. } finally {
  142. proWaitSet.signal();
  143. lock.unlock();
  144. }
  145. }
  146. // 阻塞获取
  147. public T take() {
  148. // 先加锁
  149. lock.lock();
  150. try {
  151. while (quque.isEmpty()) {
  152. try {
  153. // 没有就等待
  154. conWaitSet.await();
  155. } catch (InterruptedException e) {
  156. e.printStackTrace();
  157. }
  158. }
  159. return quque.poll();
  160. } finally {
  161. proWaitSet.signal();
  162. lock.unlock();
  163. }
  164. }
  165. public void push(T t) {
  166. // 先加锁
  167. try {
  168. lock.lock();
  169. while (quque.size() == capicity) {
  170. try {
  171. log.info("任务队列已满,等待加入:{}", t);
  172. proWaitSet.await();
  173. } catch (InterruptedException e) {
  174. e.printStackTrace();
  175. }
  176. }
  177. quque.addLast(t);
  178. log.info("加入任务队列:{}", t);
  179. conWaitSet.signal();
  180. } finally {
  181. lock.unlock();
  182. }
  183. }
  184. /**
  185. * 带超时时间的阻塞添加
  186. *
  187. * @param task
  188. * @param timeout
  189. * @param timeUnit
  190. * @return
  191. */
  192. public boolean offer(T task, int timeout, TimeUnit timeUnit) {
  193. // 先加锁
  194. try {
  195. lock.lock();
  196. long nanos = timeUnit.toNanos(timeout);
  197. while (quque.size() == capicity) {
  198. try {
  199. if (nanos <= 0) {
  200. return false;
  201. }
  202. log.info("任务队列已满,等待加入:{}", task);
  203. nanos = proWaitSet.awaitNanos(nanos);
  204. } catch (InterruptedException e) {
  205. e.printStackTrace();
  206. }
  207. }
  208. quque.addLast(task);
  209. log.info("加入任务队列:{}", task);
  210. conWaitSet.signal();
  211. return true;
  212. } finally {
  213. lock.unlock();
  214. }
  215. }
  216. /**
  217. * 带有拒接策略的放入任务
  218. *
  219. * @param rejectPolicy
  220. * @param task
  221. */
  222. public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
  223. try {
  224. lock.lock();
  225. // 队列已满
  226. if (quque.size() == capicity) {
  227. // 调用操作者实现的操作
  228. rejectPolicy.reject(this, task);
  229. } else {
  230. // 还有空闲
  231. quque.addLast(task);
  232. log.info("加入任务队列:{}", task);
  233. conWaitSet.signal();
  234. }
  235. } finally {
  236. lock.unlock();
  237. }
  238. }
  239. public int size() {
  240. lock.lock();
  241. try {
  242. return quque.size();
  243. } finally {
  244. lock.unlock();
  245. }
  246. }
  247. }
  248. /**
  249. * 拒接策略
  250. *
  251. * @param <T>
  252. */
  253. @FunctionalInterface
  254. interface RejectPolicy<T> {
  255. void reject(BlockingQueues<T> queues, T task);
  256. }

ThreadPoolExecutor

image.png

线程池状态

ThreadPoolExecutor 使用 int 的高 3 位来表示线程池状态,低 29 位表示线程数量

image.png

从数字上比较,TERMINATED > TIDYING > STOP > SHUTDOWN > RUNNING(因为是高三位,第一位是符号所以RUNNING是负数)

这些信息存储在一个原子变量 ctl 中,目的是将线程池状态与线程个数合二为一,这样就可以用一次 cas 原子操作进行赋值。

  1. // c 为旧值, ctlOf 返回结果为新值
  2. ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))));
  3. // rs 为高 3 位代表线程池状态, wc 为低 29 位代表线程个数,ctl 是合并它们
  4. private static int ctlOf(int rs, int wc) { return rs | wc; }

构造方法

  1. public ThreadPoolExecutor(int corePoolSize,
  2. int maximumPoolSize,
  3. long keepAliveTime,
  4. TimeUnit unit,
  5. BlockingQueue<Runnable> workQueue,
  6. ThreadFactory threadFactory,
  7. RejectedExecutionHandler handler)
  • corePoolSize 核心线程数目 (最多保留的线程数)
  • maximumPoolSize 最大线程数目
  • keepAliveTime 生存时间 - 针对救急线程
  • unit 时间单位 - 针对救急线程
  • workQueue 阻塞队列
  • threadFactory 线程工厂 - 可以为线程创建时起个好名字
  • handler 拒绝策略

工作方式:

image.png

  • 线程池中刚开始没有线程,当一个任务提交给线程池后,线程池会创建一个新线程来执行任务。
  • 当线程数达到 corePoolSize 并没有线程空闲,这时再加入任务,新加的任务会被加入workQueue 队列排队,直到有空闲的线程。
  • 如果队列选择了有界队列,那么任务超过了队列大小时,会创建 maximumPoolSize - corePoolSize 数目的线程来救急。
  • 如果线程到达 maximumPoolSize 仍然有新任务这时会执行拒绝策略。拒绝策略 jdk 提供了 4 种实现,其它著名框架也提供了实现
    • AbortPolicy 让调用者抛出 RejectedExecutionException 异常,这是默认策略
    • CallerRunsPolicy 让调用者运行任务
    • DiscardPolicy 放弃本次任务
    • DiscardOldestPolicy 放弃队列中最早的任务,本任务取而代之
    • Dubbo 的实现,在抛出 RejectedExecutionException 异常之前会记录日志,并 dump 线程栈信息,方
    • 便定位问题
    • Netty 的实现,是创建一个新线程来执行任务
    • ActiveMQ 的实现,带超时等待(60s)尝试放入队列,类似我们之前自定义的拒绝策略
    • PinPoint 的实现,它使用了一个拒绝策略链,会逐一尝试策略链中每种拒绝策略
  • 当高峰过去后,超过corePoolSize 的救急线程如果一段时间没有任务做,需要结束节省资源,这个时间由keepAliveTime 和 unit 来控制。

image.png

根据这个构造方法,JDK Executors 类中提供了众多工厂方法来创建各种用途的线程池。

newFixedThreadPool

  1. public static ExecutorService newFixedThreadPool(int nThreads) {
  2. return new ThreadPoolExecutor(nThreads, nThreads,
  3. 0L, TimeUnit.MILLISECONDS,
  4. new LinkedBlockingQueue<Runnable>());
  5. }

特点

  • 核心线程数是 0, 最大线程数是 Integer.MAX_VALUE,救急线程的空闲生存时间是 60s,意味着
    • 全部都是救急线程(60s 后可以回收)
    • 救急线程可以无限创建
  • 队列采用了 SynchronousQueue 实现特点是,它没有容量,没有线程来取是放不进去的(一手交钱、一手交货)
  1. SynchronousQueue<Integer> integers = new SynchronousQueue<>();
  2. new Thread(() -> {
  3. try {
  4. log.debug("putting {} ", 1);
  5. integers.put(1);
  6. log.debug("{} putted...", 1);
  7. log.debug("putting...{} ", 2);
  8. integers.put(2);
  9. log.debug("{} putted...", 2);
  10. } catch (InterruptedException e) {
  11. e.printStackTrace();
  12. }
  13. },"t1").start();
  14. sleep(1);
  15. new Thread(() -> {
  16. try {
  17. log.debug("taking {}", 1);
  18. integers.take();
  19. } catch (InterruptedException e) {
  20. e.printStackTrace();
  21. }
  22. },"t2").start();
  23. sleep(1);
  24. new Thread(() -> {
  25. try {
  26. log.debug("taking {}", 2);
  27. integers.take();
  28. } catch (InterruptedException e) {
  29. e.printStackTrace();
  30. }
  31. },"t3").start();

输出

  1. 11:48:15.500 c.TestSynchronousQueue [t1] - putting 1
  2. 11:48:16.500 c.TestSynchronousQueue [t2] - taking 1
  3. 11:48:16.500 c.TestSynchronousQueue [t1] - 1 putted...
  4. 11:48:16.500 c.TestSynchronousQueue [t1] - putting...2
  5. 11:48:17.502 c.TestSynchronousQueue [t3] - taking 2
  6. 11:48:17.503 c.TestSynchronousQueue [t1] - 2 putted...

评价 整个线程池表现为线程数会根据任务量不断增长,没有上限,当任务执行完毕,空闲 1分钟后释放线程。 适合任务数比较密集,但每个任务执行时间较短的情况。

newSingleThreadExecutor

使用场景:

希望多个任务排队执行。线程数固定为 1,任务数多于 1 时,会放入无界队列排队。任务执行完毕,这唯一的线程

也不会被释放。

区别:

  • 自己创建一个单线程串行执行任务,如果任务执行失败而终止那么没有任何补救措施,而线程池还会新建一个线程,保证池的正常工作
  • Executors.newSingleThreadExecutor() 线程个数始终为1,不能修改
    • FinalizableDelegatedExecutorService 应用的是装饰器模式,只对外暴露了 ExecutorService 接口,因此不能调用 ThreadPoolExecutor 中特有的方法
  • Executors.newFixedThreadPool(1) 初始时为1,以后还可以修改
    • 对外暴露的是 ThreadPoolExecutor 对象,可以强转后调用 setCorePoolSize 等方法进行修改

提交任务

  1. // 执行任务
  2. void execute(Runnable command);
  3. // 提交任务 task,用返回值 Future 获得任务执行结果
  4. <T> Future<T> submit(Callable<T> task);
  5. // 提交 tasks 中所有任务
  6. <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
  7. throws InterruptedException;
  8. // 提交 tasks 中所有任务,带超时时间
  9. <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
  10. long timeout, TimeUnit unit)
  11. throws InterruptedException;
  12. // 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消
  13. <T> T invokeAny(Collection<? extends Callable<T>> tasks)
  14. throws InterruptedException, ExecutionException;
  15. // 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消,带超时时间
  16. <T> T invokeAny(Collection<? extends Callable<T>> tasks,
  17. long timeout, TimeUnit unit)
  18. throws InterruptedException, ExecutionException, TimeoutException;

关闭线程

shutdown

  1. /*
  2. 线程池状态变为 SHUTDOWN
  3. - 不会接收新任务
  4. - 但已提交任务会执行完
  5. - 此方法不会阻塞调用线程的执行
  6. */
  7. void shutdown();
  1. public void shutdown() {
  2. final ReentrantLock mainLock = this.mainLock;
  3. mainLock.lock();
  4. try {
  5. checkShutdownAccess();
  6. // 修改线程池状态
  7. advanceRunState(SHUTDOWN);
  8. // 仅会打断空闲线程
  9. interruptIdleWorkers();
  10. onShutdown(); // 扩展点 ScheduledThreadPoolExecutor
  11. } finally {
  12. mainLock.unlock();
  13. }
  14. // 尝试终结(没有运行的线程可以立刻终结,如果还有运行的线程也不会等)
  15. tryTerminate();
  16. }

shutdownNow

  1. /*
  2. 线程池状态变为 STOP
  3. - 不会接收新任务
  4. - 会将队列中的任务返回
  5. - 并用 interrupt 的方式中断正在执行的任务
  6. */
  7. List<Runnable> shutdownNow();
  1. public List<Runnable> shutdownNow() {
  2. List<Runnable> tasks;
  3. final ReentrantLock mainLock = this.mainLock;
  4. mainLock.lock();
  5. try {
  6. checkShutdownAccess();
  7. // 修改线程池状态
  8. advanceRunState(STOP);
  9. // 打断所有线程
  10. interruptWorkers();
  11. // 获取队列中剩余任务
  12. tasks = drainQueue();
  13. } finally {
  14. mainLock.unlock();
  15. }
  16. // 尝试终结
  17. tryTerminate();
  18. return tasks;
  19. }

其它方法

  1. // 不在 RUNNING 状态的线程池,此方法就返回 true
  2. boolean isShutdown();
  3. // 线程池状态是否是 TERMINATED
  4. boolean isTerminated();
  5. // 调用 shutdown 后,由于调用线程并不会等待所有任务运行结束,因此如果它想在线程池 TERMINATED 后做些事情,可以利用此方法等待
  6. boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;

模式之 Worker Thread

任务调度线程池

在『任务调度线程池』功能加入之前,可以使用 java.util.Timer 来实现定时功能,Timer 的优点在于简单易用,但由于所有任务都是由同一个线程来调度,因此所有任务都是串行执行的,同一时间只能有一个任务在执行,前一个任务的延迟或异常都将会影响到之后的任务。

  1. public static void main(String[] args) {
  2. Timer timer = new Timer();
  3. TimerTask task1 = new TimerTask() {
  4. @Override
  5. public void run() {
  6. log.debug("task 1");
  7. sleep(2);
  8. }
  9. };
  10. TimerTask task2 = new TimerTask() {
  11. @Override
  12. public void run() {
  13. log.debug("task 2");
  14. }
  15. };
  16. // 使用 timer 添加两个任务,希望它们都在 1s 后执行
  17. // 但由于 timer 内只有一个线程来顺序执行队列中的任务,因此『任务1』的延时,影响了『任务2』的执行
  18. timer.schedule(task1, 1000);
  19. timer.schedule(task2, 1000);
  20. }

输出

  1. 20:46:09.444 c.TestTimer [main] - start...
  2. 20:46:10.447 c.TestTimer [Timer-0] - task 1
  3. 20:46:12.448 c.TestTimer [Timer-0] - task 2

使用 ScheduledExecutorService 改写:

  1. ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
  2. // 添加两个任务,希望它们都在 1s 后执行
  3. executor.schedule(() -> {
  4. System.out.println("任务1,执行时间:" + new Date());
  5. try { Thread.sleep(2000); } catch (InterruptedException e) { }
  6. }, 1000, TimeUnit.MILLISECONDS);
  7. executor.schedule(() -> {
  8. System.out.println("任务2,执行时间:" + new Date());
  9. }, 1000, TimeUnit.MILLISECONDS);

输出

  1. 任务1,执行时间:Thu Jan 03 12:45:17 CST 2019
  2. 任务2,执行时间:Thu Jan 03 12:45:17 CST 2019

scheduleAtFixedRate 例子:

  1. ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
  2. log.debug("start...");
  3. pool.scheduleAtFixedRate(() -> {
  4. log.debug("running...");
  5. }, 1, 1, TimeUnit.SECONDS);

输出

  1. 21:45:43.167 c.TestTimer [main] - start...
  2. 21:45:44.215 c.TestTimer [pool-1-thread-1] - running...
  3. 21:45:45.215 c.TestTimer [pool-1-thread-1] - running...
  4. 21:45:46.215 c.TestTimer [pool-1-thread-1] - running...
  5. 21:45:47.215 c.TestTimer [pool-1-thread-1] - running...

scheduleAtFixedRate 例子(任务执行时间超过了间隔时间):

  1. ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
  2. log.debug("start...");
  3. pool.scheduleAtFixedRate(() -> {
  4. log.debug("running...");
  5. sleep(2);
  6. }, 1, 1, TimeUnit.SECONDS);

输出分析:一开始,延时 1s,接下来,由于任务执行时间 > 间隔时间,间隔被『撑』到了 2s

  1. 21:44:30.311 c.TestTimer [main] - start...
  2. 21:44:31.360 c.TestTimer [pool-1-thread-1] - running...
  3. 21:44:33.361 c.TestTimer [pool-1-thread-1] - running...
  4. 21:44:35.362 c.TestTimer [pool-1-thread-1] - running...
  5. 21:44:37.362 c.TestTimer [pool-1-thread-1] - running

scheduleWithFixedDelay 例子:

  1. ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
  2. log.debug("start...");
  3. pool.scheduleWithFixedDelay(()-> {
  4. log.debug("running...");
  5. sleep(2);
  6. }, 1, 1, TimeUnit.SECONDS);

输出分析:一开始,延时 1s,scheduleWithFixedDelay 的间隔是 上一个任务结束 <-> 延时 <-> 下一个任务开始 所以间隔都是 3s

  1. 21:40:55.078 c.TestTimer [main] - start...
  2. 21:40:56.140 c.TestTimer [pool-1-thread-1] - running...
  3. 21:40:59.143 c.TestTimer [pool-1-thread-1] - running...
  4. 21:41:02.145 c.TestTimer [pool-1-thread-1] - running...
  5. 21:41:05.147 c.TestTimer [pool-1-thread-1] - running...

评价 整个线程池表现为:线程数固定,任务数多于线程数时,会放入无界队列排队。任务执行完毕,这些线程也不会被释放。用来执行延迟或反复执行的任务。

正确处理执行任务异常

方法1:主动捉异常

  1. pool.submit(() -> {
  2. try {
  3. log.debug("task1");
  4. int i = 1 / 0;
  5. } catch (Exception e) {
  6. log.error("error:", e);
  7. }
  8. });

输出

  1. 21:59:04.558 c.TestTimer [pool-1-thread-1] - task1
  2. 21:59:04.562 c.TestTimer [pool-1-thread-1] - error:
  3. java.lang.ArithmeticException: / by zero
  4. at cn.itcast.n8.TestTimer.lambda$main$0(TestTimer.java:28)
  5. at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
  6. at java.util.concurrent.FutureTask.run(FutureTask.java:266)
  7. at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  8. at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  9. at java.lang.Thread.run(Thread.java:748)

方法2:使用 Future

  1. ExecutorService pool = Executors.newFixedThreadPool(1);
  2. ExecutorService pool = Executors.newFixedThreadPool(1);
  3. Future<Boolean> f = pool.submit(() -> {
  4. log.debug("task1");
  5. int i = 1 / 0;
  6. return true;
  7. });
  8. log.debug("result:{}", f.get());

输出

  1. 21:54:58.208 c.TestTimer [pool-1-thread-1] - task1
  2. Exception in thread "main" java.util.concurrent.ExecutionException:
  3. java.lang.ArithmeticException: / by zero
  4. at java.util.concurrent.FutureTask.report(FutureTask.java:122)
  5. at java.util.concurrent.FutureTask.get(FutureTask.java:192)
  6. at cn.itcast.n8.TestTimer.main(TestTimer.java:31)
  7. Caused by: java.lang.ArithmeticException: / by zero
  8. at cn.itcast.n8.TestTimer.lambda$main$0(TestTimer.java:28)
  9. at java.util.concurrent.FutureTask.run(FutureTask.java:266)
  10. at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  11. at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  12. at java.lang.Thread.run(Thread.java:748)

应用之定时任务

如何让每周四 18:00:00 定时执行任务?

  1. public class TestSchedule {
  2. // 如何每周四18:00:00定时执行任务?
  3. public static void main(String[] args) {
  4. // 获取当前时间
  5. LocalDateTime now = LocalDateTime.now();
  6. // 获取周四时间
  7. LocalDateTime time = now.withHour(18).withMinute(0).withSecond(0).withNano(0).with(DayOfWeek.THURSDAY);
  8. // 如果当前时间大于本周周四,必须找到下周四
  9. if (now.compareTo(time) > 0) {
  10. time = time.plusWeeks(1);
  11. }
  12. // initailDelay 代表当前时间和周四的时间差
  13. // period 一周间隔的时间
  14. long initailDelay = Duration.between(now, time).toMillis();
  15. long period = 1000 * 60 * 60 * 24 * 7;
  16. ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1);
  17. pool.scheduleAtFixedRate(() -> {
  18. System.out.println("running ... ");
  19. }, initailDelay, period, TimeUnit.SECONDS);
  20. }
  21. }

Tomcat线程池

Tomcat 在哪里用到了线程池呢

  • LimitLatch 用来限流,可以控制最大连接个数,类似 J.U.C 中的 Semaphore 后面再讲
  • Acceptor 只负责【接收新的 socket 连接】
  • Poller 只负责监听 socket channel 是否有【可读的 I/O 事件】
  • 一旦可读,封装一个任务对象(socketProcessor),提交给 Executor 线程池处理
  • Executor 线程池中的工作线程最终负责【处理请求】

Tomcat 线程池扩展了 ThreadPoolExecutor,行为稍有不同

  • 如果总线程数达到 maximumPoolSize
    • 这时不会立刻抛 RejectedExecutionException 异常
    • 而是再次尝试将任务放入队列,如果还失败,才抛出 RejectedExecutionException 异常

tomcat7的源码

  1. submittedCount.incrementAndGet();
  2. try {
  3. super.execute(command);
  4. } catch (RejectedExecutionException rx) {
  5. if (super.getQueue() instanceof TaskQueue) {
  6. final TaskQueue queue = (TaskQueue)super.getQueue();
  7. try {
  8. if (!queue.force(command, timeout, unit)) {
  9. submittedCount.decrementAndGet();
  10. throw new RejectedExecutionException("Queue capacity is full.");
  11. }
  12. } catch (InterruptedException x) {
  13. submittedCount.decrementAndGet();
  14. Thread.interrupted();
  15. throw new RejectedExecutionException(x);
  16. }
  17. } else {
  18. submittedCount.decrementAndGet();
  19. throw rx;
  20. }
  21. }
  22. }

TaskQueue.java

  1. public boolean force(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {
  2. if ( parent.isShutdown() )
  3. throw new RejectedExecutionException(
  4. "Executor not running, can't force a command into the queue"
  5. );
  6. return super.offer(o,timeout,unit); //forces the item onto the queue, to be used if the task
  7. is rejected
  8. }

Connector 配置

image.png

Executor 线程配置

image.png

image.png

Fork/Join

概念

Fork/Join 是 JDK 1.7 加入的新的线程池实现,它体现的是一种分治思想,适用于能够进行任务拆分的 cpu 密集型运算

所谓的任务拆分,是将一个大任务拆分为算法上相同的小任务,直至不能拆分可以直接求解。跟递归相关的一些计算,如归并排序、斐波那契数列、都可以用分治思想进行求解

Fork/Join 在分治的基础上加入了多线程,可以把每个任务的分解和合并交给不同的线程来完成,进一步提升了运算效率

Fork/Join 默认会创建与 cpu 核心数大小相同的线程池

使用

提交给 Fork/Join 线程池的任务需要继承 RecursiveTask(有返回值)或 RecursiveAction(没有返回值),例如下面定义了一个对 1~n 之间的整数求和的任务。

  1. class AddTask1 extends RecursiveTask<Integer> {
  2. int n;
  3. public AddTask1(int n) {
  4. this.n = n;
  5. }
  6. @Override
  7. public String toString() {
  8. return "{" + n + '}';
  9. }
  10. @Override
  11. protected Integer compute() {
  12. // 如果 n 已经为 1,可以求得结果了
  13. if (n == 1) {
  14. log.debug("join() {}", n);
  15. return n;
  16. }
  17. // 将任务进行拆分(fork)
  18. AddTask1 t1 = new AddTask1(n - 1);
  19. t1.fork();
  20. log.debug("fork() {} + {}", n, t1);
  21. // 合并(join)结果
  22. int result = n + t1.join();
  23. log.debug("join() {} + {} = {}", n, t1, result);
  24. return result;
  25. }
  26. }

然后提交给 ForkJoinPool 来执行

  1. public static void main(String[] args) {
  2. ForkJoinPool pool = new ForkJoinPool(4);
  3. System.out.println(pool.invoke(new AddTask3(1, 10)));
  4. }

结果

  1. [ForkJoinPool-1-worker-0] - fork() 2 + {1}
  2. [ForkJoinPool-1-worker-1] - fork() 5 + {4}
  3. [ForkJoinPool-1-worker-0] - join() 1
  4. [ForkJoinPool-1-worker-0] - join() 2 + {1} = 3
  5. [ForkJoinPool-1-worker-2] - fork() 4 + {3}
  6. [ForkJoinPool-1-worker-3] - fork() 3 + {2}
  7. [ForkJoinPool-1-worker-3] - join() 3 + {2} = 6
  8. [ForkJoinPool-1-worker-2] - join() 4 + {3} = 10
  9. [ForkJoinPool-1-worker-1] - join() 5 + {4} = 15
  10. 15

image.png

改进

  1. class AddTask3 extends RecursiveTask<Integer> {
  2. int begin;
  3. int end;
  4. public AddTask3(int begin, int end) {
  5. this.begin = begin;
  6. this.end = end;
  7. }
  8. @Override
  9. public String toString() {
  10. return "{" + begin + "," + end + '}';
  11. }
  12. @Override
  13. protected Integer compute() {
  14. // 5, 5
  15. if (begin == end) {
  16. log.debug("join() {}", begin);
  17. return begin;
  18. }
  19. // 4, 5
  20. if (end - begin == 1) {
  21. log.debug("join() {} + {} = {}", begin, end, end + begin);
  22. return end + begin;
  23. }
  24. // 1 5
  25. int mid = (end + begin) / 2; // 3
  26. AddTask3 t1 = new AddTask3(begin, mid); // 1,3
  27. t1.fork();
  28. AddTask3 t2 = new AddTask3(mid + 1, end); // 4,5
  29. t2.fork();
  30. log.debug("fork() {} + {} = ?", t1, t2);
  31. int result = t1.join() + t2.join();
  32. log.debug("join() {} + {} = {}", t1, t2, result);
  33. return result;
  34. }
  35. }

结果

  1. [ForkJoinPool-1-worker-0] - join() 1 + 2 = 3
  2. [ForkJoinPool-1-worker-3] - join() 4 + 5 = 9
  3. [ForkJoinPool-1-worker-0] - join() 3
  4. [ForkJoinPool-1-worker-1] - fork() {1,3} + {4,5} = ?
  5. [ForkJoinPool-1-worker-2] - fork() {1,2} + {3,3} = ?
  6. [ForkJoinPool-1-worker-2] - join() {1,2} + {3,3} = 6
  7. [ForkJoinPool-1-worker-1] - join() {1,3} + {4,5} = 15
  8. 15

image.png