为什么要引入cloud stream?
市面上有很多消息中间件,如ActiveMQ、RabbitMQ、RocketMQ、Kafka,一个项目中不同部分也可能存在使用不同MQ的情况,因此考虑是否存在使开发人员不再关注MQ的细节,只需要用一种适配绑定的方式来自动切换各种MQ的技术。而cloud stream技术则解决了上述问题,它屏蔽底层消息中间件的差异,降低切换成本,统一了消息的编程模型。
什么是cloud stream?
一个构建消息驱动微服务的框架,应用层程序通过消息发送者与接收者与cloud stream中的binder对象交互,通过配置来binging(绑定),而binder对象负责与消息中间件交互。目前仅支持RabbitMQ和Kafka。
设计思想
遵行发布-订阅模式,通过topic主题进行广播,在RabbitMQ中是exchange,在Kafka中是topic。
cloud stream标准流程
Binder-Channel-Source/Sink
连接中间件-通道-输入/输出
编码API和常见注解
生产者
- pom引入spring-cloud-starter-stream-rabbit、spring-boot-starter-web、spring-boot-starter-actuator
- yml增加如下配置

- 业务类需要配置为消息推送方(Source),MessageChannel定义消息管道,将消息通过MessageBuilder封装后使用消息管道发送。
消费者
- pom与生产者一致
- yml配置如下,与生产者不同之处主要是input

- 业务类需要配置消息接收方(Sink),并在接收处增加@StreamListener(Sink.INPUT)注解,获取消息
分组消费
同一topic中的消息会被所有消费者消费(默认每一个消费者的分组group是不同的),产生重复消费的问题,为避免重复消费,需对消费者进行分组,不同的组可以消费同一个消息,组内会发生竞争关系,只有一个可以消费。
- 如何分组
yml文件中binder下增加group

