一、什么是 xxl-job?
1.1、介绍
1.2、项目解决了什么核心问题?
1.3、与其他项目有什么区别和优势?
二、xxl-job 调度中心安装
2.1、单机安装
version: "3"
services:
xxl-job:
image: xuxueli/xxl-job-admin:2.3.0
container_name: xxl-job-server
environment:
# 注意用空格分割每一个参数
- PARAMS=" --server.port=9090 --spring.datasource.url=jdbc:mysql://mysql:3306/xxl_job?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghai --spring.datasource.username= --spring.datasource.password= "
volumes:
- /opt/xxl-job:/data
ports:
- "7070:9090"
networks:
- blog-nextwork # 与外部已启动的mysql 通信
restart: always
networks:
blog-nextwork:
external: true # 指定外部创建的网络,不再创建
2.2、集群安装
- DB配置保持一致;
- 集群机器时钟保持一致(单机集群忽视);
- 建议:推荐通过nginx为调度中心集群做负载均衡,分配域名。调度中心访问、执行器回调配置、调用API服务等操作均通过该域名进行。
三、Spring 集成 xxl-job
3.1、引入依赖
<!-- http://repo1.maven.org/maven2/com/xuxueli/xxl-job-core/ -->
<dependency>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job-core</artifactId>
<version>2.3.0</version>
</dependency>
3.2、添加配置
server:
port: 8081
xxl:
job:
#accessToken: ''
admin:
# 访问地址
addresses: http://127.0.0.1:7070/xxl-job-admin
executor:
#address: ''
appname: xxl-job-executor-sample
#ip: ''
#logpath: /data/applogs/xxl-job/jobhandler
#logretentiondays: 30
port: 9999
3.3、添加 handler
@Component
public class HelloWorldJobGroup {
/**
* 一个方法就是一个执行器
*
* @throws Exception
*/
@XxlJob("demoJobHandler")
public void demoJobHandler() throws Exception {
XxlJobHelper.log("XXL-JOB, Hello World.");
}
}
3.4、在 xxl-job 服务端添加执行器
3.5、在 xxl-job 服务端添加任务
四、xxl-job原理
4.1、执行器自动注册过程
- 执行器 XxlJobExecutor 初始化
- 注册 JobHandler (反射获取)
- 启动内部 netty http 服务
新创建一个线程 ExecutorRegistryThread,并且该线程为守护线程
启动 scheduleThread 调度线程
获取锁
select * from xxl_job_lock where lock_name = 'schedule_lock' for update
预读取接下来 5s 内运行的任务,限制条数
# 限制条数
# treadpool-size = 快速任务线程池 大小 + 慢速任务线程池 大小
# trigger-qps = 20 ;每个触发器花费 50ms , 那么 1s 可以执行 20 个
treadpool-size * trigger-qps (each trigger cost 50ms, qps = 1000/50 = 20)
超时任务处理(+5s 仍少于现在时间)
- 获取任务的超时处理策略,进行处理
- 任务下次执行时间小于 5s
- 立即触发执行
- 刷新下一次执行时间
- 如果刷新的下一次执行时间等于 5s,并且任务再运行中,进入时间轮容器(刻度为秒,60s)中,预备执行
任务下次执行时间大于大于现在时间
计算时间轮刻度
int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
存入时间轮容器中
- 刷新下次执行时间
- 更新任务,持久化到数据库
时间轮线程处理
- 睡眠
- 避免处理耗时太长,跨过刻度,向前校验一个刻度;从时间轮中获取 两个刻度的任务
- 触发器工具类 JobTriggerPoolHelper
- 根据任务是否超时,选择不同的线程池执行
- fastTriggerPool
- slowTriggerPool
- XxlJobTrigger 执行任务
- 通过 jobId 查找任务
- 获取任务分组 (包含执行器地址)
- 获取任务的路由策略
- 一、广播,向全部的执行器发送
- 二、单个处理,通过路由策略选择执行器
- 发送执行请求到执行器中