1 分布式事务基础

1.1 事务

  • 事务指的是一个操作单元,在这个操作单元中的所有操作最终要保持一致的行为,要么所有操作全部成功,要么所有操作全部被撤销。简而言之,事务提供一种“要么什么都不做,要么做全套”的机制。

1.2 本地事务

  • 本地事务其实可以认为是数据库提供的事务机制,数据库事务中的四大特性:
    • 分布式事务Seata - 图1A:原子性(Atomicity),一个事务中的所有操作,要么全部成功,要么全部失败。
    • 分布式事务Seata - 图2C:一致性(Consistency),在一个事务执行之前和执行之后数据库都必须处于一致性的状态。
    • 分布式事务Seata - 图3I:隔离性(Isolation),在并发环境中,当不同的事务同时操作相同的数据时,事务之间互不影响。
    • 分布式事务Seata - 图4D:持久性(Durability),指的是只要事务成功结束,它对数据库所做的更新就必须永久的保存下来。
  • 数据库事务在实现的时候会将一次事务涉及到的所有操作全部纳入到一个不可分割的执行单元,该执行单元中的所有操作要么全部成功,要么全部事变,只要其中任一操作执行失败,都将导致整个事务的回滚。

1.3 分布式事务

  • 分布式事务指的是事务的参与者、支持事务的服务器、资源服务器以及事务管理器分别位于不同的分布式系统的不同节点之上。
  • 简单的说,就是一次大的操作由不同的小操作组成,这些小的操作分布在不同的服务器上,且属于不同的应用,分布式事务需要保证这些小操作要么全部成功,要么全部失败。
  • 本质上说,分布式事务就是为了保证不同数据库的数据一致性。

1.4 分布式事务的场景

1.4.1 单体系统访问多个数据库

  • 一个服务需要调用多个数据库实例完成数据的增删改操作。

单体系统访问多个数据库.png

1.4.2 多个微服务访问同一个数据库

  • 多个微服务调用一个数据库实例完成数据的增删改操作。

多个微服务访问同一个数据库实例.png

1.4.3 多个微服务访问多个数据库

  • 多个微服务访问多个数据库实例完成数据的增删改操作。

多个微服务访问多个数据库.png

2 分布式事务解决方案

2.1 全局事务(DTP模型)

  • 全局事务基于DTP模式实现,DTP是由X/Open组织提出的一种分布式事务模型(X/Open Distributed Transaction Processing Reference Model)。它规定了要实现分布式事务,需要三种角色:
    • AP:Application应用系统。
    • TM:Transaction Manager事务管理器。
    • RM:Resource Manager资源管理器。
  • 整个事务分成两个阶段:
    • 阶段一:表决阶段,所有参与者都将本事务执行预提交,并将能够成功的消息返回给协调者。
    • 阶段二:执行阶段,协调者根据所有参与者的反馈,通知所有参与者,协调一致的执行提交或回滚。

全局事务.png

  • 优点:提高了数据一致性的概率,实现成本较低。
  • 缺点:
    • 单点问题:事务协调者宕机。
    • 同步阻塞:延迟了提交时间,加长了资源阻塞时间。
    • 数据不一致:提交第二阶段,依然存在commit结果未知的情况,有可能导致数据不一致。

2.2 可靠消息服务

  • 基于可靠消息服务的方案是通过消息中间件保证上、下游应用数据操作的一致性。假设有A和B两个系统,分别可以处理任务A和任务B,此时存在一个业务流程,需要将任务A和任务B在同一个事务中处理,就可以使用消息中间件来实现这种分布式事务。

可靠消息服务.png

  • 第一步:消息由系统A投递到消息中间件。

    • 分布式事务Seata - 图10在系统A处理任务A之前,首先向消息中间件发送一条消息。
    • 分布式事务Seata - 图11消息中间件收到后将该条消息持久化,但不投递,持久化成功后,向A回复一个确认应答。
    • 分布式事务Seata - 图12系统A收到确认应答后,则可以开始处理任务A。
    • 分布式事务Seata - 图13任务A处理完成后,向消息中间件发送commit或者rollback的请求。该请求发送完成后,对系统A而言,该事务的处理过程就结束了。
    • 分布式事务Seata - 图14如果消息中间件收到commit,则向B系统投递消息;如果收到rollback,则直接丢失消息。但是,如果消息中间件收不到commit或rollback的指令,那么就要依靠“超时询问机制”。

      超时询问机制:

      • 系统A除了实现正常的业务流程外,还需要提供一个事务询问的接口,供消息中间件调用。当消息中间件收到发布消息便开始计时,如果到了超时没收到确认指令,就会主动调用系统A提供的事务询问接口询问该系统目前的状态。
      • 该接口会返回三种结果,中间件根据这三种结果做出不同的反应:
        • 提交:将该消息投递给系统B。
        • 回滚:直接将该消息丢弃。
        • 处理中:继续等待。
  • 第二步:消息中间件投递到系统B。

    • 消息中间件向下游系统投递完消息后便进入阻塞等待的状态,下游系统便立即进行任务的处理,任务处理完成后便向消息中间件返回应答。
      • 如果消息中间件收到确认应答后便认为该事务处理完毕。
      • 如果消息中间件在等待确认应答超时之后就会重新投递,直到下游消费者返回消费成功响应为止。

        一般消息中间件可以设置消息重试的次数和时间间隔,如果最终还是不能成功投递,则需要手动干预。之所以使用人工干预,而不是使用让A系统回滚,主要是考虑到整个系统设计的复杂度问题。

  • 基于可靠消息服务的分布式事务,前半部分使用异步,注意性能;后半部分使用同步,注重开发成本。

2.3 最大努力通知

  • 最大努力通知也被称为定期校对,其实是对第二种解决方案的进一步优化。它引入了本地消息表来记录错误消息,然后加入失败消息的定期校对功能,来进一步保证消息会被下游系统消费。

最大努力通知.png

  • 第一步:消息由系统A投递到消息中间件。
    • 分布式事务Seata - 图16处理业务的同一事务中,向本地消息表中写入一条记录。
    • 分布式事务Seata - 图17准备专门的消息发送者不断的发送本地消息表中的数据到消息中间件,如果发送失败则重试。
  • 第二步:消息由消息中间件投递到系统B。
    • 分布式事务Seata - 图18消息中间件收到消息后负责将该消息同步投递给相应的下游系统,并触发下游系统的任务执行。
    • 分布式事务Seata - 图19当下游系统处理成功后,向消息中间件反馈确认应答,消息中间件便可以将该条消息删除,从而该事务完成。
    • 分布式事务Seata - 图20对于投递失败的消息,利用重试机制进行重试,对于重试失败的,写入错误消息表。
    • 分布式事务Seata - 图21消息中间件需要提供失败消息的查询接口,下游系统会定期查询失败消息,并将其消费。
  • 优点:一种非常经典的实现,实现了最终一致性。
  • 缺点:消息表会耦合到业务系统中,如果没有封装好的解决方案,会有很多杂活需要处理。

2.4 TCC事务

  • TCC即为Try Confirm Cancel,它属于补偿型分布式事务。TCC实现分布式事务一共有三个步骤:
    • Try:尝试待执行的业务:这个过程并未执行业务,只是完成所有业务的一致性检查,并预留好执行所需的全部资源 。
    • Confirm:确认执行业务:确认执行业务操作,不做任何业务检查, 只使用Try阶段预留的业务资源。通常情况下,采用TCC 则认为 Confirm阶段是不会出错的。即:只要Try成功,Confirm一定成功。若Confirm阶段真的 出错了,需引入重试机制或人工处理。
    • Cancel:取消待执行的业务:取消Try阶段预留的业务资源。通常情况下,采用TCC则认为Cancel阶段也是一定成功的。若 Cancel阶段真的出错了,需引入重试机制或人工处理。

TCC之Try-Confirm阶段.png

TCC之Try-Cancel阶段.png

  • TCC两阶段提交和XA两阶段提交的区别是:
    • XA是资源层面的分布式事务,强一致性,在两阶段提交的整个过程中,一直会持有资源的锁。
    • TCC是业务层面的分布式事务,最终一致性,不会一直持有资源的锁。
  • 优点:把数据库层的二阶段提交提高了应用层来实现,规避了数据库层的2PC性能低下的问题。
  • 缺点:TCC的Try、Confirm和Cancel操作功能需要业务提供,开发成本高。

3 Seata(v 1.4.0)

3.1 Seata介绍

  • 2019年1月,阿里巴巴中间件团队发起了开源项目Fescar,其愿景是让分布式事务的使用像本地事务的使用一样,简单和高效,并逐步解决开发者遇到的分布式事务方面的所有难题,后来更名为Seata,是一套分布式事务的解决方案。
  • Seata的设计目标是对业务无侵入,因此从业务无侵入的2PC方案着手,在传统2PC的基础上演进,它把分布式事务理解成一个包含若干分支事务的全局事务,全局事务的职责是协调其管辖下的分支事务达成一致,要么一起成功提交,要么一起失败回滚。此外,通常分支事务本身就是一个关系数据库的本地事务。

Seata的原理.png

  • Seata主要由三个重要组成组成:
    • TC:Transaction Coordinator事务协调器,管理全局的分支事务的状态,用于全局性事务的提交和回滚。
    • TM:Transaction Manager事务管理器,用于开启全局、提交和回滚全局事务。
    • RM:Resource Manager资源管理器,用于分支事务上的资源管理,向TC注册分支事务,上报分支事务的状态,接受TC的命令以便提交或者回滚分支事务。

Seata的三个重要组成.png

  • Seata的执行流程如下:
    • 分布式事务Seata - 图26A服务的TM向TC申请开启一个全局事务,TC就会创建一个全局事务并返回一个唯一的XID。
    • 分布式事务Seata - 图27A服务的RM向TC注册分支事务,并将其纳入XID对应全局事务的管辖。
    • 分布式事务Seata - 图28A服务执行分支事务,向数据库提交操作。
    • 分布式事务Seata - 图29A服务开始远程调用B服务,此时XID会在微服务的调用链上传播。
    • 分布式事务Seata - 图30B服务的RM注册分支事务,并将其纳入XID对应的全局事务的管辖。
    • 分布式事务Seata - 图31B服务执行分支事务,向数据库提交操作。
    • 分布式事务Seata - 图32全局事务调用链处理完毕,TM会根据有无异常向TC发起全局事务的提交或回滚。
    • 分布式事务Seata - 图33TC协调其管辖之下的所有分支事务,决定是否回滚。
  • Seata实现2PC和传统2PC的差别:
    • 分布式事务Seata - 图34架构层次方面,传统的2PC方案的RM实际上是在数据库层,RM本质上就是数据库本身,通过XA协议实现,而Seata的RM是以jar包的形式作为中间件层部署在应用程序这一层。
    • 分布式事务Seata - 图35两阶段提交方面,传统2PC无论第二阶段的决议是commit还是rollback,事务性资源的锁都要保持到2Phase完成才释放,而Seata的做法是在1Phase就将本地事务提交,这样就省去了2Phase持锁的时间,整体提交了效率。

3.2 Seata实现分布式事务控制

  • 通过Seata中间件实现分布式事务,模拟电商中的下单和扣除库存的过程。
  • 通过订单微服务执行下单操作,然后通过订单微服务调用商品微服务扣除库存。

模拟电商中的下单和扣库存的过程.png

3.3 准备环境

3.3.1 数据库脚本

  • 数据库脚本:
  1. CREATE DATABASE `seata-order` CHARACTER SET 'utf8mb4' COLLATE 'utf8mb4_general_ci';
  2. CREATE TABLE `order_detail` (
  3. `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键',
  4. `username` varchar(255) DEFAULT NULL COMMENT '用户名',
  5. `product_id` int(11) DEFAULT NULL COMMENT '商品的id',
  6. `number` int(11) DEFAULT NULL COMMENT '数量',
  7. `product_name` varchar(255) DEFAULT NULL COMMENT '商品的名称',
  8. `product_price` decimal(10,2) DEFAULT NULL COMMENT '商品的价格',
  9. PRIMARY KEY (`id`)
  10. ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='订单表';
  11. CREATE DATABASE `seata-product` CHARACTER SET 'utf8mb4' COLLATE 'utf8mb4_general_ci';
  12. CREATE TABLE `product` (
  13. `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键',
  14. `name` varchar(255) DEFAULT NULL COMMENT '名称',
  15. `num` int(11) DEFAULT NULL COMMENT '库存',
  16. PRIMARY KEY (`id`)
  17. ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='商品';
  18. INSERT INTO `product` VALUES (1, '营养快线', 5000);
  19. INSERT INTO `product` VALUES (2, '娃哈哈', 5000);
  20. INSERT INTO `product` VALUES (3, '茅台', 5000);

3.3.2 总工程的pom.xml

  • 总工程的pom.xml
  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0"
  3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  5. <modelVersion>4.0.0</modelVersion>
  6. <groupId>com.xuweiwei</groupId>
  7. <artifactId>seata</artifactId>
  8. <packaging>pom</packaging>
  9. <version>1.0</version>
  10. <modules>
  11. <!-- 订单微服务 -->
  12. <module>seata-order</module>
  13. <!-- 商品微服务 -->
  14. <module>seata-product</module>
  15. <module>seata-eureka</module>
  16. </modules>
  17. <properties>
  18. <maven.compiler.source>8</maven.compiler.source>
  19. <maven.compiler.target>8</maven.compiler.target>
  20. <maven-source.version>1.8</maven-source.version>
  21. <maven-target.version>1.8</maven-target.version>
  22. <spring-cloud-alibaba.version>2.2.3.RELEASE</spring-cloud-alibaba.version>
  23. <spring-cloud.version>Hoxton.SR8</spring-cloud.version>
  24. <spring-boot.version>2.3.2.RELEASE</spring-boot.version>
  25. <lombok.version>1.18.16</lombok.version>
  26. <hutool-all.version>5.5.1</hutool-all.version>
  27. <springfox-boot.version>3.0.0</springfox-boot.version>
  28. <knife4j-boot.version>3.0.2</knife4j-boot.version>
  29. <mapstruct.version>1.4.1.Final</mapstruct.version>
  30. </properties>
  31. <dependencies>
  32. <dependency>
  33. <groupId>org.springframework.boot</groupId>
  34. <artifactId>spring-boot-devtools</artifactId>
  35. <scope>runtime</scope>
  36. <optional>true</optional>
  37. </dependency>
  38. <dependency>
  39. <groupId>org.springframework.boot</groupId>
  40. <artifactId>spring-boot-configuration-processor</artifactId>
  41. <optional>true</optional>
  42. </dependency>
  43. <dependency>
  44. <groupId>org.projectlombok</groupId>
  45. <artifactId>lombok</artifactId>
  46. <version>${lombok.version}</version>
  47. </dependency>
  48. <dependency>
  49. <groupId>cn.hutool</groupId>
  50. <artifactId>hutool-all</artifactId>
  51. <version>${hutool-all.version}</version>
  52. </dependency>
  53. <!-- Swagger -->
  54. <dependency>
  55. <groupId>io.springfox</groupId>
  56. <artifactId>springfox-boot-starter</artifactId>
  57. <version>${springfox-boot.version}</version>
  58. </dependency>
  59. <!--整合Knife4j-->
  60. <dependency>
  61. <groupId>com.github.xiaoymin</groupId>
  62. <artifactId>knife4j-spring-boot-starter</artifactId>
  63. <version>${knife4j-boot.version}</version>
  64. </dependency>
  65. <dependency>
  66. <groupId>org.mapstruct</groupId>
  67. <artifactId>mapstruct-jdk8</artifactId>
  68. <version>${mapstruct.version}</version>
  69. </dependency>
  70. <dependency>
  71. <groupId>org.mapstruct</groupId>
  72. <artifactId>mapstruct</artifactId>
  73. <version>${mapstruct.version}</version>
  74. </dependency>
  75. <dependency>
  76. <groupId>org.mapstruct</groupId>
  77. <artifactId>mapstruct-processor</artifactId>
  78. <version>${mapstruct.version}</version>
  79. </dependency>
  80. </dependencies>
  81. <dependencyManagement>
  82. <dependencies>
  83. <dependency>
  84. <groupId>org.springframework.boot</groupId>
  85. <artifactId>spring-boot-starter-parent</artifactId>
  86. <version>${spring-boot.version}</version>
  87. <type>pom</type>
  88. <scope>import</scope>
  89. </dependency>
  90. <dependency>
  91. <groupId>org.springframework.cloud</groupId>
  92. <artifactId>spring-cloud-dependencies</artifactId>
  93. <version>${spring-cloud.version}</version>
  94. <type>pom</type>
  95. <scope>import</scope>
  96. </dependency>
  97. <dependency>
  98. <groupId>com.alibaba.cloud</groupId>
  99. <artifactId>spring-cloud-alibaba-dependencies</artifactId>
  100. <version>${spring-cloud-alibaba.version}</version>
  101. <type>pom</type>
  102. <scope>import</scope>
  103. </dependency>
  104. </dependencies>
  105. </dependencyManagement>
  106. <build>
  107. <finalName>${project.artifactId}</finalName>
  108. <resources>
  109. <resource>
  110. <directory>src/main/resources</directory>
  111. <filtering>true</filtering>
  112. </resource>
  113. </resources>
  114. <plugins>
  115. <plugin>
  116. <groupId>org.apache.maven.plugins</groupId>
  117. <artifactId>maven-compiler-plugin</artifactId>
  118. <version>3.8.1</version>
  119. <!-- 指定版本-->
  120. <configuration>
  121. <source>${maven-source.version}</source>
  122. <target>${maven-target.version}</target>
  123. <encoding>UTF-8</encoding>
  124. </configuration>
  125. </plugin>
  126. </plugins>
  127. </build>
  128. </project>

3.3.3 商品微服务

  • pom.xml
  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0"
  3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  5. <parent>
  6. <artifactId>seata</artifactId>
  7. <groupId>com.xuweiwei</groupId>
  8. <version>1.0</version>
  9. </parent>
  10. <modelVersion>4.0.0</modelVersion>
  11. <artifactId>seata-product</artifactId>
  12. <dependencies>
  13. <dependency>
  14. <groupId>org.springframework.boot</groupId>
  15. <artifactId>spring-boot-starter-web</artifactId>
  16. </dependency>
  17. <dependency>
  18. <groupId>mysql</groupId>
  19. <artifactId>mysql-connector-java</artifactId>
  20. </dependency>
  21. <dependency>
  22. <groupId>org.springframework.boot</groupId>
  23. <artifactId>spring-boot-starter-data-jpa</artifactId>
  24. </dependency>
  25. <dependency>
  26. <groupId>org.springframework.cloud</groupId>
  27. <artifactId>spring-cloud-starter-openfeign</artifactId>
  28. </dependency>
  29. <dependency>
  30. <groupId>com.alibaba.cloud</groupId>
  31. <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
  32. </dependency>
  33. <dependency>
  34. <groupId>com.alibaba.cloud</groupId>
  35. <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
  36. </dependency>
  37. </dependencies>
  38. </project>
  • 启动类:
  1. package com.xuweiwei;
  2. import org.springframework.boot.SpringApplication;
  3. import org.springframework.boot.autoconfigure.SpringBootApplication;
  4. import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
  5. /**
  6. * @author 许大仙
  7. * @version 1.0
  8. * @since 2021-02-07 09:37
  9. */
  10. @EnableDiscoveryClient
  11. @SpringBootApplication
  12. public class ProductApplication {
  13. public static void main(String[] args) {
  14. SpringApplication.run(ProductApplication.class, args);
  15. }
  16. }
  • bootstrap.yml
  1. server:
  2. port: 8001
  3. spring:
  4. application:
  5. name: seata-order
  6. profiles:
  7. active: dev # 暂时没使用到
  8. datasource:
  9. driver-class-name: com.mysql.cj.jdbc.Driver
  10. url: jdbc:mysql://127.0.0.1:3306/seata-order?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&serverTimezone=GMT%2B8&allowPublicKeyRetrieval=true&nullCatalogMeansCurrent=true
  11. username: root
  12. password: 123456
  13. type: com.zaxxer.hikari.HikariDataSource
  14. # Hikari 连接池配置
  15. hikari:
  16. # 最小空闲连接数量
  17. minimum-idle: 5
  18. # 空闲连接存活最大时间,默认600000(10分钟)
  19. idle-timeout: 180000
  20. # 连接池最大连接数,默认是10
  21. maximum-pool-size: 1000
  22. # 此属性控制从池返回的连接的默认自动提交行为,默认值:true
  23. auto-commit: true
  24. # 连接池名称
  25. pool-name: HikariCP
  26. # 此属性控制池中连接的最长生命周期,值0表示无限生命周期,默认1800000即30分钟
  27. max-lifetime: 1800000
  28. # 数据库连接超时时间,默认30秒,即30000
  29. connection-timeout: 30000
  30. connection-test-query: SELECT 1
  31. data-source-properties:
  32. useInformationSchema: true
  33. cloud:
  34. nacos:
  35. discovery:
  36. server-addr: 127.0.0.1:8848 # 配置Nacos的地址
  37. config:
  38. server-addr: 127.0.0.1:8848 # 配置中心的地址
  39. file-extension: yml # 执行yaml格式的配置
  40. # JPA
  41. jpa:
  42. hibernate:
  43. ddl-auto: update # 第一次建表create 后面用update
  44. show-sql: true
  45. open-in-view: true
  46. # devtools
  47. devtools:
  48. restart:
  49. # 热部署开关
  50. enabled: true
  51. feign:
  52. # 开启Feign对sentinel的支持
  53. sentinel:
  54. enabled: true
  55. # 客户端配置
  56. client:
  57. config:
  58. default:
  59. connectTimeout: 3000 # 建立连接的超时时间
  60. readTimeout: 5000 # 读取的超时时间
  61. # 配置Feign的日志级别
  62. loggerLevel: full
  63. # Feign的错误解码器,相当于代码配置方式中的ErrorDecoder
  64. errorDecoder: feign.codec.ErrorDecoder.Default
  65. # 配置重试
  66. retryer: feign.Retryer.Default
  67. # 配置熔断不处理404异常
  68. decode404: false
  69. # 请求压缩
  70. compression:
  71. request:
  72. # 开启请求压缩
  73. enabled: true
  74. min-request-size: 2048 # 设置触发压缩的大小下限
  75. mime-types: text/html,application/xml,application/json #设置压缩的数据类型
  76. response:
  77. # 开启响应压缩
  78. enabled: true
  • 实体类Product.java
  1. package com.xuweiwei.biz.domain;
  2. import lombok.Getter;
  3. import lombok.Setter;
  4. import lombok.ToString;
  5. import javax.persistence.*;
  6. import java.io.Serializable;
  7. @Setter
  8. @Getter
  9. @ToString
  10. @Entity
  11. @Table(name = "`product`")
  12. public class Product implements Serializable {
  13. @Id
  14. @GeneratedValue(strategy = GenerationType.IDENTITY)
  15. private Integer id;
  16. @Column(name = "name")
  17. private String name;
  18. @Column(name = "num")
  19. private Integer num;
  20. }
  • Dao层接口:
  1. package com.xuweiwei.biz.dao;
  2. import com.xuweiwei.biz.domain.Product;
  3. import org.springframework.data.jpa.repository.JpaRepository;
  4. import org.springframework.data.jpa.repository.JpaSpecificationExecutor;
  5. /**
  6. * @author 许大仙
  7. * @version 1.0
  8. * @since 2021-02-05 16:05
  9. */
  10. public interface ProductRepository extends JpaRepository<Product, Integer>, JpaSpecificationExecutor<Product> {
  11. }
  • 业务层接口:
  1. package com.xuweiwei.biz.service;
  2. import com.xuweiwei.biz.domain.Product;
  3. /**
  4. * @author 许大仙
  5. * @version 1.0
  6. * @since 2021-02-07 10:25
  7. */
  8. public interface ProductService {
  9. Product view(Integer id);
  10. void reduceInventory(Integer id, Integer num);
  11. }
  • 业务层实现类:
  1. package com.xuweiwei.biz.service.impl;
  2. import com.xuweiwei.biz.dao.ProductRepository;
  3. import com.xuweiwei.biz.domain.Product;
  4. import com.xuweiwei.biz.service.ProductService;
  5. import org.springframework.beans.factory.annotation.Autowired;
  6. import org.springframework.stereotype.Service;
  7. import org.springframework.transaction.annotation.Transactional;
  8. import java.util.Optional;
  9. /**
  10. * @author 许大仙
  11. * @version 1.0
  12. * @since 2021-02-07 10:25
  13. */
  14. @Service
  15. @Transactional
  16. public class ProductServiceImpl implements ProductService {
  17. @Autowired
  18. private ProductRepository productRepository;
  19. @Override
  20. public Product view(Integer id) {
  21. return productRepository.findById(id).orElseGet(Product::new);
  22. }
  23. @Override
  24. public void reduceInventory(Integer id, Integer num) {
  25. Optional<Product> optional = productRepository.findById(id);
  26. if (optional.isPresent()) {
  27. Product product = optional.get();
  28. int inventory = product.getNum() - num;
  29. product.setNum(inventory);
  30. productRepository.save(product);
  31. }
  32. }
  33. }
  • 控制器ProductController.java
package com.xuweiwei.biz.web;

import com.xuweiwei.biz.domain.Product;
import com.xuweiwei.biz.service.ProductService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

/**
 * @author 许大仙
 * @version 1.0
 * @since 2021-02-07 09:37
 */
@RestController
@RequestMapping(value = "/product")
public class ProductController {

    @Autowired
    private ProductService productService;

    /**
     * 查询商品信息
     *
     * @param id
     * @return
     */
    @GetMapping(value = "/view/{id}")
    public Product view(@PathVariable(value = "id") Integer id) {
        return productService.view(id);
    }

    /**
     * 扣除库存
     *
     * @param id
     * @param num
     */
    @PostMapping(value = "/reduceInventory")
    public void reduceInventory(@RequestParam(value = "id") Integer id,@RequestParam(value = "num") Integer num) {
        productService.reduceInventory(id,num);
    }
}

3.3.4 订单微服务

  • pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>seata</artifactId>
        <groupId>com.xuweiwei</groupId>
        <version>1.0</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>seata-order</artifactId>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-openfeign</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
        </dependency>
    </dependencies>

</project>
  • 启动类:
package com.xuweiwei;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.openfeign.EnableFeignClients;

/**
 * @author 许大仙
 * @version 1.0
 * @since 2021-02-07 09:51
 */
@EnableDiscoveryClient
@SpringBootApplication
@EnableFeignClients
public class OrderApplication {
    public static void main(String[] args) {
        SpringApplication.run(OrderApplication.class, args);
    }
}
  • bootstrap.yml
server:
  port: 8002

spring:
  application:
    name: seata-product
  profiles:
    active: dev # 暂时没使用到
  datasource:
    url: jdbc:mysql://127.0.0.1:3306/seata-product?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&serverTimezone=GMT%2B8&allowPublicKeyRetrieval=true&nullCatalogMeansCurrent=true
    driver-class-name: com.mysql.cj.jdbc.Driver
    username: root
    password: 123456
    type: com.zaxxer.hikari.HikariDataSource
    # Hikari 连接池配置
    hikari:
      # 最小空闲连接数量
      minimum-idle: 5
      # 空闲连接存活最大时间,默认600000(10分钟)
      idle-timeout: 180000
      # 连接池最大连接数,默认是10
      maximum-pool-size: 1000
      # 此属性控制从池返回的连接的默认自动提交行为,默认值:true
      auto-commit: true
      # 连接池名称
      pool-name: HikariCP
      # 此属性控制池中连接的最长生命周期,值0表示无限生命周期,默认1800000即30分钟
      max-lifetime: 1800000
      # 数据库连接超时时间,默认30秒,即30000
      connection-timeout: 30000
      connection-test-query: SELECT 1
      data-source-properties:
        useInformationSchema: true
  cloud:
    nacos:
      discovery:
        server-addr: 127.0.0.1:8848 # 配置Nacos的地址
    config:
      server-addr: 127.0.0.1:8848 # 配置中心的地址
      file-extension: yml # 执行yaml格式的配置
  # JPA
  jpa:
    hibernate:
      ddl-auto: update  # 第一次建表create  后面用update
    show-sql: true
    open-in-view: true
  # devtools
  devtools:
    restart:
      # 热部署开关
      enabled: true


feign:
  # 开启Feign对sentinel的支持
  sentinel:
    enabled: true
  # 客户端配置
  client:
    config:
      default:
        connectTimeout: 3000  # 建立连接的超时时间
        readTimeout: 5000 # 读取的超时时间
        # 配置Feign的日志级别
        loggerLevel: full
        # Feign的错误解码器,相当于代码配置方式中的ErrorDecoder
        errorDecoder: feign.codec.ErrorDecoder.Default
        # 配置重试
        retryer: feign.Retryer.Default
        # 配置熔断不处理404异常
        decode404: false
  # 请求压缩
  compression:
    request:
      # 开启请求压缩
      enabled: true
      min-request-size: 2048 # 设置触发压缩的大小下限
      mime-types: text/html,application/xml,application/json #设置压缩的数据类型
    response:
      # 开启响应压缩
      enabled: true
  • 实体类OrderDetail.java
package com.xuweiwei.biz.domain;

import lombok.Getter;
import lombok.Setter;
import lombok.ToString;

import javax.persistence.*;
import java.io.Serializable;
import java.math.BigDecimal;

@Setter
@Getter
@ToString
@Entity
@Table(name="`order_detail`")
public class OrderDetail implements Serializable {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Integer id;

    @Column(name = "username")
    private String username;

    @Column(name = "product_id")
    private Integer productId;

    @Column(name = "number")
    private Integer number;

    @Column(name = "product_name")
    private String productName;

    @Column(name = "product_price")
    private BigDecimal productPrice;

}
  • Dao层接口:
package com.xuweiwei.biz.dao;


import com.xuweiwei.biz.domain.OrderDetail;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.JpaSpecificationExecutor;

/**
 * @author 许大仙
 * @version 1.0
 * @since 2021-02-05 16:05
 */
public interface OrderDetailRepository extends JpaRepository<OrderDetail, Integer>, JpaSpecificationExecutor<OrderDetail> {
}
  • 业务层接口:
package com.xuweiwei.biz.service;

import com.xuweiwei.biz.domain.OrderDetail;

/**
 * @author 许大仙
 * @version 1.0
 * @since 2021-02-07 10:10
 */
public interface OrderDetailService {

    /**
     * 创建订单
     *
     * @param pid
     * @return
     */
    OrderDetail createOrder(Integer pid);
}
  • 业务层实现类:
package com.xuweiwei.biz.service.impl;

import cn.hutool.core.util.ObjectUtil;
import cn.hutool.json.JSONUtil;
import com.xuweiwei.api.ProductFeign;
import com.xuweiwei.biz.dao.OrderDetailRepository;
import com.xuweiwei.biz.domain.OrderDetail;
import com.xuweiwei.biz.service.OrderDetailService;
import com.xuweiwei.biz.utils.Product;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

/**
 * @author 许大仙
 * @version 1.0
 * @since 2021-02-07 10:10
 */
@Transactional
@Service
@Slf4j
public class OrderDetailServiceImpl implements OrderDetailService {

    @Autowired
    private OrderDetailRepository orderDetailRepository;

    @Autowired
    private ProductFeign productFeign;

    @Override
    public OrderDetail createOrder(Integer pid) {

        Product product = productFeign.view(pid);

        log.info("查询到{}号商品的信息,内容是:{}", pid, JSONUtil.parseObj(product).toStringPretty());

        if (ObjectUtil.isEmpty(product.getId())) {
            throw new RuntimeException("商品不存在");
        }

        //创建订单
        OrderDetail orderDetail = new OrderDetail();
        orderDetail.setUsername("测试用户");
        orderDetail.setProductId(pid);
        orderDetail.setNumber(1);
        orderDetail.setProductName(product.getName());

        OrderDetail orderDetailDb = orderDetailRepository.save(orderDetail);

        log.info("创建订单成功,订单信息为{}", JSONUtil.parseObj(orderDetailDb).toStringPretty());

        productFeign.reduceInventory(pid, orderDetail.getNumber());

        return orderDetailDb;
    }
}
  • 数据传输对象:
package com.xuweiwei.biz.utils;

import lombok.Getter;
import lombok.Setter;

@Setter
@Getter
public class Product {

    private Integer id;

    private String name;

    private Integer num;
}
  • 控制器OrderController.java
package com.xuweiwei.biz.web;

import com.xuweiwei.biz.domain.OrderDetail;
import com.xuweiwei.biz.service.OrderDetailService;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author 许大仙
 * @version 1.0
 * @since 2021-02-07 09:51
 */
@RestController
@RequestMapping(value = "/order")
@Slf4j
@Api(tags = "订单")
public class OrderController {


    @Autowired
    private OrderDetailService orderDetailService;

    @ApiOperation(value = "新增订单", notes = "新增订单", httpMethod = "POST")
    @PostMapping(value = "/order/buy/{pid}")
    public OrderDetail order(@PathVariable("pid") @ApiParam(value = "pid", required = true) Integer pid) {
        log.info("接收到{}号商品的下单请求", pid);
        return orderDetailService.createOrder(pid);
    }

}
  • Swagger的配置类:
package com.xuweiwei.biz.config;

import com.github.xiaoymin.knife4j.spring.annotations.EnableKnife4j;
import io.swagger.models.auth.In;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.builders.PathSelectors;
import springfox.documentation.builders.RequestHandlerSelectors;
import springfox.documentation.builders.RequestParameterBuilder;
import springfox.documentation.oas.annotations.EnableOpenApi;
import springfox.documentation.service.*;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spi.service.contexts.SecurityContext;
import springfox.documentation.spring.web.plugins.Docket;

import java.util.Collections;
import java.util.List;

/**
 * @author 许大仙
 * @version 1.0
 * @since 2020-11-17 10:56
 */
@Configuration
@EnableOpenApi
@EnableKnife4j
public class SwaggerConfig {

    @Bean
    public Docket docket() {
        RequestParameter parameter = new RequestParameterBuilder()
                .name("Authorization")
                .description("请求头")
                .in(ParameterType.HEADER)
                .required(false)
                .build();
        List<RequestParameter> parameters = Collections.singletonList(parameter);

        return new Docket(DocumentationType.OAS_30)
                .apiInfo(this.apiInfo())
                .select()
                //指定扫描的包
                .apis(RequestHandlerSelectors.basePackage("com.xuweiwei.biz.web"))
                .paths(PathSelectors.any())
                .build()
                .globalRequestParameters(parameters)
                .securitySchemes(this.securitySchemes())
                .securityContexts(this.securityContexts())
                .enable(true);
    }

    /**
     * 设置授权信息
     */
    private List<SecurityScheme> securitySchemes() {
        return Collections.singletonList(new ApiKey("Authorization", "token", In.HEADER.toValue()));
    }

    /**
     * 授权信息全局应用
     */
    private List<SecurityContext> securityContexts() {
        return Collections.singletonList(
                SecurityContext.builder()
                        .securityReferences(Collections.singletonList(new SecurityReference("Authorization", new AuthorizationScope[]{new AuthorizationScope("global", "")})))
                        .build()
        );
    }

    private ApiInfo apiInfo() {
        return new ApiInfoBuilder()
                .title("后端微服务架构")
                .description("订单微服务")
                .contact(new Contact("许大仙", "", "1900919313@qq.com"))
                .version("1.0")
                .build();
    }
}
  • Feign接口:
package com.xuweiwei.api;

import com.xuweiwei.api.callback.ProductFeignCallBack;
import com.xuweiwei.biz.utils.Product;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;

/**
 * @author 许大仙
 * @version 1.0
 * @since 2021-02-07 10:11
 */
@Component
@FeignClient(name = "seata-product",fallback = ProductFeignCallBack.class)
public interface ProductFeign {

    @GetMapping(value = "/product/view/{pid}")
    Product view(@PathVariable(value = "pid") Integer pid);

    @PostMapping(value = "/product/reduceInventory")
    void reduceInventory(@RequestParam(value = "id") Integer id,@RequestParam(value = "num") Integer num);
}
  • Feign熔断处理类(暂时不会起作用,因为没有熔断组件):
package com.xuweiwei.api.callback;

import com.xuweiwei.api.ProductFeign;
import com.xuweiwei.biz.utils.Product;
import org.springframework.stereotype.Component;

/**
 * @author 许大仙
 * @version 1.0
 * @since 2021-02-07 10:57
 */
@Component
public class ProductFeignCallBack implements ProductFeign {
    @Override
    public Product view(Integer pid) {
        return new Product();
    }

    @Override
    public void reduceInventory(Integer id, Integer num) {

    }
}

3.4 异常模式

  • 在订单微服务的业务层实现类中模拟一个异常:商品微服务成功扣除库存,而订单微服务出现错误,引起本地事务回滚。
package com.xuweiwei.biz.service.impl;

import cn.hutool.core.util.ObjectUtil;
import cn.hutool.json.JSONUtil;
import com.xuweiwei.api.ProductFeign;
import com.xuweiwei.biz.dao.OrderDetailRepository;
import com.xuweiwei.biz.domain.OrderDetail;
import com.xuweiwei.biz.service.OrderDetailService;
import com.xuweiwei.biz.utils.Product;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

/**
 * @author 许大仙
 * @version 1.0
 * @since 2021-02-07 10:10
 */
@Transactional
@Service
@Slf4j
public class OrderDetailServiceImpl implements OrderDetailService {

    @Autowired
    private OrderDetailRepository orderDetailRepository;

    @Autowired
    private ProductFeign productFeign;

    @Override
    public OrderDetail createOrder(Integer pid) {

        Product product = productFeign.view(pid);

        log.info("查询到{}号商品的信息,内容是:{}", pid, JSONUtil.parseObj(product).toStringPretty());

        if (ObjectUtil.isEmpty(product.getId())) {
            throw new RuntimeException("商品不存在");
        }

        //创建订单
        OrderDetail orderDetail = new OrderDetail();
        orderDetail.setUsername("测试用户");
        orderDetail.setProductId(pid);
        orderDetail.setNumber(1);
        orderDetail.setProductName(product.getName());

        OrderDetail orderDetailDb = orderDetailRepository.save(orderDetail);

        log.info("创建订单成功,订单信息为{}", JSONUtil.parseObj(orderDetailDb).toStringPretty());

        productFeign.reduceInventory(pid, orderDetail.getNumber());

        int i = 10 / 0;

        return orderDetailDb;
    }
}

3.5 启动Seata

3.5.1 Seata下载

3.5.2 修改配置文件

  • 将下载得到的压缩包进行解压,进入conf目录,修改下面的配置文件。
  • registry.conf
registry {
  # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
  type = "nacos"
  loadBalance = "RandomLoadBalance"
  loadBalanceVirtualNodes = 10

  nacos {
    application = "seata-server"
    serverAddr = "127.0.0.1:8848"
    group = "SEATA_GROUP"
    namespace = ""
    cluster = "default"
    username = "nacos"
    password = "nacos"
  }
}

config {
  # file、nacos 、apollo、zk、consul、etcd3
  type = "nacos"

  nacos {
    serverAddr = "127.0.0.1:8848"
    namespace = ""
    group = "SEATA_GROUP"
    username = "nacos"
    password ="nacos"
  }

}
  • 到GitHub的Seata源码库下载两个文件,config.txtnacos-config.sh,其中config.txt复制到seata目录下,而nacos-config.sh复制到seata/conf目录下。

config.txt的位置.png

nacos-config.sh的位置.png

  • 修改config.txt:
service.vgroupMapping.seata-product=default
service.vgroupMapping.seata-order=default

语法为:service.vgroupMapping.定义的服务组名称=命名空间

3.5.3 初始化Seata到Nacos的配置

  • 需要保证Nacos已经正常运行:
cd conf
nacos-config.sh

完整的命令:nacos-config.sh -h localhost -p 8848 -g SEATA_GROUP -t 命名空间的id - u nacos -w nacos

  • 执行成功后,可以打开Nacos的控制台,在配置列表中,可以看到初始化了很多Group为SEATA_GROUP的配置。

初始化Seata到Nacos的配置.png

3.5.4 启动Seata服务

  • 启动Seata服务:
cd bin
seata-server.bat -p 8091 -m file
  • 执行成功后,在Nacos的服务列表下面可以看到一个名为seata-server的服务。

启动Seata服务.png

3.6 使用Seata实现事务控制

3.6.1 初始化数据库表

  • 在我们的数据库中加入一张undo_log表,这是Seata记录事务日志要用到的表。
CREATE TABLE IF NOT EXISTS `undo_log`
(
    `branch_id`     BIGINT(20)   NOT NULL COMMENT 'branch transaction id',
    `xid`           VARCHAR(100) NOT NULL COMMENT 'global transaction id',
    `context`       VARCHAR(128) NOT NULL COMMENT 'undo_log context,such as serialization',
    `rollback_info` LONGBLOB     NOT NULL COMMENT 'rollback info',
    `log_status`    INT(11)      NOT NULL COMMENT '0:normal status,1:defense status',
    `log_created`   DATETIME(6)  NOT NULL COMMENT 'create datetime',
    `log_modified`  DATETIME(6)  NOT NULL COMMENT 'modify datetime',
    UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
) ENGINE = InnoDB
  AUTO_INCREMENT = 1
  DEFAULT CHARSET = utf8 COMMENT ='AT transaction mode undo table';

3.6.2 添加配置

  • 在需要进行分布式控制的微服务中进行如下的配置。
  • 分布式事务Seata - 图41添加依赖:
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-seata</artifactId> 
</dependency>
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>
  • 分布式事务Seata - 图42配置DataSourceProxyConfig:Seata是通过带来数据源实现事务控制的,所以需要配置io.seata.rm.datasource.DataSourceProxy的Bean,且是@Primary默认的数据源,否则事务不会回滚,无法实现分布式事务。
package com.xuweiwei.biz.config;

import com.zaxxer.hikari.HikariDataSource;
import io.seata.rm.datasource.DataSourceProxy;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.jdbc.DataSourceBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

import javax.sql.DataSource;

/**
 * @author 许大仙
 * @version 1.0
 * @since 2021-02-07 14:42
 */
@Configuration
@Slf4j
public class DataSourceProxyConfig {

    @Bean
    @ConfigurationProperties(prefix = "spring.datasource")
    public DataSource dataSource(DataSourceProperties properties){
        log.info("init  data source:{}", properties);
        return DataSourceBuilder.create(properties.getClassLoader())
                .type(HikariDataSource.class)
                .driverClassName(properties.determineDriverClassName())
                .url(properties.determineUrl())
                .username(properties.determineUsername())
                .password(properties.determinePassword())
                .build();
    }

    @Bean
    @Primary
    public DataSourceProxy dataSourceProxy(DataSource dataSource){
        return new DataSourceProxy(dataSource);
    }

}
  • 分布式事务Seata - 图43修改bootstrap.yml文件:
# 修改部分
seata:
  enabled: true
  tx-service-group: ${spring.application.name}
  enable-auto-data-source-proxy: true
  config:
    type: nacos
    nacos:
      server-addr: ${spring.cloud.nacos.config.server-addr}
      group: SEATA_GROUP
      username: "nacos"
      password: "nacos"
      namespace: ""  # 如果在nacos-config.sh将配置导入到Nacos中的时候加入了-t 参数,那么此处需要添加namespace的id
  registry:
    type: nacos
    nacos:
      application: seata-server
      server-addr: ${spring.cloud.nacos.config.server-addr}
      username: "nacos"
      password: "nacos"
  • 完整的商品微服务的bootstrap.yml配置:
server:
  port: 8002

spring:
  application:
    name: seata-product
  profiles:
    active: dev # 暂时没使用到
  datasource:
    url: jdbc:mysql://127.0.0.1:3306/seata-product?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&serverTimezone=GMT%2B8&allowPublicKeyRetrieval=true&nullCatalogMeansCurrent=true
    driver-class-name: com.mysql.cj.jdbc.Driver
    username: root
    password: 123456
    type: com.zaxxer.hikari.HikariDataSource
    # Hikari 连接池配置
    hikari:
      # 最小空闲连接数量
      minimum-idle: 5
      # 空闲连接存活最大时间,默认600000(10分钟)
      idle-timeout: 180000
      # 连接池最大连接数,默认是10
      maximum-pool-size: 1000
      # 此属性控制从池返回的连接的默认自动提交行为,默认值:true
      auto-commit: true
      # 连接池名称
      pool-name: HikariCP
      # 此属性控制池中连接的最长生命周期,值0表示无限生命周期,默认1800000即30分钟
      max-lifetime: 1800000
      # 数据库连接超时时间,默认30秒,即30000
      connection-timeout: 30000
      connection-test-query: SELECT 1
      data-source-properties:
        useInformationSchema: true
  cloud:
    nacos:
      discovery:
        server-addr: 127.0.0.1:8848 # 配置Nacos的地址
      config:
        server-addr: 127.0.0.1:8848 # 配置中心的地址
        file-extension: yml # 执行yaml格式的配置
  # JPA
  jpa:
    hibernate:
      ddl-auto: update  # 第一次建表create  后面用update
    show-sql: true
    open-in-view: true
  # devtools
  devtools:
    restart:
      # 热部署开关
      enabled: true

seata:
  enabled: true
  tx-service-group: ${spring.application.name}
  enable-auto-data-source-proxy: true
  config:
    type: nacos
    nacos:
      server-addr: ${spring.cloud.nacos.config.server-addr}
      group: SEATA_GROUP
      username: "nacos"
      password: "nacos"
      namespace: ""  # 如果在nacos-config.sh将配置导入到Nacos中的时候加入了-t 参数,那么此处需要添加namespace的id
  registry:
    type: nacos
    nacos:
      application: seata-server
      server-addr: ${spring.cloud.nacos.config.server-addr}
      username: "nacos"
      password: "nacos"



feign:
  # 开启Feign对sentinel的支持
  sentinel:
    enabled: true
  # 客户端配置
  client:
    config:
      default:
        connectTimeout: 3000  # 建立连接的超时时间
        readTimeout: 5000 # 读取的超时时间
        # 配置Feign的日志级别
        loggerLevel: full
        # Feign的错误解码器,相当于代码配置方式中的ErrorDecoder
        errorDecoder: feign.codec.ErrorDecoder.Default
        # 配置重试
        retryer: feign.Retryer.Default
        # 配置熔断不处理404异常
        decode404: false
  # 请求压缩
  compression:
    request:
      # 开启请求压缩
      enabled: true
      min-request-size: 2048 # 设置触发压缩的大小下限
      mime-types: text/html,application/xml,application/json #设置压缩的数据类型
    response:
      # 开启响应压缩
      enabled: true
  • 完成的订单微服务的bootstrap.yml配置:
server:
  port: 8001

spring:
  application:
    name: seata-order
  profiles:
    active: dev # 暂时没使用到
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:3306/seata-order?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&serverTimezone=GMT%2B8&allowPublicKeyRetrieval=true&nullCatalogMeansCurrent=true
    username: root
    password: 123456
    type: com.zaxxer.hikari.HikariDataSource
    # Hikari 连接池配置
    hikari:
      # 最小空闲连接数量
      minimum-idle: 5
      # 空闲连接存活最大时间,默认600000(10分钟)
      idle-timeout: 180000
      # 连接池最大连接数,默认是10
      maximum-pool-size: 1000
      # 此属性控制从池返回的连接的默认自动提交行为,默认值:true
      auto-commit: true
      # 连接池名称
      pool-name: HikariCP
      # 此属性控制池中连接的最长生命周期,值0表示无限生命周期,默认1800000即30分钟
      max-lifetime: 1800000
      # 数据库连接超时时间,默认30秒,即30000
      connection-timeout: 30000
      connection-test-query: SELECT 1
      data-source-properties:
        useInformationSchema: true
  cloud:
    nacos:
      discovery:
        server-addr: 127.0.0.1:8848 # 配置Nacos的地址
      config:
        server-addr: 127.0.0.1:8848 # 配置中心的地址
        file-extension: yml # 执行yaml格式的配置
  # JPA
  jpa:
    hibernate:
      ddl-auto: update  # 第一次建表create  后面用update
    show-sql: true
    open-in-view: true
  # devtools
  devtools:
    restart:
      # 热部署开关
      enabled: true
seata:
  enabled: true
  tx-service-group: ${spring.application.name}
  enable-auto-data-source-proxy: true
  config:
    type: nacos
    nacos:
      server-addr: ${spring.cloud.nacos.config.server-addr}
      group: SEATA_GROUP
      username: "nacos"
      password: "nacos"
      namespace: ""  # 如果在nacos-config.sh将配置导入到Nacos中的时候加入了-t 参数,那么此处需要添加namespace的id
  registry:
    type: nacos
    nacos:
      application: seata-server
      server-addr: ${spring.cloud.nacos.config.server-addr}
      username: "nacos"
      password: "nacos"


feign:
  # 开启Feign对sentinel的支持
  sentinel:
    enabled: true
  # 客户端配置
  client:
    config:
      default:
        connectTimeout: 3000  # 建立连接的超时时间
        readTimeout: 5000 # 读取的超时时间
        # 配置Feign的日志级别
        loggerLevel: full
        # Feign的错误解码器,相当于代码配置方式中的ErrorDecoder
        errorDecoder: feign.codec.ErrorDecoder.Default
        # 配置重试
        retryer: feign.Retryer.Default
        # 配置熔断不处理404异常
        decode404: false
  # 请求压缩
  compression:
    request:
      # 开启请求压缩
      enabled: true
      min-request-size: 2048 # 设置触发压缩的大小下限
      mime-types: text/html,application/xml,application/json #设置压缩的数据类型
    response:
      # 开启响应压缩
      enabled: true

3.6.3 在订单微服务中开启全局事务

  • 在订单微服务中开启全局事务:
package com.xuweiwei.biz.service.impl;

import cn.hutool.core.util.ObjectUtil;
import cn.hutool.json.JSONUtil;
import com.xuweiwei.api.ProductFeign;
import com.xuweiwei.biz.dao.OrderDetailRepository;
import com.xuweiwei.biz.domain.OrderDetail;
import com.xuweiwei.biz.service.OrderDetailService;
import com.xuweiwei.biz.utils.Product;
import io.seata.spring.annotation.GlobalTransactional;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

/**
 * @author 许大仙
 * @version 1.0
 * @since 2021-02-07 10:10
 */
@Transactional
@Service
@Slf4j
public class OrderDetailServiceImpl implements OrderDetailService {

    @Autowired
    private OrderDetailRepository orderDetailRepository;

    @Autowired
    private ProductFeign productFeign;

    @GlobalTransactional //开启全局事务控制
    @Override
    public OrderDetail createOrder(Integer pid) {

        Product product = productFeign.view(pid);

        log.info("查询到{}号商品的信息,内容是:{}", pid, JSONUtil.parseObj(product).toStringPretty());

        if (ObjectUtil.isEmpty(product.getId())) {
            throw new RuntimeException("商品不存在");
        }

        //创建订单
        OrderDetail orderDetail = new OrderDetail();
        orderDetail.setUsername("测试用户");
        orderDetail.setProductId(pid);
        orderDetail.setNumber(1);
        orderDetail.setProductName(product.getName());

        OrderDetail orderDetailDb = orderDetailRepository.save(orderDetail);

        log.info("创建订单成功,订单信息为{}", JSONUtil.parseObj(orderDetailDb).toStringPretty());

        productFeign.reduceInventory(pid, orderDetail.getNumber());

        //模拟异常
        int i = 10 / 0;

        return orderDetailDb;
    }
}

3.7 Seata运行流程分析

Seata运行流程分析.png

  • 要点说明:
  • 1️⃣每个RM使用DataSourceProxy连接数据库,其目的是使用ConnectionProxy,使用数据源和数据连 接代理的目的就是在第一阶段将undo_log和业务数据放在一个本地事务提交,这样就保存了只要有业务 操作就一定有undo_log。
  • 2️⃣在第一阶段undo_log中存放了数据修改前和修改后的值,为事务回滚作好准备,所以第一阶段完成 就已经将分支事务提交,也就释放了锁资源。
  • 3️⃣TM开启全局事务开始,将XID全局事务id放在事务上下文中,通过feign调用也将XID传入下游分支 事务,每个分支事务将自己的Branch ID分支事务ID与XID关联。
  • 4️⃣第二阶段全局事务提交,TC会通知各各分支参与者提交分支事务,在第一阶段就已经提交了分支事 务,这里各各参与者只需要删除undo_log即可,并且可以异步执行,第二阶段很快可以完成。
  • 5️⃣第二阶段全局事务回滚,TC会通知各各分支参与者回滚分支事务,通过 XID 和 Branch ID 找到相应 的回滚日志,通过回滚日志生成反向的 SQL 并执行,以完成分支事务回滚到之前的状态,如果回滚失 败则会重试回滚操作。