在Spring Batch中,ItemReader接口用于读取数据,ItemWriter接口用于输出数据。除此之外,我们可以通过ItemProcessor接口实现数据的处理,包括:数据校验,数据过滤和数据转换等。数据处理的时机发生于ItemReader读取数据之后,ItemWriter输出数据之前。本节记录下Spring Batch中ItemProcessor的使用。

框架搭建

新建一个Spring Boot项目,版本为2.2.4.RELEASE,artifactId为spring-batch-itemprocessor,项目结构如下图所示:

Spring Batch处理数据 - 图1

剩下的数据库层的准备,项目配置,依赖引入和Spring Batch入门文章中的框架搭建步骤一致,这里就不再赘述。

在介绍Spring Batch ItemProcessor之前,我们先准备个简单的数据读取源。在cc.mrbird.batch包下新建entity包,然后在该包下新建TestData实体类:

  1. public class TestData {
  2. private int id;
  3. private String field1;
  4. private String field2;
  5. private String field3;
  6. // get,set,toString略
  7. }

接着在cc.mrbird.batch包下新建reader包,然后在该包下创建ItemReaderConfigure

  1. @Configuration
  2. public class ItemReaderConfigure {
  3. @Bean
  4. public ListItemReader<TestData> simpleReader() {
  5. List<TestData> data = new ArrayList<>();
  6. TestData testData1 = new TestData();
  7. testData1.setId(1);
  8. testData1.setField1("11");
  9. testData1.setField2("12");
  10. testData1.setField3("13");
  11. data.add(testData1);
  12. TestData testData2 = new TestData();
  13. testData2.setId(2);
  14. testData2.setField1("21");
  15. testData2.setField2("22");
  16. testData2.setField3("23");
  17. data.add(testData2);
  18. TestData testData3 = new TestData();
  19. testData3.setId(3);
  20. testData3.setField1("31");
  21. testData3.setField2("32");
  22. // 设置为空字符串,用于后面格式校验演示
  23. testData3.setField3("");
  24. data.add(testData3);
  25. return new ListItemReader<>(data);
  26. }
  27. }

上面注册了一个ItemReader类型的Bean,后续都用它作为读取数据的来源。

格式校验

ItemProcessor的实现类ValidatingItemProcessor可以用于数据格式校验。举个例子,在cc.mrbird.batch包下新建job包,然后在该包下新建ValidatingItemProcessorDemo

  1. @Component
  2. public class ValidatingItemProcessorDemo {
  3. @Autowired
  4. private JobBuilderFactory jobBuilderFactory;
  5. @Autowired
  6. private StepBuilderFactory stepBuilderFactory;
  7. @Autowired
  8. private ListItemReader<TestData> simpleReader;
  9. @Bean
  10. public Job validatingItemProcessorJob() {
  11. return jobBuilderFactory.get("validatingItemProcessorJob")
  12. .start(step())
  13. .build();
  14. }
  15. private Step step() {
  16. return stepBuilderFactory.get("step")
  17. .<TestData, TestData>chunk(2)
  18. .reader(simpleReader)
  19. .processor(validatingItemProcessor())
  20. .writer(list -> list.forEach(System.out::println))
  21. .build();
  22. }
  23. private ValidatingItemProcessor<TestData> validatingItemProcessor() {
  24. ValidatingItemProcessor<TestData> processor = new ValidatingItemProcessor<>();
  25. processor.setValidator(value -> {
  26. // 对每一条数据进行校验
  27. if ("".equals(value.getField3())) {
  28. // 如果field3的值为空串,则抛异常
  29. throw new ValidationException("field3的值不合法");
  30. }
  31. });
  32. return processor;
  33. }
  34. }

通过ValidatingItemProcessor我们对ItemReader读取的每一条数据进行校验,如果field3的值为空串的话,则抛出ValidationException("field3的值不合法")异常。ItemProcessor通过步骤创建工厂的processor()设置。

启动项目,控制台日志的打印如下:

  1. 2020-03-09 14:18:47.186 INFO 17967 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=validatingItemProcessorJob]] launched with the following parameters: [{}]
  2. 2020-03-09 14:18:47.252 INFO 17967 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [step]
  3. TestData{id=1, field1='11', field2='12', field3='13'}
  4. TestData{id=2, field1='21', field2='22', field3='23'}
  5. 2020-03-09 14:18:47.300 ERROR 17967 --- [ main] o.s.batch.core.step.AbstractStep : Encountered an error executing step step in job validatingItemProcessorJob
  6. org.springframework.batch.item.validator.ValidationException: field3的值不合法
  7. at cc.mrbird.batch.entity.job.ValidatingItemProcessorDemo.lambda$validatingItemProcessor$1(ValidatingItemProcessorDemo.java:50) ~[classes/:na]
  8. at org.springframework.batch.item.validator.ValidatingItemProcessor.process(ValidatingItemProcessor.java:84) ~[spring-batch-infrastructure-4.2.1.RELEASE.jar:4.2.1.RELEASE]
  9. at org.springframework.batch.core.step.item.SimpleChunkProcessor.doProcess(SimpleChunkProcessor.java:134) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
  10. at org.springframework.batch.core.step.item.SimpleChunkProcessor.transform(SimpleChunkProcessor.java:319) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
  11. at org.springframework.batch.core.step.item.SimpleChunkProcessor.process(SimpleChunkProcessor.java:210) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
  12. at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:77) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
  13. at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:407) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
  14. at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:331) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
  15. at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140) ~[spring-tx-5.2.4.RELEASE.jar:5.2.4.RELEASE]
  16. at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:273) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
  17. at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:82) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
  18. at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:375) ~[spring-batch-infrastructure-4.2.1.RELEASE.jar:4.2.1.RELEASE]
  19. at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:215) ~[spring-batch-infrastructure-4.2.1.RELEASE.jar:4.2.1.RELEASE]
  20. at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:145) ~[spring-batch-infrastructure-4.2.1.RELEASE.jar:4.2.1.RELEASE]
  21. at org.springframework.batch.core.step.tasklet.TaskletStep.doExecute(TaskletStep.java:258) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
  22. at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:208) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
  23. at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:148) [spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
  24. at org.springframework.batch.core.job.AbstractJob.handleStep(AbstractJob.java:410) [spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
  25. at org.springframework.batch.core.job.SimpleJob.doExecute(SimpleJob.java:136) [spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
  26. at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:319) [spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
  27. at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:147) [spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
  28. at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50) [spring-core-5.2.4.RELEASE.jar:5.2.4.RELEASE]
  29. at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:140) [spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
  30. at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_231]
  31. at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_231]
  32. at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_231]
  33. at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_231]
  34. at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344) [spring-aop-5.2.4.RELEASE.jar:5.2.4.RELEASE]
  35. at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198) [spring-aop-5.2.4.RELEASE.jar:5.2.4.RELEASE]
  36. at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) [spring-aop-5.2.4.RELEASE.jar:5.2.4.RELEASE]
  37. at org.springframework.batch.core.configuration.annotation.SimpleBatchConfiguration$PassthruAdvice.invoke(SimpleBatchConfiguration.java:127) [spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
  38. at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) [spring-aop-5.2.4.RELEASE.jar:5.2.4.RELEASE]
  39. at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212) [spring-aop-5.2.4.RELEASE.jar:5.2.4.RELEASE]
  40. at com.sun.proxy.$Proxy46.run(Unknown Source) [na:na]
  41. at org.springframework.boot.autoconfigure.batch.JobLauncherCommandLineRunner.execute(JobLauncherCommandLineRunner.java:192) [spring-boot-autoconfigure-2.2.5.RELEASE.jar:2.2.5.RELEASE]
  42. at org.springframework.boot.autoconfigure.batch.JobLauncherCommandLineRunner.executeLocalJobs(JobLauncherCommandLineRunner.java:166) [spring-boot-autoconfigure-2.2.5.RELEASE.jar:2.2.5.RELEASE]
  43. at org.springframework.boot.autoconfigure.batch.JobLauncherCommandLineRunner.launchJobFromProperties(JobLauncherCommandLineRunner.java:153) [spring-boot-autoconfigure-2.2.5.RELEASE.jar:2.2.5.RELEASE]
  44. at org.springframework.boot.autoconfigure.batch.JobLauncherCommandLineRunner.run(JobLauncherCommandLineRunner.java:148) [spring-boot-autoconfigure-2.2.5.RELEASE.jar:2.2.5.RELEASE]
  45. at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:784) [spring-boot-2.2.5.RELEASE.jar:2.2.5.RELEASE]
  46. at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:768) [spring-boot-2.2.5.RELEASE.jar:2.2.5.RELEASE]
  47. at org.springframework.boot.SpringApplication.run(SpringApplication.java:322) [spring-boot-2.2.5.RELEASE.jar:2.2.5.RELEASE]
  48. at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226) [spring-boot-2.2.5.RELEASE.jar:2.2.5.RELEASE]
  49. at org.springframework.boot.SpringApplication.run(SpringApplication.java:1215) [spring-boot-2.2.5.RELEASE.jar:2.2.5.RELEASE]
  50. at cc.mrbird.batch.SpringBatchItemprocessorApplication.main(SpringBatchItemprocessorApplication.java:12) [classes/:na]
  51. 2020-03-09 14:18:47.307 INFO 17967 --- [ main] o.s.batch.core.step.AbstractStep : Step: [step] executed in 55ms
  52. 2020-03-09 14:18:47.335 INFO 17967 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=validatingItemProcessorJob]] completed with the following parameters: [{}] and the following status: [FAILED] in 127ms

可以看到任务处理过程中抛出了预期异常,关于任务处理中如何处理异常,可以参考后续的文章。

除了使用这种方式外,我们还可以使用BeanValidatingItemProcessor校验使用JSR-303注解标注的实体类。比如,在TestData类的field3属性上添加@NotBlank注解:

  1. public class TestData {
  2. private int id;
  3. private String field1;
  4. private String field2;
  5. @NotBlank
  6. private String field3;
  7. // get,set,toString略
  8. }

使用该注解需要在pom中添加spring-boot-starter-validation依赖:

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-validation</artifactId>
  4. </dependency>

然后在job包下新建BeanValidatingItemProcessorDemo

  1. @Component
  2. public class BeanValidatingItemProcessorDemo {
  3. @Autowired
  4. private JobBuilderFactory jobBuilderFactory;
  5. @Autowired
  6. private StepBuilderFactory stepBuilderFactory;
  7. @Autowired
  8. private ListItemReader<TestData> simpleReader;
  9. @Bean
  10. public Job beanValidatingItemProcessorJob() throws Exception {
  11. return jobBuilderFactory.get("beanValidatingItemProcessorJob")
  12. .start(step())
  13. .build();
  14. }
  15. private Step step() throws Exception {
  16. return stepBuilderFactory.get("step")
  17. .<TestData, TestData>chunk(2)
  18. .reader(simpleReader)
  19. .processor(beanValidatingItemProcessor())
  20. .writer(list -> list.forEach(System.out::println))
  21. .build();
  22. }
  23. private BeanValidatingItemProcessor<TestData> beanValidatingItemProcessor() throws Exception {
  24. BeanValidatingItemProcessor<TestData> beanValidatingItemProcessor = new BeanValidatingItemProcessor<>();
  25. // 开启过滤,不符合规则的数据被过滤掉;
  26. beanValidatingItemProcessor.setFilter(true);
  27. beanValidatingItemProcessor.afterPropertiesSet();
  28. return beanValidatingItemProcessor;
  29. }
  30. }

启动项目后,控制台日志打印如下:

  1. 2020-03-09 14:31:14.813 INFO 18100 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=beanValidatingItemProcessorJob]] launched with the following parameters: [{}]
  2. 2020-03-09 14:31:14.873 INFO 18100 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [step]
  3. TestData{id=1, field1='11', field2='12', field3='13'}
  4. TestData{id=2, field1='21', field2='22', field3='23'}
  5. 2020-03-09 14:31:14.959 INFO 18100 --- [ main] o.s.batch.core.step.AbstractStep : Step: [step] executed in 85ms
  6. 2020-03-09 14:31:14.980 INFO 18100 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=beanValidatingItemProcessorJob]] completed with the following parameters: [{}] and the following status: [COMPLETED] in 145ms
  7. 2020-03-09 14:31:15.069 INFO 18100 --- [ main]

可以看到,不符合规则的数据已经被排除了。如果不开启过滤beanValidatingItemProcessor.setFilter(false),那么在遇到不符合注解校验规则的数据,将抛出如下异常:

  1. org.springframework.batch.item.validator.ValidationException: Validation failed for TestData{id=3, field1='31', field2='32', field3=''}:
  2. Field error in object 'item' on field 'field3': rejected value []; codes [NotBlank.item.field3,NotBlank.field3,NotBlank.java.lang.String,NotBlank]; arguments [org.springframework.context.support.DefaultMessageSourceResolvable: codes [item.field3,field3]; arguments []; default message [field3]]; default message [不能为空]
  3. at org.springframework.batch.item.validator.SpringValidator.validate(SpringValidator.java:54) ~[spring-batch-infrastructure-4.2.1.RELEASE.jar:4.2.1.RELEASE]
  4. at org.springframework.batch.item.validator.ValidatingItemProcessor.process(ValidatingItemProcessor.java:84) ~[spring-batch-infrastructure-4.2.1.RELEASE.jar:4.2.1.RELEASE]
  5. at org.springframework.batch.core.step.item.SimpleChunkProcessor.doProcess(SimpleChunkProcessor.java:134) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
  6. at org.springframework.batch.core.step.item.SimpleChunkProcessor.transform(SimpleChunkProcessor.java:319) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
  7. ...

数据过滤

通过自定义ItemProcessor的实现类,我们也可以简单地实现数据过滤。在cc.mrbird.batch包下新建processor包,然后在该包下新建TestDataFilterItemProcessor

  1. @Component
  2. public class TestDataFilterItemProcessor implements ItemProcessor<TestData, TestData> {
  3. @Override
  4. public TestData process(TestData item) {
  5. // 返回null,会过滤掉这条数据
  6. return "".equals(item.getField3()) ? null : item;
  7. }
  8. }

TestDataFilterItemProcessor实现了ItemProcessorprocess()方法,在该方法内编写具体的校验逻辑,上面代码判断TestData的field3是否为空串,是的话返回null(返回null会过滤掉这条数据)。

接着在job包下新建TestDataFilterItemProcessorDemo

  1. @Component
  2. public class TestDataFilterItemProcessorDemo {
  3. @Autowired
  4. private JobBuilderFactory jobBuilderFactory;
  5. @Autowired
  6. private StepBuilderFactory stepBuilderFactory;
  7. @Autowired
  8. private ListItemReader<TestData> simpleReader;
  9. @Autowired
  10. private TestDataFilterItemProcessor testDataFilterItemProcessor;
  11. @Bean
  12. public Job testDataFilterItemProcessorJob() {
  13. return jobBuilderFactory.get("testDataFilterItemProcessorJob")
  14. .start(step())
  15. .build();
  16. }
  17. private Step step() {
  18. return stepBuilderFactory.get("step")
  19. .<TestData, TestData>chunk(2)
  20. .reader(simpleReader)
  21. .processor(testDataFilterItemProcessor)
  22. .writer(list -> list.forEach(System.out::println))
  23. .build();
  24. }
  25. }

启动项目,控制台日志打印如下:

  1. 2020-03-09 15:03:30.932 INFO 18690 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=testDataFilterItemProcessorJob]] launched with the following parameters: [{}]
  2. 2020-03-09 15:03:30.973 INFO 18690 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [step]
  3. TestData{id=1, field1='11', field2='12', field3='13'}
  4. TestData{id=2, field1='21', field2='22', field3='23'}
  5. 2020-03-09 15:03:31.012 INFO 18690 --- [ main] o.s.batch.core.step.AbstractStep : Step: [step] executed in 39ms
  6. 2020-03-09 15:03:31.037 INFO 18690 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=testDataFilterItemProcessorJob]] completed with the following parameters: [{}] and the following status: [COMPLETED] in 95ms

数据转换

在processor包下新建一个ItemProcessor实现类TestDataTransformItemPorcessor

  1. @Component
  2. public class TestDataTransformItemPorcessor implements ItemProcessor<TestData, TestData> {
  3. @Override
  4. public TestData process(TestData item) {
  5. // field1值拼接 hello
  6. item.setField1(item.getField1() + " hello");
  7. return item;
  8. }
  9. }

在job包下新建TestDataTransformItemPorcessorDemo

  1. @Component
  2. public class TestDataTransformItemPorcessorDemo {
  3. @Autowired
  4. private JobBuilderFactory jobBuilderFactory;
  5. @Autowired
  6. private StepBuilderFactory stepBuilderFactory;
  7. @Autowired
  8. private ListItemReader<TestData> simpleReader;
  9. @Autowired
  10. private TestDataTransformItemPorcessor testDataTransformItemPorcessor;
  11. @Bean
  12. public Job testDataTransformItemPorcessorJob() {
  13. return jobBuilderFactory.get("testDataTransformItemPorcessorJob")
  14. .start(step())
  15. .build();
  16. }
  17. private Step step() {
  18. return stepBuilderFactory.get("step")
  19. .<TestData, TestData>chunk(2)
  20. .reader(simpleReader)
  21. .processor(testDataTransformItemPorcessor)
  22. .writer(list -> list.forEach(System.out::println))
  23. .build();
  24. }
  25. }

启动项目,控制台日志打印如下:

  1. 2020-03-09 15:08:55.628 INFO 18775 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=testDataTransformItemPorcessorJob]] launched with the following parameters: [{}]
  2. 2020-03-09 15:08:55.694 INFO 18775 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [step]
  3. TestData{id=1, field1='11 hello', field2='12', field3='13'}
  4. TestData{id=2, field1='21 hello', field2='22', field3='23'}
  5. TestData{id=3, field1='31 hello', field2='32', field3=''}
  6. 2020-03-09 15:08:55.757 INFO 18775 --- [ main] o.s.batch.core.step.AbstractStep : Step: [step] executed in 63ms
  7. 2020-03-09 15:08:55.781 INFO 18775 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=testDataTransformItemPorcessorJob]] completed with the following parameters: [{}] and the following status: [COMPLETED] in 144ms

聚合处理

在创建Step的时候,除了制定一个ItemProcess外,我们可以通过CompositeItemProcessor聚合多个processor处理过程。

在job包下新建CompositeItemProcessorDemo

  1. @Component
  2. public class CompositeItemProcessorDemo {
  3. @Autowired
  4. private JobBuilderFactory jobBuilderFactory;
  5. @Autowired
  6. private StepBuilderFactory stepBuilderFactory;
  7. @Autowired
  8. private ListItemReader<TestData> simpleReader;
  9. @Autowired
  10. private TestDataFilterItemProcessor testDataFilterItemProcessor;
  11. @Autowired
  12. private TestDataTransformItemPorcessor testDataTransformItemPorcessor;
  13. @Bean
  14. public Job compositeItemProcessorJob() {
  15. return jobBuilderFactory.get("compositeItemProcessorJob")
  16. .start(step())
  17. .build();
  18. }
  19. private Step step() {
  20. return stepBuilderFactory.get("step")
  21. .<TestData, TestData>chunk(2)
  22. .reader(simpleReader)
  23. .processor(compositeItemProcessor())
  24. .writer(list -> list.forEach(System.out::println))
  25. .build();
  26. }
  27. // CompositeItemProcessor组合多种中间处理器
  28. private CompositeItemProcessor<TestData, TestData> compositeItemProcessor() {
  29. CompositeItemProcessor<TestData, TestData> processor = new CompositeItemProcessor<>();
  30. List<ItemProcessor<TestData, TestData>> processors = Arrays.asList(testDataFilterItemProcessor, testDataTransformItemPorcessor);
  31. // 代理两个processor
  32. processor.setDelegates(processors);
  33. return processor;
  34. }
  35. }

上面代码中,我们通过CompositeItemProcessor聚合了前面定义的连个processor:TestDataFilterItemProcessorTestDataTransformItemPorcessor

启动项目,控制台日志打印如下:

  1. 2020-03-09 15:21:24.960 INFO 18882 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=compositeItemProcessorJob]] launched with the following parameters: [{}]
  2. 2020-03-09 15:21:25.005 INFO 18882 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [step]
  3. TestData{id=1, field1='11 hello', field2='12', field3='13'}
  4. TestData{id=2, field1='21 hello', field2='22', field3='23'}
  5. 2020-03-09 15:21:25.065 INFO 18882 --- [ main] o.s.batch.core.step.AbstractStep : Step: [step] executed in 60ms
  6. 2020-03-09 15:21:25.104 INFO 18882 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=compositeItemProcessorJob]] completed with the following parameters: [{}] and the following status: [COMPLETED] in 128ms

从结果可以看到,数据不但进行了过滤,还进行了转换(拼接hello)。