4.1-stream-概述

• Spring Cloud Stream 是一个构建消息驱动微服务应用的框架。
• Stream 解决了开发人员无感知的使用消息中间件的问题,因为Stream对消息中间件的进一步封装,可以做
到代码层面对中间件的无感知,甚至于动态的切换中间件,使得微服务开发的高度解耦,服务可以关注更多
自己的业务流程。
• Spring Cloud Stream目前支持两种消息中间件RabbitMQ和Kafka
1587801268585.png

1587801281505.png

4.2-stream-组件

• Spring Cloud Stream 构建的应用程序与消息中间件之间是通过绑定器 Binder相关联的。绑定器对于应用程序而言起到了隔离作用, 它使得不同消息中间件的实现细节对应用程序来说是透明的。

• binding 是我们通过配置把应用和spring cloud stream 的 binder 绑定在一起

• output:发送消息 Channel,内置 Source接口

• input:接收消息 Channel,内置 Sink接口

1587801301281.png

4.3-stream-消息生产者

  1. 创建消息生产者模块,引入依赖 starter-stream-rabbit

    1. <!-- stream -->
    2. <dependency>
    3. <groupId>org.springframework.cloud</groupId>
    4. <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    5. </dependency>
  2. 编写配置,定义 binder,和 bingings

    1. server:
    2. port: 8000
    3. spring:
    4. cloud:
    5. stream:
    6. # 定义绑定器,绑定到哪个消息中间件上
    7. binders:
    8. itheima_binder: # 自定义的绑定器名称
    9. type: rabbit # 绑定器类型
    10. environment: # 指定mq的环境
    11. spring:
    12. rabbitmq:
    13. host: localhost
    14. port: 5672
    15. username: guest
    16. password: guest
    17. virtual-host: /
    18. bindings:
    19. output: # channel名称
    20. binder: itheima_binder #指定使用哪一个binder
    21. destination: itheima_exchange # 消息目的地
  3. 定义消息发送业务类。添加 @EnableBinding(Source.class),注入
    MessageChannel output ,完成消息发送

MessageProducer

  1. package com.itheima.stream.producer;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.cloud.stream.annotation.EnableBinding;
  4. import org.springframework.cloud.stream.messaging.Source;
  5. import org.springframework.messaging.MessageChannel;
  6. import org.springframework.messaging.support.MessageBuilder;
  7. import org.springframework.stereotype.Component;
  8. @Component
  9. @EnableBinding(Source.class)
  10. public class MessageProducer {
  11. @Autowired
  12. private MessageChannel output;
  13. public void send(){
  14. String msessage = "hello stream~~~";
  15. //发送消息
  16. output.send(MessageBuilder.withPayload(msessage).build());
  17. System.out.println("消息发送成功~~~");
  18. }
  19. }

ProducerController

  1. package com.itheima.stream.producer;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.web.bind.annotation.RequestMapping;
  4. import org.springframework.web.bind.annotation.RestController;
  5. @RestController
  6. public class ProducerController {
  7. @Autowired
  8. private MessageProducer producer;
  9. @RequestMapping("/send")
  10. public String sendMsg(){
  11. producer.send();
  12. return "success";
  13. }
  14. }
  1. 编写启动类,测试
  1. package com.itheima.stream;
  2. import org.springframework.boot.SpringApplication;
  3. import org.springframework.boot.autoconfigure.SpringBootApplication;
  4. @SpringBootApplication
  5. public class ProducerApp {
  6. public static void main(String[] args) {
  7. SpringApplication.run(ProducerApp.class,args);
  8. }
  9. }

4.4-stream-消息消费者

  1. 创建消息消费者模块,引入依赖 starter-stream-rabbit

    1. <!-- stream -->
    2. <dependency>
    3. <groupId>org.springframework.cloud</groupId>
    4. <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    5. </dependency>
  2. 编写配置,定义 binder,和 bingings
    ```yaml server: port: 9000

spring: cloud: stream:

  1. # 定义绑定器,绑定到哪个消息中间件上
  2. binders:
  3. itheima_binder: # 自定义的绑定器名称
  4. type: rabbit # 绑定器类型
  5. environment: # 指定mq的环境
  6. spring:
  7. rabbitmq:
  8. host: localhost
  9. port: 5672
  10. username: guest
  11. password: guest
  12. virtual-host: /
  13. bindings:
  14. input: # channel名称
  15. binder: itheima_binder #指定使用哪一个binder
  16. destination: itheima_exchange # 消息目的地
  1. 3. 定义消息接收业务类。添加 @EnableBinding(Sink.class),使用<br />@StreamListener(Sink.INPUT),完成消息接收。
  2. ```java
  3. package com.itheima.stream.consumer;
  4. import org.springframework.cloud.stream.annotation.EnableBinding;
  5. import org.springframework.cloud.stream.annotation.StreamListener;
  6. import org.springframework.cloud.stream.messaging.Sink;
  7. import org.springframework.messaging.Message;
  8. import org.springframework.stereotype.Component;
  9. /**
  10. * 消息接收类
  11. */
  12. @EnableBinding({Sink.class})
  13. @Component
  14. public class MessageListener {
  15. @StreamListener(Sink.INPUT)
  16. public void receive(Message message){
  17. System.out.println(message);
  18. System.out.println(message.getPayload());
  19. }
  20. }
  1. 编写启动类,测试
  1. package com.itheima.stream;
  2. import org.springframework.boot.SpringApplication;
  3. import org.springframework.boot.autoconfigure.SpringBootApplication;
  4. @SpringBootApplication
  5. public class ConsumerApp {
  6. public static void main(String[] args) {
  7. SpringApplication.run(ConsumerApp.class,args);
  8. }
  9. }