Kafka是一个分布式的基于发布订阅模式的消息队列,主要作用解耦、削峰。
消息传递模式
点对点模式
生产者-消费者1对1,消费者确认消息接收后,队列即删除队列中的消息。该模式即使有多个消费者同时消费数据,也能保证数据处理的顺序。
发布订阅模式
1对多,消费者消费数据后不会清除消息(Kafak为此模式)
常用MQ对比
ActiveMQ | RabbiMQ | RocketMQ | Kafka | |
---|---|---|---|---|
设计初衷 | Java | 基于erlang 语言,二次开发难度大 | 阿里参照kafka设计思想用java实现的mq | 使用scala实现的一个高性能分布式发布/订阅消息队列 |
支持的协议 | STOMP、AMQP、REST、XMPP | AMQP | 自定义 | 自定义(基于TCP) |
事务 | 支持 | 不支持 | 支持 | |
吞吐量 | 2w | 10w+ | 10w+ | |
组件支持 | 适合大数据,可接入flink、ss、flume | |||
多语言 | 支持 | 支持 | 只支持java | 支持 |
消息延迟 | 微秒 | 毫秒 | 毫秒 | |
可用性 | 高(主从) | 高(主从) | 非常高(分布式) | 非常高(分布式) |
消息可靠性 | 基本不丢 | 基本不丢 | 配置参数修改可以做到0丢失 | 配置参数修改可以做到0丢失 |
功能 | MQ功能极其完善 | MQ功能较为完善 | 功能支持较为简单,大数据领域广泛使用 | |
消息存储 | 支持少量堆积 | 支持少量堆积 | 支持大量堆积 | 支持大量堆积 |
消息重试 | 支持 | 不支持 | ||
消息回溯 | 不支持 | 不支持 | 支持指定时间回溯 | 支持offset回溯 |
轻量级 | 较重,搭配zookeeper |
基础模型
文件存储
Kafka采取分片和索引机制,将每个partition分为多个segment,每个segment对应.log和.index文件
稀疏索引
segment index file采取稀疏索引存储方式,它减少索引文件大小,通过mmap可以直接内存操作,稀疏索引为数据文件的每个对应message设置一个元数据指针,它比稠密索引节省了更多的存储空间,但查找起来需要消耗更多的时间。
问题:在partition中如何通过offser找到message?
以查找offset=7000为例。分两步:
首先查找到segement file,00000000000000000000.index表示最开始的文件,起始偏移量(offset)为0.第二个文件00000000000000368769.index的消息量起始偏移量为368770 = 368769 + 1。当offset=7000时定位到00000000000000000000.index|log(以起始偏移量命名并排序这些文件,只要根据offset 二分查找文件列表,就可以快速定位到具体文件)。
第二步通过segment file查找message,如上图所示,依次定位到00000000000000000000.index的元数据物理位置和 00000000000000000000.log的物理偏移地址。
kafka生产者
分区策略
分区以Partition为单位,方便集群的扩展;因为数据以Partition为单位进行读写,提高了并发能力。
生产者的每条数据都会封装成一个producerRecord对象发送给broker。producerRecord对象存在若干构造函数:
ProducerRecord(@NotNull String topic, Integer partition, String key, String value);//全部指定
ProducerRecord(@NotNull String topic, Integer partition, String value);//直接落盘对应partition
ProducerRecord(@NotNull String topic, String key, String value);//根据key的hash值与topic的partition数量取余得到partition值
ProducerRecord(@NotNull String topic, String value);//第一次调用随机生成一个整数(后面每次调用自增),然后与topic的partition数量取余数,也就是RoundRobin轮询算法。
数据可靠性保证
副本数据同步策略
方案 | 优点 | 缺点 |
---|---|---|
半数以上完成同步,就发送ack | 延迟低 | 选举新leader,如果能容忍n台节点故障,需要2n+1个副本 |
全部同步完成,才发送ack | 选举新leader,容忍n台节点故障,只需要n+1个副本 | 延迟高 |
Kafka选择方案二,原因如下:
1.相同要求下,方案一所需的副本过多,kafak的每个partition都存在大量的数据,会造成大量数据冗余。
2.虽然方案二延迟高,但网络延迟对Kafka影响较小。
问题:如果出现leader收到消息后,有一个follower副本迟迟没有与leader同步,leader只能一直等下去,无法发送ack,怎么解决呢?
这里引申出ISR,Kakfa维护了一个动态副本集合ISR。当ISR中的数据同步完成后,leader即发送ack。如果ISR中的follower长时间未向leader同步(时间阈值由replica.lag.time.max.ms参数设定),则该follower会被踢出ISR。当然此处只是针对ACK=-1时的处理方式。
如果设置ACK = 1 或 0 则不存在该问题,但可靠性会变低。
ISR
分区中所有的副本统称AR(Assigned Replicas),leader也成为leader副本。所有同步能”跟上”leader副本的副本(包括leader副本本身)组成ISR(In-Sync Replicas),同步速度不能”跟上”leader副本的副本(不包括leder副本)组成OSR(Out-Sync Replicas),即AR = ISR + OSR。
消息会先发送给leader副本,然后follower副本才能从leader副本中拉去消息进行同步。同步过程中会存在一定的滞后,判断是否滞后过多而无法”跟上”leader副本的阈值,可以通过参数配置。
当OSR中的副本”追上”leader副本后,可以重新进入ISR;当ISR中的副本被判断”落后”过多,则剔除到OSR中。另外只有ISR中的副本才有资格被选举为leader。
ACK应答机制
存在多种应答机制的原因本质是吞吐量和可靠性的博弈。
ACK | 说明 |
---|---|
-1 | 生产者发送消息后,需要等待ISR中所有副本都成功写入消息后才能收到来自服务器端的成功响应,可靠性最强 |
0 | 生产者发送消息后,无需等待任务服务端的响应,吞吐量最高 |
1 | 默认设置值。只要分区的leader副本写入成功消息,生产者就会收到来自服务器的成功响应 |
问题:为什么ACK=1的情况下,也可能存在消息丢失?
ack=1的情况下,producer只要leader写入成功即收到消息发送成功的响应。如果leader写入成功后,还没来得及把数据同步给follower节点就挂了,这个时候消息就丢失了。
故障处理
LEO&HW
LEO:每个副本的最后一个offset HW:所有副本(ISR)中最小的LEO(木桶效应) ⚠️注意:HW之前的数据才对消费者可见
Follower发生故障
follower发生故障后,会被临时踢出ISR。
待follower恢复后,读取本地磁盘上次记录的HW,将HW后的log记录丢弃,从HW开始向leader进行同步,当该follower的LEO大于等于该Partition的HW,即follower追上ISR的HW,就可以重新加入ISR中。
Leader发生故障
leader发生故障之后,从ISR中选出一个新的leader。
之后为了保证多个副本之间数据一致性,其余的follower会将HW之后的数据截断,重新从新的leader同步数据。
⚠️注意:Leader故障的处理方式只能保证副本间数据一致性,并不能保证数据不丢失或者不重复。
Exactly Once语义
ACK=0时候,保证生产者每条信息只发送一次,即At Most Once,当ACK=-1时候,保证patition副本一定同步完成,不会丢数据,即At Least Once。At Least Once保证数据不丢失,但可能存在消息重复发送的问题。
Exactly Once = At Least Once + 幂等性
在Kafka_0.11版本之前,是不支持Exactly Once的。处理方式只能启用At Least Once,下游数据进行幂等性校验,比如全局去重。
Kafka_0.11版本引入幂等性,启用幂等性方式只需要设置producer的参数_enable.idempotence=true。_Kafka实现幂等性的主要方式建立唯一索引(由PID,PatitionId, SeqNumber组成);Producer在初始化的时候会被分配一个PID,Producer发送数据的每个
事务
Kafka在0.11版本不仅引入了Exactly Onece语义,还引入了事务特性。
kafka事务特性是指一系列的生产者生产消息和消费者提交偏移量的操作在一个事务里,或者说是一个原子操作,生产消息和提交偏移量同时成功或失败。
Kafka的事务特性就是确保跨分区的多个写操作的原子性
应用场景
- 生产者发送多条消息,可以封装在一个事务里,形成原子操作
- read-process-write模式:将消息消费和生产封装在一个事务中,形成原子操作。比如流式操作中,服务从上游接收消息,经过处理发送到下游(这就对应消息的消费和生产)
事务配置
https://www.jianshu.com/p/64c93065473e
对于producer,需要设置transactional.id属性,设置后,enable.idempotent属性会自动设置为true。
对于consumer,需要设置isolation.level=read_commited,这样consumer只会读取已经提交事务的消息。另外需要设置enable.auto.commit=false来关闭自动提交offset的功能
kafka消费者
消费方式
consumer采用pull(拉)模式从broker中读取数据。
push(推)模式很难适应各个消费者的消费速率,因为消息发送速率由broker决定的,很容易造成消费者来不及处理消息(典型表现就是拒绝服务以及网络拥塞)
pull模式可以根据消费者能力以适应当前的消费速率,但pull模式的不足之处是,如果kafka中没有数据,消费者会陷入空轮询,一直返回空数据。
针对这点,kafka的消费者在消费数据的时候会传入一个时长参数timeout,如果当前没有数据可以消费,会等待一段时长(timeout)再进行返回。
offset维护
Kafka0.9
版本之前consumer
默认将offset
维护在zookeeper
中,从Kafka0.9
版本开始,consumer
默认将offset
在Kafka一个内置topic
中,该topic
为__consumer_offsets
。offset维护在zookeeper中有一个缺点,每次消费者都要去zookeeper上写入一次,效率较低,并且zk和kafka的offset
变化确认也要走网络IO。
offset由zookeeper维护称为low level api。
offset由broker维护称为high level api。high level api更新offset方式分为自动提交和手动提交,自动提交(_enable.auto.commit=true_
),fetch到消息后就可以更新offset,无论是否消费成功,为at most once。更新频率由_auto.commit.interval.ms_
设置;手动提交(_enable.auto.commit=false_
),fetch到消息后,消费成功再调用_consumer.commitySync()_
,手动更新offset,如果更新失败不调用该方法,offset也不会更新,此消息会被重复消费一次,所以是at least once。
重试机制【1】
默认情况下,如果消费者没有成功消费一条消息(消费者无法提交当前偏移量),它将重试同一条消息。根据没有成功消费引起的异常分为偶现(如下游服务短暂不可用、数据库异常)和必现异常(代码异常如8/0算术异常)。如果是偶现异常,一直重试是个好办法,既能保证数据消费有序性、上下游数据一致性还能保证能最终被消费成功。如果是必现异常,那么一直重试并不合适,他将一直阻塞住,后续消息无法继续消费。
折衷解决方案
有一种比较流行的重试主题方案,在此作为一种思路:
- 消费者尝试消费主体中一条数据
- 消费失败,将消息发送到
Retry Topic1
,提交偏移量继续处理下一条 - 订阅
Retry Topic1
的消费者处理逻辑和Main Topic一样,如果他也无法消费,继续发布到下一个重试主题,每次递进的主题重试次数增多和延迟增加。 - 最终仍无法消费的消息,将消息发布到死信队列(Dead Letter Queue,DLQ)中,进行手动处理
衍生出来的问题与优化处理
不同类型错误处理不该一样
偶现和必现的消费异常应该区别对待,所以我们在处理的时候,针对偶现的异常应该进行同个队列重试(主要保证消费有序性、下游数据一致)而针对必现的则直接扔到死信队列即可。
当然此处的异常类型判断也是一个难点。
忽略了排序
如果是A更新为A‘,再A’更新为A,此时第一条消息失败了,第二条成功了,根据上面的模型最后手动介入或者重试成功了,将导致更新顺序的改变而导致数据一致性问题。
此处排序解决方案(消息一经入到其他主题),更多我的建议是更多通过业务代码保证,如更新操作需要对比版本,旧版本消息不予处理,简单自增等正常处理即可。
进一步解决
异常状态采用“日志记录”+“任务健康检查”+“人工补偿”来解决。定时任务检查日志,发现异常进行人工补偿。
RabbitMQ的重试机制
- @RabbitListener 底层使用了AOP进行拦截,如果程序没有抛异常,自动提交事务。
- 如果AOP使用异常通知拦截获取异常信息的话,自动实现补偿机制,该消息会缓存到RabbitMQ服务端进行缓存,一直重试到不抛异常为准。
参考
【1】你可能用错了 Kafka 的重试机制 Dave Taubler InfoQ【微信公众号】