学习目标

  • 本地事务与分布式事务
  • CAP定理
  • 分布式事务解决方案
  • Seata使用

1 前奏

1.1 什么是事务

提供一种“要么什么都不做,要么做全套(All or Nothing)”机制。

1.2 事务的作用

保证数据一致性

1.3 事务ACID四大特性

A:原子性(Atomicity)
一个事务(transaction)中的所有操作,要么全部完成,要么全部不完成,不会结束在中间某个环节。事务在执行过程中发生错误,会被回滚(Rollback)到事务开始前的状态,就像这个事务从来没有执行过一样。
C:一致性(Consistency)
事务的一致性指的是在一个事务执行之前和执行之后数据库都必须处于一致性状态。
如果事务成功地完成,那么系统中所有变化将正确地应用,系统处于有效状态。
如果在事务中出现错误,那么系统中的所有变化将自动地回滚,系统返回到原始状态。
I:隔离性(Isolation)
指的是在并发环境中,当不同的事务同时操纵相同的数据时,每个事务都有各自的完整数据空间。由并发事务所做的修改必须与任何其他并发事务所做的修改隔离。事务查看数据更新时,数据所处的状态要么是另一事务修改它之前的状态,要么是另一事务修改它之后的状态,事务不会查看到中间状态的数据。
D:持久性(Durability)
指的是只要事务成功结束,它对数据库所做的更新就必须保存下来。即使发生系统崩溃,重新启动数据库系统后,数据库还能恢复到事务成功结束时的状态。

1.4 事务的并发问题

脏读:
事务A读取了事务B更新的数据,事务B未提交并回滚数据,那么A读取到的数据是脏数据
不可重复读:
事务 A 多次读取同一数据,事务 B 在事务A多次读取的过程中,对数据作了更新并提交,导致事务A多次读取同一数据时,结果 不一致。
幻读:
系统管理员A将数据库中所有学生的成绩从具体分数改为ABCDE等级,但是系统管理员B就在这个时候插入了一条具体分数的记录,当系统管理员A更改结束后发现还有一条记录没有改过来,就好像发生了幻觉一样,这就叫幻读。
  小结:不可重复读的和幻读很容易混淆,不可重复读侧重于修改,幻读侧重于新增或删除。解决不可重复读的问题只需锁住满足条件的行,解决幻读需要锁表。

1.5 MySQL事务隔离级别

事务隔离级别 脏读 不可重复读 幻读
读未提交(read-uncommitted)
读已提交(read-committed) ×
可重复读(repeatable-read) × ×
串行化(serializable) × × ×

mysql默认的事务隔离级别为repeatable-read
分布式事务 - 图1

1.6 事务传播行为(propagation behavior)

指的就是当一个事务方法被另一个事务方法调用时,这个事务方法应该如何进行。
例如:methodA事务方法调用methodB事务方法时,methodB是继续在调用者methodA的事务中运行呢,还是为自己开启一个新事务运行,这就是由methodB的事务传播行为决定的。
Spring定义了七种传播行为:参考 TransactionDefinition类

事务传播行为类型 说明
PROPAGATION_REQUIRED 如果当前没有事务,就新建一个事务,如果已经存在一个事务中,加入到这个事务中。默认
PROPAGATION_SUPPORTS 支持当前事务,如果当前没有事务,就以非事务方式执行
PROPAGATION_MANDATORY 使用当前的事务,如果当前没有事务,就抛出异常。
PROPAGATION_REQUIRES_NEW 新建事务,如果当前存在事务,把当前事务挂起。(一个新的事务将启动,而且如果有一个现有的事务在运行的话,则这个方法将在运行期被挂起,直到新的事务提交或者回滚才恢复执行)
PROPAGATION_NOT_SUPPORTED 以非事务方式执行操作,如果当前存在事务,就把当前事务挂起。
PROPAGATION_NEVER 以非事务方式执行,如果当前存在事务,则抛出异常。
PROPAGATION_NESTED 如果当前存在事务,则在嵌套事务内执行。如果当前没有事务,则执行与PROPAGATION_REQUIRED类似的操作。(外层事务抛出异常回滚,那么内层事务必须回滚,反之内层事务并不影响外层事务)

1.7本地事务

本地事务也称为数据库事务传统事务(相对于分布式事务而言)。它的执行模式就是常见的:

1. transaction begin
2. insert/delete/update
3. insert/delete/update
4. …
5. transaction commit/rollback
  • 本地事务有这么几个特征:
    • 一次事务只连接一个支持事务的数据库(一般来说都是关系型数据库)
    • 事务的执行结果保证ACID
    • 会用到数据库锁
  • 起初,事务仅限于对单一数据库资源的访问控制,架构服务化以后,事务的概念延伸到了服务中。倘若将一个单一的服务操作作为一个事务,那么整个服务操作只能涉及一个单一的数据库资源,这类基于单个服务单一数据库资源访问的事务,被称为本地事务(Local Transaction)

分布式事务 - 图2

第二章 分布式事务

2.1 微服务分布式事务问题

  • 首先,传统的单体应用(Monolithic App),通过 3 个 Module,在同一个数据源上更新数据来完成一项业务。很自然的,整个业务过程的数据一致性本地事务来保证。

分布式事务 - 图3

  • 随着业务需求和架构的变化,单体应用被拆分为微服务:原来的 3 个 Module 被拆分为 3 个独立的服务,分别使用独立的数据源。业务过程将由 3 个服务的调用来完成。

分布式事务 - 图4

  • 此时,每一个服务内部的数据一致性仍由本地事务来保证。而整个业务层面的全局数据一致性要如何保障呢?这就是微服务架构下面临的,典型的分布式事务需求:我们需要一个分布式事务的解决方案保障业务全局的数据一致性。

分布式事务 - 图5

2.2 什么是分布式事务

分布式事务指事务的参与者、支持事务的服务器、资源服务器以及事务管理器分别位于不同的分布式系统的不同节点之上。
指一次大的操作由不同的小操作组成的,这些小的操作分布在不同的服务器上,分布式事务需要保证这些小操作要么全部成功,要么全部失败。
本质上来说,分布式事务就是为了保证不同数据库的数据一致性。

2.3 什么是分布式系统

  • 部署在不同节点上的系统通过网络交互来完成协同工作的系统。
  • 比如:

    1. - 充值加积分的业务,用户在充值系统向自己的账户充钱,在积分系统中自己积分相应的增加。
    2. - 充值系统和积分系统是两个不同的系统,一次充值加积分的业务就需要这两个系统协同工作来完成。

    2.4 分布式事务应用在哪些场景

  • 电商系统中的下单扣库存

电商系统中,订单系统库存系统是两个系统,一次下单的操作由两个系统协同完成

  • 金融系统中的银行卡充值

在金融系统中通过银行卡向平台充值需要通过银行系统金融系统协同完成。

  • 教育系统中下单选课业务

在线教育系统中,用户购买课程,下单支付成功后学生选课成功,此事务由订单系统选课系统协同完成。

  • SNS系统的消息发送

在社交系统中发送站内消息同时发送手机短信,一次消息发送由站内消息系统手机通信系统协同完成。

2.5 跨多服务多数据库的分布式事务

一个服务操作调用另一个服务,这时事务需要跨越多个服务。在这种情况下,起始服务的事务在调用另外一个服务的时候,需要以某种机制流转到另外一个服务,从而使被调用的服务访问的资源也自动加入到该事务当中来。这就需要跨服务跨数据库的全局事务进行数据一致性的保证。
分布式事务 - 图6
较之基于单一数据库资源访问的本地事务,分布式事务的应用架构更为复杂。在不同的分布式应用架构下,实现一个分布式事务要考虑的问题并不完全一样,比如对多资源的协调、事务的跨服务传播等,实现机制也是复杂多变。

2.6 CAP定理

  • CAP 定理,又被叫作布鲁尔定理。对于设计分布式系统(不仅仅是分布式事务)的架构师来说,CAP 就是你的入门理论。指在一个分布式系统中,一致性(Consistency)、可用性(Availability)、分区容错性(Partition tolerance)三部分要素。
  • 这三个要素最多只能同时实现两点,不可能三者兼顾。分布式系统的最大难点,就是各个节点的状态如何同步。CAP 定理是这方面的基本定理,也是理解分布式系统的起点。

分布式事务 - 图7

  • C (一致性Consistency):在分布式系统中的所有数据备份,在同一时刻是否同样的值。

① 强一致性
简言之,在任意时刻,所有节点中的数据是一样的。
② 弱一致性
数据更新后,如果能容忍后续的访问只能访问到部分或者全部访问不到,则是弱一致性。最终一致性就属于弱一致性。

  • A (可用性Availability):在集群中一部分节点故障后,集群整体是否还能响应客户端的读写请求。
  • P (网络分区容错性Partition tolerance):以实际效果而言,分区相当于对通信的时限要求。系统如果不能在时限内达成数据一致性,就意味着发生了分区的情况,必须就当前操作在C和A之间做出选择。
  • 什么是分区?

在分布式系统中,不同的节点分布在不同的子网络中,由于一些特殊的原因,这些子节点之间出现了网络不通的状态,但他们的内部子网络是正常的。从而导致了整个系统的环境被切分成了若干个孤立的区域。这就是分区。

  • 取舍策略

现如今,对于多数大型互联网应用的场景,主机众多、部署分散,而且现在的集群规模越来越大,节点只会越来越多,所以节点故障、网络故障是常态,因此分区容错性也就成为了一个分布式系统必然要面对的问题。那么就只能在C和A之间进行取舍。

  • 是选择停掉所有的服务,等网络节点修复后恢复数据,以此来保证一致性(PC),
  • 还是选择继续提供服务,放弃强一致性的要求,以此来保证整体的可用性(PA)。

我们分析一下既然可以满足两个,那么舍弃哪一个比较好呢?

组合 分析结果
CA 满足一致性和可用性,放弃分区容错性。
说白了,就是一个单体应用。
CP 满足一致性和分区容错性,放弃可用性。
当系统被分区,为了保证一致性,必须放弃可用性,停止所有服务。
AP 满足可用性和分区容错性,放弃一致性。
当出现分区同时为了保证可用性,必须让节点继续对外服务,这就意味着你的系统在并发访问的时候,可能出现数据不一致的情况。

在分布式系统设计中AP的应用较多,即保证分区容忍性和可用性,牺牲数据的强一致性(写操作后立刻读取到最新数据),保证数据最终一致性(弱一致性)。
总而言之,没有最好的策略,好的系统应该是根据业务场景来进行架构设计的,只有适合的才是最好的。

第二章 分布式事务解决方案

1.XA两段提交(低效率)-分布式事务解决方案
2.TCC三段提交(2段,高效率[不推荐(补偿代码)])
3.本地消息(MQ+Table)
4.事务消息(RocketMQ[alibaba])
5.Seata(alibaba)

2.1 基于XA协议的两阶段提交(2PC)

X/Open 组织(即现在的 Open Group )定义了分布式事务处理模型
XA协议:XA是一个分布式事务协议。XA中大致分为两部分:事务管理器本地资源管理器。其中本地资源管理器往往由数据库实现,比如Oracle、DB2这些商业数据库都实现了XA接口,而事务管理器作为全局的调度者,负责各个本地资源的提交和回滚。

2.1.1 概念

二阶段提交2PC(Two phase Commit)是指,在分布式系统里,为了保证所有节点在进行事务提交时保持一致性的一种算法。

2.1.2 背景

在分布式系统里,每个节点都可以知晓自己操作的成功或者失败,却无法知道其他节点操作的成功或失败。
当一个事务跨多个节点时,为了保持事务的原子性与一致性,需要引入一个协调者(Coordinator)来统一掌控所有参与者(Participant)的操作结果,并指示它们是否要把操作结果进行真正的提交(commit)或者回滚(rollback)。

2.1.3 思路

2PC顾名思义分为两个阶段,其实施思路可概括为:
(1)投票阶段(voting phase):参与者将操作结果通知协调者;
(2)提交阶段(commit phase):收到参与者的通知后,协调者再向参与者发出通知,根据反馈情况决定各参与者是否要提交还是回滚;

2.1.4 缺陷

算法执行过程中,所有节点都处于阻塞状态,所有节点所持有的资源(例如数据库数据,本地文件等)都处于封锁状态。
典型场景为:
(1)某一个参与者发出通知之前,所有参与者以及协调者都处于阻塞状态;
(2)在协调者发出通知之前,所有参与者都处于阻塞状态;
另外,如有协调者或者某个参与者出现了崩溃,为了避免整个算法处于一个完全阻塞状态,往往需要借助超时机制来将算法继续向前推进,故此时算法的效率比较低。
总的来说,2PC是一种比较保守的算法

2.1.5 举例

甲乙丙丁四人要组织一个会议,需要确定会议时间,不妨设甲是协调者,乙丙丁是参与者。
投票阶段:
(1)甲发邮件给乙丙丁,周二十点开会是否有时间;
(2)甲回复有时间;
(3)乙回复有时间;
(4)丙迟迟不回复,此时对于这个活动,甲乙丙均处于阻塞状态,算法无法继续进行;
(5)丙回复有时间(或者没有时间);
提交阶段:
(1)协调者甲将收集到的结果反馈给乙丙丁(什么时候反馈,以及反馈结果如何,在此例中取决与丙的时间与决定);
(2)乙收到;
(3)丙收到;
(4)丁收到;

2.1.6 结论

2PC效率很低,分布式事务很难做

2.1.7 实际应用交互流程

2 2PC两阶段提交的正向流程

第一阶段:
2PC中包含着两个角色:事务协调者事务参与者。让我们来看一看他们之间的交互流程:
分布式事务 - 图8
在分布式事务的第一阶段,作为事务协调者的节点会首先向所有的参与者节点发送Prepare请求。
在接到Prepare请求之后,每一个参与者节点会各自执行与事务有关的数据更新,写入Undo Log和Redo Log。如果参与者执行成功,暂时不提交事务,而是向事务协调节点返回“完成”消息。
当事务协调者接到了所有参与者的返回消息,整个分布式事务将会进入第二阶段。
第二阶段:
分布式事务 - 图9
在2PC分布式事务的第二阶段,如果事务协调节点在之前所收到都是正向返回,那么它将会向所有事务参与者发出Commit请求。
接到Commit请求之后,事务参与者节点会各自进行本地的事务提交,并释放锁资源。当本地事务完成提交后,将会向事务协调者返回“完成”消息。
当事务协调者接收到所有事务参与者的“完成”反馈,整个分布式事务完成。

3 失败情况的处理流程

第一阶段:
分布式事务 - 图10
第二阶段:
分布式事务 - 图11
在2PC的第一阶段,如果某个事务参与者反馈失败消息,说明该节点的本地事务执行不成功,必须回滚。
于是在第二阶段,事务协调节点向所有的事务参与者发送Abort(中止)请求。接收到Abort请求之后,各个事务参与者节点需要在本地进行事务的回滚操作,回滚操作依照Undo Log来进行。
以上就是2PC两阶段提交协议的详细过程。

4 2PC两阶段提交究竟有哪些不足呢?

  1. 性能问题

2PC遵循强一致性。在事务执行过程中,各个节点占用着数据库资源,只有当所有节点准备完毕,事务协调者才会通知提交,参与者提交后释放资源。这样的过程有着非常明显的性能问题。

  1. 协调者单点故障问题

2PC模型的核心,一旦事务协调者节点挂掉,参与者收不到提交或是回滚通知,参与者会一直处于中间状态无法完成事务。

  1. 丢失消息导致的不一致问题。

第二个阶段,如果发生局部网络问题,一部分事务参与者收到了提交消息,另一部分事务参与者没收到提交消息,那么就导致了节点之间数据的不一致。

2.2 代码补偿事务(TCC)

TCC的作用主要是解决跨服务调用场景下的分布式事务问题

2.2.1 场景案例

以航班预定的案例,来介绍TCC要解决的事务场景。在这里笔者虚构一个场景,把自己当做航班预定的主人公,来介绍这个案例。从合肥 –> 昆明 –> 大理。
准备从合肥出发,到云南大理去游玩,然后使用美团App(机票代理商)来订机票。发现没有从合肥直达大理的航班,需要到昆明进行中转。如下图:
分布式事务 - 图12
从图中我们可以看出来,从合肥到昆明乘坐的是四川航空,从昆明到大理乘坐的是东方航空。
由于使用的是美团App预定,当我选择了这种航班预定方案后,美团App要去四川航空和东方航空各帮我购买一张票。如下图:
分布式事务 - 图13
考虑最简单的情况:美团先去川航帮我买票,如果买不到,那么东航也没必要买了。如果川航购买成功,再去东航购买另一张票。
现在问题来了:假设美团先从川航成功买到了票,然后去东航买票的时候,因为天气问题,东航航班被取消了。那么此时,美团必须取消川航的票,因为只有一张票是没用的,不取消就是浪费我的钱。那么如果取消会怎样呢?如果读者有取消机票经历的话,非正常退票,肯定要扣手续费的。在这里,川航本来已经购买成功,现在因为东航的原因要退川航的票,川航应该是要扣代理商的钱的。
那么美团就要保证,如果任一航班购买失败,都不能扣钱,怎么做呢?
两个航空公司都为美团提供以下3个接口:机票预留接口、确认接口、取消接口。美团App分2个阶段进行调用,如下所示:
分布式事务 - 图14
在第1阶段:
美团分别请求两个航空公司预留机票,两个航空公司分别告诉美团预留成功还是失败。航空公司需要保证,机票预留成功的话,之后一定能购买到。
在第2阶段:
如果两个航空公司都预留成功,则分别向两个公司发送确认购买请求。
如果两个航空公司任意一个预留失败,则对于预留成功的航空公司也要取消预留。这种情况下,对于之前预留成功机票的航班取消,也不会扣用户的钱,因为购买并没实际发生,之前只是请求预留机票而已。
通过这种方案,可以保证两个航空公司购买机票的一致性,要不都成功,要不都失败,即使失败也不会扣用户的钱。如果在两个航班都已经已经确认购买后,再退票,那肯定还是要扣钱的。
当然,实际情况肯定这里提到的肯定要复杂,通常航空公司在第一阶段,对于预留的机票,会要求在指定的时间必须确认购买(支付成功),如果没有及时确认购买,会自动取消。假设川航要求10分钟内支付成功,东航要求30分钟内支付成功。以较短的时间算,如果用户在10分钟内支付成功的话,那么美团会向两个航空公司都发送确认购买的请求,如果超过10分钟(以较短的时间为准),那么就不能进行支付。
这个方案提供给我们一种跨服务保证事务一致性的一种解决思路,可以把这种方案当做TCC的雏形。
具体流程:
分布式事务 - 图15
TCC是Try ( 尝试 ) — Confirm(确认) — Cancel ( 取消 ) 的简称:

操作方法 含义
Try 完成所有业务检查(一致性),预留业务资源(准隔离性) 回顾上面航班预定案例的阶段1,机票就是业务资源,所有的资源提供者(航空公司)预留都成功,try阶段才算成功
Confirm 确认执行业务操作,不做任何业务检查, 只使用Try阶段预留的业务资源。回顾上面航班预定案例的阶段2,美团APP确认两个航空公司机票都预留成功,因此向两个航空公司分别发送确认购买的请求。
Cancel 取消Try阶段预留的业务资源。回顾上面航班预定案例的阶段2,如果某个业务方的业务资源没有预留成功,则取消所有业务资源预留请求。

2.2.2 TCC两阶段提交与XA两阶段提交的区别

XA是资源层面的分布式事务,强一致性,在两阶段提交的整个过程中,一直会持有资源的锁。
TCC是业务层面的分布式事务,最终一致性,不会一直持有资源的锁。
其核心在于将业务分为两个操作步骤完成。不依赖 RM 对分布式事务的支持,而是通过对业务逻辑的分解来实现分布式事务。

2.3 本地消息表(异步确保)- 事务最终一致性

这种实现方式的思路,其实是源于 ebay,后来通过支付宝等公司的布道,在业内广泛使用。其基本的设计思想是将远程分布式事务拆分成一系列的本地事务。如果不考虑性能及设计优雅,借助关系型数据库中的表即可实现。

  • 订单系统新增一条消息表,将新增订单和新增消息放到一个事务里完成,然后通过轮询的方式去查询消息表,将消息推送到 MQ,库存系统去消费 MQ。

分布式事务 - 图16

  • 执行流程:
    • 订单系统,添加一条订单和一条消息,在一个事务里提交。
    • 订单系统,使用定时任务轮询查询状态为未同步的消息表,发送到 MQ,如果发送失败,就重试发送。
    • 库存系统,接收 MQ 消息,修改库存表,需要保证幂等操作。
    • 如果修改成功,调用 RPC 接口修改订单系统消息表的状态为已完成或者直接删除这条消息。
    • 如果修改失败,可以不做处理,等待重试。
  • 订单系统中的消息有可能由于业务问题会一直重复发送,所以为了避免这种情况可以记录一下发送次数,当达到次数限制之后报警,人工接入处理;库存系统需要保证幂等,避免同一条消息被多次消费造成数据一致。
  • 本地消息表这种方案实现了最终一致性,需要在业务系统里增加消息表,业务逻辑中多一次插入的 DB 操作,所以性能会有损耗,而且最终一致性的间隔主要由定时任务的间隔时间决定。
  • 优点: 一种非常经典的实现,避免了分布式事务,实现了最终一致性。
  • 缺点: 消息表会耦合到业务系统中,如果没有封装好的解决方案,会有很多杂活需要处理。

2.4 MQ 事务消息

有一些第三方的MQ是支持事务消息的,比如RocketMQ,他们支持事务消息的方式也是类似于采用的二阶段提交,但是市面上一些主流的MQ都是不支持事务消息的,比如 RabbitMQ 和 Kafka 都不支持。
以阿里的 RocketMQ 中间件为例,其思路大致为:

  1. RocketMQ提供了类似X/Open XA的分布事务功能,通过MQ的事务消息能达到分布式事务的最终一致。
  2. 发送方在业务执行开始会先向消息服务器中投递 “ 半消息 ” ,半消息即暂时不会真正投递的消息,当发送方(即生产者)将消息成功发送给了MQ服务端且并未将该消息的二次确认结果返回,此时消息状态是“ 暂时不可投递 ” 状态(可以认为是状态未知)。该状态下的消息即半消息。
  3. 如果出现网络闪断、生产者应用重启等原因导致事务消息二次确认丢失,MQ服务端会通过扫描发现某条消息长期处于 “ 半消息 ” 状态,MQ服务端会主动向生产者查询该消息的最终状态是处于Commit(消息提交)还是Rollback(消息回滚)。这个过程称为消息回查

在业务方法内要想消息队列提交两次请求,一次发送消息和一次确认消息。如果确认消息发送失败了RocketMQ会定期扫描消息集群中的事务消息,这时候发现了Prepared消息,它会向消息发送者确认,所以生产方需要实现一个check接口,RocketMQ会根据发送端设置的策略来决定是回滚还是继续发送确认消息。这样就保证了消息发送与本地事务同时成功或同时失败。
总体而言RocketMQ事务消息分为两条主线

  • 定时任务发送流程:发送half message(半消息),执行本地事务,发送事务执行结果
  • 定时任务回查流程:MQ服务器回查本地事务,发送事务执行结果

分布式事务 - 图17
具体流程如下
1、Producer 向 MQ 服务器 发送消息 , MQ Server 将消息状态标记为 Prepared(预备状态),注意此时这条消息消费者(MQ订阅方)是无法消费到的。
2、MQ 服务器收到消息并持久化成功之后,会向Producer 确认首次消息发送成功,此时消息处于 half message(半消息) 状态,并未发送给对应的 Consumer 。
3、Producer 开始执行本地事务逻辑 , 通过本地数据库事务控制。
4、根据事务执行结果,Producer 向 MQ 服务器提交二次确认 ( commit 或者 rollback) 。MQ Server 收到 Commit 状态则将半消息标记为可投递,Consumer 最终将收到该消息;MQ Server 收到 Rollback 状态则删除半消息,Consumer 将不会接受该消息。
5、在断网或者应用重启的情况下,二次确认未成功的发给 MQ Server,MQ Server 会主动向 Producer 启动消息回查
6、Producer 根据事务执行结果,对消息回查返回对应的结果。
7、Mq Server根据返回结果,决定继续投递消息或者丢弃消息(重复第4步操作)。
注意 1-4 为事务消息的发送过程, 5-6 为事务消息的回查过程。
优点: 实现了最终一致性,不需要依赖本地数据库事务。
缺点: 目前主流MQ中只有RocketMQ支持事务消息。

2.5 Seata介绍

http://seata.io/zh-cn/
Seata是阿里开源的一个分布式事务框架,能够让大家在操作分布式事务时,像操作本地事务一样简单。一个注解搞定分布式事务。
解决分布式事务问题,有两个设计初衷

  • 对业务无侵入:即减少技术架构上的微服务化所带来的分布式事务问题对业务的侵入
  • 高性能:减少分布式事务解决方案所带来的性能消耗

Seata中有两种分布式事务实现方案,AT及TCC

  • AT模式主要关注多 DB 访问的数据一致性,当然也包括多服务下的多 DB 数据访问一致性问题 2PC-改进
  • TCC 模式主要关注业务拆分,在按照业务横向扩展资源时,解决微服务间调用的一致性问题

那 Seata 是怎么做到的呢?下面说说它的各个模块之间的关系。
Seata 的设计思路是将一个分布式事务可以理解成一个全局事务,下面挂了若干个分支事务,而一个分支事务是一个满足 ACID 的本地事务,因此我们可以操作分布式事务像操作本地事务一样。
2019 年 1 月,阿里巴巴中间件团队发起了开源项目 Fescar(Fast & EaSy Commit And Rollback),和社区一起共建开源分布式事务解决方案。Fescar 的愿景是让分布式事务的使用像本地事务的使用一样,简单和高效,并逐步解决开发者们遇到的分布式事务方面的所有难题。
Seata全称:Simple Extensible Autonomous Transaction Architecture,简单可扩展自治事务框架。

AT模式(Automatic (Branch) Transaction Mode)

  • Transaction Coordinator (TC):事务协调器,维护全局事务的运行状态,负责协调并决定全局事务的提交或回滚。
  • Transaction Manager(TM): 控制全局事务的边界,负责开启一个全局事务,并最终发起全局提交全局回滚的决议。
  • Resource Manager (RM):资源管理器,负责本地事务的注册,本地事务状态的汇报(投票),并且负责本地事务的提交和回滚
  • XID:一个全局事务的唯一标识

其中,TM是一个分布式事务的发起者和终结者,TC负责维护分布式事务的运行状态,而RM则负责本地事务的运行。
如下图所示:
分布式事务 - 图18
下面是一个分布式事务在Seata中的执行流程:

  1. TM 向 TC 申请开启一个全局事务,全局事务创建成功并生成一个全局唯一的 XID
  2. XID 在微服务调用链路的上下文中传播。
  3. RM 向 TC 注册分支事务,接着执行这个分支事务并提交(重点:RM在第一阶段就已经执行了本地事务的提交/回滚),最后将执行结果汇报给TC
  4. TM 根据 TC 中所有的分支事务的执行情况,发起全局提交或回滚决议。
  5. TC 调度 XID 下管辖的全部分支事务完成提交或回滚请求。

Seata 中有三大模块,分别是 TM、RM 和 TC。 其中 TM 和 RM 是作为 Seata 的客户端与业务系统集成在一起,TC 作为 Seata 的服务端独立部署。
阿里云GTS,商业付费版。

MT模式(Manual (Branch) Transaction Mode)

Seata还支持MT模式。MT模式本质上是一种TCC方案,业务逻辑需要被拆分为 Prepare/Commit/Rollback 3 部分,形成一个 MT 分支,加入全局事务。如图所示:
分布式事务 - 图19
MT 模式一方面是 AT 模式的补充。另外,更重要的价值在于,通过 MT 模式可以把众多非事务性资源纳入全局事务的管理中。

第三章 Seata案例

3.1 需求分析

分布式事务 - 图20
完成一个案例,用户下单的时候记录下单日志,完成订单添加,完成用户账户扣款,完成商品库存削减功能,一会在任何一个微服务中制造异常,测试分布式事务。
先将\seata\案例SQL脚本数据库脚本导入到数据库中。
分布式事务 - 图21

3.2 案例实现

Fescar:全称fast easy commit and rollback

3.2.1 父工程

5 搭建 fescar-parent

6 pom.xml依赖如下

<?xml version=”1.0” encoding=”UTF-8”?>
<project xmlns=”http://maven.apache.org/POM/4.0.0
xmlns:xsi=”http://www.w3.org/2001/XMLSchema-instance
xsi:schemaLocation=”http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd“>
<modelVersion>4.0.0</modelVersion>
<groupId>com.atguigu</groupId>
<artifactId>fescar-parent</artifactId>
<version>1.0-SNAPSHOT</version>
<modules>
<module>fescar-api</module>
<module>fescar-eureka</module>
<module>fescar-item</module>
<module>fescar-user</module>
<module>fescar-order</module>
<module>fescar-business</module>
</modules>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.2.RELEASE</version>
</parent>
<packaging>pom</packaging>
<properties>
<skipTests>true</skipTests>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.51</version>
</dependency>
<dependency>
<groupId>io.jsonwebtoken</groupId>
<artifactId>jjwt</artifactId>
<version>0.9.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
<dependency>
<groupId>com.github.wxpay</groupId>
<artifactId>wxpay-sdk</artifactId>
<version>0.0.3</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</dependency>
<dependency>
<groupId>tk.mybatis</groupId>
<artifactId>mapper-spring-boot-starter</artifactId>
<version>2.0.4</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
<groupId>com.github.pagehelper</groupId>
<artifactId>pagehelper-spring-boot-starter</artifactId>
<version>1.2.3</version>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>Hoxton.SR1</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
</project>

3.2.2 公共工程fescar-api

将所有数据库对应的Pojo/Feign抽取出一个公共工程fescar-api,在该工程中导入依赖:

7 pom.xml

<?xml version=”1.0” encoding=”UTF-8”?>
<project xmlns=”http://maven.apache.org/POM/4.0.0
xmlns:xsi=”http://www.w3.org/2001/XMLSchema-instance
xsi:schemaLocation=”http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd“>
<parent>
<artifactId>fescar-parent</artifactId>
<groupId>com.atguigu</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<description>API:Model和Feign</description>
<artifactId>fescar-api</artifactId>
</project>

8 将Pojo导入到工程中

分布式事务 - 图22

3.2.3 商品微服务fescar-item

创建fescar-item微服务,在该工程中实现库存削减。

9 pom.xml

<?xml version=”1.0” encoding=”UTF-8”?>
<project xmlns=”http://maven.apache.org/POM/4.0.0
xmlns:xsi=”http://www.w3.org/2001/XMLSchema-instance
xsi:schemaLocation=”http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd“>
<parent>
<artifactId>fescar-parent</artifactId>
<groupId>com.atguigu</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>fescar-item</artifactId>
<dependencies>
<dependency>
<groupId>com.atguigu</groupId>
<artifactId>fescar-api</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
</dependencies>
</project>

10 Dao

创建com.atguigu.dao.ItemInfoMapper,代码如下:

package com.atguigu.dao;
import com.atguigu.pojo.ItemInfo;
import tk.mybatis.mapper.common.Mapper;
public interface ItemInfoMapper extends Mapper<ItemInfo> {
}

11 Service

创建com.atguigu.service.ItemInfoService接口,并创建库存递减方法,代码如下:

package com.atguigu.service;
public interface ItemInfoService {
/
库存递减
@param id
@param count
/
void decrCount(int id, int** count);
}

创建com.atguigu.service.impl.ItemInfoServiceImpl实现库存递减操作,代码如下:

package com.atguigu.service.impl;
import com.atguigu.dao.ItemInfoMapper;
import com.atguigu.pojo.ItemInfo;
import com.atguigu.service.ItemInfoService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@Service
public class ItemInfoServiceImpl implements ItemInfoService {
@Autowired
private ItemInfoMapper itemInfoMapper;
/
库存递减
@param id
@param count
*/ @Transactional(rollbackFor = Exception.class) @Override
public void decrCount(int id, int count) {
//查询商品信息 ItemInfo itemInfo = itemInfoMapper.selectByPrimaryKey(id);
itemInfo.setCount(itemInfo.getCount()-count);
int** dcount = itemInfoMapper.updateByPrimaryKeySelective(itemInfo);
System.out.println(“库存递减受影响行数:”+dcount);
}
}

12 Controller

创建com.atguigu.controller.ItemInfoController,代码如下:

package com.atguigu.controller;
import com.atguigu.service.ItemInfoService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.;
@RestController
@RequestMapping(“/itemInfo”)
@CrossOrigin
public class ItemInfoController {
@Autowired
private ItemInfoService itemInfoService;
/**
库存递减
@param id
@param count
@return
/
@PostMapping(value = “/decrCount”)
public String decrCount(@RequestParam(value = “id”) int id, @RequestParam(value = “count”) int count){
//库存递减 itemInfoService.decrCount(id,count);
return “success”;
}
}

13 启动类

创建com.atguigu.ItemApplication代码如下:

package com.atguigu;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
import org.springframework.cloud.openfeign.EnableFeignClients;
import tk.mybatis.spring.annotation.MapperScan;
@SpringBootApplication
@EnableEurekaClient
@EnableFeignClients(basePackages = {“com.atguigu.feign”})
@MapperScan(basePackages = {“com.atguigu.dao”})
public class ItemApplication {
public static void main(String[] args) {
SpringApplication.run(ItemApplication.class,args);
}
}

14 application.yml

创建applicatin.yml,配置如下:

server:
port: 18082spring:
application:
name: item
datasource:
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://127.0.0.1:3306/fescar-item?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai
username: root
password: root
main:
allow-bean-definition-overriding: true
eureka:
client:
service-url:
defaultZone: http://127.0.0.1:7001/eureka/ instance:
prefer-ip-address: true
feign:
hystrix:
enabled: true#hystrix 配置hystrix:
command:
default:
execution:
isolation:
thread:
timeoutInMilliseconds: 10000 strategy: SEMAPHORE

3.2.4 用户微服务fescar-user

创建fescar-user微服务,并引入公共工程依赖。

15 pom.xml

<?xml version=”1.0” encoding=”UTF-8”?>
<project xmlns=”http://maven.apache.org/POM/4.0.0
xmlns:xsi=”http://www.w3.org/2001/XMLSchema-instance
xsi:schemaLocation=”http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd“>
<parent>
<artifactId>fescar-parent</artifactId>
<groupId>com.atguigu</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>fescar-user</artifactId>
<dependencies>
<dependency>
<groupId>com.atguigu</groupId>
<artifactId>fescar-api</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
</dependencies>
</project>

16 Dao

创建com.atguigu.dao.UserInfoMapper,代码如下:

package com.atguigu.dao;
import com.atguigu.pojo.UserInfo;
import tk.mybatis.mapper.common.Mapper;
public interface UserInfoMapper extends Mapper<UserInfo> {
}

17 Service

创建com.atguigu.service.UserInfoService接口,代码如下:

package com.atguigu.service;
public interface UserInfoService {
/
账户金额递减
@param username
@param money
*/
void decrMoney(String username, int** money);
}

创建com.atguigu.service.impl.UserInfoServiceImpl实现用户账户扣款,代码如下:

package com.atguigu.service.impl;
import com.atguigu.dao.UserInfoMapper;
import com.atguigu.pojo.UserInfo;
import com.atguigu.service.UserInfoService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@Service
public class UserInfoServiceImpl implements UserInfoService {
@Autowired
private UserInfoMapper userInfoMapper;
/
账户金额递减
@param username
@param money
*/ @Transactional(rollbackFor = Exception.class) @Override
public void decrMoney(String username, int money) {
UserInfo userInfo = userInfoMapper.selectByPrimaryKey(username);
userInfo.setMoney(userInfo.getMoney()-money);
int count = userInfoMapper.updateByPrimaryKeySelective(userInfo);
System.out.println(“添加用户受影响行数:”+count);
// int q=10/0;** }
}

18 Controller

创建com.atguigu.controller.UserInfoController代码如下:

package com.atguigu.controller;
import com.atguigu.service.UserInfoService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.;
@RestController
@RequestMapping(“/userInfo”)
@CrossOrigin
public class UserInfoController {
@Autowired
private UserInfoService userInfoService;
/**

账户余额递减
@param username
@param money
/
@PostMapping(value = “/add”)
public String decrMoney(@RequestParam(value = “username”) String username,
@RequestParam(value = “money”) int money){
userInfoService.decrMoney(username,money);
return “success”;
}
}

19 启动类

创建com.atguigu.UserApplication,代码如下:

package com.atguigu;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
import org.springframework.cloud.openfeign.EnableFeignClients;
import tk.mybatis.spring.annotation.MapperScan;
@SpringBootApplication
@EnableEurekaClient
@EnableFeignClients(basePackages = {“com.atguigu.feign”})
@MapperScan(basePackages = {“com.atguigu.dao”})
public class UserApplication {
public static void main(String[] args) {
SpringApplication.run(UserApplication.class,args);
}
}

20 application.yml

创建application.yml配置如下:

server: port: 18084
spring: application: name: user
datasource: driver-class-name: com.mysql.jdbc.Driver url: jdbc:mysql://127.0.0.1:3306/fescar-user?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai
username: root
password: root
main: allow-bean-definition-overriding: true
eureka: client: service-url: defaultZone: http://127.0.0.1:7001/eureka instance: prefer-ip-address: truefeign: hystrix: enabled: true#hystrix 配置
hystrix: command: default: execution: isolation: thread: timeoutInMilliseconds: 10000 strategy: SEMAPHORE

3.2.5 订单微服务fescar-order

在fescar-order订单微服务中实现调用商品微服务递减库存。

21 pom.xml

<?xml version=”1.0” encoding=”UTF-8”?>
<project xmlns=”http://maven.apache.org/POM/4.0.0
xmlns:xsi=”http://www.w3.org/2001/XMLSchema-instance
xsi:schemaLocation=”http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd“>
<parent>
<artifactId>fescar-parent</artifactId>
<groupId>com.atguigu</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>fescar-order</artifactId>
<dependencies>
<dependency>
<groupId>com.atguigu</groupId>
<artifactId>fescar-api</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
</dependencies>
</project>

22 Dao

创建com.atguigu.dao.OrderInfoMapper,代码如下:

package com.atguigu.dao;
import com.atguigu.pojo.OrderInfo;
import tk.mybatis.mapper.common.Mapper;
public interface OrderInfoMapper extends Mapper<OrderInfo> {
}

23 Service

创建com.atguigu.service.OrderInfoService实现添加订单操作,代码如下:

package com.atguigu.service;
public interface OrderInfoService {
/
添加订单
@param username
@param id
@param count
/
void add(String username, int id, int** count);
}

创建com.atguigu.service.impl.OrderInfoServiceImpl,代码如下:

package com.atguigu.service.impl;
import com.atguigu.dao.OrderInfoMapper;
import com.atguigu.feign.ItemInfoFeign;
import com.atguigu.pojo.OrderInfo;
import com.atguigu.service.OrderInfoService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@Service
public class OrderInfoServiceImpl implements OrderInfoService {
@Autowired
private OrderInfoMapper orderInfoMapper;
@Autowired
private ItemInfoFeign itemInfoFeign;
/
添加订单
@param username
@param id
@param count
/
@Transactional
@Override
public void add(String username, int id, int count) {
//添加订单 OrderInfo orderInfo =
new OrderInfo();
orderInfo.setMessage(“生成订单”);
orderInfo.setMoney(10);
int** icount = orderInfoMapper.insertSelective(orderInfo);
System.out.println(“添加订单受影响函数:”+icount);
//递减库存 itemInfoFeign.decrCount(id,count);
}
}

24 Controller

创建com.atguigu.controller.OrderInfoController调用下单操作,代码如下:

package com.atguigu.controller;
import com.atguigu.service.OrderInfoService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.;
@RestController
@RequestMapping(“/orderInfo”)
@CrossOrigin
public class OrderInfoController {
@Autowired
private OrderInfoService orderInfoService;
/**
增加订单
@param username
@param id
@param count
/
@PostMapping(value = “/add”)
public String add(@RequestParam(value = “name”) String username, @RequestParam(value = “id”) int id, @RequestParam(value = “count”) int count){
//添加订单 orderInfoService.add(username,id,count);
return “success”;
}
}

25 启动类

创建com.atguigu.OrderApplication启动类,代码如下:

package com.atguigu;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
import org.springframework.cloud.openfeign.EnableFeignClients;
import tk.mybatis.spring.annotation.MapperScan;
@SpringBootApplication
@EnableEurekaClient
@EnableFeignClients(basePackages = {“com.atguigu.feign”})
@MapperScan(basePackages = {“com.atguigu.dao”})
public class OrderApplication {
public static void main(String[] args) {
SpringApplication.run(OrderApplication.class,args);
}
}

26 application.yml

server: port: 18083
spring: application: name: order
datasource: driver-class-name: com.mysql.jdbc.Driver url: jdbc:mysql://127.0.0.1:3306/fescar-order?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai
username: root
password: root
main: allow-bean-definition-overriding: true
eureka: client: service-url: defaultZone: http://127.0.0.1:7001/eureka instance: prefer-ip-address: truefeign: hystrix: enabled: true#hystrix 配置
hystrix: command: default: execution: isolation: thread: timeoutInMilliseconds: 10000 strategy: SEMAPHORE

3.2.6 业务微服务fescar-business

创建fescar-business业务微服务,在该微服务中实现分布式事务控制,下单入口从这里开始。

27 pom.xml

<?xml version=”1.0” encoding=”UTF-8”?>
<project xmlns=”http://maven.apache.org/POM/4.0.0
xmlns:xsi=”http://www.w3.org/2001/XMLSchema-instance
xsi:schemaLocation=”http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd“>
<parent>
<artifactId>fescar-parent</artifactId>
<groupId>com.atguigu</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<description>分布式事务业务控制</description>
<artifactId>fescar-business</artifactId>
<dependencies>
<dependency>
<groupId>com.atguigu</groupId>
<artifactId>fescar-api</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
</dependencies>
</project>

28 Dao

创建com.atguigu.dao.LogInfoMapper代码如下:

package com.atguigu.dao;
import com.atguigu.pojo.LogInfo;
import tk.mybatis.mapper.common.Mapper;
public interface LogInfoMapper extends Mapper<LogInfo> {
}

29 Service

创建com.atguigu.service.BusinessService接口,代码如下:

package com.atguigu.service;
public interface BusinessService {
/
下单
@param username
@param id
@param count
*/
void add(String username, int id, int** count);
}

创建com.atguigu.service.impl.BusinessServiceImpl,代码如下:

package com.atguigu.service.impl;
import com.atguigu.dao.LogInfoMapper;
import com.atguigu.feign.OrderInfoFeign;
import com.atguigu.feign.UserInfoFeign;
import com.atguigu.pojo.LogInfo;
import com.atguigu.service.BusinessService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.Date;
@Service
public class BusinessServiceImpl implements BusinessService {
@Autowired
private OrderInfoFeign orderInfoFeign;
@Autowired
private UserInfoFeign userInfoFeign;
@Autowired
private LogInfoMapper logInfoMapper;
/

下单
@GlobalTransactional:全局事务入口
@param username
@param id
@param count
/
@Override
public void add(String username, int id, int count) {
//添加订单日志 LogInfo logInfo =
new LogInfo();
logInfo.setContent(“添加订单数据—-“+
new Date());
logInfo.setCreatetime(
new Date());
int** logcount = logInfoMapper.insertSelective(logInfo);
System.out.println(“添加日志受影响行数:”+logcount);
//添加订单 orderInfoFeign.add(username,id,count);
//用户账户余额递减 userInfoFeign.decrMoney(username,10);
}
}

30 Controller

创建com.atguigu.controller.BusinessController,代码如下:

package com.atguigu.controller;
import com.atguigu.service.BusinessService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping(value = “/business”)
public class BusinessController {
@Autowired
private BusinessService businessService;
/
购买商品分布式事务测试
*/
@RequestMapping(value = “/addorder”)
public String order(){
String username=”zhangsan”;
int id=1;
int count=5;
//下单 businessService.add(username,id,count);
return **”success”;
}
}

31 启动类

创建启动类com.atguigu.BusinessApplication,代码如下:

package com.atguigu;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
import org.springframework.cloud.openfeign.EnableFeignClients;
import tk.mybatis.spring.annotation.MapperScan;
@SpringBootApplication
@EnableEurekaClient
@EnableFeignClients(basePackages = {“com.atguigu.feign”})
@MapperScan(basePackages = {“com.atguigu.dao”})
public class BusinessApplication {
public static void main(String[] args) {
SpringApplication.run(BusinessApplication.class,args);
}
}

32 application.yml

server: port: 18081
spring: application: name: business
datasource: driver-class-name: com.mysql.jdbc.Driver url: jdbc:mysql://127.0.0.1:3306/fescar-business?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai
username: root
password: root
main: allow-bean-definition-overriding: true
eureka: client: service-url: defaultZone: http://127.0.0.1:7001/eureka instance: prefer-ip-address: truefeign: hystrix: enabled: true
#读取超时设置
ribbon: ReadTimeout: 30000
#hystrix 配置
hystrix: command: default: execution: isolation: thread: timeoutInMilliseconds: 10000 strategy: SEMAPHORE

3.2.7 直接启动测试

第四章 Seata使用

Seata参考地址:https://github.com/seata/seata-samples/blob/master/doc/quick-integration-with-spring-cloud.md

4.1 引入依赖

在fescar-api项目中引入依赖,这样可以传递给其他微服务项目使用。

<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-seata</artifactId>
<version>2.1.0.RELEASE</version>
</dependency>

4.2 registry.conf 配置文件

在 fescar-api 工程的resource 文件夹下面,新建 registry.conf 配置文件, TC 存储RM注册信息的文件配置

registry {
# file 、nacos 、eureka、redis、zk、consul、etcd3、sofa type = “file”
nacos {
serverAddr = “localhost” namespace = “public” cluster = “default” }
eureka {
serviceUrl = “http://localhost:8761/eureka“ application = “default” weight = “1” }
redis {
serverAddr = “localhost:6379” db = “0” }
zk {
cluster = “default” serverAddr = “127.0.0.1:2181” session.timeout = 6000 connect.timeout = 2000 }
consul {
cluster = “default” serverAddr = “127.0.0.1:8500” }
etcd3 {
cluster = “default” serverAddr = “http://localhost:2379“ }
sofa {
serverAddr = “127.0.0.1:9603” application = “default” region = “DEFAULT_ZONE” datacenter = “DefaultDataCenter” cluster = “default” group = “SEATA_GROUP” addressWaitTime = “3000” }
file {
name = “file.conf” }
}
config {
# file、nacos 、apollo、zk、consul、etcd3 type = “file”
nacos {
serverAddr = “localhost” namespace = “public” cluster = “default” }
consul {
serverAddr = “127.0.0.1:8500” }
apollo {
app.id = “seata-server” apollo.meta = “http://192.168.1.204:8801“ }
zk {
serverAddr = “127.0.0.1:2181” session.timeout = 6000 connect.timeout = 2000 }
etcd3 {
serverAddr = “http://localhost:2379“ }
file {
name = “file.conf” }
}

4.3 file.conf 配置文件

在 fescar-api 工程的resource 文件夹下面,新建 file.conf 配置文件,各大微服务的RM和TC之间的通信配置

transport {
# tcp udt unix-domain-socket type = “TCP” #NIO NATIVE server = “NIO” #enable heartbeat heartbeat = true
#thread factory for netty thread-factory {
boss-thread-prefix = “NettyBoss” worker-thread-prefix = “NettyServerNIOWorker” server-executor-thread-prefix = “NettyServerBizHandler” share-boss-worker = false
client-selector-thread-prefix = “NettyClientSelector” client-selector-thread-size = 1 client-worker-thread-prefix = “NettyClientWorkerThread” # netty boss thread size,will not be used for UDT boss-thread-size = 1 #auto default pin or 8 worker-thread-size = 8 }
shutdown {
# when destroy server, wait seconds wait = 3 }
serialization = “seata” compressor = “none”}
service {
#vgroup->rgroup vgroup_mapping.my_test_tx_group = “default” #only support single node default.grouplist = “127.0.0.1:8091” #degrade current not support enableDegrade = false
#disable disable = false
#unit ms,s,m,h,d represents milliseconds, seconds, minutes, hours, days, default permanent max.commit.retry.timeout = “-1” max.rollback.retry.timeout = “-1”}
client {
#TC通知提交最大的数据缓存大小 async.commit.buffer.limit = 10000 lock {
retry.internal = 10 retry.times = 30 }
report.retry.count = 5}
## transaction log storestore {
## store mode: file、db mode = “file”
## file store file {
dir = “sessionStore”
# branch session size , if exceeded first try compress lockkey, still exceeded throws exceptions max-branch-session-size = 16384 # globe session size , if exceeded throws exceptions max-global-session-size = 512 # file buffer size , if exceeded allocate new buffer file-write-buffer-cache-size = 16384 # when recover batch read size session.reload.read_size = 100 # async, sync flush-disk-mode = async
}
## database store db {
## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp) etc. datasource = “dbcp” ## mysql/oracle/h2/oceanbase etc. db-type = “mysql” url = “jdbc:mysql://127.0.0.1:3306/seata” user = “mysql” password = “mysql” min-conn = 1 max-conn = 3 global.table = “global_table” branch.table = “branch_table” lock-table = “lock_table” query-limit = 100 }
}
lock {
## the lock store mode: local、remote mode = “remote”
local {
## store locks in user’s database }
remote {
## store locks in the seata’s server }
}
recovery {
committing-retry-delay = 30 asyn-committing-retry-delay = 30 rollbacking-retry-delay = 30 timeout-retry-delay = 30}
transaction {
undo.data.validation = true
undo.log.serialization = “jackson”}
## metrics settingsmetrics {
enabled = false
registry-type = “compact” # multi exporters use comma divided exporter-list = “prometheus” exporter-prometheus-port = 9898}

4.4 tx-service-group配置

修改 fescar-business , fescar-item , fescar-order fescar-user 各大微服务的 application.yml 配置,配置通信指定的组名( my_test_tx_group 在 file.conf 中有):

cloud:
alibaba:
seata:
tx-service-group: my_test_tx_group

4.5 在 fescar-api 工程下面新建配置类

package com.atguigu.config;
import com.alibaba.druid.pool.DruidDataSource;
import io.seata.rm.datasource.DataSourceProxy;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.sql.DataSource;
@Configuration
public class DataSourceProxyConfig {
/
普通数据源
@return
*/
@Bean
@ConfigurationProperties(prefix = “spring.datasource”)
public DataSource dataSource() {
return new DruidDataSource();
}
/

代理数据源绑定DataSourceProxy —-> undo_log的操作
@param dataSource
@return
/
@Bean
public DataSourceProxy dataSourceProxy(DataSource dataSource) {
return new DataSourceProxy(dataSource);
}
/
mybatis—->手动指定sqlSessionFactory所使用的代理数据源
@param dataSourceProxy
@return
@throws Exception
*/
@Bean
public SqlSessionFactory sqlSessionFactoryBean(DataSourceProxy dataSourceProxy) throws Exception {
SqlSessionFactoryBean sqlSessionFactoryBean =
new SqlSessionFactoryBean();
// 换成代理数据源 sqlSessionFactoryBean.setDataSource(dataSourceProxy);
return** sqlSessionFactoryBean.getObject();
}
}

4.6 在每个数据库中添加 undo_log 表

DROP TABLE undo_log;
CREATE TABLE undo_log(
id BIGINT(20) NOT NULL AUTO_INCREMENT,
branch_id BIGINT(20) NOT NULL,
xid VARCHAR(100) NOT NULL,
context VARCHAR(128) NOT NULL,
rollback_info LONGBLOB NOT NULL,
log_status INT(11) NOT NULL,
log_created DATETIME NOT NULL,
log_modified DATETIME NOT NULL,
ext VARCHAR(100) DEFAULT NULL,
PRIMARY KEY (id),
UNIQUE KEY ux_undo_log (xid, branch_id)
) ENGINE = InnoDB
AUTO_INCREMENT = 1 DEFAULT CHARSET = utf8

4.7 启动 seata-server

双击 D:\seata-server-0.8.0\bin\seata-server.bat

4.8 入口方法上添加@GlobalTransactional开启事务

分布式事务 - 图23

第五章 Seata之原理简介

5.1 再看TC/TM/RM三大组件

分布式事务 - 图24

5.2 分布式事务的执行流程

  • TM开启分布式事务(TM向TC注册全局事务记录)
  • 换业务场景,编排数据库,服务等事务内资源(RM向TC汇报资源准备状态)
  • TM结束分布式事务,事务一阶段结束(TM通知TC提交/回滚分布式事务)
  • TC汇总事务信息,决定分布式事务是提交还是回滚
  • TC通知所有RM提交/回滚资源,事务二阶段结束。

    5.3 AT模式如何做到对业务的无侵入

    是什么
    分布式事务 - 图25

分布式事务 - 图26

5.3.1 一阶段加载

  • 在一阶段,Seata会拦截”业务SQL”
    • 1.解析SQL语义,找到“业务SQL”要更新的业务数据,在业务数据被更新前,将其保存成“before image”
    • 2.执行”业务SQL”更新业务数据,在业务数据更新之后,
    • 3.其保存成“after image”,最后生成行锁。
  • 以上操作全部在一个数据库事务内完成,这样保证了一阶段操作的原子性。

分布式事务 - 图27

5.3.2 二阶段提交

  • 二阶段如果顺利提交的话,因为“业务SQL”在一阶段已经提交至数据库,所以Seata框架只需将一阶段保存的快照数据和行锁删掉,完成数据清理即可。

分布式事务 - 图28

5.3.3 二阶段回滚

二阶段如果是回滚的话,Seata就需要回滚一阶段执行的“业务SQL”,还原业务数据。
回滚方式便是用“before image”还原业务数据;但在还原前要首先校验脏写,对比“数据库当前业务数据”和“after image”,如果两份数据完全一致就说明没有脏写,可以还原业务数据,如果不一致就说明有脏写,出现脏写就需要转人工处理。
分布式事务 - 图29