Spring Cloud Stream 消息驱动组件帮助我们更快速,更方便,更友好的去构建消息 驱动微服务的
Spring Cloud Stream进行了很好的上层抽象
本质:屏蔽掉了底层不同MQ消息中间件之间的差异,统一了MQ的编程模型,降低 了学习、开发、维护MQ的成本
Spring Cloud Stream 是一个构建消息驱动微服务的框架。应用程序通过inputs(相 当于消息消费者consumer)或者outputs(相当于消息生产者producer)来与 Spring Cloud Stream中的binder对象交互,而Binder对象是用来屏蔽底层MQ细节 的,它负责与具体的消息中间件交互。
使用时只需要知道如何使用Spring Cloud Stream与Binder对象 交互即可
Stream编程注解
如下的注解无非在做一件事,把我们结构图中那些组成部分上下关联起来,打通通 道(这样的话生产者的message数据才能进入mq,mq中数据才能进行消费者工程)。
注解 | 描述 |
---|---|
@Input(在消费者工程中使用) | 注解标识输⼊通道,通过该输⼊通道 接收到的消息进入应用程序 |
@Output(在生产者工程中使用) | 注解标识输出通道,发布的消息将通 过该通道离开应用程序 |
@StreamListener(在消费者工程中使用,监听message的到来) | 监听队列,用于消费者的队列的消息 的接收(有消息监听…..) |
@EnableBinding | 把Channel和Exchange(对于 RabbitMQ)绑定在一起 |
基于RabbitMQ创建三个工程
前提是安装并启动RabbitMQ
创建生产者项目
1、创建项目并引入依赖
是创建生产者项目 lagou-cloud-stream-producer-9090
<!--eureka client 客户端依赖引入-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<!--spring cloud stream 依赖(rabbit)-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
2、 在application.yml添加配置
server:
port: 9090
spring:
application:
name: lagou-cloud-stream-producer
cloud:
stream:
binders: # 绑定MQ服务信息(此处我们是RabbitMQ)
lagouRabbitBinder: # 给Binder定义的名称,用于后面的关联
type: rabbit # MQ类型,如果是Kafka的话,此处配置kafka
environment: # MQ环境配置(用户名、密码等)
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: # 关联整合通道和binder对象(output,input可以配置多个,和同时存在)
output: # output是我们定义的通道名称,此处不能乱改
destination: lagouExchange # 要使用的Exchange名称(消息队列主题名称)
content-type: text/plain # application/json # 消息类型设置,比如json
binder: lagouRabbitBinder # 关联MQ服务
eureka:
client:
serviceUrl: # eureka server的路径
defaultZone: http://lagoucloudeurekaservera:8761/eureka/,http://lagoucloudeurekaserverb:8762/eureka/ #把 eureka 集群中的所有 url 都填写了进来,也可以只写一台,因为各个 eureka server 可以同步注册表
instance:
prefer-ip-address: true #使用ip注册
3、 创建启动类
@SpringBootApplication
@EnableDiscoveryClient
public class StreamProducerApplication9090 {
public static void main(String[] args) {
SpringApplication.run(StreamProducerApplication9090.class,args);
}
}
4、 业务类开发
创建接口
public interface IMessageProducer {
public void sendMessage(String content);
}
接口实现类
定义类是输出的(用Source),并且通过自动注入Source接口,进行调用输出
// Source.class里面就是对输出通道的定义(这是Spring Cloud Stream内置的通道封装)
@EnableBinding(Source.class)
public class MessageProducerImpl implements IMessageProducer {
// 将MessageChannel的封装对象Source注入到这里使用
@Autowired
private Source source;
@Override
public void sendMessage(String content) {
// 向mq中发送消息(并不是直接操作mq,应该操作的是spring cloud stream)
// 使用通道向外发出消息(指的是Source里面的output通道)
source.output().send(MessageBuilder.withPayload(content).build());
}
}
测试类
@SpringBootTest(classes = {StreamProducerApplication9090.class})
@RunWith(SpringJUnit4ClassRunner.class)
public class MessageProducerTest {
@Autowired
private IMessageProducer iMessageProducer;
@Test
public void testSendMessage() {
iMessageProducer.sendMessage("hello world-lagou101");
}
}
创建消费者项目
1、创建项目并引入依赖(同上)
2、 在application.yml添加配置
作为输入给消费者,端口号是9091,使用是input
server:
port: 9091
spring:
application:
name: lagou-cloud-stream-producer
cloud:
stream:
binders: # 绑定MQ服务信息(此处我们是RabbitMQ)
lagouRabbitBinder: # 给Binder定义的名称,用于后面的关联
type: rabbit # MQ类型,如果是Kafka的话,此处配置kafka
environment: # MQ环境配置(用户名、密码等)
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: # 关联整合通道和binder对象(output,input可以配置多个,和同时存在)
input: # output是我们定义的通道名称,此处不能乱改
destination: lagouExchange # 要使用的Exchange名称(消息队列主题名称)
content-type: text/plain # application/json # 消息类型设置,比如json
binder: lagouRabbitBinder # 关联MQ服务
eureka:
client:
serviceUrl: # eureka server的路径
defaultZone: http://lagoucloudeurekaservera:8761/eureka/,http://lagoucloudeurekaserverb:8762/eureka/ #把 eureka 集群中的所有 url 都填写了进来,也可以只写一台,因为各个 eureka server 可以同步注册表
instance:
prefer-ip-address: true #使用ip注册
3、创建消息消费类并监听
sink作为输入到消费端
@EnableBinding(Sink.class)
public class MessageConsumerService {
@StreamListener(Sink.INPUT)
public void recevieMessages(Message<String> message) {
System.out.println("=========接收到的消息:" + message);
}
}
注意:消费者要先启动,生产者后启动
Stream之消息分组
如上的情况,消费者端有两个(消费同一个MQ的同一个主题),但是呢我们的 业务场景中希望这个主题的一个Message只能被一个消费者端消费处理,此时我们 就可以使用消息分组。
如果不进行消息分组,同一个消息就会被多个消费者消费。如果定义为同一个组,那么就只能被组内其中一个消费了。
消费者端添加消息分组
在bindings的下面添加 group: lagou001。所有多个消费者只要定义了同一个分组,一条消息过来,可用持久性,而且只能被一个消费者消费。
bindings: # 关联整合通道和binder对象
input: # output是我们定义的通道名称,此处不能乱改
destination: lagouExchange # 要使用的Exchange名称(消息队列主题名称)
content-type: text/plain # application/json # 消息类型设置,比如json
binder: lagouRabbitBinder # 关联MQ服务
group: lagou001