Spring Batch处理任务过程中如果发生了异常,默认机制是马上停止任务执行,抛出相应异常,如果任务还包含未执行的步骤也不会被执行。要改变这个默认规则,我们可以配置异常重试和异常跳过机制。异常跳过:遇到异常的时候不希望结束任务,而是跳过这个异常,继续执行;异常重试:遇到异常的时候经过指定次数的重试,如果还是失败的话,才会停止任务。除了这两个特性外,本文也会记录一些别的特性。
框架搭建
新建一个Spring Boot项目,版本为2.2.4.RELEASE,artifactId为spring-batch-exception,项目结构如下图所示:
剩下的数据库层的准备,项目配置,依赖引入和Spring Batch入门文章中的框架搭建步骤一致,这里就不再赘述。
下面我们演示下,默认情况下Spring Batch处理任务遇到异常是怎么处理的。
在cc.mrbird.batch目录下新建job包,然后在该包下新建DefaultExceptionJobDemo
:
@Component
public class DefaultExceptionJobDemo {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public Job defaultExceptionJob() {
return jobBuilderFactory.get("defaultExceptionJob")
.start(
stepBuilderFactory.get("step")
.tasklet((stepContribution, chunkContext) -> {
// 获取执行上下文
ExecutionContext executionContext = chunkContext.getStepContext().getStepExecution().getExecutionContext();
if (executionContext.containsKey("success")) {
System.out.println("任务执行成功");
return RepeatStatus.FINISHED;
} else {
String errorMessage = "处理任务过程发生异常";
System.out.println(errorMessage);
executionContext.put("success", true);
throw new RuntimeException(errorMessage);
}
}).build()
).build();
}
}
上面代码中,我们在Step的tasklet()
方法中获取了执行上下文,并且判断执行上下文中是否包含keysuccess
,如果包含,则任务执行成功;如果不包含,则抛出异常(抛出异常前,在执行上下文中添加success
key)。
启动项目,控制台日志打印如下:
2020-03-11 17:12:50.253 INFO 38673 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=defaultExceptionJob]] launched with the following parameters: [{}]
2020-03-11 17:12:50.323 INFO 38673 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [step]
处理任务过程发生异常
2020-03-11 17:12:50.352 ERROR 38673 --- [ main] o.s.batch.core.step.AbstractStep : Encountered an error executing step step in job defaultExceptionJob
java.lang.RuntimeException: 处理任务过程发生异常
at cc.mrbird.batch.job.DefaultExceptionJobDemo.lambda$defaultExceptionJob$0(DefaultExceptionJobDemo.java:38) ~[classes/:na]
at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:407) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:331) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140) ~[spring-tx-5.2.4.RELEASE.jar:5.2.4.RELEASE]
at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:273) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
...
可以看到,默认情况下,Spring Batch处理任务过程中如果发生了异常会马上停止任务的执行。
再次启动项目,控制台输出如下:
2020-03-11 17:14:03.184 INFO 38691 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=defaultExceptionJob]] launched with the following parameters: [{}]
2020-03-11 17:14:03.264 INFO 38691 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [step]
任务执行成功
2020-03-11 17:14:03.302 INFO 38691 --- [ main] o.s.batch.core.step.AbstractStep : Step: [step] executed in 37ms
2020-03-11 17:14:03.326 INFO 38691 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=defaultExceptionJob]] completed with the following parameters: [{}] and the following status: [COMPLETED] in 120ms
因为在上次任务抛出异常前,我们在执行上下文中添加success
key(配合MySQL持久化,不会因项目启动而丢失)。
异常重试
Spring Batch允许我们配置任务在遇到指定异常时进行指定次数的重试。在此之前,我们先定义一个自定义异常。在cc.mrbird.batch包下新建exception包,然后在该包下新建MyJobExecutionException
:
public class MyJobExecutionException extends Exception{
private static final long serialVersionUID = 7168487913507656106L;
public MyJobExecutionException(String message) {
super(message);
}
}
然后在job包下新建RetryExceptionJobDemo
:
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public Job retryExceptionJob() {
return jobBuilderFactory.get("retryExceptionJob")
.start(step())
.build();
}
private Step step() {
return stepBuilderFactory.get("step")
.<String, String>chunk(2)
.reader(listItemReader())
.processor(myProcessor())
.writer(list -> list.forEach(System.out::println))
.faultTolerant() // 配置错误容忍
.retry(MyJobExecutionException.class) // 配置重试的异常类型
.retryLimit(3) // 重试3次,三次过后还是异常的话,则任务会结束,
// 异常的次数为reader,processor和writer中的总数,这里仅在processor里演示异常重试
.build();
}
private ListItemReader<String> listItemReader() {
ArrayList<String> datas = new ArrayList<>();
IntStream.range(0, 5).forEach(i -> datas.add(String.valueOf(i)));
return new ListItemReader<>(datas);
}
private ItemProcessor<String, String> myProcessor() {
return new ItemProcessor<String, String>() {
private int count;
@Override
public String process(String item) throws Exception {
System.out.println("当前处理的数据:" + item);
if (count >= 2) {
return item;
} else {
count++;
throw new MyJobExecutionException("任务处理出错");
}
}
};
}
在step()
方法中,faultTolerant()
表示开启容错功能,retry(MyJobExecutionException.class)
表示遇到MyJobExecutionException
异常时进行重试,retryLimit(3)
表示如果第三次重试还是失败的话,则抛出异常,结束任务。
通过前面的学习我们知道,步骤Step包括ItemReader
、ItemWriter
和ItemProcessor
,上面配置的错误容忍是针对整个Step的,所以容忍的异常次数应该是reader,processor和writer中的总数,上面的例子仅在processor里演示异常重试。
myProcessor()
的代码逻辑很简单,就是在前两次的时候抛出MyJobExecutionException("任务处理出错")
异常(count < 2
),第三次的时候正常返回item(count = 2 >= 2
),所以理论上上面的任务在重试两次之后正常运行。
启动项目,控制台打印日志如下:
2020-03-12 09:04:53.359 INFO 40522 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=retryExceptionJob]] launched with the following parameters: [{}]
2020-03-12 09:04:53.415 INFO 40522 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [step]
当前处理的数据:0
当前处理的数据:0
当前处理的数据:0
当前处理的数据:1
0
1
当前处理的数据:2
当前处理的数据:3
2
3
当前处理的数据:4
4
2020-03-12 09:04:53.498 INFO 40522 --- [ main] o.s.batch.core.step.AbstractStep : Step: [step] executed in 83ms
2020-03-12 09:04:53.522 INFO 40522 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=retryExceptionJob]] completed with the following parameters: [{}] and the following status: [COMPLETED] in 152ms
结果符合我们的预期。
假如通过retryLimit(2)
将重试次数设置为2,并修改任务的名称为retryExceptionJob1
,启动项目看看运行结果如何:
v2020-03-12 09:06:48.855 INFO 40610 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=retryExceptionJob1]] launched with the following parameters: [{}]
2020-03-12 09:06:48.933 INFO 40610 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [step]
当前处理的数据:0
当前处理的数据:0
2020-03-12 09:06:48.979 ERROR 40610 --- [ main] o.s.batch.core.step.AbstractStep : Encountered an error executing step step in job retryExceptionJob1
org.springframework.retry.RetryException: Non-skippable exception in recoverer while processing; nested exception is cc.mrbird.batch.exception.MyJobExecutionException: 任务处理出错
at org.springframework.batch.core.step.item.FaultTolerantChunkProcessor$2.recover(FaultTolerantChunkProcessor.java:289) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.retry.support.RetryTemplate.handleRetryExhausted(RetryTemplate.java:512) ~[spring-retry-1.2.5.RELEASE.jar:na]
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:351) ~[spring-retry-1.2.5.RELEASE.jar:na]
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:211) ~[spring-retry-1.2.5.RELEASE.jar:na]
at org.springframework.batch.core.step.item.BatchRetryTemplate.execute(BatchRetryTemplate.java:217) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.batch.core.step.item.FaultTolerantChunkProcessor.transform(FaultTolerantChunkProcessor.java:298) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.batch.core.step.item.SimpleChunkProcessor.process(SimpleChunkProcessor.java:210) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:77) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:407) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:331) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140) ~[spring-tx-5.2.4.RELEASE.jar:5.2.4.RELEASE]
at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:273) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:82) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:375) ~[spring-batch-infrastructure-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:215) ~[spring-batch-infrastructure-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:145) ~[spring-batch-infrastructure-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.batch.core.step.tasklet.TaskletStep.doExecute(TaskletStep.java:258) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:208) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:148) [spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.batch.core.job.AbstractJob.handleStep(AbstractJob.java:410) [spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.batch.core.job.SimpleJob.doExecute(SimpleJob.java:136) [spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:319) [spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:147) [spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50) [spring-core-5.2.4.RELEASE.jar:5.2.4.RELEASE]
at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:140) [spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_231]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_231]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_231]
at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_231]
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344) [spring-aop-5.2.4.RELEASE.jar:5.2.4.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198) [spring-aop-5.2.4.RELEASE.jar:5.2.4.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) [spring-aop-5.2.4.RELEASE.jar:5.2.4.RELEASE]
at org.springframework.batch.core.configuration.annotation.SimpleBatchConfiguration$PassthruAdvice.invoke(SimpleBatchConfiguration.java:127) [spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) [spring-aop-5.2.4.RELEASE.jar:5.2.4.RELEASE]
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212) [spring-aop-5.2.4.RELEASE.jar:5.2.4.RELEASE]
at com.sun.proxy.$Proxy46.run(Unknown Source) [na:na]
at org.springframework.boot.autoconfigure.batch.JobLauncherCommandLineRunner.execute(JobLauncherCommandLineRunner.java:192) [spring-boot-autoconfigure-2.2.5.RELEASE.jar:2.2.5.RELEASE]
at org.springframework.boot.autoconfigure.batch.JobLauncherCommandLineRunner.executeLocalJobs(JobLauncherCommandLineRunner.java:166) [spring-boot-autoconfigure-2.2.5.RELEASE.jar:2.2.5.RELEASE]
at org.springframework.boot.autoconfigure.batch.JobLauncherCommandLineRunner.launchJobFromProperties(JobLauncherCommandLineRunner.java:153) [spring-boot-autoconfigure-2.2.5.RELEASE.jar:2.2.5.RELEASE]
at org.springframework.boot.autoconfigure.batch.JobLauncherCommandLineRunner.run(JobLauncherCommandLineRunner.java:148) [spring-boot-autoconfigure-2.2.5.RELEASE.jar:2.2.5.RELEASE]
at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:784) [spring-boot-2.2.5.RELEASE.jar:2.2.5.RELEASE]
at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:768) [spring-boot-2.2.5.RELEASE.jar:2.2.5.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:322) [spring-boot-2.2.5.RELEASE.jar:2.2.5.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226) [spring-boot-2.2.5.RELEASE.jar:2.2.5.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1215) [spring-boot-2.2.5.RELEASE.jar:2.2.5.RELEASE]
at cc.mrbird.batch.SpringBatchExceptionApplication.main(SpringBatchExceptionApplication.java:12) [classes/:na]
Caused by: cc.mrbird.batch.exception.MyJobExecutionException: 任务处理出错
at cc.mrbird.batch.job.RetryExceptionJobDemo$1.process(RetryExceptionJobDemo.java:64) ~[classes/:na]
at cc.mrbird.batch.job.RetryExceptionJobDemo$1.process(RetryExceptionJobDemo.java:55) ~[classes/:na]
at org.springframework.batch.core.step.item.SimpleChunkProcessor.doProcess(SimpleChunkProcessor.java:134) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.batch.core.step.item.FaultTolerantChunkProcessor$1.doWithRetry(FaultTolerantChunkProcessor.java:233) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287) ~[spring-retry-1.2.5.RELEASE.jar:na]
... 43 common frames omitted
2020-03-12 09:06:48.989 INFO 40610 --- [ main] o.s.batch.core.step.AbstractStep : Step: [step] executed in 56ms
2020-03-12 09:06:49.019 INFO 40610 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=retryExceptionJob1]] completed with the following parameters: [{}] and the following status: [FAILED] in 152ms
异常次数超过了重试次数,所以抛出了异常。
异常跳过
我们也可以在Step中配置异常跳过,即遇到指定类型异常时忽略跳过它,在job包下新建SkipExceptionJobDemo
:
@Component
public class SkipExceptionJobDemo {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public Job skipExceptionJob() {
return jobBuilderFactory.get("skipExceptionJob")
.start(step())
.build();
}
private Step step() {
return stepBuilderFactory.get("step")
.<String, String>chunk(2)
.reader(listItemReader())
.processor(myProcessor())
.writer(list -> list.forEach(System.out::println))
.faultTolerant() // 配置错误容忍
.skip(MyJobExecutionException.class) // 配置跳过的异常类型
.skipLimit(1) // 最多跳过1次,1次过后还是异常的话,则任务会结束,
// 异常的次数为reader,processor和writer中的总数,这里仅在processor里演示异常跳过
.build();
}
private ListItemReader<String> listItemReader() {
ArrayList<String> datas = new ArrayList<>();
IntStream.range(0, 5).forEach(i -> datas.add(String.valueOf(i)));
return new ListItemReader<>(datas);
}
private ItemProcessor<String, String> myProcessor() {
return item -> {
System.out.println("当前处理的数据:" + item);
if ("2".equals(item)) {
throw new MyJobExecutionException("任务处理出错");
} else {
return item;
}
};
}
}
在step()
方法中,faultTolerant()
表示开启容错功能,skip(MyJobExecutionException.class)
表示遇到MyJobExecutionException
异常时跳过,skipLimit(1)
表示只跳过一次。
myProcessor()
的逻辑是,当处理的item值为”2“的时候,抛出MyJobExecutionException("任务处理出错")
异常。
此外我们还可以配置SkipListener
类型监听器,在cc.mrbird.batch包下新建listener包,然后在该包下新建MySkipListener
:
@Component
public class MySkipListener implements SkipListener<String, String> {
@Override
public void onSkipInRead(Throwable t) {
System.out.println("在读取数据的时候遇到异常并跳过,异常:" + t.getMessage());
}
@Override
public void onSkipInWrite(String item, Throwable t) {
System.out.println("在输出数据的时候遇到异常并跳过,待输出数据:" + item + ",异常:" + t.getMessage());
}
@Override
public void onSkipInProcess(String item, Throwable t) {
System.out.println("在处理数据的时候遇到异常并跳过,待输出数据:" + item + ",异常:" + t.getMessage());
}
}
然后将它注入到SkipExceptionJobDemo
,并配置:
@Component
public class SkipExceptionJobDemo {
....
@Autowired
private MySkipListener mySkipListener;
....
private Step step() {
return stepBuilderFactory.get("step")
.<String, String>chunk(2)
.reader(listItemReader())
.processor(myProcessor())
.writer(list -> list.forEach(System.out::println))
.faultTolerant() // 配置错误容忍
.skip(MyJobExecutionException.class) // 配置跳过的异常类型
.skipLimit(1) // 最多跳过1次,1次过后还是异常的话,则任务会结束,
// 异常的次数为reader,processor和writer中的总数,这里仅在processor里演示异常跳过
.listener(mySkipListener)
.build();
}
....
}
启动项目,控制台日志打印如下:
2020-03-12 09:23:33.528 INFO 40759 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=skipExceptionJob]] launched with the following parameters: [{}]
2020-03-12 09:23:33.664 INFO 40759 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [step]
当前处理的数据:0
当前处理的数据:1
0
1
当前处理的数据:2
当前处理的数据:3
3
在处理数据的时候遇到异常并跳过,待输出数据:2,异常:任务处理出错
当前处理的数据:4
4
2020-03-12 09:23:33.854 INFO 40759 --- [ main] o.s.batch.core.step.AbstractStep : Step: [step] executed in 190ms
2020-03-12 09:23:33.885 INFO 40759 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=skipExceptionJob]] completed with the following parameters: [{}] and the following status: [COMPLETED] in 324ms
skip逻辑实现方式概述
所谓skip是指的在spring batch的一个step当中,可以通过skip相关的方法提供一些容错机制。
在许多情况下,在spring batch在执行job过程中遇到的错误不应导致一个step的失败,而是可以选择跳过-skip。 至于怎么决定是否跳过某一个exception则通常是必须由了解数据本身及其含义的人来决定。 例如,财务数据往往是无法skip的,因为财务数据的背后是资金的转移,需要保证完全准确。然而某一些操作可能是可以允许跳过的,例如如果由于格式不正确或缺少必要信息而未加载某些无关紧要的内容,则并不会出现问题。
skip的用法
下面的代码是使用java config的一个step配置的例子
@Bean
public Step step1() {
return this.stepBuilderFactory.get("step1")
.<String, String>chunk(10)
.reader(flatFileItemReader())
.writer(itemWriter())
.faultTolerant()
.skipLimit(10)
.skip(FlatFileParseException.class)
.build();
}
在上面的代码中,使用了FlatFileItemReader。 在整个step的执行过程当中任何时候抛出FlatFileParseException,则跳过该次异常并计算总跳过次数是否超过了配置的skiplimit上限10. 在step的执行过程当中,read,process和write中出的错误都会计数,且与skip limit进行比较。 当异常次数达到skiplimit限制的次数之后,若再次出现该异常则会导致step失败。 换句话说,在上面的这个例子当中,第十一次遇到FlatFileParseExcetion时才会触发异常导致失败,而不是第十次。
前面示例的一个问题是除了FlatFileParseException之外,任何其他异常只要遇到一次都会导致Job失败。 在某些情况下,这可能是正确的行为。 但是,在其他情况下,更有用的场景可能是识别哪些异常应该导致step失败,而跳过其他所有异常,下面的示例实现了这样的功能:
@Bean
public Step step1() {
return this.stepBuilderFactory.get("step1")
.<String, String>chunk(10)
.reader(flatFileItemReader())
.writer(itemWriter())
.faultTolerant()
.skipLimit(10)
.skip(Exception.class)
.noSkip(FileNotFoundException.class)
.build();
}
上面的代码将java.lang.Exception标识为可跳过的异常类,则意味着对于这个step来说所有类型的异常都是可跳过的。 但是,它同时通过noskip方法’排除’了java.io.FileNotFoundException,此时这个step将可跳过的异常类list细化为除了FileNotFoundException之外的所有异常。 那么遇到任何被排除了的的异常类都会导致step失败(也就是说,它们不会被跳过).
对于遇到的任何异常,是否可跳过由该异常类的类层次结构中最近的父类确定。 任何未分类的异常都被视为“会导致失败”。
同时,skip和noSkip调用的顺序对结果没有影响。
另外,按照上述代码我们是使用的默认的skip策略,我们可以自己实现自己的skip策略并且配置在step当中,实现skip策略仅需要实现SkipPolicy接口即可.
skip逻辑的实现原理
skip在spring的一个step中的工作原理大致如下图所示:
上图展示的是在process过程中出现异常的处理流程。从上图我们不难发现,skip的实现是基于事务的支持的,在step当中若遇到一个可被跳过的exception,则当前的transaction会回滚。spring batch会把reader读取到的item缓存下来,因此,当某一天记录出错导致skip的时候,则这条记录就会从cache当中被删除掉。
紧接着,spring batch会重新开启一个事务,并且这个事务处理的数据是去除掉skip记录的cache数据。如果配置了SkipListener,则在提交一个chunk之前,SkipListener的onSkipInProcess方法会被调用。
若配置了跳过的次数限制,则在每一次遇到skippable 的exception时,计数器都会加一,当达到限制时step就会fail。
但是在read或者write过程中出现异常则情况会更加复杂。
以writer为例,writer对于所有的chunk里的数据只会调用一次。因此spring无法知道到底是chunk里的哪个item导致了异常。要找出来是哪一条记录出错,唯一方法是将chunk分成只包含一条记录的小块。如下图所示:
在把chunk拆分开之后会再次进入循环,在图中用红色表示。对于reader当中读取到的缓存的数据中的每个item,都将开启一个自己的事务。在开启事务之后,依然先由ItemProcessor处理,然后再由ItemWriter写入。 如果在这个过程当中没有错误,则提交这个只包含一条记录的迷你chunk,并且循环迭代继续下一个item。
在上诉过程当中应当至少有一个可跳过的异常,当遇到可跳过异常时,事务将被回滚并且该条数据被标记为跳过的项。当所有的数据遍历处理完之后,我们继续正常的chunk处理流程。
此外,我们通过将chunk上的属性transactional设置为false(默认为true),将它定义为非事务性的。 如果这样做,Spring Batch会缓存已处理的项目,并且在写入失败时不重新执行ItemProcessor。 如果在process阶段没有与事务资源的交互,则可以执这样设置,否则在写入失败时springbatch就不会重新执行process逻辑。
事务问题
一次Setp分为Reader、Processor和Writer三个阶段,这些阶段统称为Item。默认情况下如果错误不是发生在Reader阶段,那么没必要再去重新读取一次数据。但是某些场景下需要Reader部分也需要重新执行,比如Reader是从一个JMS队列中消费消息,当发生回滚的时候消息也会在队列上重放,因此也要将Reader纳入到回滚的事物中,根据这个场景可以使用readerIsTransactionalQueue()
来配置数据重读:
private Step step() {
return stepBuilderFactory.get("step")
.<String, String>chunk(2)
.reader(listItemReader())
.writer(list -> list.forEach(System.out::println))
.readerIsTransactionalQueue() // 消息队列数据重读
.build();
}
我们还可以在Step中手动配置事务属性,事物的属性包括隔离等级(isolation)、传播方式(propagation)以及过期时间(timeout)等:
private Step step() {
DefaultTransactionAttribute attribute = new DefaultTransactionAttribute();
attribute.setPropagationBehavior(Propagation.REQUIRED.value());
attribute.setIsolationLevel(Isolation.DEFAULT.value());
attribute.setTimeout(30);
return stepBuilderFactory.get("step")
.<String, String>chunk(2)
.reader(listItemReader())
.writer(list -> list.forEach(System.out::println))
.transactionAttribute(attribute)
.build();
}
重启机制
默认情况下,任务执行完毕的状态为COMPLETED
,再次启动项目,该任务的Step不会再执行,我们可以通过配置allowStartIfComplete(true)
来实现每次项目重新启动都将执行这个Step:
private Step step() {
return stepBuilderFactory.get("step")
.<String, String>chunk(2)
.reader(listItemReader())
.writer(list -> list.forEach(System.out::println))
.allowStartIfComplete(true)
.build();
}
某些Step可能用于处理一些先决的任务,所以当Job再次重启时这Step就没必要再执行,可以通过设置startLimit()
来限定某个Step重启的次数。当设置为1时候表示仅仅运行一次,而出现重启时将不再执行:
private Step step() {
return stepBuilderFactory.get("step")
.<String, String>chunk(2)
.reader(listItemReader())
.writer(list -> list.forEach(System.out::println))
.startLimit(1)
.build();
}
部分内容参考自:https://blog.csdn.net/sswltt/article/details/103817645