Spring Cloud Stream消息驱动,可以简化开发人员对消息中间件的使用复杂度,让开发人员更多专注于核心业务逻辑的开发;目前消息驱动Spring Cloud Stream只支持RabbitMQ和Kafka;

21.png

  • Source: 当需要发送消息时,我们就需要通过Source,Source将会把我们所要发送的消息(POJO对象)进行序列化(默认转换成JSON格式字符串),然后将这些数据发送到Channel中;
  • Sink: 当我们需要监听消息时就需要通过Sink来,Sink负责从消息通道中获取消息,并将消息反序列化成消息对象(POJO对象),然后交给具体的消息监听处理进行业务处理;
  • Channel: 消息通道是Stream的抽象之一。通常我们向消息中间件发送消息或者监听消息时需要指定主题(Topic)/消息队列名称,但这样一旦我们需要变更主题名称的时候需要修改消息发送或者消息监听的代码,但是通过Channel抽象,我们的业务代码只需要对Channel就可以了,具体这个Channel对应的是那个主题,就可以在配置文件中来指定,这样当主题变更的时候我们就不用对代码做任何修改,从而实现了与具体消息中间件的解耦;
  • Binder: Stream中另外一个抽象层。通过不同的Binder可以实现与不同消息中间件的整合,比如上面的示例我们所使用的就是针对Kafka的Binder,通过Binder提供统一的消息收发接口,从而使得我们可以根据实际需要部署不同的消息中间件,或者根据实际生产中所部署的消息中间件来调整我们的配置。

1、RabbitMQ
生产者:

  1. <parent>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-parent</artifactId>
  4. <version>2.0.1.RELEASE</version>
  5. </parent>
  6. <dependencies>
  7. <!-- SpringBoot整合Web组件 -->
  8. <dependency>
  9. <groupId>org.springframework.boot</groupId>
  10. <artifactId>spring-boot-starter-web</artifactId>
  11. </dependency>
  12. <dependency>
  13. <groupId>org.springframework.cloud</groupId>
  14. <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
  15. <version>2.0.1.RELEASE</version>
  16. </dependency>
  17. </dependencies>
  1. server:
  2. port: 9000
  3. spring:
  4. application:
  5. name: spingcloud-stream-producer
  6. # rabbitmq:
  7. # host: 192.168.174.128
  8. # port: 5672
  9. # username: guest
  10. # password: guest


  1. // 创建管道接口
  2. public interface SendMessageInterface {
  3. // 创建一个输出管道,用于发送消息
  4. @Output("my_msg")
  5. SubscribableChannel sendMsg();
  6. }


  1. @RestController
  2. public class SendMsgController {
  3. @Autowired
  4. private SendMessageInterface sendMessageInterface;
  5. @RequestMapping("/sendMsg")
  6. public String sendMsg() {
  7. String msg = UUID.randomUUID().toString();
  8. System.out.println("生产者发送内容msg:" + msg);
  9. Message build = MessageBuilder.withPayload(msg.getBytes()).build();
  10. sendMessageInterface.sendMsg().send(build);
  11. return "success";
  12. }
  13. }


  1. @SpringBootApplication
  2. @EnableBinding(SendMessageInterface.class) // 开启绑定
  3. public class AppProducer {
  4. public static void main(String[] args) {
  5. SpringApplication.run(AppProducer.class, args);
  6. }
  7. }


消费者

  1. <parent>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-parent</artifactId>
  4. <version>2.0.1.RELEASE</version>
  5. </parent>
  6. <dependencies>
  7. <!-- SpringBoot整合Web组件 -->
  8. <dependency>
  9. <groupId>org.springframework.boot</groupId>
  10. <artifactId>spring-boot-starter-web</artifactId>
  11. </dependency>
  12. <dependency>
  13. <groupId>org.springframework.cloud</groupId>
  14. <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
  15. <version>2.0.1.RELEASE</version>
  16. </dependency>
  17. </dependencies>


  1. server:
  2. port: 9000
  3. spring:
  4. application:
  5. name: spingcloud-stream-consumer
  6. # rabbitmq:
  7. # host: 192.168.174.128
  8. # port: 5672
  9. # username: guest
  10. # password: guest


  1. public interface RedMsgInterface {
  2. // 从管道中获取消息
  3. @Input("my_msg")
  4. SubscribableChannel redMsg();
  5. }


  1. @Component
  2. public class Consumer {
  3. @StreamListener("my_msg")
  4. public void listener(String msg) {
  5. System.out.println("消费者获取生产消息:" + msg);
  6. }
  7. }



@SpringBootApplication
@EnableBinding(RedMsgInterface.class)
public class AppConsumer {

    public static void main(String[] args) {
        SpringApplication.run(AppConsumer.class, args);
    }

}

2、消费组

在现实的业务场景中,每一个微服务应用为了实现高可用和负载均衡,都会集群部署,按照上面我们启动了两个应用的实例,消息被重复消费了两次。为解决这个问题,Spring Cloud Stream 中提供了消费组,通过配置 spring.cloud.stream.bindings.myInput.group 属性为应用指定一个组名,下面修改下配置文件,


server:
  port: 8001
spring:
  application:
    name: spring-cloud-stream
#  rabbitmq:
#    host: 192.168.174.128
#    port: 5672
#    username: guest
#    password: guest
  cloud:
    stream:
      bindings:
        mymsg: ###指定 管道名称
          #指定该应用实例属于 stream 消费组
          group: stream

3、更改为Kafka


        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-kafka</artifactId>
            <version>2.0.1.RELEASE</version>
        </dependency>

生产者配置

//生产者配置
server:
  port: 9000
spring:
  cloud:
    stream:
      # 设置成使用kafka
      kafka:
        binder:
          # Kafka的服务端列表,默认localhost
          brokers: 192.168.212.174:9092,192.168.212.175:9092,192.168.212.176:9092
          # Kafka服务端连接的ZooKeeper节点列表,默认localhost
          zkNodes: 192.168.212.174:2181,192.168.212.175:2181,192.168.212.176:2181
          minPartitionCount: 1
          autoCreateTopics: true
          autoAddPartitions: true

消费者配置

server:
  port: 8000
spring:
  application:
    name: springcloud_kafka_consumer
  cloud:
     instance-count: 1
     instance-index: 0
     stream:
        kafka:
          binder:
            brokers: 192.168.212.174:9092,192.168.212.175:9092,192.168.212.176:9092
            zk-nodes: 192.168.212.174:2181,192.168.212.175:2181,192.168.212.176:2181
            auto-add-partitions: true
            auto-create-topics: true
            min-partition-count: 1
        bindings:
          input:
            destination: my_msg
            group: s1
            consumer:
              autoCommitOffset: false
              concurrency: 1
              partitioned: false

SpringCloud消息驱动.doc上课.zip消息驱动原理.pptx