JDK提供的线程池结构:

image.png

1) 线程池状态

ThreadPoolExecutor 使用 int的高3位来表示线程池状态,低29位表示线程数量

image.png

从数字上比较, TERMINATED> TIDYING> STOP> SHUTDOWN >RUNNING

线程池状态信息(线程池状态+线程数量)存储在一个原子变量ctl中,目的是将线程池状态与线程个数合二为一,且保证对ctl的操作原子化,可以用一次cas原子操作进行赋值以改变线程池信息:

  1. // C为旧值,ctlOf返回结果为新值
  2. ctl.compareAndSet(c,ctlOf(targetState,workerCountOf(c)));
  3. // rs为高3位代表线程池状态,wc为低29位代表线程个数,ctl是合并它们
  4. private static int ctlOf(int rs, int wc){ return rs| wc;}

2) 构造方法

  1. public ThreadPoolExecutor(int corePoolSize,
  2. int maximumPoolSize,
  3. long KeepAliveTime,
  4. TimeUnit unit,
  5. BlockingQueue<Runnable> workQueue,
  6. ThreadFactory threadFactory,
  7. RejectedExecutionHandler handler)
  • corePoolSize 核心线程数目(最多保留的线程数)
  • maximumPoolSize 最大线程数目
  • keepAliveTime 生存时间-针对救急线程,指定救急线程无任务可做后,多久销毁
  • unit 时间单位-针对救急线程,销毁时间的单位
  • workQueue 阻塞队列
  • threadFactory 线程工厂-可以为线程创建时起一个好名字
  • handler 拒绝策略

与自定义线程池做区分:

1、线程池分为两种类型的线程:核心线程、救急线程
2、corePoolSize指线程池可以容纳的最大核心线程数目,maximumPoolSize指线程池可以容纳两种线程的最大数量,maximumPoolSize - corePoolSize == 救急线程数
3、当有新任务,核心线程优先处理任务,如果核心线程已经达到coreSize,任务被存进阻塞队列中。若阻塞队列已满,则新任务会被交给救急线程处理。当线程池中线程数量达到maximumPoolSize无法再处理新任务,这时才会执行拒绝策略。
4、核心线程没有生存时间限制,处理完任务后仍会被保存在线程池中(注意一般线程在执行完run方法后就被销毁,但线程池实现策略使得线程池中的核心线程一直保留在线程池中);而救急线程有生存时间限制,处理完任务后会被立即销毁掉(外包人员emoj)、

  1. ![image.png](https://cdn.nlark.com/yuque/0/2022/png/23190196/1647747286179-53abe3d2-215e-4b78-b2da-31ce97df9fcc.png#clientId=u25df96f1-3111-4&from=paste&height=471&id=u5fbf05ca&margin=%5Bobject%20Object%5D&name=image.png&originHeight=831&originWidth=806&originalType=binary&ratio=1&size=107830&status=done&style=none&taskId=u4ed5dd40-7e0c-4600-96fe-0fb05b61a4b&width=456.9747314453125)

线程池运行流程:

  • 线程池中刚开始没有线程,当一个任务提交给线程池后,线程池会创建一个新线程来执行任务
  • 当线程数达到 corePoolSize 并没有线程空闲,这时再加入任务,新加的任务会被加入workQueue队列排队,直到有空闲的线程
  • 如果队列选择有界队列,那么任务超过了队列大小时,会创建 maximumPoolSize - corePoolSize数目的线程来救急
  • 如果线程到达 maximumPoolSize 仍然有新任务这时会执行拒绝策略,jdk提供了4种拒绝策略的实现,其它著名框架也提供了实现


    - AbortPolicy,让调用者抛出RejectedExecutionException异常,这是默认策略
    - CallerRunsPolicy,让调用者运行任务
    - DiscardPolicy,放弃本次任务
    - DiscardOldestPolicy,放弃队列中最早的任务,本任务取而代之
    - Dubbo的实现,抛出RejectedExecutionException异常之前会记录日志,并dump线程栈信息,方便定位问题
    - Netty的实现,创建一个新线程来执行任务
    - ActiveMQ的实现,带超时等待(60s)尝试放入队列,类似之前自定义的拒绝策略
    - PinPoint的实现,它使用了一个拒绝策略链,会逐一尝试策略链中每种拒绝策略

    1. ![image.png](https://cdn.nlark.com/yuque/0/2022/png/23190196/1647757825814-e7551c62-c45f-4fe0-9fd4-c450d629f20d.png#clientId=u3cc09a9a-9541-4&from=paste&height=143&id=ub956118e&margin=%5Bobject%20Object%5D&name=image.png&originHeight=206&originWidth=1070&originalType=binary&ratio=1&size=117349&status=done&style=none&taskId=ud59526fe-40a9-4bb3-abd9-1fa900b2219&width=744.9957885742188)
  • 当高峰过去后,超过corePoolSize的救急线程如果一段时间没有任务做,需要结束节省资源,这个时间由 keepAliveTime 和 unit 来控制。

根据ThreadPoolExecutor构造方法,JDK Executors类中提供了众多工厂方法来创建各种用途的线程池,实际是对ThreadPoolExecutor构造方法的封装,通过指定构造方法ThreadPoolExecutor中不同的参数,来创建不同功能的线程池。

3) newFixedThreadPool

线程数量固定的线程池:

  1. public static ExecutorService newFixedThreadPool(int nThreads){
  2. return new ThreadPoolExecutor(nThread,nThread,0L,TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
  3. }

特点

  • 核心线程数==最大线程数,无救急线程,因此也无需设置超时时间
  • 阻塞队列是无界的,可以存放任意数量的任务
  • 适用于任务量已知、相对耗时的任务

newFixedThreadPool线程池的使用:

  1. import java.util.concurrent.ExecutorService;
  2. import java.util.concurrent.Executors;
  3. public class TestThreadPoolExecutors {
  4. public static void main(String[] args) {
  5. ExecutorService pool = Executors.newFixedThreadPool(2);
  6. pool.execute(()->{
  7. System.out.println(1);
  8. });
  9. pool.execute(()->{
  10. System.out.println(1);
  11. });
  12. pool.execute(()->{
  13. System.out.println(1);
  14. });
  15. }
  16. }

线程池的execute方法,传入Runnable对象,因为Runnable是函数式接口,使用lambda表达式直接创建其实现类对象即可。
调用一次execute方法则相当于向线程池传入一个任务。

newFixedThreadPool线程池的使用,且自定义线程工厂:

  1. import java.util.concurrent.ExecutorService;
  2. import java.util.concurrent.Executors;
  3. import java.util.concurrent.ThreadFactory;
  4. import java.util.concurrent.atomic.AtomicInteger;
  5. public class TestThreadPoolExecutors {
  6. public static void main(String[] args) {
  7. ExecutorService pool = Executors.newFixedThreadPool(2, new ThreadFactory() {
  8. private AtomicInteger t = new AtomicInteger(1);
  9. @Override
  10. //实现newThread方法,创建线程,因为线程工厂的作用就是来创建线程
  11. public Thread newThread(Runnable r) {
  12. return new Thread(r,"mypool_t" + t.getAndIncrement());
  13. }
  14. });
  15. pool.execute(()->{
  16. System.out.println(1);
  17. });
  18. pool.execute(()->{
  19. System.out.println(1);
  20. });
  21. pool.execute(()->{
  22. System.out.println(1);
  23. });
  24. }
  25. }

4) newCachedThreadPool

带缓冲的线程池:

  1. public static ExecutorService newCachedThreadPool{
  2. return new ThreadPoolExecutor(0,Integer.MAX_VALUE,60L,TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
  3. }

特点

  • 核心线程数是0,最大线程数是 Integer.MAX_VALUE,救急线程的空闲生存时间是60s,意味着

    1. - 线程池中线程均是救急线程 60s后被回收)<br /> - 救急线程可以无限创建
  • 阻塞队列采用了 SynchronousQueue 实现特点是,该队列没有容量,但如果某一线程在向队列中存放任务时,没有其它线程来取,这一线程是无法将任务存放进任务队列中的(一手交钱、一手交货),该队列更像是用来在两个线程间交换任务

  • 整个线程池表现为线程数会根据任务量不断增长,没有上限,当任务执行完毕,空闲1分钟后释放线程
  • 适合任务数比较密集,但每个任务执行时间较短的情况

5) newSingleThreadExecutor

单线程的线程池:

  1. public static ExecutorService newSingleThreadExecutor(){
  2. return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1,1,0L,
  3. TimeUnit.MILLISECONDS,
  4. new LinkedBlockingQueue<Runnable>()));
  5. }

使用场景:
希望多个任务排队执行,线程数固定为1,任务数多于1时,会放入无界队列排队。任务执行完毕,这唯一的线程不会被释放

区别:

  • 自己创建一个单线程串行执行任务,如果任务执行失败而终止那么没有任何补救措施,而线程池还会新建一个线程,保证池的正常工作。
  • Executors.newSingleThreadExecutor()线程个数始终为1,不能修改

    1. - FinalizableDelegatedExecutorService应用的是装饰器模式,只对外暴露了ExecutorService接口,因此不能调用ThreadPoolExecutor中独有的实现方法,而只能调用从ExecutorService继承/实现的公共方法,如shutdownstop等方法
  • Executors.newFixedThreadPool(1)初始为1,虽然实现效果与newSingleThreadExecutor()效果一致,但之后还可以手动修改线程池内线程容量

    1. - 对外暴露的是 ThreadPoolExecutor对象,可以从拿到的ExecutorService引用强转为(ThreadPoolExecutor)类对象,从而调用如 setCorePoolSize方法以恶意修改线程池信息。

注意,一个线程正常执行过程中如果出现异常,那么线程则会强制终止,与进程遇到错误而停止是一个道理,除非线程内部代码使用try-catch块处理异常。
下例中创建单线程池,执行到 int i = 1/0这行代码时,会抛出异常,但线程池中的唯一线程不会终止,线程池会重新创建一个线程以保证接下来任务的顺利执行。

  1. import java.util.concurrent.ExecutorService;
  2. import java.util.concurrent.Executors;
  3. public class TestSingleThreadExecutor {
  4. public static void main(String[] args) {
  5. //创建单线程池
  6. ExecutorService pool = Executors.newSingleThreadExecutor();
  7. pool.execute(()->{
  8. System.out.println(1);
  9. int i = 1/0;
  10. });
  11. pool.execute(()->{
  12. System.out.println(2);
  13. });
  14. pool.execute(()->{
  15. System.out.println(3);
  16. int i = 1/0;
  17. });
  18. }
  19. }

6) 提交任务

创建线程池会需要向线程池中提交任务以执行,除了基本的execute方法外有其它提交任务的方式:
image.png

1、execute

execute方法直接传入Runnable对象,这里的“任务”指的是Runnable对象,方法无返回结果。

2、submit

submit方法传入Callable对象,这里的任务指的是Callable对象(注意线程实现方式除了Runnable,还有Callable)。此方法返回任务执行结果,其中T表示返回结果的类型。

任务执行结果以Future对象的形式返回,与Runnable对象的run()方法不同,Callable是call()方法;

  1. import java.util.concurrent.*;
  2. public class TestSingleThreadExecutor {
  3. public static void main(String[] args) throws ExecutionException, InterruptedException {
  4. //创建单线程池
  5. ExecutorService pool = Executors.newSingleThreadExecutor();
  6. //返回任务执行结果,Future对象
  7. Future<String> future = pool.submit(new Callable<String>() {
  8. @Override
  9. public String call() throws Exception {
  10. Thread.sleep(1000);
  11. return "ok";
  12. }
  13. });
  14. System.out.println(future.get());
  15. }
  16. }

3、invokeAll

invokeAll指以任务集合的形式向线程池提交任务,其中集合中存储对象的类型为Callable类,集合类的选取不固定,方法返回值是List>

  1. import java.util.Arrays;
  2. import java.util.List;
  3. import java.util.concurrent.ExecutionException;
  4. import java.util.concurrent.ExecutorService;
  5. import java.util.concurrent.Executors;
  6. import java.util.concurrent.Future;
  7. public class TestInovkeAll {
  8. public static void main(String[] args) throws InterruptedException {
  9. //创建容量为2的线程池
  10. ExecutorService executorService = Executors.newFixedThreadPool(2);
  11. List<Future<String>> futures = executorService.invokeAll(Arrays.asList(
  12. () -> {
  13. System.out.println("begin");
  14. Thread.sleep(1000);
  15. return "1";
  16. },
  17. ()->{
  18. System.out.println("begin");
  19. Thread.sleep(500);
  20. return "2";
  21. }
  22. ,()->{
  23. System.out.println("begin");
  24. Thread.sleep(2000);
  25. return "3";
  26. }
  27. ));
  28. futures.forEach(f->{
  29. try {
  30. System.out.println(f.get());
  31. } catch (InterruptedException e) {
  32. e.printStackTrace();
  33. } catch (ExecutionException e) {
  34. e.printStackTrace();
  35. }
  36. });
  37. }
  38. }

本例中的实现选择Arrays.asList方法,方法中传入Callable列表,因为Callable是函数式接口,这里直接使用lambda表达式更方便。
注意,前两个任务提交给线程池后,两个线程并行执行task1、task2,task2只等待0.5秒,所以先执行,但注意主线程遍历是要等到所有任务执行完才会执行(同步等待),所以遍历的顺序仍然是1、2、3

3、invokeAny

invokeAny提交tasks中所有任务,但不会等待所有任务执行完毕(即拿到所有任务的返回结果),而是哪个任务先执行完毕,则返回此任务执行结果,其它任务取消,因为只返回一个任务结果,所以返回值类型是Object类型(T类型)

  1. <T> T invokeAny(Collection<? extends Callable<T>> tasks)
  2. throws InterruptedException, ExecutionException;

如下例,将三个task提交给线程池(这里应该是顺序提交),前两个任务被分配到两个线程上,第三个任务由于无线程可分配,存入任务队列中。
明显任务2睡眠时间更短,所以返回task2的结果。

  1. import java.util.Arrays;
  2. import java.util.List;
  3. import java.util.concurrent.ExecutionException;
  4. import java.util.concurrent.ExecutorService;
  5. import java.util.concurrent.Executors;
  6. import java.util.concurrent.Future;
  7. public class TestInvokeAny {
  8. public static void main(String[] args) throws ExecutionException, InterruptedException {
  9. ExecutorService executorService = Executors.newFixedThreadPool(2);
  10. String result = executorService.invokeAny(Arrays.asList(
  11. () -> {
  12. System.out.println("begin");
  13. Thread.sleep(1000);
  14. System.out.println("end");
  15. return "1";
  16. },
  17. ()->{
  18. System.out.println("begin");
  19. Thread.sleep(500);
  20. return "2";
  21. }
  22. ,()->{
  23. System.out.println("begin");
  24. Thread.sleep(2000);
  25. System.out.println("end");
  26. return "3";
  27. }
  28. ));
  29. System.out.println(result);
  30. }
  31. }

7) 关闭线程池

shutdown

  1. /*
  2. 线程状态变为 SHUTDOWN
  3. - 不会接收新任务
  4. - 但已提交任务会执行完毕,阻塞队列中任务也算已提交任务
  5. - 此方法不会阻塞调用线程的执行
  6. */
  7. void shutdown();

shutdown源码:

  1. public void shutdown(){
  2. final ReentrantLock mainLock = this.mainLock;
  3. mainLock.lock();
  4. try {
  5. checkShutdownAccess();
  6. //修改线程池状态
  7. advanceRunState(SHUTDOWN);
  8. //仅会打断空闲线程,没有执行任务的线程结束掉
  9. interruptIdleWorkers();
  10. onShutdown();
  11. }finally {
  12. mainLock.unlock();
  13. }
  14. //尝试终结(没有运行的线程可以立刻终结)
  15. tryTerminate();
  16. }

shutdownNow

  1. /*
  2. 线程状态变为 STOP
  3. - 不会接收新任务
  4. - 会将线程池中所有线程打断(包括空闲线程以及正在执行任务的线程),并将队列中的任务返回.
  5. -
  6. */
  7. List<Runnable> shutdownNow();
  1. import java.util.concurrent.Callable;
  2. import java.util.concurrent.ExecutorService;
  3. import java.util.concurrent.Executors;
  4. public class TestShutdownNow {
  5. public static void main(String[] args) {
  6. ExecutorService executorService = Executors.newFixedThreadPool(10);
  7. executorService.submit(new Callable<String>() {
  8. @Override
  9. public String call() throws Exception {
  10. for(int i=0;i<10000000;i++){
  11. int j=i;
  12. j++;
  13. }
  14. return "string";
  15. }
  16. });
  17. executorService.shutdown();
  18. }
  19. }

经测试,shutdownNow的打断正在执行的线程并不是调用interrupt方法,所以上例中即使线程没有进入【WAITING】状态,也会被强制终止。
总结:shutdownNow会强制打断所有线程池中的线程,并将任务队列中的任务返回。

其它方法

  1. // 只要不在RUNNING状态的线程池,此方法就返回true
  2. boolean isShutdown();
  3. // 线程池状态是否是 TERMINATED,终止代表线程池已经终止
  4. boolean isTerminated();
  5. // 调用 shutdown后,由于调用线程并不会等待所有任务运行结束,因此如果它想在线程池TERMINATED后做些事情,可以利用此方法等待。
  6. boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;

shutdown方法只是将线程池状态标记为SHUTDOWN状态,不再接收新任务,但旧任务会继续执行。通常shutdown方法的线程不会等待线程池进入终止状态,如果等待线程池一段时间,可以调用awaitTermination方法判断,注意方法传入等待时长、返回布尔值。

awaitTermination方法一般与shutdown方法配合使用,用来判断shutdown调用后的固定时间内,线程池是否关闭成功:

  1. // 普通任务处理类
  2. class Task implements Runnable {
  3. @Override
  4. public void run() {
  5. System.out.println("普通任务");
  6. }
  7. }
  8. // 长时间任务处理类
  9. class LongTask implements Runnable {
  10. @Override
  11. public void run() {
  12. System.out.println("长时间任务");
  13. try {
  14. TimeUnit.SECONDS.sleep(5);
  15. } catch (InterruptedException e) {
  16. }
  17. }
  18. }
  19. public class TestAwaitTermination {
  20. public static void main(String[] args) throws InterruptedException {
  21. ExecutorService service = Executors.newFixedThreadPool(4);
  22. service.execute(new Task());
  23. service.execute(new Task());
  24. service.execute(new LongTask());
  25. service.execute(new Task());
  26. service.shutdown();
  27. while (!service.awaitTermination(1, TimeUnit.SECONDS)) {
  28. System.out.println("线程池没有关闭");
  29. }
  30. System.out.println("线程池已经关闭");
  31. }
  32. }