数据库初始化工具

image.png

  1. 创建模块
  2. 添加依赖

    1. <dependencies>
    2. <dependency>
    3. <groupId>org.springframework.boot</groupId>
    4. <artifactId>spring-boot-starter-data-jdbc</artifactId>
    5. </dependency>
    6. <dependency>
    7. <groupId>mysql</groupId>
    8. <artifactId>mysql-connector-java</artifactId>
    9. <scope>runtime</scope>
    10. </dependency>
    11. <dependency>
    12. <groupId>org.springframework.boot</groupId>
    13. <artifactId>spring-boot-starter-test</artifactId>
    14. <scope>test</scope>
    15. </dependency>
    16. </dependencies>
  3. yml配置连接数据库

    1. spring:
    2. datasource:
    3. url: jdbc:mysql:///?useUnicode=true&characterEncoding=UTF-8&serverTimezone=GMT%2B8
    4. driver-class-name: com.mysql.cj.jdbc.Driver
    5. username: root
    6. password: root
  4. 创建sql脚本文件

在resource资源文件夹下创建sql目录,在sql目录中创建SQL脚本文件

  • storage.sql
  • seata-server.sql
  • order.sql
  • account.sql
    1. 在启动类中执行sql脚本
  • @PostConstruct注解作用:在Bean完成依赖注入后自动执行被标注的方法,适合在启动时做初始化工作(如此处执行SQL脚本)
  • ScriptUtils.executeSqlScript()是Spring提供的 jdbc 脚本执行器

    1. @SpringBootApplication
    2. public class DbInitApplication {
    3. @Autowired
    4. private DataSource dataSource;
    5. public static void main(String[] args) {
    6. SpringApplication.run(DbInitApplication.class, args);
    7. }
    8. @PostConstruct
    9. public void dbInit() throws Exception {
    10. exec("sql/account.sql");
    11. exec("sql/order.sql");
    12. exec("sql/seata-server.sql");
    13. exec("sql/storage.sql");
    14. }
    15. private void exec(String sql) throws Exception {
    16. ClassPathResource pathResource = new ClassPathResource(sql, DbInitApplication.class.getClassLoader());
    17. EncodedResource resource = new EncodedResource(pathResource, StandardCharsets.UTF_8);
    18. //spring jdbc 提供的一个工具用户执行sql脚本文件
    19. ScriptUtils.executeSqlScript(dataSource.getConnection(), resource);
    20. }
    21. }

    Seata AT事务

    部署seata server(TC事务协调器)

    下载地址:https://github.com/seata/seata/releases

  1. 配置

    • 修改registry.conf —向注册中心注册,将type="file"改为type="eureka",注册至eureka的服务名改为自定义服务名 ```java registry {

      file 、nacos 、eureka、redis、zk、consul、etcd3、sofa

      type = "eureka"

    eureka { serviceUrl = "http://localhost:8761/eureka" application = "seata-server" weight = "1" }

    1. - file.conf -- 协调器运行过程中记录的日志数据,要存到数据库,设置`mode="db"`,使用数据库存储日志数据,设置mysql连接和用户名密码以及存储数据的表名
    2. ```java
    3. mode = "db"
    4. publicKey = ""
    5. db {
    6. ## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp)/HikariDataSource(hikari) etc.
    7. datasource = "druid"
    8. ## mysql/oracle/postgresql/h2/oceanbase etc.
    9. dbType = "mysql"
    10. driverClassName = "com.mysql.cj.jdbc.Driver"
    11. ## if using mysql to store the data, recommend add rewriteBatchedStatements=true in jdbc connection param
    12. url = "jdbc:mysql://127.0.0.1:3306/seata?useUnicode=true&characterEncoding=UTF-8&serverTimezone=GMT%2B8"
    13. user = "root"
    14. password = "root"
    15. minConn = 5
    16. maxConn = 30
    17. globalTable = "global_table"
    18. branchTable = "branch_table"
    19. lockTable = "lock_table"
    20. queryLimit = 100
    21. maxWait = 5000
    22. }
    • seata-server.bat — 使用的内存默认 2G,测试环境把内存改小:256m,修改第85行数据
      1. %JAVACMD% %JAVA_OPTS% -server -Xmx256m -Xms256m -Xmn128m -Xss512k -XX:SurvivorRatio=10 -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=256m -XX:MaxDirectMemorySize=1024m -XX:-OmitStackTraceInFastThrow -XX:-UseAdaptiveSizePolicy -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath="%BASEDIR%"/logs/java_heapdump.hprof -XX:+DisableExplicitGC -XX:+CMSParallelRemarkEnabled -XX:+UseCMSInitiatingOccupancyOnly -XX:CMSInitiatingOccupancyFraction=75 -Xloggc:"%BASEDIR%"/logs/seata_gc.log -verbose:gc -Dio.netty.leakDetectionLevel=advanced -Dlogback.color.disable-for-bat=true -classpath %CLASSPATH% -Dapp.name="seata-server" -Dapp.repo="%REPO%" -Dapp.home="%BASEDIR%" -Dbasedir="%BASEDIR%" io.seata.server.Server %CMD_LINE_ARGS%
  2. 执行seata-server.bat启动协调器

    在无事务的项目中添加Seata AT事务

  3. 父项目中添加seata依赖

  4. order中配置三个配置文件

    • application.yml—配置事务组的组名 ```yaml spring: application: name: order

    cloud: alibaba: seata:

    1. tx-service-group: order_tx_group #表示当前模块在哪个事务组中

    ```

    • registry.conf—-注册中心地址 ```yaml registry {

      file 、nacos 、eureka、redis、zk、consul、etcd3、sofa

      type = "eureka"

    nacos { serverAddr = “localhost” namespace = “” cluster = “default” } eureka {

    连接eureka,从注册表中获取seata-server

    serviceUrl = "http://localhost:8761/eureka"

    application = “default”

    weight = “1”

    } redis { serverAddr = “localhost:6379” db = “0” password = “” cluster = “default” timeout = “0” } zk { cluster = “default” serverAddr = “127.0.0.1:2181” session.timeout = 6000 connect.timeout = 2000 username = “” password = “” } consul { cluster = “default” serverAddr = “127.0.0.1:8500” } etcd3 { cluster = “default” serverAddr = “http://localhost:2379“ } sofa { serverAddr = “127.0.0.1:9603” application = “default” region = “DEFAULT_ZONE” datacenter = “DefaultDataCenter” cluster = “default” group = “SEATA_GROUP” addressWaitTime = “3000” } file { name = “file.conf” } }

config {

file、nacos 、apollo、zk、consul、etcd3、springCloudConfig

type = “file”

nacos { serverAddr = “localhost” namespace = “” group = “SEATA_GROUP” } consul { serverAddr = “127.0.0.1:8500” } apollo { app.id = “seata-server” apollo.meta = “http://192.168.1.204:8801“ namespace = “application” } zk { serverAddr = “127.0.0.1:2181” session.timeout = 6000 connect.timeout = 2000 username = “” password = “” } etcd3 { serverAddr = “http://localhost:2379“ } file { name = “file.conf” } }

  1. - file.conf---配置事务组,使用的协调器
  2. ```yaml
  3. transport {
  4. # tcp udt unix-domain-socket
  5. type = "TCP"
  6. #NIO NATIVE
  7. server = "NIO"
  8. #enable heartbeat
  9. heartbeat = true
  10. # the client batch send request enable
  11. enableClientBatchSendRequest = true
  12. #thread factory for netty
  13. threadFactory {
  14. bossThreadPrefix = "NettyBoss"
  15. workerThreadPrefix = "NettyServerNIOWorker"
  16. serverExecutorThread-prefix = "NettyServerBizHandler"
  17. shareBossWorker = false
  18. clientSelectorThreadPrefix = "NettyClientSelector"
  19. clientSelectorThreadSize = 1
  20. clientWorkerThreadPrefix = "NettyClientWorkerThread"
  21. # netty boss thread size,will not be used for UDT
  22. bossThreadSize = 1
  23. #auto default pin or 8
  24. workerThreadSize = "default"
  25. }
  26. shutdown {
  27. # when destroy server, wait seconds
  28. wait = 3
  29. }
  30. serialization = "seata"
  31. compressor = "none"
  32. }
  33. service {
  34. #transaction service group mapping
  35. # order_tx_group 与 yml 中的 “tx-service-group: order_tx_group” 配置一致
  36. # “seata-server” 与 TC 服务器的注册名一致
  37. # 从eureka获取seata-server的地址,再向seata-server注册自己,设置group
  38. vgroupMapping.order_tx_group = "seata-server"
  39. #only support when registry.type=file, please don't set multiple addresses
  40. order_tx_group.grouplist = "127.0.0.1:8091"
  41. #degrade, current not support
  42. enableDegrade = false
  43. #disable seata
  44. disableGlobalTransaction = false
  45. }
  46. client {
  47. rm {
  48. asyncCommitBufferLimit = 10000
  49. lock {
  50. retryInterval = 10
  51. retryTimes = 30
  52. retryPolicyBranchRollbackOnConflict = true
  53. }
  54. reportRetryCount = 5
  55. tableMetaCheckEnable = false
  56. reportSuccessEnable = false
  57. }
  58. tm {
  59. commitRetryCount = 5
  60. rollbackRetryCount = 5
  61. }
  62. undo {
  63. dataValidation = true
  64. logSerialization = "jackson"
  65. logTable = "undo_log"
  66. }
  67. log {
  68. exceptionRate = 100
  69. }
  70. }
  1. 创建自动配置类,创建DatasourceProxy数据源代理对象

    1. @Configuration
    2. public class DataSourceAutoConfiguration {
    3. /**
    4. * 新建原始数据源对象
    5. */
    6. @Bean
    7. @ConfigurationProperties(prefix = "spring.datasource")
    8. public DataSource dataSource() {
    9. return new HikariDataSource();
    10. }
    11. /**
    12. * 新建数据源代理对象
    13. * Primary注解设置首选对象
    14. */
    15. @Primary
    16. @Bean
    17. public DataSource dataSourceProxy(DataSource dataSource) {
    18. return new DataSourceProxy(dataSource);
    19. }
    20. }
    • AT事务的自动事务处理代码,由数据源代理提供
    • 禁用spring默认数据源配置,使用自定义的数据源配置(DataSourceAutoConfiguration)

禁用exclude = DataSourceAutoConfiguration.class

  1. @EnableFeignClients
  2. @SpringBootApplication(exclude = DataSourceAutoConfiguration.class)
  3. public class OrderApplication {
  4. public static void main(String[] args) {
  5. SpringApplication.run(OrderApplication.class, args);
  6. }
  7. }
  1. 在业务方法上添加事务注解

    • spring本地事务添加@Transactional
    • seata全局事务添加@GlobalTransactional

      1. @Transactional
      2. @GlobalTransactional
      3. @Override
      4. public void createOrder(Order order) {
      5. // TODO:远程调用发号器,生成订单id
      6. String id = easyIdClient.nextId("order_business");
      7. orderMapper.createOrder(order.setId(Long.valueOf(id)));
      8. // TODO:远程调用库存服务,减少库存
      9. //storageClient.decrease(order.getProductId(), order.getCount());
      10. // TODO:远程调用账户服务,减少账户金额
      11. //accountClient.decrease(order.getUserId(), order.getMoney());
      12. }
  2. 对库存(Storage)和账户(account)服务执行上述操作并访问:http://localhost:8083/create?userId=1&productId=1&count=10&money=100 进行测试

    TCC事务

    TCC是Try、Confirm、Cancel三个词语的缩写,TCC要求每个分支事务实现三个操作 :预处理Try、确认Confirm、撤销Cancel。Try操作做业务检查及资源预留,Confirm做业务确认操作,Cancel实现一个与Try相反的操作即回滚操作。TM首先发起所有的分支事务的try操作,任何一个分支事务的try操作执行失败,TM将会发起所有分支事务的Cancel操作,若try操作全部成功,TM将会发起所有分支事务的Confirm操作,其中Confirm/Cancel操作若执行失败,TM会进行重试。


  • TCC事务对代码侵入严重

每个阶段的数据操作都要自己进行编码来实现,事务框架无法自动处理。

  • TCC 效率更高
    不必对数据加全局锁,允许多个事务同时操作数据。
  • 底层数据库表结构需要进行修改

AT事务适用于大部分业务场景,在一些复杂情况下,自动事务无法进行自动处理,需要手动处理事务

TCC事务两个阶段的三个操作

  1. 一阶段
    • Try—预留资源,冻结数据
  2. 二阶段

    • Confirm—确认资源,使用冻结数据完成业务处理
    • Cancel—回滚,取消预留资源,将之前冻结的数据回滚(撤销,恢复)

      案例

  3. 新建工程导入无事务版本项目

  4. 添加tcc事务
  • 添加seata依赖
  • 修改三个配置文件

    • application.yml——添加事务组 ```yaml spring: application: name: storage

    datasource: driver-class-name: com.mysql.cj.jdbc.Driver url: jdbc:mysql://localhost:3306/seata_storage?useUnicode=true&characterEncoding=UTF-8&serverTimezone=GMT%2B8 username: root password: root

    cloud: alibaba:

    1. seata:
    2. tx-service-group: order_tx_group

    ```

    • file.conf——事务组对应协调器
    • registry.conf——添加注册中心地址
  1. 修改Mapper,添加TCC数据库操作

dao接口

  1. @Mapper
  2. public interface OrderMapper extends BaseMapper<Order> {
  3. /**
  4. * 新建订单
  5. *
  6. * @param order 订单信息
  7. */
  8. void createOrder(Order order);
  9. /**
  10. * 创建冻结订单
  11. *
  12. * @param order 订单信息
  13. */
  14. void createFrozen(Order order);
  15. /**
  16. * 修改订单状态
  17. *
  18. * @param orderId 订单编号
  19. * @param status 订单状态
  20. */
  21. void updateStatus(Long orderId, Integer status);
  22. //删除订单使用Mybatis-Plus
  23. }

mapper

  1. <?xml version="1.0" encoding="UTF-8" ?>
  2. <!DOCTYPE mapper
  3. PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
  4. "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
  5. <mapper namespace="cn.tedu.order.mapper.OrderMapper">
  6. <resultMap id="BaseResultMap" type="Order">
  7. <id column="id" property="id" jdbcType="BIGINT"/>
  8. <result column="user_id" property="userId" jdbcType="BIGINT"/>
  9. <result column="product_id" property="productId" jdbcType="BIGINT"/>
  10. <result column="count" property="count" jdbcType="INTEGER"/>
  11. <result column="money" property="money" jdbcType="DECIMAL"/>
  12. <result column="status" property="status" jdbcType="INTEGER"/>
  13. </resultMap>
  14. <insert id="createOrder">
  15. INSERT INTO `order` (`id`, `user_id`, `product_id`, `count`, `money`, `status`)
  16. VALUES (#{id}, #{userId}, #{productId}, #{count}, #{money}, 1);
  17. </insert>
  18. <insert id="createFrozen">
  19. INSERT INTO `order` (`id`, `user_id`, `product_id`, `count`, `money`, `status`)
  20. VALUES (#{id}, #{userId}, #{productId}, #{count}, #{money}, 0);
  21. </insert>
  22. <update id="updateStatus">
  23. update `order`
  24. set status = #{status}
  25. where id = #{orderId}
  26. </update>
  27. <delete id="deleteById">
  28. delete
  29. from `order`
  30. where id = #{orderId}
  31. </delete>
  32. </mapper>
  1. 按照seata tcc的实现规则,定义TccAction接口和实现

    • 添加三个方法,实现TCC三个操作

      1. @LocalTCC
      2. public interface OrderTccAction {
      3. /**
      4. * 第一阶段的try
      5. * TwoPhaseBusinessAction注解用于定义一个TCC接口,添加在try方法上
      6. *
      7. * @param businessActionContext 业务操作上下文对象,用于在两个阶段进行数据传递
      8. * @param id 订单编号
      9. * @param userId 用户id
      10. * @param productId 产品id
      11. * @param count 产品数量
      12. * @param money 金额
      13. * @return 是否成功
      14. */
      15. @TwoPhaseBusinessAction(name = "OrderTccAction")
      16. boolean prepare(BusinessActionContext businessActionContext,
      17. @BusinessActionContextParameter(paramName = "orderId") Long id,
      18. Long userId,
      19. Long productId,
      20. Integer count,
      21. BigDecimal money);
      22. /**
      23. * 第二阶段的confirm
      24. *
      25. * @param businessActionContext 业务操作上下文对象,用于在两个阶段进行数据传递
      26. * @return 是否成功
      27. */
      28. boolean commit(BusinessActionContext businessActionContext);
      29. /**
      30. * 第二阶段的cancel
      31. *
      32. * @param businessActionContext 业务操作上下文对象,用于在两个阶段进行数据传递
      33. * @return 是否成功
      34. */
      35. boolean rollback(BusinessActionContext businessActionContext);
      36. }
    • 三个方法添加@Transactional注解控制本地事务

      1. @Component
      2. public class OrderTccActionImpl implements OrderTccAction {
      3. private final OrderMapper orderMapper;
      4. @Autowired
      5. public OrderTccActionImpl(OrderMapper orderMapper) {
      6. this.orderMapper = orderMapper;
      7. }
      8. @Transactional
      9. @Override
      10. public boolean prepare(BusinessActionContext businessActionContext,
      11. Long id, Long userId,
      12. Long productId, Integer count, BigDecimal money) {
      13. orderMapper.createOrder(new Order(id, userId, productId, count, money, 0));
      14. return true;
      15. }
      16. @Transactional
      17. @Override
      18. public boolean commit(BusinessActionContext businessActionContext) {
      19. Long orderId = Long.valueOf(businessActionContext.getActionContext("orderId").toString());
      20. orderMapper.updateStatus(orderId, 1);
      21. return true;
      22. }
      23. @Transactional
      24. @Override
      25. public boolean rollback(BusinessActionContext businessActionContext) {
      26. Long orderId = Long.valueOf(businessActionContext.getActionContext("orderId").toString());
      27. orderMapper.deleteById(orderId);
      28. return true;
      29. }
      30. }
  2. 修改业务方法,调用TCCAction的第一阶段方法

  3. 第二阶段的方法由seata的RM组件自动调用
  4. 添加@GlobalTransactional

    修改库存服务

  5. 添加配置文件

修改yml配置,添加事务组:

  1. server:
  2. port: 8082
  3. spring:
  4. application:
  5. name: storage
  6. datasource:
  7. driver-class-name: com.mysql.cj.jdbc.Driver
  8. url: jdbc:mysql://localhost:3306/seata_storage?useUnicode=true&characterEncoding=UTF-8&serverTimezone=GMT%2B8
  9. username: root
  10. password: root
  11. cloud:
  12. alibaba:
  13. seata:
  14. tx-service-group: order_tx_group
  15. eureka:
  16. client:
  17. service-url:
  18. defaultZone: http://localhost:8761/eureka
  19. instance:
  20. prefer-ip-address: true
  21. instance-id: ${spring.cloud.client.ip-address}:${spring.application.name}:${server.port}
  22. mybatis-plus:
  23. type-aliases-package: cn.tedu.storage.entity
  24. mapper-locations: classpath:/mapper/*.xml
  25. configuration:
  26. map-underscore-to-camel-case: true
  27. logging:
  28. level:
  29. cn.tedu.storage.mapper: DEBUG

在resource文件夹下添加seata配置文件
registry.conf:

  1. registry {
  2. # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
  3. type = "eureka"
  4. nacos {
  5. serverAddr = "localhost"
  6. namespace = ""
  7. cluster = "default"
  8. }
  9. eureka {
  10. # 连接eureka,要从注册表发现 seata-server
  11. serviceUrl = "http://localhost:8761/eureka"
  12. # application = "default"
  13. # weight = "1"
  14. }
  15. redis {
  16. serverAddr = "localhost:6379"
  17. db = "0"
  18. password = ""
  19. cluster = "default"
  20. timeout = "0"
  21. }
  22. zk {
  23. cluster = "default"
  24. serverAddr = "127.0.0.1:2181"
  25. session.timeout = 6000
  26. connect.timeout = 2000
  27. username = ""
  28. password = ""
  29. }
  30. consul {
  31. cluster = "default"
  32. serverAddr = "127.0.0.1:8500"
  33. }
  34. etcd3 {
  35. cluster = "default"
  36. serverAddr = "http://localhost:2379"
  37. }
  38. sofa {
  39. serverAddr = "127.0.0.1:9603"
  40. application = "default"
  41. region = "DEFAULT_ZONE"
  42. datacenter = "DefaultDataCenter"
  43. cluster = "default"
  44. group = "SEATA_GROUP"
  45. addressWaitTime = "3000"
  46. }
  47. file {
  48. name = "file.conf"
  49. }
  50. }
  51. config {
  52. # file、nacos 、apollo、zk、consul、etcd3、springCloudConfig
  53. type = "file"
  54. nacos {
  55. serverAddr = "localhost"
  56. namespace = ""
  57. group = "SEATA_GROUP"
  58. }
  59. consul {
  60. serverAddr = "127.0.0.1:8500"
  61. }
  62. apollo {
  63. app.id = "seata-server"
  64. apollo.meta = "http://192.168.1.204:8801"
  65. namespace = "application"
  66. }
  67. zk {
  68. serverAddr = "127.0.0.1:2181"
  69. session.timeout = 6000
  70. connect.timeout = 2000
  71. username = ""
  72. password = ""
  73. }
  74. etcd3 {
  75. serverAddr = "http://localhost:2379"
  76. }
  77. file {
  78. name = "file.conf"
  79. }
  80. }

file.conf

  1. transport {
  2. # tcp udt unix-domain-socket
  3. type = "TCP"
  4. #NIO NATIVE
  5. server = "NIO"
  6. #enable heartbeat
  7. heartbeat = true
  8. # the client batch send request enable
  9. enableClientBatchSendRequest = true
  10. #thread factory for netty
  11. threadFactory {
  12. bossThreadPrefix = "NettyBoss"
  13. workerThreadPrefix = "NettyServerNIOWorker"
  14. serverExecutorThread-prefix = "NettyServerBizHandler"
  15. shareBossWorker = false
  16. clientSelectorThreadPrefix = "NettyClientSelector"
  17. clientSelectorThreadSize = 1
  18. clientWorkerThreadPrefix = "NettyClientWorkerThread"
  19. # netty boss thread size,will not be used for UDT
  20. bossThreadSize = 1
  21. #auto default pin or 8
  22. workerThreadSize = "default"
  23. }
  24. shutdown {
  25. # when destroy server, wait seconds
  26. wait = 3
  27. }
  28. serialization = "seata"
  29. compressor = "none"
  30. }
  31. service {
  32. #transaction service group mapping
  33. # order_tx_group 与 yml 中的 “tx-service-group: order_tx_group” 配置一致
  34. # “seata-server” 与 TC 服务器的注册名一致
  35. # 从eureka获取seata-server的地址,再向seata-server注册自己,设置group
  36. # order_tx_group 事务组,对应使用哪个协调器
  37. # seata-server 是注册表中的服务id
  38. vgroupMapping.order_tx_group = "seata-server"
  39. #only support when registry.type=file, please don't set multiple addresses
  40. order_tx_group.grouplist = "127.0.0.1:8091"
  41. #degrade, current not support
  42. enableDegrade = false
  43. #disable seata
  44. disableGlobalTransaction = false
  45. }
  46. client {
  47. rm {
  48. asyncCommitBufferLimit = 10000
  49. lock {
  50. retryInterval = 10
  51. retryTimes = 30
  52. retryPolicyBranchRollbackOnConflict = true
  53. }
  54. reportRetryCount = 5
  55. tableMetaCheckEnable = false
  56. reportSuccessEnable = false
  57. }
  58. tm {
  59. commitRetryCount = 5
  60. rollbackRetryCount = 5
  61. }
  62. undo {
  63. dataValidation = true
  64. logSerialization = "jackson"
  65. logTable = "undo_log"
  66. }
  67. log {
  68. exceptionRate = 100
  69. }
  70. }
  1. 修改Mapper,添加TCC事务数据库操作

定义Mapper接口方法:

  1. @Mapper
  2. public interface StorageMapper extends BaseMapper<Storage> {
  3. /**
  4. * 减少库存
  5. * @param productId 商品id
  6. * @param count 要减少的商品数量
  7. */
  8. void decrease(Long productId, Integer count);
  9. /**
  10. * 查询库存,判断有无足够库存
  11. * @param productId 产品id
  12. * @return 库存信息
  13. */
  14. Storage findByProductId(Long productId);
  15. /**
  16. * 冻结库存
  17. * @param productId 产品id
  18. * @param count 数量
  19. */
  20. void updateResidueToFrozen(Long productId,Integer count);
  21. void updateFrozenToUsed(Long productId,Integer count);
  22. void updateFrozenToResidue(Long productId,Integer count);
  23. }

创建xml配置操作数据库:

  1. <?xml version="1.0" encoding="UTF-8" ?>
  2. <!DOCTYPE mapper
  3. PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
  4. "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
  5. <mapper namespace="cn.tedu.storage.mapper.StorageMapper">
  6. <resultMap id="BaseResultMap" type="Storage">
  7. <id column="id" property="id" jdbcType="BIGINT"/>
  8. <result column="product_id" property="productId" jdbcType="BIGINT"/>
  9. <result column="total" property="total" jdbcType="INTEGER"/>
  10. <result column="used" property="used" jdbcType="INTEGER"/>
  11. <result column="residue" property="residue" jdbcType="INTEGER"/>
  12. </resultMap>
  13. <update id="decrease">
  14. UPDATE storage
  15. SET used = used + #{count},
  16. residue = residue - #{count}
  17. WHERE product_id = #{productId}
  18. </update>
  19. <select id="findByProductId" resultMap="BaseResultMap">
  20. select *
  21. from storage
  22. where product_id = #{productId};
  23. </select>
  24. <update id="updateResidueToFrozen">
  25. update storage
  26. set residue = residue - #{count},
  27. frozen = frozen + #{count}
  28. where product_id = #{productId}
  29. </update>
  30. <update id="updateFrozenToUsed">
  31. update storage
  32. set frozen = frozen - #{count},
  33. used = used + #{count}
  34. where product_id = #{productId}
  35. </update>
  36. <update id="updateFrozenToResidue">
  37. update storage
  38. set frozen = frozen - #{count},
  39. residue = residue + #{count}
  40. where product_id = #{productId}
  41. </update>
  42. </mapper>
  1. 定义TCC接口和实现

定义TCC接口:

  1. @LocalTCC
  2. public interface StorageTccAction {
  3. @TwoPhaseBusinessAction(name = "StorageTccAction")
  4. boolean prepare(BusinessActionContext businessActionContext,
  5. @BusinessActionContextParameter(paramName = "productId") Long productId,
  6. @BusinessActionContextParameter(paramName = "count") Integer count);
  7. boolean commit(BusinessActionContext businessActionContext);
  8. boolean rollback(BusinessActionContext businessActionContext);
  9. }

定义TCC接口实现方法:

  1. @Component
  2. public class StorageTccActionImpl implements StorageTccAction {
  3. private final StorageMapper storageMapper;
  4. @Autowired
  5. public StorageTccActionImpl(StorageMapper storageMapper) {
  6. this.storageMapper = storageMapper;
  7. }
  8. @Transactional
  9. @Override
  10. public boolean prepare(BusinessActionContext businessActionContext, Long productId, Integer count) {
  11. Storage storage = storageMapper.findByProductId(productId);
  12. if (storage.getResidue() < count) {
  13. throw new RuntimeException("库存不足");
  14. }
  15. //冻结库存
  16. storageMapper.updateResidueToFrozen(productId, count);
  17. return true;
  18. }
  19. @Transactional
  20. @Override
  21. public boolean commit(BusinessActionContext businessActionContext) {
  22. Long productId = Long.valueOf(businessActionContext.getActionContext("productId").toString());
  23. Integer count = Integer.valueOf(businessActionContext.getActionContext("count").toString());
  24. storageMapper.updateFrozenToUsed(productId, count);
  25. return true;
  26. }
  27. @Transactional
  28. @Override
  29. public boolean rollback(BusinessActionContext businessActionContext) {
  30. Long productId = Long.valueOf(businessActionContext.getActionContext("productId").toString());
  31. Integer count = Integer.valueOf(businessActionContext.getActionContext("count").toString());
  32. storageMapper.updateFrozenToResidue(productId, count);
  33. return true;
  34. }
  35. }
  1. 调用TCC事务的Try方法

    1. @Service
    2. public class StorageImpl implements StorageService {
    3. private final StorageTccAction storageTccAction;
    4. @Autowired
    5. public StorageImpl(StorageTccAction storageTccAction) {
    6. this.storageTccAction = storageTccAction;
    7. }
    8. @Override
    9. public void decrease(Long productId, Integer count) {
    10. storageTccAction.prepare(null, productId, count);
    11. }
    12. }