日志存储
    文件目录布局
    Kafka 中的消息是以主题为基本单位进行归类的,各个主题在逻辑上相互独立。每个主题又可以分为一个或多个分区,分区的数量可以在主题创建的时候指定,也可以在之后修改。每条消息在发送的时候会根据分区规则被追加到指定的分区中,分区中的每条消息都会被分配一个唯一的序列号,也就是通常所说的偏移量(offset),具有4个分区的主题的逻辑结构见图1-2。如果分区规则设置得合理,那么所有的消息可以均匀地分布到不同的分区中,这样就可以实现水平扩展。不考虑多副本的情况,一个分区对应一个日志(Log)。为了防止 Log 过大,Kafka又引入了日志分段(LogSegment)的概念,将Log切分为多个LogSegment,相当于一个巨型文件被平均分配为多个相对较小的文件,这样也便于消息的维护和清理。事实上,Log 和LogSegment 也不是纯粹物理意义上的概念,Log 在物理上只以文件夹的形式存储,而每个LogSegment 对应于磁盘上的一个日志文件和两个索引文件,以及可能的其他文件(比如以“.txnindex”为后缀的事务索引文件)。图4-1描绘了主题、分区与副本之间的关系,在图5-1中又补充了Log和LogSegment的关系。
    image.png

    Log对应了一个命名形式为<topic>-<partition>的文件夹 ,在4.1.1节中我们知道Log对应了一个命名形式为<topic>-<partition>的文件夹。举个例子,假设有一个名为“topic-log”的主题,此主题中具有 4 个分区,那么在实际物理存储上表现为“topic-log-0”“topic-log-1”“topic-log-2”“topic-log-3”这4个文件夹:

    向Log 中追加消息时是顺序写入的,只有最后一个 LogSegment 才能执行写入操作,在此之前所有的 LogSegment 都不能写入数据。为了方便描述,我们将最后一个 LogSegment 称为“activeSegment”,即表示当前活跃的日志分段。随着消息的不断写入,当activeSegment满足一定的条件时,就需要创建新的activeSegment,之后追加的消息将写入新的activeSegment。为了便于消息的检索,每个LogSegment中的日志文件(以“.log”为文件后缀)都有对应的两个索引文件:偏移量索引文件(以“.index”为文件后缀)和时间戳索引文件(以“.timeindex”为文件后缀)
    image.png
    日志索引
    每个日志分段文件对应了两个索引文件,主要用来提高查找消息的效率。偏移量索引文件用来建立消息偏移量(offset)到物理地址之间的映射关系,方便快速定位消息所在的物理文件位置;时间戳索引文件则根据指定的时间戳(timestamp)来查找对应的偏移量信息。

    Kafka 中的索引文件以稀疏索引(sparse index)的方式构造消息的索引,它并不保证每个消息在索引文件中都有对应的索引项。每当写入一定量(由 broker 端参数 log.index.interval.bytes指定,默认值为4096,即4KB)的消息时,偏移量索引文件和时间戳索引文件分别增加一个偏移量索引项和时间戳索引项,增大或减小log.index.interval.bytes的值,对应地可以增加或缩小索引项的密度。

    日志分段文件达到一定的条件时需要进行切分,那么其对应的索引文件也需要进行切分。日志分段文件切分包含以下几个条件,满足其一即可

    (1)当前日志分段文件的大小超过了 broker 端参数 log.segment.bytes 配置的值。log.segment.bytes参数的默认值为1073741824,即1GB。

    (2)当前日志分段中消息的最大时间戳与当前系统的时间戳的差值大于 log.roll.ms或log.roll.hours参数配置的值。如果同时配置了log.roll.ms和log.roll.hours参数,那么log.roll.ms的优先级高。默认情况下,只配置了log.roll.hours参数,其值为168,即7天。(3)偏移量索引文件或时间戳索引文件的大小达到broker端参数log.index.size.max.bytes配置的值。log.index.size.max.bytes的默认值为10485760,即10MB。

    (4)追加的消息的偏移量与当前日志分段的偏移量之间的差值大于Integer.MAX_VALUE,即要追加的消息的偏移量不能转变为相对偏移量(offset-baseOffset>Integer.MAX_VALUE)。

    偏移量索引

    偏移量索引项的格式如图5-8所示。每个索引项占用8个字节,分为两个部分。(1)relativeOffset:相对偏移量,表示消息相对于baseOffset 的偏移量,占用4 个字节,当前索引文件的文件名即为baseOffset的值。(2)position:物理地址,也就是消息在日志分段文件中对应的物理位置,占用4个字节。
    image.png
    时间戳索引
    image.png
    每个索引项占用12个字节,分为两个部分。(1)timestamp:当前日志分段最大的时间戳。(2)relativeOffset:时间戳所对应的消息的相对偏移量

    日志清理
    Kafka提供了两种日志清理策略。

    (1)日志删除(Log Retention):按照一定的保留策略直接删除不符合条件的日志分段。

    (2)日志压缩(Log Compaction):针对每个消息的key进行整合,对于有相同key的不同value值,只保留最后一个版本。

    我们可以通过broker端参数log.cleanup.policy来设置日志清理策略,此参数的默认值为“delete”,即采用日志删除的清理策略。如果要采用日志压缩的清理策略,就需要将log.cleanup.policy设置为“compact”,并且还需要将log.cleaner.enable(默认值为true)设定为true。通过将log.cleanup.policy参数设置为“delete,compact”,还可以同时支持日志删除和日志压缩两种策略。日志清理的粒度可以控制到主题级别,比如与log.cleanup.policy对应的主题级别的参数为 cleanup.policy,

    日志删除

    在Kafka的日志管理器中会有一个专门的日志删除任务来周期性地检测和删除不符合保留条件的日志分段文件,这个周期可以通过broker端参数log.retention.check.interval.ms来配置,默认值为300000,即5分钟。当前日志分段的保留策略有3种:基于时间的保留策略、基于日志大小的保留策略和基于日志起始偏移量的保留策略。

    ①基于时间:日志删除任务会检查当前日志文件中是否有保留时间超过设定的阈值(retentionMs)来寻找可删除的日志分段文件集合(deletableSegments).retentionMs可以通过broker端参数log.retention.hours、log.retention.minutes和log.retention.ms来配置,其中log.retention.ms 的优先级最高,log.retention.minutes 次之,log.retention.hours最低。默认情况下只配置了log.retention.hours参数,其值为168,故默认情况下日志分段文件的保留时间为7天。

    ②基于日志大小:日志删除任务会检查当前日志的大小是否超过设定的阈值(retentionSize)来寻找可删除的日志分段的文件集合(deletableSegments),如图5-14所示。retentionSize可以通过broker端参数log.retention.bytes来配置,默认值为-1,表示无穷大。注意log.retention.bytes配置的是Log中所有日志文件的总大小,而不是单个日志分段(确切地说应该为.log日志文件)的大小。单个日志分段的大小由 broker 端参数 log.segment.bytes 来限制,默认值为1073741824,即1GB

    ③3.基于日志起始偏移量:

    日志压缩

    为了防止日志不必要的频繁清理操作,Kafka 还使用了参数log.cleaner.min.cleanable.ratio(默认值为0.5)来限定可进行清理操作的最小污浊率。Kafka中用于保存消费者消费位移的主题__consumer_offsets使用的就是Log Compaction策略。

    磁盘存储
    Kafka 依赖于文件系统(更底层地来说就是磁盘)来存储和缓存消息。在我们的印象中,对于各个存储介质的速度认知大体同图5-20所示的相同,层级越高代表速度越快。很显然,磁盘处于一个比较尴尬的位置,这不禁让我们怀疑Kafka 采用这种持久化形式能否提供有竞争力的性能。在传统的消息中间件 RabbitMQ 中,就使用内存作为默认的存储介质,而磁盘作为备选介质,以此实现高吞吐和低延迟的特性。然而,事实上磁盘可以比我们预想的要快,也可能比我们预想的要慢,这完全取决于我们如何使用它。
    image.png
    页缓存

    页缓存是操作系统实现的一种主要的磁盘缓存,以此用来减少对磁盘 I/O 的操作。具体来说,就是把磁盘中的数据缓存到内存中,把对磁盘的访问变为对内存的访问

    零拷贝

    除了消息顺序追加、页缓存等技术,Kafka还使用零拷贝(Zero-Copy)技术来进一步提升性能。所谓的零拷贝是指将数据直接从磁盘文件复制到网卡设备中,而不需要经由应用程序之手。零拷贝大大提高了应用程序的性能,减少了内核和用户模式之间的上下文切换。

    深入服务端
    协议设计
    Kafka自定义了一组基于TCP的二进制协议,只要遵守这组协议的格式,就可以向Kafka发送消息,也可以从Kafka中拉取消息,或者做一些其他的事情,比如提交消费位移等。在目前的 Kafka 2.0.0 中,一共包含了 43 种协议类型,每种协议类型都有对应的请求(Request)和响应(Response),它们都遵守特定的协议模式。每种类型的Request都包含相同结构的协议请求头(RequestHeader)和不同结构的协议请求体(RequestBody

    时间轮
    Kafka中存在大量的延时操作,比如延时生产、延时拉取和延时删除等。Kafka并没有使用JDK自带的Timer或DelayQueue来实现延时的功能,而是基于时间轮的概念自定义实现了一个用于延时功能的定时器(SystemTimer)

    Kafka中的时间轮(TimingWheel)是一个存储定时任务的环形队列,底层采用数组实现,数组中的每个元素可以存放一个定时任务列表(TimerTaskList)。TimerTaskList是一个环形的双向链表,链表中的每一项表示的都是定时任务项(TimerTaskEntry),其中封装了真正的定时任务(TimerTask)

    image.png

    延时操作
    如果在使用生产者客户端发送消息的时候将 acks 参数设置为-1,那么就意味着需要等待ISR集合中的所有副本都确认收到消息之后才能正确地收到响应的结果,或者捕获超时异常。如果在一定的时间内,follower1副本或follower2副本没能够完全拉取到消息3和消息4,那么就需要返回超时异常给客户端。生产请求的超时时间由参数request.timeout.ms配置,默认值为30000,即30s。

    延时操作创建之后会被加入延时操作管理器(DelayedOperationPurgatory)来做专门的处理。延时操作有可能会超时,每个延时操作管理器都会配备一个定时器(SystemTimer)来做超时管理,定时器的底层就是采用时间轮(TimingWheel)实现的

    题外话:在Kafka中将延时操作管理器称为DelayedOperationPurgatory,这个名称比之前提及的ExpiredOperationReaper和SkimpyOffsetMap的取名更有意思。Purgatory直译为“炼狱”,但丁的《神曲》中有炼狱的相关描述。炼狱共有9层,在生前犯有罪过但可以得到宽恕的灵魂,按照人类的七宗罪(傲慢、忌妒、愤怒、怠惰、贪财、贪食、贪色)分别在这里修炼洗涤,而后一层层升向光明和天堂。Kafka 中采用这一称谓,将延时操作看作需要被洗涤的灵魂,在炼狱中慢慢修炼,等待解脱升入天堂(即完成延时操作)。

    下图描绘了客户端在请求写入消息到收到响应结果的过程中与延时生产操作相关的细节,在了解相关的概念之后应该比较容易理解:如果客户端设置的 acks 参数不为-1,或者没有成功的消息写入,那么就直接返回结果给客户端,否则就需要创建延时生产操作并存入延时操作管理器,最终要么由外部事件触发,要么由超时触发而执行。
    image.png

    有延时生产就有延时拉取

    Kafka在处理拉取请求时,会先读取一次日志文件,如果收集不到足够多(fetchMinBytes,由参数fetch.min.bytes配置,默认值为1)的消息,那么就会创建一个延时拉取操作(DelayedFetch)以等待拉取到足够数量的消息。当延时拉取操作执行时,会再读取一次日志文件,然后将拉取结果返回给 follower 副本。延时拉取操作也会有一个专门的延时操作管理器负责管理,大体的脉络与延时生产操作相同,

    控制器
    在 Kafka 集群中会有一个或多个 broker,其中有一个 broker 会被选举为控制器(KafkaController),它负责管理整个集群中所有分区和副本的状态。当某个分区的leader副本出现故障时,由控制器负责为该分区选举新的leader副本。当检测到某个分区的ISR集合发生变化时,由控制器负责通知所有broker更新其元数据信息。当使用kafka-topics.sh脚本为某个topic增加分区数量时,同样还是由控制器负责分区的重新分配。

    控制器的选举及异常恢复
    Kafka中的控制器选举工作依赖于ZooKeeper,成功竞选为控制器的broker会在ZooKeeper中创建/controller这个临时(EPHEMERAL)节点,此临时节点的内容参考如下:

    在任意时刻,集群中有且仅有一个控制器。每个 broker 启动的时候会去尝试读取/controller节点的brokerid的值,如果读取到brokerid的值不为-1,则表示已经有其他 broker 节点成功竞选为控制器,所以当前 broker 就会放弃竞选;如果 ZooKeeper 中不存在/controller节点,或者这个节点中的数据异常,那么就会尝试去创建/controller节点。当前broker去创建节点的时候,也有可能其他broker同时去尝试创建这个节点,只有创建成功的那个broker才会成为控制器,而创建失败的broker竞选失败。每个broker都会在内存中保存当前控制器的brokerid值,这个值可以标识为activeControllerId。

    ZooKeeper 中还有一个与控制器有关的/controller_epoch 节点,这个节点是持久(PERSISTENT)节点,节点中存放的是一个整型的controller_epoch值。controller_epoch用于记录控制器发生变更的次数,即记录当前的控制器是第几代控制器,我们也可以称之为“控制器的纪元”。

    Kafka 通过 controller_epoch 来保证控制器的唯一性,进而保证相关操作的一致性。 请求或携带这个标识和和内存中的表示比较,如果小于内存的值,则认为请求的控制器液晶过期了

    具备控制器身份的broker需要比其他普通的broker多一份职责,具体细节如下:

    · 监听分区相关的变化。为ZooKeeper中的/admin/reassign_partitions 节点注册PartitionReassignmentHandler,用来处理分区重分配的动作。为 ZooKeeper 中的/isr_change_notification节点注册IsrChangeNotificetionHandler,用来处理ISR集合变更的动作。为ZooKeeper中的/admin/preferred-replica-election节点添加PreferredReplicaElectionHandler,用来处理优先副本的选举动作。

    · 监听主题相关的变化。为 ZooKeeper 中的/brokers/topics 节点添加TopicChangeHandler,用来处理主题增减的变化;为 ZooKeeper 中的/admin/delete_topics节点添加TopicDeletionHandler,用来处理删除主题的动作。

    · 监听broker相关的变化。为ZooKeeper中的/brokers/ids节点添加BrokerChangeHandler,用来处理broker增减的变化。

    · 从ZooKeeper中读取获取当前所有与主题、分区及broker有关的信息并进行相应的管理。对所有主题对应的 ZooKeeper 中的/brokers/topics/<topic>节点添加PartitionModificationsHandler,用来监听主题中的分区分配变化。

    · 如果参数 auto.leader.rebalance.enable 设置为 true,则还会开启一个名为“auto-leader-rebalance-task”的定时任务来负责维护分区的优先副本的均衡。

    在Kafka的早期版本中,并没有采用Kafka Controller这样一个概念来对分区和副本的状态进行管理,而是依赖于ZooKeeper,每个broker都会在ZooKeeper上为分区和副本注册大量的监听器(Watcher)。当分区或副本状态变化时,会唤醒很多不必要的监听器,这种严重依赖ZooKeeper 的设计会有脑裂、羊群效应,以及造成 ZooKeeper 过载的隐患,在目前的新版本的设计中,只有KafkaController在ZooKeeper上注册相应的监听器,其他的broker极少需要再监听ZooKeeper中的数据变化,这样省去了很多不必要的麻烦。不过每个broker还是会对/controller节点添加监听器,以此来监听此节点的数据变化(ControllerChangeHandler)。

    优雅关闭

    Kafka自身提供了一个脚本工具,就是存放在其bin目录下的kafka-server-stop.sh,这个脚本的内容非常简单,具体内容如下


    ————————————————
    版权声明:本文为CSDN博主「sunhyly」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
    原文链接:https://blog.csdn.net/ashylya/article/details/106614527