tags: springbatch


1.引言

上一篇文章《决战数据库-spring batch(4)数据库到数据库》.html)使用Spring Batch内置的读、写组件,对数据库间的数据进行同步。相对来说,这个数据读取和数据写入是基于jdbc进行读写的(数据对象的映射需要我们自己处理,如UserRowMapper),我们现在开发一般都使用上层一点的ORM框架,如Hibernate,MyBatis,BeetlSQL。对于Hibernate,Spring Batch有默认的HibernateCursorItemReaderHibernateItemWriter,也可以实现自行使用MyBatisBeetlSQL。个人感觉,从易用性和学习曲线上看,BeetlSQL会更容易上手,社区也挺活跃,因此,本文介绍一下使用Spring Batch结合BeetlSQL进行数据库到数据库的数据同步。

2.开发环境

  • JDK: jdk1.8
  • Spring Boot: 2.1.4.RELEASE
  • Spring Batch:4.1.2.RELEASE
  • BeetlSQL: 1.1.77.RELEASE
  • 开发IDE: IDEA
  • 构建工具Maven: 3.3.9
  • 日志组件logback:1.2.3
  • lombok:1.18.6

3.BeetlSQL简要说明

按官方文档,BeetlSQL是一个全功能DAO工具, 同时具有Hibernate优点 & Mybatis优点功能,适用于承认以SQL为中心,同时又需求工具能自动能生成大量常用的SQL的应用。详细可参考官方文档。从最近一段时间的使用过程中,感觉从开发效率、维护性、易用性,都比较优秀。

4.使用BeetlSQL读写数据库

本示例依然是基于上一篇文章的示例功能,从源数据库中读取test_user表的数据,然后经过处理,再写入到目标数据库的test_user表中。只是不是使用Spring Batch内置的JdbcCursorItemReaderJdbcBatchItemWriter,改为使用BeetlSQL进行读写。完整示例可参考代码

4.1 引入BeetlSQL依赖

BeetlSQL提供了Spring Bootstarter来实现自动配置,在pom.xml添加以下依赖即可:

  1. <!-- orm框架: beetlsql -->
  2. <dependency>
  3. <groupId>com.ibeetl</groupId>
  4. <artifactId>beetl-framework-starter</artifactId>
  5. <version>1.1.77.RELEASE</version>
  6. </dependency>

添加后,会添加两个依赖,分别是beetl-2.9.9,和beetlsql-2.11.2,如下图:

便捷的数据读写-spring batch(5)结合beetlSql进行数据读写 - 图1

4.2 编写多数据源的dao

4.2.1 添加配置文件

由于我们是使用多数据源进行读写,关于多数据源的配置,上一篇文章.html)已经进行了描述,此处不再说明。BeetlSQL对多数据源有较好的支持,只需要简单的配置即可。具体可参考官方文档。下面对此做简单说明。在application.properties文件中添加以下配置:

  1. #beetlsql配置
  2. #默认/sql,可不设置
  3. #beetlsql.sqlPath=/sql
  4. #dao文件的后缀
  5. beetlsql.daoSuffix=Repository
  6. #自动加载和查找的dao文件所在包
  7. beetlsql.basePackage=me.mason.springbatch
  8. #默认org.beetl.sql.core.db.MySqlStyle,可不设置
  9. #beetlsql.dbStyle=org.beetl.sql.core.db.MySqlStyle
  10. #多数据源dao文件所在位置,以包区分读写数据源
  11. beetlsql.ds.datasource.basePackage=me.mason.springbatch.dao.local
  12. beetlsql.ds.originDatasource.basePackage=me.mason.springbatch.dao.origin
  13. beetlsql.ds.targetDatasource.basePackage=me.mason.springbatch.dao.target
  14. beetlsql.mutiple.datasource=datasource,originDatasource,targetDatasource

说明:

  • beetlsql.daoSuffix表示Dao文件的后缀,BeetlSql会根据此后缀加载Dao
  • beetlsql.mutiple.datasource数据源名与数据源的配置一致。

4.2.2 添加dao文件

添加上述配置后,由于使用包名来区分读写数据源的Dao,因此需要创建对应的包,分别在工程中me.mason.springbatch下创建dao.local,dao.origindao.target三个包,分别存放需要读取的三个数据源对应的Dao。在本示例中,仅使用源数据库和目标数据库,因此,仅需要在dao.origin中添加OriginUserRepository进行源数据读操作,在dao.target中添加TargetUserRepository进行写操作即可。(注意,由于配置中指定是使用后缀Repository,因此此处的类名需要使用它作为后缀)。如下:

OriginUserRepository.java

  1. @Repository
  2. public interface OriginUserRepository extends BaseMapper<User> {
  3. List<User> getOriginUser(Map<String,Object> params);
  4. }

TargetUserRepository.java

  1. @Repository
  2. public interface TargetUserRepository extends BaseMapper<User> {
  3. }

说明:

  • 使用注解@Repository标注是数据读写dao
  • 继承BaseMapper,以使用BeetlSQL内置的增删改查能力
  • 对于OriginUserRepositorygetOriginUser是自定义的数据读取操作,此操作具体实现是使用写在sql/user.md中的sql语句(sql语句在后面将说明)。

4.3 编写sql文件

根据BeetlSql的功能,开发者可以自定义sql语句进行数据库操作,而sql语句是以markdown文件的形式保存,支持beetl的语法,支持参数化语句,逻辑判断等操作,这就有点像Mybatis中的xml语句,但显示和修改会更友好。具体sql文件更详细的使用功能,读者可到官文档查阅。

本示例中,上面OriginUserRepository自定义了getOriginUser函数,以此函数名即可编写sql进行读数据(当然,由于此sql比较简单,完全可以不写到markdown文件也可以实现,此处仅做示例展示此功能而已)。如下所示:

  1. getOriginUser
  2. ===
  3. * 查询user数据
  4. select * from test_user

接着就是写数据需要用到的sqlinsertUser是这条语句的名称,在插入数据时会使用到:

  1. insertUser
  2. ===
  3. * 插入数据
  4. insert into test_user(id,name,phone,title,email,gender,date_of_birth
  5. ,sys_create_time,sys_create_user,sys_update_time,sys_update_user)
  6. values (#id#,#name#,#phone#,#title#,#email#,#gender#,#dateOfBirth#
  7. ,#sysCreateTime#,#sysCreateUser#,#sysUpdateTime#,#sysUpdateUser#)

由上面的sql可见,这跟我们平时写sql没有区别,其中以##包括的是参数,即实体User的字段。

4.4 编写读组件ItemReader

经过上面的配置和添加的dao类,我们已经有了读和写数据的能力。使用OriginUserRepository,可以编写Spring BatchItemReader。读取数据后在内存中,在read()时返回。如下所示:

  1. @Slf4j
  2. public class UserItemReader implements ItemReader<User> {
  3. protected List<User> items;
  4. protected Map<String,Object> params;
  5. @Autowired
  6. private OriginUserRepository originUserRepository;
  7. @Override
  8. public User read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
  9. if(Objects.isNull(items)){
  10. //使用beetlsql的md执行sql
  11. items = originUserRepository.getOriginUser(params);
  12. if(items.size() > 0){
  13. return items.remove(0);
  14. }
  15. }else{
  16. if (!items.isEmpty()) {
  17. return items.remove(0);
  18. }
  19. }
  20. return null;
  21. }
  22. public Map<String, Object> getParams() {
  23. return params;
  24. }
  25. public void setParams(Map<String, Object> params) {
  26. this.params = params;
  27. }
  28. }

说明:

  • originUserRepository.getOriginUser执行的是user.mdgetOriginUser查询语句。
  • 查询后数据保存在List<User>,在read()时返回其中的数据。全部返回后会返回null,以表示结束。
  • BeetlSql支持使用Map来传参,本示例暂时没有使用。

4.5 编写写组件ItemWriter

读取到数据后,使用TargetUserRepository,执行上面编写的insertUser来插入数据。如下所示

  1. public class UserItemWriter implements ItemWriter<User> {
  2. @Autowired
  3. private TargetUserRepository targetUserRepository;
  4. @Override
  5. public void write(List<? extends User> items) throws Exception {
  6. targetUserRepository.getSQLManager().updateBatch("user.insertUser",items);
  7. }
  8. }

说明:

  • 使用SQLManagerupdateBatch批量写数据
  • user.insertUser中,usermarkdown文件的名称,也是实体名称,insertUser是写的sql语句。

4.6 组装完整任务

新建BeetlsqlBatchConfig.java,作为Spring Batch的任务配置

4.6.1 注入读写组件

使用上面已写的ReaderWriter,使用Bean注解加入,如下:

  1. @Bean
  2. public ItemReader beetlsqlItemReader() {
  3. UserItemReader userItemReader = new UserItemReader();
  4. //设置参数,当前示例可不设置参数
  5. Map<String,Object> params = CollUtil.newHashMap();
  6. userItemReader.setParams(params);
  7. return userItemReader;
  8. }
  9. @Bean
  10. public ItemWriter beetlsqlWriter() {
  11. return new UserItemWriter();
  12. }

4.6.2 组装任务

使用StepJob,实现完整的任务配置,如下:

  1. @Bean
  2. public Job beetlsqlJob(Step beetlsqlStep,JobExecutionListener beetlsqlListener){
  3. String funcName = Thread.currentThread().getStackTrace()[1].getMethodName();
  4. return jobBuilderFactory.get(funcName)
  5. .listener(beetlsqlListener)
  6. .flow(beetlsqlStep)
  7. .end().build();
  8. }
  9. @Bean
  10. public Step beetlsqlStep(ItemReader beetlsqlItemReader ,ItemProcessor beetlsqlProcessor
  11. ,ItemWriter beetlsqlWriter){
  12. String funcName = Thread.currentThread().getStackTrace()[1].getMethodName();
  13. return stepBuilderFactory.get(funcName)
  14. .<User,User>chunk(10)
  15. .reader(beetlsqlItemReader)
  16. .processor(beetlsqlProcessor)
  17. .writer(beetlsqlWriter)
  18. .build();
  19. }

4.7 测试

参考上一文章的Db2DbJobTest,编写BeetlsqlJobTest文件。测试前先把目标数据库中的test_user库清空,然后启动Job进行测试,结果跟Db2DbJobTest是一致的。日志输出如下:

便捷的数据读写-spring batch(5)结合beetlSql进行数据读写 - 图2

在上面使用BeetlSql过程中,可见有几个好处:

  • 不需要自己写RowMapper对数据进行映射,更简单。
  • sql语句写在markdown文件中,修改更灵活。
  • sql语句执行输出在日志中,更清晰。

5.总结

本文使用Spring Batch对数据库到数据库的示例做了一个改动,使用BeetlSQL进行多数据源的读写操作,以实现更简单、更灵活、更清晰的数据库读写。希望对想要用Spring Batch同时又想了解BeetlSql的读者有帮助。