1. package com.cict.iwu.project.uitl;
    2. import com.google.common.util.concurrent.ThreadFactoryBuilder;
    3. import lombok.extern.slf4j.Slf4j;
    4. import java.util.Map;
    5. import java.util.Optional;
    6. import java.util.concurrent.*;
    7. /**
    8. * @author: lian wm
    9. * @date: 2019/8/13 15:49
    10. */
    11. @Slf4j
    12. public class ThreadUtils {
    13. /**
    14. * 线程池map
    15. */
    16. private static final Map<String,ThreadPoolExecutor> poolMap = new ConcurrentHashMap<>();
    17. private static final ThreadPoolExecutor defaultThreadPoolExecutor = createFixedPool("ThreadUtilsDefaultPool",5);
    18. /**
    19. * 用于高并发实时任务的线程池
    20. * 线程池不限制线程上限,且默认持有20个核心线程,但非核心线程仅存活较短时间将销毁
    21. * 用于即时异步的短期任务
    22. * */
    23. private static final ThreadPoolExecutor cacheThreadPool = new ThreadPoolExecutor(
    24. 20,
    25. Integer.MAX_VALUE,
    26. 15L,
    27. TimeUnit.SECONDS,
    28. new SynchronousQueue<Runnable>(),
    29. getThreadFactory("RealTimeTaskPool")
    30. );
    31. static{
    32. cacheThreadPool.prestartAllCoreThreads();
    33. }
    34. /**
    35. * 生成线程池工厂的格式化
    36. * @param poolName
    37. * @return
    38. */
    39. private static String genThreadFactoryNameFormat(String poolName){
    40. if(poolName == null){
    41. poolName = "ThreadPool";
    42. }
    43. poolName = poolName.trim();
    44. return poolName + "-%03d";
    45. }
    46. /**
    47. * 获取线程工厂
    48. * @param poolName
    49. * @return
    50. */
    51. private static ThreadFactory getThreadFactory(String poolName){
    52. return new ThreadFactoryBuilder()
    53. .setNameFormat(genThreadFactoryNameFormat("RealTimeTaskPool"))
    54. .setDaemon(true)
    55. .build();
    56. }
    57. /**
    58. * 创建固定线程数的线程池
    59. * @param poolName
    60. * @return
    61. */
    62. private static ThreadPoolExecutor createFixedPool(String poolName,int poolSize){
    63. if(poolName == null){
    64. poolName = "ThreadUtilsDefaultPool";
    65. }
    66. if(poolSize <= 0 || poolSize >= 50){
    67. poolSize = 5;
    68. }
    69. poolName = poolName.trim();
    70. return new ThreadPoolExecutor(
    71. poolSize,
    72. poolSize,
    73. 15L,
    74. TimeUnit.SECONDS,
    75. new ArrayBlockingQueue<Runnable>(2000),
    76. getThreadFactory(poolName)
    77. );
    78. }
    79. /**
    80. * 将任务放入线程池中执行
    81. * @param poolName 线程池名称
    82. * @param runnable 任务
    83. */
    84. public static void run(String poolName,Runnable runnable){
    85. if(runnable == null){
    86. return;
    87. }
    88. Optional.ofNullable(getPool(poolName))
    89. .ifPresent(pool -> pool.execute(runnable));
    90. }
    91. /**
    92. * 将任务放入线程池中,等待一段时间后执行
    93. * @param poolName 线程池名称
    94. * @param runnable 任务
    95. * @param delayValue 延迟数值,默认为0
    96. * @param timeUnit 时间单位,默认为毫秒
    97. */
    98. public static void runDelay(String poolName,Runnable runnable,Long delayValue,TimeUnit timeUnit){
    99. run(poolName,() -> {
    100. try {
    101. Optional.ofNullable(timeUnit)
    102. .orElse(TimeUnit.MILLISECONDS)
    103. .sleep(Optional.ofNullable(delayValue).orElse(0L));
    104. } catch (InterruptedException e) {
    105. log.error("执行异常",e);
    106. }
    107. runnable.run();
    108. });
    109. }
    110. /**
    111. * 将任务放入线程池中执行,并返回一个可用来查看状态的对象
    112. * @param poolName 线程池名称
    113. * @param callable 任务
    114. * @param <T>
    115. * @return
    116. */
    117. public static <T> FutureTask<T> task(String poolName,Callable<T> callable){
    118. if(callable == null){
    119. return new FutureTask<T>(() -> null);
    120. }
    121. ThreadPoolExecutor threadPoolExecutor = getPool(poolName);
    122. FutureTask task = new FutureTask(callable);
    123. threadPoolExecutor.execute(task);
    124. return task;
    125. }
    126. /**
    127. * 启动一个线程执行任务
    128. * @param runnable 任务
    129. * @return
    130. */
    131. public static void runNow(Runnable runnable){
    132. if(runnable == null){
    133. throw new NullPointerException("任务不能为空");
    134. }
    135. cacheThreadPool.submit(runnable);
    136. }
    137. /**
    138. * 启动一个线程执行任务,并返回一个FutureTask对象
    139. * @param callable
    140. * @param <T>
    141. * @return
    142. */
    143. public static <T> FutureTask<T> runTask(Callable<T> callable){
    144. if(callable == null){
    145. return new FutureTask<T>(() -> null);
    146. }
    147. FutureTask task = new FutureTask(callable);
    148. cacheThreadPool.submit(task);
    149. return task;
    150. }
    151. /**
    152. * 获取线程池
    153. * @param poolName
    154. * @return
    155. */
    156. private static ThreadPoolExecutor getPool(String poolName){
    157. return getPool(poolName,5);
    158. }
    159. /**
    160. * 获取线程池
    161. * @param poolName 该参数将会自动大写去空
    162. * @param corePoolSize 核心线程数,该参数仅在首次创建时生效
    163. * @return
    164. */
    165. private static ThreadPoolExecutor getPool(String poolName,int corePoolSize){
    166. ThreadPoolExecutor threadPoolExecutor = null;
    167. if(poolName == null){
    168. return defaultThreadPoolExecutor;
    169. }
    170. poolName = poolName.trim().toUpperCase();
    171. threadPoolExecutor = poolMap.get(poolName);
    172. if(threadPoolExecutor == null){
    173. synchronized (ThreadUtils.class){
    174. threadPoolExecutor = poolMap.get(poolName);
    175. if(threadPoolExecutor == null){
    176. threadPoolExecutor = createFixedPool(poolName,corePoolSize);
    177. poolMap.put(poolName,threadPoolExecutor);
    178. }
    179. }
    180. }
    181. return threadPoolExecutor;
    182. }
    183. /**
    184. * 多次尝试,直到返回的结果不为null
    185. * @param callable 查询方法
    186. * @param retryTimes 重试次数,范围[1-20],每次间隔1秒.当重复请求总时间达到20秒后,即便次数未到,也会终止轮询
    187. * @param <T>
    188. * @return
    189. */
    190. public static <T> T queryUntilNotNull(Callable<T> callable, int retryTimes){
    191. if(retryTimes <= 0){
    192. retryTimes = 1;
    193. }
    194. if(retryTimes >= 20){
    195. retryTimes = 20;
    196. }
    197. long deadLine = System.currentTimeMillis() + 20 * 1000;
    198. for (int i = 0; i < retryTimes; i++) {
    199. T resultTemp = null;
    200. try {
    201. resultTemp = callable.call();
    202. } catch (Exception e) {
    203. e.printStackTrace();
    204. }
    205. if(resultTemp != null){
    206. return resultTemp;
    207. }else{
    208. if(System.currentTimeMillis() >= deadLine){
    209. break;
    210. }
    211. try {
    212. TimeUnit.SECONDS.sleep(1);
    213. } catch (InterruptedException e) {
    214. e.printStackTrace();
    215. }
    216. }
    217. }
    218. return null;
    219. }
    220. /**
    221. * 多次尝试,直到返回的结果不为null
    222. * @param callable 查询方法
    223. * @param <T>
    224. * @return
    225. */
    226. public static <T> T queryUntilNotNull(Callable<T> callable){
    227. return queryUntilNotNull(callable,20);
    228. }
    229. /**
    230. * 将线程加入到当前线程中
    231. * @param threads
    232. */
    233. public static void join(Thread... threads){
    234. if(threads != null){
    235. for (Thread thread : threads) {
    236. try {
    237. thread.join();
    238. } catch (InterruptedException e) {
    239. log.error("线程发生中断异常",e);
    240. }
    241. }
    242. }
    243. }
    244. }