什么是本地事务

什么是本地事务(Local Transaction)?本地事务也称为数据库事务或传统事务(相对于分布式事务而言)。它的执行模式就是常见的:

  1. 1.transaction begin
  2. 2.insert/delete/update
  3. 3.insert/delete/update
  4. 4....
  5. 5.transaction commit/rollback

本地事务有这么几个特征:

  • 一次事务只连接一个支持事务的数据库(一般来说都是关系型数据库)
  • 事务的执行结果保证ACID
  • 会用到数据库锁

    什么是分布式事务

    系统微服务化后,一个看似简单的功能,内部可能需要调用多个服务并操作多个数据库实现,服务调用的分布式事务问题变的非常突出。

一个下单请求同时设计到订单库,优惠券库,库存库的写操作,需要保证三个库写操作的一致性,就要用到分布式事务 即:分布式事务就是要解决一个请求同时对多个数据库写操作的一致性。

1 XA协议

常见的分布式解决方案如下:

  • 两阶段提交型
  • 三阶段提交型
  • TCC补偿机制
  • 异步确保性
  • 最大努力通知型

这几种解决方案中,例如两阶段与三阶段提交都是基于一个XA的协议:
分布式事务 - 图1
XA协议由Tuxedeo首先提出的,并交给X/Open阻止,作为资源管理器(数据库)与事务管理器的接口标准,目前,Oracle、Informix、DB2、Sybase等各大数据库厂商都提供对XA的支持,XA协议采用两阶段提交方式来管理分布式事务,XA接口提供资源管理器与事务管理器之间进行通信的标准接口。
对应JAVA的实现为JTA/JTS,JTA可以说是定义了一套实现XA协议的接口规范,而JTS就是JTA的具体实现。

2 两阶段提交(2PC)与三阶段提交(3PC)

2PC:
2PC即两阶段提交协议,是将整个事务流程分为两个阶段,准备阶段( Prepare phase).提交阶段( commit phase )
分布式事务 - 图2
在第一阶段(准备阶段),事务管理器TM(协调者)先向事务参与者(资源RM)们发送准备请求,大家都返回OK状态,那么就进入第二阶段,提交事务,如果在第一阶段有任何一个参与者没有OK,那么事务协调器通知其他所有事务参与者(资源RM)回滚事务。
二阶段能保证分布式事务的原子性,但是也有一些明显的缺陷:

  • 在第一阶段,如果参与者迟迟不回复协调者,就会造成事务的阻塞,性能不好。
  • 单节点故障,如果协调器挂了,参与者会阻塞,比如在第二阶段,如果事务协调器宕机,参与者没办法回复信息,长时间处于事务资源锁定,造成阻塞(事务操作是要加锁的)。
  • 在第二阶段,如果在事务协调器发出”commit”执行后宕机,一部和参与者收到了消息提交了事务,而 一部分没有消息没法做出事务提交操作,这样就出现了数据不一致。
  • 在第二阶段,如果事务事务协调器发出“commit”指令后宕机,收到“commmit”指令的参与者也宕机了, 那么事务最终变成了什么效果,提交了还是没提交?没有谁知道。

3PC:
三阶段提交协议主要是为了解决两阶段提交协议的阻塞问题,2PC存在的问题是当协作者崩溃时,参与者不能做出最后的选择。因此参与者可能在协作者恢复之前保持阻塞。三阶段提交(Three-phase commit),是二阶段提交(2PC)的改进版本。
分布式事务 - 图3
与两阶段提交不同的是,三阶段提交有两个改动点。

  1. 引入超时机制。同时在协调者和参与者中都引入超时机制。
  2. 在第一阶段和第二阶段中插入一个准备阶段。保证了在最后提交阶段之前各参与节点的状态是一致的。

也就是说,除了引入超时机制之外,3PC把2PC的准备阶段再次一分为二,这样三阶段提交就有CanCommit、PreCommit、DoCommit三个阶段。

  • CanCommit: 这里是资源准备阶段
  • PreCommit: 这里是资源确认阶段
  • DoCommit: 这跟2PC的二阶段差不多

    3 TCC补偿机制

    TCC 其实就是采用的补偿机制,其核心思想是:针对每个操作,都要注册一个与其对应的确认和补偿(撤销)操作。它分为三个阶段:

  • Try 阶段主要是对业务系统做检测及资源预留

  • Confirm 阶段主要是对业务系统做确认提交,Try阶段执行成功并开始执行 Confirm阶段时,默认 Confirm阶段是不会出错的。即:只要Try成功,Confirm一定成功。
  • Cancel 阶段主要是在业务执行错误,需要回滚的状态下执行的业务取消,预留资源释放。

分布式事务 - 图4
TCC实际上把数据库层的二阶段提交上提到了应用层来实现对于数据库来说是一阶段提交,避免了数据库层的2PC性能低下的问题,但是TCC操作需要业务实现,开发成本较高。

4 MQ异步确保型策略

MQ异步确保型是指通过消息的一致性来处理事务。当事务发起方执行完成本地事务后并发出一条消息,事务参与方(消息消费者)一定能够接收消息并处理事务成功,此方案强调的是只要消息发给事务参与方最终事务要达到一致。
分布式事务 - 图5
最终一致性要解决三个问题:本地事务和消息发送的原子性,接收消息的可靠性,消息重复消费问题

  • 本地事务和消息发送原子性
    本地事务与消息发送的原子性问题即:事务发起方在本地事务执行成功后消息必须发出去,否则就丢弃消息。即实现本地事务和消息发送的原子性,要么都成功,要么都失败。本地事务与消息发送的原子性问题是实现可靠消息最终一致性方案的关键问题。
  • 事务参与方接收消息可靠性 ack机制
    务参与方必须能够从消息队列接收到消息,如果接收消息失败可以重复接收消息
  • 消息重复消息问题 幂等性校验
    由于网络2的存在,若某一个消费节点超时但是消费成功,此时消息中间件会重复投递此消息,就导致了消息的重复消费。要解决消息重复消费的问题就要实现事务参与方的方法幂等性。

我们可以以RocketMQ为例子:来看一下它的事务消息的流程设计:
分布式事务 - 图6

5 最大努力通知型

最大努力通知服务表示在不影响主业务的情况下,尽可能地确保数据的一致性。它需要开发人员根据业务来指定通知规则,在满足通知规则的前提下,尽可能的确保数据的一致,以达到最大努力的目的。
分布式事务 - 图7
目标:发起通知方(接口提供方)通过一定的机制最大努力将业务处理结果通知到接收方(接口调用方)。
具体包括:

  • 有一定的消息重复通知机制。
    因为接收通知方可能没有接收到通知,此时要有一定的机制对消息重复通知。 10s 1min 10min 1h 5h 1d
  • 消息校对机制。
    如果尽最大努力也没有通知到接收方,或者接收方消费消息后要再次消费,此时可由接收方主动向通知 方查询消息信息来满足需求。
  • 消息处理方需要保证幂等性

最大努力通知与MQ异步确保有什么不同?

  • 可靠消息最终一致性
    系统A本地事务执行成功,通知系统B处理任务,通常通过MQ实现。一般适用于平台内部,对一 致性要求相对较高(微服务的2个子系统之间)。
  • 最大努力通知
    所谓最大努力通知就是系统A用最大努力通知系统B,能不能成功,不做完全保证,如果没通知到位,系统B可以主动来调用系统A的接口查询结果状态。一般适用于跨平台业务,或对接了上方平台的业 务场景(支付结果通知)。

    6 各种方案选型

    在学习各种分布式事务的解决方案后,我们了解到各种方案的优缺点:

  • 2PC最大的诟病是一个阻塞协议。RM在执行分支事务后需要等待TM的决定,此时服务会阻塞并锁定资源。由于其阻塞机制和最差时间复杂度高,因此,这种设计不能适应随着事务涉及的服务数量增加而扩展的需要,很难用于并发较高以及子事务生命周期较长(long-running transactions)的分布式服务中。

  • 如果拿TCC事务的处理流程与2PC两阶段提交做比较, 2PC通常都是在跨库的DB层面,而TCC则在应用层面的处理,需要通过业务逻辑来实现。这种分布式事务的实现方式的优势在于,可以让应用自己定义数据操作的粒度,使得降低锁冲突、提高吞吐量成为可能。而不足之处则在于对应用的侵入性非常强,业务逻辑的每个分支都需要实现try、confirm、 cancel三个操作。 此外,其实现难度也比较大,需要按照网络状态、系统故障等不同的失败原因实现不同的回滚策略。
  • 可靠消息最终一致性事务适合执行周期长且实时性要求不高的场景。引入消息机制后,同步的事务操作变为基于消息执行的异步操作,避免了分布式事务中的同步阻塞操作的影响,并实现了两个服务的解耦。典型的使用场景:注册送积分,登录送优惠券等。
  • 最大努力通知是分布式事务中要求最低的一种,适用于一些最终一致性时间敏感度低的业务 ;允许发起通知方处理业务失败,在接收通知方收到通知后积极进行失败处理,无论发起通知方如何处理结果都会不影响到接收通知方的后续处理;发起通知方需提供查询执行情况接口,用于接收通知方校对结果。典型的使用 场景:银行通知、支付结果通知等。

    什么是Seata

    Seata 是一款开源的分布式事务解决方案,致力于在微服务架构下提供高性能和简单易用的分布式事务服务。在 Seata 开源之前,Seata 对应的内部版本在阿里经济体内部一直扮演着分布式一致性中间件的角色,帮助经济体平稳的度过历年的双11,对各BU业务进行了有力的支撑。经过多年沉淀与积累,商业化产品先后在阿里云、金融云进行售卖。2019.1 为了打造更加完善的技术生态和普惠技术成果,Seata 正式宣布对外开源,未来 Seata 将以社区共建的形式帮助其技术更加可靠与完备。

    Seata的工作原理

    Seata主要用的是基于二阶段提交思想的AT模式,通过注解实现非业务侵入。
    分布式事务 - 图8

  • TC(Transaction Coordinator),事务协调者,在源码中Seata Server充当事务协调者身份,维护全局锁状态,协调全局事务的提交与回滚。

  • TM(Transaction Manager),事务管理者,业务代码中使用了全局事务注解的服务属于事务管理者,控制全局事务的范围,执行全局事务的提交与回滚。
  • RM(Resource Manager),资源管理者,业务代码中被远程调用的部分,负责执行本地事务,和提交与回滚本地事务。

第一阶段如上图所有的RM执行自己的本地事务。在执行本地事务时,用大白话讲其实就是jdbc执行sql时,seata使用了数据源代理,在执行sql前,对sql进行解析,生成前置镜像sql,后置镜像sql,同时向undo log插入一条数据,方便后期万一出现异常做回滚,然后向TC注册分支事务,提交本地事务,最后向TC提交它的分支事务状态。
二阶段流程分两种情况

  1. 所有RM本地事务执行成功,此时TM会向TC发起全局事务提交,TC会立马释放全局锁然后异步驱动所有RM做分支事务的提交。
  2. 存在一个RM本地事务不成功,此时TM会向TC发起全局事务回滚,TC会驱动所有的RM做回滚操作,等待所有的RM回滚成功后然后再释放全局锁。

分布式事务 - 图9
这个阶段所有RM提交分支事务,其实就是删除Undo Log表里的记录,如果提交分支事务失败,并不会影响业务数据,可以手动的做Undo Log删除。
分布式事务 - 图10
这个阶段,所有的RM执行分支事务回滚,此时是去Undo Log表中查找数据,然后通过第一阶段生成的后置镜像sql,与数据进行校验,通过前置镜像sql做回滚,然后删除Undo Log日志。

Seata的下载

打开官方网站的下载页:http://seata.io/zh-cn/blog/download.html
分布式事务 - 图11
最新版本是1.4.0.点击 binary 进行下载,这里我们可以把0.9.0的包也下载下来,因为有些初始配置(包括数据库建表语句)在新包里面没有,需要到github上面去找,但是0.9.0这里里面有需要的那些东西。
分布式事务 - 图12
下载下来之后是压缩包,解压就可以:
分布式事务 - 图13

Seata的配置

解压出来之后,我们需要进入到conf目录进行配置:
分布式事务 - 图14
这里我们需要对两个配置文件进行设置,首先打开registry.conf:

  1. #这里主要是配置服务的注册中心,Seata的服务器也是一个微服务
  2. registry {
  3. # 注册中心可以是多种形式: file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
  4. # type的值就是需要选择的形式,我们选择用当前项目使用的nacos
  5. type = "nacos"
  6. loadBalance = "RandomLoadBalance"
  7. loadBalanceVirtualNodes = 10
  8. #选择nacos后相关的配置
  9. nacos {
  10. application = "seata-server"
  11. serverAddr = "localhost:8848"
  12. group = "SEATA_GROUP"
  13. namespace = "public"
  14. cluster = "DEFAULT"
  15. username = "nacos"
  16. password = "nacos"
  17. }
  18. #后面是选择各个不同的注册中心需要的配置,我们不需要去做改动
  19. eureka {
  20. serviceUrl = "http://localhost:8761/eureka"
  21. application = "default"
  22. weight = "1"
  23. }
  24. redis {
  25. serverAddr = "localhost:6379"
  26. db = 0
  27. password = ""
  28. cluster = "default"
  29. timeout = 0
  30. }
  31. zk {
  32. cluster = "default"
  33. serverAddr = "127.0.0.1:2181"
  34. sessionTimeout = 6000
  35. connectTimeout = 2000
  36. username = ""
  37. password = ""
  38. }
  39. consul {
  40. cluster = "default"
  41. serverAddr = "127.0.0.1:8500"
  42. }
  43. etcd3 {
  44. cluster = "default"
  45. serverAddr = "http://localhost:2379"
  46. }
  47. sofa {
  48. serverAddr = "127.0.0.1:9603"
  49. application = "default"
  50. region = "DEFAULT_ZONE"
  51. datacenter = "DefaultDataCenter"
  52. cluster = "default"
  53. group = "SEATA_GROUP"
  54. addressWaitTime = "3000"
  55. }
  56. file {
  57. name = "file.conf"
  58. }
  59. }
  60. #这里是服务端的相关参数配置
  61. config {
  62. #参数配置也可以选择不同的形式: file、nacos 、apollo、zk、consul、etcd3
  63. #type的值就是选择的具体配置形式,我们选择的是文件配置
  64. type = "file"
  65. #这些是其他配置方式的各个配置,可以不用改动
  66. nacos {
  67. serverAddr = "127.0.0.1:8848"
  68. namespace = ""
  69. group = "SEATA_GROUP"
  70. username = ""
  71. password = ""
  72. }
  73. consul {
  74. serverAddr = "127.0.0.1:8500"
  75. }
  76. apollo {
  77. appId = "seata-server"
  78. apolloMeta = "http://192.168.1.204:8801"
  79. namespace = "application"
  80. apolloAccesskeySecret = ""
  81. }
  82. zk {
  83. serverAddr = "127.0.0.1:2181"
  84. sessionTimeout = 6000
  85. connectTimeout = 2000
  86. username = ""
  87. password = ""
  88. }
  89. etcd3 {
  90. serverAddr = "http://localhost:2379"
  91. }
  92. #文件配置指出文件是谁
  93. file {
  94. name = "file.conf"
  95. }
  96. }

这样registry就配置好了,我们现在需要打开第二个文件就是参数配置文件(file.conf)进行相关配置:

  1. #参数配置里面最主要的是数据源的配置,因为Seata在处理事务的过程中会存储一些临时数据,所以需要数据源
  2. store {
  3. # 配置数据源的类型,可以是 db(数据库)、file、redis
  4. # 这里我们选择用数据库
  5. mode = "db"
  6. ## 这是用文件存储的配置 不用改动
  7. file {
  8. dir = "sessionStore"
  9. maxBranchSessionSize = 16384
  10. maxGlobalSessionSize = 512
  11. fileWriteBufferCacheSize = 16384
  12. sessionReloadReadSize = 100
  13. flushDiskMode = async
  14. }
  15. ## 这是配置选择数据库时候的连接配置
  16. db {
  17. ## 连接池配置,可以是druid、dbcp、hikari
  18. datasource = "druid"
  19. ## 数据库的类型,我们选择mysql
  20. dbType = "mysql"
  21. ## 下面是数据库的常规配置
  22. driverClassName = "com.mysql.jdbc.Driver"
  23. url = "jdbc:mysql://127.0.0.1:3306/seata"
  24. user = "root"
  25. password = "123321"
  26. minConn = 5
  27. maxConn = 100
  28. ## 下面这三个配置是数据库需要用到的表示谁?数据库存储会涉及到表,这里有三张表
  29. ## 这三张表是Seata提供的,我们只需要导入DDL就可以
  30. globalTable = "global_table"
  31. branchTable = "branch_table"
  32. lockTable = "lock_table"
  33. queryLimit = 100
  34. maxWait = 5000
  35. }
  36. ## 这是选择用redis作为数据源的配置,不用改动
  37. redis {
  38. host = "127.0.0.1"
  39. port = "6379"
  40. password = ""
  41. database = "0"
  42. minConn = 1
  43. maxConn = 10
  44. maxTotal = 100
  45. queryLimit = 100
  46. }
  47. }

数据库建表

在前面file的配置里面数据库需要三张表,这三张表在1.4.0这个版本下面它没有提供,所以我们去0.9.0的conf目录下面:
分布式事务 - 图15
可以看到这里面有两个sql文件,db_store.sql就是这三张表的DDL,我们直接在导入mysql数据库就可以:

  1. -- 每当有一个全局事务发起后,就会在该表中记录全局事务的ID
  2. drop table if exists `global_table`;
  3. create table `global_table` (
  4. `xid` varchar(128) not null,
  5. `transaction_id` bigint,
  6. `status` tinyint not null,
  7. `application_id` varchar(32),
  8. `transaction_service_group` varchar(32),
  9. `transaction_name` varchar(128),
  10. `timeout` int,
  11. `begin_time` bigint,
  12. `application_data` varchar(2000),
  13. `gmt_create` datetime,
  14. `gmt_modified` datetime,
  15. primary key (`xid`),
  16. key `idx_gmt_modified_status` (`gmt_modified`, `status`),
  17. key `idx_transaction_id` (`transaction_id`)
  18. );
  19. -- 记录每一个分支事务的ID,分支事务操作的哪个数据库等信息
  20. drop table if exists `branch_table`;
  21. create table `branch_table` (
  22. `branch_id` bigint not null,
  23. `xid` varchar(128) not null,
  24. `transaction_id` bigint ,
  25. `resource_group_id` varchar(32),
  26. `resource_id` varchar(256) ,
  27. `lock_key` varchar(128) ,
  28. `branch_type` varchar(8) ,
  29. `status` tinyint,
  30. `client_id` varchar(64),
  31. `application_data` varchar(2000),
  32. `gmt_create` datetime,
  33. `gmt_modified` datetime,
  34. primary key (`branch_id`),
  35. key `idx_xid` (`xid`)
  36. );
  37. -- 用于生成全局锁
  38. drop table if exists `lock_table`;
  39. create table `lock_table` (
  40. `row_key` varchar(128) not null,
  41. `xid` varchar(96),
  42. `transaction_id` long ,
  43. `branch_id` long,
  44. `resource_id` varchar(256) ,
  45. `table_name` varchar(32) ,
  46. `pk` varchar(36) ,
  47. `gmt_create` datetime ,
  48. `gmt_modified` datetime,
  49. primary key(`row_key`)
  50. );

在上面的目录结构里面还有一个sql文件db_undo_log.sql,这个也是在执行事务的过程中需要用到的表,但并不是Seata服务器直接操作的表,这是我们自己的业务服务里面需要操作的表,所以在我么项目自己的每个数据库里面也需要插入这个表:

  1. -- 此脚本必须初始化在你当前的业务数据库中,用于AT 模式XID记录。与server端无关(注:业务数据库)
  2. -- 注意此处0.3.0+ 增加唯一索引 ux_undo_log
  3. drop table `undo_log`;
  4. CREATE TABLE `undo_log` (
  5. `id` bigint(20) NOT NULL AUTO_INCREMENT,
  6. `branch_id` bigint(20) NOT NULL,
  7. `xid` varchar(100) NOT NULL,
  8. `context` varchar(128) NOT NULL,
  9. `rollback_info` longblob NOT NULL,
  10. `log_status` int(11) NOT NULL,
  11. `log_created` datetime NOT NULL,
  12. `log_modified` datetime NOT NULL,
  13. `ext` varchar(100) DEFAULT NULL,
  14. PRIMARY KEY (`id`),
  15. UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
  16. ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

启动服务

上面三个步骤配置好之后我们就可以启动Seata服务了,启动之前,我们需要先开启nacos,然后通过Seata的bin目录下面的命令启动Seata:
分布式事务 - 图16
分布式事务 - 图17
看到这里就启动成功,nacos上面也可以看到该服务:
分布式事务 - 图18

SpringBoot引入seata

Spring-Cloud-alibaba提供了Seata的集成包,我们需要把他导入到项目中

  1. <dependency>
  2. <groupId>com.alibaba.cloud</groupId>
  3. <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
  4. </dependency>

配置application.yml

spring:
  application:
    name: child
  cloud:
        alibaba:
      seata:
        tx-service-group: my_tx_group
seata:
  enabled: true
  service:
    vgroup-mapping:
      my_tx_group: "default"

在controller层的业务方法上面写上发生错误后进行回滚的注解,通常在增删改功能接口上

@RestController
@Slf4j
public class OrderController {
    @Resource
    private OrderService orderService;

    //全局回滚
    @GlobalTransactional(rollbackFor = Exception.class)
    public void creatOrder(Order order){
        orderService.save(oreder);
    }

    //局部回滚
    @Transactional(rollbackFor = Exception.class)
    public void createOrder2(Order order){
        orderService.save(oreder);
    }  
}

为了使发生的异常不会回导,创建全局异常类


@RestControllerAdvice
public class GlobalException {
    @ExceptionHandler(Exception.class)
    public ResponseEntity<String> myExceptionHandler(Exception e){
        return new ResponseEntity<String>("error",HttpStatus.INTERNAL_SERVER_ERROR);
    }
}