Java 定时任务
定时任务主要分为单机和分布式两大类。从单机角度,定时任务的实现主要有以下 3 种方案:

  • while + sleep 组合
  • 最小堆实现
  • 时间轮实现


    while+sleep 方案,简单地说,就是定义一个线程,然后在 while 循环中通过 sleep 延迟一段时间,来达到周期性调度任务的目的。
    1. public static void main(String[] args) {
    2. final long timeInterval = 5000;
    3. new Thread(new Runnable() {
    4. @Override
    5. public void run() {
    6. while (true) {
    7. System.out.println(Thread.currentThread().getName() + "每隔5秒执行一次");
    8. try {
    9. Thread.sleep(timeInterval);
    10. } catch (InterruptedException e) {
    11. e.printStackTrace();
    12. }
    13. }
    14. }
    15. }).start();
    16. }
    这种方案的问题在于:每个定时任务都要单独占用一个线程,任务一多,线程开销就很大。正因如此,JDK 中的 Timer 定时器诞生了!


    所谓最小堆方案,就是每当有新任务加入的时候,会把最先需要执行的任务排到前面,同时会有一个线程不断地轮询判断,如果当前某个任务已经到达执行时间点,就会立即执行,具体实现代表就是 JDK 中的 Timer 定时器!


    首先来一个简单的 Timer 定时器例子
    1. public static void main(String[] args) {
    2. Timer timer = new Timer();
    3. //每隔1秒调用一次
    4. timer.schedule(new TimerTask() {
    5. @Override
    6. public void run() {
    7. System.out.println("test1");
    8. }
    9. }, 1000, 1000);
    10. //每隔3秒调用一次
    11. timer.schedule(new TimerTask() {
    12. @Override
    13. public void run() {
    14. System.out.println("test2");
    15. }
    16. }, 3000, 3000);
    17. }
    实现上,好像跟上面介绍的 while+sleep 方案差不多,同样也是起一个TimerTask线程任务,只不过共用一个Timer调度器。


  // Timer.schedule(task, delay, period): rejects a negative delay or a non-positive period,
  // then passes the task to sched() with its first run time (now + delay).
  // The period is negated before being passed on; mainLoop() reschedules a task whose period
  // is negative relative to the current time instead of the previous execution time.
  1. public void schedule(TimerTask task, long delay, long period) {
  2. if (delay < 0)
  3. throw new IllegalArgumentException("Negative delay.");
  4. if (period <= 0)
  5. throw new IllegalArgumentException("Non-positive period.");
  6. sched(task, System.currentTimeMillis()+delay, -period);
  7. }



  // Timer.sched(task, time, period): validates the execution time, halves an extremely large
  // period, then, holding the queue lock, marks the task as SCHEDULED and adds it to the queue.
  // Throws IllegalStateException if the timer thread no longer accepts tasks or if the task is
  // not in the VIRGIN state.
  1. private void sched(TimerTask task, long time, long period) {
  2. if (time < 0)
  3. throw new IllegalArgumentException("Illegal execution time.");
  4. // Constrain value of period sufficiently to prevent numeric
  5. // overflow while still being effectively infinitely large.
  6. if (Math.abs(period) > (Long.MAX_VALUE >> 1))
  7. period >>= 1;
  8. synchronized(queue) {
  9. if (!thread.newTasksMayBeScheduled)
  10. throw new IllegalStateException("Timer already cancelled.");
  11. synchronized(task.lock) {
  12. if (task.state != TimerTask.VIRGIN)
  13. throw new IllegalStateException(
  14. "Task already scheduled or cancelled");
  15. task.nextExecutionTime = time;
  16. task.period = period;
  17. task.state = TimerTask.SCHEDULED;
  18. }
  19. queue.add(task);
  // If the new task became the head of the queue, wake the thread waiting on the queue
  // (mainLoop() waits on this same monitor).
  20. if (queue.getMin() == task)
  21. queue.notify();
  22. }
  23. }



  // Excerpt of java.util.Timer: each Timer owns one TaskQueue and one TimerThread built on
  // that queue.
  1. public class Timer {
  2. private final TaskQueue queue = new TaskQueue();
  3. private final TimerThread thread = new TimerThread(queue);
  // Delegates to Timer(String) with a generated name (serialNumber() is not shown here).
  4. public Timer() {
  5. this("Timer-" + serialNumber());
  6. }
  // Names the timer thread and starts it right away, so constructing a Timer starts a thread.
  7. public Timer(String name) {
  8. thread.setName(name);
  9. thread.start();
  10. }
  11. //...
  12. }



  // Excerpt of Timer's TaskQueue: an array-backed min-heap ordered by nextExecutionTime.
  // Elements are stored from index 1 (add() writes to queue[++size]), so the parent of
  // position k is k >> 1.
  1. class TaskQueue {
  2. private TimerTask[] queue = new TimerTask[128];
  3. private int size = 0;
  // Appends the task at the end of the heap, doubling the array when it is full, and then
  // restores heap order by sifting the new element up.
  4. void add(TimerTask task) {
  5. // Grow backing store if necessary
  6. if (size + 1 == queue.length)
  7. queue = Arrays.copyOf(queue, 2*queue.length);
  8. queue[++size] = task;
  9. fixUp(size);
  10. }
  // Sift-up: swaps the element at k with its parent while the parent's nextExecutionTime
  // is larger, stopping at the root (k == 1).
  11. private void fixUp(int k) {
  12. while (k > 1) {
  13. int j = k >> 1;
  14. if (queue[j].nextExecutionTime <= queue[k].nextExecutionTime)
  15. break;
  16. TimerTask tmp = queue[j];
  17. queue[j] = queue[k];
  18. queue[k] = tmp;
  19. k = j;
  20. }
  21. }
  22. //....
  23. }



  // Excerpt of Timer's TimerThread: the single thread that waits on the task queue and runs
  // each task when its execution time has arrived.
  // Fix: listing line 57 (task.run();) was missing, leaving "if (taskFired)" without a body.
  1. class TimerThread extends Thread {
  2. boolean newTasksMayBeScheduled = true;
  3. private TaskQueue queue;
  4. TimerThread(TaskQueue queue) {
  5. this.queue = queue;
  6. }
  // Runs mainLoop(); whatever way it ends, stops accepting new tasks and clears the queue.
  7. public void run() {
  8. try {
  9. mainLoop();
  10. } finally {
  11. // Someone killed this Thread, behave as if Timer cancelled
  12. synchronized(queue) {
  13. newTasksMayBeScheduled = false;
  14. queue.clear(); // Eliminate obsolete references
  15. }
  16. }
  17. }
  18. /**
  19. * The main timer loop. (See class comment.)
  20. */
  21. private void mainLoop() {
  22. while (true) {
  23. try {
  24. TimerTask task;
  25. boolean taskFired;
  26. synchronized(queue) {
  27. // Wait for queue to become non-empty
  28. while (queue.isEmpty() && newTasksMayBeScheduled)
  29. queue.wait();
  30. if (queue.isEmpty())
  31. break; // Queue is empty and will forever remain; die
  32. // Queue nonempty; look at first evt and do the right thing
  33. long currentTime, executionTime;
  34. task = queue.getMin();
  35. synchronized(task.lock) {
  36. if (task.state == TimerTask.CANCELLED) {
  37. queue.removeMin();
  38. continue; // No action required, poll queue again
  39. }
  40. currentTime = System.currentTimeMillis();
  41. executionTime = task.nextExecutionTime;
  42. if (taskFired = (executionTime<=currentTime)) {
  43. if (task.period == 0) { // Non-repeating, remove
  44. queue.removeMin();
  45. task.state = TimerTask.EXECUTED;
  46. } else { // Repeating task, reschedule
  // Negative period: next run is currentTime + |period|;
  // positive period: next run is executionTime + period.
  47. queue.rescheduleMin(
  48. task.period<0 ? currentTime - task.period
  49. : executionTime + task.period);
  50. }
  51. }
  52. }
  53. if (!taskFired) // Task hasn't yet fired; wait
  54. queue.wait(executionTime - currentTime);
  55. }
  // The task runs on this timer thread, outside both synchronized blocks. An exception other
  // than InterruptedException thrown by task.run() is not caught in this loop.
  56. if (taskFired) // Task fired; run it, holding no locks
  57. task.run();
  58. } catch(InterruptedException e) {
  59. }
  60. }
  61. }
  62. }

总结这个利用最小堆实现的方案,相比 while + sleep 方案,它用一个线程来统一管理所有的任务,优点就是减少了线程之间的性能开销,提升了执行效率;但是同样也带来了一些缺点,新任务的写入效率变成了 O(log(n)),此外还有以下两个问题:

  • 串行阻塞:调度线程只有一个,长任务会阻塞短任务的执行,例如,A任务跑了一分钟,B任务至少需要等1分钟才能跑
  • 容错能力差:没有异常处理能力,一旦一个任务执行故障,后续任务都无法执行


    鉴于 Timer 的上述缺陷,从 Java 5 开始,推出了基于线程池设计的 ScheduledThreadPoolExecutor 。
    其设计思想是,每一个被调度的任务都会由线程池来管理执行,因此任务是并发执行的,相互之间不会受到干扰。需要注意的是,只有当任务的执行时间到来时,ScheduledThreadPoolExecutor 才会真正启动一个线程,其余时间 ScheduledThreadPoolExecutor 都是在轮询任务的状态。

    1. public static void main(String[] args) {
    2. ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(3);
    3. //启动1秒之后,每隔1秒执行一次
    4. executor.scheduleAtFixedRate((new Runnable() {
    5. @Override
    6. public void run() {
    7. System.out.println("test3");
    8. }
    9. }),1,1, TimeUnit.SECONDS);
    10. //启动1秒之后,每隔3秒执行一次
    11. executor.scheduleAtFixedRate((new Runnable() {
    12. @Override
    13. public void run() {
    14. System.out.println("test4");
    15. }
    16. }),1,3, TimeUnit.SECONDS);
    17. }


  • 进入scheduleAtFixedRate()方法


  // ScheduledThreadPoolExecutor.scheduleAtFixedRate: validates the arguments, wraps the command
  // in a ScheduledFutureTask carrying its first trigger time and the period converted to
  // nanoseconds, and passes the (possibly decorated) task to delayedExecute().
  // Throws NullPointerException for a null command or unit, and IllegalArgumentException for a
  // non-positive period.
  1. public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
  2. long initialDelay,
  3. long period,
  4. TimeUnit unit) {
  5. if (command == null || unit == null)
  6. throw new NullPointerException();
  7. if (period <= 0)
  8. throw new IllegalArgumentException();
  9. ScheduledFutureTask<Void> sft =
  10. new ScheduledFutureTask<Void>(command,
  11. null,
  12. triggerTime(initialDelay, unit),
  13. unit.toNanos(period));
  14. RunnableScheduledFuture<Void> t = decorateTask(command, sft);
  // outerTask is what ScheduledFutureTask.run() passes to reExecutePeriodic() after a periodic run
  15. sft.outerTask = t;
  16. delayedExecute(t);
  17. return t;
  18. }
  • 继续看delayedExecute()方法


  // Rejects the task if the executor is already shut down; otherwise adds it to the work queue.
  // Shutdown is checked a second time after the add: if the executor was shut down in between,
  // the task may not run in the current state, and it can still be removed, it is cancelled.
  // In every other case ensurePrestart() is called (its body is not shown here).
  1. private void delayedExecute(RunnableScheduledFuture<?> task) {
  2. if (isShutdown())
  3. reject(task);
  4. else {
  5. super.getQueue().add(task);
  6. if (isShutdown() &&
  7. !canRunInCurrentRunState(task.isPeriodic()) &&
  8. remove(task))
  9. task.cancel(false);
  10. else
  11. // pre-processing step
  12. ensurePrestart();
  13. }
  14. }

其中super.getQueue()得到的是一个自定义的new DelayedWorkQueue()阻塞队列,数据存储方面也是一个最小堆结构的队列,这一点在初始化new ScheduledThreadPoolExecutor()的时候,可以看出!

  // Constructor: forwards corePoolSize to the superclass together with Integer.MAX_VALUE,
  // a time value of 0 NANOSECONDS and a new DelayedWorkQueue as the work queue.
  1. public ScheduledThreadPoolExecutor(int corePoolSize) {
  2. super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
  3. new DelayedWorkQueue());
  4. }


  // Excerpt of ScheduledThreadPoolExecutor.DelayedWorkQueue: an array-backed blocking queue of
  // RunnableScheduledFuture elements guarded by a single ReentrantLock.
  // The fields `leader` and `available` and the helpers grow(), siftUp(), setIndex() and
  // finishPoll() are used below but are elided from this excerpt.
  1. static class DelayedWorkQueue extends AbstractQueue<Runnable>
  2. implements BlockingQueue<Runnable> {
  3. private static final int INITIAL_CAPACITY = 16;
  4. private RunnableScheduledFuture<?>[] queue =
  5. new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
  6. private final ReentrantLock lock = new ReentrantLock();
  7. private int size = 0;
  8. //....
  9. public boolean add(Runnable e) {
  10. return offer(e);
  11. }
  // Inserts the task under the lock, growing the array when it is full. If the new task ends up
  // at queue[0], the leader is cleared and one waiting thread is signalled.
  // Always returns true; throws NullPointerException for a null argument.
  12. public boolean offer(Runnable x) {
  13. if (x == null)
  14. throw new NullPointerException();
  15. RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
  16. final ReentrantLock lock = this.lock;
  17. lock.lock();
  18. try {
  19. int i = size;
  20. if (i >= queue.length)
  21. grow();
  22. size = i + 1;
  23. if (i == 0) {
  24. queue[0] = e;
  25. setIndex(e, 0);
  26. } else {
  27. siftUp(i, e);
  28. }
  29. if (queue[0] == e) {
  30. leader = null;
  31. available.signal();
  32. }
  33. } finally {
  34. lock.unlock();
  35. }
  36. return true;
  37. }
  // Blocks until the task at queue[0] has a delay <= 0, then returns finishPoll(first).
  // While the head is not yet due, one thread records itself as `leader` and waits only for the
  // remaining delay (awaitNanos); the other threads wait without a timeout.
  38. public RunnableScheduledFuture<?> take() throws InterruptedException {
  39. final ReentrantLock lock = this.lock;
  40. lock.lockInterruptibly();
  41. try {
  42. for (;;) {
  43. RunnableScheduledFuture<?> first = queue[0];
  44. if (first == null)
  45. available.await();
  46. else {
  47. long delay = first.getDelay(NANOSECONDS);
  48. if (delay <= 0)
  49. return finishPoll(first);
  50. first = null; // don't retain ref while waiting
  51. if (leader != null)
  52. available.await();
  53. else {
  54. Thread thisThread = Thread.currentThread();
  55. leader = thisThread;
  56. try {
  57. available.awaitNanos(delay);
  58. } finally {
  59. if (leader == thisThread)
  60. leader = null;
  61. }
  62. }
  63. }
  64. }
  65. } finally {
  // On the way out, signal another waiter if there is no leader and the queue is not empty.
  66. if (leader == null && queue[0] != null)
  67. available.signal();
  68. lock.unlock();
  69. }
  70. }
  71. }
  • 回到最开始说到的ScheduledFutureTask任务线程类,最终执行任务的其实就是它


  // Excerpt of ScheduledThreadPoolExecutor.ScheduledFutureTask, the task wrapper whose run()
  // executes the command.
  // Fix: listing line 24 (ScheduledFutureTask.super.run();) was missing, leaving
  // "else if (!periodic)" without a body.
  1. private class ScheduledFutureTask<V>
  2. extends FutureTask<V> implements RunnableScheduledFuture<V> {
  3. /** Sequence number to break ties FIFO */
  4. private final long sequenceNumber;
  5. /** The time the task is enabled to execute in nanoTime units */
  6. private long time;
  7. /**
  8. * Period in nanoseconds for repeating tasks. A positive
  9. * value indicates fixed-rate execution. A negative value
  10. * indicates fixed-delay execution. A value of 0 indicates a
  11. * non-repeating task.
  12. */
  13. private final long period;
  14. /** The actual task to be re-enqueued by reExecutePeriodic */
  15. RunnableScheduledFuture<V> outerTask = this;
  16. /**
  17. * Overrides FutureTask version so as to reset/requeue if periodic.
  18. */
  19. public void run() {
  20. boolean periodic = isPeriodic();
  21. if (!canRunInCurrentRunState(periodic))
  22. cancel(false);
  // One-shot task: run it once through FutureTask.run()
  23. else if (!periodic)
  24. ScheduledFutureTask.super.run();
  // Periodic task: only when runAndReset() returns true is the next run time computed and the
  // task queued again
  25. else if (ScheduledFutureTask.super.runAndReset()) {
  26. setNextRunTime();
  27. reExecutePeriodic(outerTask);
  28. }
  29. }
  30. //...
  31. }


ScheduledExecutorService 相比 Timer 定时器,完美地解决了上面说到的 Timer 存在的两个缺点!
在单体应用里面,使用 ScheduledExecutorService 可以解决大部分需要使用定时任务的业务需求!
可以发现线程池中 ScheduledExecutorService 的排序容器跟 Timer 一样,都是采用最小堆的存储结构,新任务加入的排序效率是 O(log(n)),查看堆顶(最先要执行的)任务是 O(1),而取出堆顶任务后重新调整堆同样需要 O(log(n))。


时间轮其实就是一个环形的数组,如图所示,假设创建了一个长度为 8 的时间轮。

  • 1.当需要新建一个 1s 延时任务的时候,则只需要将它放到下标为 1 的那个槽中,2、3、…、7也同样如此。
  • 2.而如果是新建一个 10s 的延时任务,则需要将它放到下标为 2 的槽中,但同时需要记录它所对应的圈数,也就是 1 圈,不然就和 2 秒的延时消息重复了
  • 3.当创建一个 21s 的延时任务时,它所在的位置就在下标为 5 的槽中,同样的需要为他加上圈数为 2,依次类推…


  • 数组下标:表示某个任务延迟时间,从数据操作上对执行时间点进行取余
  • 圈数:表示需要循环圈数

如果时间轮的槽比较少,会导致某一个槽上的任务非常多,那么效率也比较低,这就和 HashMap 的 hash 冲突是一样的;而槽太多又会浪费内存,因此在设计时,槽的数量不能太多也不能太少。


  • 首先创建一个RingBufferWheel时间轮定时任务管理器

    1. public class RingBufferWheel {
    2. private Logger logger = LoggerFactory.getLogger(RingBufferWheel.class);
    3. /**
    4. * default ring buffer size
    5. */
    6. private static final int STATIC_RING_SIZE = 64;
    7. private Object[] ringBuffer;
    8. private int bufferSize;
    9. /**
    10. * business thread pool
    11. */
    12. private ExecutorService executorService;
    13. private volatile int size = 0;
    14. /***
    15. * task stop sign
    16. */
    17. private volatile boolean stop = false;
    18. /**
    19. * task start sign
    20. */
    21. private volatile AtomicBoolean start = new AtomicBoolean(false);
    22. /**
    23. * total tick times
    24. */
    25. private AtomicInteger tick = new AtomicInteger();
    26. private Lock lock = new ReentrantLock();
    27. private Condition condition = lock.newCondition();
    28. private AtomicInteger taskId = new AtomicInteger();
    29. private Map<Integer, Task> taskMap = new ConcurrentHashMap<>(16);
    30. /**
    31. * Create a new delay task ring buffer by default size
    32. *
    33. * @param executorService the business thread pool
    34. */
    35. public RingBufferWheel(ExecutorService executorService) {
    36. this.executorService = executorService;
    37. this.bufferSize = STATIC_RING_SIZE;
    38. this.ringBuffer = new Object[bufferSize];
    39. }
    40. /**
    41. * Create a new delay task ring buffer by custom buffer size
    42. *
    43. * @param executorService the business thread pool
    44. * @param bufferSize custom buffer size
    45. */
    46. public RingBufferWheel(ExecutorService executorService, int bufferSize) {
    47. this(executorService);
    48. if (!powerOf2(bufferSize)) {
    49. throw new RuntimeException("bufferSize=[" + bufferSize + "] must be a power of 2");
    50. }
    51. this.bufferSize = bufferSize;
    52. this.ringBuffer = new Object[bufferSize];
    53. }
    54. /**
    55. * Add a task into the ring buffer(thread safe)
    56. *
    57. * @param task business task extends {@link Task}
    58. */
    59. public int addTask(Task task) {
    60. int key = task.getKey();
    61. int id;
    62. try {
    63. lock.lock();
    64. int index = mod(key, bufferSize);
    65. task.setIndex(index);
    66. Set<Task> tasks = get(index);
    67. int cycleNum = cycleNum(key, bufferSize);
    68. if (tasks != null) {
    69. task.setCycleNum(cycleNum);
    70. tasks.add(task);
    71. } else {
    72. task.setIndex(index);
    73. task.setCycleNum(cycleNum);
    74. Set<Task> sets = new HashSet<>();
    75. sets.add(task);
    76. put(key, sets);
    77. }
    78. id = taskId.incrementAndGet();
    79. task.setTaskId(id);
    80. taskMap.put(id, task);
    81. size++;
    82. } finally {
    83. lock.unlock();
    84. }
    85. start();
    86. return id;
    87. }
    88. /**
    89. * Cancel task by taskId
    90. * @param id unique id through {@link #addTask(Task)}
    91. * @return
    92. */
    93. public boolean cancel(int id) {
    94. boolean flag = false;
    95. Set<Task> tempTask = new HashSet<>();
    96. try {
    97. lock.lock();
    98. Task task = taskMap.get(id);
    99. if (task == null) {
    100. return false;
    101. }
    102. Set<Task> tasks = get(task.getIndex());
    103. for (Task tk : tasks) {
    104. if (tk.getKey() == task.getKey() && tk.getCycleNum() == task.getCycleNum()) {
    105. size--;
    106. flag = true;
    107. taskMap.remove(id);
    108. } else {
    109. tempTask.add(tk);
    110. }
    111. }
    112. //update origin data
    113. ringBuffer[task.getIndex()] = tempTask;
    114. } finally {
    115. lock.unlock();
    116. }
    117. return flag;
    118. }
    119. /**
    120. * Thread safe
    121. *
    122. * @return the size of ring buffer
    123. */
    124. public int taskSize() {
    125. return size;
    126. }
    127. /**
    128. * Same with method {@link #taskSize}
    129. * @return
    130. */
    131. public int taskMapSize(){
    132. return taskMap.size();
    133. }
    134. /**
    135. * Start background thread to consumer wheel timer, it will always run until you call method {@link #stop}
    136. */
    137. public void start() {
    138. if (!start.get()) {
    139. if (start.compareAndSet(start.get(), true)) {
    140."Delay task is starting");
    141. Thread job = new Thread(new TriggerJob());
    142. job.setName("consumer RingBuffer thread");
    143. job.start();
    144. start.set(true);
    145. }
    146. }
    147. }
    148. /**
    149. * Stop consumer ring buffer thread
    150. *
    151. * @param force True will force close consumer thread and discard all pending tasks
    152. * otherwise the consumer thread waits for all tasks to completes before closing.
    153. */
    154. public void stop(boolean force) {
    155. if (force) {
    156."Delay task is forced stop");
    157. stop = true;
    158. executorService.shutdownNow();
    159. } else {
    160."Delay task is stopping");
    161. if (taskSize() > 0) {
    162. try {
    163. lock.lock();
    164. condition.await();
    165. stop = true;
    166. } catch (InterruptedException e) {
    167. logger.error("InterruptedException", e);
    168. } finally {
    169. lock.unlock();
    170. }
    171. }
    172. executorService.shutdown();
    173. }
    174. }
    175. private Set<Task> get(int index) {
    176. return (Set<Task>) ringBuffer[index];
    177. }
    178. private void put(int key, Set<Task> tasks) {
    179. int index = mod(key, bufferSize);
    180. ringBuffer[index] = tasks;
    181. }
    182. /**
    183. * Remove and get task list.
    184. * @param key
    185. * @return task list
    186. */
    187. private Set<Task> remove(int key) {
    188. Set<Task> tempTask = new HashSet<>();
    189. Set<Task> result = new HashSet<>();
    190. Set<Task> tasks = (Set<Task>) ringBuffer[key];
    191. if (tasks == null) {
    192. return result;
    193. }
    194. for (Task task : tasks) {
    195. if (task.getCycleNum() == 0) {
    196. result.add(task);
    197. size2Notify();
    198. } else {
    199. // decrement 1 cycle number and update origin data
    200. task.setCycleNum(task.getCycleNum() - 1);
    201. tempTask.add(task);
    202. }
    203. // remove task, and free the memory.
    204. taskMap.remove(task.getTaskId());
    205. }
    206. //update origin data
    207. ringBuffer[key] = tempTask;
    208. return result;
    209. }
    210. private void size2Notify() {
    211. try {
    212. lock.lock();
    213. size--;
    214. if (size == 0) {
    215. condition.signal();
    216. }
    217. } finally {
    218. lock.unlock();
    219. }
    220. }
    221. private boolean powerOf2(int target) {
    222. if (target < 0) {
    223. return false;
    224. }
    225. int value = target & (target - 1);
    226. if (value != 0) {
    227. return false;
    228. }
    229. return true;
    230. }
    231. private int mod(int target, int mod) {
    232. // equals target % mod
    233. target = target + tick.get();
    234. return target & (mod - 1);
    235. }
    236. private int cycleNum(int target, int mod) {
    237. //equals target/mod
    238. return target >> Integer.bitCount(mod - 1);
    239. }
    240. /**
    241. * An abstract class used to implement business.
    242. */
    243. public abstract static class Task extends Thread {
    244. private int index;
    245. private int cycleNum;
    246. private int key;
    247. /**
    248. * The unique ID of the task
    249. */
    250. private int taskId ;
    251. @Override
    252. public void run() {
    253. }
    254. public int getKey() {
    255. return key;
    256. }
    257. /**
    258. *
    259. * @param key Delay time(seconds)
    260. */
    261. public void setKey(int key) {
    262. this.key = key;
    263. }
    264. public int getCycleNum() {
    265. return cycleNum;
    266. }
    267. private void setCycleNum(int cycleNum) {
    268. this.cycleNum = cycleNum;
    269. }
    270. public int getIndex() {
    271. return index;
    272. }
    273. private void setIndex(int index) {
    274. this.index = index;
    275. }
    276. public int getTaskId() {
    277. return taskId;
    278. }
    279. public void setTaskId(int taskId) {
    280. this.taskId = taskId;
    281. }
    282. }
    283. private class TriggerJob implements Runnable {
    284. @Override
    285. public void run() {
    286. int index = 0;
    287. while (!stop) {
    288. try {
    289. Set<Task> tasks = remove(index);
    290. for (Task task : tasks) {
    291. executorService.submit(task);
    292. }
    293. if (++index > bufferSize - 1) {
    294. index = 0;
    295. }
    296. //Total tick number of records
    297. tick.incrementAndGet();
    298. TimeUnit.SECONDS.sleep(1);
    299. } catch (Exception e) {
    300. logger.error("Exception", e);
    301. }
    302. }
    303."Delay task has stopped");
    304. }
    305. }
    306. }
  • 接着,编写一个客户端,测试客户端

    1. public static void main(String[] args) {
    2. RingBufferWheel ringBufferWheel = new RingBufferWheel( Executors.newFixedThreadPool(2));
    3. for (int i = 0; i < 3; i++) {
    4. RingBufferWheel.Task job = new Job();
    5. job.setKey(i);
    6. ringBufferWheel.addTask(job);
    7. }
    8. }
    9. public static class Job extends RingBufferWheel.Task{
    10. @Override
    11. public void run() {
    12. System.out.println("test5");
    13. }
    14. }


    1. test5
    2. test5
    3. test5



    时间轮的应用还是非常广的,例如在 Disruptor 项目中就运用到了 RingBuffer,还有Netty中的HashedWheelTimer工具原理也差不多。