一、消息驱动概述
1、什么是消息驱动?
有没有一种新的技术诞生,让我们不再关注具体MQ的细节,我们只需要用一种适配绑定的方式,自动的给我们在各种MQ内切换。
因此出现了 SpringCloud Stream:屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型,SpringCloud Stream是构建与共享消息系统相连的高度可扩展事件驱动的微服务的框架。
- 官方定义Spring Cloud Stream是一个构建消息驱动微服务的框架。
- 应用程序通过inputs或者outputs来与Spring cloud Stream中binder对象交互
- 通过我们配置的binding(绑定),而Spring Cloud Stream的binder对象负责与消息中间件交互。所以,我们只需要搞清楚如何与Spring Cloud Stream交互就可以方便使用消息驱动的方式。
- 通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动。
- Spring Cloud Stream为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。
- 目前仅支持RabbitMQ、Kafka。
2、消息驱动原理
- Binder:很方便的连接中间件,屏蔽差异。
- Channel:通道,是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过Channel对队列进行配置。
Source和Sink:简单的可理解为参照对象是SpringCloud Stream自身,从Stream发布消息就是输出,接收消息就是输入。
3、Stream编码常用注解
二、Stream的使用
1、搭建生产者
cloud-stream-rabbitmq-provider8801,作为生产者进行发消息模块
写pom
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
写yml ```yaml server: port: 8801
spring: application: name: cloud-stream-provider
MQ
rabbitmq: host: 192.168.59.150 port: 5672 username: guest password: guest cloud: stream: binders: # 在此处配置要绑定的rabbitmq的服务信息; defaultRabbit: # 表示定义的名称,用于于binding整合 type: rabbit # 消息组件类型 bindings: # 服务的整合处理 output: # 这个名字是一个通道的名称 destination: studyExchange # 表示要使用的Exchange名称定义 content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain” binder: defaultRabbit # 设置要绑定的消息服务的具体设置
4. 主启动类
```java
@SpringBootApplication
public class StreamProvider8801 {
public static void main(String[] args) {
SpringApplication.run(StreamProvider8801.class, args);
}
}
业务类
@EnableBinding(Source.class) //绑定服务生产者 public class SendServiceImpl { @Resource private MessageChannel output; public String send() { String serial = UUID.randomUUID().toString(); output.send(MessageBuilder.withPayload(serial).build()); System.out.println("*****serial: " + serial); return serial; } }
写个测试 controller
@RestController public class SendController { @Resource private SendService sendService; @GetMapping("/sendMessage") public String sendMessage() { return sendService.send(); } }
记录一个问题:需要注意的是,如果使用的远程RabbitMQ,可能启动服务之后会报错Rabbit health check failed
,此时,我们需要关闭RabbitMQ的心跳检测,在application.yml
配置文件中添加如下配置(我没遇上)
#关闭Rabbit的心跳检测
management:
health:
rabbit:
enabled: false
2、搭建消费者
- cloud-stream-rabbitmq-consumer8802,作为消息接收模块
写pom
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency>
写yml ```yaml server: port: 8802
spring: application: name: cloud-stream-consumer
MQ
rabbitmq: host: 192.168.59.150 port: 5672 username: guest password: guest cloud: stream: binders: # 在此处配置要绑定的rabbitmq的服务信息; defaultRabbit: # 表示定义的名称,用于于binding整合 type: rabbit # 消息组件类型 bindings: # 服务的整合处理 input: # 这个名字是一个通道的名称 destination: studyExchange # 表示要使用的Exchange名称定义 content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain” binder: defaultRabbit # 设置要绑定的消息服务的具体设置 :报红无视
eureka: client:
#表示是否将自己注册进eurekaserver
register-with-eureka: true
#是否从EurekaServer抓取已有的注册信息,默认为true。但节点无所谓,集群必须设置为true才能配合ribbon使用负载均衡
fetch-registry: true
service-url:
defaultZone: http://localhost:7001/eureka
关闭Rabbit的心跳检测
management: health: rabbit: enabled: false
![image.png](https://cdn.nlark.com/yuque/0/2021/png/21567217/1623299782561-68afc93a-26ba-4313-b5e0-497c92835582.png#clientId=u32c6e05c-b4e5-4&from=paste&height=320&id=ude4e88ca&margin=%5Bobject%20Object%5D&name=image.png&originHeight=640&originWidth=1155&originalType=binary&ratio=2&size=79358&status=done&style=none&taskId=ud7f47dd5-71b3-44cd-aeca-436dac8efd4&width=577.5)
4. 主启动类
```java
@SpringBootApplication
public class StreamConsumer8802 {
public static void main(String[] args) {
SpringApplication.run(StreamConsumer8802.class, args);
}
}
业务类
@EnableBinding(Sink.class) //绑定消费者队列 public class MessageConsumerController { @Value("${server.port}") private String serverPort; @StreamListener(Sink.INPUT) public void input(Message<String> message) { System.out.println("消费者:" + serverPort + ",接收到的消息:" + message.getPayload()); } }
测试:开启消费者和生产者,然后访问 http://localhost:8801/sendMessage 生产消息。
存在问题:上面已经实现了消息的生产和消费,但是如果我们拷贝多个消费者,并且在没有对消费者进行特殊配置的前提下,会发现运行后有两个问题: