1. 线程池简介

线程池(英语:Thread pool):一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。
例子: 10年前单核CPU电脑,假的多线程,像马戏团小丑玩多个球,CPU需要来回切换。 现在是多核电脑,多个线程各自跑在独立的CPU上,不用切换效率高。
线程池的优势: 线程池做的工作只要是控制运行的线程数量,处理过程中将任务放入队列,然后在线程创建后启动这些任务,如果线程数量超过了最大数量,超出数量的线程排队等候,等其他线程执行完毕,再从队列中取出任务来执行。

它的主要特点为:
• 降低资源消耗: 通过重复利用已创建的线程降低线程创建和销毁造成的销耗。
• 提高响应速度: 当任务到达时,任务可以不需要等待线程创建就能立即执行。
• 提高线程的可管理性: 线程是稀缺资源,如果无限制的创建,不仅会销耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。 • Java中的线程池是通过Executor框架实现的,该框架中用到了Executor,Executors,ExecutorService,ThreadPoolExecutor这几个类

image.png

2. 线程池参数说明

2.1 线程池中的七个重要的参数及流程(重要)

  1. public ThreadPoolExecutor(int corePoolSize,
  2. int maximumPoolSize,
  3. long keepAliveTime,
  4. TimeUnit unit,
  5. BlockingQueue<Runnable> workQueue,
  6. ThreadFactory threadFactory,
  7. RejectedExecutionHandler handler) {
  8. if (corePoolSize < 0 ||
  9. maximumPoolSize <= 0 ||
  10. maximumPoolSize < corePoolSize ||
  11. keepAliveTime < 0)
  12. throw new IllegalArgumentException();
  13. if (workQueue == null || threadFactory == null || handler == null)
  14. throw new NullPointerException();
  15. this.corePoolSize = corePoolSize;
  16. this.maximumPoolSize = maximumPoolSize;
  17. this.workQueue = workQueue;
  18. this.keepAliveTime = unit.toNanos(keepAliveTime);
  19. this.threadFactory = threadFactory;
  20. this.handler = handler;
  21. }

• corePoolSize 线程池的核心线程数 (常驻线程数量)
• maximumPoolSize能容纳的最大线程数 (最大线程数量)
• keepAliveTime空闲线程存活
• unit 存活的时间单位
• workQueue 存放提交但未执行任务的队列 (阻塞队列)
• threadFactory 创建线程的工厂类 (线程工厂)
• handler 等待队列满后的拒绝策略 (拒绝策略)

线程池中,有三个重要的参数,决定影响了拒绝策略:
corePoolSize - 核心线程数,也即最小的线程数
workQueue - 阻塞队列
maximumPoolSize - 最大线程数

当提交任务数大于 corePoolSize 的时候,会优先将任务放到 workQueue 阻塞队列中。当阻塞队列饱和后,会扩充线程池中线程数,直到达到maximumPoolSize 最大线程数配置。此时,再多余的任务,则会触发线程池的拒绝策略了。

总结起来,当提交的任务数大于(workQueue.size() +maximumPoolSize ),就会触发线程池的拒绝策略。

2.2 拒绝策略(重点)

image.png

CallerRunsPolicy: 当触发拒绝策略,只要线程池没有关闭的话,则使用调用线程直接运行任务。一般并发比较小,性能要求不高,不允许失败。但是,由于调用者自己运行任务,如果任务提交速度过快,可能导致程序阻塞,性能效率上必然的损失较大
AbortPolicy: 丢弃任务,并抛出拒绝执行 RejectedExecutionException 异常信息。线程池默认的拒绝策略。必须处理好抛出的异常,否则会打断当前的执行流程,影响后续的任务执行。
DiscardPolicy: 直接丢弃,其他啥都没有
DiscardOldestPolicy: 当触发拒绝策略,只要线程池没有关闭的话,丢弃阻塞队列workQueue 中最老的一个任务,并将新任务加入

AbortPolicy 抛出异常Rejected Execution Exception
CallerRunsPolicy 使用调用线程直接运行任务
DiscardPolicy 直接丢弃
DiscardOldestPolicy

3. 线程池使用方式

在 Executors 类里面提供了一些静态工厂,生成一些常用的线程池。

  1. newFixedThreadPool:创建固定大小的线程池,线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。
  2. newCachedThreadPool:创建一个可缓存的线程池,如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲(60 秒不执行任务)的线程,当任务数增加时,此线程池又可以智能的添加新线程来处理任务。此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说 JVM)能够创建的最大线程大小。
  3. newSingleThreadExecutor:创建一个单线程的线程池,这个线程池只有一个线程在工作,也就是相当于单线程串行执行所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。
  4. newScheduledThreadPool:创建一个大小无限的线程池,此线程池支持定时以及周期性执行任务的需求。
  5. newSingleThreadScheduledExecutor:创建一个单线程的线程池,此线程池支持定时以及周期性执行任务的需求。
  6. newWorkStealingPool:jdk1.8提供的线程池,底层使用的是ForkJoinPool实现,创建一个拥有多个任务队列的线程池,可以减少连接数,创建当前可用cpu核数的线程来并行执行任务

4. 线程池底层原理

image.png
1. 在创建了线程池后,线程池中的线程数为零
2. 当调用execute()方法添加一个请求任务时,线程池会做出如下判断: 2.1 如果正在运行的线程数量小于corePoolSize,那么马上创建线程运行这个任务; 2.2 如果正在运行的线程数量大于或等于corePoolSize,那么将这个任务放入队列; 2.3 如果这个时候队列满了且正在运行的线程数量还小于maximumPoolSize,那么还是要创建非核心线程立刻运行这个任务; 2.4 如果队列满了且正在运行的线程数量大于或等于maximumPoolSize,那么线程池会启动饱和拒绝策略来执行。
3. 当一个线程完成任务时,它会从队列中取下一个任务来执行
4. 当一个线程无事可做超过一定的时间(keepAliveTime)时,线程会判断:
4.1 如果当前运行的线程数大于corePoolSize,那么这个线程就被停掉。
4.2 所以线程池的所有任务完成后,它最终会收缩到corePoolSize的大小。

5. ThreadPoolTaskExecutor 拓展

ThreadPoolTaskExecutor 这个类则是spring包下的,是sring为我们提供的线程池类,这里重点讲解这个类的用法,可以使用基于xml配置或者Configuration的方式创建。

  1. @Configuration
  2. public class ExecturConfig {
  3. @Bean("taskExector")
  4. public Executor taskExector() {
  5. ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
  6. int i = Runtime.getRuntime().availableProcessors();//获取到服务器的cpu内核
  7. executor.setCorePoolSize(5);//核心池大小
  8. executor.setMaxPoolSize(100);//最大线程数
  9. executor.setQueueCapacity(1000);//队列程度
  10. executor.setKeepAliveSeconds(1000);//线程空闲时间
  11. executor.setThreadNamePrefix("tsak-asyn");//线程前缀名称
  12. executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());//配置拒绝策略
  13. return executor;
  14. }
  15. }

其底层队列具体使用哪个,是根据queueCapacity判断的

  1. /**
  2. * Create the BlockingQueue to use for the ThreadPoolExecutor.
  3. * <p>A LinkedBlockingQueue instance will be created for a positive
  4. * capacity value; a SynchronousQueue else.
  5. * @param queueCapacity the specified queue capacity
  6. * @return the BlockingQueue instance
  7. * @see java.util.concurrent.LinkedBlockingQueue
  8. * @see java.util.concurrent.SynchronousQueue
  9. */
  10. protected BlockingQueue<Runnable> createQueue(int queueCapacity) {
  11. if (queueCapacity > 0) {
  12. return new LinkedBlockingQueue<Runnable>(queueCapacity);
  13. }
  14. else {
  15. return new SynchronousQueue<Runnable>();
  16. }
  17. }