为什么要引入cloud stream?

市面上有很多消息中间件,如ActiveMQ、RabbitMQ、RocketMQ、Kafka,一个项目中不同部分也可能存在使用不同MQ的情况,因此考虑是否存在使开发人员不再关注MQ的细节,只需要用一种适配绑定的方式来自动切换各种MQ的技术。而cloud stream技术则解决了上述问题,它屏蔽底层消息中间件的差异,降低切换成本,统一了消息的编程模型。

什么是cloud stream?

一个构建消息驱动微服务的框架,应用层程序通过消息发送者与接收者与cloud stream中的binder对象交互,通过配置来binging(绑定),而binder对象负责与消息中间件交互。目前仅支持RabbitMQ和Kafka。

设计思想

遵行发布-订阅模式,通过topic主题进行广播,在RabbitMQ中是exchange,在Kafka中是topic。

cloud stream标准流程

Binder-Channel-Source/Sink
连接中间件-通道-输入/输出

编码API和常见注解

image.png

生产者

  • pom引入spring-cloud-starter-stream-rabbit、spring-boot-starter-web、spring-boot-starter-actuator
  • yml增加如下配置

image.png

  • 业务类需要配置为消息推送方(Source),MessageChannel定义消息管道,将消息通过MessageBuilder封装后使用消息管道发送。

image.png
image.png

消费者

  • pom与生产者一致
  • yml配置如下,与生产者不同之处主要是input

image.png

  • 业务类需要配置消息接收方(Sink),并在接收处增加@StreamListener(Sink.INPUT)注解,获取消息

image.png

分组消费

同一topic中的消息会被所有消费者消费(默认每一个消费者的分组group是不同的),产生重复消费的问题,为避免重复消费,需对消费者进行分组,不同的组可以消费同一个消息,组内会发生竞争关系,只有一个可以消费。

  • 如何分组

yml文件中binder下增加group
image.png