定时任务线程池配置
@Component@Slf4jpublic class QuartzJobConfigService {@Bean("detectScheduler")private Scheduler getScheduler(QuartzFactory jobFactory) {Scheduler scheduler = getSchedulerFactoryBean(jobFactory).getScheduler();return scheduler;}@Bean("jobFactory")private QuartzFactory getQuartzFactory(AutowireCapableBeanFactory capableBeanFactory){QuartzFactory factory = new QuartzFactory(capableBeanFactory);return factory;}class QuartzFactory extends AdaptableJobFactory {public QuartzFactory(AutowireCapableBeanFactory capableBeanFactory) {this.capableBeanFactory = capableBeanFactory;}private AutowireCapableBeanFactory capableBeanFactory;@Overrideprotected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {Object jobInstance = super.createJobInstance(bundle);capableBeanFactory.autowireBean(jobInstance); //这一步解决不能spring注入bean的问题return jobInstance;}}private SchedulerFactoryBean getSchedulerFactoryBean(QuartzFactory jobFactory) {SchedulerFactoryBean schedulerFactoryBean = new SchedulerFactoryBean();try {schedulerFactoryBean.setSchedulerName("Device-Detect-Scheduler");schedulerFactoryBean.setJobFactory(jobFactory);schedulerFactoryBean.setOverwriteExistingJobs(true);schedulerFactoryBean.setStartupDelay(1);ExecutorService threadPool = new ThreadPoolExecutor(10, 10, 5L, TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(1000), new NamedThreadFactory("device-detect-quartz-thread"));schedulerFactoryBean.setTaskExecutor(threadPool);schedulerFactoryBean.afterPropertiesSet();} catch (Exception e) {log.error(ModelCatalogCenterLogHelper.message(" getSchedulerFactoryBean error..."), e);}return schedulerFactoryBean;}}
定时任务执行逻辑
import org.quartz.CronTrigger;
import org.quartz.Job;
import org.quartz.JobDetail;
import org.quartz.Trigger;

/**
 * Scheduled-job operations: building job details and cron triggers, scheduling,
 * removing and checking the existence of jobs.
 *
 * @author 韩仁松
 * @since businessV1.0.0
 */
public interface JobExecuteService {

    /**
     * Builds a job detail for the given job class and identity.
     *
     * @param jobClass job implementation class
     * @param jobName  job name
     * @param jobGroup job group
     * @return the job detail
     */
    JobDetail createJobDetail(Class<? extends Job> jobClass, String jobName, String jobGroup);

    /**
     * Builds a cron trigger with the given identity and cron expression.
     *
     * @param triggerName  trigger name
     * @param triggerGroup trigger group
     * @param cron         cron expression
     * @return the cron trigger
     */
    CronTrigger createCronTrigger(String triggerName, String triggerGroup, String cron);

    /**
     * Schedules the job with the given trigger.
     *
     * @param jobDetail job to schedule
     * @param trigger   trigger that fires the job
     */
    void scheduleJob(JobDetail jobDetail, Trigger trigger);

    /**
     * Removes a job together with its trigger.
     *
     * @param jobName      job name
     * @param jobGroup     job group
     * @param triggerName  trigger name
     * @param triggerGroup trigger group
     */
    void removeJob(String jobName, String jobGroup, String triggerName, String triggerGroup);

    /**
     * Tells whether a job with the given identity exists.
     *
     * @param jobName  job name
     * @param jobGroup job group
     * @return {@code true} when the job exists
     */
    boolean existJob(String jobName, String jobGroup);
}
@Service@Slf4jpublic class JobExecuteServiceImpl implements JobExecuteService {@Autowired@Qualifier("detectScheduler")private Scheduler scheduler;@Overridepublic JobDetail createJobDetail(Class<? extends Job> jobClass, String jobName, String jobGroup) {if(jobClass==null) {throw new BusinessException(ErrorCode.B_PARAM_VALID);}if(StringUtils.isEmpty(jobName)) {throw new BusinessException(ErrorCode.B_PARAM_VALID);}return JobBuilder.newJob(jobClass).withIdentity(jobName,jobGroup).build();}@Overridepublic CronTrigger createCronTrigger(String triggerName, String triggerGroup, String cron) {if(StringUtils.isEmpty(cron)) {throw new BusinessException(ErrorCode.B_PARAM_VALID);}if(StringUtils.isEmpty(triggerName)) {throw new BusinessException(ErrorCode.B_PARAM_VALID);}CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule(cron);return TriggerBuilder.newTrigger().withIdentity(triggerName, triggerGroup).withSchedule(cronScheduleBuilder).build();}@Overridepublic void scheduleJob(JobDetail jobDetail, Trigger trigger) {if(jobDetail==null) {throw new BusinessException(ErrorCode.B_PARAM_VALID);}if(trigger==null) {throw new BusinessException(ErrorCode.B_PARAM_VALID);}try {scheduler.scheduleJob(jobDetail, trigger);startJob(scheduler);} catch (Exception e) {log.error("scheduleJob method error...",e);}}@Overridepublic void removeJob(String jobName, String jobGroup, String triggerName, String triggerGroup) {if(StringUtils.isEmpty(jobName)) {throw new BusinessException(ErrorCode.B_PARAM_VALID);}if(StringUtils.isEmpty(jobGroup)) {throw new BusinessException(ErrorCode.B_PARAM_VALID);}if(StringUtils.isEmpty(triggerName)) {throw new BusinessException(ErrorCode.B_PARAM_VALID);}if(StringUtils.isEmpty(triggerGroup)) {throw new BusinessException(ErrorCode.B_PARAM_VALID);}JobKey jobKey = new JobKey(jobName, jobGroup);TriggerKey triggerKey = new TriggerKey(triggerName, triggerGroup);try 
{scheduler.pauseTrigger(triggerKey);scheduler.unscheduleJob(triggerKey);scheduler.deleteJob(jobKey);} catch (SchedulerException e) {log.error(("removeJob method error, job is {}, trigger is {}"), jobName, triggerName, e);}}/*** @param scheduler*/private void startJob(Scheduler scheduler) {try {if(scheduler!=null) {if(!scheduler.isStarted()) {scheduler.start();}}} catch (SchedulerException e) {log.error(("startJob method error..."),e);}}@Overridepublic boolean existJob(String jobName, String jobGroup) {if(StringUtils.isEmpty(jobName)) {return false;}JobKey jobKey = new JobKey(jobName, jobGroup);try {return scheduler.checkExists(jobKey);} catch (SchedulerException e) {log.error(("existJob method error, jobName is :{}"),jobName,e);}return false;}}
任务执行
启动即执行+定时执行
@Componentpublic class QuartzJobInit implements CommandLineRunner {@Autowiredprivate GovernJobInfoService governJobInfoService;@Overridepublic void run(String... args) throws Exception {//启动即执行// 定时任务governJobInfoService.startModelStandardDataEsSynJob();}}
/**
 * Entry points for starting governance-related scheduled jobs.
 */
public interface GovernJobInfoService {

    /**
     * Starts the model-standard ES synchronisation job.
     */
    void startModelStandardDataEsSynJob();
}
/**
 * Registers the model-standard ES synchronisation job with the job service unless a
 * job with the same name and group already exists.
 *
 * <p>The job definition is looked up in {@link JobEnum} by name. When the lookup yields
 * nothing, an error is logged and the method returns; without that guard the error
 * handler below would itself dereference {@code null} and throw. Any exception raised
 * while registering is logged and not propagated.
 */
@Override
public void startModelStandardDataEsSynJob() {
    final String jobName = "modelStandardDataEsSynJob";
    JobEnum jobEnum = JobEnum.getQuartzJobEnum(jobName);
    if (jobEnum == null) {
        log.error("job init error, no JobEnum entry for job name:{} ", jobName);
        return;
    }
    try {
        boolean exist = jobExecuteService.existJob(jobEnum.getJobName(), jobEnum.getJobGroup());
        if (!exist) {
            JobDetail detail = jobExecuteService.createJobDetail(
                    jobEnum.getJobClass(), jobEnum.getJobName(), jobEnum.getJobGroup());
            Trigger trigger = jobExecuteService.createCronTrigger(
                    jobEnum.getTriggerName(), jobEnum.getTriggerGroup(), modelStandardDataSyncCron);
            jobExecuteService.scheduleJob(detail, trigger);
            log.info(("{} add to quartz success..."), jobEnum.toString());
        }
    } catch (Exception e) {
        log.error(("job init error, job is:{} "), jobEnum.toString(), e);
    }
}
定制job
/**
 * Catalogue of Quartz jobs: each constant carries the job name, job class, job group,
 * trigger name and trigger group used to register it.
 */
public enum JobEnum {

    MODEL_STANDARD_DATA_SYN_JOB(
            "modelStandardDataEsSynJob",
            ModelStandardEsSynJob.class,
            "modelStandardDataEsSynJobGroup",
            "modelStandardDataEsSynTrigger",
            "modelStandardDataEsSynTriggerGroup");

    // Enum state is assigned once in the constructor and never changed, hence final.
    private final String jobName;
    private final Class<? extends Job> jobClass;
    private final String jobGroup;
    private final String triggerName;
    private final String triggerGroup;

    JobEnum(String jobName, Class<? extends Job> jobClass, String jobGroup,
            String triggerName, String triggerGroup) {
        this.jobName = jobName;
        this.jobClass = jobClass;
        this.jobGroup = jobGroup;
        this.triggerName = triggerName;
        this.triggerGroup = triggerGroup;
    }

    /**
     * Finds the constant whose job name equals {@code jobName}.
     *
     * @param jobName job name to look up
     * @return the matching constant, or {@code null} when {@code jobName} is empty or
     *         no constant has that job name
     */
    public static JobEnum getQuartzJobEnum(String jobName) {
        if (StringUtils.isEmpty(jobName)) {
            return null;
        }
        for (JobEnum jobEnum : JobEnum.values()) {
            if (jobName.equals(jobEnum.getJobName())) {
                return jobEnum;
            }
        }
        return null;
    }

    /** @return the job name */
    public String getJobName() {
        return jobName;
    }

    /** @return the job implementation class */
    public Class<? extends Job> getJobClass() {
        return jobClass;
    }

    /** @return the job group */
    public String getJobGroup() {
        return jobGroup;
    }

    /** @return the trigger name */
    public String getTriggerName() {
        return triggerName;
    }

    /** @return the trigger group */
    public String getTriggerGroup() {
        return triggerGroup;
    }
}
/*** 模型数据集es同步Job对象** @author 韩仁松* @since businessV1.0.0*/@DisallowConcurrentExecution@Slf4jpublic class ModelStandardEsSynJob implements Job {@Autowiredprivate StandardRelationDao standardRelationDao;@Autowiredprivate LoadAndUnloadEsService loadAndUnloadEsService;@Value("${es.index.modelStandardRelationCatalogCenter}")private String modelStandardRelationIndex;@Overridepublic void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {log.info("ModelDataEsSynJob run start !!! ");this.synModelStandardEsInfo();}public void synModelStandardEsInfo() {// 获取所有发布模型最新版本数据集列表List<ModelStandardEsVo> models = standardRelationDao.queryNewestModelStandardList();// 获取ES中所有的数据List<ModelStandardEsVo> modelsInEs = getModelCodeInEs();// 进行对比(获取不存在的模型)if (CollectionUtils.isNotEmpty(models) && CollectionUtils.isEmpty(modelsInEs)) {loadAndUnloadEsService.loadModelStandardToEs(models);return;}if (CollectionUtils.isEmpty(models) && CollectionUtils.isNotEmpty(modelsInEs)) {loadAndUnloadEsService.unloadModelStandardToEs(modelsInEs);return;}if (CollectionUtils.isNotEmpty(models) && CollectionUtils.isNotEmpty(modelsInEs)) {loadAndUnloadEsService.unloadModelStandardToEs(modelsInEs);loadAndUnloadEsService.loadModelStandardToEs(models);}}public List<ModelStandardEsVo> getModelCodeInEs() {List<ModelStandardEsVo> modelsInEs = new ArrayList<>();long count = 0;try {BoolQueryBuilder boolBuilder = QueryBuilders.boolQuery();SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();SearchSourceBuilder countSourceBuilder = new SearchSourceBuilder();countSourceBuilder.query(boolBuilder);CountRequest countRequest = new CountRequest(modelStandardRelationIndex);countRequest.source(countSourceBuilder);CountResponse countResponse = EsClientUtil.getClient().count(countRequest, RequestOptions.DEFAULT);count = countResponse.getCount();if (count > 0) {sourceBuilder.from(0);sourceBuilder.size((int) count);sourceBuilder.query(boolBuilder);String[] storedFields = new 
String[2];storedFields[0] = "model_identification";storedFields[1] = "standard_identification";sourceBuilder.fetchSource(storedFields, null);SearchRequest searchRequest = new SearchRequest(modelStandardRelationIndex);searchRequest.source(sourceBuilder);SearchResponse response = EsClientUtil.getClient().search(searchRequest, RequestOptions.DEFAULT);SearchHits hits = response.getHits();SearchHit[] searchHits = hits.getHits();for (SearchHit hit : searchHits) {Map<String, Object> sourceAsMap = hit.getSourceAsMap();ModelStandardEsVo modelStandardEsVo = new ModelStandardEsVo();modelStandardEsVo.setModelIdentification((String) sourceAsMap.get("model_identification"));modelStandardEsVo.setStandardIdentification((String) sourceAsMap.get("standard_identification"));modelsInEs.add(modelStandardEsVo);}}} catch (BusinessException e) {log.error(ModelCatalogCenterLogHelper.message(e.getCode(), e.getMessage()), e);throw new BusinessException(e.getCode(), e.getMessage(), e);} catch (Exception e) {throw new BusinessException(ModelCatalogCenterErrorCode.ES_OPERATION_FAILD, "es operate failed!!!", e);}log.info(" es operate success!!!");return modelsInEs;}}
