定时任务线程池配置
@Component
@Slf4j
public class QuartzJobConfigService {
@Bean("detectScheduler")
private Scheduler getScheduler(QuartzFactory jobFactory) {
Scheduler scheduler = getSchedulerFactoryBean(jobFactory).getScheduler();
return scheduler;
}
@Bean("jobFactory")
private QuartzFactory getQuartzFactory(AutowireCapableBeanFactory capableBeanFactory){
QuartzFactory factory = new QuartzFactory(capableBeanFactory);
return factory;
}
class QuartzFactory extends AdaptableJobFactory {
public QuartzFactory(AutowireCapableBeanFactory capableBeanFactory) {
this.capableBeanFactory = capableBeanFactory;
}
private AutowireCapableBeanFactory capableBeanFactory;
@Override
protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {
Object jobInstance = super.createJobInstance(bundle);
capableBeanFactory.autowireBean(jobInstance); //这一步解决不能spring注入bean的问题
return jobInstance;
}
}
private SchedulerFactoryBean getSchedulerFactoryBean(QuartzFactory jobFactory) {
SchedulerFactoryBean schedulerFactoryBean = new SchedulerFactoryBean();
try {
schedulerFactoryBean.setSchedulerName("Device-Detect-Scheduler");
schedulerFactoryBean.setJobFactory(jobFactory);
schedulerFactoryBean.setOverwriteExistingJobs(true);
schedulerFactoryBean.setStartupDelay(1);
ExecutorService threadPool = new ThreadPoolExecutor(10, 10, 5L, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(1000), new NamedThreadFactory("device-detect-quartz-thread"));
schedulerFactoryBean.setTaskExecutor(threadPool);
schedulerFactoryBean.afterPropertiesSet();
} catch (Exception e) {
log.error(ModelCatalogCenterLogHelper.message(" getSchedulerFactoryBean error..."), e);
}
return schedulerFactoryBean;
}
}
定时任务执行逻辑
import org.quartz.CronTrigger;
import org.quartz.Job;
import org.quartz.JobDetail;
import org.quartz.Trigger;
/**
 * Scheduled-job operations: building Quartz job details and cron triggers, and scheduling,
 * removing and looking up jobs.
 *
 * @author 韩仁松
 * @since businessV1.0.0
 */
public interface JobExecuteService {
/**
 * Builds a job detail for the given job class, identified by job name and group.
 *
 * @param jobClass the Quartz job implementation to run
 * @param jobName name part of the job identity
 * @param jobGroup group part of the job identity
 * @return the job detail
 */
JobDetail createJobDetail(Class<? extends Job> jobClass, String jobName, String jobGroup);
/**
 * Builds a cron trigger identified by trigger name and group.
 *
 * @param triggerName name part of the trigger identity
 * @param triggerGroup group part of the trigger identity
 * @param cron cron expression that drives the trigger
 * @return the cron trigger
 */
CronTrigger createCronTrigger(String triggerName, String triggerGroup, String cron) ;
/**
 * Schedules the job with the given trigger.
 *
 * @param jobDetail the job to schedule
 * @param trigger the trigger that fires the job
 */
void scheduleJob(JobDetail jobDetail, Trigger trigger);
/**
 * Removes the job and its trigger.
 *
 * @param jobName name part of the job identity
 * @param jobGroup group part of the job identity
 * @param triggerName name part of the trigger identity
 * @param triggerGroup group part of the trigger identity
 */
void removeJob(String jobName, String jobGroup, String triggerName, String triggerGroup);
/**
 * Reports whether a job with the given identity exists.
 *
 * @param jobName name part of the job identity
 * @param jobGroup group part of the job identity
 * @return {@code true} if the job exists
 */
boolean existJob(String jobName, String jobGroup);
}
@Service
@Slf4j
public class JobExecuteServiceImpl implements JobExecuteService {
@Autowired
@Qualifier("detectScheduler")
private Scheduler scheduler;
@Override
public JobDetail createJobDetail(Class<? extends Job> jobClass, String jobName, String jobGroup) {
if(jobClass==null) {
throw new BusinessException(ErrorCode.B_PARAM_VALID);
}
if(StringUtils.isEmpty(jobName)) {
throw new BusinessException(ErrorCode.B_PARAM_VALID);
}
return JobBuilder.newJob(jobClass).withIdentity(jobName,jobGroup).build();
}
@Override
public CronTrigger createCronTrigger(String triggerName, String triggerGroup, String cron) {
if(StringUtils.isEmpty(cron)) {
throw new BusinessException(ErrorCode.B_PARAM_VALID);
}
if(StringUtils.isEmpty(triggerName)) {
throw new BusinessException(ErrorCode.B_PARAM_VALID);
}
CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule(cron);
return TriggerBuilder.newTrigger()
.withIdentity(triggerName, triggerGroup).withSchedule(cronScheduleBuilder)
.build();
}
@Override
public void scheduleJob(JobDetail jobDetail, Trigger trigger) {
if(jobDetail==null) {
throw new BusinessException(ErrorCode.B_PARAM_VALID);
}
if(trigger==null) {
throw new BusinessException(ErrorCode.B_PARAM_VALID);
}
try {
scheduler.scheduleJob(jobDetail, trigger);
startJob(scheduler);
} catch (Exception e) {
log.error("scheduleJob method error...",e);
}
}
@Override
public void removeJob(String jobName, String jobGroup, String triggerName, String triggerGroup) {
if(StringUtils.isEmpty(jobName)) {
throw new BusinessException(ErrorCode.B_PARAM_VALID);
}
if(StringUtils.isEmpty(jobGroup)) {
throw new BusinessException(ErrorCode.B_PARAM_VALID);
}
if(StringUtils.isEmpty(triggerName)) {
throw new BusinessException(ErrorCode.B_PARAM_VALID);
}
if(StringUtils.isEmpty(triggerGroup)) {
throw new BusinessException(ErrorCode.B_PARAM_VALID);
}
JobKey jobKey = new JobKey(jobName, jobGroup);
TriggerKey triggerKey = new TriggerKey(triggerName, triggerGroup);
try {
scheduler.pauseTrigger(triggerKey);
scheduler.unscheduleJob(triggerKey);
scheduler.deleteJob(jobKey);
} catch (SchedulerException e) {
log.error(("removeJob method error, job is {}, trigger is {}"), jobName, triggerName, e);
}
}
/**
* @param scheduler
*/
private void startJob(Scheduler scheduler) {
try {
if(scheduler!=null) {
if(!scheduler.isStarted()) {
scheduler.start();
}
}
} catch (SchedulerException e) {
log.error(("startJob method error..."),e);
}
}
@Override
public boolean existJob(String jobName, String jobGroup) {
if(StringUtils.isEmpty(jobName)) {
return false;
}
JobKey jobKey = new JobKey(jobName, jobGroup);
try {
return scheduler.checkExists(jobKey);
} catch (SchedulerException e) {
log.error(("existJob method error, jobName is :{}"),jobName,e);
}
return false;
}
}
任务执行
启动即执行+定时执行
/**
 * {@link CommandLineRunner} that asks {@link GovernJobInfoService} to start the model-standard
 * data ES synchronization job.
 */
@Component
public class QuartzJobInit implements CommandLineRunner {
@Autowired
private GovernJobInfoService governJobInfoService;
/**
 * Delegates to {@code governJobInfoService.startModelStandardDataEsSynJob()}.
 *
 * @param args command-line arguments; not used here
 */
@Override
public void run(String... args) throws Exception {
// Executed on startup
// Scheduled job
governJobInfoService.startModelStandardDataEsSynJob();
}
}
/**
 * Entry points for starting governance-related scheduled jobs.
 */
public interface GovernJobInfoService {
/**
 * Starts the model-standard data ES synchronization job.
 */
void startModelStandardDataEsSynJob();
}
/**
 * Registers the {@code modelStandardDataEsSynJob} with the job service unless a job with the same
 * name and group already exists.
 *
 * <p>The job definition is looked up in {@link JobEnum}; the cron expression comes from
 * {@code modelStandardDataSyncCron}. Failures are logged and never propagated.
 */
@Override
public void startModelStandardDataEsSynJob() {
    final String jobName = "modelStandardDataEsSynJob";
    JobEnum jobEnum = JobEnum.getQuartzJobEnum(jobName);
    if (jobEnum == null) {
        // The lookup returns null for unknown names. Without this guard the catch block below
        // would dereference null itself and throw a second, uncaught NullPointerException.
        log.error("job init error, no JobEnum entry found for job name: {}", jobName);
        return;
    }
    try {
        boolean exist = jobExecuteService.existJob(jobEnum.getJobName(), jobEnum.getJobGroup());
        if (!exist) {
            JobDetail detail = jobExecuteService.createJobDetail(jobEnum.getJobClass(), jobEnum.getJobName(), jobEnum.getJobGroup());
            Trigger trigger = jobExecuteService.createCronTrigger(jobEnum.getTriggerName(), jobEnum.getTriggerGroup(), modelStandardDataSyncCron);
            jobExecuteService.scheduleJob(detail, trigger);
            // NOTE(review): JobExecuteServiceImpl.scheduleJob logs scheduler failures without
            // rethrowing, so this message does not by itself prove the job was registered.
            log.info(("{} add to quartz success..."), jobEnum.toString());
        }
    } catch (Exception e) {
        log.error(("job init error, job is:{} "), jobEnum.toString(), e);
    }
}
定制job
/**
 * Catalogue of Quartz jobs: each constant bundles the job name, job class, job group, trigger name
 * and trigger group used to register that job.
 */
public enum JobEnum {

    MODEL_STANDARD_DATA_SYN_JOB("modelStandardDataEsSynJob", ModelStandardEsSynJob.class, "modelStandardDataEsSynJobGroup", "modelStandardDataEsSynTrigger", "modelStandardDataEsSynTriggerGroup"),
    ;

    // Enum constants are shared singletons, so their state is fixed at construction time.
    private final String jobName;
    private final Class<? extends Job> jobClass;
    private final String jobGroup;
    private final String triggerName;
    private final String triggerGroup;

    JobEnum(String jobName, Class<? extends Job> jobClass, String jobGroup, String triggerName, String triggerGroup) {
        this.jobName = jobName;
        this.jobClass = jobClass;
        this.jobGroup = jobGroup;
        this.triggerName = triggerName;
        this.triggerGroup = triggerGroup;
    }

    /**
     * Finds the constant whose job name equals {@code jobName}.
     *
     * @param jobName job name to look up
     * @return the matching constant, or {@code null} when {@code jobName} is empty or unknown
     */
    public static JobEnum getQuartzJobEnum(String jobName) {
        if (StringUtils.isEmpty(jobName)) {
            return null;
        }
        for (JobEnum candidate : JobEnum.values()) {
            if (jobName.equals(candidate.getJobName())) {
                return candidate;
            }
        }
        return null;
    }

    /** @return the job name */
    public String getJobName() {
        return jobName;
    }

    /** @return the Quartz job implementation class */
    public Class<? extends Job> getJobClass() {
        return jobClass;
    }

    /** @return the job group */
    public String getJobGroup() {
        return jobGroup;
    }

    /** @return the trigger name */
    public String getTriggerName() {
        return triggerName;
    }

    /** @return the trigger group */
    public String getTriggerGroup() {
        return triggerGroup;
    }
}
/**
 * Quartz job that synchronizes the model/standard data set to Elasticsearch.
 *
 * <p>Concurrent executions of this job are disallowed via {@link DisallowConcurrentExecution}.
 *
 * @author 韩仁松
 * @since businessV1.0.0
 */
@DisallowConcurrentExecution
@Slf4j
public class ModelStandardEsSynJob implements Job {

    private static final String FIELD_MODEL_IDENTIFICATION = "model_identification";
    private static final String FIELD_STANDARD_IDENTIFICATION = "standard_identification";

    @Autowired
    private StandardRelationDao standardRelationDao;

    @Autowired
    private LoadAndUnloadEsService loadAndUnloadEsService;

    @Value("${es.index.modelStandardRelationCatalogCenter}")
    private String modelStandardRelationIndex;

    /**
     * Quartz entry point: logs the start and runs {@link #synModelStandardEsInfo()}.
     *
     * @param jobExecutionContext execution context supplied by Quartz; not used here
     * @throws JobExecutionException declared by the {@link Job} contract
     */
    @Override
    public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        log.info("ModelDataEsSynJob run start !!! ");
        this.synModelStandardEsInfo();
    }

    /**
     * Replaces the index content with the newest model/standard list from the database.
     *
     * <p>Everything currently found in ES is unloaded first, then the database list is loaded.
     * Each step is skipped when its list is empty, so nothing happens when both are empty.
     */
    public void synModelStandardEsInfo() {
        // Newest-version data set list of all published models.
        List<ModelStandardEsVo> models = standardRelationDao.queryNewestModelStandardList();
        // Everything currently stored in ES.
        List<ModelStandardEsVo> modelsInEs = getModelCodeInEs();

        if (CollectionUtils.isNotEmpty(modelsInEs)) {
            loadAndUnloadEsService.unloadModelStandardToEs(modelsInEs);
        }
        if (CollectionUtils.isNotEmpty(models)) {
            loadAndUnloadEsService.loadModelStandardToEs(models);
        }
    }

    /**
     * Reads the model and standard identification of every document in the configured index.
     *
     * <p>The documents are counted first and then fetched in a single page of that size.
     * NOTE(review): confirm that the document count always stays within the result-window limit
     * configured for the index; a single page of arbitrary size may be rejected by ES.
     *
     * @return one entry per document; empty when the index holds no documents
     * @throws BusinessException if the count or search request fails, or if the document count
     *         does not fit into an {@code int}
     */
    public List<ModelStandardEsVo> getModelCodeInEs() {
        List<ModelStandardEsVo> modelsInEs = new ArrayList<>();
        try {
            // An empty bool query matches every document.
            BoolQueryBuilder boolBuilder = QueryBuilders.boolQuery();

            SearchSourceBuilder countSourceBuilder = new SearchSourceBuilder();
            countSourceBuilder.query(boolBuilder);
            CountRequest countRequest = new CountRequest(modelStandardRelationIndex);
            countRequest.source(countSourceBuilder);
            CountResponse countResponse = EsClientUtil.getClient().count(countRequest, RequestOptions.DEFAULT);
            long count = countResponse.getCount();

            if (count > 0) {
                SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
                sourceBuilder.from(0);
                // toIntExact fails loudly on overflow instead of silently truncating the page
                // size, which would otherwise make the job work on a partial view of the index.
                sourceBuilder.size(Math.toIntExact(count));
                sourceBuilder.query(boolBuilder);
                String[] includedFields = {FIELD_MODEL_IDENTIFICATION, FIELD_STANDARD_IDENTIFICATION};
                sourceBuilder.fetchSource(includedFields, null);

                SearchRequest searchRequest = new SearchRequest(modelStandardRelationIndex);
                searchRequest.source(sourceBuilder);
                SearchResponse response = EsClientUtil.getClient().search(searchRequest, RequestOptions.DEFAULT);
                for (SearchHit hit : response.getHits().getHits()) {
                    modelsInEs.add(toModelStandardEsVo(hit));
                }
            }
        } catch (BusinessException e) {
            log.error(ModelCatalogCenterLogHelper.message(e.getCode(), e.getMessage()), e);
            throw new BusinessException(e.getCode(), e.getMessage(), e);
        } catch (Exception e) {
            throw new BusinessException(ModelCatalogCenterErrorCode.ES_OPERATION_FAILD, "es operate failed!!!", e);
        }
        log.info(" es operate success!!!");
        return modelsInEs;
    }

    /** Copies the two identification fields of a search hit into a new value object. */
    private static ModelStandardEsVo toModelStandardEsVo(SearchHit hit) {
        Map<String, Object> sourceAsMap = hit.getSourceAsMap();
        ModelStandardEsVo modelStandardEsVo = new ModelStandardEsVo();
        modelStandardEsVo.setModelIdentification((String) sourceAsMap.get(FIELD_MODEL_IDENTIFICATION));
        modelStandardEsVo.setStandardIdentification((String) sourceAsMap.get(FIELD_STANDARD_IDENTIFICATION));
        return modelStandardEsVo;
    }
}