Java SpringBoot SpringBatch Quartz

1、pom文件

  1. <dependencies>
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-web</artifactId>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.postgresql</groupId>
  8. <artifactId>postgresql</artifactId>
  9. </dependency>
  10. <dependency>
  11. <groupId>org.springframework.boot</groupId>
  12. <artifactId>spring-boot-starter-jdbc</artifactId>
  13. </dependency>
  14. <dependency>
  15. <groupId>org.springframework.boot</groupId>
  16. <artifactId>spring-boot-starter-batch</artifactId>
  17. </dependency>
  18. <dependency>
  19. <groupId>org.projectlombok</groupId>
  20. <artifactId>lombok</artifactId>
  21. </dependency>
  22. <dependency>
  23. <groupId>org.springframework.boot</groupId>
  24. <artifactId>spring-boot-starter-batch</artifactId>
  25. </dependency>
  26. </dependencies>

2、application.yaml文件

  1. spring:
  2. datasource:
  3. username: thinklink
  4. password: thinklink
  5. url: jdbc:postgresql://172.16.205.54:5432/thinklink
  6. driver-class-name: org.postgresql.Driver
  7. batch:
  8. job:
  9. enabled: false
  10. server:
  11. port: 8073
  12. #upgrade-dispatch-base-url: http://172.16.205.125:8080/api/rpc/dispatch/command/
  13. upgrade-dispatch-base-url: http://172.16.205.211:8080/api/noauth/rpc/dispatch/command/
  14. # 每次批量处理的数据量,默认为5000
  15. batch-size: 5000

3、Service实现类,触发批处理任务的入口,执行一个job

  1. @Service("batchService")
  2. public class BatchServiceImpl implements BatchService {
  3. // 框架自动注入
  4. @Autowired
  5. private JobLauncher jobLauncher;
  6. @Autowired
  7. private Job updateDeviceJob;
  8. /**
  9. * 根据 taskId 创建一个Job
  10. * @param taskId
  11. * @throws Exception
  12. */
  13. @Override
  14. public void createBatchJob(String taskId) throws Exception {
  15. JobParameters jobParameters = new JobParametersBuilder()
  16. .addString("taskId", taskId)
  17. .addString("uuid", UUID.randomUUID().toString().replace("-",""))
  18. .toJobParameters();
  19. // 传入一个Job任务和任务需要的参数
  20. jobLauncher.run(updateDeviceJob, jobParameters);
  21. }
  22. }

4、SpringBatch配置类,此部分最重要

  1. @Configuration
  2. public class BatchConfiguration {
  3. private static final Logger log = LoggerFactory.getLogger(BatchConfiguration.class);
  4. @Value("${batch-size:5000}")
  5. private int batchSize;
  6. // 框架自动注入
  7. @Autowired
  8. public JobBuilderFactory jobBuilderFactory;
  9. // 框架自动注入
  10. @Autowired
  11. public StepBuilderFactory stepBuilderFactory;
  12. // 数据过滤器,对从数据库读出来的数据,注意进行操作
  13. @Autowired
  14. public TaskItemProcessor taskItemProcessor;
  15. // 接收job参数
  16. public Map<String, JobParameter> parameters;
  17. public Object taskId;
  18. @Autowired
  19. private JdbcTemplate jdbcTemplate;
  20. // 读取数据库操作
  21. @Bean
  22. @StepScope
  23. public JdbcCursorItemReader<DispatchRequest> itemReader(DataSource dataSource) {
  24. String querySql = " SELECT " +
  25. " e. ID AS taskId, " +
  26. " e.user_id AS userId, " +
  27. " e.timing_startup AS startTime, " +
  28. " u.device_id AS deviceId, " +
  29. " d.app_name AS appName, " +
  30. " d.compose_file AS composeFile, " +
  31. " e.failure_retry AS failureRetry, " +
  32. " e.tetry_times AS retryTimes, " +
  33. " e.device_managered AS deviceManagered " +
  34. " FROM " +
  35. " eiot_upgrade_task e " +
  36. " LEFT JOIN eiot_upgrade_device u ON e. ID = u.upgrade_task_id " +
  37. " LEFT JOIN eiot_app_detail d ON e.app_id = d. ID " +
  38. " WHERE " +
  39. " ( " +
  40. " u.device_upgrade_status = 0 " +
  41. " OR u.device_upgrade_status = 2" +
  42. " )" +
  43. " AND e.tetry_times > u.retry_times " +
  44. " AND e. ID = ?";
  45. return new JdbcCursorItemReaderBuilder<DispatchRequest>()
  46. .name("itemReader")
  47. .sql(querySql)
  48. .dataSource(dataSource)
  49. .queryArguments(new Object[]{parameters.get("taskId").getValue()})
  50. .rowMapper(new DispatchRequest.DispatchRequestRowMapper())
  51. .build();
  52. }
  53. // 将结果写回数据库
  54. @Bean
  55. @StepScope
  56. public ItemWriter<ProcessResult> itemWriter() {
  57. return new ItemWriter<ProcessResult>() {
  58. private int updateTaskStatus(DispatchRequest dispatchRequest, int status) {
  59. log.info("update taskId: {}, deviceId: {} to status {}", dispatchRequest.getTaskId(), dispatchRequest.getDeviceId(), status);
  60. Integer retryTimes = jdbcTemplate.queryForObject(
  61. "select retry_times from eiot_upgrade_device where device_id = ? and upgrade_task_id = ?",
  62. new Object[]{ dispatchRequest.getDeviceId(), dispatchRequest.getTaskId()}, Integer.class
  63. );
  64. retryTimes += 1;
  65. int updateCount = jdbcTemplate.update("update eiot_upgrade_device set device_upgrade_status = ?, retry_times = ? " +
  66. "where device_id = ? and upgrade_task_id = ?", status, retryTimes, dispatchRequest.getDeviceId(), dispatchRequest.getTaskId());
  67. if (updateCount <= 0) {
  68. log.warn("no task updated");
  69. } else {
  70. log.info("count of {} task updated", updateCount);
  71. }
  72. // 最后一次重试
  73. if (status == STATUS_DISPATCH_FAILED && retryTimes == dispatchRequest.getRetryTimes()) {
  74. log.info("the last retry of {} failed, inc deviceManagered", dispatchRequest.getTaskId());
  75. return 1;
  76. } else {
  77. return 0;
  78. }
  79. }
  80. @Override
  81. @Transactional
  82. public void write(List<? extends ProcessResult> list) throws Exception {
  83. Map taskMap = jdbcTemplate.queryForMap(
  84. "select device_managered, device_count, task_status from eiot_upgrade_task where id = ?",
  85. list.get(0).getDispatchRequest().getTaskId() // 我们认定一个批量里面,taskId都是一样的
  86. );
  87. int deviceManagered = (int)taskMap.get("device_managered");
  88. Integer deviceCount = (Integer) taskMap.get("device_count");
  89. if (deviceCount == null) {
  90. log.warn("deviceCount of task {} is null", list.get(0).getDispatchRequest().getTaskId());
  91. }
  92. int taskStatus = (int)taskMap.get("task_status");
  93. for (ProcessResult result: list) {
  94. deviceManagered += updateTaskStatus(result.getDispatchRequest(), result.getStatus());
  95. }
  96. if (deviceCount != null && deviceManagered == deviceCount) {
  97. taskStatus = 2; //任务状态 0:待升级,1:升级中,2:已完成
  98. }
  99. jdbcTemplate.update("update eiot_upgrade_task set device_managered = ?, task_status = ? " +
  100. "where id = ?", deviceManagered, taskStatus, list.get(0).getDispatchRequest().getTaskId());
  101. }
  102. };
  103. }
  104. /**
  105. * 定义一个下发更新的 job
  106. * @return
  107. */
  108. @Bean
  109. public Job updateDeviceJob(Step updateDeviceStep) {
  110. return jobBuilderFactory.get(UUID.randomUUID().toString().replace("-", ""))
  111. .listener(new JobListener()) // 设置Job的监听器
  112. .flow(updateDeviceStep)// 执行下发更新的Step
  113. .end()
  114. .build();
  115. }
  116. /**
  117. * 定义一个下发更新的 step
  118. * @return
  119. */
  120. @Bean
  121. public Step updateDeviceStep(JdbcCursorItemReader<DispatchRequest> itemReader,ItemWriter<ProcessResult> itemWriter) {
  122. return stepBuilderFactory.get(UUID.randomUUID().toString().replace("-", ""))
  123. .<DispatchRequest, ProcessResult> chunk(batchSize)
  124. .reader(itemReader) //根据taskId从数据库读取更新设备信息
  125. .processor(taskItemProcessor) // 每条更新信息,执行下发更新接口
  126. .writer(itemWriter)
  127. .build();
  128. }
  129. // job 监听器
  130. public class JobListener implements JobExecutionListener {
  131. @Override
  132. public void beforeJob(JobExecution jobExecution) {
  133. log.info(jobExecution.getJobInstance().getJobName() + " before... ");
  134. parameters = jobExecution.getJobParameters().getParameters();
  135. taskId = parameters.get("taskId").getValue();
  136. log.info("job param taskId : " + parameters.get("taskId"));
  137. }
  138. @Override
  139. public void afterJob(JobExecution jobExecution) {
  140. log.info(jobExecution.getJobInstance().getJobName() + " after... ");
  141. // 当所有job执行完之后,查询设备更新状态,如果有失败,则要定时重新执行job
  142. String sql = " SELECT " +
  143. " count(*) " +
  144. " FROM " +
  145. " eiot_upgrade_device d " +
  146. " LEFT JOIN eiot_upgrade_task u ON d.upgrade_task_id = u. ID " +
  147. " WHERE " +
  148. " u. ID = ? " +
  149. " AND d.retry_times < u.tetry_times " +
  150. " AND ( " +
  151. " d.device_upgrade_status = 0 " +
  152. " OR d.device_upgrade_status = 2 " +
  153. " ) ";
  154. // 获取更新失败的设备个数
  155. Integer count = jdbcTemplate.queryForObject(sql, new Object[]{taskId}, Integer.class);
  156. log.info("update device failure count : " + count);
  157. // 下面是使用Quartz触发定时任务
  158. // 获取任务时间,单位秒
  159. // String time = jdbcTemplate.queryForObject(sql, new Object[]{taskId}, Integer.class);
  160. // 此处方便测试,应该从数据库中取taskId对应的重试间隔,单位秒
  161. Integer millSecond = 10;
  162. if(count != null && count > 0){
  163. String jobName = "UpgradeTask_" + taskId;
  164. String reTaskId = taskId.toString();
  165. Map<String,Object> params = new HashMap<>();
  166. params.put("jobName",jobName);
  167. params.put("taskId",reTaskId);
  168. if (QuartzManager.checkNameNotExist(jobName))
  169. {
  170. QuartzManager.scheduleRunOnceJob(jobName, RunOnceJobLogic.class,params,millSecond);
  171. }
  172. }
  173. }
  174. }
  175. }

5、Processor,处理每条数据,可以在此对数据进行过滤操作

  1. @Component("taskItemProcessor")
  2. public class TaskItemProcessor implements ItemProcessor<DispatchRequest, ProcessResult> {
  3. public static final int STATUS_DISPATCH_FAILED = 2;
  4. public static final int STATUS_DISPATCH_SUCC = 1;
  5. private static final Logger log = LoggerFactory.getLogger(TaskItemProcessor.class);
  6. @Value("${upgrade-dispatch-base-url:http://localhost/api/v2/rpc/dispatch/command/}")
  7. private String dispatchUrl;
  8. @Autowired
  9. JdbcTemplate jdbcTemplate;
  10. /**
  11. * 在这里,执行 下发更新指令 的操作
  12. * @param dispatchRequest
  13. * @return
  14. * @throws Exception
  15. */
  16. @Override
  17. public ProcessResult process(final DispatchRequest dispatchRequest) {
  18. // 调用接口,下发指令
  19. String url = dispatchUrl + dispatchRequest.getDeviceId()+"/"+dispatchRequest.getUserId();
  20. log.info("request url:" + url);
  21. RestTemplate restTemplate = new RestTemplate();
  22. HttpHeaders headers = new HttpHeaders();
  23. headers.setContentType(MediaType.APPLICATION_JSON_UTF8);
  24. MultiValueMap<String, String> params = new LinkedMultiValueMap<String, String>();
  25. JSONObject jsonOuter = new JSONObject();
  26. JSONObject jsonInner = new JSONObject();
  27. try {
  28. jsonInner.put("jobId",dispatchRequest.getTaskId());
  29. jsonInner.put("name",dispatchRequest.getName());
  30. jsonInner.put("composeFile", Base64Util.bytesToBase64Str(dispatchRequest.getComposeFile()));
  31. jsonInner.put("policy",new JSONObject().put("startTime",dispatchRequest.getPolicy()));
  32. jsonInner.put("timestamp",dispatchRequest.getTimestamp());
  33. jsonOuter.put("method","updateApp");
  34. jsonOuter.put("params",jsonInner);
  35. } catch (JSONException e) {
  36. log.info("JSON convert Exception :" + e);
  37. }catch (IOException e) {
  38. log.info("Base64Util bytesToBase64Str :" + e);
  39. }
  40. log.info("request body json :" + jsonOuter);
  41. HttpEntity<String> requestEntity = new HttpEntity<String>(jsonOuter.toString(),headers);
  42. int status;
  43. try {
  44. ResponseEntity<String> response = restTemplate.postForEntity(url,requestEntity,String.class);
  45. log.info("response :" + response);
  46. if (response.getStatusCode() == HttpStatus.OK) {
  47. status = STATUS_DISPATCH_SUCC;
  48. } else {
  49. status = STATUS_DISPATCH_FAILED;
  50. }
  51. }catch (Exception e){
  52. status = STATUS_DISPATCH_FAILED;
  53. }
  54. return new ProcessResult(dispatchRequest, status);
  55. }
  56. }

6、封装数据库返回数据的实体Bean,注意静态内部类

  1. public class DispatchRequest {
  2. private String taskId;
  3. private String deviceId;
  4. private String userId;
  5. private String name;
  6. private byte[] composeFile;
  7. private String policy;
  8. private String timestamp;
  9. private String md5;
  10. private int failureRetry;
  11. private int retryTimes;
  12. private int deviceManagered;
  13. // 省略构造函数,setter/getter/tostring方法
  14. //......
  15. public static class DispatchRequestRowMapper implements RowMapper<DispatchRequest> {
  16. @Override
  17. public DispatchRequest mapRow(ResultSet resultSet, int i) throws SQLException {
  18. DispatchRequest dispatchRequest = new DispatchRequest();
  19. dispatchRequest.setTaskId(resultSet.getString("taskId"));
  20. dispatchRequest.setUserId(resultSet.getString("userId"));
  21. dispatchRequest.setPolicy(resultSet.getString("startTime"));
  22. dispatchRequest.setDeviceId(resultSet.getString("deviceId"));
  23. dispatchRequest.setName(resultSet.getString("appName"));
  24. dispatchRequest.setComposeFile(resultSet.getBytes("composeFile"));
  25. dispatchRequest.setTimestamp(DateUtil.DateToString(new Date()));
  26. dispatchRequest.setRetryTimes(resultSet.getInt("retryTimes"));
  27. dispatchRequest.setFailureRetry(resultSet.getInt("failureRetry"));
  28. dispatchRequest.setDeviceManagered(resultSet.getInt("deviceManagered"));
  29. return dispatchRequest;
  30. }
  31. }
  32. }

7、启动类上要加上注解

  1. @SpringBootApplication
  2. @EnableBatchProcessing
  3. public class Application {
  4. public static void main(String[] args) {
  5. SpringApplication.run(Application.class, args);
  6. }
  7. }