架构图

image.png

使用

首先把xxljob 部署起来

1.代码

  1. // 配置类
  2. @Configuration
  3. public class XxlJobConfig {
  4. private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);
  5. @Value("${xxl.job.admin.addresses}")
  6. private String adminAddresses;
  7. @Value("${xxl.job.accessToken}")
  8. private String accessToken;
  9. @Value("${xxl.job.executor.appname}")
  10. private String appname;
  11. @Value("${xxl.job.executor.address}")
  12. private String address;
  13. @Value("${xxl.job.executor.ip}")
  14. private String ip;
  15. @Value("${xxl.job.executor.port}")
  16. private int port;
  17. @Value("${xxl.job.executor.logpath}")
  18. private String logPath;
  19. @Value("${xxl.job.executor.logretentiondays}")
  20. private int logRetentionDays;
  21. @Bean
  22. public XxlJobSpringExecutor xxlJobExecutor() {
  23. XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
  24. xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
  25. xxlJobSpringExecutor.setAppname(appname);
  26. xxlJobSpringExecutor.setAddress(address);
  27. xxlJobSpringExecutor.setIp(ip);
  28. xxlJobSpringExecutor.setPort(port);
  29. xxlJobSpringExecutor.setAccessToken(accessToken);
  30. xxlJobSpringExecutor.setLogPath(logPath);
  31. xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
  32. return xxlJobSpringExecutor;
  33. }
  34. }
  35. // 业务处理
  36. @Component
  37. public class XxlJobHandler {
  38. @XxlJob("dispatchDelayAnalyse")
  39. public ReturnT<String> dispatchDelayAnalyse(String param) throws Exception {
  40. XxlJobLogger.log("XXL-JOB, dispatchDelayAnalyse begin");
  41. // 业务处理
  42. return ReturnT.SUCCESS;
  43. }

2.任务管理界面配置任务

image.png

原理

调度器初始化时序图

xxl-job调度器初始化时序.png
xxl-job 调度器的核心功能是进行任务管理,调度,提供 rpc 接口出发执行器执行任务

任务调度流程

旧版本的 xxl-job 任务调度使用的是 quartz 框架,新版本使用自己的实现,核心调度逻辑位于 JobScheduleHelper;有两个线程处理调度逻辑,scheduleThread 和 ringThread,同时还维护了一个时间轮的数据结构,其中key 是秒,value 是待执行的任务列表;

  1. private volatile static Map<Integer, List<Integer>> ringData = new ConcurrentHashMap<>();

ScheduleThread

scheduleThread 会向前预查询 5s 要执行的任务信息,比较任务的执行时间与当前时间, 分情况进行处理:

  1. 先从xxl_job_lock 表获取锁,防止多个调度器的情况下任务并发执行;

    1. select * from xxl_job_lock where lock_name = 'schedule_lock' for update
  2. 预读 5s 要执行的任务信息到内存

    SELECT <include refid="Base_Column_List" />
         FROM xxl_job_info AS t
         WHERE t.trigger_status = 1
             and t.trigger_next_time <![CDATA[ <= ]]> #{maxNextTime}
         ORDER BY id ASC
         LIMIT #{pagesize}
    
  3. 比较任务执行时间与当前时间,做不同处理

1)nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS: 说明任务已过期,计算任务下次调度时间;
2)nowTime > jobInfo.getTriggerNextTime(): 直接出发执行,并且计算任务的下次执行时间,如果 nowTime + PRE_READ_MS >下次执行时间,将任务放到时间轮,再次计算job的下次执行时间;
3)如果nowTime 不满足上面两个条件,将任务放到时间轮,再次计算job的下次执行时间;

  1. 更新任务下次执行时间;
  2. 手动提交事务

    RingThread

    ringThread 负责从时间轮中获取待执行的任务,直接触发执行,为了防止任务执行时间过长,每次向前预读取一个刻度的任务; ```java List ringItemData = new ArrayList<>(); int nowSecond = Calendar.getInstance().get(Calendar.SECOND);
    // 避免处理耗时太长,跨过刻度,向前校验一个刻度; for (int i = 0; i < 2; i++) { List tmpData = ringData.remove( (nowSecond+60-i)%60 ); if (tmpData != null) {
     ringItemData.addAll(tmpData);
    
    } }

// ring trigger if (ringItemData.size() > 0) { // do trigger for (int jobId: ringItemData) { // do trigger JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null); } // clear ringItemData.clear(); } ```

执行器初始化时序图

xxl-job执行器初始化时序图.png
xxl-job 的执行器作用:

  1. 注册执行器;
  2. 处理调度器的rpc 请求,执行任务;
  3. 处理结果回调;

实际项目使用过程中会与spring 框架集成使用,项目启动后会首先扫描注册所有 @XxlJob 注解的 job,然后初始化了一个 netty http server 来监听处理调度器的 rpc 请求,执行任务,处理完成后将处理结果返回给调度器。�