Stream消息驱动
屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型。
Spring Cloud Stream 是一个构建消息驱动微服务的框架。
应用程序通过inputs 或者 outputs 来与Spring Cloud Stream 中 binder对象交互。通过午门配置来绑定,而Spring Cloud Stream的binder对象负责与消息中间件交互。所以,我们只需要搞清楚如何与Spring Cloud Stream交互就可以方便使用消息驱动的方式。
通过使用Spring Integration 来连接消息代理中间件以实现消息事件驱动。Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了 发布-订阅、消费组、分区的三个核心概念。
如果我们项目使用了两种不同的MQ,不同的MQ之间存在差异(例如RabbitMQ有交换机,kafka有分区)。这些中间件的差异导致如果我们想向另外一种消息队列进行迁移,无疑就是灾难性的。Spring Cloud Stream提供了一种接耦合的方式。
原理:在没有绑定器的概念的情况下,SpringBoot应用要直接与消息中间件进行消息交互的时候,由于各消息中间件构建的初衷不同,它们的实现细节上会有较大的差异性。通过定义绑定器作为中间层,完美地实现了应用程序与消息中间件细节之间的隔离。通过向应用程序暴露统一的channel通道,使得应用程序不需要再考虑各种不同的消息中间件实现。
Stream对消息中间件的进一步封装,可以做到代码层面对中间件的无感知,甚至于动态的切换中间件。
消息生产者:将业务逻辑发送给Spring Cloud Stream (Source、Channel、Binder)
Spring Cloud Stream 发送到MQ
消息消费者:获取业务逻辑 Spring Cloud Stream(Sink、Channel、Binder)
Binder:很方便的连接中间件,屏蔽差异
Channel:通道,是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过Channel对队列进行配置
Source和Sink:简单的可以理解为参照对象是Spring Cloud Stream自身,从Stream发布消息就是输出,接收消息就是输入。
编码API和常用注解:
Middleware:中间件,目前官方只支持RabbitMQ、Kafka
Binder:Binder是应用与消息中间件之间的封装,通过Binder可以很方便的连接中间件,可以动态的改变消息类型(对应Kafka的topic、RabbitMQ的exchange),这些都可以通过配置文件来实现
@Input:注解标识输入通道,通过该输入通道接收到的消息进入应用程序
@Output:注解标识输出通道,发布的消息将通过该通道离开应用程序
@StreamListener:监听队列,用于消费者的队列的消息接收
@EnableBinding:指信道channel和exchange绑定在一起
环境搭建
生产者搭建
创建maven工程
加入pom依赖
<dependencies><!-- rabbit类型的stream依赖 --><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-netflix-eureka-client</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-devtools</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency></dependencies>
- 添加相关配置
server: port: 8801 spring: application: name: cloud-stream-provider cloud: stream: binders: # 配置要绑定的rabbitmq的服务信息 defaultRabbit: # 定义的名称,用于binding整合 type: rabbit # MQ类型 environment: # 设置rabbitmq的相关环境变量 spring: rabbitmq: host: 192.168.29.136 port: 5672 username: guest passsword: guest bindings: # 服务的整合处理 output: # 通道的名称 destination: studyExchange # 要使用的exchange名称定义 content-type: application/json # 消息类型,文本为text/plain binder: defaultRabbit # 要绑定的消息服务器的具体设置 eureka: client: fetch-registry: true register-with-eureka: false service-url: defaultZone: http://localhost:7001/eureka
- 编写主启动类
@SpringBootApplication public class StreamMQMain8801 { public static void main(String[] args) { SpringApplication.run(StreamMQMain8801.class, args); } }
- 编写Service接口 和实现类
public interface IMessageProvider { String send(); }
@EnableBinding(Source.class) // 不需要写@Service,这里使用的stream,所以用Stream的@EnableBinding
public class MessageProviderImpl implements IMessageProvider {
@Resource
private MessageChannel output; // 消息发送通道
@Override
public String send() {
String serial = UUID.randomUUID().toString();
output.send(MessageBuilder.withPayload(serial).build());
System.out.println("----serialNo:" + serial);
return null;
}
}
编写测试用的Controller
@RestController @Slf4j public class SendMessageController { @Autowired private IMessageProvider messageProvider; @GetMapping("/sendMessage") public String sendMessage() { return messageProvider.send(); } }
启动后, 如果rabbitmq的健康检查连接报错(Rabbit health check failed),可以使用如下配置关闭rabbit的健康检查(不影响rabbitmq的使用)
management:
health:
rabbit:
enabled: false
也可以在spring下再配置一次rabbitmq的连接信息:
spring:
rabbitmq:
host: 192.168.29.136
#port: 5672
#username: guest
#password: guest
访问:http://localhost:8801/sendMessage,即可向rabbitmq的studyExchange交换机发送消息
消费者搭建
创建maven工程
加入pom依赖(同生产者的依赖一样)
编写配置 ```yaml server: port: 8802 spring: rabbitmq: host: 192.168.29.136 # 此处配置的rabbitmq信息是为了防止rabbitmq健康检查报错,去掉也不影响stream的使用
port: 5672
username: guest
password: guest
application: name: cloud-stream-rabbitmq-consumer cloud: stream: binders:
defaultRabbit: type: rabbit environment: spring: rabbitmq: host: 192.168.29.136 #port: 5672 #username: guest #password: guestbindings:
input: # 通道名称 destination: studyExchange content-type: application/json binder: defaultRabbit
eureka: client: fetch-registry: true register-with-eureka: false service-url: defaultZone: http://localhost:7001/eureka
4.
编写主启动类
```java
@SpringBootApplication
public class StreamMQConsumer8802 {
public static void main(String[] args) {
SpringApplication.run(StreamMQConsumer8802.class, args);
}
}
编写消费者监听
@EnableBinding(Sink.class) // 消费者绑定的是Sink public class ReceiveMessageController { @Value("${server.port}") private String serverPort; @StreamListener(Sink.INPUT) // 监听 public void input(Message<String> message) { System.out.println("消费者[" + serverPort + "]:" + message.getPayload()); } }
生产者每发送一条消息,在消费者控制台马上就可以打印出来
消息重复消费
Stream的默认情况下。一个生产者发送给MQ的消息,如果有多台消费者服务器,则多台消费者都能消费到该消息,类似于广播模式。这就会造成消息的重复消费。
可以使用分组来避免这种情况:Stream中处于同一个Group中的多个消费者是竞争关系,就能够保证消息只会被其中一个应用消费。但是不同的Group是可以重复消费的。
在消费者上指定分组:
spring:
cloud:
stream:
bindings:
input:
group: study # 指定分组名
Stream中的Group其实对应的就是 RabbitMQ 的 队列Queue。
如果消费者不指定group,则stream会为其生成一个带有随机数的group,多个消费者每个都不重复。在rabbitmq控制台的queues中就能看到多个队列,点开exchange也可以看到下面绑定了多个队列。
如果在yaml中指定了group名,则最终在rabbitmq中生成的队列名为:
交换机名.分组名
消息持久化
如果消费者端没有设置 group属性,在消费者服务器宕机下线期间,生产者发送的消息消费者将永远获取不到。
在消费者端设置了group属性后,会有消息持久化的功能,消费者服务器宕机下线期间生产者如果发送了消息,则在消费者服务器重启时可以自动获取这些消息并消费。
