前提条件
Kafka作为消息组件,只有在特定条件下,才能保证不丢失数据。
生产端消息已提交即Committed message。kafka对于conmitted message的定义为:生产者提交消息到broker,并等到多个broker确认并返回消息给生产者。
Acks:
- acks=0:无需服务端的Response、性能较高、丢数据风险较大。
- acks=1:服务端主节点写成功即返回Response、性能中等、丢数据风险中等、主节点宕机可能导致数据丢失。
- acks=all:服务端主节点写成功且备节点同步成功才返回Response、性能较差、数据较为安全、主节点和备节点都宕机才会导致数据丢失。
- 一般建议选择acks=1,重要的服务可以设置acks=all。
Kafka分布式集群必须要有broker能正常工作,能对消息持久化做保证。
如何保证下次不丢失
生产端
Producer端发送消息有多个API
// 异步发送不带回调方法Future<RecordMetadata> send(ProducerRecord<K, V> record);// 异步发送带回调方法Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);
当Producer端调用send方法时,如果不获取执行结果,通常不能保证消息发送已经完成,可能会出现
网络波动,消息未发送到Broker端
- 消息本身不合规,Broker拒绝接受。如:消息过大,超出Broker限制。
当想从生产端保证消息不丢失时,可以采用带有回调方法的send方法,在回调方法里进行发送失败的处理。
服务端
服务端必须保证分布式集群里最少有一个Broker能持久化消息
消费端
通过手动控制offset控制消息的的提交,采用消息分批处理提交的方式,消息处理失败时做好失败处理方案。
实践配置
- producer端使用producer.send(msg, callback)带有回调的send方法。
- 设置acks = all。acks是Producer的一个参数,代表“已提交”消息的定义。如果设置成all,则表明所有Broker都要接收到消息,该消息才算是“已提交”。
- 设置retries为一个较大的值。同样是Producer的参数。当出现网络抖动时,消息发送可能会失败,此时配置了retries的Producer能够自动重试发送消息,尽量避免消息丢失。
- 设置unclean.leader.election.enable = false。这是Broker端的参数,在kafka版本迭代中社区也多次反复修改过他的默认值,之前比较具有争议。它控制哪些Broker有资格竞选分区的Leader。如果一个Broker落后原先的Leader太多,那么它一旦成为新的Leader,将会导致消息丢失。故一般都要将该参数设置成false。
- 设置replication.factor >= 3。这也是Broker端的参数。保存多份消息冗余,不多解释了。
- 设置min.insync.replicas > 1。Broker端参数,控制消息至少要被写入到多少个副本才算是“已提交”。设置成大于 1 可以提升消息持久性。在生产环境中不要使用默认值 1。确保replication.factor > min.insync.replicas。如果两者相等,那么只要有一个副本离线,整个分区就无法正常工作了。推荐设置成replication.factor = min.insync.replicas + 1。
- 确保消息消费完成再提交。Consumer端有个参数enable.auto.commit,最好设置成false,并自己来处理offset的提交更新。
