线程池基础知识

线程池带来的好处很多,譬如降低资源开销、方便线程维护等,线程池在日常的开发中也经常被用到。以下面的代码为例(在生产环境不建议这么用):

  1. private static final int POOL_SIZE = 4;
  2. ExecutorService executorService = Executors.newFixedThreadPool(POOL_SIZE);
  3. executorService.execute(() -> {
  4. System.out.println("current thread running");
  5. });

上面的代码通过Executors工具类创建了具有4个线程的线程池,具体的创建线程池构造方法如下:

  1. public ThreadPoolExecutor(int corePoolSize,
  2. int maximumPoolSize,
  3. long keepAliveTime,
  4. TimeUnit unit,
  5. BlockingQueue<Runnable> workQueue,
  6. ThreadFactory threadFactory,
  7. RejectedExecutionHandler handler) {
  8. }

具体解释下这几个参数:

  • corePoolSize,线程池中的核心线程数,当一个任务被提交后,如果当前线程数小于核心线程数,会创建一个新的线程执行当前任务;
  • maximumPoolSize,线程池中的最大线程数,如果工作队列满了,当前还有可用的工作线程,那么创建一个工作线程;

    核心线程和最大线程数类似于数据库中的最小-最大链接数。

  • keepAliveTime,如果当前线程数大于核心线程数,在给定的时间内没有新的任务,超过核心线程数的这部分线程会被终止掉;
  • unit,时间单位
  • workQueue,如果核心线程池满了,新的任务会加入到阻塞队列中,默认使用的是LinkedBlockingQueue,队列默认的最大长度是Integer.MAX_VALUE
  • threadFactory,创建线程的工厂类,如果没有指定,使用默认的线程工厂类
  • handler,当线程池和工作队列满后,新提交的任务会被拒绝,拒绝的策略在上面构造方法的最后一个参数中体现,如果没有特殊要求,使用默认的拒绝策略(AbortPolicy)就可以,直接抛出RejectedExecutionException。除此之外,还有另外三种拒绝策略:DiscardPolicy会拒绝掉新提交的任务,而且不会有任何提示;DiscardOldestPolicy会拒绝掉最老的任务,然后重新调用ThreadPoolExecutor.execute(Runnable r);CallerRunsPolicy会在主线程中直接run该任务。

线程池执行流程.png

线程池使用

  • 使用线程池时要设置阻塞队列的大小,否则阻塞队列可以无限大,那么任务可能会源源不断的加入到阻塞队列中,这样会造成内存溢出。当用Executors创建SingleThreadExecutor时,也会出现同样的问题,所以建议手动的方式创建线程池:
  1. private static final int threadPoolCount = Runtime.getRuntime().availableProcessors() + 1;
  2. ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat("example-pool-%d").setDaemon(true).build();
  3. ExecutorService pool = new ThreadPoolExecutor(threadPoolCount, threadPoolCount, 0L, TimeUnit.MILLISECONDS,
  4. new LinkedBlockingDeque<Runnable>(20), factory,
  5. new ThreadPoolExecutor.AbortPolicy());
  6. pool.execute(() -> System.out.println(Thread.currentThread().getName()));
  7. pool.shutdown();

ThreadFactory用于描述线程池中的每个线程,如果没有指定,使用默认的线程工厂类(Executors.defaultThreadFactory()),Guava提供了创建线程工厂的构造器。

  • 尽量减少线程池作为局部变量,以下面的代码为例,每次方法调用都去创建一个线程池,如果存在大量的方法调用,那么线程数会迅速膨胀。另外一点需要注意的是,使用完一定要调用shutdown方法关闭线程池。
  1. public void poolTest2(int count) {
  2. ExecutorService localExecutor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
  3. for (int i = 0; i < count; i++) {
  4. localExecutor.execute(() -> {
  5. //do something
  6. });
  7. }
  8. localExecutor.shutdown();
  9. }
  • 使用线程池时,要注意共享变量的线程安全问题,例如下面的demo,由于ArrayList不是线程安全的集合,在并发执行代码的过程中,有可能得不到正确结果。
  1. public void poolTest3(int count) {
  2. List<Integer> result = Lists.newArrayList();
  3. ExecutorService localExecutor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
  4. for (int i = 0; i < count; i++) {
  5. executorService.execute(() -> {
  6. for (int j = 0; j < 100; j++) {
  7. result.add(j);
  8. }
  9. });
  10. }
  11. System.out.println(result.size());
  12. }

换成下面这种方式就不会出现线程安全问题,但是注意future.get(),记得要加上超时时间,因为如果存在被抛弃的任务,那么就会无限的循环等待。

  1. public void poolTest4(int count) {
  2. List<Integer> result = Lists.newArrayList();
  3. List<Future<List>> futures = Lists.newArrayList();
  4. ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
  5. for (int i = 0; i < count; i++) {
  6. futures.add(executorService.submit(new Callable<List>() {
  7. @Override
  8. public List call() throws Exception {
  9. List<Integer> localResult = Lists.newArrayList();
  10. for (int j = 0; j < 100; j++) {
  11. localResult.add(j);
  12. }
  13. return localResult;
  14. }
  15. }));
  16. }
  17. futures.forEach((future) -> {
  18. try {
  19. *result.addAll(future.get(3,TimeUnit.SECONDS));*
  20. } catch (Exception e) {
  21. }
  22. });
  23. System.out.println(result.size());
  24. }

参考文献

[1]http://bijian1013.iteye.com/blog/2307676
[]https://tech.meituan.com/2020/04/02/java-pooling-pratice-in-meituan.html