Original: https://howtodoinjava.com/spring-batch/multiresourceitemreader-read-multiple-csv-files-example/

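Learn to use the MultiResourceItemReader class to read multiple CSV files from the filesystem or a resources folder. These files may have a header as their first row, so do not forget to skip the first line.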

Project structure

In this project, we will:

  1. Read 3 CSV files matching input/*.csv.
  2. Write the data to the console.

Figure 1: Project structure

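Read CSV files with MultiResourceItemReader

You need to use MultiResourceItemReader to read lines from the CSV files. It reads items from multiple resources sequentially and delegates the reading of each individual resource to another reader, here a FlatFileItemReader.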
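To make the idea of sequential reading concrete, here is a minimal plain-Java sketch. It is not Spring Batch's implementation: the class name SequentialCsvReaderSketch and the method readAll are made up for illustration, and in-memory strings stand in for the CSV resources. It only shows the general pattern of walking the sources in order and skipping the header of each one.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative sketch only: plain Java, not Spring Batch's MultiResourceItemReader.
final class SequentialCsvReaderSketch {

    // Reads every source in order, skipping the first linesToSkip lines of each one.
    static List<String> readAll(List<String> sources, int linesToSkip) {
        List<String> items = new ArrayList<>();
        for (String source : sources) {
            try (BufferedReader reader = new BufferedReader(new StringReader(source))) {
                String line;
                int lineNumber = 0;
                while ((line = reader.readLine()) != null) {
                    lineNumber++;
                    if (lineNumber <= linesToSkip || line.isEmpty()) {
                        continue; // header row or blank line
                    }
                    items.add(line);
                }
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
        return items;
    }

    public static void main(String[] args) {
        // Hypothetical in-memory stand-ins for two CSV files
        List<String> sources = Arrays.asList(
                "id,firstName,lastName\n1,Lokesh,Gupta\n2,Amit,Mishra\n",
                "id,firstName,lastName\n5,Ramesh,Gupta\n");
        readAll(sources, 1).forEach(System.out::println);
    }
}
```

Running main prints the three data rows in file order, with both header rows dropped.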

BatchConfig.java

    // Matches every inputData*.csv file in the input folder
    @Value("input/inputData*.csv")
    private Resource[] inputResources;

    @Bean
    public Job readCSVFilesJob() {
        return jobBuilderFactory
                .get("readCSVFilesJob")
                .incrementer(new RunIdIncrementer())
                .start(step1())
                .build();
    }

    @Bean
    public Step step1() {
        return stepBuilderFactory.get("step1").<Employee, Employee>chunk(5)
                .reader(multiResourceItemReader())
                .writer(writer())
                .build();
    }

    @Bean
    public MultiResourceItemReader<Employee> multiResourceItemReader() {
        MultiResourceItemReader<Employee> resourceItemReader = new MultiResourceItemReader<Employee>();
        resourceItemReader.setResources(inputResources);
        // Each resource is read through the delegate reader
        resourceItemReader.setDelegate(reader());
        return resourceItemReader;
    }

    @Bean
    public FlatFileItemReader<Employee> reader() {
        // Create reader instance
        FlatFileItemReader<Employee> reader = new FlatFileItemReader<Employee>();

        // Set number of lines to skip. Use it if the file has header rows.
        reader.setLinesToSkip(1);

        // 3 columns in each row
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setNames(new String[] { "id", "firstName", "lastName" });

        // Set values in Employee class
        BeanWrapperFieldSetMapper<Employee> fieldSetMapper = new BeanWrapperFieldSetMapper<Employee>();
        fieldSetMapper.setTargetType(Employee.class);

        // Configure how each line will be parsed and mapped to different values
        DefaultLineMapper<Employee> lineMapper = new DefaultLineMapper<Employee>();
        lineMapper.setLineTokenizer(tokenizer);
        lineMapper.setFieldSetMapper(fieldSetMapper);
        reader.setLineMapper(lineMapper);

        return reader;
    }
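The line mapper above works in two stages: a tokenizer splits each line into named fields, and a field set mapper copies those fields into an Employee. The plain-Java sketch below illustrates only the first stage under simplifying assumptions. LineMappingSketch and tokenize are hypothetical names, and the naive split on commas does not handle quoted fields, so treat it as an illustration rather than a replacement for DelimitedLineTokenizer.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative sketch only: a naive stand-in for the tokenizing stage.
final class LineMappingSketch {

    // Column names, in the same order as configured on the tokenizer
    static final String[] NAMES = { "id", "firstName", "lastName" };

    // Splits a comma-delimited line and pairs each token with its column name.
    static Map<String, String> tokenize(String line) {
        String[] tokens = line.split(",", -1);
        if (tokens.length != NAMES.length) {
            throw new IllegalArgumentException(
                    "Expected " + NAMES.length + " tokens but found " + tokens.length + ": " + line);
        }
        Map<String, String> fields = new LinkedHashMap<>();
        for (int i = 0; i < NAMES.length; i++) {
            fields.put(NAMES[i], tokens[i].trim());
        }
        return fields;
    }

    public static void main(String[] args) {
        // prints {id=1, firstName=Lokesh, lastName=Gupta}
        System.out.println(tokenize("1,Lokesh,Gupta"));
    }
}
```

The field names in the map line up with the Employee property names, which is what lets a bean-based mapper find the matching setters.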

Employee.java

    public class Employee {

        String id;
        String firstName;
        String lastName;

        // public setter and getter methods
    }
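Employee.java above leaves out its accessors. A fuller version might look like the following sketch. The toString() format is inferred from the console output shown at the end of this article, and the private field modifiers are an assumption. The class is declared without public here only so the snippet compiles in any file; in the project it would be a public class in its own Employee.java.

```java
// Hypothetical completion of the Employee model; accessors and toString() are assumptions.
class Employee {

    private String id;
    private String firstName;
    private String lastName;

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }

    public String getFirstName() { return firstName; }
    public void setFirstName(String firstName) { this.firstName = firstName; }

    public String getLastName() { return lastName; }
    public void setLastName(String lastName) { this.lastName = lastName; }

    // Format chosen to match the lines printed by the console writer
    @Override
    public String toString() {
        return "Employee [id=" + id + ", firstName=" + firstName + ", lastName=" + lastName + "]";
    }
}
```

The setters matter because BeanWrapperFieldSetMapper populates the object through its bean properties, and a no-argument constructor must be available.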

inputData1.csv

    id,firstName,lastName
    1,Lokesh,Gupta
    2,Amit,Mishra
    3,Pankaj,Kumar
    4,David,Miller

inputData2.csv

    id,firstName,lastName
    5,Ramesh,Gupta
    6,Vineet,Mishra
    7,Amit,Kumar
    8,Dav,Miller

inputData3.csv

    id,firstName,lastName
    9,Vikas,Kumar
    10,Pratek,Mishra
    11,Brian,Kumar
    12,David,Cena
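All three file names match the inputData*.csv wildcard used in the @Value annotation. The sketch below checks file names against the same wildcard with the JDK's glob matcher. Note the assumption: Spring resolves resource patterns with its own Ant-style matching, not java.nio globs, so this only mirrors the behaviour for a simple * wildcard. ResourcePatternSketch and matchesInputPattern are hypothetical names.

```java
import java.nio.file.FileSystems;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;

// Illustrative sketch only: uses the JDK glob matcher, not Spring's resource resolution.
final class ResourcePatternSketch {

    // Returns true when the bare file name matches inputData*.csv.
    static boolean matchesInputPattern(String fileName) {
        PathMatcher matcher = FileSystems.getDefault().getPathMatcher("glob:inputData*.csv");
        return matcher.matches(Paths.get(fileName));
    }

    public static void main(String[] args) {
        String[] candidates = { "inputData1.csv", "inputData2.csv", "inputData3.csv", "employees.csv" };
        for (String candidate : candidates) {
            System.out.println(candidate + " -> " + matchesInputPattern(candidate));
        }
    }
}
```

A file such as employees.csv placed in the same folder would not match the pattern and so would not be picked up.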

Write the read rows to the console

Create a ConsoleItemWriter class that implements the ItemWriter interface.

ConsoleItemWriter.java

    import java.util.List;

    import org.springframework.batch.item.ItemWriter;

    public class ConsoleItemWriter<T> implements ItemWriter<T> {

        // Called once per chunk with the items read for that chunk
        @Override
        public void write(List<? extends T> items) throws Exception {
            for (T item : items) {
                System.out.println(item);
            }
        }
    }

Use ConsoleItemWriter as the writer.

BatchConfig.java

    @Bean
    public ConsoleItemWriter<Employee> writer() {
        return new ConsoleItemWriter<Employee>();
    }
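Because the step is configured with chunk(5), the writer receives the items in groups of at most five rather than one at a time. The following plain-Java sketch shows how 12 rows would be grouped under that setting. ChunkingSketch and chunks are hypothetical names, and the code only illustrates the grouping, not how Spring Batch manages transactions or restarts around each chunk.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only: shows chunk grouping, not Spring Batch's step execution.
final class ChunkingSketch {

    // Splits items into consecutive groups of at most chunkSize elements.
    static <T> List<List<T>> chunks(List<T> items, int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        List<List<T>> result = new ArrayList<>();
        for (int start = 0; start < items.size(); start += chunkSize) {
            int end = Math.min(start + chunkSize, items.size());
            result.add(new ArrayList<>(items.subList(start, end)));
        }
        return result;
    }

    public static void main(String[] args) {
        // Ids 1..12 stand in for the 12 employees in the three CSV files
        List<Integer> ids = new ArrayList<>();
        for (int id = 1; id <= 12; id++) {
            ids.add(id);
        }
        for (List<Integer> chunk : chunks(ids, 5)) {
            System.out.println(chunk);
        }
    }
}
```

With 12 items and a chunk size of 5, the groups have 5, 5, and 2 items, so the writer would be invoked three times.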

Maven dependencies

Look at the project dependencies.

pom.xml

    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>

        <groupId>com.howtodoinjava</groupId>
        <artifactId>App</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <packaging>jar</packaging>

        <name>App</name>
        <url>http://maven.apache.org</url>

        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.0.3.RELEASE</version>
        </parent>

        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        </properties>

        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-batch</artifactId>
            </dependency>
            <dependency>
                <groupId>com.h2database</groupId>
                <artifactId>h2</artifactId>
                <scope>runtime</scope>
            </dependency>
        </dependencies>

        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>

        <repositories>
            <repository>
                <id>repository.spring.release</id>
                <name>Spring GA Repository</name>
                <url>http://repo.spring.io/release</url>
            </repository>
        </repositories>
    </project>

Demo

Before running the application, look at the complete code of BatchConfig.java.

BatchConfig.java

    package com.howtodoinjava.demo.config;

    import org.springframework.batch.core.Job;
    import org.springframework.batch.core.Step;
    import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
    import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
    import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
    import org.springframework.batch.core.launch.support.RunIdIncrementer;
    import org.springframework.batch.item.file.FlatFileItemReader;
    import org.springframework.batch.item.file.MultiResourceItemReader;
    import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
    import org.springframework.batch.item.file.mapping.DefaultLineMapper;
    import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.core.io.Resource;

    import com.howtodoinjava.demo.model.Employee;

    @Configuration
    @EnableBatchProcessing
    public class BatchConfig {

        @Autowired
        private JobBuilderFactory jobBuilderFactory;

        @Autowired
        private StepBuilderFactory stepBuilderFactory;

        // Matches every inputData*.csv file in the input folder
        @Value("input/inputData*.csv")
        private Resource[] inputResources;

        @Bean
        public Job readCSVFilesJob() {
            return jobBuilderFactory
                    .get("readCSVFilesJob")
                    .incrementer(new RunIdIncrementer())
                    .start(step1())
                    .build();
        }

        @Bean
        public Step step1() {
            return stepBuilderFactory.get("step1").<Employee, Employee>chunk(5)
                    .reader(multiResourceItemReader())
                    .writer(writer())
                    .build();
        }

        @Bean
        public MultiResourceItemReader<Employee> multiResourceItemReader() {
            MultiResourceItemReader<Employee> resourceItemReader = new MultiResourceItemReader<Employee>();
            resourceItemReader.setResources(inputResources);
            // Each resource is read through the delegate reader
            resourceItemReader.setDelegate(reader());
            return resourceItemReader;
        }

        @Bean
        public FlatFileItemReader<Employee> reader() {
            // Create reader instance
            FlatFileItemReader<Employee> reader = new FlatFileItemReader<Employee>();

            // Set number of lines to skip. Use it if the file has header rows.
            reader.setLinesToSkip(1);

            // 3 columns in each row
            DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
            tokenizer.setNames(new String[] { "id", "firstName", "lastName" });

            // Set values in Employee class
            BeanWrapperFieldSetMapper<Employee> fieldSetMapper = new BeanWrapperFieldSetMapper<Employee>();
            fieldSetMapper.setTargetType(Employee.class);

            // Configure how each line will be parsed and mapped to different values
            DefaultLineMapper<Employee> lineMapper = new DefaultLineMapper<Employee>();
            lineMapper.setLineTokenizer(tokenizer);
            lineMapper.setFieldSetMapper(fieldSetMapper);
            reader.setLineMapper(lineMapper);

            return reader;
        }

        @Bean
        public ConsoleItemWriter<Employee> writer() {
            return new ConsoleItemWriter<Employee>();
        }
    }

App.java

    package com.howtodoinjava.demo;

    import org.springframework.batch.core.Job;
    import org.springframework.batch.core.JobParameters;
    import org.springframework.batch.core.JobParametersBuilder;
    import org.springframework.batch.core.launch.JobLauncher;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.scheduling.annotation.EnableScheduling;
    import org.springframework.scheduling.annotation.Scheduled;

    @SpringBootApplication
    @EnableScheduling
    public class App {

        @Autowired
        JobLauncher jobLauncher;

        @Autowired
        Job job;

        public static void main(String[] args) {
            SpringApplication.run(App.class, args);
        }

        // Fires at second 0 of every minute
        @Scheduled(cron = "0 */1 * * * ?")
        public void perform() throws Exception {
            // A fresh JobID parameter makes every launch a new job instance
            JobParameters params = new JobParametersBuilder()
                    .addString("JobID", String.valueOf(System.currentTimeMillis()))
                    .toJobParameters();
            jobLauncher.run(job, params);
        }
    }

application.properties

    # Disable the batch job's auto start
    spring.batch.job.enabled=false
    spring.main.banner-mode=off

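Run the application

Run the application as a Spring Boot application and watch the console. The batch job starts at the beginning of every minute. It reads the input files and prints the values it has read to the console.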
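To see why the first job run may appear up to a minute after startup, the sketch below computes the next minute boundary for a given time using java.time. It is a rough illustration of the schedule's effect, not of how Spring evaluates cron expressions; NextRunSketch and nextMinuteStart are hypothetical names, and the sample timestamp is taken from the startup log in this article.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;

// Illustrative sketch only: approximates "start of the next minute", not Spring's cron parsing.
final class NextRunSketch {

    // Returns the first whole-minute instant strictly after the given time.
    static LocalDateTime nextMinuteStart(LocalDateTime now) {
        return now.truncatedTo(ChronoUnit.MINUTES).plusMinutes(1);
    }

    public static void main(String[] args) {
        // The application in the sample log finished starting at 15:32:30
        LocalDateTime started = LocalDateTime.of(2018, 7, 10, 15, 32, 30);
        LocalDateTime nextRun = nextMinuteStart(started);
        System.out.println("Next run at " + nextRun);
        System.out.println("Wait of " + Duration.between(started, nextRun).getSeconds() + " seconds");
    }
}
```

For a startup time of 15:32:30 the next boundary is 15:33:00, a wait of 30 seconds, which is consistent with the job launch time in the sample console output.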

Console

    2018-07-10 15:32:26 INFO - Starting App on XYZ with PID 4596 (C:\Users\user\workspace\App\target\classes started by zkpkhua in C:\Users\user\workspace\App)
    2018-07-10 15:32:26 INFO - No active profile set, falling back to default profiles: default
    2018-07-10 15:32:27 INFO - Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@3c9d0b9d: startup date [Tue Jul 10 15:32:27 IST 2018]; root of context hierarchy
    2018-07-10 15:32:28 INFO - HikariPool-1 - Starting...
    2018-07-10 15:32:29 INFO - HikariPool-1 - Start completed.
    2018-07-10 15:32:29 INFO - No database type set, using meta data indicating: H2
    2018-07-10 15:32:29 INFO - No TaskExecutor has been set, defaulting to synchronous executor.
    2018-07-10 15:32:29 INFO - Executing SQL script from class path resource [org/springframework/batch/core/schema-h2.sql]
    2018-07-10 15:32:29 INFO - Executed SQL script from class path resource [org/springframework/batch/core/schema-h2.sql] in 68 ms.
    2018-07-10 15:32:30 INFO - Registering beans for JMX exposure on startup
    2018-07-10 15:32:30 INFO - Bean with name 'dataSource' has been autodetected for JMX exposure
    2018-07-10 15:32:30 INFO - Located MBean 'dataSource': registering with JMX server as MBean [com.zaxxer.hikari:name=dataSource,type=HikariDataSource]
    2018-07-10 15:32:30 INFO - No TaskScheduler/ScheduledExecutorService bean found for scheduled processing
    2018-07-10 15:32:30 INFO - Started App in 4.036 seconds (JVM running for 4.827)
    2018-07-10 15:33:00 INFO - Job: [SimpleJob: [name=readCSVFilesJob]] launched with the following parameters: [{JobID=1531216980005}]
    2018-07-10 15:33:00 INFO - Executing step: [step1]
    Employee [id=1, firstName=Lokesh, lastName=Gupta]
    Employee [id=2, firstName=Amit, lastName=Mishra]
    Employee [id=3, firstName=Pankaj, lastName=Kumar]
    Employee [id=4, firstName=David, lastName=Miller]
    Employee [id=5, firstName=Ramesh, lastName=Gupta]
    Employee [id=6, firstName=Vineet, lastName=Mishra]
    Employee [id=7, firstName=Amit, lastName=Kumar]
    Employee [id=8, firstName=Dav, lastName=Miller]
    Employee [id=9, firstName=Vikas, lastName=Kumar]
    Employee [id=10, firstName=Pratek, lastName=Mishra]
    Employee [id=11, firstName=Brian, lastName=Kumar]
    Employee [id=12, firstName=David, lastName=Cena]
    2018-07-10 15:33:00 INFO - Job: [SimpleJob: [name=readCSVFilesJob]] completed with the following parameters: [{JobID=1531216980005}] and the following status: [COMPLETED]

Drop me your questions in the comments section.

Happy Learning!