SpringCloud Alibaba 实战 - 前京东金融架构师 - 拉勾教育

在前面的章节,我们分别讲解了 Spring Cloud Alibaba 中主要的组成部分,其中包括:注册中心与配置管理组件 Nacos、Ribbon 客户端负载均衡、OpenFeign 与 Dubbo 通信组件、Sentinel 服务限流与熔断保护组件、Sleuth+Zipkin 与 Skywalking 实现分布式追踪、Seata 分布式事务组件以及 RocketMQ 消息异步通信中间件,这些组件各司其职为微服务架构提供了有效的支撑。

从本章开始,我们综合运用这些组件,再结合我多年来分布式开发的经验,为你分享一些成熟的组合搭配与案例,让你在分布式开发这条路上少走些弯路。

本讲咱们进入第一个话题:利用 Seata 与 Nacos 构建分布式事务架构。在第 16 讲我们介绍了 Seata 的使用原理及解决方案,但并未涉及落地的开发技巧。今天我们补上这一块内容,我将手把手带你搭建可用的 Seata 分布式事务架构。

本讲涉及的内容较多,按搭建顺序将分为以下几个阶段:

  • 部署 Nacos 注册中心与配置中心;
  • 部署 TC 组件 Seata-Server;
  • 开发 RM 资源管理器;
  • 开发 TM 事务管理器;
  • 验证分布式事务。

下面我们一步步实现前面的 “商城销售积分” 应用案例。

18 | 综合应用:Seata 分布式事务实战教程 - 图1

案例示意图

首先咱们来看整体架构图:

18 | 综合应用:Seata 分布式事务实战教程 - 图2

基于 Seata 的分布式事务架构

我在 192.168.31.103 虚拟机构建了 5 个 MySQL 5.7 数据库实例,通过设置不同端口来模拟 5 台数据库服务器,它们的用途是:

  • 3309 端口数据库存储 Nacos 配置信息以及 Seata-Server 的分布式事务数据;
  • 3305 端口数据库存储 TM 端商城数据;
  • 3306 端口数据库存储 RM 端订单服务数据;
  • 3307 端口数据库存储 RM 端会员积分数据;
  • 3308 端口数据库存储 RM 端库存数据。

应用方面包含 6 个节点:

  • 192.168.31.103:8848 节点是 Nacos 注册中心与配置中心服务器,提供微服务架构核心支撑;
  • 192.168.31.107:8091 节点是 Seata-Server,也就是 TC 组件,用于协调全局事务的开启、提交与回滚;
  • 192.168.31.106:8001 节点是 TM,也就是商城应用,TM 用于定义事务边界与事务的范围;
  • 192.168.31.106:8002/8003/8004 则是具体的 RM 实例,分别对应订单、积分与库存服务。

其中所有 TM、RM、TC 实例在启动时都要向 Nacos 进行注册登记,以保证服务可以被发现。同时 TC(Seata-Server) 自身的配置信息也要托管在 Nacos 配置中心中,不再单独存储。所有 TM、RM 在启动时也要额外在 TC 中进行注册,以保证全局事务的完整性。

下面咱们开始第一个阶段:部署 Nacos 注册中心与配置中心。

部署 Nacos 注册中心与配置中心

部署 Nacos 注册中心与配置中心与前面课程内容并无二致,咱们快速完成即可。

第一步,下载 Nacos,上传到 192.168.31.103 节点解压缩。

  1. tar -xvf nacos-server-1.4.0.tar.gz

第二步,配置 conf/application.properties,增加数据库配置。

  1. ### Count of DB:
  2. db.num=1
  3. ### Connect URL of DB:
  4. db.url.0=jdbc:mysql://192.168.31.103:3309/nacos_config?characterEncoding=utf8&connectTimeout=1000&socketTimeout=3000&autoReconnect=true&useUnicode=true&useSSL=false&serverTimezone=UTC
  5. db.user=root
  6. db.password=root

第三步,创建 3309 端口的 nacos_config 数据库,执行 conf/nacos-mysql.sql,完成 Nacos 注册中心表创建。

18 | 综合应用:Seata 分布式事务实战教程 - 图3

第四步,创建 conf/cluster.conf 集群配置文件,配置一个单节点集群。

  1. #it is ip
  2. #example
  3. 192.168.31.10:8848

第五步,运行 startup 脚本,启动 Nacos。

  1. sh /usr/local/nacos/bin/startup.sh

至此,阶段一:Nacos 注册中心与配置中心部署完成,因为这些内容前面都讲过,所以不再赘述。
下面咱们开始阶段二:部署 TC 组件 Seata-Server。

从部署 TC 组件 Seata-Server 开始就是全新的内容,我将更细致地为你介绍每一步操作过程。

部署 TC 组件 Seata-Server

第一步,下载 Seata-Server。

在 192.168.31.107 设备上安装好 JDK 1.8,之后访问 Seata 的 GitHub,下载最新的 1.4.0 压缩包。

https://github.com/seata/seata/releases/download/v1.4.0/seata-server-1.4.0.tar.gz

解压后将 seata-server-1.4 上传到 192.168.31.107 节点的 /usr/local 目录下。

18 | 综合应用:Seata 分布式事务实战教程 - 图4

Seata-Server 目录结构

第二步,编辑 conf/registry.conf 文件,这个配置文件说明 Seata-Server 接入哪种注册中心与配置中心,官方提供的默认模板存在大量冗余配置,这里我提供接入 Nacos 最精简的配置内容以方便学习。下面是 Seata-Server 接入注册中心的配置信息。

  1. registry {
  2. # Seata-Server支持以下几种注册中心,这里改为nacos,默认是file文件形式不介入任何注册中心。
  3. # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
  4. type = "nacos"
  5. # 负载均衡采用随机策略
  6. loadBalance = "RandomLoadBalance"
  7. loadBalanceVirtualNodes = 10
  8. # nacos注册中心接入配置
  9. nacos {
  10. # 应用名称
  11. application = "seata-server"
  12. #IP地址与端口
  13. serverAddr = "192.168.31.10:8848"
  14. # 分配应用组,采用默认值SEATA_GROUP即可
  15. group = "SEATA_GROUP"
  16. namespace = ""
  17. # 集群名称,采用默认值default即可
  18. cluster = "default"
  19. # Nacos接入用户名密码
  20. username = "nacos"
  21. password = "nacos"
  22. }
  23. }
  24. #Seata-Server接入配置中心
  25. config {
  26. # Seata-Server支持以下配置中心产品,这里设置为nacos,默认是file即文件形式保存配置内容。
  27. # file、nacos 、apollo、zk、consul、etcd3
  28. type = "nacos"
  29. # 设置Nacos的通信地址
  30. nacos {
  31. serverAddr = "192.168.31.10:8848"
  32. namespace = ""
  33. group = "SEATA_GROUP"
  34. username = "nacos"
  35. password = "nacos"
  36. }
  37. }

上面的配置文件你可以直接复制到自己工程中修改下 IP 端口即可。

第三步,在 Nacos 配置中心中初始化 Seata 配置。

Seata 官方也为我们提供了初始化配置脚本,请按我的步骤操作。

首先访问下面的地址查看 Seata 在 Nacos 中需要的设置项。

https://github.com/seata/seata/blob/1.4.0/script/config-center/config.txt

18 | 综合应用:Seata 分布式事务实战教程 - 图5

Seata-Server 配置项

将 GitHub 页面中 80 行文本内容复制后,在 /usr/local/seata-server-1.4.0 目录下创建 config.txt 文件,将 80 行文本粘贴其中。

18 | 综合应用:Seata 分布式事务实战教程 - 图6

config.txt 文件路径

在内容粘贴到 config.txt 后,有两个地方需要修改:

  • 34 行 store.mode=file 改为 store.mode=db 代表采用数据库存储 Seata-Server 的全局事务数据
  • 44~46 行配置 Seata-Server 的全局事务数据库,数据库 URL 指向 107 节点 3309 端口。这个全局事务数据库是 Seata 维护分布式事务的关键所在,后面咱们马上就要创建这个数据库。
  1. store.db.url=jdbc:mysql:
  2. store.db.user=root
  3. store.db.password=root

config.txt 保存后,还需要访问下面地址下载 nacos-config.sh 运行脚本。
https://github.com/seata/seata/blob/1.4.0/script/config-center/nacos/nacos-config.sh

18 | 综合应用:Seata 分布式事务实战教程 - 图7

nacos-config.sh

这个脚本用来读取前面的 config.txt 并将配置项载入 Nacos 配置中心。将页面中 101 行文本复制,然后在 /usr/local/seata-server-1.4.0 目录下创建 script 子目录,在 scirpt 子目录下创建 nacos-config.sh 文件,并将 101 行文本保存其中。

18 | 综合应用:Seata 分布式事务实战教程 - 图8

nacos-config.sh 文件路径

保存后执行下面命令运行导入脚本。

  1. sh nacos-config.sh -h 192.168.31.10

这条命令有两个参数,h 选项指向 Nacos 的 IP,执行后你会看到如下日志。

  1. ...
  2. Set metrics.registryType=compact successfully
  3. Set metrics.exporterList=prometheus successfully
  4. Set metrics.exporterPrometheusPort=9898 successfully
  5. =========================================================================
  6. Complete initialization parameters, total-count:80 , failure-count:0
  7. =========================================================================
  8. Init nacos config finished, please start seata-server.

80 个配置选项导入成功后,我们便可在 Nacos 的配置中心页面看到它们,访问 Nacos 后台http://192.168.31.10:8848/nacos
你会看到大量 SEATA_GROUP 分组的配置,这些配置信息在 Seata-Server 启动时都会自动读取。

18 | 综合应用:Seata 分布式事务实战教程 - 图9

Seata-Server 配置信息

第四步,创建并初始化 Seata-Server 全局事务数据库。

访问下面网址

https://github.com/seata/seata/blob/1.4.0/script/server/db/mysql.sql,下载 SQL 脚本。在 3309 端口 MySQL 创建新的数据库 seata,执行 SQL 脚本创建全局事务表。

18 | 综合应用:Seata 分布式事务实战教程 - 图10

Seata 全局事务数据库

脚本执行后会创建 3 张表,我们了解下即可。

  • global_table 保存全局事务数据;
  • branch_table 保存分支事务数据;
  • lock_table 保存锁定资源数据。

第五步,启动 seata-server。

seata-server 启动只需要执行 bin/seata-server.sh。

启动后,看到下面 Server started 代表启动成功。

  1. 10:52:38.254 INFO --- [ main] com.alibaba.druid.pool.DruidDataSource : {dataSource-1} inited
  2. 10:52:38.501 INFO --- [ main] i.s.core.rpc.netty.NettyServerBootstrap : Server started, listen port: 8091

这里有个细节,如果启动过程中提示数据库无法访问,说明 IP、端口配置有问题,可以通过 Nacos 配置中心设置 store.db.url 选项,而不是重新导入 config.txt。

18 | 综合应用:Seata 分布式事务实战教程 - 图11

修改 Seata 连接字符串

到这里,阶段二:部署 TC 组件 Seata-Server 已经完成。后面的事情就是基于 TC 组件 Seata-Server 实现分布式事务。

开发 RM 资源管理器

在 Seata 中 RM 资源管理器代表处理具体业务的模块,例如:订单服务创建订单、会员服务增加积分、库存服务减少库存都是 RM 资源管理器,下面我们来开发订单服务、会员服务与库存服务。

订单服务 rm-order

这里开发框架采用 Spring Boot + JPA(Hibernate)+ Druid 实现。

第一步,创建 seata-order 数据库与 undo_log 表。

在 103 节点的 3306 数据库上,创建 seata-order 数据库,执行下面的 SQL 初始化数据库。

  1. SET NAMES utf8mb4;
  2. SET FOREIGN_KEY_CHECKS = 0;
  3. DROP TABLE IF EXISTS `order`;
  4. CREATE TABLE `order` (
  5. `order_id` int(255) NOT NULL AUTO_INCREMENT COMMENT '订单编号',
  6. `goods_id` int(32) NOT NULL COMMENT '商品编号',
  7. `member_id` int(32) NOT NULL COMMENT '会员编号',
  8. `quantity` int(255) NOT NULL COMMENT '购买数量',
  9. `points` int(255) NOT NULL COMMENT '增加会员积分',
  10. PRIMARY KEY (`order_id`) USING BTREE
  11. ) ENGINE = InnoDB AUTO_INCREMENT = 51 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
  12. DROP TABLE IF EXISTS `undo_log`;
  13. CREATE TABLE `undo_log` (
  14. `id` bigint(20) NOT NULL AUTO_INCREMENT,
  15. `branch_id` bigint(20) NOT NULL,
  16. `xid` varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
  17. `context` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
  18. `rollback_info` longblob NOT NULL,
  19. `log_status` int(11) NOT NULL,
  20. `log_created` datetime(0) NOT NULL,
  21. `log_modified` datetime(0) NOT NULL,
  22. `ext` varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  23. PRIMARY KEY (`id`) USING BTREE,
  24. UNIQUE INDEX `ux_undo_log`(`xid`, `branch_id`) USING BTREE
  25. ) ENGINE = InnoDB AUTO_INCREMENT = 5 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
  26. SET FOREIGN_KEY_CHECKS = 1;

数据库包含两张表,一张是订单业务表 order,字段含义已在脚本中注释。而另一张 undo_log 回滚日志表。undo_log 是 Seata 强制要求在每个 RM 端数据库创建的表,用于存储反向 SQL 的元数据。undo_log 表的脚本可以从 Seata GitHub 官方获取,然后在自己的业务库中执行。

https://github.com/seata/seata/blob/1.4.0/script/client/at/db/mysql.sql

到这里 RM 数据库创建完毕。

18 | 综合应用:Seata 分布式事务实战教程 - 图12

seata-order 数据库

第二步,利用 Spring Initializr 向导创建 rm-order 工程,确保 pom.xml 引入以下依赖。

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-data-jpa</artifactId>
  4. </dependency>
  5. <dependency>
  6. <groupId>org.springframework.boot</groupId>
  7. <artifactId>spring-boot-starter-web</artifactId>
  8. </dependency>
  9. <dependency>
  10. <groupId>com.alibaba.cloud</groupId>
  11. <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
  12. </dependency>
  13. <dependency>
  14. <groupId>com.alibaba.cloud</groupId>
  15. <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
  16. <exclusions>
  17. <exclusion>
  18. <groupId>io.seata</groupId>
  19. <artifactId>seata-all</artifactId>
  20. </exclusion>
  21. <exclusion>
  22. <groupId>io.seata</groupId>
  23. <artifactId>seata-spring-boot-starter</artifactId>
  24. </exclusion>
  25. </exclusions>
  26. </dependency>
  27. <dependency>
  28. <groupId>io.seata</groupId>
  29. <artifactId>seata-all</artifactId>
  30. <version>1.4.0</version>
  31. </dependency>
  32. <dependency>
  33. <groupId>io.seata</groupId>
  34. <artifactId>seata-spring-boot-starter</artifactId>
  35. <version>1.4.0</version>
  36. </dependency>
  37. <dependency>
  38. <groupId>mysql</groupId>
  39. <artifactId>mysql-connector-java</artifactId>
  40. <scope>runtime</scope>
  41. </dependency>

这份依赖有个注意事项,默认 starter-alibaba-seata 依赖内置的是旧版 1.3.0 的 Seata 客户端,因此要排除,在后面引入最新的 1.4.0,保证客户端与 Seate-Server 版本一致。

第三步,配置 application.yml。

application.yml 额外配置了事务分组与 Nacos 的信息,我已在配置文件中进行了注释说明。

  1. seata:
  2. enabled: true
  3. tx-service-group: my_test_tx_group
  4. enable-auto-data-source-proxy: true
  5. service:
  6. vgroup-mapping:
  7. my_test_tx_group: default
  8. grouplist:
  9. default: 192.168.31.107:8091
  10. enable-degrade: false
  11. disable-global-transaction: false
  12. config:
  13. type: nacos
  14. nacos:
  15. namespace:
  16. serverAddr: 192.168.31.10:8848
  17. group: SEATA_GROUP
  18. username: nacos
  19. password: nacos
  20. cluster: default
  21. registry:
  22. type: nacos
  23. nacos:
  24. application: seata-server
  25. server-addr: 192.168.31.10:8848
  26. group : SEATA_GROUP
  27. namespace:
  28. username: nacos
  29. password: nacos
  30. cluster: default
  31. spring:
  32. application:
  33. name: rm-order
  34. datasource:
  35. driver-class-name: com.mysql.cj.jdbc.Driver
  36. url: jdbc:mysql://192.168.31.103:3306/seata-order
  37. username: root
  38. password: root
  39. cloud:
  40. nacos:
  41. discovery:
  42. username: nacos
  43. password: nacos
  44. server-addr: 192.168.31.10:8848
  45. jpa:
  46. show-sql: true
  47. server:
  48. port: 8002
  49. logging:
  50. level:
  51. io:
  52. seata: debug

第四步,像单机应用一样开发数据库 CRUD,代码很简单,我已给出注释。

Order 实体类,让属性与字段进行映射。

  1. @Entity
  2. @Table(name = "`order`")
  3. public class Order {
  4. @Id
  5. @Column(name = "order_id")
  6. private Integer id;
  7. private Integer memberId;
  8. @Column(name = "goods_id")
  9. private Integer goodsId;
  10. private Integer points;
  11. private Integer quantity;
  12. public Order() {
  13. }
  14. public Order(Integer id, Integer memberId, Integer goodsId, Integer points, Integer quantity) {
  15. this.id = id;
  16. this.memberId = memberId;
  17. this.points = points;
  18. this.goodsId = goodsId;
  19. this.quantity = quantity;
  20. }
  21. }

OrderRepository 接口用于声明 CRUD 操作。

  1. public interface OrderRepository extends JpaRepository<Order,Integer> {
  2. }

OrderService 的 createOrder 方法实现创建订单的业务逻辑,注意在 createOrder 方法上必须增加 @Transactional 注解,Seata 客户端对这个注解进行扩展支持了分布式事务。

  1. @Service
  2. public class OrderService {
  3. @Resource
  4. private OrderRepository orderRepository;
  5. @Transactional
  6. public Order createOrder(Integer orderId,Integer memberId,Integer goodsId,Integer points,Integer quantity){
  7. return orderRepository.save(new Order(orderId, memberId,goodsId,points,quantity));
  8. }
  9. }

OrderController 的 createOrder 方法用于对外暴露 RESTful API,等待被 TM 调用。

  1. @RestController
  2. public class OrderController {
  3. @Resource
  4. private OrderService orderService;
  5. @GetMapping("/create_order")
  6. public String createOrder(Integer orderId,Integer memberId,Integer goodsId,Integer points,Integer quantity) throws JsonProcessingException {
  7. Map result = new HashMap<>();
  8. Order order = orderService.createOrder(orderId,memberId,goodsId,points,quantity);
  9. result.put("code", "0");
  10. result.put("message", "create order success");
  11. return new ObjectMapper().writeValueAsString(result);
  12. }
  13. }

第五步,最重要的一步,要让 Seata 客户端在处理事务时自动生成反向 SQL,必须额外配置 DataSourceProxy 数据源代理类,DataSourceProxy 是 Seata 提供的 DataSource 代理类,在分布式事务执行过程中,用于自动生成 undo_log 回滚数据,以及自动完成 RM 端分布式事务的提交或回滚操作。

在 Spring Boot 中利用 Java Config 方式对 DataSourceProxy 进行配置。

  1. package com.lagou.rmorder.datasource;
  2. import com.alibaba.druid.pool.DruidDataSource;
  3. import io.seata.rm.datasource.DataSourceProxy;
  4. import org.springframework.boot.context.properties.ConfigurationProperties;
  5. import org.springframework.context.annotation.Bean;
  6. import org.springframework.context.annotation.Configuration;
  7. import org.springframework.context.annotation.Primary;
  8. @Configuration
  9. public class DataSourceProxyConfig {
  10. @Bean
  11. @ConfigurationProperties(prefix = "spring.datasource")
  12. public DruidDataSource druidDataSource() {
  13. return new DruidDataSource();
  14. }
  15. @Primary
  16. @Bean
  17. public DataSourceProxy dataSource(DruidDataSource druidDataSource) {
  18. return new DataSourceProxy(druidDataSource);
  19. }
  20. }

最后,启动 rm-order,在浏览器地址栏访问 create_order 接口,看到 create order success,在数据库中也出现对应记录。
http://192.168.31.106:8002/create_order?orderId=6&memberId=1&goodsId=2&points=20&quantity=200

  1. {
  2. code: "0",
  3. message: "create order success"
  4. }

18 | 综合应用:Seata 分布式事务实战教程 - 图13

订单数据

以上是 rm-order 订单服务的开发过程,除了配置 Seata 选项与创建 DataSourceProxy 外,其他的开发要素与单机应用几乎是一样的,这使得新人也可以很快上手。

下面咱们如法炮制开发 rm-points 积分服务与 rm-storage 库存服务。

积分服务 rm-points

因为积分服务的开发过程与订单服务几乎是一致的,我们只需给出关键代码。

第一步,在 3307 端口数据库创建 seata-points 数据库,包含 points 会员积分表与 undo_log 表。

  1. DROP TABLE IF EXISTS `points`;
  2. CREATE TABLE `points` (
  3. `member_id` int(255) NOT NULL COMMENT '会员编号',
  4. `points` int(255) NOT NULL COMMENT '积分数量',
  5. PRIMARY KEY (`member_id`) USING BTREE
  6. ) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
  7. INSERT INTO `points` VALUES (1, 100);
  8. INSERT INTO `points` VALUES (2, 100);
  9. INSERT INTO `points` VALUES (3, 100);
  10. INSERT INTO `points` VALUES (4, 100);
  11. DROP TABLE IF EXISTS `undo_log`;
  12. CREATE TABLE `undo_log` (
  13. `id` bigint(20) NOT NULL AUTO_INCREMENT,
  14. `branch_id` bigint(20) NOT NULL,
  15. `xid` varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
  16. `context` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
  17. `rollback_info` longblob NOT NULL,
  18. `log_status` int(11) NOT NULL,
  19. `log_created` datetime(0) NOT NULL,
  20. `log_modified` datetime(0) NOT NULL,
  21. `ext` varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  22. PRIMARY KEY (`id`) USING BTREE,
  23. UNIQUE INDEX `ux_undo_log`(`xid`, `branch_id`) USING BTREE
  24. ) ENGINE = InnoDB AUTO_INCREMENT = 5 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
  25. SET FOREIGN_KEY_CHECKS = 1;

第二步,创建 rm-points 积分服务,pom.xml 与 rm-order 依赖完全相同。

第三步,配置 application.yml,只有应用名、端口与数据库 URL 不同,其他与 rm-order 完全相同,这里就不再赘述。

  1. seata:
  2. ...
  3. spring:
  4. application:
  5. name: rm-points
  6. datasource:
  7. driver-class-name: com.mysql.cj.jdbc.Driver
  8. url: jdbc:mysql://192.168.31.103:3307/seata-points
  9. username: root
  10. password: root
  11. ...
  12. server:
  13. port: 8003
  14. ...

第四步:实现新增积分代码,基础的实体类与 Repository 不做赘述,我们关注 PointsService 与 PointsController。

PointsService.addPoints 方法实现会员积分增加业务,同样需要 @Transactional 注解。

  1. @Service
  2. public class PointsService {
  3. @Resource
  4. private PointsRepository orderRepository;
  5. @Transactional
  6. public Points addPoints(Integer memberId,Integer points){
  7. Points entity = orderRepository.findById(memberId).get();
  8. entity.setPoints( entity.getPoints() + points);
  9. return orderRepository.save(new Points(memberId,entity.getPoints()));
  10. }
  11. }

PointsController 对外暴露 add_points 接口调用 service 类实现业务。

  1. @RestController
  2. public class PointsController {
  3. @Resource
  4. private PointsService pointsService;
  5. @GetMapping("/add_points")
  6. public String addPoints(Integer memberId,Integer points) throws JsonProcessingException {
  7. Map result = new HashMap<>();
  8. Points entity = pointsService.addPoints(memberId, points);
  9. result.put("code", "0");
  10. result.put("message", "add points success");
  11. return new ObjectMapper().writeValueAsString(result);
  12. }
  13. }

第五步,最后不要忘记设置 DataSourceProxyConfig,代码与 rm-order 完全相同。

代码书写完毕,启动应用访问地址http://192.168.31.106:8003/add_points?memberId=1&points=20

可以得到 “add points success” 结果。

  1. {
  2. code: "0",
  3. message: "add points success"
  4. }

库存服务 rm-storage

RM 库存服务也是遵循相同套路开发。

第一步,创建数据库 seata-storage,执行建表脚本,包含库存表 storage 与回滚表 undo_log。

  1. SET NAMES utf8mb4;
  2. SET FOREIGN_KEY_CHECKS = 0;
  3. DROP TABLE IF EXISTS `storage`;
  4. CREATE TABLE `storage` (
  5. `goods_id` int(255) NOT NULL,
  6. `quantity` int(255) NOT NULL,
  7. PRIMARY KEY (`goods_id`) USING BTREE
  8. ) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
  9. INSERT INTO `storage` VALUES (1, 100);
  10. INSERT INTO `storage` VALUES (2, 98);
  11. INSERT INTO `storage` VALUES (3, 100);
  12. INSERT INTO `storage` VALUES (4, 100);
  13. DROP TABLE IF EXISTS `undo_log`;
  14. CREATE TABLE `undo_log` (
  15. `id` bigint(20) NOT NULL AUTO_INCREMENT,
  16. `branch_id` bigint(20) NOT NULL,
  17. `xid` varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
  18. `context` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
  19. `rollback_info` longblob NOT NULL,
  20. `log_status` int(11) NOT NULL,
  21. `log_created` datetime(0) NOT NULL,
  22. `log_modified` datetime(0) NOT NULL,
  23. `ext` varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  24. PRIMARY KEY (`id`) USING BTREE,
  25. UNIQUE INDEX `ux_undo_log`(`xid`, `branch_id`) USING BTREE
  26. ) ENGINE = InnoDB AUTO_INCREMENT = 2 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
  27. SET FOREIGN_KEY_CHECKS = 1;

第二步,创建 rm-storage 库存服务,pom.xml 与 rm-order 依赖完全相同。

第三步,配置 application.yml,只有应用名、端口与数据库 URL 不同,其他与 rm-order 完全相同。

  1. seata:
  2. ...
  3. spring:
  4. application:
  5. name: rm-storage
  6. datasource:
  7. driver-class-name: com.mysql.cj.jdbc.Driver
  8. url: jdbc:mysql://192.168.31.103:3308/seata-storage
  9. username: root
  10. password: root
  11. ...
  12. server:
  13. port: 8004
  14. ...

第四步,创建 StorageService 与 StorageController,实现减少库存业务。

StorageService.reduceStorage 方法实现商品库存减少,如果库存不足则抛出 IllegalStateException 异常。

  1. @Service
  2. public class StorageService {
  3. @Resource
  4. private StorageRepository storageRepository;
  5. @Transactional
  6. public Storage reduceStorage(Integer goodsId, Integer quantity){
  7. Storage storage = storageRepository.findById(goodsId).get();
  8. if(storage.getQuantity() < quantity){
  9. throw new IllegalStateException(goodsId + "商品库存不足");
  10. }
  11. storage.setQuantity(storage.getQuantity() - quantity);
  12. return storageRepository.save(storage);
  13. }
  14. }

StorageController 对外暴露 reduce_storage 接口,实现减少库存业务。

  1. @RestController
  2. public class StorageController {
  3. @Resource
  4. private StorageService storageService;
  5. @GetMapping("/reduce_storage")
  6. public String reduceStorage(Integer goodsId,Integer quantity) throws JsonProcessingException {
  7. Map result = new HashMap<>();
  8. Storage storage = storageService.reduceStorage(goodsId, quantity);
  9. result.put("code", "0");
  10. result.put("message", "reduce storage success");
  11. return new ObjectMapper().writeValueAsString(result);
  12. }
  13. }

第五步,DataSourceProxyConfig 代码与 rm-order 完全相同不再赘述。

启动应用,访问地址

http://localhost:8004/reduce_storage?goodsId=2&quantity=10

得到结果 ”reduce storage success”。

  1. {
  2. code: "0",
  3. message: "reduce storage success"
  4. }

到这里我们将订单、积分、库存三个 RM 示例都已开发完毕,这些服务都是独立运行,并没有形成整体,最后咱们还要开发商城应用 tm-mall,tm-mall 作为 TM 将服务串行调用,并形成分布式事务整体。

开发 TM 事务管理器

商城应用 tm-mall

第一步,3305 端口创建 seata-mall 数据库,因为真实的商城应用本身也需要往本地库写入数据,TM 本身也是一个 RM,因此在商城库中也要创建 undo_log 表。

18 | 综合应用:Seata 分布式事务实战教程 - 图14

tm-mall 数据库

第二步,创建 tm-mall 应用,框架同样采用 SpringBoot + JPA + druid,除了依赖 Seata 外,还需要额外引入 OpenFeign 实现微服务的远程调用。

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-data-jpa</artifactId>
  4. </dependency>
  5. <dependency>
  6. <groupId>org.springframework.boot</groupId>
  7. <artifactId>spring-boot-starter-web</artifactId>
  8. </dependency>
  9. <dependency>
  10. <groupId>com.alibaba.cloud</groupId>
  11. <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
  12. </dependency>
  13. <dependency>
  14. <groupId>com.alibaba.cloud</groupId>
  15. <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
  16. <exclusions>
  17. <exclusion>
  18. <groupId>io.seata</groupId>
  19. <artifactId>seata-all</artifactId>
  20. </exclusion>
  21. <exclusion>
  22. <groupId>io.seata</groupId>
  23. <artifactId>seata-spring-boot-starter</artifactId>
  24. </exclusion>
  25. </exclusions>
  26. </dependency>
  27. <dependency>
  28. <groupId>io.seata</groupId>
  29. <artifactId>seata-all</artifactId>
  30. <version>1.4.0</version>
  31. </dependency>
  32. <dependency>
  33. <groupId>io.seata</groupId>
  34. <artifactId>seata-spring-boot-starter</artifactId>
  35. <version>1.4.0</version>
  36. </dependency>
  37. <dependency>
  38. <groupId>mysql</groupId>
  39. <artifactId>mysql-connector-java</artifactId>
  40. <scope>runtime</scope>
  41. </dependency>
  42. <dependency>
  43. <groupId>org.springframework.cloud</groupId>
  44. <artifactId>spring-cloud-starter-openfeign</artifactId>
  45. <version>${spring-cloud-alibaba.version}</version>
  46. </dependency>

第三步,配置 application.yml,只有应用名、端口与数据库 URL 不同,其他与 rm-order 完全相同。

  1. seata:
  2. ...
  3. spring:
  4. application:
  5. name: tm-mall
  6. datasource:
  7. driver-class-name: com.mysql.cj.jdbc.Driver
  8. url: jdbc:mysql://192.168.31.103:3305/seata-mall
  9. username: root
  10. password: root
  11. ...
  12. server:
  13. port: 8001
  14. ...

第四步,在应用入口增加 @EnableFeignClients 注解,开启 OpenFeign 远程调用功能。

  1. @SpringBootApplication
  2. @EnableFeignClients
  3. public class TmMallApplication {
  4. public static void main(String[] args) {
  5. SpringApplication.run(TmMallApplication.class, args);
  6. }
  7. }

第五步,开发三个 RM 的 OpenFeign 客户端。
OrderFeignClient 是 rm-order 服务的 OpenFeign 客户端。

  1. @FeignClient("rm-order")
  2. public interface OrderFeignClient {
  3. @GetMapping("/create_order")
  4. public String createOrder(@RequestParam("orderId") Integer orderId,
  5. @RequestParam("memberId") Integer memberId,
  6. @RequestParam("goodsId") Integer goodsId,
  7. @RequestParam("points") Integer points,
  8. @RequestParam("quantity") Integer quantity
  9. );
  10. }

PointsFeignClient 是 rm-points 服务的 OpenFeign 客户端。

  1. @FeignClient("rm-points")
  2. public interface PointsFeignClient {
  3. @GetMapping("/add_points")
  4. public String addPoints(@RequestParam("memberId") Integer memberId, @RequestParam("points") Integer points);
  5. }

StorageFeignClient 是 rm-storage 服务的 OpenFeign 客户端。

  1. @FeignClient("rm-storage")
  2. public interface StorageFeignClient {
  3. @GetMapping("/reduce_storage")
  4. public String reduceStorage(@RequestParam("goodsId") Integer goodsId, @RequestParam("quantity") Integer quantity);
  5. }

第六步,开发 MallService,定义全局事务范围。这里最重要的是 @GlobalTransactional 注解,该注解是全局事务注解,当进入 MallService.sale 方法时通知 TC 开启全局事务,sale 方法执行成功自动通知 TC 进行全局提交;sale 方法抛出异常时自动通知 TC 进行全局回滚。

  1. @Service
  2. public class MallService {
  3. @Resource
  4. OrderFeignClient orderFeignClient;
  5. @Resource
  6. PointsFeignClient pointsFeignClient;
  7. @Resource
  8. StorageFeignClient storageFeignClient;
  9. @GlobalTransactional(name = "seata-group-tx-mall", rollbackFor = {Exception.class})
  10. public String sale(Integer orderId,Integer memberId,Integer goodsId,Integer points,Integer quantity) {
  11. String orderResult = orderFeignClient.createOrder(orderId,memberId,goodsId,points,quantity);
  12. String pointsResult = pointsFeignClient.addPoints(memberId, points);
  13. String storageResult = storageFeignClient.reduceStorage(goodsId, quantity);
  14. return orderResult + " / " + pointsResult + " / " + storageResult;
  15. }
  16. }

第七步,开发 MallController 对外暴露 sale 接口提供调用。

  1. @RestController
  2. public class MallController {
  3. @Resource
  4. private MallService mallService;
  5. @GetMapping("/sale")
  6. public String sale(Integer orderId,Integer memberId,Integer goodsId,Integer points,Integer quantity){
  7. return mallService.sale(orderId,memberId,goodsId,points,quantity);
  8. }
  9. }

第八步,最后不要忘记配置 DataSourceProxyConfig,这是所有 TM 与 RM 都要设置的。

  1. @Configuration
  2. public class DataSourceProxyConfig {
  3. @Bean
  4. @ConfigurationProperties(prefix = "spring.datasource")
  5. public DruidDataSource druidDataSource() {
  6. return new DruidDataSource();
  7. }
  8. @Primary
  9. @Bean
  10. public DataSourceProxy dataSource(DruidDataSource druidDataSource) {
  11. return new DataSourceProxy(druidDataSource);
  12. }
  13. }

到这里所有 TM 与 RM 都已开发完毕,下面咱们来验证分布式事务的执行效果。

验证分布式事务

将 Nacos、TC、TM、3 个 RM 都启动,之后访问 tm-mall 的 sale 接口。

http://localhost:8001/sale?orderId=6&memberId=1&goodsId=2&points=20&quantity=20

  1. {"code":"0","message":"create order success"}
  2. / {"code":"0","message":"add points success"}
  3. / {"code":"0","message":"reduce storage success"}

从结果中可以看到三个服务调用都已成功,我们从控制台看一下具体过程。

从日志中可以发现 TM 端负责开启全局事务,执行成功后通知 TC 全局事务提交。

  1. ## TM端日志
  2. # 启动全局事务
  3. i.seata.tm.api.DefaultGlobalTransaction : Begin new global transaction [192.168.31.107:8091:100622589646344192]
  4. ...
  5. # 全局事务已提交
  6. i.seata.tm.api.DefaultGlobalTransaction : [192.168.31.107:8091:100622589646344192] commit status: Committed

而 RM 端则负责两件事,成功提交本地的分支事务与删除 undo_log 回滚日志。

  1. ## RM日志
  2. # 分支事务已提交
  3. i.s.c.r.p.c.RmBranchCommitProcessor : branch commit result:xid=192.168.31.107:8091:100622589646344192,branchId=100622590170632192,branchStatus=PhaseTwo_Committed,result code =Success,getMsg =null
  4. # 清空undo_log表
  5. i.s.r.d.undo.mysql.MySQLUndoLogManager : batch delete undo log size 1

分析了提交过程,下面咱们再进行异常验证,将 quantity 设置为 200,这必将超出库存报错,看能否全局回滚。
http://localhost:8001/sale?orderId=6&memberId=1&goodsId=2&points=20&quantity=200

运行后程序报异常。

  1. java.lang.IllegalStateException: 商品库存不足。

观察发现 TM 向 TC 发起全局回滚通知。

  1. i.seata.tm.api.DefaultGlobalTransaction : Begin new global transaction [192.168.31.107:8091:100626590567763968]
  2. i.s.c.rpc.netty.AbstractNettyRemoting : io.seata.core.rpc.netty.TmNettyRemotingClient@2e81af7d msgId:1726, body:globalStatus=Rollbacked,ResultCode=Success,Msg=null

TC 向 RM 下达分支事务回滚通知,RM 收到通知做两件事:第一,根据 undo_log 表生成的反向 SQL,将之前写入的数据撤销;第二,删除 undo_log 数据。这两步操作保证了 RM 端数据能撤销回之前的状态。

  1. i.s.r.d.undo.AbstractUndoLogManager : Flushing UNDO LOG: {"@class":"io.seata.rm.datasource.undo.BranchUndoLog","xid":"192.168.31.107:8091:100626590567763968","branchId":100626590894919681...
  2. io.seata.rm.AbstractRMHandler : Branch Rollbacking: 192.168.31.107:8091:100626590567763968 100626590894919681 jdbc:mysql:
  3. i.s.r.d.undo.AbstractUndoLogManager : xid 192.168.31.107:8091:100626590567763968 branch 100626590894919681, undo_log deleted with GlobalFinished

到这里搭建 Seata 分布式事务架构的内容全部完成,最后咱们来做一下总结。

小结与预告

本讲是一套 Spring Cloud Alibaba 微服务架构的综合运用,我们从零开始一步步搭建了一套分布式事务的基础架构,也开发了实际的案例,通过程序运行结果让我们也理解了 Seata 的执行过程。这套框架的难点在于如何将 Seata-Server 与 Nacos 配置中心整合起来,因 Seata-Server 迭代很快,文档严重滞后,在搭建过程中又遇到了很多 BUG,没办法只能通过分析源码一点点排查,在这个过程中自己也对 Seata 有了更深刻的理解,希望你也能按我的操作过程完成分布式事务架构的搭建。

在这我给你留一道动手题:当前架构 Nacos 与 Seata-Server 都是单点的,不具备高可用特性,请你搭建对应的高可用集群。

下一讲,咱们将学习在微服务架构下如何设计多级缓存来提高应用性能。