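Recently Yang Qing, a colleague on our team, ran into a problem caused by misusing a thread pool: the thread pool used for asynchronous processing hung the main thread. Analyzing the code showed that the pool's rejection policy was a poor fit: it was set to CallerRunsPolicy. When the asynchronous worker threads slowed down, the blocking queue filled up and the rejection policy kicked in. The rejected tasks then ran on the main thread itself, which hung.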
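To make the failure mode concrete, here is a minimal sketch of how CallerRunsPolicy moves a rejected task onto the submitting thread. The class name `CallerRunsDemo`, the pool size of 1, and the queue size of 1 are assumptions chosen for illustration and are not taken from the original service. The rejected task here is trivial; if it were slow, the submitting thread would be stuck for just as long.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class CallerRunsDemo {

    /** Returns the name of the thread that ended up running the rejected task. */
    static String threadThatRunsRejectedTask() {
        // Hypothetical sizes: one worker plus a queue of one,
        // so the third submission has nowhere to go.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        AtomicReference<String> runner = new AtomicReference<>();
        Runnable slowTask = () -> {
            try {
                release.await(); // stands in for slow asynchronous work
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            pool.execute(slowTask); // occupies the only worker thread
            pool.execute(slowTask); // fills the queue
            // Rejected: CallerRunsPolicy runs this task synchronously
            // on the thread that called execute().
            pool.execute(() -> runner.set(Thread.currentThread().getName()));
        } finally {
            release.countDown(); // let the two slow tasks finish
            pool.shutdown();
        }
        return runner.get();
    }

    public static void main(String[] args) {
        System.out.println("submitting thread:    " + Thread.currentThread().getName());
        System.out.println("rejected task ran on: " + threadThatRunsRejectedTask());
    }
}
```

Both printed names are the same thread, which is why a slow task under this policy stalls whoever submits it.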

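From this problem we learned two things:

• Choose a thread pool only after analyzing the business scenario carefully; when necessary, define a custom thread pool;
• The running state of a thread pool also needs to be monitored.

For the monitoring, I followed the approach described in 《Java编程的艺术》. Here is my code snippet: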

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    import javax.annotation.PostConstruct; // assumes the javax.* annotation package

    import lombok.Setter;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.commons.lang3.concurrent.BasicThreadFactory;

    @Slf4j // assumption: Lombok supplies the `log` field used below
    public class AsyncThreadExecutor implements AutoCloseable {
        private static final int DEFAULT_QUEUE_SIZE = 1000;
        private static final int DEFAULT_POOL_SIZE = 10;

        // Note: the pool below is created in a field initializer, so values set
        // through these setters after construction do not resize it.
        @Setter
        private int queueSize = DEFAULT_QUEUE_SIZE;
        @Setter
        private int poolSize = DEFAULT_POOL_SIZE;

        /**
         * Periodically monitors the running state of the thread pool.
         */
        private final ScheduledExecutorService scheduledExecutorService =
                Executors.newSingleThreadScheduledExecutor(
                        new BasicThreadFactory.Builder().namingPattern("async thread executor monitor").build());

        /**
         * Custom asynchronous thread pool:
         * (1) the task queue is a bounded queue;
         * (2) the rejection policy is custom.
         */
        private final ThreadPoolExecutor threadPoolExecutor =
                new ThreadPoolExecutor(poolSize, poolSize, 0, TimeUnit.MILLISECONDS,
                        new ArrayBlockingQueue<>(queueSize),
                        new BasicThreadFactory.Builder().namingPattern("async-thread-%d").build(),
                        (r, executor) -> log.error("the async executor pool is full!!"));

        private final ExecutorService executorService = threadPoolExecutor;

        @PostConstruct
        public void init() {
            scheduledExecutorService.scheduleAtFixedRate(() -> {
                // Number of tasks that have been scheduled for execution
                long taskCount = threadPoolExecutor.getTaskCount();
                // Number of tasks the pool has completed so far
                long completedTaskCount = threadPoolExecutor.getCompletedTaskCount();
                // Largest number of threads that have ever been in the pool at once
                int largestPoolSize = threadPoolExecutor.getLargestPoolSize();
                // Current number of threads in the pool
                int poolSize = threadPoolExecutor.getPoolSize();
                // Number of threads that are actively executing tasks
                int activeCount = threadPoolExecutor.getActiveCount();
                log.info("async-executor monitor. taskCount:{}, completedTaskCount:{}, largestPoolSize:{}, poolSize:{}, activeCount:{}",
                        taskCount, completedTaskCount, largestPoolSize, poolSize, activeCount);
            }, 0, 10, TimeUnit.MINUTES);
        }

        public void execute(Runnable task) {
            executorService.execute(task);
        }

        @Override
        public void close() throws Exception {
            executorService.shutdown();
            // Also stop the monitor; otherwise its thread keeps running.
            scheduledExecutorService.shutdown();
        }
    }

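The main ideas here are: (1) a fixed-size thread pool backed by a bounded queue; (2) a rejection policy that discards the task but records an error log; (3) a scheduled executor that monitors the business thread pool.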
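Idea (2) can be taken one step further so that rejections show up as a number as well as a log line. The following is only a sketch under assumptions: `CountingDiscardPolicy` and `submitAndCountRejections` are hypothetical names, `System.err` stands in for the logger, and the pool is deliberately tiny (one worker, one queue slot) so that rejections are easy to trigger.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class CountingDiscardDemo {

    /** Hypothetical handler: drops the task, logs it, and counts the rejection. */
    static final class CountingDiscardPolicy implements RejectedExecutionHandler {
        private final AtomicLong rejected = new AtomicLong();

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            // The task is discarded; only the log line and the counter remain.
            System.err.println("the async executor pool is full, rejected so far: "
                    + rejected.incrementAndGet());
        }

        long rejectedCount() {
            return rejected.get();
        }
    }

    /** Submits blocking tasks to a 1-thread, 1-slot pool and returns how many were rejected. */
    static long submitAndCountRejections(int tasks) {
        CountingDiscardPolicy policy = new CountingDiscardPolicy();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1), policy);
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // keep the worker busy until released
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return policy.rejectedCount();
    }

    public static void main(String[] args) {
        // One task runs, one waits in the queue, the remaining three are rejected.
        System.out.println("rejected: " + submitAndCountRejections(5));
    }
}
```

Unlike CallerRunsPolicy, this handler returns immediately, so the submitting thread is never blocked; the cost is that rejected work is lost, which is why the counter and log line matter.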

While going through the monitoring logs, I saw the entries shown in the figure below:
[Figure 1: Java thread pool monitoring summary]

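I was puzzled by what largestPoolSize means. Read literally, it is "the largest pool size", but by the definition of a thread pool, when maximumPoolSize and corePoolSize are equal (both are 10 here), the pool can hold at most 10 threads. So how could largestPoolSize be 39? I went to the source code: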

    /**
     * Returns the largest number of threads that have ever
     * simultaneously been in the pool.
     *
     * @return the number of threads
     */
    public int getLargestPoolSize() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            return largestPoolSize;
        } finally {
            mainLock.unlock();
        }
    }

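The comment says that the method returns the largest number of threads that have ever been in the pool at the same time. Next, look at where the largestPoolSize field is assigned in ThreadPoolExecutor: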

    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s; // <-- here, right here!
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

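Two findings:

• largestPoolSize is the historical peak size of the worker set, so it only ever grows. It records the largest number of threads that have been in the pool at the same time; it is a high-water mark, not a configured capacity;
• largestPoolSize <= maximumPoolSize, because addWorker refuses to add a worker once the worker count reaches the limit (as long as maximumPoolSize is not lowered afterwards).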
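Both findings can be checked with a small experiment. This is a sketch under assumptions: the class name `LargestPoolSizeDemo` and the sizes (corePoolSize 2, maximumPoolSize 4, queue capacity 1) are made up for illustration and differ from the pool in this article.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class LargestPoolSizeDemo {

    /**
     * Returns {largestPoolSize while all workers are busy,
     *          poolSize after termination,
     *          largestPoolSize after termination}.
     */
    static int[] observe() {
        // Hypothetical sizes: corePoolSize 2, maximumPoolSize 4, queue capacity 1.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 0, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);

        // Tasks 1-2 start the core workers, task 3 is queued,
        // tasks 4-5 find the queue full and start two extra workers.
        for (int i = 0; i < 5; i++) {
            pool.execute(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        int largestWhileBusy = pool.getLargestPoolSize();

        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        // All workers are gone now, yet the high-water mark is unchanged.
        return new int[] {largestWhileBusy, pool.getPoolSize(), pool.getLargestPoolSize()};
    }

    public static void main(String[] args) {
        int[] r = observe();
        System.out.println("largest while busy: " + r[0]);
        System.out.println("pool size after termination: " + r[1]);
        System.out.println("largest after termination: " + r[2]);
    }
}
```

In this run the peak reaches maximumPoolSize (4) and no further, and it stays at 4 even after the pool has terminated and the current pool size has dropped to 0.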

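PS: Yang Qing was the inspiration for this article. He ran a lot of load tests, gave me many ideas, and analyzed some of the code together with me.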

