架构图
使用
首先把xxljob 部署起来
1.代码
// 配置类
@Configuration
public class XxlJobConfig {
private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);
@Value("${xxl.job.admin.addresses}")
private String adminAddresses;
@Value("${xxl.job.accessToken}")
private String accessToken;
@Value("${xxl.job.executor.appname}")
private String appname;
@Value("${xxl.job.executor.address}")
private String address;
@Value("${xxl.job.executor.ip}")
private String ip;
@Value("${xxl.job.executor.port}")
private int port;
@Value("${xxl.job.executor.logpath}")
private String logPath;
@Value("${xxl.job.executor.logretentiondays}")
private int logRetentionDays;
@Bean
public XxlJobSpringExecutor xxlJobExecutor() {
XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
xxlJobSpringExecutor.setAppname(appname);
xxlJobSpringExecutor.setAddress(address);
xxlJobSpringExecutor.setIp(ip);
xxlJobSpringExecutor.setPort(port);
xxlJobSpringExecutor.setAccessToken(accessToken);
xxlJobSpringExecutor.setLogPath(logPath);
xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
return xxlJobSpringExecutor;
}
}
// 业务处理
@Component
public class XxlJobHandler {
@XxlJob("dispatchDelayAnalyse")
public ReturnT<String> dispatchDelayAnalyse(String param) throws Exception {
XxlJobLogger.log("XXL-JOB, dispatchDelayAnalyse begin");
// 业务处理
return ReturnT.SUCCESS;
}
2.任务管理界面配置任务
原理
调度器初始化时序图
xxl-job 调度器的核心功能是进行任务管理,调度,提供 rpc 接口出发执行器执行任务
任务调度流程
旧版本的 xxl-job 任务调度使用的是 quartz 框架,新版本使用自己的实现,核心调度逻辑位于 JobScheduleHelper;有两个线程处理调度逻辑,scheduleThread 和 ringThread,同时还维护了一个时间轮的数据结构,其中key 是秒,value 是待执行的任务列表;
private volatile static Map<Integer, List<Integer>> ringData = new ConcurrentHashMap<>();
ScheduleThread
scheduleThread 会向前预查询 5s 要执行的任务信息,比较任务的执行时间与当前时间, 分情况进行处理:
先从xxl_job_lock 表获取锁,防止多个调度器的情况下任务并发执行;
select * from xxl_job_lock where lock_name = 'schedule_lock' for update
预读 5s 要执行的任务信息到内存
SELECT <include refid="Base_Column_List" /> FROM xxl_job_info AS t WHERE t.trigger_status = 1 and t.trigger_next_time <![CDATA[ <= ]]> #{maxNextTime} ORDER BY id ASC LIMIT #{pagesize}
比较任务执行时间与当前时间,做不同处理
1)nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS: 说明任务已过期,计算任务下次调度时间;
2)nowTime > jobInfo.getTriggerNextTime(): 直接出发执行,并且计算任务的下次执行时间,如果 nowTime + PRE_READ_MS >下次执行时间,将任务放到时间轮,再次计算job的下次执行时间;
3)如果nowTime 不满足上面两个条件,将任务放到时间轮,再次计算job的下次执行时间;
- 更新任务下次执行时间;
- 手动提交事务
RingThread
ringThread 负责从时间轮中获取待执行的任务,直接触发执行,为了防止任务执行时间过长,每次向前预读取一个刻度的任务; ```java ListringItemData = new ArrayList<>(); int nowSecond = Calendar.getInstance().get(Calendar.SECOND);
// 避免处理耗时太长,跨过刻度,向前校验一个刻度; for (int i = 0; i < 2; i++) { ListtmpData = ringData.remove( (nowSecond+60-i)%60 ); if (tmpData != null) {
} }ringItemData.addAll(tmpData);
// ring trigger if (ringItemData.size() > 0) { // do trigger for (int jobId: ringItemData) { // do trigger JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null); } // clear ringItemData.clear(); } ```
执行器初始化时序图
xxl-job 的执行器作用:
- 注册执行器;
- 处理调度器的rpc 请求,执行任务;
- 处理结果回调;
实际项目使用过程中会与spring 框架集成使用,项目启动后会首先扫描注册所有 @XxlJob 注解的 job,然后初始化了一个 netty http server 来监听处理调度器的 rpc 请求,执行任务,处理完成后将处理结果返回给调度器。�