- kafka架构
- 架构深入
- 消息问题
- kafka生产者
- kafka消费者
- kafka高效读写数据
- zk在kafka的作用
- kafka事务
- 面试问题
- Kafka 中是怎么体现消息顺序性的?
- 4.Kafka中的分区器、序列化器、拦截器是否了解?它们之间的处理顺序是什么?
- 5.Kafka生产者客户端使用了几个线程来处理?分别是什么?
- 6.消费组中的消费者个数如果超过topic的分区,那么就会有消费者消费不到数据”这句话是否正确?
- 7.消费者提交消费位移时提交的是当前消费到的最新消息的offset还是offset+1?
- 8.有哪些情形会造成重复消费?
- 9.那些情景会造成消息漏消费?
- 10.当你使用kafka-topics.sh创建(删除)了一个topic之后,Kafka背后会执行什么逻辑?
- 11.topic的分区数可不可以增加?如果可以怎么增加?如果不可以,那又是为什么?
- 12.topic的分区数可不可以减少?如果可以怎么减少?如果不可以,那又是为什么?
- 13.Kafka有内部的topic吗?如果有是什么?有什么所用?
- 14.Kafka分区分配的概念?
- 15.简述Kafka的日志目录结构?
- 16.如果我指定了一个offset,Kafka Controller怎么查找到对应的消息?
- 17.聊一聊Kafka Controller的作用?
- 18.Kafka中有那些地方需要选举?这些地方的选举策略又有哪些?
- 19.失效副本是指什么?有那些应对措施?
- 20.Kafka的哪些设计让它有如此高的性能?
- 21.Kafka的用途有哪些?使用场景如何?
- 22.聊一聊你对Kafka的Log Retention的理解
- 23.为什么选择Kafka?
- 24.KafkaConsumer是非线程安全的,那么怎么样实现多线程消费?
- 25.简述消费者与消费组之间的关系
- 26.创建topic时如何选择合适的分区数?
- 27.优先副本是什么?它有什么特殊的作用?
- 28.kafka过期数据清理?
- 29.Kafka中的幂等是怎么实现的
kafka架构
生产者、Broker、消费者、zk
注意:zk中保存broker id和消费者offsets等消息,但是没有生产者消息,0.9版本后offsets保存到broker端,因为zk不可以频繁读写,offset 已经直接维护在kafka集群的__consumer_offsets这个topic中。
kafak整体架构
kafka的zk存储结构
架构深入
kafka工作流程
Kafka 中消息是以 topic 进行分类的,生产者生产消息,消费者消费消息,都是面向 topic 的。
topic 是逻辑上的概念,而 partition 是物理上的概念,每个 partition 对应于一个 log 文 件,该 log 文件中存储的就是 producer 生产的数据。Producer 生产的数据会被不断追加到该 log 文件末端,且每条数据都有自己的 offset。消费者组中的每个消费者,都会实时记录自己 消费到了哪个 offset,以便出错恢复时,从上次的位置继续消费。
文件存储机制
由于生产者生产的消息会不断追加到 log 文件末尾,为防止 log 文件过大导致数据定位 效率低下,Kafka 采取了分片和索引机制,将每个 partition 分为多个 segment。每个 segment 对应两个文件——“.index”文件和“.log”文件。这些文件位于一个文件夹下,该文件夹的命名 规则为:topic 名称+分区序号。例如,first 这个 topic 有三个分区,则其对应的文件夹为 first- 0,first-1,first-2。
00000000000000000000.index
00000000000000000000.log
00000000000000000000.timeindex
00000000000000170410.index
00000000000000170410.log
00000000000000170410.timeindex
00000000000000239430.index
00000000000000239430.log
00000000000000239430.timeindex
index 和 log 文件以当前 segment 的第一条消息的 offset 命名。下图为 index 文件和 log 文件的结构示意图。
“.index”文件存储大量的索引信息,“.log”文件存储大量的数据,索引文件中的元 数据指向对应数据文件中 message 的物理偏移地址。
以稀疏索引(sparse index)的方式构造消息的索引,它并不保证每个消息在索引文件中都有对应的索引项。每当写入一定量(由 broker 端参数 log.index.interval.bytes 指定,默认值为 4096,即 4KB)的消息时,偏移量索引文件 和 时间戳索引文件 分别增加一个偏移量索引项和时间戳索引项,增大或减小 log.index.interval.bytes 的值,对应地可以缩小或增加索引项的密度。
稀疏索引通过 MappedByteBuffer 将索引文件映射到内存中,以加快索引的查询速度。它不会为每一条message都建立索引,而是每隔4k左右,建立一条索引。避免索引占用过多的空间,但是没有简历索引的offset不能一次定位到message的位置。需要做一次顺序扫描,但是扫面范围基本很小。
偏移量索引文件中的偏移量是单调递增的,查询指定偏移量时,使用二分查找法来快速定位偏移量的位置,如果指定的偏移量不在索引文件中,则会返回小于指定偏移量的最大偏移量。
时间戳索引文件中的时间戳也保持严格的单调递增,查询指定时间戳时,也根据二分查找法来查找不大于该时间戳的最大偏移量,至于要找到对应的物理文件位置还需要根据偏移量索引文件来进行再次定位。
以偏移量索引文件来做具体分析。偏移量索引项的格式如下图所示。
(1) relativeOffset: 相对偏移量,表示消息相对于 baseOffset 的偏移量,占用 4 个字节(relativeOffset = offset - baseOffset),当前索引文件的文件名即为 baseOffset 的值。例如:一个日志片段的 baseOffset 为 32,那么其文件名就是 00000000000000000032.log,offset=35 的消息在索引文件中的 relativeOffset 的值为 35-32=3
(2) position: 物理地址,也就是消息在日志分段文件中对应的物理位置,占用 4 个字节。
如何通过offset找到对应的消息(图-根据offset找到message)?
- 先找到offset=3的message所在的segment文件(利用二分查找法),先判断.index文件名称offset(baseOffset)是否小于3。
若小于,则继续二分与下一个.index文件名称的offset比较
若大于,则返回上次小于3的.index文件,这里找到的就是在第一个segment文件。
- 找到segment中的.index文件,用查找的offset减去.index文件明德offset,也就是00000.index文件,我们要查找的offset为3的message在该.index文件内的索引为3
- 根据找到的相对offset为3的索引,确定message存储的物理偏移地址为756.
- 根绝物理偏移地址,去找.log文件找相应的message。
kafka的message存储采用了分区,磁盘顺序读写,分段和稀疏索引等一些手段来达到高效性。
参考链接:Kafka-工作流程,文件存储机制,索引机制,如何通过offset找到对应的消息
消息问题
消息丢失问题
1、生产者程序丢失数据
通常Kafka Producer 是异步发送消息的,发完立即返回,但不能认为消息发送已成功完成。
这种方式消息发送失败原因:网络抖动,消息没到Broker端;或者消息拒收,消息太大;Broker宕机等。
解决办法:Producer 永远要使用带有回调通知的发送 API,也就是说不要使用 producer.send(msg),而要使用 producer.send(msg, callback)。callback(回调)能够告诉你消息是否真的成功了。
Kafka 不认为这条消息属于已提交消息,故对它不做任何持久化保证。
2、消费者丢失数据
offset相当于书签。kafka消费者丢失数据基本上是先更新了书签100,但没读到书签位置(读到95),临时中止后接着从100读,就是消息丢失(96~99)。
解决办法:先消费消息(书签),再更新位移(书签)的顺序。这样能最大限度保证消息不丢失。
但这样可能重复。后续会讲。
还有一种比较隐蔽的消息丢失场景。
Consumer 程序从 Kafka 获取到消息后开启了多个线程异步处理消息,而 Consumer 程序自动地向前更新位移。假如其中某个线程运行失败了,它负责的消息没有被成功处理,但位移已经被更新了,因此这条消息对于 Consumer 而言实际上是丢失了。
这里的关键在于 Consumer 自动提交位移,与你没有确认书籍内容被全部读完就将书归还类似,你没有真正地确认消息是否真的被消费就“盲目”地更新了位移。
解决办法:如果是多线程异步处理消费消息,Consumer 程序不要开启自动提交位移,而是要应用程序手动提交位移。在这里我要提醒你一下,单个 Consumer 程序使用多线程来消费消息说起来容易,写成代码却异常困难,因为你很难正确地处理位移的更新,也就是说避免无消费消息丢失很简单,但极易出现消息被消费了多次的情况。
3、最佳实践
- 看完这两个案例之后,我来分享一下 Kafka 无消息丢失的配置,每一个其实都能对应上面提到的问题。
- 不要使用 producer.send(msg),而要使用 producer.send(msg, callback)。记住,一定要使用带有回调通知的 send 方法。
- 设置 acks = all。acks 是 Producer 的一个参数,代表了你对“已提交”消息的定义。如果设置成 all,则表明所有副本 Broker 都要接收到消息,该消息才算是“已提交”。这是最高等级的“已提交”定义。
- 设置 retries 为一个较大的值。这里的 retries 同样是 Producer 的参数,对应前面提到的 Producer 自动重试。当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了 retries > 0 的 Producer 能够自动重试消息发送,避免消息丢失。
- 设置 unclean.leader.election.enable = false。这是 Broker 端的参数,它控制的是哪些 Broker 有资格竞选分区的 Leader。如果一个 Broker 落后原先的 Leader 太多,那么它一旦成为新的 Leader,必然会造成消息的丢失。故一般都要将该参数设置成 false,即不允许这种情况的发生。
- 设置 replication.factor >= 3。这也是 Broker 端的参数。其实这里想表述的是,最好将消息多保存几份,毕竟目前防止消息丢失的主要机制就是冗余。
- 设置 min.insync.replicas > 1。这依然是 Broker 端参数,控制的是消息至少要被写入到多少个副本才算是“已提交”。设置成大于 1 可以提升消息持久性。在实际环境中千万不要使用默认值 1。
- 确保 replication.factor > min.insync.replicas。如果两者相等,那么只要有一个副本挂机,整个分区就无法正常工作了。我们不仅要改善消息的持久性,防止数据丢失,还要在不降低可用性的基础上完成。推荐设置成 replication.factor = min.insync.replicas + 1。
- 确保消息消费完成再提交。Consumer 端有个参数 enable.auto.commit,最好把它设置成 false,并采用手动提交位移的方式。就像前面说的,这对于单 Consumer 多线程处理的场景而言是至关重要的。
其实,Kafka 还有一种特别隐秘的消息丢失场景:增加主题分区。当增加主题分区后,在某段“不凑巧”的时间间隔后,Producer 先于 Consumer 感知到新增加的分区,而 Consumer 设置的是“从最新位移处”开始读取消息,因此在 Consumer 感知到新分区前,Producer 发送的这些消息就全部“丢失”了,或者说 Consumer 无法读取到这些消息。严格来说这是 Kafka 设计上的一个小缺陷,你有什么解决的办法吗?
解决办法:https://www.cnblogs.com/huxi2b/p/7089854.html,但总觉得不太完美。如果你想深入了解的话,推荐读一下Flink Kafka Connector的源码
kafka生产者
kafka发送消息流程
分区策略
- 分区原因
- 方便在集群中扩展,每个partition可以通过调整适应它所在的机器,而一个topic又可以有多个partition组成,因此整个集群就可以适应任意大小的数据了;
- 可以提高并发,因为可以以partition为单位读写了
分区原则
- 指明partition的情况下,直接将指明的值作为partition值;
- 没有指明但有key的情况下,将key的hash值与partition数取余得到partition值;
- 既没有partion值有没有key的情况下,第一次调用时随机生成一个整数(后面每次调用在这个整数上自增),将这个值与topic可用的partition总数取余得到partition值,也就是常说的round-robin算法。
数据可靠性保障
为保证 producer 发送的数据,能可靠的发送到指定的 topic,topic 的每个 partition 收到 producer 发送的数据后,都需要向 producer 发送 ack(acknowledgement 确认收到),如果 producer 收到 ack,就会进行下一轮的发送,否则重新发送数据。
副本同步策略
半数以上完成同步就发送ack;优点:延迟低;缺点:选举新的leader时,容忍n台节点的故障,需要2n+1个副本
- 全部完成同步,才发送ack;优点:选举新的leader时,容忍n台节点的故障,需要n+1个副本;缺点:延迟高。
kafka默认选择了第二种方案,原因如下:
1.同样为了容忍n台节点故障,第一种方案需要2n+1个副本,而第二种方案只需要n+1个副本,而kafka的每个分区都有大量的数据,第一种方案会造成大量数据的冗余。
2.虽然第二种方案的网络延迟会比较高,但网络延迟对kafka影响较小。
ISR副本同步队列
采取第二种方案之后,设想以下情景:leader收到数据,所有follower都开始同步数据,但有一个follower因为某种故障,迟迟不能与leader进行同步,那leader就要一直等下去,直到它完成同步,才能发送ack。这个问题怎么解决呢?
Leader维护了一个动态的ISR(In Sync Replicas)副本同步队列,意为和 leader 保持同步的 follower 集合。ISR中包括Leader和Follower。如果Leader进程挂掉,会在ISR队列中选择一个服务器作为新的Leader。有replica.lag.max.messages(延迟条数)和replica.lag.time.max.ms(延迟时间)两个参数决定一台服务是否可以加入ISR副本队列,在0.10版本移除了replica.lag.max.messages参数,防止服务频繁的进去队列。
任意一个维度超过阈值都会把Follower踢出ISR,存入OSR(Outof Sync Replicas)队列,新加入的Follower也会先存放在OSR中。
ack应答机制
对于某些不太重要的数据,对数据的可靠性要求不高,能够容忍数据的少量丢失,没必要等ISR中的follower全部接收成功。kafka为用户提供了三种可靠性级别,用户根据可靠性和延迟的要求进行权衡,选择以下配置。
acks:
0:producer 不等待 broker 的 ack,这一操作提供了一个最低的延迟,broker 一接收到还 没有写入磁盘就已经返回,当 broker 故障时有可能丢失数据;
1:producer 等待 broker 的 ack,partition 的 leader 落盘成功后返回 ack,如果在 follower 同步成功之前 leader 故障,那么将会丢失数据;如下图:
-1(all):producer 等待 broker 的 ack,partition 的 leader 和 follower 全部落盘成功后才 返回 ack。但是如果在 follower 同步完成后,broker 发送 ack 之前,leader 发生故障,那么会 造成数据重复。如下图
故障处理细节
LEO(Log End Offset):指的是每个副本最大的offset;
HW(High watermark):指的是消费者能见到的最大的offset,ISR队列中最小的LEO。
- follower 故障
follower 发生故障后会被临时踢出 ISR,待该 follower 恢复后,follower 会读取本地磁盘 记录的上次的 HW,并将 log 文件高于 HW 的部分截取掉,从 HW 开始向 leader 进行同步。 等该 follower 的 LEO 大于等于该 Partition 的 HW,即 follower 追上 leader 之后,就可以重 新加入 ISR 了。
- leader 故障
leader 发生故障之后,会从 ISR 中选出一个新的 leader,之后,为保证多个副本之间的数据一致性,其余的 follower 会先将各自的 log 文件高于 HW 的部分截掉,然后从新的 leader 同步数据。
注意:这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复。
Exactly Once语义
将服务器的 ACK 级别设置为-1,可以保证 Producer 到 Server 之间不会丢失数据,即 At Least Once 语义。相对的,将服务器 ACK 级别设置为 0,可以保证生产者每条消息只会被 发送一次,即 At Most Once 语义。
At Least Once 可以保证数据不丢失,但是不能保证数据不重复;相对的,At Least Once 可以保证数据不重复,但是不能保证数据不丢失。但是,对于一些非常重要的信息,比如说 交易数据,下游数据消费者要求数据既不重复也不丢失,即 Exactly Once 语义。在 0.11 版 本以前的 Kafka,对此是无能为力的,只能保证数据不丢失,再在下游消费者对数据做全局 去重。对于多个下游应用的情况,每个都需要单独做全局去重,这就对性能造成了很大影响。
0.11 版本的 Kafka,引入了一项重大特性:幂等性。所谓的幂等性就是指 Producer 不论 向 Server 发送多少次重复数据,Server 端都只会持久化一条。幂等性结合 At Least Once 语 义,就构成了 Kafka 的 Exactly Once 语义。即:
At Least Once + 幂等性 = Exactly Once
要启用幂等性,只需要将 Producer 的参数中 enable.idompotence 设置为 true 即可。Kafka
的幂等性实现其实就是将原来下游需要做的去重放在了数据上游。开启幂等性的 Producer 在 初始化的时候会被分配一个 PID,发往同一 Partition 的消息会附带 Sequence Number。而 Broker 端会对
但是 PID 重启就会变化,同时不同的 Partition 也具有不同主键,所以幂等性无法保证跨 分区跨会话的 Exactly Once。要保证全局Exactly Once,请看kafka事务。
kafka消费者
consumer 采用 pull(拉)模式从 broker 中读取数据。
push(推)模式很难适应消费速率不同的消费者,因为消息发送速率是由 broker 决定的。它的目标是尽可能以最快速度传递消息,但是这样很容易造成 consumer 来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而 pull 模式则可以根据 consumer 的消费能力以适当的速率消费消息。
pull 模式不足之处是,如果 kafka 没有数据,消费者可能会陷入循环中,一直返回空数据。针对这一点,Kafka 的消费者在消费数据时会传入一个时长参数 timeout,如果当前没有数据可供消费,consumer 会等待一段时间之后再返回,这段时长即为 timeout。
分区分配策略
一个 consumer group 中有多个 consumer,一个 topic 有多个 partition,所以必然会涉及
到 partition 的分配问题,即确定那个 partition 由哪个 consumer 来消费。
Kafka 有两种分配策略,一是 RoundRobin,一是 Range。
1.roundRobin(轮询分配):p0->c0,p1>c1,p2->c2,p3->c0,p4->c1,p5->c2,p6->c0
2.range(区间分配):7%3=1个多余(加到第一个分区,即p2),(7-1)/3=2(每个分配两个) , p0/p1/p2->c0,p3/p4->c1,p5/p6->c2,
3.Sticky(粘性分配):
1)分区的分配要尽可能的均匀;
2)分区的分配尽可能的与上次分配的保持相同。
3)当两者发生冲突时,第一个目标优先于第二个目标。
4)主要用来解决Rebalance问题
参考链接:Kafka Range、RoundRobin、Sticky 三种 分区分配策略区别
offset的维护
由于 consumer 在消费过程中可能会出现断电宕机等故障,consumer 恢复后,需要从故
障前的位置的继续消费,所以 consumer 需要实时记录自己消费到了哪个 offset,以便故障恢
复后继续消费。Kafka 0.9 版本之前,consumer 默认将 offset 保存在 Zookeeper 中,从 0.9 版本开始,
consumer 默认将 offset 保存在 Kafka 一个内置的 topic 中,该 topic 为__consumer_offsets。
位移主题的消息格式为KV:
1.key保存
为什么保存这三个,主要是记录哪个消费者组消费了哪个主题的哪个分区;
2.value设计:1)消息体、2)用于保存 Consumer Group 信息的消息、3)用于保存 Consumer Group 信息的消息。
也许你会觉得消息体应该很简单,保存一个位移值就可以了。实际上,社区的方案要复杂得多,比如消息体还保存了位移提交的一些其他元数据,诸如时间戳和用户自定义的数据等。保存这些元数据是为了帮助 Kafka 执行各种各样后续的操作,比如删除过期位移消息等。但总体来说,我们还是可以简单地认为消息体就是保存了位移值。
第 2 种格式非常神秘,以至于你几乎无法在搜索引擎中搜到它的身影。不过,你只需要记住它是用来注册 Consumer Group 的就可以了。
第 2 种格式相对更加有名一些。它有个专属的名字:tombstone 消息,即墓碑消息,也称 delete mark。下次你在 Google 或百度中见到这些词,不用感到惊讶,它们指的是一个东西。这些消息只出现在源码中而不暴露给你。它的主要特点是它的消息体是 null,即空消息体。
那么,何时会写入这类消息呢?一旦某个 Consumer Group 下的所有 Consumer 实例都停止了,而且它们的位移数据都已被删除时,Kafka 会向位移主题的对应分区写入 tombstone 消息,表明要彻底删除这个 Group 的信息。
位移主题是怎么被创建的?
通常来说,当 Kafka 集群中的第一个 Consumer 程序启动时,Kafka 会自动创建位移主题。我们说过,位移主题就是普通的 Kafka 主题,那么它自然也有对应的分区数。但如果是 Kafka 自动创建的,分区数是怎么设置的呢?这就要看 Broker 端参数 offsets.topic.num.partitions 的取值了。它的默认值是 50,因此 Kafka 会自动创建一个 50 分区的位移主题。如果你曾经惊讶于 Kafka 日志路径下冒出很多 __consumer_offsets-xxx 这样的目录,那么现在应该明白了吧,这就是 Kafka 自动帮你创建的位移主题啊。你可能会问,除了分区数,副本数或备份因子是怎么控制的呢?答案也很简单,这就是 Broker 端另一个参数 offsets.topic.replication.factor 要做的事情了。它的默认值是 3。总结一下,如果位移主题是 Kafka 自动创建的,那么该主题的分区数是 50,副本数是 3。
如果你选择的是自动提交位移,那么就可能存在一个问题:只要 Consumer 一直启动着,它就会无限期地向位移主题写入消息。
那如何手动提交位移呢?
Consumer 端有个参数叫 enable.auto.commit,如果值是 true,则 Consumer 在后台默默地为你定期提交位移,提交间隔由一个专属的参数 auto.commit.interval.ms 来控制。自动提交位移有一个显著的优点,就是省事,你不用操心位移提交的事情,就能保证消息消费不会丢失。但这一点同时也是缺点。因为它太省事了,以至于丧失了很大的灵活性和可控性,你完全没法把控 Consumer 端的位移管理。事实上,很多与 Kafka 集成的大数据框架都是禁用自动提交位移的,如 Spark、Flink 等。这就引出了另一种位移提交方式:手动提交位移,即设置 enable.auto.commit = false。一旦设置了 false,作为 Consumer 应用开发的你就要承担起位移提交的责任。Kafka Consumer API 为你提供了位移提交的方法,如 consumer.commitSync 等。当调用这些方法时,Kafka 会向位移主题写入相应的消息。
如何防止消息过多撑爆磁盘?
我们来举个极端一点的例子。假设 Consumer 当前消费到了某个主题的最新一条消息,位移是 100,之后该主题没有任何新消息产生,故 Consumer 无消息可消费了,所以位移永远保持在 100。由于是自动提交位移,位移主题中会不停地写入位移 =100 的消息。显然 Kafka 只需要保留这类消息中的最新一条就可以了,之前的消息都是可以删除的。这就要求 Kafka 必须要有针对位移主题消息特点的消息删除策略,否则这种消息会越来越多,最终撑爆整个磁盘。
Kafka 是怎么删除位移主题中的过期消息的呢?答案就是 Compaction。国内很多文献都将其翻译成压缩,我个人是有一点保留意见的。在英语中,压缩的专有术语是 Compression,它的原理和 Compaction 很不相同,我更倾向于翻译成压实,或干脆采用 JVM 垃圾回收中的术语:整理。
不管怎么翻译,Kafka 使用 Compact 策略来删除位移主题中的过期消息,避免该主题无限期膨胀。那么应该如何定义 Compact 策略中的过期呢?对于同一个 Key 的两条消息 M1 和 M2,如果 M1 的发送时间早于 M2,那么 M1 就是过期消息。Compact 的过程就是扫描日志的所有消息,剔除那些过期的消息,然后把剩下的消息整理在一起。我在这里贴一张来自官网的图片,来说明 Compact 过程。
图中位移为 0、2 和 3 的消息的 Key 都是 K1。Compact 之后,分区只需要保存位移为 3 的消息,因为它是最新发送的。
Kafka 提供了专门的后台线程定期地巡检待 Compact 的主题,看看是否存在满足条件的可删除数据。这个后台线程叫 Log Cleaner。很多实际生产环境中都出现过位移主题无限膨胀占用过多磁盘空间的问题,如果你的环境中也有这个问题,我建议你去检查一下 Log Cleaner 线程的状态,通常都是这个线程挂掉了导致的。
kafka高效读写数据
- 顺序写磁盘
Kafka 的 producer 生产数据,要写入到 log 文件中,写的过程是一直追加到文件末端,
为顺序写。官网有数据表明,同样的磁盘,顺序写能到 600M/s,而随机写只有 100K/s。这
与磁盘的机械机构有关,顺序写之所以快,是因为其省去了大量磁头寻址的时间。
- 零拷贝技术
zk在kafka的作用
Kafka 集群中有一个 broker 会被选举为 Controller,负责管理集群 broker 的上下线,所有 topic 的分区副本分配和 leader 选举等工作。
Controller 的管理工作都是依赖于 Zookeeper 的。
以下为 partition 的 leader 选举过程:
kafka事务
Kafka 从 0.11 版本开始引入了事务支持。事务可以保证 Kafka 在 Exactly Once 语义的基
础上,生产和消费可以跨分区和会话,要么全部成功,要么全部失败。
扩展参考链接:https://blog.csdn.net/muyimo/article/details/91439222
producer事务
为了实现跨分区跨会话的事务,需要引入一个全局唯一的 Transaction ID,并将 Producer
获得的PID 和Transaction ID 绑定。这样当Producer 重启后就可以通过正在进行的 Transaction
ID 获得原来的 PID。
为了管理 Transaction,Kafka 引入了一个新的组件 Transaction Coordinator。Producer 就
是通过和 Transaction Coordinator 交互获得 Transaction ID 对应的任务状态。Transaction
Coordinator 还负责将事务所有写入 Kafka 的一个内部 Topic(__transaction_state),这样即使整个服务重启,由于
事务状态得到保存,进行中的事务状态可以得到恢复,从而继续进行。
Consumer 事务
上述事务机制主要是从 Producer 方面考虑,对于 Consumer 而言,事务的保证就会相对
较弱,尤其时无法保证 Commit 的信息被精确消费。这是由于 Consumer 可以通过 offset 访
问任意信息,而且不同的 Segment File 生命周期不同,同一事务的消息可能会出现重启后被
删除的情况。
面试问题
Kafka 中是怎么体现消息顺序性的?
- Kafka只能保证分区内消息顺序有序,无法保证全局有序
- 生产者:通过分区的leader副本负责数据顺序写入,来保证消息顺序性
- 消费者:同一个分区内的消息只能被一个group里的一个消费者消费,保证分区内消费有序
- 为什么做不到全局有序?
- 因为消息会发送到不一样的分区,分区之间发送的顺序是无法保证的
- 如何做到全局有序?
- topic只有一个分区。这样保证有序,但抹杀了kafka的优秀特性。
- topic有多个分区,自定义发送端的分区策略(比如用户id做分区键)。
4.Kafka中的分区器、序列化器、拦截器是否了解?它们之间的处理顺序是什么?
如图kafka发送消息流程
Kafka通过生产者KafkaProducer的send()方法将消息发送到broker中,但在发送过程中需要经过拦截器(Interceptor)、序列化器(Serializer)和分区器(Partitioner)的一系列作用之后才能被真正地发往broker。消息在经过序列化后需要确定它发往的分区,如果消息ProducerRecord中指定了partition字段,那么就不需要分区器的作用,因为partition代表的就是所要发往的分区号
拦截器 -> 序列化器 -> 分区器5.Kafka生产者客户端使用了几个线程来处理?分别是什么?
如图kafka发送消息流程
整个生产者客户端主要有两个线程,主线程以及Sender线程。Producer在主线程中产生消息,然后通过拦截器,序列化器,分区器之后缓存到消息累加器RecordAccumulator中。Sender线程从RecordAccumulator中获取消息并发送到kafka中。6.消费组中的消费者个数如果超过topic的分区,那么就会有消费者消费不到数据”这句话是否正确?
不完全正确,consumer可以指定消费分区,但指定后会变成独立的消费者(standalone consumer)。7.消费者提交消费位移时提交的是当前消费到的最新消息的offset还是offset+1?
offset+1,是它下次消费的起始点,拉取的会从这个开始消费,消费者端和服务端的offset不一样,消费者的是commited offset,表示已经提交过的消费位移。8.有哪些情形会造成重复消费?
kafka的重复消费问题原因在于,已经消费了数据,但是offset没来得及提交(比如Kafka没有或者不知道该数据已经被消费)。9.那些情景会造成消息漏消费?
先提交offset,后消费,有可能造成数据的重复10.当你使用kafka-topics.sh创建(删除)了一个topic之后,Kafka背后会执行什么逻辑?
1)会在zookeeper中的/brokers/topics节点下创建一个新的topic节点,如:/brokers/topics/first
2)触发Controller的监听程序
3)kafka Controller 负责topic的创建工作,并更新metadata cache11.topic的分区数可不可以增加?如果可以怎么增加?如果不可以,那又是为什么?
可以增加
bin/kafka-topics.sh —zookeeper localhost:2181/kafka —alter —topic topic-config —partitions 312.topic的分区数可不可以减少?如果可以怎么减少?如果不可以,那又是为什么?
不可以减少,现有的分区数据难以处理。13.Kafka有内部的topic吗?如果有是什么?有什么所用?
有
__consumer_offsets
保存消费者offset14.Kafka分区分配的概念?
一个topic多个分区,一个消费者组多个消费者,故需要将分区分配个消费者(roundrobin、range)
采用RoundRobin是面向组的,可能导致的问题是,同一个组里面的不同的消费者可以订阅不同的主题,因为是采用轮询的策略,这样配置会导致无效
考虑range是面向主题的,这种策略的问题是可能会导致负载不均。15.简述Kafka的日志目录结构?
每个partition一个文件夹,包含四类文件.index .log .timeindex leader-epoch-checkpoint
.index .log .timeindex 三个文件成对出现 前缀为上一个segment的最后一个消息的偏移16.如果我指定了一个offset,Kafka Controller怎么查找到对应的消息?
通过文件名前缀数字x找到该绝对offset 对应消息所在文件
offset-x为在文件中的相对偏移
通过index文件中记录的索引找到最近的消息的位置
从最近位置开始逐条寻找17.聊一聊Kafka Controller的作用?
负责管理集群broker的上下线,所有topic的分区副本分配和leader选举等工作。18.Kafka中有那些地方需要选举?这些地方的选举策略又有哪些?
partition leader(ISR),controller(先到先得)19.失效副本是指什么?有那些应对措施?
不能及时与leader同步
暂时踢出ISR,等其追上leader之后再重新加入20.Kafka的哪些设计让它有如此高的性能?
分区,顺序写磁盘,0-copy21.Kafka的用途有哪些?使用场景如何?
异步处理、日常系统解耦、削峰、提速、广播
如果再说具体一点例如:消息,网站活动追踪,监测指标,日志聚合,流处理,事件采集,提交日志等22.聊一聊你对Kafka的Log Retention的理解
kafka留存策略包括 删除和压缩两种
删除: 根据时间和大小两个方式进行删除 大小是整个partition日志文件的大小,超过的会从老到新依次删除 时间指日志文件中的最大时间戳而非文件的最后修改时间
压缩: 相同key的value只保存一个 压缩过的是clean 未压缩的dirty 压缩之后的偏移量不连续 未压缩时连续23.为什么选择Kafka?
吞吐量高,大数据消息系统唯一选择。24.KafkaConsumer是非线程安全的,那么怎么样实现多线程消费?
每个线程维护一个KafkaConsumer
维护一个或多个KafkaConsumer,同时维护多个事件处理线程(worker thread)25.简述消费者与消费组之间的关系
消费者从属与消费组,消费偏移以消费组为单位。每个消费组可以独立消费主题的所有数据,同一消费组内消费者共同消费主题数据,每个分区只能被同一消费组内一个消费者消费。26.创建topic时如何选择合适的分区数?
创建一个只有1个分区的topic
测试这个topic的producer吞吐量和consumer吞吐量。
假设他们的值分别是Tp和Tc,单位可以是MB/s。
然后假设总的目标吞吐量是Tt,那么分区数=Tt / max(Tp,Tc)27.优先副本是什么?它有什么特殊的作用?
优先副本 会是默认的leader副本 发生leader变化时重选举会优先选择优先副本作为leader28.kafka过期数据清理?
日志清理保存的策略只有delete和compact两种
log.cleanup.policy=delete启用删除策略
log.cleanup.policy=compact启用压缩策略29.Kafka中的幂等是怎么实现的
Producer的幂等性指的是当发送同一条消息时,数据在Server端只会被持久化一次,数据不丟不重,但是这里的幂等性是有条件的:
1)只能保证Producer在单个会话内不丟不重,如果Producer出现意外挂掉再重启是无法保证的(幂等性情况下,是无法获取之前的状态信息,因此是无法做到跨会话级别的不丢不重)。
2)幂等性不能跨多个Topic-Partition,只能保证单个Partition内的幂等性,当涉及多个 Topic-Partition时,这中间的状态并没有同步。