如何保证消息不丢失?

kafka只对”已提交”的消息做有限度的持久化保证

  • 已提交是指有一个broker(或所有broker, 取决于配置)接收到消息, 并写入日志文件
  • 有限度是指, 至少需要1个Broker存活才能保证消息不丢失

Producer端

  • Producer使用带回调通知的发送API, producer.send(msg, callback), 对提交失败的场景做针对性处理, 如重试等
  • 设置Producer的retries>0
  • 设置 acks=all, 表示所有副本Broker都需接收到消息, 才算已提交

Consumer端

  • 设置enable.auto.commit=false, 采用手动提交位移
  • Consumer维持先消费, 再更新位移的顺序, 避免更新位移后消费报错(但可能导致重复消费, 最好消费端做幂等处理)
  • 如果是多线程异步处理消费消息, Consumer程序不要开启自动提交位移, 用手动提交, 防止子线程处理失败而位移已自动更新

Broker端

  • 设置 unclean.leader.election.enable = false。这是 Broker 端的参数,它控制的是哪些 Broker 有资格竞选分区的 Leader。如果一个 Broker 落后原先的 Leader 太多,那么它一旦成为新的 Leader,必然会造成消息的丢失。故一般都要将该参数设置成 false,即不允许这种情况的发生。
  • 设置 replication.factor >= 3。这也是 Broker 端的参数。其实这里想表述的是,最好将消息多保存几份,毕竟目前防止消息丢失的主要机制就是冗余。
  • 设置 min.insync.replicas > 1。这依然是 Broker 端参数,控制的是消息至少要被写入到多少个副本才算是“已提交”。设置成大于 1 可以提升消息持久性。在实际环境中千万不要使用默认值 1。
  • 确保 replication.factor > min.insync.replicas。如果两者相等,那么只要有一个副本挂机,整个分区就无法正常工作了。我们不仅要改善消息的持久性,防止数据丢失,还要在不降低可用性的基础上完成。推荐设置成 replication.factor = min.insync.replicas + 1。