1.简介
1.什么是Springcloud Stream
Spring Cloud Stream是一个构建消息驱动微服务的框架。说白了就是操作MQ的,可以屏蔽底层的MQ类型。
应用程序通过inputs或者 outputs与Spring Cloud Stream中binder对象交互。所以我们通过配置来绑定(binding),而与Spring Cloud Stream的binder对象负责与消息中间件交互。所以我们只需要了解如何与Stream的binder交互就可以了,屏蔽了与底层MQ交互。
Spring Cloud Stream为一些MQ提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。
2.为什么引入Stream
屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型。类似于Hibernate的作用一样, 我们更多的注重业务开发。Hibernate屏蔽了数据库的差异,可以很好的实现数据库的切换。Stream屏蔽了底层MQ的区别,可以很好的实现切换。目前主流的有ActiveMQ、RocketMQ、RabbitMQ、Kafka,Stream支持的有RabbitMQ和Kafka。
3.设计思想
- 标准的MQ
- 生产者/消费者通过消息媒介传递消息内容;
- 消息必须走特定的通道Channel;
- 引入Stream
通过定义绑定器Binder作为中间层,实现了应用程序与消息中间件细节的隔离。
| 组成 | 说明 |
|---|---|
| Middleware | 中间件,目前只支持RabbitMQ和Kafka |
| Binder | Binder是应用与消息中间件之间的封装,目前实行了Kafka和RabbitMQ的Binder,通过Binder可以很方便的连接中间件,可以动态的改变消息类型(对应于Kafka的topic,RabbitMQ的exchange),这些都可以通过配置文件来实现 |
| @Input | 注解标识输入通道,通过该输入通道接收到的消息进入应用程序 |
| @Output | 注解标识输出通道,发布的消息将通过该通道离开应用程序 |
| @StreamListener | 监听队列,用于消费者的队列的消息接收 |
| @EnableBinding | 指信道channel和exchange绑定在一起 |
- Stream 的消息通信模式遵循了发布-订阅模式,也就是Topic模式。在RabbitMQ中是Exchange交换机,在Kafka是Topic。
- 术语
- Binder 绑定器,通过Binder可以很方便的连接中间件,屏蔽差异。
- Channel: 通道,是Queue的一种抽象,主要实现存储和转发的媒介,通过Channel对队列进行配置。
- Source和Sink简单的理解为参照对象是Spring Cloud Stream自身,从Stream发布消息就是输出,接收消息就是输入。
- 过程可以理解为下图:
2.生产者
新增maven依赖
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId></dependency>
新增application.yml配置 ```yaml server: port: 8801
spring: application: name: cloud-stream-provider cloud: stream: binders: # 在此处配置要绑定的rabbitmq的服务信息; defaultRabbit: # 表示定义的名称,用于于binding整合 type: rabbit # 消息组件类型 environment: # 设置rabbitmq的相关的环境配置 spring: rabbitmq: host: 192.168.99.100 port: 5672 username: guest password: guest bindings: # 服务的整合处理 output1: # 这个名字是一个通道的名称 destination: studyExchange # 表示要使用的Exchange名称定义 content-type: application/json # 设置消息类型,文本则设置“text/plain” binder: defaultRabbit # 设置要绑定的消息服务的具体设置
#output2: # 这个名字是一个通道的名称#destination: studyExchange # 表示要使用的Exchange名称定义#content-type: application/json # 设置消息类型,文本则设置“text/plain”#binder: defaultRabbit # 设置要绑定的消息服务的具体设置
3. 自定义消息通道```java/*** 自定义消息通道*/public interface MessageSource {@Output("output1")MessageChannel output1();//@Output("output2")//MessageChannel output2();}
- 业务类
Service
public interface IMessageProvider {String send();}
@EnableBinding(MessageSource.class) //定义消息的推送管道public class MessageProviderImpl implements IMessageProvider {@Resourceprivate MessageSource messageSource; // 消息发送管道@Overridepublic String send() {String serial = UUID.randomUUID().toString();messageSource.output1.send(MessageBuilder.withPayload(serial).build());return serial;}}
- 测试
3.消费者
新增maven依赖
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId></dependency>
新增application.yml配置 ```yaml server: port: 8802
spring: application: name: cloud-stream-consumer cloud: stream: binders: # 在此处配置要绑定的rabbitmq的服务信息; defaultRabbit: # 表示定义的名称,用于于binding整合 type: rabbit # 消息组件类型 environment: # 设置rabbitmq的相关的环境配置 spring: rabbitmq: host: 192.16899.100 port: 5672 username: guest password: guest bindings: # 服务的整合处理 input1: # 这个名字是一个通道的名称 destination: studyExchange # 表示要使用的Exchange名称定义 content-type: application/json # 设置消息类型,如果是文本则设置“text/plain” binder: defaultRabbit # 设置要绑定的消息服务的具体设置
3. 自定义消息通道```javapublic interface CustomProcessor {//消息消费者的配置@Input("input1")SubscribableChannel customInput();}
业务类
@EnableBinding(CustomProcessor.class)public class MessageConsumer {//监听binding中的消息@StreamListener("input1")public void receive(Message<String> message) {System.out.println("message = " + message.getPayload());}}
4.消息分组
通常在生产环境中,我们的每个服务都不会以单节点的方式,当同一个服务启动多个实例的时候,这些实例都会绑定到同一个消息通道的目标主题(Exchange)上。默认情况下,当生产者发出一条消息到绑定通道上,这条消息会产生多个副本被每个消费者实例接收和处理,但是有些业务场景下,我们希望生产者的消息只被一个实例消费,这个时候我们需要为这些消费者设置消费组来实现这样的功能。

实现的方式很简单,我们只需要在服务消费端设置spring.cloud.stream.bindings.input.group属性即可,如下配置: ```yaml server: port: 8802
spring: application: name: cloud-stream-consumer cloud: stream: binders: # 在此处配置要绑定的rabbitmq的服务信息; defaultRabbit: # 表示定义的名称,用于于binding整合 type: rabbit # 消息组件类型 environment: # 设置rabbitmq的相关的环境配置 spring: rabbitmq: host: 192.16899.100 port: 5672 username: guest password: guest bindings: # 服务的整合处理 input1: # 这个名字是一个通道的名称 destination: studyExchange # 表示要使用的Exchange名称定义 content-type: application/json # 设置消息类型,如果是文本则设置“text/plain” binder: defaultRabbit # 设置要绑定的消息服务的具体设置 group: group1 # 设置消息的组名(同名组中的多个消息者,只有一个去消费消息)
<a name="wVXfP"></a>## 5.消息分区有一些场景需要满足,同一特征的数据被同一个实例消费,比如同一个id的传感器监测数据必须被同一个实例统计计算分组,否则可能无法获取全部的数据。又比如部分异步任务,首次请求启动task,二次请求取消task,此场景就必须保证两次请求到同一个实例。<br /><br />消息生产者配置:```yamlserver:port: 8801spring:application:name: cloud-stream-providercloud:stream:binders: # 在此处配置要绑定的rabbitmq的服务信息;defaultRabbit: # 表示定义的名称,用于于binding整合type: rabbit # 消息组件类型environment: # 设置rabbitmq的相关的环境配置spring:rabbitmq:host: 192.168.99.100port: 5672username: guestpassword: guestbindings: # 服务的整合处理output1: # 这个名字是一个通道的名称destination: studyExchange # 表示要使用的Exchange名称定义content-type: application/json # 设置消息类型,文本则设置“text/plain”binder: defaultRabbit # 设置要绑定的消息服务的具体设置producer:# ===========修改部分开始===========partition-key-expression: 0 #通过该参数指定了分组键的表达式规则,#我们可以根据实际的输出消息规则来配置SpEL表达式来生成合适的分区键,例如当表达式的值为0, 那么在订阅者的instanceIndex中为0的接收方, 将会执行该消息.partition-count: 2 # 该参数指定了消息分区的数量# ===========修改部分结束===========#output2: # 这个名字是一个通道的名称#destination: studyExchange # 表示要使用的Exchange名称定义#content-type: application/json # 设置消息类型,文本则设置“text/plain”#binder: defaultRabbit # 设置要绑定的消息服务的具体设置
Spring SpEL表达式语言partition-key-expression 参数可以通过设置 SpEL 表达式来根据实际消息来动态选择输出分区。下面通过样例进行演示(只有生产者这边需要修改,消费者方面不需要变化):
首先我们定义一个 Bean 作为消息发送对象,注意对象中的 partition 属性用于指定该消息需要发送的分区索引:
@Setter@Getter@AllArgsConstructorpublic class MyMessage {private String name;private Date date;private Integer partition;}
生产者这边改成定时发送这个消息对象(MyMessage),这里指定分区索引值为 1:
@EnableBinding(value = Source.class)public class SinkSender {@Bean@InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedDelay = "1000"))public MessageSource<MyMessage> timeMessageSource() {return () -> new GenericMessage<>(new MyMessage("hangge", new Date(), 1));}}
生产者配置稍作修改,将分区键表达式设置为 payload.partition,表示根据消息对象的 partition 属性值来确定分区键:
partition-key-expression: payload.partition
消息消费者配置:
server:port: 8802spring:application:name: cloud-stream-consumercloud:stream:binders: # 在此处配置要绑定的rabbitmq的服务信息;defaultRabbit: # 表示定义的名称,用于于binding整合type: rabbit # 消息组件类型environment: # 设置rabbitmq的相关的环境配置spring:rabbitmq:host: 192.16899.100port: 5672username: guestpassword: guestbindings: # 服务的整合处理input1: # 这个名字是一个通道的名称destination: studyExchange # 表示要使用的Exchange名称定义content-type: application/json # 设置消息类型,如果是文本则设置“text/plain”binder: defaultRabbit # 设置要绑定的消息服务的具体设置# ===========修改部分开始===========consumer:partitioned: true # 开启消费者分区功能instanceCount: 2 #指定了当前消费者的总实例数量instanceIndex: 0 #设置当前实例的索引号,从 0 开始# ===========修改部分结束===========
