Apache Kafka 是消息引擎系统,也是一个分布式流处理平台,它以高吞吐、可持久化、可水平扩展、支持流数据处理等多种特性而被广泛使用。

基础概念

在 Kafka 中,发布订阅的对象是主题(Topic),Topic 只是一个逻辑概念,代表了一类消息,因此你可以为每个业务、每个应用甚至是每类数据都创建专属的 Topic。生产者(Producer)负责将消息发送到特定的主题,而消费者(Consumer)负责订阅主题并进行消费。我们把生产者和消费者统称为客户端(Clients)。

有客户端自然也就有服务器端。Kafka 的服务器端由被称为 Broker 的服务进程构成,即一个 Kafka 集群通常由多个 Broker 组成,Broker 负责接收和处理客户端发送过来的请求,以及对消息进行持久化。每个 Broker 拥有一个 Controller,基于 Zookeeper 做集群 Controller Leader 选举,以及存储集群核心元数据,Leader Controller 负责管理整个集群。通常将不同的 Broker 分散运行在不同机器上,这样如果集群中某一台机器宕机,其他机器上的 Broker 也依然能够对外提供服务。这其实就是 Kafka 提供高可用的手段之一。

1. 副本

Kafka 实现高可用的另一个手段就是备份机制。备份机制就是把相同的数据拷贝到多台机器上,而这些相同的数据拷贝在 Kafka 中被称为副本(Replica)。副本的数量是可以配置的,通过增加副本数量可以提升容灾能力。这些副本保存着相同的数据,但却有不同的角色和作用。在 Kafka 中定义了两种类型的副本:

  • 领导者副本(Leader Replica):与客户端程序进行交互。生产者总是向领导者副本写消息;而消费者总是从领导者副本读消息。

  • 追随者副本(Follower Replica):向领导者副本发送请求,请求领导者把最新生产的消息发给它,以保持与领导者的同步。唯一作用是充当 Leader 的候补。

你可能知道在很多其他系统中追随者副本是可以对外提供服务的,比如 MySQL 的从库是可以处理读操作的,但是在 Kafka 中追随者副本不会对外提供服务。主要原因有以下几点:

  • 如果允许 Follower 副本对外提供读服务,首先会存在数据一致性的问题,消息从主节点同步到从节点的延迟会造成主从节点的数据不一致。


  • 其次,主写从读无非就是为了减轻 Leader 节点的压力,将读请求的负载均衡到 Follower 节点,如果 Kafka的分区相对均匀地分散到各个 Broker 上,同样可以达到负载均衡的效果。


  • 最后,社区正在考虑引入适度的读写分离方案,比如允许某些指定的 Follower 副本(主要是为了考虑地理相近性)可以对外提供读服务。当然目前这个方案还在讨论阶段。

Kafka 会保证同一个分区的多个副本一定不会分配在同一台 Broker 上。当 Leader 副本出现故障时,Kafka 会从 Follower 副本中重新选举新的 Leader 副本对外提供服务。Kafka 通过多副本机制实现了故障的自动转移,当集群中某个 broker 失效时仍然能够保证服务的可用性。

Kafka 消费端也具备一定的容灾能力。Consumer 使用拉(Pull)模式从服务端拉取消息,并且保存消费的具体位置,当消费者宕机重启后可以根据之前保存的消费位置重新拉取需要的消息进行消费,不会造成消息丢失。

2. 分区

虽然多副本机制可以保证数据的持久化或消息不丢失,但没有解决伸缩性的问题。倘若领导者副本积累了太多数据以至于单台 Broker 机器都无法容纳了,那该怎么办呢?一个很自然的想法就是,能否把数据分割成多份保存在不同的 Broker 上?而 Kafka 也正是这么设计的。这种机制就是所谓的分区机制。

Kafka 中的分区机制指的是将每个主题划分成多个分区(Partition),每个分区是一组有序的消息日志。生产者生产的每条消息只会被发送到一个分区中,也就是说如果向一个双分区的主题发送一条消息,这条消息要么在分区 0 中,要么在分区 1 中。Kafka 的分区编号是从 0 开始依次递增的。
image.png
分区在存储层面可以看作一个可追加的日志(Log)文件,消息在被追加到分区日志、文件的时候都会分配一个特定的偏移量(offset)。offset 是消息在分区中的唯一标识,Kafka 通过它来保证消息在分区内的顺序性,不过 offset 并不跨越分区,也就是说,Kafka 保证的是分区有序而不是主题有序。

每一条消息被发送到 broker 前,会根据分区规则选择存储到哪个具体的分区。如果分区规则设定得合理,所有的消息都可以均匀地分配到不同的分区中。在创建主题时可以通过指定的参数来设置分区的个数,当然也可以在主题创建完成之后去修改分区的数量,通过增加分区的数量可以实现水平扩展。

Kafka 中的分区可以分布在不同的 broker 上,即一个主题可以横跨多个 broker,以此来提供比单个 broker 更强大的性能。此外,每个分区下还可以配置若干个副本,其中只能有 1 个领导者副本和 N-1 个追随者副本。至此我们能够完整地串联起 Kafka 的三层消息架构:

  • 第一层是主题层,主题是逻辑概念,每个主题可配置 M 个分区,而每个分区又可以配置 N 个副本。
  • 第二层是分区层,每个分区的 N 个副本中只能有一个充当领导者角色,对外提供服务;其他 N-1 个副本是追随者副本,只是提供数据冗余之用
  • 第三层是消息层,分区中包含若干条消息,每条消息的位移从 0 开始,依次递增。
  • 最后,客户端程序只能与分区的领导者副本进行交互。

3. 消费者模型

Kafka 引入了消费者组(Consumer Group)的概念。所谓的消费者组,指的是多个消费者实例共同组成一个组来消费一组主题。这组主题中的每个分区都只会被组内的一个消费者实例消费,其他消费者实例不能消费它。消费者组主要是为了提升消费者端的吞吐量,可以让多个消费者实例同时消费,加速整个消费端的吞吐量。

一个消费者组内的所有消费者实例不仅会瓜分订阅主题的数据,而且它们之间还能彼此协助。假设组内某个实例挂掉了,Kafka 能够自动检测到,然后把这个 Failed 实例之前负责的分区转移给其他活着的消费者。这个过程就称之为消费者组重平衡(Rebalance)。

每个消费者在消费消息的过程中会记录它当前消费到了分区的哪个位置上,这个字段就是消费者位移(Consumer Offset)。它和上面说到的分区位移不是一个概念。分区位移表征的是分区内的消息位置,它是不变的,即一旦消息被成功写入到一个分区上,它的位移值就是固定的了。而消费者位移则可能是随时变化的,毕竟它是消费者消费进度的指示器嘛。
image.png

4. 持久化

Kafka 使用消息日志来保存数据,一个日志就是磁盘上一个只能追加写消息的物理文件。追加写入避免了缓慢的随机 I/O 操作,改为性能较好的顺序 I/O 写操作,这也是实现 Kafka 高吞吐量特性的一个重要手段。

不过如果不停地向一个日志写入消息,最终也会耗尽所有的磁盘空间,因此 Kafka 必然要定期地删除消息以回收磁盘。怎么删除呢?简单来说就是通过日志段(Log Segment)机制。在 Kafka 底层,一个日志又进一步细分成多个日志段,消息被追加写到当前最新的日志段中,当写满了一个日志段后,Kafka 会自动切分出一个新的日志段,并将老的日志段封存起来。Kafka 在后台还有定时任务会定期地检查老的日志段是否能够被删除,从而实现回收磁盘空间的目的。

服务端参数配置

1. broker.id

broker.id 用来指定 Kafka 集群中 broker 的唯一标识。broker 启动时会在 ZooKeeper 中的 /brokers/ids 路径下创建一个以当前 broker.id 为名称的临时节点,broker 的健康状态检查就依赖于此节点。当 broker 下线时该节点会自动删除,其他 broker 节点或客户端通过判断 /brokers/ids 路径下是否有此 broker 的 broker.id 节点来确定该 broker 的健康状态。

自动生成
如果没有设置该参数,Kafka 会自动生成一个。生成策略与 broker.id.generation.enable reserved.broker.max.id 这两个参数有关。前者用来配置是否开启自动生成 brokerId 的功能,默认为 true。后者用来配置自动生成的 brokerId 的基准值,默认值为 1000,即自动生成的 brokerId 从 1001 开始。

2. zookeeper.connect

ZooKeeper 是一个分布式协调框架,负责协调管理并保存 Kafka 集群的所有元数据信息,比如集群都有哪些 Broker 在运行、创建了哪些 Topic,每个 Topic 都有多少分区以及这些分区的 Leader 副本都在哪些机器上等信息。Kafka 与 ZooKeeper 相关的最重要的参数就是 zookeeper.connect 配置了。

该参数指明 broker 要连接的 ZooKeeper 集群的服务地址,如果 ZooKeeper 集群中有多个节点,则可以用逗号将每个节点隔开,类似于 localhost1:2181,localhost2:2181,localhost3:2181 这种格式。最佳的实践方式是再加一个 chroot 路径,这样既可以明确指明该 chroot 路径下的节点是为 Kafka 所用的,也可以实现多个 Kafka 集群复用一套 ZooKeeper 集群,这样可以节省更多的硬件资源。

chroot 类似于别名,包含 chroot 路径的配置类似 localhost1:2181, localhost2:2181, localhost3:2181/kafka 这种格式,如果不指定 chroot,那么会默认使用 ZooKeeper 的根路径。切记 chroot 只需要写一次,而且是加到配置的最后。

3. log.dir、log.dirs

Kafka 把所有的消息都保存在磁盘上,而这两个参数用来配置 Kafka 日志文件存放的根目录:

  • log.dirs:配置 Broker 需要使用的多个根目录,逗号分割。该参数没有默认值,必须亲自指定。
  • log.dir:配置 Broker 需要使用的单个根目录,是补充上一个参数用的。

Kafka 并没有对这两个参数做强制性限制,也就是说,log.dir 和 log.dirs 都可以用来配置单个或多个根目录。但是 log.dirs 的优先级比 log.dir 高,如果没有配置 log.dirs,则会以 log.dir 配置为准。建议你在生产环境中设置 log.dirs 选项并配置多个路径,如果有条件的话最好将这些目录挂载到不同的物理磁盘上。优点如下:

  • 提升读写性能:比起单块磁盘,多块物理磁盘同时读写数据有更高的吞吐量。
  • 能够实现故障转移:这是 Kafka 1.1 版本新引入的强大功能。以前只要 Kafka Broker 使用的任何一块磁盘挂掉了,整个 Broker 进程都会关闭。但是自 1.1 版本开始,坏掉的磁盘上的数据会自动地转移到其他正常的磁盘上,而且 Broker 还能正常工作。

    4. listener

    该参数指明 broker 监听客户端连接的地址列表,即为客户端要连接 broker 的入口地址列表。如果有多个地址则中间以逗号隔开,具体配置格式为:

    1. protocol1://{hostname1}:{port1},protocol2://{hostname2}:{port2}
  • protocol 代表协议类型,Kafka 当前支持的协议类型有 PLAINTEXT(明文传输)、SSL(使用 SSL 或 TLS 加密传输)、SASL_SSL 等。如果未开启安全认证,则使用简单的 PLAINTEXT 即可。

  • hostname 代表主机名,port 代表服务端口。

如果不指定主机名,则表示绑定默认网卡,注意有可能会绑定到 127.0.0.1,这样无法对外提供服务,所以主机名最好不要为空;如果主机名是 0.0.0.0,则表示绑定所有的网卡。与此参数关联的还有 advertised.listeners 参数,其作用和 listeners 类似。Advertised 的含义为公开的,就是说这组监听器是 Broker 用于对外发布的,主要是为外网访问用的。如果 clients 在内网环境下访问 Kafka 就不需要配置这个参数了。

5. message.max.bytes

该参数用来指定 broker 所能接收消息的最大值,默认 1MB。如果 Producer 发送的消息大于这个参数所设置的值,那么 Producer 就会抛出一个 RecordTooLargeException 的异常。如果需要修改这个参数,那么还要考虑 max.request.size(客户端参数)、max.message.bytes(topic 端参数)等参数的影响。为了避免修改此参数 而引起级联的影响,建议在修改此参数之前考虑分拆消息的可行性。

6. unclean.leader.election.enable

该参数用来控制是否允许 Unclean Leader 选举,默认值为 false。何谓 Unclean?我们知道 Kafka 的每个分区都有多个副本来提供高可用。在这些副本中只能有一个副本对外提供服务,即所谓的 Leader 副本。那其他副本都有资格竞争 Leader 吗?显然不是,只有保存数据比较多的那些副本才有资格竞选,那些落后进度太多的副本没资格做这件事。

假设那些保存数据比较多的副本都挂了怎么办?我们还要不要进行 Leader 选举了?如果设置成 false,那么就坚持之前的原则,坚决不能让那些落后太多的副本竞选 Leader。这样做的后果是这个分区就不可用了,因为没有 Leader 了。反之,Kafka 允许你从那些“跑得慢”的副本中选一个出来当 Leader。这样做的后果是数据有可能就丢失了,因为这些副本保存的数据本来就不全,当了 Leader 之后它认为自己的数据才是权威的。

7. auto.leader.rebalance.enable

该参数表示是否允许定期进行 Leader 选举,默认为 true。为 true 表示允许 Kafka 定期地对一些 Topic 分区进行 Leader 重选举,也就是更换当前的 Leader,当然这个重选举是要满足一定的条件才会发生。

比如 Leader A 一直表现得很好,但如果该参数设置为 true,那么有可能一段时间后 Leader A 就要被强行卸任换成 Leader B。更换一次 Leader 代价是很高的,原本向 A 发送请求的所有客户端都要切换成向 B 发送请求,而且这种换 Leader 本质上没有任何性能收益,因此建议在生产环境中把这个参数设置成 false。

8. 数据保留

  • log.retention.{hours|minutes|ms} 该参数用来控制一条消息数据被保存多长时间,从优先级上来说 ms 设置最高、minutes 次之、hours 最低。

  • log.retention.bytes:这是指定 Broker 为消息保存的总磁盘容量大小。默认是 -1,表明在容量方面不做任何限制,注意该参数与 log.segment.bytes 的区别,后者表示单个日志文件的大小。

    9. 其他参数

    | 参数名称 | 默认值 | 含义 | | —- | —- | —- | | auto.create.topics.enable | true | 是否开启自动创建主题的功能 | | auto.leader.rebalance.enable | true | 是否开启消费者组 Leader 重平衡功能 | | background.threads | 10 | 指定执行后台任务的线程数 | | compression.type | producer | 消息的压缩类型。Kafka 支持的压缩类型有 Gzip、Snappy、LZ4 等。默认值 producer 表示根据生产者使用的压缩类型压缩,也就是说,生产者不管是否压缩消息,或者使用何种 压缩方式都会被 broker 端继承。uncompressed 表示不启用压缩 | | delete.topic.enable | true | 是否可以删除主题 | | leader.imbalance.check.interval.seconds | 300 | 检查leader是否分布不均衡的周期 | | leader.imbalance.per.broker.percentage | 10 | 允许 Leader 不均衡的比例,若超过这个值就会触发 Leader 再均衡的操作,前提是 auto.leader.rebalance.enable 参数要为 true | | log.flush.interval.messages | Long.MAX_VALUE | 如果日志文件中的消息在存入磁盘前的数量达到这个参数所设定的阈值时,则会强制将这些刷新日志文件到磁盘中。

    消息在写入磁盘前还要经历一层操作系统页缓存,如果期间发生掉电,则这些页缓存中的消息会丢失,调小这个参数的大小会增大消息的可靠性,但也会降低系统的整体性能 | | log.flush.interval.ms | null | 刷新日志文件的时间间隔。

    如果没有配置这个值,则会依据 log.flush. scheduler.interval.ms 参数设置的值来运作 | | log.flush.scheduler.interval.ms | Long.MAX_VALUE | 检查日志文件是否需要刷新的时间间隔 | | log.roll.hours | 168(7天) | 经过多长时间之后会强制新建一个日志分段 | | log.roll.ms | null | 同上,不过单位为毫秒。优先级比 log.roll.hours 要高 | | log.segment.bytes | 1GB | 日志分段文件的最大值,超过这个值会强制创建一个新的日志分段 | | log.segment.delete.delay.ms | 60秒 | 从操作系统删除文件前的等待时间 | | min.insync.replicas | 1 | ISR 集合中最少的副本数,通常设置 min.insync.replicas > 1

    该参数控制的是消息至少要被写入到多少个副本才算是已提交。设置成大于 1 可以提升消息持久性。注意:acks=all 表示消息写入到所有 ISR 副本中才算已提交,但没要求 ISR 副本有多少个。而 min.insync.replicas 参数做了这样的保证。

    确保 replication.factor > min.insync.replicas。如果两者相等,那么只要有一个副本宕机,整个分区就由于不满足最少写入副本数而无法正常工作了。推荐设置成 replication.factor = min.insync.replicas+1。 | | num.io.threads | 8 | 处理请求的线程数,包括磁盘 IO | | num.network.threads | 3 | 处理接收和返回响应的线程数 | | log.cleaner.enable | true | 是否开启日志清理的功能 | | log.cleaner.min.cleanable.ratio | 0.5 | 限定可执行清理操作的最小污浊率 | | log.cleaner.threads | 1 | 用于日志清理的后台线程数 | | log.cleanup.policy | delete | 日志清理策略,还有一个可选项为 compact,表示日志压缩。 | | log.index.interval.bytes | 4096 | 每隔多少个字节的消息量写入就添加一条索引 | | log.index.size.max.bytes | 10MB | 单个索引文件的最大值 | | log.message.timestamp.type | CreateTime | 消息中的时间戳类型,另一个可选项为 LogAppendTime。CreateTime 表示消息创建的时间,LogAppendTime 表示消息追加到日志中的时间。 | | log.retention.check.interval.ms | 5分钟 | 日志清理的检查周期 | | num.partitions | 1 | 主题中默认的分区数 | | create.topic.policy.class.name | null | 创建主题时用来验证合法性的策略,这个参数配置的是一个类的全限定名,需要实现org.apache.kafka.server.policy.
    CreateTopicPolicy 接口 |