Kafka组成
- broker: Kafka 服务器,负责消息存储和转发【接受producer的消息。broker集群中都有一个broker同时充当了集群控制器功能】
- topic:消息类别, Kafka 按照 topic 来分类消息
- partition: topic 的分区,一个 topic 可以包含多个 partition, topic 消息保存在各个partition 上 【实现Kafka的伸缩性,分区会创建在不同的服务器上】
- offset:消息在日志中的位置,可以理解是消息在 partition 上的偏移量,也是代表该消息的唯一序号
- Producer:消息生产者
- Consumer:消息消费者
- Consumer Group:消费者分组,每个 Consumer 必须属于一个 group
- Zookeeper:保存着集群 broker、 topic、 partition 等 meta 数据;另外,还负责 broker 故障发现, partition leader 选举,负载均衡等功能
Message[消息]:The body of each message sent. 批次:为提高效率,消息会分批次写入Kafka。
Rebalance[重平衡,高可用]:某个消费者挂了,其它消费者自动分配订阅主题分区。
spring.kafka.listener.concurrency={KAFKA_CONCURRENCY:4} 【设置每个@KafkaListener的并发监听线程个数】
spring.kafka.consumer.group-id=msaccounts_cgrp 【唯一标识consumer进程所在组】
spring.kafka.consumer.auto-offset-reset=latest 【没有初始化offset时。 当各分区下有已提交的offset时,从提交的offset开始消费。无提交的offset时,消费新产生的该分区数据】
spring.kafka.consumer.auto-commit-interval=1000 【consumer自动向zk提交offset的频率】
• 高吞吐、低延迟
• 高伸缩性
• 持久性、可靠性
• 容错性
• 高并发
partition 的数据文件(offffset, MessageSize, data)
- offffset 表示 Message 在这个 partition 中的偏移量。offffset 不是该 Message 在 partition 数据文件中的实际存储位置,而是逻辑上一个值,它唯一确定了 partition 中的一条 Message,可以认为 offffset 是partition 中 Message 的 id;
- MessageSize 表示消息内容 data 的大小;
-
数据文件分段 segment(顺序读写、分段命令、二分查找)
Kafka 为每个分段后的数据文件建立了索引文件,文件名与数据文件的名字是一样的,只是文件扩展为.index。 index 文件中并没有为数据文件中的每条 Message 建立索引,而是采用了稀疏存储的方式,每隔一定字节的数据建立一条索引。这样避免了索引文件占用过多的空间,从而可以将索引文件保留在内存中。
segment file
每个partition下有主要需要关注三类文件:
以log为后缀的数据文件(存储消息,当log文件触发分段条件后后自动滚动生成新的文件)
- 以index为后缀的索引文件(存储其对应的log文件中消息的物理偏移地址。)
- 以timestamp为后缀的索引文件(时间戳索引文件)
每一个index文件和一个log文件一一对应,这一对文件称为segment file,也就是一个数据段。
Kafka 为每个分段后的数据文件建立了索引文件,文件名与数据文件的名字是一样的,只是文件扩展名为.index。 index 文件中并没有为数据文件中的每条 Message 建立索引, 而是采用了稀疏存储的方式,每隔一定字节的数据建立一条索引。这样避免了索引文件占用过多的空间,从而可以将索引文件保留在内存中。
kafka查找机制
kafka查找数据主要分两部分
- 查找到对应的log文件
- 在对应的log文件中找到目标数据
每个log文件的名最后几位数字为该log文件存储的第一条消息对应的全局消息。
如:00000000000000368769.log存储的是第368769条消息对应的数据。
再结合index文件和log文件字段的对应关系就可以得到以下步骤:
- 根据想要查找的消息条数,用二分查找算法确定目标消息在哪个log文件中
去该log文件对应的index文件中,使用顺序查找算法查找到该目标消息在log文件中的起始物理地址
- 因为index文件采取稀疏存储,所以存在无法直接命中索引的情况,此时先确定距离此索引最近的索引所在物理地址,然后顺序查找即可
示例: 设有一个消费者消费到了368776(offset值为368776),现在要消费下一条数据。 所有log文件中进行二分查找,确认该文件在00000000000000368769.log中(368776<368769) 在00000000000000368769.log对应的index文件中,查找第七条消息(368769-368776 = 7,7为该index文件中的相对偏移量) 在index文件中使用顺序查找,找到第6条数据后,向下继续查找,发现是第8条数据。此时8>7还没有找到,回到上一条数据消息(第6条消息索引处),找到其对应的物理偏移量1407,然后从1407处向下顺序查找,知道找到全区偏移量为368776的消息的下一条消息。
Kafka 的消息队列
- 点对点模式 (一对一)
- 发布订阅模式 (一对多)
核心API
• Producer API 程序向一个或多个topics上发送消息记录。
• Consumer API 程序可订阅一个或多个topics。
• Streams API 程序作为流处理器。从topics中消费输入流生成输出流。
• Connector API 将topic链接到现有应用的生产者、消费者。
重要配置
Broker
• Broker.id 唯一标识
• Port 使用1024以下的端口。需要用root启动kafka
• Zookeeper.connect 用于保存broker元数据的zk地址。(可以有多个)
• Log.dirs 所有的消息都保存在磁盘上。
• Num.recovery.threads.per.data.dir 可配置的线程池处理日志片段。
1 服务器正常启动,用于打开每个分区的日志片段。
2 服务器崩溃重启,检查每个分区的日志片段。
3 服务器正常关闭,用于关闭日志片段。
所配置的数字对应的是log.dirs指定的单个日志目录。如果它设置为8,Log.dirs为3,则需要3*8=24个线程。
• Auto.create.topics.enable 建议false,不允许自动创建topic。
Topic
• Num.partintions 指定新创建的主题需要包含多少个分区。
• Defaul.replication.factor Kafka保存消息的副本数。
• Log.retention.ms 数据可以保留多久。
• Log.retention.bytes 每个分区数据最多保存的数据量。
• Log.segment.bytes 指定日志片段大小。这个参数越小,越会频繁的关闭和分配新文件。降低磁盘写入效率
• Log.segment.ms 日志片段到达时间上限被关闭。
• Message.max.bytes 限制单个消息的大小。【指的是压缩后的大小】
• Retention 该topic消息被保存的时长。
• Retention.bytes 要为该topic预留多少空间。
Kafka快速的原因
• 顺序读写。【避免随机磁盘寻址的浪费】
• 零拷贝。【避免内核之间的切换】
• 消息压缩
• 分批发送。【将数据记录分批发送】
磁盘中的程序都需要加载到内存才能运行,因为负责解析和运行程序内容的CPU需要通过程序计数器来指定内存地址从而读取程序指令。
虚拟内存被分割成多个物理碎片,虚拟内存需要与内存中的空间进行置换【分页式/分段式】,然后运行程序。
windows采用分页式,一页大小4KB。需要把应用程序按4KB的页划分,以页为单位放入磁盘中。
DLL(动态链接库文件)通过共有一个DLL文件节约内存。
划分空间方式-》扇区。磁盘表面分为若干个同心圆的空间(磁道),磁道按固定大小的存储空间划分而成的就是扇区。
Kafka高性能核心pageCache零拷贝zeroCopy原理解析
Kafka 调用了Java NIO库里的transferTo方法,Nginx 的sendfile on也是零拷贝。
零拷贝基于PageCache
文件传输过程,第一步都是把磁盘文件数据copy到【内核缓冲区】,也就是PageCahe。PageCahe会缓存最近访问的数据。磁盘读取数据时,优先从PageCahe中找
缓存最近被访问的数据。
预读功能。
传输大文件(GB)级别时,PageCahe不会起作用。白白浪费DMA多做的数据拷贝,使用了PageCahe的零拷贝也会损失性能。所以大文件不会用零拷贝与PageCahe。
Kafka producer
ProducerRecord 代表了一组kafka需要发送的kv对。由topic name,可选的partition Number 和kv对组成。
- 序列化。首先需要序列化kv对,这样才能在网络上传输。
- 消息到达分区器。未指定分区的话,使用key的hash映射指定一个分区。
- 消息被存放在一个记录批次中,由一个独立线程负责把他们发到kafka broker上。
- kafka broker收到消息的时会返回一个响应,写入成功会返回RecordMetaData 对象,它包含主题和分区信息,以及记录在分区里的偏移量,时间戳。重复错误的话会返回错误消息。
Kafka消息发送
• 简单消息发送。Send 消息先被写入分区中的缓冲区中,然后分批次发送给Kafka Broker。
• 同步发送消息。Send+Get 如果服务器返回错误【一类是重试错误(链接错误,无主错误),另一类是非重试错误(消息过大)】,Get方法会抛出异常。
• 异步发送消息。异步发送同样能对异常情况进行处理,实现Callback接口就行。
生产者分区机制(Partition)
Kafka对数据的读写是以分区为粒度的。通过分区部署在多个Broker来实现负载均衡效果。
Kafka分区轮询机制
- 顺序轮询(默认)
- 随机轮询
- 按照key进行消息保存
- 自定义分区策略。
生产者压缩机制 - 用CPU时间去换磁盘空间,I/O传输量
Kafka的消息分为两层:消息集合【kafka不会去操作具体消息而是总在这个层面上进行写入操作】和消息。
一个消息集合包含若干日志项,日志项是真正封装消息的地方。
压缩发生:kafka producer and consumer,采用何种压缩算法和kv一起发送过去的。
Kafka Consumer
消费者群组中的消费者订阅的都是相同的主题。(每个分区所产生的消息能够被每个消费者群组中的消费者消费,最多1v1)【点对点】
创建主题时多创建些分区,这样我们可以水平扩展消费者提高消费能力。
Kafka重要特性:只需写入一次消息。不同消费组也能读取到全量消息【发布-订阅】!
Consumer rebalance【高可用 + 伸缩性】:把分区所有权通过一个消费者转移到其它消费者
rebalance期间消费者无法读取消息。【STW】
消费者通过向broker发送心跳来维护自己是消费者组的一员并确认拥有的分区。
创建消费者
- 首先创建KafkaConsumer,在properties中存放bootstrap.server(broker address)、key.deserializer、value.deserializer
- 订阅主题。参数传入一个正则表达式。
- 定时轮询。Kafka Consumer采用轮询的方式去Broker制表符进行数据的检索。
提交 + 偏移量
消费者每次调用poll() 方法进行定时轮询,会返回生产者写入Kafka但未被消费者消费的记录(偏移量)。
消费者会向 _consumer_offset 的特殊主题发送消息,它会保存每次所发送消息的分区偏移量offset,待rebalance时使用。
- 自动提交偏移量。
- 提交当前偏移量。commitSync
- 异步提交偏移量。(commitAsync不会重试)
- 同步+异步组合提交。消费者关闭之前会使用 commitSync + commitAsync 提交偏移量
- 提交特定的偏移量。
Kafka线上问题
如何保证消息顺序?
按照一定的写规则(确保分发均匀)写到同一个 partition ,不同的消费者读不同的 partition 的消息。
顺序消息有一条失败了咋办?
例如”下单 - 支付 - 完成” 有一条消息失败。需要加 失败重试机制。(考虑同步+异步?)
使用 异步重试机制 不影响消费者消费速度。处理消息时,在重试表中判断有无数据,有就直接保存。否则失败时保存。(多条消息根据一个订单号判断)Elastic-job失败重试机制
消息积压怎么搞? 加服务器节点
从producer发送消息到broker需要一次网络IO,broker写数据到磁盘需要一次磁盘IO。
consumer从broker获取消息先经过一次磁盘IO,再进过一次网络IO。
【优化消息体大小】只保留关键信息。
消息已存储到partition 的话用【线程池处理消息】
幂等性怎么处理
由于kafka消费消息支持三种形式,默认at least once,所以保存时使用了 INSERT INO … ON DUPLICATE KEY UPDATE 存在时更新
预发布环境消费错数据了
生产环境消费者重置offset,重新读取丢失消息。
时间轮算法
kafka 中有一些定时任务DelayedOperation,它的添加、轮转、消亡通过时间轮算法实现。(Netty也有时间轮方式)
进入时间轮
- 根据延迟时间计算对应的时间轮“层次”(如钟表的“小时级”还是“分钟级”还是“秒级”,实际上是一个不断“升级”的过程,直到找到合适的“层次”)
- 计算在该轮中的位置,并插入该位置(每个bucket是一个双向链表,可能包含多个延迟任务)
- 若该bucket是首次插入,需要将该bucket加入DelayQueue中(DelayQueue引入是为了解决“空推进”)
降级 当时间“推进”到某个bucket时,说明bucket中的任务在当前时间轮中的时间已经走完,需要降级,进入更小粒度的时间轮中。
时间的推进
时间轮中大部分bucket是空的,指针的推进没有任何作用。为了减少空推进,DelayQueue引入。每当有bucket到期,即queue.poll能拿到结果,才进行时间的推进。
为什么要用时间轮?
比起用DelayQueue、ScheduledThreadPoolExecutor。它优势在于时间复杂度上。
时间轮结构+双向列表bucket,使得插入操作只需要O(1)的复杂度
Bucket让多个任务合并,同一个bucket只需要在DelayQueue入队一次,减少了DelayQueue中元素的数量与堆深度,插入弹出开销变小。