分布式事务

  • 什么是分布式
    指一次大的操作由不同的小操作组成的,这些小的操作分布在不同的服务器上,分布式事务需要保证这些小操作要么全部成功,要么全部失败。从本质上来说,分布式事务就是为了保证不同数据库的数据一致性。
  • 为什么要使用分布式
    在微服务独立数据源的思想,每一个微服务都有一个或者多个数据源,虽然单机单库事务已经非常成熟,但是由于网路延迟和不可靠的客观因素,分布式事务到现在也还没有成熟的方案,对于中大型网站,特别是涉及到交易的网站,一旦将服务拆分微服务,分布式事务一定是绕不开的一个组件,通常解决分布式事务问题。
  • seata介绍
    seata是阿里出的一个分布式事务解决方案组件
    AT 模式:参见(《Seata AT 模式》 (opens new window))文档
    TCC 模式:参见(《Seata TCC 模式》 (opens new window))文档
    Saga 模式:参见(《SEATA Saga 模式》 (opens new window))文档
    XA 模式:正在开发中… 目前使用的流行度情况是:AT > TCC > Saga。因此,我们在学习Seata的时候,可以花更多精力在AT模式上,最好搞懂背后的实现原理,毕竟分布式事务涉及到数据的正确性,出问题需要快速排查定位并解决。

下载地址

Github地址:https://github.com/seata/seata/
1.4.1下载地址: https://github.com/seata/seata/releases/tag/v1.4.1

运行seata

运行bin/server-start.bat

开始使用

1,创建测试数据库

  1. # 订单数据库信息 seata_order
  2. DROP DATABASE IF EXISTS seata_order;
  3. CREATE DATABASE seata_order;
  4. DROP TABLE IF EXISTS seata_order.p_order;
  5. CREATE TABLE seata_order.p_order
  6. (
  7. id INT(11) NOT NULL AUTO_INCREMENT,
  8. user_id INT(11) DEFAULT NULL,
  9. product_id INT(11) DEFAULT NULL,
  10. amount INT(11) DEFAULT NULL,
  11. total_price DOUBLE DEFAULT NULL,
  12. status VARCHAR(100) DEFAULT NULL,
  13. add_time DATETIME DEFAULT CURRENT_TIMESTAMP,
  14. last_update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  15. PRIMARY KEY (id)
  16. ) ENGINE = InnoDB
  17. AUTO_INCREMENT = 1
  18. DEFAULT CHARSET = utf8mb4;
  19. DROP TABLE IF EXISTS seata_order.undo_log;
  20. CREATE TABLE seata_order.undo_log
  21. (
  22. id BIGINT(20) NOT NULL AUTO_INCREMENT,
  23. branch_id BIGINT(20) NOT NULL,
  24. xid VARCHAR(100) NOT NULL,
  25. context VARCHAR(128) NOT NULL,
  26. rollback_info LONGBLOB NOT NULL,
  27. log_status INT(11) NOT NULL,
  28. log_created DATETIME NOT NULL,
  29. log_modified DATETIME NOT NULL,
  30. PRIMARY KEY (id),
  31. UNIQUE KEY ux_undo_log (xid, branch_id)
  32. ) ENGINE = InnoDB
  33. AUTO_INCREMENT = 1
  34. DEFAULT CHARSET = utf8mb4;
  35. # 产品数据库信息 seata_product
  36. DROP DATABASE IF EXISTS seata_product;
  37. CREATE DATABASE seata_product;
  38. DROP TABLE IF EXISTS seata_product.product;
  39. CREATE TABLE seata_product.product
  40. (
  41. id INT(11) NOT NULL AUTO_INCREMENT,
  42. price DOUBLE DEFAULT NULL,
  43. stock INT(11) DEFAULT NULL,
  44. last_update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  45. PRIMARY KEY (id)
  46. ) ENGINE = InnoDB
  47. AUTO_INCREMENT = 1
  48. DEFAULT CHARSET = utf8mb4;
  49. DROP TABLE IF EXISTS seata_product.undo_log;
  50. CREATE TABLE seata_product.undo_log
  51. (
  52. id BIGINT(20) NOT NULL AUTO_INCREMENT,
  53. branch_id BIGINT(20) NOT NULL,
  54. xid VARCHAR(100) NOT NULL,
  55. context VARCHAR(128) NOT NULL,
  56. rollback_info LONGBLOB NOT NULL,
  57. log_status INT(11) NOT NULL,
  58. log_created DATETIME NOT NULL,
  59. log_modified DATETIME NOT NULL,
  60. PRIMARY KEY (id),
  61. UNIQUE KEY ux_undo_log (xid, branch_id)
  62. ) ENGINE = InnoDB
  63. AUTO_INCREMENT = 1
  64. DEFAULT CHARSET = utf8mb4;
  65. INSERT INTO seata_product.product (id, price, stock)
  66. VALUES (1, 10, 20);
  67. # 账户数据库信息 seata_account
  68. DROP DATABASE IF EXISTS seata_account;
  69. CREATE DATABASE seata_account;
  70. DROP TABLE IF EXISTS seata_account.account;
  71. CREATE TABLE seata_account.account
  72. (
  73. id INT(11) NOT NULL AUTO_INCREMENT,
  74. balance DOUBLE DEFAULT NULL,
  75. last_update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  76. PRIMARY KEY (id)
  77. ) ENGINE = InnoDB
  78. AUTO_INCREMENT = 1
  79. DEFAULT CHARSET = utf8mb4;
  80. DROP TABLE IF EXISTS seata_account.undo_log;
  81. CREATE TABLE seata_account.undo_log
  82. (
  83. id BIGINT(20) NOT NULL AUTO_INCREMENT,
  84. branch_id BIGINT(20) NOT NULL,
  85. xid VARCHAR(100) NOT NULL,
  86. context VARCHAR(128) NOT NULL,
  87. rollback_info LONGBLOB NOT NULL,
  88. log_status INT(11) NOT NULL,
  89. log_created DATETIME NOT NULL,
  90. log_modified DATETIME NOT NULL,
  91. PRIMARY KEY (id),
  92. UNIQUE KEY ux_undo_log (xid, branch_id)
  93. ) ENGINE = InnoDB
  94. AUTO_INCREMENT = 1
  95. DEFAULT CHARSET = utf8mb4;
  96. INSERT INTO seata_account.account (id, balance)
  97. VALUES (1, 50);

其中,每个库中的undo_log表,是Seata AT模式必须创建的表,主要用于分支事务的回滚。
另外,考虑到测试方便,我们插入了一条id = 1的account记录,和一条id = 1的product记录。

2,引入依赖

seata依赖

  1. <dependency>
  2. <groupId>io.seata</groupId>
  3. <artifactId>seata-spring-boot-starter</artifactId>
  4. <version>1.4.1</version>
  5. </dependency>

多数据源依赖

  1. <!-- 多数据源 -->
  2. <dependency>
  3. <groupId>com.baomidou</groupId>
  4. <artifactId>dynamic-datasource-spring-boot-starter</artifactId>
  5. <version>${dynamic-ds.version}</version>
  6. </dependency>
  7. <dependency>
  8. <groupId>org.mybatis.spring.boot</groupId>
  9. <artifactId>mybatis-spring-boot-starter</artifactId>
  10. </dependency>
  11. <dependency>
  12. <groupId>com.baomidou</groupId>
  13. <artifactId>mybatis-plus-boot-starter</artifactId>
  14. </dependency>

3,服务配置

  1. # Tomcat
  2. server:
  3. port: 9003
  4. # Spring
  5. spring:
  6. application:
  7. # 应用名称
  8. name: javaitem-demo-seata
  9. profiles:
  10. # 环境配置
  11. active: dev
  12. redis:
  13. host: localhost
  14. port: 6379
  15. password:
  16. datasource:
  17. dynamic:
  18. primary: master
  19. strict: true
  20. seata: true #开启seata代理,开启后默认每个数据源都代理,如果某个不需要代理可单独关闭
  21. seata-mode: AT #支持XA及AT模式,默认AT
  22. datasource:
  23. # 主库数据源
  24. master:
  25. driver-class-name: com.mysql.cj.jdbc.Driver
  26. url: jdbc:mysql://127.0.0.1:3306/javaitem-cloud?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2B8
  27. username: root
  28. password: 123456
  29. seata: false
  30. # seata_order数据源
  31. order:
  32. username: root
  33. password: 123456
  34. url: jdbc:mysql://127.0.0.1:3306/seata_order?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2B8
  35. driver-class-name: com.mysql.cj.jdbc.Driver
  36. # seata_account数据源
  37. account:
  38. username: root
  39. password: 123456
  40. url: jdbc:mysql://127.0.0.1:3306/seata_account?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2B8
  41. driver-class-name: com.mysql.cj.jdbc.Driver
  42. # seata_product数据源
  43. product:
  44. username: root
  45. password: 123456
  46. url: jdbc:mysql://127.0.0.1:3306/seata_product?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2B8
  47. driver-class-name: com.mysql.cj.jdbc.Driver
  48. # seata配置
  49. seata:
  50. enabled: true
  51. application-id: applicationName
  52. tx-service-group: my_test_tx_group
  53. #一定要是false
  54. enable-auto-data-source-proxy: false
  55. service:
  56. vgroup-mapping:
  57. #key与上面的tx-service-group的值对应
  58. my_test_tx_group: default
  59. grouplist:
  60. #seata-server地址仅file注册中心需要
  61. default: localhost:8091
  62. config:
  63. type: file
  64. registry:
  65. type: file
  66. # Mybatis-plus配置
  67. mybatis-plus:
  68. mapper-locations: classpath:cn/javabb/**/*Mapper.xml
  69. typeAliasesPackage: cn.javabb.**.entity
  70. global-config:
  71. id-type: 0
  72. field-strategy: 1
  73. db-column-underline: true
  74. logic-delete-value: 1
  75. logic-not-delete-value: 0
  76. configuration:
  77. map-underscore-to-camel-case: true
  78. cache-enabled: false
  79. logging.level.org.springframework.boot.autoconfigure: error
  80. # swagger配置
  81. swagger:
  82. title: seate-demo
  83. license: Powered By javabb
  84. licenseUrl: http://javabb.cn

4,测试事务回滚

  1. /**
  2. * 正常下单
  3. */
  4. @Test
  5. public void test1() {
  6. orderService.placeOrder(new PlaceOrderRequest(1, 1, 2));
  7. }
  8. /**
  9. * 模拟库存不足
  10. */
  11. @Test
  12. public void test2() {
  13. orderService.placeOrder(new PlaceOrderRequest(1, 1, 22));
  14. }
  15. /**
  16. * 模拟余额不足
  17. */
  18. @Test
  19. public void test3() {
  20. orderService.placeOrder(new PlaceOrderRequest(1, 1, 6));
  21. }

5,嵌套事务回滚

修改OrderService.java

  1. // 扣减库存并计算总价
  2. Double totalPrice = productService.reduceStock(productId, amount);
  3. // 扣减余额
  4. accountService.reduceBalance(userId, totalPrice);
  5. // 在前面的基础上直接扣掉100.0
  6. accountService.reduceBalance(userId, 100.0);

测试

  1. /**
  2. * 正常下单
  3. */
  4. @Test
  5. public void test1() {
  6. orderService.placeOrder(new PlaceOrderRequest(1, 1, 2));
  7. }

测试正常下单,发现最后的回滚状态是回到最初的数据.