一、消息驱动概述

1、什么是消息驱动?

有没有一种新的技术诞生,让我们不再关注具体MQ的细节,我们只需要用一种适配绑定的方式,自动的给我们在各种MQ内切换。
因此出现了 SpringCloud Stream:屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型,SpringCloud Stream是构建与共享消息系统相连的高度可扩展事件驱动的微服务的框架。

  • 官方定义Spring Cloud Stream是一个构建消息驱动微服务的框架。
  • 应用程序通过inputs或者outputs来与Spring cloud Stream中binder对象交互
  • 通过我们配置的binding(绑定),而Spring Cloud Stream的binder对象负责与消息中间件交互。所以,我们只需要搞清楚如何与Spring Cloud Stream交互就可以方便使用消息驱动的方式。
  • 通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动。
  • Spring Cloud Stream为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。
  • 目前仅支持RabbitMQ、Kafka。

image.png
屏蔽了

2、消息驱动原理

image.png

  1. Binder:很方便的连接中间件,屏蔽差异。
  2. Channel:通道,是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过Channel对队列进行配置。
  3. Source和Sink:简单的可理解为参照对象是SpringCloud Stream自身,从Stream发布消息就是输出,接收消息就是输入。

    3、Stream编码常用注解
    image.png

    二、Stream的使用

    1、搭建生产者

  4. cloud-stream-rabbitmq-provider8801,作为生产者进行发消息模块

  5. 写pom

    1. <dependency>
    2. <groupId>org.springframework.cloud</groupId>
    3. <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    4. </dependency>
  6. 写yml ```yaml server: port: 8801

spring: application: name: cloud-stream-provider

MQ

rabbitmq: host: 192.168.59.150 port: 5672 username: guest password: guest cloud: stream: binders: # 在此处配置要绑定的rabbitmq的服务信息; defaultRabbit: # 表示定义的名称,用于于binding整合 type: rabbit # 消息组件类型 bindings: # 服务的整合处理 output: # 这个名字是一个通道的名称 destination: studyExchange # 表示要使用的Exchange名称定义 content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain” binder: defaultRabbit # 设置要绑定的消息服务的具体设置


4. 主启动类
```java
@SpringBootApplication
public class StreamProvider8801 {
    public static void main(String[] args) {
        SpringApplication.run(StreamProvider8801.class, args);
    }
}
  1. 业务类

    @EnableBinding(Source.class) //绑定服务生产者
    public class SendServiceImpl {
    
     @Resource
     private MessageChannel output;
    
     public String send() {
         String serial = UUID.randomUUID().toString();
         output.send(MessageBuilder.withPayload(serial).build());
         System.out.println("*****serial: " + serial);
         return serial;
     }
    }
    

    写个测试 controller

    @RestController
    public class SendController {
    
     @Resource
     private SendService sendService;
    
     @GetMapping("/sendMessage")
     public String sendMessage() {
         return sendService.send();
     }
    }
    

记录一个问题:需要注意的是,如果使用的远程RabbitMQ,可能启动服务之后会报错Rabbit health check failed,此时,我们需要关闭RabbitMQ的心跳检测,在application.yml配置文件中添加如下配置(我没遇上)

#关闭Rabbit的心跳检测
management:
  health:
    rabbit:
      enabled: false

2、搭建消费者

  1. cloud-stream-rabbitmq-consumer8802,作为消息接收模块
  2. 写pom

    <dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>
    
  3. 写yml ```yaml server: port: 8802

spring: application: name: cloud-stream-consumer

MQ

rabbitmq: host: 192.168.59.150 port: 5672 username: guest password: guest cloud: stream: binders: # 在此处配置要绑定的rabbitmq的服务信息; defaultRabbit: # 表示定义的名称,用于于binding整合 type: rabbit # 消息组件类型 bindings: # 服务的整合处理 input: # 这个名字是一个通道的名称 destination: studyExchange # 表示要使用的Exchange名称定义 content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain” binder: defaultRabbit # 设置要绑定的消息服务的具体设置 :报红无视

eureka: client:

#表示是否将自己注册进eurekaserver
register-with-eureka: true
#是否从EurekaServer抓取已有的注册信息,默认为true。但节点无所谓,集群必须设置为true才能配合ribbon使用负载均衡
fetch-registry: true
service-url:
  defaultZone: http://localhost:7001/eureka

关闭Rabbit的心跳检测

management: health: rabbit: enabled: false

![image.png](https://cdn.nlark.com/yuque/0/2021/png/21567217/1623299782561-68afc93a-26ba-4313-b5e0-497c92835582.png#clientId=u32c6e05c-b4e5-4&from=paste&height=320&id=ude4e88ca&margin=%5Bobject%20Object%5D&name=image.png&originHeight=640&originWidth=1155&originalType=binary&ratio=2&size=79358&status=done&style=none&taskId=ud7f47dd5-71b3-44cd-aeca-436dac8efd4&width=577.5)

4. 主启动类
```java
@SpringBootApplication
public class StreamConsumer8802 {
    public static void main(String[] args) {
        SpringApplication.run(StreamConsumer8802.class, args);
    }
}
  1. 业务类

    @EnableBinding(Sink.class) //绑定消费者队列
    public class MessageConsumerController {
     @Value("${server.port}")
     private String serverPort;
    
     @StreamListener(Sink.INPUT)
     public void input(Message<String> message) {
         System.out.println("消费者:" + serverPort + ",接收到的消息:" + message.getPayload());
     }
    }
    
  2. 测试:开启消费者和生产者,然后访问 http://localhost:8801/sendMessage 生产消息。

存在问题:上面已经实现了消息的生产和消费,但是如果我们拷贝多个消费者,并且在没有对消费者进行特殊配置的前提下,会发现运行后有两个问题:

  1. 有重复消费问题
  2. 消息持久化问题

    3、重复消费问题

    为了解决重复消费的问题,需要用到一个分组和持久化属性group:将不能重复消费的消费者端分到一个组里即可。
    image.png

    4、持久化问题

    通过上述,解决了重复消费问题,再看看持久化。

  3. 停止8802/8803并去除掉8802的分组group: A_Group,8803的分组group: A_Group没有去掉。

  4. 8801先发送4条消息到RabbitMq。
  5. 先启动8802,无分组属性配置,后台没有打出来消息。
  6. 再启动8803,有分组属性配置,后台打出来了MQ上的消息 (消息持久化体现)。