线程池

  1. 线程池管理器:用于创建并管理线程池, 包括创建线程池,销毁线程池,添加新任务;
  2. 工作线程:线程池中线程, 在没有任务时处于等待状态, 可以循环的执行任务;
  3. 任务接口:每个任务必须实现的接口,以供工作线程调度任务的执行,它主要规定了任务的入口,任务执行完后的收尾工作, 任务的执行状态等;
  4. 任务队列:用于存放没有处理的任务。提供一种缓冲机制。 | 类型 | 名称 | 描述 | | —- | —- | —- | | 接口 | Executor | 最上层的接口,定义了执行任务的方法execute | | 接口 | ExecutorService | 继承了Executor,拓展了Callable、Future、关闭方法 | | 接口 | ScheduledExecutorService | 继承了ExecutorService,增加了定时任务相关方法 | | 实现类 | ThreadPoolExecutor | 基础、标准的线程池实现 | | 实现类 | ScheduledThreadPoolService | 继承了ThreadPoolExecutor,实现了ScheduledExecutorService中相关定时任务的方法。 |

ThreadPoolExecutor

ThreadPoolExecutor线程池执行过程:
image.png

  1. /** 线程池的使用 */
  2. public class Demo9 {
  3. /**
  4. * 测试: 提交15个执行时间需要3秒的任务,看线程池的状况
  5. *
  6. * @param threadPoolExecutor 传入不同的线程池,看不同的结果
  7. * @throws Exception
  8. */
  9. public void testCommon(ThreadPoolExecutor threadPoolExecutor) throws Exception {
  10. // 测试: 提交15个执行时间需要3秒的任务,看超过大小的2个,对应的处理情况
  11. for (int i = 0; i < 15; i++) {
  12. int n = i;
  13. threadPoolExecutor.submit(new Runnable() {
  14. @Override
  15. public void run() {
  16. try {
  17. System.out.println("开始执行:" + n);
  18. Thread.sleep(3000L);
  19. System.err.println("执行结束:" + n);
  20. } catch (InterruptedException e) {
  21. e.printStackTrace();
  22. }
  23. }
  24. });
  25. System.out.println("任务提交成功 :" + i);
  26. }
  27. // 查看线程数量,查看队列等待数量
  28. Thread.sleep(500L);
  29. System.out.println("当前线程池线程数量为:" + threadPoolExecutor.getPoolSize());
  30. System.out.println("当前线程池等待的数量为:" + threadPoolExecutor.getQueue().size());
  31. // 等待15秒,查看线程数量和队列数量(理论上,会被超出核心线程数量的线程自动销毁)
  32. Thread.sleep(15000L);
  33. System.out.println("当前线程池线程数量为:" + threadPoolExecutor.getPoolSize());
  34. System.out.println("当前线程池等待的数量为:" + threadPoolExecutor.getQueue().size());
  35. }
  36. /**
  37. * 1、线程池信息: 核心线程数量5,最大数量10,无界队列,超出核心线程数量的线程存活时间:5秒, 指定拒绝策略的
  38. *
  39. * @throws Exception
  40. */
  41. private void threadPoolExecutorTest1() throws Exception {
  42. ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 5, TimeUnit.SECONDS,
  43. new LinkedBlockingQueue<Runnable>());
  44. testCommon(threadPoolExecutor);
  45. // 预计结果:线程池线程数量为:5,超出数量的任务,其他的进入队列中等待被执行
  46. }
  47. /**
  48. * 2、 线程池信息: 核心线程数量5,最大数量10,队列大小3,超出核心线程数量的线程存活时间:5秒, 指定拒绝策略的
  49. *
  50. * @throws Exception
  51. */
  52. private void threadPoolExecutorTest2() throws Exception {
  53. // 创建一个 核心线程数量为5,最大数量为10,等待队列最大是3 的线程池,也就是最大容纳13个任务。
  54. // 默认的策略是抛出RejectedExecutionException异常,java.util.concurrent.ThreadPoolExecutor.AbortPolicy
  55. ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 5, TimeUnit.SECONDS,
  56. new LinkedBlockingQueue<Runnable>(3), new RejectedExecutionHandler() {
  57. @Override
  58. public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
  59. System.err.println("有任务被拒绝执行了");
  60. }
  61. });
  62. testCommon(threadPoolExecutor);
  63. // 预计结果:
  64. // 1、 5个任务直接分配线程开始执行
  65. // 2、 3个任务进入等待队列
  66. // 3、 队列不够用,临时加开5个线程来执行任务(5秒没活干就销毁)
  67. // 4、 队列和线程池都满了,剩下2个任务,没资源了,被拒绝执行。
  68. // 5、 任务执行,5秒后,如果无任务可执行,销毁临时创建的5个线程
  69. }
  70. /**
  71. * 3、 线程池信息: 核心线程数量5,最大数量5,无界队列,超出核心线程数量的线程存活时间:5秒
  72. *
  73. * @throws Exception
  74. */
  75. private void threadPoolExecutorTest3() throws Exception {
  76. // 和Executors.newFixedThreadPool(int nThreads)一样的
  77. ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,
  78. new LinkedBlockingQueue<Runnable>());
  79. testCommon(threadPoolExecutor);
  80. // 预计结:线程池线程数量为:5,超出数量的任务,其他的进入队列中等待被执行
  81. }
  82. /**
  83. * 4、 线程池信息:
  84. * 核心线程数量0,最大数量Integer.MAX_VALUE,SynchronousQueue队列,超出核心线程数量的线程存活时间:60秒
  85. *
  86. * @throws Exception
  87. */
  88. private void threadPoolExecutorTest4() throws Exception {
  89. // SynchronousQueue,实际上它不是一个真正的队列,因为它不会为队列中元素维护存储空间。与其他队列不同的是,它维护一组线程,这些线程在等待着把元素加入或移出队列。
  90. // 在使用SynchronousQueue作为工作队列的前提下,客户端代码向线程池提交任务时,
  91. // 而线程池中又没有空闲的线程能够从SynchronousQueue队列实例中取一个任务,
  92. // 那么相应的offer方法调用就会失败(即任务没有被存入工作队列)。
  93. // 此时,ThreadPoolExecutor会新建一个新的工作者线程用于对这个入队列失败的任务进行处理(假设此时线程池的大小还未达到其最大线程池大小maximumPoolSize)。
  94. // 和Executors.newCachedThreadPool()一样的
  95. ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
  96. new SynchronousQueue<Runnable>());
  97. testCommon(threadPoolExecutor);
  98. // 预计结果:
  99. // 1、 线程池线程数量为:15,超出数量的任务,其他的进入队列中等待被执行
  100. // 2、 所有任务执行结束,60秒后,如果无任务可执行,所有线程全部被销毁,池的大小恢复为0
  101. Thread.sleep(60000L);
  102. System.out.println("60秒后,再看线程池中的数量:" + threadPoolExecutor.getPoolSize());
  103. }
  104. /**
  105. * 5、 定时执行线程池信息:3秒后执行,一次性任务,到点就执行 <br/>
  106. * 核心线程数量5,最大数量Integer.MAX_VALUE,DelayedWorkQueue延时队列,超出核心线程数量的线程存活时间:0秒
  107. *
  108. * @throws Exception
  109. */
  110. private void threadPoolExecutorTest5() throws Exception {
  111. // 和Executors.newScheduledThreadPool()一样的
  112. ScheduledThreadPoolExecutor threadPoolExecutor = new ScheduledThreadPoolExecutor(5);
  113. threadPoolExecutor.schedule(new Runnable() {
  114. @Override
  115. public void run() {
  116. System.out.println("任务被执行,现在时间:" + System.currentTimeMillis());
  117. }
  118. }, 3000, TimeUnit.MILLISECONDS);
  119. System.out.println(
  120. "定时任务,提交成功,时间是:" + System.currentTimeMillis() + ", 当前线程池中线程数量:" + threadPoolExecutor.getPoolSize());
  121. // 预计结果:任务在3秒后被执行一次
  122. }
  123. /**
  124. * 6、 定时执行线程池信息:线程固定数量5 ,<br/>
  125. * 核心线程数量5,最大数量Integer.MAX_VALUE,DelayedWorkQueue延时队列,超出核心线程数量的线程存活时间:0秒
  126. *
  127. * @throws Exception
  128. */
  129. private void threadPoolExecutorTest6() throws Exception {
  130. ScheduledThreadPoolExecutor threadPoolExecutor = new ScheduledThreadPoolExecutor(5);
  131. // 周期性执行某一个任务,线程池提供了两种调度方式,这里单独演示一下。测试场景一样。
  132. // 测试场景:提交的任务需要3秒才能执行完毕。看两种不同调度方式的区别
  133. // 效果1: 提交后,2秒后开始第一次执行,之后每间隔1秒,固定执行一次(如果发现上次执行还未完毕,则等待完毕,完毕后立刻执行)。
  134. // 也就是说这个代码中是,3秒钟执行一次(计算方式:每次执行三秒,间隔时间1秒,执行结束后马上开始下一次执行,无需等待)
  135. threadPoolExecutor.scheduleAtFixedRate(new Runnable() {
  136. @Override
  137. public void run() {
  138. try {
  139. Thread.sleep(3000L);
  140. } catch (InterruptedException e) {
  141. e.printStackTrace();
  142. }
  143. System.out.println("任务-1 被执行,现在时间:" + System.currentTimeMillis());
  144. }
  145. }, 2000, 1000, TimeUnit.MILLISECONDS);
  146. // 效果2:提交后,2秒后开始第一次执行,之后每间隔1秒,固定执行一次(如果发现上次执行还未完毕,则等待完毕,等上一次执行完毕后再开始计时,等待1秒)。
  147. // 也就是说这个代码钟的效果看到的是:4秒执行一次。 (计算方式:每次执行3秒,间隔时间1秒,执行完以后再等待1秒,所以是 3+1)
  148. threadPoolExecutor.scheduleWithFixedDelay(new Runnable() {
  149. @Override
  150. public void run() {
  151. try {
  152. Thread.sleep(3000L);
  153. } catch (InterruptedException e) {
  154. e.printStackTrace();
  155. }
  156. System.out.println("任务-2 被执行,现在时间:" + System.currentTimeMillis());
  157. }
  158. }, 2000, 1000, TimeUnit.MILLISECONDS);
  159. }
  160. /**
  161. * 7、 终止线程:线程池信息: 核心线程数量5,最大数量10,队列大小3,超出核心线程数量的线程存活时间:5秒, 指定拒绝策略的
  162. *
  163. * @throws Exception
  164. */
  165. private void threadPoolExecutorTest7() throws Exception {
  166. // 创建一个 核心线程数量为5,最大数量为10,等待队列最大是3 的线程池,也就是最大容纳13个任务。
  167. // 默认的策略是抛出RejectedExecutionException异常,java.util.concurrent.ThreadPoolExecutor.AbortPolicy
  168. ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 5, TimeUnit.SECONDS,
  169. new LinkedBlockingQueue<Runnable>(3), new RejectedExecutionHandler() {
  170. @Override
  171. public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
  172. System.err.println("有任务被拒绝执行了");
  173. }
  174. });
  175. // 测试: 提交15个执行时间需要3秒的任务,看超过大小的2个,对应的处理情况
  176. for (int i = 0; i < 15; i++) {
  177. int n = i;
  178. threadPoolExecutor.submit(new Runnable() {
  179. @Override
  180. public void run() {
  181. try {
  182. System.out.println("开始执行:" + n);
  183. Thread.sleep(3000L);
  184. System.err.println("执行结束:" + n);
  185. } catch (InterruptedException e) {
  186. System.out.println("异常:" + e.getMessage());
  187. }
  188. }
  189. });
  190. System.out.println("任务提交成功 :" + i);
  191. }
  192. // 1秒后终止线程池
  193. Thread.sleep(1000L);
  194. threadPoolExecutor.shutdown();
  195. // 再次提交提示失败
  196. threadPoolExecutor.submit(new Runnable() {
  197. @Override
  198. public void run() {
  199. System.out.println("追加一个任务");
  200. }
  201. });
  202. // 结果分析
  203. // 1、 10个任务被执行,3个任务进入队列等待,2个任务被拒绝执行
  204. // 2、调用shutdown后,不接收新的任务,等待13任务执行结束
  205. // 3、 追加的任务在线程池关闭后,无法再提交,会被拒绝执行
  206. }
  207. /**
  208. * 8、 立刻终止线程:线程池信息: 核心线程数量5,最大数量10,队列大小3,超出核心线程数量的线程存活时间:5秒, 指定拒绝策略的
  209. *
  210. * @throws Exception
  211. */
  212. private void threadPoolExecutorTest8() throws Exception {
  213. // 创建一个 核心线程数量为5,最大数量为10,等待队列最大是3 的线程池,也就是最大容纳13个任务。
  214. // 默认的策略是抛出RejectedExecutionException异常,java.util.concurrent.ThreadPoolExecutor.AbortPolicy
  215. ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 5, TimeUnit.SECONDS,
  216. new LinkedBlockingQueue<Runnable>(3), new RejectedExecutionHandler() {
  217. @Override
  218. public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
  219. System.err.println("有任务被拒绝执行了");
  220. }
  221. });
  222. // 测试: 提交15个执行时间需要3秒的任务,看超过大小的2个,对应的处理情况
  223. for (int i = 0; i < 15; i++) {
  224. int n = i;
  225. threadPoolExecutor.submit(new Runnable() {
  226. @Override
  227. public void run() {
  228. try {
  229. System.out.println("开始执行:" + n);
  230. Thread.sleep(3000L);
  231. System.err.println("执行结束:" + n);
  232. } catch (InterruptedException e) {
  233. System.out.println("异常:" + e.getMessage());
  234. }
  235. }
  236. });
  237. System.out.println("任务提交成功 :" + i);
  238. }
  239. // 1秒后终止线程池
  240. Thread.sleep(1000L);
  241. List<Runnable> shutdownNow = threadPoolExecutor.shutdownNow();
  242. // 再次提交提示失败
  243. threadPoolExecutor.submit(new Runnable() {
  244. @Override
  245. public void run() {
  246. System.out.println("追加一个任务");
  247. }
  248. });
  249. System.out.println("未结束的任务有:" + shutdownNow.size());
  250. // 结果分析
  251. // 1、 10个任务被执行,3个任务进入队列等待,2个任务被拒绝执行
  252. // 2、调用shutdownnow后,队列中的3个线程不再执行,10个线程被终止
  253. // 3、 追加的任务在线程池关闭后,无法再提交,会被拒绝执行
  254. }
  255. public static void main(String[] args) throws Exception {
  256. // new Demo9().threadPoolExecutorTest1();
  257. // new Demo9().threadPoolExecutorTest2();
  258. // new Demo9().threadPoolExecutorTest3();
  259. // new Demo9().threadPoolExecutorTest4();
  260. // new Demo9().threadPoolExecutorTest5();
  261. // new Demo9().threadPoolExecutorTest6();
  262. // new Demo9().threadPoolExecutorTest7();
  263. new Demo9().threadPoolExecutorTest8();
  264. }
  265. }

线程池API - Executors工具类

Executors是用于创建线程池的工具类,常用方法如下:

  • newFixedThreadPool(int nThreads) 创建一个固定大小、任务队列容量无界的线程池。核心线程数= 最大线程数。
  • newCachedThreadPool() 创建的是一个大小无界的缓冲线程池。它的任务队列是一个同步队列。任务加入到池中,如果池中有空闲线程,则用空闲线程执行,如无则创建新线程执行。池中的线程空闲超过60 秒,将被销毁释放。线程数随任务的多少变化适用于执行耗时较小的异步任务。池的核心线程数=0,最大线程数=Integer.MAX_VALUE
  • newSingleThreadExecutor()只有一个线程来执行无界任务队列的单一线程池。该线程池确保任务按加入的顺序一个一个依次执行。当唯一的线程因任务异常中止时, 将创建一个新的线程来继续执行后续的任务。与newFixedThreadPool(1)的区别在于, 单一线程池的池大小在newSingleThreadExecutor 方法中硬编码,不能再改变的。
  • newScheduledThreadPool(int corePoolSize) 能定时执行任务的线程池。该池的核心线程数由参数指定, 最大线程数=Integer.MAX_VALUE

如何确定线程池中线程数量

  • 计算型任务: cpu 数量的1-2 倍。
  • IO型任务: 相对比计算型任务, 需多一些线程, 要根据具体的IO阻塞时长进行考量决定。如tomcat 中默认的最大线程数为: 200 。
  • 也可考虑根据需要在一个最小数量和最大数量间自动增减线程数(Executors.newCachedThreadPool())。