1 生产者push消息节点
producer的写入流程:
- producer先从kafka集群找到该partition的leader
- producer将消息发送给leader,leader将该消息写入本地
- follwers从leader pull消息,写入本地log后leader发送ack
- leader 收到所有 ISR 中的 replica 的 ACK 后,增加high watermark,并向 producer 发送 ack
可以看到, kafka最终会返回一个ack来确认推送消息结果,这里kafka提供了三种模式:
NoResponse RequiredAcks = 0:这个代表的就是push数据的成功与否都与我无关了
WaitForLocal RequiredAcks = 1:当local(leader)确认接收成功后,就可以返回了
WaitForAll RequiredAcks = -1:当所有的leader和follower都接收成功时,才会返回
- 如果我们选择了模式1,这种模式丢失数据的几率很大,无法重试
- 如果我们选择了模式2,这种模式下只要leader不挂,就可以保证数据不丢失,但是如果leader挂了,follower还没有同步数据,那么就会有一定几率造成数据丢失 (吞吐量高, 可靠性较低)
如果选择了模式3,这种情况不会造成数据丢失,但是有可能会造成数据重复,假如leader与follower同步数据是网络出现问题,就有可能造成数据重复的问题。(可靠性高, 吞吐量较低)
2 kafka集群自身故障
kafka集群接收到数据后会将数据进行持久化存储,最终数据会被写入到磁盘中,
在写入磁盘这一步也是有可能会造成数据损失的,
因为写入磁盘的时候操作系统会先将数据写入缓存,
操作系统将缓存中数据写入磁盘的时间是不确定的,
所以在这种情况下,如果kafka机器突然宕机了,也会造成数据损失,不过这种概率发生很小,一般公司内部kafka机器都会做备份,这种情况很极端,可以忽略不计。3 消费者pull消息节点
push消息时, 会把该消息追加到Partition并且分配为该消息一个偏移量offset
消费者在pull到某个消息后,提交commit成功,该consumer在该分区的offset增加:
可以设置自动提交或者手动提交commit,在设置自动提交的时候,当我们拉取到一个消息后,此时offset已经提交了,但是我们在处理消费逻辑的时候失败了,这就会导致数据丢失了
- 在设置手动提交时,如果我们是在处理完消息后提交commit,那么在commit这一步发生了失败,导致offset没有被更新, 就会导致重复消费的问题。
比起数据丢失,重复消费是符合业务预期的,我们可以通过一些幂等性设计来规避这个问题。
4 代码实战
https://github1s.com/asong2020/Golang_Dream/blob/master/code_demo/kafka_demo
(1) 解决push消息丢失问题
主要是通过两点来解决:
- 通过设置RequiredAcks模式来解决,选用WaitForAll可以保证数据推送成功,不过会影响时延时
引入重试机制,设置重试次数和重试间隔
// broker.go
func NewAsyncProducer() sarama.AsyncProducer {
cfg := sarama.NewConfig()
version, err := sarama.ParseKafkaVersion(VERSION)
if err != nil {
log.Fatal("NewAsyncProducer Parse kafka version failed", err.Error())
return nil
}
cfg.Version = version
cfg.Producer.RequiredAcks = sarama.WaitForAll // 三种模式任君选择
cfg.Producer.Partitioner = sarama.NewHashPartitioner
cfg.Producer.Return.Successes = true
cfg.Producer.Return.Errors = true
cfg.Producer.Retry.Max = 3 // 设置重试3次
cfg.Producer.Retry.Backoff = 100 * time.Millisecond
cli, err := sarama.NewAsyncProducer([]string{ADDR}, cfg)
if err != nil {
log.Fatal("NewAsyncProducer failed", err.Error())
return nil
}
return cli
}
(2) 解决pull消息丢失问题
我们这里使用的是自动提交,说好的使用手动提交呢?这是因为我们这个kafka库的特性不同,这个自动提交需要与MarkMessage()方法配合使用才会提交, 否则也会提交失败
// broker.go
func NewConsumerGroup(group string) sarama.ConsumerGroup {
cfg := sarama.NewConfig()
version, err := sarama.ParseKafkaVersion(VERSION)
if err != nil {
log.Fatal("NewConsumerGroup Parse kafka version failed", err.Error())
return nil
}
cfg.Version = version
cfg.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
cfg.Consumer.Offsets.Initial = sarama.OffsetOldest
cfg.Consumer.Offsets.Retry.Max = 3
cfg.Consumer.Offsets.AutoCommit.Enable = true // 开启自动提交,需要手动调用MarkMessage才有效
cfg.Consumer.Offsets.AutoCommit.Interval = 1 * time.Second // 间隔
client, err := sarama.NewConsumerGroup([]string{ADDR}, group, cfg)
if err != nil {
log.Fatal("NewConsumerGroup failed", err.Error())
}
return client
}
因为我们在写消费逻辑时要这样写:
func (e EventHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for msg := range claim.Messages() {
var data common.KafkaMsg
if err := json.Unmarshal(msg.Value, &data); err != nil {
return errors.New("failed to unmarshal message err is " + err.Error())
}
// 操作数据,改用打印
log.Print("consumerClaim data is ")
// 处理消息成功后标记为处理, 然后才会真正提交
session.MarkMessage(msg,"")
}
return nil
}
或者直接使用手动提交方法来解决,只需两步:
关闭自动提交:
consumerConfig.Consumer.Offsets.AutoCommit.Enable = false // 禁用自动提交,改为手动
消费逻辑中添加如下代码,手动提交模式下,也需要先进行标记,在进行commit
session.MarkMessage(msg,"")
session.Commit()