ThreadPoolExecutor

线程池的优点

  1. 当执行大量异步任务时线程池能够提供较好的性能。在不使用线程池时,每当需要执行异步任务时直接new 一个线程来运行,而线程的创建和销毁是需要开销的。线程池里面的线程是可复用的,不需要每次执行异步任务时都重新创建和销毁线程。
  2. 是线程池也提供了一种资源限制和管理的手段,比如可以限制线程的个数,动态新增线程等。每个ThreadPoo!Executor 也保留了一些基本的统计数据, 比如当前线程池完成的任务数目等。
  3. 线程池提供了可调参数和可扩展性接口,满足不同情境需要。

Executors.png
Executors是一个工具类,提供了许多静态方法,根据需要返回不同线程池示例。ThreadPoolExecutor继承了AbstractExecutorService,其成员变量ctl是一个原子类,记录线程池状态和线程池中线程个数。
ThreadPoolExecutor.png

线程池状态

  1. // ctl 高29-31位表示线程池状态,0-28位表示线程数量(最多2^29-1,约5亿)
  2. private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
  3. // 线程个数掩码位数
  4. private static final int COUNT_BITS = Integer.SIZE - 3; // 29
  5. // 线程数
  6. private static final int CAPACITY = (1 << COUNT_BITS) - 1; // 2^29-1 0001 1111 1111 1111 1111 1111 1111 1111
  7. // 高三位 29~31位 表示线程池状态 RUNNING < STUTDOWN < STOP < TIDYING < TERMINATED
  8. // 111 0 0000 0000 0000 0000 0000 0000 0000 表示RUNNING
  9. private static final int RUNNING = -1 << COUNT_BITS;
  10. // 000 0 0000 0000 0000 0000 0000 0000 0000 表示SHUTDOWN
  11. private static final int SHUTDOWN = 0 << COUNT_BITS;
  12. // 001 0 0000 0000 0000 0000 0000 0000 0000 表示STOP
  13. private static final int STOP = 1 << COUNT_BITS;
  14. // 010 0 0000 0000 0000 0000 0000 0000 0000 表示TIDYING
  15. private static final int TIDYING = 2 << COUNT_BITS;
  16. // 011 0 0000 0000 0000 0000 0000 0000 0000 表示TERMINATED
  17. private static final int TERMINATED = 3 << COUNT_BITS;
  18. // 获取状态 c的高三位 CAPACITY = 0001 1111 1111 1111 1111 1111 1111 1111
  19. private static int runStateOf(int c) { return c & ~CAPACITY; }
  20. // 获取线程池线程数量
  21. private static int workerCountOf(int c) { return c & CAPACITY; }
  22. // 计算ctl新值
  23. private static int ctlOf(int rs, int wc) { return rs | wc; }
  24. private static boolean runStateLessThan(int c, int s) {
  25. return c < s;
  26. }
  27. private static boolean runStateAtLeast(int c, int s) {
  28. return c >= s;
  29. }
  30. private static boolean isRunning(int c) {
  31. return c < SHUTDOWN;
  32. }

线程池状态含义:

  • RUNNING:接受新任务并且处理阻塞队列里的任务
  • SHUTDOWN:拒绝新任务但是处理阻塞队列里的任务
  • STOP:拒绝新任务并抛弃阻塞队列里的任务,同时中断正在处理的任务
  • TIDYING:所有任务都执行完(包含阻塞队列里面的任务)后当前线程池活动线程数为0,将调用terminated方法
  • TERMINATED:终止状态。terminated方法调用完成以后的状态

    RUNNING -> SHUTDOWN : 显式调用shutdown () 方法, 或者隐式调了finalize()方法里面的shutdown()方法 RUNNING 或SHUTDOWN-> STOP : 显式调用shutdownNow()方法时 SHUTDOWN -> TIDY ING : 当线程池和任务队列都为空时 STOP -> TIDYING : 当线程池为空时 TIDYYIG-> TERM剧ATED : 当terminated() hook 方法执行完成时

线程池参数

  1. /**
  2. 用于保存等待执行的任务的阻塞队列,
  3. 比如基于数组的有界ArrayBlock ingQueue 、基于链表的无界LinkedBlockingQueue 、
  4. 最多只有一个元素的同步队列SynchronousQueue 及优先级队列PriorityB lockingQueue 等。
  5. */
  6. private final BlockingQueue<Runnable> workQueue;
  7. // mainLock 是独占锁, 用来控制新增Worker线程操作的原子性。
  8. private final ReentrantLock mainLock = new ReentrantLock();
  9. private final HashSet<Worker> workers = new HashSet<Worker>();
  10. // 线程调用awaitTermination时进入termination的等待条件队列
  11. private final Condition termination = mainLock.newCondition();
  12. private int largestPoolSize;
  13. // 完成的任务数量
  14. private long completedTaskCount;
  15. // 创建线程工厂
  16. private volatile ThreadFactory threadFactory;
  17. /**
  18. 饱和策略, 当队列满并且线程个数达到maximunPoolSize后采取的策略,
  19. 比如AbortPolicy (抛出异常〉
  20. CallerRunsPolicy (使用调用者所在线程来运行任务)
  21. DiscardOldestPolicy (调用poll 丢弃一个任务,执行当前任务)
  22. DiscardPolicy (默默丢弃,不抛出异常〉
  23. */
  24. private volatile RejectedExecutionHandler handler;
  25. // 存活时间 如果当前线程池中的线程数量比核心线程数量多,并且是闲置状态, 则这些闲置的线程能存活的最大时间。
  26. private volatile long keepAliveTime;
  27. private volatile boolean allowCoreThreadTimeOut;
  28. // 线程池核心线程数
  29. private volatile int corePoolSize;
  30. // 线程池最大线程数量
  31. private volatile int maximumPoolSize;

ThreadPoolExecutor构造方法

构造方法重要参数:
int corePoolSize:核心线程数量
int maximumPoolSize:最大线程数量
long keepAliveTime:非核心线程空闲时的生存时间
TimeUnit unit :时间单位
BlockingQueue workQueue:工作阻塞队列
TheadFactory threadFactory:线程工厂
RejectExectionHandler handler: 拒绝策略,线程个数达到maximunPoolSize并且任务队列满并且后采取的策略,jdk提供了4种,可自定义。jdk提供的4种:AbortPolicy (抛出异常〉、 CallerRunsPolicy(调用线程运行)、DiscardPolicy (丢弃任务)、 DiscardOldestPolicy (丢弃阻塞队列等待时间最长的任务,新任务加阻塞队列)

  1. public ThreadPoolExecutor(int corePoolSize,
  2. int maximumPoolSize,
  3. long keepAliveTime,
  4. TimeUnit unit,
  5. BlockingQueue<Runnable> workQueue) {
  6. this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
  7. Executors.defaultThreadFactory(), defaultHandler);
  8. }
  9. public ThreadPoolExecutor(int corePoolSize,
  10. int maximumPoolSize,
  11. long keepAliveTime,
  12. TimeUnit unit,
  13. BlockingQueue<Runnable> workQueue,
  14. RejectedExecutionHandler handler) {
  15. this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
  16. Executors.defaultThreadFactory(), handler);
  17. }
  18. public ThreadPoolExecutor(int corePoolSize,
  19. int maximumPoolSize,
  20. long keepAliveTime,
  21. TimeUnit unit,
  22. BlockingQueue<Runnable> workQueue,
  23. ThreadFactory threadFactory) {
  24. this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
  25. threadFactory, defaultHandler);
  26. }
  27. public ThreadPoolExecutor(int corePoolSize,
  28. int maximumPoolSize,
  29. long keepAliveTime,
  30. TimeUnit unit,
  31. BlockingQueue<Runnable> workQueue,
  32. ThreadFactory threadFactory,
  33. RejectedExecutionHandler handler) {
  34. if (corePoolSize < 0 ||
  35. maximumPoolSize <= 0 ||
  36. maximumPoolSize < corePoolSize ||
  37. keepAliveTime < 0)
  38. throw new IllegalArgumentException();
  39. if (workQueue == null || threadFactory == null || handler == null)
  40. throw new NullPointerException();
  41. this.acc = System.getSecurityManager() == null ?
  42. null :
  43. AccessController.getContext();
  44. this.corePoolSize = corePoolSize;
  45. this.maximumPoolSize = maximumPoolSize;
  46. this.workQueue = workQueue;
  47. this.keepAliveTime = unit.toNanos(keepAliveTime);
  48. this.threadFactory = threadFactory;
  49. this.handler = handler;
  50. }
public class Code01_UserDefinedThreadPool {

    public static void main(String[] args) {

        ThreadPoolExecutor threadPoolExecutor =
            new ThreadPoolExecutor(2, 4, 0L,
                                   TimeUnit.MILLISECONDS,
                                   new ArrayBlockingQueue<>(5),
                                   new MyThreadFactory(),
                                   new ThreadPoolExecutor.AbortPolicy());

        for (int i = 0; i < 10; i++) {
            final int[] j = {i};
            threadPoolExecutor.execute(() -> {
                System.out.println(Thread.currentThread().getName() + " 执行 任务" + j[0] + "   线程池状态:" + threadPoolExecutor.toString());
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            });

        }
    }

    static class MyThreadFactory implements ThreadFactory {
        AtomicLong threadNum = new AtomicLong(0);

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "线程池测试线程" + threadNum.getAndIncrement());
        }
    }

}

运行结果:最多4个线程、任务阻塞队列个数5,所以第10个任务(编号9)的任务被拒绝,根据设定的拒绝策略,抛出异常。
image.png

默认线程池类型

  • newFixedThreadPool:创建一个核心线程个数和最大线程个数都为nThreads的线程池,并且阻塞队列长度为Integer.MAX_VALUE。keepAlivetime=0说明只要线程个数比核心线程个数多并且当前空闲则回收。 ```java public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>());
    
    }

// 使用自定义线程创建工厂 public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), threadFactory); }


- **newSingleThreadExecutor**:创建一个核心线程个数和最大线程个数都为1的线程池,并且阻塞队列长度为Integer.MAX_VALUE。keepAliveTime=0说明只要线程个数比核心线程个数多并且当前空闲则回收。
```java
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

// 使用自定义的线程创建工厂
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>(),
                                threadFactory));
}
  • newCachedThreadPool:创建一个按需创建线程的线程池,初始线程个数为0,最多线程个数为Integer.MAX_VALUE,并且阻塞队列为同步对了。keepAliveTinme=60说明当前线程在60s内空闲则回收。加入同步队列的任务会被马上执行,同步队列里面最多只有一个任务,再继续添加任务会被阻塞。

    SynchronousQueue 容量为0,用于两个线程之间传递数据,类似Exchanger。

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

// 自定义线程创建工厂
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>(),
                                  threadFactory);
}

工作线程Worker

用户线程提交任务到线程池后,由Worker执行。Worker继承了AQS和Runnable接口,自己实现了不可重入的独占锁,state=0表示锁未被获取,state=1表示锁已经被获取状态,state=-1是Worker创建时的默认状态,避免在调用runWorker()方法前被中断。firstTash记录工作线程执行的第一个任务。thread是具体执行任务的线程。

 private final class Worker extends AbstractQueuedSynchronizer implements Runnable
Worker(Runnable firstTask) {
    setState(-1); // 在调用runWorker前禁止中断
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this); // 创建一个线程
}

在构造函数内首先设置Worker 的状态为-1 ,这是为了避免当前Worker 在调用runWorker 方法前被中断(当其他线程调用了线程池的shutdownNow 时,如果Worker 状态>=0 则会中断该线程) 。这里设置了线程的状态为-1 ,所以该线程就不会被中断了。

这里在执行具体任务期间加锁,是为了避免在任务运行期间,其他线程调用了shutdown 后正在执行的任务被中断( shutdown 只会中断当前被阻塞挂起的线程〕

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;

    // worker自己继承了AQS,实现了非重入的独占锁 
    w.unlock(); // 将state设置为0,表示允许中断
    boolean completedAbruptly = true;
    try {

        // task null 检查 
        while (task != null || (task = getTask()) != null) {

            w.lock(); 
            /*
           1、 runStateAtLeast(ctl.get(), STOP) && !wt.isInterrupted()
                   线程池的状态 >=STOP(STOP、TIDYING、TERMINATED) 同时工作线程没有被中断

           2、  Thread.interrupted() && runStateAtLeast(ctl.get(), STOP) && !wt.isInterrupted()
                   线程被中断 同时 线程池的状态被设置为STOP或TIDYING或TERMINATED 线程中断

            */
            if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)))
                && !wt.isInterrupted())
                wt.interrupt();

            try {
                // 执行任务前干的事情
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // 执行任务
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // 执行任务完毕后千一些事情
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                // )统计当前Worker 完成了多少个任务
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        // )执行清理工作
        processWorkerExit(w, completedAbruptly);
    }
}

execute

execute 方法的作用是提交任务command 到线程池进行执行。用户线程提交任务到线程池的模型图如图所示:(生产者-消费者模型,用户线程是生产者,w线程池worker线程是消费者)
image.png

public void execute(Runnable command) {
    // null判断
    if (command == null)
        throw new NullPointerException();

    int c = ctl.get(); // 获取ctl (高3位表示线程池状态,低29位表示线程数量)

    // 如果线程池中线程个数小于corePoolSize,则添加新的线程执行该任务
    if (workerCountOf(c) < corePoolSize) {
        // 核心线程添加成功则核心线程运行这个任务
        if (addWorker(command, true)) 
            return;
        // 核心线程添加失败,再次获取 ctl
        c = ctl.get();
    }

    // 如果线程池处于RUNNING,则添加任务到阻塞队列,offer添加成功返回true,失败返回false
    if (isRunning(c) && workQueue.offer(command)) {

        // 二次检查,防止其他线程改变了线程池状态
        // 如果当前线程池状态不是RUNNING,则从从队列中删除任务,并执行拒绝策略
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // 如果当前线程池是否还有线程,没有则添加一个线程
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 如果队列满了,则新增工作线程(非核心线程),线程新增失败执行拒绝策略
    else if (!addWorker(command, false))
        reject(command);
}
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get(); // ctl  高3位线程状态,低29位线程数量
        int rs = runStateOf(c); // 线程池的状态

        /*
        RUNNING(111 xxxxx……) < SHUTDOWN(000 xxxx……) < STOP(001 xxxxx……) < TIDYING(010 xxxx……)<TERMINATED(011 xxxx……)

        rs>=SHUTDOWN(当前线程池状态为STOP 、TIDYING 或TERMINATED )||
        rs==SHUTDOWN && firstTask !=null(当前线程池状态为SHUTDOWN 并且己经有了第一个任务)||
        rs==SHUTDOWN && workQueue.isEmpty (当前线程池状态为SHUTDOWN 并且任务队列为空)

        以上情况任意一种,添加失败
       */
        if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null &&! workQueue.isEmpty()))
            return false;  

        // 循环CAS增加线程个数
        for (;;) {
            // 获取工作线程的个数
            int wc = workerCountOf(c);

            /*1、如果线程个数超过限制返回false,
              core是否为核心线程:
              2、core=true,添加的是核心线程,当前线程数>corePoolSize,无法添加,返回false
              3、core=false,添加的不是核心线程,当前线程数>maximumPoolSize,无法添加,返回false
            */
            if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
                return false;

            // CAS增加线程个数,同时只有一个线程成功,添加成功跳出外层循环
            if (compareAndIncrementWorkerCount(c))
                break retry;
            // CAS失败,则看线程池状态是否变化了,变化则跳出循坏尝试获取线程池,否则继续重新CAS添加工作线程
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    // 运行到这表示CAS成功 线程数量添加成功
    boolean workerStarted = false;  
    boolean workerAdded = false;
    Worker w = null; 
    try {
        // 创建worker,第一个任务为firstTask
        w = new Worker(firstTask); 
        final Thread t = w.thread; // t为worker执行任务的线程
        if (t != null) {
            // 整个线程池加独占锁,把新增的Worker 添加到工作集workers
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
               // 再次检查线程池状态,避免在获取锁之前调用了shutDown
                int rs = runStateOf(ctl.get());

                /*
                rs<SHUTDOWN 表示 线程程 RUNNING,线程池可以任务阻塞队列可以添加任务
                rs == SHUTDOWN && firstTask==null,
                    rs == SHUTDOWN:线程池只能运行阻塞队列里的任务,不能添加
                    firstTask==null:第一个任务为null

                */
                if (rs < SHUTDOWN || rs == SHUTDOWN && firstTask == null)) {
                    // 如果线程已经启动,抛出异常
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                   // 添加任务
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            // 添加成功后启动任务
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

shutdown

调用shutdown 方法后,线程池就不会再接受新的任务了,但是工作队列里面的任务还是要执行的。

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 权限检查
        checkShutdownAccess();
        // 设置当前线程池状态SHUTDOWN,如采已经是SHUTDOWN则直接返回
        advanceRunState(SHUTDOWN);
        // 设置中断标志
        interruptIdleWorkers();
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    // 尝试将状态变为TERMINATED
    tryTerminate();
}
private void interruptIdleWorkers() {
    interruptIdleWorkers(false);
}

private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            // 没有被中断的空闲的线程,设置中断
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}
final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        // 如果线程池是RUNNING状态或者已经是TIDTING或TERMINATED 或者是SHUTDOWN且任务队列非空,直接返回
        if (isRunning(c) || runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && !workQueue.isEmpty()))
            return;

        // 工作线程不等于0,将一个线程中断
        if (workerCountOf(c) != 0) { // Eligible to terminate
            interruptIdleWorkers(ONLY_ONE);
            return;
        }

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 设置当前线程池状态为TIDYING 
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    //TIDYING 设置成功,
                    // 执行扩展接口terminated 在线程池状态变为TERMINATED前做一些事情
                    terminated();
                } finally {
                    // 设置线程池状态为TERMINATED
                    ctl.set(ctlOf(TERMINATED, 0));

                    // 唤醒调用添加阻塞队列 termimarion.await方法而被阻塞的所有线程
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}

shutdownNow

调用shutdownNow 方法后, 线程池就不会再接受新的任务了,并且会丢弃工作队列里面的任务, 正在执行的任务会被中断, 该方法会立刻返回,并不等待激活的任务执行完成。返回值为这时候队列里面被丢弃的任务列表。

public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 权限检查
        checkShutdownAccess();
        // 设置线程池状态STOP
        advanceRunState(STOP);
        // 中断所有线程
        interruptWorkers();
        // 将任务列表移动的task
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}
// 中断全部线程,包括空闲线程和正在执行任务的线程
private void interruptWorkers() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers)
            w.interruptIfStarted();
    } finally {
        mainLock.unlock();
    }
}
void interruptIfStarted() {
    Thread t;
    if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
        try {
            t.interrupt();
        } catch (SecurityException ignore) {
        }
    }
}

awaitTermination

当线程调用awaitTermination 方法后,当前线程会被阻塞,直到线程池状态变TERMINATED 才返回, 或者等待时间超时才返回。

public boolean awaitTermination(long timeout, TimeUnit unit)
    throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (;;) {
            // 判断线程池状是否是TERMINATED
            if (runStateAtLeast(ctl.get(), TERMINATED))
                return true;
            if (nanos <= 0)
                return false;
            // 当前线程进入termination等待队列 
            nanos = termination.awaitNanos(nanos);
        }
    } finally {
        mainLock.unlock();
    }
}

ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor是指定一定延迟时间后或者定时进行任务调度执行的线程池。 ScheduledThreadPoolExecutor 继承了ThreadPoolExecutor 并实现了ScheduledExecutorService 接口。 线程池队列是DelayedWorkQueue , 其和DelayedQueue 类似,是一个延迟队列。

ScheduledThreadPoolExecutor.png
image.png

ScheduledFutureTask

ScheduledThreadPoolExecutor 内部有一个ScheduledFutureTask类,继承了FutureTask,实现了RunnableScheduledFuture。

private class ScheduledFutureTask<V>
            extends FutureTask<V> implements RunnableScheduledFuture<V> {

FutureTask内部有一个表示任务的状态的变量 state

private volatile int state;
private static final int NEW          = 0;  // 初始状态
private static final int COMPLETING   = 1;  // 执行中
private static final int NORMAL       = 2;  // 正常运行结束状态
private static final int EXCEPTIONAL  = 3;  // 运行中异常
private static final int CANCELLED    = 4;  // 任务被取消
private static final int INTERRUPTING = 5;  // 任务正在被中断
private static final int INTERRUPTED  = 6;  // 任务已经被中断

可能的任务的状态转移路径: NEW -> COMPLETING ->NORMAL NEW ->COMPLETING ->EXCEPTIONAL NEW ->CANCELLED NEW ->INTERRUPTING ->INTERRUPTED

ScheduledFutureTask内部还有一个变量period用来表示任务的类型

/**
period = 0 : 表示当前任务是一次性的,执行完毕后退出
period < 0 : 表示当前任务为fixed-delay任务,是固定延迟的定时可重复执行任务
period > 0 : 表示当前任务为fixed-rate任务,是固定频率的定时可重复执行任务
*/
private final long period;

构造函数

由构造函数可知,ScheduledThreadPoolExecutor的任务队列是DelayedWorkedQueue

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());  
}

public ScheduledThreadPoolExecutor(int corePoolSize,RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), handler);
}

public ScheduledThreadPoolExecutor(int corePoolSize,ThreadFactory threadFactory) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), threadFactory);
}

 public ScheduledThreadPoolExecutor(int corePoolSize,
                                    ThreadFactory threadFactory,RejectedExecutionHandler handler) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue(), threadFactory, handler);
    }

schedule

该方法的作用是提交一个延迟执行的任务, 任务从提交时间算起延迟单位为unit 的delay 时间后开始执行。提交的任务不是周期性任务,任务只会执行一次

public ScheduledFuture<?> schedule(Runnable command,long delay,TimeUnit unit) {

    // 参数检查
    if (command == null || unit == null)
        throw new NullPointerException();
    // 任务转换
    RunnableScheduledFuture<?> t = 
        decorateTask(command,new ScheduledFutureTask<Void>(command, null,triggerTime(delay, unit)));
    // 添加任务延迟队列
    delayedExecute(t);
    return t;
}

ScheduledFutureTask构造函数

ScheduledFutureTask(Runnable r, V result, long ns) {
    super(r, result); // FutureTask(Runnable runnable, V result)
    this.time = ns; // 表示启动任务的时间
    this.period = 0; // 任务类型 ,period=0表示一次性的任务
    this.sequenceNumber = sequencer.getAndIncrement();
}
public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(r unnable, result); // Runnable类型任务装换为Callable类型
    this.state = NEW;       // 设置任务状态为NEW
}

delayedExecute:将任务添加代任务队列(延迟队列),让最快要过期的元素放到队首

private void delayedExecute(RunnableScheduledFuture<?> task) {
      // 线程池一个SHUTDOWN 则拒绝加入
    if (isShutdown())
        reject(task);
    else {
        // 添加任务到延迟队列
        super.getQueue().add(task);
        // 再次检查线程池状态
        if (isShutdown() &&
            !canRunInCurrentRunState(task.isPeriodic()) &&
            remove(task))
            task.cancel(false);
        else
            // 确保至少一个线程在处理任务
            ensurePrestart();
    }
}
void ensurePrestart() {
    int wc = workerCountOf(ctl.get());
    // 增加核心线程
    if (wc < corePoolSize)
        addWorker(null, true);
    // 如采初始化corePoolSize= = O , 则也添加一个线程
    else if (wc == 0)
        addWorker(null, false);
}

ScheduledFutureTask#run

public void run() {
    // 线程是否只执行一次,false表示只执行1次
    boolean periodic = isPeriodic(); // return period != 0;

    // 取消任务
    if (!canRunInCurrentRunState(periodic))
        cancel(false);

    else if (!periodic) // 只执行1次
        ScheduledFutureTask.super.run(); // 调用FutureTask#run
    else if (ScheduledFutureTask.super.runAndReset()) { // 定时执行
        setNextRunTime();
        reExecutePeriodic(outerTask);
    }
}
public void run() {
    // 当前任务不是NEW 或者 当前任务是NEW但是CAS设置当前线程为任务持有者失败
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;

    try {
        Callable<V> c = callable;
        // 执行Callable,再次检查任务状态为NEW
        if (c != null && state == NEW) {
            V result;
            boolean ran;

            // 正常结算设置任务状态NORMAL,抛出异常设置任务状态EXCEPTIONAL
            try {
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            // 执行成功修改任务状态
            if (ran)
                set(result);
        }
    } finally {

        runner = null;

        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}
protected void set(V v) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = v;
        // 设置当前任务状态为NORMAL 正常结算,没有使用CA S 是因为对于同一个任务只可能有一个线程运行到这里
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        finishCompletion();
    }
}
protected void setException(Throwable t) {
    // 如果当前任务状态是NEW 设置为COMPLETING 
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = t;
        // 设置当前线程EXCEPTIONAL  非正常结束
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
        finishCompletion();
    }
}

scheduleWithFixedDelay

当任务执行完毕后,让其延迟固定时间后再次运行( fixed-delay 任务)。 其中initialDelay 表示提交任务后延迟多少时间开始执行任务command, delay 表示当任务执行完毕后延长多少时间后再次运行command 任务, unit 是initialDelay 和delay 的时间单位 任务会一直重复运行直到任务运行中抛出了异常,被取消了,或者关闭了线程池。

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                 long initialDelay,
                                                 long delay,
                                                 TimeUnit unit) {
    // 参数检查
    if (command == null || unit == null)
        throw new NullPointerException();

    // 参数检查,延迟不能小于0
    if (delay <= 0)
        throw new IllegalArgumentException();

    // 任务转换,period = -delay < 0, 表示当前任务为fixed-delay任务,是固定延迟的定时可重复执行任务
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command,
                                      null,
                                      triggerTime(initialDelay, unit),
                                      unit.toNanos(-delay));

    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;

    // 添加任务到延迟队列
    delayedExecute(t);
    return t;
}

执行任务是会先调用ScheduledFutureTask#run,发现period!=0,则会调用FutureTask#runAndReset,运行成功在执行 setNextRunTime()、reExecutePeriodic(outerTask);

FutureTask#runAndReset

protected boolean runAndReset() {

    // 任务状态不是NEW 或者是NEW 当前设置当前线程为任务执行先失败
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return false;


    boolean ran = false;
    int s = state;
    try {
        Callable<V> c = callable;
        // 再次检查任务状态为NEW
        if (c != null && s == NEW) {
            try {
                c.call(); // don't set result
                ran = true;
            } catch (Throwable ex) {
                // 执行Callable过程抛出异常,设置任务状态为异常中断
                setException(ex);
            }
        }
    } finally {

        runner = null;
        s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
    // 如果当前任务正常执行完毕并且任务状态为NEW 则返回true , 否则返回false
    return ran && s == NEW;
}

ScheduledFutureTask#setNextRunTime

private void setNextRunTime() {
    long p = period;
    if (p > 0)  // fixed-rate 类型任务
        time += p;
    else  // fixed-delay类型任务
        time = triggerTime(-p); // 设置time 为当前时间加上-p 的时间,也就是延迟-p时间后再次执行
}

scheduleAtFixedRate

该方法相对起始时间点以固定频率调用指定的任务( fixed-rate 任务) 。 当把任务提交到线程池并延迟initialDelay 时间( 时间单位为unit )后开始执行任command 然后从initialDelay+period 时间点再次执行,而后在initialDelay + 2 * period 时间点再次执行,循环往复,直到抛出异常或者调用了任务的cancel 方法取消了任务,或者关闭了线程池。 scheduleAtFixedRate 的原理与scheduleWithFixedDelay 类似,下面我们讲下它们之间的不同点。

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                              long initialDelay,
                                              long period,
                                              TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (period <= 0)
        throw new IllegalArgumentException();
    // period > 0 
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command,
                                      null,
                                      triggerTime(initialDelay, unit),
                                      unit.toNanos(period));

    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
    delayedExecute(t);
    return t;
}

当前任务执行完毕后,调用setNextRunTime 设置任务下次执行的时间时执行的是time += p 而不再是time = triggerTime(-p) 。
image.png

ForkJoinPool入门

CompletableFuture使用