Original article: https://howtodoinjava.com/spring-batch/csv-to-database-java-config-example/

Learn to use Spring Batch to read records from a CSV file and insert them into a database using JdbcBatchItemWriter. This example uses the embedded H2 database.

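Project Overview

In this application, we will perform the following tasks:

  1. Read employee records from a CSV file with FlatFileItemReader
  2. Configure the H2 database and create the EMPLOYEE table in it
  3. Write the employee records to the EMPLOYEE table with JdbcBatchItemWriter
  4. Log each item with an ItemProcessor before it is inserted into the database
  5. Verify the inserted records using the H2 console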

Project Structure

[Figure 1: Package structure]

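Maven Dependencies

Here is a quick look at the Maven dependencies needed to build this example. spring-boot-starter-web is required so that the data in the H2 console can be verified from a browser window.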

pom.xml

    <project xmlns="http://maven.apache.org/POM/4.0.0"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>

        <groupId>com.howtodoinjava</groupId>
        <artifactId>App</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <packaging>jar</packaging>

        <name>App</name>
        <url>http://maven.apache.org</url>

        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.0.3.RELEASE</version>
        </parent>

        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        </properties>

        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-batch</artifactId>
            </dependency>
            <!-- Needed only to serve the H2 console in a browser -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <dependency>
                <groupId>com.h2database</groupId>
                <artifactId>h2</artifactId>
                <scope>runtime</scope>
            </dependency>
        </dependencies>

        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>

        <repositories>
            <repository>
                <id>repository.spring.release</id>
                <name>Spring GA Repository</name>
                <url>http://repo.spring.io/release</url>
            </repository>
        </repositories>
    </project>

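CSV Reader and Database Writer Configuration

  1. We will use FlatFileItemReader to read the CSV file, with a standard configuration built from the DefaultLineMapper, DelimitedLineTokenizer, and BeanWrapperFieldSetMapper classes.
  2. To write the records to the database, we will use JdbcBatchItemWriter, the standard writer for executing batched queries against a database in Spring Batch jobs.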
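
To make the reader side more concrete, here is a minimal plain-Java sketch of the two roles the line mapper combines: a tokenizer that splits a delimited line into named fields, and a field mapper that copies those fields onto an object. It is an illustration only and not Spring Batch's implementation; the class LineMappingSketch, its nested Employee, and the methods tokenize and mapToEmployee are hypothetical names invented for this example.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java illustration only. The real Spring Batch classes do more
// (for example quoted fields and type conversion) and are not reproduced here.
class LineMappingSketch {

    // Hypothetical stand-in for the tutorial's Employee model.
    static class Employee {
        String id;
        String firstName;
        String lastName;
    }

    // Tokenizer role: split a comma-delimited line and pair each value with a name.
    static Map<String, String> tokenize(String line, String[] names) {
        String[] values = line.split(",", -1); // -1 keeps trailing empty fields
        if (values.length != names.length) {
            throw new IllegalArgumentException(
                "Expected " + names.length + " fields but found " + values.length);
        }
        Map<String, String> fields = new LinkedHashMap<>();
        for (int i = 0; i < names.length; i++) {
            fields.put(names[i], values[i].trim()); // this sketch trims whitespace
        }
        return fields;
    }

    // Field mapper role: copy the named values onto the target object.
    static Employee mapToEmployee(Map<String, String> fields) {
        Employee employee = new Employee();
        employee.id = fields.get("id");
        employee.firstName = fields.get("firstName");
        employee.lastName = fields.get("lastName");
        return employee;
    }

    public static void main(String[] args) {
        String[] names = { "id", "firstName", "lastName" };
        Employee employee = mapToEmployee(tokenize("1,Lokesh,Gupta", names));
        System.out.println(employee.id + " " + employee.firstName + " " + employee.lastName);
    }
}
```

In the actual configuration, DelimitedLineTokenizer and BeanWrapperFieldSetMapper play these two roles, and DefaultLineMapper chains them together.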

BatchConfig.java

    package com.howtodoinjava.demo.config;

    import javax.sql.DataSource;

    import org.springframework.batch.core.Job;
    import org.springframework.batch.core.Step;
    import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
    import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
    import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
    import org.springframework.batch.core.launch.support.RunIdIncrementer;
    import org.springframework.batch.item.ItemProcessor;
    import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
    import org.springframework.batch.item.database.JdbcBatchItemWriter;
    import org.springframework.batch.item.file.FlatFileItemReader;
    import org.springframework.batch.item.file.LineMapper;
    import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
    import org.springframework.batch.item.file.mapping.DefaultLineMapper;
    import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.core.io.Resource;
    import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
    import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;

    import com.howtodoinjava.demo.model.Employee;

    @Configuration
    @EnableBatchProcessing
    public class BatchConfig {

        @Autowired
        private JobBuilderFactory jobBuilderFactory;

        @Autowired
        private StepBuilderFactory stepBuilderFactory;

        // CSV file to import; note the lowercase "classpath:" resource prefix
        @Value("classpath:/input/inputData.csv")
        private Resource inputResource;

        @Bean
        public Job readCSVFileJob() {
            return jobBuilderFactory
                    .get("readCSVFileJob")
                    .incrementer(new RunIdIncrementer())
                    .start(step())
                    .build();
        }

        // Single step: read, process, and write employees in chunks of 5
        @Bean
        public Step step() {
            return stepBuilderFactory
                    .get("step")
                    .<Employee, Employee>chunk(5)
                    .reader(reader())
                    .processor(processor())
                    .writer(writer())
                    .build();
        }

        @Bean
        public ItemProcessor<Employee, Employee> processor() {
            return new DBLogProcessor();
        }

        @Bean
        public FlatFileItemReader<Employee> reader() {
            FlatFileItemReader<Employee> itemReader = new FlatFileItemReader<>();
            itemReader.setLineMapper(lineMapper());
            itemReader.setLinesToSkip(1); // skip the CSV header row
            itemReader.setResource(inputResource);
            return itemReader;
        }

        // Maps each CSV line to an Employee by column name
        @Bean
        public LineMapper<Employee> lineMapper() {
            DefaultLineMapper<Employee> lineMapper = new DefaultLineMapper<>();

            DelimitedLineTokenizer lineTokenizer = new DelimitedLineTokenizer();
            lineTokenizer.setNames(new String[] { "id", "firstName", "lastName" });
            lineTokenizer.setIncludedFields(new int[] { 0, 1, 2 });

            BeanWrapperFieldSetMapper<Employee> fieldSetMapper = new BeanWrapperFieldSetMapper<>();
            fieldSetMapper.setTargetType(Employee.class);

            lineMapper.setLineTokenizer(lineTokenizer);
            lineMapper.setFieldSetMapper(fieldSetMapper);
            return lineMapper;
        }

        // Named parameters (:id, :firstName, :lastName) are taken from the Employee bean properties
        @Bean
        public JdbcBatchItemWriter<Employee> writer() {
            JdbcBatchItemWriter<Employee> itemWriter = new JdbcBatchItemWriter<>();
            itemWriter.setDataSource(dataSource());
            itemWriter.setSql("INSERT INTO EMPLOYEE (ID, FIRSTNAME, LASTNAME) VALUES (:id, :firstName, :lastName)");
            itemWriter.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
            return itemWriter;
        }

        // Embedded H2 database holding the Spring Batch metadata tables and the EMPLOYEE table
        @Bean
        public DataSource dataSource() {
            EmbeddedDatabaseBuilder embeddedDatabaseBuilder = new EmbeddedDatabaseBuilder();
            return embeddedDatabaseBuilder
                    .addScript("classpath:org/springframework/batch/core/schema-drop-h2.sql")
                    .addScript("classpath:org/springframework/batch/core/schema-h2.sql")
                    .addScript("classpath:employee.sql")
                    .setType(EmbeddedDatabaseType.H2)
                    .build();
        }
    }
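
The step in BatchConfig is built with chunk(5), so items are read and processed in groups of up to five and each group is handed to the writer in a single call. The following plain-Java sketch shows that loop in simplified form. It is not Spring Batch code: ChunkLoopSketch and its run method are hypothetical names, and it leaves out what the framework adds around each chunk, such as transactions and restart metadata.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Simplified illustration of chunk-oriented processing; not Spring Batch code.
class ChunkLoopSketch {

    // Reads up to chunkSize items, processes each one, then passes the whole
    // chunk to the writer in one call. Returns the number of writer calls.
    static <I, O> int run(Iterator<I> reader, Function<I, O> processor,
                          Consumer<List<O>> writer, int chunkSize) {
        int writes = 0;
        while (true) {
            // Read phase: collect at most chunkSize items.
            List<I> items = new ArrayList<>();
            while (items.size() < chunkSize && reader.hasNext()) {
                items.add(reader.next());
            }
            if (items.isEmpty()) {
                break; // input exhausted
            }
            // Process phase: transform every item in the chunk.
            List<O> outputs = new ArrayList<>();
            for (I item : items) {
                outputs.add(processor.apply(item));
            }
            // Write phase: one writer call per chunk.
            writer.accept(outputs);
            writes++;
        }
        return writes;
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("Lokesh", "Amit", "Pankaj", "David", "David", "Extra");
        int writes = ChunkLoopSketch.<String, String>run(
            names.iterator(),
            String::toUpperCase,
            chunk -> System.out.println("Writing " + chunk.size() + " items: " + chunk),
            5);
        System.out.println("Writer calls: " + writes);
    }
}
```

With the five-record input file used later in this example and a chunk size of 5, a loop like this would call the writer once with all five employees.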

Also create DBLogProcessor, which logs each employee record before it is written to the database. This processor is optional.

DBLogProcessor.java

    package com.howtodoinjava.demo.config;

    import org.springframework.batch.item.ItemProcessor;

    import com.howtodoinjava.demo.model.Employee;

    public class DBLogProcessor implements ItemProcessor<Employee, Employee>
    {
        // Logs the record and passes it through unchanged
        @Override
        public Employee process(Employee employee) throws Exception
        {
            System.out.println("Inserting employee : " + employee);
            return employee;
        }
    }

Model Class

Employee.java

    package com.howtodoinjava.demo.model;

    public class Employee {

        String id;
        String firstName;
        String lastName;

        //Setter and getter methods
    }
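
The Employee listing leaves out its accessors. For readers who want a complete class, here is one possible sketch. The setters are what a bean-property mapper needs to populate the object, and the getters are what the writer needs to resolve :id, :firstName, and :lastName. The toString format is an assumption inferred from the console output shown later in this article, and the package declaration and public modifier are omitted so the snippet runs as a single file.

```java
// Sketch of a complete Employee bean. In the project it would be declared
// public and live in the com.howtodoinjava.demo.model package.
class Employee {

    private String id;
    private String firstName;
    private String lastName;

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }

    public String getFirstName() { return firstName; }
    public void setFirstName(String firstName) { this.firstName = firstName; }

    public String getLastName() { return lastName; }
    public void setLastName(String lastName) { this.lastName = lastName; }

    // Format assumed from the "Inserting employee : ..." lines in the console log.
    @Override
    public String toString() {
        return "Employee [id=" + id + ", firstName=" + firstName + ", lastName=" + lastName + "]";
    }

    public static void main(String[] args) {
        Employee employee = new Employee();
        employee.setId("1");
        employee.setFirstName("Lokesh");
        employee.setLastName("Gupta");
        System.out.println(employee);
    }
}
```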

Application Properties

application.properties

    #Disable batch job's auto start
    spring.batch.job.enabled=false
    spring.main.banner-mode=off

    #batch input files location
    input.dir=c:/temp/input

Logging Configuration

logback.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <configuration scan="true">
        <appender name="consoleAppender" class="ch.qos.logback.core.ConsoleAppender">
            <encoder>
                <charset>UTF-8</charset>
                <Pattern>%d{yyyy-MM-dd HH:mm:ss} %p %X{TXNID} - %m%n</Pattern>
            </encoder>
        </appender>
        <root level="INFO">
            <appender-ref ref="consoleAppender" />
        </root>
    </configuration>

Configure the H2 Database

We have already configured the data source in BatchConfig.java.

BatchConfig.java

    @Bean
    public DataSource dataSource() {
        EmbeddedDatabaseBuilder embeddedDatabaseBuilder = new EmbeddedDatabaseBuilder();
        return embeddedDatabaseBuilder
                .addScript("classpath:org/springframework/batch/core/schema-drop-h2.sql")
                .addScript("classpath:org/springframework/batch/core/schema-h2.sql")
                .addScript("classpath:employee.sql")
                .setType(EmbeddedDatabaseType.H2)
                .build();
    }

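Create the EMPLOYEE Table

The configuration above runs the bundled Spring Batch schema scripts, which create the default metadata tables automatically. To create the EMPLOYEE table, write a schema file named employee.sql and place it in the resources folder.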

employee.sql

    DROP TABLE EMPLOYEE IF EXISTS;

    CREATE TABLE EMPLOYEE (
        ID VARCHAR(10),
        FIRSTNAME VARCHAR(100),
        LASTNAME VARCHAR(100)
    );

Enable the H2 Console

To enable the H2 console, register org.h2.server.web.WebServlet with Spring Web.

WebConfig.java

    package com.howtodoinjava.demo.config;

    import org.h2.server.web.WebServlet;
    import org.springframework.boot.web.servlet.ServletRegistrationBean;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @SuppressWarnings({"rawtypes","unchecked"})
    @Configuration
    public class WebConfig {

        // Serves the H2 web console under /console
        @Bean
        ServletRegistrationBean h2servletRegistration() {
            ServletRegistrationBean registrationBean = new ServletRegistrationBean(new WebServlet());
            registrationBean.addUrlMappings("/console/*");
            return registrationBean;
        }
    }

Demo

The application is now configured and the job is ready to execute. Let's create the input CSV file.

inputData.csv

    id,firstName,lastName
    1,Lokesh,Gupta
    2,Amit,Mishra
    3,Pankaj,Kumar
    4,David,Miller
    5,David,Walsh

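Run the Demo

To run the demo and the batch job, create a Spring Boot application class and start the application. The class below uses a scheduled method to launch the job once every minute, passing a fresh JobID parameter on each run.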

App.java

    package com.howtodoinjava.demo;

    import org.springframework.batch.core.Job;
    import org.springframework.batch.core.JobParameters;
    import org.springframework.batch.core.JobParametersBuilder;
    import org.springframework.batch.core.launch.JobLauncher;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.scheduling.annotation.EnableScheduling;
    import org.springframework.scheduling.annotation.Scheduled;

    @SpringBootApplication
    @EnableScheduling
    public class App
    {
        @Autowired
        JobLauncher jobLauncher;

        @Autowired
        Job job;

        public static void main(String[] args)
        {
            SpringApplication.run(App.class, args);
        }

        // Launches the job at the start of every minute
        @Scheduled(cron = "0 */1 * * * ?")
        public void perform() throws Exception
        {
            // A new JobID value on each run gives every launch distinct job parameters
            JobParameters params = new JobParametersBuilder()
                    .addString("JobID", String.valueOf(System.currentTimeMillis()))
                    .toJobParameters();
            jobLauncher.run(job, params);
        }
    }

Verify the Batch Job Results

To verify that the batch job executed successfully, check the logs and the H2 console.

Console

    2018-07-11 19:11:00 INFO - Job: [SimpleJob: [name=readCSVFileJob]] launched with the following parameters: [{JobID=1531316460004}]
    2018-07-11 19:11:00 INFO - Executing step: [step]
    Inserting employee : Employee [id=1, firstName=Lokesh, lastName=Gupta]
    Inserting employee : Employee [id=2, firstName=Amit, lastName=Mishra]
    Inserting employee : Employee [id=3, firstName=Pankaj, lastName=Kumar]
    Inserting employee : Employee [id=4, firstName=David, lastName=Miller]
    Inserting employee : Employee [id=5, firstName=David, lastName=Walsh]
    2018-07-11 19:11:00 INFO - Job: [SimpleJob: [name=readCSVFileJob]] completed with the following parameters: [{JobID=1531316460004}] and the following status: [COMPLETED]

H2 Console

[Figure 2: H2 console login]

[Figure 3: Data in the H2 console]

Drop me your questions in the comments section.

Happy Learning!
