同步调用的优点:
-
同步调用的问题:
耦合度高
- 性能下降
- 额外资源消耗
-
什么是MQ
中午是消息队列,存放消息的队列。事件驱动架构中的Broker
介绍消息队列的作用?
好处:
吞吐量提升:无需等待订阅者处理完成,响应更快速
- 故障隔离:服务没有直接调用,不存在级联失败问题
- 调用间没有阻塞,不会造成无效的资源占用
- 耦合度极低,每个服务都可以灵活插拔,可替换
- 流量削峰:不管发布事件的流量波动多大,都由Broker接收,订阅者可以按照自己的速度去处理事件
缺点:
- 架构复杂了,业务没有明显的流程线,不好管理
-
MQ的应用场景
提供应用与应用间消息通信的软件
- 异步处理
- 应用解耦
- 流量削峰
- 消息通知
缺点
是用于在应用程序之间传递业务消息的开放标准。该协议与语言和平台无关,更符合微服务中独立性的要求
rabbitmq支持哪些消息模式
简单模式
- 工作队列模式
- 发布订阅模式
- 路由模式
-
如何使用RabbitMQ收发消息?(基于SpringAMQP)?
发消息
父工程中引入依赖
<!--AMQP依赖,包含RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
在publisher服务的application.yml中添加配置
spring: rabbitmq: host: 192.168.150.101 # 主机名 port: 5672 # 端口 virtual-host: / # 虚拟主机 username: itcast # 用户名 password: 123321 # 密码注入RabbitTemplate发送消息
收消息
父工程中引入依赖
<!--AMQP依赖,包含RabbitMQ--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>在consumer服务的application.yml中添加配置
spring: rabbitmq: host: 192.168.150.101 # 主机名 port: 5672 # 端口 virtual-host: / # 虚拟主机 username: itcast # 用户名 password: 123321 # 密码-
如何项目中声明交换机、队列及绑定关系?
创建配置类config添加@Configuration+@Bean ```yaml 展开/收起节点 聚焦 全部评论0
发表
@Configuration public class FanoutConfig { /**
* 声明交换机
* @return Fanout类型交换机
*/
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("itcast.fanout");
}
/**
* 第1个队列
*/
@Bean
public Queue fanoutQueue1(){
return new Queue("fanout.queue1");
}
/**
* 绑定队列和交换机
*/
@Bean
public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}
/**
* 第2个队列
*/
@Bean
public Queue fanoutQueue2(){
return new Queue("fanout.queue2");
}
/**
* 绑定队列和交换机
*/
@Bean
public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
}
}
- @RabbitListener中通过注解声明
```yaml
@RabbitListener(bindings = @QueueBinding(
//消息队列
value = @Queue(name = "topic.queue1"),
//交换机队列
exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
//通配符
key = "china.#"
))
public void listenTopicQueue1(String msg){
System.out.println("消费者接收到topic.queue1的消息:【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(
//消息队列
value = @Queue(name = "topic.queue2"),
//交换机队列
exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
//通配符
key = "#.news"
))
public void listenTopicQueue2(String msg){
System.out.println("消费者接收到topic.queue2的消息:【" + msg + "】");
}
消息转换器的作用?
- 让消息体积变小
- 提高安全性
-
如何修改MQ默认的消息转换器?
在publisher和consumer两个服务中都引入依赖
<dependency> <groupId>com.fasterxml.jackson.dataformat</groupId> <artifactId>jackson-dataformat-xml</artifactId> <version>2.9.10</version> </dependency>在启动类中添加一个Bean
@Bean public MessageConverter jsonMessageConverter(){ return new Jackson2JsonMessageConverter(); }MQ高级:
rabbitmq如何保证消息的可靠性?
开启生产者确认机制,确保生产者的消息能到达队列
- 常见的丢失原因包括
- 发送时丢失
- 生产者发送的消息未送达exchange
- 消息到达exchange后未到达queue
- MQ宕机,queue将消息丢失
- consumer接收到消息后未消费就宕机
- 发送时丢失
- 常见的丢失原因包括
- 开启持久化功能,确保消息未消费前在队列中不会丢失
- 交换机持久化
- 队列持久化
- 消息持久化
- 开启消费者确认机制为auto,由spring确认消息处理成功后完成ack
- 三种确认模式
- manual:手动ack,需要在业务代码结束后,调用api发送ack
- auto:自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack
- none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除
- 三种确认模式
开启消费者失败重试机制,并设置MessageRecoverer,多次重试失败后将消息投递到异常交换机,交由人工处理
解决消息堆积有两种思路
- 队列上绑定多个消费者,提高消费速度
- 使用惰性队列,可以再mq中保存更多消息
- 惰性队列的优点有哪些
- 基于磁盘存储,消息上限高
- 没有间歇性的page-out,性能比较稳定
惰性队列的缺点有哪些
利用本地重试,而不是无限制的requeue到mq队列
在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecovery接口来处理
rabbitmq如何保证高可用?
MQ集群
本地重试
修改consumer服务的application.yml文件,添加内容
spring: rabbitmq: listener: simple: retry: enabled: true # 开启消费者失败重试 initial-interval: 1000ms # 初识的失败等待时长为1秒 multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval max-attempts: 3 # 最大重试次数 stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false结论
- 开启本地重试时,消息处理过程中抛出异常,不会requeue到队列,而是在消费者本地重试
- 重试达到最大次数后,Spring会返回ack,消息会被丢弃
失败策略
死信交换机
- 消息被消费者reject或者返回nack
- 消息超时未消费
- 队列满了
- TTL 接收超时死信的死信交换机
- 给队列设置ttl属性,进入队列后超过ttl时间的消息变为死信
- 给消息设置ttl属性,队列接收到消息超过ttl时间后变为死信
延迟队列
- 延迟队列的使用场景包括
- 延迟发送短信
- 用户下单,如果用户在15 分钟内未支付,则自动取消
- 预约工作会议,20分钟后自动通知所有参会人员
使用DelayExchange插件
- DelayExchange需要将一个交换机声明为delayed类型,流程如下
- 接收消息
- 判断消息是否具备x-delay属性
- 如果有x-delay属性,说明是延迟消息,持久化到硬盘,读取x-delay值,作为延迟时间
- 返回routing not found结果给消息发送者
- x-delay时间到期后,重新投递消息到指定队列
使用DelayExchange
声明DelayExchange交换机
基于注解方式(推荐)
@RabbitListener(bindings = @QueueBinding( value = @Queue(name = "delay.queue", durable = "true"), exchange = @Exchange(name = "delay.direct",delayed = "true"), key = "delay" )) public void listenDelayedQueue(String msg){ log.info("接收到 delay.queue的延迟消息:{}", msg); }也可以基于@Bean的方式
- DelayExchange需要将一个交换机声明为delayed类型,流程如下
- 延迟队列的使用场景包括

- 发送消息
@Test
public void testDelayedMsg() {
// 创建消息
Message message = MessageBuilder
.withBody("hello, delay message".getBytes(StandardCharsets.UTF_8))
.setHeader("x-delay",10000)
.build();
// 消息ID,需要封装到CorrelationData中
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
// 发送消息
rabbitTemplate.convertAndSend("delay.direct", "delay", message, correlationData);
log.debug("发送消息成功");
}
rabbitmq在项目中的使用场景?
