Spring Cloud Stream 消息驱动组件帮助我们更快速,更方便,更友好的去构建消息 驱动微服务的
Spring Cloud Stream进行了很好的上层抽象
本质:屏蔽掉了底层不同MQ消息中间件之间的差异,统一了MQ的编程模型,降低 了学习、开发、维护MQ的成本
Spring Cloud Stream 是一个构建消息驱动微服务的框架。应用程序通过inputs(相 当于消息消费者consumer)或者outputs(相当于消息生产者producer)来与 Spring Cloud Stream中的binder对象交互,而Binder对象是用来屏蔽底层MQ细节 的,它负责与具体的消息中间件交互。
使用时只需要知道如何使用Spring Cloud Stream与Binder对象 交互即可 
Stream编程注解
如下的注解无非在做一件事,把我们结构图中那些组成部分上下关联起来,打通通 道(这样的话生产者的message数据才能进入mq,mq中数据才能进行消费者工程)。
| 注解 | 描述 |
|---|---|
| @Input(在消费者工程中使用) | 注解标识输⼊通道,通过该输⼊通道 接收到的消息进入应用程序 |
| @Output(在生产者工程中使用) | 注解标识输出通道,发布的消息将通 过该通道离开应用程序 |
| @StreamListener(在消费者工程中使用,监听message的到来) | 监听队列,用于消费者的队列的消息 的接收(有消息监听…..) |
| @EnableBinding | 把Channel和Exchange(对于 RabbitMQ)绑定在一起 |
基于RabbitMQ创建三个工程
前提是安装并启动RabbitMQ
创建生产者项目
1、创建项目并引入依赖
是创建生产者项目 lagou-cloud-stream-producer-9090
<!--eureka client 客户端依赖引入--><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-netflix-eureka-client</artifactId></dependency><!--spring cloud stream 依赖(rabbit)--><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId></dependency>
2、 在application.yml添加配置
server:port: 9090spring:application:name: lagou-cloud-stream-producercloud:stream:binders: # 绑定MQ服务信息(此处我们是RabbitMQ)lagouRabbitBinder: # 给Binder定义的名称,用于后面的关联type: rabbit # MQ类型,如果是Kafka的话,此处配置kafkaenvironment: # MQ环境配置(用户名、密码等)spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestbindings: # 关联整合通道和binder对象(output,input可以配置多个,和同时存在)output: # output是我们定义的通道名称,此处不能乱改destination: lagouExchange # 要使用的Exchange名称(消息队列主题名称)content-type: text/plain # application/json # 消息类型设置,比如jsonbinder: lagouRabbitBinder # 关联MQ服务eureka:client:serviceUrl: # eureka server的路径defaultZone: http://lagoucloudeurekaservera:8761/eureka/,http://lagoucloudeurekaserverb:8762/eureka/ #把 eureka 集群中的所有 url 都填写了进来,也可以只写一台,因为各个 eureka server 可以同步注册表instance:prefer-ip-address: true #使用ip注册
3、 创建启动类
@SpringBootApplication@EnableDiscoveryClientpublic class StreamProducerApplication9090 {public static void main(String[] args) {SpringApplication.run(StreamProducerApplication9090.class,args);}}
4、 业务类开发
创建接口
public interface IMessageProducer {public void sendMessage(String content);}
接口实现类
定义类是输出的(用Source),并且通过自动注入Source接口,进行调用输出
// Source.class里面就是对输出通道的定义(这是Spring Cloud Stream内置的通道封装)@EnableBinding(Source.class)public class MessageProducerImpl implements IMessageProducer {// 将MessageChannel的封装对象Source注入到这里使用@Autowiredprivate Source source;@Overridepublic void sendMessage(String content) {// 向mq中发送消息(并不是直接操作mq,应该操作的是spring cloud stream)// 使用通道向外发出消息(指的是Source里面的output通道)source.output().send(MessageBuilder.withPayload(content).build());}}
测试类
@SpringBootTest(classes = {StreamProducerApplication9090.class})@RunWith(SpringJUnit4ClassRunner.class)public class MessageProducerTest {@Autowiredprivate IMessageProducer iMessageProducer;@Testpublic void testSendMessage() {iMessageProducer.sendMessage("hello world-lagou101");}}
创建消费者项目
1、创建项目并引入依赖(同上)
2、 在application.yml添加配置
作为输入给消费者,端口号是9091,使用是input
server:port: 9091spring:application:name: lagou-cloud-stream-producercloud:stream:binders: # 绑定MQ服务信息(此处我们是RabbitMQ)lagouRabbitBinder: # 给Binder定义的名称,用于后面的关联type: rabbit # MQ类型,如果是Kafka的话,此处配置kafkaenvironment: # MQ环境配置(用户名、密码等)spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestbindings: # 关联整合通道和binder对象(output,input可以配置多个,和同时存在)input: # output是我们定义的通道名称,此处不能乱改destination: lagouExchange # 要使用的Exchange名称(消息队列主题名称)content-type: text/plain # application/json # 消息类型设置,比如jsonbinder: lagouRabbitBinder # 关联MQ服务eureka:client:serviceUrl: # eureka server的路径defaultZone: http://lagoucloudeurekaservera:8761/eureka/,http://lagoucloudeurekaserverb:8762/eureka/ #把 eureka 集群中的所有 url 都填写了进来,也可以只写一台,因为各个 eureka server 可以同步注册表instance:prefer-ip-address: true #使用ip注册
3、创建消息消费类并监听
sink作为输入到消费端
@EnableBinding(Sink.class)public class MessageConsumerService {@StreamListener(Sink.INPUT)public void recevieMessages(Message<String> message) {System.out.println("=========接收到的消息:" + message);}}
注意:消费者要先启动,生产者后启动
Stream之消息分组
如上的情况,消费者端有两个(消费同一个MQ的同一个主题),但是呢我们的 业务场景中希望这个主题的一个Message只能被一个消费者端消费处理,此时我们 就可以使用消息分组。
如果不进行消息分组,同一个消息就会被多个消费者消费。如果定义为同一个组,那么就只能被组内其中一个消费了。
消费者端添加消息分组
在bindings的下面添加 group: lagou001。所有多个消费者只要定义了同一个分组,一条消息过来,可用持久性,而且只能被一个消费者消费。
bindings: # 关联整合通道和binder对象input: # output是我们定义的通道名称,此处不能乱改destination: lagouExchange # 要使用的Exchange名称(消息队列主题名称)content-type: text/plain # application/json # 消息类型设置,比如jsonbinder: lagouRabbitBinder # 关联MQ服务group: lagou001
