原文: https://howtodoinjava.com/spring-batch/records-count-example/
了解如何使用ItemStream和ChunkListener计数 Spring Batch 作业处理的记录数,并将记录数记录在日志文件或控制台中。
使用ItemStream计数记录
在给定的ItemStream实现下方,计算定期处理的记录数。
ItemCountItemStream.java
import org.springframework.batch.item.ExecutionContext;import org.springframework.batch.item.ItemStream;import org.springframework.batch.item.ItemStreamException;public class ItemCountItemStream implements ItemStream {public void open(ExecutionContext executionContext) throws ItemStreamException {}public void update(ExecutionContext executionContext) throws ItemStreamException {System.out.println("ItemCount: "+executionContext.get("FlatFileItemReader.read.count"));}public void close() throws ItemStreamException {}}
如何使用ItemCountItemStream
在Tasklet中使用SimpleStepBuilder.stream()方法注册上面创建的ItemCountItemStream。
BatchConfig.java
@Autowiredprivate JobBuilderFactory jobBuilderFactory;@Autowiredprivate StepBuilderFactory stepBuilderFactory;@Beanpublic Job readCSVFilesJob() {return jobBuilderFactory.get("readCSVFilesJob").incrementer(new RunIdIncrementer()).start(step1()).build();}@Beanpublic Step step1() {return stepBuilderFactory.get("step1").<Employee, Employee>chunk(1).reader(reader()).writer(writer()).stream(stream()).build();}@Beanpublic ItemCountItemStream stream() {return new ItemCountItemStream();}
使用ChunkListener计数记录
在给定的ChunkListener实现下方,计算定期处理的记录数。
ItemCountListener.java
import org.springframework.batch.core.ChunkListener;import org.springframework.batch.core.scope.context.ChunkContext;public class ItemCountListener implements ChunkListener {@Overridepublic void beforeChunk(ChunkContext context) {}@Overridepublic void afterChunk(ChunkContext context) {int count = context.getStepContext().getStepExecution().getReadCount();System.out.println("ItemCount: " + count);}@Overridepublic void afterChunkError(ChunkContext context) {}}
如何使用ItemCountListener
在Tasklet中使用SimpleStepBuilder.listener()方法注册上面创建的ItemCountListener。
BatchConfig.java
@Autowiredprivate JobBuilderFactory jobBuilderFactory;@Autowiredprivate StepBuilderFactory stepBuilderFactory;@Beanpublic Job readCSVFilesJob() {return jobBuilderFactory.get("readCSVFilesJob").incrementer(new RunIdIncrementer()).start(step1()).build();}@Beanpublic Step step1() {return stepBuilderFactory.get("step1").<Employee, Employee>chunk(1).reader(reader()).writer(writer()).listener(listener()).build();}@Beanpublic ItemCountListener listener() {return new ItemCountListener();}
计数记录演示
我正在使用上述ItemCountListener配置来处理此 CSV。
inputData.csv
id,firstName,lastName1,Lokesh,Gupta2,Amit,Mishra3,Pankaj,Kumar4,David,Miller5,David,Walsh
完整的批处理配置如下所示:
BatchConfig.java
package com.howtodoinjava.demo.config;import org.springframework.batch.core.Job;import org.springframework.batch.core.Step;import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;import org.springframework.batch.core.launch.support.RunIdIncrementer;import org.springframework.batch.item.file.FlatFileItemReader;import org.springframework.batch.item.file.LineMapper;import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;import org.springframework.batch.item.file.mapping.DefaultLineMapper;import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.annotation.Value;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.core.io.Resource;import com.howtodoinjava.demo.model.Employee;@Configuration@EnableBatchProcessingpublic class BatchConfig{@Autowiredprivate JobBuilderFactory jobBuilderFactory;@Autowiredprivate StepBuilderFactory stepBuilderFactory;@Value("/input/inputData.csv")private Resource inputResource;@Beanpublic Job readCSVFilesJob() {return jobBuilderFactory.get("readCSVFilesJob").incrementer(new RunIdIncrementer()).start(step1()).build();}@Beanpublic Step step1() {return stepBuilderFactory.get("step1").<Employee, Employee>chunk(1).reader(reader()).writer(writer()).listener(listner()).build();}@Beanpublic ItemCountListener listner() {return new ItemCountListener();}@Beanpublic FlatFileItemReader<Employee> reader() {FlatFileItemReader<Employee> itemReader = new FlatFileItemReader<Employee>();itemReader.setLineMapper(lineMapper());itemReader.setLinesToSkip(1);itemReader.setResource(inputResource);return itemReader;}@Beanpublic LineMapper<Employee> lineMapper() {DefaultLineMapper<Employee> lineMapper = new DefaultLineMapper<Employee>();DelimitedLineTokenizer lineTokenizer = new DelimitedLineTokenizer();lineTokenizer.setNames(new String[] { "id", "firstName", "lastName" });lineTokenizer.setIncludedFields(new int[] { 0, 1, 2 });BeanWrapperFieldSetMapper<Employee> fieldSetMapper = new BeanWrapperFieldSetMapper<Employee>();fieldSetMapper.setTargetType(Employee.class);lineMapper.setLineTokenizer(lineTokenizer);lineMapper.setFieldSetMapper(fieldSetMapper);return lineMapper;}@Beanpublic ConsoleItemWriter<Employee> writer() {return new ConsoleItemWriter<Employee>();}}
作为 Spring 启动应用程序启动该应用程序。 Spring 任务调度器将开始工作。
App.java
import org.springframework.batch.core.Job;import org.springframework.batch.core.JobParameters;import org.springframework.batch.core.JobParametersBuilder;import org.springframework.batch.core.launch.JobLauncher;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;import org.springframework.scheduling.annotation.EnableScheduling;import org.springframework.scheduling.annotation.Scheduled;@SpringBootApplication@EnableSchedulingpublic class App{@AutowiredJobLauncher jobLauncher;@AutowiredJob job;public static void main(String[] args){SpringApplication.run(App.class, args);}@Scheduled(cron = "0 */1 * * * ?")public void perform() throws Exception{JobParameters params = new JobParametersBuilder().addString("JobID", String.valueOf(System.currentTimeMillis())).toJobParameters();jobLauncher.run(job, params);}}
现在观看控制台。
Console
2018-07-11 16:38:00 INFO - Job: [SimpleJob: [name=readCSVFilesJob]] launched with the following parameters: [{JobID=1531307280004}]2018-07-11 16:38:00 INFO - Executing step: [step1]Employee [id=1, firstName=Lokesh, lastName=Gupta]ItemCount: 1Employee [id=2, firstName=Amit, lastName=Mishra]ItemCount: 2Employee [id=3, firstName=Pankaj, lastName=Kumar]ItemCount: 3Employee [id=4, firstName=David, lastName=Miller]ItemCount: 4Employee [id=5, firstName=David, lastName=Walsh]ItemCount: 5ItemCount: 52018-07-11 16:38:00 INFO - Job: [SimpleJob: [name=readCSVFilesJob]] completed with the following parameters: [{JobID=1531307280004}] and the following status: [COMPLETED]
将我的问题放在评论部分。
学习愉快!
参考:
