1. 概述
一句话:屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型
2. 设计思想
2.1 标准MQ
- 生产者/消费者之间靠消息媒介传递信息内容:Message
- 消息必须走特定的通道:MessageChannel
- 消息通道里的消息如何被消费呢?谁负责收发处理
3. Binder
通过定义绑定器Binder作为中间件,实现了应用程序与消息中间件细节之间的隔离。
- INPUT适用于消费者
- OUTPUT适用于生产者
4. Stream的核心组件
Binder
———很方便的连接中间件,屏蔽差异Channel
———通道,是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过channel对队列进行配置Source
和Sink
:简单的可理解为参照对象是SpringcloudStream自身,从Stream发布消息就是输出Sink,接受消息就是输入Source5. 常用API
6. 依赖
<!-- Stream -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
7. 简单案例
7.1 生产者
7.1.1 YAML配置
spring: application: name: cloud-stream-privider cloud: stream: binders: #自此处配置要绑定的rabbitmq的服务信息 defaultRabbit: #表示定义的名称,用于binding整合 type: rabbit #消息组件类型 environment: # 设置rabbitmq的相关的环境配置 spring: rabbitmq: host: localhost port: 5672 username: guest password: guest bindings: #服务的整合处理 output: #这个名字是一个通道的名称 destination: studyExchange #表示要使用的exchange名称定义 content-type: application/json #设置消息类型,本次为json binder: defaultRabbit #设置要绑定的消息服务的具体设置
7.1.2 Service
```java public interface IMessageService {
public String send(); }
@EnableBinding(Source.class) // 定义消息推送通道 @Slf4j public class MessageServiceImpl implements IMessageService {
/**
* 消息发送通道
*/
@Resource
private MessageChannel output;
@Override
public String send() {
String uuid = IdUtil.simpleUUID();
output.send(MessageBuilder.withPayload(uuid).build());
log.info("******uuid" + uuid);
return uuid;
}
}
<a name="TxzEn"></a>
### 7.1.3 Controller
```java
@RestController
public class SendMessageController {
@Resource
private IMessageService messageService;
@RequestMapping("/sendMessage")
public String sendMessage() {
String send = messageService.send();
return send;
}
}
7.1.4 测试
http://localhost:8801/sendMessage
7.2 消费者
7.2.1 YAML配置
和生产者主要区别只在
output
变为input
server:
port: 8802
spring:
application:
name: cloud-stream-consumer
cloud:
stream:
binders: #自此处配置要绑定的rabbitmq的服务信息
defaultRabbit: #表示定义的名称,用于binding整合
type: rabbit #消息组件类型
environment: # 设置rabbitmq的相关的环境配置
spring:
rabbitmq:
host: localhost
port: 15672
username: guest
password: guest
bindings: #服务的整合处理
################区别####################
input: #这个名字是一个通道的名称
destination: studyExchange #表示要使用的exchange名称定义
content-type: application/json #设置消息类型,本次为json
binder: defaultRabbit #设置要绑定的消息服务的具体设置
#group: atguiguA #消费这组,解决分组消费和持久化问题
eureka:
client:
service-url:
defaultZone: http://eureka7001.com:7001/eureka,http://eureka7002.com:7002/eureka
instance:
lease-expiration-duration-in-seconds: 5 #如果现在超过了5秒的间隔
lease-renewal-interval-in-seconds: 2 #设置心跳的时间间隔
instance-id: send-8802.com
prefer-ip-address: true #访问的路径变为IP地址
7.2.2 监听器
@Slf4j
@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListenerController {
@Value("${server.port}")
private String serverPort;
@StreamListener(Sink.INPUT)
public void input(Message<String> message){
log.info("我是消费者1号,-----》接受到的消息是:"+message.getPayload()+"\t"+serverPort);
}
}
7.2.3 测试
8. 重复消费问题———分组消费
问题:按照简单案例中的消费者再clone出一份8803出来,出现
重复消费
问题。 解决思路:原因很简单,因为Stream默认会给每一个消费者分配一个消费者组,不同消费者组可以同时消费同一个exchange/topic,只需要将两个消费者放入同一个消费者组group
即可
8.1 解决方案
即将7.2.1 YAML配置中yml文件注释掉的
group
放开,将2个实例放入同一分组内即可,测试即可看到2个实例轮询获取,每次只有一个消费者进行消费,这样就避免了重复消费
9. 持久化
问题:通过上述解决了重复消费问题,而当消费者下线后,生产者还在发送消息,当消费者上线后想要消费之前已发送的消息,怎么办? 解决:同样需要分组
group
,如果消费者已分组,则消费者上线后可以消费历史消息,如果未分组,则消费不到,案例:
- 停止8802/8803;除掉8802的分组group:atguiguA;8003的分组属性保留
- 8801先发送4条消息到rabbitmq
- 先启动8802,无分组属性配置,后台没有打出来消息
- 再启动8803,有分组属性配置,后台打出来了MQ上的消息