线程池
线程池是一种 “池化” 的线程使用模式,通过创建一定数量的线程,让这些线程处于就绪状态来提高系统响应速度,在线程使用完成后归还到线程池来达到重复利用的目标,从而降低系统资源的消耗。
池的好处
使用线程池,有如下优势
- 降低资源消耗
- 通过重复利用已创建的线程降低线程创建和销毁造成的消耗
- 提高响应速度
- 当任务到达时,任务可以不需要等到线程创建就能立即执行
- 提高线程的可管理性
- 线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控
- 提供更多更强大的功能
接口或类 | 说明 |
---|---|
Executor 接口 | 定义了一个接收 Runnable 对象的方法 executor |
ExecutorService 接口 | 一个比 Executor 使用更广泛的子类接口,其提供了生命周期管理的方法,以及可跟踪一个或多个异步任务执行状况返回 Future 的方法 |
AbstractExecutorService 抽象类 | ExecutorService 执行方法的默认实现 |
ScheduledExecutorService 接口 | 一个可定时调度任务的接口 |
ScheduledThreadPoolExecutor类 | ScheduledExecutorService 的实现,一个可定时调度任务的线程池 |
ThreadPoolExecutor 类 | 多用于创建线程池 |
常用方法
Executors 常用方法如下
- newCachedThreadPool()
- 创建一个可缓存的线程池
- CachedThreadPool 适用于并发执行大量短期耗时短的任务,或者负载较轻的服务器
- newFiexedThreadPool(int nThreads)
- 创建固定数目线程的线程池
- FiexedThreadPool 适用于负载略重但任务不是特别多的场景,为了合理利用资源需要限制线程数量的场景
- newSingleThreadExecutor()
- 创建一个单线程化的 Executor
- SingleThreadExecutor 适用于串行执行任务的场景,每个任务按顺序执行,不需要并发执行
- newScheduledThreadPool(int corePoolSize)
- 创建一个支持定时及周期性的任务执行的线程池,多数情况下可用来替代 Timer 类
- ScheduledThreadPool 是一个调度池,其实现了 schedule、scheduleAtFixedRate、scheduleWithFixedDelay 三个方法,可以实现延迟执行、周期执行等操作
- newSingleThreadScheduledExecutor()
- 创建一个 corePoolSize 为 1 的 ScheduledThreadPoolExecutor
newWorkStealingPool(int parallelism)
Executors 创建的 FiexedThreadPool 和 SingleThreadPool 任务队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM;
Executors 创建的 CachedThreadPool 和 ScheduledThreadPool 允许创建的线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM
ThreadPoolExecutor
Java 中,线程池的实现类是 ThreadPoolExecutor,其构造函数如下。
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit timeUnit, BlockingQueueworkQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) 复制代码 corePoolSize:线程池核心线程数
- maximumPoolSize:线程池所能容纳的最大线程数
- keepAliveTime:线程闲置存活时长
- timeUnit:线程闲置存活时长的时间单位,如 TimeUnit.MILLISECONDS、TimeUnit.SECONDS
- blockingQueue:任务队列,常用的任务队列包括
- ArrayBlockingQueue:一个数组实现的有界阻塞队列,此队列按照 FIFO 的原则对元素进行排序,支持公平访问队列
- LinkedBlockingQueue:一个由链表结构组成的可选有界阻塞队列,如果不指定大小,则使用 Integer.MAX_VALUE 作为队列大小,按照 FIFO 的原则对元素进行排序
- PriorityBlockingQueue :一个支持优先级排序的无界阻塞队列,默认情况下采用自然顺序排列,也可以指定 Comparator
- DelayQueue:一个支持延时获取元素的无界阻塞队列,创建元素时可以指定多久以后才能从队列中获取当前元素,常用于缓存系统设计与定时任务调度等
- SynchronousQueue:一个不存储元素的阻塞队列,存入操作必须等待获取操作,反之亦然
- LinkedTransferQueue:一个由链表结构组成的无界阻塞队列,与 LinkedBlockingQueue 相比多了 transfer 和tryTranfer 方法,该方法在有消费者等待接收元素时会立即将元素传递给消费者
- LinkedBlockingDeque:一个由链表结构组成的双端阻塞队列,可以从队列的两端插入和删除元素
- threadFactory:线程工厂,用于指定为线程池创建新线程的方式,threadFactory 可以设置线程名称、线程组、优先级等参数,如通过 Google 工具包可以设置线程池里的线程名,代码如下。
new ThreadFactoryBuilder().setNameFormat(“general-detail-batch-%d”).build(); 复制代码
RejectedExecutionHandler:拒绝策略,常见的拒绝策略包括
- ThreadPoolExecutor.AbortPolicy:默认策略,当任务队列满时抛出 RejectedExecutionException 异常
- ThreadPoolExecutor.DiscardPolicy:丢弃掉不能执行的新任务,不抛任何异常
- ThreadPoolExecutor.CallerRunsPolicy:当任务队列满时使用调用者的线程直接执行该任务
- ThreadPoolExecutor.DiscardOldestPolicy:当任务队列满时丢弃阻塞队列头部的任务(即最老的任务),然后添加当前任务
线程的状态
Java 线程的 6 个状态的转换如下图所示。//Thread.State 源码
public enum State {
NEW,
RUNNABLE,
BLOCKED,
WAITING,
TIMED_WAITING,
TERMINATED;
}
线程池的状态
```java public class ThreadPoolExecutor extends AbstractExecutorService{ private static final int COUNT_BITS = Integer.SIZE - 3; private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; } ``` ThreadPoolExecutor 线程池有如下几种状态
RUNNING:运行状态,接受新任务,持续处理任务队列里的任务;
- SHUTDOWN:不再接受新任务,但要处理任务队列里的任务;
- STOP:不再接受新任务,不再处理任务队列里的任务,中断正在进行中的任务;
- TIDYING:表示线程池正在停止运作,中止所有任务,销毁所有工作线程,当线程池执行 terminated() 方法时进入 TIDYING 状态;
- TERMINATED:表示线程池已停止运作,所有工作线程已被销毁,所有任务已被清空或执行完毕,terminated() 方法执行完成;
线程池运行的状态,并不是用户显式设置的,而是伴随着线程池的运行,由内部来维护。线程池内部使用一个变量维护两个值:运行状态(runState)和线程数量(workerCount)。在具体实现中,线程池将运行状态(runState)、线程数量 (workerCount)两个关键参数的维护放在了一起,如下代码所示。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
ctl 是一个 AtomicInteger 类型的变量,是对线程池的运行状态和线程池中有效线程的数量进行控制的一个字段,它同时包含两部分的信息,线程池的运行状态(runState)和线程池内有效线程的数量(workerCount)。ctl 的高 3 位保存 runState,低 29 位保存 workerCount。两个变量之间互不干扰。用一个变量去存储两个值,可避免在做相关决策时,出现不一致的情况,不必为了维护两者的一致,而占用锁资源。
关于内部封装的获取生命周期状态、获取线程池线程数量的计算方法,如以下代码所示。
//计算当前运行状态
private static int runStateOf(int c){
return c & ~CAPACITY;
}
//计算当前线程数量
private static int workerCountOf(int c) {
return c & CAPACITY;
}
//通过状态和线程数生成ctl
private static int ctlOf(int rs, int wc){
return rs | wc;
}
线程池任务调度机制
线程池提交一个任务时,任务调度的主要步骤如下
- 当线程池里存活的核心线程数小于 corePoolSize 核心线程数参数的值时,线程池会创建一个核心线程去处理提交的任务;
- 如果线程池核心线程数已满,即线程数已经等于 corePoolSize,新提交的任务会被尝试放进任务队列 workQueue 中等待执行;
- 当线程池里面存活的线程数已经等于 corePoolSize 了,且任务队列 workQueue 已满,再判断当前线程数是否已达到 maximumPoolSize,即最大线程数是否已满;如果没到达,创建一个非核心线程执行提交的任务;
- 如果当前的线程数已达到了 maximumPoolSize,还有新的任务提交过来时,执行拒绝策略进行处理。
如上图所示,线程池在内部实际上构建了一个生产者消费者模型,将线程和任务两者解耦,并不直接关联,从而良好的缓冲任务,复用线程。线程池的运行主要分成两个部分
- 任务管理
- 线程管理
「任务管理」部分充当生产者的角色,当任务提交后,线程池会判断该任务后续的流转
- 直接申请线程执行该任务
- 缓冲到队列中等待线程执行
- 拒绝该任务
「线程管理」部分是消费者,它们被统一维护在线程池内,根据任务请求进行线程的分配,当线程执行完任务后则会继续获取新的任务去执行,最终当线程获取不到任务的时候,线程就会被回收。
创建多少个线程合适
- ref 1-《Java并发编程实战》
- ref 2-日拱一兵-Java并发编程图册2022
对于 CPU 密集型程序,最佳线程数 = CPU核数 + 1。
计算密集型(CPU 密集型)的线程恰好在某时因为发⽣⼀个⻚错误或者因其他原因⽽暂停,刚好有⼀个 “额外” 的线程,可以确保在这种情况下 CPU 周期不会中断⼯作。—— 《Java并发编程实战》
对于 I/O 密集型程序,最佳线程数 = CPU核心数 (1 / CPU 利用率) = CPU核心数 (1 + (IO 耗时 / CPU 耗时))
参照上述公式,如程序几乎全部是 IO 耗时,则最佳线程数 = 2N(N 是 CPU 核数)或 2N + 1。
线程池监控
- ref 1-如何监控线程池
对于「线程池监控」,主要涉及到以下方面
推荐使用第 2 种方式,因为第一种方式中,会通过获取锁来获取线程池的状态,性能相对较差。
在获取线程池的状态(如当前线程数、活跃线程数、最大出现线程数、线程池任务完成总量等信息)时,会先获取到 mainLock,然后才开始计算。mainLock 是线程池的主锁,线程执行、线程销毁和线程池停止等都会使用到这把锁。
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//获取线程池的状态
...
} finally {
mainLock.unlock();
}
监控的指标
指标 | 描述 |
---|---|
线程池当前负载 | 当前线程数 / 最大线程数 |
线程池峰值负载 | 线程池运行期间最大的负载 |
核心线程数 | 线程池的核心线程数 |
最大线程数 | 线程池限制同时存在的线程数 |
当前线程数 | 当前线程池的线程数 |
活跃线程数 | 执行任务的线程的大致数目 |
最大出现线程数 | 线程池中运行以来同时存在的最大线程数 |
阻塞队列 | 线程池暂存任务的容器 |
队列容量 | 队列中允许元素的最大数量 |
队列元素 | 队列中已存放的元素数量 |
队列剩余容量 | 队列中还可以存放的元素数量 |
线程池任务完成总量 | 已完成执行的任务的大致总数 |
拒绝策略执行次数 | 运行时抛出的拒绝次数总数 |
对于一个线程池,可调用 threadPoolExecutor 提供的相关方法,获取线程池的当前排队线程数、当前活动线程数、执行完成线程数、总线程数
//当前排队线程数
int queueSize = threadPoolExecutor.getQueue().size();
//当前活动线程数
int activeCount = threadPoolExecutor.getActiveCount();
//执行完成线程数
long completeTaskCount = threadPoolExecutor.getCompletedTaskCount();
//总线程数
long taskCount = threadPoolExecutor.getTaskCount();
监控数据存储
为便于定位问题,帮助开发排查问题,需要考虑将监控数据存储下来,如 ES 存储或 MySQL 存储。需要存储的信息如下(以 MySQL 为例)。
CREATE TABLE `his_run_data` (
`thread_pool_id` varchar(56) DEFAULT NULL COMMENT '线程池ID',
`instance_id` varchar(256) DEFAULT NULL COMMENT '实例ID',
`current_load` bigint(20) DEFAULT NULL COMMENT '当前负载',
`peak_load` bigint(20) DEFAULT NULL COMMENT '峰值负载',
`pool_size` bigint(20) DEFAULT NULL COMMENT '线程数',
`active_size` bigint(20) DEFAULT NULL COMMENT '活跃线程数',
`queue_capacity` bigint(20) DEFAULT NULL COMMENT '队列容量',
`queue_size` bigint(20) DEFAULT NULL COMMENT '队列元素',
`queue_remaining_capacity` bigint(20) DEFAULT NULL COMMENT '队列剩余容量',
`completed_task_count` bigint(20) DEFAULT NULL COMMENT '已完成任务计数',
`reject_count` bigint(20) DEFAULT NULL COMMENT '拒绝次数',
`timestamp` bigint(20) DEFAULT NULL COMMENT '时间戳',
`gmt_create` datetime DEFAULT NULL COMMENT '创建时间',
`gmt_modified` datetime DEFAULT NULL COMMENT '修改时间',
PRIMARY KEY (`id`),
KEY `idx_group_key` (`tp_id`,`instance_id`) USING BTREE,
KEY `idx_timestamp` (`timestamp`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COMMENT='历史运行数据表';
抽象为一个公共服务
在上文分析的基础上,可考虑将监控线程池抽象为一个公共的服务。在「线程池监控服务」中,完成以下步骤
- 客户端定时采集线程池历史运行数据,将数据打包好发送服务端;
- 服务端接收客户端上报的数据,进行数据入库持久化存储;
- 服务端定期删除或存档客户端线程池历史运行数据;
- 由服务端统一对外提供线程池运行图表的数据展示。
在这里,建议将监控数据的「采集、上报」和「存储、归档」分离开,采集的监控数据可以用 MQ 发送给「存储、归档」服务端。好的设计,应该保证单一职责。
动态可观测线程池
在「线程池监控」的基础上,介绍下动态可观测线程池。
动态可观测线程池的核心思想是,通过中心配置(如 Nacos、Zookeeper)将线程池的核心参数 corePoolSize、maximumPoolSize、workQueue 动态可配,可通过业务流量或线程池监控的数据,对线程池进行反馈调节。
ThreadPoolExecutor 提供了下面几个方法来对线程池的参数进行调节。
public void setCorePoolSize(int corePoolSize);
public void setKeepAliveTime(long time, TimeUnit unit);
public void setMaximumPoolSize(int maximumPoolSize);
public void setRejectedExecutionHandler(RejectedExecutionHandler handler);
public void setThreadFactory(ThreadFactory threadFactory);
业内对「动态可观测线程池」也有较为成熟的解决方案,比如
- 美团动态化线程池的设计
- DynamicTp:基于配置中心的轻量级动态线程池
-
美团动态化线程池的设计
ref 1-Java线程池实现原理及其在美团业务中的实践
DynamicTp
ref 1-线程池如何观测
ref 2-DynamicTp | github
Hippo4J
Hippo4J 是一个动态可观测线程池框架,可为业务系统提高线上运行保障能力。Hippo4J 基于「美团动态线程池」设计理念开发,针对线程池增强了动态调参、监控、报警功能。
主要功能
Hippo4J 的主要功能包括
- 动态变更
- 应用运行时动态变更线程池参数,包括但不限于核心数、最大线程数、阻塞队列大小和拒绝策略等
- 支持集群内线程池的差异化配置
- 自定义报警
- 线程池运行时进行埋点,采集监控信息
- 提供 4 种报警维度,线程池过载、阻塞队列容量、运行超长以及拒绝策略报警
- 运行监控
- 支持自定义时长的线程池运行数据采集存储
- 提供可视化大屏监控运行指标 | 模块 | hippo4j-core | hippo4j-server | | —- | —- | —- | | 依赖 | Nacos、Apollo 等配置中心 | 部署 Hippo4J Server(内部无依赖中间件) | | 使用 | 配置中心补充线程池相关参数 | Hippo4J Server Web 控制台添加线程池记录 | | 功能 | 提供基础功能,包括参数动态化、运行时监控、报警等 | 对基础功能进行扩展,提供控制台界面、线程池堆栈查看、线程池运行信息实时查看、历史运行信息查看、线程池配置集群个性化等功能 |
使用建议 根据公司情况选择,如果基本功能可满足使用,选择 hippo4j-core 即可;如果希望更多功能,可以选择 hippo4j-server。
解决什么问题
相比原生线程池,Hippo4J 解决了以下问题
- 原生线程池创建时无法合理评估参数,Hippo4J 可对参数动态调整
- 使用原生线程池时,现有的参数配置在遇到突发流量洪峰时,可能会频繁拒绝任务
- Hippo4J 提供动态修改参数功能,避免修改线程池参数后重启线上应用
- 原生线程池无任务报警策略,Hippo4J 提供了 4 种报警策略,分别是
- 活跃度报警
- 队列容量报警
- 拒绝策略报警
- 运行时间过长报警
- Hippo4J 提供了对原生线程池运行状态的监控
- Hippo4J 提供了对原生线程池运行时堆栈信息的查看
作者:变速风声
链接:https://juejin.cn/post/7104814252510150692
来源:稀土掘金
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。