消息驱动概述
是什么
为什么引入cloud Stream,解决了什么问题?
在我们实际工作中,最常用的消息中间件有如下4种:
1. ActiveMQ
2. RabbitMQ
3. RocketMQ
4. kafka
有可能你学习的是 RabbitMQ , 然后到企业中使用的是 ActiveMQ,如果把上边的消息中间件全部学完,需要花费大量的时间
还有就是企业开发中,一般分前端,中端(JavaEE),后端(大数据)
这将会导致一个问题,开发,维护,切换成本变高
而cloud Stream 的出现就是要解决这些问题的
它屏蔽底层消息中间件的差异,降低切换版本,统一消息的编程模型
官方定义 Spring Cloud Stream是个构建消息驱动微服务的框架。
应用程序通过 inputs I或者 outputs来与 Spring Cloud Stream中 binder对象交互
通过我们配置来 binding(绑定),而 Spring Cloud Stream的 binder对象负责与消息中间件交互。
所以,我们只需要搞清楚如何与 Spring Cloud Stream交互就可以方便使用消息駆动的方式。
通过使用 Spring Integration 来连接消息代理中间件以实现消息事件驱动。
Spring Cloud Stream为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布订阅、消费组、分区的三个核心
目前仅支持 RabbitMQ, Kafka。
官网
设计思想
标准MQ
生产者/消费者之间靠消息媒介传递信息内容:Message
消息必须走特定的通道:消息通道MessageChannel
消息通道里的消息如何被消费呢,谁负责收发处理:消息通道MessageChannel的子接口SubscribableChannel,由MessageHandler消息处理器订阅
为什么用Cloud Stream
比方说我们用到了 RabbitMq和 Kafka,由于这两个消息中间件的架构上的不同
像 RabbitMq有 exchange, kafka有 Topic和 Partitions分区
这些中间件的差异性导致我们实际项目开发给我们造成了一定的困抗,我们如果用了两个消息队列的其中一种,后面的业务需求,我想往另外一种消息队列进行迁移,这时候无疑就是个灾难性的,一大堆东西都要重新推倒重新做,因为它跟我们的系统耦合了,这时候 springcloud Stream给我们提供了一种解耦合的方式
stream凭什么可以统一底层差异
在没有绑定器这个概念的情况下,我们的 Spring,应用要直接与消息中间件进行信息交互的时候,
由于各消息中间件构建的初衷不同,它们的实现细节上会有较大的差异性
通过定义绑定器作为中间层,完美地实现了应用程序与消息中间件细节之间的隔离。
通过向应用程序暴露统一的 Channels通道,使得应用程序不需要再考虑各种不同的消息中间件实现
通过定义绑定器 Binder作为中间层,实现了应用程序与消息中间件细节之间的隔离。
Binder
在没有绑定器这个概念的情況下,我们的 Spring,应用要直接与消息中间件进行信息交互的时候,由于各消息中间件构建的初衷不同,它们的实现细节上会有较大的差异性。通过定义绑定器作为中间层,完美地实现了应用程序与消息中间件细节之间的隔离。 Stream对消息中间件的进一步封装可以做到代码层面对中间件的无感知,甚至于动态的切换中间件(rabbitmq切换为 kafka),使得微服务开发的高度解耦,服务可以关注更多自己的业务流程
Stream中的消息通信方式遵循了发布-订阅模式:Topic主题进行广播
1. 在RabbitMQ就是Exchange
2. 在kafka中就是Topic
Spring Cloud Stream标准流程套路
Binder
很方便的连接中间件,屏蔽差异
Channel
通道,是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过对Channel对队列进行配置
Source和Sink
简单的可理解为参照对象是Spring Cloud Stream自身,从Stream发布消息就是输出,接受消息就是输入
编码API和常用注解
案例说明
RabbitMQ环境已经OK
工程中新建三个子模块
cloud-stream-rabbitmq-provider8801,作为生产者进行发消息模块
cloud-stream-rabbitmq-consumer8802,作为消息接收模块
cloud-stream-rabbitmq-consumer8803,作为消息接收模块
消息驱动之生产者
新建8801
cloud-stream-rabbitmq-provider8801
pom依赖
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>false</optional>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
配置文件
server:
port: 8801
spring:
application:
name: cloud-stream-provider
cloud:
stream:
binders: # 在此处配置要绑定的rabbitmq的服务信息;
defaultRabbit: # 表示定义的名称,用于于binding整合
type: rabbit # 消息组件类型
environment: # 设置rabbitmq的相关的环境配置
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: # 服务的整合处理
output: # 这个名字是一个通道的名称
destination: studyExchange # 表示要使用的Exchange名称定义
content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
binder: defaultRabbit # 设置要绑定的消息服务的具体设置
eureka:
client:
register-with-eureka: true
fetch-registry: true
service-url:
# defaultZone: http://eureka7002.com:7002/eureka,http://eureka7001.com:7001/eureka/ #集群版
defaultZone: http://eureka7001.com:7001/eureka/
instance:
instance-id: cloud-stream-provider
prefer-ip-address: true #访问路径可以显示ip地址
# Eureka客户端向服务端发送心跳的时间间隔,单位为秒(默认是30秒)
lease-renewal-interval-in-seconds: 1
#Eureka服务端在收到最后一次心跳后等待时间上限,单位为秒(默认是90秒),超时将剔除服务
lease-expiration-duration-in-seconds: 2
主启动类
StreamMQMain8801
@SpringBootApplication
@EnableEurekaClient
public class StreamMQMain8801 {
public static void main(String[] args) {
SpringApplication.run(StreamMQMain8801.class,args);
}
}
service
接口
public interface IMessageSend {
void send(String message);
}
实现类
package com.sgy.springcloud.service.impl;
import com.sgy.springcloud.service.IMessageSend;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import javax.annotation.Resource;
/**
* Created by AaronShen on 2020/7/26
*/
@EnableBinding(Source.class) // 定义消息的推送管道
public class IMessageSendImpl implements IMessageSend {
@Resource
MessageChannel output; // 消息发送管道
@Override
public void send(String message) {
output.send(MessageBuilder.withPayload(message).build());
}
}
controller
@RestController
public class SendMessageController {
@Resource
IMessageSend iMessageSend;
@GetMapping(value = "/sendMessage")
public String send() {
String uuid = UUID.randomUUID().toString();
iMessageSend.send(uuid);
return uuid;
}
}
测试
启动7001eureka
启动rabbitmq
http://localhost:15672/
启动8801,访问http://localhost:8801/sendMessage
来看一下消息队列
当不断访问 http://localhost:8801/sendMessage,消息队列界面的曲线会随之发生变化
消息驱动之消费者
新建Module
cloud-stream-rabbitmq-consumer8802
pom
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>false</optional>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
配置文件
server:
port: 8802
spring:
application:
name: cloud-stream-consumer
cloud:
stream:
binders: # 在此处配置要绑定的rabbitmq的服务信息;
defaultRabbit: # 表示定义的名称,用于于binding整合
type: rabbit # 消息组件类型
environment: # 设置rabbitmq的相关的环境配置
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: # 服务的整合处理
input: # 这个名字是一个通道的名称
destination: studyExchange # 表示要使用的Exchange名称定义
content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
binder: defaultRabbit # 设置要绑定的消息服务的具体设置
eureka:
client:
register-with-eureka: true
fetch-registry: true
service-url:
# defaultZone: http://eureka7002.com:7002/eureka,http://eureka7001.com:7001/eureka/ #集群版
defaultZone: http://eureka7001.com:7001/eureka/
instance:
instance-id: cloud-stream-consumer
prefer-ip-address: true #访问路径可以显示ip地址
# Eureka客户端向服务端发送心跳的时间间隔,单位为秒(默认是30秒)
lease-renewal-interval-in-seconds: 1
#Eureka服务端在收到最后一次心跳后等待时间上限,单位为秒(默认是90秒),超时将剔除服务
lease-expiration-duration-in-seconds: 2
主启动类
@SpringBootApplication
@EnableEurekaClient
public class RabbitMqConsumerMain8802 {
public static void main(String[] args) {
SpringApplication.run(RabbitMqConsumerMain8802.class,args);
}
}
controller
@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListenerController {
@Value("${server.port}")
private String serverPort;
@StreamListener(Sink.INPUT)
public void input(Message<String> message) {
System.out.println("消费者" + serverPort + "\t消费的消息是\t" + message.getPayload());
}
}
测试
测试8801发送8802接收消息
8002接收消息
Stream之消息重复消费
创建8803 Module
依照8802,clone出来一份运行8803,cloud-stream-rabbitmq-consumer8803
pom
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>false</optional>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
yml配置
server:
port: 8803
spring:
application:
name: cloud-stream-consumer
cloud:
stream:
binders: # 在此处配置要绑定的rabbitmq的服务信息;
defaultRabbit: # 表示定义的名称,用于于binding整合
type: rabbit # 消息组件类型
environment: # 设置rabbitmq的相关的环境配置
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: # 服务的整合处理
input: # 这个名字是一个通道的名称
destination: studyExchange # 表示要使用的Exchange名称定义
content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
binder: defaultRabbit # 设置要绑定的消息服务的具体设置
eureka:
client:
register-with-eureka: true
fetch-registry: true
service-url:
# defaultZone: http://eureka7002.com:7002/eureka,http://eureka7001.com:7001/eureka/ #集群版
defaultZone: http://eureka7001.com:7001/eureka/
instance:
instance-id: cloud-stream-consumer
prefer-ip-address: true #访问路径可以显示ip地址
# Eureka客户端向服务端发送心跳的时间间隔,单位为秒(默认是30秒)
lease-renewal-interval-in-seconds: 1
#Eureka服务端在收到最后一次心跳后等待时间上限,单位为秒(默认是90秒),超时将剔除服务
lease-expiration-duration-in-seconds: 2
主启动类,RabbitMqConsumerMain8803
@SpringBootApplication
@EnableEurekaClient
public class RabbitMqConsumerMain8803 {
public static void main(String[] args) {
SpringApplication.run(RabbitMqConsumerMain8803.class,args);
}
}
controller
@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListenerController {
@Value("${server.port}")
private String serverPort;
@StreamListener(Sink.INPUT)
public void input(Message<String> message) {
System.out.println("消费者" + serverPort + "\t消费的消息是\t" + message.getPayload());
}
}
启动测试
1. 先启动RabbitMQ
2. 7001
3. 8801
4. 8802
5. 8803
访问8801生产者发送消息,http://localhost:8801/sendMessage
然后我们依次看看8802和8803后台的打印结果
启动后遇到两个问题
有重复消费问题
消息持久化问题
消费者
目前是8802/8803同时都收到了,存在重复消费问题
如何解决
分组和持久化属性group
生产实际案例
比如在如下场景中,订单系统我们做集群部暑,都会从 RabbitMQ中获取订单信息,那如果一个订单同时被两个服务获取到,那么就会造成数据错误,我们得避兔这种情况。
这时我们就可以使用 Stream中的消息分组来解决
注意在 Stream中处于同一个 group中的多个消费者是克争关系,就能够保证消息只会被其中个应用消费一次。
不同组是可以全面消费的(重复消费)
同一组内会发生竟争关系,只有其中一个可以消费
分组解决重复消费
故障现象:重复消费
导致原因:默认分组是不同的,组流水号不一样,被认为是不同的组,可以消费
解决方案:
微服务应用放置于同一个group中,就能够保证消息只会被其中一个应用消费一次。不同的组是可以消费的,同一个组内会发生竞争关系,只有其中一个可以消费。
8802/8803都变成不同组,group两个不同
group: suiyueranA,suiyueranB
8802修改yml
8803修改yml
看一下,消息界面中交换机的分组情况
还是存在重复消费
最终解决
8802/8803实现了轮询分组,每次只有一个消费者 8801模块的发的消息只能被8802或8803其中一个接收到,这样避免了重复消费
8802/8803都变成相同组,group两个相同
group: suiyueranA
8802修改yml
8803修改yml
同一个组的多个微服务实例,每次只会有一个拿到
持久化
通过上述,解决了重复消费问题,再看看持久化
停止8802/8803并去除掉8802的分组group:suiyueranA,但是8803的分组group:suiyueranA没有去掉
8801先发送4条信息到rabbitmq
先启动8802,无分组属性配置,后台没有打出来消息
启动8803,有分组属性配置,后台打出来了MQ上的消息