1 MySQL事务实现

1.1 原子性

undolog实现事务原子性
image.png

1.2 持久性

通过redolog实现事务持久性
image.png

1.3 隔离性

undolog,锁机制,隐藏列等,隐藏列记录版本号,更新时间,undolog指针

  • 脏读

当事务A在T3时间节点读取zhangsan的余额时,会发现数据已被其他事务修改,且状态为未提交。此时事务A读取最新数据后,根据数据的undo log执行回滚操作,得到事务B修改前的数据,从而避免了脏读。

  • 不可重复读

当事务A在T2节点第一次读取数据时,会记录该数据的版本号(数据的版本号是以row为单位记录的),假设版本号为1;当事务B提交时,该行记录的版本号增加,假设版本号为2;当事务A在T5再一次读取数据时,发现数据的版本号(2)大于第一次读取时记录的版本号(1),因此会根据undo log执行回滚操作,得到版本号为1时的数据,从而实现了可重复读。

  • 幻读

InnoDB实现的RR通过next-key lock机制避免了幻读现象。
next-keylock是行锁的一种,实现相当于recordlock(记录锁)+gaplock(间隙锁);其特点是不仅会锁住记录本身(recordlock的功能),还会锁定一个范围(gaplock的功能)。当然,这里我们讨论的是不加锁读:此时的next-keylock并不是真的加锁,只是为读取的数据增加了标记(标记内容包括数据的版本号等);准确起见姑且称之为类next-keylock机制。还是以前面的例子来说明:
image.png
当事务A在T2节点第一次读取0

1.4 一致性

可以说,一致性是事务追求的最终目标:前面提到的原子性、持久性和隔离性,都是为了保证数据库状态的一致性。此外,除了数据库层面的保障,一致性的实现也需要应用层面进行保障。
实现一致性的措施包括:
保证原子性、持久性和隔离性,如果这些特性无法保证,事务的一致性也无法保证数据库本身提供保障,例如不允许向整形列插入字符串值、字符串长度不能超过列的限制等应用层面进行保障,例如如果转账操作只扣除转账者的余额,而没有增加接收者的余额,无论数据库实现的多么完美,也无法保证状态的一致

2 Spring事务实现

传播机制
Spring针对方法嵌套调用时事务的创建行为定义了七种事务传播机制,分别是PROPAGATION_REQUIRED、PROPAGATION_SUPPORT、PROPAGATION_MANDATORY、PROPAGATION_REQUIRES_NEW、PROPAGATION_NOT_SUPPORTED、PROPAGATION_NEVER以及PROPAGATION_NESTED,基本上从字面意思就能知道每种传播机制各自的行为表现,Spring默认的事务传播机制是PROPAGATION_REQUIRED,即如果当前存在事务,则使用当前事务,否则创建新的事务。详情可参考Spring事务传播行为
事务行为
事务的行为包括事务开启、事务提交和事务回滚。InnoDB所有的用户SQL执行都在事务控制之内,在默认情况下,autocommit设置为true,单条SQL执行成功后,MySQL会自动提交事务,或者如果SQL执行出错,则根据异常类型执行事务提交或者回滚。可以使用START TRANSACTION(SQL标准)或者BEGIN开启事务,使用COMMIT和ROLLBACK提交和回滚事务;也可以通过设置autocommit属性来控制事务行为,当设置autocommit为false时,其后执行的多条SQL语句将在一个事务内,直到执行COMMIT或者ROLLBACK事务才会提交或者回滚。
AOP增强
Spring使用AOP(面向切面编程)来实现声明式事务,后续在讲Spring事务具体实现的时候会详细说明,关于AOP的概念可参考Spring AOP概念理解(通俗易懂),这里不再细说。说下动态代理和AOP增强。
动态代理是Spring实现AOP的默认方式,分为两种:JDK动态代理和CGLIB动态代理。JDK动态代理面向接口,通过反射生成目标代理接口的匿名实现类;CGLIB动态代理则通过继承,使用字节码增强技术(或者objenesis类库)为目标代理类生成代理子类。Spring默认对接口实现使用JDK动态代理,对具体类使用CGLIB,同时也支持配置全局使用CGLIB来生成代理对象。
我们在切面配置中会使用到@Aspect注解,这里用到了Aspectj的切面表达式。Aspectj是java语言实现的一个AOP框架,使用静态代理模式,拥有完善的AOP功能,与Spring AOP互为补充。Spring采用了Aspectj强大的切面表达式定义方式,但是默认情况下仍然使用动态代理方式,并未使用Aspectj的编译器和织入器,当然也支持配置使用Aspectj静态代理替代动态代理方式。Aspectj功能更强大,比方说它支持对字段、POJO类进行增强,与之相对,Spring只支持对Bean方法级别进行增强。
Spring对方法的增强有五种方式
前置增强(org.springframework.aop.BeforeAdvice):在目标方法执行之前进行增强;
后置增强(org.springframework.aop.AfterReturningAdvice):在目标方法执行之后进行增强;
环绕增强(org.aopalliance.intercept.MethodInterceptor):在目标方法执行前后都执行增强;
异常抛出增强(org.springframework.aop.ThrowsAdvice):在目标方法抛出异常后执行增强;
引介增强(org.springframework.aop.IntroductionInterceptor):为目标类添加新的方法和属性。
声明式事务的实现就是通过环绕增强的方式,在目标方法执行之前开启事务,在目标方法执行之后提交或者回滚事务,事务拦截器的继承关系图可以体现这一点:

image.png

Spring事务抽象
统一一致的事务抽象是Spring框架的一大优势,无论是全局事务还是本地事务,JTA、JDBC、Hibernate还是JPA,Spring都使用统一的编程模型,使得应用程序可以很容易地在全局事务与本地事务,或者不同的事务框架之间进行切换。下图是Spring事务抽象的核心类图:

image.png

接口PlatformTransactionManager定义了事务操作的行为,其依赖TransactionDefinition和TransactionStatus接口,其实大部分的事务属性和行为我们以MySQL数据库为例已经有过了解,这里再对应介绍下。
PlatformTransactionManager:事务管理器
getTransaction方法:事务获取操作,根据事务属性定义,获取当前事务或者创建新事物;
commit方法:事务提交操作,注意这里所说的提交并非直接提交事务,而是根据当前事务状态执行提交或者回滚操作;
rollback方法:事务回滚操作,同样,也并非一定直接回滚事务,也有可能只是标记事务为只读,等待其他调用方执行回滚。
TransactionDefinition:事务属性定义
getPropagationBehavior方法:返回事务的传播属性,默认是PROPAGATION_REQUIRED;
getIsolationLevel方法:返回事务隔离级别,事务隔离级别只有在创建新事务时才有效,也就是说只对应传播属性PROPAGATION_REQUIRED和PROPAGATION_REQUIRES_NEW;
getTimeout方法:返回事务超时时间,以秒为单位,同样只有在创建新事务时才有效;
isReadOnly方法:是否优化为只读事务,支持这项属性的事务管理器会将事务标记为只读,只读事务不允许有写操作,不支持只读属性的事务管理器需要忽略这项设置,这一点跟其他事务属性定义不同,针对其他不支持的属性设置,事务管理器应该抛出异常。
getName方法:返回事务名称,声明式事务中默认值为“类的完全限定名.方法名”。
TransactionStatus:当前事务状态
isNewTransaction方法:当前方法是否创建了新事务(区别于使用现有事务以及没有事务);
hasSavepoint方法:在嵌套事务场景中,判断当前事务是否包含保存点;
setRollbackOnly和isRollbackOnly方法:只读属性设置(主要用于标记事务,等待回滚)和查询;
flush方法:刷新底层会话中的修改到数据库,一般用于刷新如Hibernate/JPA的会话,是否生效由具体事务资源实现决定;
isCompleted方法:判断当前事务是否已完成(已提交或者已回滚)。
部分Spring包含的对PlatformTransactionManager的实现类如下图所示:

image.png

AbstractPlatformTransactionManager抽象类实现了Spring事务的标准流程,其子类DataSourceTransactionManager是我们使用较多的JDBC单数据源事务管理器,而JtaTransactionManager是JTA(Java Transaction API)规范的实现类,另外两个则分别是JavaEE容器WebLogic和WebSphere的JTA事务管理器的具体实现。
Spring事务切面
之前提到,Spring采用AOP来实现声明式事务,那么事务的AOP切面是如何织入的呢?这一点涉及到AOP动态代理对象的生成过程。
代理对象生成的核心类是AbstractAutoProxyCreator,实现了BeanPostProcessor接口,会在Bean初始化完成之后,通过postProcessAfterInitialization方法生成代理对象,关于BeanPostProcessor在Bean生命周期中的作用,可参考一些常用的Spring扩展接口
看一下AbstractAutoProxyCreator类的核心代码,主要关注三个方法:postProcessAfterInitialization、wrapIfNecessary和createProxy,为了突出核心流程,以注释代替了部分代码的具体实现,后续的源码分析也采用相同的处理。

  1. // AbstractAutoProxyCreator.class
  2. @Override
  3. public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
  4. if (bean != null) {
  5. Object cacheKey = getCacheKey(bean.getClass(), beanName);
  6. if (!this.earlyProxyReferences.contains(cacheKey)) {
  7. // 创建代理对象
  8. return wrapIfNecessary(bean, beanName, cacheKey);
  9. }
  10. }
  11. return bean;
  12. }
  13. protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
  14. // 参数检查,跳过已经执行过代理对象生成,或者已知的不需要生成代理对象的Bean
  15. ...
  16. // Create proxy if we have advice.
  17. // 查询当前Bean所有的AOP增强配置,最终是通过AOPUtils工具类实现
  18. Object[] specificInterceptors = getAdvicesAndAdvisorsForBean(bean.getClass(), beanName, null);
  19. if (specificInterceptors != DO_NOT_PROXY) {
  20. this.advisedBeans.put(cacheKey, Boolean.TRUE);
  21. // 执行AOP织入,创建代理对象
  22. Object proxy = createProxy(
  23. bean.getClass(), beanName, specificInterceptors, new SingletonTargetSource(bean));
  24. this.proxyTypes.put(cacheKey, proxy.getClass());
  25. return proxy;
  26. }
  27. this.advisedBeans.put(cacheKey, Boolean.FALSE);
  28. return bean;
  29. }
  30. protected Object createProxy(Class<?> beanClass, String beanName, Object[] specificInterceptors, TargetSource targetSource) {
  31. if (this.beanFactory instanceof ConfigurableListableBeanFactory) {
  32. AutoProxyUtils.exposeTargetClass((ConfigurableListableBeanFactory) this.beanFactory, beanName, beanClass);
  33. }
  34. // 实例化代理工厂类
  35. ProxyFactory proxyFactory = new ProxyFactory();
  36. proxyFactory.copyFrom(this);
  37. // 当全局使用动态代理时,设置是否需要对目标Bean强制使用CGLIB动态代理
  38. ...
  39. // 构建AOP增强顾问,包含框架公共增强和应用程序自定义增强
  40. // 设置proxyFactory属性,如增强、目标类、是否允许变更等
  41. ...
  42. // 创建代理对象
  43. return proxyFactory.getProxy(getProxyClassLoader());
  44. }

最后是通过调用ProxyFactory#getProxy(java.lang.ClassLoader)方法来创建代理对象:

  1. // ProxyFactory.class
  2. public Object getProxy(ClassLoader classLoader) {
  3. return createAopProxy().getProxy(classLoader);
  4. }
  5. // ProxyFactory父类ProxyCreatorSupport.class
  6. protected final synchronized AopProxy createAopProxy() {
  7. if (!this.active) {
  8. activate();
  9. }
  10. return getAopProxyFactory().createAopProxy(this);
  11. }
  12. public ProxyCreatorSupport() {
  13. this.aopProxyFactory = new DefaultAopProxyFactory();
  14. }

ProxyFactory的父类构造器实例化了DefaultAopProxyFactory类,从其源代码我们可以看到Spring动态代理方式选择策略的实现:如果目标类optimize,proxyTargetClass属性设置为true或者未指定需要代理的接口,则使用CGLIB生成代理对象,否则使用JDK动态代理。

  1. public class DefaultAopProxyFactory implements AopProxyFactory, Serializable {
  2. @Override
  3. public AopProxy createAopProxy(AdvisedSupport config) throws AopConfigException {
  4. // 如果optimize,proxyTargetClass属性设置为true或者未指定代理接口,则使用CGLIB生成代理对象
  5. if (config.isOptimize() || config.isProxyTargetClass() || hasNoUserSuppliedProxyInterfaces(config)) {
  6. Class<?> targetClass = config.getTargetClass();
  7. // 参数检查,targetClass为空抛出异常
  8. ...
  9. // 目标类本身是接口或者代理对象,仍然使用JDK动态代理
  10. if (targetClass.isInterface() || Proxy.isProxyClass(targetClass)) {
  11. return new JdkDynamicAopProxy(config);
  12. }
  13. // Objenesis是一个可以不通过构造器创建子类的java工具类库
  14. // 作为Spring 4.0后CGLIB的默认实现
  15. return new ObjenesisCglibAopProxy(config);
  16. }
  17. else {
  18. // 否则使用JDK动态代理
  19. return new JdkDynamicAopProxy(config);
  20. }
  21. }
  22. ...
  23. }

Spring事务拦截
我们已经了解了AOP切面织入生成代理对象的过程,当Bean方法通过代理对象调用时,会触发对应的AOP增强拦截器,前面提到声明式事务是一种环绕增强,对应接口为MethodInterceptor,事务增强对该接口的实现为TransactionInterceptor,类图如下:

image.png

事务拦截器TransactionInterceptor在invoke方法中,通过调用父类TransactionAspectSupport的invokeWithinTransaction方法进行事务处理,该方法支持声明式事务和编程式事务。

  1. // TransactionInterceptor.class
  2. @Override
  3. public Object invoke(final MethodInvocation invocation) throws Throwable {
  4. // 获取targetClass
  5. ...
  6. // Adapt to TransactionAspectSupport's invokeWithinTransaction...
  7. return invokeWithinTransaction(invocation.getMethod(), targetClass, new InvocationCallback() {
  8. @Override
  9. public Object proceedWithInvocation() throws Throwable {
  10. // 实际执行目标方法
  11. return invocation.proceed();
  12. }
  13. });
  14. }
  15. // TransactionInterceptor父类TransactionAspectSupport.class
  16. protected Object invokeWithinTransaction(Method method, Class<?> targetClass, final InvocationCallback invocation)
  17. throws Throwable {
  18. // If the transaction attribute is null, the method is non-transactional.
  19. // 查询目标方法事务属性、确定事务管理器、构造连接点标识(用于确认事务名称)
  20. final TransactionAttribute txAttr = getTransactionAttributeSource().getTransactionAttribute(method, targetClass);
  21. final PlatformTransactionManager tm = determineTransactionManager(txAttr);
  22. final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
  23. if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
  24. // 事务获取
  25. TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
  26. Object retVal = null;
  27. try {
  28. // 通过回调执行目标方法
  29. retVal = invocation.proceedWithInvocation();
  30. }
  31. catch (Throwable ex) {
  32. // 目标方法执行抛出异常,根据异常类型执行事务提交或者回滚操作
  33. completeTransactionAfterThrowing(txInfo, ex);
  34. throw ex;
  35. }
  36. finally {
  37. // 清理当前线程事务信息
  38. cleanupTransactionInfo(txInfo);
  39. }
  40. // 目标方法执行成功,提交事务
  41. commitTransactionAfterReturning(txInfo);
  42. return retVal;
  43. } else {
  44. // 带回调的事务执行处理,一般用于编程式事务
  45. ...
  46. }
  47. }

在讲Spring事务抽象时,有提到事务抽象的核心接口为PlatformTransactionManager,它负责管理事务行为,包括事务的获取、提交和回滚。在invokeWithinTransaction方法中,我们可以看到createTransactionIfNecessary、commitTransactionAfterReturning和completeTransactionAfterThrowing都是针对该接口编程,并不依赖于特定事务管理器,这里是对Spring事务抽象的实现。

  1. //TransactionAspectSupport.class
  2. protected TransactionInfo createTransactionIfNecessary(
  3. PlatformTransactionManager tm, TransactionAttribute txAttr, final String joinpointIdentification) {
  4. ...
  5. TransactionStatus status = null;
  6. if (txAttr != null) {
  7. if (tm != null) {
  8. // 获取事务
  9. status = tm.getTransaction(txAttr);
  10. ...
  11. }
  12. protected void commitTransactionAfterReturning(TransactionInfo txInfo) {
  13. if (txInfo != null && txInfo.hasTransaction()) {
  14. ...
  15. // 提交事务
  16. txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
  17. }
  18. }
  19. protected void completeTransactionAfterThrowing(TransactionInfo txInfo, Throwable ex) {
  20. if (txInfo != null && txInfo.hasTransaction()) {
  21. ...
  22. if (txInfo.transactionAttribute.rollbackOn(ex)) {
  23. try {
  24. // 异常类型为回滚异常,执行事务回滚
  25. txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
  26. }
  27. ...
  28. } else {
  29. try {
  30. // 异常类型为非回滚异常,仍然执行事务提交
  31. txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
  32. }
  33. ...
  34. }
  35. protected final class TransactionInfo {
  36. private final PlatformTransactionManager transactionManager;
  37. ...

另外,在获取事务时,AbstractPlatformTransactionManager#doBegin方法负责开启新事务,在DataSourceTransactionManager有如下代码:

  1. @Override
  2. protected void doBegin(Object transaction, TransactionDefinition definition) {
  3. // 获取数据库连接con
  4. ...
  5. if (con.getAutoCommit()) {
  6. txObject.setMustRestoreAutoCommit(true);
  7. if (logger.isDebugEnabled()) {
  8. logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
  9. }
  10. con.setAutoCommit(false);
  11. }
  12. ...
  13. }

这里才真正开启了数据库事务。
Spring事务同步
提到事务传播机制时,我们经常提到一个条件“如果当前已有事务”,那么Spring是如何知道当前是否已经开启了事务呢?在AbstractPlatformTransactionManager中是这样做的:

  1. // AbstractPlatformTransactionManager.class
  2. @Override
  3. public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {
  4. Object transaction = doGetTransaction();
  5. // 参数为null时构造默认值
  6. ...
  7. if (isExistingTransaction(transaction)) {
  8. // Existing transaction found -> check propagation behavior to find out how to behave.
  9. return handleExistingTransaction(definition, transaction, debugEnabled);
  10. }
  11. ...
  12. // 获取当前事务对象
  13. protected abstract Object doGetTransaction() throws TransactionException;
  14. // 判断当前事务对象是否包含活跃事务
  15. protected boolean isExistingTransaction(Object transaction) throws TransactionException {
  16. return false;
  17. }

注意getTransaction方法是final的,无法被子类覆盖,保证了获取事务流程的一致和稳定。抽象方法doGetTransaction获取当前事务对象,方法isExistingTransaction判断当前事务对象是否存在活跃事务,具体逻辑由特定事务管理器实现,看下我们使用最多的DataSourceTransactionManager对应的实现:

  1. // DataSourceTransactionManager.class
  2. @Override
  3. protected Object doGetTransaction() {
  4. DataSourceTransactionObject txObject = new DataSourceTransactionObject();
  5. txObject.setSavepointAllowed(isNestedTransactionAllowed());
  6. ConnectionHolder conHolder =
  7. (ConnectionHolder) TransactionSynchronizationManager.getResource(this.dataSource);
  8. txObject.setConnectionHolder(conHolder, false);
  9. return txObject;
  10. }
  11. @Override
  12. protected boolean isExistingTransaction(Object transaction) {
  13. DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
  14. return (txObject.hasConnectionHolder() && txObject.getConnectionHolder().isTransactionActive());
  15. }

可以看到,获取当前事务对象时,使用了TransactionSynchronizationManager#getResource方法,类图如下:

image.png

TransactionSynchronizationManager通过ThreadLocal对象在当前线程记录了resources和synchronizations属性。resources是一个HashMap,用于记录当前参与事务的事务资源,方便进行事务同步,在DataSourceTransactionManager的例子中就是以dataSource作为key,保存了数据库连接,这样在同一个线程中,不同的方法调用就可以通过dataSource获取相同的数据库连接,从而保证所有操作在一个事务中进行。synchronizations属性是一个TransactionSynchronization对象的集合,AbstractPlatformTransactionManager类中定义了事务操作各个阶段的调用流程,以事务提交为例:

  1. // AbstractPlatformTransactionManager.class
  2. private void processCommit(DefaultTransactionStatus status) throws TransactionException {
  3. try {
  4. boolean beforeCompletionInvoked = false;
  5. try {
  6. prepareForCommit(status);
  7. triggerBeforeCommit(status);
  8. triggerBeforeCompletion(status);
  9. ....
  10. else if (status.isNewTransaction()) {
  11. // 记录日志
  12. ...
  13. doCommit(status);
  14. }
  15. ...
  16. // 事务调用异常处理
  17. ...
  18. try {
  19. triggerAfterCommit(status);
  20. }
  21. finally {
  22. triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
  23. }
  24. }
  25. }

我们可以看到,有很多trigger前缀的方法,这些方法用于在事务操作的各个阶段触发回调,从而可以精确控制在事务执行的不同阶段所要执行的操作,这些回调实际上都通过TransactionSynchronizationUtils来实现,它会遍历TransactionSynchronizationManager#synchronizations集合中的TransactionSynchronization对象,然后分别触发集合中各个元素对应方法的调用。例如:

  1. TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
  2. @Override
  3. public void afterCommit() {
  4. // do something after commit
  5. }
  6. });

这段代码就在当前线程的事务synchronizations属性中,添加了一个自定义同步类,如果当前存在事务,那么在事务管理器执行事务提交之后,就会触发afterCommit方法,可以通过这种方式在事务执行的不同阶段自定义一些操作。
到这里,我们已经对Spring事务的实现原理和处理流

引用资料

https://blog.51cto.com/u_15297476/4990460
https://baijiahao.baidu.com/s?id=1690462040725127226&wfr=spider&for=pc

Spring:
https://zhuanlan.zhihu.com/p/54067384