1.工作流程
2.持久化
Kafka消息格式
目前Kafka的消息格式有三种V0、V1、V2。(详情可参考:https://honeypps.com/mq/kafka-log-format-evolution)
- Offset,message在partition内的偏移量
- length,消息长度
- CRC32,校验字段,校验消息的完整性
- Magic,标志位
- attribute,可选字段,消息的属性
- Timestamp,时间戳
- key length,key的长度
- key,key
- Value Length,值的长度
- Value,值
文件存储结构
Kafka中消息是以topic进行分类的,生产者生产消息,消费者消费消息,都是面向topic的。 topic是逻辑上的概念,而partition是物理上的概念,每个partition对应于一个log文件,该log文件中存储的就是producer生产的数据。_Producer生产的数据会被不断追加到该log文件末端,且每条数据都有自己的offset。消费者组中的每个消费者,都会实时记录自己消费到了哪个offset,以便出错恢复时,从上次的位置继续消费。
由于生产者生产的消息会不断追加到log文件末尾,为防止log文件过大导致数据定位效率低下,Kafka采取了分片和索引机制,将每个partition分为多个segment。每个segment对应两个文件“.index”文件和“.log”文件。这些文件位于一个文件夹下,该文件夹的命名规则为:topic名称+分区序号。index和log文件以当前segment的第一条消息的offset命名。log文件大小可以通过修改配置参数(log.segment.bytes=1073741824_)进行改变,默认大小1G。
相关配置项如下:
配置项 | 默认值 | 说明 |
---|---|---|
log.index.interval.bytes | 4096(4k) | 增加索引项字节间隔密度,会影响索引文件中的区间密度和查询效率 |
log.segment.bytes | 1073741824(1G) | 日志文件大小最大值 |
log.roll.ms | NULL | 当前日志分段中消息的最大时间戳与当前系统的时间戳的差值允许的最大范围,毫秒维度。如果未设置则表示采用log.roll.hours |
log.roll.hours | 168(7天) | 当前日志分段中消息的最大时间戳与当前系统的时间戳的差值允许的最大范围,小时维度 |
log.index.size.max.bytes | 10485760(10M) | 触发偏移量索引文件或时间戳索引文件分段字节阈值 |
3.生产者
生产分区策略
应用场景
- 全局有序
- 不要求保持顺序
- 部分有序(例如:不同业务线的消息需保持有序)
在Kafka中,消息是以Topic为单位进行业务处理的,一个Topic会被分成若干个区,每个分区又有自己的主从体系。Topic下的一条消息只会存在一个区里面,而不会在多个区中保存多份。那么为什么要分区呢,首先考虑下,如果没有分区的话,一个机器上只有一个Topic去处理海量的消息,这样系统的吞吐量会大大降低,没有办法去进行水平扩展。分区的作用就是为了在水平层面上进行kafka对消息处理能力的扩展。
(1)轮询策略
Kafka当前默认的负载均衡策略,适合场景的话就是比较适合我们所有的机器配置都一样的情况,这样消息会被均匀的存放到Topic的不同分区上。
(2)随机策略
随机策略就是消息将会随机存放到Topic下的某个分区中,它是Kafka老的版本的默认的分区负载均衡策略,也是为了尽量将消息均匀分配到不同的分区中。但是均匀这方面来看不如轮询策略,所以现在新版本的默认是轮询负载均衡策略。
(3)Hash分区
这种策略依靠一定的策略生成的一个key,然后按照key的哈希值选择分区。比如按照业务、机器ip等方式去生成的,一旦key被定义了,那么所有该类key的消息都会被存放到相同的分区里。kafka新版本中,如果指定了消息的key的话,就会使用这种策略,如果没指定的话就会使用轮询策略。这种负载均衡方式可以实现局部业务的顺序消费,比如我们有3个业务的消息需要顺序推送,针对这3个业务线设置产生key策略,不同业务的key放到不同的分区上,相同的key在一个分区内是绝对顺序的。这样既保证了消息的顺序性,也利用了Kafka的高吞吐量的特性。(注意:Kafka不保证topic下的消息顺序,会保证分区的消息顺序,是用追加文件的日志方式记录的消息)
- 总结
如果Kafka消息生产者指定的分区,那么消息将直接分配到固定的分区。如果没指定分区,指定了key,就用key的哈希值计算到对应的分区将消息分配过去,如果都没指定,就使用轮询策略负载均衡将消息均匀的分配到不同的分区中。
数据可靠性保证
为保证producer发送的数据,能可靠的发送到指定的topic,topic的每个partition收到producer发送的数据后,都需要向producer发送ack(acknowledgement确认收到),如果producer收到ack,就会进行下一轮的发送,否则重新发送数据。
(1)副本数据同步策略
方案 | 优点 | 缺点 |
---|---|---|
半数以上完成同步,就发送ack | 延迟低 | 选举新的leader时,容忍n台节点的故障,需要2n+1个副本 |
全部完成同步,才发送ack | 选举新的leader时,容忍n台节点的故障,需要n+1个副本 | 延迟高 |
(2)ISR
Kafka选择了第二种方案,然而当出现以下情景:leader收到数据,所有follower都开始同步数据,但有一个follower,因为某种故障,迟迟不能与leader进行同步,那leader就要一直等下去,直到它完成同步,才能发送ack。为此,Leader维护了一个动态的in-sync replica set (ISR),意为和leader保持同步的follower集合。当ISR中的follower完成数据的同步之后,leader就会给follower发送ack。如果follower长时间未向leader同步数据,则该follower将被踢出ISR,该时间阈值由replica.lag.time.max.ms参数设定(默认10秒钟)。leader发生故障之后,就会从ISR中选举新的leader。
(3)ACK
ack作用是确认收到消息,一条是producer发送消息到leader收到消息之后发送ack,另一条是leader和follower之间同步完成数据会发送ack。对于一些不太重要的数据,对数据的可靠性要求不是特别高的情况下,能够容忍少量的数据丢失,因此没有必要等待ISR中所有follower全部要接收成功,所以Kafka为用户提供了三种可靠级别设置,可以根据不同需求来修改选择。
acks | 描述 | 缺点 |
---|---|---|
0 | producer不需要确认消息,直接发送消息给leader,有没有收到消息,producer不管 | 当broker故障,会丢失数据 |
1 | producer发送消息,确保leader写入成功,partition的leader落盘成功后返回ack,followers不管 | 如果follower同步数据之前leader故障,无法获取原来leader的数据,此时会丢失数据 |
-1/all | producer等待broker的ackpartition的leader和follower全部落盘成功后,才会返回ack | 如果follower同步完成后,leader还没发送ack给Producer的时候,出现了故障,这时候会重新选出一个leader,Producer因为没有收到ack,就会重新发送给新的leader,造成数据重复。极限情况下,ISR中只有leader时,将退化为acks=1的情形,而丢失数据 |
案例:
1. request.required.acks=1
producer发送数据到leader,leader写本地日志成功,返回客户端成功;此时ISR中的副本还没有来得及拉取该消息,leader就宕机了,那么此次发送的消息就会丢失。
2. request.required.acks=-1
同步(Kafka默认为同步,即producer.type=sync)的发送模式,replication.factor>=2且min.insync.replicas>=2的情况下,不会丢失数据。
有两种典型情况。acks=-1的情况下,数据发送到leader, ISR的follower全部完成数据同步后,leader此时挂掉,那么会选举出新的leader,数据不会丢失。
acks=-1的情况下,数据发送到leader后 ,部分ISR的副本同步,leader此时挂掉。比如follower1和follower2都有可能变成新的leader, producer端会得到返回异常,producer端会重新发送数据,数据可能会重复。
当然上图中如果在leader crash的时候,follower2还没有同步到任何数据,而且follower2被选举为新的leader的话,这样消息就不会重复。
(4)故障修复
存在这样两种故障,offset会需要维护:
- follower故障后恢复。
follower故障,会被踢出ISR,恢复后,会读取本地磁盘,恢复故障前的HW之前的数据,然后开始向leader进行同步,一直同步到整个partition的HW,此时消息可认为是追上了leader,就可以重新加入ISR了; - leader故障后重新选出leader。
解决:比如leader故障,现在从ISRs中选第二个follower为leader;那么除了新leader之外的所有follower都将数据截取到HW:这样除了leader外,所有的offset都为HW;然后follower向leader同步数据;
注意:这只能保证副本之间的数据一致性(存储一致性、消费一致性),并不能保证数据不丢失或者不重复。 (ACK保证不丢失和不重复)
4.消费者
消费语义
- 至少一次语义(At least once semantics):如果生产者收到了Kafka broker的确认(acknowledgement,ack),并且生产者的acks配置项设置为all(或-1),这就意味着消息已经被精确一次写入Kafka topic了。然而,如果生产者接收ack超时或者收到了错误,它就会认为消息没有写入Kafka topic而尝试重新发送消息。如果broker恰好在消息已经成功写入Kafka topic后,发送ack前,出了故障,生产者的重试机制就会导致这条消息被写入Kafka两次,从而导致同样的消息会被消费者消费不止一次。
- 至多一次语义(At most once semantics):如果生产者在ack超时或者返回错误的时候不重试发送消息,那么消息有可能最终并没有写入Kafka topic中,因此也就不会被消费者消费到。但是为了避免重复处理的可能性,我们接受有些消息可能被遗漏处理。
- 精确一次语义(Exactly once semantics):即使生产者重试发送消息,也只会让消息被发送给消费者一次。精确一次语义是最令人满意的保证,但也是最难理解的。因为它需要消息系统本身和生产消息的应用程序还有消费消息的应用程序一起合作。比如,在成功消费一条消息后,你又把消费的offset重置到之前的某个offset位置,那么你将收到从那个offset到最新的offset之间的所有消息。这解释了为什么消息系统和客户端程序必须合作来保证精确一次语义。
在0.11版本以前的Kafka,只能保证数据不丢失,再在下游消费者对数据做全局去重。对于多个下游应用的情况,每个都需要单独做全局去重,这就对性能造成了很大影响。 0.11版本的Kafka,引入了一项重大特性:幂等性。所谓的幂等性就是指Producer不论向Server发送多少次重复数据,Server端都只会持久化一条。幂等性结合At Least Once语义,就构成了Kafka的Exactly Once语义。即:
要启用幂等性,只需要将Producer的参数中enable.idompotence设置为true即可(ack=-1将被自动配置)。Kafka的幂等性实现其实就是将原来下游需要做的去重放在了数据上游。开启幂等性的Producer在初始化的时候会被分配一个PID,发往同一Partition的消息会附带Sequence Number。而Broker端会对
消费方式
consumer采用pull(拉)模式从broker中读取数据。pull模式不足之处是,如果kafka没有数据,消费者可能会陷入循环中,一直返回空数据。针对这一点,Kafka的消费者在消费数据时会传入一个时长参数timeout,如果当前没有数据可供消费,consumer会等待一段时间之后再返回,这段时长即为timeout。
消费分区策略
(1)轮询分区策略(RoundRobinAssignor)
大概逻辑如下:拿到组内所有Consumer 订阅的 TopicPartition,按照顺序挨个分发给 Consumer,此时如果和当前Consumer 没有订阅关系,则寻找下一个Consumer。
(2)范围分区策略(RangeAssignor)
默认分配策略,RangeAssignor是按照Topic的维度进行分配的,也就是说按照Topic对应的每个分区平均的按照范围区段分配给Consumer实例。这种分配方案是按照Topic的维度去分发分区的,此时可能会造成先分配分区的Consumer实例的任务过重。
大概逻辑如下:首先,将分区按数字顺序排行序,消费者按名称的字典序排序。然后,用分区总数除以消费者总数。如果能够除尽,平均分配;若除不尽,则位于排序前面的消费者将多负责一个分区。
(3)粘性分区策略(StickyAssignor)
Kafka从0.11 版本开始引入,可以通过 partition.assignment.strategy参数去设置,其主要实现了两个功能:
1、主题分区的分配要尽可能的均匀;
2、当Rebalance 发生时,尽可能保持上一次的分配方案。
消费者脱离了消费组的情形下,消费组就会执行再平衡操作(Rebalance)。
ZK应用
(1)消费offset维护
(2)控制器选举
(3)Leader选举
(4)Broker注、Topic、消费者注册。
(5)生产者、消费者负载均衡。
5.高效读写
- 并发读写。
- 顺序写磁盘。
Kafka的producer生产数据,要写入到log文件中,写的过程是一直追加到文件末端,为顺序写。官网有数据表明,同样的磁盘,顺序写能到600M/s,而随机写只有100K/s。这与磁盘的机械结构有关,顺序写之所以快,是因为其省去了大量磁头寻址的时间。
- 零拷贝。
6.事务
Kafka从0.11版本开始引入了事务支持。事务可以保证Kafka在Exactly Once语义的基础上,生产和消费可以跨分区和会话,要么全部成功,要么全部失败。
(1)Producer事务
为了实现跨分区跨会话的事务,需要引入一个全局唯一的Transaction ID,并将Producer获得的PID和Transaction ID绑定。这样当Producer重启后就可以通过正在进行的Transaction ID获得原来的PID。 为了管理Transaction,Kafka引入了一个新的组件Transaction Coordinator。Producer就是通过和Transaction Coordinator交互获得Transaction ID对应的任务状态。Transaction Coordinator还负责将事务所有写入Kafka的一个内部Topic,这样即使整个服务重启,由于事务状态得到保存,进行中的事务状态可以得到恢复,从而继续进行。
(2)Consumer事务
上述事务机制主要是从Producer方面考虑,对于Consumer而言,事务的保证就会相对较弱,尤其时无法保证Commit的信息被精确消费。这是由于Consumer可以通过offset访问任意信息,而且不同的Segment File生命周期不同,同一事务的消息可能会出现重启后被删除的情况。
参考
【kafka数据可靠性深度解读】https://blog.csdn.net/u013256816/article/details/71091774
【KafkaConsumer分区分配策略】http://moguhu.com/article/detail?articleId=142
【一文看懂Kafka消息格式的演变】https://blog.csdn.net/u013256816/article/details/80300225
【Kafka系列】https://blog.csdn.net/lemonZhaoTao/article/details/84455588
【Kafka精粹】https://blog.csdn.net/shangboerds/article/details/39033267