静态页面

图片.png

Feign远程调用丢失请求头问题

图片.png

图片.png
图片.png

Feign异步情况丢失上下文问题

图片.png

图片.png
图片.png图片.png图片.png图片.png

订单确认页流程图片.png下单流程

图片.png图片.png

锁库存逻辑

图片.png

下单失败

图片.png本地事务和分布式事务

本地事务的问题图片.png

图片.png

图片.png本地事务

1、事务的基本性质

图片.png2、事务的隔离级别

图片.png3、事务的传播行为

图片.png图片.png

4、SpringBoot 事务关键点

图片.png
图片.png

图片.png分布式事务

1、为什么有分布式事务

图片.png2、CAP 定理与 BASE 理论

1、CAP 定理
图片.png图片.png图片.png http://thesecretlivesofdata.com/raft/

2、面临的问题

对于多数大型互联网应用的场景,主机众多、部署分散,而且现在的集群规模越来越大,所
以节点故障、网络故障是常态,而且要保证服务可用性达到 99.99999%(N 个 9),即保证
P 和 A,舍弃 C。

3、BASE 理论

是对 CAP 理论的延伸,思想是即使无法做到强一致性(CAP 的一致性就是强一致性),但可以采用适当的采取弱一致性,即最终一致性图片.png4、强一致性、弱一致性、最终一致性
图片.png

3、分布式事务几种方案

1)、2PC 模式
数据库支持的 2PC【2 phase commit 二阶提交】,又叫做 XA Transactions。
MySQL 从 5.5 版本开始支持,SQL Server 2005 开始支持,Oracle 7 开始支持。
其中,XA 是一个两阶段提交协议,该协议分为以下两个阶段:

第一阶段:事务协调器要求每个涉及到事务的数据库预提交(precommit)此操作,并反映是否可以提交
第二阶段:事务协调器要求每个数据库提交数据。
其中,如果有任何一个数据库否决此次提交,那么所有数据库都会被要求回滚它们在此事务中的那部分信息。
图片.png
图片.png

2)、柔性事务-TCC 事务补偿型方案
刚性事务:遵循 ACID 原则,强一致性。
柔性事务:遵循 BASE 理论,最终一致性;
与刚性事务不同,柔性事务允许一定时间内,不同节点的数据不一致,但要求最终一致。
图片.png
一阶段 prepare 行为:调用 自定义 的 prepare 逻辑。
二阶段 commit 行为:调用 自定义 的 commit 逻辑。
二阶段 rollback 行为:调用 自定义 的 rollback 逻辑。
所谓 TCC 模式,是指支持把 自定义 的分支事务纳入到全局事务的管理中。
图片.png

3)、柔性事务-最大努力通知型方案
按规律进行通知,不保证数据一定能通知成功,但会提供可查询操作接口进行核对。这种方案主要用在与第三方系统通讯时,比如:调用微信或支付宝支付后的支付结果通知。这种方案也是结合 MQ 进行实现,例如:通过 MQ 发送 http 请求,设置最大通知次数。达到通知次数后即不再通知。
案例:银行通知、商户通知等(各大交易业务平台间的商户通知:多次通知、查询校对、对账文件),支付宝的支付成功异步回调

4)、柔性事务-可靠消息+最终一致性方案(异步确保型)
实现:业务处理服务在业务事务提交之前,向实时消息服务请求发送消息,实时消息服务只记录消息数据,而不是真正的发送。业务处理服务在业务事务提交之后,向实时消息服务确认发送。只有在得到确认发送指令后,实时消息服务才会真正发送。

图片.pngseata控制分布式事务

seata的分布式事务解决方案

图片.png图片.png
图片.png

消息队列流程图片.png

锁库存增强版逻辑

图片.png
图片.png

定时任务的时效性问题

图片.png

延时队列场景

图片.png

RabbitMQ延时队列(实现定时任务)

图片.png消息的TTL(Time To Live)

消息的TTL就是消息的存活时间。
RabbitMQ可以对队列和消息分别设置TTL。
对队列设置就是队列没有消费者连着的保留时间,也可以对每一个单独的消息做单独的 设置。超过了这个时
间,我们认为这个消息就死了,称之为死信。
如果队列设置了,消息也设置了,那么会取小的。所以一个消息如果被路由到不同的队 列中,这个消息死亡的时间有可能不一样(不同的队列设置)。这里单讲单个消息的TTL,因为它才是实现延迟任务的关键。可以通过设置消息的expiration字段或者x- message-ttl属性来设置时间,两者是一样的效果。

Dead Letter Exchanges(DLX)

一个消息在满足如下条件下,会进死信路由,记住这里是路由而不是队列, 一个路由可以对应很多队列。(什么是死信)
一个消息被Consumer拒收了,并且reject方法的参数里requeue是false。也就是说不 会被再次放在队列
里,被其他消费者使用。(basic.reject/ basic.nack)requeue=false
上面的消息的TTL到了,消息过期了。
队列的长度限制满了。排在前面的消息会被丢弃或者扔到死信路由上
Dead Letter Exchange其实就是一种普通的exchange,和创建其他exchange没有两样。只是在某一个设置Dead Letter Exchange的队列中有 消息过期了,会自动触发消息的转发,发送到Dead Letter Exchange中去。
我们既可以控制消息在一段时间后变成死信,又可以控制变成死信的消息 被路由到某一个指定的交换机,结合二者,其实就可以实现一个延时队列

延时队列实现-1

图片.png

延时队列实现-2

图片.png

图片.png

图片.png订单服务创建队列、交换机、绑定关系

  1. package com.atguigu.gulimall.order.config;
  2. import com.atguigu.gulimall.order.entity.OrderEntity;
  3. import com.rabbitmq.client.Channel;
  4. import org.springframework.amqp.core.*;
  5. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  6. import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
  7. import org.springframework.amqp.support.converter.MessageConverter;
  8. import org.springframework.context.annotation.Bean;
  9. import org.springframework.context.annotation.Configuration;
  10. import java.io.IOException;
  11. import java.util.HashMap;
  12. import java.util.Map;
  13. @Configuration
  14. public class MyMQConfig {
  15. @RabbitListener(queues = "order.release.order.queue")
  16. public void listener(OrderEntity orderEntity, Channel channel, Message message) throws IOException {
  17. System.out.println("收到过期的订单信息,准备关闭订单:" + orderEntity);
  18. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  19. }
  20. /**
  21. * 注解@Bean注解:可以在容器中自动创建Binding、Queue、Exchange(RabbitMQ没有的情况)
  22. * RabbitMQ只要存在,@Bean声明的属性发生变化,重启服务后也不会覆盖先前的数据,只有删除队列才行
  23. * 所有的消息默认会先抵达延迟队列order.delay.queue,延迟一段时间后,才会抵达order.release.order.queue队列。然后才被消费掉
  24. * @return
  25. */
  26. // 创建延迟队列(过了存活时间就会变成死信,然后将信息抛给orderReleaseOrderQueue队列,orderReleaseOrderQueue队列处理的数据都是过期的信息)
  27. @Bean
  28. public Queue orderDelayQueue() {
  29. Map<String, Object> arguments = new HashMap<>();
  30. // 指定死信路由
  31. arguments.put("x-dead-letter-exchange", "order-event-exchange");
  32. // 指定死信路由键
  33. arguments.put("x-dead-letter-routing-key", "order.release.order");
  34. // 消息过期时间(毫秒)
  35. arguments.put("x-message-ttl", 60000);
  36. Queue orderDelayQueue = new Queue("order.delay.queue", true, false, false, arguments);
  37. return orderDelayQueue;
  38. }
  39. // 创建普通队列
  40. @Bean
  41. public Queue orderReleaseOrderQueue() {
  42. return new Queue("order.release.order.queue", true, false, false);
  43. }
  44. // 创建topic类型交换机
  45. @Bean
  46. public Exchange orderEventExchange() {
  47. return new TopicExchange("order-event-exchange", true, false);
  48. }
  49. // 创建交换机与队列order.delay.queue的绑定关系。
  50. // Binding(目的地,目的地类型,交换机,路由键,参数)
  51. @Bean
  52. public Binding orderCreateBingding() {
  53. return new Binding("order.delay.queue", Binding.DestinationType.QUEUE, "order-event-exchange", "order.create.order", null);
  54. }
  55. // 创建交换机与队列order.release.order.queue的绑定关系
  56. @Bean
  57. public Binding orderReleaseBingding() {
  58. return new Binding("order.release.order.queue", Binding.DestinationType.QUEUE, "order-event-exchange", "order.release.order", null);
  59. }
  60. @Bean
  61. public Binding orderReleaseOtherBingding() {
  62. return new Binding("stock.release.stock.queue", Binding.DestinationType.QUEUE, "order-event-exchange", "order.release.other.#", null);
  63. }
  64. @Bean
  65. public MessageConverter messageConverter() {
  66. return new Jackson2JsonMessageConverter();
  67. }
  68. }

模拟延时队列定时关闭订单

1、模拟下单成功图片.png

2、RabbitMQ监听消息图片.png

3、发送请求图片.png

4、发送消息图片.png

5、收到消息图片.png

库存服务创建交换机、队列、绑定关系

  1. package com.atguigu.gulimall.ware.config;
  2. import org.springframework.amqp.core.Binding;
  3. import org.springframework.amqp.core.Exchange;
  4. import org.springframework.amqp.core.Queue;
  5. import org.springframework.amqp.core.TopicExchange;
  6. import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
  7. import org.springframework.amqp.support.converter.MessageConverter;
  8. import org.springframework.context.annotation.Bean;
  9. import org.springframework.context.annotation.Configuration;
  10. import java.util.HashMap;
  11. import java.util.Map;
  12. @Configuration
  13. public class MyRabbitConfig {
  14. // 使用JSON序列化机制,进行消息转换
  15. @Bean
  16. public MessageConverter messageConverter() {
  17. return new Jackson2JsonMessageConverter();
  18. }
  19. // 创建topic类型交换机。路由键采用模糊匹配。TopicExchange(交换机名称,是否持久化,是否自动删除)
  20. @Bean
  21. public Exchange stockEventExchange() {
  22. return new TopicExchange("stock-event-exchange", true, false);
  23. }
  24. // 创建普通队列。Queue(队列名称,是否持久化,是否排他(只允许单个连接它,若支持多个连接,则谁抢到消息算谁的),是否自动删除)
  25. @Bean
  26. public Queue stockReleaseStockQueue() {
  27. return new Queue("stock.release.stock.queue", true, false, false);
  28. }
  29. // 创建延迟队列。库存锁定成功后,消息先发给延迟队列,等待消息过期后,再发给普通队列
  30. @Bean
  31. public Queue stockDelayQueue() {
  32. Map<String, Object> arguments = new HashMap<>();
  33. // 设置死信路由,表示消息过期后交给哪个交换机
  34. arguments.put("x-dead-letter-exchange", "stock-event-exchange");
  35. // 设置死信路由键,表示消息过期后交给哪个路由键
  36. arguments.put("x-dead-letter-routing-key", "stock.release");
  37. // 设置消息过期时间
  38. arguments.put("x-message-ttl", 120000);
  39. return new Queue("stock.delay.queue", true, false, false, arguments);
  40. }
  41. // 创建交换机与普通队列的绑定关系
  42. @Bean
  43. public Binding stockReleaseBinding() {
  44. return new Binding("stock.release.stock.queue", Binding.DestinationType.QUEUE, "stock-event-exchange", "stock.release.#", null);
  45. }
  46. // 创建交换机与延迟队列的绑定关系
  47. @Bean
  48. public Binding stockLockedBinding() {
  49. return new Binding("stock.delay.queue", Binding.DestinationType.QUEUE, "stock-event-exchange", "stock.locked", null);
  50. }
  51. // 第一次监听消息时,idea会连接RabbitMQ,此时才会创建RabbitMQ中没有的队列、交换机和绑定关系
  52. // 如果没有监听消息操作,RabbitMQ中就不会创建队列、交换机和绑定关系
  53. // 如果需要修改rabbitMQ中已存在的队列交换机,需要先删除,然后再次创建
  54. @RabbitListener(queues = "stock.release.stock.queue") // 需要注释掉,否则此时有两个在监听stock.release.stock.queue队列,导致消息消费异常
  55. public void listener(WareInfoEntity entity, Channel channel, Message msg) throws IOException, IOException {
  56. System.out.println("收到过期的订单信息:准备关闭订单" + entity.getId());
  57. channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
  58. }
  59. }

库存自动解锁

  1. @Service
  2. @RabbitListener(queues = "stock.release.stock.queue")
  3. public class StockReleaseListener {
  4. @Autowired
  5. WareSkuService wareSkuService;
  6. /**
  7. * 监听解锁库存功能
  8. */
  9. @RabbitHandler
  10. public void handleStockLockedRelease(StockLockedTo stockLockedTo, Message message, Channel channel) throws IOException {
  11. try {
  12. System.out.println("收到接收解锁库存的信息......");
  13. wareSkuService.unlockStock(stockLockedTo);
  14. // 手动确认RabbitMQ中order.release.order.queue队列的的消息(即消费这条消息)
  15. channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
  16. }catch (Exception e){
  17. System.out.println("rabbitMQ错误:"+e.getMessage());
  18. // 只要有任何异常,回退消息。拒绝消息以后重放到队列里面,让其他服务继续消费解锁(防止因自身原因误删RabbitMQ中的消息)
  19. channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
  20. }
  21. }
  22. }
  1. /**
  2. * 1、库存自动解锁
  3. * 下订单成功,库存锁定成功,接下来的业务调用失败,导致订单回滚,之前锁定的库存就要自动解锁
  4. * 订单失败:锁库存失败
  5. *
  6. * 只要解锁库存的消息失败,一定要告诉服务解锁失败,需要手动ACK,回复消息
  7. *
  8. * 解锁库存
  9. * 查询数据库关于这个订单的锁定库存信息wms_ware_order_task、wms_ware_order_task_detail
  10. * 若有数据,即表示库存锁定成功。此时根据订单情况判断是否需要解锁库存
  11. * 1、订单不存在,表示订单数据自身已回滚,此时必须解锁库存
  12. * 2、订单存在,根据订单状态确认是否需要解锁库存
  13. * 1)订单状态为已取消,此时需要解锁库存
  14. * 2)订单状态未取消,此时不能解锁库存
  15. * 若没有数据,表示库存锁定失败,库存已回滚,这种情况就无需解锁库存
  16. */
  17. @Override
  18. public void unlockStock(StockLockedTo stockLockedTo){
  19. System.out.println("收到解锁库存的消息......");
  20. StockDetailTo detail = stockLockedTo.getDetail();
  21. Long detailId = detail.getId();
  22. // 解锁库存
  23. // 1.查询关于这个订单的锁定库存信息
  24. WareOrderTaskDetailEntity orderTaskDetailEntity = orderDetailService.getById(detailId);
  25. if (orderTaskDetailEntity != null) {
  26. // 有锁定库存信息,即库存锁定成功,根据订单情况解锁
  27. Long id = stockLockedTo.getId(); //库存工作单wms_ware_order_task表的Id
  28. WareOrderTaskEntity taskEntity = wareOrderTaskService.getById(id);
  29. String orderSn = taskEntity.getOrderSn();
  30. // 远程调用订单服务,根据订单号获取订单实体
  31. R r = orderFeignService.getOrderStatus(orderSn);
  32. if (r.getCode() == 0) {
  33. OrderVo data = r.getData(new TypeReference<OrderVo>() { });
  34. if (data == null || data.getStatus() == 4) {
  35. // 订单不存在(订单数据已经回滚) 或者 有订单但订单状态是已取消,才可以解锁库存
  36. // 只有状态是1(已锁定),才能解锁
  37. if (orderTaskDetailEntity.getLockStatus() == 1) {
  38. unLockStock(detail.getSkuId(), detail.getWareId(), detail.getSkuNum(), detailId);
  39. // 手动确认RabbitMQ中order.release.order.queue队列的的消息(即消费这条消息)
  40. // channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  41. }
  42. }
  43. } else {
  44. // 其它状态(包含订单成功)不解锁
  45. // 拒绝消息以后重放到队列里面,让其他服务继续消费解锁(防止因自身原因误删RabbitMQ中的消息)
  46. // channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
  47. throw new RuntimeException("远程服务失败....."); // 需要重新解锁。监听器中已实现
  48. }
  49. } else {
  50. // 若不存在锁定库存信息,即库存锁定失败,库存回滚,这种情况无需解锁
  51. // 手动确认RabbitMQ中order.release.order.queue队列的的消息(即消费这条消息)
  52. // channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  53. }
  54. }
  1. // 库存解锁,将锁定库存数stock_locked减去num。即恢复原库存
  2. public void unLockStock(Long skuId, Long wareId, Integer num, Long taskDetailId) {
  3. // TODO 恢复stock为原库存数
  4. wareSkuDao.unlockStock(skuId, wareId, num);
  5. // 更新库存工作单状态
  6. WareOrderTaskDetailEntity entity = new WareOrderTaskDetailEntity();
  7. entity.setId(taskDetailId);
  8. entity.setLockStatus(2); // 2-已解锁
  9. orderDetailService.updateById(entity);
  10. }

订单解锁图片.png

  1. package com.atguigu.gulimall.order.listener;
  2. import com.atguigu.gulimall.order.entity.OrderEntity;
  3. import com.atguigu.gulimall.order.service.OrderService;
  4. import com.rabbitmq.client.Channel;
  5. import org.springframework.amqp.core.Message;
  6. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  7. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  8. import org.springframework.beans.factory.annotation.Autowired;
  9. import org.springframework.stereotype.Component;
  10. import java.io.IOException;
  11. // 监听order.release.order.queue队列中的消息(30分钟后才会到达这个队列)
  12. @Component
  13. @RabbitListener(queues = "order.release.order.queue")
  14. public class OrderCloseListener {
  15. @Autowired
  16. OrderService orderService;
  17. @RabbitHandler
  18. public void listener(OrderEntity entity, Channel channel, Message msg) throws IOException {
  19. try {
  20. System.out.println("收到过期的订单信息,准备关闭订单:" + entity.getOrderSn());
  21. orderService.closeOrder(entity);
  22. channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
  23. } catch (Exception e) {
  24. System.out.println("订单关闭异常,库存解锁异常:" + e.getMessage());
  25. // 拒绝消息,让其重新回到消息队列
  26. channel.basicReject(msg.getMessageProperties().getDeliveryTag(), true);
  27. }
  28. }
  29. }
  1. /**
  2. * 关闭订单
  3. * @param entity
  4. */
  5. @Override
  6. public void closeOrder(OrderEntity entity) {
  7. // 查询订单的最新状态
  8. OrderEntity orderEntity = this.getById(entity.getId());
  9. if (orderEntity.getStatus() == OrderStatusEnum.CREATE_NEW.getCode()) {
  10. // 关闭订单,将订单状态修改为已取消
  11. OrderEntity update = new OrderEntity();
  12. update.setId(entity.getId());
  13. update.setStatus(OrderStatusEnum.CANCLED.getCode());
  14. this.updateById(update);
  15. OrderTo orderTo = new OrderTo();
  16. BeanUtils.copyProperties(orderEntity, orderTo);
  17. try {
  18. // 每一条消息进行日志记录(数据库保存每一条消息的详细信息)
  19. // 定期扫描数据库将失败的消息再发送一遍
  20. rabbitTemplate.convertAndSend("order-event-exchange", "order.release.other", orderTo);
  21. } catch (Exception e) {
  22. // 将没法送成功的消息进行重试发送
  23. }
  24. }
  25. }