1. 组件

1.1. Broker

1.1.1. Leader

  • broker中的一个会被选举为Leader,进行写操作
  • 选举逻辑

    • Controller初始化时也会初始化四种选举Leader的策略
      • OfflinePartitionLeaderSelector leader 掉线时触发
      1. 如果 isr 中至少有一个副本是存活的,那么从该 Partition 存活的 isr 中选举第一个副本作为新的 leader,存活的 isr 作为新的 isr;
      2. 否则,如果脏选举(unclear elect)是禁止的,那么就抛出 NoReplicaOnlineException 异常;
      3. 否则,即允许脏选举的情况下,从存活的、所分配的副本(不在 isr 中的副本)中选出一个副本作为新的 leader 和新的 isr 集合;否则,即是 Partition 分配的副本没有存活的,抛出 NoReplicaOnlineException 异常;
      4. 一旦 leader 被成功注册到 zk 中,它将会更新到 KafkaController 缓存中的 allLeaders 中。
      • ReassignedPartitionLeaderSelector 分区的副本重新分配数据同步完成后触发的 ReassignedPartitionLeaderSelector 是在 Partition 副本迁移后,副本同步完成(RAR 都处在 isr 中,RAR 指的是该 Partition 新分配的副本)后触发的,其 leader 选举逻辑如下:
      1. leader 选择存活的 RAR 中的第一个副本,此时 RAR 都在 isr 中了;
      2. new isr 是所有存活的 RAR 副本列表;
      • PreferredReplicaPartitionLeaderSelector 最优 leader 选举
        手动触发或自动 leader 均衡调度时触发
        选择 AR(assign replica)中的第一个副本作为 leader,前提是该 replica 在是存活的、并且在 isr 中,否则会抛出 StateChangeFailedException 的异常。
      • ControlledShutdownLeaderSelector
        Leader发送 ShutDown 请求主动关闭服务时触发
        ControlledShutdownLeaderSelector 是在处理 Leader 下线时调用的 leader 选举方法,它会选举 isr 中第一个没有正在关闭的 replica 作为 leader,否则抛出 StateChangeFailedException 异常。

        1.1.2. Follower

        分区的所有除Leader节点之外的节点,是Follower节点。也就是说Follower节点是与分区相关的。一个Broker可以是一个topic分区的Leader节点,也可以是其他topic分区的Follower节点

        1.1.3. Controller

  • Kafka集群中的其中一个Broker会被选举为Controller,主要负责Partition管理和副本状态管理,也会执行类似于重分配Partition之类的管理任务。如果当前的Controller失败,会从其他正常的Broker中重新选举Controller。具体职责描述如下:

    • 当某个分区的leader副本出现故障时,由控制器负责为该分区选举新的leader副本。
    • 当检测到某个分区的ISR集合发生变化时,由控制器负责通知所有broker更新其元数据信息。
    • 当使用kafka-topics.sh脚本为某个topic增加分区数量时,同样还是由控制器负责分区的重新分配
  • Controller的选举及切换流程
    • 依赖Zookeeper来实现的,在Kafka集群中哪个broker能够成功创建/controller这个临时(EPHEMERAL)节点他就可以成为Kafka Controller。
    • 在任意时刻,集群中有且仅有一个控制器。每个broker启动的时候会去尝试去读取/controller节点的brokerid的值,如果读取到brokerid的值不为-1,则表示已经有其它broker节点成功竞选为控制器,所以当前broker就会放弃竞选;如果Zookeeper中不存在/controller这个节点,或者这个节点中的数据异常,那么就会尝试去创建/controller这个节点,当前broker去创建节点的时候,也有可能其他broker同时去尝试创建这个节点,只有创建成功的那个broker才会成为控制器,而创建失败的broker则表示竞选失败。每个broker都会在内存中保存当前控制器的brokerid值,这个值可以标识为activeControllerId
    • Zookeeper中还有一个与控制器有关的/controller_epoch节点,这个节点是持久(PERSISTENT)节点,节点中存放的是一个整型的controller_epoch值。controller_epoch用于记录控制器发生变更的次数,即记录当前的控制器是第几代控制器,我们也可以称之为“控制器的纪元”。controller_epoch的初始值为1,即集群中第一个控制器的纪元为1,当控制器发生变更时,每选出一个新的控制器就将该字段值加1。每个和控制器交互的请求都会携带上controller_epoch这个字段,如果请求的controller_epoch值小于内存中的controller_epoch值,则认为这个请求是向已经过期的控制器所发送的请求,那么这个请求会被认定为无效的请求。如果请求的controller_epoch值大于内存中的controller_epoch值,那么则说明已经有新的控制器当选了。由此可见,Kafka通过controller_epoch来保证控制器的唯一性,进而保证相关操作的一致性
    • 当/controller节点的数据发生变化时,每个broker都会更新自身内存中保存的activeControllerId。如果broker在数据变更前是控制器,那么如果在数据变更后自身的brokerid值与新的activeControllerId值不一致的话,那么就需要“退位”,关闭相应的资源,比如关闭状态机、注销相应的监听器等。有可能控制器由于异常而下线,造成/controller这个临时节点会被自动删除;也有可能是其他原因将此节点删除了。
    • 当/controller节点被删除时,每个broker都会进行选举,如果broker在节点被删除前是控制器的话,在选举前还需要有一个“退位”的动作。如果有特殊需要,可以手动删除/controller节点来触发新一轮的选举。当然关闭控制器所对应的broker以及手动向/controller节点写入新的brokerid的所对应的数据同样可以触发新一轮的选举
  • 补充

    • 在Kafka的早期版本中,并没有采用Kafka Controller这样一个概念来对分区和副本的状态进行管理,而是依赖于Zookeeper,每个broker都会在Zookeeper上为分区和副本注册大量的监听器(Watcher)。当分区或者副本状态变化时,会唤醒很多不必要的监听器,这种严重依赖于Zookeeper的设计会有脑裂、羊群效应以及造成Zookeeper过载的隐患。在目前的新版本的设计中,只有Kafka Controller在Zookeeper上注册相应的监听器,其他的broker极少需要再监听Zookeeper中的数据变化,这样省去了很多不必要的麻烦。不过每个broker还是会对/controller节点添加监听器的,以此来监听此节点的数据变化(参考ZkClient中的IZkDataListener)

      1.2. Partition

      1.3. Topic

      1.4. Consumer

      2. 高性能

      Kafka的高性能是靠哪些机制来实现的呢?

      2.1. 零拷贝

      sendfile系统调用
  • 常规数据从本地到网卡的拷贝路径:

硬盘 —> 内核buffer —> 用户buffer —> 内核socket缓冲区 —> TCP协议栈

  • sendfile零拷贝:

硬盘 —> 内核buffer —> 内核socket缓冲区 —> TCP协议栈
减少了一次从内核buffer到用户buffer、用户buffer到内核socket缓冲区这两次系统调用

2.2. mmap

内核开辟一块与用户共享的空间,在这块用户空间的操作可以直接被内核感知,而不需要依赖于PageCache

2.3. 顺序I/O

Kafka利用分段、追加日志的方式,在很大程度上将读写限制为顺序I/O(sequential I/O),可以减少磁盘寻址及内存消耗

2.4. 依赖于操作系统内核的数据落地机制

  • Kafka在进行数据落地磁盘之前,并不会主动调用内核的fsync系统调用,主动进行数据的落地。而是依赖于内核提供的数据落地机制。这样可以保证每次的数据都是一整个I/O缓冲区刷写到磁盘的,因此效率相对较高。
  • 但是,这种形式的写入是不安全的,因为副本的出错可能导致数据丢失,即使记录似乎已经被ACK。换句话说,与关系型数据库不同,仅写入缓冲区并不意味着持久性。
  • 保证Kafka持久性的是运行几个同步的副本。即使其中一个出错了,其他的(假设不止一个)将继续运行——假设出错的原因不会导致其他的副本也出错。
  • 因此,无fsync的非阻塞I/O方法和冗余的同步副本组合为Kafka提供了高吞吐、持久性和可用性。

    2.5. AKF扩展立方体

  • X轴全量拷贝扩展:Broker

  • Y轴业务拆分数据:Topic
  • Z轴业务数据再拆分:Partition

    2.6. offset存储的改变

    • 从kafka 0.9开始,不再是zookeeper来按分区存储有关每个groupid在主题上消耗的偏移量的信息。
    • Kafka现在将此信息存储在名为__consumer_offsets的主题上。

      2.7. 记录的批处理

      Kafka的客户端和服务端会在一个批处理中积累多个记录——包括读写记录,然后在通过网络发送出去。记录的批处理可以缓解网络往返的开销,使用更大的数据包,提高带宽的效率。

      2.8. 批量压缩

      当启用压缩时,对批处理的影响特别明显,因为随着数据大小的增加,压缩通常会变得更有效。特别是在使用基于文本的格式时,比如JSON,压缩的效果会非常明显,压缩比通常在5x到7x之间。此外,记录的批处理主要作为一个客户端操作,负载在传递的过程中,不仅对网络带宽有积极影响,而且对服务端的磁盘I/O利用率也有积极影响。

      2.9. 消费者组

  • Kafka中的消费者是以消费者组为单位的,当一个Topic被分为多个Partition时,可以为这个Topic指定多个消费者,并行地对多个Partition同时进行消费,提高上层应用的读取能力

    3. 可靠性

  • 安全性

    • retry机制保证数据会落地,但是会出现重复记录。
      • retries设置的次数不包含第一次的发送,后续会再发指定的次数
    • ACK的值及关系。由request.required.acks参数指定
      • 1(默认): 数据发送到Kafka后,经过leader成功接收消息的的确认,就算是发送成功了。在这种情况下,如果leader宕机了,则会丢失数据。
      • 0 :生产者将数据发送出去就不管了,不去等待任何返回。这种情况下数据传输效率最高,但是数据可靠性确是最低的。
      • -1 :producer需要等待ISR中的所有follower都确认接收到数据后才算一次发送完成,可靠性最高。当ISR中所有Replica都向Leader发送ACK时,leader才commit,这时候producer才能认为一个请求中的消息都commit了。
    • min.insync.replicas参数

这个是说,ISR列表中最少有几个机器,如果低于这个数字,那么写服务将不可用

  • 幂等性

0.11.0.0版本以后

  • 为了解决retry机制的重复记录,需要开启幂等性
    • kafka会给每个生产者生成一个唯一的id:ProducerID/PID。PID和序列号与消息捆绑在一起,然后发送给Broker。由于序列号从零开始并且单调递增,因此,仅当消息的序列号比该PID / TopicPartition对中最后提交的消息正好大1时,Broker才会接受该消息。如果不是这种情况,则Broker认定 是生产者重新发送该消息。
  • enable.idempotence= false默认
    • 注意:在使用幂等性的时候,要求必须开启retries=true和acks=all
    • Kafka的幂等性,只能保证一条 记录的在分区发送的原子性,但是如果要(多分区)之间的完整性,这个时候就需要开启kafk的事务操作。
      • 事务控制
  • 一般来说默认消费者消费的消息的级别是read_uncommited数据,这有可能读取到事务失败的数据,所有在开启生产者事务之后,需要用户设置消费者的事务隔离级别。
  • 参数isolation.level = read_uncommitted 默认。该选项有两个值read_committedlread 和uncommitted, 如果开始事务控制,消费端必须将事务的隔离级别设置为read_committed
  • 开启的生产者事务的时候,只需要指定transactional.id属性即可,一旦开启了事务, 默认生产者就已经开启了幂等性。但是要求”transactional.id”的取值必须是唯一的,同一时刻只能有一个”transactional.id”存在,其他的将会被关闭。
  • 生产者Only:只往一个topic中写入消息
  • 此次事务是需要生产者控制提交或者回滚
  • 消费者&生产者:既从一个topic中读取消息,同时又往另一个topic中写入消息。此次事务需要提交消费者的消费offset,有需要对输出到下一个topic中的事务进行提交或回滚。注意:必须关闭消费者端的自动提交offset,才能自己手动提交
    • 架构原理

Kafka架构原理

4. 高可用

Controller
Coordinator
Broker
ISR
AR
RAR

5. 数据同步机制