概述

  • 解决的问题:一个系统中使用的消息中间件可能有多种(比如微服务使用的是RabbitMQ,而大数据使用的是Kafka),SpringCloud Stream消息驱动屏蔽这些消息中间件的差异,统一消息的编程模型,对于多种消息中间件,只需要操作SpringCloud Stream即可

  • SpringCloud Stream官网

    SpringCloud Stream消息驱动 - 图1

Spring Cloud Stream是用于构建与共享消息传递系统连接的高度可伸缩的事件驱动微服务框架,该框架提供了一个灵活的编程模型,它建立在已经建立和熟悉的Spring熟语和最佳实践上,包括支持持久化的发布/订阅、消费组以及消息分区这三个核心概念

  • 结构,SpringCloud官方介绍

    Spring Cloud Stream 是一个构建消息驱动微服务的框架,应用程序通过input通道或者output通道来与Spring Cloud Stream中Binder交互,通过配置来Binding。 而Spring Cloud Stream的binder负责与中间件交互, 消息的中间件有(RabbitMQ, Kafka, ActiveMQ)。

SpringCloud Stream消息驱动 - 图2

设计思想

  • 通过定义绑定器Binder作为中间层,实现了应用程序与消息中间件细节之间的隔离。

    • 在没有绑定器之前,我们的应用程序都是直接和消息中间件进行交互。

    • 如果需要更换消息中间件,那么我们的应用程序也必须进行修改

    • Binder相当于在我们应用程序和消息中间件的中间层,应用程序直接与Binder进行交互,而不用考虑底层是哪一种消息中间件

      SpringCloud Stream消息驱动 - 图3

  • Binder可以生成Binding,Binding用来绑定消息容器的生产者和消费者,它有两种类型,INPUT和OUTPUT,INPUT对应于消费者,OUTPUT对应于生产者。

  • SpringCloud中的消息通信方式遵循Pub/Sub模式(发布/订阅模式)

编程模型

SpringCloud Stream消息驱动 - 图4

消费方式

  1. 发布订阅模式

    SpringCloud Stream消息驱动 - 图5

  1. 消费者组模式

    SpringCloud Stream消息驱动 - 图6

可以解决重复消费的问题

  1. 分区模式

    SpringCloud Stream消息驱动 - 图7

将一组数据发给同一个消费者进行处理,确保一致性或者顺序原因。比如同一个订单的所有数据,发给一个消费者进行处理,避免像消费者组模式,一个订单的消息交给多个消费者处理

案例实现

  • 模块实现需要一个生产者以及两个消费者,消息中间件采用RabbitMQ

消息生产者

  1. 创建Module

    SpringCloud Stream消息驱动 - 图8

  1. 修改pom文件

    需要引入rabbitmq的依赖

```xml

org.springframework.cloud spring-cloud-starter-stream-rabbit

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-web</artifactId>
  4. </dependency>
  5. <dependency>
  6. <groupId>org.springframework.boot</groupId>
  7. <artifactId>spring-boot-starter-actuator</artifactId>
  8. </dependency>
  9. <dependency>
  10. <groupId>org.springframework.cloud</groupId>
  11. <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
  12. </dependency>
  13. <!--基础配置-->
  14. <dependency>
  15. <groupId>org.springframework.boot</groupId>
  16. <artifactId>spring-boot-devtools</artifactId>
  17. <scope>runtime</scope>
  18. <optional>true</optional>
  19. </dependency>
  20. <dependency>
  21. <groupId>org.projectlombok</groupId>
  22. <artifactId>lombok</artifactId>
  23. <optional>true</optional>
  24. </dependency>
  25. <dependency>
  26. <groupId>org.springframework.boot</groupId>
  27. <artifactId>spring-boot-starter-test</artifactId>
  28. <scope>test</scope>
  29. </dependency>
  30. </dependencies>

```

  1. 配置文件

    SpringCloud Stream消息驱动 - 图9

  1. 主启动类

    普通的SpringBoot应用

SpringCloud Stream消息驱动 - 图10

  1. 业务类

    Service层,用于定义发送消息的接口以及实现类。通过此接口可以将消息发送给RabbitMQ中的对应的Exchange

SpringCloud Stream消息驱动 - 图11

Controller层,用于调用向RabbitMQ中发送消息

SpringCloud Stream消息驱动 - 图12

  1. 测试

    SpringCloud Stream消息驱动 - 图13

消息消费者

  1. 创建Module

    SpringCloud Stream消息驱动 - 图14

  1. 修改pom文件:和生产者的pom一致

  2. 编写配置文件

    SpringCloud Stream消息驱动 - 图15

  1. 主启动类

    SpringCloud Stream消息驱动 - 图16

  1. 业务类

    SpringCloud Stream消息驱动 - 图17

小细节纠正

SpringCloud Stream消息驱动 - 图18

与中间件建立一个输入通道(消费)

SpringCloud Stream消息驱动 - 图19

  1. 测试

    生产者生产消息,消费者消费消息

SpringCloud Stream消息驱动 - 图20

至此,我们没有任何直接与RabbitMQ的交互,通过Stream操作来完成了消息的生产与消费。

消息的重复消费

  1. 搭建一个与8802消费者一样的模块8803

    SpringCloud Stream消息驱动 - 图21

  1. 生产者生产消息,两个消费者都消费了消息

    SpringCloud Stream消息驱动 - 图22

这种方式其实就是默认的发布订阅模式

因为默认情况下,一个消费者是一个组

SpringCloud Stream消息驱动 - 图23

  1. 将消费者放到同一个组进行竞争消费

    修改消费者配置,添加group属性,并设置相同

SpringCloud Stream消息驱动 - 图24

SpringCloud Stream消息驱动 - 图25

  1. 测试

    两个消费者竞争消费

SpringCloud Stream消息驱动 - 图26

消费者组的持久化演示

  • 消费方式中,如果一个组至少有一个消费者,该组就会收到消息,即使这些消息是所有消费者都停止运行时发送的。

    先关闭8802,8803,然后8801发送消息。

由于之前生产者已经给Group1消费者组发送给消息了,所以这里并不会消失

SpringCloud Stream消息驱动 - 图27

8801发送消息,此时8802,8803都在关闭状态

SpringCloud Stream消息驱动 - 图28

启动Group1中的任何一个实例,发现即使是消费者全部停止后,生产者生产的消息会保存到Group中(副本)。

只要这个Group中的任何一个消费者重新启动,都会进行消息的消费

SpringCloud Stream消息驱动 - 图29

SpringCloud Stream消息驱动 - 图30