• 具有高吞吐量来支持高容量事件流,例如实时日志聚合。
  • 处理大量的数据积压,支持离线系统的周期性数据加载。
  • 低延迟分发,处理消息传递用例。
  • 分区、分布式、实时处理来创建新的分发馈送等特性,衍生出分区模式和消费者模式。
  • 数据流被推送到其他应用,接收方故障时保证容错。

    1. 持久化

    Kafka 建立在 JVM 之上,任何了解 Java 内存使用的人都知道两点:
  1. 对象的内存开销非常高,通常是所存储的数据的两倍,甚至更多。
  2. 随着堆中数据的增加,Java 的垃圾回收变得越来越复杂和缓慢。

使用文件系统和pagecache显得更有优势,通过自动访问所有空闲内存将可用缓存的容量至少翻倍,并且通过存储紧凑的字节结构而不是独立的对象,有望将缓存容量再翻一番。 这样使得32GB的机器缓存容量可以达到28-30GB,并且不会产生额外的 GC 负担。即使服务重新启动,缓存依旧可用。所有数据一开始就被写入到文件系统的持久化日志中,而不用在cache空间不足的时候flush到磁盘。

**数据结构**
BTree 是最通用的数据结构,在消息系统能够支持各种事务性和非事务性语义,操作复杂度是 O(log N),由于存储系统将非常快的cache操作和非常慢的物理磁盘操作混合在一起,当数据随着 fixed cache 增加时,可以看到树的性能通常是非线性的——比如数据翻倍时性能下降不只两倍。所以持久化队列可以建立在简单的读取和向文件后追加两种操作之上,这和日志解决方案相同,操作复杂度是O(1),而且读操作不会阻塞写操作,读操作之间也不会互相影响。

2. 性能

IO密集性能处理:Kafka协议是建立在一个 “消息块” 的抽象基础上,合理将消息分组。 使网络请求将多个消息打包成一组,从而减少网络开销,服务端依次将消息块一次加载到它的日志中。
高负载字节拷贝性能处理:Producer、Broker、Consumer共享的标准化的二进制消息格式,数据块不用修改便能在三者之间传递。

**Pagecache & Sendfile组合**
Broker维护的消息日志本身就是一个文件目录,每个文件都由一系列以相同格式写入到磁盘的消息集合组成,这种写入格式被Producer和Consumer 共用。保持这种通用格式可以对一些很重要的操作进行优化: 持久化日志块的网络传输。 现代的unix操作系统提供了一个高度优化的编码方式,用于将数据从pagecache转移到socket网络连接中。
zero-copy(零拷贝)优化,数据在使用时只会被复制到Pagecache一次,节省 拷贝/读取 内存的消耗。

**批量压缩**
网络带宽瓶颈:批处理格式支持一批消息可以压缩在一起发送到服务器,压缩格式写入,日志中仍保持压缩,只会在Consumer消费时解压缩。 Kafka 支持GZIPSnappyLZ4压缩协议。

3. 生产者

**负载均衡**
Kafka服务节点都能响应元数据请求,例如:存活的服务,主题主分区,分配至哪个服务上。客户端控制消息发送数据到哪个分区,实现随机的负载均衡,或者使用一些特定语义的分区函数。 Kafka提供特定分区的接口让用于根据指定的键值进行hash分区(当然也有选项可以重写分区函数)。


**异步发送**
生产者会尝试在内存中汇总数据,并用一次请求批次提交信息。 批处理,不仅仅可以配置指定的消息数量,也可以指定等待特定的延迟时间(如64k 或10ms),这允许汇总更多的数据后再发送,在服务器端也会减少更多的IO操作。 该缓冲是可配置的,并给出了一个机制,通过权衡少量额外的延迟时间获取更好的吞吐量。

4. 消费者

Consumer通过向broker发出fetch请求获取消费的partition。Consumer请求在log中指定对应offset,并接收从该位置开始的数据,并且在需要的时候通过回退到该位置,再次消费对应的数据。

**Push && Pull**
基于pull-based系统特性,当消息积压时,可以在适当的时间赶上来。或通过使用某种backoff协议来减少这种现象:即 consumer 可以通过 backoff 表示它已经不堪重负了,然而通过获得负载解决更麻烦。
优点:Consumer将所有可用的或者达到配置的最大长度消息pull至log当前位置的后面,使数据能够得到最佳的处理而不会引入不必要的延迟。
缺点:当Broker没有数据,Consumer会在一个紧密的循环中结束轮询。为了避免 busy-waiting,在pull请求中加入参数,使consumer阻塞等待,直到数据到来。

**消费者的位置**
持续追踪已经被消费的内容是消息系统的关键性能点之一。为了解决消息丢失,增加了确认机制:即当消息被发送出去的时候,消息仅被标记为sent 而不是 consumed;然后 broker 会等待一个来自 consumer 的特定确认,再将消息标记为consumed。
Kafka处理消息丢失:将topic分割成一组有序的partition,在任意给定时间内只能被订阅了的Consumers组中的一个消费。Partition中consumer的位置仅仅是一个数字,即下一条要消费的消息的offset。offset还可以作为周期性的checkpoint。实现了和消息确认机制等同的效果。并且consumer可以回退到之前的offset。例如:出现Bug时,修复后通过回退到之前的offset来再次消费这些数据。

**离线数据加载**
可伸缩的持久化特性允许Consumer进行周期性消费。例如:批量数据加载,周期性将数据加载到诸如 Hadoop 和关系型数据库之类的离线系统中。在Hadoop的应用场景中,通过将数据加载分配到多个独立的map任务实现并行,每个map任务负责一个node/topic/partition,从而达到充分并行化。Hadoop提供了任务管理机制,失败的任务可以重新启动而不会有重复数据的风险,需要从原来的位置重启即可。

5. 消息交付

  • At most once:消息会丢失但不重传。
  • At least once:消息可重传但不丢失。
  • Exactly once:每条消息只被传递一次。

发布消息的持久性保证和消费消息的保证。Kafka在Kafka Streams中支持了exactly-once的消息交付功能,并且在 topic 之间进行数据传递和处理时,通常使用事务型 producer/consumer 提供 exactly-once 的消息交付功能。到其它目标系统的exactly-once的消息交付通常需要与该类系统协作,但Kafka提供了 offset,使得这种应用场景的实现变得可行。否则,Kafka 默认保证 at-least-once 的消息交付, 并且 Kafka 允许用户通过禁用 producer 的重传功能和让 consumer 在处理一批消息之前提交 offset,来实现 at-most-once 的消息交付。

6. 副本

默认启动备份机制。创建副本的单位是partition,每个分区都有一个 leader和零或多个followers。副本总数等于leader数,所有读写操作由leader处理,一般partition数量大于broker数量,各分区leader均匀分布在brokers中。所有followers节点同步leader节点日志,日志中消息和偏移量和leader一致。在任何给定时间, leader节点的日志末尾时可能有几个消息尚未完成备份。
Kafka判断节点判活两种方式。

  1. Zookeeper通过心跳机制检查每个节点的连接。
  2. Follower节点必须能及时同步leader的写操作,并且延时不能太久。

满足条件的节点处于in sync状态,区别于alivefailed。 Leader追踪in sync节点,当节点挂掉、写超时、心跳超时等,leader会把它从同步副本列表中移除。同步超时和写超时由replica.lag.time.max.ms配置。
Kafka没有处理所谓的Byzantine故障。例如:节点挂掉,然后又恢复,即一个节点出现了随意响应和恶意响应,可能由于bug或非法操作导致。

**备份日志**
备份日志按照一系列有序的值(通常是编号为0、1、2、…)进行建模。最简单和最快的方法是由leader节点选择需要提供的有序的值,只要leader节点还存活,所有的follower只需要拷贝数据并按照leader节点的顺序排序。
如果选择写入时候需要保证一定数量的副本写入成功,读取时需要保证读取一定数量的副本,读取和写入之间有重叠。这样的读写机制称为Quorum。这种权衡的一种常见方法是对提交决策和 leader 选举使用多数投票机制。大多数投票方法优点:延迟取决于最快的服务器。分布式算法,包含ZooKeeper的Zab、Raft和Viewstamped Replication。Kafka实际执行情况最相似为微软的PacificA
Kafka动态维护一个同步状态的备份的集合 ISR(a set of in-sync replicas),只有这个集合成员才有资格被选举为leader,一条消息必须被这个集合 所有节点读取并追加到日志中了,这条消息才能视为提交。ISR集合发生变化会在ZooKeeper持久化。

**Unclean Leader选举**
当所有的备份不可用时,如何保证数据不丢失:

  1. 等待一个ISR副本重新恢复正常服务,并选择这个副本作为leader。
  2. 选择第一个重新恢复正常服务的副本作为leader。

可用性和一致性之间的妥协。Kafka默认是第二种策略,配置属性unclean.leader.election.enable禁用此策略,就会使用第一种策略即停机时间优于不同步。

**可用性 && 持久性**
写数据时,Producers设置ack是否提交完成,0不等待broker返回确认消息,1leader保存成功返回,-1(all)所有备份都保存成功返回。

:设置ack = all并不能保证所有的副本都写入消息。

当acks=all时,只要ISR副本同步完成,就会返回消息已经写入。
当持久性优先于持久性时,提供两种配置:

  1. 禁用unclean leader选举机制,如果所有的备份节点都挂了,分区数据就会不可用,直到最近leader恢复正常。这种策略优先于数据丢失的风险。
  2. 指定最小的 ISR 集合大小,只有当 ISR 的大小大于最小值,分区才能接受写入操作,以防止仅写入单个备份的消息丢失造成消息不可用的情况,这个设置只有在生产者使用 acks = all 的情况下才会生效,至少保证消息被 ISR 副本写入。此设置是一致性和可用性之间的折中选择,对于设置更大的最小ISR大小保证了更好的一致性,因为它保证将消息被写入了更多的备份,减少了消息丢失的可能性。但会降低可用性,因为如果 ISR 副本的数量低于最小阈值,那么分区将无法写入。

    7. 日志压缩

    保证至少单个topic partition的数据日志中每个message key保留最新值。解决了应用程序崩溃、系统故障后恢复或者应用在运行维护过程中重启后重新加载缓存的场景。

**应用场景**

  1. 数据库更改订阅。
  2. 事件源。
  3. 日志高可用。

在这些场景中,主要需要处理变化的实时feed,但是偶尔当机器崩溃或需要重新加载或重新处理数据时,需要处理所有数据。 日志压缩允许在同一topic下同时使用这两个用例。日志压缩机制是更细粒度的、每个记录都保留的机制,而不是基于时间的粗粒度。 这个理念是选择性的删除那些有更新的变更的记录的日志。 这样最终日志至少包含每个key的记录的最后一个状态。这个策略可以为每个Topic设置,这样一个集群中,可以一部分Topic通过时间和大小保留日志,另外一些可以通过压缩压缩策略保留。

**基础**
image.png
该日志逻辑图解释了日志的每条消息的offset逻辑结构Log head中包含传统的Kafka日志,它包含了连续的offset和所有的消息。 日志压缩增加了处理Log Tail的选项,Tail中的消息保存了初次写入时的offset,即使该offset的消息被压缩,所有offset仍然在日志中是有效的。在这个场景中,无法区分和下一个出现的更高offset的位置。 36、37、38是属于相同位置的,从他们开始读取日志都将从38开始。
通过消息的key和空负载(null payload)来标识该消息可从日志中删除。 这个删除标记将会引起所有之前拥有相同key的消息被移除(包括拥有key相同的新消息)。但是删除标记比较特殊,它将在一定周期后被从日志中删除来释放空间。这个时间点被称为“delete retention point”。
压缩操作通过在后台周期性的拷贝日志段来完成。 清除操作不会阻塞读取,并且可以被配置不超过一定IO吞吐来避免影响Producer和Consumer。实际的日志段压缩过程如下。
image.png
**保障措施**

  1. 任何滞留在日志head中的所有消费者能看到写入的所有消息;这些消息都是有序的offset。 topic使用min.compaction.lag.ms来保障消息写入之前必须经过的最小时间长度,才能被压缩。这限制了一条消息在Log Head中的最短存在时间。
  2. 始终保持消息的有序性。压缩永远不会重新排序消息。
  3. 消息的Offset不会变更。这是消息在日志中的永久标志。
  4. 任何从头开始处理日志的Consumer至少会拿到每个key的最终状态。 只要Consumer在小于Topic的delete.retention.ms设置(默认24小时)的时间段内到达Log head,将会看到所有删除记录的所有删除标记。 换句话说,因为移除删除标记和读取是同时发生的,Consumer可能会因为落后超过delete.retention.ms而导致错过删除标记。

**细节**
日志压缩由Log Cleaner执行,后台线程池重新拷贝日志段,移除那些key存在于Log Head中的记录。每个压缩线程如下工作:

  1. 选择log headlog tail比率最高的日志。
  2. head log中为每个key的最后offset创建一个的简单概要。
  3. 删除日志中最新出现的key的旧值。新的日志立即被交到日志中,只需要一个额外的日志段空间(不是日志的完整副本)。
  4. 日志head本质上是一个空间密集型的哈希表,每个条目使用24个字节。所以如果有8G的整理缓冲区,则能迭代处理大约366G的日志头部(假设消息大小为1k)。

**Log Cleaner**
默认启用,启动清理的线程池。如果开始特定Topic的清理功能,设置:

log.cleanup.policy=compact

通过创建Topic时配置或者之后使用Topic命令实现。Log Cleaner可以配置保留最小的不压缩的Log Head。配置压缩延迟时间:

log.cleaner.min.compaction.lag.ms

保证消息在配置的时长内不被压缩。如果没有设置,除了最后一个日志外,所有的日志都会被压缩。活动segment不会被压缩,即使它保存的消息的滞留时长已经超过了配置的最小压缩时间长。

8. 配额

Kafka集群可以对客户端请求进行配额,控制集群资源的使用。生产或消费数据会产生大量的请求,导致对broker资源垄断,引起网络的饱和,对其他clients和brokers本身造成DOS攻击。 资源的配额保护可以有效防止这些问题,在大型多租户集群中,因为一小部分表现不佳的客户端降低了良好的用户体验,这种情况下非常需要资源的配额保护。

  1. 定义字节率的阈值来限定网络带宽的配额。 (v 0.9开始)
  2. 请求率的配额,网络、I/O线程、cpu利用率的百分比。 (v 0.11开始)

**Client Groups**
Kafka client 是一个用户概念,集群中经过身份验证的用户。在一个支持非授权客户端的集群中,用户是一组非授权的users,broker使用一个可配置的PrincipalBuilder类来配置group规则。Client-id是客户端的逻辑分组,客户端应用使用一个有意义的名称进行标识。(user, client-id)元组定义了一个安全的客户端逻辑分组,使用相同的user和client-id标识。
资源配额可以针对(user,client-id),users或者client-id groups 三种规则进行配置。对于一个请求连接,连接会匹配最细化的配额规则的限制。同一个group的所有连接共享这个group的资源配额。

**配置**
资源配额的配置可以根据 (user, client-id),user 和 client-id 三种规则进行定义。在配额级别需要更高(或者更低)的配额的时候,是可以覆盖默认的配额配置。这种机制和每个topic可以自定义日志配置属性类似。 覆盖User和 (user, client-id) 规则的配额配置会写到zookeeper /config/users路径下,client-id 配额的配置会写到/config/clients路径下。这些配置的覆盖会被所有的brokers实时的监听到并生效。所以这使得我们修改配额配置不需要重启整个集群。
配额配置的优先级顺序。

  1. /config/users//clients/
  2. /config/users//clients/
  3. /config/users/
  4. /config/users//clients/
  5. /config/users//clients/
  6. /config/users/
  7. /config/clients/
  8. /config/clients/

Broker的配置属性quota.producer.default, quota.consumer.default也可以用来设置client-id groups默认的网络带宽配置。client-id 的默认配额也是用zookeeper配置,和其他配额配置的覆盖和默认方式是相似的。

**网络带宽**
默认不同的客户端 group 是集群配置的固定配额,单位是bytes/sec。这个配额会以broker为基础进行定义。在clients被限制之前,Group clients可以发布和拉取单个broker的最大速率。

**请求速率**
定义客户端可以使用broker request handler I/O 线程网络线程在配额窗口时间内使用的百分比。n%代表一个线程的使用率,建立在总容量((num.io.threads + num.network.threads) * 100)%之上。Group client的资源在被限制之前可以使用单位配额时间窗口内I/O线程和网络线程利用率的n%。 I/O和网络线程的数量基于broker核数,所以请求量的配额代表每个Group client使用cpu的百分比。