Kafka笔记
Kafka 的基本术语
注意:停止 Kafka 集群时,一定要等 Kafka 所有节点进程全部停止后再停止 Zookeeper
集群。因为 Zookeeper 集群当中记录着 Kafka 集群相关信息,Zookeeper 集群一旦先停止,
Kafka 集群就没有办法再获取停止进程的信息,只能手动杀死 Kafka 进程了。
消息:Kafka 中的数据单元被称为消息,也被称为记录,可以把它看作数据库表中某一行的记录。
批次:
为了提高效率, 消息会分批次写入 Kafka,批次就代指的是一组消息。
主题:
消息的种类称为 主题(Topic),可以说一个主题代表了一类消息。相当于是对消息进行分类。主题就像是数据库中的表。
分区:
主题可以被分为若干个分区(partition),同一个主题中的分区可以不在一个机器上,有可能会部署在多个机器上,由此来实现 kafka 的伸缩性,单一主题中的分区有序,但是无法保证主题中所有的分区有序
生产者:
向主题发布消息的客户端应用程序称为生产者(Producer),生产者用于持续不断的向某个主题发送消息。
消费者:
订阅主题消息的客户端程序称为消费者(Consumer),消费者用于处理生产者产生的消息。
消费者群组:
生产者与消费者的关系就如同餐厅中的厨师和顾客之间的关系一样,一个厨师对应多个顾客,也就是一个生产者对应多个消费者,消费者群组(Consumer Group)指的就是由一个或多个消费者组成的群体。
一个消费者只能对应一个分区,broker 否则里边假如5个数据,两个消费者会混乱,分区就无序了
消费者初始化
偏移量:
偏移量(Consumer Offset)是一种元数据,它是一个不断递增的整数值,用来记录消费者发生重平衡时的位置,以便用来恢复数据。
分区器:发送到哪一个分区,默认50个分区,
内存缓冲区实现批量发送 基于内存,大小32兆,每一批队列大小16k,每一个分区一个队列。
。适当增加该值,可以提高吞吐量,但是如果该值设置太大,会导致数据 传输延迟增加
每批次满了之后才拉数据,sender线程,或者到延迟时间了,默认为0,效率低。
如果对方无应答大于5个请求,sender线程就不发了,
允许最多没有返回 ack 的次数,默认为 5,开启幂等性
要保证该值是 1-5 的数字。
失败就重试,默认int最大值。
异步,直接到了缓存中获得分区器给的区号就返回,而同步是知道发送到broker确认之后才返回(阻塞)。
收回ack确认才删除缓存,否则不删,重试。
分区器实现负载均衡。
数据如何不丢
Ack = -1 分区副本数 >=2 ISR最小副本数 >=2 保证不丢失
幂等性(数据不重复)
幂等性就是指Producer不论向Broker发送多少次重复数据,Broker端都只会持久化一条,保证了不重复。所以消费者只会消费一条。
幂等性对有序性的影响
Prodcuer开启幂等性会保证消息在broker中的Topic-partition的有序性,因为幂等性是基于sequence number的。如果没有开启幂等性,由于失败重试,可能会导致消息的乱序。
重复数据的判断标准:具有
所以幂等性只能保证的是在单分区单会话内不重复。
解决:事务+幂等性(保证数据不重复)
有序性
当消息发送出现错误的时候,系统会重发消息。retries
表示重试次数。默认是 int 最大值,2147483647。
如果设置了重试,还想保证消息的有序性,需要设置
MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1
否则在重试此失败消息的时候,其他的消息可能发送
成功了。
开启幂等,如果保证单分区有序,
每个broker请求确认应答队列为缓存为5个请求,因此幂等性设置缓存为1-5;
SeqNumber>保持自增,如果是自增消息才落磁盘,否则在内存里边耗着,如果设置为6,那自增键只有1 - 5,最后一个只能下波,不保证有序自增。
broker:
一个独立的 Kafka 服务器就被称为 broker,broker 接收来自生产者的消息,为消息设置偏移量,并提交消息到磁盘保存。
broker 集群:
broker 是集群 的组成部分,broker 集群由一个或多个 broker 组成,每个集群都有一个 broker 同时充当了集群控制器的角色(自动从集群的活跃成员中选举出来)。
副本:只是备份,消费的一直是领导者副本,除非挂掉
Kafka 中消息的备份又叫做 副本(Replica),副本的数量是可以配置的,Kafka 定义了两类副本:领导者副本(Leader Replica) 和 追随者副本(Follower Replica),前者对外提供服务,后者只是被动跟随。
重平衡:
Rebalance。消费者组内某个消费者实例挂掉后,其他消费者实例自动重新分配订阅主题分区的过程。Rebalance 是 Kafka 消费者端实现高可用的重要手段。
消费者重平衡
我们从上面的消费者演变图中可以知道这么一个过程:最初是一个消费者订阅一个主题并消费其全部分区的消息,后来有一个消费者加入群组,随后又有更多的消费者加入群组,而新加入的消费者实例分摊了最初消费者的部分消息,这种把分区的所有权通过一个消费者转到其他消费者的行为称为重平衡,英文名也叫做 Rebalance 。
数据默认保存7天
Kafka存储日志
Kafka 的特性(设计原则)
高吞吐、低延迟:kakfa 最大的特点就是收发消息非常快,kafka 每秒可以处理几十万条消息,它的最低延迟只有几毫秒。
高伸缩性: 每个主题(topic) 包含多个分区(partition),主题中的分区可以分布在不同的主机(broker)中。
持久性、可靠性: Kafka 能够允许数据的持久化存储,消息被持久化到磁盘,并支持数据备份防止数据丢失,Kafka 底层的数据存储是基于 Zookeeper 存储的,Zookeeper 我们知道它的数据能够持久存储。
容错性: 允许集群中的节点失败,某个节点宕机,Kafka 集群能够正常工作
高并发: 支持数千个客户端同时读写
RocketMQ,与kafka存储区别
当broker里面的topic的partition数量过多时,kafka的性能却不如rocketMq。
kafka和rocketMq都使用文件存储,但是kafka是一个分区一个文件,当topic过多,分区的总量也会增加,kafka中存在过多的文件,当对消息刷盘时,就会出现文件竞争磁盘,出现性能的下降。一个partition(分区)一个文件,顺序读写。一个分区只能被一个消费组中的一个 消费线程进行消费,因此可以同时消费的消费端也比较少。
rocketMq所有的队列都存储在一个文件中,每个队列的存储的消息量也比较小,因此topic的增加对rocketMq的性能的影响较小。rocketMq可以存在的topic比较多,可以适应比较复杂的业务
RockeMq所谓批量发送消息(手动)
- 批量消息要求必要具有同一topic、相同消息配置
- 不支持延时消息
- 建议一个批量消息最好不要超过1MB大小
- 如果不确定是否超过限制,可以手动计算大小分批发送消息模式
Kafka 的消息队列一般分为两种模式:点对点模式和发布订阅模式
1.Kafka 是支持消费者群组的,也就是说 Kafka 中会有一个或者多个消费者,如果一个生产者生产的消息由一个消费者进行消费的话,那么这种模式就是点对点模式
2.点对点模式的消息队列
如果一个生产者或者多个生产者产生的消息能够被多个消费者同时消费的情况,这样的消息队列成为发布订阅模式的消息队列Kafka 为何如此之快
Kafka 实现了零拷贝原理来快速移动数据,避免了内核之间的切换。Kafka 可以将数据记录分批发送,从生产者到文件系统(Kafka 主题日志)到消费者,可以端到端的查看这些批次的数据。
批处理能够进行更有效的数据压缩并减少 I/O 延迟,Kafka 采取顺序写入磁盘的方式,避免了随机磁盘寻址的浪费,更多关于磁盘寻址的了解,请参阅 程序员需要了解的硬核知识之磁盘 。
顺序读写
零拷贝
消息压缩(默认不压缩)
在对大数据处理上,瓶颈往往体现在网络上而不是CPU(压缩和解压会耗掉部分CPU资源)。
分批发送
消息堆积(场景)
我们可以通过jstack打印出线程的堆栈信息(可连续打印多次观察变化)。重点搜索
如果发现大量的消费线程处于WAITING(parking)状态,说明消费线程在等待待消费的消息。如果仍然存在消息堆积,则极有可能是拉取能力不足,重点应该加强rocketmq拉取消息的能力。
Kafka分区只能增加不能减少,利用临时队列,防止处理完之后浪费资源
消息中间件的主要功能是异步解耦,还有个重要功能是挡住前端的数据洪峰,保证后端系统的稳定性,这就要求消息中间件具有一定的消息堆积能力,消息堆积分以下两种情况:
消息堆积在内存Buffer,一旦超过内存Buffer,可以根据一定的丢弃策略来丢弃消息,如CORBA Notification规范中描述。适合能容忍丢弃消息的业务,这种情况消息的堆积能力主要在于内存Buffer大小,而且消息堆积后,性能下降不会太大,因为内存中数据多少对于对外提供的访问能力影响有限。
消息堆积到持久化存储系统中,例如DB,KV存储,文件记录形式。 当消息不能在内存Cache命中时,要不可避免的访问磁盘,会产生大量读IO,读IO的吞吐量直接决定了消息堆积后的访问能力
解决
线上故障了,怎么处理
消息堆积了10小时,有几千万条消息待处理,现在怎么办?
修复consumer, 然后慢慢消费?也需要几小时才可以消费完成,新的消息怎么办
核心思想:紧急临时扩容,更快的速度去消费数据
- 修复Consumer不消费问题,使其恢复正常消费,根据业务需要看是否要暂停
- 临时topic队列扩容,并提高消费者能力,但是如果增加Consumer数量,但是堆积的topic里面的message queue数量固定,过多的consumer不能分配到message queue
编写临时处理分发程序,从旧topic快速读取到临时新topic中,新topic的queue数量扩容多倍,然后再启动更多consumer进行在临时新的topic里消费
直到堆积的消息处理完成,再还原到正常的机器数量,删除这个临时队列。
Kafka 的使用场景
活动跟踪:Kafka 可以用来跟踪用户行为,比如我们经常回去淘宝购物,你打开淘宝的那一刻,你的登陆信息,登陆次数都会作为消息传输到 Kafka ,当你浏览购物的时候,你的浏览信息,你的搜索指数,你的购物爱好都会作为一个个消息传递给 Kafka ,这样就可以生成报告,可以做智能推荐,购买喜好等。
传递消息:Kafka 另外一个基本用途是传递消息,应用程序向用户发送通知就是通过传递消息来实现的,这些应用组件可以生成消息,而不需要关心消息的格式,也不需要关心消息是如何发送的。
度量指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。
日志记录:Kafka 的基本概念来源于提交日志,比如我们可以把数据库的更新发送到 Kafka 上,用来记录数据库的更新时间,通过kafka以统一接口服务的方式开放给各种consumer,例如hadoop、Hbase、Solr等。
流式处理:流式处理是有一个能够提供多种应用程序的领域。
限流削峰:Kafka 多用于互联网领域某一时刻请求特别多的情况下,可以把请求写入Kafka 中,避免直接请求后端程序导致服务崩溃生产者分区策略(针对一个topic分区)
指定分区号就发该分区,指定key就取模。
采用黏性分区分配(随机满了随机)
1.Key取模
在实际使用中,我们一般不会指定消息发送的具体partition,最多只会传入key值,类似下面这种方式:
producer.send(new ProducerRecord2、自定义负载策略
我们也可以通过实现Partitioner接口,自定义分发策略,看下具体实现
自定义实现Partitioner接口 传入数字区号3.黏性分区
消费者分区策略(针对topic而言)
1 默认Range
2.RoundRobin
2 Sticky粘性分区
假如7个消费者 7/3 = 2 分成3,3,2 三个尽量平均的组,组里区号随意
如果某分区挂了,45秒之后就按照分区策略再平衡。
消费者提交
1.自动提交(默认)
3 手动提交
1)同步提交 offset
由于同步提交 offset 有失败重试机制,故更加可靠,但是由于一直等待提交结果,提
交的效率比较低 吞吐量会受到很大的影响。其会阻塞当前线程,直到提交成功
2)异步提交
没有重试,不会阻塞,回调通知是否成功
-
5.5.6 漏消费和重复消费
重复消费:已经消费了数据,但是 offset 没提交。
自动提交出现
漏消费:先提交 offset 后消费,有可能会造成数据的漏消费。 手动消费出现
解决:
消费者事务,确保精准一次消费成功消息堆积解决
检测出
创建一个临时topic
因为分区数只能增加不能减小
同时增加分区数和消费者数量
之后删除临时队列
比如下游线上故障处理完之后肯定会有大量消息堆积查看监控
- 临时topic队列扩容,并提高消费者能力,但是如果增加Consumer数量,但是堆积的topic里面的message queue数量固定,过多的consumer不能分配到message queue
编写临时处理分发程序,从旧topic快速读取到临时新topic中,新topic的queue数量扩容多倍,然后再启动更多consumer进行在临时新的topic里消费
直到堆积的消息处理完成,再还原到正常的机器数量,删除这个临时队列。
分区数为何只能增加不能减小(报异常)
修改配置文件可以支持减小,但是报异常
为什么不支持减少分区?
按照Kafka现有的代码逻辑而言,此功能完全可以实现,不过也会使得代码的复杂度急剧增大。实现此功能需要考虑的因素很多,比如删除掉的分区中的消息该作何处理?如果随着分区一起消失则消息的可靠性得不到保障;如果需要保留则又需要考虑如何保留。直接存储到现有分区的尾部,消息的时间戳就不会递增
如果分区broker挂掉都可以重平衡,但是如果这个则不会重平衡。而是报异常
50个分区
kafka内部自己创建了_consumer_offsets主题包含了50个分区。
这个主题用来存放消费者消费某个主题的偏移量。
定期将自己消费分区的offset提交给kafka内部topic:__consumer_offsets,提交过去的时候,key是consumerGroupld-topict分区号,value就是当前offset的值,kafka会定期清理topic里的消息,最后就保留最新的那条数据
当一个消费者挂掉后,该消费者消费的信息会存放到__consumer_offsets-分区号中,在另一个同样的消费者组中的消费者会再去消费这个生产者生产的消息,此时便会从这个分区中获取挂掉的消费者消费的位置
因为_consumer_offsets可能会接收高并发的请求,kafka默认给其分配50个分区(可以通过offsets.topic.num.partitions设置),这样可以通过加机器的方式抗大并发。
通过如下公式可以选出consumer消费的offset要提交到_consumer_offsets的哪个分区公式: hash(consumerGroupld) % _consumer_offsets主题的分区数
重平衡的前两个触发条件
Kafka与RocketMq对比
Broker差异
主从差异:
kafka的master/slave是基于partition维度的,而rocketmq是基于broker维度的;kafka的master/slave是可以切换的,而rocketmq不行,当rocketmq的master宕机时,读能被路由到slave上,但写会被路由到此topic的其他broker上。
刷盘:
rocketmq支持同步刷盘,也就是每次消息都等刷入磁盘后再返回,保证消息不丢失,但对吞吐量稍有影响。一般在主从结构下,选择异步双写策略是比较可靠的选择。
消息查询:
rocketmq支持消息查询,除了queue的offset外,还支持自定义key。rocketmq对offset和key都做了索引,均是独立的索引文件。
消费失败重试与延迟消费:
rocketmq针对每个topic都定义了延迟队列,当消息消费失败时,会发回给broker存入延迟队列中,每个消费者在启动时默认订阅延迟队列,这样消费失败的消息在一段时候后又能够重新消费。延迟时间适合延迟级别一一对应的,延迟时间是随失败次数逐渐增加的,最后一次间隔2小时。当然发送消息是也可以指定延迟级别,这样就能主动设置延迟消费,在一些特定场景下还是有作用的。
数据写入:
kafka每个partition独占一个目录,每个partition均有各自的数据文件.log;而rocketmq是每个topic共享一个数据文件commitlog因为kafka的topic一般有多个partition,所以kafka的数据写入熟读比rocketmq高出一个量级。但超过一定数量的文件同时写入,会导致原先的顺序写转为随机写,性能急剧下降,所以kafka的分区数量是有限制的。
服务治理:
kafka用zookeeper来做服务发现和治理,broker和consumer都会向其注册自身信息,同时订阅相应的znode,这样当有broker或者consumer宕机时能立刻感知,做相应的调整;而rocketmq用自定义的nameServer做服务发现和治理,其实时性差点,比如如果broker宕机,producer和consumer不会实时感知到,需要等到下次更新broker集群时(最长30S)才能做相应调整,服务有个不可用的窗口期,但数据不会丢失,且能保证一致性。但是某个consumer宕机,broker会实时反馈给其他consumer,立即触发负载均衡,这样能一定程度上保证消息消费的实时性。
Producer差异
发送方式:
kafka默认使用异步发送的形式,有一个memory buffer暂存消息,同时会将多个消息整合成一个数据包发送,这样能提高吞吐量,但对消息的实效有些影响;rocketmq可选择使用同步或者异步发送。
发送响应:
kafka的发送ack支持三种设置:消息存进memory buffer就返回;等到leader收到消息返回,等到leader和isr的follower都收到消息返回,当然kafka都是异步刷盘。rocketmq都需要等broker的响应确认,有同步刷盘,异步刷盘,同步双写,异步双写等策略,相比于kafka多了一个同步刷盘。
Consumer差异
消息过滤:
rocketmq的queue和kafka的partition对应,但rocketmq的topic还能更加细分,可对消息加tag,同时订阅时也可指定特定的tag来对消息做更进一步的过滤。或者向服务器上传一段Java代码,可以对消息做任意形式的过滤,甚至可以做Message Body的过滤拆分。
有序消息:
rocketmq支持全局有序和局部有序,kafka也支持有序消息,但是如果某个broker宕机了,就不能在保证有序了。
因为kafka批量发送缓存应答数量为5个,其中一个失败了就可能造成分区内乱序,幂等性解决,
消费确认:
rocketmq仅支持手动确认,也就是消费完一条消息ack+1,会定期向broker同步消费进度,或者在下一次pull时附带上offset。kafka支持定时确认,拉取到消息自动确认和手动确认,offset存在zookeeper上。
消费并行度:
kafka的消费者默认是单线程的,一个Consumer可以订阅一个或者多个Partition,一个Partition同一时间只能被一个消费者消费,也就是有多少个Partition就最多有多少个线程同时消费。rocketmq消费者分有序消费模式和并发消费模式,有序模式下,一个消费者也只存在一个线程消费;并发模式下,每次拉取的消息按consumeMessageBatchMaxSize(默认1)拆分后分配给消费者线程池,消费者线程池min=20,max=64。也就是每个queue的并罚度在20-64之间,一个topic有多个queue就相乘。所以rocketmq的并发度比Kafka高出一个量级。
事务消息:
rocketmq指定一定程度上的事务消息,当前开源版本删除了事务消息回查功能,事务机制稍微变得没有这么可靠了,不过阿里云的rocketmq支持可靠的事务消息;kafka不支持分布式事务消息。
消费实时性:
kafka是通过短轮训形式拉取消息,消费实时性取决于轮训间隔;rocketmq是通过长连接形式拉取消息,当有新消息时会立即出发拉取,只要消费能力足够,实时性比价可靠
Kafka 中的长轮询
像 Kafka 在拉请求中有参数,可以使得消费者请求在 “长轮询” 中阻塞等待。 简单的说就是消费者去 Broker 拉消息,定义了一个超时时间,也就是说消费者去请求消息,如果有的 话马上返回消息,如果没有的话消费者等着直到超时,然后再次发起拉消息请求。并且 Broker 也得配合,如果消费者请求过来,有消息肯定马上返回,没有消息那就建立一个延迟操作,等条件满足了再返回。
原理图:
Kafka保证顺序性
选举机制
2.kafka副本机制
说完了分区,再来说说副本。先说说副本的基本内容,在kafka中,每个主题可以有多个分区,每个分区又可以有多个副本。这多个副本中,只有一个是leader,而其他的都是follower副本。仅有leader副本可以对外提供服务。
多个follower副本通常存放在和leader副本不同的broker中。通过这样的机制实现了高可用,当某台机器挂掉后,其他follower副本也能迅速”转正“,开始对外提供服务。
这里通过问题来整理这部分内容。
kafka的副本都有哪些作用?
在kafka中,实现副本的目的就是冗余备份,且仅仅是冗余备份,所有的读写请求都是由leader副本进行处理的。follower副本仅有一个功能,那就是从leader副本拉取消息,尽量让自己跟leader副本的内容一致。
说说follower副本为什么不对外提供服务?
这个问题本质上是对性能和一致性的取舍。试想一下,如果follower副本也对外提供服务那会怎么样呢?首先,性能是肯定会有所提升的。但同时,会出现一系列问题。类似数据库事务中的幻读,脏读。
比如你现在写入一条数据到kafka主题a,消费者b从主题a消费数据,却发现消费不到,因为消费者b去读取的那个分区副本中,最新消息还没写入。而这个时候,另一个消费者c却可以消费到最新那条数据,因为它消费了leader副本。
看吧,为了提高那么些性能而导致出现数据不一致问题,那显然是不值得的。
leader副本挂掉后,如何选举新副本?
如果你对zookeeper选举机制有所了解,就知道zookeeper每次leader节点挂掉时,都会通过内置id,来选举处理了最新事务的那个follower节点。
从结果上来说,kafka分区副本的选举也是类似的,都是选择最新的那个follower副本,但它是通过一个In-sync(ISR)副本集合实现。
kafka会将与leader副本保持同步的副本放到ISR副本集合中。当然,leader副本是一直存在于ISR副本集合中的,在某些特殊情况下,ISR副本中甚至只有leader一个副本。
当leader挂掉时,kakfa通过zookeeper感知到这一情况,在ISR副本中选取新的副本成为leader,对外提供服务。
但这样还有一个问题,前面提到过,有可能ISR副本集合中,只有leader,当leader副本挂掉后,ISR集合就为空,这时候怎么办呢?这时候如果设置unclean.leader.election.enable参数为true,那么kafka会在非同步,也就是不在ISR副本集合中的副本中,选取出副本成为leader,但这样意味这消息会丢失,这又是可用性和一致性的一个取舍了。
跟副本关系最大的,那自然就是acks机制,acks决定了生产者如何在性能与数据可靠之间做取舍。
配置acks的代码其实很简单,只需要在新建producer的时候多加一个配置:
val properties = new Properties()
……
props.put(“acks”, “0/1/-1”); //配置acks,有三个可选值
……其他配置
val producer = new KafkaProducerString, String
acks这个配置可以指定三个值,分别是0,1和-1。我们分别来说三者代表什么:
acks为0:这意味着producer发送数据后,不会等待broker确认,直接发送下一条数据,性能最快
acks为1:为1意味着producer发送数据后,需要等待leader副本确认接收后,才会发送下一条数据,性能中等
acks为-1:这个代表的是all,意味着发送的消息写入所有的ISR集合中的副本(注意不是全部副本)后,才会发送下一条数据,性能最慢,但可靠性最强
Follwer同步数据
首先,Follower 发送 FetchRequest 请求给 Leader。 接着,Leader 会读取底层日志文件中的消 息数据,再更新它内存中的 Follower 副本的 LEO 值,更新为 FetchRequest 请求中的 fetchOffset 值。 最后,尝试更新分区高水位值(HW )。Follower 接收到 FETCH 响应之后,会把 消息写入到底层日志,接着更新 LEO 和 HW 值。
Kafaka的复制机制不是完全的同步复制,也不是单纯的异步复制,事实上, 同步复制要求所有能工作的Follower副本都复制完,这条消息才会被确认为成功提交, 这种复制方式影响了性能。而在异步复制的情况下, follower副本异步地从leader副本中复制数据, 数据只要被leader副本写入就被认为已经成功提交。在这种情况下,如果follower副本都没有复制完而落后于leader副本, 如果突然leader副本宕机,则会造成数据丢失。Kafka正是使用这种ISR的方式有效的权衡了数据可靠性与性能之间的关系。
分区 Leader故障转移&选举策略
Kafka会选择一个 broker 作为 “controller”节点。 controller 节点负责 检测 brokers 级别故障,并负责在 broker 故障的情况下更改这个故障 Broker 中的 partition 的 leadership 。 这种方式可以批量的通知主从关系的变化,使得对于拥有大量partition 的broker ,选举过程的代价更低并且速度更快。 如果 controller 节点挂了,其他存活的 broker 都可能成为新的 controller 节点。
Kafka的选举策略大致分一下 几种情况
OfflinePartition Leader 选举:每当有分区上线时,就需要执行 Leader 选举。 所谓的分区上线,可能是创建了新分区,也可能是之前的下线分区重新上线。这是最常见的分区 Leader 选举场景。
ReassignPartition Leader 选举:当你手动运行 kafka-reassign-partitions 命令,或者是调用 Admin 的 alterPartitionReassignments 方法执行分区副本重分配时, 可能触发此类选举。假设原来的 AR 是[1,2,3],Leader 是 1,当执行副本重分配后,副本集 合 AR 被设置成[4,5,6],显然, Leader 必须要变更,此时会发生 Reassign Partition Leader 选举。
PreferredReplicaPartition Leader 选举:当你手动运行 kafka-preferred-replica- election 命令,或自动触发了 Preferred Leader 选举时,该类策略被激活。 所谓的 Preferred Leader,指的是 AR 中的第一个副本。比如 AR 是[3,2,1],那么, Preferred Leader 就是 3。
ControlledShutdownPartition Leader 选举:当 Broker 正常关闭时,该 Broker 上 的所有 Leader 副本都会下线,因此,需要为受影响的分区执行相应的 Leader 选举。
Rebalance:重平衡
基于上面的分析,我们对 kafka 的内部原理和集群架构有了更多的理解。最后,再来看一下 kafka 中的 Rebalance 机制。重平衡说的是某个主题下面的分区重新分配给一个消费者组中的消费者。
这里就可以类比 2PC 协议的处理过程。首先是每个主题都有一个协调者,并且消费者和协调者之间存在定时的心跳。其次,两阶段为:
joinGroup请求:请求入组
syncGroup请求:分配队列结果
假设是新增消费者的场景,新增消费者C3,那么 C3 主动向协调者发起 joinGroup 请求,协调者就知道该启动重平衡了。协调者会在响应其他消费者的心跳中,携带发起重平衡的通知。这样,当其他消费者收到心跳响应后,就会主动发起 joinGroup 请求。
joinGroup 请求中携带的是该消费者的订阅信息。协调者会认为第一个发起 joinGroup 请求的是领导者,也即负责计算分配结果的。对于普通节点的请求,会直接返回入组成功。对于领导者的 joinGroup 请求,会在收集到所有节点入组信息后,统一返回给领导者。
那么领导者会对这些信息做判断,并按规则计算分配结果。与此同时各节点可以发起 syncGroup 请求,等到领导者的 syncGroup 请求发到协调者以后,协调者会根据分配结果,在各节点的 syncGroup 响应中携带其被分配的队列信息。这样就实现了分区的负载均衡。
投票选举过程(基于共识算法)
Raft 使用投票过程来确保选举成为 leader 的 candidate 一定包含全部committed 的日志。
具体如下:
1)选举时,各个节点只会投票给 commited 日志大于等于自己的节点;
2)Candidate 必须获得超过半数的选票才能赢得选举;
3)Leader 复制日志时也需要复制给超过半数的节点。
leader election约束:
同一任期内最多只能投一票;
只会投票给日志和自己一样,或者比自己新的节点
log replication约束:
一个log被复制到大多数节点,就是committed,保证不会回滚
leader一定包含最新的committed log,因此leader只会追加日志,不会删除覆盖日志
不同节点,某个位置上日志相同,那么这个位置之前的所有日志一定是相同的
共识算法
通过 leader,raft 将一致性问题分解成三个相当独立的子问题:
Leader Election:当集群启动或者 leader 失效时必须选出一个新的l eader。
Log Replication:leader 必须接收客户端提交的日志,并将其复制到集群中的其他节点,强制其他节点的日志与 leader 一样。
Safety:最关键的安全点就是图3.2中的 State Machine Safety Property。如果任何一个 server 已经在它的状态机apply了一条日志,其他的 server 不可能在相同的 index 处 apply 其他不同的日志条目。后面将会讲述 raft 如何实现这一点。
Kafka 的事务消息,
所以说这个事务消息不是我们想要的那个事务消息,其实不是今天的主题
了,不过我还是简单的说一下。
Kafka 的事务有事务协调者角色,事务协调者其实就是 Broker 的一部分。
在开始事务的时候,生产者会向事务协调者发起请求表示事务开启,事务协调者会将这个消息记录到特殊的日志-事务日志中,然后生产者再发送真正想要发送的消息,这里 Kafka 和 RocketMQ 处理不一样,而不是半消息。Kafka 会像对待正常消息一样处理这些事务消息,由消费端来过滤这个消息。
然后发送完毕之后生产者会向事务协调者发送提交或者回滚请求,由事务协调者来进行两阶段提交,如果是提交那么会先执行预提交,即把事务的状态置为预提交然后写入事务日志,然后再向所有事务有关的分区写入一条类似事务结束的消息,这样消费端消费到这个消息的时候就知道事务好了,可以把消息放出来了。
最后协调者会向事务日志中再记一条事务结束信息,至此 Kafka 事务就完成了,我拿 confluent.io 上的图来总结一下这个流程。
常用消息中间件对比
Kafka事务
保证的全分区幂等性,消息只发一次,并且保证发送多条分区的任务要发送成功都成功,失败都失败。