import lombok.experimental.UtilityClass;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.*;import java.util.concurrent.atomic.AtomicInteger;/** * @author shizi * @since 2020/3/3 下午8:39 */@Slf4j@SuppressWarnings("all")@UtilityClasspublic class ThreadPoolFactory { /** * 获取单例线程池 * * @param threadName 线程名 * @return 线程池 */ public ThreadPoolExecutor getSinglePool(String threadName) { return getPool(threadName, 1, 1, 0, new LinkedBlockingQueue<>(20000), false, "block"); } /** * 获取可变更线程池 * * @param threadName 线程名 * @return 线程池 */ public ThreadPoolExecutor getCachePool(String threadName) { return getPool(threadName, 1, Integer.MAX_VALUE, 60 * 1000, new SynchronousQueue<>(), true, "abort"); } /** * 获取固定数目线程池 * * @param threadName 线程池名字 * @param coreSize 核心线程个数 * @return 线程池 */ public ThreadPoolExecutor getFixedPool(String threadName, Integer coreSize) { return getPool(threadName, coreSize, coreSize, 0, new LinkedBlockingQueue<Runnable>(20000), false, "block"); } /** * 构造默认系统cpu个数的线程池 * * @param threadName 线程池名字 * @param aliveSecondTime 存活时间(毫秒) * @param aliveMilliSecondTime 是否允许核心线程超时清理 * @param rejectHandler 拒绝策略 * @return 线程池 */ public ThreadPoolExecutor getSystemCorePool(String threadName, Integer aliveSecondTime, Boolean aliveMilliSecondTime, String rejectHandler) { return getPool(threadName, Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors(), aliveSecondTime, new LinkedBlockingQueue<Runnable>(20000), aliveMilliSecondTime, rejectHandler); } /** * 创建线程池 * * @param threadName 线程名字 * @param coreSize 核心线程数 * @param maxSize 最大线程数 * @param aliveMilliSecondTime 存活时间(毫秒) * @param blockingQueue 阻塞队列 * @param alloCoreThreadTimeout 是否允许核心线程超时清理 * @param rejectHandler 拒绝策略: * abort(抛异常) * discard(丢弃任务) * discardOldest(丢弃队列中最老的) * callRun(直接运行) * block(阻塞) * @return */ public ThreadPoolExecutor getPool(String threadName, Integer coreSize, Integer maxSize, Integer aliveMilliSecondTime, BlockingQueue<Runnable> blockingQueue, Boolean alloCoreThreadTimeout, String rejectHandler) { if (null == rejectHandler) { throw new RuntimeException("reject is null"); } RejectedExecutionHandler rejectedExecutionHandler; switch (rejectHandler) { case "abort": rejectedExecutionHandler = new ThreadPoolExecutor.AbortPolicy(); break; case "discard": rejectedExecutionHandler = new ThreadPoolExecutor.DiscardPolicy(); break; case "discardOldest": rejectedExecutionHandler = new ThreadPoolExecutor.DiscardOldestPolicy(); break; case "callRun": rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy(); break; case "block": rejectedExecutionHandler = new BlockRejectedExecutionHandler(); break; default: throw new RuntimeException("not support reject:" + rejectHandler); } ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(coreSize, maxSize, aliveMilliSecondTime, TimeUnit.MILLISECONDS, blockingQueue, new ThreadFactory() { private AtomicInteger atomicInteger = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { return new Thread(r, threadName + "_" + atomicInteger.getAndIncrement()); } }, rejectedExecutionHandler); if (alloCoreThreadTimeout) { threadPoolExecutor.allowCoreThreadTimeOut(alloCoreThreadTimeout); } return threadPoolExecutor; } /** * 重写拒绝策略,用于在任务量超大情况下任务的阻塞提交,防止任务丢失 */ private class BlockRejectedExecutionHandler implements RejectedExecutionHandler { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { try { executor.getQueue().put(r); } catch (InterruptedException e) { log.warn(GlueConstant.LOG_PRE + "thread interrupt", e); Thread.currentThread().interrupt(); } } }}