1649988897(1).png
    用法

    1. public class ScheduledThreadPoolExecutorExample {
    2. public static void main(String[] args) {
    3. ScheduledThreadPoolExecutor executor = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(5);
    4. Task task = new Task("任务");
    5. System.out.println("Created : " + task.getName());
    6. //延迟多长时间之后只执行一次
    7. // executor.schedule(task, 2, TimeUnit.SECONDS);
    8. //延迟指定时间后执行一次,之后按照:上一次任务执行时长 + 周期的时长 的时间去周期执行;
    9. // executor.scheduleWithFixedDelay(task, 0, 2, TimeUnit.SECONDS); //任务+延迟
    10. //延迟指定时间后执行一次,之后按照固定的时长周期执行;
    11. executor.scheduleAtFixedRate(task, 0, 1, TimeUnit.SECONDS);//任延迟取最大值 稳定定时器
    12. }
    13. }

    底层调用的;方法

    1. private void delayedExecute(RunnableScheduledFuture<?> task) {
    2. //如果线程池不是RUNNING状态,则使用拒绝策略把提交任务拒绝掉
    3. if (isShutdown())
    4. reject(task);
    5. else {
    6. //与ThreadPoolExecutor不同,这里直接把任务加入延迟队列
    7. super.getQueue().add(task);
    8. //如果当前状态无法执行任务,则取消
    9. if (isShutdown() &&
    10. !canRunInCurrentRunState(task.isPeriodic()) &&
    11. remove(task))
    12. task.cancel(false);
    13. else
    14. //和ThreadPoolExecutor不一样,corePoolSize没有达到会增加Worker;
    15. //增加Worker,确保提交的任务能够被执行
    16. ensurePrestart();
    17. }
    18. }

    /*
    Specialized delay queue. To mesh with TPE declarations, this
    class must be declared as a BlockingQueue even though
    it can only hold RunnableScheduledFutures.
    */
    DelayedWorkQueue
    1649994047(1).png

    1. public boolean offer(Runnable x) {
    2. if (x == null)
    3. throw new NullPointerException();
    4. RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
    5. final ReentrantLock lock = this.lock;
    6. lock.lock();
    7. try {
    8. int i = size;
    9. if (i >= queue.length)
    10. // 容量扩增50%。
    11. grow();
    12. size = i + 1;
    13. // 第一个元素,其实这里也可以统一进行sift-up操作,没必要特判。
    14. if (i == 0) {
    15. queue[0] = e;
    16. setIndex(e, 0);
    17. } else {
    18. // 插入堆尾。
    19. siftUp(i, e);
    20. }
    21. // 如果新加入的元素成为了堆顶,则原先的leader就无效了。
    22. if (queue[0] == e) {
    23. leader = null;
    24. // 由于原先leader已经无效被设置为null了,这里随便唤醒一个线程(未必是原先的leader)来取走堆顶任务。
    25. available.signal();
    26. }
    27. } finally {
    28. lock.unlock();
    29. }
    30. return true;
    31. }

    image.png
    RunnableScheduledFuture
    siftup方法:

    1. private void siftUp(int k, RunnableScheduledFuture<?> key) {
    2. // 找到父节点的索引
    3. while (k > 0) {
    4. // 获取父节点
    5. int parent = (k ­- 1) >>> 1;
    6. RunnableScheduledFuture<?> e = queue[parent
    7. // 如果key节点的执行时间大于父节点的执行时间,不需要再排序了
    8. if (key.compareTo(e) >= 0)
    9. break;
    10. // 如果key.compareTo(e) < 0,
    11. 说明key节点的执行时间小于父节点的执行时间,需要把父节点移到后面
    12. queue[k] = e;
    13. setIndex(e, k
    14. // 设置索引为k
    15. k = parent;
    16. }
    17. // key设置为排序后的位置中
    18. queue[k] = key;
    19. setIndex(key, k
    20. }
    21. }

    任务执行:

    1. public void run() {
    2. // 是否周期性,就是判断period是否为0。
    3. boolean periodic = isPeriodic();
    4. // 检查任务是否可以被执行。
    5. if (!canRunInCurrentRunState(periodic))
    6. cancel(false);
    7. // 如果非周期性任务直接调用run运行即可。
    8. else if (!periodic)
    9. ScheduledFutureTask.super.run();
    10. // 如果成功runAndRest,则设置下次运行时间并调用reExecutePeriodic。
    11. else if (ScheduledFutureTask.super.runAndReset()) {
    12. setNextRunTime();
    13. // 需要重新将任务(outerTask)放到工作队列中。此方法源码会在后文介绍ScheduledThreadPoolExecutor本身API时提及。
    14. reExecutePeriodic(outerTask);
    15. }
    16. }

    fied-rate模式和fixed-delay模式区别

    1. private void setNextRunTime() {
    2. long p = period;
    3. /*
    4. * fixed-rate模式,时间设置为上一次时间+p。
    5. * 提一句,这里的时间其实只是可以被执行的最小时间,不代表到点就要执行。
    6. * 这次任务还没执行完是 不会执行下一次的。
    7. */
    8. if (p > 0)
    9. time += p;
    10. /**
    11. * fixed-delay模式,计算下一次任务可以被执行的时间。
    12. * 就是当前时间+delay值。因为代码走到这里任务就已经结束了,now()可以认为就是任务结束时间。
    13. */
    14. else
    15. time = triggerTime(-p);
    16. }
    17. long triggerTime(long delay) {
    18. /*
    19. * 如果delay < Long.Max_VALUE/2,则下次执行时间为当前时间+delay。
    20. *
    21. * 否则为了避免队列中出现由于溢出导致的排序紊乱,需要调用overflowFree来修正一下delay(如果有必要的话)。
    22. */
    23. return now() + ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
    24. }
    25. /**
    26. * 主要就是有这么一种情况:
    27. * 某个任务的delay为负数,说明当前可以执行(其实早该执行了)。
    28. * 工作队列中维护任务顺序是基于compareTo的,在compareTo中比较两个任务的顺序会用time相减,负数则说明优先级高。
    29. *
    30. * 那么就有可能出现一个delay为正数,减去另一个为负数的delay,结果上溢为负数,则会导致compareTo产生错误的结果。
    31. *
    32. * 为了特殊处理这种情况,首先判断一下队首的delay是不是负数,如果是正数不用管了,怎么减都不会溢出。
    33. * 否则可以拿当前delay减去队首的delay来比较看,如果不出现上溢,则整个队列都ok,排序不会乱。
    34. * 不然就把当前delay值给调整为Long.MAX_VALUE + 队首delay。
    35. */
    36. private long overflowFree(long delay) {
    37. Delayed head = (Delayed) super.getQueue().peek();
    38. if (head != null) {
    39. long headDelay = head.getDelay(NANOSECONDS);
    40. if (headDelay < 0 && (delay - headDelay < 0))
    41. delay = Long.MAX_VALUE + headDelay;
    42. }
    43. return delay;
    44. }

    image.png
    DelayQueue内部封装了一个PriorityQueue,它会根据time的先后时间排序(time小的排在前面),若time相同则根据sequenceNumber排序( sequenceNumber小的排在前面);