tags: springbatch mongodb


1.引言

之前对Spring Batch的通过实例的方式进行了介绍,有兴趣的可见以下文章:

除了文件及关系型数据库的数据同步,Spring Batch的读组件(ItemReader),处理组件(ItemProcessor),写组件(ItemWriter)支持丰富的数据类型,其中MongoItemReaderMongoItemWriter是针对mongo的读写组件,用户可以直接使用,进行Mongodb的数据读写操作。一种比较常用的情景是从关系型数据库(如mysql)把数据同步到mongodb中,下面通过实例对mysqlmongodb的数据同步进行讲解。本文主要讲解有关Mongodb的操作,对于Spring Batch使用beetlsql进行关系数据库数据读取的操作请见文章《便捷的数据读写-spring batch(5)结合beetlSql进行数据读写》。本文的示例代码见github示例仓库

2.开发环境

  • JDK: jdk1.8
  • Spring Boot: 2.1.4.RELEASE
  • Spring Batch:4.1.2.RELEASE
  • 开发IDE: IDEA
  • 构建工具Maven: 3.3.9
  • 日志组件logback:1.2.3
  • lombok:1.18.6
  • MySQL: 5.6.26
  • Mongodb:4.0.10

3.开发流程

3.1 示例数据库及目标数据库

本示例的流程如下所示:

mongo同步-spring batch(8)的mongo读写组件使用 - 图1

示例工程中的sql目录有相应的关系数据库脚本,mytest.sql脚本创建一个test_user表,并有相应的测试数据。mongodb的安装可见官方文档,建立相应的存放数据的Collection,本示例为mytest

3.2 添加maven依赖及配置mongodb连接地址

由于需要使用mongodb的操作,因此需要添加它的依赖。如下所示:

  1. <!-- mongodb -->
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-data-mongodb</artifactId>
  5. </dependency>

添加依赖后,mongodb的连接地址需配置在配置文件中,若有用户名密码,则同样需要配置。如下:

  1. spring.data.mongodb.uri=mongodb://192.168.222.10/mytest
  2. # spring.data.mongodb.username=
  3. # spring.data.mongodb.password=

3.3 编写mongodb的读写组件

按示例,共三个组件,需要的是一个读mysql数据库的组件,一个mysql数据库实体转化为mongodb的处理组件,一个写入mongodb的写组件,代码结构如下图所示:

mongo同步-spring batch(8)的mongo读写组件使用 - 图2

其中ItemReader组件和ItemProcessor组件无须多讲,可参考之前的文章,这里主要讲一下mongodbItemWriter,此写入组件通过继承MongoItemWriter,编写自己的逻辑即可,而Spring Batch提供的mongodb写操作,是在初始化ItemWriter时,通过MongoOperations引入的,因此,MongoBatchConfig文件中,添加以下代码:

  1. @Bean
  2. public ItemWriter mongoWriter(MongoOperations mongoTemplate) {
  3. UserItemWriter userItemWriter = new UserItemWriter();
  4. userItemWriter.setTemplate(mongoTemplate);
  5. userItemWriter.setCollection("user");
  6. return userItemWriter;
  7. }

其中,MongoOperations是在初始化时注入,在自定义的UserItemWriter中,设置templatecollection即可。若逻辑简单,不写自定义的ItemWriter,也可以直接使用MongoItemWriterBuilder,直接构建MongoItemWriter,如下所示:

  1. return new MongoItemWriterBuilder<MongoUser>()
  2. .collection("user")
  3. .template(mongoTemplate)
  4. .build();

以上是写组件的构建,同理,对于mongodb的读组件,构建方式类似,只是需要注意一下动态参数的配置,如下示例代码是查询数据,并返回map,参数是在构建任务时动态传入的。

  1. @Bean
  2. @StepScope
  3. public MongoItemReader<Map> tweetsItemReader(MongoOperations mongoTemplate,@Value("#{jobParameters['hashTag']}") String hashtag) {
  4. return new MongoItemReaderBuilder<Map>()
  5. .name("tweetsItemReader")
  6. .targetType(Map.class)
  7. .jsonQuery("{ \"entities.hashtags.text\": { $eq: ?0 }}")
  8. .collection("tweets_collection")
  9. .parameterValues(Collections.singletonList(hashtag))
  10. .pageSize(10)
  11. .sorts(Collections.singletonMap("created_at", Sort.Direction.ASC))
  12. .template(mongoTemplate)
  13. .build();
  14. }

4.执行结果

编写单元测试或者在Controller编写启动任务,即可进行数据同步测试,执行结果如下所示:

mongo同步-spring batch(8)的mongo读写组件使用 - 图3

mongo同步-spring batch(8)的mongo读写组件使用 - 图4

5.总结

本文基于Spring Batch对数据从mysqlmongodb进行数据同步,通过结合示例代码,实现mongodb的读写组件进行编写及配置,希望需要使用Spring Batch进行关系数据库和mongodb进行批处理任务开发的人员有帮助。