Original article: https://howtodoinjava.com/spring-batch/records-count-example/

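Learn how to count the records processed by a Spring Batch job using ItemStream and ChunkListener, and how to write the count to a log file or the console.

Count records using ItemStream

The ItemStream implementation below prints the number of records read so far each time the step updates its execution context.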

ItemCountItemStream.java

    import org.springframework.batch.item.ExecutionContext;
    import org.springframework.batch.item.ItemStream;
    import org.springframework.batch.item.ItemStreamException;

    public class ItemCountItemStream implements ItemStream {

        @Override
        public void open(ExecutionContext executionContext) throws ItemStreamException {
            // Nothing to initialize.
        }

        // Called as each chunk is committed; prints the reader's running count.
        @Override
        public void update(ExecutionContext executionContext) throws ItemStreamException {
            System.out.println("ItemCount: " + executionContext.get("FlatFileItemReader.read.count"));
        }

        @Override
        public void close() throws ItemStreamException {
            // Nothing to release.
        }
    }
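
The key read in update() is not arbitrary. A FlatFileItemReader saves its running count under its own name followed by ".read.count", and the name defaults to the short class name. The sketch below models that hand-off in plain Java, with a HashMap standing in for ExecutionContext. ReadCountKeyDemo and its methods are hypothetical names used only for illustration; they are not part of Spring Batch.

```java
import java.util.HashMap;
import java.util.Map;

public class ReadCountKeyDemo {

    // Builds the context key the way the reader does: "<reader name>.read.count".
    public static String readCountKey(String readerName) {
        return readerName + ".read.count";
    }

    // Stand-in for the reader saving its state into the execution context.
    public static void saveReaderState(Map<String, Object> context, String readerName, int itemsRead) {
        context.put(readCountKey(readerName), itemsRead);
    }

    // Stand-in for ItemCountItemStream.update(): look the count up by key.
    public static Object lookUpCount(Map<String, Object> context, String readerName) {
        return context.get(readCountKey(readerName));
    }

    public static void main(String[] args) {
        Map<String, Object> context = new HashMap<>();
        saveReaderState(context, "FlatFileItemReader", 3);

        // The default reader name matches the key used in ItemCountItemStream.
        System.out.println("ItemCount: " + lookUpCount(context, "FlatFileItemReader"));

        // A reader with a different (assumed) name would use a different key,
        // so nothing is stored under this one.
        System.out.println("ItemCount: " + lookUpCount(context, "employeeReader"));
    }
}
```

If the reader is renamed with setName(...), the key in ItemCountItemStream has to change with it; otherwise update() prints null.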

How to use ItemCountItemStream

Use the SimpleStepBuilder.stream() method to register the ItemCountItemStream created above with the chunk-oriented step.

BatchConfig.java

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job readCSVFilesJob() {
        return jobBuilderFactory
                .get("readCSVFilesJob")
                .incrementer(new RunIdIncrementer())
                .start(step1())
                .build();
    }

    @Bean
    public Step step1() {
        return stepBuilderFactory
                .get("step1")
                .<Employee, Employee>chunk(1)
                .reader(reader())
                .writer(writer())
                .stream(stream())
                .build();
    }

    @Bean
    public ItemCountItemStream stream() {
        return new ItemCountItemStream();
    }

Count records using ChunkListener

The ChunkListener implementation below prints the number of records read so far after each chunk is processed.

ItemCountListener.java

    import org.springframework.batch.core.ChunkListener;
    import org.springframework.batch.core.scope.context.ChunkContext;

    public class ItemCountListener implements ChunkListener {

        @Override
        public void beforeChunk(ChunkContext context) {
            // Nothing to do before a chunk starts.
        }

        // Prints the step's cumulative read count once the chunk has finished.
        @Override
        public void afterChunk(ChunkContext context) {
            int count = context.getStepContext().getStepExecution().getReadCount();
            System.out.println("ItemCount: " + count);
        }

        @Override
        public void afterChunkError(ChunkContext context) {
            // Nothing to do on a failed chunk.
        }
    }

How to use ItemCountListener

Use the SimpleStepBuilder.listener() method to register the ItemCountListener created above with the chunk-oriented step.

BatchConfig.java

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job readCSVFilesJob() {
        return jobBuilderFactory
                .get("readCSVFilesJob")
                .incrementer(new RunIdIncrementer())
                .start(step1())
                .build();
    }

    @Bean
    public Step step1() {
        return stepBuilderFactory
                .get("step1")
                .<Employee, Employee>chunk(1)
                .reader(reader())
                .writer(writer())
                .listener(listener())
                .build();
    }

    @Bean
    public ItemCountListener listener() {
        return new ItemCountListener();
    }

Record count demo

I am using the ItemCountListener configuration above to process this CSV file.

inputData.csv

    id,firstName,lastName
    1,Lokesh,Gupta
    2,Amit,Mishra
    3,Pankaj,Kumar
    4,David,Miller
    5,David,Walsh

The complete batch configuration looks like this:

BatchConfig.java

    package com.howtodoinjava.demo.config;

    import org.springframework.batch.core.Job;
    import org.springframework.batch.core.Step;
    import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
    import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
    import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
    import org.springframework.batch.core.launch.support.RunIdIncrementer;
    import org.springframework.batch.item.file.FlatFileItemReader;
    import org.springframework.batch.item.file.LineMapper;
    import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
    import org.springframework.batch.item.file.mapping.DefaultLineMapper;
    import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.core.io.Resource;

    import com.howtodoinjava.demo.model.Employee;

    @Configuration
    @EnableBatchProcessing
    public class BatchConfig
    {
        @Autowired
        private JobBuilderFactory jobBuilderFactory;

        @Autowired
        private StepBuilderFactory stepBuilderFactory;

        @Value("/input/inputData.csv")
        private Resource inputResource;

        @Bean
        public Job readCSVFilesJob() {
            return jobBuilderFactory
                    .get("readCSVFilesJob")
                    .incrementer(new RunIdIncrementer())
                    .start(step1())
                    .build();
        }

        @Bean
        public Step step1() {
            return stepBuilderFactory
                    .get("step1")
                    .<Employee, Employee>chunk(1)
                    .reader(reader())
                    .writer(writer())
                    .listener(listener())
                    .build();
        }

        @Bean
        public ItemCountListener listener() {
            return new ItemCountListener();
        }

        @Bean
        public FlatFileItemReader<Employee> reader() {
            FlatFileItemReader<Employee> itemReader = new FlatFileItemReader<Employee>();
            itemReader.setLineMapper(lineMapper());
            itemReader.setLinesToSkip(1);   // skip the CSV header row
            itemReader.setResource(inputResource);
            return itemReader;
        }

        @Bean
        public LineMapper<Employee> lineMapper() {
            DefaultLineMapper<Employee> lineMapper = new DefaultLineMapper<Employee>();

            // Split each line on commas and name the three columns.
            DelimitedLineTokenizer lineTokenizer = new DelimitedLineTokenizer();
            lineTokenizer.setNames(new String[] { "id", "firstName", "lastName" });
            lineTokenizer.setIncludedFields(new int[] { 0, 1, 2 });

            // Map the named columns onto Employee properties.
            BeanWrapperFieldSetMapper<Employee> fieldSetMapper = new BeanWrapperFieldSetMapper<Employee>();
            fieldSetMapper.setTargetType(Employee.class);

            lineMapper.setLineTokenizer(lineTokenizer);
            lineMapper.setFieldSetMapper(fieldSetMapper);
            return lineMapper;
        }

        @Bean
        public ConsoleItemWriter<Employee> writer() {
            return new ConsoleItemWriter<Employee>();
        }
    }
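
The article does not show the Employee model class. Judging by the column names passed to the tokenizer and by the console output further down, it is probably a plain bean along the following lines. This is an assumed reconstruction (the String type of id is a guess), and parseLine is a hypothetical plain-Java stand-in for what DelimitedLineTokenizer and BeanWrapperFieldSetMapper do together; it is not Spring Batch code.

```java
public class Employee {

    private String id;
    private String firstName;
    private String lastName;

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }

    public String getFirstName() { return firstName; }
    public void setFirstName(String firstName) { this.firstName = firstName; }

    public String getLastName() { return lastName; }
    public void setLastName(String lastName) { this.lastName = lastName; }

    // Matches the format of the Employee lines in the console output.
    @Override
    public String toString() {
        return "Employee [id=" + id + ", firstName=" + firstName + ", lastName=" + lastName + "]";
    }

    // Hypothetical helper: split one CSV line on commas and fill the
    // properties in column order (id, firstName, lastName).
    public static Employee parseLine(String line) {
        String[] fields = line.split(",", -1);
        if (fields.length != 3) {
            throw new IllegalArgumentException("Expected 3 columns but got " + fields.length);
        }
        Employee employee = new Employee();
        employee.setId(fields[0]);
        employee.setFirstName(fields[1]);
        employee.setLastName(fields[2]);
        return employee;
    }

    public static void main(String[] args) {
        System.out.println(parseLine("1,Lokesh,Gupta"));
    }
}
```

BeanWrapperFieldSetMapper needs the setters and a no-argument constructor, which is why the sketch keeps Employee a simple mutable bean.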

Start the application as a Spring Boot application. The Spring task scheduler will then launch the job once a minute.

App.java

    import org.springframework.batch.core.Job;
    import org.springframework.batch.core.JobParameters;
    import org.springframework.batch.core.JobParametersBuilder;
    import org.springframework.batch.core.launch.JobLauncher;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.scheduling.annotation.EnableScheduling;
    import org.springframework.scheduling.annotation.Scheduled;

    @SpringBootApplication
    @EnableScheduling
    public class App
    {
        @Autowired
        JobLauncher jobLauncher;

        @Autowired
        Job job;

        public static void main(String[] args)
        {
            SpringApplication.run(App.class, args);
        }

        // Fires at second 0 of every minute.
        @Scheduled(cron = "0 */1 * * * ?")
        public void perform() throws Exception
        {
            // A timestamp parameter makes every launch a new job instance.
            JobParameters params = new JobParametersBuilder()
                    .addString("JobID", String.valueOf(System.currentTimeMillis()))
                    .toJobParameters();
            jobLauncher.run(job, params);
        }
    }
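
The timestamp parameter matters because Spring Batch identifies a job instance by the job name plus its identifying parameters, and it refuses to rerun an instance that has already completed. The toy model below illustrates that rule in plain Java. JobInstanceRegistry and tryLaunch are hypothetical names, and the second timestamp is made up for the example; in the framework this bookkeeping is done by the JobRepository.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

public class JobInstanceRegistry {

    // Keys of job instances that have already run to completion.
    private final Set<String> completed = new HashSet<>();

    // Returns true if the launch is accepted, false if this exact
    // job name + parameter combination has already completed.
    public boolean tryLaunch(String jobName, Map<String, String> params) {
        // TreeMap gives a stable ordering, so equal parameter sets yield equal keys.
        String key = jobName + new TreeMap<>(params);
        return completed.add(key);
    }

    public static void main(String[] args) {
        JobInstanceRegistry registry = new JobInstanceRegistry();

        // First launch with this JobID is accepted.
        System.out.println(registry.tryLaunch("readCSVFilesJob", Map.of("JobID", "1531307280004")));

        // Same name and parameters again: rejected.
        System.out.println(registry.tryLaunch("readCSVFilesJob", Map.of("JobID", "1531307280004")));

        // A new timestamp makes it a new instance: accepted.
        System.out.println(registry.tryLaunch("readCSVFilesJob", Map.of("JobID", "1531307340004")));
    }
}
```

In Spring Batch itself the rejected launch does not return false; jobLauncher.run() throws a JobInstanceAlreadyCompleteException instead.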

Now watch the console.

Console

    2018-07-11 16:38:00 INFO - Job: [SimpleJob: [name=readCSVFilesJob]] launched with the following parameters: [{JobID=1531307280004}]
    2018-07-11 16:38:00 INFO - Executing step: [step1]
    Employee [id=1, firstName=Lokesh, lastName=Gupta]
    ItemCount: 1
    Employee [id=2, firstName=Amit, lastName=Mishra]
    ItemCount: 2
    Employee [id=3, firstName=Pankaj, lastName=Kumar]
    ItemCount: 3
    Employee [id=4, firstName=David, lastName=Miller]
    ItemCount: 4
    Employee [id=5, firstName=David, lastName=Walsh]
    ItemCount: 5
    ItemCount: 5
    2018-07-11 16:38:00 INFO - Job: [SimpleJob: [name=readCSVFilesJob]] completed with the following parameters: [{JobID=1531307280004}] and the following status: [COMPLETED]
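
The last count appears twice because the step only learns that the input is exhausted when the reader returns null. With chunk(1), the fifth chunk reads the fifth record and commits, and a sixth, empty chunk is needed to reach the end of the file; its afterChunk() call still reports 5. The simulation below models that read loop in plain Java to show the counts a listener would print for a given chunk size. ChunkCountSimulation is a hypothetical name, and the loop is a simplified model of chunk-oriented processing, not the framework's own code.

```java
import java.util.ArrayList;
import java.util.List;

public class ChunkCountSimulation {

    // Returns the cumulative read count reported after each chunk.
    public static List<Integer> countsAfterEachChunk(int totalRecords, int chunkSize) {
        if (chunkSize < 1) {
            throw new IllegalArgumentException("chunkSize must be at least 1");
        }
        List<Integer> counts = new ArrayList<>();
        int readCount = 0;
        boolean endOfInput = false;

        while (!endOfInput) {
            // Read up to chunkSize items, stopping early at end of input.
            for (int i = 0; i < chunkSize; i++) {
                if (readCount == totalRecords) {
                    // The reader would return null here.
                    endOfInput = true;
                    break;
                }
                readCount++;
            }
            // This is the value afterChunk() would print for the chunk.
            counts.add(readCount);
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(countsAfterEachChunk(5, 1)); // [1, 2, 3, 4, 5, 5]
        System.out.println(countsAfterEachChunk(5, 2)); // [2, 4, 5]
    }
}
```

A larger chunk size therefore produces fewer count lines, at the cost of coarser progress reporting.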

Drop me your questions in the comments section.

Happy learning!
