面试题

如何保证消息不被重复消费?或者说,如何保证消息消费的幂等性?

面试官心理分析

其实这是很常见的一个问题,这俩问题基本可以连起来问。既然是消费消息,那肯定要考虑会不会重复消费?能不能避免重复消费?或者重复消费了也别造成系统异常可以吗?这个是 MQ 领域的基本问题,其实本质上还是问你使用消息队列如何保证幂等性,这个是你架构里要考虑的一个问题。

面试题剖析

上一讲提到,MQ 会导致系统复杂性提高,比如说,怎么保证消息不被重复消费呢?
在回答这个问题之前,可以先想一想,什么问题会导致消息重复消费呢?
首先,比如 RabbitMQ、RocketMQ、Kafka,都有可能会出现消息重复消费的问题,正常。因为这问题通常不是 MQ 自己保证的,是由我们开发来保证的。 Kafka 来举个例子,说说怎么重复消费吧。
kafka 实际上有个 offset 的概念,就是每写进去一个消息,都有自己的一个 offset,代表消息的序号,然后 consumer 消费了消息之后,每隔一段时间(kafka 可以配置),会把自己消费过的消息的 offset 提交一下,表示 “我已经消费过了,下一次重启什么的,就接着这个 offset 继续 “。
举个例子 —— 重复消费
有这么个场景。数据 1/2/3 依次进入 Kafka,Kafka 会给这三条数据每条分配一个 offset,代表这条数据的序号,我们就假设分配的 offset 依次是 152/153/154。消费者从 Kafka 去消费的时候,也是按照这个顺序去消费。假如当消费者消费了 offset=153 的这条数据,刚准备去提交 offset 到 Zookeeper,此时消费者进程被重启了。那么此时消费过的数据 1/2 的 offset 并没有提交,Kafka 也就不知道你已经消费了 offset=153 这条数据。那么重启之后,消费者会找 Kafka 说,嘿,哥儿们,你给我接着把上次我消费到的那个地方后面的数据继续给我传递过来。由于之前的 offset 没有提交成功,那么数据 1/2 会再次传过来,如果此时消费者没有去重的话,那么就会导致重复消费。
还有一种场景就是 consumer 消费速度很快,在指定的时间内提前读取完了 1、2、3 条消息,offset 还没来得及提交呢,consumer 又去读取数据了,此时也会导致重复消费。
【注意】新版的 Kafka 已经将 offset 的存储从 Zookeeper 转移至 Kafka brokers,并使用内部位移主题 __consumer_offsets 进行存储。
image.png
如果消费者干的事儿是拿一条数据就往数据库里写一条,会导致说,你可能就把数据 1/2 在数据库里插入了 2 次,那么数据就错啦。
所以第二个问题来了 —- 怎么保证幂等性
其实重复消费不可怕,可怕的是没有考虑重复消费之后怎么保证幂等性,幂等性,通俗说, 就是一个数据,或者一个请求,给你重复来多次,得确保对应的数据是不会改变,不能出错的。
幂等性
这个需要结合实际实际业务考虑,举例

  • 比如你拿个数据要写库,你先根据主键查一下,如果这数据都有了,你就别插入了,update 一下好吧。
  • 比如你是写 Redis,那没问题了,反正每次都是 set,天然幂等性。
  • 比如你不是上面两个场景,那做的稍微复杂一点,你需要让生产者发送每条数据的时候,里面加一个全局唯一的 id,类似订单 id 之类的东西,然后你这里消费到了之后,先根据这个 id 去比如 Redis 里查一下,之前消费过吗?如果没有消费过,你就处理,然后这个 id 写 Redis。如果消费过了,那你就别处理了,保证别重复处理相同的消息即可。
  • 比如基于数据库的唯一键来保证重复数据不会重复插入多条。因为有唯一键约束了,重复数据插入只会报错,不会导致数据库中出现脏数据。

顺序性
针对重复消费来说,上文中重复消费问题中的消费这只有一个,那如果有多个消费者,怎么保证消费顺序?
场景举例:比如说我们建了一个 topic,有三个 partition。生产者在写的时候,其实可以指定一个 key,比如说我们指定了某个订单 id 作为 key,那么这个订单相关的数据,一定会被分发到同一个 partition 中去,而且这个 partition 中的数据一定是有顺序的。
消费者从 partition 中取出来数据的时候,也一定是有顺序的。到这里,顺序还是 ok 的,没有错乱。接着,我们在消费者里可能会搞多个线程来并发处理消息。因为如果消费者是单线程消费处理,而处理比较耗时的话,比如处理一条消息耗时几十 ms,那么 1 秒钟只能处理几十条消息,这吞吐量太低了。而多个线程并发跑的话,顺序可能就乱掉了。
image.png
解决方法:写 N 个 queue,将具有相同 key 的数据都存储到同一个 queue,然后对 N 个线程,每个线程分别消费一个 queue 即可
image.png