一、kafka特点:
- 消息系统:生产者消费者模型,先入先出(FIFO)。Partition内部是FIFO的,partition之间呢不是FIFO的,当然我们可以把topic设为一个partition,这样就是严格的FIFO。
- 持久化:可进行持久化操作。将消息持久化到磁盘,因此可用于批量消费,例如ETL,以及实时应用程序。通过将数据持久化到硬盘以及replication防止数据丢失。直接append到磁盘里去,这样的好处是直接持久化,数据不会丢失,第二个好处是顺序写,消费数据也是顺序读,所以持久化的同时还能保证顺序。
- 分布式:易于向外扩展。所有的producer、broker和consumer都会有多个,均为分布式的。无需停机即可扩展机器
高吞吐量:同时为发布和订阅提供高吞吐量。据了解,Kafka每秒可以生产约25万消息(50 MB),每秒处理55万消息(110 MB)。
消息系统。Kafka作为一款优秀的消息系统,可以用于异步处理,服务解耦,流量消峰。
- 应用监控。利用Kafka采集应用程序和服务器健康相关的指标,如CPU占用率、IO、内存、连接数、TPS、QPS等,然后将指标信息进行处理,从而构建一个具有监控仪表盘、曲线图等可视化监控系统。例如,很多公司采用Kafka与ELK(ElasticSearch、Logstash和Kibana)整合构建应用服务监控系统。
- 网站用户行为追踪。为了更好地了解用户行为、操作习惯,改善用户体验,进而对产品升级改进,将用户操作轨迹、内容等信息发送到Kafka集群上,通过Hadoop、Spark或Strom等进行数据分析处理,生成相应的统计报告,为推荐系统推荐对象建模提供数据源,进而为每个用户进行个性化推荐。
- 流处理。需要将已收集的流数据提供给其他流式计算框架进行处理,用Kafka收集流数据是一个不错的选择,而且当前版本的Kafka提供了Kafka Streams支持对流数据的处理。
- 持久性日志。Kafka可以为外部系统提供一种持久性日志的分布式系统。日志可以在多个节点间进行备份,Kafka为故障节点数据恢复提供了一种重新同步的机制。同时,Kafka很方便与HDFS和Flume进行整合,这样就方便将Kafka采集的数据持久化到其他外部系统。更多的场景主要是用来做日志分析系统,除了日志,网站的一些浏览数据应该也适用。(只要原始数据不需要直接存DB的都可以)
三、kafka集群介绍
(一)整体结构
如果看到这张图你很懵逼,木有关系!我们先来分析相关概念:
- Producer:Producer即生产者,消息的产生者,是消息的入口。
- Consumer:消费者,即消息的消费方,是消息的出口。
Consumer Group:我们可以将多个消费组组成一个消费者组,在kafka的设计中同一个分区的数据只能被消费者组中的某一个消费者消费。同一个消费者组的消费者可以消费同一个topic的不同分区的数据,这也是为了提高kafka的吞吐量!
kafka cluster:
- Broker:Broker是kafka实例,每个服务器上有一个或多个kafka的实例,我们姑且认为每个broker对应一台服务器。每个kafka集群内的broker都有一个不重复的编号,如图中的broker-0、broker-1等……partition位于broker实例上,1个brokder实例可以有多个partition.
- Topic:消息的主题,可以理解为消息的分类,kafka的数据就保存在topic。 在每个broker上都可以创建多个topic。
- Partition:Topic的分区,每个topic可以有多个分区,分区的作用是做负载,提高kafka的吞吐量。同一个topic在不同的分区的数据是不重复的,partition的表现形式就是一个一个的文件夹!
- Replication:每一个分区都有多个副本,副本的作用是做备胎。当主分区(Leader)故障的时候 会选择一个备胎(Follower)上位,成为Leader。在kafka中默认副本的最大数量是10个,且副本的数量不能大于Broker的数量,follower和leader绝对是在不同的机器,同一机器对同一个分区也只可能存放一个副本(包括自己)。
- Message:每一条发送的消息主体。
- Zookeeper:kafka集群依赖zookeeper来保存集群的的元信息,主要用来跟踪Kafka 集群中的节点状态, 以及Kafka Topic, message 等等其他信息. 同时, Kafka 依赖于Zookeeper, 没有Zookeeper 是不能运行起来Kafka的。
说明:说通俗一点,和mysql对应起来,broker相当于我们的服务器,topic相当于数据库,一个broker上可以有多个数据库,pritition相当于我们数据库的分表,为了高可用,服务器分为主从
(二)工作流程分析
1. 发送数据
我们看上面的架构图中,producer就是生产者,是数据的入口。Producer在写入数据的时候永远的找leader,不会直接将数据写入follower!
需要注意的一点是,消息写入leader后,follower是主动的去leader进行同步的!producer采用push模式将数据发布到broker,每条消息追加到分区中,顺序写入磁盘,所以保证同一分区内的数据是有序的!写入示意图如下:
(1)数据分区
上面说到数据会写入到不同的分区,那kafka为什么要做分区呢?相信大家应该也能猜到,分区的主要目的是:
1、方便扩展。因为一个topic可以有多个partition,所以我们可以通过扩展机器去轻松的应对日益增长的数据量。
2、提高并发。以partition为读写单位,可以多个消费者同时消费数据,提高了消息的处理效率。
(2)kafka写入数据时如何选择partition
那在kafka中,如果某个topic有多个partition,producer又怎么知道该将数据发往哪个partition呢?kafka中有几个原则:
1、partition在写入的时候可以指定需要写入的partition,如果有指定,则写入对应的partition。
2、如果没有指定partition,但是设置了数据的key,则会根据key的值hash出一个partition。
3、如果既没指定partition,又没有设置key,则会轮询选出一个partition。
(3)如何保证消息可靠性ACK
保证消息不丢失是一个消息队列中间件的基本保证,那producer在向kafka写入消息的时候,怎么保证消息不丢失呢?其实上面的写入流程图中有描述出来,那就是通过ACK应答机制(request.required.acks)
在生产者向队列写入数据的时候可以设置参数来确定是否确认kafka接收到数据,这个参数可设置的值为1、0、-1。
1(默认):这意味着producer的leader已成功收到的数据并发送ACK确认后,生产者就可以发送下一条message。如果leader宕机了,这条消息还没有被其他flower 同步到,则会丢失数据。
0:这意味着producer无需等待broker的确认,就可以继续发送下一批消息。这种情况下数据传输效率最高,但是数据可靠性确是最低的。
-1:producer需要等待ISR((in-sync Replica))中的所有follower都确认接收到数据后才算一次发送完成,可靠性最高。但是这样也不能保证数据不丢失,比如当ISR中只有leader时,这样就变成了acks=1的情况。
(4)数据一致性(ISR/HW)
kafka不是完全同步,也不是完全异步,是一种ISR机制:
1. leader会维护一个与其基本保持同步的Replica列表,该列表称为ISR(in-sync Replica),每个Partition都会有一个ISR,而且是由leader动态维护
2. 如果一个flower比一个leader落后太多,或者超过一定时间未发起数据复制请求,则leader将其重ISR中移除(参数:replica.lag.time.max.ms & replica.lag.max.messages )
3. 当ISR中所有Replica都向Leader发送ACK时,leader才commit
LEO(last end offset):每个partition中标识当前日志文件中已写入消息的最后一条的下一条待写入的消息的offset
HW(high watermark):一个partition对应的ISR中最小的LEO作为HW,consumer最多只能消费到HW所在的位置上一条信息。小于 HW 值的offset所对应的消息被认为是“已提交”或“已备份”的消息,才对消费者可见。
假设分区的副本为3,其中副本0是 Leader,副本1和副本2是 follower,并且在 ISR 列表里面。虽然副本0已经写入了 Message4,但是 Consumer 只能读取到 Message2。因为所有的 ISR 都同步了 Message2,只有 High Water Mark 以上的消息才支持 Consumer 读取,而 High Water Mark 取决于 ISR 列表里面偏移量最小的分区,对应于上图的副本2,这个很类似于木桶原理。
(5)leader选举
当 Leader 挂掉了,Kafka 会从 ISR 列表中选择第一个 follower 作为新的 Leader,因为这个分区拥有最新的已经 committed 的消息。通过这个可以保证已经 committed 的消息的数据可靠性。
leader发生故障之后,会从ISR中选出一个新的leader,之后,为保证多个副本之间的数据一致性,其余的follower会先将各自的log文件高于HW的部分截掉,然后从新的leader同步数据。
注意:这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复。
2. 保存数据
Producer将数据写入kafka后,集群就需要对数据进行保存了!kafka将数据保存在磁盘,可能在我们的一般的认知里,写入磁盘是比较耗时的操作,不适合这种高并发的组件。Kafka初始会单独开辟一块磁盘空间,顺序写入数据(效率比随机写入高)。
Partition 结构
前面说过了每个topic都可以分为一个或多个partition,如果你觉得topic比较抽象,那partition就是比较具体的东西了!Partition在服务器上的表现形式就是一个一个的文件夹,每个partition的文件夹下面会有多组segment文件,每组segment文件又包含.index文件、.log文件、.timeindex文件(早期版本中没有)三个文件,log文件就实际是存储message的地方,而index和timeindex文件为索引文件,用于检索消息。
如上图,这个partition有三组segment文件,每个log文件的大小是一样的,但是存储的message数量是不一定相等的(每条的message大小不一致)。partion全局的第一个segment从0开始,后续每个segment文件名为上一个segment文件最后一条消息的offset值。 数值最大为64位long大小,19位数字字符长度,没有数字用0填充。kafka就是利用分段+索引的方式来解决查找效率的问题。
Message结构
上面说到log文件就实际是存储message的地方,我们在producer往kafka写入的也是一条一条的message,那存储在log中的message是什么样子的呢?消息主要包含消息体、消息大小、offset、压缩类型……等等!我们重点需要知道的是下面三个:
1、offset:offset是一个占8byte的有序id号,它可以唯一确定每条消息在parition内的位置!
2、消息大小:消息大小占用4byte,用于描述消息的大小。
3、消息体:消息体存放的是实际的消息数据(被压缩过),占用的空间根据具体的消息而不一样。
存储策略
无论消息是否被消费,kafka都会保存所有的消息。那对于旧数据有什么删除策略呢?
1、基于时间,默认配置是168小时(7天)。
2、基于大小,默认配置是1073741824。 需要注意的是,kafka读取特定消息的时间复杂度是O(1),所以这里删除过期的文件并不会提高kafka的性能!
3. 消费数据
消息存储在log文件后,消费者就可以进行消费了。与生产消息相同的是,消费者在拉取消息的时候也是找leader去拉取。
多个消费者可以组成一个消费者组(consumer group),每个消费者组都有一个组id,同一个消费组者组的消费者可以消费同一topic下不同分区的数据,但是不会组内多个消费者消费同一分区的数据!!!
图示是消费者组内的消费者小于partition数量的情况,所以会出现某个消费者消费多个partition数据的情况,消费的速度也就不及只处理一个partition的消费者的处理速度!如果是消费者组的消费者多于partition的数量,那会不会出现多个消费者消费同一个partition的数据呢?上面已经提到过不会出现这种情况!多出来的消费者不消费任何partition的数据。所以在实际的应用中,建议消费者组的consumer的数量与partition的数量一致!
如何读取/查找message?
在保存数据的小节里面,我们聊到了partition划分为多组segment,每个segment又包含.log、.index、.timeindex文件,存放的每条message包含offset、消息大小、消息体……我们多次提到segment和offset,查找消息的时候是怎么利用segment+offset配合查找的呢?
假如现在需要查找一个offset为368801的message是什么样的过程呢?我们先看看下面的图:
1、先找到offset的368801message所在的segment文件(利用二分法查找),这里找到的就是在第二个segment文件。
2、打开找到的segment中的.index文件(也就是368796.index文件,该文件起始偏移量为368796+1,我们要查找的offset为368801的message在该index内的偏移量为368796+5=368801,所以这里要查找的相对offset为5)。由于该文件采用的是稀疏索引的方式存储着相对offset及对应message物理偏移量的关系,所以直接找相对offset为5的索引找不到,这里同样利用二分法查找相对offset小于或者等于指定的相对offset的索引条目中最大的那个相对offset,所以找到的是相对offset为4的这个索引。
3、根据找到的相对offset为4的索引确定message存储的物理偏移位置为256。打开数据文件,从位置为256的那个地方开始顺序扫描直到找到offset为368801的那条Message。
这套机制是建立在offset为有序的基础上,利用segment+有序offset+稀疏索引+二分查找+顺序查找等多种手段来高效的查找数据!至此,消费者就能拿到需要处理的数据进行处理了。那每个消费者又是怎么记录自己消费的位置呢?在早期的版本中,消费者将消费到的offset维护zookeeper中,consumer每间隔一段时间上报一次,这里容易导致重复消费,且性能不好!在新的版本中消费者消费到的offset已经直接维护在kafk集群的__consumer_offsets这个topic中【我们公司的就放在 consumer_offset topic中】。
四、总结
(1)Kafka架构是由producer(消息生产者)、consumer(消息消费者)、borker(kafka集群的server,负责处理消息读、写请求,存储消息,在kafka cluster这一层这里,其实里面是有很多个broker)、topic(消息队列/分类相当于队列,里面有生产者和消费者模型)、zookeeper(元数据信息存在zookeeper中,包括:存储消费偏移量,topic话题信息,partition信息) 这些部分组成。
(2)kafka里面的消息是有topic来组织的,简单的我们可以想象为一个队列,一个队列就是一个topic,然后它把每个topic又分为很多个partition,这个是为了做并行的,在每个partition内部消息强有序,相当于有序的队列,其中每个消息都有个序号offset,比如0到12,从前面读往后面写。一个partition对应一个broker,一个broker可以管多个partition,比如说,topic有6个partition,有两个broker,那每个broker就管3个partition。这个partition可以很简单想象为一个文件,当数据发过来的时候它就往这个partition上面append,追加就行,消息不经过内存缓冲,直接写入文件,kafka和很多消息系统不一样,很多消息系统是消费完了我就把它删掉,而kafka是根据时间策略删除,而不是消费完就删除,在kafka里面没有一个消费完这么个概念,只有过期这样一个概念。
(3)producer自己决定往哪个partition里面去写,这里有一些的策略,譬如如果hash,不用多个partition之间去join数据了。consumer自己维护消费到哪个offset,每个consumer都有对应的group,group内是queue消费模型(各个consumer消费不同的partition,因此一个消息在group内只消费一次),group间是publish-subscribe消费模型,各个group各自独立消费,互不影响,因此一个消息在被每个group消费一次。
其他:
bmq平台:https://sea.pri.ibanyu.com/bmq/topic/list
kafka公司库:http://midplatform.book.pri.ibanyu.com/doc/middleware/mq.html
kafka为什么这么快?https://zhuanlan.zhihu.com/p/78335525
文件存储机制:https://feizichen.me/2019/12/KafkaFile/
kafka 数据可靠性保证3板斧:https://cloud.tencent.com/developer/article/1665709
日志分段策略:https://blog.csdn.net/a3125504x/article/details/108165372
总的来说Kafka快的原因:
1、partition顺序读写,充分利用磁盘特性,这是基础;
2、Producer生产的数据持久化到broker,采用mmap文件映射,实现顺序的快速写入;
3、Customer从broker读取数据,采用sendfile,将磁盘文件读到OS内核缓冲区后,直接转到socket buffer进行网络发送。
mmap 和 sendfile总结
1、都是Linux内核提供、实现零拷贝的API;
2、sendfile 是将读到内核空间的数据,转到socket buffer,进行网络发送;
3、mmap主要实现方式是将读缓冲区的地址和用户缓冲区的地址进行映射,内核缓冲区和应用缓冲区共享,从而减少了从读缓冲区到用户缓冲区的一次CPU拷贝。mmap将磁盘文件映射到内存,支持读和写,对内存的操作会反映在磁盘文件上。
RocketMQ 在消费消息时,使用了 mmap。kafka 使用了 sendFile。