消息驱动概述

是什么

为什么引入cloud Stream,解决了什么问题?

在我们实际工作中,最常用的消息中间件有如下4种:
 1. ActiveMQ
 2. RabbitMQ
 3. RocketMQ
 4. kafka

有可能你学习的是 RabbitMQ , 然后到企业中使用的是 ActiveMQ,如果把上边的消息中间件全部学完,需要花费大量的时间

还有就是企业开发中,一般分前端,中端(JavaEE),后端(大数据)

1610803437259.png

这将会导致一个问题,开发,维护,切换成本变高

而cloud Stream 的出现就是要解决这些问题的

它屏蔽底层消息中间件的差异,降低切换版本,统一消息的编程模型

官方定义 Spring Cloud Stream是个构建消息驱动微服务的框架。

应用程序通过 inputs I或者 outputs来与 Spring Cloud Stream中 binder对象交互
通过我们配置来 binding(绑定),而 Spring Cloud Stream的 binder对象负责与消息中间件交互。
所以,我们只需要搞清楚如何与 Spring Cloud Stream交互就可以方便使用消息駆动的方式。

通过使用 Spring Integration 来连接消息代理中间件以实现消息事件驱动。
Spring Cloud Stream为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布订阅、消费组、分区的三个核心

目前仅支持 RabbitMQ, Kafka。

官网

官网

API文档

中文指导手册

设计思想

标准MQ

生产者/消费者之间靠消息媒介传递信息内容:Message

消息必须走特定的通道:消息通道MessageChannel

消息通道里的消息如何被消费呢,谁负责收发处理:消息通道MessageChannel的子接口SubscribableChannel,由MessageHandler消息处理器订阅

1610803437294.png

为什么用Cloud Stream

比方说我们用到了 RabbitMq和 Kafka,由于这两个消息中间件的架构上的不同
像 RabbitMq有 exchange, kafka有 Topic和 Partitions分区

1610803437340.png

这些中间件的差异性导致我们实际项目开发给我们造成了一定的困抗,我们如果用了两个消息队列的其中一种,后面的业务需求,我想往另外一种消息队列进行迁移,这时候无疑就是个灾难性的,一大堆东西都要重新推倒重新做,因为它跟我们的系统耦合了,这时候 springcloud Stream给我们提供了一种解耦合的方式

stream凭什么可以统一底层差异

在没有绑定器这个概念的情况下,我们的 Spring,应用要直接与消息中间件进行信息交互的时候,
由于各消息中间件构建的初衷不同,它们的实现细节上会有较大的差异性
通过定义绑定器作为中间层,完美地实现了应用程序与消息中间件细节之间的隔离。
通过向应用程序暴露统一的 Channels通道,使得应用程序不需要再考虑各种不同的消息中间件实现

通过定义绑定器 Binder作为中间层,实现了应用程序与消息中间件细节之间的隔离。

Binder

在没有绑定器这个概念的情況下,我们的 Spring,应用要直接与消息中间件进行信息交互的时候,由于各消息中间件构建的初衷不同,它们的实现细节上会有较大的差异性。通过定义绑定器作为中间层,完美地实现了应用程序与消息中间件细节之间的隔离。 Stream对消息中间件的进一步封装可以做到代码层面对中间件的无感知,甚至于动态的切换中间件(rabbitmq切换为 kafka),使得微服务开发的高度解耦,服务可以关注更多自己的业务流程

1610803437388.png

Stream中的消息通信方式遵循了发布-订阅模式:Topic主题进行广播
 1. 在RabbitMQ就是Exchange
 2. 在kafka中就是Topic

Spring Cloud Stream标准流程套路

1610803437427.png

1610803437467.png

Binder

很方便的连接中间件,屏蔽差异

Channel

通道,是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过对Channel对队列进行配置

Source和Sink

简单的可理解为参照对象是Spring Cloud Stream自身,从Stream发布消息就是输出,接受消息就是输入

编码API和常用注解

1610803437559.png

案例说明

RabbitMQ环境已经OK

工程中新建三个子模块

cloud-stream-rabbitmq-provider8801,作为生产者进行发消息模块

cloud-stream-rabbitmq-consumer8802,作为消息接收模块

cloud-stream-rabbitmq-consumer8803,作为消息接收模块

消息驱动之生产者

新建8801

cloud-stream-rabbitmq-provider8801

pom依赖

  1. <dependency>
  2. <groupId>org.springframework.cloud</groupId>
  3. <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
  4. </dependency>
  5. <dependency>
  6. <groupId>org.springframework.boot</groupId>
  7. <artifactId>spring-boot-starter-web</artifactId>
  8. </dependency>
  9. <dependency>
  10. <groupId>org.springframework.boot</groupId>
  11. <artifactId>spring-boot-starter-test</artifactId>
  12. </dependency>
  13. <dependency>
  14. <groupId>org.springframework.boot</groupId>
  15. <artifactId>spring-boot-devtools</artifactId>
  16. <scope>runtime</scope>
  17. <optional>false</optional>
  18. </dependency>
  19. <dependency>
  20. <groupId>org.springframework.cloud</groupId>
  21. <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
  22. </dependency>
  23. <dependency>
  24. <groupId>org.springframework.boot</groupId>
  25. <artifactId>spring-boot-starter-actuator</artifactId>
  26. </dependency>

配置文件

server:
  port: 8801

spring:
  application:
    name: cloud-stream-provider
  cloud:
    stream:
      binders: # 在此处配置要绑定的rabbitmq的服务信息;
        defaultRabbit: # 表示定义的名称,用于于binding整合
          type: rabbit # 消息组件类型
          environment: # 设置rabbitmq的相关的环境配置
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings: # 服务的整合处理
        output: # 这个名字是一个通道的名称
          destination: studyExchange # 表示要使用的Exchange名称定义
          content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
          binder: defaultRabbit  # 设置要绑定的消息服务的具体设置

eureka:
  client:
    register-with-eureka: true
    fetch-registry: true
    service-url:
      # defaultZone: http://eureka7002.com:7002/eureka,http://eureka7001.com:7001/eureka/ #集群版
      defaultZone: http://eureka7001.com:7001/eureka/
  instance:
    instance-id: cloud-stream-provider
    prefer-ip-address: true #访问路径可以显示ip地址
    # Eureka客户端向服务端发送心跳的时间间隔,单位为秒(默认是30秒)
    lease-renewal-interval-in-seconds: 1
    #Eureka服务端在收到最后一次心跳后等待时间上限,单位为秒(默认是90秒),超时将剔除服务
    lease-expiration-duration-in-seconds: 2

主启动类

StreamMQMain8801

@SpringBootApplication
@EnableEurekaClient
public class StreamMQMain8801 {
    public static void main(String[] args) {
        SpringApplication.run(StreamMQMain8801.class,args);
    }
}

service

接口

public interface IMessageSend {
    void send(String message);
}

实现类

package com.sgy.springcloud.service.impl;

import com.sgy.springcloud.service.IMessageSend;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;

import javax.annotation.Resource;

/**
 * Created by AaronShen on 2020/7/26
 */
@EnableBinding(Source.class) // 定义消息的推送管道
public class IMessageSendImpl implements IMessageSend {

    @Resource
    MessageChannel output;  // 消息发送管道

    @Override
    public void send(String message) {
        output.send(MessageBuilder.withPayload(message).build());
    }
}

controller

@RestController
public class SendMessageController {
    @Resource
    IMessageSend iMessageSend;

    @GetMapping(value = "/sendMessage")
    public String send() {
        String uuid = UUID.randomUUID().toString();
        iMessageSend.send(uuid);
        return uuid;
    }
}

测试

启动7001eureka

启动rabbitmq

http://localhost:15672/

启动8801,访问http://localhost:8801/sendMessage

1610803437586.png

来看一下消息队列

1610803437639.png

当不断访问 http://localhost:8801/sendMessage,消息队列界面的曲线会随之发生变化

1610803437665.png

消息驱动之消费者

新建Module

cloud-stream-rabbitmq-consumer8802

pom

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-devtools</artifactId>
    <scope>runtime</scope>
    <optional>false</optional>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

配置文件

server:
  port: 8802

spring:
  application:
    name: cloud-stream-consumer
  cloud:
    stream:
      binders: # 在此处配置要绑定的rabbitmq的服务信息;
        defaultRabbit: # 表示定义的名称,用于于binding整合
          type: rabbit # 消息组件类型
          environment: # 设置rabbitmq的相关的环境配置
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings: # 服务的整合处理
        input: # 这个名字是一个通道的名称
          destination: studyExchange # 表示要使用的Exchange名称定义
          content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
          binder: defaultRabbit  # 设置要绑定的消息服务的具体设置

eureka:
  client:
    register-with-eureka: true
    fetch-registry: true
    service-url:
      # defaultZone: http://eureka7002.com:7002/eureka,http://eureka7001.com:7001/eureka/ #集群版
      defaultZone: http://eureka7001.com:7001/eureka/
  instance:
    instance-id: cloud-stream-consumer
    prefer-ip-address: true #访问路径可以显示ip地址
    # Eureka客户端向服务端发送心跳的时间间隔,单位为秒(默认是30秒)
    lease-renewal-interval-in-seconds: 1
    #Eureka服务端在收到最后一次心跳后等待时间上限,单位为秒(默认是90秒),超时将剔除服务
    lease-expiration-duration-in-seconds: 2

主启动类

@SpringBootApplication
@EnableEurekaClient
public class RabbitMqConsumerMain8802 {
    public static void main(String[] args) {
        SpringApplication.run(RabbitMqConsumerMain8802.class,args);
    }
}

controller

@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListenerController {
    @Value("${server.port}")
    private String serverPort;


    @StreamListener(Sink.INPUT)
    public void input(Message<String> message) {
        System.out.println("消费者" + serverPort + "\t消费的消息是\t" + message.getPayload());
    }
}

测试

测试8801发送8802接收消息

1610803437688.png

8002接收消息

1610803437720.png

Stream之消息重复消费

创建8803 Module

依照8802,clone出来一份运行8803,cloud-stream-rabbitmq-consumer8803

pom

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-devtools</artifactId>
    <scope>runtime</scope>
    <optional>false</optional>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

yml配置

server:
  port: 8803

spring:
  application:
    name: cloud-stream-consumer
  cloud:
    stream:
      binders: # 在此处配置要绑定的rabbitmq的服务信息;
        defaultRabbit: # 表示定义的名称,用于于binding整合
          type: rabbit # 消息组件类型
          environment: # 设置rabbitmq的相关的环境配置
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings: # 服务的整合处理
        input: # 这个名字是一个通道的名称
          destination: studyExchange # 表示要使用的Exchange名称定义
          content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
          binder: defaultRabbit           # 设置要绑定的消息服务的具体设置

eureka:
  client:
    register-with-eureka: true
    fetch-registry: true
    service-url:
      # defaultZone: http://eureka7002.com:7002/eureka,http://eureka7001.com:7001/eureka/ #集群版
      defaultZone: http://eureka7001.com:7001/eureka/
  instance:
    instance-id: cloud-stream-consumer
    prefer-ip-address: true #访问路径可以显示ip地址
    # Eureka客户端向服务端发送心跳的时间间隔,单位为秒(默认是30秒)
    lease-renewal-interval-in-seconds: 1
    #Eureka服务端在收到最后一次心跳后等待时间上限,单位为秒(默认是90秒),超时将剔除服务
    lease-expiration-duration-in-seconds: 2

主启动类,RabbitMqConsumerMain8803

@SpringBootApplication
@EnableEurekaClient
public class RabbitMqConsumerMain8803 {
    public static void main(String[] args) {
        SpringApplication.run(RabbitMqConsumerMain8803.class,args);
    }
}

controller

@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListenerController {
    @Value("${server.port}")
    private String serverPort;


    @StreamListener(Sink.INPUT)
    public void input(Message<String> message) {
        System.out.println("消费者" + serverPort + "\t消费的消息是\t" + message.getPayload());
    }
}

启动测试

 1. 先启动RabbitMQ
 2. 7001
 3. 8801
 4. 8802
 5. 8803

访问8801生产者发送消息,http://localhost:8801/sendMessage

然后我们依次看看8802和8803后台的打印结果

1610803437753.png

1610803437778.png

启动后遇到两个问题

有重复消费问题

消息持久化问题

消费者

目前是8802/8803同时都收到了,存在重复消费问题

1610803437802.png

如何解决

分组和持久化属性group

生产实际案例

比如在如下场景中,订单系统我们做集群部暑,都会从 RabbitMQ中获取订单信息,那如果一个订单同时被两个服务获取到,那么就会造成数据错误,我们得避兔这种情况。

这时我们就可以使用 Stream中的消息分组来解决

1610803437844.png

注意在 Stream中处于同一个 group中的多个消费者是克争关系,就能够保证消息只会被其中个应用消费一次。

不同组是可以全面消费的(重复消费)

同一组内会发生竟争关系,只有其中一个可以消费

分组解决重复消费

故障现象:重复消费
导致原因:默认分组是不同的,组流水号不一样,被认为是不同的组,可以消费

解决方案:
微服务应用放置于同一个group中,就能够保证消息只会被其中一个应用消费一次。不同的组是可以消费的,同一个组内会发生竞争关系,只有其中一个可以消费。

8802/8803都变成不同组,group两个不同

group: suiyueranA,suiyueranB

8802修改yml

1610803437875.png

8803修改yml

1610803437903.png

看一下,消息界面中交换机的分组情况

1610803437941.png

还是存在重复消费

最终解决

8802/8803实现了轮询分组,每次只有一个消费者 8801模块的发的消息只能被8802或8803其中一个接收到,这样避免了重复消费

8802/8803都变成相同组,group两个相同

group: suiyueranA

8802修改yml
8803修改yml

1610803437971.png

同一个组的多个微服务实例,每次只会有一个拿到

持久化

通过上述,解决了重复消费问题,再看看持久化

停止8802/8803并去除掉8802的分组group:suiyueranA,但是8803的分组group:suiyueranA没有去掉

8801先发送4条信息到rabbitmq

先启动8802,无分组属性配置,后台没有打出来消息

1610803438009.png

启动8803,有分组属性配置,后台打出来了MQ上的消息

1610803438041.png