消息丢失可能发生在生产者发送消息、MQ 本身丢失消息、消费者丢失消息 3 个方面。

生产者丢失

生产者丢失消息的可能点在于程序发送失败抛异常了没有重试处理,或者发送的过程成功但是过程中网络闪断 MQ 没收到,消息就丢失了。

由于同步发送的一般不会出现这样使用方式,所以我们就不考虑同步发送的问题,我们基于异步发送的场景来说。

异步发送分为两个方式:异步有回调和异步无回调,无回调的方式,生产者发送完后不管结果可能就会造成消息丢失,而通过异步发送 + 回调通知 + 本地消息表的形式我们就可以做出一个解决方案。以下单的场景举例。

  1. 下单后先保存本地数据和 MQ 消息表,这时候消息的状态是发送中,如果本地事务失败,那么下单失败,事务回滚。
  2. 下单成功,直接返回客户端成功,异步发送 MQ 消息
  3. MQ 回调通知消息发送结果,对应更新数据库 MQ 发送状态
  4. JOB 轮询超过一定时间(时间根据业务配置)还未发送成功的消息去重试
  5. 在监控平台配置或者 JOB 程序处理超过一定次数一直发送不成功的消息,告警,人工介入。

image.png

一般而言,对于大部分场景来说异步回调的形式就可以了,只有那种需要完全保证不能丢失消息的场景我们做一套完整的解决方案。

MQ 丢失

如果生产者保证消息发送到 MQ,而 MQ 收到消息后还在内存中,这时候宕机了又没来得及同步给从节点,就有可能导致消息丢失。

比如 RocketMQ:

RocketMQ 分为同步刷盘和异步刷盘两种方式,默认的是异步刷盘,就有可能导致消息还未刷到硬盘上就丢失了,可以通过设置为同步刷盘的方式来保证消息可靠性,这样即使 MQ 挂了,恢复的时候也可以从磁盘中去恢复消息。

比如 Kafka 也可以通过配置做到:

acks=all 只有参与复制的所有节点全部收到消息,才返回生产者成功。这样的话除非所有的节点都挂了,消息才会丢失。replication.factor=N,设置大于1的数,这会要求每个partion至少有2个副本min.insync.replicas=N,设置大于1的数,这会要求leader至少感知到一个follower还保持着连接retries=N,设置一个非常大的值,让生产者发送失败一直重试```

虽然我们可以通过配置的方式来达到MQ本身高可用的目的,但是都对性能有损耗,怎样配置需要根据业务做出权衡。

消费者丢失

消费者丢失消息的场景:消费者刚收到消息,此时服务器宕机,MQ认为消费者已经消费,不会重复发送消息,消息丢失。

RocketMQ默认是需要消费者回复ack确认,而kafka需要手动开启配置关闭自动offset。

消费方不返回ack确认,重发的机制根据MQ类型的不同发送时间间隔、次数都不尽相同,如果重试超过次数之后会进入死信队列,需要手工来处理了。(Kafka没有这些)

消息可靠性怎么保证? - 图2