1. 为什么会有 kafka

在看之前,可以先了解一下什么是消息系统:https://www.yuque.com/studys/nd86k0/gwofxq
这个问题也可以理解为 kafka 到底是为了解决什么问题而出现的。

🌰1:

假设你意气风发,要开发新一代的互联网应用,以期在互联网事业中一展宏图。借助云计算,很容易开发出如下原型系统:

  1. Web应用:部署在云服务器上,为个人电脑或者移动用户提供的访问体验。
  2. SQL数据库:为Web应用提供数据持久化以及数据查询。

image.png

这套架构简洁而高效,很快便能够部署到云计算平台,以便快速推向市场。互联网不就是讲究小步快跑嘛!

好景不长。随着用户的迅速增长,所有的访问都直接通过SQL数据库使得它不堪重负,不得不

  • 加上缓存服务以降SQL数据库的荷载;
  • 为了理解用户行为,开始收集日志并保存到Hadoop上离线处理,同时把日志放在全文检索系统中以便快速定位问题;
  • 由于需要给投资方看业务状况,也需要把数据汇总到数据仓库中以便提供交互式报表。

此时的系统的架构已经盘根错节了,考虑将来还会加入实时模块以及外部数据交互,emmmmm………..
image.png

本质上,这是一个数据集成问题。没有任何一个系统能够解决所有的事情,所以业务数据根据不同用途存而放在不同的系统,比如归档、分析、搜索、缓存等。数据冗余本身没有任何问题,但是不同系统之间像意大利面条一样复杂的数据同步却是挑战。

如何解决这种非常混乱的数据问题呢?这时候 kafka 出现了
Kafka可以让合适的数据以合适的形式出现在合适的地方。Kafka的做法是提供消息队列,让生产者单往队列的末尾添加数据,让多个消费者从队列里面依次读取数据然后自行处理。之前连接的复杂度是O(N^2),而现在降低到O(N),扩展起来方便多了:

image.png

以上故事说明了Kafka主要用途是数据集成,或者说是流数据集成,以Pub/Sub形式的消息总线形式提供。但是,Kafka不仅仅是一套传统的消息总线,本质上Kafka是分布式的流数据平台,因为以下特性而著名:

  1. 提供Pub/Sub方式的海量消息处理。
  2. 以高容错的方式存储海量数据流。
  3. 保证数据流的顺序。

可能通过上面例子,稍微有点印象,那么通过例子2,通过稍微专业的角度在解释下。

🌰2:

例子 2从上面说的:提供Pub/Sub方式的海量消息处理 来说起。

说起Pub/Sub,熟悉企业应用集成(Enterprise Application Integration,EAI)的朋友不会陌生,它是一种处理消息的范式,消息的发布者(Pub)**只需要指定消息的类别,而不需要与消息订阅者(Sub**)打交道。订阅者对一个或多个类别表达兴趣,于是只接收感兴趣的消息,而不需要知道什么样的发布者发布的消息。这种发布者和订阅者的解耦可以给应用带来更好的可扩展性。

打个比方,你的公司业务蓬勃发展,前后开发了多个互联网应用都需要做市场推广。

  • 方法 1:

通过电话向客户推广产品新特性,不但需要找到每个互联网应用所对应的客户名单,还要挨个电话联系,可是,客户不一定有空接电话或者已经在通讯中了,长长的客户名单也会让你头疼不已;

  • 方法 2:

另一种方法是为每个互联网应用创建一个微信公众号(用公众号分割推广信息),让客户订阅后推送产品新特性的信息(客户不用关心到底谁发的),客户有空的时候看一下(客户不用立等答复你),信息量太大的话就再加个同事订阅(水平扩展客户处理能力),有机会还可以介绍给其他潜在用户订阅(你也不必特意通知新客户)。很明显,电话这种同步消息交换的方式很容易产生瓶颈,而微信公众号这类异步消息交换的方式客户再多也不用担心。

image.png image.png
方法 1 方法 2

Kafka提供的Pub/Sub就是典型的异步消息交换,用户可以为服务器日志或者物联网设备创建不同主题(Topic),之后数据可以源源不断地发送到各个主题,后端数据仓库、流式分析或者全文检索等对接特定主题,服务器或者物联网设备是无需关心的。

同时,Kafka可以将主题划分为多个分区(Partition),会根据分区规则选择把消息存储到哪个分区中,只要如果分区规则设置的合理,那么所有的消息将会被均匀的分布到不同的分区中,这样就实现了负载均衡和水平扩展。另外,多个订阅者可以从一个或者多个分区中同时消费数据,以支撑海量数据处理能力:
image.png

Kafka的设计也是源自生活,好比是为公路运输,不同的起始点和目的地需要修不同高速公路(主题),高速公路上可以提供多条车道(分区),流量大的公路多修几条车道保证畅通,流量小的公路少修几条车道避免浪费。收费站好比消费者,车多的时候多开几个一起收费避免堵在路上,车少的时候开几个让汽车并道就好了。

由于消息是以追加到分区中的,多个分区顺序写磁盘的总效率要比随机写内存还要高(引用Apache Kafka – A High Throughput Distributed Messaging System的观点),是Kafka高吞吐率的重要保证之一。

为了保证数据的可靠性,Kafka会给每个分区找一个节点当带头大哥(Leader),以及若干个节点当随从(Follower)。消息写入分区时,带头大哥除了自己复制一份外还会复制到多个随从。如果随从挂了,Kafka会再找一个随从从带头大哥那里同步历史消息;如果带头大哥挂了,随从中会选举出新一任的带头大哥,继续笑傲江湖。

kafka 做什么总结

为什么要用消息系统

  • 解耦
    在项目启动之初来预测将来项目会碰到什么需求,是极其困难的。消息队列在处理过程中间插入了一个隐含的、基于数据的接口层,两边的处理过程都要实现这一接口。这允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束
  • 冗余
    有些情况下,处理数据的过程会失败。除非数据被持久化,否则将造成丢失。消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。在被许多消息队列所采用的”插入-获取-删除”范式中,在把一个消息从队列中删除之前,需要你的处理过程明确的指出该消息已经被处理完毕,确保你的数据被安全的保存直到你使用完毕。
  • 扩展性
    因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的;只要另外增加处理过程即可。不需要改变代码、不需要调节参数。扩展就像调大电力按钮一样简单。
  • 灵活性 & 峰值处理能力
    在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见;如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。
  • 可恢复性
    当体系的一部分组件失效,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。而这种允许重试或者延后处理请求的能力通常是造就一个略感不便的用户和一个沮丧透顶的用户之间的区别。
  • 送达保证
    消息队列提供的冗余机制保证了消息能被实际的处理,只要一个进程读取了该队列即可。在此基础上,部分消息系统提供了一个”只送达一次”保证。无论有多少进程在从队列中领取数据,每一个消息只能被处理一次。这之所以成为可能,是因为获取一个消息只是”预定”了这个消息,暂时把它移出了队列。除非客户端明确的表示已经处理完了这个消息,否则这个消息会被放回队列中去,在一段可配置的时间之后可再次被处理。
  • 顺序保证
    在大多使用场景下,数据处理的顺序都很重要。消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。部分消息系统保证消息通过FIFO(先进先出)的顺序来处理,因此消息在队列中的位置就是从队列中检索他们的位置。
  • 缓冲
    在任何重要的系统中,都会有需要不同的处理时间的元素。例如,加载一张图片比应用过滤器花费更少的时间。消息队列通过一个缓冲层来帮助任务最高效率的执行–写入队列的处理会尽可能的快速,而不受从队列读的预备处理的约束。该缓冲有助于控制和优化数据流经过系统的速度。
  • 理解数据流
    在一个分布式系统里,要得到一个关于用户操作会用多长时间及其原因的总体印象,是个巨大的挑战。消息队列通过消息被处理的频率,来方便的辅助确定那些表现不佳的处理过程或领域,这些地方的数据流都不够优化。
  • 异步通信
    很多时候,你不想也不需要立即处理消息。消息队列提供了异步处理机制,允许你把一个消息放入队列,但并不立即处理它。你想向队列中放入多少消息就放多少,然后在你乐意的时候再去处理它们。

疑问

redis 同样也支持消息的订阅和发布,那么 kafka 功能和 redis 是否有重复?
同样举个🌰:

老板有个好消息要告诉大家,有两个办法: 1.到每个座位上挨个儿告诉每个人。什么?张三去上厕所了?那张三就只能错过好消息了! 2.老板把消息写到黑板报上,谁想知道就来看一下,什么?张三请假了?没关系,我一周之后才擦掉,总会看见的!什么张三请假两周?那就算了,我反正只保留一周,不然其他好消息没地方写了 redis用第一种办法,kafka用第二种办法

专业点就是:

  • 先Redis,它首先是一个内存数据库,其提供的PUB/SUB功能把消息保存在内存中(基于channel),因此如果你的消息的持久性需求并不高且后端应用的消费能力超强的话,使用Redis PUB/SUB是比较合适的使用场景。比如官网说提供的一个网络聊天室的例子:模拟IRC,因为channel就是IRC中的服务器。用户发起连接,发布消息到channel,接收其他用户的消息。这些对于持久性的要求并不高,使用Redis PUB/SUB来做足矣。

  • 而Kafka是一个完整的系统,它提供了一个高吞吐量、分布式的提交日志(由于提供了Kafka Connect和Kafka Streams,目前Kafka官网已经将自己修正为一个分布式的流式处理平台)。除了p2p的消息队列,它当然提供PUB/SUB方式的消息模型。而且,Kafka默认提供了消息的持久化,确保消息的不丢失性(至少是大部分情况下)。另外,由于消费元数据是保存在consumer端的,所以对于消费而言consumer被赋予极大的自由度。consumer可以顺序地消费消息,也可以重新消费之前处理过的消息。这些都是Redis PUB/SUB无法做到的。

2.kafka解析

知道了 kafka 是干什么的,现在说一下属于 kafka 的专属名词

kafka术语

Broker

Kafka集群包含一个或多个服务器,这种服务器被称为broker。即你在一台机器上安装了Kafka,那么这台机器就叫Broker,kafka集群包含了一个或者多个这样的实例。

Topic

  • 每条发布到Kafka集群的消息都有一个类别,如何区别分类?这个类别被称为topic。(物理上不同topic的消息分开存储,逻辑上一个topic的消息虽然保存于一个或多个broker上,但用户只需指定消息的topic即可生产或消费数据而不必关心数据存于何处)
  • 如果往不存在的 topic写数据,kafka 会自动创建 topic,partition 和 replication 的数量默认配置为 1

Producer

负责发布消息(写入数据)到Kafka broker,消息的生产者一般写在业务系统里。

Partition

parition是物理上的概念,将Topic拆成多个段,增加并行度后,拆成的每个部分叫做Partition,每个topic包含一个或多个partition,创建topic时可指定parition数量。

每个partition对应于一个文件夹,该文件夹下存储该partition的数据和索引文件,分区的命名规则为${topicName}-{partitionId},__consumer_offsets-0

分区目录下存储的是该分区的日志段,包括日志数据文件和两个索引文件

每条消息被追加到相应的分区中,是顺序写磁盘,因此效率非常高,这也是Kafka高吞吐率的一个重要保证。

每一条消息被发送到broker时,会根据paritition规则选择被存储到哪一个partition。如果partition规则设置的合理,所有消息可以均匀分布到不同的partition里,这样就实现了水平扩展。(如果一个topic对应一个文件,那这个文件所在的机器I/O将会成为这个topic的性能瓶颈,而partition解决了这个问题)

在创建topic时可以在 /config/server.properties 中指定这个partition的数量(如下所示),当然也可以在topic创建之后去修改parition数量。

  1. num.partitions=3

leader

分区的主节点(老大)

follower

分区的从节点,负责从 leader 节点拉取数据存储到自身。
image.png

Consumer

那些消费Kafka中数据的应用程序,就叫做Consumer

每个consumer属于一个特定的consumer group(为某个主题的某个消费业务起一个名字,这么名字就叫做Consumer Group,可为每个consumer指定group name,若不指定group name则属于默认的group)。

使用consumer high level API时,同一topic的一条消息只能被同一个consumer group内的一个consumer消费,但多个consumer group可同时消费这一消息。
image.png

如上图所示,一个典型的kafka集群中包含若干producer(可以是web前端产生的page view,或者是服务器日志,系统CPU、memory等),若干broker(Kafka支持水平扩展,一般broker数量越多,集群吞吐率越高)
若干consumer group,以及一个Zookeeper集群。Kafka通过Zookeeper管理集群配置,选举leader,以及在consumer group发生变化时进行rebalance。producer使用push模式将消息发布到broker,consumer使用pull模式从broker订阅并消费消息。  

Push & Pull

作为一个messaging system,Kafka遵循了传统的方式,选择由producer向broker push消息并由consumer从broker pull消息。
  push模式很难适应消费速率不同的消费者,因为消息发送速率是由broker决定的。push模式的目标是尽可能以最快速度传递消息,但是这样很容易造成consumer来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而pull模式则可以根据consumer的消费能力以适当的速率消费消息。

Topic & Partition

Topic在逻辑上可以被认为是一个queue。每条消费都必须指定它的topic,可以简单理解为必须指明把这条消息放进哪个queue里。为了使得Kafka的吞吐率可以水平扩展,物理上把topic分成一个或多个partition,每个partition在物理上对应一个文件夹,该文件夹下存储这个partition的所有消息和索引文件。

选择 Partition 的原则

在 kafka 中,如果某个 topic 存在多个 partition,producer 怎么知道该往哪个 partition 里面发送数据呢?

  • partition 在写入的时候需要指定 partition,如果有指定,则写入对应的 partition。
  • 如果没有指定 partition,但是设置了数据的 key,则会根据 key 的值 hash 出一个 partition。
  • 如果既没有指定 partition,又没有设置 key,则会采用轮询的方式,即每次取出一小段时间的数据写入某个 partition,下一段时间写入下一个 partition。

ack 应答机制

producer 在向 kafka 写入消息的时候,可以设置参数来确定是否确认 kafka 接收到数据。这个参数可设置的值有:0、1、all。

  • 0 代表 producer 往集群发送数据不需要等到集群的返回,不确保消息发送成功。安全性最低 但是效率高。
  • 1 代表producer 往集群发送消息只要 leader 应答就可以发送下一条了,只确保 leader 发送成功
  • all 代表 producer 往集群发送数据需要所有的 follower 都完成从 leader 的同步才会发送下一条数据,确保 leader 发送成功和所有的副本都完成备份,安全性最高,但是效率低。


message

消息是Kafka通讯的基本单位,有一个固定长度的消息头和一个可变长度的消息体(payload)构成。在Java客户端中又称之为记录(Record)。

消息结构各部分说明如下:

  • CRC32: CRC32校验和,4个字节。
  • magic: Kafka服务程序协议版本号,用于做兼容。1个字节。
  • attributes: 该字段占1字节,其中低两位用来表示压缩方式,第三位表示时间戳类型(0表示LogCreateTime,1表示LogAppendTime),高四位为预留位置,暂无实际意义。
  • timestamp: 消息时间戳,当magic>0 时消息头必须包含该字段。8个字节。
  • key-length: 消息key长度,4个字节。
  • key: 消息key实际数据。
  • payload-length: 消息实际数据长度,4个字节。
  • payload: 消息实际数据
  • 在实际存储一条消息还包括12字节的额外开销(LogOverhead):
    • 消息的偏移量: 8字节,类似于消息的Id。
    • 消息的总长度: 4字节

对于传统的message queue而言,一般会删除已经被消费的消息,而Kafka集群会保留所有的消息,无论其被消费与否。当然,因为磁盘限制,不可能永久保留所有数据(实际上也没必要),因此Kafka提供两种策略去删除旧数据。

  • 基于时间,
  • 基于partition文件大小。

例如可以通过配置config/server.properties,让Kafka删除一周前的数据
也可通过配置让Kafka在partition文件超过1GB时删除旧数据,如下所示。

  1. log.retention.hours=168
  2. log.segment.bytes=1073741824
  3. log.retention.check.interval.ms=300000
  4. log.cleaner.enable=false

这里要注意,因为Kafka读取特定消息的时间复杂度为O(1),即与文件大小无关,所以这里删除文件与Kafka性能无关,选择怎样的删除策略只与磁盘以及具体的需求有关。另外,Kafka会为每一个consumer group保留一些metadata信息–当前消费的消息的position,也即offset。这个offset由consumer控制。正常情况下consumer会在消费完一条消息后线性增加这个offset。当然,consumer也可将offset设成一个较小的值,重新消费一些消息。因为offet由consumer控制,所以Kafka broker是无状态的,它不需要标记哪些消息被哪些consumer过,不需要通过broker去保证同一个consumer group只有一个consumer能消费某一条消息,因此也就不需要锁机制,这也为Kafka的高吞吐率提供了有力保障。

image.png

至于如何使用和一些参数配置,可参考两篇文章:文章1文章2