调度中心admin
1. 调度中心admin启动过程
// 1. 加载 XxlJobAdminConfig
//2. 启动过程 XxlJobScheduler
public void init() throws Exception {
// init i18n
initI18n();
// admin registry monitor run 注册监听器线程的启动 ***
JobRegistryMonitorHelper.getInstance().start();
// admin fail-monitor run 失败任务监听线程启动 *** 每10s去数据库轮询失败的日志任务,然后发送消息
JobFailMonitorHelper.getInstance().start();
// admin lose-monitor run 丢失任务监听线程启动 ***
// 任务结果丢失处理:调度记录停留在 "运行中" 状态超过10min,且对应执行器心跳注册失败不在线,则将本地调度主动标记失败;
JobLosedMonitorHelper.getInstance().start();
// admin trigger pool start 触发器池线程的启动 *****
JobTriggerPoolHelper.toStart();
// admin log report start 日志报告线程的启动 *** 每1s统计一次任务执行器执行情况
JobLogReportHelper.getInstance().start();
// start-schedule 调度线程的启动维护 *****
JobScheduleHelper.getInstance().start();
}
1.1 注册监听器线程的启动
每30s维护一次自动注册的在线执行器。
//任务注册Beat周期默认30s; 执行器以一倍Beat进行执行器注册, 调度中心以一倍Beat进行动态任务发现; 注册信息的失效时间为三倍Beat;
public void start(){
registryThread = new Thread(new Runnable() {
@Override
public void run() {
while (!toStop) {
try {
// 查询出自动注册类型的执行器
List<XxlJobGroup> groupList = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().findByAddressType(0);
if (groupList!=null && !groupList.isEmpty()) {
// 删除无效的注册地址 BEAT_TIMEOUT=30s,DEAD_TIMEOUT = BEAT_TIMEOUT * 3;
//找到 xxl_job_registry表中 updateTime < 当前时间-90s的地址,判定这些地址无效,数据删除
List<Integer> ids = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT, new Date());
if (ids!=null && ids.size()>0) {
XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(ids);
}
// 找到 xxl_job_registry表中 updateTime > 当前时间-90s 的有效地址,更新xxl_job_group的address_list字段
// fresh online address (admin/executor)
HashMap<String, List<String>> appAddressMap = new HashMap<String, List<String>>();
List<XxlJobRegistry> list = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT, new Date());
if (list != null) {
for (XxlJobRegistry item: list) {
if (RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) {
String appname = item.getRegistryKey();
List<String> registryList = appAddressMap.get(appname);
if (registryList == null) {
registryList = new ArrayList<String>();
}
if (!registryList.contains(item.getRegistryValue())) {
registryList.add(item.getRegistryValue());
}
appAddressMap.put(appname, registryList);
}
}
}
// fresh group address
for (XxlJobGroup group: groupList) {
List<String> registryList = appAddressMap.get(group.getAppname());
String addressListStr = null;
if (registryList!=null && !registryList.isEmpty()) {
Collections.sort(registryList);
StringBuilder addressListSB = new StringBuilder();
for (String item:registryList) {
addressListSB.append(item).append(",");
}
addressListStr = addressListSB.toString();
addressListStr = addressListStr.substring(0, addressListStr.length()-1);
}
group.setAddressList(addressListStr);
XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().update(group);
}
}
} catch (Exception e) {
if (!toStop) {
logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);
}
}
try {
//频率为30s一次
TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
} catch (InterruptedException e) {
if (!toStop) {
logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);
}
}
}
logger.info(">>>>>>>>>>> xxl-job, job registry monitor thread stop");
}
});
//守护线程
registryThread.setDaemon(true);
registryThread.setName("xxl-job, admin JobRegistryMonitorHelper");
registryThread.start();
}
1.2 调度线程的启动维护
1. 计算单线程1s处理任务数,假设处理一个任务需要50ms,那么1s能够处理20个任务
2. 计算出1s最大处理任务量 = 最大线程数 * 20
3. 死循环轮询加载需要调度的任务
3.1 开启事务,使用 select * from xxl_job_lock where lock_name = 'schedule_lock' for update 加行级锁,保证单线程处理
3.2 得到未来5s的任务, triggerNextTime < nowTime()+5s, limit 最大任务处理量
3.3 判断任务情况
if (nowTime > triggerNextTime +5 s), 不再执行该任务(misfire),只更新下次执行时间
else if (nowTime > getTriggerNextTime)
, 超时在5s内的,立即trigger任务,然后更新下次执行时间
if(jobInfo.getTriggerStatus() == 1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime())
成功触发任务,如果下次执行时间在5秒内,扔到ringData中等待处理,更新下次执行时间
else 还未达到触发的时间点,扔到ringData中等待处理,更新下次执行时间
4. 提交事务,释放锁
5. 判断此次时间是否超过了1s,小于1s,需等待到1s,TimeUnit.MILLISECONDS.sleep((preReadSuc ? 1000 : PRE_READ_MS) - System.currentTimeMillis() % 1000);
否则,直接下次循环
## 调度的准时处理
秒级的准时处理,3.3 中Map<Integer, List<Integer>> ringData 的key使用的是 int ringSecond = (int) ((jobInfo.getTriggerNextTime() / 1000) % 60);这样保证了一个jobId不会出现重复添加的情况
// 避免处理耗时太长,跨过刻度,向前校验一个刻度;
for (int i = 0; i < 2; i++) {
List<Integer> tmpData = ringData.remove((nowSecond + 60 - i) % 60);
if (tmpData != null) {
ringItemData.addAll(tmpData);
}
}
1.3 触发器池线程的启动
1.4 触发的路由策略
一致性hash策略
分组下机器地址相同,不同JOB均匀散列在不同机器上,保证分组下机器分配JOB平均;且每个JOB固定调度其中一台机器;
a、virtual node:解决不均衡问题
b、hash method replace hashCode:String的hashCode可能重复,需要进一步扩大hashCode的取值范围
参考文章: https://juejin.im/post/6873407119421341704
最不经常使用原则 LFU
// load least userd count address
List<Map.Entry<String, Integer>> lfuItemList = new ArrayList<>(lfuItemMap.entrySet());
//按照使用次数排序
Collections.sort(lfuItemList, Comparator.comparing(Map.Entry::getValue));
LRU
/**
* LinkedHashMap
* a、accessOrder:true=访问顺序排序(get/put时排序);false=插入顺序排期;
* b、removeEldestEntry:新增元素时将会调用,返回true时会删除最老元素;可封装LinkedHashMap并重写该方法,比如定义最大容量,超出是返回true即可实现固定长度的LRU算法;
*/
巧用 new LinkedHashMap<>(16, 0.75f, true);
或者 new LinkedHashSet<>(16, 0.75f, true);
- 故障转移
做一次心跳检测,执行心跳检测成功的服务 - 忙碌转移
判断当前job对应的线程是否正在执行,以非执行中的去执行 - 轮询
按照一定的规则取模