Stream消息驱动

屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型。

Spring Cloud Stream 是一个构建消息驱动微服务的框架。

应用程序通过inputs 或者 outputs 来与Spring Cloud Stream 中 binder对象交互。通过午门配置来绑定,而Spring Cloud Stream的binder对象负责与消息中间件交互。所以,我们只需要搞清楚如何与Spring Cloud Stream交互就可以方便使用消息驱动的方式。

通过使用Spring Integration 来连接消息代理中间件以实现消息事件驱动。Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了 发布-订阅、消费组、分区的三个核心概念。

如果我们项目使用了两种不同的MQ,不同的MQ之间存在差异(例如RabbitMQ有交换机,kafka有分区)。这些中间件的差异导致如果我们想向另外一种消息队列进行迁移,无疑就是灾难性的。Spring Cloud Stream提供了一种接耦合的方式。

原理:在没有绑定器的概念的情况下,SpringBoot应用要直接与消息中间件进行消息交互的时候,由于各消息中间件构建的初衷不同,它们的实现细节上会有较大的差异性。通过定义绑定器作为中间层,完美地实现了应用程序与消息中间件细节之间的隔离。通过向应用程序暴露统一的channel通道,使得应用程序不需要再考虑各种不同的消息中间件实现。

Stream对消息中间件的进一步封装,可以做到代码层面对中间件的无感知,甚至于动态的切换中间件。

消息生产者:将业务逻辑发送给Spring Cloud Stream (Source、Channel、Binder)

Spring Cloud Stream 发送到MQ

消息消费者:获取业务逻辑 Spring Cloud Stream(Sink、Channel、Binder)

Binder:很方便的连接中间件,屏蔽差异

Channel:通道,是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过Channel对队列进行配置

Source和Sink:简单的可以理解为参照对象是Spring Cloud Stream自身,从Stream发布消息就是输出,接收消息就是输入。

编码API和常用注解:

Middleware:中间件,目前官方只支持RabbitMQ、Kafka

Binder:Binder是应用与消息中间件之间的封装,通过Binder可以很方便的连接中间件,可以动态的改变消息类型(对应Kafka的topic、RabbitMQ的exchange),这些都可以通过配置文件来实现

@Input:注解标识输入通道,通过该输入通道接收到的消息进入应用程序

@Output:注解标识输出通道,发布的消息将通过该通道离开应用程序

@StreamListener:监听队列,用于消费者的队列的消息接收

@EnableBinding:指信道channel和exchange绑定在一起

环境搭建

生产者搭建

  1. 创建maven工程

  2. 加入pom依赖

    1. <dependencies>
    2. <!-- rabbit类型的stream依赖 -->
    3. <dependency>
    4. <groupId>org.springframework.cloud</groupId>
    5. <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    6. </dependency>
    7. <dependency>
    8. <groupId>org.springframework.boot</groupId>
    9. <artifactId>spring-boot-starter-web</artifactId>
    10. </dependency>
    11. <dependency>
    12. <groupId>org.springframework.boot</groupId>
    13. <artifactId>spring-boot-starter-actuator</artifactId>
    14. </dependency>
    15. <dependency>
    16. <groupId>org.springframework.cloud</groupId>
    17. <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
    18. </dependency>
    19. <dependency>
    20. <groupId>org.springframework.boot</groupId>
    21. <artifactId>spring-boot-devtools</artifactId>
    22. </dependency>
    23. <dependency>
    24. <groupId>org.springframework.boot</groupId>
    25. <artifactId>spring-boot-starter-test</artifactId>
    26. </dependency>
    27. <dependency>
    28. <groupId>org.projectlombok</groupId>
    29. <artifactId>lombok</artifactId>
    30. </dependency>
    31. </dependencies>
  1. 添加相关配置
    server:
    port: 8801
    spring:
    application:
     name: cloud-stream-provider
    cloud:
     stream:
       binders: # 配置要绑定的rabbitmq的服务信息
         defaultRabbit: # 定义的名称,用于binding整合
           type: rabbit # MQ类型
           environment: # 设置rabbitmq的相关环境变量
             spring:
               rabbitmq:
                 host: 192.168.29.136
                 port: 5672
                 username: guest
                 passsword: guest
       bindings: # 服务的整合处理
         output: # 通道的名称
           destination: studyExchange # 要使用的exchange名称定义
           content-type: application/json # 消息类型,文本为text/plain
           binder: defaultRabbit # 要绑定的消息服务器的具体设置
    eureka:
    client:
     fetch-registry: true
     register-with-eureka: false
     service-url:
       defaultZone: http://localhost:7001/eureka
    
  1. 编写主启动类
    @SpringBootApplication
    public class StreamMQMain8801 {
     public static void main(String[] args) {
         SpringApplication.run(StreamMQMain8801.class, args);
     }
    }
    
  1. 编写Service接口 和实现类
    public interface IMessageProvider {
     String send();
    }
    
@EnableBinding(Source.class) // 不需要写@Service,这里使用的stream,所以用Stream的@EnableBinding
public class MessageProviderImpl implements IMessageProvider {

    @Resource
    private MessageChannel output; // 消息发送通道

    @Override
    public String send() {
        String serial = UUID.randomUUID().toString();
        output.send(MessageBuilder.withPayload(serial).build());
        System.out.println("----serialNo:" + serial);
        return null;
    }
}
  1. 编写测试用的Controller

    @RestController
    @Slf4j
    public class SendMessageController {
     @Autowired
     private IMessageProvider messageProvider;
    
     @GetMapping("/sendMessage")
     public String sendMessage() {
         return messageProvider.send();
     }
    }
    

启动后, 如果rabbitmq的健康检查连接报错(Rabbit health check failed),可以使用如下配置关闭rabbit的健康检查(不影响rabbitmq的使用)

management:
  health:
    rabbit:
      enabled: false

也可以在spring下再配置一次rabbitmq的连接信息:

spring:
  rabbitmq:
    host: 192.168.29.136
    #port: 5672
    #username: guest
    #password: guest

访问:http://localhost:8801/sendMessage,即可向rabbitmq的studyExchange交换机发送消息

消费者搭建

  1. 创建maven工程

  2. 加入pom依赖(同生产者的依赖一样)

  3. 编写配置 ```yaml server: port: 8802 spring: rabbitmq: host: 192.168.29.136 # 此处配置的rabbitmq信息是为了防止rabbitmq健康检查报错,去掉也不影响stream的使用

    port: 5672

    username: guest

    password: guest

    application: name: cloud-stream-rabbitmq-consumer cloud: stream: binders:

     defaultRabbit:
       type: rabbit
       environment:
         spring:
           rabbitmq:
             host: 192.168.29.136
             #port: 5672
             #username: guest
             #password: guest
    

    bindings:

     input:  # 通道名称
       destination: studyExchange
       content-type: application/json
       binder: defaultRabbit
    

eureka: client: fetch-registry: true register-with-eureka: false service-url: defaultZone: http://localhost:7001/eureka



4. 
编写主启动类
```java
@SpringBootApplication
public class StreamMQConsumer8802 {
    public static void main(String[] args) {
        SpringApplication.run(StreamMQConsumer8802.class, args);
    }
}
  1. 编写消费者监听

    @EnableBinding(Sink.class)  // 消费者绑定的是Sink
    public class ReceiveMessageController {
     @Value("${server.port}")
     private String serverPort;
    
     @StreamListener(Sink.INPUT)  // 监听
     public void input(Message<String> message) {
         System.out.println("消费者[" + serverPort + "]:" + message.getPayload());
     }
    }
    

生产者每发送一条消息,在消费者控制台马上就可以打印出来

消息重复消费

Stream的默认情况下。一个生产者发送给MQ的消息,如果有多台消费者服务器,则多台消费者都能消费到该消息,类似于广播模式。这就会造成消息的重复消费。

可以使用分组来避免这种情况:Stream中处于同一个Group中的多个消费者是竞争关系,就能够保证消息只会被其中一个应用消费。但是不同的Group是可以重复消费的。

在消费者上指定分组:

spring:
  cloud:
    stream:
      bindings:
        input:
          group: study # 指定分组名

Stream中的Group其实对应的就是 RabbitMQ 的 队列Queue。

如果消费者不指定group,则stream会为其生成一个带有随机数的group,多个消费者每个都不重复。在rabbitmq控制台的queues中就能看到多个队列,点开exchange也可以看到下面绑定了多个队列。

如果在yaml中指定了group名,则最终在rabbitmq中生成的队列名为:交换机名.分组名

消息持久化

如果消费者端没有设置 group属性,在消费者服务器宕机下线期间,生产者发送的消息消费者将永远获取不到。

在消费者端设置了group属性后,会有消息持久化的功能,消费者服务器宕机下线期间生产者如果发送了消息,则在消费者服务器重启时可以自动获取这些消息并消费。