Spring 定时任务

EnableScheduling

  • 首先关注的类为启动定时任务的注解@EnableScheduling
  1. @Target(ElementType.TYPE)
  2. @Retention(RetentionPolicy.RUNTIME)
  3. @Import(SchedulingConfiguration.class)
  4. @Documented
  5. public @interface EnableScheduling {
  6. }

SchedulingConfiguration

  • 注册定时任务相关信息
  1. @Configuration
  2. @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
  3. public class SchedulingConfiguration {
  4. /**
  5. * 开启定时任务
  6. * @return
  7. */
  8. @Bean(name = TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)
  9. @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
  10. public ScheduledAnnotationBeanPostProcessor scheduledAnnotationProcessor() {
  11. // 注册 ScheduledAnnotationBeanPostProcessor
  12. return new ScheduledAnnotationBeanPostProcessor();
  13. }
  14. }

ScheduledAnnotationBeanPostProcessor

  • 关注 application 事件,以及 spring 生命周期相关的接口实现
  1. /**
  2. * application 事件
  3. * @param event the event to respond to
  4. */
  5. @Override
  6. public void onApplicationEvent(ContextRefreshedEvent event) {
  7. if (event.getApplicationContext() == this.applicationContext) {
  8. // Running in an ApplicationContext -> register tasks this late...
  9. // giving other ContextRefreshedEvent listeners a chance to perform
  10. // their work at the same time (e.g. Spring Batch's job registration).
  11. // 注册定时任务
  12. finishRegistration();
  13. }
  14. }
  1. @Override
  2. public Object postProcessAfterInitialization(Object bean, String beanName) {
  3. if (bean instanceof AopInfrastructureBean || bean instanceof TaskScheduler ||
  4. bean instanceof ScheduledExecutorService) {
  5. // Ignore AOP infrastructure such as scoped proxies.
  6. return bean;
  7. }
  8. // 当前类
  9. Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
  10. if (!this.nonAnnotatedClasses.contains(targetClass)) {
  11. // 方法扫描,存在 Scheduled、Schedules 注解的全部扫描
  12. Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
  13. (MethodIntrospector.MetadataLookup<Set<Scheduled>>) method -> {
  14. Set<Scheduled> scheduledMethods = AnnotatedElementUtils.getMergedRepeatableAnnotations(
  15. method, Scheduled.class, Schedules.class);
  16. return (!scheduledMethods.isEmpty() ? scheduledMethods : null);
  17. });
  18. if (annotatedMethods.isEmpty()) {
  19. this.nonAnnotatedClasses.add(targetClass);
  20. if (logger.isTraceEnabled()) {
  21. logger.trace("No @Scheduled annotations found on bean class: " + targetClass);
  22. }
  23. }
  24. else {
  25. // Non-empty set of methods
  26. annotatedMethods.forEach((method, scheduledMethods) ->
  27. // 处理 scheduled 相关信息
  28. scheduledMethods.forEach(scheduled -> processScheduled(scheduled, method, bean)));
  29. if (logger.isTraceEnabled()) {
  30. logger.trace(annotatedMethods.size() + " @Scheduled methods processed on bean '" + beanName +
  31. "': " + annotatedMethods);
  32. }
  33. }
  34. }
  35. return bean;
  36. }
  • 处理定时任务注解
  1. protected void processScheduled(Scheduled scheduled, Method method, Object bean) {
  2. try {
  3. Runnable runnable = createRunnable(bean, method);
  4. boolean processedSchedule = false;
  5. String errorMessage =
  6. "Exactly one of the 'cron', 'fixedDelay(String)', or 'fixedRate(String)' attributes is required";
  7. Set<ScheduledTask> tasks = new LinkedHashSet<>(4);
  8. // Determine initial delay
  9. // 是否延迟执行
  10. long initialDelay = scheduled.initialDelay();
  11. // 延迟执行时间
  12. String initialDelayString = scheduled.initialDelayString();
  13. // 是否有延迟执行的时间
  14. if (StringUtils.hasText(initialDelayString)) {
  15. Assert.isTrue(initialDelay < 0, "Specify 'initialDelay' or 'initialDelayString', not both");
  16. if (this.embeddedValueResolver != null) {
  17. initialDelayString = this.embeddedValueResolver.resolveStringValue(initialDelayString);
  18. }
  19. if (StringUtils.hasLength(initialDelayString)) {
  20. try {
  21. initialDelay = parseDelayAsLong(initialDelayString);
  22. }
  23. catch (RuntimeException ex) {
  24. throw new IllegalArgumentException(
  25. "Invalid initialDelayString value \"" + initialDelayString + "\" - cannot parse into long");
  26. }
  27. }
  28. }
  29. // Check cron expression
  30. // 获取cron表达式
  31. String cron = scheduled.cron();
  32. // cron表达式是否存在
  33. if (StringUtils.hasText(cron)) {
  34. // 获取时区
  35. String zone = scheduled.zone();
  36. if (this.embeddedValueResolver != null) {
  37. // 字符串转换
  38. cron = this.embeddedValueResolver.resolveStringValue(cron);
  39. zone = this.embeddedValueResolver.resolveStringValue(zone);
  40. }
  41. if (StringUtils.hasLength(cron)) {
  42. // cron 是否延迟
  43. Assert.isTrue(initialDelay == -1, "'initialDelay' not supported for cron triggers");
  44. processedSchedule = true;
  45. if (!Scheduled.CRON_DISABLED.equals(cron)) {
  46. TimeZone timeZone;
  47. if (StringUtils.hasText(zone)) {
  48. // 时区解析
  49. timeZone = StringUtils.parseTimeZoneString(zone);
  50. }
  51. else {
  52. // 默认时区获取
  53. timeZone = TimeZone.getDefault();
  54. }
  55. // 创建任务
  56. tasks.add(this.registrar.scheduleCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone))));
  57. }
  58. }
  59. }
  60. // At this point we don't need to differentiate between initial delay set or not anymore
  61. if (initialDelay < 0) {
  62. initialDelay = 0;
  63. }
  64. // Check fixed delay
  65. // 获取间隔调用时间
  66. long fixedDelay = scheduled.fixedDelay();
  67. // 间隔时间>0
  68. if (fixedDelay >= 0) {
  69. Assert.isTrue(!processedSchedule, errorMessage);
  70. processedSchedule = true;
  71. // 创建任务,间隔时间定时任务
  72. tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));
  73. }
  74. // 延迟时间
  75. String fixedDelayString = scheduled.fixedDelayString();
  76. if (StringUtils.hasText(fixedDelayString)) {
  77. if (this.embeddedValueResolver != null) {
  78. fixedDelayString = this.embeddedValueResolver.resolveStringValue(fixedDelayString);
  79. }
  80. if (StringUtils.hasLength(fixedDelayString)) {
  81. Assert.isTrue(!processedSchedule, errorMessage);
  82. processedSchedule = true;
  83. try {
  84. fixedDelay = parseDelayAsLong(fixedDelayString);
  85. }
  86. catch (RuntimeException ex) {
  87. throw new IllegalArgumentException(
  88. "Invalid fixedDelayString value \"" + fixedDelayString + "\" - cannot parse into long");
  89. }
  90. // 创建延迟时间任务
  91. tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));
  92. }
  93. }
  94. // Check fixed rate
  95. // 获取调用频率
  96. long fixedRate = scheduled.fixedRate();
  97. if (fixedRate >= 0) {
  98. Assert.isTrue(!processedSchedule, errorMessage);
  99. processedSchedule = true;
  100. // 创建调用频率的定时任务
  101. tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));
  102. }
  103. String fixedRateString = scheduled.fixedRateString();
  104. if (StringUtils.hasText(fixedRateString)) {
  105. if (this.embeddedValueResolver != null) {
  106. fixedRateString = this.embeddedValueResolver.resolveStringValue(fixedRateString);
  107. }
  108. if (StringUtils.hasLength(fixedRateString)) {
  109. Assert.isTrue(!processedSchedule, errorMessage);
  110. processedSchedule = true;
  111. try {
  112. fixedRate = parseDelayAsLong(fixedRateString);
  113. }
  114. catch (RuntimeException ex) {
  115. throw new IllegalArgumentException(
  116. "Invalid fixedRateString value \"" + fixedRateString + "\" - cannot parse into long");
  117. }
  118. tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));
  119. }
  120. }
  121. // Check whether we had any attribute set
  122. Assert.isTrue(processedSchedule, errorMessage);
  123. // Finally register the scheduled tasks
  124. synchronized (this.scheduledTasks) {
  125. // 定时任务注册
  126. Set<ScheduledTask> regTasks = this.scheduledTasks.computeIfAbsent(bean, key -> new LinkedHashSet<>(4));
  127. regTasks.addAll(tasks);
  128. }
  129. }
  130. catch (IllegalArgumentException ex) {
  131. throw new IllegalStateException(
  132. "Encountered invalid @Scheduled method '" + method.getName() + "': " + ex.getMessage());
  133. }
  134. }

定时任务

  • CronTask
    • cron 定时任务
  • FixedDelayTask
    • 间隔时间的定时任务
  • FixedRateTask
    • 调用频率的定时任务
  • ScheduledTask
    • 定时任务对象

cron 表达式解析

  • org.springframework.scheduling.support.CronSequenceGenerator.doParse
  1. private void doParse(String[] fields) {
  2. setNumberHits(this.seconds, fields[0], 0, 60);
  3. setNumberHits(this.minutes, fields[1], 0, 60);
  4. setNumberHits(this.hours, fields[2], 0, 24);
  5. setDaysOfMonth(this.daysOfMonth, fields[3]);
  6. setMonths(this.months, fields[4]);
  7. setDays(this.daysOfWeek, replaceOrdinals(fields[5], "SUN,MON,TUE,WED,THU,FRI,SAT"), 8);
  8. if (this.daysOfWeek.get(7)) {
  9. // Sunday can be represented as 0 or 7
  10. this.daysOfWeek.set(0);
  11. this.daysOfWeek.clear(7);
  12. }
  13. }

执行定时任务

  • 这里以 CronTask 任务进行分析,其他定时任务同理
    • org.springframework.scheduling.config.ScheduledTaskRegistrar.scheduleCronTask
  1. @Nullable
  2. public ScheduledTask scheduleCronTask(CronTask task) {
  3. // 从未执行的任务列表中删除,并且获取这个任务
  4. ScheduledTask scheduledTask = this.unresolvedTasks.remove(task);
  5. boolean newTask = false;
  6. // 没有这个任务
  7. if (scheduledTask == null) {
  8. scheduledTask = new ScheduledTask(task);
  9. newTask = true;
  10. }
  11. // 任务调度器是否为空
  12. if (this.taskScheduler != null) {
  13. scheduledTask.future = this.taskScheduler.schedule(task.getRunnable(), task.getTrigger());
  14. }
  15. else {
  16. // 添加到cron任务列表
  17. addCronTask(task);
  18. // 保存到没有执行的任务中
  19. this.unresolvedTasks.put(task, scheduledTask);
  20. }
  21. return (newTask ? scheduledTask : null);
  22. }