线程池

先看继承体系

线程池 - 图1

顶层接口 Executor

  1. public interface Executor {
  2. //仅定义一个执行线程的方法
  3. void execute(Runnable command);
  4. }

二级接口ExecutorService

/**
 *  定义一些修改线程池状态的方法,以及执行线程的方法
 */
public interface ExecutorService extends Executor {

    // 关闭线程池
    void shutdown();
    // 立即关闭线程池
    List<Runnable> shutdownNow();
    //是否关闭了了
    boolean isShutdown();
    //是否terminate了
    boolean isTerminated();

    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;

    // 有返回值的提交线程
    <T> Future<T> submit(Callable<T> task);
    // 重载有返回值的提交线程
    <T> Future<T> submit(Runnable task, T result);
    // 返回值为null的提交现场
    Future<?> submit(Runnable task);
    // 通过invoke形式执行所有线程 并有返回值,用的少
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;
    // 通过invoke形式执行带有超时的所有线程 并有返回值,用的少
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;
    // invoke执行线程 一个成功就行了,用的少
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;
    // invoke执行带有超时的线程 一个成功就行了,用的少
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

三级抽象类 AbstractExecutorService
重点关注一些方法~
    // 将runbale及返回值封装为FutureTask对象
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }
    // 进行重载
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }
    //  submit提交的任务是具有返回值的,此返回值为null,封装成FutureTask,然后调用execute来真正执行
    //  RunnableFuture的子类是FutureTask
    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

    //   submit提交的任务是具有返回值的,此返回值为result,封装成FutureTask,然后调用execute来真正执行
    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }

    // 重载,和上述方法一个意思
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

    // 综上,可以看去执行线程都是通过execute,而返回值是你传啥就是啥

四个构造器

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }
 public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
    }
public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    }
 // 最完全的线程池构造器,其余构造器均对此进行重载
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        /**
         * 如果核心线程数小于0
         * 如果最大线程数<=0
         * 如果最大线程数小于核心线程数,此可以理解为最大线程数必须 >= 核心线程数
         * 如果空闲存活时间 < 0
         * 进行抛异常
         */

        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        // 如果队列为空 或者 线程工厂为孔 或者 异常处理类为空,进行抛异常
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        // 权限相关的东西, 算了就, 不耽误
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

先引入一下位运算小知识

& 都为1则是1
| 都是0则是0
^ 异或 相同为0 不同为1
~ 取反 0变1 1变0
>> 右移 低位补0 高位丢弃
<< 左移 高位补0 低位丢弃

线程池状态

通过ctl来表示
    // ctl = 1110 0000  0000 0000 0000 0000 0000 0000
    // 高三位 表示 线程池的运行状态, 低29位表示线程池的线程数量
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
     // COUNT_BITS = 29,ctl保存线程数量的 位
    private static final int COUNT_BITS = Integer.SIZE - 3;

    //(1 << COUNT_BITS)    =  0010 0000 0000 0000 0000 0000 0000 0000
    //(1 << COUNT_BITS) -1 =  0001 1111 1111 1111 1111 1111 1111 1111 -> 5亿多最大的线程数量
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    /**
     * 对于一下5个运行状态 RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED.
     */
    //111 补29个0  -> 高位1,是个负数,接受新的任务,处理等待队列中的任务
    private static final int RUNNING    = -1 << COUNT_BITS;
    //000 补29个0,不接受新的任务提交,但是会继续处理等待队列中的任务
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    //001 补29个0,不接受新的任务提交,不再处理等待队列中的任务,中断正在执行任务的线程
    private static final int STOP       =  1 << COUNT_BITS;
    //010 补29个0,所有的任务都销毁了,workCount 为 0。线程池的状态在转换为 TIDYING 状态时,会执行钩子方法 terminated()
    private static final int TIDYING    =  2 << COUNT_BITS;
    //011 补29个0,terminated() 方法结束后,线程池的状态变为此
    private static final int TERMINATED =  3 << COUNT_BITS;

计算线程池状态

   /**
     *          c = ctl
     *           求线程池的状态
     *           CAPACITY  = 0001 1111 1111 1111 1111 1111 1111 1111
     *          ~CAPACITY  = 1110 0000 0000 0000 0000 0000 0000 0000
     *    假设           c  = 1110 0000 0000 0000 0000 0000 0000 1111
     *        & ~CAPACITY  = 1110 0000 0000 0000 0000 0000 0000 0000   ->  RUNNING
      */

    private static int runStateOf(int c)     { return c & ~CAPACITY; }

计算线程池线程数量

    /**
     *   计算线程池有多少线程
     *   假设 c:     1110 0000 0000 0000 0000 0000 0000 1111
     *  CAPACITY  = 0001 1111 1111 1111 1111 1111 1111 1111
     *              0000 0000 0000 0000 0000 0000 0000 1111   ->   15个
     */
    private static int workerCountOf(int c)  { return c & CAPACITY; }

初始化/计算 ctl 的值

    /**
     *   初始化 ctl 的值
     *    如:
     *     private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
     *     RUNNING | 0
     *     RS -> RUNNING: 111 0 0000 0000 0000 0000 0000 0000 0000
     *     WC -> 0      : 000 0 0000 0000 0000 0000 0000 0000 0000
     *     结果为111 0 0000 0000 0000 0000 0000 0000 0000 ,是个复数
     */

    private static int ctlOf(int rs, int wc) { return rs | wc; }

围绕ctl计算的一些方法

 /**
     *          c = ctl
     *           求线程池的状态
     *           CAPACITY  = 0001 1111 1111 1111 1111 1111 1111 1111
     *          ~CAPACITY  = 1110 0000 0000 0000 0000 0000 0000 0000
     *    假设           c  = 1110 0000 0000 0000 0000 0000 0000 1111
     *        & ~CAPACITY  = 1110 0000 0000 0000 0000 0000 0000 0000   ->  RUNNING
      */

    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    /**
     *   计算线程池有多少线程
     *   假设 c:     1110 0000 0000 0000 0000 0000 0000 1111
     *  CAPACITY  = 0001 1111 1111 1111 1111 1111 1111 1111
     *              0000 0000 0000 0000 0000 0000 0000 1111   ->   15个
     */
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    /**
     *   初始化 ctl 的值
     *    如:
     *     private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
     *     RUNNING | 0
     *     RS -> RUNNING: 111 0 0000 0000 0000 0000 0000 0000 0000
     *     WC -> 0      : 000 0 0000 0000 0000 0000 0000 0000 0000
     *     结果为111 0 0000 0000 0000 0000 0000 0000 0000 ,是个复数
     */

    private static int ctlOf(int rs, int wc) { return rs | wc; }
                                                           //线程池的运行状态是否小于某个状态值
    private static boolean runStateLessThan(int c, int s) {
        return c < s;
    }

    //线程池的运行状态是否大于等于某个状态值
    private static boolean runStateAtLeast(int c, int s) {
        return c >= s;
    }

    // 线程状态是否是运行中,需要小于SHUTDOWN
    private static boolean isRunning(int c) {
        return c < SHUTDOWN;
    }


    // CAS 将 ctl + 1
    private boolean compareAndIncrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect + 1);
    }


    // CAS 将 ctl - 1
    private boolean compareAndDecrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect - 1);
    }


    // 强制保证ctl -1 成功
    private void decrementWorkerCount() {
        do {} while (! compareAndDecrementWorkerCount(ctl.get()));
    }

一些其它属性或者方法

   // 用于存储 任务 的 任务队列,核心线程数在满了之后会提交至此队列
    private final BlockingQueue<Runnable> workQueue;    // 后续会通过offer()方法进行添加

    // 操作线程池需要获取锁
    private final ReentrantLock mainLock = new ReentrantLock();

    //线程池中的线程 真正存放的位置,将线程Thread封装为Worker对象
    private final HashSet<Worker> workers = new HashSet<Worker>();

    // 伴随 mainLock ,等待或唤醒锁的条件,其有await(),signal()等方法
    private final Condition termination = mainLock.newCondition();

    // 在获取此值的时刻,线程池中线程的最大值
    private int largestPoolSize;

    // 线程池中已经完成的任务数,完成的任务会累加上去
    private long completedTaskCount;

    // 线程工厂类
    private volatile ThreadFactory threadFactory;

    // 拒绝策略,可自己实现此接口,默认有4种实现
    private volatile RejectedExecutionHandler handler;

     /**
     *  空闲线程的存活时间,空闲线程包括核心线程 + 非核心线程
     *   allowCoreThreadTimeOut 为 false 时,核心线程不动
     *                          为 true 时,核心线程也会被回收
     */
    private volatile long keepAliveTime;

    //  核心线程是否会超时,配合 keepAliveTime
    private volatile boolean allowCoreThreadTimeOut;

    // 核心线程数 
    private volatile int corePoolSize;

    // 最大的线程数,5亿多
    private volatile int maximumPoolSize;

    // 默认的拒绝策略。AbortPolicy ->  直接拒绝抛异常,策略有4种随后进行分析
    private static final RejectedExecutionHandler defaultHandler =  new AbortPolicy();

queue的一些方法
add 添加一个元素 如果满了 抛异常
offer 添加一个元素 如果满了 返回false
element 返回头部节点的值,但不移除 为空,则抛异常
peek 返回头部节点的值,但不移除 为空,返回null
remove 返回头部节点的值,并移除 为空,抛异常
poll 返回头部节点的值,并移除 为空,返回null
put 添加一个元素 队列满了 则阻塞
take 返回头部节点的值,并移除 为空,则阻塞

线程池的拒绝策略(ThreadPoolExecutor内部默认实现了4种)

- 拒绝策略顶层接口
public interface RejectedExecutionHandler {
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
  1. AbortPolicy 默认的拒绝策略直接抛异常
    ``` public static class AbortPolicy implements RejectedExecutionHandler {

    public AbortPolicy() { }
    
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        throw new RejectedExecutionException("Task " + r.toString() +
                                             " rejected from " +
                                             e.toString());
    }
}

2.  CallerRunsPolicy  由线程池启动的线程进行执行

public static class CallerRunsPolicy implements RejectedExecutionHandler {

    public CallerRunsPolicy() { }


    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        // 如果线程池没有SHUTDOWN
        if (!e.isShutdown()) {
            //直接执行
            r.run();
        }
    }
}

3.  DiscardPolicy  直接丢弃啥也不干

public static class DiscardPolicy implements RejectedExecutionHandler {

    public DiscardPolicy() { }

       // 啥也不做
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    }
}

4.  DiscardOldestPolicy 直接丢弃最早的任务,慎用

public static class DiscardOldestPolicy implements RejectedExecutionHandler {

    public DiscardOldestPolicy() { }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        // 如果线程池状态不是SHUTDOWN
        if (!e.isShutdown()) {
            // 把头部的取出来不管
            e.getQueue().poll();
            // 用线程池去执行线程
            e.execute(r);
        }
    }
}
<a name="12b6b713"></a>
#### 线程执行的核心方法 execute

// 线程池执行方法,submit()也是调用此execute方法, public void execute(Runnable command) { if (command == null) throw new NullPointerException();

    // 先获取ctl线程池的状态,高3位表状态,低29表线程池数量
    int c = ctl.get();
    // 如果当前线程池的线程数量,小于核心线程数,则表示可以去新起核心线程
    if (workerCountOf(c) < corePoolSize) {
        // 将线程 封装 进Worker,以核心线程为边界
        if (addWorker(command, true))
            return;
        // 如果封装进worker失败,表示可能出现了并发,达到了核心线程数 或者 线程池状态改变了不允许放入线程
        //再次获取ctl线程池的状态
        c = ctl.get();
    }

    //逻辑至此,首先可以明确的是核心线程数满了 或者 addWoker 失败了
    // 线程是运行时状态 且 线程入队成功
    if (isRunning(c) && workQueue.offer(command)) {
        // 重新获取ctl的状态
        int recheck = ctl.get();
        // 如果线程状态被改变(如shutdown掉),不是运行状态了,就移除刚才入队的任务线程,此if逻辑是有可能false的
        if (! isRunning(recheck) && remove(command))
            // 直接走拒绝策略
            reject(command);
        //代码至此表示,1:线程池状态是running,2:线程池状态不是running,但是remove(command)失败
        //检查线程池中的线程数量是否为0 ,如果为0说明没线程存活去干活了,搞一个以非核心线程作为边界创建个worker
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 代码至此 ,表明线程池非running状态不能添加任务  或者 队列满了入队失败 或者核心线程数已满
    // 所以以非核心线程数作为边界去添加任务,如果也添加失败即表示maxiumPoolSize也已经满了,则抛出异常
    else if (!addWorker(command, false))
        reject(command);
}
<a name="3df54512"></a>
#### 线程执行的核心方法addWorker

//入队逻辑 private boolean addWorker(Runnable firstTask, boolean core) { retry: // 自旋 for (;;) { // 获取线程池状态 int c = ctl.get(); // 获取线程池的运行状态 int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        /**
         *   运行状态>= 停止
         *   运行状态是停止 && firstTask是空 && queue不是空 (此种情况允许创建worker)
         */
        //
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            // 获取线程池的线程数量
            int wc = workerCountOf(c);
            // 如果大于最大容量 或者 已经达到核心线程数 | 最大线程数,则不允许添加worker
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // 此处为通过CAS 进行线程数量 + 1,成功则跳出循环,继续向下执行创建worker的逻辑
            if (compareAndIncrementWorkerCount(c))
                break retry;
            // cas进行线程数量+1失败了,再次获取ctl的值
            c = ctl.get();  // Re-read ctl
            // 判断是否与刚进来时拿到的rs不同,不同则表示线程池状态已经被修改
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    // 至此,判断条件已经全部通过了,要新建Worker了
    boolean workerStarted = false;  // worker是否启动
    boolean workerAdded = false;    // worker是否新增成功
    Worker w = null;
    try {
        // 先新建个Worker给引用w
        w = new Worker(firstTask);
        // worker对象
        final Thread t = w.thread;
        // t为空的话,后续t.isAlive()/start()将报错
        if (t != null) {

            // 全局锁
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                //获取线程池的状态rs
                int rs = runStateOf(ctl.get());

                //条件1: 如果rs小于SHUTDOWN表明为running状态,该干嘛干嘛
                //条件2: rs == SHUTDOWN 且 firstTask 是空(此种情况允许创建worker)
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    //此时线程t本没有启动,如果启动了 说明出现了问题
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    //一系列判断后,将新建的worker add到HashSet<Worker> workers对象中
                    workers.add(w);
                    // 重新记录下最大的线程的数量
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    //worker 新增成功改其标志
                    workerAdded = true;
                }
            } finally {
                //解全局锁
                mainLock.unlock();
            }
            //成功则将worker 启动
            if (workerAdded) {
                //调用worker.run() -> runWorker()
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        //如果worker添加失败,需将线程数量-1,因为上面+1了,还需将worker从workerSet中清除
        if (! workerStarted)
            addWorkerFailed(w);
    }
    // 创建worker成功正常启动 返回true
    // 线程池状态 > SHUTDOWN 返回false
    // 线程池状态 == SHUTDOWN,firstTask不为空,队列为空 返回false
    // 线程数量已经达到核心和最大线程数 返回false
    // ThreadFactory工厂类创建的线程为null 返回false
    return workerStarted;
}

<a name="5b648905"></a>
#### 线程执行的核心方法 runWorker

final void runWorker(Worker w) { //当前线程对象 Thread wt = Thread.currentThread(); //取出worker对象中firstTask Runnable task = w.firstTask; //置为null w.firstTask = null; // 此操作为调用AQS -> release(1) -> : /**

     *   setExclusiveOwnerThread(null);  //将独占线程设为null
     *   setState(0);                    //state置为0
     */
    w.unlock(); // allow interrupts

    // 是否突然完成了(正常、非正常)
    boolean completedAbruptly = true;
    try {
        /**
         *  1. (task == firstTask) != null
         *  2. firstTask == null了,则调用getTask去queue的队首中获取任务 getTask()会调用take(),是阻塞的
         *     如果为null 表示没有任务需要被执行了已经
         */
        while (task != null || (task = getTask()) != null) {
            // 调用worker的lock()方法,设置独占线程为当前线程并将state设为1, 即获取到锁
            w.lock();
            /**
             *   1: 看ctl的状态是否是>=STOP的,STOP不在接受新的任务,也不再处理queue中的任务,中断正在执行的任务
             *   2:(Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))
             *      2.1 线程是被打断了的 注意interrupted()会在判断后重置中断标记,
             *      2.2 看ctl的状态是否是>=STOP的,
             *   3: 当前线程没有被中断
             */
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                // 空方法 子类可实现
                beforeExecute(wt, task);
                // 任务执行异常的引用
                Throwable thrown = null;
                try {
                    // 执行线程
                    // submit执行的则结果封装为FutureTask
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // 空方法 子类可实现
                    afterExecute(task, thrown);
                }
            } finally {
                // task 执行结束置为null
                task = null;
                // 执行的task数量+1
                w.completedTasks++;
                // worker处理完后 释放锁
                w.unlock();
            }
        }
        // 正常执行结束,getTask拿到的是null,正常退出
        completedAbruptly = false;
    } finally {
        // 线程池退出逻辑
        // 正常 |未正常执行结束,进行相应处理
        processWorkerExit(w, completedAbruptly);
    }
}

<a name="f97981bf"></a>
#### 线程执行的核心方法 getTask

// getTask()是从队列中获取任务 private Runnable getTask() { // timeOut = false.不会超时 boolean timedOut = false;

    for (;;) {
        //  获取ctl的值
        int c = ctl.get();
        // 获取线程池状态 rs
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        /**
         *  1.线程池状态>=SHUTDOWN
         *  2.线程池状态 >= STOP 或者 队列为空
         *  表示已经没有可以执行的任务了, 准备退出了
         */
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            // 线程池数量 - 1
            decrementWorkerCount();
            return null;
        }

        /**
         *  执行到此 说明 rs是running  或者 rs == SHUTDOWN,队列不是null
         */

        // 获取线程池数量
        int wc = workerCountOf(c);

        /**
         * 1. allowCoreThreadTimeOut == true 表示核心线程也会被回收,通过poll(),false则维护核心线程数
         * 如果线程数量 > 核心线程数 表示所有线程会通过poll()来获取任务,poll()不到 线程准备退出
         * 就是来判断超时机制,有此机制可以在queue里获取任务时进行超时(poll()方法) 否则一直阻塞等待任务(take()方法),
         *
         * 在自旋时,wc > corePoolSize 是可能出现false的
         */
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        // 1.1 按道理线程数量不该大于maximumPoolSize,因为addWorker()的逻辑已经进行了逻辑判断,
        //    是因为有个setMaximumPoolSize()方法可以修改maximumPoolSize的值,继而会出现这种情况
        // 1.2 (timed && timedOut) 后续代码中 通过poll()获取任务超时会将timedOut改为true
        // 2.1 线程池中还有线程 当前线程可以被回收
        // 2.2 wc == 1 而且队列为null,最后一个线程也可以退出
        // 上述是在判断线程池是否可以退出
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            // 线程数 - 1,失败可能是并发下其他线程率先执行了,或者 线程池状态变化
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        // 代码至此,表示还能取任务
        try {
            // 获取任务允许超时的进行poll() 不超时的进行take()
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            // 成功获取进行返回
            if (r != null)
                return r;
            // 当前线程出现了超时,继续自旋
            timedOut = true;
        } catch (InterruptedException retry) {
            // 获取时出现异常,就自旋再获取
            timedOut = false;
        }
    }
}

<a name="5d608156"></a>
#### 线程池退出逻辑

private void processWorkerExit(Worker w, boolean completedAbruptly) { //如果是突然的异常退出,则将ctl的线程数量必须保证-1 if (completedAbruptly) // If abrupt, then workerCount wasn’t adjusted decrementWorkerCount();

    // 全局锁
    final ReentrantLock mainLock = this.mainLock;
    // 枷锁
    mainLock.lock();
    try {
        // 将worker完成的任务数汇总到completedTaskCount
        completedTaskCount += w.completedTasks;
        // 移除worker
        workers.remove(w);
    } finally {
        // 释放锁
        mainLock.unlock();
    }

    // 改线程池状态为TERMINATE的钩子方法
    tryTerminate();

    // 获取ctl的值
    int c = ctl.get();
    // 如果小于STOP 状态, 即为 RUNNING , SHUTDOWN
    if (runStateLessThan(c, STOP)) {
        // 如果是正常退出
        if (!completedAbruptly) {
            // min 表示线程池最小的线程数
            // 如果没开启回收核心线程,则将现在的核心线程数赋给min,开启则置为0
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            // 假设线程数是0了,但是还有任务未执行完,则留一个线程
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            // 如果线程数是大于等于 min 的,表示有线程去执行队列的剩下的任务
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        // 1.非正常退出,创建个worker出来作为补充
        // 2.queue还有任务没执行完,也要创建一个
        // 3.当前的线程数量 < 核心线程数, 维护其数量
        addWorker(null, false);
    }
}

<a name="9465f60d"></a>
#### 修改线程池状态为最终的TERMINATE

final void tryTerminate() { for (;;) { int c = ctl.get(); // 如果线程池状态是running 则不能修改 // 如果线程池状态已经是>=TIDYING了,表示马上就要变成TERMINATE了 //(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()) 表示线程池是SHUTDOWN,但是queue不是空 // 还有任务没搞完 if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return; // 已经没有线程需要执行了 if (workerCountOf(c) != 0) { // Eligible to terminate // ONLY_ONE == TRUE,只打断一个空闲worker interruptIdleWorkers(ONLY_ONE); return; }

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 设置线程池状态为TIDYING成功
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    // 子类实现,终止时执行的方法
                    terminated();
                } finally {
                    // 设为TERMINATE
                    ctl.set(ctlOf(TERMINATED, 0));
                    // 唤醒所有等待线程
                    termination.signalAll();
                }
                return;
            }
        } finally {
            // 释放锁
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}

<a name="57af449d"></a>
#### 线程池SHUTDOWN

public void shutdown() { // 先拿全局锁 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 检查权限相关 checkShutdownAccess(); // 去改线程池状态为SHUTDOWN advanceRunState(SHUTDOWN); //中断空闲的线程 interruptIdleWorkers(); // 子类实现 onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } // 尝试修改线程池装为Terminate tryTerminate(); }

private void advanceRunState(int targetState) { for (;;) { // 获取线程池状态 int c = ctl.get(); /**

         *  条件一:判断当前线程池状态是否>=SHUTDOWN ,TRUE表示已经是SHUTDOWN,STOP,TIDYING,TERMINATION了
         *  条件二:如果1为false(线程池状态为RUNNING),则需要改线程池状态,如果ctl状态没有变化,则修改其状态为ctlOf(targetState, workerCountOf(c))
         *        ctlOf(targetState, workerCountOf(c))
         *                SHUTDOWN -> 000 000000000000000000000000000
         * 假设线程数量为                000 00000000000000000000000011 -> 5个
         * | 运算                      000 00000000000000000000000011  ->
         *                            就是SHUTDOWN了,不接受新的任务提交,但是会继续处理等待队列中的任务
         */
        if (runStateAtLeast(c, targetState) ||
            ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
            break;
    }
}


// shutdown() 调用时,onlynoe默认传false,onlyone为true只中断一个???
private void interruptIdleWorkers(boolean onlyOne) {
    // 需要获取全局锁
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 遍历真正存放worker的set
        for (Worker w : workers) {
            //取出worker中真正的线程~
            Thread t = w.thread;
            // 如果worker还没被设置中断标记,并且尝试获取到了锁(tryLock()是有可能阻塞起来的,因为是通过CAS的)
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    // 将其中断
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    // 释放锁
                    w.unlock();
                }
            }
            // 当为false ,意在将workers全部中断,true就是中断一个就行
            if (onlyOne)
                break;
        }
    } finally {
        //释放锁
        mainLock.unlock();
    }
}

<a name="093a744a"></a>
#### 线程池关闭方法 shuotDownNow

public List shutdownNow() { List tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); // 直接修改其状态为STOP advanceRunState(STOP); interruptWorkers(); // 将剩余任务导出到list中 tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; }

 private List<Runnable> drainQueue() {
    BlockingQueue<Runnable> q = workQueue;
    ArrayList<Runnable> taskList = new ArrayList<Runnable>();
    q.drainTo(taskList);
    if (!q.isEmpty()) {
        for (Runnable r : q.toArray(new Runnable[0])) {
            if (q.remove(r))
                taskList.add(r);
        }
    }
    return taskList;
}

```