- .消息可靠性
- mq的发送消息的时候可能会在发送给exchange交换机的时候丢失 可能在交换机发送给队列的时候丢失 也可能在监听消息的时候丢失
- 解决方案:
- 生产则确认机制:
- mq持久化
- 消费者确认机制
- 失败重试机制
- 生产者确认消息
- 发送到交换机的阶段:发送成功返回ack 不成功返回nack
- 交换机发送到队列的阶段: 发送者回执,消息发送到交换机了 如果没有发送到队列,就会返回失败的原因和ack
- 发送消息的时候,必须个每个消息设置一个唯一的id,以区分不同的消息
- 实现步骤:
- 需要在rabbit配置 publisher-confirm-type:correlated //simple 代表同步 correlated代表异步
publisher-returns:true 开启回执
template:
mandatory:true ```xml spring: rabbitmq: publisher-confirm-type: correlated publisher-returns: true template: mandatory: true
- 需要在rabbit配置 publisher-confirm-type:correlated //simple 代表同步 correlated代表异步
- 在消息的生产者定义一个配置类实现ApplicationContextAware接口 重写setApplicationcontext方法 通过application对象获取rabbittenplate对象- rabbtitemplate.setConfirm()定义实现确认逻辑 .setRentruncallback定义回调逻辑```xmlpackage cn.itcast.mq.config;@Slf4j@Configurationpublic class CommonConfig implements ApplicationContextAware {@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {// 获取RabbitTemplateRabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);// 设置ReturnCallbackrabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {// 投递失败,记录日志log.info("消息发送失败,应答码{},原因{},交换机{},路由键{},消息{}",replyCode, replyText, exchange, routingKey, message.toString());// 如果有业务需要,可以重发消息});rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {/*** @param correlationData 自定义的数据* @param ack 是否确认* @param cause 原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if(ack){// 3.1.ack,消息成功log.debug("消息发送成功, ID:{}", correlationData.getId());}else{// 3.2.nack,消息失败log.error("消息发送失败, ID:{}, 原因{}",correlationData.getId(), cause);}}});}@Beanpublic DirectExchange simpleExchange(){// 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除return new DirectExchange("simple.direct", false, false);}@Beanpublic Queue simpleQueue(){return new Queue("simple.queue",false);}@Beanpublic Binding binding(){return BindingBuilder.bind(simpleQueue()).to(simpleExchange()).with("simple");}}
- 消息没有发送到交换机:就会触发confirm方法 返回ack为false 发送成功则为turn 消息发送到交换机 但是没有路由到队列 就会触发returncallback方法 返回失败的原因 以及状态,路由等消息
消息持久化 :
交换机持久化:mq中交换机默认数不持久化的如果宕机重启就会丢失消息 所以需要持久化: (事实上,默认情况下,由SpringAMQP声明的交换机都是持久化的。)
@Bean public DirectExchange simpleExchange(){ // 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除 return new DirectExchange("simple.direct", true, false); }队列持久化:在rabbitmq中队列默认是不持久化的,需要在声明的时候持久化
@Bean public Queue simpleQueue(){ return new Queue("simple.queue",true); } //事实上,默认情况下,由SpringAMQP声明的队列都是持久化的消息持久化

消费者确认:mq在收到消费则确认会删除消息 但是如果消费者发送ack回执给mq后 出现异常导致没有处理消息,消息就会丢失了 索引需要设置消费者的回复时机
springAMQP可以有3种确认模式
- none :关闭ack, mq会默认假定消费者在收到消息会马上处理,消息会被删除掉
- manual: 手动ack, 代码逻辑需要我们自己完成’
- auto:自动ack, 由spring检测listener代码的异常,没有异常则返回ack 否则nack(推荐使用)
配置:spring: rabbitmq: listener: simple: acknowledge-mode: none # 关闭ack auto 自动 manual手动ack 默认我们使用auto
失败重试机制:(消费者消费失败的机制 :会会发起重试机制 就是隔一段时间在次处理消息)
spring: rabbitmq: listener: simple: retry: enabled: true # 开启消费者失败重试 initial-interval: 1000ms # 初识的失败等待时长为1秒 multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval max-attempts: 3 # 最大重试次数 stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false重试失败策略:在重试失败后,有3种处理消息的策略
- RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
- ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
- RepublishMessageRecoverer:重试耗尽之后,发送到指定的交换机:需要定义一个republishemassagerecover 来绑定回收的交换机和队列
@Configuration public class ErrorMessageConfig { @Bean public DirectExchange errorMessageExchange(){ return new DirectExchange("error.direct"); } @Bean public Queue errorQueue(){ return new Queue("error.queue", true); } @Bean public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){ return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error"); } //2)定义一个RepublishMessageRecoverer,关联队列和交换机 @Bean public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){ return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error"); } }.总结
如何确保RabbitMQ消息的可靠性?
开启生产者确认机制,确保生产者的消息能到达队列
- 开启持久化功能,确保消息未消费前在队列中不会丢失
- 开启消费者确认机制为auto,由spring确认消息处理成功后完成ack
- 开启消费者失败重试机制,并设置MessageRecoverer,多次重试失败后将消息投递到异常交换机,交由人工处理
死信交换机
- 死信:
- 消费者使用basic.reject或 basic.nack声明消费失败,并且消息的requeue参数设置为false(消息会拒绝,且重新入列为false)
- 消息是一个过期消息,超时无人消费(消息超时了)
- 要投递的队列消息满了,无法投递(队列满了)
- 死信交换机:在定义交换机的时候,
@Bean public Queue simpleQueue(){ return QueueBuilder.durable("simple.queue") // 指定队列名称,并持久化 .deadLetterExchange("dl.direct") // 指定死信交换机 .build(); } // 声明死信交换机 dl.direct @Bean public DirectExchange dlExchange(){ return new DirectExchange("dl.direct", true, false); } // 声明存储死信的队列 dl.queue @Bean public Queue dlQueue(){ return new Queue("dl.queue", true); } // 将死信队列 与 死信交换机绑定 @Bean public Binding dlBinding(){ return BindingBuilder.bind(simpleQueue()).to(dlExchange()).with("simple"); }
- 死信:
TTL:消息超时分2种情况:一是消息设置超时,而是队列设置了超时时间,如果都设置 了则按时间最短的为标准
- 利用死信交换机来实现延迟消息的发送:原理,声明一个死信队列来保存死信(给队列或者消息设置超时时间)并声明绑定死信交换机(消息过期会发送到这个交换机上),在声明一个交换机(死信交换机是direct),再声明一个队列来和死信交换机绑定就可以接收超时了的死信, 消费者监听这个队列就行 当消息到达时间 就可以收到消息,了;
```java
@Bean
public Queue ttlQueue(){
return QueueBuilder.durable(“ttl.queue”) // 指定队列名称,并持久化
}.ttl(10000) // 设置队列的超时时间,10秒 .deadLetterExchange("dl.ttl.direct") // 指定死信交换机 .build();
//消息也可以指定超时时间 @Test public void testTTLMsg() { // 创建消息 Message message = MessageBuilder .withBody(“hello, ttl message”.getBytes(StandardCharsets.UTF_8)) .setExpiration(“5000”) .build(); // 消息ID,需要封装到CorrelationData中 CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); // 发送消息 rabbitTemplate.convertAndSend(“ttl.direct”, “ttl”, message, correlationData); log.debug(“发送消息成功”); }
- **延迟队列(延迟交换机):**
- 延迟队列可以更好的发送延迟短信,
- 应用场景:延迟发短信,处理超时没有支付的订单,预约工作会议等..
- 延迟队列需要安装一个插件delay queue(DelayExchange)
- 原理:
- 需要声明一个交换机 属性为delayed类型
- 发送消息的时候 使用messagebuilder 设置消息的消息头setHeader("x-delay",超时时间) 必须设置消息头携带:""x-delay" 延迟时间
- 延迟交换机 接收到消息头的时候会去检查是否携带x-delay 如果携带就会持久化到硬盘,读取x-delay的属性值作为延迟时间
- 返回routing not found结果给消息发送者
- x-delay时间到期后,重新投递消息到指定队列
- 代码实现如下
```java
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "delay.queue", durable = "true"),
exchange = @Exchange(name = "delay.direct",delayed = "true"),
key = "delay"
))
或者
@Bean
public DirectExchange exchange(){
// return new DirectExchange("dead.direct",true,false);
return ExchangeBuilder.directExchange("dead.direct").delayed().build();
}
惰性队列:
- 消息堆积问题:当发送消息的速度大于消费者处理消息的速度,就会造成消息堆积,直到到达队列的上限,之后在发的消息就会变成死信,可能造成消息的丢失
- 解决消息堆积方案:
- 1增加消费者的数量,提高消费速度,也就是work queue模式,
- 2或扩大队列的容积,提高堆积的上限,
- 3或者使用惰性队列
- 惰性队列:
- 消息不存储在内存,而是存储在硬盘;
- 消费者需要消息的时候才会读取到内存
- 支持海量消息的存储
- 优点:
- 基于磁盘存储,储存量高,
- 没有间歇性的page_out操作 性能比较稳定(普通队列持久化的时候需要先到内存,然后page_out操作到磁盘)
- 缺点:
- 性能依赖磁盘的io能力,
- 基于磁盘存储,时效性会降低
- 基于命名声明惰性队列:略…
基于注解声明惰性队列:
@bean public Queue queue(){ return QueueBuilde.durable("lazy.queue")//持久化并声明队列名字 .lazy()//声明为惰性队列 .build(); }基于@RabbitListener注解声明惰性队列
@RabbitListener(queuesToDeclare = @Queue( name = "lazy.queue",durable = "true", arguments = @Argument(name = "x-queue-mode",value = "lazy") ))
MQ集群:RabbitMQ的是基于Erlang语言编写,而Erlang又是一个面向并发的语言,天然支持集群模式 集群的rabbit需要有相同的erlang cookie 才可以实现集群。RabbitMQ的集群有两种模式:
- 普通集群:是一种分布式集群,将队列分散到集群的各个节点,从而提高整个集群的并发能力。
- 镜像集群:是一种主从集群,普通集群的基础上,添加了主从备份功能,提高集群的数据可用性。
- 普通集群 也叫标准集群
- 会在集群的各个节点间共享部分数据,包括:交换机、队列元信息。不包含队列中的消息。
- 当访问集群某节点时,如果队列不在该节点,会从数据所在节点传递到当前节点并返回
- 队列所在节点宕机,队列中的消息就会丢失(实际上队列只保存在创建队列的mq节点)
- 镜像集群 :本质上就是 主从模式
- 交换机、队列、队列中的消息会在各个mq的镜像节点之间同步备份。
- 创建队列的节点被称为该队列的主节点,备份到的其它节点叫做该队列的镜像节点
- 一个队列的主节点可能是另一个队列的镜像节点
- 所有操作都是主节点完成,然后同步给镜像节点
- 主宕机后,镜像节点会替代成新的主节点(如果在主从同步完成前,主就已经宕机,可能出现数据丢失)
- 会有数据丢失的风险
- 仲裁队列:
- 仲裁队列是3.8版本以后才有的新功能,用来替代镜像队列,具备下列特征:
- 与镜像队列一样,都是主从模式,支持主从数据同步
- 使用非常简单,没有复杂的配置
- 主从同步基于Raft协议,强一致
- 仲裁队列是3.8版本以后才有的新功能,用来替代镜像队列,具备下列特征:
