elastic-job需要以下的环境。

zookeeper + zk客户端zktools
elastic-job控制管理台-console-2.1.4.zip

zk的启动需要稍微修改下配置文件。
启动zk教程

可视化面板:http://127.0.0.1:8899/# (账号/密码:root/root)

代码实现

环境准备完毕之后,开始编写代码。

依赖

  1. <!-- elastic-job-lite core library -->
  2. <dependency>
  3.     <groupId>com.dangdang</groupId>
  4.     <artifactId>elastic-job-lite-core</artifactId>
  5.     <version>2.1.5</version>
  6. </dependency>
  7. <!-- elastic-job-lite Spring integration -->
  8. <dependency>
  9.     <groupId>com.dangdang</groupId>
  10.     <artifactId>elastic-job-lite-spring</artifactId>
  11.     <version>2.1.5</version>
  12. </dependency>

配置文件

  1. regCenter:
  2.   serverList: localhost:2181   # ZooKeeper server address(es), read as ${regCenter.serverList}
  3.   namespace: springboot2_elasticjob   # ZooKeeper namespace, read as ${regCenter.namespace}

JobRegistryCenterConfig.java

zk的注册中心

  1. import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;
  2. import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
  3. import org.springframework.beans.factory.annotation.Value;
  4. import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
  5. import org.springframework.context.annotation.Bean;
  6. import org.springframework.context.annotation.Configuration;
  7. @Configuration
  8. @ConditionalOnExpression("'${regCenter.serverList}'.length() > 0")
  9. public class JobRegistryCenterConfig {
  10. @Bean(initMethod = "init")
  11. public ZookeeperRegistryCenter regCenter(@Value("${regCenter.serverList}") final String serverList,
  12. @Value("${regCenter.namespace}") final String namespace) {
  13. return new ZookeeperRegistryCenter(new ZookeeperConfiguration(serverList, namespace));
  14. }
  15. }

MySimpleJob.java

定时任务。

  1. import com.dangdang.ddframe.job.api.ShardingContext;
  2. import com.dangdang.ddframe.job.api.simple.SimpleJob;
  3. import lombok.Data;
  4. import lombok.extern.slf4j.Slf4j;
  5. /**
  6.  * Simple elastic-job task that logs the sharding context it is executed with.
  7.  */
  8. @Slf4j
  9. public class MySimpleJob implements SimpleJob {
  10.     /**
  11.      * Logs the current thread id together with the sharding details of this run.
  12.      *
  13.      * @param shardingContext context supplied for the current execution
  14.      */
  15.     @Override
  16.     public void execute(ShardingContext shardingContext) {
  17.         // Parameterized logging: the message is only rendered when INFO is enabled,
  18.         // and the resulting text is the same as the former String.format version.
  19.         log.info("Thread ID: {}, 作业分片总数: {}, 当前分片项: {}.当前参数: {},作业名称: {}.作业自定义参数: {}",
  20.                 Thread.currentThread().getId(),
  21.                 shardingContext.getShardingTotalCount(),
  22.                 shardingContext.getShardingItem(),
  23.                 shardingContext.getShardingParameter(),
  24.                 shardingContext.getJobName(),
  25.                 shardingContext.getJobParameter());
  26.     }
  27. }

MyJobConfig.java

把job注册到zk里面,并设置作业的相关参数。

  1. import com.dangdang.ddframe.job.api.simple.SimpleJob;
  2. import com.dangdang.ddframe.job.config.JobCoreConfiguration;
  3. import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
  4. import com.dangdang.ddframe.job.lite.api.JobScheduler;
  5. import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
  6. import com.dangdang.ddframe.job.lite.spring.api.SpringJobScheduler;
  7. import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
  8. import org.springframework.beans.factory.annotation.Autowired;
  9. import org.springframework.context.annotation.Bean;
  10. import org.springframework.context.annotation.Configuration;
  11. /**
  12.  * Spring configuration that registers {@link MySimpleJob} in the ZooKeeper registry
  13.  * center and defines the parameters it is scheduled with.
  14.  */
  15. @Configuration
  16. public class MyJobConfig {
  17.     /** Cron expression for the job: fires every 2 seconds. */
  18.     private static final String CRON = "0/2 * * * * ?";
  19.     /** Total number of shards the job is split into. */
  20.     private static final int SHARDING_TOTAL_COUNT = 2;
  21.     /** One {@code item=parameter} pair per shard (items 0 and 1 for two shards). */
  22.     private static final String SHARDING_ITEM_PARAMETERS = "0=A,1=B";
  23.     /** Job-level parameter handed to the job configuration. */
  24.     private static final String JOB_PARAMETERS = "parameter";
  25.     /** Registry center bean the scheduler registers the job with. */
  26.     @Autowired
  27.     private ZookeeperRegistryCenter regCenter;
  28.     /**
  29.      * Creates the scheduler for {@link MySimpleJob}. Spring calls {@code init()} on the
  30.      * returned bean because of {@code initMethod = "init"}.
  31.      *
  32.      * @return the scheduler wired to the registry center and the job configuration
  33.      */
  34.     @Bean(initMethod = "init")
  35.     public JobScheduler simpleJobScheduler() {
  36.         LiteJobConfiguration jobConfig = getLiteJobConfiguration(
  37.                 MySimpleJob.class, CRON, SHARDING_TOTAL_COUNT, SHARDING_ITEM_PARAMETERS, JOB_PARAMETERS);
  38.         return new SpringJobScheduler(new MySimpleJob(), regCenter, jobConfig);
  39.     }
  40.     /**
  41.      * Assembles the Lite job configuration for a simple job.
  42.      *
  43.      * @param jobClass               job implementation; its class name is used as the job name
  44.      * @param cron                   cron expression controlling when the job fires
  45.      * @param shardingTotalCount     total number of shards
  46.      * @param shardingItemParameters comma-separated {@code item=parameter} pairs
  47.      * @param jobParameters          job-level parameter
  48.      * @return the root configuration, built with {@code overwrite(true)}
  49.      */
  50.     private LiteJobConfiguration getLiteJobConfiguration(final Class<? extends SimpleJob> jobClass,
  51.             final String cron,
  52.             final int shardingTotalCount,
  53.             final String shardingItemParameters,
  54.             final String jobParameters) {
  55.         // Core settings: job name, schedule and sharding.
  56.         JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration
  57.                 .newBuilder(jobClass.getName(), cron, shardingTotalCount)
  58.                 .shardingItemParameters(shardingItemParameters)
  59.                 .jobParameter(jobParameters)
  60.                 .build();
  61.         // SIMPLE job type bound to the job implementation class.
  62.         SimpleJobConfiguration simpleJobConfig =
  63.                 new SimpleJobConfiguration(simpleCoreConfig, jobClass.getCanonicalName());
  64.         // Root Lite configuration.
  65.         return LiteJobConfiguration.newBuilder(simpleJobConfig).overwrite(true).build();
  66.     }
  67. }

完成以上配置后,启动应用即可。