本文着重分析 Spring 声明式事务 @Transactional 的实现原理。学习 Spring 事务首先要知道 spring-tx 和 spring-jdbc 是两个工程,换句话说 Spring 事务和 jdbc 它们是彻底解耦的。

  1. Spinrg-AOP:Spring 事务是 AOP 的典型应用。
  2. PlatformTransactionManager:Spring 是如何接管事务的。
  3. 同一个连接保证:Spring 事务和用于业务处理的的连接是如何保证是同一个?如果 connection 都不是同一个,事务更无从谈起。

    1. 工作原理

    首先,我们先看一下原生的 jdbc 是如何处理事务的。之后,我们再分析 Spring 是如何接管事务。

    1.1 jdbc 事务

    经典的 jdbc 事务可以分为三部分:第一步先获取连接 connection,并关闭自动提交;第二步执行业务逻辑;第三步根据是否抛出异常选择回滚或提交事务。
    1. Connection connection = getConnection();
    2. connection.setAutoCommit(false);
    3. -------------------- 准备事务 ------------------------
    4. try {
    5. Statement statement = connection.createStatement();
    6. ResultSet resultSet = statement.executeQuery("select * from demo_table");
    7. ------------------ 提交或回滚事务 --------------------
    8. } catch (SQLException e) {
    9. connection.rollback();
    10. } finally {
    11. connection.commit();
    12. }
    想像一下,如果是你来设计一个框架接管所有的事务,你会怎么做呢?
  • 首先,这是典型的 AOP 应用,可以使用面向切面编程来消除模板代码。整个事务处理的过程分为三部分:
    • 第一步:获取事务,包括获取连接,以及关闭自动提交等配置。
    • 第二步:通过反射执行业务代码。
    • 第三步:根据是否抛出异常,提交或回滚事务。
  • 其次,问题就来了:使用 AOP 后,如何保证处理事务的 connection 和中间处理业务的 connection 是同一个呢?Spring 使用了一个最简单的办法,将** connection 保存**在 ThreadLocal 中,这样就能保证处理的都是同一个连接。

    1.2 Spring 事务处理

    我们先大致了解一下 Spring 与事务有关的核心类,再接着分析 Spring 是如何接管事务的。

    1.2.1 事务核心类

    Spring 事务中涉及到的核心的类如下,下文如果不特别说明,都使用其简称,代表声明式事务中的默认实现类。

  • TransactionInterceptor:代理的核心类,也是事务处理的入口。

  • PlatformTransactionManager:(最核心类)事务管理器,管理事务的各生命周期方法。简称 TxMgr,默认实现类为 DataSourceTransactionManager。
  • TransactionAttribute:事务属性,包含隔离级别,传播行为,是否只读等信息。简称 TxAttr,通过 AnnotationTransactionAttributeSource 读取 @Transactional 注解中的事务属性。
  • TransactionStatus:事务状态。简称 TxStatus,默认实现类 DefaultTransactionStatus,其中最核心的属性是事务对象 transaction(在 jdbc 中就是 ConnectionHolder)。
  • TransactionInfo:事务信息,内含 TxMgr, TxAttr, TxStatus 等信息,同样保存在 ThreadLocal 中。简称 TxInfo。
  • TransactionSynchronization:事务生命周期回调。简称 TxSync。
  • TransactionSynchronizationManager:事务状态管理器,实际上是使用 ThreadLocal 保存当前事物的状态信息。其中,jdbc 中最关键的 ConnectionHolder 信息就是保存在 resources 变量中。简称 TxSyncMgr。

    1.2.2 事务启动

    @EnableTransactionManagement 开启事务时,默认会通过 @Import 注册 AutoProxyRegistrar 和 ProxyTransactionManagementConfiguration 两个配置类组件。

  • AutoProxyRegistrar:开启 AOP 注解驱动。实际上注册 InfrastructureAdvisorAutoProxyCreator 的 AOP 后置处理器。

  • ProxyTransactionManagementConfiguration:事务核心配置类,注册与事务相关的组件。无论是 xml 还是注解驱动,最终都会注册这三个最核心的组件。
    • AnnotationTransactionAttributeSource:读取 @Transactional 注解的属性 TxAttr。
    • TransactionInterceptor:事务处理的核心入口。
    • BeanFactoryTransactionAttributeSourceAdvisor:和 Spring AOP 的其它 Advisor(增强)一样,它有两个最主要属性 Advice(通知)和 Pointcut(切入点)。其中 Advice 就是 TransactionInterceptor,而 Pointcut 则是 TransactionAttributeSourcePointcut。这个切入点 Pointcut 也很简单,MethodMatcher 和 ClassFilter 的匹配,实际上都是委托给 TransactionAttributeSource 组件完成的。

总结:开启声明式事务后,AnnotationTransactionAttributeSource 会读取类或方法上 @Transactional 注解,决定是否生成 AOP 代理。其事务的核心处理由 TransactionInterceptor 这个拦截器完成,它也是事务处理的入口。

1.2.3 事务处理

有了上面的基础,我们再来看一下 Spring 是如何处理事务。 Spring Tx 事务 - 图1说明:我们已经说了,Spring 事务是典型的 AOP 应用。所有标注有 @Transactional 注解的类或方法都会被 AOP 代理,代理后的核心逻辑在 TransactionInterceptor 类中,大致可以分为准备事务(1 ~ 3)和执行事务(4)两部分。

  1. 读取事务属性(TxAttr):getTransactionAttributeSource 方法会从 AnnotationTransactionAttributeSource 读取 @Transactional 注解中的事务属性到 TransactionAttribute 中。
  2. 获取事务管理器(TxMgr):determineTransactionManager 方法根据 txAttr 属性配置获取事务管理器。如果 txAttr 配置了事务管理器就使用配置的,否则使用默认的事务管理器(从 beanFactory 容器中获取)。
  3. 获取事务对象(TxObject):createTransactionIfNecessary 内部调用 TxMgr#getTransaction(txAttr) 创建事务,对 jdbc 而言就是创建一个 connection。getTransaction 会根据事务的传播行为 propagationBehavior 决定是否创建新的事务。Spring 有 7 种事务的传播行为,默认是 REQUIRED,后面会具体分析。
  4. 执行事务:proceedWithInvocation 方法会执行业务逻辑。如果没有异常,则调用 commitTransactionAfterReturning -> TxMgr#commit(TxStatus) -> connection.commit() 提交事务,否则调用 completeTransactionAfterThrowing-> TxMgr#rollback(TxStatus) -> connection.rollback() 方法回滚事务。

    1. protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
    2. final InvocationCallback invocation) throws Throwable {
    3. // 1. 获取事务属性。
    4. TransactionAttributeSource tas = getTransactionAttributeSource();
    5. final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
    6. // 2. 根据事务属性获取事务管理器
    7. final TransactionManager tm = determineTransactionManager(txAttr);
    8. PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
    9. final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
    10. // 3. 事务处理
    11. if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
    12. // 3.1 生成新的事务
    13. TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);
    14. Object retVal;
    15. try
    16. retVal = invocation.proceedWithInvocation(); // 3.2 执行业务代码
    17. } catch (Throwable ex) {
    18. completeTransactionAfterThrowing(txInfo, ex); // 3.3 事务回滚
    19. throw ex;
    20. } finally {
    21. cleanupTransactionInfo(txInfo);
    22. }
    23. commitTransactionAfterReturning(txInfo); // 3.4 事务提交
    24. return retVal;
    25. }
    26. ...
    27. }

    总结:TransactionInterceptor 类封装了事务处理的核心逻辑,和原生的 jdbc 事务处理流程基本相同。其主要的功能基本上都委托给了事务管理器(TxMgr)。

    2. 源码分析

    2.1 事务状态管理器

    TransactionSynchronizationManager 保存了事务的状态,这些状态都是通过 ThreadLocal 保存的。(可以暂时忽略,状态同步并不影响事务的执行)

  5. actualTransactionActive:如是否有事务开启。

  6. currentTransactionIsolationLevel:事务的隔离级别。
  7. currentTransactionReadOnly:事务是否是只读。对于只读事务,数据库可以做一些优化。
  8. currentTransactionName:事务名称。
  9. synchronizations:事务生命周期回调接口。
  10. resources:其它绑定的资源,如 DataSourceTransactionManager 将连接 ConnectionHolder 就保存在该变量中。

如果是新建的事务,会通过 prepareSynchronization 将当前事务的状态信息保存到 ThreadLocal 中。

  1. protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) {
  2. if (status.isNewSynchronization()) {
  3. TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction());
  4. TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(
  5. definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT ?
  6. definition.getIsolationLevel() : null);
  7. TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly());
  8. TransactionSynchronizationManager.setCurrentTransactionName(definition.getName());
  9. TransactionSynchronizationManager.initSynchronization();
  10. }
  11. }

2.2 事务管理器

AbstractPlatformTransactionManager 是各种事务管理器的抽象基类,也可以说是骨架。它封装了很多事务管理的流程代码,如获取事务事务的传播行为事务的挂起和恢复事务提交和回滚等待,具体的实现则需要子类完成。

  • getTransaction(TransactionDefinition):根据事务配置 TxAttr 获取事务对象。对声明式事务而言,根据 @Transactional 注解配置获取事务。
  • commit(TransactionStatus):提交事务模板方法,具体由子类完成。
  • rollback(TransactionStatus):回滚事务。

下面列出一些主要的模板方法。

  • doGetTransaction:用于从 TxMgr 中拿一个事务对象。如果当前已经有事务的话,返回的对象应该是要包含当前事务信息的。
  • isExistingTransaction:用于判断一个事务对象是否对应于一个已经存在的事务。Spring会根据这个方法的返回值来分类讨论事务该如何传播。
  • doBegin:物理开启事务。
  • doSuspend:将当前事务资源挂起。对于我们常用的 DataSourceTransactionManager,它的资源就是ConnectionHolder。会将 ConnectionHolder 与当前线程脱钩并取出来。
  • doResume:恢复当前事务资源。对于我们常用的 DataSourceTransactionManager,它会将 ConnectionHolder 重新绑定到线程上。
  • doCommit:物理提交事务。
  • doRollback:物理回滚事务。
  • doSetRollbackOnly:给事务标记为回滚。对于我们常用的 DataSourceTransactionManager,它的实现是拿出事务对象中的 ConnectionHolder 打上回滚标记。这个标记是一种 “全局的标记”,因为隶属于同一个物理事务都能读到同一个 ConnectionHolder。

    2.2.1 获取事务入口

    核心方法是 TxMgr#getTransaction(TxAttr)。上文已经分析了 TransactionInterceptor 拦截事务请求时,会通过 createTransactionIfNecessary 方法创建事务,实际上就是调用 TxMgr#getTransaction(TxAttr) 获取事务。
    getTransaction 方法获取事务的逻辑比较复杂,因为要考虑事务的传播行为,如果事务已经存在,就需要考虑是否通过 suspend 和 resume 方法挂起和恢复事务。涉及到的方法:

  • getTransaction:根据事务配置 TxAttr 获取事务。其中 doGetTransaction 方法用于获取当前的事务对象。

  • suspend:挂起当前事务,返回当前事务的所有状态。主要包含两点,一是事务的状态,指的是 TxSyncMgr 中的状态信息;二是事务对象本身,由具体子类的 doSuspend 实现。最后返回的 SuspendedResourcesHolder 包含事务状态和事务对象,缓存到新的事务中。
  • resume:恢复当前事务,包括事务状态和事务对象。将 SuspendedResourcesHolder 中的事务状态信息恢复到 TxSyncMgr 状态中,最后子类通过 doResume 方法将事务对象 ConnectionHolder 重新绑定到线程上。

本小节以 Spring 默认的 REQUIRED 传播行为(如果当前有事务就加入到当前事务中,否则就创建一个新的事务)进行分析。

  1. @Override
  2. public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
  3. throws TransactionException {
  4. TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());
  5. Object transaction = doGetTransaction();
  6. ...
  7. // 1. 当前事务存在
  8. if (isExistingTransaction(transaction)) {
  9. return handleExistingTransaction(def, transaction, debugEnabled);
  10. }
  11. // 2. 当前事务不存在
  12. // 2.1 MANDATORY 必须在事务中执行,如果事务不存在则抛出异常
  13. if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
  14. throw new IllegalTransactionStateException();
  15. // 2.2 REQUIRED、REQUIRES_NEW、NESTED 创建新的事务执行
  16. } else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
  17. def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
  18. def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
  19. // 所谓的挂起,实际上是将当前事务的状态保存到新事务的suspendedResources字段中
  20. SuspendedResourcesHolder suspendedResources = suspend(null);
  21. try {
  22. boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
  23. DefaultTransactionStatus status = newTransactionStatus(
  24. def, transaction, true, newSynchronization, debugEnabled, suspendedResources);
  25. doBegin(transaction, def);
  26. prepareSynchronization(status, def);
  27. return status;
  28. } catch (RuntimeException | Error ex) {
  29. resume(null, suspendedResources);
  30. throw ex;
  31. }
  32. // 2.3 SUPPORTS、NOT_SUPPORTED、NEVER 不在事务中执行
  33. } else {
  34. boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
  35. return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
  36. }
  37. }

说明:获取事务时,通过 doGetTransaction 获取当前事务 transaction,然后针是否存在事务分别处理。我们先只分析当前事务不存在的情况,如果是事务传播行为是 REQUIRED(必须在事务中执行),则需要创建新的事务。此时 suspend 参数为 null,也就是不会调用 doSuspend 方法解绑连接,事务仍会在当前连接上提交,但会创建一个新的事务。

2.2.2 获取事务的步骤

  • 第一步,调用 suspend 方法挂起当前事务。也就是将当前事务的状态和事务对象封装到 suspendedResources 对象中,最后保存到当前事务对象 status 中。
    • 如果当前事务不存在,suspend 不会调用 doSuspend 方法。
    • 如果当前事务存在,suspend 会调用 doSuspend 获取当前的事务对象。DataSourceTxMgr#doSuspend 会将事务对象置空,同时解绑 ThreadLocal。同时 doBegin 方法发现没有事务对象,则创建一个新的连接,并绑定到 ThreadLocal 中。
  • 第二步,调用 doBegin 配置事务对象。对于 DataSourceTxMgr#doBegin 方法,一是判断连接是否存在,如果存在就创建一个新的连接,并绑定到 ThreadLocal。二是配置连接,如只读、事务隔离级别等等。
  • 第三步,调用 prepareTransactionStatus 激活事务状态管理器。TxSyncMgr 维护了多个 ThreadLocal 变量来保存当前事务的状态信息,prepareTransactionStatus 方法将当前事务的状态绑定到 ThreadLocal。 Spring Tx 事务 - 图2

    2.2.3 事务的传播行为

    Spring 中的 Propagation 枚举和 TransactionDefinition 接口定义了 7 种事务传播行为。
传播行为 存在事务 不存在事务
REQUIRED 加入当前事务 创建新的事务
REQUIRES_NEW 挂起当前事务,创建新的事务 创建新的事务
SUPPORTS 加入当前事务 不在事务中执行
NOT_SUPPORTED 挂起当前事务,不在事务中执行 不在事务中执行
MANDATORY 加入当前事务 抛出异常
NEVER 抛出异常 不在事务中执行
NESTED 创建嵌套事务 创建新的事务

关于事务的传播行为的代码见 AbstractPlatformTxMgr#getTransaction,之前我们已经分析了事务不存在时,事务的创建逻辑,下面再看一下事务存在时的处理逻辑。

  1. private TransactionStatus handleExistingTransaction(
  2. TransactionDefinition definition, Object transaction, boolean debugEnabled)
  3. throws TransactionException {
  4. // 1. NEVER:不能存在事务,如果有事务则抛出异常
  5. if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
  6. throw new IllegalTransactionStateException();
  7. }
  8. // 2. NOT_SUPPORTED:挂起当前事务,不在事务中执行
  9. if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
  10. Object suspendedResources = suspend(transaction);
  11. boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
  12. return prepareTransactionStatus(
  13. definition, null, false, newSynchronization, debugEnabled, suspendedResources);
  14. }
  15. // 3. REQUIRES_NEW:挂起当前事务,创建新的事务
  16. if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
  17. SuspendedResourcesHolder suspendedResources = suspend(transaction);
  18. try {
  19. boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
  20. DefaultTransactionStatus status = newTransactionStatus(
  21. definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
  22. doBegin(transaction, definition);
  23. prepareSynchronization(status, definition);
  24. return status;
  25. } catch (RuntimeException | Error beginEx) {
  26. resumeAfterBeginException(transaction, suspendedResources, beginEx);
  27. throw beginEx;
  28. }
  29. }
  30. // 4. NESTED:创建嵌套事务???
  31. if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
  32. ...
  33. }
  34. // 5. SUPPORTS、REQUIRED:在当前事务中执行
  35. if (isValidateExistingTransaction()) {
  36. }
  37. boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
  38. return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
  39. }

说明:如果事务需要挂起,则调用 suspend(transaction) 将当前事务的状态缓存起来,同时将当前事务的状态全部重置。当事务结束时,则调用 resume 恢复事物的状态。

2.2.4 事务的挂起和恢复

事务的挂起和恢复本质是将事务的状态和事务对象封装成 SuspendedResourcesHolder,保存到当前事务中。当事务执行结束后,不管是回滚还是提交事务,都需要恢复原先的事务。
(1)事务挂起

  1. // suspend 方法将当前事务的状态和事务对象封装到SuspendedResourcesHolder中
  2. protected final SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException {
  3. if (TransactionSynchronizationManager.isSynchronizationActive()) {
  4. List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
  5. try {
  6. Object suspendedResources = null;
  7. if (transaction != null) {
  8. suspendedResources = doSuspend(transaction);
  9. }
  10. String name = TransactionSynchronizationManager.getCurrentTransactionName();
  11. TransactionSynchronizationManager.setCurrentTransactionName(null);
  12. boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
  13. TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
  14. Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
  15. TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);
  16. boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
  17. TransactionSynchronizationManager.setActualTransactionActive(false);
  18. return new SuspendedResourcesHolder(
  19. suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);
  20. } catch (RuntimeException | Error ex) {
  21. doResumeSynchronization(suspendedSynchronizations);
  22. throw ex;
  23. }
  24. } else if (transaction != null) {
  25. Object suspendedResources = doSuspend(transaction);
  26. return new SuspendedResourcesHolder(suspendedResources);
  27. } else {
  28. return null;
  29. }
  30. }
  31. @Override
  32. protected Object doSuspend(Object transaction) {
  33. DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
  34. txObject.setConnectionHolder(null);
  35. return TransactionSynchronizationManager.unbindResource(obtainDataSource());
  36. }

说明:suspend 方法将当前事务的状态(TxSyncMgr)和事务对象(doSuspend)封装到 SuspendedResourcesHolder 中。另外,当抛出 RuntimeException 和 Error 异常时会回滚。
(2)事务恢复

  1. protected final void resume(@Nullable Object transaction, @Nullable SuspendedResourcesHolder resourcesHolder)
  2. throws TransactionException {
  3. if (resourcesHolder != null) {
  4. Object suspendedResources = resourcesHolder.suspendedResources;
  5. if (suspendedResources != null) {
  6. doResume(transaction, suspendedResources);
  7. }
  8. List<TransactionSynchronization> suspendedSynchronizations = resourcesHolder.suspendedSynchronizations;
  9. if (suspendedSynchronizations != null) {
  10. TransactionSynchronizationManager.setActualTransactionActive(resourcesHolder.wasActive);
  11. TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(resourcesHolder.isolationLevel);
  12. TransactionSynchronizationManager.setCurrentTransactionReadOnly(resourcesHolder.readOnly);
  13. TransactionSynchronizationManager.setCurrentTransactionName(resourcesHolder.name);
  14. doResumeSynchronization(suspendedSynchronizations);
  15. }
  16. }
  17. }
  18. @Override
  19. protected void doResume(@Nullable Object transaction, Object suspendedResources) {
  20. TransactionSynchronizationManager.bindResource(obtainDataSource(), suspendedResources);
  21. }

2.2.5 事务的提交和回滚

commit 和 rollback 分别提交和回滚事务,它们处理的逻辑都差不多:

  • 事务提交或回滚时,大致可以分为以下三种情况。
    • 已经设置保存点:(嵌套事务)回滚到 savepoint。
    • 外层事务:(正常情况)直接提交或回滚事务。
    • 已经设置回滚标记:(内存事务发生异常时,会设置回滚标记)如果 failEarlyOnGlobalRollbackOnly = true 则会立即抛出异常,默认为 false。
  • 异常处理:如果是 RuntimeException 或 Error 则需要调用 doRollbackOnCommitException 处理异常。如果是外层事务则直接回滚,否则设置回滚标记 rollbackOnly=true。
  • 状态恢复:cleanupAfterCompletion(status) 方法恢复被挂起事务的状态,总之所有的状态都要回滚到事务开始前。

(1)事务提交

  1. @Override
  2. public final void commit(TransactionStatus status) throws TransactionException {
  3. DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
  4. // 1. 设置单个事务的回滚状态,相当于局部状态。rollbackOnly=true 只能回滚
  5. if (defStatus.isLocalRollbackOnly()) {
  6. processRollback(defStatus, false);
  7. return;
  8. }
  9. // 2. 设置ConnectionHolder的回滚状态,相当于全局状态
  10. if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
  11. processRollback(defStatus, true);
  12. return;
  13. }
  14. processCommit(defStatus);
  15. }
  16. // triggerXX 方法都是TxSync生命周期的回调。
  17. private void processCommit(DefaultTransactionStatus status) throws TransactionException {
  18. try {
  19. boolean beforeCompletionInvoked = false;
  20. try {
  21. boolean unexpectedRollback = false;
  22. prepareForCommit(status);
  23. triggerBeforeCommit(status);
  24. triggerBeforeCompletion(status);
  25. beforeCompletionInvoked = true;
  26. // 1.1 提交事务,有savepoint就回滚到保存点(嵌套事务)
  27. if (status.hasSavepoint()) {
  28. unexpectedRollback = status.isGlobalRollbackOnly();
  29. status.releaseHeldSavepoint();
  30. // 1.2 (核心逻辑)如果是外层事务,则直接提交事务
  31. } else if (status.isNewTransaction()) {
  32. unexpectedRollback = status.isGlobalRollbackOnly();
  33. doCommit(status);
  34. // 1.3 如果有异常,设置了全局回滚,则直接抛出异常
  35. } else if (isFailEarlyOnGlobalRollbackOnly()) {
  36. unexpectedRollback = status.isGlobalRollbackOnly();
  37. }
  38. if (unexpectedRollback) {
  39. throw new UnexpectedRollbackException();
  40. }
  41. } catch (UnexpectedRollbackException ex) {
  42. // 2.1 未知异常,直接抛出
  43. triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
  44. throw ex;
  45. } catch (TransactionException ex) {
  46. // 2.2 TransactionException异常,默认直接抛出异常
  47. if (isRollbackOnCommitFailure()) {
  48. doRollbackOnCommitException(status, ex); // 设置全局回滚
  49. } else {
  50. triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
  51. }
  52. throw ex;
  53. } catch (RuntimeException | Error ex) {
  54. // 2.3 Spring默认这两种异常回滚
  55. if (!beforeCompletionInvoked) {
  56. triggerBeforeCompletion(status);
  57. }
  58. doRollbackOnCommitException(status, ex); // 设置全局回滚
  59. throw ex;
  60. }
  61. try {
  62. triggerAfterCommit(status);
  63. } finally {
  64. triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
  65. }
  66. } finally {
  67. // 3. 恢复挂起的事务状态及事务对象,总之所有的状态都要回滚到事务开始前的状态
  68. cleanupAfterCompletion(status);
  69. }
  70. }

说明:commit 提交时,事务处理如下。

  • 回滚标记:通过内层事务异常时,设置了回滚标记,直接调用 processRollback 进行回滚。一切正常,才调用 processCommit 进行提交。
  • 正常情况:外层事务直接提交。内层事务如果允许快速失败(failfast),则直接抛出异常。
  • 异常情况:提交时发生异常,这里的异常不仅仅是数据库事务提交异常,也包括事务生命周期回调异常的场景,此时调用 doRollbackOnCommitException 处理回滚。外层事务则直接回滚。内层事务设置回滚标记, 让外层事务回滚。

补充:isLocalRollbackOnly 和 isGlobalRollbackOnly 的区别?

  • isLocalRollbackOnly:设置单个事务的 status.rollbackOnly,只有单个事务可见。
  • isGlobalRollbackOnly:设置的是连接的 connectionHolder.rollbackOnly,即该连接的所有事务都可见,所以是全局的。在 Spring 中,doRollbackOnCommitException 和 doSetRollbackOnly 方法都是设置的全局回滚异常。

(2)事务回滚

  1. @Override
  2. public final void rollback(TransactionStatus status) throws TransactionException {
  3. DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
  4. processRollback(defStatus, false);
  5. }
  6. private void processRollback(DefaultTransactionStatus status, boolean unexpected) {
  7. try {
  8. boolean unexpectedRollback = unexpected;
  9. try {
  10. triggerBeforeCompletion(status);
  11. // 1.1 嵌套事务,直接回滚到savepoint
  12. if (status.hasSavepoint()) {
  13. status.rollbackToHeldSavepoint();
  14. // 1.2 (核心逻辑)外层事务,直接回滚
  15. } else if (status.isNewTransaction()) {
  16. doRollback(status);
  17. // 1.3 内层事务,设置全局回滚或直接抛出异常
  18. } else {
  19. // 设置全局回滚
  20. if (status.hasTransaction()) {
  21. if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
  22. doSetRollbackOnly(status);
  23. }
  24. }
  25. // 默认failEarlyOnGlobalRollbackOnly=false,也就是不允许提前抛出异常
  26. if (!isFailEarlyOnGlobalRollbackOnly()) {
  27. unexpectedRollback = false; // 不允许提前抛出异常
  28. }
  29. }
  30. } catch (RuntimeException | Error ex) {
  31. triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
  32. throw ex;
  33. }
  34. triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
  35. if (unexpectedRollback) {
  36. throw new UnexpectedRollbackException();
  37. }
  38. } finally {
  39. cleanupAfterCompletion(status);
  40. }
  41. }

说明:rollback 提交时,事务处理如下。

  • 正常情况:外层事务直接回滚。内层事务调用 doSetRollbackOnly 方法设置回滚标记。
  • 异常情况:回滚时发生异常,直接抛出异常。

    2.3 JdbcTemplate 事务处理

    JdbcTemplate 如果开启了事务,TxManager#getTransaction 获取事务时,会将从连接池 DataSource 中获取的连接 ConnectionHolder 绑定到 TxSynchMgr.resources 变量中,resources 是 ThreadLocal,通过 ThreadLocal 做线程隔离。同时 JdbcTemplate 也是这个 ThreadLocal 中获取连接 ConnectionHolder。这样 Spring 就接管了整个事务。 Spring Tx 事务 - 图3说明:上述时序图主要关注 ConnectionHolder 获取的连接过程,通过 ThreadLocal 变量保证事务业务处理的连接是同一个。另外,并不是所有的事务处理都会完整的经历上述时序图的所有过程。比如,
  1. 如果直接在当前事务中执行,就不需要调用 suspend 来挂起当前事务。这样 doBegin 时,就不需要调用 dataSouce.getConnection() 来重新获取连接。
  2. 如果当前事务不存在,调用 suspend 时就不需要调用 doSuspend 来解绑事务。同样也不需要重新获取连接。

    3. 总结时刻

    推荐阅读

  3. Spring事务源码阅读笔记》:推荐指数五星。对 Spring 事务分析比较全面。

  4. read-only=”true” 只读事务》:只读事务

每天用心记录一点点。内容也许不重要,但习惯很重要!