1.引入依赖
<dependency>
<groupId>com.xy</groupId>
<artifactId>xy-core-job-elastic-boot-starter-monitor</artifactId>
<version>${xy-core-job-elastic-boot-starter-version}</version>
</dependency>
2.配置定时任务
elasticjob.enabled=true
elasticjob.reg-center.server-lists=zookeeper-svc:2181
elasticjob.reg-center.namespace=job/${spring.application.name}
#是否记录trace
elasticjob.datasource.enable=true
elasticjob.datasource.url=jdbc:mysql://t-mysql.celcxdxq82fd.us-west-2.rds.amazonaws.com:3306/trendsi_trace?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true
elasticjob.datasource.type=com.zaxxer.hikari.HikariDataSource
elasticjob.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
elasticjob.datasource.username=trendsi
elasticjob.datasource.password=b3?(`*f_:h8SBXr$
#定时任务完整路径名,testJob为任务名,不可重复
elastic-job.jobs.testJob.elastic-job-class=com.company.demo.job.SampleJob
#任务执行时间/频率
elastic-job.jobs.testJob.cron=0/5 * * * * ?
#分片数
elastic-job.jobs.testJob.sharding-total-count=1
#分片参数
elastic-job.jobs.testJob.sharding-item-parameters=0=Beijing,1=Shanghai,2=Guangzhou
#任务失败是否专一
elastic-job.jobs.testJob.failover=true
#重启时任务配置是否覆盖
elastic-job.jobs.testJob.overwrite=true
#任务描述
elasticjob.jobs.testJob.description=测试demo
#错过是否执行
elasticjob.jobs.testJob.misfire=true
3.定义任务
3.1 原生定义
实现SimpleJob接口即可
@Slf4j
@Component
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class SampleJob implements SimpleJob {
private final RedisTemplate<String, String> redisTemplate;
private final AsyncService asyncService;
private final TaskNotifyRepository taskNotifyRepository;
/**
* 1.当分片数为1时,在同一个zooKeeper和jobName情况下,多台机器部署了ElasticJob时,只有拿到shardingContext.getShardingItem()为0的机器得以执行,其他的机器不执行
* 2.当分片数大于1时,假如有3台服务器,分成10片,则分片项分配结果为服务器A=0,1,2;服务器B=3,4,5;服务器C=6,7,8,9。此时每台服务器可根据拿到的shardingItem值进行相应的处理,
* 举例场景:
* 假如job处理数据库中的数据业务,方法为:A服务器处理数据库中Id以0,1,2结尾的数据,B处理数据库中Id以3,4,5结尾的数据,C处理器处理6,7,8,9结尾的数据,合计处理0-9为全部数据
* 如果服务器C崩溃,Elastic
* Job自动进行进行失效转移,将C服务器的分片转移到A和B服务器上,则分片项分配结果为服务器A=0,1,2,3,4;服务器B=5,6,7,8,9
* 此时,A服务器处理数据库中Id以0,1,2,3,4结尾的数据,B处理数据库中Id以5,6,7,8,9结尾的数据,合计处理0-9为全部数据.
* <p>
* int shardingTotalCount = shardingContext.getShardingTotalCount();
* <p>
* String shardingParameter = shardingContext.getShardingParameter();
*
* @param shardingContext
*/
@Override
public void execute(ShardingContext shardingContext) {
String jobParameter = shardingContext.getJobParameter();
String shardingParameter = shardingContext.getShardingParameter();
String format = MessageFormat.format("jobParameter:{0},shardingParameter:{1}", jobParameter, shardingParameter);
log.info(format);
switch (shardingContext.getShardingItem()) {
case 0:
// log.info(format);
// throw new RuntimeException("mmmmmmmm");
//Cat.logError("ShardingItem:0", new RuntimeException("mmmmmmmm"));
break;
case 1:
// log.info(format);
break;
case 2:
// log.info(format);
break;
}
}
@Override
protected JobDescInfo getJobDescInfo() {
JobDescInfo jobDescInfo = new JobDescInfo();
jobDescInfo.setJobName("demoJob");
jobDescInfo.setAuthor("jack.li");
jobDescInfo.setJobDesc("这是一个测试任务");
return jobDescInfo;
}
}
3.2cat api监控
实现封装有cat打点功能的AbstractSimpleJob
@Slf4j
@Component
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class SampleJob extends AbstractSimpleJob {
private final RedisTemplate<String, String> redisTemplate;
private final AsyncService asyncService;
private final TaskNotifyRepository taskNotifyRepository;
/**
* 1.当分片数为1时,在同一个zooKeeper和jobName情况下,多台机器部署了ElasticJob时,只有拿到shardingContext.getShardingItem()为0的机器得以执行,其他的机器不执行
* 2.当分片数大于1时,假如有3台服务器,分成10片,则分片项分配结果为服务器A=0,1,2;服务器B=3,4,5;服务器C=6,7,8,9。此时每台服务器可根据拿到的shardingItem值进行相应的处理,
* 举例场景:
* 假如job处理数据库中的数据业务,方法为:A服务器处理数据库中Id以0,1,2结尾的数据,B处理数据库中Id以3,4,5结尾的数据,C处理器处理6,7,8,9结尾的数据,合计处理0-9为全部数据
* 如果服务器C崩溃,Elastic
* Job自动进行进行失效转移,将C服务器的分片转移到A和B服务器上,则分片项分配结果为服务器A=0,1,2,3,4;服务器B=5,6,7,8,9
* 此时,A服务器处理数据库中Id以0,1,2,3,4结尾的数据,B处理数据库中Id以5,6,7,8,9结尾的数据,合计处理0-9为全部数据.
* <p>
* int shardingTotalCount = shardingContext.getShardingTotalCount();
* <p>
* String shardingParameter = shardingContext.getShardingParameter();
*
* @param shardingContext
*/
@Override
@DistributedLock(businessType = "Txt00111111", keys = {})
public void executeJob(ShardingContext shardingContext) {
String jobParameter = shardingContext.getJobParameter();
String shardingParameter = shardingContext.getShardingParameter();
String format = MessageFormat.format("jobParameter:{0},shardingParameter:{1}", jobParameter, shardingParameter);
log.info(format);
switch (shardingContext.getShardingItem()) {
case 0:
// log.info(format);
// throw new RuntimeException("mmmmmmmm");
//Cat.logError("ShardingItem:0", new RuntimeException("mmmmmmmm"));
break;
case 1:
// log.info(format);
break;
case 2:
// log.info(format);
break;
}
}
@Override
protected JobDescInfo getJobDescInfo() {
JobDescInfo jobDescInfo = new JobDescInfo();
jobDescInfo.setJobName("demoJob");
jobDescInfo.setAuthor("jack.li");
jobDescInfo.setJobDesc("这是一个测试任务");
return jobDescInfo;
}
}