Spring 事务

声明式事务

Propagation

  • 事务传播
  1. public enum Propagation {
  2. /**
  3. * 有事务则加入,没有则新建
  4. */
  5. REQUIRED(TransactionDefinition.PROPAGATION_REQUIRED),
  6. /**
  7. * 有事务就用,如果没有就不开启(继承关系)
  8. * @see org.springframework.transaction.support.AbstractPlatformTransactionManager#setTransactionSynchronization
  9. */
  10. SUPPORTS(TransactionDefinition.PROPAGATION_SUPPORTS),
  11. /**
  12. * 必须在已有事务中
  13. */
  14. MANDATORY(TransactionDefinition.PROPAGATION_MANDATORY),
  15. /**
  16. * 不管是否已有事务,都要开启新事务,老事务挂起
  17. * @see org.springframework.transaction.jta.JtaTransactionManager#setTransactionManager
  18. */
  19. REQUIRES_NEW(TransactionDefinition.PROPAGATION_REQUIRES_NEW),
  20. /**
  21. * 以非事务方式执行,如果当前已有事务则将其挂起
  22. * @see org.springframework.transaction.jta.JtaTransactionManager#setTransactionManager
  23. */
  24. NOT_SUPPORTED(TransactionDefinition.PROPAGATION_NOT_SUPPORTED),
  25. /**
  26. * 必须在没有事务的方法中调用,否则抛出异常
  27. */
  28. NEVER(TransactionDefinition.PROPAGATION_NEVER),
  29. /**
  30. * 如果已有事务,则嵌套执行,如果没有,就新建(和REQUIRED类似,和REQUIRES_NEW容易混淆)
  31. * @see org.springframework.jdbc.datasource.DataSourceTransactionManager
  32. */
  33. NESTED(TransactionDefinition.PROPAGATION_NESTED);
  34. private final int value;
  35. Propagation(int value) {
  36. this.value = value;
  37. }
  38. public int value() {
  39. return this.value;
  40. }
  41. }

Isolation

  • 事务隔离级别
  1. public enum Isolation {
  2. /**
  3. * @see java.sql.Connection
  4. */
  5. DEFAULT(TransactionDefinition.ISOLATION_DEFAULT),
  6. /**
  7. * 读未提交
  8. *
  9. * @see java.sql.Connection#TRANSACTION_READ_UNCOMMITTED
  10. */
  11. READ_UNCOMMITTED(TransactionDefinition.ISOLATION_READ_UNCOMMITTED),
  12. /**
  13. * 读已提交
  14. *
  15. * @see java.sql.Connection#TRANSACTION_READ_COMMITTED
  16. */
  17. READ_COMMITTED(TransactionDefinition.ISOLATION_READ_COMMITTED),
  18. /**
  19. * 可重复读
  20. *
  21. * @see java.sql.Connection#TRANSACTION_REPEATABLE_READ
  22. */
  23. REPEATABLE_READ(TransactionDefinition.ISOLATION_REPEATABLE_READ),
  24. /**
  25. * 可串行化
  26. *
  27. * @see java.sql.Connection#TRANSACTION_SERIALIZABLE
  28. */
  29. SERIALIZABLE(TransactionDefinition.ISOLATION_SERIALIZABLE);
  30. private final int value;
  31. Isolation(int value) {
  32. this.value = value;
  33. }
  34. public int value() {
  35. return this.value;
  36. }
  37. }

EnableTransactionManagement

  • 下面代码是一个注解方式的事务配置,使用 @EnableTransactionManagement 来开启事务支持
  1. @ComponentScan(basePackages = "org.source.hot.spring.overview.ioc.tx.declarative")
  2. @EnableTransactionManagement
  3. public class TxConfig {
  4. @Bean // 数据源
  5. public DataSource dataSource() {
  6. DruidDataSource dataSource = new DruidDataSource();
  7. dataSource.setUsername("");
  8. dataSource.setPassword("");
  9. dataSource.setUrl("");
  10. dataSource.setDriverClassName(com.mysql.jdbc.Driver.class.getName());
  11. return dataSource;
  12. }
  13. @Bean
  14. public JdbcTemplate jdbcTemplate(DataSource dataSource) {
  15. return new JdbcTemplate(dataSource);
  16. }
  17. @Bean //事务管理器
  18. public PlatformTransactionManager platformTransactionManager(DataSource dataSource) {
  19. return new DataSourceTransactionManager(dataSource);
  20. }
  21. }
  • 注解源码如下,关注于@Import(TransactionManagementConfigurationSelector.class)
  1. @Target(ElementType.TYPE)
  2. @Retention(RetentionPolicy.RUNTIME)
  3. @Documented
  4. @Import(TransactionManagementConfigurationSelector.class)
  5. public @interface EnableTransactionManagement {
  6. boolean proxyTargetClass() default false;
  7. AdviceMode mode() default AdviceMode.PROXY;
  8. int order() default Ordered.LOWEST_PRECEDENCE;
  9. }
  1. public class TransactionManagementConfigurationSelector extends AdviceModeImportSelector<EnableTransactionManagement> {
  2. @Override
  3. protected String[] selectImports(AdviceMode adviceMode) {
  4. // 根据切面类型进行初始化
  5. switch (adviceMode) {
  6. case PROXY:
  7. // 默认值
  8. return new String[] {AutoProxyRegistrar.class.getName(),
  9. ProxyTransactionManagementConfiguration.class.getName()};
  10. case ASPECTJ:
  11. return new String[] {determineTransactionAspectClass()};
  12. default:
  13. return null;
  14. }
  15. }
  16. private String determineTransactionAspectClass() {
  17. return (ClassUtils.isPresent("javax.transaction.Transactional", getClass().getClassLoader()) ?
  18. TransactionManagementConfigUtils.JTA_TRANSACTION_ASPECT_CONFIGURATION_CLASS_NAME :
  19. TransactionManagementConfigUtils.TRANSACTION_ASPECT_CONFIGURATION_CLASS_NAME);
  20. }
  21. }

ProxyTransactionManagementConfiguration

  1. @Configuration(proxyBeanMethods = false)
  2. public class ProxyTransactionManagementConfiguration extends AbstractTransactionManagementConfiguration {
  3. /**
  4. * 事务切面
  5. * @param transactionAttributeSource
  6. * @param transactionInterceptor
  7. * @return
  8. */
  9. @Bean(name = TransactionManagementConfigUtils.TRANSACTION_ADVISOR_BEAN_NAME)
  10. @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
  11. public BeanFactoryTransactionAttributeSourceAdvisor transactionAdvisor(
  12. TransactionAttributeSource transactionAttributeSource,
  13. TransactionInterceptor transactionInterceptor) {
  14. // 事务切面
  15. BeanFactoryTransactionAttributeSourceAdvisor advisor = new BeanFactoryTransactionAttributeSourceAdvisor();
  16. // 事务属性
  17. advisor.setTransactionAttributeSource(transactionAttributeSource);
  18. advisor.setAdvice(transactionInterceptor);
  19. if (this.enableTx != null) {
  20. // 执行顺序
  21. advisor.setOrder(this.enableTx.<Integer>getNumber("order"));
  22. }
  23. return advisor;
  24. }
  25. @Bean
  26. @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
  27. public TransactionAttributeSource transactionAttributeSource() {
  28. return new AnnotationTransactionAttributeSource();
  29. }
  30. /***
  31. * 事务拦截器
  32. * @param transactionAttributeSource
  33. * @return
  34. */
  35. @Bean
  36. @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
  37. public TransactionInterceptor transactionInterceptor(
  38. TransactionAttributeSource transactionAttributeSource) {
  39. TransactionInterceptor interceptor = new TransactionInterceptor();
  40. interceptor.setTransactionAttributeSource(transactionAttributeSource);
  41. if (this.txManager != null) {
  42. // 事务管理器注入
  43. interceptor.setTransactionManager(this.txManager);
  44. }
  45. return interceptor;
  46. }
  47. }

TransactionInterceptor

image-20200729144622440

  • 实现了org.aopalliance.intercept.MethodInterceptor接口的方法
  1. @Override
  2. @Nullable
  3. public Object invoke(MethodInvocation invocation) throws Throwable {
  4. // Work out the target class: may be {@code null}.
  5. // The TransactionAttributeSource should be passed the target class
  6. // as well as the method, which may be from an interface.
  7. Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
  8. // Adapt to TransactionAspectSupport's invokeWithinTransaction...
  9. return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
  10. }
  • 这段代码会在标注了 @Transactional 注解的方法上生效
  1. @Service
  2. public class IssueServiceImpl {
  3. @Autowired
  4. private JdbcTemplate jdbcTemplate;
  5. @Transactional()
  6. public boolean insertIssue() throws Exception {
  7. jdbcTemplate.execute("INSERT INTO `scrum`.`issue`() VALUES ()");
  8. throw new Exception("a");
  9. }
  10. }
  11. public class DeclarativeTransactionTest {
  12. public static void main(String[] args) throws Exception {
  13. AnnotationConfigApplicationContext applicationContext = new AnnotationConfigApplicationContext(
  14. TxConfig.class);
  15. IssueServiceImpl bean = applicationContext.getBean(IssueServiceImpl.class);
  16. bean.insertIssue();
  17. System.out.println();
  18. applicationContext.close();
  19. }
  20. }

image-20200729145518089

断点开始进行查阅. 在断点后执行一步会直接进入 cglib 代理对象

org.springframework.aop.framework.CglibAopProxy.DynamicAdvisedInterceptor#intercept 具体不展开,继续往下执行

image-20200729145637688

走到invoke方法了

入参对象查看

image-20200729145835608

  • 获取事务属性

    1. @Override
    2. @Nullable
    3. public TransactionAttribute getTransactionAttribute(Method method,
    4. @Nullable Class<?> targetClass) {
    5. if (method.getDeclaringClass() == Object.class) {
    6. return null;
    7. }
    8. // First, see if we have a cached value.
    9. // 尝试缓存中获取
    10. Object cacheKey = getCacheKey(method, targetClass);
    11. TransactionAttribute cached = this.attributeCache.get(cacheKey);
    12. if (cached != null) {
    13. // Value will either be canonical value indicating there is no transaction attribute,
    14. // or an actual transaction attribute.
    15. if (cached == NULL_TRANSACTION_ATTRIBUTE) {
    16. return null;
    17. } else {
    18. return cached;
    19. }
    20. } else {
    21. // We need to work it out.
    22. // 自行构建一个事务属性
    23. TransactionAttribute txAttr = computeTransactionAttribute(method, targetClass);
    24. // Put it in the cache.
    25. if (txAttr == null) {
    26. this.attributeCache.put(cacheKey, NULL_TRANSACTION_ATTRIBUTE);
    27. } else {
    28. String methodIdentification = ClassUtils
    29. .getQualifiedMethodName(method, targetClass);
    30. if (txAttr instanceof DefaultTransactionAttribute) {
    31. ((DefaultTransactionAttribute) txAttr).setDescriptor(methodIdentification);
    32. }
    33. if (logger.isTraceEnabled()) {
    34. logger.trace("Adding transactional method '" + methodIdentification
    35. + "' with attribute: " + txAttr);
    36. }
    37. this.attributeCache.put(cacheKey, txAttr);
    38. }
    39. return txAttr;
    40. }
    41. }

  1. protected Object getCacheKey(Method method, @Nullable Class<?> targetClass) {
  2. return new MethodClassKey(method, targetClass);
  3. }
![image-20200729162023837](/uploads/projects/SpringBoot-source/docs/Spring/TX/images/spring/image-20200729162023837.png)

  • 此处方法已经获取到了,这个方法就是后面的一个切面
  • 确定事务管理器
  5. @Nullable
  6. protected TransactionManager determineTransactionManager(
  7. @Nullable TransactionAttribute txAttr) {
  8. // Do not attempt to lookup tx manager if no tx attributes are set
  9. // 空判断返回一个事务管理器
  10. if (txAttr == null || this.beanFactory == null) {
  11. return getTransactionManager();
  12. }
  13. // 属性是否有别名
  14. String qualifier = txAttr.getQualifier();
  15. // 如果有
  16. if (StringUtils.hasText(qualifier)) {
  17. // 从 ioc 容器中根据类型和名称获取事务管理器
  18. return determineQualifiedTransactionManager(this.beanFactory, qualifier);
  19. } else if (StringUtils.hasText(this.transactionManagerBeanName)) {
  20. // 从 ioc 容器中根据类型和名称获取事务管理器
  21. return determineQualifiedTransactionManager(this.beanFactory,
  22. this.transactionManagerBeanName);
  23. } else {
  24. // 通过get方法获取
  25. TransactionManager defaultTransactionManager = getTransactionManager();
  26. // 如果没有
  27. if (defaultTransactionManager == null) {
  28. // 尝试从缓存中获取
  29. defaultTransactionManager = this.transactionManagerCache
  30. .get(DEFAULT_TRANSACTION_MANAGER_KEY);
  31. // 缓存里面没有从 ioc 容器中获取并且设置缓存
  32. if (defaultTransactionManager == null) {
  33. defaultTransactionManager = this.beanFactory.getBean(TransactionManager.class);
  34. this.transactionManagerCache.putIfAbsent(
  35. DEFAULT_TRANSACTION_MANAGER_KEY, defaultTransactionManager);
  36. }
  37. }
  38. return defaultTransactionManager;
  39. }
  40. }

image-20200729160650401

  • 类型转换

    1. @Nullable
    2. private PlatformTransactionManager asPlatformTransactionManager(
    3. @Nullable Object transactionManager) {
    4. if (transactionManager == null
    5. || transactionManager instanceof PlatformTransactionManager) {
    6. return (PlatformTransactionManager) transactionManager;
    7. } else {
    8. throw new IllegalStateException(
    9. "Specified transaction manager is not a PlatformTransactionManager: "
    10. + transactionManager);
    11. }
    12. }
  • 获取方法切面

    1. private String methodIdentification(Method method, @Nullable Class<?> targetClass,
    2. @Nullable TransactionAttribute txAttr) {
    3. String methodIdentification = methodIdentification(method, targetClass);
    4. if (methodIdentification == null) {
    5. if (txAttr instanceof DefaultTransactionAttribute) {
    6. // 直接就获取了.方法签名.
    7. methodIdentification = ((DefaultTransactionAttribute) txAttr).getDescriptor();
    8. }
    9. if (methodIdentification == null) {
    10. methodIdentification = ClassUtils.getQualifiedMethodName(method, targetClass);
    11. }
    12. }
    13. return methodIdentification;
    14. }

image-20200729161647214

  • 根据事务传播行为创建一个新的事务

    1. @SuppressWarnings("serial")
    2. protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
    3. @Nullable TransactionAttribute txAttr, final String joinpointIdentification) {
    4. // If no name specified, apply method identification as transaction name.
    5. // 把切面的地址放进去
    6. if (txAttr != null && txAttr.getName() == null) {
    7. txAttr = new DelegatingTransactionAttribute(txAttr) {
    8. @Override
    9. public String getName() {
    10. return joinpointIdentification;
    11. }
    12. };
    13. }
    14. TransactionStatus status = null;
    15. if (txAttr != null) {
    16. if (tm != null) {
    17. // 事务状态
    18. // 获取事务
    19. status = tm.getTransaction(txAttr);
    20. } else {
    21. if (logger.isDebugEnabled()) {
    22. logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
    23. "] because no transaction manager has been configured");
    24. }
    25. }
    26. }
    27. // 处理出一个 TransactionInfo
    28. return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
    29. }

image-20200729163303000

  • tm.getTransaction

    1. @Override
    2. public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
    3. throws TransactionException {
    4. // Use defaults if no transaction definition given.
    5. // 获取事务的定义
    6. TransactionDefinition def = (definition != null ? definition
    7. : TransactionDefinition.withDefaults());
    8. // 获取事务
    9. Object transaction = doGetTransaction();
    10. boolean debugEnabled = logger.isDebugEnabled();
    11. // 是否存在事务
    12. if (isExistingTransaction(transaction)) {
    13. // Existing transaction found -> check propagation behavior to find out how to behave.
    14. // 存在事务后处理什么操作
    15. return handleExistingTransaction(def, transaction, debugEnabled);
    16. }
    17. // Check definition settings for new transaction.
    18. // 超时的校验. 小于默认值抛出异常
    19. if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
    20. throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
    21. }
    22. // No existing transaction found -> check propagation behavior to find out how to proceed.
    23. // 没有事务抛出异常
    24. if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
    25. throw new IllegalTransactionStateException(
    26. "No existing transaction found for transaction marked with propagation 'mandatory'");
    27. } else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
    28. def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
    29. def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
    30. SuspendedResourcesHolder suspendedResources = suspend(null);
    31. if (debugEnabled) {
    32. logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);
    33. }
    34. try {
    35. boolean newSynchronization = (getTransactionSynchronization()
    36. != SYNCHRONIZATION_NEVER);
    37. DefaultTransactionStatus status = newTransactionStatus(
    38. def, transaction, true, newSynchronization, debugEnabled,
    39. suspendedResources);
    40. doBegin(transaction, def);
    41. prepareSynchronization(status, def);
    42. return status;
    43. } catch (RuntimeException | Error ex) {
    44. resume(null, suspendedResources);
    45. throw ex;
    46. }
    47. } else {
    48. // Create "empty" transaction: no actual transaction, but potentially synchronization.
    49. if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger
    50. .isWarnEnabled()) {
    51. logger.warn(
    52. "Custom isolation level specified but no actual transaction initiated; " +
    53. "isolation level will effectively be ignored: " + def);
    54. }
    55. boolean newSynchronization = (getTransactionSynchronization()
    56. == SYNCHRONIZATION_ALWAYS);
    57. return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled,
    58. null);
    59. }
    60. }
    • org.springframework.transaction.support.AbstractPlatformTransactionManager#getTransaction

    • org.springframework.jdbc.datasource.DataSourceTransactionManager#doGetTransaction

      1. @Override
      2. protected Object doGetTransaction() {
      3. DataSourceTransactionObject txObject = new DataSourceTransactionObject();
      4. txObject.setSavepointAllowed(isNestedTransactionAllowed());
      5. // 数据库链接对象
      6. // 从事务管理器中获取数据库链接对象
      7. ConnectionHolder conHolder =
      8. (ConnectionHolder) TransactionSynchronizationManager
      9. .getResource(obtainDataSource());
      10. txObject.setConnectionHolder(conHolder, false);
      11. return txObject;
      12. }
    • org.springframework.transaction.support.AbstractPlatformTransactionManager#suspend

      1. @Nullable
      2. protected final SuspendedResourcesHolder suspend(@Nullable Object transaction)
      3. throws TransactionException {
      4. if (TransactionSynchronizationManager.isSynchronizationActive()) {
      5. List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
      6. try {
      7. Object suspendedResources = null;
      8. if (transaction != null) {
      9. suspendedResources = doSuspend(transaction);
      10. }
      11. // 当前事务名称
      12. String name = TransactionSynchronizationManager.getCurrentTransactionName();
      13. // 同步方法中设置
      14. TransactionSynchronizationManager.setCurrentTransactionName(null);
      15. // 只读设置
      16. boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
      17. // 同步方法中设置
      18. TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
      19. // 隔离级别
      20. Integer isolationLevel = TransactionSynchronizationManager
      21. .getCurrentTransactionIsolationLevel();
      22. // 同步方法中设置
      23. TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);
      24. // 是否活跃
      25. boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
      26. TransactionSynchronizationManager.setActualTransactionActive(false);
      27. return new SuspendedResourcesHolder(
      28. suspendedResources, suspendedSynchronizations, name, readOnly,
      29. isolationLevel, wasActive);
      30. } catch (RuntimeException | Error ex) {
      31. // doSuspend failed - original transaction is still active...
      32. doResumeSynchronization(suspendedSynchronizations);
      33. throw ex;
      34. }
      35. } else if (transaction != null) {
      36. // Transaction active but no synchronization active.
      37. Object suspendedResources = doSuspend(transaction);
      38. return new SuspendedResourcesHolder(suspendedResources);
      39. } else {
      40. // Neither transaction nor synchronization active.
      41. return null;
      42. }
      43. }
  • prepareTransactionInfo 只是简单地 new 一个 TransactionInfo 对象并且将其绑定到当前线程

    1. protected TransactionInfo prepareTransactionInfo(@Nullable PlatformTransactionManager tm,
    2. @Nullable TransactionAttribute txAttr, String joinpointIdentification,
    3. @Nullable TransactionStatus status) {
    4. // 初始化
    5. TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification);
    6. if (txAttr != null) {
    7. // We need a transaction for this method...
    8. if (logger.isTraceEnabled()) {
    9. logger.trace(
    10. "Getting transaction for [" + txInfo.getJoinpointIdentification() + "]");
    11. }
    12. // The transaction manager will flag an error if an incompatible tx already exists.
    13. txInfo.newTransactionStatus(status);
    14. } else {
    15. // The TransactionInfo.hasTransaction() method will return false. We created it only
    16. // to preserve the integrity of the ThreadLocal stack maintained in this class.
    17. if (logger.isTraceEnabled()) {
    18. logger.trace("No need to create transaction for [" + joinpointIdentification +
    19. "]: This method is not transactional.");
    20. }
    21. }
    22. // We always bind the TransactionInfo to the thread, even if we didn't create
    23. // a new transaction here. This guarantees that the TransactionInfo stack
    24. // will be managed correctly even if no transaction was created by this aspect.
    25. // 和线程绑定
    26. txInfo.bindToThread();
    27. return txInfo;
    28. }
  • retVal = invocation.proceedWithInvocation();

    • 这里走的是 CGLIB 的方法,会直接执行目标方法并将结果返回,具体方法在

      org.springframework.aop.framework.CglibAopProxy.CglibMethodInvocation#proceed

    1. @Override
    2. @Nullable
    3. public Object proceed() throws Throwable {
    4. try {
    5. return super.proceed();
    6. }
    7. catch (RuntimeException ex) {
    8. throw ex;
    9. }
    10. catch (Exception ex) {
    11. if (ReflectionUtils.declaresException(getMethod(), ex.getClass())) {
    12. throw ex;
    13. }
    14. else {
    15. throw new UndeclaredThrowableException(ex);
    16. }
    17. }
    18. }
  • 如果没有异常就直接处理完成返回了

  • 我们现在是有异常的

    1. try {
    2. // This is an around advice: Invoke the next interceptor in the chain.
    3. // This will normally result in a target object being invoked.
    4. // 回调方法
    5. retVal = invocation.proceedWithInvocation();
    6. } catch (Throwable ex) {
    7. // target invocation exception
    8. // 回滚异常
    9. completeTransactionAfterThrowing(txInfo, ex);
    10. throw ex;
    11. } finally {
    12. // 消息清理
    13. cleanupTransactionInfo(txInfo);
    14. }
  • completeTransactionAfterThrowing回滚异常的处理方法

    1. protected void completeTransactionAfterThrowing(@Nullable TransactionInfo txInfo,
    2. Throwable ex) {
    3. if (txInfo != null && txInfo.getTransactionStatus() != null) {
    4. if (logger.isTraceEnabled()) {
    5. logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() +
    6. "] after exception: " + ex);
    7. }
    8. if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) {
    9. try {
    10. // 做回滚
    11. txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
    12. } catch (TransactionSystemException ex2) {
    13. logger.error("Application exception overridden by rollback exception", ex);
    14. ex2.initApplicationException(ex);
    15. throw ex2;
    16. } catch (RuntimeException | Error ex2) {
    17. logger.error("Application exception overridden by rollback exception", ex);
    18. throw ex2;
    19. }
    20. } else {
    21. // We don't roll back on this exception.
    22. // Will still roll back if TransactionStatus.isRollbackOnly() is true.
    23. try {
    24. // org.springframework.transaction.support.AbstractPlatformTransactionManager.commit 的方法
    25. txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
    26. } catch (TransactionSystemException ex2) {
    27. logger.error("Application exception overridden by commit exception", ex);
    28. ex2.initApplicationException(ex);
    29. throw ex2;
    30. } catch (RuntimeException | Error ex2) {
    31. logger.error("Application exception overridden by commit exception", ex);
    32. throw ex2;
    33. }
    34. }
    35. }
    36. }
    • 整理一下这里的流程

      1. 抛出的异常需要回滚 (rollbackOn(ex) 返回 true) 时走回滚

        txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus())

      2. 抛出的异常不需要回滚 (rollbackOn(ex) 返回 false) 时仍然提交

        txInfo.getTransactionManager().commit(txInfo.getTransactionStatus())

    • 注意: 这里的异常如果是受检异常 (checked Exception, 即既不是 RuntimeException 也不是 Error), 默认不会走回滚

  • 判断是否需要回滚

    1. txInfo.transactionAttribute.rollbackOn
    • 链路

      • org.springframework.transaction.interceptor.DelegatingTransactionAttribute#rollbackOn

        • org.springframework.transaction.interceptor.RuleBasedTransactionAttribute#rollbackOn

          1. @Override
          2. public boolean rollbackOn(Throwable ex) {
          3. if (logger.isTraceEnabled()) {
          4. logger.trace(
          5. "Applying rules to determine whether transaction should rollback on " + ex);
          6. }
          7. RollbackRuleAttribute winner = null;
          8. int deepest = Integer.MAX_VALUE;
          9. if (this.rollbackRules != null) {
          10. for (RollbackRuleAttribute rule : this.rollbackRules) {
          11. int depth = rule.getDepth(ex);
          12. if (depth >= 0 && depth < deepest) {
          13. deepest = depth;
          14. winner = rule;
          15. }
          16. }
          17. }
          18. if (logger.isTraceEnabled()) {
          19. logger.trace("Winning rollback rule is: " + winner);
          20. }
          21. // User superclass behavior (rollback on unchecked) if no rule matches.
          22. if (winner == null) {
          23. logger.trace("No relevant rollback rule found: applying default rules");
          24. return super.rollbackOn(ex);
          25. }
          26. return !(winner instanceof NoRollbackRuleAttribute);
          27. }
          • org.springframework.transaction.interceptor.DefaultTransactionAttribute#rollbackOn

            1. @Override
            2. public boolean rollbackOn(Throwable ex) {
            3. return (ex instanceof RuntimeException || ex instanceof Error);
            4. }
            • 这就是判断我们的异常是否需要回滚的地方
  • cleanupTransactionInfo

    数据清理

    1. protected void cleanupTransactionInfo(@Nullable TransactionInfo txInfo) {
    2. if (txInfo != null) {
    3. txInfo.restoreThreadLocalStatus();
    4. }
    5. }
    1. private void restoreThreadLocalStatus() {
    2. // Use stack to restore old transaction TransactionInfo.
    3. // Will be null if none was set.
    4. transactionInfoHolder.set(this.oldTransactionInfo);
    5. }

编程式事务

DefaultTransactionDefinition

  • 默认的事务定义
    • 常见属性
      1. timeout
      2. readOnly
      3. ….

PlatformTransactionManager

  1. // 获取事务
  2. TransactionStatus getTransaction(@Nullable TransactionDefinition definition)throws TransactionException;
  3. // 提交事务
  4. void commit(TransactionStatus status) throws TransactionException;
  5. // 回滚事务
  6. void rollback(TransactionStatus status) throws TransactionException;
  • 贴出一部分

image-20200728105926218

  • AbstractPlatformTransactionManager 定义了一些基础属性 以及一些需要子类实现的方法
  1. // 属性
  2. defaultTimeout
  3. nestedTransactionAllowed
  4. validateExistingTransaction
  5. globalRollbackOnParticipationFailure
  6. failEarlyOnGlobalRollbackOnly
  7. rollbackOnCommitFailure
  8. // 方法
  9. doGetTransaction
  10. isExistingTransaction
  11. useSavepointForNestedTransaction
  12. doBegin
  13. doSuspend
  14. doResume
  15. shouldCommitOnGlobalRollbackOnly
  16. prepareForCommit
  17. doCommit
  18. doRollback
  19. doSetRollbackOnly
  20. registerAfterCompletionWithExistingTransaction
  21. doCleanupAfterCompletion

DataSourceTransactionManager

  • xml 配置如下
  1. <bean id="dataSource" class="com.alibaba.druid.pool.DruidDataSource">
  2. <property name="url"
  3. value=""/>
  4. <property name="username" value=""/>
  5. <property name="password" value=""/>
  6. <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
  7. </bean>
  8. <bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate">
  9. <property name="dataSource" ref="dataSource"/>
  10. </bean>
  11. <bean id="transactionManager"
  12. class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
  13. <property name="dataSource" ref="dataSource"/>
  14. </bean>
  • 两个属性,通常我们会配置 datasource

    1. @Nullable
    2. private DataSource dataSource;
    3. private boolean enforceReadOnly = false;
    • bean 的属性注入就不具体描述了

image-20200728133037075

  • InitializingBean

      1. @Override
      2. public void afterPropertiesSet() {
      3. if (getDataSource() == null) {
      4. throw new IllegalArgumentException("Property 'dataSource' is required");
      5. }
      6. }
      • 如果dataSource为空会抛出异常
      • 默认单例会注册到 ioc 容器中.后续注册流程不具体描述
  • 方法注释

  1. /**
  2. * 获取datasource
  3. */
  4. protected DataSource obtainDataSource() {
  5. DataSource dataSource = getDataSource();
  6. Assert.state(dataSource != null, "No DataSource set");
  7. return dataSource;
  8. }
  9. /**
  10. * 创建事务
  11. *
  12. * @return 事务对象
  13. */
  14. @Override
  15. protected Object doGetTransaction() {
  16. DataSourceTransactionObject txObject = new DataSourceTransactionObject();
  17. txObject.setSavepointAllowed(isNestedTransactionAllowed());
  18. // 数据库链接对象
  19. // 从事务管理器中获取数据库链接对象
  20. ConnectionHolder conHolder =
  21. (ConnectionHolder) TransactionSynchronizationManager
  22. .getResource(obtainDataSource());
  23. txObject.setConnectionHolder(conHolder, false);
  24. return txObject;
  25. }
  26. /**
  27. * 是否存在事务
  28. *
  29. * @param transaction transaction object returned by doGetTransaction
  30. * @return
  31. */
  32. @Override
  33. protected boolean isExistingTransaction(Object transaction) {
  34. DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
  35. return (txObject.hasConnectionHolder() && txObject.getConnectionHolder()
  36. .isTransactionActive());
  37. }
  38. /**
  39. * This implementation sets the isolation level but ignores the timeout. 事务的开始方法
  40. */
  41. @Override
  42. protected void doBegin(Object transaction, TransactionDefinition definition) {
  43. // 拿出事务
  44. DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
  45. // 链接对象
  46. Connection con = null;
  47. try {
  48. if (!txObject.hasConnectionHolder() ||
  49. txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
  50. // 数据库链接对象
  51. Connection newCon = obtainDataSource().getConnection();
  52. if (logger.isDebugEnabled()) {
  53. logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
  54. }
  55. // 设置数据库连接
  56. txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
  57. }
  58. // 拿出链接对象并且设置同步事务
  59. txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
  60. // 链接对象赋值
  61. con = txObject.getConnectionHolder().getConnection();
  62. // 获取事务级别
  63. Integer previousIsolationLevel = DataSourceUtils
  64. .prepareConnectionForTransaction(con, definition);
  65. // 设置事务隔离级别
  66. txObject.setPreviousIsolationLevel(previousIsolationLevel);
  67. // 设置只读
  68. txObject.setReadOnly(definition.isReadOnly());
  69. // Switch to manual commit if necessary. This is very expensive in some JDBC drivers,
  70. // so we don't want to do it unnecessarily (for example if we've explicitly
  71. // configured the connection pool to set it already).
  72. // 判断是否自动提交
  73. if (con.getAutoCommit()) {
  74. txObject.setMustRestoreAutoCommit(true);
  75. if (logger.isDebugEnabled()) {
  76. logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
  77. }
  78. con.setAutoCommit(false);
  79. }
  80. // 事务链接准备
  81. prepareTransactionalConnection(con, definition);
  82. // 事务激活
  83. txObject.getConnectionHolder().setTransactionActive(true);
  84. // 超时时间获取
  85. int timeout = determineTimeout(definition);
  86. // 默认超时时间设置
  87. if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
  88. txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
  89. }
  90. // Bind the connection holder to the thread.
  91. // 将链接和当前线程绑定
  92. if (txObject.isNewConnectionHolder()) {
  93. // k: datasource v: connectionHolder
  94. TransactionSynchronizationManager
  95. .bindResource(obtainDataSource(), txObject.getConnectionHolder());
  96. }
  97. } catch (Throwable ex) {
  98. if (txObject.isNewConnectionHolder()) {
  99. // 释放链接
  100. DataSourceUtils.releaseConnection(con, obtainDataSource());
  101. txObject.setConnectionHolder(null, false);
  102. }
  103. throw new CannotCreateTransactionException(
  104. "Could not open JDBC Connection for transaction", ex);
  105. }
  106. }
  107. /**
  108. * 挂起事务
  109. *
  110. * @param transaction transaction object returned by {@code doGetTransaction}
  111. * @return 移除的链接
  112. */
  113. @Override
  114. protected Object doSuspend(Object transaction) {
  115. // 获取事务对象
  116. DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
  117. // 连接置空
  118. txObject.setConnectionHolder(null);
  119. // 解除资源绑定
  120. return TransactionSynchronizationManager.unbindResource(obtainDataSource());
  121. }
  122. /**
  123. * 恢复事务
  124. *
  125. * @param transaction transaction object returned by {@code doGetTransaction}
  126. * @param suspendedResources the object that holds suspended resources, as returned by
  127. * doSuspend
  128. */
  129. @Override
  130. protected void doResume(@Nullable Object transaction, Object suspendedResources) {
  131. // 资源绑定
  132. TransactionSynchronizationManager.bindResource(obtainDataSource(), suspendedResources);
  133. }
  134. /**
  135. * 做提交
  136. *
  137. * @param status the status representation of the transaction
  138. */
  139. @Override
  140. protected void doCommit(DefaultTransactionStatus status) {
  141. // 事务对象
  142. DataSourceTransactionObject txObject = (DataSourceTransactionObject) status
  143. .getTransaction();
  144. // 获取链接
  145. Connection con = txObject.getConnectionHolder().getConnection();
  146. if (status.isDebug()) {
  147. logger.debug("Committing JDBC transaction on Connection [" + con + "]");
  148. }
  149. try {
  150. // 链接提交
  151. con.commit();
  152. } catch (SQLException ex) {
  153. throw new TransactionSystemException("Could not commit JDBC transaction", ex);
  154. }
  155. }
  156. /**
  157. * 事务回滚
  158. *
  159. * @param status the status representation of the transaction
  160. */
  161. @Override
  162. protected void doRollback(DefaultTransactionStatus status) {
  163. // 事务对象
  164. DataSourceTransactionObject txObject = (DataSourceTransactionObject) status
  165. .getTransaction();
  166. // 链接对象
  167. Connection con = txObject.getConnectionHolder().getConnection();
  168. if (status.isDebug()) {
  169. logger.debug("Rolling back JDBC transaction on Connection [" + con + "]");
  170. }
  171. try {
  172. // 回滚方法
  173. con.rollback();
  174. } catch (SQLException ex) {
  175. throw new TransactionSystemException("Could not roll back JDBC transaction", ex);
  176. }
  177. }
  178. /**
  179. * 设置回滚
  180. *
  181. * @param status the status representation of the transaction
  182. */
  183. @Override
  184. protected void doSetRollbackOnly(DefaultTransactionStatus status) {
  185. DataSourceTransactionObject txObject = (DataSourceTransactionObject) status
  186. .getTransaction();
  187. if (status.isDebug()) {
  188. logger.debug(
  189. "Setting JDBC transaction [" + txObject.getConnectionHolder().getConnection() +
  190. "] rollback-only");
  191. }
  192. txObject.setRollbackOnly();
  193. }
  194. /**
  195. * 清除
  196. *
  197. * @param transaction transaction object returned by {@code doGetTransaction}
  198. */
  199. @Override
  200. protected void doCleanupAfterCompletion(Object transaction) {
  201. DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
  202. // Remove the connection holder from the thread, if exposed.
  203. if (txObject.isNewConnectionHolder()) {
  204. // 释放datasource绑定的资源
  205. TransactionSynchronizationManager.unbindResource(obtainDataSource());
  206. }
  207. // Reset connection.
  208. Connection con = txObject.getConnectionHolder().getConnection();
  209. try {
  210. if (txObject.isMustRestoreAutoCommit()) {
  211. con.setAutoCommit(true);
  212. }
  213. // 重置链接
  214. DataSourceUtils.resetConnectionAfterTransaction(
  215. con, txObject.getPreviousIsolationLevel(), txObject.isReadOnly());
  216. } catch (Throwable ex) {
  217. logger.debug("Could not reset JDBC Connection after transaction", ex);
  218. }
  219. if (txObject.isNewConnectionHolder()) {
  220. if (logger.isDebugEnabled()) {
  221. logger.debug("Releasing JDBC Connection [" + con + "] after transaction");
  222. }
  223. DataSourceUtils.releaseConnection(con, this.dataSource);
  224. }
  225. txObject.getConnectionHolder().clear();
  226. }
  227. /**
  228. *
  229. * 事务准备
  230. */
  231. protected void prepareTransactionalConnection(Connection con, TransactionDefinition definition)
  232. throws SQLException {
  233. if (isEnforceReadOnly() && definition.isReadOnly()) {
  234. try (Statement stmt = con.createStatement()) {
  235. // 执行sql 类似事务隔离级别
  236. stmt.executeUpdate("SET TRANSACTION READ ONLY");
  237. }
  238. }
  239. }

内部类 DataSourceTransactionObject

  1. private static class DataSourceTransactionObject extends JdbcTransactionObjectSupport {
  2. /**
  3. * 是否有新的链接
  4. */
  5. private boolean newConnectionHolder;
  6. /**
  7. * 事务结束后是否需要恢复自动提交
  8. */
  9. private boolean mustRestoreAutoCommit;
  10. }

AbstractPlatformTransactionManager

  • 由 abstract 修饰、交给子类实现的方法不具体展开，主要关注对 org.springframework.transaction.PlatformTransactionManager 接口中几个方法的实现

commit 方法

  1. @Override
  2. public final void commit(TransactionStatus status) throws TransactionException {
  3. if (status.isCompleted()) {
  4. throw new IllegalTransactionStateException(
  5. "Transaction is already completed - do not call commit or rollback more than once per transaction");
  6. }
  7. // 事务状态
  8. DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
  9. if (defStatus.isLocalRollbackOnly()) {
  10. if (defStatus.isDebug()) {
  11. logger.debug("Transactional code has requested rollback");
  12. }
  13. // 处理回滚
  14. processRollback(defStatus, false);
  15. return;
  16. }
  17. if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
  18. if (defStatus.isDebug()) {
  19. logger.debug(
  20. "Global transaction is marked as rollback-only but transactional code requested commit");
  21. }
  22. // 处理回滚
  23. processRollback(defStatus, true);
  24. return;
  25. }
  26. // 真正的处理提交
  27. processCommit(defStatus);
  28. }
  1. private void processCommit(DefaultTransactionStatus status) throws TransactionException {
  2. try {
  3. boolean beforeCompletionInvoked = false;
  4. try {
  5. boolean unexpectedRollback = false;
  6. //
  7. prepareForCommit(status);
  8. triggerBeforeCommit(status);
  9. triggerBeforeCompletion(status);
  10. // 前置任务是否已经执行
  11. beforeCompletionInvoked = true;
  12. // 嵌套事务. 是否有保存点
  13. if (status.hasSavepoint()) {
  14. if (status.isDebug()) {
  15. logger.debug("Releasing transaction savepoint");
  16. }
  17. unexpectedRollback = status.isGlobalRollbackOnly();
  18. status.releaseHeldSavepoint();
  19. } else if (status.isNewTransaction()) {
  20. if (status.isDebug()) {
  21. logger.debug("Initiating transaction commit");
  22. }
  23. unexpectedRollback = status.isGlobalRollbackOnly();
  24. doCommit(status);
  25. } else if (isFailEarlyOnGlobalRollbackOnly()) {
  26. unexpectedRollback = status.isGlobalRollbackOnly();
  27. }
  28. // Throw UnexpectedRollbackException if we have a global rollback-only
  29. // marker but still didn't get a corresponding exception from commit.
  30. if (unexpectedRollback) {
  31. throw new UnexpectedRollbackException(
  32. "Transaction silently rolled back because it has been marked as rollback-only");
  33. }
  34. } catch (UnexpectedRollbackException ex) {
  35. // can only be caused by doCommit
  36. // 事务的同步状态: 回滚
  37. triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
  38. throw ex;
  39. } catch (TransactionException ex) {
  40. // can only be caused by doCommit
  41. // 提交失败 做回滚
  42. if (isRollbackOnCommitFailure()) {
  43. doRollbackOnCommitException(status, ex);
  44. } else {
  45. // 事务的同步状态: 未知
  46. triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
  47. }
  48. throw ex;
  49. } catch (RuntimeException | Error ex) {
  50. if (!beforeCompletionInvoked) {
  51. triggerBeforeCompletion(status);
  52. }
  53. doRollbackOnCommitException(status, ex);
  54. throw ex;
  55. }
  56. // Trigger afterCommit callbacks, with an exception thrown there
  57. // propagated to callers but the transaction still considered as committed.
  58. try {
  59. triggerAfterCommit(status);
  60. } finally {
  61. triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
  62. }
  63. } finally {
  64. // 完成后清理
  65. cleanupAfterCompletion(status);
  66. }
  67. }

rollback 方法

  1. @Override
  2. public final void rollback(TransactionStatus status) throws TransactionException {
  3. // 是否已完成
  4. if (status.isCompleted()) {
  5. throw new IllegalTransactionStateException(
  6. "Transaction is already completed - do not call commit or rollback more than once per transaction");
  7. }
  8. DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
  9. // 执行回滚
  10. processRollback(defStatus, false);
  11. }
  1. private void processRollback(DefaultTransactionStatus status, boolean unexpected) {
  2. try {
  3. boolean unexpectedRollback = unexpected;
  4. try {
  5. triggerBeforeCompletion(status);
  6. // 嵌套事务
  7. if (status.hasSavepoint()) {
  8. if (status.isDebug()) {
  9. logger.debug("Rolling back transaction to savepoint");
  10. }
  11. // 回滚保存点
  12. status.rollbackToHeldSavepoint();
  13. }
  14. // 独立事务
  15. else if (status.isNewTransaction()) {
  16. if (status.isDebug()) {
  17. logger.debug("Initiating transaction rollback");
  18. }
  19. // 执行回滚
  20. doRollback(status);
  21. } else {
  22. // Participating in larger transaction
  23. if (status.hasTransaction()) {
  24. if (status.isLocalRollbackOnly()
  25. || isGlobalRollbackOnParticipationFailure()) {
  26. if (status.isDebug()) {
  27. logger.debug(
  28. "Participating transaction failed - marking existing transaction as rollback-only");
  29. }
  30. // 设置回滚
  31. doSetRollbackOnly(status);
  32. } else {
  33. if (status.isDebug()) {
  34. logger.debug(
  35. "Participating transaction failed - letting transaction originator decide on rollback");
  36. }
  37. }
  38. } else {
  39. logger.debug(
  40. "Should roll back transaction but cannot - no transaction available");
  41. }
  42. // Unexpected rollback only matters here if we're asked to fail early
  43. if (!isFailEarlyOnGlobalRollbackOnly()) {
  44. unexpectedRollback = false;
  45. }
  46. }
  47. } catch (RuntimeException | Error ex) {
  48. triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
  49. throw ex;
  50. }
  51. triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
  52. // Raise UnexpectedRollbackException if we had a global rollback-only marker
  53. if (unexpectedRollback) {
  54. throw new UnexpectedRollbackException(
  55. "Transaction rolled back because it has been marked as rollback-only");
  56. }
  57. } finally {
  58. cleanupAfterCompletion(status);
  59. }
  60. }

TransactionSynchronizationManager

  • 事务同步管理器

  • 一些基本属性

  1. /**
  2. * 资源
  3. */
  4. private static final ThreadLocal<Map<Object, Object>> resources =
  5. new NamedThreadLocal<>("Transactional resources");
  6. /**
  7. * 同步器
  8. */
  9. private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations =
  10. new NamedThreadLocal<>("Transaction synchronizations");
  11. /**
  12. * 事务名称
  13. */
  14. private static final ThreadLocal<String> currentTransactionName =
  15. new NamedThreadLocal<>("Current transaction name");
  16. /**
  17. * 是否只读
  18. */
  19. private static final ThreadLocal<Boolean> currentTransactionReadOnly =
  20. new NamedThreadLocal<>("Current transaction read-only status");
  21. /**
  22. * 事务隔离级别
  23. */
  24. private static final ThreadLocal<Integer> currentTransactionIsolationLevel =
  25. new NamedThreadLocal<>("Current transaction isolation level");
  26. /**
  27. * 事务激活状态
  28. */
  29. private static final ThreadLocal<Boolean> actualTransactionActive =
  30. new NamedThreadLocal<>("Actual transaction active");

资源方法

获取资源
  1. public static Map<Object, Object> getResourceMap() {
  2. // 线程变量中获取
  3. Map<Object, Object> map = resources.get();
  4. // 判空: 为空返回一个空 map, 否则返回原 map 的不可修改视图
  5. return (map != null ? Collections.unmodifiableMap(map) : Collections.emptyMap());
  6. }
判断是否存在资源
  1. public static boolean hasResource(Object key) {
  2. // 资源key获取
  3. // 通过 unwrapResourceIfNecessary 会走一次资源对象转换.
  4. // 1. InfrastructureProxy
  5. // 2. ScopedObject
  6. Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
  7. Object value = doGetResource(actualKey);
  8. return (value != null);
  9. }
  • unwrapResourceIfNecessary 方法会对资源对象做解包：如果资源是 InfrastructureProxy 或 ScopedObject 代理，就调用对应接口方法（getWrappedObject / getTargetObject）取出被包装的真实对象，否则原样返回

    1. static Object unwrapResourceIfNecessary(Object resource) {
    2. Assert.notNull(resource, "Resource must not be null");
    3. Object resourceRef = resource;
    4. // unwrap infrastructure proxy
    5. if (resourceRef instanceof InfrastructureProxy) {
    6. resourceRef = ((InfrastructureProxy) resourceRef).getWrappedObject();
    7. }
    8. if (aopAvailable) {
    9. // now unwrap scoped proxy
    10. resourceRef = ScopedProxyUnwrapper.unwrapIfNecessary(resourceRef);
    11. }
    12. return resourceRef;
    13. }
    14. private static class ScopedProxyUnwrapper {
    15. public static Object unwrapIfNecessary(Object resource) {
    16. if (resource instanceof ScopedObject) {
    17. return ((ScopedObject) resource).getTargetObject();
    18. } else {
    19. return resource;
    20. }
    21. }
    22. }
  • doGetResource 方法去获取资源

    1. @Nullable
    2. private static Object doGetResource(Object actualKey) {
    3. Map<Object, Object> map = resources.get();
    4. if (map == null) {
    5. return null;
    6. }
    7. Object value = map.get(actualKey);
    8. // Transparently remove ResourceHolder that was marked as void...
    9. // 如果取到的是已被标记为 void 的 ResourceHolder, 就从 map 中移除它并返回 null
    10. if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {
    11. map.remove(actualKey);
    12. // Remove entire ThreadLocal if empty...
    13. if (map.isEmpty()) {
    14. resources.remove();
    15. }
    16. value = null;
    17. }
    18. return value;
    19. }
资源绑定
  1. public static void bindResource(Object key, Object value) throws IllegalStateException {
  2. // 将资源解包, 得到真正用作 map key 的对象
  3. Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
  4. Assert.notNull(value, "Value must not be null");
  5. Map<Object, Object> map = resources.get();
  6. // set ThreadLocal Map if none found
  7. // 资源对象为空初始化
  8. if (map == null) {
  9. map = new HashMap<>();
  10. resources.set(map);
  11. }
  12. // 原来的值
  13. Object oldValue = map.put(actualKey, value);
  14. // Transparently suppress a ResourceHolder that was marked as void...
  15. // 如果原来的值是已被标记为 void 的 ResourceHolder, 则当作不存在(置空); 之后原值仍不为空才抛出异常
  16. if (oldValue instanceof ResourceHolder && ((ResourceHolder) oldValue).isVoid()) {
  17. oldValue = null;
  18. }
  19. if (oldValue != null) {
  20. throw new IllegalStateException("Already value [" + oldValue + "] for key [" +
  21. actualKey + "] bound to thread [" + Thread.currentThread().getName() + "]");
  22. }
  23. if (logger.isTraceEnabled()) {
  24. logger.trace("Bound value [" + value + "] for key [" + actualKey + "] to thread [" +
  25. Thread.currentThread().getName() + "]");
  26. }
  27. }
  • debug 使用的是 druid 的数据源

image-20200729090322058

  • unwrapResourceIfNecessary 方法
  1. static Object unwrapResourceIfNecessary(Object resource) {
  2. Assert.notNull(resource, "Resource must not be null");
  3. Object resourceRef = resource;
  4. // unwrap infrastructure proxy
  5. if (resourceRef instanceof InfrastructureProxy) {
  6. resourceRef = ((InfrastructureProxy) resourceRef).getWrappedObject();
  7. }
  8. if (aopAvailable) {
  9. // now unwrap scoped proxy
  10. resourceRef = ScopedProxyUnwrapper.unwrapIfNecessary(resourceRef);
  11. }
  12. return resourceRef;
  13. }

显然com.alibaba.druid.pool.DruidDataSource不是InfrastructureProxy

  • aopAvailable

    1. private static final boolean aopAvailable = ClassUtils.isPresent(
    2. "org.springframework.aop.scope.ScopedObject",
    3. TransactionSynchronizationUtils.class.getClassLoader());
    1. public static boolean isPresent(String className, @Nullable ClassLoader classLoader) {
    2. try {
    3. forName(className, classLoader);
    4. return true;
    5. }
    6. catch (IllegalAccessError err) {
    7. throw new IllegalStateException("Readability mismatch in inheritance hierarchy of class [" +
    8. className + "]: " + err.getMessage(), err);
    9. }
    10. catch (Throwable ex) {
    11. // Typically ClassNotFoundException or NoClassDefFoundError...
    12. return false;
    13. }
    14. }

    判断指定类名能否被加载：加载成功返回 true，加载失败（通常是 ClassNotFoundException 或 NoClassDefFoundError）返回 false

  • ScopedProxyUnwrapper.unwrapIfNecessary

    1. private static class ScopedProxyUnwrapper {
    2. public static Object unwrapIfNecessary(Object resource) {
    3. if (resource instanceof ScopedObject) {
    4. return ((ScopedObject) resource).getTargetObject();
    5. } else {
    6. return resource;
    7. }
    8. }
    9. }
    • com.alibaba.druid.pool.DruidDataSource不是ScopedObject 直接返回

后续就是一次 map 的 put 操作，不具体展开

解除资源绑定
  1. public static Object unbindResource(Object key) throws IllegalStateException {
  2. // 获取真正的资源对象
  3. Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
  4. // map 移除key
  5. Object value = doUnbindResource(actualKey);
  6. if (value == null) {
  7. throw new IllegalStateException(
  8. "No value for key [" + actualKey + "] bound to thread [" + Thread
  9. .currentThread().getName() + "]");
  10. }
  11. return value;
  12. }
  13. @Nullable
  14. private static Object doUnbindResource(Object actualKey) {
  15. Map<Object, Object> map = resources.get();
  16. if (map == null) {
  17. return null;
  18. }
  19. Object value = map.remove(actualKey);
  20. // Remove entire ThreadLocal if empty...
  21. if (map.isEmpty()) {
  22. resources.remove();
  23. }
  24. // Transparently suppress a ResourceHolder that was marked as void...
  25. if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {
  26. value = null;
  27. }
  28. if (value != null && logger.isTraceEnabled()) {
  29. logger.trace("Removed value [" + value + "] for key [" + actualKey + "] from thread [" +
  30. Thread.currentThread().getName() + "]");
  31. }
  32. return value;
  33. }

map 对象的 remove 操作

其他

  • 其他几个属性（同步器、事务名称、只读标记、隔离级别、激活状态）都是对各自的 ThreadLocal 进行数据设置操作，不具体展开。

TransactionTemplate

  • 属性

    1. @Nullable
    2. private PlatformTransactionManager transactionManager;

    前文说到 DataSourceTransactionManager 实现了 PlatformTransactionManager，因此配置的时候我们有如下片段：

    1. <bean id="transactionTemplate"
    2. class="org.springframework.transaction.support.TransactionTemplate">
    3. <property name="transactionManager" ref="transactionManager"/>
    4. </bean>
  • 事务操作模板类图

    image-20200728094658684

  • org.springframework.beans.factory.InitializingBean接口的实现

    1. @Override
    2. public void afterPropertiesSet() {
    3. if (this.transactionManager == null) {
    4. throw new IllegalArgumentException("Property 'transactionManager' is required");
    5. }
    6. }

execute

  1. @Override
  2. @Nullable
  3. public <T> T execute(TransactionCallback<T> action) throws TransactionException {
  4. Assert.state(this.transactionManager != null, "No PlatformTransactionManager set");
  5. // 事务管理器是否实现了 CallbackPreferringPlatformTransactionManager 接口
  6. if (this.transactionManager instanceof CallbackPreferringPlatformTransactionManager) {
  7. // 强转执行
  8. return ((CallbackPreferringPlatformTransactionManager) this.transactionManager)
  9. .execute(this, action);
  10. } else {
  11. // 获取事务状态
  12. TransactionStatus status = this.transactionManager.getTransaction(this);
  13. // 返回结果
  14. T result;
  15. try {
  16. // 事务回调执行
  17. result = action.doInTransaction(status);
  18. } catch (RuntimeException | Error ex) {
  19. // Transactional code threw application exception -> rollback
  20. // 回滚异常
  21. rollbackOnException(status, ex);
  22. throw ex;
  23. } catch (Throwable ex) {
  24. // Transactional code threw unexpected exception -> rollback
  25. // 回滚异常
  26. rollbackOnException(status, ex);
  27. throw new UndeclaredThrowableException(ex,
  28. "TransactionCallback threw undeclared checked exception");
  29. }
  30. // 提交
  31. this.transactionManager.commit(status);
  32. return result;
  33. }
  34. }