1. 为什么要使用线程池

在大多数并发框架中都会使用线程池来管理线程,使用线程池管理线程主要有如下好处:

  • 降低资源消耗。通过复用已存在的线程和降低线程关闭的次数来尽可能降低系统性能损耗;
  • 提升系统响应速度。通过复用线程,省去创建线程的过程,因此整体上提升了系统的响应速度;
  • 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,因此,需要使用线程池来管理线程。

2. 线程池的工作原理

当一个并发任务提交给线程池,线程池分配线程去执行任务的过程如下图所示:
image.png


从图可以看出,线程池执行所提交的任务过程主要有这样几个阶段:

  • 判断核心线程数是否已满,核心线程数大小和corePoolSize参数有关,未满则创建线程执行任务
  • 若核心线程池已满,判断队列是否满,队列是否满和workQueue参数有关,若未满则加入队列中
  • 若队列已满,判断线程池是否已满,线程池是否已满和maximumPoolSize参数有关,若未满创建线程执行任务
  • 若线程池已满,则采用拒绝策略处理无法执执行的任务,拒绝策略和handler参数有关

3. 线程池的创建

创建线程池主要是ThreadPoolExecutor类来完成,ThreadPoolExecutor的有许多重载的构造方法,通过参数最多的构造方法来理解创建线程池有哪些需要配置的参数。ThreadPoolExecutor的构造方法为:

  1. ThreadPoolExecutor(int corePoolSize,
  2. int maximumPoolSize,
  3. long keepAliveTime,
  4. TimeUnit unit,
  5. BlockingQueue<Runnable> workQueue,
  6. ThreadFactory threadFactory,
  7. RejectedExecutionHandler handler)

下面对参数进行说明:

  1. **corePoolSize**:表示核心线程池的大小。当提交一个任务时,如果当前核心线程池的线程个数没有达到 corePoolSize,则会创建新的线程来执行所提交的任务,即使当前核心线程池有空闲的线程
    1. 如果当前核心线程池的线程个数已经达到了 corePoolSize,则不再重新创建线程。
    2. 如果调用了 prestartCoreThread() 或者 prestartAllCoreThreads(),线程池创建的时候所有的核心线程都会被创建并且启动。
  2. **maximumPoolSize**:表示线程池能创建线程的最大个数。如果当阻塞队列已满时,并且当前线程池线程个数没有超过 maximumPoolSize 的话,就会创建新的线程来执行任务。
  3. **keepAliveTime**:空闲线程存活时间。如果当前线程池的线程个数已经超过了 corePoolSize,并且多于 corePoolSize 的线程空闲时间超过了 keepAliveTime的话,就会将这些空闲线程销毁,这样可以尽可能降低系统资源消耗。
  4. unit:时间单位。为 keepAliveTime 指定时间单位。
  5. workQueue:阻塞队列。用于保存任务的阻塞队列,关于阻塞队列可以看这篇文章。可以使用 ArrayBlockingQueue, LinkedBlockingQueue, SynchronousQueue, PriorityBlockingQueue

    1. ArrayBlockingQueue
      1. 数组实现的有界阻塞队列。该队列命令元素 FIFO(先进先出)
      2. 队列一旦创建,容量不能改变(阻塞的意思是:当队列容量满时,尝试将元素放入队列将导致操作阻塞;尝试从一个空队列中取一个元素也会同样阻塞)
      3. 不能保证线程访问队列的公平性
      4. 公平性是指严格按照线程等待的绝对时间顺序,即最先等待的线程能够最先访问到 ArrayBlockingQueue,如果保证公平性,通常会降低吞吐量
    2. **LinkedBlockingQueue**
      1. 链表实现的有界阻塞队列,FIFO
      2. 与 ArrayBlockingQueue 相比起来具有更高的吞吐量,为了防止 LinkedBlockingQueue 容量迅速增,损耗大量内存。通常在创建时会指定其大小,如果未指定,容量等于 Integer.MAX_VALUE
    3. PriorityBlockingQueue
      1. 支持优先级的无界阻塞队列
      2. 默认情况下元素采用自然顺序进行排序,也可以通过自定义类实现 compareTo() 方法来指定元素排序规则,或者初始化时通过构造器参数 Comparator 来指定排序规则。
    4. SynchronousQueue
      1. SynchronousQueue 每个插入操作必须等待另一个线程进行相应的删除操作,因此,SynchronousQueue 实际上没有存储任何数据元素,因为只有线程在删除数据时,其他线程才能插入数据,同样的,如果当前有线程在插入数据时,线程才能删除数据。
      2. SynchronousQueue 也可以通过构造器参数来为其指定公平性。
    5. LinkedTransferQueue
      1. 链表数据结构构成的无界阻塞队列,该队列实现了 TransferQueue 接口(https://juejin.cn/post/6844903602444582920
    6. LinkedBlockingDeque
      1. 基于链表数据结构的有界阻塞双端队列,如果在创建对象时为指定大小时,其默认大小为 Integer.MAX_VALUE。
      2. 与 LinkedBlockingQueue 相比,主要的不同点在于,LinkedBlockingDeque 具有双端队列的特性
    7. DelayQueue
      1. DelayQueue 是一个存放实现 Delayed 接口的数据的无界阻塞队列
      2. 只有当数据对象的延时时间达到时才能插入到队列进行存储。如果当前所有的数据都还没有达到创建时所指定的延时期,则队列没有队头,并且线程通过 poll 等方法获取数据元素则返回 null。
      3. 所谓数据延时期满时,则是通过 Delayed 接口的getDelay(TimeUnit.NANOSECONDS) 来进行判定,如果该方法返回的是小于等于 0 则说明该数据元素的延时期已满。
  6. threadFactory:创建线程的工厂类。可以通过指定线程工厂为每个创建出来的线程设置更有意义的名字,如果出现并发问题,也方便查找问题原因。

  7. handler:饱和策略。当线程池的阻塞队列已满和指定的线程都已经开启,说明当前线程池已经处于饱和状态了,那么就需要采用一种策略来处理这种情况。采用的策略有这几种:
    1. AbortPolicy: 直接拒绝所提交的任务,并抛出**RejectedExecutionException**异常;
    2. CallerRunsPolicy:只用调用者所在的线程来执行任务;
    3. DiscardPolicy:不处理直接丢弃掉任务;
    4. DiscardOldestPolicy:丢弃掉阻塞队列中存放时间最久的任务,执行当前任务

下图为ThreadPoolExecutor的execute方法的执行示意图:
image.png

execute方法执行逻辑有这样几种情况:

  1. 如果当前运行的线程少于corePoolSize,则会创建新的线程来执行新的任务;
  2. 如果运行的线程个数等于或者大于corePoolSize,则会将提交的任务存放到阻塞队列workQueue中;
  3. 如果当前workQueue队列已满的话,则会创建新的线程来执行任务;
  4. 如果线程个数已经超过了maximumPoolSize,则会使用饱和策略RejectedExecutionHandler来进行处理。

只要创建的总线程数 >= maximumPoolSize 的时候,线程池就不会继续执行任务了而会去执行拒绝策略的逻辑

ThreadPoolExecutor

image.png

  • Running:运行状态。能处理新提交的任务,也能处理阻塞队列中的任务
  • Shutdown:关闭状态。不能接受新提交的任务,但是能够继续处理阻塞队列中已经保存的任务。
  • Stop:不接收新的任务,也不能处理队列中已经保存的任务
  • Tidying:如果所有的任务都已经终止,线程池会进入该状态
  • Terminated:Tidying 状态调用 terminated() 进入该状态。

ThreadPool 提供的方法

  • execute ():提交任务,交给线程池执行
  • submit ():提交任务,能够返回执行结果 execute+ Future
  • shutdown ():关闭线程池,等待任务都执行完
  • shutdownNow() :关闭线程池,不等待任务执行完
  • getTaskCount ():线程池已执行和未执行的任务总数
  • getCompletedTaskCount ():已完成的任务数量
  • getPoolSize ():线程池当前的线程数量
  • getActiveCount ():当前线程池中正在执行任务的线程数量
  • getQueue():返回执行程序使用的任务队列
  • isTerminating():如果此执行程序正在终止,则返回true
  • isTerminated():当所有的线程都关闭成功,则返回 ture

线程池类图
image.png

Executor 接口

  • Executors.newCachedThreadPool:创建可缓存的线程池
  1. public static ExecutorService newCachedThreadPool() {
  2. return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
  3. 60L, TimeUnit.SECONDS,
  4. new SynchronousQueue<Runnable>());
  5. }
  • Executors.newFixedThreadPool:创建定长线程池
  1. public static ExecutorService newFixedThreadPool(int nThreads) {
  2. return new ThreadPoolExecutor(nThreads, nThreads,
  3. 0L, TimeUnit.MILLISECONDS,
  4. new LinkedBlockingQueue<Runnable>());
  5. }
  • Executors.newScheduledThreadPool:创建定长线程池,并且支持定时和周期性的执行
  1. public ScheduledThreadPoolExecutor(int corePoolSize) {
  2. super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
  3. new DelayedWorkQueue());
  4. }

Demo:

  1. public static void main(String[] args) {
  2. ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
  3. executorService.schedule(new Runnable() {
  4. @Override
  5. public void run() {
  6. log.warn("schedule run");
  7. }
  8. }, 3, TimeUnit.SECONDS);
  9. executorService.scheduleAtFixedRate(new Runnable() {
  10. @Override
  11. public void run() {
  12. log.warn("schedule run");
  13. }
  14. }, 1, 3, TimeUnit.SECONDS);
  15. // executorService.shutdown();
  16. Timer timer = new Timer();
  17. timer.schedule(new TimerTask() {
  18. @Override
  19. public void run() {
  20. log.warn("timer run");
  21. }
  22. }, new Date(), 5 * 1000);
  23. }
  • Executors.newSingleThreadExecutor:创建单线程化的线程池
  1. public static ExecutorService newSingleThreadExecutor() {
  2. return new FinalizableDelegatedExecutorService
  3. (new ThreadPoolExecutor(1, 1,
  4. 0L, TimeUnit.MILLISECONDS,
  5. new LinkedBlockingQueue<Runnable>()));
  6. }

4. 线程池的关闭

关闭线程池,可以通过 shutdownshutdownNow 这两个方法。它们的原理都是遍历线程池中所有的线程,然后依次中断线程。shutdownshutdownNow 还是有不一样的地方:

  1. shutdownNow 首先将线程池的状态设置为 STOP,然后尝试停止所有的正在执行和未执行任务的线程,并返回等待执行任务的列表(会中断正在执行的任务)。
  2. shutdown 只是将线程池的状态设置为 SHUTDOWN 状态,然后中断所有没有正在执行任务的线程(正在执行的任务会继续执行完)。
  • 调用了这两个方法的任意一个,isShutdown 方法都会返回 true,
  • 当所有的线程都关闭成功,才表示线程池成功关闭,这时调用 isTerminated 方法才会返回 true。

5 线程池的合理配置

一些共识

工作线程并不是设置的越大越好

  • 一来服务器 CPU 核数有限,同时并发的线程数是有限的,1 核 CPU 设置10000个工作线程没有意义。
  • 线程切换是有开销的,如果线程切换过于频繁,反而会使性能降低。

调用 **sleep()** 函数的时候,线程是否一直占用CPU

  • 不占用,等待时会把CPU让出来,给其他需要CPU资源的线程使用。
  • 不止调用 sleep() 函数,在进行一些阻塞调用,例如网络编程中的阻塞 accept()【等待客户端连接】和阻塞 recv()【等待下游回包】也不占用 CPU 资源。

如果CPU是单核,设置多线程有意义么,能提高并发性能么?

  • 回答:即使是单核,使用多线程也是有意义的。
  • 多线程编码可以让我们的服务/代码更加清晰,有些 IO 线程收发包,有些 Worker 线程进行任务处理,有些 Timeout 线程进行超时检测。
  • 如果有一个任务一直占用 CPU 资源在进行计算,那么此时增加线程并不能增加并发,例如这样的一个代码:

    1. while(1){ i++; }
  • 该代码一直不停的占用 CPU 资源进行计算,会使 CPU 占用率达到100%。

  • 通常来说,Worker 线程一般不会一直占用 CPU 进行计算,此时即使 CPU 是单核,增加 Worker 线程也能够提高并发,因为这个线程在休息的时候,其他的线程可以继续工作。

常见服务线程模型

IO 线程与工作线程通过队列解耦类模型
image.png

如上图,大部分 Web-Server 与服务框架都是使用这样的一种 IO 线程与 Worker 线程通过队列解耦 类线程模型:

  • 有少数几个 IO 线程监听上游发过来的请求,并进行收发包(生产者)
  • 有一个或者多个任务队列,作为 IO 线程与 Worker 线程异步解耦的数据传输通道(临界资源)
  • 有多个工作线程执行正真的任务(消费者)

这个线程模型应用很广,符合大部分场景,这个线程模型的特点是,工作线程内部是同步阻塞执行任务的(回想一下 tomcat 线程中是怎么执行 Java 程序的,dubbo 工作线程中是怎么执行任务的),因此可以通过增加 Worker 线程数来增加并发能力,今天要讨论的重点是:「该模型 Worker 线程数设置为多少能达到最大的并发」。

纯异步线程模型:任何地方都没有阻塞,这种线程模型只需要设置很少的线程数就能够做到很高的吞吐量(使用较少,不讨论)

image.png
了解工作线程的工作模式,对量化分析线程数的设置非常有帮助:
上图是一个典型的工作线程的处理过程,从开始处理start到结束处理end,该任务的处理共有7个步骤:

  1. 1. 从工作队列里拿出任务,进行一些本地初始化计算,例如http协议分析、参数解析、参数校验等
  2. 2. 访问cache拿一些数据
  3. 3. 拿到cache里的数据后,再进行一些本地计算,这些计算和业务逻辑相关
  4. 4. 通过RPC调用下游service再拿一些数据,或者让下游service去处理一些相关的任务
  5. 5. RPC调用结束后,再进行一些本地计算,怎么计算和业务逻辑相关
  6. 6. 访问DB进行一些数据操作
  7. 7. 操作完数据库之后做一些收尾工作,同样这些收尾工作也是本地计算,和业务逻辑相关
  8. 分析整个处理的时间轴,会发现:
  9. 1)其中1357步骤中【上图中粉色时间轴】,线程进行本地业务逻辑计算时需要占用CPU
  10. 2)而246步骤中【上图中橙色时间轴】,访问cacheserviceDB过程中线程处于一个等待结果的状态,不需要占用CPU
  11. 进一步的分解,这个“等待结果”的时间共分为三部分:
  12. 1. 请求在网络上传输到下游的cacheserviceDB
  13. 2. 下游cacheserviceDB进行任务处理。
  14. 3. cacheserviceDB将报文在网络上传回工作线程。
  15. ## 量化分析并合理设置工作线程数
  16. 最后一起来回答工作线程数设置为多少合理的问题。
  17. 通过上面的分析,Worker线程在执行的过程中,有一部计算时间需要占用CPU,另一部分等待时间不需要占用CPU,通过量化分析,例如打日志进行统计,可以统计出整个Worker线程执行过程中这两部分时间的比例,例如:
  18. 1. 时间轴1357【上图中粉色时间轴】的计算执行时间是100ms
  19. 2. 时间轴246【上图中橙色时间轴】的等待时间也是100ms
  20. 得到的结果是,这个线程计算和等待的时间是11,即有50%的时间在计算(占用CPU),50%的时间在等待(不占用CPU):
  21. 1. 假设此时是单核,则设置为2个工作线程就可以把CPU充分利用起来,让CPU跑到100%
  22. 2. 假设此时是N核,则设置为2N个工作线程就可以把CPU充分利用起来,让CPU跑到N*100%。

:::info N核服务器,通过执行业务的单线程分析出本地计算时间为 x,等待时间为 y,则工作线程数(线程池线程数)设置为 **N*(x+y)/x**,能让CPU的利用率最大化。 :::

:::info

  • CPU 密集型任务,就需要尽量压榨 CPU,配置尽可能少的线程数量,参考值可以设为 NCPU+1 (NCPU:CPU数量)
    • CPU 密集型:指的是系统的硬盘、内存性能相对 CPU 要好很多(硬盘、内存 > CPU)
  • IO 密集型任务,由于需要等待 IO 操作,线程并不是一直在执行任务,则配置尽可能多的线程。参考值可以设置为 2*NCPU
    • 系统的 CPU 性能相对硬盘、内存要好很多 (CPU > 硬盘、内存)
  • 混合型的任务,如果可以拆分,则将其拆分成一个CPU密集型任务和一个IO密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐率要高于串行执行的吞吐率,如果这两个任务执行时间相差太大,则没必要进行分解。 :::

注:

  • 可以通过 Runtime.getRuntime().availableProcessors() 方法获得当前设备的 CPU 个数。
  • 阻塞队列最好是使用有界队列,如果采用无界队列的话,一旦任务积压在阻塞队列中的话就会占用过多的内存资源,甚至会使得系统崩溃。

6. 生产上的一个使用实例

  1. /**
  2. * 同步录播课章节顺序
  3. *
  4. * @return
  5. * @throws InterruptedException
  6. */
  7. @ResponseBody
  8. @RequestMapping(value = "/synchroCourseVideoChapter", method = RequestMethod.GET)
  9. public String synchroCourseVideoChapter() throws InterruptedException {
  10. log.info("#############videoChapter:同步排序章节开始");
  11. String status;
  12. ThreadPoolExecutor executorService = (ThreadPoolExecutor) Executors.newFixedThreadPool(100);
  13. ClassModule module = new ClassModule();
  14. module.setPageSize(100);
  15. module.setDelFlag(0);
  16. module.setTeachMethod("TEACH_METHOD_VIDEO");
  17. int page = 1;
  18. for (; ; page++) {
  19. module.setPage(page);
  20. List<ClassModule> classModules = classModuleServiceImpl.findClassModuleByPage(module);
  21. if (classModules.isEmpty() || classModules.size() == 0) {
  22. log.info("#############videoChapter:暂无章节可以处理");
  23. break;
  24. }
  25. for (ClassModule classModule : classModules)
  26. executorService.execute(new CourseVideoChapterThread(classModule.getId()));
  27. // 获取任务队列的长度
  28. int size = executorService.getQueue().size();
  29. try {
  30. // 判断任务队列长度,计算休眠时间
  31. Thread.sleep((size / 10) == 0 ? (size / 10) : 20);
  32. } catch (InterruptedException e) {
  33. e.printStackTrace();
  34. }
  35. }
  36. // 关闭线程池 启动一次顺序关闭,执行以前提交的任务,但不接受新任务
  37. executorService.shutdown();
  38. for (; ; ) {
  39. // 用死循环去判断执行程序是否已经终止,来更改状态
  40. if (executorService.isTerminated()) {
  41. // 所有的子线程都结束了
  42. status = JsonMsg.SUCCESS;
  43. log.info("#############videoChapter:更新录播课章节顺序处理完成");
  44. break;
  45. }
  46. Thread.sleep(1000);
  47. }
  48. return status;
  49. }

资料:

ThreadPoolExecutor源码分析,很详细