1. import lombok.experimental.UtilityClass;
    2. import lombok.extern.slf4j.Slf4j;
    3. import java.util.concurrent.*;
    4. import java.util.concurrent.atomic.AtomicInteger;
    5. /**
    6. * @author shizi
    7. * @since 2020/3/3 下午8:39
    8. */
    9. @Slf4j
    10. @SuppressWarnings("all")
    11. @UtilityClass
    12. public class ThreadPoolFactory {
    13. /**
    14. * 获取单例线程池
    15. *
    16. * @param threadName 线程名
    17. * @return 线程池
    18. */
    19. public ThreadPoolExecutor getSinglePool(String threadName) {
    20. return getPool(threadName, 1, 1, 0, new LinkedBlockingQueue<>(20000), false, "block");
    21. }
    22. /**
    23. * 获取可变更线程池
    24. *
    25. * @param threadName 线程名
    26. * @return 线程池
    27. */
    28. public ThreadPoolExecutor getCachePool(String threadName) {
    29. return getPool(threadName, 1, Integer.MAX_VALUE, 60 * 1000, new SynchronousQueue<>(), true, "abort");
    30. }
    31. /**
    32. * 获取固定数目线程池
    33. *
    34. * @param threadName 线程池名字
    35. * @param coreSize 核心线程个数
    36. * @return 线程池
    37. */
    38. public ThreadPoolExecutor getFixedPool(String threadName, Integer coreSize) {
    39. return getPool(threadName, coreSize, coreSize, 0, new LinkedBlockingQueue<Runnable>(20000), false, "block");
    40. }
    41. /**
    42. * 构造默认系统cpu个数的线程池
    43. *
    44. * @param threadName 线程池名字
    45. * @param aliveSecondTime 存活时间(毫秒)
    46. * @param aliveMilliSecondTime 是否允许核心线程超时清理
    47. * @param rejectHandler 拒绝策略
    48. * @return 线程池
    49. */
    50. public ThreadPoolExecutor getSystemCorePool(String threadName, Integer aliveSecondTime, Boolean aliveMilliSecondTime, String rejectHandler) {
    51. return getPool(threadName, Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors(), aliveSecondTime,
    52. new LinkedBlockingQueue<Runnable>(20000), aliveMilliSecondTime, rejectHandler);
    53. }
    54. /**
    55. * 创建线程池
    56. *
    57. * @param threadName 线程名字
    58. * @param coreSize 核心线程数
    59. * @param maxSize 最大线程数
    60. * @param aliveMilliSecondTime 存活时间(毫秒)
    61. * @param blockingQueue 阻塞队列
    62. * @param alloCoreThreadTimeout 是否允许核心线程超时清理
    63. * @param rejectHandler 拒绝策略:
    64. * abort(抛异常)
    65. * discard(丢弃任务)
    66. * discardOldest(丢弃队列中最老的)
    67. * callRun(直接运行)
    68. * block(阻塞)
    69. * @return
    70. */
    71. public ThreadPoolExecutor getPool(String threadName, Integer coreSize, Integer maxSize, Integer aliveMilliSecondTime, BlockingQueue<Runnable> blockingQueue,
    72. Boolean alloCoreThreadTimeout, String rejectHandler) {
    73. if (null == rejectHandler) {
    74. throw new RuntimeException("reject is null");
    75. }
    76. RejectedExecutionHandler rejectedExecutionHandler;
    77. switch (rejectHandler) {
    78. case "abort":
    79. rejectedExecutionHandler = new ThreadPoolExecutor.AbortPolicy();
    80. break;
    81. case "discard":
    82. rejectedExecutionHandler = new ThreadPoolExecutor.DiscardPolicy();
    83. break;
    84. case "discardOldest":
    85. rejectedExecutionHandler = new ThreadPoolExecutor.DiscardOldestPolicy();
    86. break;
    87. case "callRun":
    88. rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy();
    89. break;
    90. case "block":
    91. rejectedExecutionHandler = new BlockRejectedExecutionHandler();
    92. break;
    93. default:
    94. throw new RuntimeException("not support reject:" + rejectHandler);
    95. }
    96. ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(coreSize, maxSize, aliveMilliSecondTime, TimeUnit.MILLISECONDS, blockingQueue, new ThreadFactory() {
    97. private AtomicInteger atomicInteger = new AtomicInteger(0);
    98. @Override
    99. public Thread newThread(Runnable r) {
    100. return new Thread(r, threadName + "_" + atomicInteger.getAndIncrement());
    101. }
    102. }, rejectedExecutionHandler);
    103. if (alloCoreThreadTimeout) {
    104. threadPoolExecutor.allowCoreThreadTimeOut(alloCoreThreadTimeout);
    105. }
    106. return threadPoolExecutor;
    107. }
    108. /**
    109. * 重写拒绝策略,用于在任务量超大情况下任务的阻塞提交,防止任务丢失
    110. */
    111. private class BlockRejectedExecutionHandler implements RejectedExecutionHandler {
    112. @Override
    113. public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
    114. try {
    115. executor.getQueue().put(r);
    116. } catch (InterruptedException e) {
    117. log.warn(GlueConstant.LOG_PRE + "thread interrupt", e);
    118. Thread.currentThread().interrupt();
    119. }
    120. }
    121. }
    122. }