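Spring Batch provides several kinds of listeners that let us hook our own logic into a job while it is being processed. From the coarsest to the finest granularity, the commonly used ones are: the job-level JobExecutionListener, the step-level StepExecutionListener, the chunk-level ChunkListener, and the item-level ItemReadListener, ItemWriteListener, and ItemProcessListener. The following table summarizes them: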

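| Listener | Description |
| --- | --- |
| JobExecutionListener | Triggered before a Job starts (beforeJob) and after it finishes (afterJob) |
| StepExecutionListener | Triggered before a Step starts (beforeStep) and after it finishes (afterStep) |
| ChunkListener | Triggered before a chunk starts (beforeChunk), after it finishes (afterChunk), and after an error (afterChunkError) |
| ItemReadListener | Triggered before a read (beforeRead), after a read (afterRead), and on a read error (onReadError) |
| ItemProcessListener | Triggered before processing (beforeProcess), after processing (afterProcess), and on a processing error (onProcessError) |
| ItemWriteListener | Triggered before a write (beforeWrite), after a write (afterWrite), and on a write error (onWriteError) |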

Project setup

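Create a new Spring Boot project, version 2.2.4.RELEASE, with the artifactId spring-batch-listener. The project structure is shown in the figure below:

Spring Batch Listeners - Figure 1

The remaining preparation (database layer, project configuration, and dependencies) is the same as the project setup steps in the Spring Batch getting-started article, so it is not repeated here.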

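Listener demo

Each kind of listener can be used in two ways:

  1. by implementing the interface;
  2. by using annotations.

Let's first look at the interface-based approach. Create a listener package under the cc.mrbird.batch package, then create MyJobExecutionListener in it, implementing the JobExecutionListener interface: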

    import org.springframework.batch.core.JobExecution;
    import org.springframework.batch.core.JobExecutionListener;
    import org.springframework.stereotype.Component;

    @Component
    public class MyJobExecutionListener implements JobExecutionListener {

        // Called once before the job starts.
        @Override
        public void beforeJob(JobExecution jobExecution) {
            System.out.println("before job execute: " + jobExecution.getJobInstance().getJobName());
        }

        // Called once after the job has finished.
        @Override
        public void afterJob(JobExecution jobExecution) {
            System.out.println("after job execute: " + jobExecution.getJobInstance().getJobName());
        }
    }

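The two methods implemented above are self-explanatory: they are triggered before and after the job executes, respectively.

Next, let's see how to use a listener in the annotation-driven way. Create MyStepExecutionListener in the listener package: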

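    import org.springframework.batch.core.StepExecution;
    import org.springframework.batch.core.annotation.AfterStep;
    import org.springframework.batch.core.annotation.BeforeStep;
    import org.springframework.stereotype.Component;

    @Component
    public class MyStepExecutionListener {

        // No interface is implemented; the annotation marks the callback.
        @BeforeStep
        public void beforeStep(StepExecution stepExecution) {
            System.out.println("before step execute: " + stepExecution.getStepName());
        }

        @AfterStep
        public void afterStep(StepExecution stepExecution) {
            System.out.println("after step execute: " + stepExecution.getStepName());
        }
    }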

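With annotations there is no need to implement an interface; instead, the corresponding methods are marked with annotations such as @BeforeStep and @AfterStep. The method signature must match what the annotation requires, however, otherwise the reflective invocation will fail. For example, take a look at the source of @BeforeStep:

Spring Batch Listeners - Figure 2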
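To make the signature requirement more concrete, here is a minimal plain-Java sketch of how an annotation-driven callback might be located and invoked through reflection. It is not Spring Batch's actual implementation: the annotation BeforeStepDemo, the class StepInfo (standing in for StepExecution), and the helper fireBeforeStep are all hypothetical names made up for this illustration.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

class AnnotationListenerSketch {

    // Invokes every @BeforeStepDemo method on the target and returns how many were called.
    // A method whose single parameter cannot accept a StepInfo is rejected up front.
    static int fireBeforeStep(Object target, StepInfo info) {
        int invoked = 0;
        for (Method method : target.getClass().getDeclaredMethods()) {
            if (!method.isAnnotationPresent(BeforeStepDemo.class)) {
                continue;
            }
            Class<?>[] params = method.getParameterTypes();
            if (params.length != 1 || !params[0].isAssignableFrom(StepInfo.class)) {
                throw new IllegalArgumentException(
                        "Signature mismatch: " + method.getName() + " must accept a single StepInfo");
            }
            try {
                method.setAccessible(true);
                method.invoke(target, info);
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Listener invocation failed", e);
            }
            invoked++;
        }
        return invoked;
    }

    public static void main(String[] args) {
        GoodListener good = new GoodListener();
        fireBeforeStep(good, new StepInfo("step"));
        System.out.println(good.calls);
        try {
            fireBeforeStep(new BadListener(), new StepInfo("step"));
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}

// Hypothetical stand-in for an annotation such as @BeforeStep.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface BeforeStepDemo {
}

// Hypothetical stand-in for StepExecution.
class StepInfo {
    final String stepName;

    StepInfo(String stepName) {
        this.stepName = stepName;
    }
}

// The signature matches what the sketch expects, so the callback is invoked.
class GoodListener {
    final List<String> calls = new ArrayList<>();

    @BeforeStepDemo
    public void beforeStep(StepInfo info) {
        calls.add("before step execute: " + info.stepName);
    }
}

// The parameter type is wrong, so the sketch refuses to invoke the callback.
class BadListener {
    @BeforeStepDemo
    public void beforeStep(String wrongParameterType) {
    }
}
```

The sketch checks the signature before invoking so that the failure is easy to read; the point is only that an annotated method is looked up and called reflectively, which is why its parameter list matters.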

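These are essentially the two ways of creating a listener. The examples below are not explained in detail; the code is simply listed.

In the listener package, go on to create MyChunkListener, MyItemReaderListener, MyItemProcessListener, and MyItemWriterListener: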

MyChunkListener

    import org.springframework.batch.core.ChunkListener;
    import org.springframework.batch.core.scope.context.ChunkContext;
    import org.springframework.stereotype.Component;

    @Component
    public class MyChunkListener implements ChunkListener {

        @Override
        public void beforeChunk(ChunkContext context) {
            System.out.println("before chunk: " + context.getStepContext().getStepName());
        }

        @Override
        public void afterChunk(ChunkContext context) {
            System.out.println("after chunk: " + context.getStepContext().getStepName());
        }

        // Called after a chunk has failed.
        @Override
        public void afterChunkError(ChunkContext context) {
            System.out.println("after chunk error: " + context.getStepContext().getStepName());
        }
    }

MyItemReaderListener

    import org.springframework.batch.core.ItemReadListener;
    import org.springframework.stereotype.Component;

    @Component
    public class MyItemReaderListener implements ItemReadListener<String> {

        @Override
        public void beforeRead() {
            System.out.println("before read");
        }

        @Override
        public void afterRead(String item) {
            System.out.println("after read: " + item);
        }

        @Override
        public void onReadError(Exception ex) {
            System.out.println("on read error: " + ex.getMessage());
        }
    }

MyItemProcessListener

    import org.springframework.batch.core.ItemProcessListener;
    import org.springframework.stereotype.Component;

    @Component
    public class MyItemProcessListener implements ItemProcessListener<String, String> {

        @Override
        public void beforeProcess(String item) {
            System.out.println("before process: " + item);
        }

        @Override
        public void afterProcess(String item, String result) {
            System.out.println("after process: " + item + " result: " + result);
        }

        @Override
        public void onProcessError(String item, Exception e) {
            System.out.println("on process error: " + item + " , error message: " + e.getMessage());
        }
    }

MyItemWriterListener

    import java.util.List;

    import org.springframework.batch.core.ItemWriteListener;
    import org.springframework.stereotype.Component;

    @Component
    public class MyItemWriterListener implements ItemWriteListener<String> {

        @Override
        public void beforeWrite(List<? extends String> items) {
            System.out.println("before write: " + items);
        }

        @Override
        public void afterWrite(List<? extends String> items) {
            System.out.println("after write: " + items);
        }

        @Override
        public void onWriteError(Exception exception, List<? extends String> items) {
            System.out.println("on write error: " + items + " , error message: " + exception.getMessage());
        }
    }

With these listeners ready, create a job package under the cc.mrbird.batch package, then create ListenerTestJobDemo in it:

    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;

    import cc.mrbird.batch.listener.MyChunkListener;
    import cc.mrbird.batch.listener.MyItemProcessListener;
    import cc.mrbird.batch.listener.MyItemReaderListener;
    import cc.mrbird.batch.listener.MyItemWriterListener;
    import cc.mrbird.batch.listener.MyJobExecutionListener;
    import cc.mrbird.batch.listener.MyStepExecutionListener;
    import org.springframework.batch.core.Job;
    import org.springframework.batch.core.Step;
    import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
    import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
    import org.springframework.batch.item.ItemProcessor;
    import org.springframework.batch.item.ItemReader;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.stereotype.Component;

    @Component
    public class ListenerTestJobDemo {

        @Autowired
        private JobBuilderFactory jobBuilderFactory;
        @Autowired
        private StepBuilderFactory stepBuilderFactory;
        @Autowired
        private MyJobExecutionListener myJobExecutionListener;
        @Autowired
        private MyStepExecutionListener myStepExecutionListener;
        @Autowired
        private MyChunkListener myChunkListener;
        @Autowired
        private MyItemReaderListener myItemReaderListener;
        @Autowired
        private MyItemProcessListener myItemProcessListener;
        @Autowired
        private MyItemWriterListener myItemWriterListener;

        @Bean
        public Job listenerTestJob() {
            return jobBuilderFactory.get("listenerTestJob")
                    .start(step())
                    .listener(myJobExecutionListener)
                    .build();
        }

        // Each listener is registered right after the component it observes.
        private Step step() {
            return stepBuilderFactory.get("step")
                    .listener(myStepExecutionListener)
                    .<String, String>chunk(2)
                    .faultTolerant()
                    .listener(myChunkListener)
                    .reader(reader())
                    .listener(myItemReaderListener)
                    .processor(processor())
                    .listener(myItemProcessListener)
                    .writer(list -> list.forEach(System.out::println))
                    .listener(myItemWriterListener)
                    .build();
        }

        private ItemReader<String> reader() {
            List<String> data = Arrays.asList("java", "c++", "javascript", "python");
            return new SimpleReader(data);
        }

        private ItemProcessor<String, String> processor() {
            return item -> item + " language";
        }
    }

    // Reads from an in-memory list; returning null signals that the data is exhausted.
    class SimpleReader implements ItemReader<String> {

        private final Iterator<String> iterator;

        public SimpleReader(List<String> data) {
            this.iterator = data.iterator();
        }

        @Override
        public String read() {
            return iterator.hasNext() ? iterator.next() : null;
        }
    }

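In the code above, each listener is registered at the corresponding position (note that faultTolerant() must be configured when registering the chunk listener).

Start the project; the console prints the following log: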

    2020-03-09 17:08:34.439 INFO 20165 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=listenerTestJob]] launched with the following parameters: [{}]
    before job execute: listenerTestJob3
    2020-03-09 17:08:34.495 INFO 20165 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [step]
    before step execute: step
    before chunk: step
    before read
    after read: java
    before read
    after read: c++
    before process: java
    after process: java result: java language
    before process: c++
    after process: c++ result: c++ language
    before write: [java language, c++ language]
    java language
    c++ language
    after write: [java language, c++ language]
    after chunk: step
    before chunk: step
    before read
    after read: javascript
    before read
    after read: python
    before process: javascript
    after process: javascript result: javascript language
    before process: python
    after process: python result: python language
    before write: [javascript language, python language]
    javascript language
    python language
    after write: [javascript language, python language]
    after chunk: step
    before chunk: step
    before read
    after chunk: step
    after step execute: step
    2020-03-09 17:08:34.546 INFO 20165 --- [ main] o.s.batch.core.step.AbstractStep : Step: [step] executed in 51ms
    after job execute: listenerTestJob3
    2020-03-09 17:08:34.566 INFO 20165 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=listenerTestJob]] completed with the following parameters: [{}] and the following status: [COMPLETED] in 105ms

From the output above we can see that:

  1. chunk(2) is confirmed to mean that each chunk handles 2 items;
  2. within a Step, the execution order is read -> process -> write.
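The ordering seen in the log can be reproduced with a small plain-Java model of a chunk-oriented loop. This is only a simplified sketch written for illustration, not how Spring Batch implements a step: the class ChunkLoopSketch and its run method are hypothetical, and the listener callbacks are reduced to entries in a trace list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

// Simplified, hypothetical model of a chunk-oriented step.
class ChunkLoopSketch {

    // Runs the read -> process -> write loop and returns the callback trace.
    static List<String> run(List<String> data, int chunkSize) {
        List<String> trace = new ArrayList<>();
        Iterator<String> reader = data.iterator();
        Function<String, String> processor = item -> item + " language";
        boolean exhausted = false;

        while (!exhausted) {
            trace.add("before chunk");

            // Read phase: collect up to chunkSize items, stopping early once the data runs out.
            List<String> inputs = new ArrayList<>();
            while (inputs.size() < chunkSize) {
                trace.add("before read");
                if (!reader.hasNext()) {
                    exhausted = true; // plays the role of read() returning null
                    break;
                }
                String item = reader.next();
                trace.add("after read: " + item);
                inputs.add(item);
            }

            // Process phase: every item read in this chunk is processed in turn.
            List<String> outputs = new ArrayList<>();
            for (String item : inputs) {
                trace.add("before process: " + item);
                String result = processor.apply(item);
                trace.add("after process: " + item + " result: " + result);
                outputs.add(result);
            }

            // Write phase: the whole chunk is written at once (skipped when nothing was read).
            if (!outputs.isEmpty()) {
                trace.add("before write: " + outputs);
                trace.add("after write: " + outputs);
            }
            trace.add("after chunk");
        }
        return trace;
    }

    public static void main(String[] args) {
        run(Arrays.asList("java", "c++", "javascript", "python"), 2).forEach(System.out::println);
    }
}
```

With four items and a chunk size of 2, the model goes through three chunks. The third one only attempts a read, finds the data exhausted, and ends, which mirrors the trailing "before chunk / before read / after chunk" lines in the log.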

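Composite listeners

Listeners of the same kind can be combined through the corresponding composite class. For example, if there are several JobExecutionListeners, CompositeJobExecutionListener can be used to aggregate them. Each of the listeners introduced above has a corresponding CompositeXXXListener class; only CompositeJobExecutionListener is demonstrated here, and the rest work analogously.

Create CompositeJobExecutionListenerJobDemo in the job package: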

    import java.util.Arrays;

    import org.springframework.batch.core.Job;
    import org.springframework.batch.core.JobExecution;
    import org.springframework.batch.core.JobExecutionListener;
    import org.springframework.batch.core.Step;
    import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
    import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
    import org.springframework.batch.core.listener.CompositeJobExecutionListener;
    import org.springframework.batch.repeat.RepeatStatus;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.stereotype.Component;

    @Component
    public class CompositeJobExecutionListenerJobDemo {

        @Autowired
        private JobBuilderFactory jobBuilderFactory;
        @Autowired
        private StepBuilderFactory stepBuilderFactory;

        @Bean
        public Job compositeJobExecutionListenerJob() {
            return jobBuilderFactory.get("compositeJobExecutionListenerJob")
                    .start(step())
                    .listener(compositeJobExecutionListener())
                    .build();
        }

        private Step step() {
            return stepBuilderFactory.get("step")
                    .tasklet((contribution, chunkContext) -> {
                        System.out.println("Executing step....");
                        return RepeatStatus.FINISHED;
                    }).build();
        }

        private CompositeJobExecutionListener compositeJobExecutionListener() {
            CompositeJobExecutionListener listener = new CompositeJobExecutionListener();
            // Job listener 1
            JobExecutionListener jobExecutionListenerOne = new JobExecutionListener() {
                @Override
                public void beforeJob(JobExecution jobExecution) {
                    System.out.println("Job listener One, before job execute: " + jobExecution.getJobInstance().getJobName());
                }

                @Override
                public void afterJob(JobExecution jobExecution) {
                    System.out.println("Job listener One, after job execute: " + jobExecution.getJobInstance().getJobName());
                }
            };
            // Job listener 2
            JobExecutionListener jobExecutionListenerTwo = new JobExecutionListener() {
                @Override
                public void beforeJob(JobExecution jobExecution) {
                    System.out.println("Job listener Two, before job execute: " + jobExecution.getJobInstance().getJobName());
                }

                @Override
                public void afterJob(JobExecution jobExecution) {
                    System.out.println("Job listener Two, after job execute: " + jobExecution.getJobInstance().getJobName());
                }
            };
            // Aggregate both listeners
            listener.setListeners(Arrays.asList(jobExecutionListenerOne, jobExecutionListenerTwo));
            return listener;
        }
    }

Start the project; the console prints the following log:

    2020-03-09 17:26:47.533 INFO 20310 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=compositeJobExecutionListenerJob]] launched with the following parameters: [{}]
    Job listener One, before job execute: compositeJobExecutionListenerJob
    Job listener Two, before job execute: compositeJobExecutionListenerJob
    2020-03-09 17:26:47.603 INFO 20310 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [step]
    Executing step....
    2020-03-09 17:26:47.660 INFO 20310 --- [ main] o.s.batch.core.step.AbstractStep : Step: [step] executed in 57ms
    Job listener Two, after job execute: compositeJobExecutionListenerJob
    Job listener One, after job execute: compositeJobExecutionListenerJob
    2020-03-09 17:26:47.693 INFO 20310 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=compositeJobExecutionListenerJob]] completed with the following parameters: [{}] and the following status: [COMPLETED] in 129ms
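In the log above, the before-job callbacks fire in the order One, Two, while the after-job callbacks fire in the order Two, One. The plain-Java sketch below shows one way a composite could produce that nesting. It is an illustrative model only: the JobListener interface, the CompositeJobListener class, and the job name demoJob are hypothetical and are not the Spring Batch classes used in the article.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class CompositeListenerSketch {

    // Hypothetical minimal listener contract, modeled on beforeJob/afterJob.
    interface JobListener {
        void beforeJob(String jobName);

        void afterJob(String jobName);
    }

    // Delegates to its members: registration order before the job, reverse order after it.
    static class CompositeJobListener implements JobListener {
        private final List<JobListener> delegates = new ArrayList<>();

        void setListeners(List<JobListener> listeners) {
            delegates.clear();
            delegates.addAll(listeners);
        }

        @Override
        public void beforeJob(String jobName) {
            for (JobListener delegate : delegates) {
                delegate.beforeJob(jobName);
            }
        }

        @Override
        public void afterJob(String jobName) {
            for (int i = delegates.size() - 1; i >= 0; i--) {
                delegates.get(i).afterJob(jobName);
            }
        }
    }

    // Builds a listener that records its callbacks in the shared trace.
    static JobListener named(String name, List<String> trace) {
        return new JobListener() {
            @Override
            public void beforeJob(String jobName) {
                trace.add(name + " before " + jobName);
            }

            @Override
            public void afterJob(String jobName) {
                trace.add(name + " after " + jobName);
            }
        };
    }

    // Simulates one job run wrapped by the composite listener.
    static List<String> run() {
        List<String> trace = new ArrayList<>();
        CompositeJobListener composite = new CompositeJobListener();
        composite.setListeners(Arrays.asList(named("One", trace), named("Two", trace)));
        composite.beforeJob("demoJob");
        trace.add("step executed");
        composite.afterJob("demoJob");
        return trace;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Running the after callbacks in reverse makes the listeners nest like wrappers around the job: the first one to be notified of the start is the last one to be notified of the end.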

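Besides the listeners covered in this article, there are also some listeners related to exception handling, which will be covered in later articles.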