1.简介
1.什么是Springcloud Stream
Spring Cloud Stream是一个构建消息驱动微服务的框架。说白了就是操作MQ的,可以屏蔽底层的MQ类型。
应用程序通过inputs
或者 outputs
与Spring Cloud Stream中binder
对象交互。所以我们通过配置来绑定(binding
),而与Spring Cloud Stream的binder
对象负责与消息中间件交互。所以我们只需要了解如何与Stream的binder
交互就可以了,屏蔽了与底层MQ交互。
Spring Cloud Stream为一些MQ提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。
2.为什么引入Stream
屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型。类似于Hibernate的作用一样, 我们更多的注重业务开发。Hibernate屏蔽了数据库的差异,可以很好的实现数据库的切换。Stream屏蔽了底层MQ的区别,可以很好的实现切换。目前主流的有ActiveMQ、RocketMQ、RabbitMQ、Kafka,Stream支持的有RabbitMQ和Kafka。
3.设计思想
- 标准的MQ
- 生产者/消费者通过消息媒介传递消息内容;
- 消息必须走特定的通道Channel;
- 引入Stream
通过定义绑定器Binder作为中间层,实现了应用程序与消息中间件细节的隔离。
组成 | 说明 |
---|---|
Middleware | 中间件,目前只支持RabbitMQ和Kafka |
Binder | Binder是应用与消息中间件之间的封装,目前实行了Kafka和RabbitMQ的Binder,通过Binder可以很方便的连接中间件,可以动态的改变消息类型(对应于Kafka的topic,RabbitMQ的exchange),这些都可以通过配置文件来实现 |
@Input | 注解标识输入通道,通过该输入通道接收到的消息进入应用程序 |
@Output | 注解标识输出通道,发布的消息将通过该通道离开应用程序 |
@StreamListener | 监听队列,用于消费者的队列的消息接收 |
@EnableBinding | 指信道channel和exchange绑定在一起 |
- Stream 的消息通信模式遵循了发布-订阅模式,也就是Topic模式。在RabbitMQ中是Exchange交换机,在Kafka是Topic。
- 术语
- Binder 绑定器,通过Binder可以很方便的连接中间件,屏蔽差异。
- Channel: 通道,是Queue的一种抽象,主要实现存储和转发的媒介,通过Channel对队列进行配置。
- Source和Sink简单的理解为参照对象是Spring Cloud Stream自身,从Stream发布消息就是输出,接收消息就是输入。
- 过程可以理解为下图:
2.生产者
新增maven依赖
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
新增application.yml配置 ```yaml server: port: 8801
spring: application: name: cloud-stream-provider cloud: stream: binders: # 在此处配置要绑定的rabbitmq的服务信息; defaultRabbit: # 表示定义的名称,用于于binding整合 type: rabbit # 消息组件类型 environment: # 设置rabbitmq的相关的环境配置 spring: rabbitmq: host: 192.168.99.100 port: 5672 username: guest password: guest bindings: # 服务的整合处理 output1: # 这个名字是一个通道的名称 destination: studyExchange # 表示要使用的Exchange名称定义 content-type: application/json # 设置消息类型,文本则设置“text/plain” binder: defaultRabbit # 设置要绑定的消息服务的具体设置
#output2: # 这个名字是一个通道的名称
#destination: studyExchange # 表示要使用的Exchange名称定义
#content-type: application/json # 设置消息类型,文本则设置“text/plain”
#binder: defaultRabbit # 设置要绑定的消息服务的具体设置
3. 自定义消息通道
```java
/**
* 自定义消息通道
*/
public interface MessageSource {
@Output("output1")
MessageChannel output1();
//@Output("output2")
//MessageChannel output2();
}
- 业务类
Service
public interface IMessageProvider {
String send();
}
@EnableBinding(MessageSource.class) //定义消息的推送管道
public class MessageProviderImpl implements IMessageProvider {
@Resource
private MessageSource messageSource; // 消息发送管道
@Override
public String send() {
String serial = UUID.randomUUID().toString();
messageSource.output1.send(MessageBuilder.withPayload(serial).build());
return serial;
}
}
- 测试
3.消费者
新增maven依赖
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
新增application.yml配置 ```yaml server: port: 8802
spring: application: name: cloud-stream-consumer cloud: stream: binders: # 在此处配置要绑定的rabbitmq的服务信息; defaultRabbit: # 表示定义的名称,用于于binding整合 type: rabbit # 消息组件类型 environment: # 设置rabbitmq的相关的环境配置 spring: rabbitmq: host: 192.16899.100 port: 5672 username: guest password: guest bindings: # 服务的整合处理 input1: # 这个名字是一个通道的名称 destination: studyExchange # 表示要使用的Exchange名称定义 content-type: application/json # 设置消息类型,如果是文本则设置“text/plain” binder: defaultRabbit # 设置要绑定的消息服务的具体设置
3. 自定义消息通道
```java
public interface CustomProcessor {
//消息消费者的配置
@Input("input1")
SubscribableChannel customInput();
}
业务类
@EnableBinding(CustomProcessor.class)
public class MessageConsumer {
//监听binding中的消息
@StreamListener("input1")
public void receive(Message<String> message) {
System.out.println("message = " + message.getPayload());
}
}
4.消息分组
通常在生产环境中,我们的每个服务都不会以单节点的方式,当同一个服务启动多个实例的时候,这些实例都会绑定到同一个消息通道的目标主题(Exchange)上。默认情况下,当生产者发出一条消息到绑定通道上,这条消息会产生多个副本被每个消费者实例接收和处理,但是有些业务场景下,我们希望生产者的消息只被一个实例消费,这个时候我们需要为这些消费者设置消费组来实现这样的功能。
实现的方式很简单,我们只需要在服务消费端设置spring.cloud.stream.bindings.input.group
属性即可,如下配置: ```yaml server: port: 8802
spring: application: name: cloud-stream-consumer cloud: stream: binders: # 在此处配置要绑定的rabbitmq的服务信息; defaultRabbit: # 表示定义的名称,用于于binding整合 type: rabbit # 消息组件类型 environment: # 设置rabbitmq的相关的环境配置 spring: rabbitmq: host: 192.16899.100 port: 5672 username: guest password: guest bindings: # 服务的整合处理 input1: # 这个名字是一个通道的名称 destination: studyExchange # 表示要使用的Exchange名称定义 content-type: application/json # 设置消息类型,如果是文本则设置“text/plain” binder: defaultRabbit # 设置要绑定的消息服务的具体设置 group: group1 # 设置消息的组名(同名组中的多个消息者,只有一个去消费消息)
<a name="wVXfP"></a>
## 5.消息分区
有一些场景需要满足,同一特征的数据被同一个实例消费,比如同一个id的传感器监测数据必须被同一个实例统计计算分组,否则可能无法获取全部的数据。又比如部分异步任务,首次请求启动task,二次请求取消task,此场景就必须保证两次请求到同一个实例。<br /><br />消息生产者配置:
```yaml
server:
port: 8801
spring:
application:
name: cloud-stream-provider
cloud:
stream:
binders: # 在此处配置要绑定的rabbitmq的服务信息;
defaultRabbit: # 表示定义的名称,用于于binding整合
type: rabbit # 消息组件类型
environment: # 设置rabbitmq的相关的环境配置
spring:
rabbitmq:
host: 192.168.99.100
port: 5672
username: guest
password: guest
bindings: # 服务的整合处理
output1: # 这个名字是一个通道的名称
destination: studyExchange # 表示要使用的Exchange名称定义
content-type: application/json # 设置消息类型,文本则设置“text/plain”
binder: defaultRabbit # 设置要绑定的消息服务的具体设置
producer:
# ===========修改部分开始===========
partition-key-expression: 0 #通过该参数指定了分组键的表达式规则,
#我们可以根据实际的输出消息规则来配置SpEL表达式来生成合适的分区键,例如当表达式的值为0, 那么在订阅者的instanceIndex中为0的接收方, 将会执行该消息.
partition-count: 2 # 该参数指定了消息分区的数量
# ===========修改部分结束===========
#output2: # 这个名字是一个通道的名称
#destination: studyExchange # 表示要使用的Exchange名称定义
#content-type: application/json # 设置消息类型,文本则设置“text/plain”
#binder: defaultRabbit # 设置要绑定的消息服务的具体设置
Spring SpEL表达式语言partition-key-expression
参数可以通过设置 SpEL 表达式来根据实际消息来动态选择输出分区。下面通过样例进行演示(只有生产者这边需要修改,消费者方面不需要变化):
首先我们定义一个 Bean 作为消息发送对象,注意对象中的 partition 属性用于指定该消息需要发送的分区索引:
@Setter
@Getter
@AllArgsConstructor
public class MyMessage {
private String name;
private Date date;
private Integer partition;
}
生产者这边改成定时发送这个消息对象(MyMessage),这里指定分区索引值为 1:
@EnableBinding(value = Source.class)
public class SinkSender {
@Bean
@InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedDelay = "1000"))
public MessageSource<MyMessage> timeMessageSource() {
return () -> new GenericMessage<>(new MyMessage("hangge", new Date(), 1));
}
}
生产者配置稍作修改,将分区键表达式设置为 payload.partition,表示根据消息对象的 partition 属性值来确定分区键:
partition-key-expression: payload.partition
消息消费者配置:
server:
port: 8802
spring:
application:
name: cloud-stream-consumer
cloud:
stream:
binders: # 在此处配置要绑定的rabbitmq的服务信息;
defaultRabbit: # 表示定义的名称,用于于binding整合
type: rabbit # 消息组件类型
environment: # 设置rabbitmq的相关的环境配置
spring:
rabbitmq:
host: 192.16899.100
port: 5672
username: guest
password: guest
bindings: # 服务的整合处理
input1: # 这个名字是一个通道的名称
destination: studyExchange # 表示要使用的Exchange名称定义
content-type: application/json # 设置消息类型,如果是文本则设置“text/plain”
binder: defaultRabbit # 设置要绑定的消息服务的具体设置
# ===========修改部分开始===========
consumer:
partitioned: true # 开启消费者分区功能
instanceCount: 2 #指定了当前消费者的总实例数量
instanceIndex: 0 #设置当前实例的索引号,从 0 开始
# ===========修改部分结束===========