消费方式
- 拉模式:Kafka采用拉模式
- 推模式
原因:每个消费者的消费能力可能都不一样,采用推模式的话不灵活
消费者工作流程
- 消费者组初始化
- 每个broker中都有一个coordinator
- 多个broker分摊50个__consumer_offset
- 通过消费者组groupid计算后,落在哪个consumer_offset,这个consumer_offset所在的broker的coordinator辅助整个消费者组的分区分配
- coordinator选出消费者组中的一个consumer作为leader
- coordinator把消费的topic情况发给consumer-leader
- consumer-leader负责指定消费方案
- consumer-leader消费方案发给coordinator
- coordinator下发方案到每个consumer
- consumer和coordinator保持心跳 3s,一旦超过消费者移除,或者消费者消费时间过长也移除
分区会重新平衡分配
消费者组详细消费流程
- 消费者通过ConsumerNetworkClient工具拉取消息
- Fetch.min.bytes:每次抓取大小
- fetch.max.wait.ms:最小值未达到的超时时间
- fetch.max:byts: 每次抓取最大
- max.poll.records:最大条数
-
消费者重要参数
分区的分配和再平衡
分区分配策略
Range:余数依次分配给前面几个消费者
- RoundRobin:轮询
- Sticky:数量上和Range分配一致,但是每个消费者的分区号不规律,是随机的
- CooperativeRange:
默认:Range+CooperativeRange
Offset
- 0.9开始会有一个记录__consumer_offset的系统自建主题,默认不可消费,可以通过
exclude.internal.topics=false
设置为可以消费。
自动提交
消费者每间隔一段时间自动提交offset
手动提交
消费者通过API调用提交
- 异步
- 同步
指定Offset消费
auto.offset.reset = earliest | latest | none 默认是 latest
漏消费
offset手动提交后,消费者还没消费完这批数据,woc这个不能等消费完再提交吗????
重复消费
自动提交offset会产生。A消费完成后,由于是自动提交,所以消费完成后还未来得及提交就挂了,此时再启动。offset由于未提交,会重复消费
消费者事务
消息积压
- 增加分区和消费者数
- 提高每次拉取数据大小