异步事务处理

实际开发中,我们后再某些操作中发mq,但是存在这样的情况,业务端已经接受到了mq 消息,但是去查查询数据库的时候,业务数据查不到。这是因为发送 mq 的方法在一个大事务里,mq 发送后,事务才会提交。这个时候可以注册一个事务同步处理器:

  1. // 注册事务同步处理
  2. 2 TransactionSynchronizationManager.registerSynchronization(
  3. new TransactionSynchronizationAdapter() {
  4. 3 @Override
  5. 4 public void afterCommit() {
  6. 5 // 事务提交完毕时,触发:funcB
  7. 6 funB();
  8. 7 }

在tms 开发中,流程的执行需要在主线程业务数据提交后执行,就使用了 TransactionSynchronizationAdapter

  1. public void autoHandle(ActionHandleParamVO actionHandleParamVO) {
  2. Map<String, OrderBusinessStateHistoryPO> result = new HashMap<>(BaseConstant.MAP_INITIAL_CAPACITY);
  3. // 注册事务同步处理,等待业务的事务提交,然后进行流程处理
  4. TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
  5. @Override
  6. public void afterCommit() {
  7. List<String> orderIds = actionHandleParamVO.getBusinessIds();
  8. Integer corpId = actionHandleParamVO.getCorpId();
  9. if (CollectionUtils.isEmpty(orderIds)) {
  10. return;
  11. }
  12. //这里用CountDownLatch各个订单流程异步执行,最后统一返回结果
  13. final CountDownLatch latch = new CountDownLatch(orderIds.size());
  14. List<OrderInfoPO> orderList = flowOrderInfoDao.findByCorpIdAndIdIn(corpId, orderIds);
  15. Map<String, OrderInfoPO> orderMap = orderList.stream().collect(Collectors.toMap(OrderInfoPO::getId, item -> item));
  16. for (String orderId : orderIds) {
  17. OrderInfoPO orderInfoPO = orderMap.get(orderId);
  18. String lockKey = getLockKey(orderInfoPO);
  19. if (StringUtils.isNotBlank(lockKey)) {
  20. logger.info("订单{}尝试获取分布式锁", orderId);
  21. redisService.redisLock(BusiRedisLockKeyEnum.ORDER_ACTION_HANDLE_KEY.getLockKeyPrefix(), orderId);
  22. }
  23. autoFlowExecutorService.execute(() -> {
  24. try {
  25. //订单原状态
  26. Long oldOrderStatus = orderInfoPO.getOrderStatus();
  27. OrderBusinessStateHistoryPO history = autoHandle(orderInfoPO, actionHandleParamVO.getActionParam()).getResult();
  28. //流程有执行时,处理业务流程执行的回调
  29. //if (!history.getNodeId().equals(oldOrderStatus)) {
  30. logger.info("订单{}执行流程回调", history.getOrderId());
  31. E6Wrapper<FlowNodeInfoVO> nodeWrapper = flowNodeInfoService.getCacheById(history.getNodeId());
  32. FlowNodeInfoVO nodeInfo = nodeWrapper.getResult();
  33. OperationEnum operationEnum = OperationEnum.getByOperationId(nodeInfo.getButtonId());
  34. ActionHandleParamVO callbackParamVO = ActionHandleParamInstance.newInstanceOrder(Lists.newArrayList(history.getOrderId()), history.getCorpId(), operationEnum, actionHandleParamVO.getActionParam());
  35. ActionHandleHelper.doFlowCallback(history, callbackParamVO);
  36. //}
  37. logger.info("订单{}流程自动执行完毕", orderId);
  38. } catch (Exception e) {
  39. logger.error("订单{}流程自动执行出错:{}", orderId, e);
  40. } finally {
  41. if (StringUtils.isNotBlank(lockKey)) {
  42. redisService.redisUnLock(BusiRedisLockKeyEnum.ORDER_ACTION_HANDLE_KEY.getLockKeyPrefix(), orderId);
  43. }
  44. latch.countDown();
  45. }
  46. });
  47. }
  48. try {
  49. logger.info("等待所有订单执行完流程...");
  50. latch.await();
  51. } catch (InterruptedException e) {
  52. logger.error("异步处理流程自动执行出错:", e);
  53. }
  54. logger.info("所有订单执行完流程,返回结果");
  55. }
  56. });
  57. }

事务失效场景

Spring 事务管理 - 图1

一、事务不生效

1.访问权限问题

众所周知,java的访问权限主要有四种:privatedefaultprotectedpublic,它们的权限从左到右,依次变大。
但如果在开发过程中,把有某些事务方法,定义了错误的访问权限,就会导致事务功能出问题,例如:

  1. @Service
  2. public class UserService {
  3. @Transactional
  4. private void add(UserModel userModel) {
  5. saveData(userModel);
  6. updateData(userModel);
  7. }
  8. }

可以看到add方法的访问权限被定义成了private,这样会导致事务失效,spring要求被代理方法必须是public的。
说白了,在AbstractFallbackTransactionAttributeSource类的computeTransactionAttribute方法中有个判断,如果目标方法不是public,则TransactionAttribute返回null,即不支持事务。

  1. protected TransactionAttribute computeTransactionAttribute(Method method, @Nullable Class<?> targetClass) {
  2. // Don't allow no-public methods as required.
  3. if (allowPublicMethodsOnly() && !Modifier.isPublic(method.getModifiers())) {
  4. return null;
  5. }
  6. // The method may be on an interface, but we need attributes from the target class.
  7. // If the target class is null, the method will be unchanged.
  8. Method specificMethod = AopUtils.getMostSpecificMethod(method, targetClass);
  9. // First try is the method in the target class.
  10. TransactionAttribute txAttr = findTransactionAttribute(specificMethod);
  11. if (txAttr != null) {
  12. return txAttr;
  13. }
  14. // Second try is the transaction attribute on the target class.
  15. txAttr = findTransactionAttribute(specificMethod.getDeclaringClass());
  16. if (txAttr != null && ClassUtils.isUserLevelMethod(method)) {
  17. return txAttr;
  18. }
  19. if (specificMethod != method) {
  20. // Fallback is to look at the original method.
  21. txAttr = findTransactionAttribute(method);
  22. if (txAttr != null) {
  23. return txAttr;
  24. }
  25. // Last fallback is the class of the original method.
  26. txAttr = findTransactionAttribute(method.getDeclaringClass());
  27. if (txAttr != null && ClassUtils.isUserLevelMethod(method)) {
  28. return txAttr;
  29. }
  30. }
  31. return null;
  32. }

也就是说,如果自定义的事务方法(即目标方法),它的访问权限不是public,而是privatedefaultprotected的话,spring则不会提供事务功能。

2. 方法用final修饰

有时候,某个方法不想被子类重新,这时可以将该方法定义成final的。普通方法这样定义是没问题的,但如果将事务方法定义成final,例如:

  1. @Service
  2. public class UserService {
  3. @Transactional
  4. public final void add(UserModel userModel){
  5. saveData(userModel);
  6. updateData(userModel);
  7. }
  8. }

可以看到add方法被定义成了final的,这样会导致事务失效。
为什么?
如果看过spring事务的源码,可能会知道spring事务底层使用了aop,也就是通过jdk动态代理或者cglib,生成了代理类,在代理类中实现的事务功能。
但如果某个方法用final修饰了,那么在它的代理类中,就无法重写该方法,而添加事务功能。
注意:如果某个方法是static的,同样无法通过动态代理,变成事务方法。

3.方法内部调用

有时候需要在某个Service类的某个方法中,调用另外一个事务方法,比如:

  1. @Service
  2. public class UserService {
  3. @Autowired
  4. private UserMapper userMapper;
  5. @Transactional
  6. public void add(UserModel userModel) {
  7. userMapper.insertUser(userModel);
  8. updateStatus(userModel);
  9. }
  10. @Transactional
  11. public void updateStatus(UserModel userModel) {
  12. doSameThing();
  13. }
  14. }

看到在事务方法add中,直接调用事务方法updateStatus。从前面介绍的内容可以知道,updateStatus方法拥有事务的能力是因为spring aop生成代理了对象,但是这种方法直接调用了this对象的方法,所以updateStatus方法不会生成事务。
由此可见,在同一个类中的方法直接内部调用,会导致事务失效。
那么问题来了,如果有些场景,确实想在同一个类的某个方法中,调用它自己的另外一个方法,该怎么办呢?

3.1 新加一个Service方法

这个方法非常简单,只需要新加一个Service方法,把@Transactional注解加到新Service方法上,把需要事务执行的代码移到新方法中。具体代码如下:

  1. @Servcie
  2. public class ServiceA {
  3. @Autowired
  4. prvate ServiceB serviceB;
  5. public void save(User user) {
  6. queryData1();
  7. queryData2();
  8. serviceB.doSave(user);
  9. }
  10. }
  11. @Servcie
  12. public class ServiceB {
  13. @Transactional(rollbackFor=Exception.class)
  14. public void doSave(User user) {
  15. addData1();
  16. updateData2();
  17. }
  18. }

3.2 在该Service类中注入自己

如果不想再新加一个Service类,在该Service类中注入自己也是一种选择。具体代码如下:

  1. @Servcie
  2. public class ServiceA {
  3. @Autowired
  4. prvate ServiceA serviceA;
  5. public void save(User user) {
  6. queryData1();
  7. queryData2();
  8. serviceA.doSave(user);
  9. }
  10. @Transactional(rollbackFor=Exception.class)
  11. public void doSave(User user) {
  12. addData1();
  13. updateData2();
  14. }
  15. }

3.3 通过AopContent

在该Service类中使用AopContext.currentProxy()获取代理对象
上面的方法2确实可以解决问题,但是代码看起来并不直观,还可以通过在该Service类中使用AOPProxy获取代理对象,实现相同的功能。具体代码如下:

  1. @Servcie
  2. public class ServiceA {
  3. public void save(User user) {
  4. queryData1();
  5. queryData2();
  6. ((ServiceA)AopContext.currentProxy()).doSave(user);
  7. }
  8. @Transactional(rollbackFor=Exception.class)
  9. public void doSave(User user) {
  10. addData1();
  11. updateData2();
  12. }
  13. }

4.未被spring管理

在平时开发过程中,有个细节很容易被忽略。即使用spring事务的前提是:对象要被spring管理,需要创建bean实例。
通常情况下,通过@Controller@Service@Component@Repository等注解,可以自动实现bean实例化和依赖注入的功能。
如果有一天,开发了一个Service类,但忘了加@Service注解,比如:

  1. //@Service
  2. public class UserService {
  3. @Transactional
  4. public void add(UserModel userModel) {
  5. saveData(userModel);
  6. updateData(userModel);
  7. }
  8. }

从上面的例子,可以看到UserService类没有加@Service注解,那么该类不会交给spring管理,所以它的add方法也不会生成事务。

5.多线程调用

在实际项目开发中,多线程的使用场景还是挺多的。如果spring事务用在多线程场景中,会有问题吗?

  1. @Slf4j
  2. @Service
  3. public class UserService {
  4. @Autowired
  5. private UserMapper userMapper;
  6. @Autowired
  7. private RoleService roleService;
  8. @Transactional
  9. public void add(UserModel userModel) throws Exception {
  10. userMapper.insertUser(userModel);
  11. new Thread(() -> {
  12. roleService.doOtherThing();
  13. }).start();
  14. }
  15. }
  16. @Service
  17. public class RoleService {
  18. @Transactional
  19. public void doOtherThing() {
  20. System.out.println("保存role表数据");
  21. }
  22. }

从上面的例子中,可以看到事务方法add中,调用了事务方法doOtherThing,但是事务方法doOtherThing是在另外一个线程中调用的。
这样会导致两个方法不在同一个线程中,获取到的数据库连接不一样,从而是两个不同的事务。如果想doOtherThing方法中抛了异常,add方法也回滚是不可能的。
如果看过spring事务源码的朋友,可能会知道spring的事务是通过数据库连接来实现的。当前线程中保存了一个map,key是数据源,value是数据库连接。

  1. private static final ThreadLocal<Map<Object, Object>> resources = new NamedThreadLocal<>("Transactional resources");

说的同一个事务,其实是指同一个数据库连接,只有拥有同一个数据库连接才能同时提交和回滚。如果在不同的线程,拿到的数据库连接肯定是不一样的,所以是不同的事务。

6.表不支持事务

周所周知,在mysql5之前,默认的数据库引擎是myisam。
它的好处就不用多说了:索引文件和数据文件是分开存储的,对于查多写少的单表操作,性能比innodb更好。
有些老项目中,可能还在用它。
在创建表的时候,只需要把ENGINE参数设置成MyISAM即可:

  1. CREATE TABLE `category` (
  2. `id` bigint NOT NULL AUTO_INCREMENT,
  3. `one_category` varchar(20) COLLATE utf8mb4_bin DEFAULT NULL,
  4. `two_category` varchar(20) COLLATE utf8mb4_bin DEFAULT NULL,
  5. `three_category` varchar(20) COLLATE utf8mb4_bin DEFAULT NULL,
  6. `four_category` varchar(20) COLLATE utf8mb4_bin DEFAULT NULL,
  7. PRIMARY KEY (`id`)
  8. ) ENGINE=MyISAM AUTO_INCREMENT=4 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin

myisam好用,但有个很致命的问题是:不支持事务。
如果只是单表操作还好,不会出现太大的问题。但如果需要跨多张表操作,由于其不支持事务,数据极有可能会出现不完整的情况。
此外,myisam还不支持行锁和外键。
所以在实际业务场景中,myisam使用的并不多。在mysql5以后,myisam已经逐渐退出了历史的舞台,取而代之的是innodb。
有时候在开发的过程中,发现某张表的事务一直都没有生效,那不一定是spring事务的锅,最好确认一下使用的那张表,是否支持事务。

7.未开启事务

有时候,事务没有生效的根本原因是没有开启事务。
开启事务不是一个项目中,最基本的功能吗?
为什么还会没有开启事务?
没错,如果项目已经搭建好了,事务功能肯定是有的。
但如果是在搭建项目demo的时候,只有一张表,而这张表的事务没有生效。那么会是什么原因造成的呢?
当然原因有很多,但没有开启事务,这个原因极其容易被忽略。
如果使用的是springboot项目,那么很幸运。因为springboot通过DataSourceTransactionManagerAutoConfiguration类,已经默认开启了事务。
要做的事情很简单,只需要配置spring.datasource相关参数即可。
但如果使用的还是传统的spring项目,则需要在applicationContext.xml文件中,手动配置事务相关参数。如果忘了配置,事务肯定是不会生效的。
具体配置如下信息:

  1. <!-- 配置事务管理器 -->
  2. <bean class="org.springframework.jdbc.datasource.DataSourceTransactionManager" id="transactionManager">
  3. <property name="dataSource" ref="dataSource"></property>
  4. </bean>
  5. <tx:advice id="advice" transaction-manager="transactionManager">
  6. <tx:attributes>
  7. <tx:method name="*" propagation="REQUIRED"/>
  8. </tx:attributes>
  9. </tx:advice>
  10. <!-- 用切点把事务切进去 -->
  11. <aop:config>
  12. <aop:pointcut expression="execution(* com.susan.*.*(..))" id="pointcut"/>
  13. <aop:advisor advice-ref="advice" pointcut-ref="pointcut"/>
  14. </aop:config>

默默的说一句,如果在pointcut标签中的切入点匹配规则,配错了的话,有些类的事务也不会生效。

二、事务不回滚

1.错误的传播特性

其实,在使用@Transactional注解时,是可以指定propagation参数的。
该参数的作用是指定事务的传播特性,spring目前支持7种传播特性:

  • REQUIRED 如果当前上下文中存在事务,那么加入该事务,如果不存在事务,创建一个事务,这是默认的传播属性值。
  • SUPPORTS 如果当前上下文存在事务,则支持事务加入事务,如果不存在事务,则使用非事务的方式执行。
  • MANDATORY 如果当前上下文中存在事务,否则抛出异常。
  • REQUIRES_NEW 每次都会新建一个事务,并且同时将上下文中的事务挂起,执行当前新建事务完成以后,上下文事务恢复再执行。
  • NOT_SUPPORTED 如果当前上下文中存在事务,则挂起当前事务,然后新的方法在没有事务的环境中执行。
  • NEVER 如果当前上下文中存在事务,则抛出异常,否则在无事务环境上执行代码。
  • NESTED 如果当前上下文中存在事务,则嵌套事务执行,如果不存在事务,则新建事务。

如果在手动设置propagation参数的时候,把传播特性设置错了,比如:

  1. @Service
  2. public class UserService {
  3. @Transactional(propagation = Propagation.NEVER)
  4. public void add(UserModel userModel) {
  5. saveData(userModel);
  6. updateData(userModel);
  7. }
  8. }

可以看到add方法的事务传播特性定义成了Propagation.NEVER,这种类型的传播特性不支持事务,如果有事务则会抛异常。
目前只有这三种传播特性才会创建新事务:REQUIREDREQUIRES_NEWNESTED

2.自己吞了异常

事务不会回滚,最常见的问题是:开发者在代码中手动try…catch了异常。比如:

  1. @Slf4j
  2. @Service
  3. public class UserService {
  4. @Transactional
  5. public void add(UserModel userModel) {
  6. try {
  7. saveData(userModel);
  8. updateData(userModel);
  9. } catch (Exception e) {
  10. log.error(e.getMessage(), e);
  11. }
  12. }
  13. }

这种情况下spring事务当然不会回滚,因为开发者自己捕获了异常,又没有手动抛出,换句话说就是把异常吞掉了。
如果想要spring事务能够正常回滚,必须抛出它能够处理的异常。如果没有抛异常,则spring认为程序是正常的。

3.手动抛了别的异常

即使开发者没有手动捕获异常,但如果抛的异常不正确,spring事务也不会回滚。

  1. @Slf4j
  2. @Service
  3. public class UserService {
  4. @Transactional
  5. public void add(UserModel userModel) throws Exception {
  6. try {
  7. saveData(userModel);
  8. updateData(userModel);
  9. } catch (Exception e) {
  10. log.error(e.getMessage(), e);
  11. throw new Exception(e);
  12. }
  13. }
  14. }

上面的这种情况,开发人员自己捕获了异常,又手动抛出了异常:Exception,事务同样不会回滚。
因为spring事务,默认情况下只会回滚RuntimeException(运行时异常)和Error(错误),对于普通的Exception(非运行时异常),它不会回滚。

4.自定义了回滚异常

在使用@Transactional注解声明事务时,有时想自定义回滚的异常,spring也是支持的。可以通过设置rollbackFor参数,来完成这个功能。
但如果这个参数的值设置错了,就会引出一些莫名其妙的问题,例如:

  1. @Slf4j
  2. @Service
  3. public class UserService {
  4. @Transactional(rollbackFor = BusinessException.class)
  5. public void add(UserModel userModel) throws Exception {
  6. saveData(userModel);
  7. updateData(userModel);
  8. }
  9. }

如果在执行上面这段代码,保存和更新数据时,程序报错了,抛了SqlExceptionDuplicateKeyException等异常。而BusinessException是自定义的异常,报错的异常不属于BusinessException,所以事务也不会回滚。
即使rollbackFor有默认值,但阿里巴巴开发者规范中,还是要求开发者重新指定该参数。
这是为什么呢?
因为如果使用默认值,一旦程序抛出了Exception,事务不会回滚,这会出现很大的bug。所以,建议一般情况下,将该参数设置成:ExceptionThrowable

5.嵌套事务回滚多了

  1. public class UserService {
  2. @Autowired
  3. private UserMapper userMapper;
  4. @Autowired
  5. private RoleService roleService;
  6. @Transactional
  7. public void add(UserModel userModel) throws Exception {
  8. userMapper.insertUser(userModel);
  9. roleService.doOtherThing();
  10. }
  11. }
  12. @Service
  13. public class RoleService {
  14. @Transactional(propagation = Propagation.NESTED)
  15. public void doOtherThing() {
  16. System.out.println("保存role表数据");
  17. }
  18. }

这种情况使用了嵌套的内部事务,原本是希望调用roleService.doOtherThing方法时,如果出现了异常,只回滚doOtherThing方法里的内容,不回滚 userMapper.insertUser里的内容,即回滚保存点。。但事实是,insertUser也回滚了。
why?
因为doOtherThing方法出现了异常,没有手动捕获,会继续往上抛,到外层add方法的代理方法中捕获了异常。所以,这种情况是直接回滚了整个事务,不只回滚单个保存点。
怎么样才能只回滚保存点呢?

  1. @Slf4j
  2. @Service
  3. public class UserService {
  4. @Autowired
  5. private UserMapper userMapper;
  6. @Autowired
  7. private RoleService roleService;
  8. @Transactional
  9. public void add(UserModel userModel) throws Exception {
  10. userMapper.insertUser(userModel);
  11. try {
  12. roleService.doOtherThing();
  13. } catch (Exception e) {
  14. log.error(e.getMessage(), e);
  15. }
  16. }
  17. }

可以将内部嵌套事务放在try/catch中,并且不继续往上抛异常。这样就能保证,如果内部嵌套事务中出现异常,只回滚内部事务,而不影响外部事务。

三、其他

1.大事务问题

在使用Spring事务时,有个让人非常头疼的问题,就是大事务问题。
通常情况下,会在方法上@Transactional注解,填加事务功能,比如:

  1. @Service
  2. public class UserService {
  3. @Autowired
  4. private RoleService roleService;
  5. @Transactional
  6. public void add(UserModel userModel) throws Exception {
  7. query1();
  8. query2();
  9. query3();
  10. roleService.save(userModel);
  11. update(userModel);
  12. }
  13. }
  14. @Service
  15. public class RoleService {
  16. @Autowired
  17. private RoleService roleService;
  18. @Transactional
  19. public void save(UserModel userModel) throws Exception {
  20. query4();
  21. query5();
  22. query6();
  23. saveData(userModel);
  24. }
  25. }

@Transactional注解,如果被加到方法上,有个缺点就是整个方法都包含在事务当中了。
上面的这个例子中,在UserService类中,其实只有这两行才需要事务:

  1. roleService.save(userModel);
  2. update(userModel);

在RoleService类中,只有这一行需要事务:

  1. saveData(userModel);

现在的这种写法,会导致所有的query方法也被包含在同一个事务当中。
如果query方法非常多,调用层级很深,而且有部分查询方法比较耗时的话,会造成整个事务非常耗时,而从造成大事务问题。
Spring 事务管理 - 图2

2.编程式事务

上面聊的这些内容都是基于@Transactional注解的,主要说的是它的事务问题,把这种事务叫做:声明式事务。
其实,spring还提供了另外一种创建事务的方式,即通过手动编写代码实现的事务,把这种事务叫做:编程式事务。例如:

  1. @Autowired
  2. private TransactionTemplate transactionTemplate;
  3. ...
  4. public void save(final User user) {
  5. queryData1();
  6. queryData2();
  7. transactionTemplate.execute((status) => {
  8. addData1();
  9. updateData2();
  10. return Boolean.TRUE;
  11. })
  12. }

在spring中为了支持编程式事务,专门提供了一个类:TransactionTemplate,在它的execute方法中,就实现了事务的功能。
相较于@Transactional注解声明式事务,更建议大家使用,基于TransactionTemplate的编程式事务。主要原因如下:

  1. 避免由于spring aop问题,导致事务失效的问题。
  2. 能够更小粒度的控制事务的范围,更直观。

建议在项目中少使用@Transactional注解开启事务。但并不是说一定不能用它,如果项目中有些业务逻辑比较简单,而且不经常变动,使用@Transactional注解开启事务开启事务也无妨,因为它更简单,开发效率更高,但是千万要小心事务失效的问题。