1.引入依赖

  1. <dependency>
  2. <groupId>com.xy</groupId>
  3. <artifactId>xy-core-job-elastic-boot-starter-monitor</artifactId>
  4. <version>${xy-core-job-elastic-boot-starter-version}</version>
  5. </dependency>

2.配置定时任务

  1. elasticjob.enabled=true
  2. elasticjob.reg-center.server-lists=zookeeper-svc:2181
  3. elasticjob.reg-center.namespace=job/${spring.application.name}
  4. #是否记录trace
  5. elasticjob.datasource.enable=true
  6. elasticjob.datasource.url=jdbc:mysql://<数据库地址>:3306/<trace数据库名>?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true
  7. elasticjob.datasource.type=com.zaxxer.hikari.HikariDataSource
  8. elasticjob.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
  9. elasticjob.datasource.username=<数据库用户名>
  10. elasticjob.datasource.password=<数据库密码>  #请勿将真实数据库地址/账号/密码写入文档,使用配置中心或环境变量注入
  11. #定时任务完整路径名,testJob为任务名,不可重复
  12. elasticjob.jobs.testJob.elastic-job-class=com.company.demo.job.SampleJob
  13. #任务执行时间/频率(cron表达式)
  14. elasticjob.jobs.testJob.cron=0/5 * * * * ?
  15. #分片数
  16. elasticjob.jobs.testJob.sharding-total-count=1
  17. #分片参数
  18. elasticjob.jobs.testJob.sharding-item-parameters=0=Beijing,1=Shanghai,2=Guangzhou
  19. #任务失败是否失效转移(failover)
  20. elasticjob.jobs.testJob.failover=true
  21. #重启时任务配置是否覆盖
  22. elasticjob.jobs.testJob.overwrite=true
  23. #任务描述
  24. elasticjob.jobs.testJob.description=测试demo
  25. #错过是否执行
  26. elasticjob.jobs.testJob.misfire=true

3.定义任务

3.1 原生定义

实现SimpleJob接口即可

  1. @Slf4j
  2. @Component
  3. @RequiredArgsConstructor(onConstructor = @__(@Autowired))
  4. public class SampleJob implements SimpleJob {
  5. private final RedisTemplate<String, String> redisTemplate;
  6. private final AsyncService asyncService;
  7. private final TaskNotifyRepository taskNotifyRepository;
  8. /**
  9. * 1.当分片数为1时,在同一个zooKeeper和jobName情况下,多台机器部署了ElasticJob时,只有拿到shardingContext.getShardingItem()为0的机器得以执行,其他的机器不执行
  10. * 2.当分片数大于1时,假如有3台服务器,分成10片,则分片项分配结果为服务器A=0,1,2;服务器B=3,4,5;服务器C=6,7,8,9。此时每台服务器可根据拿到的shardingItem值进行相应的处理,
  11. * 举例场景:
  12. * 假如job处理数据库中的数据业务,方法为:A服务器处理数据库中Id以0,1,2结尾的数据,B处理数据库中Id以3,4,5结尾的数据,C处理器处理6,7,8,9结尾的数据,合计处理0-9为全部数据
  13. * 如果服务器C崩溃,Elastic
  14. * Job自动进行进行失效转移,将C服务器的分片转移到A和B服务器上,则分片项分配结果为服务器A=0,1,2,3,4;服务器B=5,6,7,8,9
  15. * 此时,A服务器处理数据库中Id以0,1,2,3,4结尾的数据,B处理数据库中Id以5,6,7,8,9结尾的数据,合计处理0-9为全部数据.
  16. * <p>
  17. * int shardingTotalCount = shardingContext.getShardingTotalCount();
  18. * <p>
  19. * String shardingParameter = shardingContext.getShardingParameter();
  20. *
  21. * @param shardingContext
  22. */
  23. @Override
  24. public void execute(ShardingContext shardingContext) {
  25. String jobParameter = shardingContext.getJobParameter();
  26. String shardingParameter = shardingContext.getShardingParameter();
  27. String format = MessageFormat.format("jobParameter:{0},shardingParameter:{1}", jobParameter, shardingParameter);
  28. log.info(format);
  29. switch (shardingContext.getShardingItem()) {
  30. case 0:
  31. // log.info(format);
  32. // throw new RuntimeException("mmmmmmmm");
  33. //Cat.logError("ShardingItem:0", new RuntimeException("mmmmmmmm"));
  34. break;
  35. case 1:
  36. // log.info(format);
  37. break;
  38. case 2:
  39. // log.info(format);
  40. break;
  41. }
  42. }
  43. @Override
  44. protected JobDescInfo getJobDescInfo() {
  45. JobDescInfo jobDescInfo = new JobDescInfo();
  46. jobDescInfo.setJobName("demoJob");
  47. jobDescInfo.setAuthor("jack.li");
  48. jobDescInfo.setJobDesc("这是一个测试任务");
  49. return jobDescInfo;
  50. }
  51. }

3.2 Cat API监控

实现封装有cat打点功能的AbstractSimpleJob

  1. @Slf4j
  2. @Component
  3. @RequiredArgsConstructor(onConstructor = @__(@Autowired))
  4. public class SampleJob extends AbstractSimpleJob {
  5. private final RedisTemplate<String, String> redisTemplate;
  6. private final AsyncService asyncService;
  7. private final TaskNotifyRepository taskNotifyRepository;
  8. /**
  9. * 1.当分片数为1时,在同一个zooKeeper和jobName情况下,多台机器部署了ElasticJob时,只有拿到shardingContext.getShardingItem()为0的机器得以执行,其他的机器不执行
  10. * 2.当分片数大于1时,假如有3台服务器,分成10片,则分片项分配结果为服务器A=0,1,2;服务器B=3,4,5;服务器C=6,7,8,9。此时每台服务器可根据拿到的shardingItem值进行相应的处理,
  11. * 举例场景:
  12. * 假如job处理数据库中的数据业务,方法为:A服务器处理数据库中Id以0,1,2结尾的数据,B处理数据库中Id以3,4,5结尾的数据,C处理器处理6,7,8,9结尾的数据,合计处理0-9为全部数据
  13. * 如果服务器C崩溃,Elastic
  14. * Job自动进行进行失效转移,将C服务器的分片转移到A和B服务器上,则分片项分配结果为服务器A=0,1,2,3,4;服务器B=5,6,7,8,9
  15. * 此时,A服务器处理数据库中Id以0,1,2,3,4结尾的数据,B处理数据库中Id以5,6,7,8,9结尾的数据,合计处理0-9为全部数据.
  16. * <p>
  17. * int shardingTotalCount = shardingContext.getShardingTotalCount();
  18. * <p>
  19. * String shardingParameter = shardingContext.getShardingParameter();
  20. *
  21. * @param shardingContext
  22. */
  23. @Override
  24. @DistributedLock(businessType = "Txt00111111", keys = {})
  25. public void executeJob(ShardingContext shardingContext) {
  26. String jobParameter = shardingContext.getJobParameter();
  27. String shardingParameter = shardingContext.getShardingParameter();
  28. String format = MessageFormat.format("jobParameter:{0},shardingParameter:{1}", jobParameter, shardingParameter);
  29. log.info(format);
  30. switch (shardingContext.getShardingItem()) {
  31. case 0:
  32. // log.info(format);
  33. // throw new RuntimeException("mmmmmmmm");
  34. //Cat.logError("ShardingItem:0", new RuntimeException("mmmmmmmm"));
  35. break;
  36. case 1:
  37. // log.info(format);
  38. break;
  39. case 2:
  40. // log.info(format);
  41. break;
  42. }
  43. }
  44. @Override
  45. protected JobDescInfo getJobDescInfo() {
  46. JobDescInfo jobDescInfo = new JobDescInfo();
  47. jobDescInfo.setJobName("demoJob");
  48. jobDescInfo.setAuthor("jack.li");
  49. jobDescInfo.setJobDesc("这是一个测试任务");
  50. return jobDescInfo;
  51. }
  52. }