1.简介

1.什么是Springcloud Stream

Spring Cloud Stream是一个构建消息驱动微服务的框架。说白了就是操作MQ的,可以屏蔽底层的MQ类型。
应用程序通过inputs或者 outputs与Spring Cloud Stream中binder对象交互。所以我们通过配置来绑定(binding),而与Spring Cloud Stream的binder对象负责与消息中间件交互。所以我们只需要了解如何与Stream的binder交互就可以了,屏蔽了与底层MQ交互。
Spring Cloud Stream为一些MQ提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。

2.为什么引入Stream

屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型。类似于Hibernate的作用一样, 我们更多的注重业务开发。Hibernate屏蔽了数据库的差异,可以很好的实现数据库的切换。Stream屏蔽了底层MQ的区别,可以很好的实现切换。目前主流的有ActiveMQ、RocketMQ、RabbitMQ、Kafka,Stream支持的有RabbitMQ和Kafka

3.设计思想

  1. 标准的MQ
    1. 生产者/消费者通过消息媒介传递消息内容;
    2. 消息必须走特定的通道Channel;
  2. 引入Stream

通过定义绑定器Binder作为中间层,实现了应用程序与消息中间件细节的隔离。
Spring Cloud Stream - 图1

组成 说明
Middleware 中间件,目前只支持RabbitMQ和Kafka
Binder Binder是应用与消息中间件之间的封装,目前实行了Kafka和RabbitMQ的Binder,通过Binder可以很方便的连接中间件,可以动态的改变消息类型(对应于Kafka的topic,RabbitMQ的exchange),这些都可以通过配置文件来实现
@Input 注解标识输入通道,通过该输入通道接收到的消息进入应用程序
@Output 注解标识输出通道,发布的消息将通过该通道离开应用程序
@StreamListener 监听队列,用于消费者的队列的消息接收
@EnableBinding 指信道channel和exchange绑定在一起
  1. Stream 的消息通信模式遵循了发布-订阅模式,也就是Topic模式。在RabbitMQ中是Exchange交换机,在Kafka是Topic。
  2. 术语
  • Binder 绑定器,通过Binder可以很方便的连接中间件,屏蔽差异。
  • Channel: 通道,是Queue的一种抽象,主要实现存储和转发的媒介,通过Channel对队列进行配置。
  • Source和Sink简单的理解为参照对象是Spring Cloud Stream自身,从Stream发布消息就是输出,接收消息就是输入。
  1. 过程可以理解为下图:

Spring Cloud Stream - 图2

2.生产者

  1. 新增maven依赖

    1. <dependency>
    2. <groupId>org.springframework.cloud</groupId>
    3. <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    4. </dependency>
  2. 新增application.yml配置 ```yaml server: port: 8801

spring: application: name: cloud-stream-provider cloud: stream: binders: # 在此处配置要绑定的rabbitmq的服务信息; defaultRabbit: # 表示定义的名称,用于于binding整合 type: rabbit # 消息组件类型 environment: # 设置rabbitmq的相关的环境配置 spring: rabbitmq: host: 192.168.99.100 port: 5672 username: guest password: guest bindings: # 服务的整合处理 output1: # 这个名字是一个通道的名称 destination: studyExchange # 表示要使用的Exchange名称定义 content-type: application/json # 设置消息类型,文本则设置“text/plain” binder: defaultRabbit # 设置要绑定的消息服务的具体设置

  1. #output2: # 这个名字是一个通道的名称
  2. #destination: studyExchange # 表示要使用的Exchange名称定义
  3. #content-type: application/json # 设置消息类型,文本则设置“text/plain”
  4. #binder: defaultRabbit # 设置要绑定的消息服务的具体设置
  1. 3. 自定义消息通道
  2. ```java
  3. /**
  4. * 自定义消息通道
  5. */
  6. public interface MessageSource {
  7. @Output("output1")
  8. MessageChannel output1();
  9. //@Output("output2")
  10. //MessageChannel output2();
  11. }
  1. 业务类
  • Service

    1. public interface IMessageProvider {
    2. String send();
    3. }
    1. @EnableBinding(MessageSource.class) //定义消息的推送管道
    2. public class MessageProviderImpl implements IMessageProvider {
    3. @Resource
    4. private MessageSource messageSource; // 消息发送管道
    5. @Override
    6. public String send() {
    7. String serial = UUID.randomUUID().toString();
    8. messageSource.output1.send(MessageBuilder.withPayload(serial).build());
    9. return serial;
    10. }
    11. }
  1. 测试

启动后可以到RabbitMQ查看有一个交换机:
Spring Cloud Stream - 图3

3.消费者

  1. 新增maven依赖

    1. <dependency>
    2. <groupId>org.springframework.cloud</groupId>
    3. <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    4. </dependency>
  2. 新增application.yml配置 ```yaml server: port: 8802

spring: application: name: cloud-stream-consumer cloud: stream: binders: # 在此处配置要绑定的rabbitmq的服务信息; defaultRabbit: # 表示定义的名称,用于于binding整合 type: rabbit # 消息组件类型 environment: # 设置rabbitmq的相关的环境配置 spring: rabbitmq: host: 192.16899.100 port: 5672 username: guest password: guest bindings: # 服务的整合处理 input1: # 这个名字是一个通道的名称 destination: studyExchange # 表示要使用的Exchange名称定义 content-type: application/json # 设置消息类型,如果是文本则设置“text/plain” binder: defaultRabbit # 设置要绑定的消息服务的具体设置

  1. 3. 自定义消息通道
  2. ```java
  3. public interface CustomProcessor {
  4. //消息消费者的配置
  5. @Input("input1")
  6. SubscribableChannel customInput();
  7. }
  1. 业务类

    1. @EnableBinding(CustomProcessor.class)
    2. public class MessageConsumer {
    3. //监听binding中的消息
    4. @StreamListener("input1")
    5. public void receive(Message<String> message) {
    6. System.out.println("message = " + message.getPayload());
    7. }
    8. }

    4.消息分组

    通常在生产环境中,我们的每个服务都不会以单节点的方式,当同一个服务启动多个实例的时候,这些实例都会绑定到同一个消息通道的目标主题(Exchange)上。默认情况下,当生产者发出一条消息到绑定通道上,这条消息会产生多个副本被每个消费者实例接收和处理,但是有些业务场景下,我们希望生产者的消息只被一个实例消费,这个时候我们需要为这些消费者设置消费组来实现这样的功能。
    Spring Cloud Stream - 图4
    实现的方式很简单,我们只需要在服务消费端设置spring.cloud.stream.bindings.input.group属性即可,如下配置: ```yaml server: port: 8802

spring: application: name: cloud-stream-consumer cloud: stream: binders: # 在此处配置要绑定的rabbitmq的服务信息; defaultRabbit: # 表示定义的名称,用于于binding整合 type: rabbit # 消息组件类型 environment: # 设置rabbitmq的相关的环境配置 spring: rabbitmq: host: 192.16899.100 port: 5672 username: guest password: guest bindings: # 服务的整合处理 input1: # 这个名字是一个通道的名称 destination: studyExchange # 表示要使用的Exchange名称定义 content-type: application/json # 设置消息类型,如果是文本则设置“text/plain” binder: defaultRabbit # 设置要绑定的消息服务的具体设置 group: group1 # 设置消息的组名(同名组中的多个消息者,只有一个去消费消息)

  1. <a name="wVXfP"></a>
  2. ## 5.消息分区
  3. 有一些场景需要满足,同一特征的数据被同一个实例消费,比如同一个id的传感器监测数据必须被同一个实例统计计算分组,否则可能无法获取全部的数据。又比如部分异步任务,首次请求启动task,二次请求取消task,此场景就必须保证两次请求到同一个实例。<br />![](https://cdn.nlark.com/yuque/0/2020/png/726269/1586159575373-b78521ca-677e-4307-9f67-c4f8c6f5419e.png#crop=0&crop=0&crop=1&crop=1&from=url&id=n4HbN&margin=%5Bobject%20Object%5D&originHeight=260&originWidth=490&originalType=binary&ratio=1&rotation=0&showTitle=false&status=done&style=none&title=)<br />消息生产者配置:
  4. ```yaml
  5. server:
  6. port: 8801
  7. spring:
  8. application:
  9. name: cloud-stream-provider
  10. cloud:
  11. stream:
  12. binders: # 在此处配置要绑定的rabbitmq的服务信息;
  13. defaultRabbit: # 表示定义的名称,用于于binding整合
  14. type: rabbit # 消息组件类型
  15. environment: # 设置rabbitmq的相关的环境配置
  16. spring:
  17. rabbitmq:
  18. host: 192.168.99.100
  19. port: 5672
  20. username: guest
  21. password: guest
  22. bindings: # 服务的整合处理
  23. output1: # 这个名字是一个通道的名称
  24. destination: studyExchange # 表示要使用的Exchange名称定义
  25. content-type: application/json # 设置消息类型,文本则设置“text/plain”
  26. binder: defaultRabbit # 设置要绑定的消息服务的具体设置
  27. producer:
  28. # ===========修改部分开始===========
  29. partition-key-expression: 0 #通过该参数指定了分组键的表达式规则,
  30. #我们可以根据实际的输出消息规则来配置SpEL表达式来生成合适的分区键,例如当表达式的值为0, 那么在订阅者的instanceIndex中为0的接收方, 将会执行该消息.
  31. partition-count: 2 # 该参数指定了消息分区的数量
  32. # ===========修改部分结束===========
  33. #output2: # 这个名字是一个通道的名称
  34. #destination: studyExchange # 表示要使用的Exchange名称定义
  35. #content-type: application/json # 设置消息类型,文本则设置“text/plain”
  36. #binder: defaultRabbit # 设置要绑定的消息服务的具体设置

Spring SpEL表达式语言
partition-key-expression 参数可以通过设置 SpEL 表达式来根据实际消息来动态选择输出分区。下面通过样例进行演示(只有生产者这边需要修改,消费者方面不需要变化):

  • 首先我们定义一个 Bean 作为消息发送对象,注意对象中的 partition 属性用于指定该消息需要发送的分区索引:

    1. @Setter
    2. @Getter
    3. @AllArgsConstructor
    4. public class MyMessage {
    5. private String name;
    6. private Date date;
    7. private Integer partition;
    8. }
  • 生产者这边改成定时发送这个消息对象(MyMessage),这里指定分区索引值为 1:

    1. @EnableBinding(value = Source.class)
    2. public class SinkSender {
    3. @Bean
    4. @InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedDelay = "1000"))
    5. public MessageSource<MyMessage> timeMessageSource() {
    6. return () -> new GenericMessage<>(new MyMessage("hangge", new Date(), 1));
    7. }
    8. }
  • 生产者配置稍作修改,将分区键表达式设置为 payload.partition,表示根据消息对象的 partition 属性值来确定分区键:

    1. partition-key-expression: payload.partition

消息消费者配置:

  1. server:
  2. port: 8802
  3. spring:
  4. application:
  5. name: cloud-stream-consumer
  6. cloud:
  7. stream:
  8. binders: # 在此处配置要绑定的rabbitmq的服务信息;
  9. defaultRabbit: # 表示定义的名称,用于于binding整合
  10. type: rabbit # 消息组件类型
  11. environment: # 设置rabbitmq的相关的环境配置
  12. spring:
  13. rabbitmq:
  14. host: 192.16899.100
  15. port: 5672
  16. username: guest
  17. password: guest
  18. bindings: # 服务的整合处理
  19. input1: # 这个名字是一个通道的名称
  20. destination: studyExchange # 表示要使用的Exchange名称定义
  21. content-type: application/json # 设置消息类型,如果是文本则设置“text/plain”
  22. binder: defaultRabbit # 设置要绑定的消息服务的具体设置
  23. # ===========修改部分开始===========
  24. consumer:
  25. partitioned: true # 开启消费者分区功能
  26. instanceCount: 2 #指定了当前消费者的总实例数量
  27. instanceIndex: 0 #设置当前实例的索引号,从 0 开始
  28. # ===========修改部分结束===========

参考