可靠抵达

在分布式系统中,比如现在有很多微服务,微服务连接上消息队列服务器,其它微服务可能还要监听这些消息,

但是可能会因为服务器抖动、宕机,MQ 的宕机、资源耗尽,以及无论是发消息的生产者、还是收消息的消费者,它们的卡顿、宕机等各种问题,都会导致消息的丢失,比如发送者发消息的时候,给弄丢了 ,看起来消息是发出去了,MQ网络抖动没接到, 或者MQ接到了,但是它消费消息的时候,因为网络抖动又没拿到,等等各种问题

所以在分布式系统里面,一些关键环节,我们需要保证消息一定不能不丢失,比如:订单消息发出去之后,该算库存的、该算积分的、该算优惠的等等 ,这些消息千万不能丢,因为这都是经济上的问题

所以,想要保证不丢失,也就是可靠抵达,无论是发消息,可靠的抵达MQ,还是收消息,MQ的消息可靠抵达到我们的消费端,我们一定要保证消息可靠抵达,包括如果出现错误,我们也应该知道哪些消息丢失了

以前我们要做这种事情,可以使用事务消息,比如我们在发消息的时候,我们发消息的客户端首先会跟 MQ 建立一个连接,会在通道里面发消息,可以将通道设置成事务模式,这样发消息,只有整个消息发送过去,MQ消费成功给我们有完全的响应以后,我们才算消息成功,

但是使用事务消息,会使性能下降的很严重,官方文档说,性能会下降250倍……

为了保证在高并发期间能很快速的,确认哪些消息成功、哪些消息失败,我们引入了消息确认机制

消息准确送达的流程

image.png

首先生产者准备一个消息,消息只要投递给 MQ 服务器,服务器收到以后,消息该怎么存怎么存,该投给哪投给哪,

所以 Broker 首先会将消息交给 Exhchange,再有 Exchange 送达给 Queue,所以整个发送消息的过程,牵扯到两个

  1. P端到B端的过程
  2. E端到Q端的过程

如何保证消息的可靠送达

为了保证消息的可靠送达,每个消息被成功, 我们引入了发送者的两个确认回调

第一个是确认回调,叫 confirmCallback,就是P端给B端 发送消息的过程,Broker 一旦收到了消息,就会回调我们的方法 confirmCallback,这是第一个回调时机,这个时机就可以知道哪些消息到达服务器了

但是服务器收到消息以后,要使用 Exchange 交换机,最终投递给 Queue,但是投递给队列这个过程可能也会失败,比如我们指定的路由键有问题,或者我们队列正在使用的过程中,被其它的一些客户端删除等操作,可能都会投递失败,投递失败就会调用 returnCallback

当然,这两种回调都是针对的发送端

同样的,消费端,只要消息安安稳稳的存到了消息队列,接下来就由我们消费端进行消费了,但是消费端引用消费,会引入 ack 机制(消息确认机制)

这个机制能保证, 让 Broker 知道哪些消息都被消费者正确的拿到了,如果消费者正确接到,这个消息就要从队列里面删除,如果没有正确接到,可能就需要重新投递消息

总结

整个可靠抵达,分为两端处理,第一种是发送端的两种确认模式,第二个是消费端的 ack机制

发送端确认

确认回调-ConfimCallback

  1. spring:
  2. rabbitmq:
  3. publisher-confirms: true # 开启发送端确认
  • 在创建 connectionFactory 的时候设置 PublisherConfirms(true) 选项,开启confirmcallback 。
  • CorrelationData:用来表示当前消息唯一性。
  • 消息只要被 broker 接收到就会执行 confirmCallback,如果是 cluster 模式,需要所有broker 接收到才会调用 confirmCallback
  • 被 broker 接收到只能表示 message 已经到达服务器,并不能保证消息一定会被投递到目标 queue 里。所以需要用到接下来的 returnCallback

退回回调-ReturnCallback

Broker 未将消息成功投递给 Queue 触发的回调

spring:
  rabbitmq:
    publisher-returns: true # 开启发送端消息抵达队列的确认  
    template:
      mandatory: true # 只要消息抵达了队列,以异步发送优先回调这个returnconfirm

示例

image.png
https://gitee.com/UnityAlvin/gulimall/commits/master

消费端确认-Ack消息确认机制

保证每个消息被正确消费,此时broker才可以删除这个消息

消费端默认是自动确认的,只要消息接收到,客户端会自动确认,服务端就会移除这个消息

queue无消费者,消息依然会被存储,直到消费者消费

带来的问题

消费端收到很多消息,自动回复给服务器ack,只有一个消息处理成功,消费端突然宕机了,结果MQ中剩下的消息全部丢失了

解决

消费端如果无法确定此消息是否被处理完成,可以手动确认消息,即处理一个确认一个,未确认的消息不会被删除

手动确认模式

只要我们没有明确告诉MQ收到消息。没有 Ack,消息就一直是 Unacked 状态,即使 consumer 宕机,消息也不会丢失,会重新变为 Ready,等待下次有新的 Consumer 连接进来时,再发给新的 Consumer

消费者获取到消息,成功处理,可以回复 Ack 给 Broker

  • ack() 用于肯定确认;broker 将移除此消息
  • nack() 用于否定确认;可以指定 broker 是否丢弃此消息,可以批量
  • reject() 用于否定确认;同上,但不能批量

消息如果一直没有调用ack()的话,则会一直处于 Unacked 状态,这些 Unacked 状态的消息,都不会被丢弃,如果客户端宕机,等服务端感知到消费端宕机了,它就会将这个消息改为 Ready 状态,Ready 状态的消息,全部都会被重新投递

示例

image.png
https://gitee.com/UnityAlvin/gulimall/commits/master

总结

结合消费端与发送端的消息确认机制,就能保证消息一定发出去,也能一定让别人接收到,即使没接收到,也可以重新发送,最终达到消息百分百不丢失!