图解线程池的工作原理

image.png

线程池的重要类

Executor接口

代表线程池的接口,有一个execute()方法,入参是Runnable类型对象。
通过分配一个线程进行处理执行。

ExecutorService接口

Executor的子接口,相当于是一个线程池的接口,有销毁线程池等方法。

Executors辅助类

线程池的辅助工具类,辅助入口类,可以通过Executors快速创建需要的线程池,比如Executors._newSingleThreadExecutor()_Executors.newFixedThreadPool()Executors.newCachedThreadPool()Executors.newScheduledThreadPool()

ThreadPoolExecutor类

ExecutorService接口的实现类,真正代表一个线程池的类,一般在Executors里创建一个线程池时,内部都是直接创建一个ThreadPoolExecutor的实例对象进行返回。

  • corePoolSize:线程池中的核心线程数。
  • maximumPoolSize:线程池里允许创建的最大线程数
  • keepAliveTime:如果线程数量大于corePoolSize时,多出来的线程在空闲时间内会等待指定的keepAliveTime时间后自动释放掉。
  • unit:keepAliveTime的时间单位。
  • workQueue:通过ThreadPoolExecutor.execute()方法扔进来的Runnable工作任务,如果已经到达了corePoolSize个数量都在处理任务时,多余的工作任务就会进入到workQueue进行排队。
  • threadFactory:如果需要创建新的线程放入线程池时,就通过这个线程工厂来创建线程。
  • handler:如果workQueue具有固定的大小,往队列里扔的任务数量超过队列大小,且已经有maximumPoolSize个线程负责处理任务,则新的任务就会使用handler的拒绝策略进行拒绝处理。
    1. public ThreadPoolExecutor(int corePoolSize,
    2. int maximumPoolSize,
    3. long keepAliveTime,
    4. TimeUnit unit,
    5. BlockingQueue<Runnable> workQueue,
    6. ThreadFactory threadFactory,
    7. RejectedExecutionHandler handler) {
    8. if (corePoolSize < 0 ||
    9. maximumPoolSize <= 0 ||
    10. maximumPoolSize < corePoolSize ||
    11. keepAliveTime < 0)
    12. throw new IllegalArgumentException();
    13. if (workQueue == null || threadFactory == null || handler == null)
    14. throw new NullPointerException();
    15. this.corePoolSize = corePoolSize;
    16. this.maximumPoolSize = maximumPoolSize;
    17. this.workQueue = workQueue;
    18. this.keepAliveTime = unit.toNanos(keepAliveTime);
    19. this.threadFactory = threadFactory;
    20. this.handler = handler;
    21. }

    线程池内的任务如何被提交

    任务对应的就是Runnable对象,通过执行ThreadPoolExecutor.execute()方法就可以处理工作任务。 ```java /*
  • 分3步执行
    1. 如果工作线程的数量小于核心线程的数量,那新的工作任务请求过来时,
  • 就会去创建一个新的线程去处理任务,直到创建的线程数量已经达到了核心线程数量。
  • 具体的创建形式就是 addWorker *
    1. 在一个任务成功进入工作队列进行排队,需要检查是否能够添加一个线程来处理任务
  • 否则,该任务就需要在工作队列中进行等待有空闲的线程来处理。

    1. 如果新的任务无法进入工作队列进行排队,此时就会创建新的线程处理新任务,
  • 创建的线程数量不会大于最大线程数量maximumPoolSize。
  • 如果创建线程失败,就会通过handler策略拒绝任务。 */

    1. ```java
    2. // addWorker: 添加新的线程去处理任务
    3. // workQueue.offer(command): 将任务放入到工作队列当中
    4. // isRunning(): 线程池处理运行状态,新的任务就只能放入到工作队列当中,等待线程空闲后进行处理
    5. public void execute(Runnable command) {
    6. // 如果任务为空,直接抛出异常
    7. if (command == null)
    8. throw new NullPointerException();
    9. // 原子变量ctl共同存储 线程状态+线程数量。int类型存储,高3位表示线程状态,后29位表示线程数量
    10. int c = ctl.get();
    11. // 判断当前的工作线程数量 是否小于核心线程数量,如果小于,则添加新的线程去执行任务
    12. if (workerCountOf(c) < corePoolSize) {
    13. // true: 代表的是可以创建最大数量为corePoolSize内的线程
    14. if (addWorker(command, true))
    15. return;
    16. c = ctl.get();
    17. }
    18. // 如果线程池处于RUNNING状态,就将任务放入到阻塞队列中
    19. if (isRunning(c) && workQueue.offer(command)) {
    20. // 重新获取ctl的值,因为把任务添加到队列时,线程的状态可能已经改变,所以需要重新获取
    21. int recheck = ctl.get();
    22. // 线程状态不是RUNNING状态,则从队列中删除
    23. if (! isRunning(recheck) && remove(command))
    24. reject(command);
    25. // 如果当前线程池一个空闲线程都没有,且队列已满,则添加一个非核心线程(对应maximumPoolSize)
    26. else if (workerCountOf(recheck) == 0)
    27. // false: 代表的是可以创建最大数量为maximumPoolSize内的线程
    28. addWorker(null, false);
    29. }
    30. // 如果队列满了,则添加非核心线程,如果添加失败,则执行拒绝策略
    31. else if (!addWorker(command, false))
    32. reject(command);
    33. }

    任务是如何被线程池消费的

    基于Worker类,对应的就是工作线程,在这个类中可以知道工作线程是如何消费任务的。
    Worker类实现了Runnable接口,当线程启动时,就会去执行run()方法。通过run()方法去消费一个任务;
    runWorker()方法,内部会task.run()去消费任务;通过getTask()方法去任务队列当中获取任务,workQueue.take()
    Runnable task = w.firstTask, 工作线程负责处理的第一个任务task。 ```java private final class Worker extends AbstractQueuedSynchronizer implements Runnable {

  1. public void run() {
  2. runWorker(this);
  3. }
  4. final void runWorker(Worker w) {
  5. Thread wt = Thread.currentThread();
  6. Runnable task = w.firstTask;
  7. w.firstTask = null;
  8. w.unlock(); // allow interrupts
  9. boolean completedAbruptly = true;
  10. try {
  11. while (task != null || (task = getTask()) != null) {
  12. w.lock();
  13. if ((runStateAtLeast(ctl.get(), STOP) ||
  14. (Thread.interrupted() &&
  15. runStateAtLeast(ctl.get(), STOP))) &&
  16. !wt.isInterrupted())
  17. wt.interrupt();
  18. try {
  19. beforeExecute(wt, task);
  20. Throwable thrown = null;
  21. try {
  22. task.run();
  23. } catch (RuntimeException x) {
  24. thrown = x; throw x;
  25. } catch (Error x) {
  26. thrown = x; throw x;
  27. } catch (Throwable x) {
  28. thrown = x; throw new Error(x);
  29. } finally {
  30. afterExecute(task, thrown);
  31. }
  32. } finally {
  33. task = null;
  34. w.completedTasks++;
  35. w.unlock();
  36. }
  37. }
  38. completedAbruptly = false;
  39. } finally {
  40. processWorkerExit(w, completedAbruptly);
  41. }
  42. }
  43. private Runnable getTask() {
  44. boolean timedOut = false; // Did the last poll() time out?
  45. for (;;) {
  46. int c = ctl.get();
  47. int rs = runStateOf(c);
  48. // Check if queue empty only if necessary.
  49. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
  50. decrementWorkerCount();
  51. return null;
  52. }
  53. int wc = workerCountOf(c);
  54. // Are workers subject to culling?
  55. boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
  56. if ((wc > maximumPoolSize || (timed && timedOut))
  57. && (wc > 1 || workQueue.isEmpty())) {
  58. if (compareAndDecrementWorkerCount(c))
  59. return null;
  60. continue;
  61. }
  62. try {
  63. // 获取任务队列中的任务
  64. Runnable r = timed ?
  65. workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
  66. workQueue.take();
  67. if (r != null)
  68. return r;
  69. timedOut = true;
  70. } catch (InterruptedException retry) {
  71. timedOut = false;
  72. }
  73. }
  74. }

}

```