一.SpringCloud Stream 消息驱动
1.使用MQ的场景
系统解耦 | 防止级联失败 |
---|---|
异步处理 | 节省耗时,避免链路调用耗时(支付==>优惠券,积分,短信) |
故障隔离 | 不影响主体业务 |
削峰填谷 | 存入消息队列,利用多线程去取(自己设置,可控,防止宕机) |
日志处理 | 收集,日志服务去读(保证性能,保存) |
2.MQ问题概述==>SpringCloud Stream
MQ在项目中的问题:
1、开发人员的学习成本 和项目开发的成本
2、系统和MQ之间的耦合
SpringCloud 官方出SpringCloud Stream 技术解决系统和MQ之间的耦合问题(封装了各种操作Api)
3.基本概述
Spring Cloud Stream的binder对象负责与消息中间件交互(发布-订阅、消费组、分区)
目前仅支持RabbitMQ、Kafka,或自定义
3.1、工作原理
通过binder对象完成应用程序与消息中间件解耦,只需要替换相应的binder对象即可
4.Stream消息驱动入门
4.1、快速入门
①引入依赖
②生产者配置yaml
③生产者开启@EbableBinding(Source.class)
④发消息
⑤配置接受消息:@EbableBinding(Sink.class);将配置文件中output==>input
消息对象:
4.2、自定义消息通道
①参考Source接口:修改OUTPUT值,@Output(xxx)
②配置类@EnableBinding({Source.class,MySource.class}) //添加自定义配置
③修改配置文件
④发消息
⑤接收消息
4.3、分组消息
问题:防止MQ在集群部署的情况下,分发到每个服务下,造成过量消费;下单(-1)扣减库存(-3)
bindings:
# 消息接收通道
stream_input:
destination: myexchange.message # 绑定的交换机名称
group: group1 #分组名称,解决集群消费消息问题
分组消息:在 Stream 中处于同一个 group 中的多个消费者是竞争关系,只有一个消费方
4.4、分区消息
同时有多条同一个用户的多个数据发送过来,我们需要根据用户统计;
根据hash算法使得该用户消息,落在固定的消费方
生产方配置
消费方配置
stream:
#分区配置---------------------------------------
instance-count: 2 # 消费者总数
instance-index: 1 # 当前消费者的索引【修改每个消息消费方的索引,从0开始】
#分区配置---------------------------------------
# Binder 配置项,对应 BinderProperties Map
binders:
#其它配置省略
bindings:
stream_input:
destination: myexchange.message # 绑定的交换机名称
group: group1 #分组名称,解决集群消费消息问题
consumer:
#分区配置---------------------------------------
partitioned: true # ##开启分区支持
4.5、延迟队列
安装rabbitmq-delayed-message-exchange 插件
配置:
设置过期时间:message.setHeader(“x-delay”, 5000)
应用场景:
- 订单超时未支付,30分钟后取消订单
- 添加购物车扣减库存,30分钟后删除购物车回滚库存
优点:
1、维护性增强、扩展性增强
2、MQ和系统完全解耦
3、屏蔽底层MQ使用