定时任务线程池配置

  1. @Component
  2. @Slf4j
  3. public class QuartzJobConfigService {
  4. @Bean("detectScheduler")
  5. private Scheduler getScheduler(QuartzFactory jobFactory) {
  6. Scheduler scheduler = getSchedulerFactoryBean(jobFactory).getScheduler();
  7. return scheduler;
  8. }
  9. @Bean("jobFactory")
  10. private QuartzFactory getQuartzFactory(AutowireCapableBeanFactory capableBeanFactory){
  11. QuartzFactory factory = new QuartzFactory(capableBeanFactory);
  12. return factory;
  13. }
  14. class QuartzFactory extends AdaptableJobFactory {
  15. public QuartzFactory(AutowireCapableBeanFactory capableBeanFactory) {
  16. this.capableBeanFactory = capableBeanFactory;
  17. }
  18. private AutowireCapableBeanFactory capableBeanFactory;
  19. @Override
  20. protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {
  21. Object jobInstance = super.createJobInstance(bundle);
  22. capableBeanFactory.autowireBean(jobInstance); //这一步解决不能spring注入bean的问题
  23. return jobInstance;
  24. }
  25. }
  26. private SchedulerFactoryBean getSchedulerFactoryBean(QuartzFactory jobFactory) {
  27. SchedulerFactoryBean schedulerFactoryBean = new SchedulerFactoryBean();
  28. try {
  29. schedulerFactoryBean.setSchedulerName("Device-Detect-Scheduler");
  30. schedulerFactoryBean.setJobFactory(jobFactory);
  31. schedulerFactoryBean.setOverwriteExistingJobs(true);
  32. schedulerFactoryBean.setStartupDelay(1);
  33. ExecutorService threadPool = new ThreadPoolExecutor(10, 10, 5L, TimeUnit.SECONDS,
  34. new LinkedBlockingQueue<Runnable>(1000), new NamedThreadFactory("device-detect-quartz-thread"));
  35. schedulerFactoryBean.setTaskExecutor(threadPool);
  36. schedulerFactoryBean.afterPropertiesSet();
  37. } catch (Exception e) {
  38. log.error(ModelCatalogCenterLogHelper.message(" getSchedulerFactoryBean error..."), e);
  39. }
  40. return schedulerFactoryBean;
  41. }
  42. }

定时任务执行逻辑

  1. import org.quartz.CronTrigger;
  2. import org.quartz.Job;
  3. import org.quartz.JobDetail;
  4. import org.quartz.Trigger;
  5. /**
  6. * 定时任务功能
  7. *
  8. * @author 韩仁松
  9. * @since businessV1.0.0
  10. */
  11. public interface JobExecuteService {
  12. JobDetail createJobDetail(Class<? extends Job> jobClass, String jobName, String jobGroup);
  13. CronTrigger createCronTrigger(String triggerName, String triggerGroup, String cron) ;
  14. void scheduleJob(JobDetail jobDetail, Trigger trigger);
  15. void removeJob(String jobName, String jobGroup, String triggerName, String triggerGroup);
  16. boolean existJob(String jobName, String jobGroup);
  17. }
  1. @Service
  2. @Slf4j
  3. public class JobExecuteServiceImpl implements JobExecuteService {
  4. @Autowired
  5. @Qualifier("detectScheduler")
  6. private Scheduler scheduler;
  7. @Override
  8. public JobDetail createJobDetail(Class<? extends Job> jobClass, String jobName, String jobGroup) {
  9. if(jobClass==null) {
  10. throw new BusinessException(ErrorCode.B_PARAM_VALID);
  11. }
  12. if(StringUtils.isEmpty(jobName)) {
  13. throw new BusinessException(ErrorCode.B_PARAM_VALID);
  14. }
  15. return JobBuilder.newJob(jobClass).withIdentity(jobName,jobGroup).build();
  16. }
  17. @Override
  18. public CronTrigger createCronTrigger(String triggerName, String triggerGroup, String cron) {
  19. if(StringUtils.isEmpty(cron)) {
  20. throw new BusinessException(ErrorCode.B_PARAM_VALID);
  21. }
  22. if(StringUtils.isEmpty(triggerName)) {
  23. throw new BusinessException(ErrorCode.B_PARAM_VALID);
  24. }
  25. CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule(cron);
  26. return TriggerBuilder.newTrigger()
  27. .withIdentity(triggerName, triggerGroup).withSchedule(cronScheduleBuilder)
  28. .build();
  29. }
  30. @Override
  31. public void scheduleJob(JobDetail jobDetail, Trigger trigger) {
  32. if(jobDetail==null) {
  33. throw new BusinessException(ErrorCode.B_PARAM_VALID);
  34. }
  35. if(trigger==null) {
  36. throw new BusinessException(ErrorCode.B_PARAM_VALID);
  37. }
  38. try {
  39. scheduler.scheduleJob(jobDetail, trigger);
  40. startJob(scheduler);
  41. } catch (Exception e) {
  42. log.error("scheduleJob method error...",e);
  43. }
  44. }
  45. @Override
  46. public void removeJob(String jobName, String jobGroup, String triggerName, String triggerGroup) {
  47. if(StringUtils.isEmpty(jobName)) {
  48. throw new BusinessException(ErrorCode.B_PARAM_VALID);
  49. }
  50. if(StringUtils.isEmpty(jobGroup)) {
  51. throw new BusinessException(ErrorCode.B_PARAM_VALID);
  52. }
  53. if(StringUtils.isEmpty(triggerName)) {
  54. throw new BusinessException(ErrorCode.B_PARAM_VALID);
  55. }
  56. if(StringUtils.isEmpty(triggerGroup)) {
  57. throw new BusinessException(ErrorCode.B_PARAM_VALID);
  58. }
  59. JobKey jobKey = new JobKey(jobName, jobGroup);
  60. TriggerKey triggerKey = new TriggerKey(triggerName, triggerGroup);
  61. try {
  62. scheduler.pauseTrigger(triggerKey);
  63. scheduler.unscheduleJob(triggerKey);
  64. scheduler.deleteJob(jobKey);
  65. } catch (SchedulerException e) {
  66. log.error(("removeJob method error, job is {}, trigger is {}"), jobName, triggerName, e);
  67. }
  68. }
  69. /**
  70. * @param scheduler
  71. */
  72. private void startJob(Scheduler scheduler) {
  73. try {
  74. if(scheduler!=null) {
  75. if(!scheduler.isStarted()) {
  76. scheduler.start();
  77. }
  78. }
  79. } catch (SchedulerException e) {
  80. log.error(("startJob method error..."),e);
  81. }
  82. }
  83. @Override
  84. public boolean existJob(String jobName, String jobGroup) {
  85. if(StringUtils.isEmpty(jobName)) {
  86. return false;
  87. }
  88. JobKey jobKey = new JobKey(jobName, jobGroup);
  89. try {
  90. return scheduler.checkExists(jobKey);
  91. } catch (SchedulerException e) {
  92. log.error(("existJob method error, jobName is :{}"),jobName,e);
  93. }
  94. return false;
  95. }
  96. }

任务执行
启动即执行+定时执行

  1. @Component
  2. public class QuartzJobInit implements CommandLineRunner {
  3. @Autowired
  4. private GovernJobInfoService governJobInfoService;
  5. @Override
  6. public void run(String... args) throws Exception {
  7. //启动即执行
  8. // 定时任务
  9. governJobInfoService.startModelStandardDataEsSynJob();
  10. }
  11. }
  /**
   * Entry points for registering governance-related scheduled jobs.
   */
  public interface GovernJobInfoService {

      /**
       * Registers the model-standard ES synchronisation job with the scheduler.
       */
      void startModelStandardDataEsSynJob();
  }
  1. @Override
  2. public void startModelStandardDataEsSynJob() {
  3. JobEnum jobEnum = JobEnum.getQuartzJobEnum("modelStandardDataEsSynJob");
  4. try {
  5. boolean exist = jobExecuteService.existJob(jobEnum.getJobName(), jobEnum.getJobGroup());
  6. if(!exist) {
  7. JobDetail detail = jobExecuteService.createJobDetail(jobEnum.getJobClass(), jobEnum.getJobName(), jobEnum.getJobGroup());
  8. Trigger trigger = jobExecuteService.createCronTrigger(jobEnum.getTriggerName(), jobEnum.getTriggerGroup(), modelStandardDataSyncCron);
  9. jobExecuteService.scheduleJob(detail, trigger);
  10. log.info(("{} add to quartz success..."), jobEnum.toString());
  11. }
  12. } catch (Exception e) {
  13. log.error(("job init error, job is:{} "), jobEnum.toString(), e);
  14. }
  15. }

定制job

  1. public enum JobEnum {
  2. MODEL_STANDARD_DATA_SYN_JOB("modelStandardDataEsSynJob", ModelStandardEsSynJob.class, "modelStandardDataEsSynJobGroup", "modelStandardDataEsSynTrigger", "modelStandardDataEsSynTriggerGroup"),
  3. ;
  4. private String jobName;
  5. private Class<? extends Job> jobClass;
  6. private String jobGroup;
  7. private String triggerName;
  8. private String triggerGroup;
  9. JobEnum(String jobName, Class<? extends Job> jobClass, String jobGroup, String triggerName, String triggerGroup) {
  10. this.jobName = jobName;
  11. this.jobClass = jobClass;
  12. this.jobGroup = jobGroup;
  13. this.triggerName = triggerName;
  14. this.triggerGroup = triggerGroup;
  15. }
  16. public static JobEnum getQuartzJobEnum(String jobName){
  17. if(StringUtils.isEmpty(jobName)) {
  18. return null;
  19. }
  20. for(JobEnum jobEnum : JobEnum.values()) {
  21. if(jobName.equals(jobEnum.getJobName())) {
  22. return jobEnum;
  23. }
  24. }
  25. return null;
  26. }
  27. public String getJobName() {
  28. return jobName;
  29. }
  30. public Class<? extends Job> getJobClass() {
  31. return jobClass;
  32. }
  33. public String getJobGroup() {
  34. return jobGroup;
  35. }
  36. public String getTriggerName() {
  37. return triggerName;
  38. }
  39. public String getTriggerGroup() {
  40. return triggerGroup;
  41. }
  42. }
  1. /**
  2. * 模型数据集es同步Job对象
  3. *
  4. * @author 韩仁松
  5. * @since businessV1.0.0
  6. */
  7. @DisallowConcurrentExecution
  8. @Slf4j
  9. public class ModelStandardEsSynJob implements Job {
  10. @Autowired
  11. private StandardRelationDao standardRelationDao;
  12. @Autowired
  13. private LoadAndUnloadEsService loadAndUnloadEsService;
  14. @Value("${es.index.modelStandardRelationCatalogCenter}")
  15. private String modelStandardRelationIndex;
  16. @Override
  17. public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
  18. log.info("ModelDataEsSynJob run start !!! ");
  19. this.synModelStandardEsInfo();
  20. }
  21. public void synModelStandardEsInfo() {
  22. // 获取所有发布模型最新版本数据集列表
  23. List<ModelStandardEsVo> models = standardRelationDao.queryNewestModelStandardList();
  24. // 获取ES中所有的数据
  25. List<ModelStandardEsVo> modelsInEs = getModelCodeInEs();
  26. // 进行对比(获取不存在的模型)
  27. if (CollectionUtils.isNotEmpty(models) && CollectionUtils.isEmpty(modelsInEs)) {
  28. loadAndUnloadEsService.loadModelStandardToEs(models);
  29. return;
  30. }
  31. if (CollectionUtils.isEmpty(models) && CollectionUtils.isNotEmpty(modelsInEs)) {
  32. loadAndUnloadEsService.unloadModelStandardToEs(modelsInEs);
  33. return;
  34. }
  35. if (CollectionUtils.isNotEmpty(models) && CollectionUtils.isNotEmpty(modelsInEs)) {
  36. loadAndUnloadEsService.unloadModelStandardToEs(modelsInEs);
  37. loadAndUnloadEsService.loadModelStandardToEs(models);
  38. }
  39. }
  40. public List<ModelStandardEsVo> getModelCodeInEs() {
  41. List<ModelStandardEsVo> modelsInEs = new ArrayList<>();
  42. long count = 0;
  43. try {
  44. BoolQueryBuilder boolBuilder = QueryBuilders.boolQuery();
  45. SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
  46. SearchSourceBuilder countSourceBuilder = new SearchSourceBuilder();
  47. countSourceBuilder.query(boolBuilder);
  48. CountRequest countRequest = new CountRequest(modelStandardRelationIndex);
  49. countRequest.source(countSourceBuilder);
  50. CountResponse countResponse = EsClientUtil.getClient().count(countRequest, RequestOptions.DEFAULT);
  51. count = countResponse.getCount();
  52. if (count > 0) {
  53. sourceBuilder.from(0);
  54. sourceBuilder.size((int) count);
  55. sourceBuilder.query(boolBuilder);
  56. String[] storedFields = new String[2];
  57. storedFields[0] = "model_identification";
  58. storedFields[1] = "standard_identification";
  59. sourceBuilder.fetchSource(storedFields, null);
  60. SearchRequest searchRequest = new SearchRequest(modelStandardRelationIndex);
  61. searchRequest.source(sourceBuilder);
  62. SearchResponse response = EsClientUtil.getClient().search(searchRequest, RequestOptions.DEFAULT);
  63. SearchHits hits = response.getHits();
  64. SearchHit[] searchHits = hits.getHits();
  65. for (SearchHit hit : searchHits) {
  66. Map<String, Object> sourceAsMap = hit.getSourceAsMap();
  67. ModelStandardEsVo modelStandardEsVo = new ModelStandardEsVo();
  68. modelStandardEsVo.setModelIdentification((String) sourceAsMap.get("model_identification"));
  69. modelStandardEsVo.setStandardIdentification((String) sourceAsMap.get("standard_identification"));
  70. modelsInEs.add(modelStandardEsVo);
  71. }
  72. }
  73. } catch (BusinessException e) {
  74. log.error(ModelCatalogCenterLogHelper.message(e.getCode(), e.getMessage()), e);
  75. throw new BusinessException(e.getCode(), e.getMessage(), e);
  76. } catch (Exception e) {
  77. throw new BusinessException(ModelCatalogCenterErrorCode.ES_OPERATION_FAILD, "es operate failed!!!", e);
  78. }
  79. log.info(" es operate success!!!");
  80. return modelsInEs;
  81. }
  82. }