Java 线程池

原理概述

Java线程池的实现原理 - 图1
其实java线程池的实现原理很简单,说白了就是一个线程集合workerSet和一个阻塞队列workQueue。当用户向线程池提交一个任务(也就是线程)时,线程池会先将任务放入workQueue中。
workerSet中的线程会不断的从workQueue中获取线程然后执行。当workQueue中没有任务的时候,worker就会阻塞,直到队列中有任务了就取出来继续执行。

线程池的几个主要参数的作用

  1. public ThreadPoolExecutor(int corePoolSize,
  2. int maximumPoolSize,
  3. long keepAliveTime,
  4. TimeUnit unit,
  5. BlockingQueue<Runnable> workQueue,
  6. ThreadFactory threadFactory,
  7. RejectedExecutionHandler handler)
  • corePoolSize: 规定线程池有几个线程(worker)在运行。
  • maximumPoolSize: 当workQueue满了,不能添加任务的时候,这个参数才会生效。规定线程池最多只能有多少个线程(worker)在执行。
  • keepAliveTime: 超出corePoolSize大小的那些线程的生存时间,这些线程如果长时间没有执行任务并且超过了keepAliveTime设定的时间,就会消亡。
  • unit: 生存时间对于的单位
  • workQueue: 存放任务的队列
  • threadFactory: 创建线程的工厂
  • handler: 当workQueue已经满了,并且线程池线程数已经达到maximumPoolSize,将执行拒绝策略。

    任务提交后的流程分析

    用户通过submit提交一个任务。线程池会执行如下流程:

  • 判断当前运行的worker数量是否超过corePoolSize,如果不超过corePoolSize。就创建一个worker直接执行该任务。—— 线程池最开始是没有worker在运行的

  • 如果正在运行的worker数量超过或者等于corePoolSize,那么就将该任务加入到workQueue队列中去。
  • 如果workQueue队列满了,也就是offer方法返回false的话,就检查当前运行的worker数量是否小于maximumPoolSize,如果小于就创建一个worker直接执行该任务。
  • 如果当前运行的worker数量是否大于等于maximumPoolSize,那么就执行RejectedExecutionHandler来拒绝这个任务的提交。

    源码解析

    先来看一下ThreadPoolExecutor中的几个关键属性。

    1. //这个属性是用来存放 当前运行的worker数量以及线程池状态的
    2. //int是32位的,这里把int的高3位拿来充当线程池状态的标志位,后29位拿来充当当前运行worker的数量
    3. private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    4. //存放任务的阻塞队列
    5. private final BlockingQueue<Runnable> workQueue;
    6. //worker的集合,用set来存放
    7. private final HashSet<Worker> workers = new HashSet<Worker>();
    8. //历史达到的worker数最大值
    9. private int largestPoolSize;
    10. //当队列满了并且worker的数量达到maxSize的时候,执行具体的拒绝策略
    11. private volatile RejectedExecutionHandler handler;
    12. //超出coreSize的worker的生存时间
    13. private volatile long keepAliveTime;
    14. //常驻worker的数量
    15. private volatile int corePoolSize;
    16. //最大worker的数量,一般当workQueue满了才会用到这个参数
    17. private volatile int maximumPoolSize;

    1. 提交任务相关源码

    下面是execute方法的源码

    1. public void execute(Runnable command) {
    2. if (command == null)
    3. throw new NullPointerException();
    4. int c = ctl.get();
    5. //workerCountOf(c)会获取当前正在运行的worker数量
    6. if (workerCountOf(c) < corePoolSize) {
    7. //如果workerCount小于corePoolSize,就创建一个worker然后直接执行该任务
    8. if (addWorker(command, true))
    9. return;
    10. c = ctl.get();
    11. }
    12. //isRunning(c)是判断线程池是否在运行中,如果线程池被关闭了就不会再接受任务
    13. //后面将任务加入到队列中
    14. if (isRunning(c) && workQueue.offer(command)) {
    15. //如果添加到队列成功了,会再检查一次线程池的状态
    16. int recheck = ctl.get();
    17. //如果线程池关闭了,就将刚才添加的任务从队列中移除
    18. if (! isRunning(recheck) && remove(command))
    19. //执行拒绝策略
    20. reject(command);
    21. else if (workerCountOf(recheck) == 0)
    22. addWorker(null, false);
    23. }
    24. //如果加入队列失败,就尝试直接创建worker来执行任务
    25. else if (!addWorker(command, false))
    26. //如果创建worker失败,就执行拒绝策略
    27. reject(command);
    28. }

    添加worker的方法addWorker源码

    1. private boolean addWorker(Runnable firstTask, boolean core) {
    2. retry:
    3. //使用自旋+cas失败重试来保证线程竞争问题
    4. for (;;) {
    5. //先获取线程池的状态
    6. int c = ctl.get();
    7. int rs = runStateOf(c);
    8. // 如果线程池是关闭的,或者workQueue队列非空,就直接返回false,不做任何处理
    9. if (rs >= SHUTDOWN &&
    10. ! (rs == SHUTDOWN &&
    11. firstTask == null &&
    12. ! workQueue.isEmpty()))
    13. return false;
    14. for (;;) {
    15. int wc = workerCountOf(c);
    16. //根据入参core 来判断可以创建的worker数量是否达到上限,如果达到上限了就拒绝创建worker
    17. if (wc >= CAPACITY ||
    18. wc >= (core ? corePoolSize : maximumPoolSize))
    19. return false;
    20. //没有的话就尝试修改ctl添加workerCount的值。这里用了cas操作,如果失败了下一个循环会继续重试,直到设置成功
    21. if (compareAndIncrementWorkerCount(c))
    22. //如果设置成功了就跳出外层的那个for循环
    23. break retry;
    24. //重读一次ctl,判断如果线程池的状态改变了,会再重新循环一次
    25. c = ctl.get(); // Re-read ctl
    26. if (runStateOf(c) != rs)
    27. continue retry;
    28. }
    29. }
    30. boolean workerStarted = false;
    31. boolean workerAdded = false;
    32. Worker w = null;
    33. try {
    34. final ReentrantLock mainLock = this.mainLock;
    35. //创建一个worker,将提交上来的任务直接交给worker
    36. w = new Worker(firstTask);
    37. final Thread t = w.thread;
    38. if (t != null) {
    39. //加锁,防止竞争
    40. mainLock.lock();
    41. try {
    42. int c = ctl.get();
    43. int rs = runStateOf(c);
    44. //还是判断线程池的状态
    45. if (rs < SHUTDOWN ||
    46. (rs == SHUTDOWN && firstTask == null)) {
    47. //如果worker的线程已经启动了,会抛出异常
    48. if (t.isAlive())
    49. throw new IllegalThreadStateException();
    50. //添加新建的worker到线程池中
    51. workers.add(w);
    52. int s = workers.size();
    53. //更新历史worker数量的最大值
    54. if (s > largestPoolSize)
    55. largestPoolSize = s;
    56. //设置新增标志位
    57. workerAdded = true;
    58. }
    59. } finally {
    60. mainLock.unlock();
    61. }
    62. //如果worker是新增的,就启动该线程
    63. if (workerAdded) {
    64. t.start();
    65. //成功启动了线程,设置对应的标志位
    66. workerStarted = true;
    67. }
    68. }
    69. } finally {
    70. //如果启动失败了,会触发执行相应的方法
    71. if (! workerStarted)
    72. addWorkerFailed(w);
    73. }
    74. return workerStarted;
    75. }

    2. Worker的结构

    WorkerThreadPoolExecutor内部定义的一个内部类。先看一下Worker的继承关系

    1. private final class Worker extends AbstractQueuedSynchronizer implements Runnable

    它实现了Runnable接口,所以可以拿来当线程用。同时它还继承了AbstractQueuedSynchronizer同步器类,主要用来实现一个不可重入的锁。
    一些属性还有构造方法:

    1. //运行的线程,前面addWorker方法中就是直接通过启动这个线程来启动这个worker
    2. final Thread thread;
    3. //当一个worker刚创建的时候,就先尝试执行这个任务
    4. Runnable firstTask;
    5. //记录完成任务的数量
    6. volatile long completedTasks;
    7. Worker(Runnable firstTask) {
    8. setState(-1); // inhibit interrupts until runWorker
    9. this.firstTask = firstTask;
    10. //创建一个Thread,将自己设置给他,后面这个thread启动的时候,也就是执行worker的run方法
    11. this.thread = getThreadFactory().newThread(this);
    12. }

    workerrun方法

    1. public void run() {
    2. //这里调用了ThreadPoolExecutor的runWorker方法
    3. runWorker(this);
    4. }

    ThreadPoolExecutorrunWorker方法

    1. final void runWorker(Worker w) {
    2. //获取当前线程
    3. Thread wt = Thread.currentThread();
    4. Runnable task = w.firstTask;
    5. w.firstTask = null;
    6. //执行unlock方法,允许其他线程来中断自己
    7. w.unlock(); // allow interrupts
    8. boolean completedAbruptly = true;
    9. try {
    10. //如果前面的firstTask有值,就直接执行这个任务
    11. //如果没有具体的任务,就执行getTask()方法从队列中获取任务
    12. //这里会不断执行循环体,除非线程中断或者getTask()返回null才会跳出这个循环
    13. while (task != null || (task = getTask()) != null) {
    14. //执行任务前先锁住,这里主要的作用就是给shutdown方法判断worker是否在执行中的
    15. //shutdown方法里面会尝试给这个线程加锁,如果这个线程在执行,就不会中断它
    16. w.lock();
    17. //判断线程池状态,如果线程池被强制关闭了,就马上退出
    18. if ((runStateAtLeast(ctl.get(), STOP) ||
    19. (Thread.interrupted() &&
    20. runStateAtLeast(ctl.get(), STOP))) &&
    21. !wt.isInterrupted())
    22. wt.interrupt();
    23. try {
    24. //执行任务前调用。预留的方法,可扩展
    25. beforeExecute(wt, task);
    26. Throwable thrown = null;
    27. try {
    28. //真正的执行任务
    29. task.run();
    30. } catch (RuntimeException x) {
    31. thrown = x; throw x;
    32. } catch (Error x) {
    33. thrown = x; throw x;
    34. } catch (Throwable x) {
    35. thrown = x; throw new Error(x);
    36. } finally {
    37. //执行任务后调用。预留的方法,可扩展
    38. afterExecute(task, thrown);
    39. }
    40. } finally {
    41. task = null;
    42. //记录完成的任务数量
    43. w.completedTasks++;
    44. w.unlock();
    45. }
    46. }
    47. completedAbruptly = false;
    48. } finally {
    49. processWorkerExit(w, completedAbruptly);
    50. }
    51. }

    下面来看一下getTask()方法,这里面涉及到keepAliveTime的使用,从这个方法可以看出线程池是怎么让超过corePoolSize的那部分worker销毁的。

    1. private Runnable getTask() {
    2. boolean timedOut = false;
    3. for (;;) {
    4. int c = ctl.get();
    5. int rs = runStateOf(c);
    6. // 如果线程池已经关闭了,就直接返回null,
    7. //如果这里返回null,调用的那个worker就会跳出while循环,然后执行完销毁线程
    8. //SHUTDOWN状态表示执行了shutdown()方法
    9. //STOP表示执行了shutdownNow()方法
    10. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
    11. decrementWorkerCount();
    12. return null;
    13. }
    14. //获取当前正在运行中的worker数量
    15. int wc = workerCountOf(c);
    16. // 如果设置了核心worker也会超时或者当前正在运行的worker数量超过了corePoolSize,就要根据时间判断是否要销毁线程了
    17. //其实就是从队列获取任务的时候要不要设置超时间时间,如果超过这个时间队列还没有任务进来,就会返回null
    18. boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    19. //如果上一次循环从队列获取到的未null,这时候timedOut就会为true了
    20. if ((wc > maximumPoolSize || (timed && timedOut))
    21. && (wc > 1 || workQueue.isEmpty())) {
    22. //通过cas来设置WorkerCount,如果多个线程竞争,只有一个可以设置成功
    23. //最后如果没设置成功,就进入下一次循环,说不定下一次worker的数量就没有超过corePoolSize了,也就不用销毁worker了
    24. if (compareAndDecrementWorkerCount(c))
    25. return null;
    26. continue;
    27. }
    28. try {
    29. //如果要设置超时时间,就设置一下咯
    30. //过了这个keepAliveTime时间还没有任务进队列就会返回null,那worker就会销毁
    31. Runnable r = timed ?
    32. workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
    33. workQueue.take();
    34. if (r != null)
    35. return r;
    36. //如果r为null,就设置timedOut为true
    37. timedOut = true;
    38. } catch (InterruptedException retry) {
    39. timedOut = false;
    40. }
    41. }
    42. }

    3. 添加Callable任务的实现源码

    1. public <T> Future<T> submit(Callable<T> task) {
    2. if (task == null) throw new NullPointerException();
    3. RunnableFuture<T> ftask = newTaskFor(task);
    4. execute(ftask);
    5. return ftask;
    6. }

    要添加一个有返回值的任务的实现也很简单。其实就是对任务做了一层封装,将其封装成Future,然后提交给线程池执行,最后返回这个future
    这里的 newTaskFor(task) 方法会将其封装成一个FutureTask类。
    外部的线程拿到这个future,执行get()方法的时候,如果任务本身没有执行完,执行线程就会被阻塞,直到任务执行完。
    下面是FutureTask的get方法

    1. public V get() throws InterruptedException, ExecutionException {
    2. int s = state;
    3. //判断状态,如果任务还没执行完,就进入休眠,等待唤醒
    4. if (s <= COMPLETING)
    5. s = awaitDone(false, 0L);
    6. //返回值
    7. return report(s);
    8. }

    FutureTask中通过一个state状态来判断任务是否完成。当run方法执行完后,会将state状态置为完成,同时唤醒所有正在等待的线程。可以看一下FutureTaskrun方法

    1. public void run() {
    2. //判断线程的状态
    3. if (state != NEW ||
    4. !UNSAFE.compareAndSwapObject(this, runnerOffset,
    5. null, Thread.currentThread()))
    6. return;
    7. try {
    8. Callable<V> c = callable;
    9. if (c != null && state == NEW) {
    10. V result;
    11. boolean ran;
    12. try {
    13. //执行call方法
    14. result = c.call();
    15. ran = true;
    16. } catch (Throwable ex) {
    17. result = null;
    18. ran = false;
    19. setException(ex);
    20. }
    21. if (ran)
    22. //这个方法里面会设置返回内容,并且唤醒所以等待中的线程
    23. set(result);
    24. }
    25. } finally {
    26. runner = null;
    27. int s = state;
    28. if (s >= INTERRUPTING)
    29. handlePossibleCancellationInterrupt(s);
    30. }
    31. }

    4. shutdownshutdownNow方法的实现

    shutdown方法会将线程池的状态设置为SHUTDOWN,线程池进入这个状态后,就拒绝再接受任务,然后会将剩余的任务全部执行完 ```java public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try {

    1. //检查是否可以关闭线程
    2. checkShutdownAccess();
    3. //设置线程池状态
    4. advanceRunState(SHUTDOWN);
    5. //尝试中断worker
    6. interruptIdleWorkers();
    7. //预留方法,留给子类实现
    8. onShutdown(); // hook for ScheduledThreadPoolExecutor

    } finally {

    1. mainLock.unlock();

    } tryTerminate(); }

private void interruptIdleWorkers() { interruptIdleWorkers(false); }

private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //遍历所有的worker for (Worker w : workers) { Thread t = w.thread; //先尝试调用w.tryLock(),如果获取到锁,就说明worker是空闲的,就可以直接中断它 //注意的是,worker自己本身实现了AQS同步框架,然后实现的类似锁的功能 //它实现的锁是不可重入的,所以如果worker在执行任务的时候,会先进行加锁,这里tryLock()就会返回false if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break; } } finally { mainLock.unlock(); } }

  1. `shutdownNow`做的比较绝,它先将线程池状态设置为`STOP`,然后拒绝所有提交的任务。最后中断左右正在运行中的`worker`,然后清空任务队列。
  2. ```java
  3. public List<Runnable> shutdownNow() {
  4. List<Runnable> tasks;
  5. final ReentrantLock mainLock = this.mainLock;
  6. mainLock.lock();
  7. try {
  8. checkShutdownAccess();
  9. //检测权限
  10. advanceRunState(STOP);
  11. //中断所有的worker
  12. interruptWorkers();
  13. //清空任务队列
  14. tasks = drainQueue();
  15. } finally {
  16. mainLock.unlock();
  17. }
  18. tryTerminate();
  19. return tasks;
  20. }
  21. private void interruptWorkers() {
  22. final ReentrantLock mainLock = this.mainLock;
  23. mainLock.lock();
  24. try {
  25. //遍历所有worker,然后调用中断方法
  26. for (Worker w : workers)
  27. w.interruptIfStarted();
  28. } finally {
  29. mainLock.unlock();
  30. }
  31. }