4.1-stream-概述
• Spring Cloud Stream 是一个构建消息驱动微服务应用的框架。
• Stream 解决了开发人员无感知的使用消息中间件的问题,因为Stream对消息中间件的进一步封装,可以做
到代码层面对中间件的无感知,甚至于动态的切换中间件,使得微服务开发的高度解耦,服务可以关注更多
自己的业务流程。
• Spring Cloud Stream目前支持两种消息中间件RabbitMQ和Kafka

4.2-stream-组件
• Spring Cloud Stream 构建的应用程序与消息中间件之间是通过绑定器 Binder相关联的。绑定器对于应用程序而言起到了隔离作用, 它使得不同消息中间件的实现细节对应用程序来说是透明的。
• binding 是我们通过配置把应用和spring cloud stream 的 binder 绑定在一起
• output:发送消息 Channel,内置 Source接口
• input:接收消息 Channel,内置 Sink接口

4.3-stream-消息生产者
创建消息生产者模块,引入依赖 starter-stream-rabbit
<!-- stream --><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId></dependency>
编写配置,定义 binder,和 bingings
server:port: 8000spring:cloud:stream:# 定义绑定器,绑定到哪个消息中间件上binders:itheima_binder: # 自定义的绑定器名称type: rabbit # 绑定器类型environment: # 指定mq的环境spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestvirtual-host: /bindings:output: # channel名称binder: itheima_binder #指定使用哪一个binderdestination: itheima_exchange # 消息目的地
定义消息发送业务类。添加 @EnableBinding(Source.class),注入
MessageChannel output ,完成消息发送
MessageProducer
package com.itheima.stream.producer;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.cloud.stream.annotation.EnableBinding;import org.springframework.cloud.stream.messaging.Source;import org.springframework.messaging.MessageChannel;import org.springframework.messaging.support.MessageBuilder;import org.springframework.stereotype.Component;@Component@EnableBinding(Source.class)public class MessageProducer {@Autowiredprivate MessageChannel output;public void send(){String msessage = "hello stream~~~";//发送消息output.send(MessageBuilder.withPayload(msessage).build());System.out.println("消息发送成功~~~");}}
ProducerController
package com.itheima.stream.producer;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;@RestControllerpublic class ProducerController {@Autowiredprivate MessageProducer producer;@RequestMapping("/send")public String sendMsg(){producer.send();return "success";}}
- 编写启动类,测试
package com.itheima.stream;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplicationpublic class ProducerApp {public static void main(String[] args) {SpringApplication.run(ProducerApp.class,args);}}
4.4-stream-消息消费者
创建消息消费者模块,引入依赖 starter-stream-rabbit
<!-- stream --><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId></dependency>
编写配置,定义 binder,和 bingings
```yaml server: port: 9000
spring: cloud: stream:
# 定义绑定器,绑定到哪个消息中间件上binders:itheima_binder: # 自定义的绑定器名称type: rabbit # 绑定器类型environment: # 指定mq的环境spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestvirtual-host: /bindings:input: # channel名称binder: itheima_binder #指定使用哪一个binderdestination: itheima_exchange # 消息目的地
3. 定义消息接收业务类。添加 @EnableBinding(Sink.class),使用<br />@StreamListener(Sink.INPUT),完成消息接收。```javapackage com.itheima.stream.consumer;import org.springframework.cloud.stream.annotation.EnableBinding;import org.springframework.cloud.stream.annotation.StreamListener;import org.springframework.cloud.stream.messaging.Sink;import org.springframework.messaging.Message;import org.springframework.stereotype.Component;/*** 消息接收类*/@EnableBinding({Sink.class})@Componentpublic class MessageListener {@StreamListener(Sink.INPUT)public void receive(Message message){System.out.println(message);System.out.println(message.getPayload());}}
- 编写启动类,测试
package com.itheima.stream;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplicationpublic class ConsumerApp {public static void main(String[] args) {SpringApplication.run(ConsumerApp.class,args);}}
