一、什么是 xxl-job?

1.1、介绍

1.2、项目解决了什么核心问题?

1.3、与其他项目有什么区别和优势?

二、xxl-job 调度中心安装

2.1、单机安装

  1. version: "3"
  2. services:
  3. xxl-job:
  4. image: xuxueli/xxl-job-admin:2.3.0
  5. container_name: xxl-job-server
  6. environment:
  7. # 注意用空格分割每一个参数
  8. - PARAMS=" --server.port=9090 --spring.datasource.url=jdbc:mysql://mysql:3306/xxl_job?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghai --spring.datasource.username= --spring.datasource.password= "
  9. volumes:
  10. - /opt/xxl-job:/data
  11. ports:
  12. - "7070:9090"
  13. networks:
  14. - blog-nextwork # 与外部已启动的mysql 通信
  15. restart: always
  16. networks:
  17. blog-nextwork:
  18. external: true # 指定外部创建的网络,不再创建

2.2、集群安装

  • DB配置保持一致;
  • 集群机器时钟保持一致(单机集群忽视);
  • 建议:推荐通过nginx为调度中心集群做负载均衡,分配域名。调度中心访问、执行器回调配置、调用API服务等操作均通过该域名进行。

三、Spring 集成 xxl-job

3.1、引入依赖

  1. <!-- http://repo1.maven.org/maven2/com/xuxueli/xxl-job-core/ -->
  2. <dependency>
  3. <groupId>com.xuxueli</groupId>
  4. <artifactId>xxl-job-core</artifactId>
  5. <version>2.3.0</version>
  6. </dependency>

3.2、添加配置

  1. server:
  2. port: 8081
  3. xxl:
  4. job:
  5. #accessToken: ''
  6. admin:
  7. # 访问地址
  8. addresses: http://127.0.0.1:7070/xxl-job-admin
  9. executor:
  10. #address: ''
  11. appname: xxl-job-executor-sample
  12. #ip: ''
  13. #logpath: /data/applogs/xxl-job/jobhandler
  14. #logretentiondays: 30
  15. port: 9999

3.3、添加 handler

  1. @Component
  2. public class HelloWorldJobGroup {
  3. /**
  4. * 一个方法就是一个执行器
  5. *
  6. * @throws Exception
  7. */
  8. @XxlJob("demoJobHandler")
  9. public void demoJobHandler() throws Exception {
  10. XxlJobHelper.log("XXL-JOB, Hello World.");
  11. }
  12. }

3.4、在 xxl-job 服务端添加执行器

image.png

3.5、在 xxl-job 服务端添加任务

image.png

四、xxl-job原理

4.1、执行器自动注册过程

  • 执行器 XxlJobExecutor 初始化
    • 注册 JobHandler (反射获取)
  • 启动内部 netty http 服务
  • 新创建一个线程 ExecutorRegistryThread,并且该线程为守护线程

    • 向服务端发送 http请求注册
    • 服务端收到请求后,更新或插入执行器数据到数据库
    • 线程睡眠30秒后,循环执行ExecutorRegistryThread

      4.2、任务执行过程

      image.png

      4.2.1、服务端

      调度管理工具 JobScheduleHelper

  • 启动 scheduleThread 调度线程

    • 获取锁

      1. select * from xxl_job_lock where lock_name = 'schedule_lock' for update
    • 预读取接下来 5s 内运行的任务,限制条数

      1. # 限制条数
      2. # treadpool-size = 快速任务线程池 大小 + 慢速任务线程池 大小
      3. # trigger-qps = 20 ;每个触发器花费 50ms , 那么 1s 可以执行 20 个
      4. treadpool-size * trigger-qps (each trigger cost 50ms, qps = 1000/50 = 20)
  • 超时任务处理(+5s 仍少于现在时间)

    • 获取任务的超时处理策略,进行处理
  • 任务下次执行时间小于 5s
    • 立即触发执行
    • 刷新下一次执行时间
    • 如果刷新的下一次执行时间等于 5s,并且任务再运行中,进入时间轮容器(刻度为秒,60s)中,预备执行
  • 任务下次执行时间大于大于现在时间

    • 计算时间轮刻度

      1. int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
    • 存入时间轮容器中

    • 刷新下次执行时间
  • 更新任务,持久化到数据库

时间轮线程处理

  • 睡眠
  • 避免处理耗时太长,跨过刻度,向前校验一个刻度;从时间轮中获取 两个刻度的任务
  • 触发器工具类 JobTriggerPoolHelper
  • 根据任务是否超时,选择不同的线程池执行
    • fastTriggerPool
    • slowTriggerPool
  • XxlJobTrigger 执行任务
    • 通过 jobId 查找任务
    • 获取任务分组 (包含执行器地址)
    • 获取任务的路由策略
      • 一、广播,向全部的执行器发送
      • 二、单个处理,通过路由策略选择执行器
    • 发送执行请求到执行器中

4.2.2、执行器端