幂等性控制

若第一阶段冻结数据失败,第二阶段的回滚操作仍然会执行;若第二阶段执行失败,TC会重复发送二阶段指令,模块会重复执行二阶段操作。这两种情况都会导致数据库中的冻结库存出现负数,并增加实际并不存在的库存数据。


幂等性控制就是让多次重复的操作和一次操作的结果相同.

  • 在第一阶段成功时设置一个"第一阶段成功"标记,失败则不设置标记
  • 在第二阶段执行前先进行检查标记是否存在,存在则可以执行第二阶段的提交或者回滚操作,二阶段执行成功后删除标记,没有标记则不执行第二阶段的操作.

    在库存服务和订单服务中添加幂等性控制

    创建幂等性工具类:

    /**
     * In-memory registry of TCC phase-one success markers, used for idempotency control.
     *
     * <p>Markers are grouped by TCC action class and keyed by the global transaction id (xid).
     * Phase one stores a marker on success; phase two checks for it before doing any work
     * and removes it once the commit or rollback has completed.
     *
     * <p>All state lives in a static map, so it is local to the current JVM.
     */
    public class ResultHolder {
        /** Action class -> (xid -> marker value). Both levels are concurrent maps. */
        private static final Map<Class<?>, Map<String, String>> map = new ConcurrentHashMap<>();

        /**
         * Records a marker for the given action class and transaction.
         *
         * @param actionClass TCC action type the marker belongs to
         * @param xid         global transaction id
         * @param v           marker value to store
         */
        public static void setResult(Class<?> actionClass, String xid, String v) {
            // computeIfAbsent creates the per-class map atomically. The previous hand-written
            // lazy init re-checked a stale local variable inside the lock, so two concurrent
            // first callers could each install their own inner map and one marker was lost.
            map.computeIfAbsent(actionClass, key -> new ConcurrentHashMap<>()).put(xid, v);
        }

        /**
         * Looks up the marker for the given action class and transaction.
         *
         * @param actionClass TCC action type the marker belongs to
         * @param xid         global transaction id
         * @return the stored marker value, or {@code null} if none is recorded
         */
        public static String getResult(Class<?> actionClass, String xid) {
            Map<String, String> results = map.get(actionClass);
            if (results != null) {
                return results.get(xid);
            }
            return null;
        }

        /**
         * Deletes the marker for the given action class and transaction, if present.
         *
         * @param actionClass TCC action type the marker belongs to
         * @param xid         global transaction id
         */
        public static void removeResult(Class<?> actionClass, String xid) {
            Map<String, String> results = map.get(actionClass);
            if (results != null) {
                results.remove(xid);
            }
        }
    }

    在TCC接口中添加幂等性控制

    1. @Component
    2. public class OrderTccActionImpl implements OrderTccAction {
    3. private final OrderMapper orderMapper;
    4. @Autowired
    5. public OrderTccActionImpl(OrderMapper orderMapper) {
    6. this.orderMapper = orderMapper;
    7. }
    8. @Transactional
    9. @Override
    10. public boolean prepare(BusinessActionContext businessActionContext,
    11. Long id, Long userId,
    12. Long productId, Integer count, BigDecimal money) {
    13. orderMapper.createOrder(new Order(id, userId, productId, count, money, 0));
    14. /*
    15. 幂等性控制
    16. 第一阶段成功时设置成功标记
    17. */
    18. ResultHolder.setResult(OrderTccAction.class, businessActionContext.getXid(), "p");
    19. return true;
    20. }
    21. @Transactional
    22. @Override
    23. public synchronized boolean commit(BusinessActionContext businessActionContext) {
    24. //先判断标记是否存在
    25. if (ResultHolder.getResult(OrderTccAction.class, businessActionContext.getXid()) == null) {
    26. return true;
    27. }
    28. Long orderId = Long.valueOf(businessActionContext.getActionContext("orderId").toString());
    29. orderMapper.updateStatus(orderId, 1);
    30. //二阶段成功则删除标记
    31. ResultHolder.removeResult(OrderTccAction.class, businessActionContext.getXid());
    32. return true;
    33. }
    34. @Transactional
    35. @Override
    36. public synchronized boolean rollback(BusinessActionContext businessActionContext) {
    37. /*
    38. 一阶段失败则没有成功标记,回滚不执行
    39. 一阶段成功,有成功标记,但是其他模块执行失败,可以执行回归滚,回滚完成删除标记
    40. */
    41. if (ResultHolder.getResult(OrderTccAction.class, businessActionContext.getXid()) == null) {
    42. return true;
    43. }
    44. Long orderId = Long.valueOf(businessActionContext.getActionContext("orderId").toString());
    45. orderMapper.deleteById(orderId);
    46. //二阶段成功则删除标记
    47. ResultHolder.removeResult(OrderTccAction.class, businessActionContext.getXid());
    48. return true;
    49. }
    50. }

    账户服务中添加TCC事务

    添加TCC事务配置文件

  1. 添加registry.conf配置文件

```powershell
registry {
  # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
  type = "eureka"

  nacos {
    serverAddr = "localhost"
    namespace = ""
    cluster = "default"
  }
  eureka {
    # 连接eureka,要从注册表发现 seata-server
    serviceUrl = "http://localhost:8761/eureka"
    # application = "default"
    # weight = "1"
  }
  redis {
    serverAddr = "localhost:6379"
    db = "0"
    password = ""
    cluster = "default"
    timeout = "0"
  }
  zk {
    cluster = "default"
    serverAddr = "127.0.0.1:2181"
    session.timeout = 6000
    connect.timeout = 2000
    username = ""
    password = ""
  }
  consul {
    cluster = "default"
    serverAddr = "127.0.0.1:8500"
  }
  etcd3 {
    cluster = "default"
    serverAddr = "http://localhost:2379"
  }
  sofa {
    serverAddr = "127.0.0.1:9603"
    application = "default"
    region = "DEFAULT_ZONE"
    datacenter = "DefaultDataCenter"
    cluster = "default"
    group = "SEATA_GROUP"
    addressWaitTime = "3000"
  }
  file {
    name = "file.conf"
  }
}

config {
  # file、nacos 、apollo、zk、consul、etcd3、springCloudConfig
  type = "file"

  nacos {
    serverAddr = "localhost"
    namespace = ""
    group = "SEATA_GROUP"
  }
  consul {
    serverAddr = "127.0.0.1:8500"
  }
  apollo {
    app.id = "seata-server"
    apollo.meta = "http://192.168.1.204:8801"
    namespace = "application"
  }
  zk {
    serverAddr = "127.0.0.1:2181"
    session.timeout = 6000
    connect.timeout = 2000
    username = ""
    password = ""
  }
  etcd3 {
    serverAddr = "http://localhost:2379"
  }
  file {
    name = "file.conf"
  }
}
```

  2. 添加file.conf配置文件

```powershell
transport {
  # tcp udt unix-domain-socket
  type = "TCP"
  #NIO NATIVE
  server = "NIO"
  #enable heartbeat
  heartbeat = true
  # the client batch send request enable
  enableClientBatchSendRequest = true
  #thread factory for netty
  threadFactory {
    bossThreadPrefix = "NettyBoss"
    workerThreadPrefix = "NettyServerNIOWorker"
    serverExecutorThread-prefix = "NettyServerBizHandler"
    shareBossWorker = false
    clientSelectorThreadPrefix = "NettyClientSelector"
    clientSelectorThreadSize = 1
    clientWorkerThreadPrefix = "NettyClientWorkerThread"
    # netty boss thread size,will not be used for UDT
    bossThreadSize = 1
    #auto default pin or 8
    workerThreadSize = "default"
  }
  shutdown {
    # when destroy server, wait seconds
    wait = 3
  }
  serialization = "seata"
  compressor = "none"
}
service {
  #transaction service group mapping
  # order_tx_group 与 yml 中的 "tx-service-group: order_tx_group" 配置一致
  # "seata-server" 与 TC 服务器的注册名一致
  # 从eureka获取seata-server的地址,再向seata-server注册自己,设置group
  # order_tx_group 事务组,对应使用哪个协调器
  # seata-server 是注册表中的服务id
  vgroupMapping.order_tx_group = "seata-server"
  #only support when registry.type=file, please don't set multiple addresses
  order_tx_group.grouplist = "127.0.0.1:8091"
  #degrade, current not support
  enableDegrade = false
  #disable seata
  disableGlobalTransaction = false
}
client {
  rm {
    asyncCommitBufferLimit = 10000
    lock {
      retryInterval = 10
      retryTimes = 30
      retryPolicyBranchRollbackOnConflict = true
    }
    reportRetryCount = 5
    tableMetaCheckEnable = false
    reportSuccessEnable = false
  }
  tm {
    commitRetryCount = 5
    rollbackRetryCount = 5
  }
  undo {
    dataValidation = true
    logSerialization = "jackson"
    logTable = "undo_log"
  }
  log {
    exceptionRate = 100
  }
}
```
  3. 修改yml配置添加事务组

```powershell
server:
  port: 8081

spring:
  application:
    name: account

  datasource:
    url: jdbc:mysql:///seata_account?useUnicode=true&characterEncoding=UTF-8&serverTimezone=GMT%2B8
    username: root
    password: root
    driver-class-name: com.mysql.cj.jdbc.Driver

  # 添加事务组
  cloud:
    alibaba:
      seata:
        tx-service-group: order_tx_group

eureka:
  client:
    service-url:
      defaultZone: http://localhost:8761/eureka

  instance:
    prefer-ip-address: true
    instance-id: ${spring.cloud.client.ip-address}:${spring.application.name}:${server.port}

mybatis-plus:
  mapper-locations: classpath:mapper/*.xml
  type-aliases-package: cn.tedu.account.entity
  configuration:
    map-underscore-to-camel-case: true

logging:
  level:
    cn.tedu.account.mapper: debug
```

  1. <a name="PQZrD"></a>
  2. ### 修改Mapper添加事务处理
  3. ```java
  4. @Mapper
  5. public interface AccountMapper extends BaseMapper<Account> {
  6. /**
  7. * 扣除账户金额
  8. *
  9. * @param userId 用户id
  10. * @param money 扣除金额数
  11. */
  12. void decrease(Long userId, BigDecimal money);
  13. /**
  14. * 查询账户
  15. * @param userId 用户id
  16. * @return 用户账户信息
  17. */
  18. Account findByUserId(Long userId);
  19. /**
  20. * 将可用金额冻结,对应第一阶段提交事务
  21. * @param userId 用户id
  22. * @param money 冻结金额数目
  23. */
  24. void updateResidueToFrozen(Long userId,BigDecimal money);
  25. /**
  26. * 冻结金额解冻变为已使用,对应第二阶段提交
  27. * @param userId 用户id
  28. * @param money 已使用金额
  29. */
  30. void updateFrozenToUsed(Long userId,BigDecimal money);
  31. /**
  32. * 冻结金额回滚至账户中,对应第二阶段回滚
  33. * @param userId 用户id
  34. * @param money 回滚金额
  35. */
  36. void updateFrozenToResidue(Long userId,BigDecimal money);
  37. }

xml文件如下:

  <?xml version="1.0" encoding="UTF-8" ?>
  <!DOCTYPE mapper
          PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
          "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
  <mapper namespace="cn.tedu.account.mapper.AccountMapper">

      <!-- Column-to-property mapping for account rows. -->
      <resultMap id="BaseResultMap" type="Account">
          <id column="id" property="id" jdbcType="BIGINT"/>
          <result column="user_id" property="userId" jdbcType="BIGINT"/>
          <result column="total" property="total" jdbcType="DECIMAL"/>
          <result column="used" property="used" jdbcType="DECIMAL"/>
          <result column="residue" property="residue" jdbcType="DECIMAL"/>
          <result column="frozen" property="frozen" jdbcType="DECIMAL"/>
      </resultMap>

      <!-- Direct deduction: moves money from residue to used in one step. -->
      <update id="decrease">
          UPDATE account
          SET residue = residue - #{money},
              used = used + #{money}
          WHERE user_id = #{userId};
      </update>

      <!-- Loads the account row for one user. -->
      <select id="findByUserId" resultMap="BaseResultMap">
          SELECT *
          FROM account
          WHERE user_id = #{userId};
      </select>

      <!-- Phase one: moves money from residue to frozen. -->
      <update id="updateResidueToFrozen">
          UPDATE account
          SET residue = residue - #{money},
              frozen = frozen + #{money}
          WHERE user_id = #{userId}
      </update>

      <!-- Phase two commit: moves money from frozen to used. -->
      <update id="updateFrozenToUsed">
          UPDATE account
          SET frozen = frozen - #{money},
              used = used + #{money}
          WHERE user_id = #{userId}
      </update>

      <!-- Phase two rollback: moves money from frozen back to residue. -->
      <update id="updateFrozenToResidue">
          UPDATE account
          SET frozen = frozen - #{money},
              residue = residue + #{money}
          WHERE user_id = #{userId}
      </update>
  </mapper>

添加幂等性处理机制

  /**
   * In-memory registry of TCC phase-one success markers, used for idempotency control.
   *
   * <p>Markers are grouped by TCC action class and keyed by the global transaction id (xid).
   * Phase one stores a marker on success; phase two checks for it before doing any work
   * and removes it once the commit or rollback has completed.
   *
   * <p>All state lives in a static map, so it is local to the current JVM.
   */
  public class ResultHolder {
      /** Action class -> (xid -> marker value). Both levels are concurrent maps. */
      private static final Map<Class<?>, Map<String, String>> map = new ConcurrentHashMap<>();

      /**
       * Records a marker for the given action class and transaction.
       *
       * @param actionClass TCC action type the marker belongs to
       * @param xid         global transaction id
       * @param v           marker value to store
       */
      public static void setResult(Class<?> actionClass, String xid, String v) {
          // computeIfAbsent creates the per-class map atomically. The previous hand-written
          // lazy init re-checked a stale local variable inside the lock, so two concurrent
          // first callers could each install their own inner map and one marker was lost.
          map.computeIfAbsent(actionClass, key -> new ConcurrentHashMap<>()).put(xid, v);
      }

      /**
       * Looks up the marker for the given action class and transaction.
       *
       * @param actionClass TCC action type the marker belongs to
       * @param xid         global transaction id
       * @return the stored marker value, or {@code null} if none is recorded
       */
      public static String getResult(Class<?> actionClass, String xid) {
          Map<String, String> results = map.get(actionClass);
          if (results != null) {
              return results.get(xid);
          }
          return null;
      }

      /**
       * Deletes the marker for the given action class and transaction, if present.
       *
       * @param actionClass TCC action type the marker belongs to
       * @param xid         global transaction id
       */
      public static void removeResult(Class<?> actionClass, String xid) {
          Map<String, String> results = map.get(actionClass);
          if (results != null) {
              results.remove(xid);
          }
      }
  }

添加TCC事务处理接口

  1. @LocalTCC
  2. public interface AccountTccAction {
  3. @TwoPhaseBusinessAction(name = "AccountTccAction")
  4. boolean prepare(BusinessActionContext context,
  5. @BusinessActionContextParameter(paramName = "userId") Long userId,
  6. @BusinessActionContextParameter(paramName = "money") BigDecimal money);
  7. boolean commit(BusinessActionContext context);
  8. boolean rollback(BusinessActionContext context);
  9. }

接口实现类

  1. @Component
  2. public class AccountTccActionImpl implements AccountTccAction {
  3. private final AccountMapper accountMapper;
  4. @Autowired
  5. public AccountTccActionImpl(AccountMapper accountMapper) {
  6. this.accountMapper = accountMapper;
  7. }
  8. @Transactional
  9. @Override
  10. public boolean prepare(BusinessActionContext context, Long userId, BigDecimal money) {
  11. Account account = accountMapper.findByUserId(userId);
  12. if (account.getResidue().compareTo(money) < 0) {
  13. throw new RuntimeException("可用金额不足");
  14. }
  15. accountMapper.updateResidueToFrozen(userId, money);
  16. ResultHolder.setResult(AccountTccAction.class, context.getXid(), "p");
  17. return true;
  18. }
  19. @Transactional
  20. @Override
  21. public synchronized boolean commit(BusinessActionContext context) {
  22. if (ResultHolder.getResult(AccountTccAction.class, context.getXid()) == null) {
  23. return true;
  24. }
  25. Long userId = Long.valueOf(context.getActionContext("userId").toString());
  26. BigDecimal money = new BigDecimal(context.getActionContext("money").toString());
  27. accountMapper.updateFrozenToUsed(userId, money);
  28. ResultHolder.removeResult(AccountTccAction.class, context.getXid());
  29. return true;
  30. }
  31. @Transactional
  32. @Override
  33. public synchronized boolean rollback(BusinessActionContext context) {
  34. if (ResultHolder.getResult(AccountTccAction.class, context.getXid()) == null) {
  35. return true;
  36. }
  37. Long userId = Long.valueOf(context.getActionContext("userId").toString());
  38. BigDecimal money = new BigDecimal(context.getActionContext("money").toString());
  39. accountMapper.updateFrozenToResidue(userId, money);
  40. ResultHolder.removeResult(AccountTccAction.class, context.getXid());
  41. return true;
  42. }
  43. }