延时操作

当客户端生成消息时,设置了ack=all, Kafka Leader写入消息后需要等待ISR所有的副本都同步完成或捕获超时异常,才返回响应给客户端。等待消息写入ISR的过程中,Kafka会创建延时生产操作(DelayedProduce),用来处理消息正常写入所有副本或超时的情况,以返回响应给客户端。如图所示:
1)Leader收到消息
image.png
2)Leader写入消息,创建延时操作并等待执行
image.png
3)所有副本同步完成后更新HW触发延时操作自动自行
image.png
所有的延时操作都有超时时间,如延时生产(DelayedProduce)的默认时间通过配置request.timeout.ms来指定,默认30秒,延时拉取(DelayedFetch),如果在超时时间里没有完成既定的任务,则会强制完成以返回响应给客户端。

延时操作通过外部事件触发可以在超时时间之前完成。对于延时生产而言,它的外部事件就是分区的高水位(HW)发生增长,也就是Leader的HW发生增长。也就是说随着Follower与Leader的不断同步消息,促使Leader的HW进一步增长,HW的每一次增长都会检测是否能够完成此次延时生产操作(我理解这里应该是对比Leader的LEO与HW是否一致,因为Leader的LEO表示客户端所有的消息写入完成,HW表示ISR副本已经同步完成,如果相同则表示所有副本都已同步完成)。如果可以完成,则返回响应给客户端,如果在超时时间内无法完成,则强制执行返回。

Kafka内部通过炼狱组件来处理延时操作,基于外部事件或超时时间来触发操作的执行。
如图所示:延时生产的细节
image.png

控制器

在Kafka集群中往往有多个Broker。其中有一个Broker会选举为控制器,它负责管理整个集群的分区和副本状态,当前检查某个分区的Leader出现故障时,由控制器负责为该分区选举新的Leader副本。当检查到ISR集合发生变化时,有控制器通知所有Broker更新其元数据。当使用kafka-topics.sh脚本为某个topic增加分区数量时,同样也有控制器负责分区的重新分配。

控制器的选举

在任意时刻,集群中只有一个控制器。Kafka的每个Broker在启动时,都尝试去ZooKeeper上先读取/controller节点的brokerid,如果不为-1,则认为有其他Broker已经成为控制器,所以当前Broker放弃竞选。如果不存在/controller节点,则会创建临时节点/controller, 基于zk的并发控制创建成功的Broker成为控制器,zk中/controller节点存储的内容为:

  1. {"version":1, "brokerid":0, "timestamp":"15100121212"}

其中version固定为1,brokerid表示成为控制器的Broker的编号(配置文件中指定),timestamp表示竞选成为控制器的时间戳。

Broker竞选控制器成功后,每个Broker都会收到通知,然后每个Broker会保存当前控制器的BrokerID,这个值标识为:activeControllerId。

控制器会在zk上注册很多监听器,其他的Broker则只需要监听/controller节点,用以感知节点的数据变化。当/controller节点发生变化时,每个Broker都会更新自身内存中的activeControllerId.如果控制器Broker的activeControllerId在变化前后不一致,则需要“退位”,让出控制器角色。此时该Broker需要关闭相应的资源,关闭状态机,注销监听器等。有可能是控制器异常而被下线,造成/controller临时节点被zk自动删除。 /controller节点被删除时,每个Broker则会再次竞选控制器,过程之前类似。

分区Leader的选举

当以下情况时,控制器会执行分区Leader的选举:

  • 创建分区时(创建主题或增加分区,都会创建分区)
  • 分区上线时(原有Leader副本下线,需要重新选举Leader副本)

选举分区的策略较为简单:按照AR集合中副本的顺序查找第一个存活的副本,并且这个副本在ISR中存在。
一个分区的AR集合在分配时就指定,并且在不发生重新分配时,集合内部副本的顺序是保持不变的,但是ISR中的副本顺序可能会不断改变。

如果ISR中没有可用的副本,控制器会检查不完全选举是否开启,如果unclean.leader.election.enable为true,(默认为false), 则从AR中找到第一个存活的副本即为Leader。

当分区重分配时,选举策略为:从重分配的AR集合中找到第一个存活的副本,并且这个副本在ISR集合中

当发生优先副本选举时,直接将优先副本设置为Leader,AR集合中第一个副本即为优先副本。

当某节点下线或优雅关闭时,位于这个节点上的Leader也会被下线,此时也会选举Leader,选举策略为:从AR集合中找到第一个存活的副本,且这个副本在ISR中,且这个副本不在正在下线的Broker上。

Broker端参数

参数名称 默认值 参数说明
auto.create.topics.enable true 是否自动创建主题
delete.topic.enable true 是否可以删除主题
compression.type producer 消息的压缩类型。Kafka支持的压缩类型有Gzip,Snappy,Lz4,默认为producer,表示使用生产者选用的压缩类型。
uncompressed表示不启用压缩
log.flush.interval.messages Long.MAX_VALUE 刷盘的日志大小阈值。如果写入的日志大小达到该参数值,则会强制写入操作系统页缓存,由OS写入磁盘。如果期间掉电,则会丢失消息。调小这个参数会提高可靠性,但是会影响系统性能
log.retention.bytes -1 分区级别的日志保留最大值
log.retention.hours 168(7天) 日志文件的保留时间,单位小时
log.segments.bytes 1GB 日志分段文件的最大值,超过这个值则会创建一个新的日志段
min.insync.replicas 1 ISR中最小的副本数,即最小的同步副本数
broker.rack Broker的机架信息

**