1、防止消息丢失

同步模式下,确认机制设置为-1,即让消息写入Leader和Follower之后再确认消息发送成功;
异步模式下,为防止缓冲区满,可以在配置文件设置不限制阻塞超时时间,当缓冲区满时让生产者一直处于阻塞状态
首先对kafka进行限速, 其次启用重试机制,重试间隔时间设置长一些,最后Kafka设置acks=all,即需要相应的所有处于ISR的分区都确认收到该消息后,才算发送成功

2、消息重复解决方案

消息可以使用唯一id标识
生产者(ack=all 代表至少成功发送一次)
消费者 (消费者的配置enable.auto.commit设为false,禁止kafka自动提交offset,offset手动提交,业务逻辑成功处理后,提交offset)
落表(主键或者唯一索引的方式,避免重复数据)
业务逻辑处理(选择唯一主键存储到Redis或者mongdb中,先查询是否存在,若存在则不处理;若不存在,先插入Redis或Mongdb,再进行业务逻辑处理)
调整session-time和maxpollrecordsconfig的值

3、ISR机制

kafka 为了保证数据的一致性使用了isr 机制
1 首先我们知道kafka 的数据是多副本的,每个topic 下的每个分区下都有一个leader 和多个follower,
2 每个follower 的数据都是同步leader的 这里需要注意 是follower 主动拉取leader 的数据
注意问题:follewer 只是数据的副本提供数据的可恢复性,本身和kafka 的读写性能无关(kafka的读写都是和leader 相关)
3 那么问题就出来了 虽然每个分区都有多个副本,但是如何确定副本的数据和leader 的数据是同步的?
isr 的全称是:In-Sync Replicas isr 是一个副本的列表,里面存储的都是能跟leader 数据一致的副本,确定一个副本在isr列表中,有2个判断条件
条件1:根据副本和leader 的交互时间差,如果大于某个时间差 就认定这个副本不行了,就把此副本从isr 中剔除,此时间差根据 配置参数rerplica.lag.time.max.ms=10000 决定 单位ms
条件2:根据leader 和副本的信息条数差值决定是否从isr 中剔除此副本,此信息条数差值根据配置参数rerplica.lag.max.messages=4000 决定 单位 条 0.9版本移除了该条件
isr 中的副本删除或者增加 都是通过一个周期调度来管理的
4 kafka 根据isr 机制和消息的ack方式保证的数据的一致性和保证幂等性(消息是否会重复消费。发送等)
min.insync.replicas=n 配置参数表示 当满足了n个副本的消息确认(n默认为1,最好大于1,因为leader 也在isr 列表中),才认为这条消息是发送成功的
min.insync.replicas 参数只有配合request.required.acks =-1 时才能达到最大的可靠性
request.required.acks 的参数说明:
0:生产者只管发送,不管服务器,消费者是否收到信息
1:只有当leader 确认了收到消息,才确认此消息发送成功
-1:只有isr 中的n-1个副本(leader 除外所以n-1)都同步了消息 此消息才确认发送成功
注意生产者发送的消息只有在确认发送成功后 才能被消费者消费