5.1 Seata简介
官网入口
在分布式微服务中,存在多个微服务相互调用。当A微服务调用B微服务时,当其中一个微服务出现异常需要保证整条链路能够全部回滚。Seata 就是一款开源的分布式事务解决方案。
seata提供四种事务模型 AT、TCC、SAGA、XA。
5.1.1 Seata 的三大组件
- TC:Transaction Coordinator 事务协调器,维护全局和分支事务的状态,负责协调并驱动全局事务的提交或回滚
- TM:Transaction Manager 事务管理器,控制全局事务的边界,负责开启一个全局事务,并最终发起全局提交或全局回滚的决议
- RM:Resource Manager 资源管理器,管理分支事务处理的资源,向 TC 注册分支事务,上报分支事务的状态,接受 TC 的命令来提交或者回滚分支事务
TC 相当于seata服务端,协调各个微服务的事务,TM相当于注册在seata的客户端,RM相当于在微服务中代码块。
5.1.2 Seata 的执行流程
- A 服务的 TM 向 TC 申请开启一个全局事务,TC 就会创建一个全局事务并返回一个唯一的 XID
- A 服务的 RM 向 TC 注册分支事务,并将其纳入 XID 对应全局事务的管辖
- A 服务执行分支事务,向数据库执行操作
- A 服务开始远程调用 B 服务,此时 XID 会在微服务的调用链上传播
- B 服务的 RM 向 TC 注册分支事务,并将其纳入 XID 对应的全局事务的管辖
- B 服务执行分支事务,向数据库执行操作
- 全局事务调用链处理完毕,TM 根据有无异常向 TC 发起全局事务的提交或者回滚
- TC 协调其管辖之下的所有分支事务,决定是否回滚
5.1.3 Seata的事务模式
这里只摘录AT模式前提
- 基于支持本地 ACID 事务的关系型数据库。
-
整体机制
两阶段提交协议的演变:
一阶段:业务数据和回滚日志记录在同一个本地事务中提交,释放本地锁和连接资源。
二阶段:
一阶段本地事务提交前,需要确保先拿到 全局锁 。
- 拿不到 全局锁 ,不能提交本地事务。
- 拿 全局锁 的尝试被限制在一定范围内,超出范围将放弃,并回滚本地事务,释放本地锁。
以一个示例来说明:
两个全局事务 tx1 和 tx2,分别对 a 表的 m 字段进行更新操作,m 的初始值 1000。
tx1 先开始,开启本地事务,拿到本地锁,更新操作 m = 1000 - 100 = 900。本地事务提交前,先拿到该记录的 全局锁 ,本地提交释放本地锁。 tx2 后开始,开启本地事务,拿到本地锁,更新操作 m = 900 - 100 = 800。本地事务提交前,尝试拿该记录的 全局锁 ,tx1 全局提交前,该记录的全局锁被 tx1 持有,tx2 需要重试等待 全局锁 。
tx1 二阶段全局提交,释放 全局锁 。tx2 拿到 全局锁 提交本地事务。
如果 tx1 的二阶段全局回滚,则 tx1 需要重新获取该数据的本地锁,进行反向补偿的更新操作,实现分支的回滚。此时,如果 tx2 仍在等待该数据的 全局锁,同时持有本地锁,则 tx1 的分支回滚会失败。分支的回滚会一直重试,直到 tx2 的 全局锁 等锁超时,放弃 全局锁 并回滚本地事务释放本地锁,tx1 的分支回滚最终成功。因为整个过程 全局锁 在 tx1 结束前一直是被 tx1 持有的,所以不会发生 脏写 的问题。
读隔离
在数据库本地事务隔离级别 读已提交(Read Committed) 或以上的基础上,Seata(AT 模式)的默认全局隔离级别是 读未提交(Read Uncommitted) 。
如果应用在特定场景下,必需要求全局的 读已提交 ,目前 Seata 的方式是通过 SELECT FOR UPDATE 语句的代理。
SELECT FOR UPDATE 语句的执行会申请 全局锁 ,如果 全局锁 被其他事务持有,则释放本地锁(回滚 SELECT FOR UPDATE 语句的本地执行)并重试。这个过程中,查询是被 block 住的,直到 全局锁 拿到,即读取的相关数据是 已提交 的,才返回。出于总体性能上的考虑,Seata 目前的方案并没有对所有 SELECT 语句都进行代理,仅针对 FOR UPDATE 的 SELECT 语句。
工作机制 -两阶段提交
一阶段
过程:
- 解析 SQL:得到 SQL 的类型(UPDATE),表(product),条件(where name = ‘TXC’)等相关的信息。
- 查询前镜像:根据解析得到的条件信息,生成查询语句,定位数据。
- 执行业务 SQL
- 查询后镜像:根据前镜像的结果,通过 主键 定位数据。
- 插入回滚日志:把前后镜像数据以及业务 SQL 相关的信息组成一条回滚日志记录,插入到 UNDO_LOG 表中。
- 提交前,向 TC 注册分支:申请 product 表中,主键值等于 1 的记录的 全局锁 。
- 本地事务提交:业务数据的更新和前面步骤中生成的 UNDO LOG 一并提交。
- 将本地事务提交的结果上报给 TC。
二阶段-回滚
- 收到 TC 的分支回滚请求,开启一个本地事务,执行如下操作。
- 通过 XID 和 Branch ID 查找到相应的 UNDO LOG 记录。
- 数据校验:拿 UNDO LOG 中的后镜与当前数据进行比较,如果有不同,说明数据被当前全局事务之外的动作做了修改。这种情况,需要根据配置策略来做处理(人工处理)。
- 根据 UNDO LOG 中的前镜像和业务 SQL 的相关信息生成并执行回滚的语句。
提交本地事务。并把本地事务的执行结果(即分支事务回滚的结果)上报给 TC。
二阶段-提交
收到 TC 的分支提交请求,把请求放入一个异步任务的队列中,马上返回提交成功的结果给 TC。
异步任务阶段的分支提交请求将异步和批量地删除相应 UNDO LOG 记录。
适用场景与优缺点
适用场景:
分布式事务的业务逻辑中仅仅是纯数据库操作,不包含其他中间件的事务逻辑
优点:
改动及代码侵入最小,由 Seata 来负责 Commit 和 Rollback 的自动化提交或回滚操作
缺点:如果事务中包含缓存存储或发送 MQ 消息等,则不适合使用
- 多次对数据库操作,以及全局行锁的存在对并发处理性能有影响
- 为了保证镜像 SQL 的可靠性,需要用户对 SQL 尽量做简化,建议做法:将多条 SQL 语句分解为多个事务中的原子步骤(对应 Seata AT 模式的分支 Branch 概念),如果单条 SQL 语句跨表,也分解成为多个事务中的原子步骤(尽量降低 Seata 存储前 SQL 镜像结果时的风险)
5.2 docker 搭建seata1.4.2 AT模式
前提: 使用nacos作为配置中心和注册中心,使用mysql作为数据中心5.2.1 环境准备
创建seata数据库并且导入表 seata.sql
每个需要分布式事务的数据库都需要的undo_log表 undo_log.sql
官网下载需要的配置文件config.txt config.txt5.2.2 配置文件修改
目录挂载 配置文件 registry.conf
设置命名空间,分组 需要和微服务保持在同一命名空间,分组下registry {
type = "nacos"
nacos {
application = "seata-server"
serverAddr = "172.24.49.194:18848"
namespace = "3d0a77b8-817f-499b-bfda-f90d5a6e4dab"
group = "DEFAULT_GROUP"
cluster = "default"
username = "nacos"
password = "nacos"
}
}
config {
type = "nacos"
nacos {
serverAddr = "172.24.49.194:18848"
namespace = "3d0a77b8-817f-499b-bfda-f90d5a6e4dab"
group = "DEFAULT_GROUP"
username = "nacos"
password = "nacos"
dataId = "seataServer.properties"
}
}
修改config.txt中 service.vgroupMapping 设置自定义事务名称,修改store.mode=db并且修改自己的数据库配置
上传到nacos配置dataId为 seataServer.properties
transport.type=TCP
transport.server=NIO
transport.heartbeat=true
transport.enableClientBatchSendRequest=false
transport.threadFactory.bossThreadPrefix=NettyBoss
transport.threadFactory.workerThreadPrefix=NettyServerNIOWorker
transport.threadFactory.serverExecutorThreadPrefix=NettyServerBizHandler
transport.threadFactory.shareBossWorker=false
transport.threadFactory.clientSelectorThreadPrefix=NettyClientSelector
transport.threadFactory.clientSelectorThreadSize=1
transport.threadFactory.clientWorkerThreadPrefix=NettyClientWorkerThread
transport.threadFactory.bossThreadSize=1
transport.threadFactory.workerThreadSize=default
transport.shutdown.wait=3
service.vgroupMapping.order-service_tx_group=my_tx_group
service.default.grouplist=seataIP:port
service.enableDegrade=false
service.disableGlobalTransaction=false
client.rm.asyncCommitBufferLimit=10000
client.rm.lock.retryInterval=10
client.rm.lock.retryTimes=30
client.rm.lock.retryPolicyBranchRollbackOnConflict=true
client.rm.reportRetryCount=5
client.rm.tableMetaCheckEnable=false
client.rm.tableMetaCheckerInterval=60000
client.rm.sqlParserType=druid
client.rm.reportSuccessEnable=false
client.rm.sagaBranchRegisterEnable=false
client.tm.commitRetryCount=5
client.tm.rollbackRetryCount=5
client.tm.defaultGlobalTransactionTimeout=60000
client.tm.degradeCheck=false
client.tm.degradeCheckAllowTimes=10
client.tm.degradeCheckPeriod=2000
store.mode=db
store.publicKey=
store.file.dir=file_store/data
store.file.maxBranchSessionSize=16384
store.file.maxGlobalSessionSize=512
store.file.fileWriteBufferCacheSize=16384
store.file.flushDiskMode=async
store.file.sessionReloadReadSize=100
store.db.datasource=druid
store.db.dbType=mysql
store.db.driverClassName=com.mysql.cj.jdbc.Driver
store.db.url=jdbc:mysql://IP:port/seata?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf8&useSSL=false
store.db.user=username
store.db.password=password
store.db.minConn=5
store.db.maxConn=30
store.db.globalTable=global_table
store.db.branchTable=branch_table
store.db.queryLimit=100
store.db.lockTable=lock_table
store.db.maxWait=5000
store.redis.mode=single
store.redis.single.host=127.0.0.1
store.redis.single.port=6379
store.redis.sentinel.masterName=
store.redis.sentinel.sentinelHosts=
store.redis.maxConn=10
store.redis.minConn=1
store.redis.maxTotal=100
store.redis.database=0
store.redis.password=
store.redis.queryLimit=100
server.recovery.committingRetryPeriod=1000
server.recovery.asynCommittingRetryPeriod=1000
server.recovery.rollbackingRetryPeriod=1000
server.recovery.timeoutRetryPeriod=1000
server.maxCommitRetryTimeout=-1
server.maxRollbackRetryTimeout=-1
server.rollbackRetryTimeoutUnlockEnable=false
client.undo.dataValidation=true
client.undo.logSerialization=jackson
client.undo.onlyCareUpdateColumns=true
server.undo.logSaveDays=7
server.undo.logDeletePeriod=86400000
client.undo.logTable=undo_log
client.undo.compress.enable=true
client.undo.compress.type=zip
client.undo.compress.threshold=64k
log.exceptionRate=100
transport.serialization=seata
transport.compressor=none
metrics.enabled=false
metrics.registryType=compact
metrics.exporterList=prometheus
metrics.exporterPrometheusPort=9898
5.2.3 创建镜像 需要指定镜像版本号
docker run -di --name myseata \
-p 18091:18091 \
-e SEATA_IP=注册在nacos的IP \
-e SEATA_PORT=18091 \
-e SEATA_CONFIG_NAME=file:/root/seata-config/registry \
-v /home/seata/conf:/root/seata-config \
-v /home/seata/logs:/root/logs \
seataio/seata-server:1.4.2
Seata 的高可用依赖于注册中心、配置中心和数据库来实现 启动多个seata服务即可
5.3 seata 应用
订单微服务创建订单 -> 调用商品微服务扣库存
需要在调用方,被调用方都要集成seata配置。
导入属于该版本的seata pom.xml
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-spring-boot-starter</artifactId>
<version>1.4.2</version>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
<exclusions>
<exclusion>
<groupId>io.seata</groupId>
<artifactId>seata-spring-boot-starter</artifactId>
</exclusion>
</exclusions>
</dependency>
配置连接 seata 需要指定事务分组
seata:
tx-service-group: my_tx_group #此处配置自定义的seata事务分组名称
enable-auto-data-source-proxy: true #开启数据库代理 默认true
# service:
# vgroup-mapping:
# my_tx_group: default # default 是指集群的名称
config:
type: nacos
nacos:
server-addr: ${spring.cloud.nacos.config.server-addr}
group: ${spring.cloud.nacos.config.group}
namespace: ${spring.cloud.nacos.config.namespace}
username: ${spring.cloud.nacos.config.username}
password: ${spring.cloud.nacos.config.password}
data-id: seataServer.properties
registry:
type: nacos
nacos:
server-addr: ${spring.cloud.nacos.config.server-addr}
group: ${spring.cloud.nacos.config.group}
namespace: ${spring.cloud.nacos.config.namespace}
username: ${spring.cloud.nacos.config.username}
password: ${spring.cloud.nacos.config.password}
/**
* 扣减库存
*
* @param pid
* @param num
* @return
*/
@Override
public Product redStock(Integer pid, Integer num) {
productMapper.update(null, new LambdaUpdateWrapper<Product>().eq(Product::getPid, pid).setSql("pnum = pnum -" + num));
return productMapper.selectById(pid);
}
使用seata 注解 @GlobalTransactional 标注这个事务交给seata管理
/**
* 创建订单
*
* @param pid 商品id
* @param num 购买量
*/
@Override
@GlobalTransactional
public void createOrder(Integer pid, Integer num) throws RuntimeException {
try {
CommonResult<Product> result = productService.redStock(pid, num);
Product data = result.getData();
Order order = new Order();
order.setPid(data.getPid()).setPname(data.getPname()).setPprice(data.getPprice()).setNumber(1);
int a = 1 / 0;
orderMapper.insert(order);
} catch (Exception e) {
throw new RuntimeException("创建异常");
}
}
当开启事务 在undo_log中 会保存一条记录 当正确完成事务就会自动删除记录,出现异常就会按照 beforeImage 进行回滚。
{
"@class": "io.seata.rm.datasource.undo.BranchUndoLog",
"xid": "106.14.72.13:18091:36234756017881222",
"branchId": 36234756017881224,
"sqlUndoLogs": [
"java.util.ArrayList",
[
{
"@class": "io.seata.rm.datasource.undo.SQLUndoLog",
"sqlType": "UPDATE",
"tableName": "t_product",
"beforeImage": {
"@class": "io.seata.rm.datasource.sql.struct.TableRecords",
"tableName": "t_product",
"rows": [
"java.util.ArrayList",
[
{
"@class": "io.seata.rm.datasource.sql.struct.Row",
"fields": [
"java.util.ArrayList",
[
{
"@class": "io.seata.rm.datasource.sql.struct.Field",
"name": "pid",
"keyType": "PRIMARY_KEY",
"type": 4,
"value": 1
},
{
"@class": "io.seata.rm.datasource.sql.struct.Field",
"name": "pnum",
"keyType": "NULL",
"type": 4,
"value": 90
}
]
]
}
]
]
},
"afterImage": {
"@class": "io.seata.rm.datasource.sql.struct.TableRecords",
"tableName": "t_product",
"rows": [
"java.util.ArrayList",
[
{
"@class": "io.seata.rm.datasource.sql.struct.Row",
"fields": [
"java.util.ArrayList",
[
{
"@class": "io.seata.rm.datasource.sql.struct.Field",
"name": "pid",
"keyType": "PRIMARY_KEY",
"type": 4,
"value": 1
},
{
"@class": "io.seata.rm.datasource.sql.struct.Field",
"name": "pnum",
"keyType": "NULL",
"type": 4,
"value": 87
}
]
]
}
]
]
}
}
]
]
}