线程池基础知识
线程池带来的好处很多,譬如降低资源开销、方便线程维护等,线程池在日常的开发中也经常被用到。以下面的代码为例(在生产环境不建议这么用):
private static final int POOL_SIZE = 4;
ExecutorService executorService = Executors.newFixedThreadPool(POOL_SIZE);
executorService.execute(() -> {
System.out.println("current thread running");
});
上面的代码通过Executors工具类创建了具有4个线程的线程池,具体的创建线程池构造方法如下:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
}
具体解释下这几个参数:
- corePoolSize,线程池中的核心线程数,当一个任务被提交后,如果当前线程数小于核心线程数,会创建一个新的线程执行当前任务;
- maximumPoolSize,线程池中的最大线程数,如果工作队列满了,当前还有可用的工作线程,那么创建一个工作线程;
核心线程和最大线程数类似于数据库中的最小-最大链接数。
- keepAliveTime,如果当前线程数大于核心线程数,在给定的时间内没有新的任务,超过核心线程数的这部分线程会被终止掉;
- unit,时间单位
- workQueue,如果核心线程池满了,新的任务会加入到阻塞队列中,默认使用的是LinkedBlockingQueue,队列默认的最大长度是Integer.MAX_VALUE
- threadFactory,创建线程的工厂类,如果没有指定,使用默认的线程工厂类
- handler,当线程池和工作队列满后,新提交的任务会被拒绝,拒绝的策略在上面构造方法的最后一个参数中体现,如果没有特殊要求,使用默认的拒绝策略(AbortPolicy)就可以,直接抛出RejectedExecutionException。除此之外,还有另外三种拒绝策略:DiscardPolicy会拒绝掉新提交的任务,而且不会有任何提示;DiscardOldestPolicy会拒绝掉最老的任务,然后重新调用ThreadPoolExecutor.execute(Runnable r);CallerRunsPolicy会在主线程中直接run该任务。
线程池使用
- 使用线程池时要设置阻塞队列的大小,否则阻塞队列可以无限大,那么任务可能会源源不断的加入到阻塞队列中,这样会造成内存溢出。当用Executors创建SingleThreadExecutor时,也会出现同样的问题,所以建议手动的方式创建线程池:
private static final int threadPoolCount = Runtime.getRuntime().availableProcessors() + 1;
ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat("example-pool-%d").setDaemon(true).build();
ExecutorService pool = new ThreadPoolExecutor(threadPoolCount, threadPoolCount, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingDeque<Runnable>(20), factory,
new ThreadPoolExecutor.AbortPolicy());
pool.execute(() -> System.out.println(Thread.currentThread().getName()));
pool.shutdown();
ThreadFactory用于描述线程池中的每个线程,如果没有指定,使用默认的线程工厂类(Executors.defaultThreadFactory()),Guava提供了创建线程工厂的构造器。
- 尽量减少线程池作为局部变量,以下面的代码为例,每次方法调用都去创建一个线程池,如果存在大量的方法调用,那么线程数会迅速膨胀。另外一点需要注意的是,使用完一定要调用shutdown方法关闭线程池。
public void poolTest2(int count) {
ExecutorService localExecutor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
for (int i = 0; i < count; i++) {
localExecutor.execute(() -> {
//do something
});
}
localExecutor.shutdown();
}
- 使用线程池时,要注意共享变量的线程安全问题,例如下面的demo,由于ArrayList不是线程安全的集合,在并发执行代码的过程中,有可能得不到正确结果。
public void poolTest3(int count) {
List<Integer> result = Lists.newArrayList();
ExecutorService localExecutor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
for (int i = 0; i < count; i++) {
executorService.execute(() -> {
for (int j = 0; j < 100; j++) {
result.add(j);
}
});
}
System.out.println(result.size());
}
换成下面这种方式就不会出现线程安全问题,但是注意future.get(),记得要加上超时时间,因为如果存在被抛弃的任务,那么就会无限的循环等待。
public void poolTest4(int count) {
List<Integer> result = Lists.newArrayList();
List<Future<List>> futures = Lists.newArrayList();
ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
for (int i = 0; i < count; i++) {
futures.add(executorService.submit(new Callable<List>() {
@Override
public List call() throws Exception {
List<Integer> localResult = Lists.newArrayList();
for (int j = 0; j < 100; j++) {
localResult.add(j);
}
return localResult;
}
}));
}
futures.forEach((future) -> {
try {
*result.addAll(future.get(3,TimeUnit.SECONDS));*
} catch (Exception e) {
}
});
System.out.println(result.size());
}
参考文献
[1]http://bijian1013.iteye.com/blog/2307676
[]https://tech.meituan.com/2020/04/02/java-pooling-pratice-in-meituan.html