Spring Cloud Stream 消息驱动组件帮助我们更快速,更方便,更友好地去构建消息驱动微服务。

Stream解决的痛点问题

MQ消息中间件广泛应用在 应用解耦合、 异步消息处理、 流量削峰等场景中。
但是,不同的MQ消息中间件内部机制、使用方式都有所不同。比如 RabbitMQ中有Exchange(交换机/交换器)的概念,kafka有Topic、Partition分区的概念。MQ消息中间件的差异性不利于我们上层的开发应用,如果系统在做MQ消息中间件的切换时,会比较困难。
Spring Cloud Stream 进行了很好的上层抽象,让我们与具体的消息中间件解耦合,屏蔽掉了底层具体MQ消息中间件的细节差异,类似Hibernate屏蔽掉具体的数据库MySQL/Oracle一样。
目前Spring Cloud Stream支持RabbitMQ和Kafka。
本质上:屏蔽掉了底层不同MQ消息中间件之间的差异,统一了MQ的编程模型,降低了学习、开发、维护MQ的成本。

Stream重要概念

inputs:相当于MQ的消息消费者Consumer;
outputs:相当于MQ的消息生产者Producer;
Binder对象:用来屏蔽底层MQ的细节,负责与具体的消息中间件交互。
应用程序通过inputs或者outputs与Spring Cloud Stream中的Binder对象交互。

image.png
Application Core: 应用服务(生产者/消费者)的业务逻辑代码。
inputs:关联消费者(相对应用服务来说,消息输入/消息消费)。
outputs:关联生产者。(相对应用程序来说,消息输出/消息生产)。
Binder:绑定器对象,用于屏蔽底层MQ差异,Stream提供不同的Binder,当需要切换MQ产品时,只需要切换Binder即可,而不需要修改任何应用逻辑(Binder绑定器的实现是框架内置的,Spring Cloud Stream目前支持RabbitMQ、Kafka两种消息队列)。

传统MQ模型与Stream消息驱动模型

传统MQ模型

image.png

Stream消息驱动模型

image.png

Stream消息通信方式及编程模型

Stream消息通信方式

在Spring Cloud Stream中消息通信方式遵循了发布-订阅模式,当一条消息被投递到消息中间件之后,会通过共享的Topic主题进行广播,消息消费者在订阅的主题中收到消息并触发自身的业务逻辑处理。

Topic:Stream中的抽象概念,用来代表发布共享消息给消费者的地方。 在RabbitMQ中,Topic对应的是Exchange,在Kafka中,Topic对应的是Topic。

Stream编程注解

注解主要是把上述结构图的组成部分上下关联起来,打通通道。
这样生产者的message数据才能进入mq,mq中的数据才能进入消费者。

注解 描述
@Input
在消费者工程中使用
注解标识输入通道。
通过该输入通道接收到的消息进入应用程序。
@Output
在生产者工程中使用
注解标识输出通道。
发布的消息通过该通道离开应用程序。
@StreamListener
在消费者工程中使用,监听message的到来
监听队列。
用于消费者的队列的消息接收(消息监听)
@EnableBinding 把Channel和Exchange/Topic绑定在一起

Stream消息驱动工程开发

cloud-stream-producer-9090 : 作为生产者端发送消息。
cloud-stream-consumer-9091 : 作为消费者端接收消息。
cloud-stream-consumer-9092 : 作为消费者端接收消息。

生产者工程

(1)新建子工程模块

cloud-parent 下新建子module: cloud-stream-producer-9090

(2)引入坐标依赖

  1. <!--eureka client 客户端依赖引⼊-->
  2. <dependency>
  3. <groupId>org.springframework.cloud</groupId>
  4. <artifactId>spring-cloud-starter-netflix-eurekaclient</artifactId>
  5. </dependency>
  6. <!--spring cloud stream 依赖(rabbit)-->
  7. <dependency>
  8. <groupId>org.springframework.cloud</groupId>
  9. <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
  10. </dependency>

(3)application.yml添加配置

  1. server:
  2. port: 9090
  3. spring:
  4. application:
  5. name: cloud-stream-producer
  6. cloud:
  7. stream:
  8. binders: # 绑定MQ服务信息(此处我们是RabbitMQ)
  9. lagouRabbitBinder: # 给Binder定义的名称,⽤于后⾯的关联
  10. type: rabbit # MQ类型,如果是Kafka的话,此处配置kafka
  11. environment: # MQ环境配置(⽤户名、密码等)
  12. spring:
  13. rabbitmq:
  14. host: localhost
  15. port: 5672
  16. username: guest
  17. password: guest
  18. bindings: # 关联整合通道和binder对象
  19. output: # output是我们定义的通道名称,此处不能乱改
  20. destination: lagouExchange # 要使⽤的Exchange名称(消息队列主题名称)
  21. content-type: text/plain # application/json # 消息类型设置,⽐如json
  22. binder: lagouRabbitBinder # 关联MQ服务
  23. eureka:
  24. client:
  25. serviceUrl: # eureka server的路径
  26. defaultZone: http://lagoucloudeurekaservera:8761/eureka/,http://lagoucloudeurekaserverb:8762/eureka/
  27. instance:
  28. prefer-ip-address: true #使⽤ip注册

(4) 启动类

  1. @SpringBootApplication
  2. @EnableDiscoveryClient
  3. public class StreamProducerApplication9090 {
  4. public static void main(String[] args) {
  5. SpringApplication.run(StreamProducerApplication9090.class,args);
  6. }
  7. }

(5) 业务类开发

发送消息接口

  1. public interface IMessageProducer {
  2. public void sendMessage(String content);
  3. }

发送消息实现类

  1. // Source.class⾥⾯就是对输出通道的定义(这是Spring Cloud Stream内置的通道封装)
  2. @EnableBinding(Source.class)
  3. public class MessageProducerImpl implements IMessageProducer {
  4. // 将MessageChannel的封装对象Source注⼊到这⾥使⽤
  5. @Autowired
  6. private Source source;
  7. @Override
  8. public void sendMessage(String content) {
  9. // 向mq中发送消息(并不是直接操作mq,应该操作的是spring cloud stream)
  10. // 使⽤通道向外发出消息(指的是Source⾥⾯的output通道)
  11. source.output().send(MessageBuilder.withPayload(content).build());
  12. }
  13. }

测试类

  1. SpringBootTest(classes = {StreamProducerApplication9090.class})
  2. @RunWith(SpringJUnit4ClassRunner.class)
  3. public class MessageProducerTest {
  4. @Autowired
  5. private IMessageProducer iMessageProducer;
  6. @Test
  7. public void testSendMessage() {
  8. iMessageProducer.sendMessage("hello world-lagou101");
  9. }
  10. }

消费者工程

(1)新建子工程模块

(2)引入坐标依赖

(3)application.yml添加配置

  1. server:
  2. port: 9091
  3. spring:
  4. application:
  5. name: cloud-stream-consumer
  6. cloud:
  7. stream:
  8. binders: # 绑定MQ服务信息(此处我们是RabbitMQ)
  9. lagouRabbitBinder: # 给Binder定义的名称,⽤于后⾯的关联
  10. type: rabbit # MQ类型,如果是Kafka的话,此处配置kafka
  11. environment: # MQ环境配置(⽤户名、密码等)
  12. spring:
  13. rabbitmq:
  14. host: localhost
  15. port: 5672
  16. username: guest
  17. password: guest
  18. bindings: # 关联整合通道和binder对象
  19. input: # input是我们定义的通道名称,此处不能乱改
  20. destination: lagouExchange # 要使⽤的Exchange名称(消息队列主题名称)
  21. content-type: text/plain # application/json # 消息类型设置,⽐如json
  22. binder: lagouRabbitBinder # 关联MQ服务
  23. eureka:
  24. client:
  25. serviceUrl: # eureka server的路径
  26. defaultZone: http://lagoucloudeurekaservera:8761/eureka/,http://lagoucloudeurekaserverb:8762/eureka/
  27. instance:
  28. prefer-ip-address: true #使⽤ip注册

(4) 消息消费者监听

  1. @EnableBinding(Sink.class)
  2. public class MessageConsumerService {
  3. @StreamListener(Sink.INPUT)
  4. public void recevieMessages(Message<String> message) {
  5. System.out.println("=========接收到的消息:" + message);
  6. }
  7. }

Stream自定义消息通道

Stream内置了两种接口Source和Sink,分别定义了binding为”input”的输入流和”output”的输出流。
可以自定义输入通道和输出通道。

定义接口

  1. interface CustomChannel {
  2. String INPUT_LOG = "inputLog";
  3. String OUTPUT_LOG = "outputLog";
  4. @Input(INPUT_LOG)
  5. SubscribableChannel inputLog();
  6. @Output(OUTPUT_LOG)
  7. MessageChannel outputLog();
  8. }

使用

(1)在@EnableBinding注解中,绑定自定义接口
(2)使用@StreamListener做监听时,指定绑定 CustomChannel.INPUT_LOG
(3)服务配置文件补充bindings信息

  1. bindings:
  2. inputLog:
  3. destination: lagouExchange # 要使⽤的Exchange名称(消息队列主题名称)
  4. outputLog:
  5. destination: eduExchange

Stream消息分组

业务场景中,希望主题上的一个Message只能被一个消费者端(多实例)消费处理,就需要通过使用消息分组的方式,来防止一个消费者端的多个实例重复消费消息。
在服务消费者端设置 spring.cloud.stream.bindings.input.group 属性,多个消费者实例配置为同一个group名称(同一个group中的消费者只能有一个可以获取到消费并消费)。
image.png