前提
- Kafka 只对 已提交 的消息(committed message)做有限度的持久化保证。
- 已提交的消息: 当 Kafka 的若干个 Broker 成功地接收到一条消息并写入到日志文件后,它们会告诉生产者程序这条消息已成功提交
- 看你如何定义已提交,因为 partition 是跨 broker 的,写入 leader replication,follow replication 异步拉取
- 如果要 leader replication 写入成功算已提交,则只要一个 broker 响应即可
- 如果要 follower replication 写入成功算已提交,则需要多个 broker 响应
- 看你如何定义已提交,因为 partition 是跨 broker 的,写入 leader replication,follow replication 异步拉取
- 有限度的持久化保证: 至少有一个 broker 存活
- 已提交的消息: 当 Kafka 的若干个 Broker 成功地接收到一条消息并写入到日志文件后,它们会告诉生产者程序这条消息已成功提交
最佳实践
Producer
不要使用
producer.send(msg)
,而要使用producer.send(msg, callback)
。记住,一定要使用带有回调通知的 send 方法。- 异步发送不带回调,可能导致发送失败都不知道
- 网络抖动,导致消息压根就没有发送到 Broker 端
- 消息本身不合格导致 Broker 拒绝接收(比如消息太大了,超过了 Broker 的承受能力)等
- 带回调才能判断是否提交成功
- 异步发送不带回调,可能导致发送失败都不知道
设置
acks = all
。代表了你对 “已提交” 消息的定义。- 如果设置成 all,则表明所有副本 Broker 都要接收到消息,该消息才算是”已提交”。
- follower replication 都要同步完成
- 这是最高等级的 “已提交” 定义。
- 如果设置成 all,则表明所有副本 Broker 都要接收到消息,该消息才算是”已提交”。
设置
retries
为一个较大的值。是关于 Producer 的自动重试。- 当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了
retries > 0
的 Producer 能够自动重试消息发送,避免消息丢失。
- 当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了
Broker
设置
unclean.leader.election.enable = false
。控制的是哪些 Broker 有资格竞选分区的 Leader。- 如果一个 Broker 落后原先的 Leader 太多,那么它一旦成为新的 Leader,必然会造成消息的丢失。
- 一般都要将该参数设置成 false,即不允许这种情况的发生。
- 因为分区的 leader replication 和 follower replication 不在同一个 broker 上
设置
replication.factor >= 3
。- 最好将消息多保存几份,毕竟目前防止消息丢失的主要机制就是冗余。
设置
min.insync.replicas > 1
。- 控制的是消息至少要被写入到多少个副本才算是”已提交”。
- 控制”已提交”的下限
- 设置成大于 1 可以提升消息持久性。
- 在实际环境中千万不要使用默认值 1
- 控制的是消息至少要被写入到多少个副本才算是”已提交”。
确保
replication.factor
>min.insync.replicas
。如果两者相等,那么只要有一个副本挂机,整个分区就无法正常工作了。我们不仅要改善消息的持久性,防止数据丢失,还要在不降低可用性的基础上完成。- 推荐设置成 replication.factor = min.insync.replicas + 1
consumer
- 维持先消费消息,再更新位移的顺序即可。
- 这样就能最大限度地保证消息不丢失。
- 导致可能导致重复消费
enable.auto.commit
- 最好把它设置成 false,并采用手动提交位移的方式。
- 对于 Consumer 多线程处理的场景而言是至关重要的。
- 最好把它设置成 false,并采用手动提交位移的方式。