线程池,提供了一个线程队列,队列中保存着所有等待状态的线程。避免了创建与销毁额外开销,以及线程间上下切换带来的开销,提高了响应速度。

Java 中的线程池是通过 Executor 框架实现,该框架中使用到了 Executor,Executors 工具类,ExecutorService,ThreadPoolExecutor 这几个类。

Java 第四种创建线程的方法,就是通过线程池获得。

线程体系结构

各个类关系

  • java.util.concurrent.Executor:负责线程的使用和调度的接口
    • java.util.concurrent.ExecutorService:线程池的主要接口
      • java.util.concurrent.ThreadPoolExecutor:线程池的实现类
      • java.util.concurrent.ScheduledExecutorService:子接口,负责线程的调度
        • java.util.concurrent.ScheduledThreadPoolExecutor:继承 ThreadPoolExecutor **ScheduledExecutorService**

image.png
**
java.util.concurrent.Executors 工具类,可以创建线程

  • ExecutorService newFixedThreadPool 创建固定大小的线程池
  • ExecutorService newCachedThreadPool() 创建缓存线程池,线程池大小不固定,根据需求自动更改数量
  • ExecutorService newSingleThreadExecutor() 创建简单线程池,线程池就一个线程

java.util.concurrent.ScheduledExecutorService 创建固定大小的线程池,用于操作定时和延迟的任务。

创建固定大小的线程池

ExecutorService newFixedThreadPool 创建固定大小的线程池。
适用于执行长期任务。

image.png
创建一个定长的线程池,可控制线程的最大线程数,超出的线程会在线程队列中阻塞等待。
newFixedThreadPool 创建线程池 corePoolSize 和 maximumPoolSize 是相等的,他使用的内部阻塞队列是LinkedBlockingQueue。

测试案例

  1. package com.binfoo.type.fixed;
  2. import java.util.concurrent.ExecutorService;
  3. import java.util.concurrent.Executors;
  4. public class ThreadPoolThreadFixedType {
  5. public void test00() {
  6. // 创建固定大小的连接池
  7. ExecutorService executorService = Executors.newFixedThreadPool(5);
  8. for (int index = 0; index < 10; index++) {
  9. executorService.submit(new Runnable() {
  10. @Override
  11. public void run() {
  12. System.out.println("Hello World," + Thread.currentThread().getName());
  13. }
  14. });
  15. }
  16. }
  17. public static void main(String[] args) {
  18. ThreadPoolThreadFixedType threadPoolThreadFixedType = new ThreadPoolThreadFixedType();
  19. threadPoolThreadFixedType.test00();
  20. }
  21. }

创建单一线程的线程池

ExecutorService newSingleThreadExecutor() 创建简单线程池,线程池就一个线程。

image.png
newSingleThreadExecutor 创建线程池 corePoolSize 和 maximumPoolSize 都是一,他使用的内部阻塞队列是LinkedBlockingQueue。
和 newFixedThreadPool 都是一样的,不同的是线程池线程的数量只能是一。

测试案例

  1. package com.binfoo.type.single;
  2. import java.util.concurrent.ExecutorService;
  3. import java.util.concurrent.Executors;
  4. public class ThreadPoolThreadSingleType {
  5. public void test00() {
  6. ExecutorService executorService = Executors.newSingleThreadExecutor();
  7. for (int index = 0; index < 10; index++) {
  8. executorService.submit(new Runnable() {
  9. @Override
  10. public void run() {
  11. System.out.println("Hello World," + Thread.currentThread().getName());
  12. }
  13. });
  14. }
  15. }
  16. public static void main(String[] args) {
  17. ThreadPoolThreadSingleType threadPoolThreadSingleType = new ThreadPoolThreadSingleType();
  18. threadPoolThreadSingleType.test00();
  19. }
  20. }

创建按需线程数量的线程池

ExecutorService newCachedThreadPool() 创建缓存线程池,线程池大小不固定,根据需求自动更改数量。

image.png
创建一个可缓存的线程池,如果线程池长度超过了处理需要,可灵活回收空闲线程,若无可回收线程,则新建线程。
newCachedThreadPool 设置的 corePoolSize 为零,maximumPoolSize 是 Integer.MAX_VALUE(JVM 理论最大线程数量),使用的是 SynchronousQueue 同步阻塞队列,单线程空闲超过 60 秒则销毁。

测试案例

  1. package com.binfoo.type.cached;
  2. import com.binfoo.type.fixed.ThreadPoolThreadFixedType;
  3. import java.util.concurrent.ExecutorService;
  4. import java.util.concurrent.Executors;
  5. public class ThreadPoolThreadCachedType {
  6. public void test00() {
  7. ExecutorService service = Executors.newCachedThreadPool();
  8. for (int index = 0; index < 10; index++) {
  9. service.submit(() -> System.out.println("Hello World," + Thread.currentThread().getName()));
  10. }
  11. }
  12. public static void main(String[] args) {
  13. ThreadPoolThreadCachedType threadPoolThreadCachedType = new ThreadPoolThreadCachedType();
  14. threadPoolThreadCachedType.test00();
  15. }
  16. }

线程池参数说明

线程池具体实现是 ThreadPoolExecutor 这个类,对应线程参数源码如下。
java.util.concurrent.ThreadPoolExecutor#ThreadPoolExecutor(int, int, long, java.util.concurrent.TimeUnit, java.util.concurrent.BlockingQueue, java.util.concurrent.ThreadFactory, java.util.concurrent.RejectedExecutionHandler)
image.png
image.png

corePoolSize 线程池中的常驻线程数
maximumPoolSize 线程池中能够容纳同时执行的最大线程数
keepAliveTime 多余的空闲线程的存活时间
unit keepAliveTime 时间的单位
workQueue 任务队列,被提交但是尚未执行的队列
threadFactory 表示生成该线程池的工厂类,用于创建线程池的一般默认即可。
handler 拒绝策略,表示队列满了的处理策略

线程池的工作原理

当前需要处理的任务进入线程池,线程池会根据常驻线程数(线程核心数)进行分配任务,如果需要处理的任务大于核心常驻线程数,则线程池会新增线程直至线程数等于线程池预先设置的最大线程数上线。

如果线程池的线程都已经在处理,那么剩余的任务会进入阻塞队列进行等待。

如果所有的任务都处理完成,非常驻线程数的存活时间长于线程池设置的存活时间,则超出的线程就会被销毁。
最终该线程池会缩减至和常驻线程数大小一直的量。

正式的项目开发中,一般都不用 Executors 工具类创建线程池,都是需要手写 ThreadPoolExecutor 类。

线程池的拒绝策略

当线程池的线程达到最大值,并且线程中的缓存队列也得到最大值,这个时候需要走到通过拒绝策略对继续进入的任务进行拦截,根据不同的策略决定不同的走向。

AbortPolicy 直接抛出异常,默认策略
CallerRunsPolicy 用调用者所在的线程来执行任务
DiscardOldestPolicy 丢弃阻塞队列中靠最前的任务(等待最久的任务最先丢弃),并执行当前任务
DiscardPolicy 直接丢弃任务

默认的策略就是 AbortPolicy ,直接抛出异常。
image.png
都是实现了 RejectedExecutionHandler 接口。

AboryPoliy 策略

AbortPolicy 直接抛出异常,默认策略。

测试案例
配置五个最大线程,四个缓存队列数,如果同时十个线程进入则直接报异常。

  1. package com.binfoo.reject.abortpoliy;
  2. import java.util.concurrent.ArrayBlockingQueue;
  3. import java.util.concurrent.ExecutorService;
  4. import java.util.concurrent.ThreadPoolExecutor;
  5. import java.util.concurrent.TimeUnit;
  6. public class AboryPoliyTest {
  7. public static void main(String[] args) {
  8. ExecutorService executor = new ThreadPoolExecutor(5, 5, 60,
  9. TimeUnit.SECONDS,
  10. new ArrayBlockingQueue<>(4));
  11. for (int index = 0; index < 10; index++) {
  12. executor.submit(new Runnable() {
  13. @Override
  14. public void run() {
  15. System.out.println("Hello World!");
  16. }
  17. });
  18. }
  19. executor.shutdown();
  20. }
  21. }

执行结果如下。
image.png

CallerRunsPolicy 策略

当任务进入线程池时候,线程数已经处于满载状态,则直接用调用者所在的线程来执行任务。

测试案例

  1. package com.binfoo.reject.callerruns;
  2. import java.util.concurrent.ArrayBlockingQueue;
  3. import java.util.concurrent.ExecutorService;
  4. import java.util.concurrent.ThreadPoolExecutor;
  5. import java.util.concurrent.TimeUnit;
  6. public class CallerRunsTest {
  7. public static void main(String[] args) {
  8. ExecutorService executor = new ThreadPoolExecutor(5, 5, 60,
  9. TimeUnit.SECONDS, new ArrayBlockingQueue<>(5),
  10. new ThreadPoolExecutor.CallerRunsPolicy());
  11. for (int index = 0; index < 20; index++) {
  12. executor.submit(() -> {
  13. if (!"main".equals(Thread.currentThread().getName())) {
  14. try {
  15. Thread.sleep(10000);
  16. } catch (InterruptedException e) {
  17. e.printStackTrace();
  18. }
  19. }
  20. System.out.println("当前执行线程名称:" + Thread.currentThread().getName());
  21. });
  22. }
  23. executor.shutdown();
  24. }
  25. }

执行结果如下。
出来线程池的核心线程执行之外,剩余的通过主线程操作。
image.png

DiscardOldestPolicy 策略

丢弃阻塞队列中靠最前的任务(等待最久的任务最先丢弃),并执行当前任务。
测试案例忽略。

DiscardPolicy 策略

DiscardPolicy 直接丢弃任务。

测试案例

  1. package com.binfoo.reject.dicard;
  2. import java.util.concurrent.ArrayBlockingQueue;
  3. import java.util.concurrent.ExecutorService;
  4. import java.util.concurrent.ThreadPoolExecutor;
  5. import java.util.concurrent.TimeUnit;
  6. public class DiscardTest {
  7. public static void main(String[] args) {
  8. ExecutorService service = new ThreadPoolExecutor(5, 5, 60, TimeUnit.SECONDS,
  9. new ArrayBlockingQueue<>(5), new ThreadPoolExecutor.DiscardPolicy());
  10. for (int index = 0; index < 5; index++) {
  11. service.submit(new Runnable() {
  12. @Override
  13. public void run() {
  14. if (!"main".equals(Thread.currentThread().getName())) {
  15. try {
  16. Thread.sleep(5000);
  17. } catch (InterruptedException e) {
  18. e.printStackTrace();
  19. }
  20. }
  21. System.out.println("当前执行线程名称:" + Thread.currentThread().getName());
  22. }
  23. });
  24. }
  25. service.shutdown();
  26. }
  27. }

执行结果如下。
image.png

参考资料

Java 线程池原理及源码分析