1. 概述

一句话:屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型 image.png image.png image.png

2. 设计思想

2.1 标准MQ

图像.jpeg

  • 生产者/消费者之间靠消息媒介传递信息内容:Message
  • 消息必须走特定的通道:MessageChannel
  • 消息通道里的消息如何被消费呢?谁负责收发处理
    • 消息通道MessageChannel的子接口SubscribeChannel,由MessageHandler消息处理器所订阅

      2.2 CloudStream

      Stream中的消息通讯方式遵循了 **发布-订阅模式** ,使用 **Topic** 主题进行广播,在RabbitMQ就是Exchange,在Kafka中就是Topic

image.png
image.png
image.png
image.png

3. Binder

通过定义绑定器Binder作为中间件,实现了应用程序与消息中间件细节之间的隔离。

  • INPUT适用于消费者
  • OUTPUT适用于生产者

image.png
image.png

4. Stream的核心组件

  • Binder ———很方便的连接中间件,屏蔽差异
  • Channel ———通道,是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过channel对队列进行配置
  • SourceSink :简单的可理解为参照对象是SpringcloudStream自身,从Stream发布消息就是输出Sink,接受消息就是输入Source

    5. 常用API

    image.png

    6. 依赖

    1. <!-- Stream -->
    2. <dependency>
    3. <groupId>org.springframework.cloud</groupId>
    4. <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    5. </dependency>

    7. 简单案例

    7.1 生产者

    7.1.1 YAML配置

    spring:
    application:
      name: cloud-stream-privider
    cloud:
      stream:
        binders:   #自此处配置要绑定的rabbitmq的服务信息
          defaultRabbit: #表示定义的名称,用于binding整合
            type: rabbit #消息组件类型
            environment:  # 设置rabbitmq的相关的环境配置
              spring:
                rabbitmq:
                  host: localhost
                  port: 5672
                  username: guest
                  password: guest
        bindings:   #服务的整合处理
          output: #这个名字是一个通道的名称
            destination: studyExchange #表示要使用的exchange名称定义
            content-type: application/json #设置消息类型,本次为json
            binder: defaultRabbit  #设置要绑定的消息服务的具体设置
    

    7.1.2 Service

    ```java public interface IMessageService {

    public String send(); }

@EnableBinding(Source.class) // 定义消息推送通道 @Slf4j public class MessageServiceImpl implements IMessageService {

/**
 * 消息发送通道
 */
@Resource
private MessageChannel output;

@Override
public String send() {
    String uuid = IdUtil.simpleUUID();
    output.send(MessageBuilder.withPayload(uuid).build());
    log.info("******uuid" + uuid);
    return uuid;
}

}

<a name="TxzEn"></a>
### 7.1.3 Controller
```java
@RestController
public class SendMessageController {

    @Resource
    private IMessageService messageService;

    @RequestMapping("/sendMessage")
    public String sendMessage() {
        String send = messageService.send();
        return send;
    }
}

7.1.4 测试

http://localhost:8801/sendMessage
image.png

7.2 消费者

7.2.1 YAML配置

和生产者主要区别只在output 变为input

server:
  port: 8802
spring:
  application:
    name: cloud-stream-consumer
  cloud:
    stream:
      binders:   #自此处配置要绑定的rabbitmq的服务信息
        defaultRabbit: #表示定义的名称,用于binding整合
          type: rabbit #消息组件类型
          environment:  # 设置rabbitmq的相关的环境配置
            spring:
              rabbitmq:
                host: localhost
                port: 15672
                username: guest
                password: guest
      bindings:   #服务的整合处理
       ################区别####################
        input: #这个名字是一个通道的名称
          destination: studyExchange #表示要使用的exchange名称定义
          content-type: application/json #设置消息类型,本次为json
          binder: defaultRabbit  #设置要绑定的消息服务的具体设置
          #group: atguiguA   #消费这组,解决分组消费和持久化问题
eureka:
  client:
    service-url:
      defaultZone: http://eureka7001.com:7001/eureka,http://eureka7002.com:7002/eureka
  instance:
    lease-expiration-duration-in-seconds: 5 #如果现在超过了5秒的间隔
    lease-renewal-interval-in-seconds: 2 #设置心跳的时间间隔
    instance-id: send-8802.com
    prefer-ip-address: true #访问的路径变为IP地址

7.2.2 监听器

@Slf4j
@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListenerController {

    @Value("${server.port}")
    private String serverPort;


    @StreamListener(Sink.INPUT)
    public void input(Message<String> message){
       log.info("我是消费者1号,-----》接受到的消息是:"+message.getPayload()+"\t"+serverPort);
    }
}

7.2.3 测试

image.png

8. 重复消费问题———分组消费

问题:按照简单案例中的消费者再clone出一份8803出来,出现 重复消费 问题。 解决思路:原因很简单,因为Stream默认会给每一个消费者分配一个消费者组,不同消费者组可以同时消费同一个exchange/topic,只需要将两个消费者放入同一个 消费者组group 即可 image.png

8.1 解决方案

即将7.2.1 YAML配置中yml文件注释掉的 group 放开,将2个实例放入同一分组内即可,测试即可看到2个实例轮询获取,每次只有一个消费者进行消费,这样就避免了重复消费

9. 持久化

问题:通过上述解决了重复消费问题,而当消费者下线后,生产者还在发送消息,当消费者上线后想要消费之前已发送的消息,怎么办? 解决:同样需要分组 group ,如果消费者已分组,则消费者上线后可以消费历史消息,如果未分组,则消费不到,案例:

  • 停止8802/8803;除掉8802的分组group:atguiguA;8003的分组属性保留
  • 8801先发送4条消息到rabbitmq
  • 先启动8802,无分组属性配置,后台没有打出来消息

image.png

  • 再启动8803,有分组属性配置,后台打出来了MQ上的消息

image.png