Spring Cloud Stream 消息驱动组件帮助我们更快速,更方便,更友好地去构建消息驱动微服务。
Stream解决的痛点问题
MQ消息中间件广泛应用在 应用解耦合、 异步消息处理、 流量削峰等场景中。
但是,不同的MQ消息中间件内部机制、使用方式都有所不同。比如 RabbitMQ中有Exchange(交换机/交换器)的概念,kafka有Topic、Partition分区的概念。MQ消息中间件的差异性不利于我们上层的开发应用,如果系统在做MQ消息中间件的切换时,会比较困难。
Spring Cloud Stream 进行了很好的上层抽象,让我们与具体的消息中间件解耦合,屏蔽掉了底层具体MQ消息中间件的细节差异,类似Hibernate屏蔽掉具体的数据库MySQL/Oracle一样。
目前Spring Cloud Stream支持RabbitMQ和Kafka。
本质上:屏蔽掉了底层不同MQ消息中间件之间的差异,统一了MQ的编程模型,降低了学习、开发、维护MQ的成本。
Stream重要概念
inputs:相当于MQ的消息消费者Consumer;
outputs:相当于MQ的消息生产者Producer;
Binder对象:用来屏蔽底层MQ的细节,负责与具体的消息中间件交互。
应用程序通过inputs或者outputs与Spring Cloud Stream中的Binder对象交互。
Application Core: 应用服务(生产者/消费者)的业务逻辑代码。
inputs:关联消费者(相对应用服务来说,消息输入/消息消费)。
outputs:关联生产者。(相对应用程序来说,消息输出/消息生产)。
Binder:绑定器对象,用于屏蔽底层MQ差异,Stream提供不同的Binder,当需要切换MQ产品时,只需要切换Binder即可,而不需要修改任何应用逻辑(Binder绑定器的实现是框架内置的,Spring Cloud Stream目前支持RabbitMQ、Kafka两种消息队列)。
传统MQ模型与Stream消息驱动模型
传统MQ模型
Stream消息驱动模型
Stream消息通信方式及编程模型
Stream消息通信方式
在Spring Cloud Stream中消息通信方式遵循了发布-订阅模式,当一条消息被投递到消息中间件之后,会通过共享的Topic主题进行广播,消息消费者在订阅的主题中收到消息并触发自身的业务逻辑处理。
Topic:Stream中的抽象概念,用来代表发布共享消息给消费者的地方。 在RabbitMQ中,Topic对应的是Exchange,在Kafka中,Topic对应的是Topic。
Stream编程注解
注解主要是把上述结构图的组成部分上下关联起来,打通通道。
这样生产者的message数据才能进入mq,mq中的数据才能进入消费者。
注解 | 描述 |
---|---|
@Input 在消费者工程中使用 |
注解标识输入通道。 通过该输入通道接收到的消息进入应用程序。 |
@Output 在生产者工程中使用 |
注解标识输出通道。 发布的消息通过该通道离开应用程序。 |
@StreamListener 在消费者工程中使用,监听message的到来 |
监听队列。 用于消费者的队列的消息接收(消息监听) |
@EnableBinding | 把Channel和Exchange/Topic绑定在一起 |
Stream消息驱动工程开发
cloud-stream-producer-9090
: 作为生产者端发送消息。cloud-stream-consumer-9091
: 作为消费者端接收消息。cloud-stream-consumer-9092
: 作为消费者端接收消息。
生产者工程
(1)新建子工程模块
在 cloud-parent
下新建子module: cloud-stream-producer-9090
(2)引入坐标依赖
<!--eureka client 客户端依赖引⼊-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eurekaclient</artifactId>
</dependency>
<!--spring cloud stream 依赖(rabbit)-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
(3)application.yml添加配置
server:
port: 9090
spring:
application:
name: cloud-stream-producer
cloud:
stream:
binders: # 绑定MQ服务信息(此处我们是RabbitMQ)
lagouRabbitBinder: # 给Binder定义的名称,⽤于后⾯的关联
type: rabbit # MQ类型,如果是Kafka的话,此处配置kafka
environment: # MQ环境配置(⽤户名、密码等)
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: # 关联整合通道和binder对象
output: # output是我们定义的通道名称,此处不能乱改
destination: lagouExchange # 要使⽤的Exchange名称(消息队列主题名称)
content-type: text/plain # application/json # 消息类型设置,⽐如json
binder: lagouRabbitBinder # 关联MQ服务
eureka:
client:
serviceUrl: # eureka server的路径
defaultZone: http://lagoucloudeurekaservera:8761/eureka/,http://lagoucloudeurekaserverb:8762/eureka/
instance:
prefer-ip-address: true #使⽤ip注册
(4) 启动类
@SpringBootApplication
@EnableDiscoveryClient
public class StreamProducerApplication9090 {
public static void main(String[] args) {
SpringApplication.run(StreamProducerApplication9090.class,args);
}
}
(5) 业务类开发
发送消息接口
public interface IMessageProducer {
public void sendMessage(String content);
}
发送消息实现类
// Source.class⾥⾯就是对输出通道的定义(这是Spring Cloud Stream内置的通道封装)
@EnableBinding(Source.class)
public class MessageProducerImpl implements IMessageProducer {
// 将MessageChannel的封装对象Source注⼊到这⾥使⽤
@Autowired
private Source source;
@Override
public void sendMessage(String content) {
// 向mq中发送消息(并不是直接操作mq,应该操作的是spring cloud stream)
// 使⽤通道向外发出消息(指的是Source⾥⾯的output通道)
source.output().send(MessageBuilder.withPayload(content).build());
}
}
测试类
SpringBootTest(classes = {StreamProducerApplication9090.class})
@RunWith(SpringJUnit4ClassRunner.class)
public class MessageProducerTest {
@Autowired
private IMessageProducer iMessageProducer;
@Test
public void testSendMessage() {
iMessageProducer.sendMessage("hello world-lagou101");
}
}
消费者工程
(1)新建子工程模块
(2)引入坐标依赖
(3)application.yml添加配置
server:
port: 9091
spring:
application:
name: cloud-stream-consumer
cloud:
stream:
binders: # 绑定MQ服务信息(此处我们是RabbitMQ)
lagouRabbitBinder: # 给Binder定义的名称,⽤于后⾯的关联
type: rabbit # MQ类型,如果是Kafka的话,此处配置kafka
environment: # MQ环境配置(⽤户名、密码等)
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: # 关联整合通道和binder对象
input: # input是我们定义的通道名称,此处不能乱改
destination: lagouExchange # 要使⽤的Exchange名称(消息队列主题名称)
content-type: text/plain # application/json # 消息类型设置,⽐如json
binder: lagouRabbitBinder # 关联MQ服务
eureka:
client:
serviceUrl: # eureka server的路径
defaultZone: http://lagoucloudeurekaservera:8761/eureka/,http://lagoucloudeurekaserverb:8762/eureka/
instance:
prefer-ip-address: true #使⽤ip注册
(4) 消息消费者监听
@EnableBinding(Sink.class)
public class MessageConsumerService {
@StreamListener(Sink.INPUT)
public void recevieMessages(Message<String> message) {
System.out.println("=========接收到的消息:" + message);
}
}
Stream自定义消息通道
Stream内置了两种接口Source和Sink,分别定义了binding为”input”的输入流和”output”的输出流。
可以自定义输入通道和输出通道。
定义接口
interface CustomChannel {
String INPUT_LOG = "inputLog";
String OUTPUT_LOG = "outputLog";
@Input(INPUT_LOG)
SubscribableChannel inputLog();
@Output(OUTPUT_LOG)
MessageChannel outputLog();
}
使用
(1)在@EnableBinding注解中,绑定自定义接口
(2)使用@StreamListener做监听时,指定绑定 CustomChannel.INPUT_LOG
(3)服务配置文件补充bindings信息
bindings:
inputLog:
destination: lagouExchange # 要使⽤的Exchange名称(消息队列主题名称)
outputLog:
destination: eduExchange
Stream消息分组
业务场景中,希望主题上的一个Message只能被一个消费者端(多实例)消费处理,就需要通过使用消息分组的方式,来防止一个消费者端的多个实例重复消费消息。
在服务消费者端设置 spring.cloud.stream.bindings.input.group
属性,多个消费者实例配置为同一个group名称(同一个group中的消费者只能有一个可以获取到消费并消费)。