3.1 Dubbo已有线程池

dubbo在使用时,都是通过创建真实的业务线程池进行操作的。目前已知的线程池模型有两个和java中
的相互对应:

  • fix: 表示创建固定大小的线程池。也是Dubbo默认的使用方式,默认创建的执行线程数为200,并且是没有任何等待队列的。所以再极端的情况下可能会存在问题,比如某个操作大量执行时,可能存在堵塞的情况。后面也会讲相关的处理办法。
  • cache: 创建非固定大小的线程池,当线程不足时,会自动创建新的线程。但是使用这种的时候需要注意,如果突然有高TPS的请求过来,方法没有及时完成,则会造成大量的线程创建,对系统的CPU和负载都是压力,执行越多反而会拖慢整个系统。

    3.2 自定义线程池

    在真实的使用过程中可能会因为使用fifix模式的线程池,导致具体某些业务场景因为线程池中的线程数量不足而产生错误,而很多业务研发是对这些无感知的,只有当出现错误的时候才会去查看告警或者通过客户反馈出现严重的问题才去查看,结果发现是线程池满了。所以可以在创建线程池的时,通过某些手段对这个线程池进行监控,这样就可以进行及时的扩缩容机器或者告警。下面的这个程序就是这样子的,会在创建线程池后进行对其监控,并且及时作出相应处理。
    (1)线程池实现, 这里主要是基于对 FixedThreadPool 中的实现做扩展出线程监控的部分 ```

public class WatchingThreadPool extends FixedThreadPool implements Runnable { private static final Logger LOGGER = LoggerFactory.getLogger(WatchingThreadPool.class); private static final double ALARM_PERCENT = 0.90; private final Map THREAD_POOLS = new ConcurrentHashMap<>();

  1. public WatchingThreadPool() { // 每隔3秒打印线程使用情况
  2. Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(this, 1, 3, TimeUnit.SECONDS);
  3. }
  4. @Override
  5. public Executor getExecutor(URL url) { // 从父类中创建线程池
  6. final Executor executor = super.getExecutor(url);
  7. if (executor instanceof ThreadPoolExecutor) {
  8. THREAD_POOLS.put(url, ((ThreadPoolExecutor) executor));
  9. }
  10. return executor;
  11. }
  12. @Override
  13. public void run() {
  14. //遍历线程池,如果超出指定的部分,进行操作,比如接入公司的告警系统或者短信平台
  15. for (Map.Entry<URL, ThreadPoolExecutor> entry : THREAD_POOLS.entrySet()) {
  16. final URL url = entry.getKey();
  17. final ThreadPoolExecutor executor = entry.getValue();
  18. // 当前执行中的线程数
  19. final int activeCount = executor.getActiveCount();
  20. // 总计线程数
  21. final int poolSize = executor.getCorePoolSize();
  22. double used = (double) activeCount / poolSize;
  23. final int usedNum = (int) (used * 100);
  24. LOGGER.info("线程池执行状态:[{}/{}]:{}%", activeCount, poolSize, usedNum);
  25. if (used >= ALARM_PERCENT) {
  26. LOGGER.error("超出警戒值!host:{}, 当前已使用量:{}%, URL:{}", url.getIp(), usedNum, url);
  27. }
  28. }
  29. }

} ``` (2)SPI声明,创建文件 META-INF/dubbo/org.apache.dubbo.common.threadpool.ThreadPool

watching=包名.线程池名

(3)在服务提供方项目引入该依赖
(4)在服务提供方项目中设置使用该线程池生成器

dubbo.provider.threadpool=watching

(5)接下来需要做的就是模拟整个流程,因为该线程当前是每1秒抓一次数据,所以我们需要对该方法
的提供者超过1秒的时间(比如这里用休眠 Thread.sleep ),消费者则需要启动多个线程来并行执行,来
模拟整个并发情况。
(6)在调用方则尝试简单通过for循环启动多个线程来执行 查看服务提供方的监控情况