ThreadPoolTaskExecutor是spring core包中的。
ThreadPoolExecutor是JDK中的JUC。
ThreadPoolTaskExecutor是对ThreadPoolExecutor进行了封装处理。
1.自定义线程池
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
public class NamedThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final AtomicInteger threadNumber = new AtomicInteger(1);
private String namePrefix;
private final ThreadGroup group;
public NamedThreadFactory( String name ) {
this.namePrefix = namePrefix = name + "-" + poolNumber.getAndIncrement() + "-thread-";
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
}
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.ThreadPoolExecutor;
@Configuration
public class ThreadPoolConfig {
@Bean
public ThreadPoolTaskExecutor myThreadPoolTaskExecutor() {
ThreadPoolTaskExecutor myThreadPoolTaskExecutor = new ThreadPoolTaskExecutor();
//线程池维护线程的最少数量
/*
*
* N为cpu个数
* 如果是CPU密集型应用,则线程池大小设置为N+1
* 如果是IO密集型应用,则线程池大小设置为2N+1
* 此方法返回可用处理器的虚拟机的最大数量; 不小于1
* Runtime.getRuntime().availableProcessors();
* linux查询方式:
* cat /proc/cpuinfo| grep "processor"| wc -l
*/
//配置核心线程数
myThreadPoolTaskExecutor.setCorePoolSize(5);
//配置最大线程数
myThreadPoolTaskExecutor.setMaxPoolSize(6);
//配置队列大小
myThreadPoolTaskExecutor.setQueueCapacity(200);
//设置保活秒数
myThreadPoolTaskExecutor.setKeepAliveSeconds(60);
//配置线程池中的线程的名称前缀
myThreadPoolTaskExecutor.setThreadNamePrefix("Async-Thread-");
//rejection-policy:设置拒绝策略,当pool已经达到max size的时候,如何处理新任务
//CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
myThreadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//设置等待任务完成时关机
myThreadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);
//设置等待终止时间
myThreadPoolTaskExecutor.setAwaitTerminationSeconds(60);
//执行初始化
myThreadPoolTaskExecutor.initialize();
// 设置线程池中的线程的名称前缀(创建线程或线程池时请指定有意义的线程名称,方便出错时回溯)
myThreadPoolTaskExecutor.setThreadFactory(new NamedThreadFactory("Async-ThreadPool-"));
return myThreadPoolTaskExecutor;
}
}
2.某个实现类中的某个方法使用线程池
import com.xqny.logtest.service.ExecutorService;
import lombok.SneakyThrows;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.concurrent.CompletableFuture;
/**
* @author Raint
* @date 2022年04月13日 11:37
*/
@Service("ExecutorService")
public class ExecutorServiceImpl implements ExecutorService {
@Resource
private ThreadPoolTaskExecutor myThreadPoolTaskExecutor; // myThreadPoolTaskExecutor 为beanName
@Override
@SneakyThrows
public void testasync(){
CompletableFuture<Void> completableFuture1 = CompletableFuture.runAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("业务线程1");
}, myThreadPoolTaskExecutor);
CompletableFuture<Void> completableFuture2 = CompletableFuture.runAsync(() -> {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("业务线程2");
}, myThreadPoolTaskExecutor);
// 等待两个线程执行完
CompletableFuture.allOf(completableFuture1, completableFuture2).get();
return;
}
}
第一个线程睡眠2秒,第二个现场睡眠5秒,异步执行时间为5秒,而不是7秒