import lombok.experimental.UtilityClass;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import static com.isyscore.rpc.service.util.ThreadPoolFactoryUtil.RejectEnum.*;
/**
* 线程池工厂
* <p>对于一些情况,额外新增阻塞式拒绝策略
*
* @author shizi
* @since 2020/3/3 下午8:39
*/
@Slf4j
@UtilityClass
public class ThreadPoolFactoryUtil {
/**
* 获取单例线程池
*
* @param threadName 线程名
* @return 线程池
*/
public ThreadPoolExecutor getSinglePool(String threadName) {
return getPool(threadName, 1, 1, 0, new LinkedBlockingQueue<>(20000), false, BLOCK);
}
/**
* 获取可变更线程池
*
* @param threadName 线程名
* @return 线程池
*/
public ThreadPoolExecutor getCachePool(String threadName) {
return getPool(threadName, 1, Integer.MAX_VALUE, 60 * 1000, new SynchronousQueue<>(), true, ABORT);
}
/**
* 获取固定数目线程池
*
* @param threadName 线程名
* @param coreSize 核心线程个数
* @return 线程池
*/
public ThreadPoolExecutor getFixedPool(String threadName, Integer coreSize) {
return getPool(threadName, coreSize, coreSize, 0, new LinkedBlockingQueue<>(20000), false, BLOCK);
}
/**
* 获取固定数目线程池
* <p>
* 允许核心线程池超时退出
*
* @param threadName 线程名
* @param coreSize 核心线程个数
* @return 线程池
*/
public ThreadPoolExecutor getFixedPoolOfAllowTimeout(String threadName, Integer coreSize) {
return getPool(threadName, coreSize, coreSize, 60 * 1000, new LinkedBlockingQueue<>(20000), true, BLOCK);
}
/**
* 获取对应系统核心线程个数的线程池
* <p>
* 不允许核心线程池超时退出
*
* @param threadName 线程名
* @return 线程池
*/
public ThreadPoolExecutor getSystemCorePool(String threadName) {
return getSystemCorePool(threadName, 0, false, BLOCK);
}
/**
* 获取对应系统核心线程个数的线程池
*
* @param threadName 线程名字
* @return 线程池
*/
public ThreadPoolExecutor getSystemCorePoolOfAllowTimeout(String threadName) {
return getSystemCorePool(threadName, 60 * 1000, true, BLOCK);
}
/**
* 系统核心线程个数倍数的线程池
* <p>
* 不允许核心线程池超时退出
*
* @param threadName 线程名
* @param coreSizeMultiples 核心线程数的倍数
* @param maxSizeMultiples 最大线程数的倍数
* @return 线程池
*/
public ThreadPoolExecutor getSystemCorePoolOfMulti(String threadName, Integer coreSizeMultiples, Integer maxSizeMultiples) {
return getPool(threadName, Runtime.getRuntime().availableProcessors() * coreSizeMultiples, Runtime.getRuntime().availableProcessors() * maxSizeMultiples, 0,
new LinkedBlockingQueue<>(20000), false, BLOCK);
}
/**
* 系统核心线程个数倍数的线程池
*
* @param threadName 线程名字
* @return 线程池
*/
public ThreadPoolExecutor getSystemCorePoolOfMultiAllowTimeout(String threadName, Integer coreSizeMultiples, Integer maxSizeMultiples) {
return getPool(threadName, Runtime.getRuntime().availableProcessors() * coreSizeMultiples, Runtime.getRuntime().availableProcessors() * maxSizeMultiples, 60 * 1000,
new LinkedBlockingQueue<>(20000), true, BLOCK);
}
/**
* 构造默认系统cpu个数的线程池
*
* @param threadName 线程池名字
* @param aliveSecondTime 存活时间(毫秒)
* @param allowCoreThreadTimeout 是否允许核心线程超时清理
* @param rejectEnum 拒绝策略
* @return 线程池
*/
public ThreadPoolExecutor getSystemCorePool(String threadName, Integer aliveSecondTime, Boolean allowCoreThreadTimeout, RejectEnum rejectEnum) {
return getPool(threadName, Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors(), aliveSecondTime,
new LinkedBlockingQueue<Runnable>(20000), allowCoreThreadTimeout, rejectEnum);
}
/**
* 创建线程池
*
* @param threadName 线程名字
* @param coreSize 核心线程数
* @param maxSize 最大线程数
* @param aliveMilliSecondTime 存活时间(毫秒)
* @param blockingQueue 阻塞队列
* @param allowCoreThreadTimeout 是否允许核心线程超时清理
* @param rejectEnum 拒绝策略:
* abort(抛异常)
* discard(丢弃任务)
* discardOldest(丢弃队列中最老的)
* callRun(直接运行)
* block(阻塞)
* @return 线程池
*/
public ThreadPoolExecutor getPool(String threadName, Integer coreSize, Integer maxSize, Integer aliveMilliSecondTime, BlockingQueue<Runnable> blockingQueue,
Boolean allowCoreThreadTimeout, RejectEnum rejectEnum) {
if (null == rejectEnum) {
throw new RuntimeException("reject is null");
}
RejectedExecutionHandler rejectedExecutionHandler;
switch (rejectEnum) {
case ABORT:
rejectedExecutionHandler = new ThreadPoolExecutor.AbortPolicy();
break;
case DISCARD:
rejectedExecutionHandler = new ThreadPoolExecutor.DiscardPolicy();
break;
case DISCARD_OLDEST:
rejectedExecutionHandler = new ThreadPoolExecutor.DiscardOldestPolicy();
break;
case CALL_RUN:
rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy();
break;
case BLOCK:
rejectedExecutionHandler = new BlockRejectedExecutionHandler();
break;
default:
throw new RuntimeException("not support reject:" + rejectEnum.name());
}
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(coreSize, maxSize, aliveMilliSecondTime, TimeUnit.MILLISECONDS, blockingQueue, new ThreadFactory() {
private final AtomicInteger atomicInteger = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, threadName + "_" + atomicInteger.getAndIncrement());
}
}, rejectedExecutionHandler);
threadPoolExecutor.allowCoreThreadTimeOut(allowCoreThreadTimeout);
return threadPoolExecutor;
}
/**
* 重写拒绝策略,用于在任务量超大情况下任务的阻塞提交,防止任务丢失
*/
private class BlockRejectedExecutionHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
executor.getQueue().put(r);
} catch (InterruptedException e) {
log.warn("thread interrupt", e);
Thread.currentThread().interrupt();
}
}
}
public enum RejectEnum {
/**
* 异常
*/
ABORT,
/**
* 丢弃
*/
DISCARD,
/**
* 丢弃最老的
*/
DISCARD_OLDEST,
/**
* 直接运行
*/
CALL_RUN,
/**
* 阻塞等待
*/
BLOCK;
}
}
线程池帮助
import lombok.Setter;
import lombok.experimental.UtilityClass;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.*;
/**
* @author shizi
* @since 2020-12-01 17:09:23
*/
@UtilityClass
public class ThreadPoolHelper {
/**
* 带有阻塞提交的线程池
*/
@Setter
private ThreadPoolExecutor poolExecutor = ThreadPoolFactoryUtil.getSystemCorePoolOfMultiAllowTimeout("isyscore-config-client-thread", 5, 100);
public void shutdown() {
poolExecutor.shutdown();
}
public List<Runnable> shutdownNow() {
return poolExecutor.shutdownNow();
}
public boolean isShutdown() {
return poolExecutor.isShutdown();
}
public boolean isTerminated() {
return poolExecutor.isTerminated();
}
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
return poolExecutor.awaitTermination(timeout, unit);
}
public <T> Future<T> submit(Callable<T> task) {
return poolExecutor.submit(task);
}
public <T> Future<T> submit(Runnable task, T result) {
return poolExecutor.submit(task, result);
}
public Future<?> submit(Runnable task) {
return poolExecutor.submit(task);
}
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
return poolExecutor.invokeAll(tasks);
}
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
return poolExecutor.invokeAll(tasks, timeout, unit);
}
public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
return poolExecutor.invokeAny(tasks);
}
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return poolExecutor.invokeAny(tasks, timeout, unit);
}
public void execute(Runnable command) {
poolExecutor.execute(command);
}
}