简介

groupid相同就属于同一个消费组

  1. 每个consumer都要属于一个consumer.group,就是一个消费组,topic的一个分区只会分配给 一个消费组下的一个consumer来处理,每个consumer可能会分配多个分区,也有可能某个consumer没有分配到任何分区
  2. 如果想要实现一个广播的效果,那只需要使用不同的group id去消费就可以。topicA: partition0、partition1 groupA:consumer1:消费 partition0 consuemr2:消费 partition1 consuemr3:消费不到数据 groupB: consuemr3:消费到partition0和partition1
  3. 如果consumer group中某个消费者挂了,此时会自动把分配给他的分区交给其他的消费者,如果他又重启了,那么又会把一些分区重新交还给他

偏移量管理

  1. 每个consumer内存里数据结构保存对每个topic的每个分区的消费offset,定期会提交offset,老版本是写入zk,但是那样高并发请求zk是不合理的架构设计,zk是做分布式系统的协调的,轻量级的元数据存储,不能负责高并发读写,作为数据存储。
  2. 现在新的版本提交offset发送给kafka内部topic:__consumer_offsets,提交过去的时候, key是group.id+topic+分区号,value就是当前offset的值,每隔一段时间,kafka内部会对这个topic进行compact(合并),也就是每个group.id+topic+分区号就保留最新数据。
  3. __consumer_offsets可能会接收高并发的请求,所以默认分区50个(leader partitiron -> 50 kafka),这样如果你的kafka部署了一个大的集群,比如有50台机器,就可以用50台机器来抗offset提交的请求压力. 消费者 -> broker端的数据 message -> 磁盘 -> offset 顺序递增 从哪儿开始消费?-> offset 消费者(offset)

    消费异常感知

    heartbeat.interval.ms:consumer心跳时间间隔,必须得与coordinator保持心跳才能知道consumer是否故障了, 然后如果故障之后,就会通过心跳下发rebalance的指令给其他的consumer通知他们进行rebalance的操作 session.timeout.ms:kafka多长时间感知不到一个consumer就认为他故障了,默认是10秒 max.poll.interval.ms:如果在两次poll操作之间,超过了这个时间,那么就会认为这个consume处理能力太弱了,会被踢出消费组,分区分配给别人去消费,一般来说结合业务处理的性能来设置就可以了。

    核心参数解释

    fetch.max.bytes:获取一条消息最大的字节数,一般建议设置大一些,默认是1M 其实我们在之前多个地方都见到过这个类似的参数,意思就是说一条信息最大能多大?

  4. Producer 发送的数据,一条消息最大多大, -> 10M

  5. Broker 存储数据,一条消息最大能接受多大 -> 10M

Consumer max.poll.records: 一次poll返回消息的最大条数,默认是500条
connection.max.idle.ms:consumer跟broker的socket连接如果空闲超过了一定的时间,此时就会自动回收连接,但是下次消费就要重新建立socket连接,这个建议设置为-1,不要去回收
enable.auto.commit: 开启自动提交偏移量
auto.commit.interval.ms: 每隔多久提交一次偏移量,默认值5000毫秒
_consumer_offset auto.offset.reset:earliest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费 topica -> partition0:1000 partitino1:2000 latest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据 none topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常