调度中心admin

1. 调度中心admin启动过程

  1. // 1. 加载 XxlJobAdminConfig
  2. //2. 启动过程 XxlJobScheduler
  3. public void init() throws Exception {
  4. // init i18n
  5. initI18n();
  6. // admin registry monitor run 注册监听器线程的启动 ***
  7. JobRegistryMonitorHelper.getInstance().start();
  8. // admin fail-monitor run 失败任务监听线程启动 *** 每10s去数据库轮询失败的日志任务,然后发送消息
  9. JobFailMonitorHelper.getInstance().start();
  10. // admin lose-monitor run 丢失任务监听线程启动 ***
  11. // 任务结果丢失处理:调度记录停留在 "运行中" 状态超过10min,且对应执行器心跳注册失败不在线,则将本地调度主动标记失败;
  12. JobLosedMonitorHelper.getInstance().start();
  13. // admin trigger pool start 触发器池线程的启动 *****
  14. JobTriggerPoolHelper.toStart();
  15. // admin log report start 日志报告线程的启动 *** 每1s统计一次任务执行器执行情况
  16. JobLogReportHelper.getInstance().start();
  17. // start-schedule 调度线程的启动维护 *****
  18. JobScheduleHelper.getInstance().start();
  19. }

1.1 注册监听器线程的启动

每30s维护一次自动注册的在线执行器。

  1. //任务注册Beat周期默认30s; 执行器以一倍Beat进行执行器注册, 调度中心以一倍Beat进行动态任务发现; 注册信息的失效时间为三倍Beat;
  2. public void start(){
  3. registryThread = new Thread(new Runnable() {
  4. @Override
  5. public void run() {
  6. while (!toStop) {
  7. try {
  8. // 查询出自动注册类型的执行器
  9. List<XxlJobGroup> groupList = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().findByAddressType(0);
  10. if (groupList!=null && !groupList.isEmpty()) {
  11. // 删除无效的注册地址 BEAT_TIMEOUT=30s,DEAD_TIMEOUT = BEAT_TIMEOUT * 3;
  12. //找到 xxl_job_registry表中 updateTime < 当前时间-90s的地址,判定这些地址无效,数据删除
  13. List<Integer> ids = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT, new Date());
  14. if (ids!=null && ids.size()>0) {
  15. XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(ids);
  16. }
  17. // 找到 xxl_job_registry表中 updateTime > 当前时间-90s 的有效地址,更新xxl_job_group的address_list字段
  18. // fresh online address (admin/executor)
  19. HashMap<String, List<String>> appAddressMap = new HashMap<String, List<String>>();
  20. List<XxlJobRegistry> list = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT, new Date());
  21. if (list != null) {
  22. for (XxlJobRegistry item: list) {
  23. if (RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) {
  24. String appname = item.getRegistryKey();
  25. List<String> registryList = appAddressMap.get(appname);
  26. if (registryList == null) {
  27. registryList = new ArrayList<String>();
  28. }
  29. if (!registryList.contains(item.getRegistryValue())) {
  30. registryList.add(item.getRegistryValue());
  31. }
  32. appAddressMap.put(appname, registryList);
  33. }
  34. }
  35. }
  36. // fresh group address
  37. for (XxlJobGroup group: groupList) {
  38. List<String> registryList = appAddressMap.get(group.getAppname());
  39. String addressListStr = null;
  40. if (registryList!=null && !registryList.isEmpty()) {
  41. Collections.sort(registryList);
  42. StringBuilder addressListSB = new StringBuilder();
  43. for (String item:registryList) {
  44. addressListSB.append(item).append(",");
  45. }
  46. addressListStr = addressListSB.toString();
  47. addressListStr = addressListStr.substring(0, addressListStr.length()-1);
  48. }
  49. group.setAddressList(addressListStr);
  50. XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().update(group);
  51. }
  52. }
  53. } catch (Exception e) {
  54. if (!toStop) {
  55. logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);
  56. }
  57. }
  58. try {
  59. //频率为30s一次
  60. TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
  61. } catch (InterruptedException e) {
  62. if (!toStop) {
  63. logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);
  64. }
  65. }
  66. }
  67. logger.info(">>>>>>>>>>> xxl-job, job registry monitor thread stop");
  68. }
  69. });
  70. //守护线程
  71. registryThread.setDaemon(true);
  72. registryThread.setName("xxl-job, admin JobRegistryMonitorHelper");
  73. registryThread.start();
  74. }

1.2 调度线程的启动维护
  1. 1. 计算单线程1s处理任务数,假设处理一个任务需要50ms,那么1s能够处理20个任务
  2. 2. 计算出1s最大处理任务量 = 最大线程数 * 20
  3. 3. 死循环轮询加载需要调度的任务
  4. 3.1 开启事务,使用 select * from xxl_job_lock where lock_name = 'schedule_lock' for update 加行级锁,保证单线程处理
  5. 3.2 得到未来5s的任务, triggerNextTime < nowTime()+5s, limit 最大任务处理量
  6. 3.3 判断任务情况
  7. if (nowTime > triggerNextTime +5 s), 不再执行该任务(misfire),只更新下次执行时间
  8. else if (nowTime > getTriggerNextTime)
  9. , 超时在5s内的,立即trigger任务,然后更新下次执行时间
  10. if(jobInfo.getTriggerStatus() == 1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime())
  11. 成功触发任务,如果下次执行时间在5秒内,扔到ringData中等待处理,更新下次执行时间
  12. else 还未达到触发的时间点,扔到ringData中等待处理,更新下次执行时间
  13. 4. 提交事务,释放锁
  14. 5. 判断此次时间是否超过了1s,小于1s,需等待到1sTimeUnit.MILLISECONDS.sleep((preReadSuc ? 1000 : PRE_READ_MS) - System.currentTimeMillis() % 1000);
  15. 否则,直接下次循环
  16. ## 调度的准时处理
  17. 秒级的准时处理,3.3 Map<Integer, List<Integer>> ringData key使用的是 int ringSecond = (int) ((jobInfo.getTriggerNextTime() / 1000) % 60);这样保证了一个jobId不会出现重复添加的情况
  18. // 避免处理耗时太长,跨过刻度,向前校验一个刻度;
  19. for (int i = 0; i < 2; i++) {
  20. List<Integer> tmpData = ringData.remove((nowSecond + 60 - i) % 60);
  21. if (tmpData != null) {
  22. ringItemData.addAll(tmpData);
  23. }
  24. }

1.3 触发器池线程的启动

1.4 触发的路由策略
  • 一致性hash策略

    1. 分组下机器地址相同,不同JOB均匀散列在不同机器上,保证分组下机器分配JOB平均;且每个JOB固定调度其中一台机器;
    2. avirtual node:解决不均衡问题
    3. bhash method replace hashCodeStringhashCode可能重复,需要进一步扩大hashCode的取值范围
    4. 参考文章: https://juejin.im/post/6873407119421341704
  • 最不经常使用原则 LFU

    1. // load least userd count address
    2. List<Map.Entry<String, Integer>> lfuItemList = new ArrayList<>(lfuItemMap.entrySet());
    3. //按照使用次数排序
    4. Collections.sort(lfuItemList, Comparator.comparing(Map.Entry::getValue));

  • LRU

    1. /**
    2. * LinkedHashMap
    3. * a、accessOrder:true=访问顺序排序(get/put时排序);false=插入顺序排期;
    4. * b、removeEldestEntry:新增元素时将会调用,返回true时会删除最老元素;可封装LinkedHashMap并重写该方法,比如定义最大容量,超出是返回true即可实现固定长度的LRU算法;
    5. */
    6. 巧用 new LinkedHashMap<>(16, 0.75f, true);
    7. 或者 new LinkedHashSet<>(16, 0.75f, true);
  • https://juejin.im/post/6844904053000912903

  • 故障转移
    做一次心跳检测,执行心跳检测成功的服务
  • 忙碌转移
    判断当前job对应的线程是否正在执行,以非执行中的去执行
  • 轮询
    按照一定的规则取模