《第1篇·为什么使用消息队列》我们曾提到过使用消息队列可能会带来的影响,包括降低系统可用性、增加系统复杂度、一致性问题等等,今天就来讨论增加系统复杂度的其中一种情况——消息重复。

MQ-3封面.png

1. 背景

事实上,重复消息是我们在使用消息队列时比较常见的一个问题,如果没有很好的预防措施,将会造成数据上极大的漏洞,严重时可能给公司带来极大资损。

在讨论消息队列的优点——异步时,我们是以营销中心发放新人优惠券的案例来描述的。同样的,我们也以这个场景来讲述重复消息问题。

再来回顾一下发放会员新人优惠券业务的流程图:

在会员注册时,会抛出一个 MQ 消息,营销中心消费这条消息执行发放新人优惠券逻辑。但是实际上在该功能提测的过程中,偶尔会出现这样的情况:用户注册成功后,发现自己有两张新人优惠券。

这就是本文所要讨论的重复消息。

如果线上出现了这样的问题,可能有用户就会收到多张新人优惠券,对于企业来说,这可是一笔不小的资损。为了防止这样的情况出现,我们先来分析一下产生重复消息的原因。

若没有特殊说明,本文中讨论的消息队列都是 RocketMQ。

2. 重复消息出现的原因

在消息投递以及消费的过程中,通常流程就是:

  1. 生产者发送消息
  2. 消息队列收到消息
  3. 消费者收到消息
  4. 消费者消费消息
  5. 消息队列收到消息消费结果

在这个过程中有出现了三个参与者:生产者、消费者、消息队列,它们都有可能存在异常情况,导致出现重复消息。

2.1 生产者导致的消息重投

第一种情况是生产者导致的消息重投,这里又分为两种情况。

2.1.1 生产者发送消息失败的消息重投

首先是生产者发送消息失败的消息重投,在 RocketMQ 中生产者会将消息投递到 Broker 中,这里有三种模式:同步、异步、单向模式。

单向模式下无论消息发送的状态是成功还是失败都不会做处理。

在同步消息模式下,当消息发送状态不等于 org.apache.rocketmq.client.producer.SendStatus#SEND_OKisRetryAnotherBrokerWhenNotStoreOK 配置项为开启时,生产者将对另外的 broker 发起默认三次的消息重投,这里就可能导致产生重复消息。

MQ-3-1.png

生产者的默认重试次数是可配置的,可以通过声明 RocketMQ 生产者时的 retryTimesWhenSendFailed 参数来控制。

MQ-3-2.png

而在异步模式下,如果消息发送失败了,只会在当前 Broker 进行消息重投。

还有一点需要注意:如果生产者发送消息时超时,则认为消息队列不可用,不会再进行消息重投。

2.1.2 生产者主动发起的业务性重复消息

第二种生产者导致的重复消息是生产者主动发起的业务性重复消息。比如在一些消费者并不会持久化消息的场景下,依赖于生产者的定时扫描未成功被消费的数据,触发消息重试从而完成一些业务逻辑。

说的有些抽象了,还是拿会员注册异步赠送新人优惠券的场景来举例。现在对于会员中心而言会员注册消息已经抛出去了,如何保证这个消息一定被营销中心消费成功了呢?换言之,如何保证新会员的新人优惠券一定能够发送成功呢?

事实上,在领域层面,这应该是由营销中心来保证一定消费成功的。一般比较无脑的做法就是,在营销中心记录一张消息表,将每个会员注册消息都记录下来,消费状态为初始化,当新人优惠券发送成功后,才将消息的消费状态改为完成,由此标志着新注册会员的新人优惠券发放完成。同时营销中心会有一个定时任务,去扫描一段时间内(一般是今天以前)消息消费状态仍旧是未完成的消息,然后重新触发发放新人优惠券逻辑,如果多次重试仍旧失败,就应该由运营介入排查原因,并手动补发优惠券。上述由营销中心保证消息一定被消费成功的流程图为:

上面说的跟重复消息没有一点关系,只是为了解释保证消息消费成功的一种做法。

其实还有另一种做法,就是在会员中心去保证新人优惠券的发放必定成功:在会员表冗余一个字段叫是否已发放新人优惠券(或者新建一张表去记录会员异步任务,新人优惠券是其中一种类型),当营销中心消费会员注册消息成功后,同步调用会员中心提供的接口将是否已发放新人优惠券更新为「是」,然后会员中心定义一个定时任务,扫描一段时间内未发放新人优惠券的数据,重新抛出会员注册消息(这里改成新人优惠券消息比较好,因为还会有别的业务会消费会员注册消息),这样就会存在重复消息。这种由会员中心抛出重复消息来保证新人优惠券发放成功的流程图为:

2.2 消息队列导致的消息重试

第二种情况是消息队列导致的消息重试。消费者会对消息队列抛出的消息进行消费,当消息消费失败消费者消费确认超时以及消费位点异常都可能导致消息队列发送重复的消息。

我们使用官方给出的 QuickStart 示例代码进行验证,有兴趣的同学可以在 apache/rocketmq 获取源码,源代码在 exmple 包下:

  • 生产者类:org.apache.rocketmq.example.quickstart.Producer
  • 消费者类:org.apache.rocketmq.example.quickstart.Consumer

2.2.1 消费失败导致的消息重试

首先模拟消费者消费消息失败导致的消息重试,启动 RocketMQ(nameserv、broker),启动生产者,发送一条消息,然后在消费者端的消费逻辑中手动抛出一个 RuntimeException,如下图所示:

MQ-3-3.png

启动消费者,我们可以看到消费者每隔一段时间会不断打印一条消费该消息的日志:

MQ-3-4.png

消费者消费消息有两种状态:CONSUME_SUCCESSRECONSUME_LATER

当消费者消费失败时,它返回的状态就是后者,消息队列在接收到消息消费失败的状态后,会将进行消息重试,默认的消息重试次数是 18 次。这是因为消息队列会以延迟消息的形式进行消息重试,而开源版 RocketMQ 提供的延时消息总共有 18 种时间间隔:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h。

当在2h的延时消息发送后消费状态仍是失败,它将进入死信队列,不再进行重试。

2.2.2 消费确认超时导致的消息重试

然后我们模拟由于消费确认超时导致的消息重试

默认消费确认的超时时间是15分钟,我们可以通过 org.apache.rocketmq.client.consumer.DefaultMQPushConsumer#setConsumeTimeout 方法来手动设置消费确认超时时间(单位是分钟)。在创建消费者时,指定消费超时时间为1分钟,然后修改消费者消费逻辑,设置线程睡眠60秒以上,如下图所示:

MQ-3-5.png

然后观察消费者的输出,我们可以在超过消费确认时间后消费者并没有再次收到这条消息,再启动一个新的消费者,可以看到新的消费者收到了相同的消息。

由此我们可以知道在消费者消费确认超时后,消息队列会认为该消费者无法正常接收到消息,从而将消息发送到另外的消费者进行消费尝试,直到收到消息确认状态为止(消费状态成功或者失败都认为正常收到了消息)。

MQ-3-6.png

在这种情况下,假设消费者消费逻辑的耗时较长,恰好大于消费确认超时时间,从而导致消息队列向其他消费者发送重复的消息。如果每个消费者没有做好对重复消息的预防措施,那么就有可能在两个消费者端都成功消费该消息,从而导致数据重复,严重情况下会操作极大资损。

2.2.3 消费位点同步失败导致的重复消息

消息队列还有另外一种会产生重复消息的情况:消费位点同步失败导致的重复消息

关于消费位点就需要牵扯到消息队列的持久化机制了,但这并不在本文的重点讨论范围内。现在我们只需要知道,当消息被传递到 RocketMQ 消息队列时会保存在 CommitLog 中,消息队列会通过操作系统的能力将消息写入磁盘文件。同时每个消费者组都会用一个消费位点来标识当前消费者已经消费到的消息偏移量,并且定时与 Broker 同步自己的消费位点(offset,偏移量)。

由于这里消费者组的消费位点是定时与 Broker 进行同步的,就可能出现一些数据异常的情况,如:消费者在还没有同步消费位点的时候就由于某种异常中断了进程,导致 Broker 端对于该消费者的消费位点是旧值,所以在该消费者重启的时候,消息队列会从 Broker 端记录的的消费位点处(旧值)开始重新进行消息传递,也就产生了重复消息。

3. 如何处理重复消息

上面总结了五种产生重复消息的原因,分别是:

  1. 生产者发送消息失败,生产者进行消息重投
  2. 生产者主动发起的业务性重复消息
  3. 消费者消息消费失败,导致消息队列发起的消息重试
  4. 消费者消费确认超时,导致消息队列发起的消息重试
  5. 消费者消费位点同步失败,导致消息队列发送了重复消息

事实上我们无法保证消息队列一定不会产生重复消息,从另一个角度来说,发送重复消息也是一种保证消息一定传达的方式,作为业务方而言,我们只需要做好处理重复消息的措施就能防止重复消息带来的业务数据异常——即如何保证业务幂等性。

关于幂等性这里需要做一个区分,第一种是更新为给定结果的 update 操作,如将用户的账户金额更新为100,这就不需要太过关注幂等性,因为这种操作本身就具备幂等性,无论发多少个重复消息,都消费之后,用户的账户金额还是100。当然这里有一个消息顺序的问题,比如先将用户的账户金额更新为100,再将用户的账户金额更新为200,如果这里存在重复消息,理论上就可能出现两条消息消费成功后,用户的账户金额还是100,与预期不符。

第二种是 insert 或者不附带结果的 update 操作,如将用户的账户金额增加100、给用户发送一张新人优惠券等,这种操作是需要保证消息幂等性的,否则就可能出现一条消息给用户的账户金额增加了200、给用户发送了多张新人优惠券的资损情况。

对于第二种情况,可以考虑下面这种解决方案:每条消息都定义一个全局唯一的 key,比如订单编号、活动流水号、数据库唯一联合索引等,在处理这条消息前,先根据这个全局唯一的 key 去(Redis 或数据库)查询一下是否已存在,如果已存在这就是一条重复消息直接跳过不执行消费逻辑了,如果不存在那就继续执行消费逻辑。

需要注意的是,虽然消息队列在发送消息时会有一个唯一的 MsgId,但是不同 MsgId 消息的消息体可能是相同的,所以我们不能根据消息的 MsgId 来过滤重复消息。

同时,建议在选择全局唯一 Key 的时候,在持久化的表中定义一个唯一索引,因为在并发情况下,如果没有数据库唯一索引的兜底,还是可能会存在重复数据的。

4. 参考

最后,本文收录于个人语雀知识库: 我所理解的后端技术,欢迎来访。