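From a single-machine perspective, there are several main ways to implement scheduled tasks:

1. while+sleep

In essence, this approach defines a thread that runs a while loop and uses sleep to delay each iteration, so the task is scheduled periodically.
Sample code: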

public static void main(String[] args) {
    final long timeInterval = 5000;
    new Thread(new Runnable() {
        @Override
        public void run() {
            while (true) {
                System.out.println(Thread.currentThread().getName() + " runs every 5 seconds");
                try {
                    Thread.sleep(timeInterval);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }).start();
}

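The implementation is simple, but the drawbacks are obvious. If many such threads are created, context switching among the timer threads becomes expensive and overall efficiency drops. There is also no way to define custom scheduling rules.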
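A slightly more defensive variant of the loop is sketched below. The class name SleepLoop, the volatile running flag, the stop() method and the runAtLeast helper are illustrative assumptions and not part of the original example. The sketch only shows one way to stop such a thread cleanly and to restore the interrupt flag instead of swallowing it.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper: a while+sleep loop that can be stopped.
class SleepLoop implements Runnable {
    private final long intervalMillis;
    private final Runnable job;
    private volatile boolean running = true;

    SleepLoop(long intervalMillis, Runnable job) {
        this.intervalMillis = intervalMillis;
        this.job = job;
    }

    void stop() {
        running = false;
    }

    @Override
    public void run() {
        while (running) {
            job.run();
            try {
                Thread.sleep(intervalMillis);
            } catch (InterruptedException e) {
                // restore the flag and leave the loop instead of swallowing the interrupt
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /** Runs a counting job until it has fired at least `times` times, then stops the loop. */
    static int runAtLeast(int times, long intervalMillis) {
        AtomicInteger counter = new AtomicInteger();
        CountDownLatch latch = new CountDownLatch(times);
        SleepLoop loop = new SleepLoop(intervalMillis, () -> {
            counter.incrementAndGet();
            latch.countDown();
        });
        Thread worker = new Thread(loop, "sleep-loop");
        worker.setDaemon(true);
        worker.start();
        try {
            latch.await();       // wait until the job has run `times` times
            loop.stop();
            worker.interrupt();  // wake the thread if it is sleeping
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return counter.get();
    }
}
```

Even with a clean shutdown path, each task still occupies its own thread, so the scaling problem described above remains.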

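2. Implementing a timer with a min-heap

In the min-heap approach, whenever a new task is added, the task that is due soonest is moved to the front. A single thread repeatedly checks the head of the heap and, once the head task has reached its execution time, runs it immediately. The concrete implementation is the Timer class that ships with the JDK.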

public static void main(String[] args) {
    Timer timer = new Timer();
    // run once every second
    timer.schedule(new TimerTask() {
        @Override
        public void run() {
            System.out.println("timer1 Test");
        }
    },1000,1000);
    timer.schedule(new TimerTask() {
        @Override
        public void run() {
            System.out.println("timer1 Test2");
        }
    },3000,3000);
}

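The source code shows that Timer is built on a min-heap. A TimerTask is not a thread of its own: every TimerTask submitted to the same Timer is run by that Timer's single scheduling thread. Let's look at the source code in more detail.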

/**
 * Schedules the specified task for repeated <i>fixed-delay execution</i>,
 * beginning after the specified delay. Subsequent executions take place
 * at approximately regular intervals separated by the specified period.
 *
 * <p>In fixed-delay execution, each execution is scheduled relative to
 * the actual execution time of the previous execution. If an execution
 * is delayed for any reason (such as garbage collection or other
 * background activity), subsequent executions will be delayed as well.
 * In the long run, the frequency of execution will generally be slightly
 * lower than the reciprocal of the specified period (assuming the system
 * clock underlying <tt>Object.wait(long)</tt> is accurate).
 *
 * <p>Fixed-delay execution is appropriate for recurring activities
 * that require "smoothness." In other words, it is appropriate for
 * activities where it is more important to keep the frequency accurate
 * in the short run than in the long run. This includes most animation
 * tasks, such as blinking a cursor at regular intervals. It also includes
 * tasks wherein regular activity is performed in response to human
 * input, such as automatically repeating a character as long as a key
 * is held down.
 *
 * @param task task to be scheduled.
 * @param delay delay in milliseconds before task is to be executed.
 * @param period time in milliseconds between successive task executions.
 * @throws IllegalArgumentException if {@code delay < 0}, or
 * {@code delay + System.currentTimeMillis() < 0}, or
 * {@code period <= 0}
 * @throws IllegalStateException if task was already scheduled or
 * cancelled, timer was cancelled, or timer thread terminated.
 * @throws NullPointerException if {@code task} is null
 */
public void schedule(TimerTask task, long delay, long period) {
    if (delay < 0)
        throw new IllegalArgumentException("Negative delay.");
    if (period <= 0)
        throw new IllegalArgumentException("Non-positive period.");
    sched(task, System.currentTimeMillis()+delay, -period);
}

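As the method shows, schedule mainly validates its arguments. TimerTask is the task to run (it implements Runnable), delay is how long to wait before the first execution (in milliseconds), and period is the interval between executions (in milliseconds). Note that period is passed on to sched as a negative value, which marks the task as fixed-delay.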
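The sign of period matters later, when the main loop of TimerThread (listed further down in this article) reschedules a repeating task with the expression `task.period<0 ? currentTime - task.period : executionTime + task.period`. The small sketch below isolates that arithmetic. The class name NextRun and the method name next are made up for illustration and do not exist in the JDK.

```java
// Hypothetical helper mirroring how Timer derives the next execution time.
class NextRun {
    /**
     * period < 0: fixed-delay, measured from the time the task actually ran.
     * period > 0: fixed-rate, measured from the time it was supposed to run.
     */
    static long next(long period, long scheduledTime, long actualTime) {
        return period < 0 ? actualTime - period : scheduledTime + period;
    }

    public static void main(String[] args) {
        // The task was due at t=1000 but ran late at t=1300; the period is 1000 ms.
        System.out.println(next(-1000, 1000, 1300)); // fixed-delay: 2300
        System.out.println(next(1000, 1000, 1300));  // fixed-rate:  2000
    }
}
```

With fixed-delay the lateness of one run is carried into every later run, while fixed-rate keeps the original grid and tries to catch up.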

Going one level deeper:

/**
 * Schedule the specified timer task for execution at the specified
 * time with the specified period, in milliseconds. If period is
 * positive, the task is scheduled for repeated execution; if period is
 * zero, the task is scheduled for one-time execution. Time is specified
 * in Date.getTime() format. This method checks timer state, task state,
 * and initial execution time, but not period.
 *
 * @throws IllegalArgumentException if <tt>time</tt> is negative.
 * @throws IllegalStateException if task was already scheduled or
 * cancelled, timer was cancelled, or timer thread terminated.
 * @throws NullPointerException if {@code task} is null
 */
private void sched(TimerTask task, long time, long period) {
    if (time < 0)
        throw new IllegalArgumentException("Illegal execution time.");
    // Constrain value of period sufficiently to prevent numeric
    // overflow while still being effectively infinitely large.
    if (Math.abs(period) > (Long.MAX_VALUE >> 1))
        period >>= 1;
    synchronized(queue) {
        if (!thread.newTasksMayBeScheduled)
            throw new IllegalStateException("Timer already cancelled.");
        synchronized(task.lock) {
            if (task.state != TimerTask.VIRGIN)
                throw new IllegalStateException(
                    "Task already scheduled or cancelled");
            task.nextExecutionTime = time;
            task.period = period;
            task.state = TimerTask.SCHEDULED;
        }
        queue.add(task);
        if (queue.getMin() == task)
            queue.notify();
    }
}

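As shown above, the task is added to the queue inside a synchronized block.

Next, look at the queue object. Tasks are added to a TaskQueue, and when a Timer is created, that TaskQueue is passed to the TimerThread constructor.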

public class Timer {
    private final TaskQueue queue = new TaskQueue();
    private final TimerThread thread = new TimerThread(queue);
    public Timer() {
        this("Timer-" + serialNumber());
    }
    public Timer(String name) {
        thread.setName(name);
        thread.start();
    }
    //...
}

TaskQueue is a min-heap data structure. Its source code is as follows:

/**
 * This class represents a timer task queue: a priority queue of TimerTasks,
 * ordered on nextExecutionTime. Each Timer object has one of these, which it
 * shares with its TimerThread. Internally this class uses a heap, which
 * offers log(n) performance for the add, removeMin and rescheduleMin
 * operations, and constant time performance for the getMin operation.
 */
class TaskQueue {
    /**
     * Priority queue represented as a balanced binary heap: the two children
     * of queue[n] are queue[2*n] and queue[2*n+1]. The priority queue is
     * ordered on the nextExecutionTime field: The TimerTask with the lowest
     * nextExecutionTime is in queue[1] (assuming the queue is nonempty). For
     * each node n in the heap, and each descendant of n, d,
     * n.nextExecutionTime <= d.nextExecutionTime.
     */
    private TimerTask[] queue = new TimerTask[128];
    /**
     * The number of tasks in the priority queue. (The tasks are stored in
     * queue[1] up to queue[size]).
     */
    private int size = 0;
    /**
     * Returns the number of tasks currently on the queue.
     */
    int size() {
        return size;
    }
    /**
     * Adds a new task to the priority queue.
     */
    void add(TimerTask task) {
        // Grow backing store if necessary
        if (size + 1 == queue.length)
            queue = Arrays.copyOf(queue, 2*queue.length);
        queue[++size] = task;
        fixUp(size);
    }
    /**
     * Return the "head task" of the priority queue. (The head task is an
     * task with the lowest nextExecutionTime.)
     */
    TimerTask getMin() {
        return queue[1];
    }
    /**
     * Return the ith task in the priority queue, where i ranges from 1 (the
     * head task, which is returned by getMin) to the number of tasks on the
     * queue, inclusive.
     */
    TimerTask get(int i) {
        return queue[i];
    }
    /**
     * Remove the head task from the priority queue.
     */
    void removeMin() {
        queue[1] = queue[size];
        queue[size--] = null; // Drop extra reference to prevent memory leak
        fixDown(1);
    }
    /**
     * Removes the ith element from queue without regard for maintaining
     * the heap invariant. Recall that queue is one-based, so
     * 1 <= i <= size.
     */
    void quickRemove(int i) {
        assert i <= size;
        queue[i] = queue[size];
        queue[size--] = null; // Drop extra ref to prevent memory leak
    }
    /**
     * Sets the nextExecutionTime associated with the head task to the
     * specified value, and adjusts priority queue accordingly.
     */
    void rescheduleMin(long newTime) {
        queue[1].nextExecutionTime = newTime;
        fixDown(1);
    }
    /**
     * Returns true if the priority queue contains no elements.
     */
    boolean isEmpty() {
        return size==0;
    }
    /**
     * Removes all elements from the priority queue.
     */
    void clear() {
        // Null out task references to prevent memory leak
        for (int i=1; i<=size; i++)
            queue[i] = null;
        size = 0;
    }
    /**
     * Establishes the heap invariant (described above) assuming the heap
     * satisfies the invariant except possibly for the leaf-node indexed by k
     * (which may have a nextExecutionTime less than its parent's).
     *
     * This method functions by "promoting" queue[k] up the hierarchy
     * (by swapping it with its parent) repeatedly until queue[k]'s
     * nextExecutionTime is greater than or equal to that of its parent.
     */
    private void fixUp(int k) {
        while (k > 1) {
            int j = k >> 1;
            if (queue[j].nextExecutionTime <= queue[k].nextExecutionTime)
                break;
            TimerTask tmp = queue[j]; queue[j] = queue[k]; queue[k] = tmp;
            k = j;
        }
    }
    /**
     * Establishes the heap invariant (described above) in the subtree
     * rooted at k, which is assumed to satisfy the heap invariant except
     * possibly for node k itself (which may have a nextExecutionTime greater
     * than its children's).
     *
     * This method functions by "demoting" queue[k] down the hierarchy
     * (by swapping it with its smaller child) repeatedly until queue[k]'s
     * nextExecutionTime is less than or equal to those of its children.
     */
    private void fixDown(int k) {
        int j;
        while ((j = k << 1) <= size && j > 0) {
            if (j < size &&
                queue[j].nextExecutionTime > queue[j+1].nextExecutionTime)
                j++; // j indexes smallest kid
            if (queue[k].nextExecutionTime <= queue[j].nextExecutionTime)
                break;
            TimerTask tmp = queue[j]; queue[j] = queue[k]; queue[k] = tmp;
            k = j;
        }
    }
    /**
     * Establishes the heap invariant (described above) in the entire tree,
     * assuming nothing about the order of the elements prior to the call.
     */
    void heapify() {
        for (int i = size/2; i >= 1; i--)
            fixDown(i);
    }
}

This is the JDK's own heap implementation and is a useful reference.
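To make the sift-up and sift-down steps easier to follow, here is a stripped-down sketch of the same idea that stores plain execution times instead of TimerTask objects. The class TimeHeap and its method names are hypothetical. Like TaskQueue it uses a 1-based array, so the children of index n sit at 2n and 2n+1.

```java
import java.util.Arrays;

// Illustrative sketch: a 1-based binary min-heap of execution times.
class TimeHeap {
    private long[] heap = new long[8];
    private int size = 0;

    void add(long time) {
        if (size + 1 == heap.length) {
            heap = Arrays.copyOf(heap, heap.length * 2);
        }
        heap[++size] = time;
        // sift up: swap with the parent while the parent is larger
        for (int k = size; k > 1 && heap[k >> 1] > heap[k]; k >>= 1) {
            swap(k, k >> 1);
        }
    }

    long peekMin() {
        if (size == 0) {
            throw new IllegalStateException("empty heap");
        }
        return heap[1];
    }

    long removeMin() {
        long min = peekMin();
        heap[1] = heap[size--];
        // sift down: swap with the smaller child while that child is smaller
        int k = 1;
        while (2 * k <= size) {
            int j = 2 * k;
            if (j < size && heap[j + 1] < heap[j]) {
                j++;
            }
            if (heap[k] <= heap[j]) {
                break;
            }
            swap(k, j);
            k = j;
        }
        return min;
    }

    boolean isEmpty() {
        return size == 0;
    }

    private void swap(int a, int b) {
        long tmp = heap[a];
        heap[a] = heap[b];
        heap[b] = tmp;
    }
}
```

Adding the times 50, 10, 40, 20, 30 and then calling removeMin() repeatedly yields them in ascending order, which is exactly the property a timer needs: the next task to run is always at the root.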

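Finally, look at TimerThread, which is essentially the task-scheduling thread. It takes the task at the head of the TaskQueue and checks whether its execution time has arrived. If it has, the task runs immediately; otherwise the thread waits until the task is due.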

class TimerThread extends Thread {

    boolean newTasksMayBeScheduled = true;

    private TaskQueue queue;

    TimerThread(TaskQueue queue) {
        this.queue = queue;
    }

    public void run() {
        try {
            mainLoop();
        } finally {
            // Someone killed this Thread, behave as if Timer cancelled
            synchronized(queue) {
                newTasksMayBeScheduled = false;
                queue.clear();  // Eliminate obsolete references
            }
        }
    }

    /**
     * The main timer loop.  (See class comment.)
     */
    private void mainLoop() {
        while (true) {
            try {
                TimerTask task;
                boolean taskFired;
                synchronized(queue) {
                    // Wait for queue to become non-empty
                    while (queue.isEmpty() && newTasksMayBeScheduled)
                        queue.wait();
                    if (queue.isEmpty())
                        break; // Queue is empty and will forever remain; die

                    // Queue nonempty; look at first evt and do the right thing
                    long currentTime, executionTime;
                    task = queue.getMin();
                    synchronized(task.lock) {
                        if (task.state == TimerTask.CANCELLED) {
                            queue.removeMin();
                            continue;  // No action required, poll queue again
                        }
                        currentTime = System.currentTimeMillis();
                        executionTime = task.nextExecutionTime;
                        if (taskFired = (executionTime<=currentTime)) {
                            if (task.period == 0) { // Non-repeating, remove
                                queue.removeMin();
                                task.state = TimerTask.EXECUTED;
                            } else { // Repeating task, reschedule
                                queue.rescheduleMin(
                                  task.period<0 ? currentTime   - task.period
                                                : executionTime + task.period);
                            }
                        }
                    }
                    if (!taskFired) // Task hasn't yet fired; wait
                        queue.wait(executionTime - currentTime);
                }
                if (taskFired)  // Task fired; run it, holding no locks
                    task.run();
            } catch(InterruptedException e) {
            }
        }
    }
}

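Compared with while+sleep, the min-heap approach uses a single thread to manage all tasks. The advantage is lower thread overhead and better efficiency. The cost is that adding a new task now takes O(log n).

In my view, this approach has two further drawbacks:

  • Serial blocking: there is only one scheduling thread, so a long task delays short ones. For example, if task A runs for a minute,
    task B has to wait even when it is already due.
  • Poor fault tolerance: tasks run on a single thread with no exception handling, so an uncaught exception in one task terminates the timer thread and none of the remaining tasks will run.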
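The fault-tolerance problem is easy to reproduce. The sketch below schedules one task that throws and then keeps trying to schedule another task until the Timer rejects it. The class name TimerFailureDemo and the retry loop are assumptions made for this demonstration; the rejection itself comes from the "Timer already cancelled." check in sched shown earlier.

```java
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical demo: one failing task takes the whole Timer down.
class TimerFailureDemo {
    /** Returns true once the Timer rejects new tasks after one task has thrown. */
    static boolean timerDiesAfterException() {
        Timer timer = new Timer("demo-timer", true);
        CountDownLatch thrown = new CountDownLatch(1);
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                thrown.countDown();
                throw new IllegalStateException("simulated task failure");
            }
        }, 0);
        try {
            if (!thrown.await(5, TimeUnit.SECONDS)) {
                return false;
            }
            // The timer thread needs a moment to finish dying, so retry briefly.
            for (int i = 0; i < 200; i++) {
                try {
                    timer.schedule(new TimerTask() {
                        @Override
                        public void run() {
                        }
                    }, 60_000);
                } catch (IllegalStateException expected) {
                    return true; // the Timer no longer accepts tasks
                }
                Thread.sleep(10);
            }
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

The stack trace of the simulated failure is printed to standard error by the dying timer thread. One common mitigation is to wrap the body of every TimerTask in a try/catch so that no exception escapes run().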

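3. ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor builds its scheduling on a thread pool. The design idea is that every scheduled task is run by the pool, so tasks execute concurrently and do not interfere with one another. Note that a worker thread only runs a task once its execution time has arrived. Until then the workers do not busy-poll: they block on the delay queue and wait for the task at its head to become due.

A simple example: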

public static void main(String[] args) {
    ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(3);
    // after a 1-second initial delay, run once every second
    executor.scheduleAtFixedRate((new Runnable() {
        @Override
        public void run() {
            System.out.println("test3");
        }
    }),1,1, TimeUnit.SECONDS);
    // after a 1-second initial delay, run once every 3 seconds
    executor.scheduleAtFixedRate((new Runnable() {
        @Override
        public void run() {
            System.out.println("test4");
        }
    }),1,3, TimeUnit.SECONDS);
}

The executor schedules the tasks through the scheduleAtFixedRate method.

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                              long initialDelay,
                                              long period,
                                              TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (period <= 0)
        throw new IllegalArgumentException();
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command,
                                      null,
                                      triggerTime(initialDelay, unit),
                                      unit.toNanos(period));
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
    delayedExecute(t);
    return t;
}

The source first validates the arguments and then wraps the task in a ScheduledFutureTask, which extends FutureTask and implements RunnableScheduledFuture. The wrapped task is then passed to delayedExecute().

Next, the delayedExecute() method:

private void delayedExecute(RunnableScheduledFuture<?> task) {
    if (isShutdown())
        reject(task);
    else {
        super.getQueue().add(task);
        if (isShutdown() &&
            !canRunInCurrentRunState(task.isPeriodic()) &&
            remove(task))
            task.cancel(false);
        else
            // make sure a worker thread has been started
            ensurePrestart();
    }
}

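If the pool has not been shut down, the task is added to the queue via super.getQueue().add(task), and ensurePrestart() is called to make sure a worker thread is running.

Here super.getQueue() returns a custom blocking queue, DelayedWorkQueue, which also stores its elements in a min-heap. This can be seen in the constructor of ScheduledThreadPoolExecutor as well.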

static class DelayedWorkQueue extends AbstractQueue<Runnable>
        implements BlockingQueue<Runnable> {

    private static final int INITIAL_CAPACITY = 16;
    private RunnableScheduledFuture<?>[] queue =
        new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
    private final ReentrantLock lock = new ReentrantLock();
    private int size = 0;   

    //....

    public boolean add(Runnable e) {
        return offer(e);
    }

    public boolean offer(Runnable x) {
        if (x == null)
            throw new NullPointerException();
        RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int i = size;
            if (i >= queue.length)
                grow();
            size = i + 1;
            if (i == 0) {
                queue[0] = e;
                setIndex(e, 0);
            } else {
                siftUp(i, e);
            }
            if (queue[0] == e) {
                leader = null;
                available.signal();
            }
        } finally {
            lock.unlock();
        }
        return true;
    }

    public RunnableScheduledFuture<?> take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                RunnableScheduledFuture<?> first = queue[0];
                if (first == null)
                    available.await();
                else {
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0)
                        return finishPoll(first);
                    first = null; // don't retain ref while waiting
                    if (leader != null)
                        available.await();
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            available.awaitNanos(delay);
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && queue[0] != null)
                available.signal();
            lock.unlock();
        }
    }
}

As the source shows, DelayedWorkQueue is a static nested class of ScheduledThreadPoolExecutor. Adding a task stores it in the RunnableScheduledFuture array. Worker threads in the pool call take() on the queue to obtain the next due ScheduledFutureTask and then run it.
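DelayedWorkQueue is not accessible from application code, but the public java.util.concurrent.DelayQueue behaves in a comparable way: take() blocks until the element with the smallest remaining delay is due. The sketch below uses DelayQueue only as a stand-in to illustrate that blocking behavior. It is not the class the executor uses, and the names DelayQueueDemo, Item and takeOrder are made up for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

class DelayQueueDemo {
    // Hypothetical element type: a label plus an absolute due time in nanoseconds.
    static final class Item implements Delayed {
        final String label;
        final long dueNanos;

        Item(String label, long delayMillis) {
            this.label = label;
            this.dueNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(dueNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(dueNanos, ((Item) other).dueNanos);
        }
    }

    /** Adds three items out of order and returns the labels in the order take() yields them. */
    static List<String> takeOrder() {
        DelayQueue<Item> queue = new DelayQueue<>();
        queue.add(new Item("90ms", 90));
        queue.add(new Item("30ms", 30));
        queue.add(new Item("60ms", 60));
        List<String> order = new ArrayList<>();
        try {
            for (int i = 0; i < 3; i++) {
                order.add(queue.take().label); // blocks until the head item is due
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return order;
    }
}
```

The items come out as 30ms, 60ms, 90ms regardless of insertion order, and the calling thread sleeps inside take() instead of spinning.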

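ScheduledFutureTask is the object that actually carries out the task. It is not a thread itself. Despite the layers of wrapping, its run() method is what executes the scheduled work on a worker thread.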

private class ScheduledFutureTask<V>
            extends FutureTask<V> implements RunnableScheduledFuture<V> {

    /** Sequence number to break ties FIFO */
    private final long sequenceNumber;

    /** The time the task is enabled to execute in nanoTime units */
    private long time;

    /**
     * Period in nanoseconds for repeating tasks.  A positive
     * value indicates fixed-rate execution.  A negative value
     * indicates fixed-delay execution.  A value of 0 indicates a
     * non-repeating task.
     */
    private final long period;

    /** The actual task to be re-enqueued by reExecutePeriodic */
    RunnableScheduledFuture<V> outerTask = this;

    /**
     * Overrides FutureTask version so as to reset/requeue if periodic.
     */
    public void run() {
        boolean periodic = isPeriodic();
        if (!canRunInCurrentRunState(periodic))
            cancel(false);
        else if (!periodic)
            ScheduledFutureTask.super.run();
        else if (ScheduledFutureTask.super.runAndReset()) {
            setNextRunTime();
            reExecutePeriodic(outerTask);
        }
    }

    //...
}

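Compared with Timer, ScheduledExecutorService addresses both of the Timer drawbacks mentioned above: tasks can run on several threads, and an exception in one task does not stop the others.

In a monolithic application, ScheduledExecutorService covers most business needs for scheduled tasks.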
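The isolation can be checked with a small experiment. In the sketch below, which uses the made-up class name ExecutorIsolationDemo, two periodic tasks share a pool with a single worker thread. One task throws on its first run and the other keeps counting. Note one caveat: the failing task itself is not rescheduled after it throws, so a periodic task that must survive errors still needs its own try/catch.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo: a failing periodic task does not affect another task.
class ExecutorIsolationDemo {
    /** Returns {runs of the failing task, runs of the healthy task}. */
    static int[] run() {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        AtomicInteger failingRuns = new AtomicInteger();
        AtomicInteger healthyRuns = new AtomicInteger();
        CountDownLatch healthyRanThreeTimes = new CountDownLatch(3);
        try {
            // Throws on its first run; the executor then stops rescheduling this task.
            executor.scheduleAtFixedRate(() -> {
                failingRuns.incrementAndGet();
                throw new IllegalStateException("simulated task failure");
            }, 0, 10, TimeUnit.MILLISECONDS);
            // Shares the same single worker thread and keeps running.
            executor.scheduleAtFixedRate(() -> {
                healthyRuns.incrementAndGet();
                healthyRanThreeTimes.countDown();
            }, 0, 10, TimeUnit.MILLISECONDS);
            healthyRanThreeTimes.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdownNow();
        }
        return new int[] { failingRuns.get(), healthyRuns.get() };
    }
}
```

The failing task runs exactly once, while the healthy task reaches at least three runs on the same worker thread.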

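But does that make it the best solution?

The ordered container inside ScheduledExecutorService, like the one in Timer, is a min-heap. Inserting a new task costs O(log n). Reading the head task is O(1), although removing it from the heap costs O(log n) again.

There is room to improve the insertion cost, potentially down to O(1). That is what the time wheel described next achieves.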

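4. Time wheel

Structurally, a time wheel is a circular queue. In practice it can be implemented with a ring array. The walkthrough below uses a wheel with 8 slots (indices 0 to 7) and a pointer that advances one slot per second.

[Figure: time wheel]

Insertion and retrieval work as follows:

1. To create a task with a 1s delay, place it in the slot at index 1. The same applies to delays of 2, 3, ..., 7 seconds.
2. To create a task with a 10s delay, place it in the slot at index 2 and also record its number of rounds, here 1. Otherwise it would be indistinguishable from a task with a 2s delay.
3. To create a task with a 21s delay, place it in the slot at index 5 with a round count of 2, and so on.

To sum up, there are two core variables: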

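  • Array index: the slot a task goes into, computed as its execution time modulo the number of slots
  • Rounds: the number of full revolutions the pointer must complete before the task runs
    The figure shows this more intuitively.
    [Figure: time wheel]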

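To take out due tasks, move the pointer forward one slot every second and collect the tasks in that slot whose round count has reached 0. Locating the slot takes O(1).

To insert a task, compute its index and round count and place it in the corresponding slot of the array. Insertion takes O(1).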
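The index and round arithmetic can be written down in a few lines. The sketch below assumes a wheel with 8 slots, which is what the numbers in the walkthrough imply (10s lands in slot 2 after 1 round, 21s in slot 5 after 2 rounds). The class WheelMath and its method names are hypothetical.

```java
// Illustrative helper for the slot and round arithmetic of a time wheel.
class WheelMath {
    /** Slot index for a task that is `delay` ticks away from a pointer at `current`. */
    static int slot(int delay, int current, int slots) {
        return (current + delay) % slots;
    }

    /** Number of full revolutions the pointer makes before the task is due. */
    static int rounds(int delay, int slots) {
        return delay / slots;
    }

    public static void main(String[] args) {
        int slots = 8; // assumed wheel size
        for (int delay : new int[] {1, 10, 21}) {
            System.out.println(delay + "s -> slot " + slot(delay, 0, slots)
                    + ", rounds " + rounds(delay, slots));
        }
        // prints:
        // 1s -> slot 1, rounds 0
        // 10s -> slot 2, rounds 1
        // 21s -> slot 5, rounds 2
    }
}
```

Both operations are constant-time no matter how many tasks are already on the wheel. When the slot count is a power of two, the modulo and the division can be replaced by a bit mask and a shift.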

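If the wheel has too few slots, a single slot ends up holding many tasks and efficiency drops, much like hash collisions in a HashMap. The number of slots should therefore be neither too large nor too small.

The implementation is as follows: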

public class RingBufferWheel {

    private Logger logger = LoggerFactory.getLogger(RingBufferWheel.class);


    /**
     * default ring buffer size
     */
    private static final int STATIC_RING_SIZE = 64;

    private Object[] ringBuffer;

    private int bufferSize;

    /**
     * business thread pool
     */
    private ExecutorService executorService;

    private volatile int size = 0;

    /***
     * task stop sign
     */
    private volatile boolean stop = false;

    /**
     * task start sign
     */
    private volatile AtomicBoolean start = new AtomicBoolean(false);

    /**
     * total tick times
     */
    private AtomicInteger tick = new AtomicInteger();

    private Lock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();

    private AtomicInteger taskId = new AtomicInteger();
    private Map<Integer, Task> taskMap = new ConcurrentHashMap<>(16);

    /**
     * Create a new delay task ring buffer by default size
     *
     * @param executorService the business thread pool
     */
    public RingBufferWheel(ExecutorService executorService) {
        this.executorService = executorService;
        this.bufferSize = STATIC_RING_SIZE;
        this.ringBuffer = new Object[bufferSize];
    }


    /**
     * Create a new delay task ring buffer by custom buffer size
     *
     * @param executorService the business thread pool
     * @param bufferSize      custom buffer size
     */
    public RingBufferWheel(ExecutorService executorService, int bufferSize) {
        this(executorService);

        if (!powerOf2(bufferSize)) {
            throw new RuntimeException("bufferSize=[" + bufferSize + "] must be a power of 2");
        }
        this.bufferSize = bufferSize;
        this.ringBuffer = new Object[bufferSize];
    }

    /**
     * Add a task into the ring buffer(thread safe)
     *
     * @param task business task extends {@link Task}
     */
    public int addTask(Task task) {
        int key = task.getKey();
        int id;

        try {
            lock.lock();
            int index = mod(key, bufferSize);
            task.setIndex(index);
            Set<Task> tasks = get(index);

            int cycleNum = cycleNum(key, bufferSize);
            if (tasks != null) {
                task.setCycleNum(cycleNum);
                tasks.add(task);
            } else {
                task.setIndex(index);
                task.setCycleNum(cycleNum);
                Set<Task> sets = new HashSet<>();
                sets.add(task);
                put(key, sets);
            }
            id = taskId.incrementAndGet();
            task.setTaskId(id);
            taskMap.put(id, task);
            size++;
        } finally {
            lock.unlock();
        }

        start();

        return id;
    }


    /**
     * Cancel task by taskId
     * @param id unique id through {@link #addTask(Task)}
     * @return
     */
    public boolean cancel(int id) {

        boolean flag = false;
        Set<Task> tempTask = new HashSet<>();

        try {
            lock.lock();
            Task task = taskMap.get(id);
            if (task == null) {
                return false;
            }

            Set<Task> tasks = get(task.getIndex());
            for (Task tk : tasks) {
                if (tk.getKey() == task.getKey() && tk.getCycleNum() == task.getCycleNum()) {
                    size--;
                    flag = true;
                    taskMap.remove(id);
                } else {
                    tempTask.add(tk);
                }

            }
            //update origin data
            ringBuffer[task.getIndex()] = tempTask;
        } finally {
            lock.unlock();
        }

        return flag;
    }

    /**
     * Thread safe
     *
     * @return the size of ring buffer
     */
    public int taskSize() {
        return size;
    }

    /**
     * Same with method {@link #taskSize}
     * @return
     */
    public int taskMapSize(){
        return taskMap.size();
    }

    /**
     * Start background thread to consumer wheel timer, it will always run until you call method {@link #stop}
     */
    public void start() {
        // compareAndSet(false, true) lets exactly one caller start the consumer thread
        if (start.compareAndSet(false, true)) {
            logger.info("Delay task is starting");
            Thread job = new Thread(new TriggerJob());
            job.setName("consumer RingBuffer thread");
            job.start();
        }
    }

    /**
     * Stop consumer ring buffer thread
     *
     * @param force True will force close consumer thread and discard all pending tasks
     *              otherwise the consumer thread waits for all tasks to completes before closing.
     */
    public void stop(boolean force) {
        if (force) {
            logger.info("Delay task is forced stop");
            stop = true;
            executorService.shutdownNow();
        } else {
            logger.info("Delay task is stopping");
            lock.lock();
            try {
                // loop instead of a single await to guard against spurious wakeups
                while (taskSize() > 0) {
                    condition.await();
                }
            } catch (InterruptedException e) {
                logger.error("InterruptedException", e);
                Thread.currentThread().interrupt();
            } finally {
                lock.unlock();
            }
            // set the flag even when no task was pending, so the consumer thread exits
            stop = true;
            executorService.shutdown();
        }
    }


    private Set<Task> get(int index) {
        return (Set<Task>) ringBuffer[index];
    }

    private void put(int key, Set<Task> tasks) {
        int index = mod(key, bufferSize);
        ringBuffer[index] = tasks;
    }

    /**
     * Remove and get task list.
     * @param key
     * @return task list
     */
    private Set<Task> remove(int key) {
        Set<Task> tempTask = new HashSet<>();
        Set<Task> result = new HashSet<>();

        Set<Task> tasks = (Set<Task>) ringBuffer[key];
        if (tasks == null) {
            return result;
        }

        for (Task task : tasks) {
            if (task.getCycleNum() == 0) {
                result.add(task);
                // the task is due: drop it from the id map to free the memory
                taskMap.remove(task.getTaskId());

                size2Notify();
            } else {
                // decrement 1 cycle number and update origin data;
                // the task stays in taskMap so that it can still be cancelled
                task.setCycleNum(task.getCycleNum() - 1);
                tempTask.add(task);
            }
        }

        //update origin data
        ringBuffer[key] = tempTask;

        return result;
    }

    private void size2Notify() {
        try {
            lock.lock();
            size--;
            if (size == 0) {
                condition.signal();
            }
        } finally {
            lock.unlock();
        }
    }

    private boolean powerOf2(int target) {
        // zero and negative values are not valid buffer sizes
        if (target <= 0) {
            return false;
        }
        return (target & (target - 1)) == 0;
    }

    private int mod(int target, int mod) {
        // equals target % mod
        target = target + tick.get();
        return target & (mod - 1);
    }

    private int cycleNum(int target, int mod) {
        //equals target/mod
        return target >> Integer.bitCount(mod - 1);
    }

    /**
     * An abstract class used to implement business.
     */
    public abstract static class Task extends Thread {

        private int index;

        private int cycleNum;

        private int key;

        /**
         * The unique ID of the task
         */
        private int taskId ;

        @Override
        public void run() {
        }

        public int getKey() {
            return key;
        }

        /**
         *
         * @param key Delay time(seconds)
         */
        public void setKey(int key) {
            this.key = key;
        }

        public int getCycleNum() {
            return cycleNum;
        }

        private void setCycleNum(int cycleNum) {
            this.cycleNum = cycleNum;
        }

        public int getIndex() {
            return index;
        }

        private void setIndex(int index) {
            this.index = index;
        }

        public int getTaskId() {
            return taskId;
        }

        public void setTaskId(int taskId) {
            this.taskId = taskId;
        }
    }


    private class TriggerJob implements Runnable {

        @Override
        public void run() {
            int index = 0;
            while (!stop) {
                try {
                    Set<Task> tasks = remove(index);
                    for (Task task : tasks) {
                        executorService.submit(task);
                    }

                    if (++index > bufferSize - 1) {
                        index = 0;
                    }

                    //Total tick number of records
                    tick.incrementAndGet();
                    TimeUnit.SECONDS.sleep(1);

                } catch (Exception e) {
                    logger.error("Exception", e);
                }

            }

            logger.info("Delay task has stopped");
        }
    }
}

Next, write a client:

public static void main(String[] args) {
    RingBufferWheel ringBufferWheel = new RingBufferWheel( Executors.newFixedThreadPool(2));
    for (int i = 0; i < 3; i++) {
        RingBufferWheel.Task job = new Job();
        job.setKey(i);
        ringBufferWheel.addTask(job);
    }
}

public static class Job extends RingBufferWheel.Task{
    @Override
    public void run() {
        System.out.println("test5");
    }
}

To run a task periodically, add it back to the time wheel after each execution completes.
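The re-adding idea is sketched below on a deliberately simplified wheel. TinyWheel is a hypothetical, single-threaded class that is advanced by calling tick() by hand. It has no locking and no background thread, so it is not a drop-in replacement for RingBufferWheel, and it computes rounds as (delay - 1) / slots because its pointer moves before a slot is scanned. It only illustrates how a one-shot task becomes periodic by scheduling itself again.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Illustrative single-threaded wheel that is advanced by calling tick() by hand.
class TinyWheel {
    private static final class Entry {
        int rounds;
        final Runnable job;

        Entry(int rounds, Runnable job) {
            this.rounds = rounds;
            this.job = job;
        }
    }

    private final List<List<Entry>> slots = new ArrayList<>();
    private int pointer = 0;

    TinyWheel(int slotCount) {
        for (int i = 0; i < slotCount; i++) {
            slots.add(new ArrayList<>());
        }
    }

    /** Schedules job to run after delayTicks calls to tick() (delayTicks >= 1). */
    void add(int delayTicks, Runnable job) {
        int index = (pointer + delayTicks) % slots.size();
        int rounds = (delayTicks - 1) / slots.size();
        slots.get(index).add(new Entry(rounds, job));
    }

    /** Re-adds the job after every run, turning a one-shot task into a periodic one. */
    void addPeriodic(int periodTicks, Runnable job) {
        add(periodTicks, new Runnable() {
            @Override
            public void run() {
                job.run();
                add(periodTicks, this);
            }
        });
    }

    void tick() {
        pointer = (pointer + 1) % slots.size();
        List<Runnable> due = new ArrayList<>();
        Iterator<Entry> it = slots.get(pointer).iterator();
        while (it.hasNext()) {
            Entry entry = it.next();
            if (entry.rounds == 0) {
                due.add(entry.job);
                it.remove();
            } else {
                entry.rounds--;
            }
        }
        // run after the scan so that a job may safely re-add itself
        for (Runnable job : due) {
            job.run();
        }
    }

    /** Ticks the wheel `ticks` times and returns the tick numbers at which the job fired. */
    static List<Integer> firingTicks(int slotCount, int periodTicks, int ticks) {
        TinyWheel wheel = new TinyWheel(slotCount);
        List<Integer> fired = new ArrayList<>();
        int[] now = {0};
        wheel.addPeriodic(periodTicks, () -> fired.add(now[0]));
        for (now[0] = 1; now[0] <= ticks; now[0]++) {
            wheel.tick();
        }
        return fired;
    }
}
```

For example, firingTicks(8, 3, 10) returns [3, 6, 9], and firingTicks(4, 10, 30) returns [10, 20, 30], which shows that periods longer than one revolution work through the round counter.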

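The ring-buffer idea behind the time wheel is widely used. For example, the Disruptor project is built around a RingBuffer, and HashedWheelTimer in Netty is a time wheel that works on a similar principle. Interested readers can read the corresponding official source code.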