幂等性控制
若第一阶段冻结数据失败,第二阶段的回滚操作仍然会执行;若二阶段执行失败,TC 会重复发送二阶段指令,模块就会重复执行二阶段操作。这将导致数据库中的冻结库存被重复扣减而出现负数(即冻结的负数库存),并使库存凭空增加原本不存在的数量。
幂等性控制就是让多次重复的操作和一次操作的结果相同.
- 在第一阶段成功时设置一个"第一阶段成功"标记,失败则不设置标记
在第二阶段执行前先进行检查标记是否存在,存在则可以执行第二阶段的提交或者回滚操作,二阶段执行成功后删除标记,没有标记则不执行第二阶段的操作.
在库存服务和订单服务中添加幂等性控制
创建幂等性工具类:
/**
 * In-memory holder for per-transaction markers, used for idempotence control
 * of TCC second-phase operations.
 *
 * <p>Markers are grouped by action class and keyed by the global transaction
 * id (xid). All state lives in a static map inside the current JVM, so markers
 * are not shared between processes and do not survive a restart.
 *
 * <p>All methods are safe to call concurrently: both map levels are
 * {@link ConcurrentHashMap}s and the inner map is created atomically.
 */
public class ResultHolder {

    /** action class -> (xid -> marker value). */
    private static final Map<Class<?>, Map<String, String>> map = new ConcurrentHashMap<>();

    /**
     * Stores a marker for the given action class and transaction.
     *
     * @param actionClass the TCC action the marker belongs to
     * @param xid         global transaction id
     * @param v           marker value; must not be {@code null}
     *                    (ConcurrentHashMap rejects null values)
     */
    public static void setResult(Class<?> actionClass, String xid, String v) {
        // computeIfAbsent creates the inner map atomically. The previous
        // hand-written double-check re-tested a stale local variable inside
        // the lock, so two threads could each install their own inner map and
        // one thread's marker was silently lost.
        map.computeIfAbsent(actionClass, key -> new ConcurrentHashMap<>()).put(xid, v);
    }

    /**
     * Looks up the marker for the given action class and transaction.
     *
     * @param actionClass the TCC action the marker belongs to
     * @param xid         global transaction id
     * @return the stored marker, or {@code null} when none is present
     */
    public static String getResult(Class<?> actionClass, String xid) {
        Map<String, String> results = map.get(actionClass);
        if (results != null) {
            return results.get(xid);
        }
        return null;
    }

    /**
     * Removes the marker for the given action class and transaction.
     * Does nothing when no marker is present.
     *
     * @param actionClass the TCC action the marker belongs to
     * @param xid         global transaction id
     */
    public static void removeResult(Class<?> actionClass, String xid) {
        Map<String, String> results = map.get(actionClass);
        if (results != null) {
            results.remove(xid);
        }
    }
}
在TCC接口中添加幂等性控制
@Componentpublic class OrderTccActionImpl implements OrderTccAction {private final OrderMapper orderMapper;@Autowiredpublic OrderTccActionImpl(OrderMapper orderMapper) {this.orderMapper = orderMapper;}@Transactional@Overridepublic boolean prepare(BusinessActionContext businessActionContext,Long id, Long userId,Long productId, Integer count, BigDecimal money) {orderMapper.createOrder(new Order(id, userId, productId, count, money, 0));/*幂等性控制第一阶段成功时设置成功标记*/ResultHolder.setResult(OrderTccAction.class, businessActionContext.getXid(), "p");return true;}@Transactional@Overridepublic synchronized boolean commit(BusinessActionContext businessActionContext) {//先判断标记是否存在if (ResultHolder.getResult(OrderTccAction.class, businessActionContext.getXid()) == null) {return true;}Long orderId = Long.valueOf(businessActionContext.getActionContext("orderId").toString());orderMapper.updateStatus(orderId, 1);//二阶段成功则删除标记ResultHolder.removeResult(OrderTccAction.class, businessActionContext.getXid());return true;}@Transactional@Overridepublic synchronized boolean rollback(BusinessActionContext businessActionContext) {/*一阶段失败则没有成功标记,回滚不执行一阶段成功,有成功标记,但是其他模块执行失败,可以执行回归滚,回滚完成删除标记*/if (ResultHolder.getResult(OrderTccAction.class, businessActionContext.getXid()) == null) {return true;}Long orderId = Long.valueOf(businessActionContext.getActionContext("orderId").toString());orderMapper.deleteById(orderId);//二阶段成功则删除标记ResultHolder.removeResult(OrderTccAction.class, businessActionContext.getXid());return true;}}
账户服务中添加TCC事务
添加TCC事务配置文件
1. 添加registry.conf配置文件
```powershell
registry {
  # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
  type = "eureka"

  nacos {
    serverAddr = "localhost"
    namespace = ""
    cluster = "default"
  }
  eureka {
    # 连接eureka,要从注册表发现 seata-server
    serviceUrl = "http://localhost:8761/eureka"
    application = "default"
    weight = "1"
  }
  redis {
    serverAddr = "localhost:6379"
    db = "0"
    password = ""
    cluster = "default"
    timeout = "0"
  }
  zk {
    cluster = "default"
    serverAddr = "127.0.0.1:2181"
    session.timeout = 6000
    connect.timeout = 2000
    username = ""
    password = ""
  }
  consul {
    cluster = "default"
    serverAddr = "127.0.0.1:8500"
  }
  etcd3 {
    cluster = "default"
    serverAddr = "http://localhost:2379"
  }
  sofa {
    serverAddr = "127.0.0.1:9603"
    application = "default"
    region = "DEFAULT_ZONE"
    datacenter = "DefaultDataCenter"
    cluster = "default"
    group = "SEATA_GROUP"
    addressWaitTime = "3000"
  }
  file {
    name = "file.conf"
  }
}

config {
  # file、nacos 、apollo、zk、consul、etcd3、springCloudConfig
  type = "file"

  nacos {
    serverAddr = "localhost"
    namespace = ""
    group = "SEATA_GROUP"
  }
  consul {
    serverAddr = "127.0.0.1:8500"
  }
  apollo {
    app.id = "seata-server"
    apollo.meta = "http://192.168.1.204:8801"
    namespace = "application"
  }
  zk {
    serverAddr = "127.0.0.1:2181"
    session.timeout = 6000
    connect.timeout = 2000
    username = ""
    password = ""
  }
  etcd3 {
    serverAddr = "http://localhost:2379"
  }
  file {
    name = "file.conf"
  }
}
```
2. 添加file.conf配置文件
```powershell
transport {
  # tcp udt unix-domain-socket
  type = "TCP"
  #NIO NATIVE
  server = "NIO"
  #enable heartbeat
  heartbeat = true
  # the client batch send request enable
  enableClientBatchSendRequest = true
  #thread factory for netty
  threadFactory {
    bossThreadPrefix = "NettyBoss"
    workerThreadPrefix = "NettyServerNIOWorker"
    serverExecutorThread-prefix = "NettyServerBizHandler"
    shareBossWorker = false
    clientSelectorThreadPrefix = "NettyClientSelector"
    clientSelectorThreadSize = 1
    clientWorkerThreadPrefix = "NettyClientWorkerThread"
    # netty boss thread size,will not be used for UDT
    bossThreadSize = 1
    #auto default pin or 8
    workerThreadSize = "default"
  }
  shutdown {
    # when destroy server, wait seconds
    wait = 3
  }
  serialization = "seata"
  compressor = "none"
}
service {
  #transaction service group mapping
  # order_tx_group 与 yml 中的 "tx-service-group: order_tx_group" 配置一致
  # "seata-server" 与 TC 服务器的注册名一致
  # 从eureka获取seata-server的地址,再向seata-server注册自己,设置group
  # order_tx_group 事务组,对应使用哪个协调器
  # seata-server 是注册表中的服务id
  vgroupMapping.order_tx_group = "seata-server"
  #only support when registry.type=file, please don't set multiple addresses
  order_tx_group.grouplist = "127.0.0.1:8091"
  #degrade, current not support
  enableDegrade = false
  #disable seata
  disableGlobalTransaction = false
}
client {
  rm {
    asyncCommitBufferLimit = 10000
    lock {
      retryInterval = 10
      retryTimes = 30
      retryPolicyBranchRollbackOnConflict = true
    }
    reportRetryCount = 5
    tableMetaCheckEnable = false
    reportSuccessEnable = false
  }
  tm {
    commitRetryCount = 5
    rollbackRetryCount = 5
  }
  undo {
    dataValidation = true
    logSerialization = "jackson"
    logTable = "undo_log"
  }
  log {
    exceptionRate = 100
  }
}
```
- 修改yml配置添加事务组
```powershell
server:
  port: 8081

spring:
  application:
    name: account

  datasource:
    url: jdbc:mysql:///seata_account?useUnicode=true&characterEncoding=UTF-8&serverTimezone=GMT%2B8
    username: root
    password: root
    driver-class-name: com.mysql.cj.jdbc.Driver

  # 添加事务组
  cloud:
    alibaba:
      seata:
        tx-service-group: order_tx_group

eureka:
  client:
    service-url:
      defaultZone: http://localhost:8761/eureka
  instance:
    prefer-ip-address: true
    instance-id: ${spring.cloud.client.ip-address}:${spring.application.name}:${server.port}

mybatis-plus:
  mapper-locations: classpath:mapper/*.xml
  type-aliases-package: cn.tedu.account.entity
  configuration:
    map-underscore-to-camel-case: true

logging:
  level:
    cn.tedu.account.mapper: debug
```
<a name="PQZrD"></a>### 修改Mapper添加事务处理```java@Mapperpublic interface AccountMapper extends BaseMapper<Account> {/*** 扣除账户金额** @param userId 用户id* @param money 扣除金额数*/void decrease(Long userId, BigDecimal money);/*** 查询账户* @param userId 用户id* @return 用户账户信息*/Account findByUserId(Long userId);/*** 将可用金额冻结,对应第一阶段提交事务* @param userId 用户id* @param money 冻结金额数目*/void updateResidueToFrozen(Long userId,BigDecimal money);/*** 冻结金额解冻变为已使用,对应第二阶段提交* @param userId 用户id* @param money 已使用金额*/void updateFrozenToUsed(Long userId,BigDecimal money);/*** 冻结金额回滚至账户中,对应第二阶段回滚* @param userId 用户id* @param money 回滚金额*/void updateFrozenToResidue(Long userId,BigDecimal money);}
xml文件如下:
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper
        PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<!--
  SQL for cn.tedu.account.mapper.AccountMapper.
  Whitespace between tokens is restored here; the collapsed form fused
  keywords with identifiers (e.g. "accountSET", "*from accountwhere"),
  which is not valid SQL, and "mapperPUBLIC" is not a valid DOCTYPE.
-->
<mapper namespace="cn.tedu.account.mapper.AccountMapper">

    <!-- Column-to-property mapping for the account table. -->
    <resultMap id="BaseResultMap" type="Account">
        <id column="id" property="id" jdbcType="BIGINT"/>
        <result column="user_id" property="userId" jdbcType="BIGINT"/>
        <result column="total" property="total" jdbcType="DECIMAL"/>
        <result column="used" property="used" jdbcType="DECIMAL"/>
        <result column="residue" property="residue" jdbcType="DECIMAL"/>
        <result column="frozen" property="frozen" jdbcType="DECIMAL"/>
    </resultMap>

    <!-- Direct deduction: residue -> used. -->
    <update id="decrease">
        UPDATE account
        SET residue = residue - #{money},
            used = used + #{money}
        where user_id = #{userId};
    </update>

    <!-- Look up an account by its owner. -->
    <select id="findByUserId" resultMap="BaseResultMap">
        select *
        from account
        where user_id = #{userId};
    </select>

    <!-- TCC first phase: freeze the amount (residue -> frozen). -->
    <update id="updateResidueToFrozen">
        update account
        set residue = residue - #{money},
            frozen = frozen + #{money}
        where user_id = #{userId}
    </update>

    <!-- TCC second-phase commit: frozen -> used. -->
    <update id="updateFrozenToUsed">
        update account
        set frozen = frozen - #{money},
            used = used + #{money}
        where user_id = #{userId}
    </update>

    <!-- TCC second-phase rollback: frozen -> residue. -->
    <update id="updateFrozenToResidue">
        update account
        set frozen = frozen - #{money},
            residue = residue + #{money}
        where user_id = #{userId}
    </update>
</mapper>
添加幂等性处理机制
/**
 * In-memory holder for per-transaction markers, used for idempotence control
 * of TCC second-phase operations.
 *
 * <p>Markers are grouped by action class and keyed by the global transaction
 * id (xid). All state lives in a static map inside the current JVM, so markers
 * are not shared between processes and do not survive a restart.
 *
 * <p>All methods are safe to call concurrently: both map levels are
 * {@link ConcurrentHashMap}s and the inner map is created atomically.
 */
public class ResultHolder {

    /** action class -> (xid -> marker value). */
    private static final Map<Class<?>, Map<String, String>> map = new ConcurrentHashMap<>();

    /**
     * Stores a marker for the given action class and transaction.
     *
     * @param actionClass the TCC action the marker belongs to
     * @param xid         global transaction id
     * @param v           marker value; must not be {@code null}
     *                    (ConcurrentHashMap rejects null values)
     */
    public static void setResult(Class<?> actionClass, String xid, String v) {
        // computeIfAbsent creates the inner map atomically. The previous
        // hand-written double-check re-tested a stale local variable inside
        // the lock, so two threads could each install their own inner map and
        // one thread's marker was silently lost.
        map.computeIfAbsent(actionClass, key -> new ConcurrentHashMap<>()).put(xid, v);
    }

    /**
     * Looks up the marker for the given action class and transaction.
     *
     * @param actionClass the TCC action the marker belongs to
     * @param xid         global transaction id
     * @return the stored marker, or {@code null} when none is present
     */
    public static String getResult(Class<?> actionClass, String xid) {
        Map<String, String> results = map.get(actionClass);
        if (results != null) {
            return results.get(xid);
        }
        return null;
    }

    /**
     * Removes the marker for the given action class and transaction.
     * Does nothing when no marker is present.
     *
     * @param actionClass the TCC action the marker belongs to
     * @param xid         global transaction id
     */
    public static void removeResult(Class<?> actionClass, String xid) {
        Map<String, String> results = map.get(actionClass);
        if (results != null) {
            results.remove(xid);
        }
    }
}
添加TCC事务处理接口
/**
 * TCC contract for the account service: one first-phase method and the two
 * second-phase methods (commit / rollback).
 */
@LocalTCC
public interface AccountTccAction {

    /**
     * First phase of the "AccountTccAction" two-phase business action.
     * {@code userId} and {@code money} are annotated so that they are stored
     * in the action context under those names for the second phase.
     *
     * @param context action context of the current branch transaction
     * @param userId  id of the user whose account is affected
     * @param money   amount involved in the transaction
     * @return result flag of the first phase
     */
    @TwoPhaseBusinessAction(name = "AccountTccAction")
    boolean prepare(
            BusinessActionContext context,
            @BusinessActionContextParameter(paramName = "userId") Long userId,
            @BusinessActionContextParameter(paramName = "money") BigDecimal money);

    /**
     * Second phase: confirm.
     *
     * @param context action context carrying the values saved during {@code prepare}
     * @return result flag of the commit
     */
    boolean commit(BusinessActionContext context);

    /**
     * Second phase: cancel.
     *
     * @param context action context carrying the values saved during {@code prepare}
     * @return result flag of the rollback
     */
    boolean rollback(BusinessActionContext context);
}
接口实现类
@Componentpublic class AccountTccActionImpl implements AccountTccAction {private final AccountMapper accountMapper;@Autowiredpublic AccountTccActionImpl(AccountMapper accountMapper) {this.accountMapper = accountMapper;}@Transactional@Overridepublic boolean prepare(BusinessActionContext context, Long userId, BigDecimal money) {Account account = accountMapper.findByUserId(userId);if (account.getResidue().compareTo(money) < 0) {throw new RuntimeException("可用金额不足");}accountMapper.updateResidueToFrozen(userId, money);ResultHolder.setResult(AccountTccAction.class, context.getXid(), "p");return true;}@Transactional@Overridepublic synchronized boolean commit(BusinessActionContext context) {if (ResultHolder.getResult(AccountTccAction.class, context.getXid()) == null) {return true;}Long userId = Long.valueOf(context.getActionContext("userId").toString());BigDecimal money = new BigDecimal(context.getActionContext("money").toString());accountMapper.updateFrozenToUsed(userId, money);ResultHolder.removeResult(AccountTccAction.class, context.getXid());return true;}@Transactional@Overridepublic synchronized boolean rollback(BusinessActionContext context) {if (ResultHolder.getResult(AccountTccAction.class, context.getXid()) == null) {return true;}Long userId = Long.valueOf(context.getActionContext("userId").toString());BigDecimal money = new BigDecimal(context.getActionContext("money").toString());accountMapper.updateFrozenToResidue(userId, money);ResultHolder.removeResult(AccountTccAction.class, context.getXid());return true;}}
