1. import lombok.experimental.UtilityClass;
    2. import lombok.extern.slf4j.Slf4j;
    3. import java.util.concurrent.*;
    4. import java.util.concurrent.atomic.AtomicInteger;
    5. import static com.isyscore.rpc.service.util.ThreadPoolFactoryUtil.RejectEnum.*;
    6. /**
    7. * 线程池工厂
    8. * <p>对于一些情况,额外新增阻塞式拒绝策略
    9. *
    10. * @author shizi
    11. * @since 2020/3/3 下午8:39
    12. */
    13. @Slf4j
    14. @UtilityClass
    15. public class ThreadPoolFactoryUtil {
    16. /**
    17. * 获取单例线程池
    18. *
    19. * @param threadName 线程名
    20. * @return 线程池
    21. */
    22. public ThreadPoolExecutor getSinglePool(String threadName) {
    23. return getPool(threadName, 1, 1, 0, new LinkedBlockingQueue<>(20000), false, BLOCK);
    24. }
    25. /**
    26. * 获取可变更线程池
    27. *
    28. * @param threadName 线程名
    29. * @return 线程池
    30. */
    31. public ThreadPoolExecutor getCachePool(String threadName) {
    32. return getPool(threadName, 1, Integer.MAX_VALUE, 60 * 1000, new SynchronousQueue<>(), true, ABORT);
    33. }
    34. /**
    35. * 获取固定数目线程池
    36. *
    37. * @param threadName 线程名
    38. * @param coreSize 核心线程个数
    39. * @return 线程池
    40. */
    41. public ThreadPoolExecutor getFixedPool(String threadName, Integer coreSize) {
    42. return getPool(threadName, coreSize, coreSize, 0, new LinkedBlockingQueue<>(20000), false, BLOCK);
    43. }
    44. /**
    45. * 获取固定数目线程池
    46. * <p>
    47. * 允许核心线程池超时退出
    48. *
    49. * @param threadName 线程名
    50. * @param coreSize 核心线程个数
    51. * @return 线程池
    52. */
    53. public ThreadPoolExecutor getFixedPoolOfAllowTimeout(String threadName, Integer coreSize) {
    54. return getPool(threadName, coreSize, coreSize, 60 * 1000, new LinkedBlockingQueue<>(20000), true, BLOCK);
    55. }
    56. /**
    57. * 获取对应系统核心线程个数的线程池
    58. * <p>
    59. * 不允许核心线程池超时退出
    60. *
    61. * @param threadName 线程名
    62. * @return 线程池
    63. */
    64. public ThreadPoolExecutor getSystemCorePool(String threadName) {
    65. return getSystemCorePool(threadName, 0, false, BLOCK);
    66. }
    67. /**
    68. * 获取对应系统核心线程个数的线程池
    69. *
    70. * @param threadName 线程名字
    71. * @return 线程池
    72. */
    73. public ThreadPoolExecutor getSystemCorePoolOfAllowTimeout(String threadName) {
    74. return getSystemCorePool(threadName, 60 * 1000, true, BLOCK);
    75. }
    76. /**
    77. * 系统核心线程个数倍数的线程池
    78. * <p>
    79. * 不允许核心线程池超时退出
    80. *
    81. * @param threadName 线程名
    82. * @param coreSizeMultiples 核心线程数的倍数
    83. * @param maxSizeMultiples 最大线程数的倍数
    84. * @return 线程池
    85. */
    86. public ThreadPoolExecutor getSystemCorePoolOfMulti(String threadName, Integer coreSizeMultiples, Integer maxSizeMultiples) {
    87. return getPool(threadName, Runtime.getRuntime().availableProcessors() * coreSizeMultiples, Runtime.getRuntime().availableProcessors() * maxSizeMultiples, 0,
    88. new LinkedBlockingQueue<>(20000), false, BLOCK);
    89. }
    90. /**
    91. * 系统核心线程个数倍数的线程池
    92. *
    93. * @param threadName 线程名字
    94. * @return 线程池
    95. */
    96. public ThreadPoolExecutor getSystemCorePoolOfMultiAllowTimeout(String threadName, Integer coreSizeMultiples, Integer maxSizeMultiples) {
    97. return getPool(threadName, Runtime.getRuntime().availableProcessors() * coreSizeMultiples, Runtime.getRuntime().availableProcessors() * maxSizeMultiples, 60 * 1000,
    98. new LinkedBlockingQueue<>(20000), true, BLOCK);
    99. }
    100. /**
    101. * 构造默认系统cpu个数的线程池
    102. *
    103. * @param threadName 线程池名字
    104. * @param aliveSecondTime 存活时间(毫秒)
    105. * @param allowCoreThreadTimeout 是否允许核心线程超时清理
    106. * @param rejectEnum 拒绝策略
    107. * @return 线程池
    108. */
    109. public ThreadPoolExecutor getSystemCorePool(String threadName, Integer aliveSecondTime, Boolean allowCoreThreadTimeout, RejectEnum rejectEnum) {
    110. return getPool(threadName, Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors(), aliveSecondTime,
    111. new LinkedBlockingQueue<Runnable>(20000), allowCoreThreadTimeout, rejectEnum);
    112. }
    113. /**
    114. * 创建线程池
    115. *
    116. * @param threadName 线程名字
    117. * @param coreSize 核心线程数
    118. * @param maxSize 最大线程数
    119. * @param aliveMilliSecondTime 存活时间(毫秒)
    120. * @param blockingQueue 阻塞队列
    121. * @param allowCoreThreadTimeout 是否允许核心线程超时清理
    122. * @param rejectEnum 拒绝策略:
    123. * abort(抛异常)
    124. * discard(丢弃任务)
    125. * discardOldest(丢弃队列中最老的)
    126. * callRun(直接运行)
    127. * block(阻塞)
    128. * @return 线程池
    129. */
    130. public ThreadPoolExecutor getPool(String threadName, Integer coreSize, Integer maxSize, Integer aliveMilliSecondTime, BlockingQueue<Runnable> blockingQueue,
    131. Boolean allowCoreThreadTimeout, RejectEnum rejectEnum) {
    132. if (null == rejectEnum) {
    133. throw new RuntimeException("reject is null");
    134. }
    135. RejectedExecutionHandler rejectedExecutionHandler;
    136. switch (rejectEnum) {
    137. case ABORT:
    138. rejectedExecutionHandler = new ThreadPoolExecutor.AbortPolicy();
    139. break;
    140. case DISCARD:
    141. rejectedExecutionHandler = new ThreadPoolExecutor.DiscardPolicy();
    142. break;
    143. case DISCARD_OLDEST:
    144. rejectedExecutionHandler = new ThreadPoolExecutor.DiscardOldestPolicy();
    145. break;
    146. case CALL_RUN:
    147. rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy();
    148. break;
    149. case BLOCK:
    150. rejectedExecutionHandler = new BlockRejectedExecutionHandler();
    151. break;
    152. default:
    153. throw new RuntimeException("not support reject:" + rejectEnum.name());
    154. }
    155. ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(coreSize, maxSize, aliveMilliSecondTime, TimeUnit.MILLISECONDS, blockingQueue, new ThreadFactory() {
    156. private final AtomicInteger atomicInteger = new AtomicInteger(0);
    157. @Override
    158. public Thread newThread(Runnable r) {
    159. return new Thread(r, threadName + "_" + atomicInteger.getAndIncrement());
    160. }
    161. }, rejectedExecutionHandler);
    162. threadPoolExecutor.allowCoreThreadTimeOut(allowCoreThreadTimeout);
    163. return threadPoolExecutor;
    164. }
    165. /**
    166. * 重写拒绝策略,用于在任务量超大情况下任务的阻塞提交,防止任务丢失
    167. */
    168. private class BlockRejectedExecutionHandler implements RejectedExecutionHandler {
    169. @Override
    170. public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
    171. try {
    172. executor.getQueue().put(r);
    173. } catch (InterruptedException e) {
    174. log.warn("thread interrupt", e);
    175. Thread.currentThread().interrupt();
    176. }
    177. }
    178. }
    179. public enum RejectEnum {
    180. /**
    181. * 异常
    182. */
    183. ABORT,
    184. /**
    185. * 丢弃
    186. */
    187. DISCARD,
    188. /**
    189. * 丢弃最老的
    190. */
    191. DISCARD_OLDEST,
    192. /**
    193. * 直接运行
    194. */
    195. CALL_RUN,
    196. /**
    197. * 阻塞等待
    198. */
    199. BLOCK;
    200. }
    201. }

    线程池帮助

    1. import lombok.Setter;
    2. import lombok.experimental.UtilityClass;
    3. import java.util.Collection;
    4. import java.util.List;
    5. import java.util.concurrent.*;
    6. /**
    7. * @author shizi
    8. * @since 2020-12-01 17:09:23
    9. */
    10. @UtilityClass
    11. public class ThreadPoolHelper {
    12. /**
    13. * 带有阻塞提交的线程池
    14. */
    15. @Setter
    16. private ThreadPoolExecutor poolExecutor = ThreadPoolFactoryUtil.getSystemCorePoolOfMultiAllowTimeout("isyscore-config-client-thread", 5, 100);
    17. public void shutdown() {
    18. poolExecutor.shutdown();
    19. }
    20. public List<Runnable> shutdownNow() {
    21. return poolExecutor.shutdownNow();
    22. }
    23. public boolean isShutdown() {
    24. return poolExecutor.isShutdown();
    25. }
    26. public boolean isTerminated() {
    27. return poolExecutor.isTerminated();
    28. }
    29. public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
    30. return poolExecutor.awaitTermination(timeout, unit);
    31. }
    32. public <T> Future<T> submit(Callable<T> task) {
    33. return poolExecutor.submit(task);
    34. }
    35. public <T> Future<T> submit(Runnable task, T result) {
    36. return poolExecutor.submit(task, result);
    37. }
    38. public Future<?> submit(Runnable task) {
    39. return poolExecutor.submit(task);
    40. }
    41. public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
    42. return poolExecutor.invokeAll(tasks);
    43. }
    44. public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
    45. return poolExecutor.invokeAll(tasks, timeout, unit);
    46. }
    47. public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
    48. return poolExecutor.invokeAny(tasks);
    49. }
    50. public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
    51. return poolExecutor.invokeAny(tasks, timeout, unit);
    52. }
    53. public void execute(Runnable command) {
    54. poolExecutor.execute(command);
    55. }
    56. }