参考文档: Spring_Batch_in_Action.pdf Spring.Batch批处理框架.pdf

企业中经常会有需要批处理才能完成的业务操作,比如:自动化地处理大批量复杂的数据,如月结计算;重复性地处理大批量数据,如费率计算;充当内部系统和外部系统的数据纽带,中间需要对数据进行格式化,校验,转换处理等。

Spring Batch是一个轻量级但功能又十分全面的批处理框架,本节我们将通过一些简单的例子来入门Spring Batch。

框架搭建

新建一个Spring Boot项目,版本为2.2.4.RELEASE,artifactId为spring-batch-start,项目结构如下图所示:

Spring Batch入门 - 图1

然后在pom中引入Spring Batch、MySQL和JDBC依赖,引入后pom内容如下所示:

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  3. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  4. <modelVersion>4.0.0</modelVersion>
  5. <parent>
  6. <groupId>org.springframework.boot</groupId>
  7. <artifactId>spring-boot-starter-parent</artifactId>
  8. <version>2.2.5.RELEASE</version>
  9. <relativePath/> <!-- lookup parent from repository -->
  10. </parent>
  11. <groupId>cc.mrbird</groupId>
  12. <artifactId>spring-batch-start</artifactId>
  13. <version>0.0.1-SNAPSHOT</version>
  14. <name>spring-batch-start</name>
  15. <description>Demo project for Spring Boot</description>
  16. <properties>
  17. <java.version>1.8</java.version>
  18. </properties>
  19. <dependencies>
  20. <dependency>
  21. <groupId>org.springframework.boot</groupId>
  22. <artifactId>spring-boot-starter-batch</artifactId>
  23. </dependency>
  24. <dependency>
  25. <groupId>mysql</groupId>
  26. <artifactId>mysql-connector-java</artifactId>
  27. </dependency>
  28. <dependency>
  29. <groupId>org.springframework.boot</groupId>
  30. <artifactId>spring-boot-starter-jdbc</artifactId>
  31. </dependency>
  32. </dependencies>
  33. <build>
  34. <plugins>
  35. <plugin>
  36. <groupId>org.springframework.boot</groupId>
  37. <artifactId>spring-boot-maven-plugin</artifactId>
  38. </plugin>
  39. </plugins>
  40. </build>
  41. </project>

在编写代码之前,我们先来简单了解下Spring Batch的组成:

Spring Batch入门 - 图2

Spring Batch里最基本的单元就是任务Job,一个Job由若干个步骤Step组成。任务启动器Job Launcher负责运行Job,任务存储仓库Job Repository存储着Job的执行状态,参数和日志等信息。Job处理任务又可以分为三大类:数据读取Item Reader、数据中间处理Item Processor和数据输出Item Writer。

任务存储仓库可以是关系型数据库MySQL,非关系型数据库MongoDB或者直接存储在内存中,本篇使用的是MySQL作为任务存储仓库。

新建一个名称为springbatch的MySQL数据库,然后导入org.springframework.batch.core目录下的schema-mysql.sql文件:

Spring Batch入门 - 图3Spring Batch入门 - 图4

导入后,库表如下图所示:

Spring Batch入门 - 图5

然后在项目的配置文件application.yml里添加MySQL相关配置:

  1. spring:
  2. datasource:
  3. driver-class-name: com.mysql.cj.jdbc.Driver
  4. url: jdbc:mysql://127.0.0.1:3306/springbatch
  5. username: root
  6. password: 123456

接着在Spring Boot的入口类上添加@EnableBatchProcessing注解,表示开启Spring Batch批处理功能:

  1. @SpringBootApplication
  2. @EnableBatchProcessing
  3. public class SpringBatchStartApplication {
  4. public static void main(String[] args) {
  5. SpringApplication.run(SpringBatchStartApplication.class, args);
  6. }
  7. }

至此,基本框架搭建好了,下面开始配置一个简单的任务。

编写第一个任务

在cc.mrbird.batch目录下新建job包,然后在该包下新建一个FirstJobDemo类,代码如下所示:

  1. @Component
  2. public class FirstJobDemo {
  3. @Autowired
  4. private JobBuilderFactory jobBuilderFactory;
  5. @Autowired
  6. private StepBuilderFactory stepBuilderFactory;
  7. @Bean
  8. public Job firstJob() {
  9. return jobBuilderFactory.get("firstJob")
  10. .start(step())
  11. .build();
  12. }
  13. private Step step() {
  14. return stepBuilderFactory.get("step")
  15. .tasklet((contribution, chunkContext) -> {
  16. System.out.println("执行步骤....");
  17. return RepeatStatus.FINISHED;
  18. }).build();
  19. }
  20. }

上面代码中,我们注入了JobBuilderFactory任务创建工厂和StepBuilderFactory步骤创建工厂,分别用于创建任务Job和步骤Step。JobBuilderFactoryget方法用于创建一个指定名称的任务,start方法指定任务的开始步骤,步骤通过StepBuilderFactory构建。

步骤Step由若干个小任务Tasklet组成,所以我们通过tasklet方法创建。tasklet方法接收一个Tasklet类型参数,Tasklet是一个函数是接口,源码如下:

  1. public interface Tasklet {
  2. @Nullable
  3. RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception;
  4. }

所以我们可以使用lambda表达式创建一个匿名实现:

  1. (contribution, chunkContext) -> {
  2. System.out.println("执行步骤....");
  3. return RepeatStatus.FINISHED;
  4. }

该匿名实现必须返回一个明确的执行状态,这里返回RepeatStatus.FINISHED表示该小任务执行成功,正常结束。

此外,需要注意的是,我们配置的任务Job必须注册到Spring IOC容器中,并且任务的名称和步骤的名称组成唯一。比如上面的例子,我们的任务名称为firstJob,步骤的名称为step,如果存在别的任务和步骤组合也叫这个名称的话,则会执行失败。

启动项目,控制台打印日志如下:

  1. ...
  2. 2020-03-06 11:01:11.785 INFO 17324 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=firstJob]] launched with the following parameters: [{}]
  3. 2020-03-06 11:01:11.846 INFO 17324 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [step]
  4. 执行步骤....
  5. 2020-03-06 11:01:11.886 INFO 17324 --- [ main] o.s.batch.core.step.AbstractStep : Step: [step] executed in 40ms
  6. 2020-03-06 11:01:11.909 INFO 17324 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=firstJob]] completed with the following parameters: [{}] and the following status: [COMPLETED] in 101ms

可以看到,任务成功执行了,数据库的库表也将记录相关运行日志。

重新启动项目,控制台并不会再次打印出任务执行日志,因为Job名称和 Step名称组成唯一,执行完的不可重复的任务,不会再次执行。

多步骤任务

参考 Controlling Step Flow

一个复杂的任务一般包含多个步骤,下面举个多步骤任务的例子。在job包下新建MultiStepJobDemo类:

  1. @Component
  2. public class MultiStepJobDemo {
  3. @Autowired
  4. private JobBuilderFactory jobBuilderFactory;
  5. @Autowired
  6. private StepBuilderFactory stepBuilderFactory;
  7. @Bean
  8. public Job multiStepJob() {
  9. return jobBuilderFactory.get("multiStepJob")
  10. .start(step1())
  11. .next(step2())
  12. .next(step3())
  13. .build();
  14. }
  15. private Step step1() {
  16. return stepBuilderFactory.get("step1")
  17. .tasklet((stepContribution, chunkContext) -> {
  18. System.out.println("执行步骤一操作。。。");
  19. return RepeatStatus.FINISHED;
  20. }).build();
  21. }
  22. private Step step2() {
  23. return stepBuilderFactory.get("step2")
  24. .tasklet((stepContribution, chunkContext) -> {
  25. System.out.println("执行步骤二操作。。。");
  26. return RepeatStatus.FINISHED;
  27. }).build();
  28. }
  29. private Step step3() {
  30. return stepBuilderFactory.get("step3")
  31. .tasklet((stepContribution, chunkContext) -> {
  32. System.out.println("执行步骤三操作。。。");
  33. return RepeatStatus.FINISHED;
  34. }).build();
  35. }
  36. }

上面代码中,我们通过step1()step2()step3()三个方法创建了三个步骤。Job里要使用这些步骤,只需要通过JobBuilderFactorystart方法指定第一个步骤,然后通过next方法不断地指定下一个步骤即可。

启动项目,控制台打印日志如下:

  1. 2020-03-06 13:52:52.188 INFO 18472 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=multiStepJob]] launched with the following parameters: [{}]
  2. 2020-03-06 13:52:52.222 INFO 18472 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [step1]
  3. 执行步骤一操作。。。
  4. 2020-03-06 13:52:52.251 INFO 18472 --- [ main] o.s.batch.core.step.AbstractStep : Step: [step1] executed in 29ms
  5. 2020-03-06 13:52:52.292 INFO 18472 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [step2]
  6. 执行步骤二操作。。。
  7. 2020-03-06 13:52:52.323 INFO 18472 --- [ main] o.s.batch.core.step.AbstractStep : Step: [step2] executed in 30ms
  8. 2020-03-06 13:52:52.375 INFO 18472 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [step3]
  9. 执行步骤三操作。。。
  10. 2020-03-06 13:52:52.405 INFO 18472 --- [ main] o.s.batch.core.step.AbstractStep : Step: [step3] executed in 29ms
  11. 2020-03-06 13:52:52.428 INFO 18472 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=multiStepJob]] completed with the following parameters: [{}] and the following status: [COMPLETED] in 231ms

三个步骤依次执行成功。

多个步骤在执行过程中也可以通过上一个步骤的执行状态来决定是否执行下一个步骤,修改上面的代码:

  1. @Component
  2. public class MultiStepJobDemo {
  3. @Autowired
  4. private JobBuilderFactory jobBuilderFactory;
  5. @Autowired
  6. private StepBuilderFactory stepBuilderFactory;
  7. @Bean
  8. public Job multiStepJob() {
  9. return jobBuilderFactory.get("multiStepJob2")
  10. .start(step1())
  11. .on(ExitStatus.COMPLETED.getExitCode()).to(step2())
  12. .from(step2())
  13. .on(ExitStatus.COMPLETED.getExitCode()).to(step3())
  14. .from(step3()).end()
  15. .build();
  16. }
  17. private Step step1() {
  18. return stepBuilderFactory.get("step1")
  19. .tasklet((stepContribution, chunkContext) -> {
  20. System.out.println("执行步骤一操作。。。");
  21. return RepeatStatus.FINISHED;
  22. }).build();
  23. }
  24. private Step step2() {
  25. return stepBuilderFactory.get("step2")
  26. .tasklet((stepContribution, chunkContext) -> {
  27. System.out.println("执行步骤二操作。。。");
  28. return RepeatStatus.FINISHED;
  29. }).build();
  30. }
  31. private Step step3() {
  32. return stepBuilderFactory.get("step3")
  33. .tasklet((stepContribution, chunkContext) -> {
  34. System.out.println("执行步骤三操作。。。");
  35. return RepeatStatus.FINISHED;
  36. }).build();
  37. }
  38. }

multiStepJob()方法的含义是:multiStepJob2任务先执行step1,当step1状态为完成时,接着执行step2,当step2的状态为完成时,接着执行step3。ExitStatus.COMPLETED常量表示任务顺利执行完毕,正常退出,该类还包含以下几种退出状态:

  1. public class ExitStatus implements Serializable, Comparable<ExitStatus> {
  2. /**
  3. * Convenient constant value representing unknown state - assumed not
  4. * continuable.
  5. */
  6. public static final ExitStatus UNKNOWN = new ExitStatus("UNKNOWN");
  7. /**
  8. * Convenient constant value representing continuable state where processing
  9. * is still taking place, so no further action is required. Used for
  10. * asynchronous execution scenarios where the processing is happening in
  11. * another thread or process and the caller is not required to wait for the
  12. * result.
  13. */
  14. public static final ExitStatus EXECUTING = new ExitStatus("EXECUTING");
  15. /**
  16. * Convenient constant value representing finished processing.
  17. */
  18. public static final ExitStatus COMPLETED = new ExitStatus("COMPLETED");
  19. /**
  20. * Convenient constant value representing job that did no processing (e.g.
  21. * because it was already complete).
  22. */
  23. public static final ExitStatus NOOP = new ExitStatus("NOOP");
  24. /**
  25. * Convenient constant value representing finished processing with an error.
  26. */
  27. public static final ExitStatus FAILED = new ExitStatus("FAILED");
  28. /**
  29. * Convenient constant value representing finished processing with
  30. * interrupted status.
  31. */
  32. public static final ExitStatus STOPPED = new ExitStatus("STOPPED");
  33. ...
  34. }

启动项目,控制台日志打印如下:

  1. 2020-03-06 14:21:49.384 INFO 18745 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [FlowJob: [name=multiStepJob2]] launched with the following parameters: [{}]
  2. 2020-03-06 14:21:49.427 INFO 18745 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [step1]
  3. 执行步骤一操作。。。
  4. 2020-03-06 14:21:49.456 INFO 18745 --- [ main] o.s.batch.core.step.AbstractStep : Step: [step1] executed in 29ms
  5. 2020-03-06 14:21:49.501 INFO 18745 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [step2]
  6. 执行步骤二操作。。。
  7. 2020-03-06 14:21:49.527 INFO 18745 --- [ main] o.s.batch.core.step.AbstractStep : Step: [step2] executed in 26ms
  8. 2020-03-06 14:21:49.576 INFO 18745 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [step3]
  9. 执行步骤三操作。。。
  10. 2020-03-06 14:21:49.604 INFO 18745 --- [ main] o.s.batch.core.step.AbstractStep : Step: [step3] executed in 28ms
  11. 2020-03-06 14:21:49.629 INFO 18745 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [FlowJob: [name=multiStepJob2]] completed with the following parameters: [{}] and the following status: [COMPLETED] in 238ms

Flow的用法

Flow的作用就是可以将多个步骤Step组合在一起然后再组装到任务Job中。举个Flow的例子,在job包下新建FlowJobDemo类:

  1. @Component
  2. public class FlowJobDemo {
  3. @Autowired
  4. private JobBuilderFactory jobBuilderFactory;
  5. @Autowired
  6. private StepBuilderFactory stepBuilderFactory;
  7. @Bean
  8. public Job flowJob() {
  9. return jobBuilderFactory.get("flowJob")
  10. .start(flow())
  11. .next(step3())
  12. .end()
  13. .build();
  14. }
  15. private Step step1() {
  16. return stepBuilderFactory.get("step1")
  17. .tasklet((stepContribution, chunkContext) -> {
  18. System.out.println("执行步骤一操作。。。");
  19. return RepeatStatus.FINISHED;
  20. }).build();
  21. }
  22. private Step step2() {
  23. return stepBuilderFactory.get("step2")
  24. .tasklet((stepContribution, chunkContext) -> {
  25. System.out.println("执行步骤二操作。。。");
  26. return RepeatStatus.FINISHED;
  27. }).build();
  28. }
  29. private Step step3() {
  30. return stepBuilderFactory.get("step3")
  31. .tasklet((stepContribution, chunkContext) -> {
  32. System.out.println("执行步骤三操作。。。");
  33. return RepeatStatus.FINISHED;
  34. }).build();
  35. }
  36. // 创建一个flow对象,包含若干个step
  37. private Flow flow() {
  38. return new FlowBuilder<Flow>("flow")
  39. .start(step1())
  40. .next(step2())
  41. .build();
  42. }
  43. }

上面代码中,我们通过FlowBuilder将step1和step2组合在一起,创建了一个名为flow的Flow,然后再将其赋给任务Job。使用Flow和Step构建Job的区别是,Job流程中包含Flow类型的时候需要在build()方法前调用end()方法。

启动程序,控制台日志打印如下:

  1. 2020-03-06 14:36:42.621 INFO 18865 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [FlowJob: [name=flowJob]] launched with the following parameters: [{}]
  2. 2020-03-06 14:36:42.667 INFO 18865 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [step1]
  3. 执行步骤一操作。。。
  4. 2020-03-06 14:36:42.697 INFO 18865 --- [ main] o.s.batch.core.step.AbstractStep : Step: [step1] executed in 30ms
  5. 2020-03-06 14:36:42.744 INFO 18865 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [step2]
  6. 执行步骤二操作。。。
  7. 2020-03-06 14:36:42.771 INFO 18865 --- [ main] o.s.batch.core.step.AbstractStep : Step: [step2] executed in 27ms
  8. 2020-03-06 14:36:42.824 INFO 18865 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [step3]
  9. 执行步骤三操作。。。
  10. 2020-03-06 14:36:42.850 INFO 18865 --- [ main] o.s.batch.core.step.AbstractStep : Step: [step3] executed in 25ms
  11. 2020-03-06 14:36:42.874 INFO 18865 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [FlowJob: [name=flowJob]] completed with the following parameters: [{}] and the following status: [COMPLETED] in 245ms

并行执行

任务中的步骤除了可以串行执行(一个接着一个执行)外,还可以并行执行,并行执行在特定的业务需求下可以提供任务执行效率。

将任务并行化只需两个简单步骤:

  1. 将步骤Step转换为Flow;
  2. 任务Job中指定并行Flow。

举个例子,在job包下新建SplitJobDemo类:

  1. @Component
  2. public class SplitJobDemo {
  3. @Autowired
  4. private JobBuilderFactory jobBuilderFactory;
  5. @Autowired
  6. private StepBuilderFactory stepBuilderFactory;
  7. @Bean
  8. public Job splitJob() {
  9. return jobBuilderFactory.get("splitJob")
  10. .start(flow1())
  11. .split(new SimpleAsyncTaskExecutor()).add(flow2())
  12. .end()
  13. .build();
  14. }
  15. private Step step1() {
  16. return stepBuilderFactory.get("step1")
  17. .tasklet((stepContribution, chunkContext) -> {
  18. System.out.println("执行步骤一操作。。。");
  19. return RepeatStatus.FINISHED;
  20. }).build();
  21. }
  22. private Step step2() {
  23. return stepBuilderFactory.get("step2")
  24. .tasklet((stepContribution, chunkContext) -> {
  25. System.out.println("执行步骤二操作。。。");
  26. return RepeatStatus.FINISHED;
  27. }).build();
  28. }
  29. private Step step3() {
  30. return stepBuilderFactory.get("step3")
  31. .tasklet((stepContribution, chunkContext) -> {
  32. System.out.println("执行步骤三操作。。。");
  33. return RepeatStatus.FINISHED;
  34. }).build();
  35. }
  36. private Flow flow1() {
  37. return new FlowBuilder<Flow>("flow1")
  38. .start(step1())
  39. .next(step2())
  40. .build();
  41. }
  42. private Flow flow2() {
  43. return new FlowBuilder<Flow>("flow2")
  44. .start(step3())
  45. .build();
  46. }
  47. }

上面例子中,我们创建了两个Flow:flow1(包含step1和step2)和flow2(包含step3)。然后通过JobBuilderFactorysplit方法,指定一个异步执行器,将flow1和flow2异步执行(也就是并行)。

启动项目,控制台日志打印如下:

  1. 2020-03-06 15:25:43.602 INFO 19449 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [FlowJob: [name=splitJob]] launched with the following parameters: [{}]
  2. 2020-03-06 15:25:43.643 INFO 19449 --- [cTaskExecutor-1] o.s.batch.core.job.SimpleStepHandler : Executing step: [step3]
  3. 2020-03-06 15:25:43.650 INFO 19449 --- [cTaskExecutor-2] o.s.batch.core.job.SimpleStepHandler : Executing step: [step1]
  4. 执行步骤三操作。。。
  5. 执行步骤一操作。。。
  6. 2020-03-06 15:25:43.673 INFO 19449 --- [cTaskExecutor-2] o.s.batch.core.step.AbstractStep : Step: [step1] executed in 23ms
  7. 2020-03-06 15:25:43.674 INFO 19449 --- [cTaskExecutor-1] o.s.batch.core.step.AbstractStep : Step: [step3] executed in 31ms
  8. 2020-03-06 15:25:43.714 INFO 19449 --- [cTaskExecutor-2] o.s.batch.core.job.SimpleStepHandler : Executing step: [step2]
  9. 执行步骤二操作。。。
  10. 2020-03-06 15:25:43.738 INFO 19449 --- [cTaskExecutor-2] o.s.batch.core.step.AbstractStep : Step: [step2] executed in 24ms
  11. 2020-03-06 15:25:43.758 INFO 19449 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [FlowJob: [name=splitJob]] completed with the following parameters: [{}] and the following status: [COMPLETED] in 146ms

可以看到step3并没有在step2后才执行,说明步骤已经是并行化的(开启并行化后,并行的步骤执行顺序并不能100%确定,因为线程调度具有不确定性)。

任务决策器

决策器的作用就是可以指定程序在不同的情况下运行不同的任务流程,比如今天是周末,则让任务执行step1和step2,如果是工作日,则之心step1和step3。

使用决策器前,我们需要自定义一个决策器的实现。在cc.mrbird.batch包下新建decider包,然后创建MyDecider类,实现JobExecutionDecider接口:

  1. @Component
  2. public class MyDecider implements JobExecutionDecider {
  3. @Override
  4. public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
  5. LocalDate now = LocalDate.now();
  6. DayOfWeek dayOfWeek = now.getDayOfWeek();
  7. if (dayOfWeek == DayOfWeek.SATURDAY || dayOfWeek == DayOfWeek.SUNDAY) {
  8. return new FlowExecutionStatus("weekend");
  9. } else {
  10. return new FlowExecutionStatus("workingDay");
  11. }
  12. }
  13. }

MyDecider实现JobExecutionDecider接口的decide方法,该方法返回FlowExecutionStatus。上面的逻辑是:判断今天是否是周末,如果是,返回FlowExecutionStatus("weekend")状态,否则返回FlowExecutionStatus("workingDay")状态。

下面演示如何在任务Job里使用决策器。在job包下新建DeciderJobDemo

  1. @Component
  2. public class DeciderJobDemo {
  3. @Autowired
  4. private JobBuilderFactory jobBuilderFactory;
  5. @Autowired
  6. private StepBuilderFactory stepBuilderFactory;
  7. @Autowired
  8. private MyDecider myDecider;
  9. @Bean
  10. public Job deciderJob() {
  11. return jobBuilderFactory.get("deciderJob")
  12. .start(step1())
  13. .next(myDecider)
  14. .from(myDecider).on("weekend").to(step2())
  15. .from(myDecider).on("workingDay").to(step3())
  16. .from(step3()).on("*").to(step4())
  17. .end()
  18. .build();
  19. }
  20. private Step step1() {
  21. return stepBuilderFactory.get("step1")
  22. .tasklet((stepContribution, chunkContext) -> {
  23. System.out.println("执行步骤一操作。。。");
  24. return RepeatStatus.FINISHED;
  25. }).build();
  26. }
  27. private Step step2() {
  28. return stepBuilderFactory.get("step2")
  29. .tasklet((stepContribution, chunkContext) -> {
  30. System.out.println("执行步骤二操作。。。");
  31. return RepeatStatus.FINISHED;
  32. }).build();
  33. }
  34. private Step step3() {
  35. return stepBuilderFactory.get("step3")
  36. .tasklet((stepContribution, chunkContext) -> {
  37. System.out.println("执行步骤三操作。。。");
  38. return RepeatStatus.FINISHED;
  39. }).build();
  40. }
  41. private Step step4() {
  42. return stepBuilderFactory.get("step4")
  43. .tasklet((stepContribution, chunkContext) -> {
  44. System.out.println("执行步骤四操作。。。");
  45. return RepeatStatus.FINISHED;
  46. }).build();
  47. }

上面代码中,我们注入了自定义决策器MyDecider,然后在jobDecider()方法里使用了该决策器:

  1. @Bean
  2. public Job deciderJob() {
  3. return jobBuilderFactory.get("deciderJob")
  4. .start(step1())
  5. .next(myDecider)
  6. .from(myDecider).on("weekend").to(step2())
  7. .from(myDecider).on("workingDay").to(step3())
  8. .from(step3()).on("*").to(step4())
  9. .end()
  10. .build();
  11. }

这段代码的含义是:任务deciderJob首先执行step1,然后指定自定义决策器,如果决策器返回weekend,那么执行step2,如果决策器返回workingDay,那么执行step3。如果执行了step3,那么无论step3的结果是什么,都将执行step4。

启动项目,控制台输出如下所示:

  1. 2020-03-06 16:09:10.541 INFO 19873 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [FlowJob: [name=deciderJob]] launched with the following parameters: [{}]
  2. 2020-03-06 16:09:10.609 INFO 19873 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [step1]
  3. 执行步骤一操作。。。
  4. 2020-03-06 16:09:10.641 INFO 19873 --- [ main] o.s.batch.core.step.AbstractStep : Step: [step1] executed in 32ms
  5. 2020-03-06 16:09:10.692 INFO 19873 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [step3]
  6. 执行步骤三操作。。。
  7. 2020-03-06 16:09:10.723 INFO 19873 --- [ main] o.s.batch.core.step.AbstractStep : Step: [step3] executed in 31ms
  8. 2020-03-06 16:09:10.769 INFO 19873 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [step4]
  9. 执行步骤四操作。。。
  10. 2020-03-06 16:09:10.797 INFO 19873 --- [ main] o.s.batch.core.step.AbstractStep : Step: [step4] executed in 27ms
  11. 2020-03-06 16:09:10.818 INFO 19873 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [FlowJob: [name=deciderJob]] completed with the following parameters: [{}] and the following status: [COMPLETED] in 256ms

因为今天是2020年03月06日星期五,是工作日,所以任务执行了step1、step3和step4。

任务嵌套

任务Job除了可以由Step或者Flow构成外,我们还可以将多个任务Job转换为特殊的Step,然后再赋给另一个任务Job,这就是任务的嵌套。

举个例子,在job包下新建NestedJobDemo类:

  1. @Component
  2. public class NestedJobDemo {
  3. @Autowired
  4. private JobBuilderFactory jobBuilderFactory;
  5. @Autowired
  6. private StepBuilderFactory stepBuilderFactory;
  7. @Autowired
  8. private JobLauncher jobLauncher;
  9. @Autowired
  10. private JobRepository jobRepository;
  11. @Autowired
  12. private PlatformTransactionManager platformTransactionManager;
  13. // 父任务
  14. @Bean
  15. public Job parentJob() {
  16. return jobBuilderFactory.get("parentJob")
  17. .start(childJobOneStep())
  18. .next(childJobTwoStep())
  19. .build();
  20. }
  21. // 将任务转换为特殊的步骤
  22. private Step childJobOneStep() {
  23. return new JobStepBuilder(new StepBuilder("childJobOneStep"))
  24. .job(childJobOne())
  25. .launcher(jobLauncher)
  26. .repository(jobRepository)
  27. .transactionManager(platformTransactionManager)
  28. .build();
  29. }
  30. // 将任务转换为特殊的步骤
  31. private Step childJobTwoStep() {
  32. return new JobStepBuilder(new StepBuilder("childJobTwoStep"))
  33. .job(childJobTwo())
  34. .launcher(jobLauncher)
  35. .repository(jobRepository)
  36. .transactionManager(platformTransactionManager)
  37. .build();
  38. }
  39. // 子任务一
  40. private Job childJobOne() {
  41. return jobBuilderFactory.get("childJobOne")
  42. .start(
  43. stepBuilderFactory.get("childJobOneStep")
  44. .tasklet((stepContribution, chunkContext) -> {
  45. System.out.println("子任务一执行步骤。。。");
  46. return RepeatStatus.FINISHED;
  47. }).build()
  48. ).build();
  49. }
  50. // 子任务二
  51. private Job childJobTwo() {
  52. return jobBuilderFactory.get("childJobTwo")
  53. .start(
  54. stepBuilderFactory.get("childJobTwoStep")
  55. .tasklet((stepContribution, chunkContext) -> {
  56. System.out.println("子任务二执行步骤。。。");
  57. return RepeatStatus.FINISHED;
  58. }).build()
  59. ).build();
  60. }
  61. }

上面代码中,我们通过childJobOne()childJobTwo()方法创建了两个任务Job,这里没什么好说的,前面都介绍过。关键在于childJobOneStep()方法和childJobTwoStep()方法。在childJobOneStep()方法中,我们通过JobStepBuilder构建了一个名称为childJobOneStep的Step,顾名思义,它是一个任务型Step的构造工厂,可以将任务转换为“特殊”的步骤。在构建过程中,我们还需要传入任务执行器JobLauncher、任务仓库JobRepository和事务管理器PlatformTransactionManager。

将任务转换为特殊的步骤后,将其赋给父任务parentJob即可,流程和前面介绍的一致。

配置好后,启动项目,控制台输出如下所示:

  1. 2020-03-06 16:58:39.771 INFO 21588 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=parentJob]] launched with the following parameters: [{}]
  2. 2020-03-06 16:58:39.812 INFO 21588 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [childJobOneStep]
  3. 2020-03-06 16:58:39.866 INFO 21588 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=childJobOne]] launched with the following parameters: [{}]
  4. 2020-03-06 16:58:39.908 INFO 21588 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [childJobOneStep]
  5. 子任务一执行步骤。。。
  6. 2020-03-06 16:58:39.940 INFO 21588 --- [ main] o.s.batch.core.step.AbstractStep : Step: [childJobOneStep] executed in 32ms
  7. 2020-03-06 16:58:39.960 INFO 21588 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=childJobOne]] completed with the following parameters: [{}] and the following status: [COMPLETED] in 86ms
  8. 2020-03-06 16:58:39.983 INFO 21588 --- [ main] o.s.batch.core.step.AbstractStep : Step: [childJobOneStep] executed in 171ms
  9. 2020-03-06 16:58:40.019 INFO 21588 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [childJobTwoStep]
  10. 2020-03-06 16:58:40.067 INFO 21588 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=childJobTwo]] launched with the following parameters: [{}]
  11. 2020-03-06 16:58:40.102 INFO 21588 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [childJobTwoStep]
  12. 子任务二执行步骤。。。
  13. 2020-03-06 16:58:40.130 INFO 21588 --- [ main] o.s.batch.core.step.AbstractStep : Step: [childJobTwoStep] executed in 28ms
  14. 2020-03-06 16:58:40.152 INFO 21588 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=childJobTwo]] completed with the following parameters: [{}] and the following status: [COMPLETED] in 75ms
  15. 2020-03-06 16:58:40.157 INFO 21588 --- [ main] o.s.batch.core.step.AbstractStep : Step: [childJobTwoStep] executed in 138ms
  16. 2020-03-06 16:58:40.177 INFO 21588 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=parentJob]] completed with the following parameters: [{}] and the following status: [COMPLETED] in 398ms