Java SpringBoot JTA
首先,到底什么是分布式事务呢,比如在执行一个业务逻辑的时候有两步分别操作A数据源和B数据源,当在A数据源执行数据更改后,在B数据源执行时出现运行时异常,那么必须要让B数据源的操作回滚,并回滚对A数据源的操作;这种情况在支付业务时常常出现;比如买票业务在最后支付失败,那之前的操作必须全部回滚,如果之前的操作分布在多个数据源中,那么这就是典型的分布式事务回滚;
了解了什么是分布式事务,那分布式事务在java的解决方案就是JTA(即Java Transaction API);SpringBoot官方提供了 Atomikos or Bitronix的解决思路;
其实,大多数情况下很多公司是使用消息队列的方式实现分布式事务。
这里重点讲解SpringBoot环境下,整合 Atomikos +mysql+mybatis+tomcat/jetty;

一、项目依赖

pom.xml中添加atomikos的SpringBoot相关依赖:

  1. <!--分布式事务-->
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-jta-atomikos</artifactId>
  5. </dependency>

点进去会发现里面整合好了:transactions-jms、transactions-jta、transactions-jdbc、javax.transaction-api

二、把数据源的相关配置项单独提炼到一个application.yml中:

注意:

  1. 这回spring.datasource.typecom.alibaba.druid.pool.xa.DruidXADataSource;
  2. spring.jta.transaction-manager-id的值在电脑中是唯一的,这个详细请阅读官方文档;

SpringBoot 分布式事务的解决方案(JTA Atomic 多数据源) - 图1
完整的yml文件如下:

  1. spring:
  2. datasource:
  3. type: com.alibaba.druid.pool.xa.DruidXADataSource
  4. druid:
  5. systemDB:
  6. name: systemDB
  7. url: jdbc:mysql://localhost:3306/springboot-mybatis?useUnicode=true&characterEncoding=utf-8
  8. username: root
  9. password: root
  10. # 下面为连接池的补充设置,应用到上面所有数据源中
  11. # 初始化大小,最小,最大
  12. initialSize: 5
  13. minIdle: 5
  14. maxActive: 20
  15. # 配置获取连接等待超时的时间
  16. maxWait: 60000
  17. # 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒
  18. timeBetweenEvictionRunsMillis: 60000
  19. # 配置一个连接在池中最小生存的时间,单位是毫秒
  20. minEvictableIdleTimeMillis: 30
  21. validationQuery: SELECT 1
  22. validationQueryTimeout: 10000
  23. testWhileIdle: true
  24. testOnBorrow: false
  25. testOnReturn: false
  26. # 打开PSCache,并且指定每个连接上PSCache的大小
  27. poolPreparedStatements: true
  28. maxPoolPreparedStatementPerConnectionSize: 20
  29. filters: stat,wall
  30. # 通过connectProperties属性来打开mergeSql功能;慢SQL记录
  31. connectionProperties: druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000
  32. # 合并多个DruidDataSource的监控数据
  33. useGlobalDataSourceStat: true
  34. businessDB:
  35. name: businessDB
  36. url: jdbc:mysql://localhost:3306/springboot-mybatis2?useUnicode=true&characterEncoding=utf-8
  37. username: root
  38. password: root
  39. # 下面为连接池的补充设置,应用到上面所有数据源中
  40. # 初始化大小,最小,最大
  41. initialSize: 5
  42. minIdle: 5
  43. maxActive: 20
  44. # 配置获取连接等待超时的时间
  45. maxWait: 60000
  46. # 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒
  47. timeBetweenEvictionRunsMillis: 60000
  48. # 配置一个连接在池中最小生存的时间,单位是毫秒
  49. minEvictableIdleTimeMillis: 30
  50. validationQuery: SELECT 1
  51. validationQueryTimeout: 10000
  52. testWhileIdle: true
  53. testOnBorrow: false
  54. testOnReturn: false
  55. # 打开PSCache,并且指定每个连接上PSCache的大小
  56. poolPreparedStatements: true
  57. maxPoolPreparedStatementPerConnectionSize: 20
  58. filters: stat,wall
  59. # 通过connectProperties属性来打开mergeSql功能;慢SQL记录
  60. connectionProperties: druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000
  61. # 合并多个DruidDataSource的监控数据
  62. useGlobalDataSourceStat: true
  63. #jta相关参数配置
  64. jta:
  65. log-dir: classpath:tx-logs
  66. transaction-manager-id: txManager

三、在DruidConfig.java中实现多个数据源的注册;分布式事务管理器的注册;druid的注册;
  1. package com.zjt.config;
  2. import com.alibaba.druid.filter.stat.StatFilter;
  3. import com.alibaba.druid.support.http.StatViewServlet;
  4. import com.alibaba.druid.support.http.WebStatFilter;
  5. import com.alibaba.druid.wall.WallConfig;
  6. import com.alibaba.druid.wall.WallFilter;
  7. import com.atomikos.icatch.jta.UserTransactionImp;
  8. import com.atomikos.icatch.jta.UserTransactionManager;
  9. import org.springframework.beans.factory.annotation.Autowired;
  10. import org.springframework.boot.jta.atomikos.AtomikosDataSourceBean;
  11. import org.springframework.boot.web.servlet.FilterRegistrationBean;
  12. import org.springframework.boot.web.servlet.ServletRegistrationBean;
  13. import org.springframework.context.annotation.Bean;
  14. import org.springframework.context.annotation.Configuration;
  15. import org.springframework.context.annotation.Primary;
  16. import org.springframework.core.env.Environment;
  17. import org.springframework.transaction.jta.JtaTransactionManager;
  18. import javax.sql.DataSource;
  19. import javax.transaction.UserTransaction;
  20. import java.util.Properties;
  21. /**
  22. * Druid配置
  23. *
  24. *
  25. */
  26. @Configuration
  27. public class DruidConfig {
  28. @Bean(name = "systemDataSource")
  29. @Primary
  30. @Autowired
  31. public DataSource systemDataSource(Environment env) {
  32. AtomikosDataSourceBean ds = new AtomikosDataSourceBean();
  33. Properties prop = build(env, "spring.datasource.druid.systemDB.");
  34. ds.setXaDataSourceClassName("com.alibaba.druid.pool.xa.DruidXADataSource");
  35. ds.setUniqueResourceName("systemDB");
  36. ds.setPoolSize(5);
  37. ds.setXaProperties(prop);
  38. return ds;
  39. }
  40. @Autowired
  41. @Bean(name = "businessDataSource")
  42. public AtomikosDataSourceBean businessDataSource(Environment env) {
  43. AtomikosDataSourceBean ds = new AtomikosDataSourceBean();
  44. Properties prop = build(env, "spring.datasource.druid.businessDB.");
  45. ds.setXaDataSourceClassName("com.alibaba.druid.pool.xa.DruidXADataSource");
  46. ds.setUniqueResourceName("businessDB");
  47. ds.setPoolSize(5);
  48. ds.setXaProperties(prop);
  49. return ds;
  50. }
  51. /**
  52. * 注入事物管理器
  53. * @return
  54. */
  55. @Bean(name = "xatx")
  56. public JtaTransactionManager regTransactionManager () {
  57. UserTransactionManager userTransactionManager = new UserTransactionManager();
  58. UserTransaction userTransaction = new UserTransactionImp();
  59. return new JtaTransactionManager(userTransaction, userTransactionManager);
  60. }
  61. private Properties build(Environment env, String prefix) {
  62. Properties prop = new Properties();
  63. prop.put("url", env.getProperty(prefix + "url"));
  64. prop.put("username", env.getProperty(prefix + "username"));
  65. prop.put("password", env.getProperty(prefix + "password"));
  66. prop.put("driverClassName", env.getProperty(prefix + "driverClassName", ""));
  67. prop.put("initialSize", env.getProperty(prefix + "initialSize", Integer.class));
  68. prop.put("maxActive", env.getProperty(prefix + "maxActive", Integer.class));
  69. prop.put("minIdle", env.getProperty(prefix + "minIdle", Integer.class));
  70. prop.put("maxWait", env.getProperty(prefix + "maxWait", Integer.class));
  71. prop.put("poolPreparedStatements", env.getProperty(prefix + "poolPreparedStatements", Boolean.class));
  72. prop.put("maxPoolPreparedStatementPerConnectionSize",
  73. env.getProperty(prefix + "maxPoolPreparedStatementPerConnectionSize", Integer.class));
  74. prop.put("maxPoolPreparedStatementPerConnectionSize",
  75. env.getProperty(prefix + "maxPoolPreparedStatementPerConnectionSize", Integer.class));
  76. prop.put("validationQuery", env.getProperty(prefix + "validationQuery"));
  77. prop.put("validationQueryTimeout", env.getProperty(prefix + "validationQueryTimeout", Integer.class));
  78. prop.put("testOnBorrow", env.getProperty(prefix + "testOnBorrow", Boolean.class));
  79. prop.put("testOnReturn", env.getProperty(prefix + "testOnReturn", Boolean.class));
  80. prop.put("testWhileIdle", env.getProperty(prefix + "testWhileIdle", Boolean.class));
  81. prop.put("timeBetweenEvictionRunsMillis",
  82. env.getProperty(prefix + "timeBetweenEvictionRunsMillis", Integer.class));
  83. prop.put("minEvictableIdleTimeMillis", env.getProperty(prefix + "minEvictableIdleTimeMillis", Integer.class));
  84. prop.put("filters", env.getProperty(prefix + "filters"));
  85. return prop;
  86. }
  87. @Bean
  88. public ServletRegistrationBean druidServlet() {
  89. ServletRegistrationBean servletRegistrationBean = new ServletRegistrationBean(new StatViewServlet(), "/druid/*");
  90. //控制台管理用户,加入下面2行 进入druid后台就需要登录
  91. //servletRegistrationBean.addInitParameter("loginUsername", "admin");
  92. //servletRegistrationBean.addInitParameter("loginPassword", "admin");
  93. return servletRegistrationBean;
  94. }
  95. @Bean
  96. public FilterRegistrationBean filterRegistrationBean() {
  97. FilterRegistrationBean filterRegistrationBean = new FilterRegistrationBean();
  98. filterRegistrationBean.setFilter(new WebStatFilter());
  99. filterRegistrationBean.addUrlPatterns("/*");
  100. filterRegistrationBean.addInitParameter("exclusions", "*.js,*.gif,*.jpg,*.png,*.css,*.ico,/druid/*");
  101. filterRegistrationBean.addInitParameter("profileEnable", "true");
  102. return filterRegistrationBean;
  103. }
  104. @Bean
  105. public StatFilter statFilter(){
  106. StatFilter statFilter = new StatFilter();
  107. statFilter.setLogSlowSql(true); //slowSqlMillis用来配置SQL慢的标准,执行时间超过slowSqlMillis的就是慢。
  108. statFilter.setMergeSql(true); //SQL合并配置
  109. statFilter.setSlowSqlMillis(1000);//slowSqlMillis的缺省值为3000,也就是3秒。
  110. return statFilter;
  111. }
  112. @Bean
  113. public WallFilter wallFilter(){
  114. WallFilter wallFilter = new WallFilter();
  115. //允许执行多条SQL
  116. WallConfig config = new WallConfig();
  117. config.setMultiStatementAllow(true);
  118. wallFilter.setConfig(config);
  119. return wallFilter;
  120. }
  121. }

四、分别配置每个数据源对应的sqlSessionFactory,以及MapperScan扫描的包:

MybatisDatasourceConfig.java

  1. package com.zjt.config;
  2. import com.zjt.util.MyMapper;
  3. import org.apache.ibatis.session.SqlSessionFactory;
  4. import org.mybatis.spring.SqlSessionFactoryBean;
  5. import org.mybatis.spring.SqlSessionTemplate;
  6. import org.mybatis.spring.annotation.MapperScan;
  7. import org.springframework.beans.factory.annotation.Autowired;
  8. import org.springframework.beans.factory.annotation.Qualifier;
  9. import org.springframework.context.annotation.Bean;
  10. import org.springframework.context.annotation.Configuration;
  11. import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
  12. import org.springframework.core.io.support.ResourcePatternResolver;
  13. import javax.sql.DataSource;
  14. /**
  15. *
  16. * @description
  17. */
  18. @Configuration
  19. // 精确到 mapper 目录,以便跟其他数据源隔离
  20. @MapperScan(basePackages = "com.zjt.mapper", markerInterface = MyMapper.class, sqlSessionFactoryRef = "sqlSessionFactory")
  21. public class MybatisDatasourceConfig {
  22. @Autowired
  23. @Qualifier("systemDataSource")
  24. private DataSource ds;
  25. @Bean
  26. public SqlSessionFactory sqlSessionFactory() throws Exception {
  27. SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean();
  28. factoryBean.setDataSource(ds);
  29. //指定mapper xml目录
  30. ResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
  31. factoryBean.setMapperLocations(resolver.getResources("classpath:mapper/*.xml"));
  32. return factoryBean.getObject();
  33. }
  34. @Bean
  35. public SqlSessionTemplate sqlSessionTemplate() throws Exception {
  36. SqlSessionTemplate template = new SqlSessionTemplate(sqlSessionFactory()); // 使用上面配置的Factory
  37. return template;
  38. }
  39. //关于事务管理器,不管是JPA还是JDBC等都实现自接口 PlatformTransactionManager
  40. // 如果你添加的是 spring-boot-starter-jdbc 依赖,框架会默认注入 DataSourceTransactionManager 实例。
  41. //在Spring容器中,我们手工注解@Bean 将被优先加载,框架不会重新实例化其他的 PlatformTransactionManager 实现类。
  42. /*@Bean(name = "transactionManager")
  43. @Primary
  44. public DataSourceTransactionManager masterTransactionManager() {
  45. //MyBatis自动参与到spring事务管理中,无需额外配置,只要org.mybatis.spring.SqlSessionFactoryBean引用的数据源
  46. // 与DataSourceTransactionManager引用的数据源一致即可,否则事务管理会不起作用。
  47. return new DataSourceTransactionManager(ds);
  48. }*/
  49. }

MybatisDatasource2Config.java

  1. package com.zjt.config;
  2. import com.zjt.util.MyMapper;
  3. import org.apache.ibatis.session.SqlSessionFactory;
  4. import org.mybatis.spring.SqlSessionFactoryBean;
  5. import org.mybatis.spring.SqlSessionTemplate;
  6. import org.mybatis.spring.annotation.MapperScan;
  7. import org.springframework.beans.factory.annotation.Autowired;
  8. import org.springframework.beans.factory.annotation.Qualifier;
  9. import org.springframework.context.annotation.Bean;
  10. import org.springframework.context.annotation.Configuration;
  11. import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
  12. import org.springframework.core.io.support.ResourcePatternResolver;
  13. import javax.sql.DataSource;
  14. /**
  15. *
  16. * @description
  17. */
  18. @Configuration
  19. // 精确到 mapper 目录,以便跟其他数据源隔离
  20. @MapperScan(basePackages = "com.zjt.mapper2", markerInterface = MyMapper.class, sqlSessionFactoryRef = "sqlSessionFactory2")
  21. public class MybatisDatasource2Config {
  22. @Autowired
  23. @Qualifier("businessDataSource")
  24. private DataSource ds;
  25. @Bean
  26. public SqlSessionFactory sqlSessionFactory2() throws Exception {
  27. SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean();
  28. factoryBean.setDataSource(ds);
  29. //指定mapper xml目录
  30. ResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
  31. factoryBean.setMapperLocations(resolver.getResources("classpath:mapper2/*.xml"));
  32. return factoryBean.getObject();
  33. }
  34. @Bean
  35. public SqlSessionTemplate sqlSessionTemplate2() throws Exception {
  36. SqlSessionTemplate template = new SqlSessionTemplate(sqlSessionFactory2()); // 使用上面配置的Factory
  37. return template;
  38. }
  39. //关于事务管理器,不管是JPA还是JDBC等都实现自接口 PlatformTransactionManager
  40. // 如果你添加的是 spring-boot-starter-jdbc 依赖,框架会默认注入 DataSourceTransactionManager 实例。
  41. //在Spring容器中,我们手工注解@Bean 将被优先加载,框架不会重新实例化其他的 PlatformTransactionManager 实现类。
  42. /*@Bean(name = "transactionManager2")
  43. @Primary
  44. public DataSourceTransactionManager masterTransactionManager() {
  45. //MyBatis自动参与到spring事务管理中,无需额外配置,只要org.mybatis.spring.SqlSessionFactoryBean引用的数据源
  46. // 与DataSourceTransactionManager引用的数据源一致即可,否则事务管理会不起作用。
  47. return new DataSourceTransactionManager(ds);
  48. }*/
  49. }

由于本例中只使用一个事务管理器:xatx,故就不在使用TxAdviceInterceptor.java和TxAdvice2Interceptor.java中配置的事务管理器了;有需求的可以自己配置其他的事务管理器;(见DruidConfig.java中查看)

五、新建分布式业务测试接口JtaTestService.java和实现类JtaTestServiceImpl.java

其实就是一个很简单的test01()方法,在该方法中分别先后调用classService.saveOrUpdateTClass(tClass);teacherService.saveOrUpdateTeacher(teacher);
实现先后操作两个数据源:然后可以自己debug跟踪事务的提交时机,此外,也可以在在两个方法全执行结束之后,手动制造一个运行时异常,来检查分布式事务是否全部回滚;
注意:
在实现类的方法中使用的是:

  1. @Transactional(transactionManager = "xatx", propagation = Propagation.REQUIRED, rollbackFor = { java.lang.RuntimeException.class })

从而指定了使用哪个事务管理器,事务隔离级别(一般都用这个默认的),回滚的条件(一般可以使用Exception),这三个可以自己根据业务实际修改;

  1. package com.zjt.service3;
  2. import java.util.Map;
  3. public interface JtaTestService {
  4. public Map<String,Object> test01();
  5. }
  1. package com.zjt.service3.impl;
  2. import com.zjt.entity.TClass;
  3. import com.zjt.entity.Teacher;
  4. import com.zjt.service.TClassService;
  5. import com.zjt.service2.TeacherService;
  6. import com.zjt.service3.JtaTestService;
  7. import org.springframework.beans.factory.annotation.Autowired;
  8. import org.springframework.beans.factory.annotation.Qualifier;
  9. import org.springframework.stereotype.Service;
  10. import org.springframework.transaction.annotation.Propagation;
  11. import org.springframework.transaction.annotation.Transactional;
  12. import java.util.LinkedHashMap;
  13. import java.util.Map;
  14. @Service("jtaTestServiceImpl")
  15. public class JtaTestServiceImpl implements JtaTestService{
  16. @Autowired
  17. @Qualifier("teacherServiceImpl")
  18. private TeacherService teacherService;
  19. @Autowired
  20. @Qualifier("tclassServiceImpl")
  21. private TClassService tclassService;
  22. @Override
  23. @Transactional(transactionManager = "xatx", propagation = Propagation.REQUIRED, rollbackFor = { java.lang.RuntimeException.class })
  24. public Map<String, Object> test01() {
  25. LinkedHashMap<String,Object> resultMap=new LinkedHashMap<String,Object>();
  26. TClass tClass=new TClass();
  27. tClass.setName("8888");
  28. tclassService.saveOrUpdateTClass(tClass);
  29. Teacher teacher=new Teacher();
  30. teacher.setName("8888");
  31. teacherService.saveOrUpdateTeacher(teacher);
  32. System.out.println(1/0);
  33. resultMap.put("state","success");
  34. resultMap.put("message","分布式事务同步成功");
  35. return resultMap;
  36. }
  37. }

六、建立JtaTestContoller.java,接受一个来自前端的http请求,触发JtaTestService 的test01方法:
  1. package com.zjt.web;
  2. import com.zjt.service3.JtaTestService;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.beans.factory.annotation.Qualifier;
  5. import org.springframework.stereotype.Controller;
  6. import org.springframework.web.bind.annotation.RequestMapping;
  7. import org.springframework.web.bind.annotation.ResponseBody;
  8. import java.util.LinkedHashMap;
  9. import java.util.Map;
  10. @Controller
  11. @RequestMapping("/jtaTest")
  12. public class JtaTestContoller {
  13. @Autowired
  14. @Qualifier("jtaTestServiceImpl")
  15. private JtaTestService taTestService;
  16. @ResponseBody
  17. @RequestMapping("/test01")
  18. public Map<String,Object> test01(){
  19. LinkedHashMap<String,Object> resultMap=new LinkedHashMap<String,Object>();
  20. try {
  21. return taTestService.test01();
  22. }catch (Exception e){
  23. resultMap.put("state","fail");
  24. resultMap.put("message","分布式事务同步失败");
  25. return resultMap;
  26. }
  27. }
  28. }

七、在test.ftl中增加一个按钮来测试;
  1. //分布式事务测试
  2. $("#JTATest").click(function(){
  3. $.ajax({
  4. type: "POST",
  5. url: "${basePath!}/jtaTest/test01",
  6. data: {} ,
  7. async: false,
  8. error: function (request) {
  9. layer.alert("与服务器连接失败/(ㄒoㄒ)/~~");
  10. return false;
  11. },
  12. success: function (data) {
  13. if (data.state == 'fail') {
  14. layer.alert(data.message);
  15. return false;
  16. }else if(data.state == 'success'){
  17. layer.alert(data.message);
  18. }
  19. }
  20. });
  21. });
  22. <button class="layui-btn" id="JTATest">同时向班级和老师表插入名为8888的班级和老师</button>

八、启动服务,验证结果:

SpringBoot 分布式事务的解决方案(JTA Atomic 多数据源) - 图2
点击这个按钮,跳转到controller:
SpringBoot 分布式事务的解决方案(JTA Atomic 多数据源) - 图3
当正常执行了sql语句之后,可以发现数据库并没有变化,因为整个方法的事务还没有走完,当走到1/0这步时:
SpringBoot 分布式事务的解决方案(JTA Atomic 多数据源) - 图4
抛出运行时异常,并被spring事务拦截器拦截,并捕获异常:
SpringBoot 分布式事务的解决方案(JTA Atomic 多数据源) - 图5
this.completeTransactionAfterThrowing(txInfo, var16);方法中会将事务全部回滚:

  1. 22:09:04.243 logback [http-nio-8080-exec-5] INFO c.a.i.imp.CompositeTransactionImp - rollback() done of transaction 192.168.1.103.tm0000400006

此时,当再次打开数据库验证,依旧没有变化,证明分布式事务配置成功。

九、后记:

本文源代码:https://github.com/zhaojiatao/springboot-zjt-chapter10-springboot-atomikos-mysql-mybatis-druid.git
代码在tomcat和jetty环境下均可完成事务回滚。在事务回滚时可能报一个Transactional not active的警告,大部分人认为这只是一个警告,可以忽略。