tags: springbatch


1.引言

上一篇文章《快速了解组件-spring batch(2)之helloworld》Spring Batch进行了入门级的开发,也对基本的组件有了一定的了解。但实际开发过程中,更多的是涉及文件及数据库的操作,以定时后台运行的方式,实现批处理操作。典型操作是从文本数据(csv/txt等文件)中读取数据,然后写入到数据库存储。如下图所示:

快速使用组件-spring batch(3)读文件数据到数据库 - 图1

若需要开发此过程,可以按照上一篇文章所写的,自定义ItemReaderItemWriter来实现,但是Spring Batch其实已经提供现成的文件读取和数据库写入的组件,开发人员可以直接使用,提高开发效率。本文将会对文件读取和数据库写入进行实战介绍。

2.开发环境

  • JDK: jdk1.8
  • Spring Boot: 2.1.4.RELEASE
  • Spring Batch:4.1.2.RELEASE
  • 开发IDE: IDEA
  • 构建工具Maven: 3.3.9
  • 日志组件logback:1.2.3
  • lombok:1.18.6

3.Spring Batch提供的读-处理-写组件一览

在使用Spring Batch内置的读写组件时,首先我们先弄清楚有哪些组件可以用,按读、写、处理,见下面说明。Spring Batch已提供了比较全面的支持。

3.1 ItemReader

ItemReader 说明
ListItemReader 读取List类型数据,只能读一次
ItemReaderAdapter ItemReader适配器,可以复用现有的读操作
FlatFileItemReader 读Flat类型文件
StaxEventItemReader 读XML类型文件
JdbcCursorItemReader 基于JDBC游标方式读数据库
HibernateCursorItemReader 基于Hibernate游标方式读数据库
StoredProcedureItemReader 基于存储过程读数据库
JpaPagingItemReader 基于Jpa方式分页读数据库
JdbcPagingItemReader 基于JDBC方式分页读数据库
HibernatePagingItemReader 基于Hibernate方式分页读取数据库
JmsItemReader 读取JMS队列
IteratorItemReader 迭代方式的读组件
MultiResourceItemReader 多文件读组件
MongoItemReader 基于分布式文件存储的数据库 MongoDB读组件
Neo4jItemReader 面向网络的数据库Neo4j的读组件
ResourcesItemReader 基于批量资源的读组件,每次读取返回资源对象 AmqpItemReader读取AMQP队列组件
RepositoryItemReader 基于 Spring Data的读组件

3.2 ItemWriter

ItemWriter 说明
FlatFileItemWriter 写Flat类型文件
MultiResourceItemWriter 多文件写组件
StaxEventItemWriter 写XML类型文件
AmqpItemWriter 写AMQP类型消息
ClassifierCompositeItemWriter 根据 Classifier路由不同的Item到特定的ItemWriter处理
HiberateItemWriter 基于Hibernate方式写数据库
ItemWriterAdapter ItemWriter适配器,可以复用现有的写服务
JdbcBatchItemWriter 基于JDBC方式写数据库
JmsItemWriter 写JMS队列 JpaItemWriter基于Jpa方式写数据库
GemfireItemWriter 基于分布式数据库Gemfire的写组件
SpELMappingGemfireItemWriter 基于Spring表达式语言写分布式数据库Gemfire的写组件
MimeMessageItemWriter 发送邮件的写组件
MongoItemWriter 基于分布式文件存储的数据库MongoDB写组件
Neo4jItemWriter 面向网络的数据库Neo4j的读组件
PropertyExtractingDelegatingItemWriter 属性抽取代理写组件:通过调用给定的 Spring Bean方法执行写入,参数由Item中指定的属性字段获取作为参数
RepositoryItemWriter基于 Spring Data的写组件
SimpleMailMessageItemWriter 发送邮件的写组件
CompositeItemWriter 条目写的组合模式,支持组装多个ItemWriter

3.3 ItemProcessor

ItemProcessor 说明
CompositeItemProcessor 组合处理器,可以封装多个业务处理服务
ItemProcessorAdapter ItemProcessor适配器,可以复用现有的业务处理服务
PassThroughItemProcessor 不做任何业务处理,直接返回读到的数据
ValidatingItemProcessor 数据校验处理器,支持对数据的校验,如果校验不通过可以进行过滤掉或者通过skip的方式跳过对记录的处理

4.开发流程

根据当前示例,从csv文件中读数据,写入到mysql数据库,只需要使用FlatFileItemReaderJdbcBatchItemWriter即可。下面对开发流程作简要说明。示例工程可以在这里获取,里面有文件resources/user-data.csv及相应的目标数据库脚本mytest.sql

4.1 创建spring batch数据库

4.1.1 创建数据库并执行sql脚本

Spring Batch的运行需要数据库的支持,以保存任务的运行状态及结果。因此需要先创建数据库。在mysql中创建名为my_spring_batch的数据库。并在此数据库中执行
Spring Batch的数据库脚本,脚本位置在spring-batch-core-4.1.2.RELEASE.jar的jar包中的\org\springframework\batch\core\schema-mysql.sql(也可以在示例工程sql文件夹中获取)。执行完成后,数据库表如下图所示:

快速使用组件-spring batch(3)读文件数据到数据库 - 图2

4.1.2 数据库表说明

数据库共9张表,以seq结尾的是用于生成主键的。其它6张表,以batch_job开头的是存储任务的相关信息,batch_step开头的存储步骤相关信息。

  • jobjob instancejob execution关系
    任务job是我们说的逻辑概念,即完整的一个批处理工作,它的实例就是job instance,此任务信息是存储在batch_job_instance表中。有点类似java中的类和类实例的概念,是一对多的关系。对于每一个job instance,执行的时候会生成记录存储在batch_job_execution中,表示每一个job执行的实际情况。注意,这里job instancejob execution也是一对多的关系,即同一个实例有可能会执行多次(如上一次执行失败了,后面重新再执行)。
  • batch_job_execution_contextbatch_job_execution_params
    存储任务执行时需要用到的上下文(以json格式存储)及运行时使用的参数。
  • batch_step_executionbatch_step_execution_context
    存储任务执行过程中的作业步骤及运行时上下文。

4.1.3 创建示例目标数据库

本示例只涉及一个test_user表。创建mytest数据库库,执行mytest.sql脚本即可。

4.2 配置多数据源

一般来说,我们会把Spring Batch的数据存储在独立的数据库中,而实际的应用使用的则是目标数据库,因此需要配置多数据源访问。基于上一篇文章.html)的工程进行开发。

4.2.1 添加mysql数据库依赖

  1. <!-- 数据库相关依赖-->
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-jdbc</artifactId>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.springframework.boot</groupId>
  8. <artifactId>spring-boot-starter-data-jpa</artifactId>
  9. </dependency>
  10. <dependency>
  11. <groupId>mysql</groupId>
  12. <artifactId>mysql-connector-java</artifactId>
  13. <scope>runtime</scope>
  14. </dependency>

4.2.2 配置多数据源访问

Spring Boot对多数据源的支持比较友好,配置也很简单,先在配置文件中添加数据库配置,然后在java配置文件中添加相应的注解即可。如下:

  • application.properties配置内容
  1. # spring batch db
  2. spring.datasource.jdbc-url=jdbc:mysql://localhost:3310/my_spring_batch?serverTimezone=GMT%2B8&useUnicode=true&characterEncoding=utf8&useSSL=false
  3. spring.datasource.username=root
  4. spring.datasource.password=111111
  5. # target db
  6. spring.target-datasource.jdbc-url=jdbc:mysql://localhost:3310/mytest?serverTimezone=GMT%2B8&useUnicode=true&characterEncoding=utf8&useSSL=false
  7. spring.target-datasource.username=root
  8. spring.target-datasource.password=111111
  • DataSourceConfig配置内容
    新建DataSourceConfig.java文件,配置多数据源,如下:
  1. @Configuration
  2. public class DataSourceConfig {
  3. @Bean("datasource")
  4. @ConfigurationProperties(prefix="spring.datasource")
  5. @Primary
  6. public DataSource batchDatasource() {
  7. return DataSourceBuilder.create().build();
  8. }
  9. @Bean("targetDatasource")
  10. @ConfigurationProperties(prefix="spring.target-datasource")
  11. public DataSource targetDatasource() {
  12. return DataSourceBuilder.create().build();
  13. }
  14. }

这样,后面就可以直接使用datasourcetargetDatasource两个Bean进行数据库访问。

4.3 添加User实体

本实例中,读取csv文件,转为User实体,然后存储到数据库,因此需要先把User这个实体作一个定义。使用了lombokjpa的注解,如下:

  1. @Entity
  2. @Data
  3. @Table(name="test_user")
  4. public class User{
  5. @Id
  6. @GeneratedValue
  7. /**
  8. * id
  9. */
  10. private Long id;
  11. /**
  12. * 姓名
  13. */
  14. private String name;
  15. /**
  16. * 手机号
  17. */
  18. private String phone;
  19. ...略

4.4 添加文件读取组件ItemReader

使用内置的FlatFileItemReader即可。如下:

  1. @Bean
  2. public ItemReader file2DbItemReader(){
  3. String funcName = Thread.currentThread().getStackTrace()[1].getMethodName();
  4. return new FlatFileItemReaderBuilder<User>()
  5. .name(funcName)
  6. .resource(new ClassPathResource("user-data.csv"))
  7. // .linesToSkip(1)
  8. .delimited()
  9. .names(new String[]{"id","name","phone","title","email","gender","date_of_birth","sys_create_time","sys_create_user","sys_update_time","sys_update_user"})
  10. .fieldSetMapper(new UserFieldSetMapper())
  11. .build();
  12. }

说明:

  • FlatFileItemReaderBuilder用于创建FlatFileItemReader,设置相应的行为,包括使用它来设置读取文件的位置(resource),文件分隔符(默认是','),是否跳过前面几行(linesToSkip),标识每一列对应的列名称(可与数据库的字段名一致)。设置文件字段与数据库实体字段的对应关系。
  • 设置文件字段与数据库实体字段的对应关系,使用FieldSetMapper来实现,其中FieldSet代表每一行文本数据,返回值即为实体对象。如下所示:
  1. public class UserFieldSetMapper implements FieldSetMapper<User> {
  2. @Override
  3. public User mapFieldSet(FieldSet fieldSet) throws BindException {
  4. String patternYmd = "yyyy-MM-dd";
  5. String patternYmdHms = "yyyy-MM-dd HH:mm:ss";
  6. User user = new User();
  7. user.setId(fieldSet.readLong("id"));
  8. user.setName(fieldSet.readString("name"));
  9. user.setPhone(fieldSet.readString("phone"));
  10. user.setTitle(fieldSet.readString("title"));
  11. user.setEmail(fieldSet.readString("email"));
  12. user.setGender(fieldSet.readString("gender"));
  13. //此字段有可能为null
  14. String dataOfBirthStr = fieldSet.readString("date_of_birth");
  15. if(SyncConstants.STR_CSV_NULL.equals(dataOfBirthStr)){
  16. user.setDateOfBirth(null);
  17. }else{
  18. DateTime dateTime = DateUtil.parse(dataOfBirthStr, patternYmd);
  19. user.setDateOfBirth(dateTime.toJdkDate());
  20. }
  21. user.setSysCreateTime(fieldSet.readDate("sys_create_time",patternYmdHms));
  22. user.setSysCreateUser(fieldSet.readString("sys_create_user"));
  23. user.setSysUpdateTime(fieldSet.readDate("sys_update_time",patternYmdHms));
  24. user.setSysUpdateUser(fieldSet.readString("sys_update_user"));
  25. return user;
  26. }
  27. }

4.5 添加处理组件ItemProcessor

由于csv文本文件中的数据null值数据标识符为\N,因此可以在处理组件中进行处理,把标识符\N设置为null值。如下所示:

  1. @Slf4j
  2. public class File2DbItemProcessor implements ItemProcessor<User,User> {
  3. @Override
  4. public User process(User user) throws Exception {
  5. user.setPhone(checkStr(user.getPhone()));
  6. user.setTitle(checkStr(user.getTitle()));
  7. user.setEmail(checkStr(user.getEmail()));
  8. user.setGender(checkStr(user.getGender()));
  9. log.info(LogConstants.LOG_TAG + "item process: " +user.getName());
  10. return user;
  11. }
  12. public String checkStr(String dataToCheck){
  13. if(SyncConstants.STR_CSV_NULL.equals(dataToCheck)){
  14. return null;
  15. }
  16. return dataToCheck;
  17. }
  18. }

4.6 添加数据库写入组件ItemWriter

数据库写入组件使用JdbcBatchItemWriter即可,如下:

  1. @Bean
  2. public ItemWriter file2DbWriter(@Qualifier("targetDatasource") DataSource datasource){
  3. return new JdbcBatchItemWriterBuilder<User>()
  4. .itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
  5. .sql("INSERT INTO test_user(id,name,phone,title,email,gender,date_of_birth,sys_create_time,sys_create_user,sys_update_time,sys_update_user) " +
  6. "VALUES (:id,:name,:phone,:title,:email,:gender,:dateOfBirth,:sysCreateTime,:sysCreateUser,:sysUpdateTime,:sysUpdateUser)")
  7. .dataSource(datasource)
  8. .build();
  9. }

说明:

  • 使用JdbcBatchItemWriterBuilder进行JdbcBatchItemWriter的创建,设置插入数据库的sql语句,同时指定数据源即可。
  • @Qualifier("targetDatasource") DataSource datasource用于指定数据源
  • 使用BeanPropertyItemSqlParameterSourceProvider可以直接把读取的数据实体的属性数据作为参数填充到sql语句中,从而实现数据插入操作。

4.7 组装完整任务

经过上面的操作,可以使用一个java配置,把读、写、处理组装成完整的stepjob,如下所示(详细可见示例工程文件):

File2DbBatchConfig.java

  1. @Bean
  2. public Job file2DbJob(Step file2DbStep,JobExecutionListener file2DbListener){
  3. String funcName = Thread.currentThread().getStackTrace()[1].getMethodName();
  4. return jobBuilderFactory.get(funcName)
  5. .listener(file2DbListener)
  6. .flow(file2DbStep)
  7. .end().build();
  8. }
  9. @Bean
  10. public Step file2DbStep(ItemReader file2DbItemReader , ItemProcessor file2DbProcessor
  11. ,ItemWriter file2DbWriter){
  12. String funcName = Thread.currentThread().getStackTrace()[1].getMethodName();
  13. return stepBuilderFactory.get(funcName)
  14. .<User,User>chunk(10)
  15. .reader(file2DbItemReader)
  16. .processor(file2DbProcessor)
  17. .writer(file2DbWriter)
  18. .build();
  19. }

4.8 编写测试

参考上一文章的ConsoleJobTest,编写File2DbJobTest文件。

  1. @RunWith(SpringRunner.class)
  2. @SpringBootTest(classes = {MainBootApplication.class,File2DbBatchConfig.class})
  3. @Slf4j
  4. public class File2DbJobTest {
  5. @Autowired
  6. private JobLauncherService jobLauncherService;
  7. @Autowired
  8. private Job file2DbJob;
  9. @Test
  10. public void testFile2DbJob() throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
  11. //构建任务运行参数
  12. JobParameters jobParameters = JobUtil.makeJobParameters();
  13. //执行并显示结果
  14. Map<String, Object> stringObjectMap = jobLauncherService.startJob(file2DbJob, jobParameters);
  15. Assert.assertEquals(ExitStatus.COMPLETED,stringObjectMap.get(SyncConstants.STR_RETURN_EXITSTATUS));
  16. }
  17. }

执行后结果输出如下(exitCode=COMPLETED):

快速使用组件-spring batch(3)读文件数据到数据库 - 图3

5.总结

本文先对Spring Batch的开箱即用的ItemReaderItemWriterItemProcessor作了一个简要的概览,然后以读csv文件,处理null值,再插入到数据库的处理逻辑为案例,介绍了Spring Batch的数据库脚本,FlatFileItemReaderJdbcBatchItemWriter的使用。希望对大家更深入的了解Spring Batch有帮助,并能用到实践中。

参考资源