为什么高性能、高吞吐

顺序读写

Kafka是将消息记录持久化到本地磁盘中的,一般人会认为磁盘读写性能差,可能会对Kafka性能如何保证提出质疑。实际上不管是内存还是磁盘,快或慢关键在于寻址的方式,磁盘分为顺序读写与随机读写,内存也一样分为顺序读写与随机读写。基于磁盘的随机读写确实很慢,但磁盘的顺序读写性能却很高,一般而言要高出磁盘随机读写三个数量级,一些情况下磁盘顺序读写性能甚至要高于内存随机读写。

Page Cache(页缓存)

Kafka利用了操作系统本身的Page Cache,就是利用操作系统自身的内存而不是JVM空间内存
操作系统层面的缓存利用率会更高
操作系统本身也对于Page Cache做了大量优化

零拷贝

image.pngimage.png

当Kafka客户端从服务器读取数据时,如果不使用零拷贝技术,那么大致需要经历这样的一个过程:

  1. 操作系统将数据从磁盘上读入到内核空间的读缓冲区中。
  2. 应用程序(也就是Kafka)从内核空间的读缓冲区将数据拷贝到用户空间的缓冲区中。
  3. 应用程序将数据从用户空间的缓冲区再写回到内核空间的socket缓冲区中。
  4. 操作系统将socket缓冲区中的数据拷贝到NIC缓冲区中,然后通过网络发送给客户端。

避免了在内核空间和用户空间之间的拷贝

分区分段+索引

  1. Kafkamessage是按topic分类存储的,topic中的数据又是按照一个一个的partition即分区存储到不同broker节点。每个partition对应了操作系统上的一个文件夹,partition实际上又是按照segment分段存储的。这也非常符合分布式系统分区分桶的设计思想。
  2. 通过这种分区分段的设计,Kafkamessage消息实际上是分布式存储在一个一个小的segment中的,每次文件操作也是直接操作的segment。为了进一步的查询优化,Kafka又默认为分段后的数据文件建立了索引文件,就是文件系统上的.index文件。这种分区分段+索引的设计,不仅提升了数据读取的效率,同时也提高了数据操作的并行度。

参考
Kafka文件存储机制那些事

批量读写

在向Kafka写入数据时,可以启用批次写入,这样可以避免在网络上频繁传输单个消息带来的延迟和带宽开销。假设网络带宽为10MB/S,一次性传输10MB的消息比传输1KB的消息10000万次显然要快得多。

批量压缩

在很多情况下,系统的瓶颈不是CPU或磁盘,而是网络IO,对于需要在广域网上的数据中心之间发送消息的数据流水线尤其如此。进行数据压缩会消耗少量的CPU资源,不过对于kafka而言,网络IO更应该需要考虑。

如果每个消息都压缩,但是压缩率相对很低,所以Kafka使用了批量压缩,即将多个消息一起压缩而不是单个消息压缩
Kafka允许使用递归的消息集合,批量的消息可以通过压缩的形式传输并且在日志中也可以保持压缩格式,直到被消费者解压缩
Kafka支持多种压缩协议,包括Gzip和Snappy压缩协议

  1. Kafka速度的秘诀在于,它把所有的消息都变成一个批量的文件,并且进行合理的批量压缩,减少网络IO损耗,通过mmap提高I/O速度,写入数据的时候由于单个Partion是末尾添加所以速度最优;读取数据的时候配合sendfile直接暴力输出。

事务

观点一
image.png

谢邀。 Kafka提供事务主要是为了实现精确一次处理语义(exactly-once semantics, EOS)的,而EOS是实现流处理系统正确性(correctness)的基石,故Kafka事务被大量应用于Kafka Streams之中。不过用户当然也能够调用API实现自己的事务需求。 具体的场景包括: 1. producer端跨多分区的原子性写入 2. consumer端读取事务消息 多分区原子性写入保证producer发送到多个分区的一批消息要么都成功要么都失败=>所谓的失败是指对事务型consumer不可见;而consumer端读取事务消息主要由consumer端隔离级别体现,它类似于数据库中隔离级别的概念,目前只是简单分为:read_uncommitted和read_committed,其中后者指的是consumer只能读取已成功提交事务的消息(当然也包括非事务型producer生产的消息)。目前Kafka事务在consumer端很难像一般的数据库那样提供更高级的隔离级别(比如串行化或者Snapshot),即事务型consumer能保证读取到的消息都是已提交事务的消息,但不敢保证能够读取到所有这样的消息——有很多原因会导致这一点,比如compact topic使用新版本消息覆盖了之前的事务消息或日志段删除导致部分数据不可读等。

观点二

image.png

Kafka的事务和数据库里用的ACID事务不是一个东西。 Kafka的事务主要用来处理consume-process-produce场景的原子性问题:一个数据处理服务从Kafka的若干源topic取数据,处理后,再发送到另外一些目标topic里。在这个过程中,Kafka事务保证:要么数据被处理了,目标topic的结果被正确写入,源topic的数据被消费掉;要么这个数据还能从源topic里读取到,就像没被处理过一样,不会出现源topic还没consume,目标topic已经produce出去的情况。并且,源和目标topic不止一个时也可以保证这个特性。

总结

1. producer端跨多分区的原子性写入
2. consumer端读取事务消息
3.实现端对端;一个数据处理服务从Kafka的若干源topic取数据,处理后,再发送到另外一些目标topic里

消息不丢失

生产端

send()接口

kafka生产者提供的send()接口,有三种模式:

  1. 发后即忘(fire-and-forget)
  2. 同步(sync)
  3. 异步(async)

    send(record, callback)接口的调用让生产者即不会像第1模式那样完全不在乎是否发送成功,也不像第2模式那样发送需阻塞等待。有了callback回调,消息发送既可以异步,同时如果kafka服务有错误信息,也能准确回调告诉程序,让其能针对性地进行处理。并且,对于同一个分区而言,回调函数的调用也可以保证分区有序。
    生产环境应用中,大部分人都会选择第3种模式,让callback回调告诉本次消息是否真的提交成功。另外可能还会因为网络抖动而发送失败的情况,我们可以配置重试机制。总之,生产端要保证消息「已提交」到kafka服务这个职责。

acks

生产者重要的配置参数:acks
acks=0。生产者发送消息无需等待kafka服务端的响应。这种方法可靠性最低,即使kafka出现任何异常,producer也无法感知到。
acks=1。生产者发送消息之后,只需等待其接收的leader副本成功写入就会收到成功的响应,无需等其他follower副本同步写入。这种方式是衡量可靠性和性能之间的一种折中方案。不过依然存在消息丢失的情况,即消息刚写入leader副本并成功返回之后,leader副本崩溃了,那么此时因为其他follower副本都没有同步,那么消息就丢失了。
acks=all。设置acks=-1是一样的。生产者发送消息之后,需等待ISR(in-sync replica)的所有副本都成功写入才会接收到成功的响应。这种方式是可靠性最强的。不过还需配合另一个参数min.insync.replicas的联动控制。

retries

设置retries为一个大于0的值
这个参数用来配置生产者发送消息失败后的重试次数。另外注意,如果设置了retries参数,则建议设置max.in.flight.requests.per.connection=1不然可能无法保证同一个分区的消息有序性。

max.in.flight.requests.per.connection参数指定了生产者在收到服务端响应之前可以发送多少条消息,默认为5条。把它设置为1可以保证消息发送的有序性。如果设置大于1,并且配置了重试机制,那么就会出现错序的现象。比如第一条消息写入失败,第二条消息写入成功,那么生产者会重试发送第一条消息,如果此时发送成功,则第一条消息会被插入到第二条消息后面,导致错序。
retry.backoff.ms参数指定两次重试之间的时间间隔,避免无效的频繁重试,默认值为100。

broker服务

设置replication.factor>=3

表示主题分区的副本数。生产环境中也是建议至少3个brokers,这样能让每个broker都冗余保存一份数据,保证数据的可靠性。

设置min.insync.replicas>1

当acks被设置为all时,这个参数控制指定了消息至少被写入到多少个副本才认为是成功的。min.insync.replocas参数是对acks=all的一种具体约束。但是要确保replication.factor>min.insync.replicas,如果配置两者相等,那么一旦有一个副本挂了,那就是导致消息永远无法发送成功了。


设置unclean.leader.election.enable=false

这个参数表示是否允许那些没有在ISR(in-sync-replicas)的broker有资格竞选分区leader。默认值为false,建议最好不要主动设置为true。因为如果没有在ISR集合中的副本,可能有些broker副本数据已经落后原先的leader太多了,一旦它成为新的leader副本,那必然出现消息的丢失。

消费端

维持先消费消息,再提交更新位移,这样可以最大限制保证消息不丢失

补充

  • AR。分区中的所有副本统称为AR(Assigned Replicas)。
  • ISR。所有与leader副本保持一定程度同步的副本(包括leader副本在内)称为ISR(In-sync Replicas)。ISR集合是AR的一个子集,并且可以看作是与leader副本数据比较同步的副本。
  • OSR。与leader副本同步滞后过多的副本(不包括leader副本)称为OSR(Out-sync Replicas)。OSR集合也是AR的一个子集。OSR是与ISR互斥的。

image.png

这个文件中有9条消息,最后一条消息的offset为8

  • LSO(Log Start Offset) offset为0的第一条消息
  • HW(High Watermark)俗称高水位,这里用来指定消费端可消费的最高偏移量,即消费者只能拉取这个HW offset之前的消息进行消费。
    LEO(Log End Offset),标识当前日志文件中下一条待写入的消息的offset,最新一条可写入的offset为9


分区ISR集合中的每个副本都会维护自身的LEO,而ISR集合中最小的LEO即为分区的HW,所以对消费者而言只能消费HW之前的消息。上图中,offset在0至5之间的消息,所有指定的副本已同步写入完成,所以消费者可以拉取消费;offset在6~8,说明ISR中部分副本已写入,另一部分处于正在同步状态,所以不能被消费者所拉取。

业务实现

要想client端消费数据不能丢,肯定是不能使用自动提交的,所以必须是手动提交的。
一、从Kafka拉取消息(一次批量拉取500条,这里主要看配置)时
二、为每条拉取的消息分配一个msgId(递增)
三、将msgId存入内存队列(sortSet)中
四、使用Map存储msgId与msg(有offset相关的信息)的映射关系
五、当业务处理完消息后,ack时,获取当前处理的消息msgId,然后从sortSet删除该msgId(此时代表已经处理过了)
六、接着与sortSet队列的首部第一个Id比较(其实就是最小的msgId),如果当前msgId<=sort Set第一个ID,则提交当前offset
七、系统即便挂了,在下次重启时就会从sortSet队首的消息开始拉取,实现至少处理一次语义
八、会有少量的消息重复,但只要下游做好幂等就OK了。


消息幂等

  • 业务查重,根据主键查询
  • 直接插入,因为主键相同会进异常
  • 生产者发送每条数据的时候,里面加一个全局唯一的 id,类似订单 id 之类的东西,然后你这里消费到了之后,先根据这个 id 去比如 Redis 里查一下,之前消费过吗?如果没有消费过,你就处理,然后这个 id 写 Redis。如果消费过了,那你就别处理了,保证别重复处理相同的消息即可。

业务实现

幂等Key我们由订单编号+订单状态所组成(一笔订单的状态只会处理一次)
在处理之前,我们首先会去查Redis是否存在该Key,如果存在,则说明我们已经处理过了,直接丢掉
如果Redis没处理过,则继续往下处理,最终的逻辑是将处理过的数据插入到业务DB上,再到最后把幂等Key插入到Redis上
显然,单纯通过Redis是无法保证幂等的
所以,Redis其实只是一个「前置」处理,最终的幂等性是依赖数据库的唯一Key来保证的(唯一Key实际上也是订单编号+状态)
总的来说,就是通过Redis做前置处理,DB唯一索引做最终保证来实现幂等性的

消息顺序消费

订单的状态比如有 支付、确认收货、完成等等,而订单下还有计费、退款的消息报
理论上来说,支付的消息报肯定要比退款消息报先到嘛,但程序处理的过程中可不一定的嘛
一、宽表:将每一个订单状态,单独分出一个或多个独立的字段。消息来时只更新对应的字段就好,消息只会存在短暂的状态不一致问题,但是状态最终是一致的
二、还有部分场景,可能我们只需要把相同userId/orderId发送到相同的partition(因为一个partition由一个Consumer消费),又能解决大部分消费顺序的问题

ISO

kafka也一样,它的Leader 维护了一个动态的 in-sync replica set,简称ISR,意为和leader保持同步的follower集 合。当ISR中的follower完成数据的同步之后,leader就会给follower发送ack。如果follower长时间 未 向leader同步数据 ,则该follower将被踢出ISR ,该时间阈值由配置文件中replica.lag.time.max.ms参数设定。Leader发生故障之后,就会从ISR中选举新的leader

image.png

follower故障处理

follower发生故障后会被临时踢出ISR,待该follower恢复后,follower会读取本地磁盘记录的上次的HW,并将log文件高于HW的部分截取掉,从HW开始向leader进行同步。等该follower的LEO大于等于该Partition的HW,即follower追上leader之后,就可以重新加入ISR了。

leader故障处理

leader发生故障之后,会从ISR中选出一个新的leader,之后为保证多个副本之间的数据一致性,其余的follower会先将各自的log文件高于HW的部分截掉,然后从新的leader同步数据。
注意:这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复。