1.MQ的应用场景
1)系统解耦
使用mq进行解耦,比如商品服务把提供的数据发送到rabbitmq中,搜索服务、订单服务、库存服务只需要从对应的队列中拉取所需要的数据即可,不需要强依赖商品服务
2)异步处理
用户发起支付之后,支付服务只需要把用户支付的消息发送到mq,然后由mq把消息发送到所订阅的服务(积分,优惠券,短信通知等服务),这一个过程极大缩短了调用链路,节省了很多时间。
注意:
是否可以将项目中所有的远程调用都改成mq形式?
不行,服务和服务之间的调用使用mq,下一个服务如果使用了上一步远程调用的结果,就会出现数据为空的现象
3)故障隔离
即使搜索服务宕机也不会影响主体业务的使用
4)削峰填谷
用户的所有请求全部都发送到mq中,通过配置mq的消费队列的数量,来达到流量削峰的作用。
5)日志处理
所有服务把日志消息发送给mq,日志处理服务订阅这些消息, 把消息放到日志处理服务进行处理。好处项目服务日志和其他服务解耦,互不影响,比如kafka最开始就是专门为了处理日志而产生的。
2、常见的几种MQ
MQ还存在的问题:
1)开发人员的学习成本和项目的开发成本比较高
2)系统和MQ之间耦合度高,后期项目更换MQ的成本高。
3、spring cloud stream介绍
是一个消息驱动微服务的框架,spring官方目前只封装了RabbitMQ和Kafka,但是其他的MQ也有自己整合进了spring cloud stream。
4、spring cloud stream原理
4.1stream的核心概念
middleware | 消息中间间,支持rabbitMQ,kafka,rocketMQ等 | |
---|---|---|
Binder | 目标绑定器,就是封装了目标中间件的包。 如果是kafka就使用 spring-cloud-stream-binder-kafka 如果是rabbitMQ就使用 spring-cloud-stream-binder-rabbit |
|
@Input | 注解标识输入通道,接收的消息将通过该通道进入应用程序 | |
@Output | 注解标识输出通道,发布的消息将通过该通道进入应用程序 | |
@StreamListener | 监听队列,消费者的队列消息接收 | |
@EnableBinding | 注解标识绑定,将信道channel 和交换机exchange 绑定在一起 |
4.2stream的工作原理
Source | 当需要发送消息时,通过source.java ,它会把我们所要发送的消息进行序列化(默认是专程JSON格式字符串),然后把这些数据发送到channel中; |
|
---|---|---|
Sink | 当需要监听消息时通过Sink.java ,它会从消息通道中获取消息,并将消息反序列化消息对象,然后交给具体的消息监听处理 |
|
Channel | 当我们向消息中间件发送消息或者监听消息时需要指定主题(topic)和消息队列名称,一旦我们需要变更主题时就需要修改消息发送或消息监听的代码。通过channel 对象,我们的业务代码只需要对应channel 就可以了,具体这个channel 对应的是哪个主题,可以在配置文件中来指定,从而实现与具体消息中间件的解耦,不需要对代码做任何修改 |
|
Binder | 通过不同的binder 可以实现与不同的消息中间件整合,binder 提供统一的消息接收和发送接口,可以根据实际生产中所部署的消息中间件来调整我们的配置 |
5、分组消息
当出现多个消息消费者会出现什么样的问题?
如果有多个消息消费者,那么消息生产者发送的消息会被多个消费者都接收到。比如:下单扣减库存时,库存系统做集群部署,每个库存服务器都会从rabbitMQ中获取订单消息,如果一个订单消息同时被两个服务消费,系统肯定会出现问题。
有什么解决方案?
Stream提供了消息分组来解决。在sream中处于同一个分组中的多个消费者是竞争关系,就能够保证消息只会被其中一个消息消费者消费。通过spring.cloud.sream.bindings.<bindingName>.group
配置来指定组名。
6、分区消息
场景:同时有多条同一个用户的数据发送过来,我们需要根据用户统计,但是消息被分散到不同的集群节点上了,应该怎么办?
解决方案:设置消息分区,注意点:同一个消息区下可以设置消息分组
# 消息生产方配置文件添加分区支持
bindings:
my_output:
destination: myexchange.message # 绑定的交换机名称
# producer: # 参考源码 ProducerProperties 配置
# partition-key-expression: payload # 配置分区键的表达式规则
# partition-count: 2 # 配置消息分区的数量
# required-groups: #发送到的消费者分组,默认为空
producer:
partition-key-expression: headers['pkey'] # 消息转发的规则 pkey%2 = 0 1
partition-count: 2 #分区数量
required-groups: #分组名称
- group1
# 消息消费方添加分区配置
stream:
instance-count: 2 # 消费者总数
instance-index: 1 # 当前消费者的索引【修改每个消息消费方的索引,从0开始】
# Binder 配置项,对应 BinderProperties Map
binders:
#其它配置省略
bindings:
stream_input:
destination: myexchange.message # 绑定的交换机名称
group: group1 #分组名称,解决集群消费消息问题
consumer:
partitioned: true # 开启分区支持
7.延迟队列
mq本身不支持延迟队列,是通过delayed-message-exchange插件来支持的。
# 配置文件开启延迟消息
spring:
rabbitmq:
host: 192.168.200.129 # 服务器 IP
port: 5672 # 服务器端口
username: admin # 用户名
password: pass # 密码
virtual-host: / # 虚拟主机地址
cloud:
# Spring Cloud Stream 配置项,对应 BindingServiceProperties 类
stream:
# Binder 配置项,对应 BinderProperties Map
binders:
rabbitmq:
type: rabbit # 设置 Binder 的类型
environment: # 设置 Binder 的环境配置
# 如果是 RabbitMQ 类型的时候,则对应的是 RabbitProperties 类
spring:
rabbitmq:
host: 192.168.200.129 # 服务器 IP
port: 5672 # 服务器端口
username: admin # 用户名
password: pass # 密码
virtual-host: / # 虚拟主机地址
bindings:
# 消息接收通道
delay_input:
destination: delay_exchange.message # 绑定的延迟交换机名称
group: group1 #分组名称,解决集群消费消息问题
rabbit:
bindings:
delay_input:
# RabbitMQ Producer 配置项,对应 RabbitProducerProperties 类
consumer:
delayed-exchange: true # 是否使用 x-delayed-message 类型的 Exchange,即延迟消息,默认为 false
8、购物车优化
目前项目购物车还存在哪些问题?
餐掌柜项目中添加购物车会扣减菜品的库存,如果消费者一直没有下单,会导致菜品库存占用,其他消费者无法完成菜品的下单,会影响商家的利益。
解决方案有两种:
1)改变库存扣减时机:用户在下单的时立刻完成库存的扣减
2)给购物车设置过期时间,到期后立刻删除购物车菜品数据并回滚菜品库存(项目选择的方案)
购物车优化流程:
1)当客户将菜品添加到购物车成功时,异步给rabbitmq发送延迟消息,延时时间30分钟内有效(cartSource);
2)在CartSinkListener类中监听延迟消息;
3)接收到延迟消息后解析消息内容获取消息中菜品id和订单编号,回滚数据库菜品库存,回滚redis中的菜品库存,清除购物车对应的菜品。
目前购物车是否还存在问题?
存在。当客户在30分钟的有效期内对购物车的商品进行下单操作了怎么办,有效期过了,当前的方案还是会回滚数据库的菜品库存和回滚redis中的菜品库存,还是会导致超卖现象的存在。
解决方案:
在CartSinkListener类中的onMessage( )方法中,判断当前redis中的购物车的是否还存在该订单,不存在直接return,如果存在继续进行下一步。