Zookeeper 中存储的 Kafka 信息

image.png
image.png

Kafka Broker 的工作流程

image.png

Broker 的重要参数

参数名称 描述
replica.lag.time.max.ms ISR 集合中,如果 Follower 长时间未向 Leader 发送通信请求或同步数据,则该 Follower 将被踢出 ISR。默认为:30s
auto.leader.rebalance.enable 自动 Leader Partition 平衡。默认为:true。
leader.imbalance.per.broker.percentage 每个 Broker 允许的不平衡的 Leader 的比率。如果每个 Broker 超过了这个值,控制器会触发 Leader 的平衡。默认为:10%
leader.imbalance.check.interval.seconds 检查 Leader 负载是否平衡的间隔时间。默认为:300 秒
log.segment.bytes Kafka 中 log 日志是分成一块一块存储的,
此配置是指:log 日志划分成块的大小,默认为:1G。
log.index.interval.bytes Kafka 里面每当写入了 4kb 大小的日志(.log),就会往 index 文件里面记录一个索引。默认为:4kb
log.retention.hours Kafka 中数据保存的时间,默认为:7 天
log.retention.minutes Kafka 中数据保存的时间,分钟级别,默认关闭
log.retention.ms Kafka 中数据保存的时间,毫秒级别,默认关闭
log.retention.check.interval.ms 每隔一定时间,检查一次数据是否保存超时,默认为:5 分钟
log.retention.bytes 超过设置的所有日志总大小,删除最早的 segment。
默认为:-1,表示无穷大
log.cleanup.policy 默认为:delete,表示所有数据启用删除策略
如果设置值为 compact,表示所有数据启用压缩策略。
num.io.threads 负责写磁盘的线程数。
整个参数值要占总核数的 50%。默认为:8个线程
num.replica.fetchers 副本拉取线程数,这个参数占总核数的 50% 的 1/3
num.network.threads 数据传输线程数,这个参数占总核数的 50% 的 2/3 。
默认是 3
log.flush.interval.messages 强制页缓存刷写到磁盘的条数,默认为:Long 的最大值
一般不建议修改,交给系统自己管理。
log.flush.interval.ms 每隔多久,刷数据到磁盘,默认为:null
一般不建议修改,交给系统自己管理。

节点服役 & 节点退役

当新启动一个 Broker,想把它加入已经 Kafka 集群,分担其他 Broker 的压力,需要进行服役新节点操作。
当想把一个 Broker 移除 Kafka 集群时,不能直接把该 Broker 停止服务,而是应该进行退役旧节点操作。

服役新节点

  1. 首先要准备一个新节点,并启动 Kafka 服务器。
  2. 执行负载均衡操作

下面对执行负载均衡操作,进行详细说明

  1. 创建一个要均衡的主题
  2. 生成一个负载均衡的计划
  3. 创建副本存储计划
  4. 执行副本存储计划
  5. 验证副本存储计划 ``` [atguigu @hadoop102 kafka] $ vim topics-to-move.json

向文件中输入以下内容

{ “topics”: [

  1. {
  2. "topic": "first"
  3. }
  4. ],
  5. "version": 1

}

[atguigu@hadoop102 kafka]$ bin/kafka-reassign-partitions.sh —bootstrap-server hadoop102:9092 —topics-to-move-json-file topics-to-move.json —broker-list “0,1,2,3” —generate

输入命令回车后,输出的内容

Current partition replica assignment {“version”:1,”partitions”:[{“topic”:”first”,”partition”:0,”replic as”:[0,2,1],”log_dirs”:[“any”,”any”,”any”]},{“topic”:”first”,”par tition”:1,”replicas”:[2,1,0],”log_dirs”:[“any”,”any”,”any”]},{“to pic”:”first”,”partition”:2,”replicas”:[1,0,2],”log_dirs”:[“any”,” any”,”any”]}]}

Proposed partition reassignment configuration {“version”:1,”partitions”:[{“topic”:”first”,”partition”:0,”replic as”:[2,3,0],”log_dirs”:[“any”,”any”,”any”]},{“topic”:”first”,”par tition”:1,”replicas”:[3,0,1],”log_dirs”:[“any”,”any”,”any”]},{“to pic”:”first”,”partition”:2,”replicas”:[0,1,2],”log_dirs”:[“any”,” any”,”any”]}]}

[atguigu@hadoop102 kafka]$ vim increase-replication-factor.json

向文件中输入以下内容(内容是复制上一步生成的负载均衡计划)

{“version”:1,”partitions”:[{“topic”:”first”,”partition”:0,”replic as”:[2,3,0],”log_dirs”:[“any”,”any”,”any”]},{“topic”:”first”,”par tition”:1,”replicas”:[3,0,1],”log_dirs”:[“any”,”any”,”any”]},{“to pic”:”first”,”partition”:2,”replicas”:[0,1,2],”log_dirs”:[“any”,” any”,”any”]}]}

[atguigu@hadoop102 kafka]$ bin/kafka-reassign-partitions.sh —bootstrap-server hadoop102:9092 —reassignment-json-file increase-replication-factor.json —execute

[atguigu@hadoop102 kafka]$ bin/kafka-reassign-partitions.sh —bootstrap-server hadoop102:9092 —reassignment-json-file increase-replication-factor.json —verify

输入命令回车后,输出的内容

Status of partition reassignment: Reassignment of partition first-0 is complete. Reassignment of partition first-1 is complete. Reassignment of partition first-2 is complete. Clearing broker-level throttles on brokers 0,1,2,3 Clearing topic-level throttles on topic first

  1. <a name="qirTX"></a>
  2. ## 退役旧节点
  3. 1. 执行负载均衡操作(具体的操作和服役新节点相同,只是生成负载均衡计划的命令中,去掉该 Broker ID)
  4. 2. 执行停止 kafka 服务器的命令
  5. <a name="EGN8G"></a>
  6. # Kafka 副本
  7. <a name="PnMa1"></a>
  8. ## 副本基本信息
  9. Kafka 副本的作用:提高数据可靠性。
  10. Kafka 默认副本数量:1 个,生产环境一般配置为 2 个,保证数据可靠性;<br />太多副本会增加磁盘存储空间,增加网络上数据传输,降低效率。
  11. Kafka 中副本分为:Leader 和 Follower。<br />Kafka 的生产者只会把数据发往 Leader, 然后 Follower 找 Leader 进行同步数据。
  12. Kafka 分区中的所有副本统称为 AR(Assigned Repllicas)。<br />AR = ISR + OSR。
  13. ISR,表示和 Leader 保持同步的 Follower 集合。<br />如果 Follower 长时间未向 Leader 发送通信请求或同步数据,则该 Follower 将被踢出 ISR。<br />该时间阈值由 replica.lag.time.max.ms 参数设定,默认时间为:30s。<br />Leader 发生故障之后,就会从 ISR 中选举新的 Leader。
  14. OSR,表示 Follower 与 Leader 副本同步时,延迟过多的副本。
  15. <a name="BUebI"></a>
  16. ## Leader 选举流程
  17. Kafka 集群中有一个 broker 的 Controller 会被选举为 Controller Leader,该 Leader 负责管理集群 broker 的上下线,所有 topic 的分区副本分配 和 Leader 选举等工作。
  18. Controller 的信息同步工作依赖于 Zookeeper。<br />![image.png](https://cdn.nlark.com/yuque/0/2022/png/12877145/1650884132591-09f0ac45-b7e7-4b10-a540-8c3498d28ef2.png#clientId=u55ccf2cf-f0f2-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=385&id=u874e9d98&margin=%5Bobject%20Object%5D&name=image.png&originHeight=528&originWidth=1015&originalType=binary&ratio=1&rotation=0&showTitle=false&size=173071&status=done&style=shadow&taskId=uc1313149-8b1d-4e43-a076-483c8640297&title=&width=740)
  19. <a name="TQpRX"></a>
  20. ## Leader 和 Follower 故障处理细节
  21. LEO(Log End Offset):每个副本的最后一个 offset,LEO 其实就是最新的 offset + 1。<br />HW(High Watermark):所有副本中最小的 LEO 。
  22. ---
  23. **Follower 故障处理细节**
  24. 1. Follower 发生故障后会被临时踢出 ISR
  25. 2. 这个期间 Leader 和其他 Follower 继续接收数据
  26. 3. 等到该 Follower 恢复后,Follower 会读取本地磁盘记录的上一次的 HW,并将 log 文件高于 HW 的部分截取掉,从 HW 开始向 Leader 进行同步。
  27. 4. 等到该 Follower 的 LEO ≥ 该 Partition 的 HW,即 Follower 追上 Leader 之后,它就可以重新加入 ISR 了。
  28. ![image.png](https://cdn.nlark.com/yuque/0/2022/png/12877145/1650893941033-ddc536d9-0353-4643-9bd7-3b0053c58622.png#clientId=u55ccf2cf-f0f2-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=320&id=u00ca7991&margin=%5Bobject%20Object%5D&name=image.png&originHeight=412&originWidth=515&originalType=binary&ratio=1&rotation=0&showTitle=false&size=31067&status=done&style=shadow&taskId=ude739cc0-1660-4b86-82a5-448c37d5165&title=&width=400)
  29. ---
  30. **Leader 故障处理细节**
  31. 1. Leader 发生故障后,会从 ISR 中选出一个新的 Leader
  32. 2. 为了保证多个副本之间的数据一致性,其余的 Follower 会先将各自的 log 文件高于 HW 的部分截掉,然后从新的 Leader 同步数据。
  33. 注意:这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复。<br />![image.png](https://cdn.nlark.com/yuque/0/2022/png/12877145/1650894031381-c65ff1d7-fa49-483f-a866-f338657c9818.png#clientId=u55ccf2cf-f0f2-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=321&id=u31c590e6&margin=%5Bobject%20Object%5D&name=image.png&originHeight=402&originWidth=476&originalType=binary&ratio=1&rotation=0&showTitle=false&size=27411&status=done&style=shadow&taskId=u6ee49278-bb00-4558-9157-76820f1989c&title=&width=380)
  34. <a name="p8HYI"></a>
  35. ## 分区副本分配
  36. 如果 kafka 服务器只有 4 个节点,那么设置 kafka 的分区数 > 服务器台数(即分区数 > 4),在 Kafka 底层如何分配存储副本呢?(或者说,副本存储在 Broker 的规律)
  37. ---
  38. 创建一个名为 second 的 Topic,设置为 16 分区,3 个副本,查看分区和副本情况
  39. ```bash
  40. [atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092
  41. --create --partitions 16 --replication-factor 3 -- topic second
  42. [atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092
  43. --describe --topic second
  44. Topic: second4 Partition: 0 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
  45. Topic: second4 Partition: 1 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
  46. Topic: second4 Partition: 2 Leader: 2 Replicas: 2,3,0 Isr: 2,3,0
  47. Topic: second4 Partition: 3 Leader: 3 Replicas: 3,0,1 Isr: 3,0,1
  48. Topic: second4 Partition: 4 Leader: 0 Replicas: 0,2,3 Isr: 0,2,3
  49. Topic: second4 Partition: 5 Leader: 1 Replicas: 1,3,0 Isr: 1,3,0
  50. Topic: second4 Partition: 6 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
  51. Topic: second4 Partition: 7 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
  52. Topic: second4 Partition: 8 Leader: 0 Replicas: 0,3,1 Isr: 0,3,1
  53. Topic: second4 Partition: 9 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
  54. Topic: second4 Partition: 10 Leader: 2 Replicas: 2,1,3 Isr: 2,1,3
  55. Topic: second4 Partition: 11 Leader: 3 Replicas: 3,2,0 Isr: 3,2,0
  56. Topic: second4 Partition: 12 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
  57. Topic: second4 Partition: 13 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
  58. Topic: second4 Partition: 14 Leader: 2 Replicas: 2,3,0 Isr: 2,3,0
  59. Topic: second4 Partition: 15 Leader: 3 Replicas: 3,0,1 Isr: 3,0,1

image.png

=======生产经验=======

以下都为生产经验。

手动调整分区副本存储

在生产环境中,每台服务器的配置和性能不一致,
但是 Kafka 只会根据自己的代码规则,创建对应的分区副本,就会导致个别服务器存储压力较大。
所有需要手动调整分区副本的存储。


需求:创建一个新的名为 three 的 Topic,4 个分区,2 个副本。将该 Topic 的所有副本都存储到 broker0 和
broker1 两台服务器上。
image.png
手动调整分区副本存储的步骤如下:

  1. 创建副本存储计划(所有副本都指定存储在 Broker0、Broker1 中)
  2. 执行副本存储计划
  3. 验证副本存储计划
  1. 创建副本存储计划:vim increase-replication-factor.json
  2. 输入的内容如下
  3. {
  4. "version": 1,
  5. "partitions": [{
  6. "topic": "three",
  7. "partition": 0,
  8. "replicas": [0, 1]
  9. },
  10. {
  11. "topic": "three",
  12. "partition": 1,
  13. "replicas": [0, 1]
  14. },
  15. {
  16. "topic": "three",
  17. "partition": 2,
  18. "replicas": [1, 0]
  19. },
  20. {
  21. "topic": "three",
  22. "partition": 3,
  23. "replicas": [1, 0]
  24. }
  25. ]
  26. }
  1. 执行副本存储计划
  2. [atguigu@hadoop102 kafka]$ bin/kafka-reassign-partitions.sh -- bootstrap-server hadoop102:9092
  3. --reassignment-json-file increase-replication-factor.json --execute
  4. 验证副本存储计划
  5. [atguigu@hadoop102 kafka]$ bin/kafka-reassign-partitions.sh -- bootstrap-server hadoop102:9092
  6. --reassignment-json-file increase-replication-factor.json --verify

Leader Partition 负载平衡

Leader Partition 自动平衡:正常情况下,Kafka 本身会自动把 Leader Partition 均匀分散在各个机器上,来保证每台机器的读写吞吐量都是均匀的。
但是如果某些 Broker 宕机,Leader Partition 过于集中在其他少部分几台 Broker 上,这就会导致少数几台 Broker 的读写请求压力过高,其他宕机的 Broker 重启之后都是 Follower Partition,读写请求很低,造成集群负载不均衡。

参数名称 描述
auto.leader.rebalance.enable 自动 Leader Partition 平衡。
生产环境中,Leader 重新选举的代价比较大,可能会带来
性能影响,建议设置为 false 关闭。默认为:true
leader.imbalance.per.broker.percentage 每个 Broker 允许的不平衡的 Leader 的比率。
如果每个 Broker 超过了这个值,控制器 Controller 会触发 Leader 的平衡。默认为:10%
leader.imbalance.check.interval.seconds 检查 Leader 负载是否平衡的间隔时间。默认为:300 秒

下面举例说明,假设集群只有一个主题,如下图所示:
image.png
针对 Broker0 节点,分区 2 的 AR 优先副本是 0 节点,但是 0 节点却不是 Leader 节点,所以不平衡数 + 1,AR副本总数是 4,所以 Broker0 节点不平衡率为Kafka Broker - 图7,大于默认比率 10%,因此 Broker0 需要再平衡。
Broker2、Broker3 节点 和 Broker0 不平衡率一样,都需要再平衡。
Broker1 节点的不平衡数为 0,因此 Broker1 不需要再平衡。

增加副本因子

在生产环境当中,由于某个 Topic 的重要等级需要提升,我们考虑增加副本的数量。
副本数量的增加需要先制定计划,然后根据计划执行。


添加副本因子的步骤如下:(相当于手动调整分区副本存储)

  1. 创建副本存储计划(所有副本都指定存储在 Broker0、Bbroker1、Broker2 中)。
  2. 执行副本存储计划
  3. 验证副本存储计划
  1. [atguigu@hadoop102 kafka]$ vim increase-replication-factor.json
  2. {
  3. "version": 1,
  4. "partitions": [{
  5. "topic": "four",
  6. "partition": 0,
  7. "replicas": [0, 1, 2]
  8. }, {
  9. "topic": "four",
  10. "partition": 1,
  11. "replicas": [0, 1, 2]
  12. }, {
  13. "t opic": "four",
  14. "partition": 2,
  15. "replicas": [0, 1, 2]
  16. }]
  17. }
  1. 执行副本存储计划
  2. [atguigu@hadoop102 kafka]$ bin/kafka-reassign-partitions.sh -- bootstrap-server hadoop102:9092
  3. --reassignment-json-file increase-replication-factor.json --execute
  4. 验证副本存储计划
  5. [atguigu@hadoop102 kafka]$ bin/kafka-reassign-partitions.sh -- bootstrap-server hadoop102:9092
  6. --reassignment-json-file increase-replication-factor.json --verify

文件存储

Topic 数据的存储机制

Topic 是逻辑上的概念,而 partition 是物理上的概念,每个 partition 对应于一个 log 文件,该 log 文件中存储的就是 Producer 生产的数据。

Producer 生产的数据会被不断追加到该 log 文件末端,为防止 log 文件过大导致数据定位效率低下,Kafka 采取了分片和索引机制, 将每个 partition 分为多个 segment。
每个 segment 包括:“.index”文件、“.log”文件和 “.timeindex” 等文件。
这些文件位于一个文件夹下,该文件夹的命名规则为:topic 名称 + 分区序号,例如:first-0。

image.png

Topic 数据到底存储在什么位置

Topic 数据存储在 server.properties 文件中设置的路径中,对应的文件夹下。
image.png
image.png
image.png


直接打开“.index”文件、“.log”文件后是乱码,这需要我们通过工具查看文件信息。
命令:kafka-run-class.sh kafka.tools.DumpLogSegments --files 绝对路径
image.png

Index 文件和 Log 文件详解

如何在“.log”文件中找指定 offset 的 Record

  1. 根据目标 offset 定位 Segment 文件
  2. 找到小于等于目标 offset 的最大 offset 对应的索引项(.index 文件)
  3. 定位到 log 文件
  4. 向下遍历找到目标 Record

注意:

  • Index 为稀疏索引,大约每往 log 文件写入 4kb 数据,会往 index 文件写入一条索引。

参数 log.index.interval.bytes 默认 4kb。

  • Index 文件中保存的 offset 为相对 offset,这样能确保 offset 的值所占空间不会过大, 因此能将 offset 的值控制在固定大小

image.png

参数 描述
log.segment.bytes Kafka 中 log 日志是分成一块块存储的,
此配置是指 log 日志划分成块的大小,默认为 1G
log.index.interval.bytes Kafka 里面每当写入了 4kb 大小的日志(.log),然后就
往 Index 文件里面记录一个索引。 Index 为稀疏索引。默认为 4kb

文件清理策略

Kafka 中默认的日志保存时间为 7 天,可以通过调整如下参数修改保存时间。

  • log.retention.hours,最低优先级,小时,默认 7 天。
  • log.retention.minutes,分钟。
  • log.retention.ms,最高优先级,毫秒。
  • log.retention.check.interval.ms,负责设置检查周期,默认 5 分钟。

那么日志一旦超过了设置的时间,怎么处理呢?
Kafka 中提供的日志清理策略有 delete 和 compact 两种。

delete 日志删除

delete 日志删除:将过期数据删除,调整参数:log.cleanup.policy = delete 设置所有数据启用删除策略。

基于时间:默认打开。以 segment 中所有记录中的最大时间戳作为该文件时间戳。
基于大小:默认关闭。超过设置的所有日志总大小,删除最早的 segment。log.retention.bytes,默认为 -1,表示日志总大小无穷大。

compact 日志压缩

compact日志压缩:对于相同 key 的不同 value 值,只保留最后一个版本。
调整参数:log.cleanup.policy = compact 设置所有数据启用压缩策略。

压缩后的 offset 可能是不连续的,比如下图中没有 6,当从这些 offset 消费消息时,将会拿到比这个 offset 大的offset 对应的消息,实际上会拿到 offset 为 7 的消息,并从这个位置开始消费。

这种策略只适合特殊场景,比如消息的 key 是用户 ID,value 是用户的资料,通过这种压缩策略,整个消息
集里就保存了所有用户最新的资料。

image.png

高效读写数据

  1. Kafka 本身是分布式集群,可以采用分区技术,并行度高
  2. 读数据采用稀疏索引,可以快速定位要消费的数据
  3. 顺序写磁盘
  4. 页缓存 + 零拷贝技术

Kafka 的 producer 生产数据,要写入到 log 文件中,写的过程是一直追加到文件末端, 为顺序写。
官网有数据表明,同样的磁盘,顺序写能到 600M/s,而随机写只有 100K/s。
这与磁盘的机械机构有关,顺序写之所以快,是因为其省去了大量磁头寻址的时间
image.png


页缓存 + 零拷贝技术

零拷贝:Kafka 的数据加工处理操作交由 Kafka 生产者和 Kafka 消费者处理。Kafka Broker 应用层不关心存储的数据,所以就不用走应用层,传输效率高。
image.png

PageCache 页缓存: Kafka 重度依赖底层操作系统提供的 PageCache 功能。
当上层有写操作时, 操作系统只是将数据写入 PageCache。当读操作发生时,先从 PageCache 中查找,如果找不到,再去磁盘中读取。实际上 PageCache 是把尽可能多的空闲内存都当做了磁盘缓存来使用。

参数 描述
log.flush.interval.messages 强制页缓存刷写到磁盘的条数, 默认为 long 的最大值,
9223372036854775807。一般不建议修改,交给系统自己管理。
log.flush.interval.ms 每隔多久,刷页缓存中的数据到磁盘,默认是 null。
一般不建议修改,交给系统自己管理。