关于kafka消息老化与偏移量关系说明:

1.kafka会根据消息量来逐步建立多个segment文件存储消息,topic消息老化是指当kafka发现segment文件的时间超过设置的消息老化的时间(通过log.retention.minutes全局设置,也可在线针对topic单独设置)就会将该文件打上delete标记,然后由专门的删除任务进行删除
2.消息堆积是指当前消费偏移量和当前生产消息偏移量之间的差。
3.topic的消息老化和消息堆积没有直接关联。topic消息老化后不会让topic的当前消费偏移量与当前生产消息偏移量保持一致。如果需要恢复偏移量需要通过如下方式:
1)重置偏移量进度(需要在消息组未活跃的状态下执行)
2)消费者组继续消费该topic(前提条件是topic中有消息,消息未被老化,然后消费者去消费才会更新offset)
3)等待offset老化时间达到(offset老化的前提条件是消费者组已经不活跃,否者永远不会触发消息老化,老化时间达到会主动删除消费者组,消费者组是由客户端创建的,被主动删除后不会影响下次消费)
4)删除消费者组(需要在消息组未活跃的状态下执行)
4.Kafka通过offsets.retention.minutes参数控制消费组中offsets保留时间,在此时间内如果没有提交offset,并且consumer-ID(消费者)不在线(consumer-ID为空),offsets将会被删除。
当前offsets.retention.minutes默认为10080分钟,即7天一般要比老化时间大。当Kafka判定消息组中没有在线的消费者(如empty状态),且没有offsets时,将会删除此消费组。
关于offsets.retention.minutes老化问题,经过测试是先删除消费者组后才会去删除offsets记录,而且必须是消费者组状态不活跃的前提下。下面为测试的日志通过日志可以分析分到以上结论
[2022-06-28 03:10:16,658] INFO [GroupMetadataManager brokerId=0] Group test-group transitioned to Dead in generation 4 (kafka.coordinator.group.GroupMetadataManager)
[2022-06-28 03:10:16,663] INFO [GroupMetadataManager brokerId=0] Removed 1 expired offsets in 8 milliseconds. (kafka.coordinator.group.GroupMetadataManager)

image.png
5.当topic的offsets记录被删除后,客户端消费时会根据auto.offset.reset的配置进行处理,auto.offset.reset属于客户端配置,默认配置为latest(表示当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据