1 生产者push消息节点

producer的写入流程:

  1. producer先从kafka集群找到该partition的leader
  2. producer将消息发送给leader,leader将该消息写入本地
  3. follwers从leader pull消息,写入本地log后leader发送ack
  4. leader 收到所有 ISR 中的 replica 的 ACK 后,增加high watermark,并向 producer 发送 ack

image.png
可以看到, kafka最终会返回一个ack来确认推送消息结果,这里kafka提供了三种模式:

  1. NoResponse RequiredAcks = 0:这个代表的就是push数据的成功与否都与我无关了
  2. WaitForLocal RequiredAcks = 1:当local(leader)确认接收成功后,就可以返回了
  3. WaitForAll RequiredAcks = -1:当所有的leaderfollower都接收成功时,才会返回
  • 如果我们选择了模式1,这种模式丢失数据的几率很大,无法重试
  • 如果我们选择了模式2,这种模式下只要leader不挂,就可以保证数据不丢失,但是如果leader挂了,follower还没有同步数据,那么就会有一定几率造成数据丢失 (吞吐量高, 可靠性较低)
  • 如果选择了模式3,这种情况不会造成数据丢失,但是有可能会造成数据重复,假如leader与follower同步数据是网络出现问题,就有可能造成数据重复的问题。(可靠性高, 吞吐量较低)

    2 kafka集群自身故障

    kafka集群接收到数据后会将数据进行持久化存储,最终数据会被写入到磁盘中,
    在写入磁盘这一步也是有可能会造成数据损失的,
    因为写入磁盘的时候操作系统会先将数据写入缓存,
    操作系统将缓存中数据写入磁盘的时间是不确定的,
    所以在这种情况下,如果kafka机器突然宕机了,也会造成数据损失,不过这种概率发生很小,一般公司内部kafka机器都会做备份,这种情况很极端,可以忽略不计。

    3 消费者pull消息节点

    push消息时, 会把该消息追加到Partition并且分配为该消息一个偏移量offset
    消费者在pull到某个消息后,提交commit成功,该consumer在该分区的offset增加:
    image.png
    可以设置自动提交或者手动提交commit,

  • 在设置自动提交的时候,当我们拉取到一个消息后,此时offset已经提交了,但是我们在处理消费逻辑的时候失败了,这就会导致数据丢失了

  • 在设置手动提交时,如果我们是在处理完消息后提交commit,那么在commit这一步发生了失败,导致offset没有被更新, 就会导致重复消费的问题。

比起数据丢失,重复消费是符合业务预期的,我们可以通过一些幂等性设计来规避这个问题。

4 代码实战

https://github1s.com/asong2020/Golang_Dream/blob/master/code_demo/kafka_demo

(1) 解决push消息丢失问题

主要是通过两点来解决:

  • 通过设置RequiredAcks模式来解决,选用WaitForAll可以保证数据推送成功,不过会影响时延时
  • 引入重试机制,设置重试次数和重试间隔

    1. // broker.go
    2. func NewAsyncProducer() sarama.AsyncProducer {
    3. cfg := sarama.NewConfig()
    4. version, err := sarama.ParseKafkaVersion(VERSION)
    5. if err != nil {
    6. log.Fatal("NewAsyncProducer Parse kafka version failed", err.Error())
    7. return nil
    8. }
    9. cfg.Version = version
    10. cfg.Producer.RequiredAcks = sarama.WaitForAll // 三种模式任君选择
    11. cfg.Producer.Partitioner = sarama.NewHashPartitioner
    12. cfg.Producer.Return.Successes = true
    13. cfg.Producer.Return.Errors = true
    14. cfg.Producer.Retry.Max = 3 // 设置重试3次
    15. cfg.Producer.Retry.Backoff = 100 * time.Millisecond
    16. cli, err := sarama.NewAsyncProducer([]string{ADDR}, cfg)
    17. if err != nil {
    18. log.Fatal("NewAsyncProducer failed", err.Error())
    19. return nil
    20. }
    21. return cli
    22. }

    (2) 解决pull消息丢失问题

    我们这里使用的是自动提交,说好的使用手动提交呢?这是因为我们这个kafka库的特性不同,这个自动提交需要与MarkMessage()方法配合使用才会提交, 否则也会提交失败

    1. // broker.go
    2. func NewConsumerGroup(group string) sarama.ConsumerGroup {
    3. cfg := sarama.NewConfig()
    4. version, err := sarama.ParseKafkaVersion(VERSION)
    5. if err != nil {
    6. log.Fatal("NewConsumerGroup Parse kafka version failed", err.Error())
    7. return nil
    8. }
    9. cfg.Version = version
    10. cfg.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
    11. cfg.Consumer.Offsets.Initial = sarama.OffsetOldest
    12. cfg.Consumer.Offsets.Retry.Max = 3
    13. cfg.Consumer.Offsets.AutoCommit.Enable = true // 开启自动提交,需要手动调用MarkMessage才有效
    14. cfg.Consumer.Offsets.AutoCommit.Interval = 1 * time.Second // 间隔
    15. client, err := sarama.NewConsumerGroup([]string{ADDR}, group, cfg)
    16. if err != nil {
    17. log.Fatal("NewConsumerGroup failed", err.Error())
    18. }
    19. return client
    20. }

    因为我们在写消费逻辑时要这样写:

    1. func (e EventHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    2. for msg := range claim.Messages() {
    3. var data common.KafkaMsg
    4. if err := json.Unmarshal(msg.Value, &data); err != nil {
    5. return errors.New("failed to unmarshal message err is " + err.Error())
    6. }
    7. // 操作数据,改用打印
    8. log.Print("consumerClaim data is ")
    9. // 处理消息成功后标记为处理, 然后才会真正提交
    10. session.MarkMessage(msg,"")
    11. }
    12. return nil
    13. }

    或者直接使用手动提交方法来解决,只需两步:

  1. 关闭自动提交:

    1. consumerConfig.Consumer.Offsets.AutoCommit.Enable = false // 禁用自动提交,改为手动
  2. 消费逻辑中添加如下代码,手动提交模式下,也需要先进行标记,在进行commit

    1. session.MarkMessage(msg,"")
    2. session.Commit()