Java SpringBoot Elastic-Job
Elastic-Job是当当开源的一个分布式调度解决方案,由两个相互独立的子项目Elastic-Job-Lite和Elastic-Job-Cloud组成。
Elastic-Job-Lite定位为轻量级无中心化解决方案,使用jar包的形式提供分布式任务的协调服务;Elastic-Job-Cloud采用自研Mesos Framework的解决方案,额外提供资源治理、应用分发以及进程隔离等功能。
这里以Elastic-Job-lite为例,跟SpringBoot进行整合,当当的官方文档中并没有对SpringBoot集成作说明,所有的配置都是基于文档中的xml的配置修改出来的。

起步

准备好一个SpringBoot的项目,pom.xml中引入Elastic-job,mysql,jpa等依赖

  1. <dependencies>
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter</artifactId>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.springframework.boot</groupId>
  8. <artifactId>spring-boot-starter-test</artifactId>
  9. <scope>test</scope>
  10. </dependency>
  11. <dependency>
  12. <groupId>com.dangdang</groupId>
  13. <artifactId>elastic-job-lite-spring</artifactId>
  14. <version>2.1.5</version>
  15. </dependency>
  16. <dependency>
  17. <groupId>org.projectlombok</groupId>
  18. <artifactId>lombok</artifactId>
  19. </dependency>
  20. <dependency>
  21. <groupId>org.springframework.boot</groupId>
  22. <artifactId>spring-boot-starter-data-jpa</artifactId>
  23. </dependency>
  24. <dependency>
  25. <groupId>mysql</groupId>
  26. <artifactId>mysql-connector-java</artifactId>
  27. </dependency>
  28. <dependency>
  29. <groupId>com.zaxxer</groupId>
  30. <artifactId>HikariCP</artifactId>
  31. </dependency>
  32. </dependencies>

配置

使用yaml进行相关属性的配置,主要配置的是数据库连接池,jpa

  1. elasticjob:
  2. serverlists: 172.31.31.48:2181
  3. namespace: boot-job
  4. spring:
  5. datasource:
  6. url: jdbc:mysql://localhost:3306/test?characterEncoding=utf-8&verifyServerCertificate=false&useSSL=false&requireSSL=false
  7. driver-class-name: com.mysql.jdbc.Driver
  8. username: root
  9. password: root
  10. type: com.zaxxer.hikari.HikariDataSource
  11. # 自动创建更新验证数据库结构
  12. jpa:
  13. hibernate:
  14. ddl-auto: update
  15. show-sql: true
  16. database: mysql

elastic-job相关的配置使用java配置实现,代替官方文档的xml配置

  1. @Configuration
  2. @Data
  3. @ConfigurationProperties(prefix = "elasticjob")
  4. public class ElasticJobConfig {
  5. private String serverlists;
  6. private String namespace;
  7. @Resource
  8. private HikariDataSource dataSource;
  9. @Bean
  10. public ZookeeperConfiguration zkConfig() {
  11. return new ZookeeperConfiguration(serverlists, namespace);
  12. }
  13. @Bean(initMethod = "init")
  14. public ZookeeperRegistryCenter regCenter(ZookeeperConfiguration config) {
  15. return new ZookeeperRegistryCenter(config);
  16. }
  17. /**
  18. * 将作业运行的痕迹进行持久化到DB
  19. *
  20. * @return
  21. */
  22. @Bean
  23. public JobEventConfiguration jobEventConfiguration() {
  24. return new JobEventRdbConfiguration(dataSource);
  25. }
  26. @Bean
  27. public ElasticJobListener elasticJobListener() {
  28. return new ElasticJobListener(100, 100);
  29. }
  30. }

所有相关的配置到这里就已经OK了,接下来开始具体的编码实现

定时任务实现

先实现一个自己的任务类,需要实现elastic-job提供的SimpleJob接口,实现它的execute(ShardingContext shardingContext)方法

  1. @Slf4j
  2. public class MyElasticJob implements SimpleJob {
  3. @Override
  4. public void execute(ShardingContext shardingContext) {
  5. //打印出任务相关信息,JobParameter用于传递任务的ID
  6. log.info("任务名:{}, 片数:{}, id={}", shardingContext.getJobName(), shardingContext.getShardingTotalCount(),
  7. shardingContext.getJobParameter());
  8. }
  9. }

接下来实现一个分布式的任务监听器,如果任务有分片,分布式监听器会在总的任务开始前执行一次,结束时执行一次。监听器在之前的ElasticJobConfig已经注册到了Spring容器之中。

  1. public class ElasticJobListener extends AbstractDistributeOnceElasticJobListener {
  2. @Resource
  3. private TaskRepository taskRepository;
  4. public ElasticJobListener(long startedTimeoutMilliseconds, long completedTimeoutMilliseconds) {
  5. super(startedTimeoutMilliseconds, completedTimeoutMilliseconds);
  6. }
  7. @Override
  8. public void doBeforeJobExecutedAtLastStarted(ShardingContexts shardingContexts) {
  9. }
  10. @Override
  11. public void doAfterJobExecutedAtLastCompleted(ShardingContexts shardingContexts) {
  12. //任务执行完成后更新状态为已执行
  13. JobTask jobTask = taskRepository.findOne(Long.valueOf(shardingContexts.getJobParameter()));
  14. jobTask.setStatus(1);
  15. taskRepository.save(jobTask);
  16. }
  17. }

实现一个ElasticJobHandler,用于向Elastic-job中添加指定的作业配置,作业配置分为3级,分别是JobCoreConfigurationJobTypeConfigurationLiteJobConfigurationLiteJobConfiguration使用JobTypeConfigurationJobTypeConfiguration使用JobCoreConfiguration,层层嵌套。

  1. @Component
  2. public class ElasticJobHandler {
  3. @Resource
  4. private ZookeeperRegistryCenter registryCenter;
  5. @Resource
  6. private JobEventConfiguration jobEventConfiguration;
  7. @Resource
  8. private ElasticJobListener elasticJobListener;
  9. /**
  10. * @param jobName
  11. * @param jobClass
  12. * @param shardingTotalCount
  13. * @param cron
  14. * @param id 数据ID
  15. * @return
  16. */
  17. private static LiteJobConfiguration.Builder simpleJobConfigBuilder(String jobName,
  18. Class<? extends SimpleJob> jobClass,
  19. int shardingTotalCount,
  20. String cron,
  21. String id) {
  22. return LiteJobConfiguration.newBuilder(new SimpleJobConfiguration(
  23. JobCoreConfiguration.newBuilder(jobName, cron, shardingTotalCount).jobParameter(id).build(), jobClass.getCanonicalName()));
  24. }
  25. /**
  26. * 添加一个定时任务
  27. *
  28. * @param jobName 任务名
  29. * @param cron 表达式
  30. * @param shardingTotalCount 分片数
  31. */
  32. public void addJob(String jobName, String cron, Integer shardingTotalCount, String id) {
  33. LiteJobConfiguration jobConfig = simpleJobConfigBuilder(jobName, MyElasticJob.class, shardingTotalCount, cron, id)
  34. .overwrite(true).build();
  35. new SpringJobScheduler(new MyElasticJob(), registryCenter, jobConfig, jobEventConfiguration, elasticJobListener).init();
  36. }
  37. }

到这里,elastic-job的注册中心,数据源相关配置,以及动态添加的逻辑已经做完了,接下来在service中调用上面写好的方法,验证功能是否正常。
编写一个ElasticJobService类,扫描数据库中状态为0的任务,并且把这些任务添加到Elastic-job中,这里的相关数据库操作使用了spring-data-jpa,dao层相关代码就不贴了,可以在源码中查看。

  1. @Service
  2. public class ElasticJobService {
  3. @Resource
  4. private ElasticJobHandler jobHandler;
  5. @Resource
  6. private TaskRepository taskRepository;
  7. /**
  8. * 扫描db,并添加任务
  9. */
  10. public void scanAddJob() {
  11. Specification query = (Specification<JobTask>) (root, criteriaQuery, criteriaBuilder) -> criteriaBuilder
  12. .and(criteriaBuilder.equal(root.get("status"), 0));
  13. List<JobTask> jobTasks = taskRepository.findAll(query);
  14. jobTasks.forEach(jobTask -> {
  15. Long current = System.currentTimeMillis();
  16. String jobName = "job" + jobTask.getSendTime();
  17. String cron;
  18. //说明消费未发送,但是已经过了消息的发送时间,调整时间继续执行任务
  19. if (jobTask.getSendTime() < current) {
  20. //设置为一分钟之后执行,把Date转换为cron表达式
  21. cron = CronUtils.getCron(new Date(current + 60000));
  22. } else {
  23. cron = CronUtils.getCron(new Date(jobTask.getSendTime()));
  24. }
  25. jobHandler.addJob(jobName, cron, 1, String.valueOf(jobTask.getId()));
  26. });
  27. }
  28. }

在Junit中添加几条测试数据

  1. @RunWith(SpringJUnit4ClassRunner.class)
  2. @SpringBootTest
  3. public class JobTaskTest {
  4. @Resource
  5. private TaskRepository taskRepository;
  6. @Test
  7. public void add() {
  8. //生成几个任务,第一任务在三分钟之后
  9. Long unixTime = System.currentTimeMillis() + 60000;
  10. JobTask task = new JobTask("test-msg-1", 0, unixTime);
  11. taskRepository.save(task);
  12. unixTime += 60000;
  13. task = new JobTask("test-msg-2", 0, unixTime);
  14. taskRepository.save(task);
  15. unixTime += 60000;
  16. task = new JobTask("test-msg-3", 0, unixTime);
  17. taskRepository.save(task);
  18. unixTime += 60000;
  19. task = new JobTask("test-msg-4", 0, unixTime);
  20. taskRepository.save(task);
  21. }
  22. }

此时,数据库中多了四条状态为0的数据
最后,就可以开始验证整个流程了,代码如下

  1. @SpringBootApplication
  2. public class ElasticJobApplication implements CommandLineRunner {
  3. @Resource
  4. private ElasticJobService elasticJobService;
  5. public static void main(String[] args) {
  6. SpringApplication.run(ElasticJobApplication.class, args);
  7. }
  8. @Override
  9. public void run(String... strings) throws Exception {
  10. elasticJobService.scanAddJob();
  11. }
  12. }

可以看到,在启动过程中,多个任务被加入到了Elastic-job中,并且一小段时间之后,任务一次执行,执行成功之后,因为配置了监听器,会打印数据库的更新SQL,当任务执行完成,再查看数据库,发现状态也更改成功。
数据库中同时也会多出两张表JOB_EXECUTION_LOGJOB_STATUS_TRACE_LOG,这是之前配置的JobEventConfiguration,通过数据源持久化了作业配置的相关数据,这两张表的数据可以供Elastic-job提供的运维平台使用,具体请查看官方文档。

总结

至此,整个流程就已经走完了,整个demo中主要用到了Elastic-job和spring-data-jpa相关的技术,作为demo,肯定会有一些缺陷,没考虑到的地方,可以根据自己的业务场景进行改进。