import lombok.experimental.UtilityClass;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

import static com.isyscore.rpc.service.util.ThreadPoolFactoryUtil.RejectEnum.*;

/**
 * Thread pool factory.
 * <p>
 * In addition to the rejection policies shipped with {@link ThreadPoolExecutor}, this factory
 * offers a blocking policy ({@link RejectEnum#BLOCK}) that makes the submitting thread wait for
 * free queue capacity instead of discarding the task.
 * <p>
 * The class is a Lombok {@code @UtilityClass}; its members are used statically.
 *
 * @author shizi
 * @since 2020/3/3 下午8:39
 */
@Slf4j
@UtilityClass
public class ThreadPoolFactoryUtil {

    /** Capacity of the bounded work queue used by every queue-backed pool built here. */
    private static final int DEFAULT_QUEUE_CAPACITY = 20000;

    /** Keep-alive, in milliseconds, used by the variants that let idle threads time out. */
    private static final int DEFAULT_KEEP_ALIVE_MILLIS = 60 * 1000;

    /**
     * Creates a single-threaded pool: one core thread, one max thread, a bounded queue and the
     * blocking rejection policy. Core threads never time out.
     *
     * @param threadName prefix of the worker thread names
     * @return the thread pool
     */
    public ThreadPoolExecutor getSinglePool(String threadName) {
        return getPool(threadName, 1, 1, 0, new LinkedBlockingQueue<>(DEFAULT_QUEUE_CAPACITY), false, BLOCK);
    }

    /**
     * Creates an elastic ("cached") pool: tasks are handed off through a {@link SynchronousQueue},
     * the thread count may grow up to {@link Integer#MAX_VALUE}, idle threads (core included) exit
     * after 60 seconds, and rejected tasks raise an exception ({@link RejectEnum#ABORT}).
     *
     * @param threadName prefix of the worker thread names
     * @return the thread pool
     */
    public ThreadPoolExecutor getCachePool(String threadName) {
        return getPool(threadName, 1, Integer.MAX_VALUE, DEFAULT_KEEP_ALIVE_MILLIS, new SynchronousQueue<>(), true, ABORT);
    }

    /**
     * Creates a fixed-size pool with a bounded queue and the blocking rejection policy.
     * Core threads never time out.
     *
     * @param threadName prefix of the worker thread names
     * @param coreSize   number of threads (used as both core and maximum size)
     * @return the thread pool
     */
    public ThreadPoolExecutor getFixedPool(String threadName, Integer coreSize) {
        return getPool(threadName, coreSize, coreSize, 0, new LinkedBlockingQueue<>(DEFAULT_QUEUE_CAPACITY), false, BLOCK);
    }

    /**
     * Creates a fixed-size pool with a bounded queue and the blocking rejection policy.
     * <p>
     * Idle core threads are allowed to time out and exit after 60 seconds.
     *
     * @param threadName prefix of the worker thread names
     * @param coreSize   number of threads (used as both core and maximum size)
     * @return the thread pool
     */
    public ThreadPoolExecutor getFixedPoolOfAllowTimeout(String threadName, Integer coreSize) {
        return getPool(threadName, coreSize, coreSize, DEFAULT_KEEP_ALIVE_MILLIS,
            new LinkedBlockingQueue<>(DEFAULT_QUEUE_CAPACITY), true, BLOCK);
    }

    /**
     * Creates a pool whose size equals the number of available processors.
     * <p>
     * Core threads never time out; rejected tasks block the submitter.
     *
     * @param threadName prefix of the worker thread names
     * @return the thread pool
     */
    public ThreadPoolExecutor getSystemCorePool(String threadName) {
        return getSystemCorePool(threadName, 0, false, BLOCK);
    }

    /**
     * Creates a pool whose size equals the number of available processors, with idle core
     * threads allowed to time out after 60 seconds; rejected tasks block the submitter.
     *
     * @param threadName prefix of the worker thread names
     * @return the thread pool
     */
    public ThreadPoolExecutor getSystemCorePoolOfAllowTimeout(String threadName) {
        return getSystemCorePool(threadName, DEFAULT_KEEP_ALIVE_MILLIS, true, BLOCK);
    }

    /**
     * Creates a pool sized as multiples of the number of available processors.
     * <p>
     * Core threads never time out; rejected tasks block the submitter.
     *
     * @param threadName        prefix of the worker thread names
     * @param coreSizeMultiples factor applied to the processor count to get the core size
     * @param maxSizeMultiples  factor applied to the processor count to get the maximum size
     * @return the thread pool
     */
    public ThreadPoolExecutor getSystemCorePoolOfMulti(String threadName, Integer coreSizeMultiples, Integer maxSizeMultiples) {
        int processors = Runtime.getRuntime().availableProcessors();
        return getPool(threadName, processors * coreSizeMultiples, processors * maxSizeMultiples, 0,
            new LinkedBlockingQueue<>(DEFAULT_QUEUE_CAPACITY), false, BLOCK);
    }

    /**
     * Creates a pool sized as multiples of the number of available processors, with idle core
     * threads allowed to time out after 60 seconds; rejected tasks block the submitter.
     *
     * @param threadName        prefix of the worker thread names
     * @param coreSizeMultiples factor applied to the processor count to get the core size
     * @param maxSizeMultiples  factor applied to the processor count to get the maximum size
     * @return the thread pool
     */
    public ThreadPoolExecutor getSystemCorePoolOfMultiAllowTimeout(String threadName, Integer coreSizeMultiples,
        Integer maxSizeMultiples) {
        int processors = Runtime.getRuntime().availableProcessors();
        return getPool(threadName, processors * coreSizeMultiples, processors * maxSizeMultiples, DEFAULT_KEEP_ALIVE_MILLIS,
            new LinkedBlockingQueue<>(DEFAULT_QUEUE_CAPACITY), true, BLOCK);
    }

    /**
     * Creates a pool whose core and maximum size both equal the number of available processors,
     * backed by a bounded queue.
     *
     * @param threadName             prefix of the worker thread names
     * @param aliveSecondTime        keep-alive time; despite the parameter name the value is passed
     *                               on as <b>milliseconds</b>
     * @param allowCoreThreadTimeout whether idle core threads may time out
     * @param rejectEnum             rejection policy
     * @return the thread pool
     */
    public ThreadPoolExecutor getSystemCorePool(String threadName, Integer aliveSecondTime, Boolean allowCoreThreadTimeout,
        RejectEnum rejectEnum) {
        int processors = Runtime.getRuntime().availableProcessors();
        return getPool(threadName, processors, processors, aliveSecondTime,
            new LinkedBlockingQueue<Runnable>(DEFAULT_QUEUE_CAPACITY), allowCoreThreadTimeout, rejectEnum);
    }

    /**
     * Creates a thread pool.
     * <p>
     * Worker threads are named {@code threadName + "_" + n}, where {@code n} counts up from 0.
     *
     * @param threadName             prefix of the worker thread names
     * @param coreSize               core pool size
     * @param maxSize                maximum pool size
     * @param aliveMilliSecondTime   keep-alive time in milliseconds
     * @param blockingQueue          work queue
     * @param allowCoreThreadTimeout whether idle core threads may time out
     * @param rejectEnum             rejection policy:
     *                               ABORT (throw), DISCARD (drop the task), DISCARD_OLDEST (drop the
     *                               oldest queued task), CALL_RUN (run in the caller) or BLOCK (wait
     *                               for queue capacity)
     * @return the thread pool
     * @throws IllegalArgumentException if {@code rejectEnum} is {@code null} or unsupported, or if
     *                                  {@link ThreadPoolExecutor} rejects the sizing arguments (for
     *                                  example core timeout enabled with a zero keep-alive)
     */
    public ThreadPoolExecutor getPool(String threadName, Integer coreSize, Integer maxSize, Integer aliveMilliSecondTime,
        BlockingQueue<Runnable> blockingQueue, Boolean allowCoreThreadTimeout, RejectEnum rejectEnum) {
        if (null == rejectEnum) {
            throw new IllegalArgumentException("reject is null");
        }

        RejectedExecutionHandler rejectedExecutionHandler;
        switch (rejectEnum) {
            case ABORT:
                rejectedExecutionHandler = new ThreadPoolExecutor.AbortPolicy();
                break;
            case DISCARD:
                rejectedExecutionHandler = new ThreadPoolExecutor.DiscardPolicy();
                break;
            case DISCARD_OLDEST:
                rejectedExecutionHandler = new ThreadPoolExecutor.DiscardOldestPolicy();
                break;
            case CALL_RUN:
                rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy();
                break;
            case BLOCK:
                rejectedExecutionHandler = new BlockRejectedExecutionHandler();
                break;
            default:
                throw new IllegalArgumentException("not support reject:" + rejectEnum.name());
        }

        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(coreSize, maxSize, aliveMilliSecondTime,
            TimeUnit.MILLISECONDS, blockingQueue, new NamedThreadFactory(threadName), rejectedExecutionHandler);
        threadPoolExecutor.allowCoreThreadTimeOut(allowCoreThreadTimeout);
        return threadPoolExecutor;
    }

    /**
     * Thread factory that names each new thread {@code prefix + "_" + n}, with {@code n} starting at 0.
     */
    private static class NamedThreadFactory implements ThreadFactory {

        private final String prefix;
        private final AtomicInteger counter = new AtomicInteger(0);

        NamedThreadFactory(String prefix) {
            this.prefix = prefix;
        }

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, prefix + "_" + counter.getAndIncrement());
        }
    }

    /**
     * Rejection policy that blocks the submitting thread until the work queue has room, so that
     * tasks are not lost when the pool is saturated.
     * <p>
     * A task is never enqueued silently into a pool that cannot run it: if the executor has been
     * shut down, or the submitting thread is interrupted while waiting, a
     * {@link RejectedExecutionException} is thrown so the caller learns the task was not accepted.
     */
    private static class BlockRejectedExecutionHandler implements RejectedExecutionHandler {

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            // execute() also routes tasks here when the pool is shut down; waiting on the queue in
            // that case would strand the task or block the caller forever.
            if (executor.isShutdown()) {
                throw new RejectedExecutionException("Task " + r + " rejected from " + executor);
            }
            try {
                executor.getQueue().put(r);
            } catch (InterruptedException e) {
                log.warn("thread interrupt", e);
                // restore the interrupt flag, then report the rejection instead of dropping the task
                Thread.currentThread().interrupt();
                throw new RejectedExecutionException("Task " + r + " rejected from " + executor, e);
            }
            // The pool may have been shut down while we were waiting; mirror execute()'s re-check
            // and withdraw the task if it is still queued.
            if (executor.isShutdown() && executor.remove(r)) {
                throw new RejectedExecutionException("Task " + r + " rejected from " + executor);
            }
        }
    }

    /**
     * Rejection policies supported by {@link #getPool}.
     */
    public enum RejectEnum {
        /**
         * Throw an exception.
         */
        ABORT,
        /**
         * Discard the rejected task.
         */
        DISCARD,
        /**
         * Discard the oldest queued task.
         */
        DISCARD_OLDEST,
        /**
         * Run the task directly in the calling thread.
         */
        CALL_RUN,
        /**
         * Block the caller until the queue has capacity.
         */
        BLOCK;
    }
}
线程池帮助
import lombok.Setter;
import lombok.experimental.UtilityClass;

import java.util.Collection;
import java.util.List;
import java.util.concurrent.*;

/**
 * Static facade over one shared {@link ThreadPoolExecutor}; every method simply forwards to it.
 *
 * @author shizi
 * @since 2020-12-01 17:09:23
 */
@UtilityClass
public class ThreadPoolHelper {

    /**
     * The shared pool (created with the factory's blocking-submit configuration). It can be
     * replaced through the Lombok-generated setter.
     */
    @Setter
    private ThreadPoolExecutor poolExecutor =
        ThreadPoolFactoryUtil.getSystemCorePoolOfMultiAllowTimeout("isyscore-config-client-thread", 5, 100);

    // ---------------------------------------------------------------- task submission

    /**
     * Forwards to {@link ThreadPoolExecutor#execute(Runnable)}.
     *
     * @param command the task to run
     */
    public void execute(Runnable command) {
        poolExecutor.execute(command);
    }

    /**
     * Forwards to {@link ThreadPoolExecutor#submit(Runnable)}.
     *
     * @param task the task to run
     * @return a future representing the pending task
     */
    public Future<?> submit(Runnable task) {
        return poolExecutor.submit(task);
    }

    /**
     * Forwards to {@link ThreadPoolExecutor#submit(Runnable, Object)}.
     *
     * @param task   the task to run
     * @param result the value the future yields on completion
     * @param <T>    result type
     * @return a future representing the pending task
     */
    public <T> Future<T> submit(Runnable task, T result) {
        return poolExecutor.submit(task, result);
    }

    /**
     * Forwards to {@link ThreadPoolExecutor#submit(Callable)}.
     *
     * @param task the task to run
     * @param <T>  result type
     * @return a future representing the pending task
     */
    public <T> Future<T> submit(Callable<T> task) {
        return poolExecutor.submit(task);
    }

    // ---------------------------------------------------------------- bulk invocation

    /**
     * Forwards to {@link ThreadPoolExecutor#invokeAll(Collection)}.
     *
     * @param tasks the tasks to run
     * @param <T>   result type
     * @return one future per task
     * @throws InterruptedException if interrupted while waiting
     */
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
        return poolExecutor.invokeAll(tasks);
    }

    /**
     * Forwards to {@link ThreadPoolExecutor#invokeAll(Collection, long, TimeUnit)}.
     *
     * @param tasks   the tasks to run
     * @param timeout maximum time to wait
     * @param unit    unit of {@code timeout}
     * @param <T>     result type
     * @return one future per task
     * @throws InterruptedException if interrupted while waiting
     */
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
        throws InterruptedException {
        return poolExecutor.invokeAll(tasks, timeout, unit);
    }

    /**
     * Forwards to {@link ThreadPoolExecutor#invokeAny(Collection)}.
     *
     * @param tasks the tasks to run
     * @param <T>   result type
     * @return the result returned by one of the tasks
     * @throws InterruptedException if interrupted while waiting
     * @throws ExecutionException   if no task completes successfully
     */
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
        return poolExecutor.invokeAny(tasks);
    }

    /**
     * Forwards to {@link ThreadPoolExecutor#invokeAny(Collection, long, TimeUnit)}.
     *
     * @param tasks   the tasks to run
     * @param timeout maximum time to wait
     * @param unit    unit of {@code timeout}
     * @param <T>     result type
     * @return the result returned by one of the tasks
     * @throws InterruptedException if interrupted while waiting
     * @throws ExecutionException   if no task completes successfully
     * @throws TimeoutException     if the timeout elapses first
     */
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        return poolExecutor.invokeAny(tasks, timeout, unit);
    }

    // ---------------------------------------------------------------- lifecycle

    /**
     * Forwards to {@link ThreadPoolExecutor#shutdown()}.
     */
    public void shutdown() {
        poolExecutor.shutdown();
    }

    /**
     * Forwards to {@link ThreadPoolExecutor#shutdownNow()}.
     *
     * @return the tasks that never started executing
     */
    public List<Runnable> shutdownNow() {
        return poolExecutor.shutdownNow();
    }

    /**
     * Forwards to {@link ThreadPoolExecutor#isShutdown()}.
     *
     * @return whether the pool has been shut down
     */
    public boolean isShutdown() {
        return poolExecutor.isShutdown();
    }

    /**
     * Forwards to {@link ThreadPoolExecutor#isTerminated()}.
     *
     * @return whether all tasks have completed following shutdown
     */
    public boolean isTerminated() {
        return poolExecutor.isTerminated();
    }

    /**
     * Forwards to {@link ThreadPoolExecutor#awaitTermination(long, TimeUnit)}.
     *
     * @param timeout maximum time to wait
     * @param unit    unit of {@code timeout}
     * @return {@code true} if the pool terminated, {@code false} if the timeout elapsed first
     * @throws InterruptedException if interrupted while waiting
     */
    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
        return poolExecutor.awaitTermination(timeout, unit);
    }
}
