1. Seata产品模块
Seata中有三⼤模块,分别是TM、RM和TC。其中TM和RM是作为Seata的客户端与业务系统集成在⼀起,TC作为Seata的服务端独⽴部署。
- TC (Transaction Coordinator) -事务协调者
维护全局和分⽀事务的状态,驱动全局事务提交或回滚。
- TM (Transaction Manager) -事务管理器
定义全局事务的范围:开始全局事务、提交或回滚全局事务。
- RM (Resource Manager) -资源管理器
管理分⽀事务处理的资源,与TC交谈以注册分⽀事务和报告分⽀事务的状态,并驱动分⽀事务提交或回滚。
在Seata(基于2PC)中,分布式事务的执⾏流程:
- TM开启分布式事务, TM会 向TC注册全局事务记录;
- 操作具体业务模块的数据库操作之前, RM会向TC注册分⽀事务;
- 当业务操作完事后.TM会通知TC提交/回滚分布式事务;
- TC汇总事务信息,决定分布式事务是提交还是回滚;
- TC通知所有RM提交/回滚 资源,事务⼆阶段结束。
2. Seata-AT模式
2.1 案例引⼊及问题剖析
- 导⼊lagou_parent⼯程
2. 执⾏初始化SQL脚本,⾸先创建4个数据库
seata_bussiness/seata_order/seata_points/seata_storage,在各⾃数据库执⾏SQL脚本
seata_order数据库 ```sql */ SET NAMES utf8mb4; SET FOREIGN_KEY_CHECKS = 0;
— Table structure for t_order
DROP TABLE IF EXISTS t_order
;
CREATE TABLE t_order
(
id
bigint(0) NOT NULL COMMENT ‘订单id’,
goods_Id
int(0) NULL DEFAULT NULL COMMENT ‘商品ID’,
num
int(0) NULL DEFAULT NULL COMMENT ‘商品数量’,
money
decimal(10, 0) NULL DEFAULT NULL COMMENT ‘商品总⾦额’,
create_time
datetime(0) NULL DEFAULT NULL COMMENT ‘订单创建时间’,
username
varchar(50) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT
NULL COMMENT ‘⽤户名称’,
PRIMARY KEY (id
) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 49 CHARACTER SET = utf8 COLLATE =
utf8_bin ROW_FORMAT = Dynamic;
— Records of t_order
SET FOREIGN_KEY_CHECKS = 1;
seata_points数据库
```sql
SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;
-- ----------------------------
-- Table structure for t_points
-- ----------------------------
DROP TABLE IF EXISTS `t_points`;
CREATE TABLE `t_points` (
`id` int(0) NOT NULL AUTO_INCREMENT COMMENT '积分ID',
`username` varchar(50) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL
COMMENT '⽤户名',
`points` int(0) NULL DEFAULT NULL COMMENT '⽤户积分',
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 3 CHARACTER SET = utf8 COLLATE =
utf8_bin ROW_FORMAT = Dynamic;
-- ----------------------------
-- Records of t_points
-- ----------------------------
SET FOREIGN_KEY_CHECKS = 1;
seata_storage数据库
SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;
-- ----------------------------
-- Table structure for t_storage
-- ----------------------------
DROP TABLE IF EXISTS `t_storage`;
CREATE TABLE `t_storage` (
`id` int(0) NOT NULL AUTO_INCREMENT COMMENT '库存ID',
`goods_id` int(0) NULL DEFAULT NULL COMMENT '商品ID',
`storage` int(0) NULL DEFAULT NULL COMMENT '库存量',
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 2 CHARACTER SET = utf8 COLLATE =
utf8_bin ROW_FORMAT = Dynamic;
-- ----------------------------
-- Records of t_storage
-- ----------------------------
INSERT INTO `t_storage` VALUES (1, 1, 100);
SET FOREIGN_KEY_CHECKS = 1;
- 案例测试
**依次将4个服务启动.和nacos服务
**
访问路径为:
http://localhost:8000/test1 正常访问数据分别⼊库
http://localhost:8000/test2访问出错库存不⾜,导致服务调⽤失败.则观察数据库, 经发现订单与积
分数据库都已改变,⽽库存数据库没有减少库存, 所以不满⾜事务的特性.
2.2 AT模式介绍
AT 模式是⼀种⽆侵⼊的分布式事务解决⽅案。在 AT 模式下,⽤户只需关注⾃⼰的“业务 SQL”,⽤户的 “业务 SQL” 作为⼀阶段,Seata 框架会⾃动⽣成事务的⼆阶段提交和回滚操作。
2.3 AT模式原理
在介绍AT 模式的时候它是⽆侵⼊的分布式事务解决⽅案, 那么如何做到对业务的⽆侵⼊的呢?
1. ⼀阶段
在⼀阶段,Seata 会拦截“业务 SQL”,⾸先解析 SQL 语义,找到“业务 SQL”要更新的业务数据,在业务数据被更新前,将其保存成“before image”,然后执⾏“业务 SQL”更新业务数据,在业务数据更新之后,再将其保存成“after image”,最后⽣成⾏锁。以上操作全部在⼀个数据库事务内完成,这样保证了⼀阶段操作的原⼦性。
2. ⼆阶段
- 提交
⼆阶段如果是提交的话,因为“业务 SQL”在⼀阶段已经提交⾄数据库, 所以 Seata 框架只需将⼀阶段保存的快照数据和⾏锁删掉,完成数据清理即可。
- 回滚
⼆阶段如果是回滚的话,Seata 就需要回滚⼀阶段已经执⾏的“业务 SQL”,还原业务数据。回滚⽅式便是⽤“before image”还原业务数据;但在还原前要⾸先要校验脏写,对⽐“数据库当前业务数据”和 “after image”,如果两份数据完全⼀致就说明没有脏写,可以还原业务数据,如果不⼀致就说明有脏写,出现脏写就需要转⼈⼯处理。
AT 模式的⼀阶段、⼆阶段提交和回滚均由 Seata 框架⾃动⽣成,⽤户只需编写“业务SQL”,便能轻松接⼊分布式事务,AT 模式是⼀种对业务⽆任何侵⼊的分布式事务解决⽅案。
2.4 AT模式改造案例
2.4.1 Seata Server - TC全局事务协调器
介绍了 seata 事务的三个模块:TC(事务协调器)、TM(事务管理器)和RM(资源管理器),其中 TM 和 RM 是嵌⼊在业务应⽤中的,⽽ TC 则是⼀个独⽴服务。
Seata Server 就是 TC,直接从官⽅仓库下载启动即可,下载地址:https://github.com/seata/seata/releases
registry.conf
Seata Server 要向注册中⼼进⾏注册,这样,其他服务就可以通过注册中⼼去发现 Seata Server,与 Seata Server 进⾏通信。
Seata ⽀持多款注册中⼼服务:nacos 、eureka、redis、zk、consul、etcd3、sofa。
我们项⽬中要使⽤ nacos注册中⼼,nacos服务的连接地址、注册的服务名,这需要在seata/conf/registry.conf⽂件中进⾏配置:
registry {
# file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
type = "nacos"
loadBalance = "RandomLoadBalance"
loadBalanceVirtualNodes = 10
nacos {
application = "seata-server"
serverAddr = "106.75.226.220:8848"
group = "SEATA_GROUP"
namespace = ""
cluster = "default"
username = "nacos"
password = "nacos"
}
eureka {
serviceUrl = "http://localhost:8761/eureka"
application = "default"
weight = "1"
}
redis {
serverAddr = "localhost:6379"
db = 0
password = ""
cluster = "default"
timeout = 0
}
zk {
cluster = "default"
serverAddr = "127.0.0.1:2181"
sessionTimeout = 6000
connectTimeout = 2000
username = ""
password = ""
}
consul {
cluster = "default"
serverAddr = "127.0.0.1:8500"
}
etcd3 {
cluster = "default"
serverAddr = "http://localhost:2379"
}
sofa {
serverAddr = "127.0.0.1:9603"
application = "default"
region = "DEFAULT_ZONE"
datacenter = "DefaultDataCenter"
cluster = "default"
group = "SEATA_GROUP"
addressWaitTime = "3000"
}
file {
name = "file.conf"
}
}
config {
# file、nacos 、apollo、zk、consul、etcd3
type = "nacos"
nacos {
serverAddr = "106.75.226.220:8848"
namespace = ""
group = "SEATA_GROUP"
username = "nacos"
password = "nacos"
}
consul {
serverAddr = "127.0.0.1:8500"
}
apollo {
appId = "seata-server"
apolloMeta = "http://192.168.1.204:8801"
namespace = "application"
apolloAccesskeySecret = ""
}
zk {
serverAddr = "127.0.0.1:2181"
sessionTimeout = 6000
connectTimeout = 2000
username = ""
password = ""
}
etcd3 {
serverAddr = "http://localhost:2379"
}
file {
name = "file.conf"
}
}
- 向nacos中添加配置信息
下载配置config.txt https://github.com/seata/seata/tree/develop/script/config-center
https://seata.io/zh-cn/docs/user/configurations.html针对每个⼀项配置介绍
- 将config.txt⽂件放⼊seata⽬录下⾯
- 修改config.txt信息
Server端存储的模式(store.mode)现有file,db,redis三种。主要存储全局事务会话信息,分⽀事务信息, 锁记录表信息,seata-server默认是file模式。file只能⽀持单机模式, 如果想要⾼可⽤模式的话可以切换db或者redis. 为了⽅便查看全局事务会话信息本次课程采⽤db数据库模式
存储模式
store.mode=db
mysql数据库连接信息
store.db.datasource=druid
store.db.dbType=mysql
store.db.driverClassName=com.mysql.jdbc.Driver
store.db.url=jdbc:mysql://127.0.0.1:3306/seata?useUnicode=true
store.db.user=root
store.db.password=root
store.db.minConn=5
store.db.maxConn=30
store.db.globalTable=global_table
store.db.branchTable=branch_table
store.db.queryLimit=100
store.db.lockTable=lock_table
store.db.maxWait=5000
注意1: 需要创建seata数据库
注意2: 需要创建global_table/branch_table/lock_table三张表,seata1.0以上就不⾃带数据库⽂件了,要⾃⼰去github下载,https://github.com/seata/seata/tree/develop/scrip
-- -------------------------------- The script used when storeMode is 'db' --------------------------------
-- the table to store GlobalSession data
CREATE TABLE IF NOT EXISTS `global_table`
(
`xid` VARCHAR(128) NOT NULL,
`transaction_id` BIGINT,
`status` TINYINT NOT NULL,
`application_id` VARCHAR(32),
`transaction_service_group` VARCHAR(32),
`transaction_name` VARCHAR(128),
`timeout` INT,
`begin_time` BIGINT,
`application_data` VARCHAR(2000),
`gmt_create` DATETIME,
`gmt_modified` DATETIME,
PRIMARY KEY (`xid`),
KEY `idx_gmt_modified_status` (`gmt_modified`, `status`),
KEY `idx_transaction_id` (`transaction_id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8;
-- the table to store BranchSession data
CREATE TABLE IF NOT EXISTS `branch_table`
(
`branch_id` BIGINT NOT NULL,
`xid` VARCHAR(128) NOT NULL,
`transaction_id` BIGINT,
`resource_group_id` VARCHAR(32),
`resource_id` VARCHAR(256),
`branch_type` VARCHAR(8),
`status` TINYINT,
`client_id` VARCHAR(64),
`application_data` VARCHAR(2000),
`gmt_create` DATETIME(6),
`gmt_modified` DATETIME(6),
PRIMARY KEY (`branch_id`),
KEY `idx_xid` (`xid`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8;
-- the table to store lock data
CREATE TABLE IF NOT EXISTS `lock_table`
(
`row_key` VARCHAR(128) NOT NULL,
`xid` VARCHAR(128),
`transaction_id` BIGINT,
`branch_id` BIGINT NOT NULL,
`resource_id` VARCHAR(256),
`table_name` VARCHAR(32),
`pk` VARCHAR(36),
`gmt_create` DATETIME,
`gmt_modified` DATETIME,
PRIMARY KEY (`row_key`),
KEY `idx_branch_id` (`branch_id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8;
- 使⽤nacos-config.sh ⽤于向 Nacos 中添加配置
下载地址:https://github.com/seata/seata/tree/develop/script/config-center/nacos - 将nacos-config.sh放在seata/conf⽂件夹中
打开git bash here 执⾏nacos-config.sh,需要提前将nacos启动
输⼊命令 :sh nacos-config.sh -h 127.0.0.1
- 启动seata-server
观察nacos服务列表
2.4.2 TM/RM端整合Seata
AT 模式在RM端需要 UNDO_LOG 表,来记录每个RM的事务信息,主要包含数据修改前,后的相关信息,⽤于回滚处理,所以在所有数据库中分别执⾏
-- 注意此处0.3.0+ 增加唯⼀索引 ux_undo_log
CREATE TABLE `undo_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`branch_id` bigint(20) NOT NULL,
`xid` varchar(100) NOT NULL,
`context` varchar(128) NOT NULL,
`rollback_info` longblob NOT NULL,
`log_status` int(11) NOT NULL,
`log_created` datetime NOT NULL,
`log_modified` datetime NOT NULL,
`ext` varchar(100) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
TM/RM端整合seata⼀共有五个步骤:
事务分组:
RM(事务管理器)端整合Seata与TM(事务管理器)端步骤类似,只不过不需要在⽅法添加@GlobalTransactional注解,针对我们⼯程lagou_bussiness是事务的发起者,所以是TM端,其它⼯程为RM端. 所以我们只需要在lagou_common_db完成前4步骤即可
- ⼯程中添加Seata依赖
lagou_parent添加seata依赖管理,⽤于seata的版本锁定
在lagou_common_db⼯程添加seata依赖<dependencyManagement>
<dependencies>
<!--spring cloud依赖管理,引⼊了Spring Cloud的版本-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>Greenwich.RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<!--SCA -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version>
</dependency>
<!--SCA -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-dependencies</artifactId>
<version>2.1.0.RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<!--SCA -->
<!--seata版本管理, ⽤于锁定⾼版本的seata -->
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-all</artifactId>
<version>1.3.0</version>
</dependency>
</dependencies>
</dependencyManagement>
<!--seata依赖-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-seata</artifactId>
<!--排除低版本seata依赖-->
<exclusions>
<exclusion>
<groupId>io.seata</groupId>
<artifactId>seata-all</artifactId>
</exclusion>
</exclusions>
</dependency>
<!--添加⾼版本seata依赖-->
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-all</artifactId>
<version>1.3.0</version>
</dependency>
在common⼯程添加registry.conf依赖 ```xml registry {
file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
type = “nacos” loadBalance = “RandomLoadBalance” loadBalanceVirtualNodes = 10
nacos { application = “seata-server” serverAddr = “106.75.226.220:8848” group = “SEATA_GROUP” namespace = “” cluster = “default” username = “nacos” password = “nacos” } eureka { serviceUrl = “http://localhost:8761/eureka“ application = “default” weight = “1” } redis { serverAddr = “localhost:6379” db = 0 password = “” cluster = “default” timeout = 0 } zk { cluster = “default” serverAddr = “127.0.0.1:2181” sessionTimeout = 6000 connectTimeout = 2000 username = “” password = “” } consul { cluster = “default” serverAddr = “127.0.0.1:8500” } etcd3 { cluster = “default” serverAddr = “http://localhost:2379“ } sofa { serverAddr = “127.0.0.1:9603” application = “default” region = “DEFAULT_ZONE” datacenter = “DefaultDataCenter” cluster = “default” group = “SEATA_GROUP” addressWaitTime = “3000” } file { name = “file.conf” } }
config {
file、nacos 、apollo、zk、consul、etcd3
type = “nacos”
nacos { serverAddr = “106.75.226.220:8848” namespace = “” group = “SEATA_GROUP” username = “nacos” password = “nacos” } consul { serverAddr = “127.0.0.1:8500” } apollo { appId = “seata-server” apolloMeta = “http://192.168.1.204:8801“ namespace = “application” apolloAccesskeySecret = “” } zk { serverAddr = “127.0.0.1:2181” sessionTimeout = 6000 connectTimeout = 2000 username = “” password = “” } etcd3 { serverAddr = “http://localhost:2379“ } file { name = “file.conf” } }
3. 添加公共配置
```xml
spring.cloud.alibaba.seata.tx-service-group=my_test_tx_group //分组名称为nacos上配置文件的名称
logging.level.io.seata=debug
指定值为default,对应的配置为:
4. 在每个模块下引⼊公共配置⽂件
profiles
active: seata
- 编译数据源代理
启动扫描配置类,分别加载每个⼯程的启动类中package com.lagou.common_db;
import com.alibaba.druid.pool.DruidDataSource;
import io.seata.rm.datasource.DataSourceProxy;
import
org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import javax.sql.DataSource;
@Configuration
public class DataSourceConfiguration {
/**
* 使⽤druid连接池
*
* @return
*/
@Bean
@ConfigurationProperties(prefix = "spring.datasource")
public DataSource druidDataSource() {
return new DruidDataSource();
}
/**
* 设置数据源代理-,完成分⽀事务注册/事务提交与回滚等操作
* @param druidDataSource
* @return
*/
@Primary //设置⾸选数据源对象
@Bean("dataSource")
public DataSourceProxy dataSource(DataSource druidDataSource) {
return new DataSourceProxy(druidDataSource);
}
}
@SpringBootApplication(exclude = DataSourceAutoConfiguration.class,
scanBasePackages = "com.lagou")
- 添加注解@GlobalTransactional ```java /**
- 商品销售 *
- @param goodsId 商品id
- @param num 销售数量
- @param username ⽤户名
- @param money ⾦额
*/
@GlobalTransactional(name = “sale”, timeoutMills = 100000, rollbackFor =Exception.class)
//@Transactional
public void sale(Integer goodsId, Integer num, Double money, String username) {
//创建订单
orderServiceFeign.addOrder(idWorker.nextId(),goodsId, num, money,username);
//增加积分
pointsServiceFeign.increase(username, (int) (money / 10));
//扣减库存
storageServiceFeign.decrease(goodsId, num);
}
```
3.Seata-TCC模式
3.1 TCC模式介绍
Seata 开源了 TCC 模式,该模式由蚂蚁⾦服贡献。TCC 模式需要⽤户根据⾃⼰的业务场景实现Try、Confirm 和 Cancel 三个操作;事务发起⽅在⼀阶段 执⾏ Try ⽅式,在⼆阶段提交执⾏ Confirm⽅法,⼆阶段回滚执⾏ Cancel ⽅法。
TCC 三个⽅法描述:
- Try:资源的检测和预留;
- Confirm:执⾏的业务操作提交;要求 Try 成功 Confirm ⼀定要能成功;
- Cancel:预留资源释放。
**业务模型分 2 阶段设计:
**
⽤户接⼊ TCC ,最重要的是考虑如何将⾃⼰的业务模型拆成两阶段来实现。
以“扣钱”场景为例,在接⼊ TCC 前,对 A 账户的扣钱,只需⼀条更新账户余额的 SQL 便能完成;
但是在接⼊ TCC 之后,⽤户就需要考虑如何将原来⼀步就能完成的扣钱操作,拆成两阶段,实现成三个⽅法,并且保证⼀阶段 Try 成功的话 ⼆阶段 Confirm ⼀定能成功。
Try ⽅法作为⼀阶段准备⽅法,需要做资源的检查和预留。在扣钱场景下,Try 要做的事情是就是检查账户余额是否充⾜,预留转账资⾦,预留的⽅式就是冻结 A 账户的 转账资⾦。Try ⽅法执⾏之后,
账号 A 余额虽然还是 100,但是其中 30 元已经被冻结了,不能被其他事务使⽤。⼆阶段 Confirm ⽅法执⾏真正的扣钱操作。Confirm 会使⽤ Try 阶段冻结的资⾦,执⾏账号扣款。
Confirm ⽅法执⾏之后,账号 A 在⼀阶段中冻结的 30 元已经被扣除,账号 A 余额变成 70 元 。
如果⼆阶段是回滚的话,就需要在 Cancel ⽅法内释放⼀阶段 Try 冻结的 30 元,使账号 A 的回到初始状态,100 元全部可⽤。
⽤户接⼊ TCC 模式,最重要的事情就是考虑如何将业务模型拆成 2 阶段,实现成 TCC 的 3 个⽅法,并且保证 Try 成功 Confirm ⼀定能成功。相对于 AT 模式,TCC 模式对业务代码有⼀定的侵⼊性,但是 TCC 模式⽆ AT 模式的全局⾏锁,TCC 性能会⽐ AT 模式⾼很多。
3.2 TCC模式改造案例
3.2.1 RM端改造
针对RM端,实现起来需要完成try/commit/rollback的实现,所以步骤相对较多但是前三步骤和AT模式⼀样
- 修改数据库表结构,增加预留检查字段,⽤于提交和回滚
ALTER TABLE `seata_order`.`t_order` ADD COLUMN `status` int(0) NULL
COMMENT '订单状态-0不可⽤,事务未提交 , 1-可⽤,事务提交' ;
ALTER TABLE `seata_points`.`t_points` ADD COLUMN `frozen_points` int(0)
NULL DEFAULT 0 COMMENT '冻结积分' AFTER `points`;
ALTER TABLE `seata_storage`.`t_storage` ADD COLUMN `frozen_storage` int(0)
NULL DEFAULT 0 COMMENT '冻结库存' AFTER `goods_id`;
- lagou_order⼯程改造
接⼝ ```sql import com.lagou.order.entity.Order; import io.seata.rm.tcc.api.BusinessActionContext; import io.seata.rm.tcc.api.BusinessActionContextParameter; import io.seata.rm.tcc.api.LocalTCC; import io.seata.rm.tcc.api.TwoPhaseBusinessAction; /**
- @LocalTCC 该注解需要添加到上⾯描述的接⼝上,表示实现该接⼝的类被 seata 来管 理,seata 根据事务的状态,
- ⾃动调⽤我们定义的⽅法,如果没问题则调⽤ Commit ⽅法,否则调⽤ Rollback ⽅
法。
/
@LocalTCC
public interface OrderService extends IService
{ /* - @TwoPhaseBusinessAction 描述⼆阶段提交
- name: 为 tcc⽅法的 bean 名称,需要全局唯⼀,⼀般写⽅法名即可
- commitMethod: Commit⽅法的⽅法名
- rollbackMethod:Rollback⽅法的⽅法名
- @BusinessActionContextParamete 该注解⽤来修饰 Try⽅法的⼊参,
- 被修饰的⼊参可以在 Commit ⽅法和 Rollback ⽅法中通过
BusinessActionContext 获取。
*/
@TwoPhaseBusinessAction(name = “addTcc”,
commitMethod = “addCommit”, rollbackMethod =
“addRollBack”)
void add(@BusinessActionContextParameter(paramName = “order”) Order order);
public boolean addCommit(BusinessActionContext context);
public boolean addRollBack(BusinessActionContext context);
}
实现类
```java
import java.util.Date;
@Slf4j
@Service
public class OrderServiceImpl extends ServiceImpl<OrderMapper, Order>
implements OrderService {
@Override
public void add(Order order) {
order.setCreateTime(new Date());//设置订单创建时间
order.setStatus(0);//try阶段-预检查
this.save(order);//保存订单
}
@Override
public boolean addCommit(BusinessActionContext context) {
Order order =
JSON.parseObject(context.getActionContext("order").toString(),
Order.class);
order = this.getById(order.getId());
if (order != null) {
order.setStatus(1);//commit阶段-提交事务
this.saveOrUpdate(order);//修改订单
}
log.info("--------->xid=" + context.getXid() + " 提交成功!");
return true;
}
@Override
public boolean addRollBack(BusinessActionContext context) {
Order order =
JSON.parseObject(context.getActionContext("order").toString(),
Order.class);
order = this.getById(order.getId());
if (order != null) {
this.removeById(order.getId());//删除订单
}
log.info("--------->xid=" + context.getXid() + " 回滚成功!");
return true;
}
}
- lagou_points⼯程改造
接⼝改造
实现类改造 ```java import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.lagou.points.mapper.PointsMapper; import com.lagou.points.entity.Points; import com.lagou.points.service.PointsService; import io.seata.rm.tcc.api.BusinessActionContext; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; /**
- 会员积分服务
/
@Slf4j
@Service
public class PointsServiceImpl extends ServiceImpl
implements PointsService { /* - 会员增加积分 *
- @param username ⽤户名
- @param points 增加的积分
- @return 积分对象
*/
public void increase(String username, Integer points) {
QueryWrapper
wrapper = new QueryWrapper (); wrapper.lambda().eq(Points::getUsername, username); Points userPoints = this.getOne(wrapper); if (userPoints == null) { userPoints = new Points(); userPoints.setUsername(username); //userPoints.setPoints(points); 不直接增加积分 userPoints.setFrozenPoints(points);//设置冻结积分 this.save(userPoints); } else { userPoints.setFrozenPoints(points);//设置冻结积分 this.saveOrUpdate(userPoints); } } @Override public boolean increaseCommit(BusinessActionContext context) { //查询⽤户积分 QueryWrapper wrapper = new QueryWrapper (); wrapper.lambda().eq(Points::getUsername, context.getActionContext(“username”)); Points userPoints = this.getOne(wrapper); if (userPoints != null) { //增加⽤户积分 userPoints.setPoints(userPoints.getPoints() + userPoints.getFrozenPoints()); //冻结积分清零 userPoints.setFrozenPoints(0); this.saveOrUpdate(userPoints); } log.info(“————->xid=” + context.getXid() + “ 提交成功!”); return true; } @Override public boolean increaseRollback(BusinessActionContext context) { //查询⽤户积分 QueryWrapper wrapper = new QueryWrapper (); wrapper.lambda().eq(Points::getUsername, context.getActionContext(“username”)); Points userPoints = this.getOne(wrapper); if (userPoints != null) { //冻结积分清零 userPoints.setFrozenPoints(0); this.saveOrUpdate(userPoints); } log.info(“————->xid=” + context.getXid() + “ 回滚成功!”); return true; } } ```
- lagou_stroage⼯程改造
接⼝改造
实现类改造
/**
* 仓库服务
*/
@Slf4j
@Service
public class StorageServiceImpl extends ServiceImpl<StorageMapper,
Storage> implements StorageService {
/**
* 减少库存
*
* @param goodsId 商品ID
* @param quantity 减少数量
* @return 库存对象
*/
public void decrease(Integer goodsId, Integer quantity) {
QueryWrapper<Storage> wrapper = new QueryWrapper<Storage>();
wrapper.lambda().eq(Storage::getGoodsId, goodsId);
Storage goodsStorage = this.getOne(wrapper);
if (goodsStorage.getStorage() >= quantity) {
//goodsStorage.setStorage(goodsStorage.getStorage() -
quantity);
//设置冻结库存
goodsStorage.setFrozenStorage(quantity);
} else {
throw new RuntimeException(goodsId + "库存不⾜,⽬前剩余库存:"
+ goodsStorage.getStorage());
}
this.saveOrUpdate(goodsStorage);
}
@Override
public boolean decreaseCommit(BusinessActionContext context) {
QueryWrapper<Storage> wrapper = new QueryWrapper<Storage>();
wrapper.lambda().eq(Storage::getGoodsId,
context.getActionContext("goodsId"));
Storage goodsStorage = this.getOne(wrapper);
if (goodsStorage != null) {
//扣减库存
goodsStorage.setStorage(goodsStorage.getStorage() -
goodsStorage.getFrozenStorage());
//冻结库存清零
goodsStorage.setFrozenStorage(0);
this.saveOrUpdate(goodsStorage);
}
log.info("--------->xid=" + context.getXid() + " 提交成功!");
return true;
}
@Override
public boolean decreaseRollback(BusinessActionContext context) {
QueryWrapper<Storage> wrapper = new QueryWrapper<Storage>();
wrapper.lambda().eq(Storage::getGoodsId,
context.getActionContext("goodsId"));
Storage goodsStorage = this.getOne(wrapper);
if (goodsStorage != null) {
//冻结库存清零
goodsStorage.setFrozenStorage(0);
this.saveOrUpdate(goodsStorage);
}
log.info("--------->xid=" + context.getXid() + " 回滚成功!");
return true;
}
}
3.2.2 TM端改造
针对我们⼯程lagou_bussiness是事务的发起者,所以是TM端,其它⼯程为RM端. 所以我们只需要在
lagou_common_db完成即可,因为lagou_bussiness⽅法⾥⾯没有对数据库操作.所以只需要将之前AT模式的代理数据源去掉即可.注意:如果lagou_bussiness也对数据库操作了.也需要完成
try/commit/rollback的实现.
代码实现:
4.Seata-Saga模式简介与三种模式对⽐
4.1 Saga模式简单介绍
Saga 模式是 Seata 开源的⻓事务解决⽅案,将由蚂蚁⾦服主要贡献。在 Saga 模式下,分布式事务内有多个参与者,每⼀个参与者都是⼀个冲正补偿服务,需要⽤户根据业务场景实现其正向操作和逆向回滚操作。
分布式事务执⾏过程中,依次执⾏各参与者的正向操作,如果所有正向操作均执⾏成功,那么分布式事务提交。如果任何⼀个正向操作执⾏失败,那么分布式事务会去退回去执⾏前⾯各参与者的逆向回滚操作,回滚已提交的参与者,使分布式事务回到初始状态。
**适⽤场景:
**
业务流程⻓、业务流程多
参与者包含第三⽅公司或遗留系统服务,⽆法提供 TCC 模式要求的三个接⼝
典型业务系统:如⾦融⽹络(与外部⾦融机构对接)、互联⽹微贷、渠道整合等业务系统
4.2 三种模式对⽐
**AT 模式是⽆侵⼊的分布式事务解决⽅案,适⽤于不希望对业务进⾏改造的场景,⼏乎0学习成本。
TCC 模式是⾼性能分布式事务解决⽅案,适⽤于核⼼系统等对性能有很⾼要求的场景。
**Saga 模式是⻓事务解决⽅案,适⽤于业务流程⻓且需要保证事务最终⼀致性的业务系统,Saga 模式⼀阶段就会提交本地事务,⽆锁,⻓流程情况下可以保证性能,多⽤于渠道层、集成层业务系统。事务参与者可能是其它公司的服务或者是遗留系统的服务,⽆法进⾏改造和提供 TCC 要求的接⼝,也可以使⽤Saga 模式。