调度中心admin
1. 调度中心admin启动过程
// 1. 加载 XxlJobAdminConfig//2. 启动过程 XxlJobSchedulerpublic void init() throws Exception {// init i18ninitI18n();// admin registry monitor run 注册监听器线程的启动 ***JobRegistryMonitorHelper.getInstance().start();// admin fail-monitor run 失败任务监听线程启动 *** 每10s去数据库轮询失败的日志任务,然后发送消息JobFailMonitorHelper.getInstance().start();// admin lose-monitor run 丢失任务监听线程启动 ***// 任务结果丢失处理:调度记录停留在 "运行中" 状态超过10min,且对应执行器心跳注册失败不在线,则将本地调度主动标记失败;JobLosedMonitorHelper.getInstance().start();// admin trigger pool start 触发器池线程的启动 *****JobTriggerPoolHelper.toStart();// admin log report start 日志报告线程的启动 *** 每1s统计一次任务执行器执行情况JobLogReportHelper.getInstance().start();// start-schedule 调度线程的启动维护 *****JobScheduleHelper.getInstance().start();}
1.1 注册监听器线程的启动
每30s维护一次自动注册的在线执行器。
//任务注册Beat周期默认30s; 执行器以一倍Beat进行执行器注册, 调度中心以一倍Beat进行动态任务发现; 注册信息的失效时间为三倍Beat;public void start(){registryThread = new Thread(new Runnable() {@Overridepublic void run() {while (!toStop) {try {// 查询出自动注册类型的执行器List<XxlJobGroup> groupList = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().findByAddressType(0);if (groupList!=null && !groupList.isEmpty()) {// 删除无效的注册地址 BEAT_TIMEOUT=30s,DEAD_TIMEOUT = BEAT_TIMEOUT * 3;//找到 xxl_job_registry表中 updateTime < 当前时间-90s的地址,判定这些地址无效,数据删除List<Integer> ids = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT, new Date());if (ids!=null && ids.size()>0) {XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(ids);}// 找到 xxl_job_registry表中 updateTime > 当前时间-90s 的有效地址,更新xxl_job_group的address_list字段// fresh online address (admin/executor)HashMap<String, List<String>> appAddressMap = new HashMap<String, List<String>>();List<XxlJobRegistry> list = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT, new Date());if (list != null) {for (XxlJobRegistry item: list) {if (RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) {String appname = item.getRegistryKey();List<String> registryList = appAddressMap.get(appname);if (registryList == null) {registryList = new ArrayList<String>();}if (!registryList.contains(item.getRegistryValue())) {registryList.add(item.getRegistryValue());}appAddressMap.put(appname, registryList);}}}// fresh group addressfor (XxlJobGroup group: groupList) {List<String> registryList = appAddressMap.get(group.getAppname());String addressListStr = null;if (registryList!=null && !registryList.isEmpty()) {Collections.sort(registryList);StringBuilder addressListSB = new StringBuilder();for (String item:registryList) {addressListSB.append(item).append(",");}addressListStr = addressListSB.toString();addressListStr = addressListStr.substring(0, addressListStr.length()-1);}group.setAddressList(addressListStr);XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().update(group);}}} catch (Exception e) {if (!toStop) {logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);}}try {//频率为30s一次TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);} catch (InterruptedException e) {if (!toStop) {logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);}}}logger.info(">>>>>>>>>>> xxl-job, job registry monitor thread stop");}});//守护线程registryThread.setDaemon(true);registryThread.setName("xxl-job, admin JobRegistryMonitorHelper");registryThread.start();}
1.2 调度线程的启动维护
1. 计算单线程1s处理任务数,假设处理一个任务需要50ms,那么1s能够处理20个任务2. 计算出1s最大处理任务量 = 最大线程数 * 203. 死循环轮询加载需要调度的任务3.1 开启事务,使用 select * from xxl_job_lock where lock_name = 'schedule_lock' for update 加行级锁,保证单线程处理3.2 得到未来5s的任务, triggerNextTime < nowTime()+5s, limit 最大任务处理量3.3 判断任务情况if (nowTime > triggerNextTime +5 s), 不再执行该任务(misfire),只更新下次执行时间else if (nowTime > getTriggerNextTime), 超时在5s内的,立即trigger任务,然后更新下次执行时间if(jobInfo.getTriggerStatus() == 1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime())成功触发任务,如果下次执行时间在5秒内,扔到ringData中等待处理,更新下次执行时间else 还未达到触发的时间点,扔到ringData中等待处理,更新下次执行时间4. 提交事务,释放锁5. 判断此次时间是否超过了1s,小于1s,需等待到1s,TimeUnit.MILLISECONDS.sleep((preReadSuc ? 1000 : PRE_READ_MS) - System.currentTimeMillis() % 1000);否则,直接下次循环## 调度的准时处理秒级的准时处理,3.3 中Map<Integer, List<Integer>> ringData 的key使用的是 int ringSecond = (int) ((jobInfo.getTriggerNextTime() / 1000) % 60);这样保证了一个jobId不会出现重复添加的情况// 避免处理耗时太长,跨过刻度,向前校验一个刻度;for (int i = 0; i < 2; i++) {List<Integer> tmpData = ringData.remove((nowSecond + 60 - i) % 60);if (tmpData != null) {ringItemData.addAll(tmpData);}}
1.3 触发器池线程的启动
1.4 触发的路由策略
一致性hash策略
分组下机器地址相同,不同JOB均匀散列在不同机器上,保证分组下机器分配JOB平均;且每个JOB固定调度其中一台机器;a、virtual node:解决不均衡问题b、hash method replace hashCode:String的hashCode可能重复,需要进一步扩大hashCode的取值范围参考文章: https://juejin.im/post/6873407119421341704
最不经常使用原则 LFU
// load least userd count addressList<Map.Entry<String, Integer>> lfuItemList = new ArrayList<>(lfuItemMap.entrySet());//按照使用次数排序Collections.sort(lfuItemList, Comparator.comparing(Map.Entry::getValue));
LRU
/*** LinkedHashMap* a、accessOrder:true=访问顺序排序(get/put时排序);false=插入顺序排期;* b、removeEldestEntry:新增元素时将会调用,返回true时会删除最老元素;可封装LinkedHashMap并重写该方法,比如定义最大容量,超出是返回true即可实现固定长度的LRU算法;*/巧用 new LinkedHashMap<>(16, 0.75f, true);或者 new LinkedHashSet<>(16, 0.75f, true);
- 故障转移
做一次心跳检测,执行心跳检测成功的服务 - 忙碌转移
判断当前job对应的线程是否正在执行,以非执行中的去执行 - 轮询
按照一定的规则取模
