持久化
不要害怕文件系统(磁盘)
Kafka 对消息的存储和缓存严重依赖于文件系统。人们对于“磁盘速度慢”的普遍印象,使得人们对于持久化的架构能够提供强有力的性能产生怀疑。事实上,磁盘的速度比人们预期的要慢的多,也快得多,这取决于人们使用磁盘的方式。而且设计合理的磁盘结构通常可以和网络一样快。
关于磁盘性能的关键事实是,磁盘的吞吐量和过去十年里磁盘的寻址延迟不同。因此,使用6个7200rpm、SATA接口、RAID-5的磁盘阵列在JBOD配置下的顺序写入的性能约为600MB/秒,但随机写入的性能仅约为100k/秒,相差6000倍以上。因为线性的读取和写入是磁盘使用模式中最有规律的,并且由操作系统进行了大量的优化。现代操作系统提供了 read-ahead 和 write-behind 技术,read-ahead 是以大的 data block 为单位预先读取数据,而 write-behind 是将多个小型的逻辑写合并成一次大型的物理磁盘写入。关于该问题的进一步讨论可以参考 ACM Queue article,他们发现实际上顺序磁盘访问在某些情况下比随机内存访问还要快!
为了弥补这种性能差异,现代操作系统在越来越注重使用内存对磁盘进行 cache。现代操作系统主动将所有空闲内存用作 disk caching,代价是在内存回收时性能会有所降低。所有对磁盘的读写操作都会通过这个统一的 cache。如果不使用直接I/O,该功能不能轻易关闭。因此即使进程维护了 in-process cache,该数据也可能会被复制到操作系统的 pagecache 中,事实上所有内容都被存储了两份。
此外,Kafka 建立在 JVM 之上,任何了解 Java 内存使用的人都知道两点:
对象的内存开销非常高,通常是所存储的数据的两倍(甚至更多)。
随着堆中数据的增加,Java 的垃圾回收变得越来越复杂和缓慢。
受这些因素影响,相比于维护 in-memory cache 或者其他结构,使用文件系统和 pagecache 显得更有优势—我们可以通过自动访问所有空闲内存将可用缓存的容量至少翻倍,并且通过存储紧凑的字节结构而不是独立的对象,有望将缓存容量再翻一番。 这样使得32GB的机器缓存容量可以达到28-30GB,并且不会产生额外的 GC 负担。此外,即使服务重新启动,缓存依旧可用,而 in-process cache 则需要在内存中重建(重建一个10GB的缓存可能需要10分钟),否则进程就要从 cold cache 的状态开始(这意味着进程最初的性能表现十分糟糕)。 这同时也极大的简化了代码,因为所有保持 cache 和文件系统之间一致性的逻辑现在都被放到了 OS 中,这样做比一次性的进程内缓存更准确、更高效。如果你的磁盘使用更倾向于顺序读取,那么 read-ahead 可以有效的使用每次从磁盘中读取到的有用数据预先填充 cache。
这里给出了一个非常简单的设计:相比于维护尽可能多的 in-memory cache,并且在空间不足的时候匆忙将数据 flush 到文件系统,我们把这个过程倒过来。所有数据一开始就被写入到文件系统的持久化日志中,而不用在 cache 空间不足的时候 flush 到磁盘。实际上,这表明数据被转移到了内核的 pagecache 中。
这种 pagecache-centric 的设计风格出现在一篇关于 Varnish 设计的文章中。
常量时间就足够了
消息系统使用的持久化数据结构通常是和 BTree 相关联的消费者队列或者其他用于存储消息源数据的通用随机访问数据结构。BTree 是最通用的数据结构,可以在消息系统能够支持各种事务性和非事务性语义。 虽然 BTree 的操作复杂度是 O(log N),但成本也相当高。通常我们认为 O(log N) 基本等同于常数时间,但这条在磁盘操作中不成立。磁盘寻址是每10ms一跳,并且每个磁盘同时只能执行一次寻址,因此并行性受到了限制。 因此即使是少量的磁盘寻址也会很高的开销。由于存储系统将非常快的cache操作和非常慢的物理磁盘操作混合在一起,当数据随着 fixed cache 增加时,可以看到树的性能通常是非线性的——比如数据翻倍时性能下降不只两倍。
所以直观来看,持久化队列可以建立在简单的读取和向文件后追加两种操作之上,这和日志解决方案相同。这种架构的优点在于所有的操作复杂度都是O(1),而且读操作不会阻塞写操作,读操作之间也不会互相影响。这有着明显的性能优势,由于性能和数据大小完全分离开来——服务器现在可以充分利用大量廉价、低转速的1+TB SATA硬盘。 虽然这些硬盘的寻址性能很差,但他们在大规模读写方面的性能是可以接受的,而且价格是原来的三分之一、容量是原来的三倍。
在不产生任何性能损失的情况下能够访问几乎无限的硬盘空间,这意味着我们可以提供一些其它消息系统不常见的特性。例如:在 Kafka 中,我们可以让消息保留相对较长的一段时间(比如一周),而不是试图在被消费后立即删除。正如我们后面将要提到的,这给消费者带来了很大的灵活性。
Efficiency(效率)
我们在性能上已经做了很大的努力。 我们主要的使用场景是处理WEB活动数据,这个数据量非常大,因为每个页面都有可能大量的写入。此外我们假设每个发布 message 至少被一个consumer (通常很多个consumer) 消费, 因此我们尽可能的去降低消费的代价。
我们还发现,从构建和运行许多相似系统的经验上来看,性能是多租户运营的关键。如果下游的基础设施服务很轻易被应用层冲击形成瓶颈,那么一些小的改变也会造成问题。通过非常快的(缓存)技术,我们能确保应用层冲击基础设施之前,将负载稳定下来。 当尝试去运行支持集中式集群上成百上千个应用程序的集中式服务时,这一点很重要,因为应用层使用方式几乎每天都会发生变化。
我们在上一节讨论了磁盘性能。 一旦消除了磁盘访问模式不佳的情况,该类系统性能低下的主要原因就剩下了两个:大量的小型 I/O 操作,以及过多的字节拷贝。
小型的 I/O 操作发生在客户端和服务端之间以及服务端自身的持久化操作中。
为了避免这种情况,我们的协议是建立在一个 “消息块” 的抽象基础上,合理将消息分组。 这使得网络请求将多个消息打包成一组,而不是每次发送一条消息,从而使整组消息分担网络中往返的开销。Consumer 每次获取多个大型有序的消息块,并由服务端 依次将消息块一次加载到它的日志中。
这个简单的优化对速度有着数量级的提升。批处理允许更大的网络数据包,更大的顺序读写磁盘操作,连续的内存块等等,所有这些都使 KafKa 将随机流消息顺序写入到磁盘, 再由 consumers 进行消费。
另一个低效率的操作是字节拷贝,在消息量少时,这不是什么问题。但是在高负载的情况下,影响就不容忽视。为了避免这种情况,我们使用 producer ,broker 和 consumer 都共享的标准化的二进制消息格式,这样数据块不用修改就能在他们之间传递。
broker 维护的消息日志本身就是一个文件目录,每个文件都由一系列以相同格式写入到磁盘的消息集合组成,这种写入格式被 producer 和 consumer 共用。保持这种通用格式可以对一些很重要的操作进行优化: 持久化日志块的网络传输。 现代的unix 操作系统提供了一个高度优化的编码方式,用于将数据从 pagecache 转移到 socket 网络连接中;在 Linux 中系统调用 sendfile 做到这一点。
为了理解 sendfile 的意义,了解数据从文件到套接字的常见数据传输路径就非常重要:
操作系统从磁盘读取数据到内核空间的 pagecache
应用程序读取内核空间的数据到用户空间的缓冲区
应用程序将数据(用户空间的缓冲区)写回内核空间到套接字缓冲区(内核空间)
操作系统将数据从套接字缓冲区(内核空间)复制到通过网络发送的 NIC 缓冲区
这显然是低效的,有四次 copy 操作和两次系统调用。使用 sendfile 方法,可以允许操作系统将数据从 pagecache 直接发送到网络,这样避免重新复制数据。所以这种优化方式,只需要最后一步的copy操作,将数据复制到 NIC 缓冲区。
我们期望一个普遍的应用场景,一个 topic 被多消费者消费。使用上面提交的 zero-copy(零拷贝)优化,数据在使用时只会被复制到 pagecache 中一次,节省了每次拷贝到用户空间内存中,再从用户空间进行读取的消耗。这使得消息能够以接近网络连接速度的 上限进行消费。
pagecache 和 sendfile 的组合使用意味着,在一个kafka集群中,大多数 consumer 消费时,您将看不到磁盘上的读取活动,因为数据将完全由缓存提供。
可查看JAVA 中更多有关 sendfile 方法和 zero-copy (零拷贝)相关的资料
端到端的批量压缩
在某些情况下,数据传输的瓶颈不是 CPU ,也不是磁盘,而是网络带宽。对于需要通过广域网在数据中心之间发送消息的数据管道尤其如此。当然,用户可以在不需要 Kakfa 支持下一次一个的压缩消息。但是这样会造成非常差的压缩比和消息重复类型的冗余,比如 JSON 中的字段名称或者是或 Web 日志中的用户代理或公共字符串值。高性能的压缩是一次压缩多个消息,而不是压缩单个消息。
Kafka 以高效的批处理格式支持一批消息可以压缩在一起发送到服务器。这批消息将以压缩格式写入,并且在日志中保持压缩,只会在 consumer 消费时解压缩。
Producer(发送消息)
负载均衡(Load balancing)
生产者直接发送数据到主分区的服务器上,不需要经过任何中间路由。 为了让生产者实现这个功能,所有的 kafka 服务器节点都能响应这样的元数据请求: 哪些服务器是活着的,主题的哪些分区是主分区,分配在哪个服务器上,这样生产者就能适当地直接发送它的请求到服务器上。
客户端控制消息发送数据到哪个分区,这个可以实现随机的负载均衡方式,或者使用一些特定语义的分区函数。 我们有提供特定分区的接口让用于根据指定的键值进行hash分区(当然也有选项可以重写分区函数),例如,如果使用用户ID作为key,则用户相关的所有数据都会被分发到同一个分区上。 这允许消费者在消费数据时做一些特定的本地化处理。这样的分区风格经常被设计用于一些本地处理比较敏感的消费者。
异步发送(Asyncronous send)
批处理是提升性能的一个主要驱动,为了允许批量处理,kafka 生产者会尝试在内存中汇总数据,并用一次请求批次提交信息。 批处理,不仅仅可以配置指定的消息数量,也可以指定等待特定的延迟时间(如64k 或10ms),这允许汇总更多的数据后再发送,在服务器端也会减少更多的IO操作。 该缓冲是可配置的,并给出了一个机制,通过权衡少量额外的延迟时间获取更好的吞吐量。
消费者
Kafka consumer通过向 broker 发出一个“fetch”请求来获取它想要消费的 partition。consumer 的每个请求都在 log 中指定了对应的 offset,并接收从该位置开始的一大块数据。因此,consumer 对于该位置的控制就显得极为重要,并且可以在需要的时候通过回退到该位置再次消费对应的数据。
Push vs. Pull
最初我们考虑的问题是:究竟是由 consumer 从 broker 那里 pull 数据,还是由 broker 将数据 push 到 consumer。Kafka 在这方面采取了一种较为传统的设计方式,也是大多数的消息系统所共享的方式:即 producer 把数据 push 到 broker,然后 consumer 从 broker 中 pull 数据。 也有一些 logging-centric 的系统,比如 Scribe 和 Apache Flume,沿着一条完全不同的 push-based 的路径,将数据 push 到下游节点。这两种方法都有优缺点。然而,由于 broker 控制着数据传输速率, 所以 push-based 系统很难处理不同的 consumer。让 broker 控制数据传输速率主要是为了让 consumer 能够以可能的最大速率消费;不幸的是,这导致着在 push-based 的系统中,当消费速率低于生产速率时,consumer 往往会不堪重负(本质上类似于拒绝服务攻击)。pull-based 系统有一个很好的特性, 那就是当 consumer 速率落后于 producer 时,可以在适当的时间赶上来。还可以通过使用某种 backoff 协议来减少这种现象:即 consumer 可以通过 backoff 表示它已经不堪重负了,然而通过获得负载情况来充分使用 consumer(但永远不超载)这一方式实现起来比它看起来更棘手。前面以这种方式构建系统的尝试,引导着 Kafka 走向了更传统的 pull 模型。
另一个 pull-based 系统的优点在于:它可以大批量生产要发送给 consumer 的数据。而 push-based 系统必须选择立即发送请求或者积累更多的数据,然后在不知道下游的 consumer 能否立即处理它的情况下发送这些数据。如果系统调整为低延迟状态,这就会导致一次只发送一条消息,以至于传输的数据不再被缓冲,这种方式是极度浪费的。 而 pull-based 的设计修复了该问题,因为 consumer 总是将所有可用的(或者达到配置的最大长度)消息 pull 到 log 当前位置的后面,从而使得数据能够得到最佳的处理而不会引入不必要的延迟。
简单的 pull-based 系统的不足之处在于:如果 broker 中没有数据,consumer 可能会在一个紧密的循环中结束轮询,实际上 busy-waiting 直到数据到来。为了避免 busy-waiting,我们在 pull 请求中加入参数,使得 consumer 在一个“long pull”中阻塞等待,直到数据到来(还可以选择等待给定字节长度的数据来确保传输长度)。
你可以想象其它可能的只基于 pull 的, end-to-end 的设计。例如producer 直接将数据写入一个本地的 log,然后 broker 从 producer 那里 pull 数据,最后 consumer 从 broker 中 pull 数据。通常提到的还有“store-and-forward”式 producer, 这是一种很有趣的设计,但我们觉得它跟我们设定的有数以千计的生产者的应用场景不太相符。我们在运行大规模持久化数据系统方面的经验使我们感觉到,横跨多个应用、涉及数千磁盘的系统事实上并不会让事情更可靠,反而会成为操作时的噩梦。在实践中, 我们发现可以通过大规模运行的带有强大的 SLAs 的 pipeline,而省略 producer 的持久化过程。
消费者的位置
令人惊讶的是,持续追踪已经被消费的内容是消息系统的关键性能点之一。
大多数消息系统都在 broker 上保存被消费消息的元数据。也就是说,当消息被传递给 consumer,broker 要么立即在本地记录该事件,要么等待 consumer 的确认后再记录。这是一种相当直接的选择,而且事实上对于单机服务器来说,也没与其它地方能够存储这些状态信息。 由于大多数消息系统用于存储的数据结构规模都很小,所以这也是一个很实用的选择——因为只要 broker 知道哪些消息被消费了,就可以在本地立即进行删除,一直保持较小的数据量。
也许不太明显,但要让 broker 和 consumer 就被消费的数据保持一致性也不是一个小问题。如果 broker 在每条消息被发送到网络的时候,立即将其标记为 consumed,那么一旦 consumer 无法处理该消息(可能由 consumer 崩溃或者请求超时或者其他原因导致),该消息就会丢失。 为了解决消息丢失的问题,许多消息系统增加了确认机制:即当消息被发送出去的时候,消息仅被标记为sent 而不是 consumed;然后 broker 会等待一个来自 consumer 的特定确认,再将消息标记为consumed。这个策略修复了消息丢失的问题,但也产生了新问题。 首先,如果 consumer 处理了消息但在发送确认之前出错了,那么该消息就会被消费两次。第二个是关于性能的,现在 broker 必须为每条消息保存多个状态(首先对其加锁,确保该消息只被发送一次,然后将其永久的标记为 consumed,以便将其移除)。 还有更棘手的问题要处理,比如如何处理已经发送但一直得不到确认的消息。
Kafka 使用完全不同的方式解决消息丢失问题。Kafka的 topic 被分割成了一组完全有序的 partition,其中每一个 partition 在任意给定的时间内只能被每个订阅了这个 topic 的 consumer 组中的一个 consumer 消费。这意味着 partition 中 每一个 consumer 的位置仅仅是一个数字,即下一条要消费的消息的offset。这使得被消费的消息的状态信息相当少,每个 partition 只需要一个数字。这个状态信息还可以作为周期性的 checkpoint。这以非常低的代价实现了和消息确认机制等同的效果。
这种方式还有一个附加的好处。consumer 可以回退到之前的 offset 来再次消费之前的数据,这个操作违反了队列的基本原则,但事实证明对大多数 consumer 来说这是一个必不可少的特性。 例如,如果 consumer 的代码有 bug,并且在 bug 被发现前已经有一部分数据被消费了, 那么 consumer 可以在 bug 修复后通过回退到之前的 offset 来再次消费这些数据。
传统消息中间件发送消息借助2阶段来实现:即发送与确认机制,Kafka采用partition+offset的机制,避免的状态的维持
离线数据加载
可伸缩的持久化特性允许 consumer 只进行周期性的消费,例如批量数据加载,周期性将数据加载到诸如 Hadoop 和关系型数据库之类的离线系统中。
在 Hadoop 的应用场景中,我们通过将数据加载分配到多个独立的 map 任务来实现并行化,每一个 map 任务负责一个 node/topic/partition,从而达到充分并行化。Hadoop 提供了任务管理机制,失败的任务可以重新启动而不会有重复数据的风险,只需要简单的从原来的位置重启即可。
消息交付语义
Kafka可以提供的消息交付语义保证有多种:
- At most once——消息可能会丢失但绝不重传。
- At least once——消息可以重传但绝不丢失。
- Exactly once——这正是人们想要的, 每一条消息只被传递一次.