前言

本文主要内容

  • 介绍线程池的参数
  • 介绍Java并发库提供的线程池
  • 介绍线程池如何实现 Thread-Per-Message 模式
  • 介绍线程池如何实现 Work Thread 模式

状态转换

参数

概览

看一下 ThreadPoolExecutor 构造器的参数
image.png

  • corePoolSize:线程池中一直处于 RUNNING 的线程数
  • maximumPoolSize:同时处于 RUNNIING 状态的最大线程数
  • keepAliveTime:超出corePoolSize数量的活跃线程等待新任务的时间
  • unit:时间单位
  • workQueue:阻塞队列,存储任务
  • threadFactory:创建线程的工厂
  • handler:拒绝策略

    TimeUnit

    时间单位,是一个枚举类
    image.png

工作队列

工作队列使用BlockingQueue,这只是一个接口
image.png
看一看它的实现类都有哪些
image.png
看起来很多,但是并不复杂,部分特征体现在名字上。按照是否有界进行分类

有界

内部的队列实现有容量限制

ArrayBlockingQueue

image.png

  • 使用数组实现
  • 构造时,必须传入容量大小
  • 使用 ReentrantLock 保证线程安全
  • 使用 Condition notEmptyCondition notFull 来进行通信,实现阻塞

    LinkedBlockingQueue

  • 使用链表实现

  • 使用 int capacity 来限制容量,若不设置,则为 Integer.MAX_VALUE
  • 使用 ReentrantLock 保证线程安全
  • 使用 Condition notEmptyCondition notFull 来进行通信,实现阻塞

    无界

    内部的队列实现无容量限制,可能会造成OOM

    LinkedBlockingDeque

  • 使用链表实现

  • 双端队列,前后都可以添加和移出
  • 使用 ReentrantLock 保证线程安全
  • 使用 Condition notEmptyCondition notFull 来进行通信,实现阻塞


SynchronousQueue

并不维护队列,put时必须有线程进行poll,否则阻塞,poll 时也必须有线程进行put,否则阻塞。

LinkedTransferQueue

LinkedBlockingQueue 与 SynchronousQueue 的结合,即可以维持队列,又可以将信息直接传给等待的线程,而不存入队列中。put时,如果有等待的线程,则直接交给等待的线程,如果没有,则放入队列尾

PriorityBlockingQueue

  • 使用数组实现
  • 可以设置初始容量,在添加时,当容量不足会使用 tryGrow 方法进行扩容(扩大约 50%)
  • 支持按照优先级出队(细节先省略,之后另开篇并发容器再聊)
  • 使用 ReentrantLock 保证线程安全
  • 使用 Condition notEmptyCondition notFull 来进行通信,实现阻塞


DelayQueue

在 PriorityBlockingQueue 的基础上增加延迟出队
image.png

拒绝策略

RejectedExecutionHandler为拒绝策略接口,只定义了一个方法
image.png
当线程池不再接受任务时,调用 rejectedExecution 处理后续传递来的任务,其实现类有
image.png
默认使用 AbortPolicy
image.png

CallerRunsPolicy

如果线程池不是 shutdown 状态,则在当前线程执行任务,否则直接丢弃
image.png

AbortPolicy(默认)

抛出 RejectedExecutionException 异常,注明是哪个任务,来自哪个线程池
image.png

DiscardPolicy

直接丢弃
image.png

DiscardOldestPolicy

丢弃最早加入队列中的任务,执行当前接收到的任务
image.png

线程工厂

创建线程,可以在这里给创建的线程添加一些附加属性,或者进行记录。比如说根据线程进行的工作,给线程进行命名,方便排查错误
image.png
这里就来看下默认使用的DefaultThreadFactory,如果不传递线程工厂的话,就使用这个工厂类

DefaultThreadFactory

代码如下

  1. /**
  2. * The default thread factory.
  3. */
  4. private static class DefaultThreadFactory implements ThreadFactory {
  5. private static final AtomicInteger poolNumber = new AtomicInteger(1);//记录线程池数量,给线程池编号
  6. private final ThreadGroup group;
  7. private final AtomicInteger threadNumber = new AtomicInteger(1);//记录线程数量,给池中线程编号
  8. private final String namePrefix;
  9. DefaultThreadFactory() {
  10. SecurityManager s = System.getSecurityManager();//获取安全管理器,设置线程权限,避免访问敏感资源
  11. group = (s != null) ? s.getThreadGroup() ://如果未设置安全管理器,则使用当前线程的线程组
  12. Thread.currentThread().getThreadGroup();
  13. namePrefix = "pool-" +
  14. poolNumber.getAndIncrement() +
  15. "-thread-";
  16. }
  17. public Thread newThread(Runnable r) {
  18. Thread t = new Thread(group, r,
  19. namePrefix + threadNumber.getAndIncrement(),
  20. 0);//设置线程组 线程名字 线程堆栈大小
  21. if (t.isDaemon())
  22. t.setDaemon(false);//设置为非守护线程
  23. if (t.getPriority() != Thread.NORM_PRIORITY)//设置为默认优先级 5
  24. t.setPriority(Thread.NORM_PRIORITY);
  25. return t;
  26. }
  27. }

主要完成了以下工作

  • 对线程进行命名:pool-线程池编号-thread-线程编号
  • 设置线程组
    • 如果设置了SecurityManager(关于SecurityManager可以看下面的参考),则使用SecurityManager的线程组
    • 如果未设置,则使用当前线程的线程组
  • 设置为非守护线程
  • 设置线程优先级为默认优先级 5

    Java并发库提供的线程池

    newCachedThreadPool()

其实提供的线程池就是将以上的所介绍的参数实现进行组装,看看源码就知道功能了。看下源码
image.png
主要关注两个方面,一是最大线程数为 Integer.MAX_VALUE,当任务过多时,有资源耗尽的风险;二是阻塞队列使用 SynchronousQueue,也就是不维护队列,任务到来就直接传递给处理的线程。
这里也可以选择另外一个方法,指定创建线程的工厂
image.png

newFixedThreadPool(int nThreads)

这个就相当于使用 Work Thread 模式了,只有核心线程。阻塞队列使用的是 LinkedBlockingQueue,虽然这是个有界队列,但是该方法没有暴露设置阻塞队列大小的参数,默认是 Integer.MAX_VALUE,因此当任务数量过多时,也有资源耗尽的危险

image.png
这个也有指定线程工厂的方法,不贴图了

newSingleThreadExecutor()

保证线程池中有一个活跃线程,但是使用了 LinkedBlockingQueue 做阻塞队列,且无法设置阻塞队列大小,因此也存在资源耗尽的风险
image.png

newSingleThreadScheduledExecutor()

newWorkStealingPool(int parallelism)

线程池实现 Work Thread 模式的方法

我觉得线程池是 Thread-Per-Message 模式Work Thread 模式的结合,也可以说是 Work Thread 模式的优化版,解决了它无法灵活应对任务数突然增多的问题。core 线程使用 Work Thread 模式,来复用线程;maximumPoolSize - corePoolSize 的部分线程数就是使用了 Thread-Per-Message模式,来应对突增的任务数,并在随着任务数的递减而逐渐销毁,减少资源消耗,同时 maximumPoolSize 也限制了线程占用的最大资源,避免服务器因创建大量线程而宕机。

使用延迟加载初始化线程池

ThreadPoolExecutor并不是在创建实例的时候,就创建了 corePoolSize 数量的线程,而是使用延迟加载,当有任务到达时,如果当前线程数量小于 corePoolSize 则创建核心线程,去进行处理
image.pngimage.png

通过不停获取任务来保证核心线程一直运行

线程池的内部类 Worker 实现了 Runnable ,来看下它的 run 方法。这里是跳转到了 runWorker
image.png
在 runWorker 中,实现线程不退出的方法如下。通过不停获取任务来来保证线程不退出
image.png

线程池实现 Thread-Per-Message 模式的方法

任务数超过核心线程数时创建非核心线程

以下是 ThreadPoolExecutor 中 execute 方法 的部分代码
image.png
在这个方法中,先判断当前任务数是否小于核心数,如果小于则创建核心线程去执行该任务
如果大于核心线程数,则将任务添加到阻塞队列中,然后创建非核心线程去执行

利用任务获取时间来保证非核心线程按时退出

非核心线程当执行完任务后,会随着任务量的减少,逐渐退出,这个功能是通过使用获取任务时间来实现的。相关代码在 getTask() 方法中
image.png
getTask 中的主体逻辑在这个死循环中
image.png
在 getTask() 方法中通过 timed 来记录是否需要设定获取任务时间。如果当前活跃线程数 > 核心线程数量,则设置获取任务时间
image.png
之后,需要在规定时间内获取到任务,如果超时,timeOut 会设置为 true
image.png
之后在下次循环中,CAS 减少活跃线程数记录,然后返回 null,也就是标志着获取任务失败
image.png
获取任务失败,退出循环,清理退出线程
image.png

参考