(1) 基本介绍
**XXL-JOB**
是一个轻量级分布式任务调度平台,主打特点是平台化,易部署,开发迅速、学习简单、轻量级、易扩展,代码仍在持续更新中。
调度中心
: 任务调度控制台,平台自身并不承担业务逻辑,只是负责任务的统一管理和调度执行,并且提供任务管理平台执行器
: 负责接收“调度中心”的调度并执行,可直接部署执行器,也可以将执行器集成到现有业务项目中。 通过将任务的调度控制和任务的执行解耦,业务使用只需要关注业务逻辑的开发。XXL-JOB
主要提供了任务的动态配置管理、任务监控和统计报表以及调度日志几大功能模块,支持多种运行模式和路由策略,可基于对应执行器机器集群数量进行简单分片数据处理。(2) 框架源码及文档
源码地址:
- 码云
文档地址:
- 中文文档
-
(3)XXL-JOB的特性
1、简单:支持通过
Web页面
对任务进行CRUD
操作,操作简单,一分钟上手;- 2、动态:支持动态修改任务状态、启动/停止任务,以及终止运行中任务,即时生效;
- 3、调度中心HA(中心式):调度采用
中心式设计
,调度中心自研调度组件并支持集群部署
,可保证调度中心HA; - 4、执行器HA(分布式):任务分布式执行,任务”执行器”支持集群部署,可保证任务执行HA;
- 5、注册中心: 执行器会周期性自动注册任务, 调度中心将会自动发现注册的任务并触发执行。也支持手动录入执行器地址;
- 6、弹性扩容缩容:一旦有新执行器机器上线或者下线,下次调度时将会重新分配任务;
- 7、路由策略:执行器集群部署时提供丰富的路由策略,包括:第一个、最后一个、轮询、随机、一致性HASH、最不经常使用、最近最久未使用、故障转移、忙碌转移等;
- 8、故障转移:任务路由策略选择故障转移情况下,如果执行器集群中某一台机器故障,将会自动Failover切换到一台正常的执行器发送调度请求。
- 9、阻塞处理策略:调度过于密集执行器来不及处理时的处理策略,策略包括:单机串行(默认)、丢弃后续调度、覆盖之前调度;
- 10、任务超时控制:支持自定义任务超时时间,任务运行超时将会主动中断任务;
- 11、任务失败重试:支持自定义任务失败重试次数,当任务失败时将会按照预设的失败重试次数主动进行重试;其中分片任务支持分片粒度的失败重试;
- 12、任务失败警告:默认提供邮件方式失败告警,同时预留扩展接口,可方便的扩展短信、钉钉等告警方式;
- 13、分片广播任务:执行器集群部署时,任务路由策略选择
分片广播
情况下,一次任务调度将会广播触发集群中所有执行器执行一次任务,可根据分片参数开发分片任务; - 14、动态分片:分片广播任务以执行器为维度进行分片,支持动态扩容执行器集群从而动态增加分片数量,协同进行业务处理;在进行大数据量业务操作时可显著提升任务处理能力和速度。
- 15、事件触发:除了
Cron方式
和任务依赖方式
触发任务执行之外,支持基于事件的触发任务方式。调度中心提供触发任务单次执行的API服务,可根据业务事件灵活触发(4) XXL-JOB架构图
最新版本架构图:xxl-job其实也是在quartz的基础上实现的,但是修改了任务调度的模式,并且任务调度采用注册和RPC调用方式来实现。
(5) XXL-JOB原理解析
2.1.0版本前核心调度模块都是基于quartz
框架,2.1.0版本开始自研调度组件,移除quartz
依赖 ,使用时间轮调度。
(RPC的底层变化, 2.0.1 使用的是Jetty服务的RPC, 2.0.2 使用的Nettty服务的RPC)
(5.1) 定时触发任务是如何实现的?:使用时间轮实现
xxl_job_info
表是记录定时任务的db表,里面有个trigger_next_time(Long)
字段,表示下一次触发的时间点任务时间被修改 / 每一次任务触发后,可以根据cronb
表达式计算下一次触发时间戳:
Date nextValidTime = new CronExpression(jobInfo.getJobCron()).getNextValidTimeAfter(new Date()))
更新trigger_next_time
字段定时执行任务逻辑:
- 定时任务
scheduleThread
:不断从db
把5秒
内要执行的任务读出,立即触发 / 放到时间轮等待触发,并更新trigger_next_time
- 获取当前时间
now
- 轮询
db
,找出trigger_next_time
在距now 5秒
内的任务
3.1 对到达now时间后的任务(超出now 5秒外)
(1) 直接跳过不执行;
(2) 重置trigger_next_time
3.2 对到达now
时间后的任务(超出now
5秒内)
(1) 开线程执行触发逻辑;
(2) 若任务下一次触发时间是在5秒内,则放到时间轮内(Map秒数(1-60) => 任务id列表);
(3) 重置trigger_next_time
3.3 对未到达now
时间的任务
(1)直接放到时间轮内;
(2)重置trigger_next_time
- 定时任务
ringThread
:时间轮实现到点触发任务
4.1 时间轮数据结构:Map<Integer, List<Integer>> key
是秒数(1-60)
,value
是任务id
列表 - 获取当前时间秒数
- 从时间轮内移出当前秒数前2个秒数(避免处理耗时太长,跨过刻度,向前校验一个刻度)的任务列表id,一一触发任务;
(5.2) 如何避免集群中的多个服务器同时调度任务?
当xxl-job应用本身集群部署(实现高可用HA)时,如何避免集群中的多个服务器同时调度任务?
通过mysql悲观锁实现分布式锁(for update语句)
- 定时任务
setAutoCommit(false)
关闭隐式自动提交事务,启动事务select lock for update
(显式排他锁,其他事务无法进入&无法实现for update
)- 读
db
任务信息 -> 拉任务到内存时间轮 -> 更新db
任务信息 commit
提交事务,同时会释放for update
的排他锁(悲观锁)(5.3) 任务执行器注册中心是如何实现的?
使用db表xxl_job_group记录下执行器的信息:执行器AppName、执行器名称title、执行器地址列表address_list(多地址逗号分隔)
(5.4) 如何实现任务执行器的路由?
执行器集群部署时提供丰富的路由策略,包括:
第一个、最后一个、轮询、随机、一致性HASH、最不经常使用、最近最久未使用、故障转移、忙碌转移等;
第一个、最后一个、轮询、随机:都是简单读
address_list
即可- 一致性HASH:
TreeSet
实现一致性hash
算法 - 最不经常使用、最近最久未使用:
HashMap、LinkedHashMap
- 故障转移:遍历
address_list
获取address
时,逐个检查该address
的心跳(请求返回状态);只有心跳正常的address
才返回使用 忙碌转移:遍历
address_list
获取address
时,逐个检查该address
是否忙碌(请求返回状态);只有状态为idle
的address
才返回使用(5.5) 如何实现任务分片、并行执行?
拉出任务的执行机器列表,逐个设置
index / total
,把index / total
分发到任务执行器- 任务执行器可根据
index / total
参数开发分片任务(6)XXL-JOB任务调度流程
1:XXL-Jobadmin平台创建执行器(Job实际执行地址)
2:XXL-Jobadmin平台新建任务,填写对应的执行器
3:Job服务器代码中,使用JobHandler表示该类为Job执行方法
4:当任务执行的时候,会现在XXL-Jobadmin调度平台先执行一次,获取任务中的执行器,然后去对应的执行器地址服务器,执行对应的任务
(7) XXL-JOB环境搭建
(7.1) 源码结构
通过上面给出的源码下载地址,我们将源码clone到IDEA中,如下:
(7.2) 初始化数据库
初始化脚本在上面源码目录的 /doc/db/tables_xxl_job.sql
,将此脚本在MySQL
数据库中执行一遍。
执行完毕,会在MySQL数据库中生成如下 8 张表:
(7.3) 配置调度中心
调度中心就是源码中的 xxl-job-admin
工程,我们需要将其配置成自己需要的调度中心,通过该工程我们能够以图形化的方式统一管理任务调度平台上调度任务,负责触发调度执行。
(7.3.1) 修改调度中心配置文件
### web
server.port=8080
server.servlet.context-path=/xxl-job-admin
### actuator
management.server.servlet.context-path=/actuator
management.health.mail.enabled=false
### resources
spring.mvc.servlet.load-on-startup=0
spring.mvc.static-path-pattern=/static/**
spring.resources.static-locations=classpath:/static/
### freemarker
spring.freemarker.templateLoaderPath=classpath:/templates/
spring.freemarker.suffix=.ftl
spring.freemarker.charset=UTF-8
spring.freemarker.request-context-attribute=request
spring.freemarker.settings.number_format=0.##########
### mybatis
mybatis.mapper-locations=classpath:/mybatis-mapper/*Mapper.xml
#mybatis.type-aliases-package=com.xxl.job.admin.core.model
### xxl-job, datasource
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/xxl_job?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghai
spring.datasource.username=root
spring.datasource.password=root_pwd
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
### datasource-pool
spring.datasource.type=com.zaxxer.hikari.HikariDataSource
spring.datasource.hikari.minimum-idle=10
spring.datasource.hikari.maximum-pool-size=30
spring.datasource.hikari.auto-commit=true
spring.datasource.hikari.idle-timeout=30000
spring.datasource.hikari.pool-name=HikariCP
spring.datasource.hikari.max-lifetime=900000
spring.datasource.hikari.connection-timeout=10000
spring.datasource.hikari.connection-test-query=SELECT 1
spring.datasource.hikari.validation-timeout=1000
### xxl-job, email
spring.mail.host=smtp.qq.com
spring.mail.port=25
spring.mail.username=xxx@qq.com
spring.mail.from=xxx@qq.com
spring.mail.password=xxx
spring.mail.properties.mail.smtp.auth=true
spring.mail.properties.mail.smtp.starttls.enable=true
spring.mail.properties.mail.smtp.starttls.required=true
spring.mail.properties.mail.smtp.socketFactory.class=javax.net.ssl.SSLSocketFactory
### xxl-job, access token
xxl.job.accessToken=
### xxl-job, i18n (default is zh_CN, and you can choose "zh_CN", "zh_TC" and "en")
xxl.job.i18n=zh_CN
## xxl-job, triggerpool max size
xxl.job.triggerpool.fast.max=200
xxl.job.triggerpool.slow.max=100
### xxl-job, log retention days
xxl.job.logretentiondays=30
注意:
基本上上面的配置文件我们需要修改的只有第 5 点,修改数据库的地址,这要与我们前面初始化的数据库名称径,用户名密码保持一致;第二个就是修改第 6 点,报警邮箱,因为该工程任务失败后有失败告警功能,可以通过邮件来提醒,如果我们需要此功能,可以配置一下。
(7.3.2) 部署调度中心
(7.3.3)访问调度中心管理界面
地址:
(7.4) 创建执行器项目
下面我以创建一个 springboot
版本的执行器为例来介绍:
在源码中作者已经贴心的给出了多种执行器项目示例,可根据你的喜好直接将其部署作为你自己的执行器
现以集成到现有项目为例,将执行器集成到现有的一个Spring-Boot项目Athena中去
步骤一:在你的项目里引入xxl-job-core的依赖
<!-- xxl-rpc-core -->
<dependency>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job-core</artifactId>
<version>2.3.0</version>
</dependency>
步骤二:执行器配置
在创建好的springboot
项目的配置文件 application.yml
添加如下配置:
#项目端口号
server.port=8081
#日志文件
logging.config= classpath:logback.xml
#调度中心部署跟地址:如调度中心集群部署存在多个地址则用逗号分隔。
#执行器将会使用该地址进行"执行器心跳注册"和"任务结果回调"。
xxl.job.admin.addresses =http://127.0.0.1:8080/xxl-job-admin
### 执行器通讯TOKEN [选填]:非空时启用;
xxl.job.accessToken =
#分别配置执行器的名称、ip地址、端口号
#注意:如果配置多个执行器时,防止端口冲突
xxl.job.executor.appname =executorDemo
xxl.job.executor.address =
xxl.job.executor.ip = 127.0.0.1
xxl.job.executor.port = 9999
#执行器运行日志文件存储的磁盘位置,需要对该路径拥有读写权限
xxl.job.executor.logpath= D:/data/xxl-job/jobhandler
#执行器Log文件定期清理功能,指定日志保存天数,日志文件过期自动删除。限制至少保持3天,否则功能不生效;
#-1表示永不删除
xxl.job.executor.logretentiondays= -1
这里需要注意的是:配置执行器的名称、IP地址、端口号,后面如果配置多个执行器时,要防止端口冲突。再就是执行器的名称,我们后面会到上一步的调度中心管理界面进行对应配置。
步骤三:载入配置文件
在项目中创建 XxlJobConfig.class 文件:
package com.example.studyprojects.config;
import com.xxl.job.core.executor.impl.XxlJobSpringExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class XxlJobConfig {
private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);
@Value("${xxl.job.admin.addresses}")
private String adminAddresses;
@Value("${xxl.job.accessToken}")
private String accessToken;
@Value("${xxl.job.executor.appname}")
private String appname;
@Value("${xxl.job.executor.address}")
private String address;
@Value("${xxl.job.executor.ip}")
private String ip;
@Value("${xxl.job.executor.port}")
private int port;
@Value("${xxl.job.executor.logpath}")
private String logPath;
@Value("${xxl.job.executor.logretentiondays}")
private int logRetentionDays;
@Bean
public XxlJobSpringExecutor xxlJobExecutor() {
logger.info(">>>>>>>>>>> xxl-job config init.");
XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
xxlJobSpringExecutor.setAppname(appname);
xxlJobSpringExecutor.setAddress(address);
xxlJobSpringExecutor.setIp(ip);
xxlJobSpringExecutor.setPort(port);
xxlJobSpringExecutor.setAccessToken(accessToken);
xxlJobSpringExecutor.setLogPath(logPath);
xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
return xxlJobSpringExecutor;
}
}
XXL-JOB执行器的相关配置项:
xxl.job.admin.addresses
调度中心的部署地址。若调度中心采用集群部署,存在多个地址,则用逗号分隔。执行器将会使用该地址进行”执行器心跳注册”和”任务结果回调”。
xxl.job.executor.appname
执行器的应用名称,它是执行器心跳注册的分组依据。
xxl.job.executor.ip
执行器的IP地址,用于”调度中心请求并触发任务”和”执行器注册”。执行器IP默认为空,表示自动获取IP。多网卡时可手动设置指定IP,手动设置IP时将会绑定Host。
xxl.job.executor.port
执行器的端口号,默认值为9999。单机部署多个执行器时,注意要配置不同的执行器端口。
xxl.job.accessToken
执行器的通信令牌,非空时启用。
xxl.job.executor.logpath
执行器输出的日志文件的存储路径,需要拥有该路径的读写权限。
xxl.job.executor.logretentiondays
执行器日志文件的定期清理功能,指定日志保存天数,日志文件过期自动删除。限制至少保存3天,否则功能不生效。
步骤四:创建任务
在项目中创建一个Handler,用于执行我们想要执行的东西,这里我只是简单的打印一行日志:
XXL-JOB, Hello World!!!
/*
任务示例
*/
@Component
public class JobHandlerDemo {
@XxlJob(value = "demoJobHandler")
public ReturnT<String> execute(String s) throws Exception {
System.out.println("=====hello world=====");
// XxlJobLogger.log("XXL-JOB, Hello World.");
return ReturnT.SUCCESS;
}
}
(8) 调度中心中配置执行器,添加任务
调度中心前面我们已经配置好了,启动该配置中心,进入http://localhost:8080/xxl-job-admin 界面。
(8.1) 配置执行器
点击 执行器管理——》新增执行器—-》,如下如下界面,然后填充此表格,点击保存即可。
相关参数说明:
- AppName:是每个执行器集群的唯一标识
AppName
, 执行器会周期性以AppName
为对象进行自动注册
。可通过该配置自动发现注册成功的执行器, 供任务调度时使用; - 名称:执行器的名称, 因为AppName限制字母数字等组成,可读性不强, 名称为了提高执行器的可读性;
注册方式:调度中心获取执行器地址的方式,
- 自动注册:执行器自动进行执行器注册,调度中心通过底层注册表可以动态发现执行器机器地址;
- 手动录入:人工手动录入执行器的地址信息,多地址逗号分隔,供调度中心使用;
机器地址:”注册方式”为”手动录入”时有效,支持人工维护执行器的地址信息;
(8.2) 添加新任务
点击 任务管理—-》新增任务—-》
相关参数说明:执行器:任务的绑定的执行器,任务触发调度时将会自动发现注册成功的执行器, 实现任务自动发现功能; 另一方面也可以方便的进行任务分组。每个任务必须绑定一个执行器, 可在 “执行器管理” 进行设置。
- 任务描述:任务的描述信息,便于任务管理;
- 路由策略:当执行器集群部署时,提供丰富的路由策略,包括;
FIRST(第一个):固定选择第一个机器;
LAST(最后一个):固定选择最后一个机器;
ROUND(轮询):
RANDOM(随机):随机选择在线的机器;
CONSISTENT_HASH(一致性HASH):每个任务按照Hash算法固定选择某一台机器,且所有任务均匀散列在不同机器上。
LEAST_FREQUENTLY_USED(最不经常使用):使用频率最低的机器优先被选举;
LEAST_RECENTLY_USED(最近最久未使用):最久为使用的机器优先被选举;
FAILOVER(故障转移):按照顺序依次进行心跳检测,第一个心跳检测成功的机器选定为目标执行器并发起调度;
BUSYOVER(忙碌转移):按照顺序依次进行空闲检测,第一个空闲检测成功的机器选定为目标执行器并发起调度;
SHARDING_BROADCAST(分片广播):广播触发对应集群中所有机器执行一次任务,同时系统自动传递分片参数;可根据分片参数开发分片任务; - Cron:触发任务执行的Cron表达式;
- 运行模式:
BEAN模式:任务以JobHandler方式维护在执行器端;需要结合 “JobHandler” 属性匹配执行器中任务;
GLUE模式(Java):任务以源码方式维护在调度中心;该模式的任务实际上是一段继承自IJobHandler的Java类代码并 “groovy” 源码方式维护,它在执行器项目中运行,可使用@Resource/@Autowire注入执行器里中的其他服务;
GLUE模式(Shell):任务以源码方式维护在调度中心;该模式的任务实际上是一段 “shell” 脚本;
GLUE模式(Python):任务以源码方式维护在调度中心;该模式的任务实际上是一段 “python” 脚本;
GLUE模式(PHP):任务以源码方式维护在调度中心;该模式的任务实际上是一段 “php” 脚本;
GLUE模式(NodeJS):任务以源码方式维护在调度中心;该模式的任务实际上是一段 “nodejs” 脚本;
GLUE模式(PowerShell):任务以源码方式维护在调度中心;该模式的任务实际上是一段 “PowerShell” 脚本; - JobHandler:运行模式为 “BEAN模式” 时生效,对应执行器中新开发的JobHandler类“@JobHandler”注解自定义的value值;
- 阻塞处理策略:调度过于密集执行器来不及处理时的处理策略;
单机串行(默认):调度请求进入单机执行器后,调度请求进入FIFO队列并以串行方式运行;
丢弃后续调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,本次请求将会被丢弃并标记为失败;
覆盖之前调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,将会终止运行中的调度任务并清空队列,然后运行本地调度任务; - 子任务:每个任务都拥有一个唯一的任务ID(任务ID可以从任务列表获取),当本任务执行结束并且执行成功时,将会触发子任务ID所对应的任务的一次主动调度。
- 任务超时时间:支持自定义任务超时时间,任务运行超时将会主动中断任务;
- 失败重试次数;支持自定义任务失败重试次数,当任务失败时将会按照预设的失败重试次数主动进行重试;
- 报警邮件:任务调度失败时邮件通知的邮箱地址,支持配置多邮箱地址,配置多个邮箱地址时用逗号分隔;
- 负责人:任务的负责人;
- 执行参数:任务执行所需的参数,多个参数时用逗号分隔,任务执行时将会把多个参数转换成数组传入;
(9) 启动项目 测试任务
配置完执行器以及任务,我们只需要启动该任务,便可以运行了。
启动之后,我们查看后台日志:
定时任务触发成功!!
调度中心查询日志:
调度成功!!
执行成功!!