1.MQ的应用场景

1)系统解耦
mq系统解耦.png
使用mq进行解耦,比如商品服务把提供的数据发送到rabbitmq中,搜索服务、订单服务、库存服务只需要从对应的队列中拉取所需要的数据即可,不需要强依赖商品服务
2)异步处理
mq异步处理.png
用户发起支付之后,支付服务只需要把用户支付的消息发送到mq,然后由mq把消息发送到所订阅的服务(积分,优惠券,短信通知等服务),这一个过程极大缩短了调用链路,节省了很多时间。
注意:
是否可以将项目中所有的远程调用都改成mq形式?
不行,服务和服务之间的调用使用mq,下一个服务如果使用了上一步远程调用的结果,就会出现数据为空的现象
3)故障隔离
imq故障隔离.png
即使搜索服务宕机也不会影响主体业务的使用
4)削峰填谷
mq削峰填谷.png
用户的所有请求全部都发送到mq中,通过配置mq的消费队列的数量,来达到流量削峰的作用。
5)日志处理
mq日志处理.png
所有服务把日志消息发送给mq,日志处理服务订阅这些消息, 把消息放到日志处理服务进行处理。好处项目服务日志和其他服务解耦,互不影响,比如kafka最开始就是专门为了处理日志而产生的。

2、常见的几种MQ

常见的几种mq的比较.png
MQ还存在的问题:
1)开发人员的学习成本和项目的开发成本比较高
2)系统和MQ之间耦合度高,后期项目更换MQ的成本高。

3、spring cloud stream介绍

是一个消息驱动微服务的框架,spring官方目前只封装了RabbitMQ和Kafka,但是其他的MQ也有自己整合进了spring cloud stream。

4、spring cloud stream原理

4.1stream的核心概念
spring cloud stream原理.png

middleware 消息中间间,支持rabbitMQ,kafka,rocketMQ等
Binder 目标绑定器,就是封装了目标中间件的包。
如果是kafka就使用spring-cloud-stream-binder-kafka
如果是rabbitMQ就使用spring-cloud-stream-binder-rabbit
@Input 注解标识输入通道,接收的消息将通过该通道进入应用程序
@Output 注解标识输出通道,发布的消息将通过该通道进入应用程序
@StreamListener 监听队列,消费者的队列消息接收
@EnableBinding 注解标识绑定,将信道channel和交换机exchange绑定在一起

4.2stream的工作原理
spring cloud stream工作原理.png

Source 当需要发送消息时,通过source.java,它会把我们所要发送的消息进行序列化(默认是专程JSON格式字符串),然后把这些数据发送到channel中;
Sink 当需要监听消息时通过Sink.java,它会从消息通道中获取消息,并将消息反序列化消息对象,然后交给具体的消息监听处理
Channel 当我们向消息中间件发送消息或者监听消息时需要指定主题(topic)和消息队列名称,一旦我们需要变更主题时就需要修改消息发送或消息监听的代码。通过channel对象,我们的业务代码只需要对应channel就可以了,具体这个channel对应的是哪个主题,可以在配置文件中来指定,从而实现与具体消息中间件的解耦,不需要对代码做任何修改
Binder 通过不同的binder可以实现与不同的消息中间件整合,binder提供统一的消息接收和发送接口,可以根据实际生产中所部署的消息中间件来调整我们的配置

5、分组消息

当出现多个消息消费者会出现什么样的问题?
如果有多个消息消费者,那么消息生产者发送的消息会被多个消费者都接收到。比如:下单扣减库存时,库存系统做集群部署,每个库存服务器都会从rabbitMQ中获取订单消息,如果一个订单消息同时被两个服务消费,系统肯定会出现问题。
有什么解决方案?
Stream提供了消息分组来解决。在sream中处于同一个分组中的多个消费者是竞争关系,就能够保证消息只会被其中一个消息消费者消费。通过spring.cloud.sream.bindings.<bindingName>.group配置来指定组名。

6、分区消息

stream分区消息.png
场景:同时有多条同一个用户的数据发送过来,我们需要根据用户统计,但是消息被分散到不同的集群节点上了,应该怎么办?
解决方案:设置消息分区,注意点:同一个消息区下可以设置消息分组

  1. # 消息生产方配置文件添加分区支持
  2. bindings:
  3. my_output:
  4. destination: myexchange.message # 绑定的交换机名称
  5. # producer: # 参考源码 ProducerProperties 配置
  6. # partition-key-expression: payload # 配置分区键的表达式规则
  7. # partition-count: 2 # 配置消息分区的数量
  8. # required-groups: #发送到的消费者分组,默认为空
  9. producer:
  10. partition-key-expression: headers['pkey'] # 消息转发的规则 pkey%2 = 0 1
  11. partition-count: 2 #分区数量
  12. required-groups: #分组名称
  13. - group1
  1. # 消息消费方添加分区配置
  2. stream:
  3. instance-count: 2 # 消费者总数
  4. instance-index: 1 # 当前消费者的索引【修改每个消息消费方的索引,从0开始】
  5. # Binder 配置项,对应 BinderProperties Map
  6. binders:
  7. #其它配置省略
  8. bindings:
  9. stream_input:
  10. destination: myexchange.message # 绑定的交换机名称
  11. group: group1 #分组名称,解决集群消费消息问题
  12. consumer:
  13. partitioned: true # 开启分区支持

7.延迟队列

mq本身不支持延迟队列,是通过delayed-message-exchange插件来支持的。

  1. # 配置文件开启延迟消息
  2. spring:
  3. rabbitmq:
  4. host: 192.168.200.129 # 服务器 IP
  5. port: 5672 # 服务器端口
  6. username: admin # 用户名
  7. password: pass # 密码
  8. virtual-host: / # 虚拟主机地址
  9. cloud:
  10. # Spring Cloud Stream 配置项,对应 BindingServiceProperties 类
  11. stream:
  12. # Binder 配置项,对应 BinderProperties Map
  13. binders:
  14. rabbitmq:
  15. type: rabbit # 设置 Binder 的类型
  16. environment: # 设置 Binder 的环境配置
  17. # 如果是 RabbitMQ 类型的时候,则对应的是 RabbitProperties 类
  18. spring:
  19. rabbitmq:
  20. host: 192.168.200.129 # 服务器 IP
  21. port: 5672 # 服务器端口
  22. username: admin # 用户名
  23. password: pass # 密码
  24. virtual-host: / # 虚拟主机地址
  25. bindings:
  26. # 消息接收通道
  27. delay_input:
  28. destination: delay_exchange.message # 绑定的延迟交换机名称
  29. group: group1 #分组名称,解决集群消费消息问题
  30. rabbit:
  31. bindings:
  32. delay_input:
  33. # RabbitMQ Producer 配置项,对应 RabbitProducerProperties 类
  34. consumer:
  35. delayed-exchange: true # 是否使用 x-delayed-message 类型的 Exchange,即延迟消息,默认为 false

8、购物车优化

目前项目购物车还存在哪些问题?
餐掌柜项目中添加购物车会扣减菜品的库存,如果消费者一直没有下单,会导致菜品库存占用,其他消费者无法完成菜品的下单,会影响商家的利益。
解决方案有两种:
1)改变库存扣减时机:用户在下单的时立刻完成库存的扣减
2)给购物车设置过期时间,到期后立刻删除购物车菜品数据并回滚菜品库存(项目选择的方案)
购物车优化思路.png
购物车优化流程:
1)当客户将菜品添加到购物车成功时,异步给rabbitmq发送延迟消息,延时时间30分钟内有效(cartSource);
2)在CartSinkListener类中监听延迟消息;
3)接收到延迟消息后解析消息内容获取消息中菜品id和订单编号,回滚数据库菜品库存,回滚redis中的菜品库存,清除购物车对应的菜品。
目前购物车是否还存在问题?
存在。当客户在30分钟的有效期内对购物车的商品进行下单操作了怎么办,有效期过了,当前的方案还是会回滚数据库的菜品库存和回滚redis中的菜品库存,还是会导致超卖现象的存在。
解决方案:
在CartSinkListener类中的onMessage( )方法中,判断当前redis中的购物车的是否还存在该订单,不存在直接return,如果存在继续进行下一步。