:::info Kafka 对消息保存时根据 Topic 进行归类,发送消息者称为 Producer,消息接受者称为 Consumer,此外 Kafka 集群有多个 Kafka 实例组成,每个实例 (server) 称为 broker。无论是 Kafka 集群,还是 consumer 都依赖于 zookeeper 集群保存一些 meta 信息,来保证系统可用性 :::
1、Kafka 介绍
Kafka 是最初由 Linkedin 公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于 zookeeper 协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于 hadoop 的批处理系统、低延迟的实时系统、storm/Spark 流式处理引擎,web/nginx 日志、访问日志,消息服务等等,用 scala 语言编写,Linkedin 于2010年贡献给了 Apache 基金会并成为顶级开源项目
2、Kafka 特性
- 高吞吐量、低延迟:kafka 每秒可以处理几十万条消息,它的延迟最低只有几毫秒,每个 topic 可以分多个 partition。consumer group 对 partition 进行 consume 操作
- 可扩展性:kafka 集群支持热扩展
- 持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失
- 容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败)
-
3、Kafka 的使用场景
日志收集:一个公司可以用 Kafka 可以收集各种服务的 log,通过 kafka 以统一接口服务的方式开放给各种 consumer,例如 hadoop、Hbase、Solr等
- 消息系统:解耦和生产者和消费者、缓存消息等
- 用户活动跟踪:Kafka 经常被用来记录 web 用户或者 app 用户的各种活动。如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到 kafka 的 topic 中,然后订阅者通过订阅这些 topic 来做实时的监控分析,或者装载到 hadoop、数据仓库中做离线分析和挖掘
- 运营指标:Kafka 也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告
- 流式处理:比如 spark streaming 和 storm
-
4、Kafka 文件存储机制
4.1 Kafka 部分名词解释
Kafka 中发布订阅的对象是 topic。我们可以为每类数据创建一个 topic,把向 topic 发布消息的客户端称作 producer,从 topic 订阅消息的客户端称作 consumer。Producers 和 consumers 可以同时从多个 topic 读写数据。一个 kafka 集群由一个或多个broker服务器组成,它负责持久化和备份具体的kafka消息。
Broker:Kafka 节点,一个 Kafka 节点就是一个 broker,多个 broker 可以组成一个 Kafka 集群
- Topic:一类消息,消息存放的目录即主题,例如 page view日志、click日志等都可以以 topic 的形式存在,Kafka 集群能够同时负责多个topic的分发
- Partition:topic 物理上的分组,一个 topic 可以分为多个 partition,每个 partition 是一个有序的队列
- Segment:partition 物理上由多个 segment 组成,每个 Segment 存着 message 信息
- Producer : 生产 message 发送到 topic
- Consumer : 订阅 topic 消费 message, consumer 作为一个线程来消费
Consumer Group:一个Consumer Group包含多个consumer, 这个是预先在配置文件中配置好的。各个consumer(consumer 线程)可以组成一个组(Consumer group ),partition中的每个message只能被组(Consumer group ) 中的一个consumer(consumer 线程 )消费,如果一个message可以被多个consumer(consumer 线程 ) 消费的话,那么这些consumer必须在不同的组。Kafka不支持一个partition中的message由两个或两个以上的consumer thread来处理, 除非来自不同的consumer group
4.2 kafka一些原理概念
4.2.1 持久化
kafka 使用文件存储消息(append only log),这就直接决定 kafka 在性能上严重依赖文件系统的本身特性。且无论任何 OS 下,对文件系统本身的优化是非常艰难的。文件缓存/直接内存映射等是常用的手段。因为 kafka 是对日志文件进行 append 操作,因此磁盘检索的开支是较小的;
同时为了减少磁盘写入的次数,broker 会将消息暂时 buffer 起来,当消息的个数(或尺寸)达到一定阀值时,再 flush 到磁盘。这样减少了磁盘 IO 调用的次数。对于 kafka 而言,较高性能的磁盘将会带来更加直接的性能提升4.2.2 性能
除磁盘 IO 之外,还需要考虑网络 IO。kafka 并没有提供太多高超的技巧:
对于 producer 端,可以将消息 buffer 起来,当消息的条数达到一定阀值时,批量发送给 broker
- 对于 consumer 端也是一样,批量 fetch 多条消息。不过消息量的大小可以通过配置文件来指定
对于 broker 端,有个 sendfile 系统调用可以潜在的提升网络IO的性能:将文件的数据映射到系统内存中,socket 直接读取相应的内存区域即可,而无需进程再次copy和交换
4.2.3 负载均衡
kafka 集群中的任何一个 broker 都可以向 producer 提供 metadata 信息,这些 metadata 中包含”集群中存活的servers列表”、”partitions leader列表” 等信息。 当 producer 获取到 metadata 信息之后, producer 将会和 Topic 下所有 partition leader 保持 socket 连接
消息由 producer 直接通过 socket 发送到 broker,中间不会经过任何”路由层”
异步发送,将多条消息暂且在客户端 buffer 起来,并将他们批量发送到 broker4.2.4 消息传输一致
Kafka 提供3种消息传输一致性语义:最多1次、最少1次、恰好1次
最少1次:可能会重传数据,有可能出现数据被重复处理的情况
- 最多1次:可能会出现数据丢失情况
- 恰好1次:并不是指真正只传输1次,只不过有一个机制,确保不会出现“数据被重复处理”和“数据丢失”的情况
4.2.5 副本
kafka 中,replication 策略是基于 partition,而不是 topic。kafka 将每个 partition 数据复制到多个 server上,任何一个 partition 有一个 leader 和多个 follower (可以没有)。备份的个数可以通过 broker 配置文件来设定4.2.6 Leader 的选择
Kafka 的核心是日志文件,日志文件在集群中的同步是分布式数据系统最基础的要素
如果 leaders 永远不会 down 的话我们就不需要 followers 了!一旦 leader down 掉了,需要在 followers 中选择一个新的leader。但是 followers 本身有可能延时太久或者 crash,所以必须选择高质量的 follower 作为 leader。必须保证,一旦一个消息被提交了,但是 leader down 掉了,新选出的 leader 必须可以提供这条消息。
大部分的分布式系统采用了多数投票法则选择新的 leader,对于多数投票法则,就是根据所有副本节点的状况动态的选择最适合的作为 leader。Kafka并不是使用这种方法
Kafka 动态维护了一个同步状态的副本的集合,简称 ISR,在这个集合中的节点都是和 leader 保持高度一致的,任何一条消息必须被这个集合中的每个节点读取并追加到日志中了,才会通知外部这个消息已经被提交了。因此这个集合中的任何一个节点随时都可以被选为 leader。ISR 在 ZooKeeper 中维护。ISR 中有 f+1 个节点,就可以允许在 f 个节点 down 掉的情况下不会丢失消息并正常提供服。ISR 的成员是动态的,如果一个节点被淘汰了,当它重新达到“同步中”的状态时,他可以重新加入 ISR。这种 leader 的选择方式是非常快速的,适合 kafka 的应用场景
4.2.7 副本管理
实际上一个 Kafka 将会管理成千上万的 topic 分区。Kafka 尽量的使所有分区均匀的分布到集群所有的节点上而不是集中在某些节点上,另外主从关系也尽量均衡这样每个几点都会担任一定比例的分区的 leader
4.3 partiton 中文件存储方式
- 每个 partion (目录)相当于一个巨型文件被平均分配到多个大小相等 segment (段)数据文件中。但每个段 segment file 消息数量不一定相等,这种特性方便 old segment file快速被删除
每个 partiton 只需要支持顺序读写就行了,segment 文件生命周期由服务端配置参数决定
4.4 Kafka Broker一些特性
无状态的 Kafka Broker:
Broker没有副本机制,一旦 broker 宕机,该 broker 的消息将都不可用
- Broker不保存订阅者的状态,由订阅者自己保存
- 无状态导致消息的删除成为难题(可能删除的消息正在被订阅),kafka采用基于时间的SLA(服务水平保证),消息保存一定时间(通常为7天)后会被删除
- 消息订阅者可以rewind back到任意位置重新进行消费,当订阅者故障时,可以选择最小的offset进行重新读取消费消息
4.4.1 压缩
Kafka支持以集合(batch)为单位发送消息,在此基础上,Kafka 还支持对消息集合进行压缩,Producer 端可以通过 GZIP 或 Snappy 格式对消息集合进行压缩。Producer 端进行压缩之后,在 Consumer 端需进行解压。压缩的好处就是减少传输的数据量,减轻对网络传输的压力,在对大数据处理上,瓶颈往往体现在网络上而不是CPU
如何区分消息是压缩的还是未压缩的呢,Kafka 在消息头部添加了一个描述压缩属性字节,这个字节的后两位表示消息的压缩采用的编码,如果后两位为0,则表示消息未被压缩4.4.2 消息可靠性
在消息系统中,保证消息在生产和消费过程中的可靠性是十分重要的,在实际消息传递过程中,可能会出现如下3种情况:
- 一个消息发送失败
- 一个消息被发送多次
- 最理想的情况:exactly-once ,一个消息发送成功且仅发送了一次
有许多系统声称它们实现了 exactly-once,但是它们其实忽略了生产者或消费者在生产和消费过程中有可能失败的情况。比如虽然一个 Producer 成功发送一个消息,但是消息在发送途中丢失,或者成功发送到 broker,也被 consumer 成功取走,但是这个 consumer 在处理取过来的消息时失败了
- 从 Producer 端看:Kafka 是这么处理的,当一个消息被发送后,Producer 会等待 broker 成功接收到消息的反馈(可通过参数控制等待时间),如果消息在途中丢失或是其中一个 broker 挂掉,Producer 会重新发送(我们知道 Kafka 有备份机制,可以通过参数控制是否等待所有备份节点都收到消息)
从 Consumer 端看:broker端记录了 partition 中的一个 offset 值,这个值指向 Consumer下一个即将消费 message。当 Consumer收到了消息,但却在处理过程中挂掉,此时 Consumer 可以通过这个 offset 值重新找到上一个消息再进行处理。Consumer 还有权限控制这个 offset 值,对持久化到 broker 端的消息做任意处理
5、Kakfa的设计思想
Kakfa Broker Leader 的选举
Kakfa Broker 集群受 Zookeeper 管理。所有的 Kafka Broker 节点一起去 Zookeeper 上注册一个临时节点,因为只有一个 Kafka Broker 会注册成功,其他的都会失败,所以这个成功在 Zookeeper 上注册临时节点的这个 Kafka Broker 会成为 Kafka Broker Controller,其他的 Kafka broker叫 Kafka Broker follower。(这个过程叫 Controller在 ZooKeeper 注册 Watch) :::info 例如:
一旦有一个 broker宕机了,这个 kafka broker controller 会读取该宕机 broker 上所有的 partition 在zookeeper 上的状态,并选取 ISR 列表中的一个 replica 作为 partition leader(如果ISR列表中的 replica 全挂,选一个幸存的 replica 作为 leader;如果该 partition 的所有的 replica 都宕机了,则将新的 leader 设置为-1,等待恢复,等待ISR中的任一个 Replica“活”过来,并且选它作为 Leader;或选择第一个“活”过来的 Replica(不一定是ISR中的)作为 Leader),这个 broker 宕机的事情,kafka controller 也会通知 zookeeper,zookeeper 就会通知其他的 kafka broker :::Consumergroup
各个 consumer 可以组成一个组,partition 中的每个 message 只能被组(Consumer group)中的一个 consumer(consumer 线程) 消费,如果一个 message 可以被多个 consumer 消费的话,那么这些 consumer 必须在不同的组。
Kafka 不支持一个 partition 中的 message 由两个或两个以上的同一个 consumer group 下的 consumer thread 来处理,除非再启动一个新的 consumer group。所以如果想同时对一个 topic 做消费的话,启动多个 consumer group 就可以了,但是要注意的是,这里的多个 consumer 的消费都必须是顺序读取 partition 里面的 message,新启动的 consumer 默认从 partition 队列最头端最新的地方开始阻塞的读 messageTopic & Partition
Topic 相当于传统消息系统 MQ 中的一个队列 queue,producer 端发送的 message 必须指定是发送到哪个 topic,但是不需要指定 topic下的哪个 partition,因为 kafka 会把收到的 message 进行 load balance,均匀的分布在这个 topic下的不同的 partition上
一般来说,一个 Topic 的 Partition 数量大于等于 Broker 的数量,可以提高吞吐率。同一个 Partition 的 Replica 尽量分散到不同的机器,高可用消息投递可靠性
一个消息如何算投递成功,Kafka提供了3种模式:
第一种是啥都不管,发送出去就当作成功,这种情况当然不能保证消息成功投递到broker
- 第二种是Master-Slave模型,只有当Master和所有Slave都接收到消息时,才算投递成功,这种模型提供了最高的投递可靠性,但是损伤了性能
- 第三种是只要Master确认收到消息就算投递成功;实际使用时,根据应用特性选择,绝大多数情况下都会中和可靠性和性能选择第三种模型
message状态
在 Kafka 中,消息的状态被保存在 consumer 中,broker 不会关心哪个消息被消费了被谁消费了,只记录一个 offset 值(指向partition中下一个要被消费的消息位置),这就意味着如果 consumer 处理不好的话,broker 上的一个消息可能会被消费多次message持久化
Kafka 中会把消息持久化到本地文件系统中,并且保持o(1)极高的效率message有效期
Kafka 会长久保留其中的消息,以便 consumer 可以多次消费,当然其中很多细节是可配置的Produer
Producer 向 Topic 发送 message,不需要指定 partition,直接发送就好了。
kafka 通过 partition ack 来控制是否发送成功并把信息返回给 producer,producer 可以有任意多的 thread,这些 kafka 服务器端是不 care 的。Producer 端的 delivery guarantee 默认是 At least once 的。也可以设置 Producer 异步发送实现 At most once。Producer 可以用主键幂等性实现 Exactly onceKafka高吞吐量
Kafka 的高吞吐量体现在读写上,分布式并发的读和写都非常快,写的性能体现在以o(1)的时间复杂度进行顺序写入。读的性能体现在以o(1)的时间复杂度进行顺序读取,对 topic 进行 partition分区,consume group 中的 consume 线程可以以很高能性能进行顺序读批量发送
Kafka 支持以消息集合为单位进行批量发送,以提高 push 效率负载均衡方面
Kafka 提供了一个 metadata API 来管理 broker 之间的负载(对Kafka0.8.x而言,对于0.7.x主要靠 zookeeper 来实现负载均衡)解耦
相当于一个 MQ,使得 Producer 和 Consumer 之间异步的操作,系统之间解耦冗余
replica 有多个副本,保证一个 broker node 宕机后不会影响整个服务扩展性
broker 节点可以水平扩展,partition 也可以水平增加,partition replica 也可以水平增加峰值
在访问量剧增的情况下,kafka 水平扩展, 应用仍然需要继续发挥作用可恢复性
系统的一部分组件失效时,由于有 partition 的 replica 副本,不会影响到整个系统顺序保证性
由于 kafka 的 producer 的写 message 与 consumer 去读 message 都是顺序的读写,保证了高效的性能缓冲
由于 producer 那面可能业务很简单,而后端 consumer 业务会很复杂并有数据库的操作,因此肯定是 producer 会比 consumer 处理速度快,如果没有 kafka,producer 直接调用 consumer,那么就会造成整个系统的处理速度慢,加一层 kafka 作为 MQ,可以起到缓冲的作用。异步通信
作为 MQ,Producer 与 Consumer 异步通信
