为什么要幂等处理

因为常用的mq如kafka和rocketmq都无法保证消息不会重复消费(他们的思想都是at-least-once),如果不幂等处理会导致数据一致性问题。

什么情况下会重复消费

  1. consumergroup内的consumer数量变更,需要重新rebalance一下topic的所有分区(kafka叫partition,rocketmq叫queue)和consumer的分配关系,调整之后,原来老consumer已经下发但未上报offset的消息会被新consumer重新消费。
  2. consumer内部宕机,导致内存中offset丢失,重新拉取后重新消费。

    如何幂等处理

  3. 新增

    1. 业务表上如果有唯一约束,就先查询是否存在,如果已存在直接消费完成。
    2. 如果没有唯一约束,可以考虑加一个业务无关的唯一约束,比如uuid之类的。
    3. 如果不想加字段,就考虑引入分布式kv中间件来判重(redis),但是这里的判重最好不要用布隆过滤器,因为存在误判,一旦误判会导致消息丢失。这里建议就用正常的string结构的get/set即可,但是可以设一个较短的过期值(因为不过是rebalance还是宕机恢复,过程都是比较快的)减少内存浪费。
  4. 删除:如果要删除的业务数据已经查询不到,即认为消费成功。
  5. 更新:直接覆盖,如果更新数据内带有状态字段,如val = val + 1、currentTime等等,要么最好避免这种情况在消息体内带上绝对值,要么进行乐观锁处理增加version字段。

    防止消息丢失

  6. 生产者端:

    1. kafka可以设置acks=all,要求leader落盘、全部follower完成同步才会返回消息发送成功。
    2. rocketmq可以采用事务消息,采用两段消息状态处理。
      1. producer发送消息。
      2. mq返回消息发送成功(处于half状态)。
      3. producer继续执行其他业务
      4. producer发起commit,mq修改消息为正常状态。
      5. producer发起rollback,mq删除half状态消息。
      6. producer没有响应,mq回查事务状态继续上述两步,回查方式通过实现添加自定义的TransactionListener接口实现。
  7. mq端:分区备份,自动切换
    1. 备份kafka和rocketmq差不多,都是一个分区在多个broker上进行备份,备份方式可以基于性能和可用性做异步还是同步、全部还是局部的选择。
    2. 自动切换方面,kafka基于sync队列进行选取,rocketmq基于Dledger算法进行选主。
  8. 消费者端:不要在没有正真消费完成的情况下提交offset(关闭自动提交)。

    如何保证顺序消息

  9. 在某些场景下,我们需要保证消息的顺序性,比如下单、优惠券、付款这个业务顺序是不能变的。

  10. kafka和rocketmq都是分布式架构,带来的优点也是我们关注的点就是可以水平扩展的性能,但也是由于分布式,不同分区之间的消息消费是无法控制顺序的,所以这是天然矛盾的。
  11. 那就是说如何一定要保证顺序,那就要舍弃分布式,也就是将所有消息都发布到一个分区内用一个消费者来消费(因为一个分区只能对应一个消费者),这样就可以实现顺序,很显然这样就导致无法享有多分区带来的高性能。
  12. 那如何将消息都发送到同一个分区,kafka和rocketmq的方式其实差不多,那就是提供了一个可自定义分区散列值的方式,可以理解为hashmap中的key的hashcode方法重写,mq会根据该值进行取模散列:
    1. 如果要全局顺序,该接口返回一个常量即可。
    2. 有的场景可以根据某业务字段内顺序,比如用户、订单等,就可以返回用户/订单ID作为散列值。
    3. kafka对应接口是Partitioner
    4. rocketmq对应接口是MessageQueueSelector