架构解释
1)Producer :
消息生产者,就是向kafka broker发消息的客户端;
2)Consumer :
消息消费者,向kafka broker取消息的客户端;
3)Consumer Group (CG):
消费者组内每个消费者负责消费不同分区的数据,
工作中一个消费者组消费一个topic数据
一个分区只能由一个组内的一个消费者消费
consumer group中包含多个消费者,消费者的个数最好是与topic 分区数一致<br /> 如果consumer group中消费者个数>topic分区数,此时会有消费者没有分区数据可以消费<br /> 如果consumer group中消费者个数<topic分区数,此时会有消费者消费多个分区的数据(并行消费)<br />![image.png](https://cdn.nlark.com/yuque/0/2021/png/21361442/1620307808573-e77e1a52-e2f2-4e5c-8438-8f9f933eebd9.png#crop=0&crop=0&crop=1&crop=1&height=312&id=L9zOq&margin=%5Bobject%20Object%5D&name=image.png&originHeight=623&originWidth=980&originalType=binary&ratio=1&rotation=0&showTitle=false&size=41370&status=done&style=none&title=&width=490)
为什么消费者组不能并行消费?
因为消息队列是一个一个出来的,如果并行消费则不能保证队列的顺序一致
消费者组之间互不影响。
消费者组是逻辑上的一个订阅者。
4)Broker :
一台kafka服务器就是一个broker(节点)。一个集群由多个broker组成。一个broker可以容纳多个topic。
5)Topic :
可以理解为一个队列,生产者和消费者面向的都是一个topic
主题,一般工作中一个业务对应一个主题;
6)Partition:
为了实现扩展性,一个非常大的topic可以分布到多个broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列
作用:
增加了磁盘io能力
增强的并发运行能力
分布式存储(容错),便于扩容;
缺陷:因为分区的存在,所以消息整体上变得无序。
7)Replica:
副本,为保证集群中的某个节点发生故障时,该节点上的partition数据不丢失,且kafka仍然能够继续工作,
kafka提供了副本机制,一个topic的每个分区都有若干个副本,一个leader和若干个follower。
8)leader:
每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是leader。
9)follower(副本):
每个分区多个副本中的“从”,实时从leader中同步数据,保持和leader数据的同步。leader发生故障时,某个follower会成为新的leader。
10)offset:
偏移量,offset是数据在分区中的唯一标识,后续消费者组消费topic区数据的时候,会通过offset记录下一次应该从哪个位置开始消费
Kafka工作流程及文件存储机制
Kafka中消息是以topic进行分类的,生产者生产消息,消费者消费消息,都是面向topic的。
每个partition对应于一个log文件,该log文件中存储的就是producer生产的数据。Producer生产的数据会被不断追加到该log文件末端,且每条数据都有自己的offset。消费者组中的每个消费者,都会实时记录自己消费到了哪个offset,以便出错时恢复,从上次的位置继续消费。
由于生产者生产的消息会不断追加到log文件末尾,为防止log文件过大导致数据定位效率低下,Kafka采取了分片和索引机制,将每个partition分为多个segment。每个segment对应两个文件——“.index”文件和“.log”文件。这些文件位于一个文件夹下,该文件夹的命名规则为:topic名称+分区序号。例如,first这个topic有三个分区,则其对应的文件夹为first-0,first-1,first-2。
log满足滚动条件(time、size)后就分片(segment)
说明:只能知道offset文件的起始位置,末端位置在下一个offset出来后才能知道
-------------------------- Segment1
00000000000000000000.index
00000000000000000000.log
-------------------------- Segment2
00000000000000170410.index
00000000000000170410.log
-------------------------- Segment3
00000000000000239430.index
00000000000000239430.log
segment、index、log、offset说明
offset种类:
kafka offset
comsume offset
索引创建规则:稀疏索引
最小每4kb的log文件创建一条索引
例如:
传输文件的时候,按批次传输,每次传输10kb,那么就会创建1个索引
每次传输2kb,那么就会创建2个索引
证据:
如下图的一个编号为0000000000000000000的segment中
log文件的offset是连贯的
而
index文件是不连贯的
0000000000000000000.log文件:
0000000000000000000.index文件:
其中position是message的物理偏移地址
可知:
1、index文件是稀疏索引 |
---|
2、最后一条offset都是一样的!99999 |
所以:下一条segment名字从99999+1开始! 即: 0000000000000100000.index 0000000000000100000.log |
可得结论:
①segment的命名规范:
1、每个分区第一个segment的文件名=00000000000000000
2、第N个segment的文件名 = 第N-1个segment中最后一个offset+1
比如此时有一个分区有三个segment: 0000000000000000、0000000000000019、0000000000000031
0000000000000000segment中存放的是offet从0到18的数据
0000000000000019segment中存放的是offet从19到30的数据
0000000000000031segment中存放的是offet 31之后的数据
**②如何根据offset找到数据**?<br /> 1、先确定segment:根据segment文件名(当前segment与上一个segment)依据二分查找法,确定offset数据在哪个segment中<br /> 2、再确定再哪个log文件区间:根据segment文件的Index索引确定数据处于log文件哪个区间(索引名=log名)
3、获得offset:根据log文件扫描区间得到offset数据<br />如图:<br />![image.png](https://cdn.nlark.com/yuque/0/2021/png/21361442/1619773414285-c77251cc-2862-4ed6-a396-fd75e25ab96e.png#crop=0&crop=0&crop=1&crop=1&height=487&id=bOgZD&margin=%5Bobject%20Object%5D&name=image.png&originHeight=974&originWidth=1811&originalType=binary&ratio=1&rotation=0&showTitle=false&size=140949&status=done&style=none&title=&width=905.5)
生产者
分区策略
1)分区的原因
(1)方便在集群中扩展
每个Partition可以通过调整以适应它所在的机器,
而一个topic又可以有多个Partition组成,
因此整个集群就可以适应任意大小的数据了;
(2)可以提高并发,
因为可以以Partition为单位读写了。
2)分区的原则
我们需要将producer发送的数据封装成一个ProducerRecord对象。
1、直接指定分区号: 数据发到指定分区中
2、没有指定分区号,但是有key: 数据发到 (key.hashCode % 分区数) 所在分区中
3、既没有分区号,也没有key:
新版本<随机,下一个分区不会是当前分区>:
1、第一个批次发送的时候,会生成一个随机数,第一个批次的数据发送 (随机数% 分区数) 所在分区中
2、第N个批次的数据发送的时候,此时会排除掉第N-1个批次发送的分区号,然后从剩余分区中随机选择一个发送
旧版本<轮询>:
1、第一个批次发送的时候,会生成一个随机数,第一个批次的数据发送 随机数% 分区数 所在分区中
2、第N个批次的数据发送的时候,此时会发送到 (第一个批次生成的随机数+(N-1))% 分区数 所在分区中
数据可靠性保证
topic—ack—>producer
为保证producer发送的数据,能可靠的发送到指定的topic,topic的每个partition收到producer发送的数据后,都需要向producer发送ack(acknowledgement确认收到)[等到partition的leader和follower同步完数据后再发送ack],如果producer收到ack,就会进行下一轮的发送,否则重新发送数据。
1)副本数据同步策略
方案 | 优点 | 缺点 |
---|---|---|
半数以上完成同步,就发送ack | 延迟低 | 选举新的leader时,容忍n台节点的故障时,需要2n+1个副本 |
全部完成同步,才发送ack | 选举新的leader时,容忍n台节点的故障时,需要n+1个副本 | 延迟高 |
Kafka
选择了第二种方案,原因如下:
1.同样为了容忍n台节点的故障,第一种方案需要2n+1个副本,而第二种方案只需要n+1个副本,而Kafka的每个分区都有大量的数据,第一种方案会造成大量数据的冗余。
2.虽然第二种方案的网络延迟会比较高,但网络延迟对Kafka的影响较小。
2)ISR(内部同步副本集)
采用第二种方案之后,设想以下情景:leader收到数据,所有follower都开始同步数据,但有一个follower,因为某种故障,迟迟不能与leader进行同步,那leader就要一直等下去,直到它完成同步,才能发送ack。这个问题怎么解决呢?<br /> Leader维护了一个动态的in-sync replica set (ISR)(内部同步副本集),意为和leader保持同步的follower集合。当ISR中的follower完成数据的同步之后,leader就会给producer发送ack。如果follower长时间未向leader同步数据,则该follower将被踢出ISR,该时间阈值由**replica.lag.time.max.ms**参数设定。Leader发生故障之后,就会从ISR中选举新的leader。<br />LEO(log end offset) leader 每个副本的最后一个offset<br />HW(所有副本中最小的LEO) 消费者可以消费的位置 <br />![image.png](https://cdn.nlark.com/yuque/0/2021/png/21361442/1629442377777-4bd235a2-0ca3-4689-9f43-f35526d138e8.png#clientId=u7654b609-9f1c-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=460&id=u7ce5934e&margin=%5Bobject%20Object%5D&name=image.png&originHeight=460&originWidth=977&originalType=binary&ratio=1&rotation=0&showTitle=false&size=40694&status=done&style=none&taskId=u4a8db48b-5bf9-4180-9814-dbd935b5507&title=&width=977)<br />前提宕机前分区内的HW是不变的<br />假如follower宕机<br />那么当flollower重启后,HW以上的部分被截断,follower重新从HW同步数据(防止数据重复)<br />假如leader宕机<br />kafka选择一个follower最为leader,leader重启后,HW以上的部分被截取,这部分数据就会被丢失(ACK=1)
总结:HW的作用:用来避免同步数据的时候发生重复同步,所以leader重启后,leader部分数据会丢失。
3)ack应答机制
对于某些不太重要的数据,对数据的可靠性要求不是很高,能够容忍数据的少量丢失,所以没必要等ISR中的follower全部接收成功。
所以Kafka为用户提供了三种可靠性级别,用户根据对可靠性和延迟的要求进行权衡,选择以下的配置。
acks参数配置:
ack:先leader落盘,后同步follower落盘再ack
acks:
0:不等待
producer不等待broker的ack,这一操作提供了一个最低的延迟,
broker一接收到还没有写入磁盘就已经返回,当broker故障时有可能丢失数据;
1:leader落盘不同步follower
producer等待broker的ack,partition的leader落盘成功后返回ack,
如果在follower同步成功之前leader故障,那么将会丢失数据;
-1(all):
producer等待broker的ack,partition的leader和follower全部落盘成功后才返回ack。
但是如果在follower同步完成后,broker发送ack之前,leader发生故障(没办法发送ack,producer重新发送上一条数据,但follower又已有该数据),那么会造成数据重复。
4)故障处理细节
LEO:指的是每个副本最大的offset;
HW:指的是消费者能见到的最大的offset,ISR队列中最小的LEO。
(1)follower故障
follower发生故障后会被临时踢出ISR,待该follower恢复后,follower会读取本地磁盘记录的上次的HW,并将log文件高于HW的部分截取掉,从HW开始向leader进行同步。等该follower的LEO大于等于该Partition的HW,即follower追上leader之后,就可以重新加入ISR了。
(2)leader故障
leader发生故障之后,会从ISR中选出一个新的leader,之后,为保证多个副本之间的数据一致性,其余的follower会先将各自的log文件高于HW的部分截掉,然后从新的leader同步数据。
注意:这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复。
消费者
consumer采用pull(拉)模式从broker中读取数据。
push(推)模式很难适应消费速率不同的消费者,因为消息发送速率是由broker决定的。它的目标是尽可能以最快速度传递消息,但是这样很容易造成consumer来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而pull模式则可以根据consumer的消费能力以适当的速率消费消息。
pull模式不足之处是,如果kafka没有数据,消费者可能会陷入循环中,一直返回空数据。针对这一点,Kafka的消费者在消费数据时会传入一个时长参数timeout,如果当前没有数据可供消费,consumer会等待一段时间之后再返回,这段时长即为timeout。
分区分配策略
一个消费者组对应一个topic
一个consumer group中有多个consumer,一个 topic有多个partition,所以必然会涉及到partition的分配问题,即确定那个partition由哪个consumer来消费。
Kafka有两种分配策略,一是RoundRobin,一是Range。
1)RoundRobin
轮询: 一个分区分配给一个消费者,轮着来,
算法是
1、先对分区取hash值,
2、再对hash值取模。
比如:
Topic[partition0、partition1、partition2、partition3、partition4]
consumer group[consumer1、consumer2]
此时轮询的分配结果:
consumer1消费partition0、partition2、partition4
consumer2消费partition1、partition3
2)Range
1、确定每个消费者平均消费几个分区: 分区数/消费者个数
2、确定前几个消费者多消费一个分区: 分区数%消费者个数
比如:
Topic[partition0、partition1、partition2、partition3、partition4]
consumer group[consumer1、consumer2]
1、确定每个消费者平均消费几个分区: 分区数/消费者个数 = 5/2=2 ,求的是平均消费的分区数
2、确定前几个消费者多消费一个分区: 分区数%消费者个数 = 5%2=1,求的是人
此时range的分配结果:
consumer1消费partition0、partition1、partition2
consumer2消费partition3、partition4
offset的维护
由于consumer在消费过程中可能会出现断电宕机等故障,consumer恢复后,需要从故障前的位置的继续消费,所以consumer需要实时记录自己消费到了哪个offset,以便故障恢复后继续消费。
Kafka 0.9版本之前,consumer默认将offset保存在Zookeeper中,
记住:从0.9版本开始,consumer默认将offset保存在Kafka一个内置的topic中,该topic为__consumer_offsets。
另外如果以后重装kafka也需要删除kafka在zookeeper中的节点,因为zookeeper里面存放了ids信息,和自己配置好的节点信息产生冲突
1)修改配置文件consumer.properties(/opt/module/kafka_2.11-2.4.1/config)
说明:Whether records from internal topics (such as offsets) should be exposed to the consumer. If set to true the only way to receive records from an internal topic is subscribing to it.
这个参数用于是否把内部topic的信息(例如offset)暴露给cosumer,如果设置为true,就只能通过订阅的方式来获取内部topic的数据。
exclude.internal.topics=false
2)读取offset(消费__consumer_offsets)
说明:kafka的日志文件如:000000000000.log在本地是读不出来的,会乱码,他只能被消费者消费后,通过指令消费着读取
注意:搞清楚:
消费数据的偏移量 —>comsume offset
生成数据的偏移量 —>kafka offset
0.11.0.0之前版本:
bin/kafka-console-consumer.sh \
--topic __consumer_offsets \
--zookeeper hadoop102:2181 \
--formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter" \
--consumer.config config/consumer.properties \
--from-beginning
bin/kafka-console-consumer.sh
--topic __consumer_offsets -–bootstrap-server hadoop102:9092
--formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter"
--consumer.config config/consumer.properties
--from-beginning
0.11.0.0之后版本:
???
bin/kafka-console-consumer.sh \
--topic __consumer_offsets \
--zookeeper hadoop102:2181 \
--formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" \
--consumer.config config/consumer.properties \
--from-beginning
//下面的指令才正确
bin/kafka-console-consumer.sh
--topic __consumer_offsets -–bootstrap-server hadoop102:9092
--formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"
--consumer.config config/consumer.properties
--from-beginning
消费者组案例
1)需求:测试同一个消费者组中的消费者,同一时刻只能有一个消费者消费。
2)案例实操
(1)在hadoop102、hadoop103上修改/opt/module/kafka/config/consumer.properties配置文件中的group.id属性为任意组名。
[atguigu@hadoop103 config]$ vi consumer.properties
group.id=atguigu
(2)在hadoop102、hadoop103上分别启动消费者
[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh \
--zookeeper hadoop102:2181 --topic first --consumer.config config/consumer.properties
[atguigu@hadoop103 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first --consumer.config config/consumer.properties
(3)在hadoop104上启动生产者
[atguigu@hadoop104 kafka]$ bin/kafka-console-producer.sh \
--broker-list hadoop102:9092 --topic first
>hello world
(4)查看hadoop102和hadoop103的接收者。<br /> 同一时刻只有一个消费者接收到消息。
Kafka 高效读写数据
生产者-消费者:
kafka不是立刻就把数据写到blocker中,
会事先写到缓冲区中,满足条件后在写:
读数据:
0)pagecache页缓存
1、生产者每个批次数据首先写入broker pagecache中,等到pagecache缓存空间不足的时候统一写入磁盘,相当于将单次写磁盘改为批次写磁盘,减少和磁盘打交道的次数
2、pagecache中的批次数据会根据物理地址进行排序,排序之后写入磁盘可以减少磁头寻址的时间
3、如果网络够好,生产者写入速率 = 消费者消费速率,此时数据刚刚写入page缓存中,消费者就直接从缓存中拉取数据消费了,不用经过磁盘
4、pagecache不属于JVM管理,不会增加GC的负担
1)顺序写磁盘
Kafka的producer生产数据,要写入到log文件中,写的过程是一直追加到文件末端,为顺序写。官网有数据表明,同样的磁盘,顺序写能到600M/s,而随机写只有100K/s。这与磁盘的机械机构有关,顺序写之所以快,是因为其省去了大量磁头寻址的时间。
写数据:
2)零复制技术
内存分为两块: 内核区、用户区
从磁盘读取数据发送网络正常流程:
1、通过io流读取磁盘数据,将数据读到内核区的pagecache中
2、将内核区的pagecache中的数据拷贝到用户区,在用户区中对数据做处理
3、将处理完成的数据通过io流发送到内核区的socket缓存区中
4、将数据从socket缓存区发送到网卡
零拷贝流程:
1、通过io流读取磁盘数据,将数据读到内核区的pagecache中
2、将数据从pagecache缓存区发送到网卡