静态页面
Feign远程调用丢失请求头问题
Feign异步情况丢失上下文问题
订单确认页流程下单流程
锁库存逻辑
下单失败
本地事务和分布式事务
本地事务的问题
本地事务
1、事务的基本性质
2、事务的隔离级别
3、事务的传播行为
4、SpringBoot 事务关键点
分布式事务
1、为什么有分布式事务
2、CAP 定理与 BASE 理论
1、CAP 定理
http://thesecretlivesofdata.com/raft/
2、面临的问题
对于多数大型互联网应用的场景,主机众多、部署分散,而且现在的集群规模越来越大,所
以节点故障、网络故障是常态,而且要保证服务可用性达到 99.99999%(N 个 9),即保证
P 和 A,舍弃 C。
3、BASE 理论
是对 CAP 理论的延伸,思想是即使无法做到强一致性(CAP 的一致性就是强一致性),但可以采用适当的采取弱一致性,即最终一致性。 4、强一致性、弱一致性、最终一致性
3、分布式事务几种方案
1)、2PC 模式
数据库支持的 2PC【2 phase commit 二阶提交】,又叫做 XA Transactions。
MySQL 从 5.5 版本开始支持,SQL Server 2005 开始支持,Oracle 7 开始支持。
其中,XA 是一个两阶段提交协议,该协议分为以下两个阶段:
第一阶段:事务协调器要求每个涉及到事务的数据库预提交(precommit)此操作,并反映是否可以提交
第二阶段:事务协调器要求每个数据库提交数据。
其中,如果有任何一个数据库否决此次提交,那么所有数据库都会被要求回滚它们在此事务中的那部分信息。
2)、柔性事务-TCC 事务补偿型方案
刚性事务:遵循 ACID 原则,强一致性。
柔性事务:遵循 BASE 理论,最终一致性;
与刚性事务不同,柔性事务允许一定时间内,不同节点的数据不一致,但要求最终一致。
一阶段 prepare 行为:调用 自定义 的 prepare 逻辑。
二阶段 commit 行为:调用 自定义 的 commit 逻辑。
二阶段 rollback 行为:调用 自定义 的 rollback 逻辑。
所谓 TCC 模式,是指支持把 自定义 的分支事务纳入到全局事务的管理中。
3)、柔性事务-最大努力通知型方案
按规律进行通知,不保证数据一定能通知成功,但会提供可查询操作接口进行核对。这种方案主要用在与第三方系统通讯时,比如:调用微信或支付宝支付后的支付结果通知。这种方案也是结合 MQ 进行实现,例如:通过 MQ 发送 http 请求,设置最大通知次数。达到通知次数后即不再通知。
案例:银行通知、商户通知等(各大交易业务平台间的商户通知:多次通知、查询校对、对账文件),支付宝的支付成功异步回调
4)、柔性事务-可靠消息+最终一致性方案(异步确保型)
实现:业务处理服务在业务事务提交之前,向实时消息服务请求发送消息,实时消息服务只记录消息数据,而不是真正的发送。业务处理服务在业务事务提交之后,向实时消息服务确认发送。只有在得到确认发送指令后,实时消息服务才会真正发送。
seata控制分布式事务
seata的分布式事务解决方案
消息队列流程
锁库存增强版逻辑
定时任务的时效性问题
延时队列场景
RabbitMQ延时队列(实现定时任务)
消息的TTL(Time To Live)
消息的TTL就是消息的存活时间。
RabbitMQ可以对队列和消息分别设置TTL。
对队列设置就是队列没有消费者连着的保留时间,也可以对每一个单独的消息做单独的 设置。超过了这个时
间,我们认为这个消息就死了,称之为死信。
如果队列设置了,消息也设置了,那么会取小的。所以一个消息如果被路由到不同的队 列中,这个消息死亡的时间有可能不一样(不同的队列设置)。这里单讲单个消息的TTL,因为它才是实现延迟任务的关键。可以通过设置消息的expiration字段或者x- message-ttl属性来设置时间,两者是一样的效果。
Dead Letter Exchanges(DLX)
一个消息在满足如下条件下,会进死信路由,记住这里是路由而不是队列, 一个路由可以对应很多队列。(什么是死信)
一个消息被Consumer拒收了,并且reject方法的参数里requeue是false。也就是说不 会被再次放在队列
里,被其他消费者使用。(basic.reject/ basic.nack)requeue=false
上面的消息的TTL到了,消息过期了。
队列的长度限制满了。排在前面的消息会被丢弃或者扔到死信路由上
Dead Letter Exchange其实就是一种普通的exchange,和创建其他exchange没有两样。只是在某一个设置Dead Letter Exchange的队列中有 消息过期了,会自动触发消息的转发,发送到Dead Letter Exchange中去。
我们既可以控制消息在一段时间后变成死信,又可以控制变成死信的消息 被路由到某一个指定的交换机,结合二者,其实就可以实现一个延时队列
延时队列实现-1
延时队列实现-2
订单服务创建队列、交换机、绑定关系
package com.atguigu.gulimall.order.config;
import com.atguigu.gulimall.order.entity.OrderEntity;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class MyMQConfig {
@RabbitListener(queues = "order.release.order.queue")
public void listener(OrderEntity orderEntity, Channel channel, Message message) throws IOException {
System.out.println("收到过期的订单信息,准备关闭订单:" + orderEntity);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
/**
* 注解@Bean注解:可以在容器中自动创建Binding、Queue、Exchange(RabbitMQ没有的情况)
* RabbitMQ只要存在,@Bean声明的属性发生变化,重启服务后也不会覆盖先前的数据,只有删除队列才行
* 所有的消息默认会先抵达延迟队列order.delay.queue,延迟一段时间后,才会抵达order.release.order.queue队列。然后才被消费掉
* @return
*/
// 创建延迟队列(过了存活时间就会变成死信,然后将信息抛给orderReleaseOrderQueue队列,orderReleaseOrderQueue队列处理的数据都是过期的信息)
@Bean
public Queue orderDelayQueue() {
Map<String, Object> arguments = new HashMap<>();
// 指定死信路由
arguments.put("x-dead-letter-exchange", "order-event-exchange");
// 指定死信路由键
arguments.put("x-dead-letter-routing-key", "order.release.order");
// 消息过期时间(毫秒)
arguments.put("x-message-ttl", 60000);
Queue orderDelayQueue = new Queue("order.delay.queue", true, false, false, arguments);
return orderDelayQueue;
}
// 创建普通队列
@Bean
public Queue orderReleaseOrderQueue() {
return new Queue("order.release.order.queue", true, false, false);
}
// 创建topic类型交换机
@Bean
public Exchange orderEventExchange() {
return new TopicExchange("order-event-exchange", true, false);
}
// 创建交换机与队列order.delay.queue的绑定关系。
// Binding(目的地,目的地类型,交换机,路由键,参数)
@Bean
public Binding orderCreateBingding() {
return new Binding("order.delay.queue", Binding.DestinationType.QUEUE, "order-event-exchange", "order.create.order", null);
}
// 创建交换机与队列order.release.order.queue的绑定关系
@Bean
public Binding orderReleaseBingding() {
return new Binding("order.release.order.queue", Binding.DestinationType.QUEUE, "order-event-exchange", "order.release.order", null);
}
@Bean
public Binding orderReleaseOtherBingding() {
return new Binding("stock.release.stock.queue", Binding.DestinationType.QUEUE, "order-event-exchange", "order.release.other.#", null);
}
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
}
模拟延时队列定时关闭订单
1、模拟下单成功
2、RabbitMQ监听消息
3、发送请求
4、发送消息
5、收到消息
库存服务创建交换机、队列、绑定关系
package com.atguigu.gulimall.ware.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class MyRabbitConfig {
// 使用JSON序列化机制,进行消息转换
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
// 创建topic类型交换机。路由键采用模糊匹配。TopicExchange(交换机名称,是否持久化,是否自动删除)
@Bean
public Exchange stockEventExchange() {
return new TopicExchange("stock-event-exchange", true, false);
}
// 创建普通队列。Queue(队列名称,是否持久化,是否排他(只允许单个连接它,若支持多个连接,则谁抢到消息算谁的),是否自动删除)
@Bean
public Queue stockReleaseStockQueue() {
return new Queue("stock.release.stock.queue", true, false, false);
}
// 创建延迟队列。库存锁定成功后,消息先发给延迟队列,等待消息过期后,再发给普通队列
@Bean
public Queue stockDelayQueue() {
Map<String, Object> arguments = new HashMap<>();
// 设置死信路由,表示消息过期后交给哪个交换机
arguments.put("x-dead-letter-exchange", "stock-event-exchange");
// 设置死信路由键,表示消息过期后交给哪个路由键
arguments.put("x-dead-letter-routing-key", "stock.release");
// 设置消息过期时间
arguments.put("x-message-ttl", 120000);
return new Queue("stock.delay.queue", true, false, false, arguments);
}
// 创建交换机与普通队列的绑定关系
@Bean
public Binding stockReleaseBinding() {
return new Binding("stock.release.stock.queue", Binding.DestinationType.QUEUE, "stock-event-exchange", "stock.release.#", null);
}
// 创建交换机与延迟队列的绑定关系
@Bean
public Binding stockLockedBinding() {
return new Binding("stock.delay.queue", Binding.DestinationType.QUEUE, "stock-event-exchange", "stock.locked", null);
}
// 第一次监听消息时,idea会连接RabbitMQ,此时才会创建RabbitMQ中没有的队列、交换机和绑定关系
// 如果没有监听消息操作,RabbitMQ中就不会创建队列、交换机和绑定关系
// 如果需要修改rabbitMQ中已存在的队列交换机,需要先删除,然后再次创建
@RabbitListener(queues = "stock.release.stock.queue") // 需要注释掉,否则此时有两个在监听stock.release.stock.queue队列,导致消息消费异常
public void listener(WareInfoEntity entity, Channel channel, Message msg) throws IOException, IOException {
System.out.println("收到过期的订单信息:准备关闭订单" + entity.getId());
channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
}
}
库存自动解锁
@Service
@RabbitListener(queues = "stock.release.stock.queue")
public class StockReleaseListener {
@Autowired
WareSkuService wareSkuService;
/**
* 监听解锁库存功能
*/
@RabbitHandler
public void handleStockLockedRelease(StockLockedTo stockLockedTo, Message message, Channel channel) throws IOException {
try {
System.out.println("收到接收解锁库存的信息......");
wareSkuService.unlockStock(stockLockedTo);
// 手动确认RabbitMQ中order.release.order.queue队列的的消息(即消费这条消息)
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}catch (Exception e){
System.out.println("rabbitMQ错误:"+e.getMessage());
// 只要有任何异常,回退消息。拒绝消息以后重放到队列里面,让其他服务继续消费解锁(防止因自身原因误删RabbitMQ中的消息)
channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
}
}
}
/**
* 1、库存自动解锁
* 下订单成功,库存锁定成功,接下来的业务调用失败,导致订单回滚,之前锁定的库存就要自动解锁
* 订单失败:锁库存失败
*
* 只要解锁库存的消息失败,一定要告诉服务解锁失败,需要手动ACK,回复消息
*
* 解锁库存
* 查询数据库关于这个订单的锁定库存信息wms_ware_order_task、wms_ware_order_task_detail
* 若有数据,即表示库存锁定成功。此时根据订单情况判断是否需要解锁库存
* 1、订单不存在,表示订单数据自身已回滚,此时必须解锁库存
* 2、订单存在,根据订单状态确认是否需要解锁库存
* 1)订单状态为已取消,此时需要解锁库存
* 2)订单状态未取消,此时不能解锁库存
* 若没有数据,表示库存锁定失败,库存已回滚,这种情况就无需解锁库存
*/
@Override
public void unlockStock(StockLockedTo stockLockedTo){
System.out.println("收到解锁库存的消息......");
StockDetailTo detail = stockLockedTo.getDetail();
Long detailId = detail.getId();
// 解锁库存
// 1.查询关于这个订单的锁定库存信息
WareOrderTaskDetailEntity orderTaskDetailEntity = orderDetailService.getById(detailId);
if (orderTaskDetailEntity != null) {
// 有锁定库存信息,即库存锁定成功,根据订单情况解锁
Long id = stockLockedTo.getId(); //库存工作单wms_ware_order_task表的Id
WareOrderTaskEntity taskEntity = wareOrderTaskService.getById(id);
String orderSn = taskEntity.getOrderSn();
// 远程调用订单服务,根据订单号获取订单实体
R r = orderFeignService.getOrderStatus(orderSn);
if (r.getCode() == 0) {
OrderVo data = r.getData(new TypeReference<OrderVo>() { });
if (data == null || data.getStatus() == 4) {
// 订单不存在(订单数据已经回滚) 或者 有订单但订单状态是已取消,才可以解锁库存
// 只有状态是1(已锁定),才能解锁
if (orderTaskDetailEntity.getLockStatus() == 1) {
unLockStock(detail.getSkuId(), detail.getWareId(), detail.getSkuNum(), detailId);
// 手动确认RabbitMQ中order.release.order.queue队列的的消息(即消费这条消息)
// channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
} else {
// 其它状态(包含订单成功)不解锁
// 拒绝消息以后重放到队列里面,让其他服务继续消费解锁(防止因自身原因误删RabbitMQ中的消息)
// channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
throw new RuntimeException("远程服务失败....."); // 需要重新解锁。监听器中已实现
}
} else {
// 若不存在锁定库存信息,即库存锁定失败,库存回滚,这种情况无需解锁
// 手动确认RabbitMQ中order.release.order.queue队列的的消息(即消费这条消息)
// channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
// 库存解锁,将锁定库存数stock_locked减去num。即恢复原库存
public void unLockStock(Long skuId, Long wareId, Integer num, Long taskDetailId) {
// TODO 恢复stock为原库存数
wareSkuDao.unlockStock(skuId, wareId, num);
// 更新库存工作单状态
WareOrderTaskDetailEntity entity = new WareOrderTaskDetailEntity();
entity.setId(taskDetailId);
entity.setLockStatus(2); // 2-已解锁
orderDetailService.updateById(entity);
}
订单解锁
package com.atguigu.gulimall.order.listener;
import com.atguigu.gulimall.order.entity.OrderEntity;
import com.atguigu.gulimall.order.service.OrderService;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.IOException;
// 监听order.release.order.queue队列中的消息(30分钟后才会到达这个队列)
@Component
@RabbitListener(queues = "order.release.order.queue")
public class OrderCloseListener {
@Autowired
OrderService orderService;
@RabbitHandler
public void listener(OrderEntity entity, Channel channel, Message msg) throws IOException {
try {
System.out.println("收到过期的订单信息,准备关闭订单:" + entity.getOrderSn());
orderService.closeOrder(entity);
channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
System.out.println("订单关闭异常,库存解锁异常:" + e.getMessage());
// 拒绝消息,让其重新回到消息队列
channel.basicReject(msg.getMessageProperties().getDeliveryTag(), true);
}
}
}
/**
* 关闭订单
* @param entity
*/
@Override
public void closeOrder(OrderEntity entity) {
// 查询订单的最新状态
OrderEntity orderEntity = this.getById(entity.getId());
if (orderEntity.getStatus() == OrderStatusEnum.CREATE_NEW.getCode()) {
// 关闭订单,将订单状态修改为已取消
OrderEntity update = new OrderEntity();
update.setId(entity.getId());
update.setStatus(OrderStatusEnum.CANCLED.getCode());
this.updateById(update);
OrderTo orderTo = new OrderTo();
BeanUtils.copyProperties(orderEntity, orderTo);
try {
// 每一条消息进行日志记录(数据库保存每一条消息的详细信息)
// 定期扫描数据库将失败的消息再发送一遍
rabbitTemplate.convertAndSend("order-event-exchange", "order.release.other", orderTo);
} catch (Exception e) {
// 将没法送成功的消息进行重试发送
}
}
}