为什么要幂等处理
因为常用的mq如kafka和rocketmq都无法保证消息不会重复消费(他们的思想都是at-least-once),如果不幂等处理会导致数据一致性问题。
什么情况下会重复消费
- consumergroup内的consumer数量变更,需要重新rebalance一下topic的所有分区(kafka叫partition,rocketmq叫queue)和consumer的分配关系,调整之后,原来老consumer已经下发但未上报offset的消息会被新consumer重新消费。
consumer内部宕机,导致内存中offset丢失,重新拉取后重新消费。
如何幂等处理
新增
- 业务表上如果有唯一约束,就先查询是否存在,如果已存在直接消费完成。
- 如果没有唯一约束,可以考虑加一个业务无关的唯一约束,比如uuid之类的。
- 如果不想加字段,就考虑引入分布式kv中间件来判重(redis),但是这里的判重最好不要用布隆过滤器,因为存在误判,一旦误判会导致消息丢失。这里建议就用正常的string结构的get/set即可,但是可以设一个较短的过期值(因为不过是rebalance还是宕机恢复,过程都是比较快的)减少内存浪费。
- 删除:如果要删除的业务数据已经查询不到,即认为消费成功。
更新:直接覆盖,如果更新数据内带有状态字段,如val = val + 1、currentTime等等,要么最好避免这种情况在消息体内带上绝对值,要么进行乐观锁处理增加version字段。
防止消息丢失
生产者端:
- kafka可以设置acks=all,要求leader落盘、全部follower完成同步才会返回消息发送成功。
- rocketmq可以采用事务消息,采用两段消息状态处理。
- producer发送消息。
- mq返回消息发送成功(处于half状态)。
- producer继续执行其他业务
- producer发起commit,mq修改消息为正常状态。
- producer发起rollback,mq删除half状态消息。
- producer没有响应,mq回查事务状态继续上述两步,回查方式通过实现添加自定义的TransactionListener接口实现。
- mq端:分区备份,自动切换
- 备份kafka和rocketmq差不多,都是一个分区在多个broker上进行备份,备份方式可以基于性能和可用性做异步还是同步、全部还是局部的选择。
- 自动切换方面,kafka基于sync队列进行选取,rocketmq基于Dledger算法进行选主。
消费者端:不要在没有正真消费完成的情况下提交offset(关闭自动提交)。
如何保证顺序消息
在某些场景下,我们需要保证消息的顺序性,比如下单、优惠券、付款这个业务顺序是不能变的。
- kafka和rocketmq都是分布式架构,带来的优点也是我们关注的点就是可以水平扩展的性能,但也是由于分布式,不同分区之间的消息消费是无法控制顺序的,所以这是天然矛盾的。
- 那就是说如何一定要保证顺序,那就要舍弃分布式,也就是将所有消息都发布到一个分区内用一个消费者来消费(因为一个分区只能对应一个消费者),这样就可以实现顺序,很显然这样就导致无法享有多分区带来的高性能。
- 那如何将消息都发送到同一个分区,kafka和rocketmq的方式其实差不多,那就是提供了一个可自定义分区散列值的方式,可以理解为hashmap中的key的hashcode方法重写,mq会根据该值进行取模散列:
- 如果要全局顺序,该接口返回一个常量即可。
- 有的场景可以根据某业务字段内顺序,比如用户、订单等,就可以返回用户/订单ID作为散列值。
- kafka对应接口是
Partitioner
。 - rocketmq对应接口是
MessageQueueSelector
。