Java SpringBoot 定时任务

一、功能说明

SpringBoot的定时任务的加强工具,实现对SpringBoot原生的定时任务进行动态管理,完全兼容原生@Scheduled注解,无需对原本的定时任务进行修改。

二、快速使用

具体的功能已经封装成SpringBoot-starter即插即用

  1. <dependency>
  2. <groupId>com.github.guoyixing</groupId>
  3. <artifactId>spring-boot-starter-super-scheduled</artifactId>
  4. <version>0.3.1</version>
  5. </dependency>

使用方法和源码:
码云:https://gitee.com/qiaodaimadewangcai/super-scheduled
github:https://github.com/guoyixing/super-scheduled

三、实现原理

1、动态管理实现

(1) 配置管理介绍

  1. @Component("superScheduledConfig")
  2. public class SuperScheduledConfig {
  3. /**
  4. * 执行定时任务的线程池
  5. */
  6. private ThreadPoolTaskScheduler taskScheduler;
  7. /**
  8. * 定时任务名称与定时任务回调钩子 的关联关系容器
  9. */
  10. private Map<String, ScheduledFuture> nameToScheduledFuture = new ConcurrentHashMap<>();
  11. /**
  12. * 定时任务名称与定时任务需要执行的逻辑 的关联关系容器
  13. */
  14. private Map<String, Runnable> nameToRunnable = new ConcurrentHashMap<>();
  15. /**
  16. * 定时任务名称与定时任务的源信息 的关联关系容器
  17. */
  18. private Map<String, ScheduledSource> nameToScheduledSource = new ConcurrentHashMap<>();
  19. /* 普通的get/sets省略 */
  20. }

(2) 使用后处理器拦截SpringBoot原本的定时任务

  • 实现ApplicationContextAware接口拿到SpringBoot的上下文
  • 实现BeanPostProcessor接口,将这个类标记为后处理器,后处理器会在每个bean实例化之后执行
  • 使用@DependsOn注解强制依赖SuperScheduledConfig类,让SpringBoot实例化SuperScheduledPostProcessor类之前先实例化SuperScheduledConfig
  • 主要实现逻辑在postProcessAfterInitialization()方法中

SpringBoot 定时任务动态管理通用解决方案 - 图1

  1. @DependsOn({"superScheduledConfig"})
  2. @Component
  3. @Order
  4. public class SuperScheduledPostProcessor implements BeanPostProcessor, ApplicationContextAware {
  5. protected final Log logger = LogFactory.getLog(getClass());
  6. private ApplicationContext applicationContext;
  7. /**
  8. * 实例化bean之前的操作
  9. * @param bean bean实例
  10. * @param beanName bean的Name
  11. */
  12. @Override
  13. public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
  14. return bean;
  15. }
  16. /**
  17. * 实例化bean之后的操作
  18. * @param bean bean实例
  19. * @param beanName bean的Name
  20. */
  21. @Override
  22. public Object postProcessAfterInitialization(Object bean,
  23. String beanName) throws BeansException {
  24. //1.获取配置管理器
  25. SuperScheduledConfig superScheduledConfig = applicationContext.getBean(SuperScheduledConfig.class);
  26. //2.获取当前实例化完成的bean的所有方法
  27. Method[] methods = bean.getClass().getDeclaredMethods();
  28. //循环处理对每个方法逐一处理
  29. if (methods.length > 0) {
  30. for (Method method : methods) {
  31. //3.尝试在该方法上获取@Scheduled注解(SpringBoot的定时任务注解)
  32. Scheduled annotation = method.getAnnotation(Scheduled.class);
  33. //如果无法获取到@Scheduled注解,就跳过这个方法
  34. if (annotation == null) {
  35. continue;
  36. }
  37. //4.创建定时任务的源属性
  38. //创建定时任务的源属性(用来记录定时任务的配置,初始化的时候记录的是注解上原本的属性)
  39. ScheduledSource scheduledSource = new ScheduledSource(annotation, method, bean);
  40. //对注解上获取到源属性中的属性进行检测
  41. if (!scheduledSource.check()) {
  42. throw new SuperScheduledException("在" + beanName + "Bean中" + method.getName() + "方法的注解参数错误");
  43. }
  44. //生成定时任务的名称(id),使用beanName+“.”+方法名
  45. String name = beanName + "." + method.getName();
  46. //将以key-value的形式,将源数据存入配置管理器中,key:定时任务的名称 value:源数据
  47. superScheduledConfig.addScheduledSource(name, scheduledSource);
  48. try {
  49. //5.将原本SpringBoot的定时任务取消掉
  50. clearOriginalScheduled(annotation);
  51. } catch (Exception e) {
  52. throw new SuperScheduledException("在关闭原始方法" + beanName + method.getName() + "时出现错误");
  53. }
  54. }
  55. }
  56. //最后bean保持原有返回
  57. return bean;
  58. }
  59. /**
  60. * 修改注解原先的属性
  61. * @param annotation 注解实例对象
  62. * @throws Exception
  63. */
  64. private void clearOriginalScheduled(Scheduled annotation) throws Exception {
  65. changeAnnotationValue(annotation, "cron", Scheduled.CRON_DISABLED);
  66. changeAnnotationValue(annotation, "fixedDelay", -1L);
  67. changeAnnotationValue(annotation, "fixedDelayString", "");
  68. changeAnnotationValue(annotation, "fixedRate", -1L);
  69. changeAnnotationValue(annotation, "fixedRateString", "");
  70. changeAnnotationValue(annotation, "initialDelay", -1L);
  71. changeAnnotationValue(annotation, "initialDelayString", "");
  72. }
  73. /**
  74. * 获取SpringBoot的上下文
  75. * @param applicationContext SpringBoot的上下文
  76. */
  77. @Override
  78. public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
  79. this.applicationContext = applicationContext;
  80. }
  81. }

(3) 使用ApplicationRunner初始化自定义的定时任务运行器

  • 实现ApplicationContextAware接口拿到SpringBoot的上下文
  • 使用@DependsOn注解强制依赖threadPoolTaskScheduler
  • 实现ApplicationRunner接口,在所有bean初始化结束之后,运行自定义逻辑
  • 主要实现逻辑在run()方法中

SpringBoot 定时任务动态管理通用解决方案 - 图2

  1. @DependsOn("threadPoolTaskScheduler")
  2. @Component
  3. public class SuperScheduledApplicationRunner implements ApplicationRunner, ApplicationContextAware {
  4. protected final Log logger = LogFactory.getLog(getClass());
  5. private DateTimeFormatter df = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
  6. private ApplicationContext applicationContext;
  7. /**
  8. * 定时任务配置管理器
  9. */
  10. @Autowired
  11. private SuperScheduledConfig superScheduledConfig;
  12. /**
  13. * 定时任务执行线程
  14. */
  15. @Autowired
  16. private ThreadPoolTaskScheduler threadPoolTaskScheduler;
  17. @Override
  18. public void run(ApplicationArguments args) {
  19. //1.定时任务配置管理器中缓存 定时任务执行线程
  20. superScheduledConfig.setTaskScheduler(threadPoolTaskScheduler);
  21. //2.获取所有定时任务源数据
  22. Map<String, ScheduledSource> nameToScheduledSource = superScheduledConfig.getNameToScheduledSource();
  23. //逐一处理定时任务
  24. for (String name : nameToScheduledSource.keySet()) {
  25. //3.获取定时任务源数据
  26. ScheduledSource scheduledSource = nameToScheduledSource.get(name);
  27. //4.获取所有增强类
  28. String[] baseStrengthenBeanNames = applicationContext.getBeanNamesForType(BaseStrengthen.class);
  29. //5.创建执行控制器
  30. SuperScheduledRunnable runnable = new SuperScheduledRunnable();
  31. //配置执行控制器
  32. runnable.setMethod(scheduledSource.getMethod());
  33. runnable.setBean(scheduledSource.getBean());
  34. //6.逐一处理增强类(增强器实现原理后面具体分析)
  35. List<Point> points = new ArrayList<>(baseStrengthenBeanNames.length);
  36. for (String baseStrengthenBeanName : baseStrengthenBeanNames) {
  37. //7.将增强器代理成point
  38. Object baseStrengthenBean = applicationContext.getBean(baseStrengthenBeanName);
  39. //创建代理
  40. Point proxy = ProxyUtils.getInstance(Point.class, new RunnableBaseInterceptor(baseStrengthenBean, runnable));
  41. proxy.setSuperScheduledName(name);
  42. //8.所有的points连成起来
  43. points.add(proxy);
  44. }
  45. //将point形成调用链
  46. runnable.setChain(new Chain(points));
  47. //将执行逻辑封装并缓存到定时任务配置管理器中
  48. superScheduledConfig.addRunnable(name, runnable::invoke);
  49. try {
  50. //8.启动定时任务
  51. ScheduledFuture<?> schedule = ScheduledFutureFactory.create(threadPoolTaskScheduler
  52. , scheduledSource, runnable::invoke);
  53. //将线程回调钩子存到任务配置管理器中
  54. superScheduledConfig.addScheduledFuture(name, schedule);
  55. logger.info(df.format(LocalDateTime.now()) + "任务" + name + "已经启动...");
  56. } catch (Exception e) {
  57. throw new SuperScheduledException("任务" + name + "启动失败,错误信息:" + e.getLocalizedMessage());
  58. }
  59. }
  60. }
  61. @Override
  62. public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
  63. this.applicationContext = applicationContext;
  64. }
  65. }

(4) 进行动态管理

  1. @Component
  2. public class SuperScheduledManager {
  3. protected final Log logger = LogFactory.getLog(getClass());
  4. private DateTimeFormatter df = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
  5. @Autowired
  6. private SuperScheduledConfig superScheduledConfig;
  7. /**
  8. * 修改Scheduled的执行周期
  9. *
  10. * @param name scheduled的名称
  11. * @param cron cron表达式
  12. */
  13. public void setScheduledCron(String name, String cron) {
  14. //终止原先的任务
  15. cancelScheduled(name);
  16. //创建新的任务
  17. ScheduledSource scheduledSource = superScheduledConfig.getScheduledSource(name);
  18. scheduledSource.clear();
  19. scheduledSource.setCron(cron);
  20. addScheduled(name, scheduledSource);
  21. }
  22. /**
  23. * 修改Scheduled的fixedDelay
  24. *
  25. * @param name scheduled的名称
  26. * @param fixedDelay 上一次执行完毕时间点之后多长时间再执行
  27. */
  28. public void setScheduledFixedDelay(String name, Long fixedDelay) {
  29. //终止原先的任务
  30. cancelScheduled(name);
  31. //创建新的任务
  32. ScheduledSource scheduledSource = superScheduledConfig.getScheduledSource(name);
  33. scheduledSource.clear();
  34. scheduledSource.setFixedDelay(fixedDelay);
  35. addScheduled(name, scheduledSource);
  36. }
  37. /**
  38. * 修改Scheduled的fixedRate
  39. *
  40. * @param name scheduled的名称
  41. * @param fixedRate 上一次开始执行之后多长时间再执行
  42. */
  43. public void setScheduledFixedRate(String name, Long fixedRate) {
  44. //终止原先的任务
  45. cancelScheduled(name);
  46. //创建新的任务
  47. ScheduledSource scheduledSource = superScheduledConfig.getScheduledSource(name);
  48. scheduledSource.clear();
  49. scheduledSource.setFixedRate(fixedRate);
  50. addScheduled(name, scheduledSource);
  51. }
  52. /**
  53. * 查询所有启动的Scheduled
  54. */
  55. public List<String> getRunScheduledName() {
  56. Set<String> names = superScheduledConfig.getNameToScheduledFuture().keySet();
  57. return new ArrayList<>(names);
  58. }
  59. /**
  60. * 查询所有的Scheduled
  61. */
  62. public List<String> getAllSuperScheduledName() {
  63. Set<String> names = superScheduledConfig.getNameToRunnable().keySet();
  64. return new ArrayList<>(names);
  65. }
  66. /**
  67. * 终止Scheduled
  68. *
  69. * @param name scheduled的名称
  70. */
  71. public void cancelScheduled(String name) {
  72. ScheduledFuture scheduledFuture = superScheduledConfig.getScheduledFuture(name);
  73. scheduledFuture.cancel(true);
  74. superScheduledConfig.removeScheduledFuture(name);
  75. logger.info(df.format(LocalDateTime.now()) + "任务" + name + "已经终止...");
  76. }
  77. /**
  78. * 启动Scheduled
  79. *
  80. * @param name scheduled的名称
  81. * @param scheduledSource 定时任务的源信息
  82. */
  83. public void addScheduled(String name, ScheduledSource scheduledSource) {
  84. if (getRunScheduledName().contains(name)) {
  85. throw new SuperScheduledException("定时任务" + name + "已经被启动过了");
  86. }
  87. if (!scheduledSource.check()) {
  88. throw new SuperScheduledException("定时任务" + name + "源数据内容错误");
  89. }
  90. scheduledSource.refreshType();
  91. Runnable runnable = superScheduledConfig.getRunnable(name);
  92. ThreadPoolTaskScheduler taskScheduler = superScheduledConfig.getTaskScheduler();
  93. ScheduledFuture<?> schedule = ScheduledFutureFactory.create(taskScheduler, scheduledSource, runnable);
  94. logger.info(df.format(LocalDateTime.now()) + "任务" + name + "已经启动...");
  95. superScheduledConfig.addScheduledSource(name, scheduledSource);
  96. superScheduledConfig.addScheduledFuture(name, schedule);
  97. }
  98. /**
  99. * 以cron类型启动Scheduled
  100. *
  101. * @param name scheduled的名称
  102. * @param cron cron表达式
  103. */
  104. public void addCronScheduled(String name, String cron) {
  105. ScheduledSource scheduledSource = new ScheduledSource();
  106. scheduledSource.setCron(cron);
  107. addScheduled(name, scheduledSource);
  108. }
  109. /**
  110. * 以fixedDelay类型启动Scheduled
  111. *
  112. * @param name scheduled的名称
  113. * @param fixedDelay 上一次执行完毕时间点之后多长时间再执行
  114. * @param initialDelay 第一次执行的延迟时间
  115. */
  116. public void addFixedDelayScheduled(String name, Long fixedDelay, Long... initialDelay) {
  117. ScheduledSource scheduledSource = new ScheduledSource();
  118. scheduledSource.setFixedDelay(fixedDelay);
  119. if (initialDelay != null && initialDelay.length == 1) {
  120. scheduledSource.setInitialDelay(initialDelay[0]);
  121. } else if (initialDelay != null && initialDelay.length > 1) {
  122. throw new SuperScheduledException("第一次执行的延迟时间只能传入一个参数");
  123. }
  124. addScheduled(name, scheduledSource);
  125. }
  126. /**
  127. * 以fixedRate类型启动Scheduled
  128. *
  129. * @param name scheduled的名称
  130. * @param fixedRate 上一次开始执行之后多长时间再执行
  131. * @param initialDelay 第一次执行的延迟时间
  132. */
  133. public void addFixedRateScheduled(String name, Long fixedRate, Long... initialDelay) {
  134. ScheduledSource scheduledSource = new ScheduledSource();
  135. scheduledSource.setFixedRate(fixedRate);
  136. if (initialDelay != null && initialDelay.length == 1) {
  137. scheduledSource.setInitialDelay(initialDelay[0]);
  138. } else if (initialDelay != null && initialDelay.length > 1) {
  139. throw new SuperScheduledException("第一次执行的延迟时间只能传入一个参数");
  140. }
  141. addScheduled(name, scheduledSource);
  142. }
  143. /**
  144. * 手动执行一次任务
  145. *
  146. * @param name scheduled的名称
  147. */
  148. public void runScheduled(String name) {
  149. Runnable runnable = superScheduledConfig.getRunnable(name);
  150. runnable.run();
  151. }
  152. }

2、增强接口实现

增强器实现的整体思路与SpringAop的思路一致,实现没有Aop复杂

(1) 增强接口

  1. @Order(Ordered.HIGHEST_PRECEDENCE)
  2. public interface BaseStrengthen {
  3. /**
  4. * 前置强化方法
  5. *
  6. * @param bean bean实例(或者是被代理的bean)
  7. * @param method 执行的方法对象
  8. * @param args 方法参数
  9. */
  10. void before(Object bean, Method method, Object[] args);
  11. /**
  12. * 后置强化方法
  13. * 出现异常不会执行
  14. * 如果未出现异常,在afterFinally方法之后执行
  15. *
  16. * @param bean bean实例(或者是被代理的bean)
  17. * @param method 执行的方法对象
  18. * @param args 方法参数
  19. */
  20. void after(Object bean, Method method, Object[] args);
  21. /**
  22. * 异常强化方法
  23. *
  24. * @param bean bean实例(或者是被代理的bean)
  25. * @param method 执行的方法对象
  26. * @param args 方法参数
  27. */
  28. void exception(Object bean, Method method, Object[] args);
  29. /**
  30. * Finally强化方法,出现异常也会执行
  31. *
  32. * @param bean bean实例(或者是被代理的bean)
  33. * @param method 执行的方法对象
  34. * @param args 方法参数
  35. */
  36. void afterFinally(Object bean, Method method, Object[] args);
  37. }

(2) 代理抽象类

  1. public abstract class Point {
  2. /**
  3. * 定时任务名
  4. */
  5. private String superScheduledName;
  6. /**
  7. * 抽象的执行方法,使用代理实现
  8. * @param runnable 定时任务执行器
  9. */
  10. public abstract Object invoke(SuperScheduledRunnable runnable);
  11. /* 普通的get/sets省略 */
  12. }

(3) 调用链类

  1. public class Chain {
  2. private List<Point> list;
  3. private int index = -1;
  4. /**
  5. * 索引自增1
  6. */
  7. public int incIndex() {
  8. return ++index;
  9. }
  10. /**
  11. * 索引还原
  12. */
  13. public void resetIndex() {
  14. this.index = -1;
  15. }
  16. }

(4) cglib动态代理实现

使用cglib代理增强器,将增强器全部代理成调用链节点Point

  1. public class RunnableBaseInterceptor implements MethodInterceptor {
  2. /**
  3. * 定时任务执行器
  4. */
  5. private SuperScheduledRunnable runnable;
  6. /**
  7. * 定时任务增强类
  8. */
  9. private BaseStrengthen strengthen;
  10. @Override
  11. public Object intercept(Object obj, Method method, Object[] args, MethodProxy methodProxy) throws Throwable {
  12. Object result;
  13. //如果执行的是invoke()方法
  14. if ("invoke".equals(method.getName())) {
  15. //前置强化方法
  16. strengthen.before(obj, method, args);
  17. try {
  18. //调用执行器中的invoke()方法
  19. result = runnable.invoke();
  20. } catch (Exception e) {
  21. //异常强化方法
  22. strengthen.exception(obj, method, args);
  23. throw new SuperScheduledException(strengthen.getClass() + "中强化执行时发生错误", e);
  24. } finally {
  25. //Finally强化方法,出现异常也会执行
  26. strengthen.afterFinally(obj, method, args);
  27. }
  28. //后置强化方法
  29. strengthen.after(obj, method, args);
  30. } else {
  31. //直接执行方法
  32. result = methodProxy.invokeSuper(obj, args);
  33. }
  34. return result;
  35. }
  36. public RunnableBaseInterceptor(Object object, SuperScheduledRunnable runnable) {
  37. this.runnable = runnable;
  38. if (BaseStrengthen.class.isAssignableFrom(object.getClass())) {
  39. this.strengthen = (BaseStrengthen) object;
  40. } else {
  41. throw new SuperScheduledException(object.getClass() + "对象不是BaseStrengthen类型");
  42. }
  43. }
  44. public RunnableBaseInterceptor() {
  45. }
  46. }

(5) 定时任务执行器实现

  1. public class SuperScheduledRunnable {
  2. /**
  3. * 原始的方法
  4. */
  5. private Method method;
  6. /**
  7. * 方法所在的bean
  8. */
  9. private Object bean;
  10. /**
  11. * 增强器的调用链
  12. */
  13. private Chain chain;
  14. public Object invoke() {
  15. Object result;
  16. //索引自增1
  17. if (chain.incIndex() == chain.getList().size()) {
  18. //调用链中的增强方法已经全部执行结束
  19. try {
  20. //调用链索引初始化
  21. chain.resetIndex();
  22. //增强器全部执行完毕,执行原本的方法
  23. result = method.invoke(bean);
  24. } catch (IllegalAccessException | InvocationTargetException e) {
  25. throw new SuperScheduledException(e.getLocalizedMessage());
  26. }
  27. } else {
  28. //获取被代理后的方法增强器
  29. Point point = chain.getList().get(chain.getIndex());
  30. //执行增强器代理
  31. //增强器代理中,会回调方法执行器,形成调用链,逐一运行调用链中的增强器
  32. result = point.invoke(this);
  33. }
  34. return result;
  35. }
  36. /* 普通的get/sets省略 */
  37. }

(6) 增强器代理逻辑

com.gyx.superscheduled.core.SuperScheduledApplicationRunner类中的代码片段

  1. //创建执行控制器
  2. SuperScheduledRunnable runnable = new SuperScheduledRunnable();
  3. runnable.setMethod(scheduledSource.getMethod());
  4. runnable.setBean(scheduledSource.getBean());
  5. //用来存放 增强器的代理对象
  6. List<Point> points = new ArrayList<>(baseStrengthenBeanNames.length);
  7. //循环所有的增强器的beanName
  8. for (String baseStrengthenBeanName : baseStrengthenBeanNames) {
  9. //获取增强器的bean对象
  10. Object baseStrengthenBean = applicationContext.getBean(baseStrengthenBeanName);
  11. //将增强器代理成Point节点
  12. Point proxy = ProxyUtils.getInstance(Point.class, new RunnableBaseInterceptor(baseStrengthenBean, runnable));
  13. proxy.setSuperScheduledName(name);
  14. //增强器的代理对象缓存到list中
  15. points.add(proxy);
  16. }
  17. //将增强器代理实例的集合生成调用链
  18. //执行控制器中设置调用链
  19. runnable.setChain(new Chain(points));