大家好,这篇文章我们来介绍下动态线程池框架(DynamicTp)的 adapter 模块,上篇文章也大概介绍过了,该模块主要是用来适配一些第三方组件的线程池管理,让第三方组件内置的线程池也能享受到动态参数调整,监控告警这些增强功能。


DynamicTp 项目地址

目前 500 多 star,感谢你的 star,欢迎 pr,业务之余给开源贡献一份力量

gitee 地址gitee.com/yanhom/dyna…

github 地址github.com/lyh200/dyna…


系列文章

美团动态线程池实践思路,开源了

动态线程池框架(DynamicTp),监控及源码解析篇


adapter 已接入组件

adapter 模块目前已经接入了 SpringBoot 内置的三大 WebServer(Tomcat、Jetty、Undertow)的线程池管理,实现层面也是和核心模块做了解耦,利用 spring 的事件机制进行通知监听处理。

动态线程池(DynamicTp)之动态调整Tomcat、Jetty、Undertow线程池参数篇 - 掘金 - 图1

可以看出有两个监听器

  1. 当监听到配置中心配置变更时,在更新我们项目内部线程池后会发布一个 RefreshEvent 事件,DtpWebRefreshListener 监听到该事件后会去更新对应 WebServer 的线程池参数。
  2. 同样监控告警也是如此,在 DtpMonitor 中执行监控任务时会发布 CollectEvent 事件,DtpWebCollectListener 监听到该事件后会去采集相应 WebServer 的线程池指标数据。

要想去管理第三方组件的线程池,首先肯定要对这些组件有一定的熟悉度,了解整个请求的一个处理过程,找到对应处理请求的线程池,这些线程池不一定是 JUC 包下的 ThreadPoolExecutor 类,也可能是组件自己实现的线程池,但是基本原理都差不多。

Tomcat、Jetty、Undertow 这三个都是这样,他们并没有直接使用 JUC 提供的线程池实现,而是自己实现了一套,或者扩展了 JUC 的实现;翻源码找到相应的线程池后,然后看有没有暴露 public 方法供我们调用获取,如果没有就需要考虑通过反射来拿了。


Tomcat 内部线程池的实现

  • Tomcat 内部线程池没有直接使用 JUC 下的 ThreadPoolExecutor,而是选择继承 JUC 下的 Executor 体系类,然后重写 execute() 等方法,不同版本有差异。
  1. 继承 JUC 原生 ThreadPoolExecutor(9.0.50 版本及以下),并覆写了一些方法,主要 execute() 和 afterExecute()

  2. 继承 JUC 的 AbstractExecutorService(9.0.51 版本及以上),代码基本是拷贝 JUC 的 ThreadPoolExecutor,也相应的微调了 execute() 方法

注意 Tomcat 实现的线程池类名称也叫 ThreadPoolExecutor,名字跟 JUC 下的是一样的,Tomcat 的 ThreadPoolExecutor 类 execute() 方法如下:

  1. public void execute(Runnable command, long timeout, TimeUnit unit) {
  2. submittedCount.incrementAndGet();
  3. try {
  4. super.execute(command);
  5. } catch (RejectedExecutionException rx) {
  6. if (super.getQueue() instanceof TaskQueue) {
  7. final TaskQueue queue = (TaskQueue)super.getQueue();
  8. try {
  9. if (!queue.force(command, timeout, unit)) {
  10. submittedCount.decrementAndGet();
  11. throw new RejectedExecutionException(sm.getString("threadPoolExecutor.queueFull"));
  12. }
  13. } catch (InterruptedException x) {
  14. submittedCount.decrementAndGet();
  15. throw new RejectedExecutionException(x);
  16. }
  17. } else {
  18. submittedCount.decrementAndGet();
  19. throw rx;
  20. }
  21. }
  22. }

可以看出他是先调用父类的 execute() 方法,然后捕获 RejectedExecutionException 异常,再去判断如果任务队列类型是 TaskQueue,则尝试将任务添加到任务队列中,如果添加失败,证明队列已满,然后再执行拒绝策略,此处 submittedCount 是一个原子变量,记录提交到此线程池但未执行完成的任务数(主要在下面要提到的 TaskQueue 队列的 offer() 方法用),为什么要这样设计呢?继续往下看!

  • Tomcat 定义了阻塞队列 TaskQueue 继承自 LinkedBlockingQueue,该队列主要重写了 offer() 方法。
  1. @Override
  2. public boolean offer(Runnable o) {
  3. if (parent==null) return super.offer(o);
  4. if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);
  5. if (parent.getSubmittedCount()<=(parent.getPoolSize())) return super.offer(o);
  6. if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false;
  7. return super.offer(o);
  8. }

可以看到他在入队之前做了几个判断,这里的 parent 就是所属的线程池对象

  1. 如果 parent 为 null,直接调用父类 offer 方法入队

  2. 如果当前线程数等于最大线程数,则直接调用父类 offer() 方法入队

  3. 如果当前未执行的任务数量小于等于当前线程数,仔细思考下,是不是说明有空闲的线程呢,那么直接调用父类 offer() 入队后就马上有线程去执行它

  4. 如果当前线程数小于最大线程数量,则直接返回 false,然后回到 JUC 线程池的执行流程回想下,是不是就去添加新线程去执行任务了呢

  5. 其他情况都直接入队

  • 因为 Tomcat 线程池主要是来做 IO 任务的,做这一切的目的主要也是为了以最小代价的改动更好的支持 IO 密集型的场景,JUC 自带的线程池主要是适合于 CPU 密集型的场景,可以回想一下 JUC 原生线程池 ThreadPoolExecutor#execute() 方法的执行流程
  1. 判断如果当前线程数小于核心线程池,则新建一个线程来处理提交的任务

  2. 如果当前线程数大于核心线程数且队列没满,则将任务放入任务队列等待执行

  3. 如果当前当前线程池数大于核心线程池,小于最大线程数,且任务队列已满,则创建新的线程执行提交的任务

  4. 如果当前线程数等于最大线程数,且队列已满,则拒绝该任务

可以看出当当前线程数大于核心线程数时,JUC 原生线程池首先是把任务放到队列里等待执行,而不是先创建线程执行。

如果 Tomcat 接收的请求数量大于核心线程数,请求就会被放到队列中,等待核心线程处理,这样会降低请求的总体处理速度,所以 Tomcat 并没有使用 JUC 原生线程池,利用 TaskQueue 的 offer() 方法巧妙的修改了 JUC 线程池的执行流程,改写后 Tomcat 线程池执行流程如下:

  1. 判断如果当前线程数小于核心线程池,则新建一个线程来处理提交的任务

  2. 如果当前当前线程池数大于核心线程池,小于最大线程数,则创建新的线程执行提交的任务

  3. 如果当前线程数等于最大线程数,则将任务放入任务队列等待执行

  4. 如果队列已满,则执行拒绝策略

  • Tomcat 核心线程池有对应的获取方法,获取方式如下
  1. public Executor doGetTp(WebServer webServer) {
  2. TomcatWebServer tomcatWebServer = (TomcatWebServer) webServer;
  3. return tomcatWebServer.getTomcat().getConnector().getProtocolHandler().getExecutor();
  4. }
  • 想要动态调整 Tomcat 线程池的线程参数,可以在引入 DynamicTp 依赖后,在配置文件中添加以下配置就行,参数名称也是和 SpringBoot 提供的 Properties 配置类参数相同,配置文件完整示例看项目 readme 介绍
  1. spring:
  2. dynamic:
  3. tp:
  4. // 其他配置项
  5. tomcatTp:
  6. minSpare: 100
  7. max: 400

Tomcat 线程池就介绍到这里吧,通过以上的一些介绍想必大家对 Tomcat 线程池执行任务的流程都很清楚了吧。


Jetty 内部线程池的实现

  • Jetty 内部线程池,定义了一个继承自 Executor 的 ThreadPool 顶级接口,实现类有以下几个

动态线程池(DynamicTp)之动态调整Tomcat、Jetty、Undertow线程池参数篇 - 掘金 - 图2

  • 内部主要使用 QueuedThreadPool 这个实现类,该线程池执行流程就不在详细解读了,感兴趣的可以自己去看源码,核心思想都差不多,围绕核心线程数、最大线程数、任务队列三个参数入手,跟 Tocmat 比对着来看,其实也挺简单的。
  1. public void execute(Runnable job)
  2. {
  3. int startThread;
  4. while (true)
  5. {
  6. long counts = _counts.get();
  7. int threads = AtomicBiInteger.getHi(counts);
  8. if (threads == Integer.MIN_VALUE)
  9. throw new RejectedExecutionException(job.toString());
  10. int idle = AtomicBiInteger.getLo(counts);
  11. startThread = (idle <= 0 && threads < _maxThreads) ? 1 : 0;
  12. if (!_counts.compareAndSet(counts, threads + startThread, idle + startThread - 1))
  13. continue;
  14. break;
  15. }
  16. if (!_jobs.offer(job))
  17. {
  18. if (addCounts(-startThread, 1 - startThread))
  19. LOG.warn("{} rejected {}", this, job);
  20. throw new RejectedExecutionException(job.toString());
  21. }
  22. if (LOG.isDebugEnabled())
  23. LOG.debug("queue {} startThread={}", job, startThread);
  24. while (startThread-- > 0)
  25. startThread();
  26. }
  • Jetty 线程池有提供 public 的获取方法,获取方式如下
  1. public Executor doGetTp(WebServer webServer) {
  2. JettyWebServer jettyWebServer = (JettyWebServer) webServer;
  3. return jettyWebServer.getServer().getThreadPool();
  4. }
  • 想要动态调整 Jetty 线程池的线程参数,可以在引入 DynamicTp 依赖后,在配置文件中添加以下配置就行,参数名称也是和 SpringBoot 提供的 Properties 配置类参数相同,配置文件完整示例看项目 readme 介绍
  1. spring:
  2. dynamic:
  3. tp:
  4. // 其他配置项
  5. jettyTp:
  6. min: 100
  7. max: 400

Undertow 内部线程池的实现

  • Undertow 因为其性能彪悍,轻量,现在用的还是挺多的,wildfly(前身 Jboss)从 8 开始内部默认的 WebServer 用 Undertow 了,之前是 Tomcat 吧。了解 Undertow 的小伙伴应该知道,他底层是基于 XNIO 框架(3.X 之前)来做的,这也是 Jboss 开发的一款基于 java nio 的优秀网络框架。但 Undertow 宣布从 3.0 开始底层网络框架要切换成 Netty 了,官方给的原因是说起网络编程,Netty 已经是事实上标准,用 Netty 的好处远大于 XNIO 能提供的,所以让我们期待 3.0 的发布吧,只可惜三年前就宣布了,至今也没动静,不知道是夭折了还是咋的,说实话,改动也挺大的,看啥时候发布吧,以下的介绍是基于 Undertow 2.x 版本来的
  • Undertow 内部是定义了一个叫 TaskPool 的线程池顶级接口,该接口有如图所示的几个实现。其实这几个实现类都是采用组合的方式,内部都维护一个 JUC 的 Executor 体系类或者维护 Jboss 提供的 EnhancedQueueExecutor 类(也继承 JUC ExecutorService 类),执行流程可以自己去分析

动态线程池(DynamicTp)之动态调整Tomcat、Jetty、Undertow线程池参数篇 - 掘金 - 图3

  • 具体的创建代码如下,根据外部是否传入,如果有传入则用外部传入的类,如果没有,根据参数设置内部创建一个,具体是用 JUC 的 ThreadPoolExecutor 还是 Jboss 的 EnhancedQueueExecutor,根据配置参数选择

动态线程池(DynamicTp)之动态调整Tomcat、Jetty、Undertow线程池参数篇 - 掘金 - 图4

  • Undertow 线程池没有提供 public 的获取方法,所以通过反射来获取,获取方式如下
  1. public Executor doGetTp(WebServer webServer) {
  2. UndertowWebServer undertowWebServer = (UndertowWebServer) webServer;
  3. Field undertowField = ReflectionUtils.findField(UndertowWebServer.class, "undertow");
  4. if (Objects.isNull(undertowField)) {
  5. return null;
  6. }
  7. ReflectionUtils.makeAccessible(undertowField);
  8. Undertow undertow = (Undertow) ReflectionUtils.getField(undertowField, undertowWebServer);
  9. if (Objects.isNull(undertow)) {
  10. return null;
  11. }
  12. return undertow.getWorker();
  13. }
  • 想要动态调整 Undertow 线程池的线程参数,可以在引入 DynamicTp 依赖后,在配置文件中添加以下配置就行,配置文件完整示例看项目 readme 介绍
  1. spring:
  2. dynamic:
  3. tp:
  4. // 其他配置项
  5. undertowTp:
  6. coreWorkerThreads: 100
  7. maxWorkerThreads: 400
  8. workerKeepAlive: 60

总结

以上介绍了 Tomcat、Jetty、Undertow 三大 WebServer 内置线程池的一些情况,重点介绍了 Tomcat 的,篇幅有限,其他两个感兴趣可以自己分析,原理都差不多。同时也介绍了基于 DynamicTp 怎么动态调整线程池的参数,当我们做 WebServer 性能调优时,能动态调整参数真的是非常好用的。

再次欢迎大家使用 DynamicTp 框架,一起完善项目。

下篇文章打算分享一个 DynamicTp 使用过程中因为 Tomcat 版本不一致导致的监控线程 halt 住的奇葩问题,通过一个问题来掌握 ScheduledExecutorService 的原理,欢迎大家持续关注。


联系我

欢迎加我微信或者关注公众号交流,一起变强!

公众号:CodeFox

微信:yanhom1314
https://juejin.cn/post/7073286368629096485