1.Kafka Rebalance机制

什么是Rebalance机制

Rebalance机制是一种协议,声明了Consumer Group如何分配Consumer来消费Topic中对应Partition的过程

Rebalance机制触发的时机

  1. 有新的消费者加入消费者组
  2. 消费者宕机下线。这点是最坑的,因为很多时候消费者并没有真的下线,而是因为长时间的GC,网络延迟导致消费者长时间未向GroupCoordinator发送心跳等情况,GroupCoordinator会认为消费者已经下线。
  3. 消费者组订阅的主题或主题对应的分区数量发生变化

    Rebalance机制带来的问题

    消息重复消费的问题
    在Rebalance时,所有的消费者会暂停消费。如果此时还没有提交Offset,那么很有可能在Rebalance后被分配到了新的分区,那么对于Rebalance之前消费的消息,因为偏移量没有提交,就会出现重复消费的现象

    2.Kafka如何保证消息顺序性

    Producer角度

    非顺序性的原因

    可能存在这样一种情况,Producer在某一时刻同时发送了m1,m2两条消息。m2发送成功,m1发送失败。此时由于Producer开启了重试机制,重新发送m1,发送成功。
    那么问题就来了
    理论上消息的顺序应该是m1,m2。但是由于网络异常以及重试机制,在Broker上消息的顺序变成了m2,m1

    解决方案

    开启Producer幂等性:即通过设置enable.idempotence = true实现
    在设置enable.idempotence = true之后,Kafka会开启Producer幂等性。大致原理是通过ProducerId(PID)、Sequence Number机制实现

  4. Producer在发布消息时,对于每个Topic的每个Partition,都会有一个递增的Sequence Number

  5. Broker在接收消息时,对于每个Producer向每个Topic的每个Partition发送的消息,同样维护了一个递增的Sequence Number

通过Producer发送消息的Sequence Number和Broker维护的Sequence Number对比,来确定是否出现了消息乱序、消息丢失、消息重复的情况
对比规则如下:

  1. 对于接收到的每条消息,如果消息的Sequence Number比Broker的Sequence Number大,并且差值等于1,代表当前消息顺序正常,Broker接收该消息并持久化
  2. 对于接收到的每条消息,如果消息的Sequence Number比Broker的Sequence Number大,并且差值大于1,代表出现了消息丢失的情况,Broker拒收该消息。Producer抛出InvalidSequenceNumber异常
  3. 对于接收到的每条消息,如果消息的Sequence Number比Broker的Sequence Number小,代表出现了重复消息的情况,Broker拒收该消息。Producer抛出DuplicateSequenceNumber异常

    Broker角度

    非顺序性的原因

    假设存在这样一种情况,当前有一个Topic,包含3个分区,Producer顺序发送了3条消息m1,m2,m3到Topic,3条消息通过轮询机制分别发送到了3个不同的分区。m1-p1,m2-p2,m3-p3
    此时有一个包含3个Consumer的Consumer Group正在监听该Topic,c1-p1,c2-p2,c3-p3。因为Consumer1的网络延迟原因,在c2,c3之后消费了消息。那么此时消息的消费顺序为m2,m3,m1。与预期的m1,m2,m3不一致,出现了乱序的情况
    根本原因在于,消息发送到Topic时,存储在了不同的Partition

    解决方案

  4. 如果当前只有一类有序消息,那么创建一个只有一个分区的Topic即可

  5. 如果当前只有多类有序消息,那么可以分别创建对应的Topic,或创建一个多分区的Topic,但是不同类型的消息在发布时,需携带不同的Key

    Consumer角度

    非顺序性的原因

    理论上如果Topic实现了单分区,或有序消息都存储在同一分区的话,Consumer从Partition拉取的消息就已经实现了有序性,无论是1个Consumer还是多个Consumer
    但是如果消费者通过多线程去处理消息的话,因为多线程是无法准确保证线程的顺序性执行的,因此可能会出现虽然消费者顺序的拉取到了消息,但是最终消息并没有被顺序执行的一个情况发生

    解决方案

    维护内存队列、一个线程对应一个内存队列,相同key的消息存储到一个内存队列中,这样就可以保证相同key的消息的有序性消费。其实本质上,还是通过单一线程处理了一类消息

    3.Kafka如何保证消息不丢失

    Producer角度

    消息丢失原因

    因为网络异常等原因,导致Producer没有将消息发布到Broker,并且Producer也没有重试机制,从而导致消息丢失

    解决方案

  6. 开启幂等,确保从底层机制上,避免消息丢失的现象出现

  7. 开启Producer自动重试机制,并合理设置重试时间间隔和重试次数
  8. 自动重试机制理论上也存在不靠谱的情况。因此也可以关闭自动重试,通过sendResult的callback方法中的onFailure手动处理重试逻辑,在消息发布异常时进行手动重发,确保消息一定能够发布成功

    Broker角度

    消息丢失原因

    每个分区的消息,都会在其Leader副本以及ISR中维护的Follow副本进行持久化。那么如果当前分区没Follow副本,或消息没有持久化到ollow副本时就返回存储成功,但是实际上Follow副本同步失败
    那么在Leader副本所在的Broker宕机或出现异常时,就会造成还未持久化到磁盘的消息丢失
    总结

  9. 创建Topic时,副本数设置为1,导致没有follow副本

  10. ack设置0或1,消息未持久化到Leader副本以及ISR维护的全部Follow副本就响应给Producer
  11. 写入成功应答副本数(min.insync.replicas)设置过小,导致即使ack设置为-1,也不能保证消息持久化到全部的副本
  12. 刷盘频率过长与数量阈值多大,导致数据长期存在于内存

    解决方案

  13. 创建Topic时,指定副本数>=3,确保当前分区具有2个或2个以上的follow副本进行数据冗余

  14. 设置Producer的Acks参数=-1(ALL):确保消息数据持久化到分区的Leader副本以及ISR列表中维护的全部Follow副本后,才会响应给Producer持久化成功
  15. 合理写入成功应答副本数(min.insync.replicas),确保当ack设置为-1(ALL)时,消息能够持久化到分区的全部副本
  16. 合理设置日志刷盘时间频率,以及数量阈值,避免数据长时间存在于内存。设置过短是会影响吞吐量的,毕竟内存的读写效率还是要高于磁盘的,即使是磁盘的顺序写

    Consumer角度

    消息丢失原因

    偏移量提交后,消息没有被消费完或根本没有被消费,但是消费者宕机或重启,那么此时就造成了消息的丢失

  17. 自动提交偏移量:消息消费时间较长,在偏移量提交后,消费者宕机,消息没有被消费完,消息丢失

  18. 手动提交偏移量:先提交偏移量,再消费消息,消费者宕机,消息没有被消费,消息丢失

    解决方案

  19. 关闭偏移量自动提交

  20. 在业务逻辑全部处理完成后,手动提交偏移量。同步或异步都可以。确保偏移量最终提交成功。(这种模式可能会造成消息的重复消费,这个问题下一个单元来讲解)

    4.Kafka如何保证消息不重复消费

    Producer角度

    消息重复的原因

    可能因为一些误差情况,虽然消息发送成功,但是还是触发了Produce的Callback中OnFailure方法,进而触发了异常重试机制(无论自动还是手动),导致重复发送了一条消息

    解决方案

  21. 开启幂等,确保从底层机制上,避免消息重复发送的现象出现

    Broker角度

    个人觉得,broker对于这个问题,没啥能做的。。。。。。

    Consumer角度

    消息重复问题原因

    核心原因:在消息消费之后,Consumer Offset提交失败
    那么那些原因会导致Consumer Offset消费失败呢?

  22. 消费者自动/手动提交偏移量,在偏移量还未提交时,消费者宕机,导致重启后消费的还是上一条消息,从而导致重复消费

  23. 触发Rebalance机制,导致消费者还没有消费完消息就被分配到了新的分区,没有提交偏移量。而老的分区对应新的消费者还需要继续消费上一条消息,从而导致重复消费

那么那些原因会触发Rebalance呢?

  1. 消费者消费逻辑耗时较长,导致超过了Kafka的session timeout时间,那么就会触发Reblance重平衡,此时有一定几率导致Offset提交失败,进而会导致Rebalance后重复消费。
  2. 消费者消费逻辑耗时较长,导致超过了Kafka的max.poll.interval.ms时间,那么就会触发Reblance重平衡,此时有一定几率导致Offset提交失败,进而会导致Rebalance后重复消费。

    解决方案

    如何避免Reblance机制的出现

  3. 调整session.timeout.ms(消费者与服务端通信超时时间):超过该时间,服务端会认为当前消费者不可用,触发Rebalance机制,导致Offset提交失败,出现重复消费现象

  4. 调整max.poll.interval.ms(消费者拉取消息到提交偏移量的最长时间间隔):超过该时间,与session timeout同理,也会触发Rebalance机制,导致Offset提交失败,出现重复消费现象‘
  5. 在代码层面,优化代码逻辑,通过多线程等方法优化消费者执行效率,确保在最短时间内消费完该消息

发生重复消费后的处理逻辑

  1. 在数据层面,通过设置唯一主键等方式,避免数据出现重复的情况
  2. 在业务逻辑层面,对于新增修改等接口,通过预查询以及锁机制,避免重复的数据插入,最大程度避免异常数据到达数据层

    数据冗余方案

    Redis中创建3个Hash,消息发送记录,消息接收记录,消息偏移量提交记录

  3. 发送消息时,在消息发送记录中存入一条数据

  4. 消费者接收消息时,在消息接收记录中存入一条数据
  5. 消息偏移量手动提交后,在消息偏移量提交记录中存入一条数据

每隔一段时间,可以是每小时或每天,这个根据具体业务来确定,生成一组这样的3个Hash
这样,可以通过不同的记录中的消息数据,来确定是否消费完成

  1. Producer在发送消息时,判定消息发送记录中是否存在该消息,若存在,代表消息已经被发送了,不用重复发送
  2. Consumer在消费消息时,判定消息接收记录中是否存在该消息,若不存在,代表消息还没被消费过,那么消费该消息。若存在,代表消息已经被消费了,此时需要继续判定
  3. Consumer消息偏移量提交记录中是否存在该消息,若存在,代表消息已经消费完成,直接丢弃消息即可。若不存在,代表消息还没有消费完,继续消费消息即可。但是消费者代码中涉及到的方法,要做幂等性处理
  4. 后台定时任务定期扫描上一批3个Hash的消息发送记录、消息接收记录。计算差集,差集代表丢失的消息,进行补发即可。(当然也有可能是消费者宕机,导致消息迟迟没有被消费)