elastic-job需要以下的环境。
zookeeper + zk客户端zktools
elastic-job控制管理台-console-2.1.4 .zip
zk的启动需要稍微修改下配置文件。
启动zk教程
可视化面板:http://127.0.0.1:8899/# root/root
代码实现
环境完毕之后,开始编写代码。
依赖
<dependency><groupId>com.dangdang</groupId><artifactId>elastic-job-lite-core</artifactId><version>2.1.5</version></dependency><dependency><groupId>com.dangdang</groupId><artifactId>elastic-job-lite-spring</artifactId><version>2.1.5</version></dependency>
配置文件
regCenter:serverList: localhost:2181namespace: springboot2_elasticjob
JobRegistryCenterConfig.java
zk的注册中心
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;import org.springframework.beans.factory.annotation.Value;import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configuration@ConditionalOnExpression("'${regCenter.serverList}'.length() > 0")public class JobRegistryCenterConfig {@Bean(initMethod = "init")public ZookeeperRegistryCenter regCenter(@Value("${regCenter.serverList}") final String serverList,@Value("${regCenter.namespace}") final String namespace) {return new ZookeeperRegistryCenter(new ZookeeperConfiguration(serverList, namespace));}}
MySimpleJob.java
定时任务。
import com.dangdang.ddframe.job.api.ShardingContext;import com.dangdang.ddframe.job.api.simple.SimpleJob;import lombok.Data;import lombok.extern.slf4j.Slf4j;@Slf4jpublic class MySimpleJob implements SimpleJob {@Overridepublic void execute(ShardingContext shardingContext) {log.info(String.format("Thread ID: %s, 作业分片总数: %s, " +"当前分片项: %s.当前参数: %s," +"作业名称: %s.作业自定义参数: %s",Thread.currentThread().getId(),shardingContext.getShardingTotalCount(),shardingContext.getShardingItem(),shardingContext.getShardingParameter(),shardingContext.getJobName(),shardingContext.getJobParameter()));}}
MyJobConfig.java
把job注册到zk里面,然后设置一些参数
import com.dangdang.ddframe.job.api.simple.SimpleJob;import com.dangdang.ddframe.job.config.JobCoreConfiguration;import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;import com.dangdang.ddframe.job.lite.api.JobScheduler;import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;import com.dangdang.ddframe.job.lite.spring.api.SpringJobScheduler;import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configurationpublic class MyJobConfig {private final String cron = "0/2 * * * * ?";private final int shardingTotalCount = 2;private final String shardingItemParameters = "0=A,1=B,2=C";private final String jobParameters = "parameter";@Autowiredprivate ZookeeperRegistryCenter regCenter;/*** 将我们的自己的job任务注册到zk里面。别忘了 要初始化一下 init* @param simpleJob* @return*/@Bean(initMethod = "init")public JobScheduler simpleJobScheduler() {return new SpringJobScheduler(new MySimpleJob() , regCenter, getLiteJobConfiguration(MySimpleJob.class,cron, shardingTotalCount, shardingItemParameters, jobParameters));}/*** 就是自定义一些配置* @param jobClass* @param cron* @param shardingTotalCount* @param shardingItemParameters* @param jobParameters* @return*/private LiteJobConfiguration getLiteJobConfiguration(final Class<? extends SimpleJob> jobClass,final String cron,final int shardingTotalCount,final String shardingItemParameters,final String jobParameters) {// 定义作业核心配置JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration.newBuilder(jobClass.getName(), cron, shardingTotalCount).shardingItemParameters(shardingItemParameters).jobParameter(jobParameters).build();// 定义SIMPLE类型配置SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(simpleCoreConfig, jobClass.getCanonicalName());// 定义Lite作业根配置LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).overwrite(true).build();return simpleJobRootConfig;}}
启动就ok了。
