一.SpringCloud Stream 消息驱动

1.使用MQ的场景

系统解耦 防止级联失败
异步处理 节省耗时,避免链路调用耗时(支付==>优惠券,积分,短信)
故障隔离 不影响主体业务
削峰填谷 存入消息队列,利用多线程去取(自己设置,可控,防止宕机)
日志处理 收集,日志服务去读(保证性能,保存)

注意:上一步需要的结果,下一步需要则不能使用MQ

2.MQ问题概述==>SpringCloud Stream

MQ在项目中的问题:
1、开发人员的学习成本 和项目开发的成本
2、系统和MQ之间的耦合
SpringCloud 官方出SpringCloud Stream 技术解决系统和MQ之间的耦合问题(封装了各种操作Api)

3.基本概述

Spring Cloud Stream的binder对象负责与消息中间件交互(发布-订阅、消费组、分区)
目前仅支持RabbitMQ、Kafka,或自定义
image.png

3.1、工作原理

通过binder对象完成应用程序与消息中间件解耦,只需要替换相应的binder对象即可
image.png

4.Stream消息驱动入门

4.1、快速入门

①引入依赖
image.png
②生产者配置yaml
image.png
③生产者开启@EbableBinding(Source.class)
④发消息
image.png
⑤配置接受消息:@EbableBinding(Sink.class);将配置文件中output==>input
image.png
消息对象:
be471b4cd966ecfd3a77d9be7556288.png

4.2、自定义消息通道

①参考Source接口:修改OUTPUT值,@Output(xxx)
②配置类@EnableBinding({Source.class,MySource.class}) //添加自定义配置
③修改配置文件
④发消息
image.png
⑤接收消息
image.png
image.png

4.3、分组消息

问题:防止MQ在集群部署的情况下,分发到每个服务下,造成过量消费;下单(-1)扣减库存(-3)

  1. bindings:
  2. # 消息接收通道
  3. stream_input:
  4. destination: myexchange.message # 绑定的交换机名称
  5. group: group1 #分组名称,解决集群消费消息问题

分组消息:在 Stream 中处于同一个 group 中的多个消费者是竞争关系,只有一个消费方

4.4、分区消息

同时有多条同一个用户的多个数据发送过来,我们需要根据用户统计;
根据hash算法使得该用户消息,落在固定的消费方

生产方配置

image.png
image.png

消费方配置

  1. stream:
  2. #分区配置---------------------------------------
  3. instance-count: 2 # 消费者总数
  4. instance-index: 1 # 当前消费者的索引【修改每个消息消费方的索引,从0开始】
  5. #分区配置---------------------------------------
  6. # Binder 配置项,对应 BinderProperties Map
  7. binders:
  8. #其它配置省略
  9. bindings:
  10. stream_input:
  11. destination: myexchange.message # 绑定的交换机名称
  12. group: group1 #分组名称,解决集群消费消息问题
  13. consumer:
  14. #分区配置---------------------------------------
  15. partitioned: true # ##开启分区支持

同一分区下,两个分组也是竞争关系

4.5、延迟队列

安装rabbitmq-delayed-message-exchange 插件
配置:
设置过期时间:message.setHeader(“x-delay”, 5000) image.png
应用场景:

  • 订单超时未支付,30分钟后取消订单
  • 添加购物车扣减库存,30分钟后删除购物车回滚库存

优点:
1、维护性增强、扩展性增强
2、MQ和系统完全解耦
3、屏蔽底层MQ使用