幂等性控制
若第一阶段冻结数据失败,第二阶段的回滚操作也会执行,二阶段失败,TC会重复发送二阶段指令,模块会重复执行二阶段操作.将会导致数据库出现冻结的负数库存和增加不存在的库存数据(即冻结的负数库存)
幂等性控制就是让多次重复的操作和一次操作的结果相同.
- 在第一阶段成果设置一个第一阶段成功标记,失败则没有标记
在第二阶段执行前先进行检查标记是否存在,存在则可以执行第二阶段的提交或者回滚操作,二阶段执行成功后删除标记,没有标记则不执行第二阶段的操作.
在库存服务和订单服务中添加幂等性控制
创建幂等性工具类:
public class ResultHolder {
private static Map<Class<?>, Map<String, String>> map = new ConcurrentHashMap<Class<?>, Map<String, String>>();
public static void setResult(Class<?> actionClass, String xid, String v) {
Map<String, String> results = map.get(actionClass);
if (results == null) {
synchronized (map) {
if (results == null) {
results = new ConcurrentHashMap<>();
map.put(actionClass, results);
}
}
}
results.put(xid, v);
}
public static String getResult(Class<?> actionClass, String xid) {
Map<String, String> results = map.get(actionClass);
if (results != null) {
return results.get(xid);
}
return null;
}
public static void removeResult(Class<?> actionClass, String xid) {
Map<String, String> results = map.get(actionClass);
if (results != null) {
results.remove(xid);
}
}
}
在TCC接口中添加幂等性控制
@Component
public class OrderTccActionImpl implements OrderTccAction {
private final OrderMapper orderMapper;
@Autowired
public OrderTccActionImpl(OrderMapper orderMapper) {
this.orderMapper = orderMapper;
}
@Transactional
@Override
public boolean prepare(BusinessActionContext businessActionContext,
Long id, Long userId,
Long productId, Integer count, BigDecimal money) {
orderMapper.createOrder(new Order(id, userId, productId, count, money, 0));
/*
幂等性控制
第一阶段成功时设置成功标记
*/
ResultHolder.setResult(OrderTccAction.class, businessActionContext.getXid(), "p");
return true;
}
@Transactional
@Override
public synchronized boolean commit(BusinessActionContext businessActionContext) {
//先判断标记是否存在
if (ResultHolder.getResult(OrderTccAction.class, businessActionContext.getXid()) == null) {
return true;
}
Long orderId = Long.valueOf(businessActionContext.getActionContext("orderId").toString());
orderMapper.updateStatus(orderId, 1);
//二阶段成功则删除标记
ResultHolder.removeResult(OrderTccAction.class, businessActionContext.getXid());
return true;
}
@Transactional
@Override
public synchronized boolean rollback(BusinessActionContext businessActionContext) {
/*
一阶段失败则没有成功标记,回滚不执行
一阶段成功,有成功标记,但是其他模块执行失败,可以执行回归滚,回滚完成删除标记
*/
if (ResultHolder.getResult(OrderTccAction.class, businessActionContext.getXid()) == null) {
return true;
}
Long orderId = Long.valueOf(businessActionContext.getActionContext("orderId").toString());
orderMapper.deleteById(orderId);
//二阶段成功则删除标记
ResultHolder.removeResult(OrderTccAction.class, businessActionContext.getXid());
return true;
}
}
账户服务中添加TCC事务
添加TCC事务配置文件
添加registry.conf配置文件 ```powershell registry {
file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
type = “eureka”
nacos { serverAddr = “localhost” namespace = “” cluster = “default” } eureka {
连接eureka,要从注册表发现 seata-server
serviceUrl = “http://localhost:8761/eureka“
application = “default”
weight = “1”
} redis { serverAddr = “localhost:6379” db = “0” password = “” cluster = “default” timeout = “0” } zk { cluster = “default” serverAddr = “127.0.0.1:2181” session.timeout = 6000 connect.timeout = 2000 username = “” password = “” } consul { cluster = “default” serverAddr = “127.0.0.1:8500” } etcd3 { cluster = “default” serverAddr = “http://localhost:2379“ } sofa { serverAddr = “127.0.0.1:9603” application = “default” region = “DEFAULT_ZONE” datacenter = “DefaultDataCenter” cluster = “default” group = “SEATA_GROUP” addressWaitTime = “3000” } file { name = “file.conf” } }
config {
file、nacos 、apollo、zk、consul、etcd3、springCloudConfig
type = “file”
nacos { serverAddr = “localhost” namespace = “” group = “SEATA_GROUP” } consul { serverAddr = “127.0.0.1:8500” } apollo { app.id = “seata-server” apollo.meta = “http://192.168.1.204:8801“ namespace = “application” } zk { serverAddr = “127.0.0.1:2181” session.timeout = 6000 connect.timeout = 2000 username = “” password = “” } etcd3 { serverAddr = “http://localhost:2379“ } file { name = “file.conf” } }
2. 添加file.conf配置文件
```powershell
transport {
# tcp udt unix-domain-socket
type = "TCP"
#NIO NATIVE
server = "NIO"
#enable heartbeat
heartbeat = true
# the client batch send request enable
enableClientBatchSendRequest = true
#thread factory for netty
threadFactory {
bossThreadPrefix = "NettyBoss"
workerThreadPrefix = "NettyServerNIOWorker"
serverExecutorThread-prefix = "NettyServerBizHandler"
shareBossWorker = false
clientSelectorThreadPrefix = "NettyClientSelector"
clientSelectorThreadSize = 1
clientWorkerThreadPrefix = "NettyClientWorkerThread"
# netty boss thread size,will not be used for UDT
bossThreadSize = 1
#auto default pin or 8
workerThreadSize = "default"
}
shutdown {
# when destroy server, wait seconds
wait = 3
}
serialization = "seata"
compressor = "none"
}
service {
#transaction service group mapping
# order_tx_group 与 yml 中的 “tx-service-group: order_tx_group” 配置一致
# “seata-server” 与 TC 服务器的注册名一致
# 从eureka获取seata-server的地址,再向seata-server注册自己,设置group
# order_tx_group 事务组,对应使用哪个协调器
# seata-server 是注册表中的服务id
vgroupMapping.order_tx_group = "seata-server"
#only support when registry.type=file, please don't set multiple addresses
order_tx_group.grouplist = "127.0.0.1:8091"
#degrade, current not support
enableDegrade = false
#disable seata
disableGlobalTransaction = false
}
client {
rm {
asyncCommitBufferLimit = 10000
lock {
retryInterval = 10
retryTimes = 30
retryPolicyBranchRollbackOnConflict = true
}
reportRetryCount = 5
tableMetaCheckEnable = false
reportSuccessEnable = false
}
tm {
commitRetryCount = 5
rollbackRetryCount = 5
}
undo {
dataValidation = true
logSerialization = "jackson"
logTable = "undo_log"
}
log {
exceptionRate = 100
}
}
- 修改yml配置添加事务组 ```powershell server: port: 8081
spring: application: name: account
datasource: url: jdbc:mysql:///seata_account?useUnicode=true&characterEncoding=UTF-8&serverTimezone=GMT%2B8 username: root password: root driver-class-name: com.mysql.cj.jdbc.Driver
添加事务组
cloud: alibaba: seata: tx-service-group: order_tx_group
eureka: client: service-url: defaultZone: http://localhost:8761/eureka
instance: prefer-ip-address: true instance-id: ${spring.cloud.client.ip-address}:${spring.application.name}:${server.port}
mybatis-plus: mapper-locations: classpath:mapper/*.xml type-aliases-package: cn.tedu.account.entity configuration: map-underscore-to-camel-case: true
logging: level: cn.tedu.account.mapper: debug
<a name="PQZrD"></a>
### 修改Mapper添加事务处理
```java
@Mapper
public interface AccountMapper extends BaseMapper<Account> {
/**
* 扣除账户金额
*
* @param userId 用户id
* @param money 扣除金额数
*/
void decrease(Long userId, BigDecimal money);
/**
* 查询账户
* @param userId 用户id
* @return 用户账户信息
*/
Account findByUserId(Long userId);
/**
* 将可用金额冻结,对应第一阶段提交事务
* @param userId 用户id
* @param money 冻结金额数目
*/
void updateResidueToFrozen(Long userId,BigDecimal money);
/**
* 冻结金额解冻变为已使用,对应第二阶段提交
* @param userId 用户id
* @param money 已使用金额
*/
void updateFrozenToUsed(Long userId,BigDecimal money);
/**
* 冻结金额回滚至账户中,对应第二阶段回滚
* @param userId 用户id
* @param money 回滚金额
*/
void updateFrozenToResidue(Long userId,BigDecimal money);
}
xml文件如下:
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper
PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="cn.tedu.account.mapper.AccountMapper">
<resultMap id="BaseResultMap" type="Account">
<id column="id" property="id" jdbcType="BIGINT"/>
<result column="user_id" property="userId" jdbcType="BIGINT"/>
<result column="total" property="total" jdbcType="DECIMAL"/>
<result column="used" property="used" jdbcType="DECIMAL"/>
<result column="residue" property="residue" jdbcType="DECIMAL"/>
<result column="frozen" property="frozen" jdbcType="DECIMAL"/>
</resultMap>
<update id="decrease">
UPDATE account
SET residue = residue - #{money},
used = used + #{money}
where user_id = #{userId};
</update>
<select id="findByUserId" resultMap="BaseResultMap">
select *
from account
where user_id = #{userId};
</select>
<update id="updateResidueToFrozen">
update account
set residue = residue - #{money},
frozen = frozen + #{money}
where user_id = #{userId}
</update>
<update id="updateFrozenToUsed">
update account
set frozen = frozen - #{money},
used = used + #{money}
where user_id = #{userId}
</update>
<update id="updateFrozenToResidue">
update account
set frozen = frozen - #{money},
residue = residue + #{money}
where user_id = #{userId}
</update>
</mapper>
添加幂等性处理机制
public class ResultHolder {
private static Map<Class<?>, Map<String, String>> map = new ConcurrentHashMap<Class<?>, Map<String, String>>();
public static void setResult(Class<?> actionClass, String xid, String v) {
Map<String, String> results = map.get(actionClass);
if (results == null) {
synchronized (map) {
if (results == null) {
results = new ConcurrentHashMap<>();
map.put(actionClass, results);
}
}
}
results.put(xid, v);
}
public static String getResult(Class<?> actionClass, String xid) {
Map<String, String> results = map.get(actionClass);
if (results != null) {
return results.get(xid);
}
return null;
}
public static void removeResult(Class<?> actionClass, String xid) {
Map<String, String> results = map.get(actionClass);
if (results != null) {
results.remove(xid);
}
}
}
添加TCC事务处理接口
@LocalTCC
public interface AccountTccAction {
@TwoPhaseBusinessAction(name = "AccountTccAction")
boolean prepare(BusinessActionContext context,
@BusinessActionContextParameter(paramName = "userId") Long userId,
@BusinessActionContextParameter(paramName = "money") BigDecimal money);
boolean commit(BusinessActionContext context);
boolean rollback(BusinessActionContext context);
}
接口实现类
@Component
public class AccountTccActionImpl implements AccountTccAction {
private final AccountMapper accountMapper;
@Autowired
public AccountTccActionImpl(AccountMapper accountMapper) {
this.accountMapper = accountMapper;
}
@Transactional
@Override
public boolean prepare(BusinessActionContext context, Long userId, BigDecimal money) {
Account account = accountMapper.findByUserId(userId);
if (account.getResidue().compareTo(money) < 0) {
throw new RuntimeException("可用金额不足");
}
accountMapper.updateResidueToFrozen(userId, money);
ResultHolder.setResult(AccountTccAction.class, context.getXid(), "p");
return true;
}
@Transactional
@Override
public synchronized boolean commit(BusinessActionContext context) {
if (ResultHolder.getResult(AccountTccAction.class, context.getXid()) == null) {
return true;
}
Long userId = Long.valueOf(context.getActionContext("userId").toString());
BigDecimal money = new BigDecimal(context.getActionContext("money").toString());
accountMapper.updateFrozenToUsed(userId, money);
ResultHolder.removeResult(AccountTccAction.class, context.getXid());
return true;
}
@Transactional
@Override
public synchronized boolean rollback(BusinessActionContext context) {
if (ResultHolder.getResult(AccountTccAction.class, context.getXid()) == null) {
return true;
}
Long userId = Long.valueOf(context.getActionContext("userId").toString());
BigDecimal money = new BigDecimal(context.getActionContext("money").toString());
accountMapper.updateFrozenToResidue(userId, money);
ResultHolder.removeResult(AccountTccAction.class, context.getXid());
return true;
}
}