1.引入依赖
<dependency><groupId>com.xy</groupId><artifactId>xy-core-job-elastic-boot-starter-monitor</artifactId><version>${xy-core-job-elastic-boot-starter-version}</version></dependency>
2.配置定时任务
elasticjob.enabled=trueelasticjob.reg-center.server-lists=zookeeper-svc:2181elasticjob.reg-center.namespace=job/${spring.application.name}#是否记录traceelasticjob.datasource.enable=trueelasticjob.datasource.url=jdbc:mysql://t-mysql.celcxdxq82fd.us-west-2.rds.amazonaws.com:3306/trendsi_trace?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=trueelasticjob.datasource.type=com.zaxxer.hikari.HikariDataSourceelasticjob.datasource.driver-class-name=com.mysql.cj.jdbc.Driverelasticjob.datasource.username=trendsielasticjob.datasource.password=b3?(`*f_:h8SBXr$#定时任务完整路径名,testJob为任务名,不可重复elastic-job.jobs.testJob.elastic-job-class=com.company.demo.job.SampleJob#任务执行时间/频率elastic-job.jobs.testJob.cron=0/5 * * * * ?#分片数elastic-job.jobs.testJob.sharding-total-count=1#分片参数elastic-job.jobs.testJob.sharding-item-parameters=0=Beijing,1=Shanghai,2=Guangzhou#任务失败是否专一elastic-job.jobs.testJob.failover=true#重启时任务配置是否覆盖elastic-job.jobs.testJob.overwrite=true#任务描述elasticjob.jobs.testJob.description=测试demo#错过是否执行elasticjob.jobs.testJob.misfire=true
3.定义任务
3.1 原生定义
实现SimpleJob接口即可
@Slf4j@Component@RequiredArgsConstructor(onConstructor = @__(@Autowired))public class SampleJob implements SimpleJob {private final RedisTemplate<String, String> redisTemplate;private final AsyncService asyncService;private final TaskNotifyRepository taskNotifyRepository;/*** 1.当分片数为1时,在同一个zooKeeper和jobName情况下,多台机器部署了ElasticJob时,只有拿到shardingContext.getShardingItem()为0的机器得以执行,其他的机器不执行* 2.当分片数大于1时,假如有3台服务器,分成10片,则分片项分配结果为服务器A=0,1,2;服务器B=3,4,5;服务器C=6,7,8,9。此时每台服务器可根据拿到的shardingItem值进行相应的处理,* 举例场景:* 假如job处理数据库中的数据业务,方法为:A服务器处理数据库中Id以0,1,2结尾的数据,B处理数据库中Id以3,4,5结尾的数据,C处理器处理6,7,8,9结尾的数据,合计处理0-9为全部数据* 如果服务器C崩溃,Elastic* Job自动进行进行失效转移,将C服务器的分片转移到A和B服务器上,则分片项分配结果为服务器A=0,1,2,3,4;服务器B=5,6,7,8,9* 此时,A服务器处理数据库中Id以0,1,2,3,4结尾的数据,B处理数据库中Id以5,6,7,8,9结尾的数据,合计处理0-9为全部数据.* <p>* int shardingTotalCount = shardingContext.getShardingTotalCount();* <p>* String shardingParameter = shardingContext.getShardingParameter();** @param shardingContext*/@Overridepublic void execute(ShardingContext shardingContext) {String jobParameter = shardingContext.getJobParameter();String shardingParameter = shardingContext.getShardingParameter();String format = MessageFormat.format("jobParameter:{0},shardingParameter:{1}", jobParameter, shardingParameter);log.info(format);switch (shardingContext.getShardingItem()) {case 0:// log.info(format);// throw new RuntimeException("mmmmmmmm");//Cat.logError("ShardingItem:0", new RuntimeException("mmmmmmmm"));break;case 1:// log.info(format);break;case 2:// log.info(format);break;}}@Overrideprotected JobDescInfo getJobDescInfo() {JobDescInfo jobDescInfo = new JobDescInfo();jobDescInfo.setJobName("demoJob");jobDescInfo.setAuthor("jack.li");jobDescInfo.setJobDesc("这是一个测试任务");return jobDescInfo;}}
3.2cat api监控
实现封装有cat打点功能的AbstractSimpleJob
@Slf4j@Component@RequiredArgsConstructor(onConstructor = @__(@Autowired))public class SampleJob extends AbstractSimpleJob {private final RedisTemplate<String, String> redisTemplate;private final AsyncService asyncService;private final TaskNotifyRepository taskNotifyRepository;/*** 1.当分片数为1时,在同一个zooKeeper和jobName情况下,多台机器部署了ElasticJob时,只有拿到shardingContext.getShardingItem()为0的机器得以执行,其他的机器不执行* 2.当分片数大于1时,假如有3台服务器,分成10片,则分片项分配结果为服务器A=0,1,2;服务器B=3,4,5;服务器C=6,7,8,9。此时每台服务器可根据拿到的shardingItem值进行相应的处理,* 举例场景:* 假如job处理数据库中的数据业务,方法为:A服务器处理数据库中Id以0,1,2结尾的数据,B处理数据库中Id以3,4,5结尾的数据,C处理器处理6,7,8,9结尾的数据,合计处理0-9为全部数据* 如果服务器C崩溃,Elastic* Job自动进行进行失效转移,将C服务器的分片转移到A和B服务器上,则分片项分配结果为服务器A=0,1,2,3,4;服务器B=5,6,7,8,9* 此时,A服务器处理数据库中Id以0,1,2,3,4结尾的数据,B处理数据库中Id以5,6,7,8,9结尾的数据,合计处理0-9为全部数据.* <p>* int shardingTotalCount = shardingContext.getShardingTotalCount();* <p>* String shardingParameter = shardingContext.getShardingParameter();** @param shardingContext*/@Override@DistributedLock(businessType = "Txt00111111", keys = {})public void executeJob(ShardingContext shardingContext) {String jobParameter = shardingContext.getJobParameter();String shardingParameter = shardingContext.getShardingParameter();String format = MessageFormat.format("jobParameter:{0},shardingParameter:{1}", jobParameter, shardingParameter);log.info(format);switch (shardingContext.getShardingItem()) {case 0:// log.info(format);// throw new RuntimeException("mmmmmmmm");//Cat.logError("ShardingItem:0", new RuntimeException("mmmmmmmm"));break;case 1:// log.info(format);break;case 2:// log.info(format);break;}}@Overrideprotected JobDescInfo getJobDescInfo() {JobDescInfo jobDescInfo = new JobDescInfo();jobDescInfo.setJobName("demoJob");jobDescInfo.setAuthor("jack.li");jobDescInfo.setJobDesc("这是一个测试任务");return jobDescInfo;}}
