在web开发中,服务器需要接受并处理请求,所以会为一个请求来分配一个线程来进行处理。如果每次请求都新创建一个线程的话实现起来非常简便,但是存在一个问题: 如果并发的请求数量非常多,但每个线程执行的时间很短,这样就会频繁的创建和销毁线程,(线程资源由操作系统维护,需要用户态切换到内核态)如此一来会大大降低系统的效率。线程池为线程生命周期的开销和资源不足问题提供了解决方案。

线程池参数详解

corePoolSize

线程池中的核心线程数,当提交一个任务时,线程池创建一个新线程执行任务,直到当 前线程数等于corePoolSize;如果当前线程数为corePoolSize,继续提交的任务被保存到 阻塞队列中,等待被执行;如果执行了线程池的prestartAllCoreThreads()方法,线程池会 提前创建并启动所有核心线程。

maximumPoolSize

线程池中允许的最大线程数。如果当前阻塞队列满了,且继续提交任务,则创建新的线 程执行任务,前提是当前线程数小于maximumPoolSize

keepAliveTime

线程池维护线程所允许的空闲时间。当线程池中的线程数量大于corePoolSize的时 候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待 的时间超过了keepAliveTime,若手动配置了allowCoreThreadTimeOut = true 时,核心线程超过最大空闲时间也能被清除。

unit

keepAliveTime的时间单位

workQueue

阻塞队列,用来存放未来得及处理的任务,常用阻塞队列如下
1、ArrayBlockingQueue:基于数组结构的有界阻塞队列,按FIFO排序任务;
2、LinkedBlockingQuene:基于链表结构的阻塞队列,按FIFO排序任务,吞 吐量通常要高于ArrayBlockingQuene;
3、SynchronousQuene:一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQuene;
4、priorityBlockingQuene:具有优先级的无界阻塞队列;

threadFactory

它是ThreadFactory类型的变量,用来创建新线程。默认使用 Executors.defaultThreadFactory() 来创建线程。使用默认的ThreadFactory来创建线程 时,会使新创建的线程具有相同的NORM_PRIORITY优先级并且是非守护线程,同时也设 置了线程的名称。

handler

线程池的拒绝策略,当阻塞队列满了,且没有空闲的工作线程,如果继续提交任务,必 须采取一种策略处理该任务,线程池提供了4种策略:
AbortPolicy:直接抛出RejectedExecutionException异常
DiscardPolicy:丢弃任务,但是不抛出异常
DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务
CallerRunsPolicy:由当前提交任务的线程处理

线程池状态

ctl

ctl是对线程池的运行状态和线程池中有效线程的数量进行控制的一个字段, 它包含两部分的信息: 线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount),这 里可以看到,使用了Integer类型来保存,高3位保存runState低29位保存 workerCount。COUNT_BITS 就是29,CAPACITY就是1左移29位减1(29个1),这个常 量表示workerCount的上限值

  1. //初始化线程池状态为RUNNING,初始化work数量为0
  2. private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
  3. private static final int COUNT_BITS = Integer.SIZE - 3; // 29
  4. private static final int CAPACITY = (1 << COUNT_BITS) - 1; //1左移29位减1
  1. //获取运行状态
  2. private static int runStateOf(int c) { return c & ~CAPACITY; }
  3. //获取活动线程数
  4. private static int workerCountOf(int c) { return c & CAPACITY; }
  5. //获取运行状态和活动线程数
  6. private static int ctlOf(int rs, int wc) { return rs | wc; }

微信截图_20210701143208.png

线程池5种状态

  1. private static final int RUNNING = -1 << COUNT_BITS; //高3位为111
  2. private static final int SHUTDOWN = 0 << COUNT_BITS; //高3位为000
  3. private static final int STOP = 1 << COUNT_BITS; //高3位为001
  4. private static final int TIDYING = 2 << COUNT_BITS; //高3位为010
  5. private static final int TERMINATED = 3 << COUNT_BITS; //高3位为011

RUNNING
线程池处在RUNNING状态时,能够接收新任务,以及对已添加的任务进行处理。线程池被一旦被创建,就处于RUNNING状态,并且线程池中的任务数为0。

SHUTDOWN
线程池处在SHUTDOWN状态时,不接收新任务,但能处理已添加的任务。调用线程池的shutdown()接口时,线程池由RUNNING 转换为 SHUTDOWN。

STOP
线程池处在STOP状态时,不接收新任务,不处理已添加的任务,并且会中断正在处理的任务。调用线程池的shutdownNow()接口时,线程池由 RUNNING 或者 SHUTDOWN 转换为 STOP

TIDYING
当所有的任务已终止,工作线程数线程数为0,线程池会变为TIDYING状态。当线程池变为TIDYING状态时,会执行钩子函数terminated()(terminated()在 ThreadPoolExecutor类中是空的,若用户想在线程池变为TIDYING时,进行相应的处理可以通过重载terminated()函数来实现)当线程池在SHUTDOWN状态下,阻塞队列为空并且线程池中执行的任务也 为空时,就会由 SHUTDOWN -> TIDYING。 当线程池在STOP状态下,线程池中执行的任务为空时,就会由STOP -> TIDYING。

TERMINATED
线程池彻底终止,就变成TERMINATED状态,线程池处在TIDYING状态时,执行完terminated()方法之后,就会由 TIDYING > TERMINATED。
微信截图_20210701153354.png

线程池任务执行过程

execute()

  1. 如果workerCount < corePoolSize,则创建并启动一个线程来执行新提交的任务;
    2. 如果workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中;
    3. 如 果 workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新提交的任务;
    4. 如果workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。
    1. public void execute(Runnable command) {
    2. if (command == null)
    3. throw new NullPointerException();
    4. int c = ctl.get();
    5. //如果当线程池数量小于corePoolSize
    6. if (workerCountOf(c) < corePoolSize) {
    7. //创建worker
    8. if (addWorker(command, true))
    9. return;
    10. c = ctl.get();
    11. }
    12. //如果当线程池数量大于corePoolSize,将任务放入到阻塞队列
    13. if (isRunning(c) && workQueue.offer(command)) {
    14. int recheck = ctl.get();
    15. //复查当前线程池是否是RUNNING状态
    16. if (! isRunning(recheck) && remove(command))
    17. reject(command);
    18. else if (workerCountOf(recheck) == 0)
    19. addWorker(null, false);
    20. }
    21. //如果阻塞队列放满,则创建非核心线程
    22. else if (!addWorker(command, false))
    23. //如果创建失败(超过最大线程数),执行拒绝策略
    24. reject(command);
    25. }
    微信截图_20210701160358.png

addWorker()

Worker类继承了AQS,并实现了Runnable接口,注意其中的firstTask和thread属性:firstTask用它来保存传入的任务;thread是在调用构造方法时通过ThreadFactory来创建的线程,是用来处理任务的线程。在调用构造方法时,需要把任务传入,这里通过 getThreadFactory().newThread(this) 来新建一个线程,newThread方法传入的参数是 this,因为Worker本身继承了Runnable接口,也就是一个线程,所以一个Worker对象在 启动的时候会调用Worker类中的run方法

  1. private boolean addWorker(Runnable firstTask, boolean core) {
  2. retry:
  3. //重新对当前线程池的状态判断
  4. for (;;) {
  5. int c = ctl.get();
  6. int rs = runStateOf(c);
  7. if (rs >= SHUTDOWN &&
  8. ! (rs == SHUTDOWN &&
  9. firstTask == null &&
  10. ! workQueue.isEmpty()))
  11. return false;
  12. for (;;) {
  13. int wc = workerCountOf(c);
  14. if (wc >= CAPACITY ||
  15. wc >= (core ? corePoolSize : maximumPoolSize))
  16. return false;
  17. if (compareAndIncrementWorkerCount(c))
  18. break retry;
  19. c = ctl.get(); // Re-read ctl
  20. if (runStateOf(c) != rs)
  21. continue retry;
  22. }
  23. }
  24. boolean workerStarted = false;
  25. boolean workerAdded = false;
  26. Worker w = null;
  27. try {
  28. //创建worker对象,并把任务传进去
  29. w = new Worker(firstTask);
  30. final Thread t = w.thread;
  31. if (t != null) {
  32. final ReentrantLock mainLock = this.mainLock;
  33. mainLock.lock();
  34. try {
  35. int rs = runStateOf(ctl.get());
  36. //重新对线程池状态进行判断
  37. if (rs < SHUTDOWN ||
  38. (rs == SHUTDOWN && firstTask == null)) {
  39. if (t.isAlive()) // precheck that t is startable
  40. throw new IllegalThreadStateException();
  41. //将worker放入workers集合中
  42. workers.add(w);
  43. int s = workers.size();
  44. if (s > largestPoolSize)
  45. largestPoolSize = s;
  46. workerAdded = true;
  47. }
  48. } finally {
  49. mainLock.unlock();
  50. }
  51. if (workerAdded) {
  52. //开启线程,调用worker的run()方法
  53. t.start();
  54. workerStarted = true;
  55. }
  56. }
  57. } finally {
  58. if (! workerStarted)
  59. addWorkerFailed(w);
  60. }
  61. return workerStarted;
  62. }
  1. Worker(Runnable firstTask) {
  2. setState(-1); // inhibit interrupts until runWorker
  3. this.firstTask = firstTask;
  4. //创建线程,并把当前worker对象传进去
  5. this.thread = getThreadFactory().newThread(this);
  6. }

runWorker()

执行完第一次任务后,getTask()从阻塞队列中取任务,拿到任务后开始执行。如果task为null则跳出循环,执行processWorkerExit()方法,runWorker方法执行完毕,也代表着Worker中的run方法执行完毕,销毁线程。(completedAbruptly变量来表示在执行任务过程中是否出现了异常 , 在 processWorkerExit方法中会对该变量的值进行判断)

  1. final void runWorker(Worker w) {
  2. Thread wt = Thread.currentThread();
  3. Runnable task = w.firstTask;
  4. //将firstTask止为null,下次执行任务会从阻塞队列里面拿
  5. w.firstTask = null;
  6. w.unlock(); // allow interrupts
  7. boolean completedAbruptly = true;
  8. try {
  9. //进入while循环,当执行完第一次任务后不断从阻塞队列取任务
  10. while (task != null || (task = getTask()) != null) {
  11. w.lock();
  12. //重新对线程池状态进行判断
  13. if ((runStateAtLeast(ctl.get(), STOP) ||
  14. (Thread.interrupted() &&
  15. runStateAtLeast(ctl.get(), STOP))) &&
  16. !wt.isInterrupted())
  17. wt.interrupt();
  18. try {
  19. beforeExecute(wt, task);
  20. Throwable thrown = null;
  21. try {
  22. //拿到任务后开始执行
  23. task.run();
  24. } catch (RuntimeException x) {
  25. thrown = x; throw x;
  26. } catch (Error x) {
  27. thrown = x; throw x;
  28. } catch (Throwable x) {
  29. thrown = x; throw new Error(x);
  30. } finally {
  31. afterExecute(task, thrown);
  32. }
  33. } finally {
  34. //task置为null,下次从阻塞队列中取任务
  35. task = null;
  36. w.completedTasks++;
  37. w.unlock();
  38. }
  39. }
  40. //修改状态,如果任务执行中出现异常没有修改到该状态,在processWorkerExit()方法中会根据该状态创建新的worker
  41. completedAbruptly = false;
  42. } finally {
  43. //执行线程的销毁
  44. processWorkerExit(w, completedAbruptly);
  45. }
  46. }

getTask()

从阻塞队列中取任务

  1. 当允许销毁核心线程 allowCoreThreadTimeOut = true(需手动配置)或当前线程数大于核心线程数,调用workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)取阻塞队列取任务,在keepAliveTime时间后还没任务,直接返回null,runWorker()方法跳出循环,线程执行完毕,等待销毁
  2. 否则调用 workQueue.take(),若取不到任务则阻塞,直到有新的任务到来

    1. private Runnable getTask() {
    2. boolean timedOut = false; // Did the last poll() time out?
    3. //重新对线程池状态进行判断
    4. for (;;) {
    5. int c = ctl.get();
    6. int rs = runStateOf(c);
    7. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
    8. decrementWorkerCount();
    9. return null;
    10. }
    11. int wc = workerCountOf(c);
    12. //状态设置,当在阻塞队列中拿不到任务是否需要阻塞
    13. boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    14. //当前线程大于corePoolSize小于maximumPoolSize,且阻塞队列为空,销毁超出核心线程数的线程
    15. if ((wc > maximumPoolSize || (timed && timedOut))
    16. && (wc > 1 || workQueue.isEmpty())) {
    17. if (compareAndDecrementWorkerCount(c))
    18. return null;
    19. continue;
    20. }
    21. try {
    22. //根据上述timed采取不同的方式去阻塞队列中取任务
    23. Runnable r = timed ?
    24. //如果在阻塞队列中等待keepAliveTime后仍取不到任务,直接返回null
    25. workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
    26. //如果在阻塞队列中取不到任务,则阻塞等待直到新的任务到来
    27. workQueue.take();
    28. if (r != null)
    29. return r;
    30. //没有拿到任务timedOut设为true
    31. timedOut = true;
    32. } catch (InterruptedException retry) {
    33. timedOut = false;
    34. }
    35. }
    36. }

processWorkerExit()

线程开始销毁,其中会对方法执行中是否出现异常进行判断

  1. completedAbruptly = true,线程执行任务中出现异常,重新创建worker
  2. completedAbruptly = false,任务正常执行,走逻辑判断

    1. private void processWorkerExit(Worker w, boolean completedAbruptly) {
    2. //如果completedAbruptly值为true,则说明线程执行时出现了异常,需要将workerCount减1
    3. if (completedAbruptly)
    4. decrementWorkerCount();
    5. final ReentrantLock mainLock = this.mainLock;
    6. mainLock.lock();
    7. try {
    8. completedTaskCount += w.completedTasks;
    9. // 从workers中移除,也就表示着从线程池中移除了一个工作线程
    10. workers.remove(w);
    11. } finally {
    12. mainLock.unlock();
    13. }
    14. // 根据线程池状态进行判断是否结束线程池
    15. tryTerminate();
    16. int c = ctl.get();
    17. //当线程池是RUNNING或SHUTDOWN状态时
    18. if (runStateLessThan(c, STOP)) {
    19. //completedAbruptly=false,运行中没出现异常会进入下面判断
    20. //completedAbruptly=true,直接addWorker
    21. if (!completedAbruptly) {
    22. int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
    23. //如果allowCoreThreadTimeOut=true,并且等待队列有任务,至少保留一个worker,即min=1
    24. if (min == 0 && ! workQueue.isEmpty())
    25. min = 1;
    26. //若当前线程数>= min,则直接return,不需要创建worker
    27. if (workerCountOf(c) >= min)
    28. return; // replacement not needed
    29. }
    30. //直接创建worker
    31. addWorker(null, false);
    32. }
    33. }

    微信截图_20210701212121.png
    至此,processWorkerExit执行完之后,工作线程被销毁,以上就是整个工作线程的生命周期,从execute方法开始,Worker使用ThreadFactory创建新的工作线程, runWorker通过getTask获取任务,然后执行任务,如果getTask返回null,进入 processWorkerExit方法。