Original: https://howtodoinjava.com/spring-batch/records-count-example/
Learn how to use ItemStream and ChunkListener to count the records processed by a Spring Batch job and write the count to a log file or the console.
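Counting records with ItemStream
The ItemStream implementation below reports the running record count periodically: its update() method is called each time the step is about to save its execution context, which happens at every chunk commit.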
ItemCountItemStream.java
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStream;
import org.springframework.batch.item.ItemStreamException;

public class ItemCountItemStream implements ItemStream {

    @Override
    public void open(ExecutionContext executionContext) throws ItemStreamException {
        // Nothing to initialize.
    }

    @Override
    public void update(ExecutionContext executionContext) throws ItemStreamException {
        // Print the read count that FlatFileItemReader keeps in the execution context.
        System.out.println("ItemCount: " + executionContext.get("FlatFileItemReader.read.count"));
    }

    @Override
    public void close() throws ItemStreamException {
        // Nothing to release.
    }
}
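The lookup above depends on a hard-coded key. As a minimal sketch, assuming the reader stores its count under its own name followed by ".read.count" (so that a reader left with its default name yields the key used above), the plain-Java model below shows why the lookup finds nothing once the reader is given a different name. A plain Map stands in for the ExecutionContext, and readCountKey and describe are hypothetical helpers, not Spring Batch API.

```java
import java.util.HashMap;
import java.util.Map;

class ReadCountKeySketch {

    // Hypothetical helper: builds "<readerName>.read.count", the naming pattern assumed here.
    static String readCountKey(String readerName) {
        return readerName + ".read.count";
    }

    // Formats the count stored under the reader's key, falling back to 0 when no entry exists.
    static String describe(Map<String, Object> context, String readerName) {
        Object count = context.get(readCountKey(readerName));
        return "ItemCount: " + (count == null ? 0 : count);
    }

    public static void main(String[] args) {
        // The map plays the role of the step's execution context.
        Map<String, Object> context = new HashMap<>();
        context.put(readCountKey("employeeReader"), 3);

        System.out.println(describe(context, "employeeReader"));     // prints "ItemCount: 3"
        System.out.println(describe(context, "FlatFileItemReader")); // prints "ItemCount: 0"
    }
}
```

If the reader in a real job is renamed, the key passed to executionContext.get() would presumably need to change with it; guarding against a missing entry avoids printing "ItemCount: null".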
How to use ItemCountItemStream
Register the ItemCountItemStream created above in the tasklet step using the SimpleStepBuilder.stream() method.
BatchConfig.java
@Autowired
private JobBuilderFactory jobBuilderFactory;

@Autowired
private StepBuilderFactory stepBuilderFactory;

@Bean
public Job readCSVFilesJob() {
    return jobBuilderFactory
            .get("readCSVFilesJob")
            .incrementer(new RunIdIncrementer())
            .start(step1())
            .build();
}

@Bean
public Step step1() {
    return stepBuilderFactory
            .get("step1")
            .<Employee, Employee>chunk(1)
            .reader(reader())
            .writer(writer())
            .stream(stream())
            .build();
}

@Bean
public ItemCountItemStream stream() {
    return new ItemCountItemStream();
}
Counting records with ChunkListener
The ChunkListener implementation below reports the running record count after every chunk.
ItemCountListener.java
import org.springframework.batch.core.ChunkListener;
import org.springframework.batch.core.scope.context.ChunkContext;

public class ItemCountListener implements ChunkListener {

    @Override
    public void beforeChunk(ChunkContext context) {
        // Nothing to do before the chunk starts.
    }

    @Override
    public void afterChunk(ChunkContext context) {
        // Read the cumulative count of items read so far in this step execution.
        int count = context.getStepContext().getStepExecution().getReadCount();
        System.out.println("ItemCount: " + count);
    }

    @Override
    public void afterChunkError(ChunkContext context) {
        // Nothing to do when a chunk fails.
    }
}
How to use ItemCountListener
Register the ItemCountListener created above in the tasklet step using the SimpleStepBuilder.listener() method.
BatchConfig.java
@Autowired
private JobBuilderFactory jobBuilderFactory;

@Autowired
private StepBuilderFactory stepBuilderFactory;

@Bean
public Job readCSVFilesJob() {
    return jobBuilderFactory
            .get("readCSVFilesJob")
            .incrementer(new RunIdIncrementer())
            .start(step1())
            .build();
}

@Bean
public Step step1() {
    return stepBuilderFactory
            .get("step1")
            .<Employee, Employee>chunk(1)
            .reader(reader())
            .writer(writer())
            .listener(listener())
            .build();
}

@Bean
public ItemCountListener listener() {
    return new ItemCountListener();
}
Record count demo
I am using the ItemCountListener configuration above to process this CSV file.
inputData.csv
id,firstName,lastName
1,Lokesh,Gupta
2,Amit,Mishra
3,Pankaj,Kumar
4,David,Miller
5,David,Walsh
The complete batch configuration looks like this:
BatchConfig.java
package com.howtodoinjava.demo.config;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.LineMapper;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.Resource;

import com.howtodoinjava.demo.model.Employee;

@Configuration
@EnableBatchProcessing
public class BatchConfig
{
    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Value("/input/inputData.csv")
    private Resource inputResource;

    @Bean
    public Job readCSVFilesJob() {
        return jobBuilderFactory
                .get("readCSVFilesJob")
                .incrementer(new RunIdIncrementer())
                .start(step1())
                .build();
    }

    @Bean
    public Step step1() {
        return stepBuilderFactory
                .get("step1")
                .<Employee, Employee>chunk(1)
                .reader(reader())
                .writer(writer())
                .listener(listener())
                .build();
    }

    @Bean
    public ItemCountListener listener() {
        return new ItemCountListener();
    }

    @Bean
    public FlatFileItemReader<Employee> reader() {
        FlatFileItemReader<Employee> itemReader = new FlatFileItemReader<Employee>();
        itemReader.setLineMapper(lineMapper());
        itemReader.setLinesToSkip(1);   // skip the CSV header row
        itemReader.setResource(inputResource);
        return itemReader;
    }

    @Bean
    public LineMapper<Employee> lineMapper() {
        DefaultLineMapper<Employee> lineMapper = new DefaultLineMapper<Employee>();
        DelimitedLineTokenizer lineTokenizer = new DelimitedLineTokenizer();
        lineTokenizer.setNames(new String[] { "id", "firstName", "lastName" });
        lineTokenizer.setIncludedFields(new int[] { 0, 1, 2 });
        BeanWrapperFieldSetMapper<Employee> fieldSetMapper = new BeanWrapperFieldSetMapper<Employee>();
        fieldSetMapper.setTargetType(Employee.class);
        lineMapper.setLineTokenizer(lineTokenizer);
        lineMapper.setFieldSetMapper(fieldSetMapper);
        return lineMapper;
    }

    @Bean
    public ConsoleItemWriter<Employee> writer() {
        return new ConsoleItemWriter<Employee>();
    }
}
Run the application as a Spring Boot application. The Spring task scheduler will then launch the job.
App.java
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;

@SpringBootApplication
@EnableScheduling
public class App
{
    @Autowired
    JobLauncher jobLauncher;

    @Autowired
    Job job;

    public static void main(String[] args)
    {
        SpringApplication.run(App.class, args);
    }

    // Fires at second 0 of every minute.
    @Scheduled(cron = "0 */1 * * * ?")
    public void perform() throws Exception
    {
        // A timestamp parameter makes each launch a new job instance.
        JobParameters params = new JobParametersBuilder()
                .addString("JobID", String.valueOf(System.currentTimeMillis()))
                .toJobParameters();
        jobLauncher.run(job, params);
    }
}
Now watch the console.
Console
2018-07-11 16:38:00 INFO - Job: [SimpleJob: [name=readCSVFilesJob]] launched with the following parameters: [{JobID=1531307280004}]
2018-07-11 16:38:00 INFO - Executing step: [step1]
Employee [id=1, firstName=Lokesh, lastName=Gupta]
ItemCount: 1
Employee [id=2, firstName=Amit, lastName=Mishra]
ItemCount: 2
Employee [id=3, firstName=Pankaj, lastName=Kumar]
ItemCount: 3
Employee [id=4, firstName=David, lastName=Miller]
ItemCount: 4
Employee [id=5, firstName=David, lastName=Walsh]
ItemCount: 5
ItemCount: 5
2018-07-11 16:38:00 INFO - Job: [SimpleJob: [name=readCSVFilesJob]] completed with the following parameters: [{JobID=1531307280004}] and the following status: [COMPLETED]
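In the console output above, "ItemCount: 5" is printed twice. A likely explanation is that the step only learns the input is exhausted when a read returns nothing, so with a chunk size of 1 it runs one final, empty chunk, and the listener fires after that chunk too. The sketch below is a simplified plain-Java model of such a chunk loop, not Spring Batch's actual implementation; ChunkLoopSketch and countsAfterEachChunk are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

class ChunkLoopSketch {

    // Returns the cumulative read count that an afterChunk-style callback would see after each chunk.
    static List<Integer> countsAfterEachChunk(List<String> items, int chunkSize) {
        Iterator<String> reader = items.iterator();
        List<Integer> observed = new ArrayList<>();
        int readCount = 0;
        boolean exhausted = false;

        while (!exhausted) {
            // Fill one chunk; stop early when the reader has nothing left.
            for (int i = 0; i < chunkSize; i++) {
                if (!reader.hasNext()) {
                    exhausted = true;
                    break;
                }
                reader.next();
                readCount++;
            }
            // The callback runs after every chunk, including an empty final one.
            observed.add(readCount);
        }
        return observed;
    }

    public static void main(String[] args) {
        List<String> rows = List.of("Lokesh", "Amit", "Pankaj", "David", "David");
        System.out.println(countsAfterEachChunk(rows, 1)); // prints [1, 2, 3, 4, 5, 5]
        System.out.println(countsAfterEachChunk(rows, 2)); // prints [2, 4, 5]
    }
}
```

Under this model, a larger chunk size reduces how often the count is reported, and the repeated final value only appears when the number of records is an exact multiple of the chunk size.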
Drop me your questions in the comments section.
Happy learning!