Spring Cloud Stream 消息驱动组件帮助我们更快速,更方便,更友好的去构建消息 驱动微服务的
Spring Cloud Stream进行了很好的上层抽象
本质:屏蔽掉了底层不同MQ消息中间件之间的差异,统一了MQ的编程模型,降低 了学习、开发、维护MQ的成本

Spring Cloud Stream 是一个构建消息驱动微服务的框架。应用程序通过inputs(相 当于消息消费者consumer)或者outputs(相当于消息生产者producer)来与 Spring Cloud Stream中的binder对象交互,而Binder对象是用来屏蔽底层MQ细节 的,它负责与具体的消息中间件交互。

使用时只需要知道如何使用Spring Cloud Stream与Binder对象 交互即可
image.png

Stream编程注解

如下的注解无非在做一件事,把我们结构图中那些组成部分上下关联起来,打通通 道(这样的话生产者的message数据才能进入mq,mq中数据才能进行消费者工程)。

注解 描述
@Input(在消费者工程中使用) 注解标识输⼊通道,通过该输⼊通道 接收到的消息进入应用程序
@Output(在生产者工程中使用) 注解标识输出通道,发布的消息将通 过该通道离开应用程序
@StreamListener(在消费者工程中使用,监听message的到来) 监听队列,用于消费者的队列的消息 的接收(有消息监听…..)
@EnableBinding 把Channel和Exchange(对于 RabbitMQ)绑定在一起

基于RabbitMQ创建三个工程

前提是安装并启动RabbitMQ

创建生产者项目

1、创建项目并引入依赖

是创建生产者项目 lagou-cloud-stream-producer-9090

  1. <!--eureka client 客户端依赖引入-->
  2. <dependency>
  3. <groupId>org.springframework.cloud</groupId>
  4. <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
  5. </dependency>
  6. <!--spring cloud stream 依赖(rabbit)-->
  7. <dependency>
  8. <groupId>org.springframework.cloud</groupId>
  9. <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
  10. </dependency>

2、 在application.yml添加配置

  1. server:
  2. port: 9090
  3. spring:
  4. application:
  5. name: lagou-cloud-stream-producer
  6. cloud:
  7. stream:
  8. binders: # 绑定MQ服务信息(此处我们是RabbitMQ)
  9. lagouRabbitBinder: # 给Binder定义的名称,用于后面的关联
  10. type: rabbit # MQ类型,如果是Kafka的话,此处配置kafka
  11. environment: # MQ环境配置(用户名、密码等)
  12. spring:
  13. rabbitmq:
  14. host: localhost
  15. port: 5672
  16. username: guest
  17. password: guest
  18. bindings: # 关联整合通道和binder对象(output,input可以配置多个,和同时存在)
  19. output: # output是我们定义的通道名称,此处不能乱改
  20. destination: lagouExchange # 要使用的Exchange名称(消息队列主题名称)
  21. content-type: text/plain # application/json # 消息类型设置,比如json
  22. binder: lagouRabbitBinder # 关联MQ服务
  23. eureka:
  24. client:
  25. serviceUrl: # eureka server的路径
  26. defaultZone: http://lagoucloudeurekaservera:8761/eureka/,http://lagoucloudeurekaserverb:8762/eureka/ #把 eureka 集群中的所有 url 都填写了进来,也可以只写一台,因为各个 eureka server 可以同步注册表
  27. instance:
  28. prefer-ip-address: true #使用ip注册

3、 创建启动类

  1. @SpringBootApplication
  2. @EnableDiscoveryClient
  3. public class StreamProducerApplication9090 {
  4. public static void main(String[] args) {
  5. SpringApplication.run(StreamProducerApplication9090.class,args);
  6. }
  7. }

4、 业务类开发

(发送消息接口、接口实现类、Controller)

创建接口

  1. public interface IMessageProducer {
  2. public void sendMessage(String content);
  3. }

接口实现类

定义类是输出的(用Source),并且通过自动注入Source接口,进行调用输出

  1. // Source.class里面就是对输出通道的定义(这是Spring Cloud Stream内置的通道封装)
  2. @EnableBinding(Source.class)
  3. public class MessageProducerImpl implements IMessageProducer {
  4. // 将MessageChannel的封装对象Source注入到这里使用
  5. @Autowired
  6. private Source source;
  7. @Override
  8. public void sendMessage(String content) {
  9. // 向mq中发送消息(并不是直接操作mq,应该操作的是spring cloud stream)
  10. // 使用通道向外发出消息(指的是Source里面的output通道)
  11. source.output().send(MessageBuilder.withPayload(content).build());
  12. }
  13. }

测试类

  1. @SpringBootTest(classes = {StreamProducerApplication9090.class})
  2. @RunWith(SpringJUnit4ClassRunner.class)
  3. public class MessageProducerTest {
  4. @Autowired
  5. private IMessageProducer iMessageProducer;
  6. @Test
  7. public void testSendMessage() {
  8. iMessageProducer.sendMessage("hello world-lagou101");
  9. }
  10. }

创建消费者项目

1、创建项目并引入依赖(同上)

2、 在application.yml添加配置

作为输入给消费者,端口号是9091,使用是input

  1. server:
  2. port: 9091
  3. spring:
  4. application:
  5. name: lagou-cloud-stream-producer
  6. cloud:
  7. stream:
  8. binders: # 绑定MQ服务信息(此处我们是RabbitMQ)
  9. lagouRabbitBinder: # 给Binder定义的名称,用于后面的关联
  10. type: rabbit # MQ类型,如果是Kafka的话,此处配置kafka
  11. environment: # MQ环境配置(用户名、密码等)
  12. spring:
  13. rabbitmq:
  14. host: localhost
  15. port: 5672
  16. username: guest
  17. password: guest
  18. bindings: # 关联整合通道和binder对象(output,input可以配置多个,和同时存在)
  19. input: # output是我们定义的通道名称,此处不能乱改
  20. destination: lagouExchange # 要使用的Exchange名称(消息队列主题名称)
  21. content-type: text/plain # application/json # 消息类型设置,比如json
  22. binder: lagouRabbitBinder # 关联MQ服务
  23. eureka:
  24. client:
  25. serviceUrl: # eureka server的路径
  26. defaultZone: http://lagoucloudeurekaservera:8761/eureka/,http://lagoucloudeurekaserverb:8762/eureka/ #把 eureka 集群中的所有 url 都填写了进来,也可以只写一台,因为各个 eureka server 可以同步注册表
  27. instance:
  28. prefer-ip-address: true #使用ip注册

3、创建消息消费类并监听

sink作为输入到消费端

  1. @EnableBinding(Sink.class)
  2. public class MessageConsumerService {
  3. @StreamListener(Sink.INPUT)
  4. public void recevieMessages(Message<String> message) {
  5. System.out.println("=========接收到的消息:" + message);
  6. }
  7. }

注意:消费者要先启动,生产者后启动

Stream之消息分组

如上的情况,消费者端有两个(消费同一个MQ的同一个主题),但是呢我们的 业务场景中希望这个主题的一个Message只能被一个消费者端消费处理,此时我们 就可以使用消息分组。
如果不进行消息分组,同一个消息就会被多个消费者消费。如果定义为同一个组,那么就只能被组内其中一个消费了。

消费者端添加消息分组

在bindings的下面添加 group: lagou001。所有多个消费者只要定义了同一个分组,一条消息过来,可用持久性,而且只能被一个消费者消费。

  1. bindings: # 关联整合通道和binder对象
  2. input: # output是我们定义的通道名称,此处不能乱改
  3. destination: lagouExchange # 要使用的Exchange名称(消息队列主题名称)
  4. content-type: text/plain # application/json # 消息类型设置,比如json
  5. binder: lagouRabbitBinder # 关联MQ服务
  6. group: lagou001