kafka副本
kafka分区中的所有副本统称为AR(Assigned Replicas)。所有与leader保持一定同步的副本(包括leader副本在内)组成ISR(In-Sync Replicas),ISR是AR集合中的一个子集。与leader副本同步滞后过多的副本(不包括leader副本)组成OSR(Out-Of-Sync Replicas),由此可见AR=ISR+OSR。在正常情况下,所有的follower副本都应该与leader副本保持一定程度的同步,即AR=ISR,OSR集合为空。
ISR HW LEO 也有紧密的关系。HW是High Watermark 的缩写,俗称高水位,它标识了一个特定的消息偏移量( offset ),消费者只能拉取到这个 offset 前的消息。
如图1-4 示,它代表一个日志文件,这个日志文件中有9条消息,第一条消息的 offset ( LogStartOffset )为0,最后一条消息的 offset 8, offset为9的消息用虚线框表示,代表下一条待写入的消息。日志文件的 HW为6,表示消费者只能拉取到 offset 在0至5之间的消息, offset为6的消息对消费者而言是不可见的。
LEO Log End Offset 缩写,它标识当前日志文件中下一条待写入消息 offset ,图 1-4中 offset 的位置即为当前日志文件的 LEO, LEO 的大小相当于当前日志分区中最后一条消 息的offset值加 1。分区 ISR 集合中的每个副本都会维护自身的 LEO ,而 ISR集合中最小的 LEO 即为分区的 HW ,对消费者而言只能消费 HW 之前的消息 。
生产者拦截器
Kafka拦截器,主要是自定义实现ProducerInterceptor接口。主要实现三个方法:
- public ProducerRecord
onSend(ProducerRecord record); - public void onAcknowledgement(RecordMetadata metadata, Exception exception);
- public void close();
KafkaProducer在消息序列化和分区之前,调用生产者拦截器onSend()方法,对消息进行相应的定制化操作。一般来说最好不要修改消息ProducerRecord的topic、key和partition等信息。如果需要修改,则要确保对其准确的判断,否则会与预想的效果出现偏差。比如修改key不仅会影响分区的计算,同样会影响broker端日志压缩(Log Compaction)的功能。
KafkaProducer会在消息被应答之前(Acknowledgement)之前或消息发送失败时调用生产者拦截器的onAcknowledgement()方法,优先于用户设定的Callback之前执行。这个方法运行在Producer的I/O线程中,所以这个方法实现的代码逻辑越简单越好,否则会影响消息的发送速度。
整体架构
RecordAccumulator(消息累加器/消息收集器),其缓存大小通过buffer.memory来配置,默认32M。
消息在网络上是以字节Byte的形式传输的,在发送之前需要创建一块内存区域来保存对应的消息。在Kafka生产者客户端中,通过java.io.ByteBuffer实现消息内存的创建和释放。不过频繁的创建和释放是比较耗费资源的,在RecordAccumulator的内部还有一个BufferPool,它主要用来实现ByteBuffer的复用,以实现缓存的高效利用。不过BufferPool只针对特定大小的ByteBuffer进行管理,而其他大小的ByteBuffer不会缓存进BufferPool中,这个特定的大小由batch.size参数来指定,默认值16384B,即16K。我们可以适当地调大batch.size参数以便多缓存一些消息。
Sender从RecordAccumulator中获取缓存的消息后,会进一步将原本<分区, Deque
请求从Sender线程发往Kafka之前还会保存到InFlightRequests中,InFlightRequests保存对象的具体形式为Map
元数据更新
InFlightRequests还可以获得leastLoadedNode,即所有Node中负载最小的那一个。元数据是指Kafka集群的元数据,这些元数据具体记录了集群中有哪些主题,这些主题有哪些分区,每个分区的leader副本分在哪个节点上,follower副本分配在哪些节点上,哪些副本在AR、ISR等集合中,集群中有哪些节点,控制器节点又是哪一个等信息。当超过metadata.max.age.ms时间没有更新元数据都会引起元数据的更新操作。客户端参数metadata.max.age.ms的默认值为300000,即5分钟。当需要更新元数据时,会先挑出leastLoadedNode,然后向这个Node发送MetadataRequest请求来获取具体的元数据信息。
acks
- acks=1。默认值即为1。生产者发送消息之后,只要分区的leader副本成功写入消息,那么它就会收到来自服务端的成功响应。
- acks=0。生产者发送消息之后不需要等待任何服务端的响应。
- acks=-1或者acks=all。生产者在消息发送之后,需要等待ISR中的所有副本都成功写入消息之后才能收到来自服务端的成功响应。当ISR中只有leader副本,这样就退化成acks=1的情况。要获得更高的消息可靠性需要配合min.insync.replicas等参数的联动。
max.request.size
这个参数用来限制生产者客户端能发送的消息的最大值,默认值为1048576B,即1MB。retries和retry.backoff.ms
retries参数用来配置生产者重试的次数,默认值为0,即在发生异常时候不进行任何重试动作。不过并不是所有的异常都是可以通过重试来解决的,比如消息太大,超过max.request.size参数配置的值时,这种方式就不可行了。
retry.backoff.ms参数用来设定两次重试之间的时间间隔,避免无效的频繁重试,默认值为100。
Kafka可以保证同一个分区中的消息是有序的。对于某些应用来说,顺序性非常重要,比如MySQL的binlog传输,如果出现错误就会造成非常严重的后果。如果将acks参数配置为非零值,并且max.in.flight.requests.per.connection参数配置为大于1的值,那么就会出现错序的现象;如果第一批次的消息写入失败,而第二批次的消息写入成功,那么生产者会重试第一批次的消息,此时如果第一批次的消息写入成功,那么这两个批次的消息就出现了错序。一般而言,在需要保证消息顺序的场合建议把参数max.in.flight.requests.per.connection配置为1,而不是把acks配置为0,不过这样也会影响整体的吞吐。compression.type
这个参数用来指定消息的压缩方式,默认值为”none”,即默认情况下,消息不会被压缩。该参数还可以配置为”gzip” “snappy” 和 “lz4”。对消息进行压缩可以极大的减少网络传输量、降低网络I/O,从而提高整体的性能。消息压缩是一种使用时间换空间的优化方式,如果对时延有一定的要求,则不推荐对消息进行压缩。linger.ms
这个参数用来指定生产者发送ProducerBatch之前等待更多消息(ProducerRecord)加入ProducerBatch的时间,默认值为0。生产者客户端会在ProducerBatch被填满或等待时间超过linger.ms值时发送出去。增大这个参数的值会增加消息的延迟,但是同时能提升一定的吞吐量。receive.buffer.bytes
这个参数用来设置Socket接收消息缓冲区的大小,默认值为32768B,即32KB。如果设置为-1,则使用操作系统的默认值。如果Producer与Kafka处于不同的机房,则可以是当地调大这个参数值。send.buffer.bytes
这个参数用来设置Socket发送消息缓冲区的大小,默认值为131072B,即128KB。与receive.buffer.bytes参数一样,如果设置为-1,则使用操作系统的默认值。request.timeout.ms
这个参数用来配置Producer等待请求响应的最长时间,默认值为30000(ms)。请求超时之后可以选择进行重试。注意这个参数需要比broker端参数replica.lag.time.max.ms的值要大,这样可以减少因客户端重试而引起的消息重复的概率。部分生产者客户端参数
重要的消费者参数
fetch.min.bytes
该参数用来配置Consumer在一次拉取请求(调用poll()方法)中能从Kafka中拉取的最小数据量,默认值为1B。Kafka在收到Consumer的拉取请求时,如果返回给Consumer的数据量小于这个参数所配置的值,那么它就需要进行等待,直到数据量满足这个参数的配置大小。可以适当调大这个参数的值以提高一定的吞吐量,不过也会造成额外的延迟(latency),对于延迟敏感的应用可能就不可取了。fetch.max.bytes
该参数与fetch.max.bytes参数对应,它用来配置Consumer在一次拉取请求中从Kafka中拉取的最大数据量,默认值为52428800B,也就是50MB。如果这个参数设置的值比任何一条写入的Kafka中的消息要小,那么会不会造成无法消费呢?消息将仍然返回,以确保消费者继续工作。Kafka中所有接收的最大消息的大小通过服务端参数message.max.bytes(对应主题端参数max.message.bytes)来设置。fetch.max.wait.ms
这个参数也和fetch.min.bytes参数有关,如果Kafka仅仅参考fetch.min.bytes参数的要求,那么有可能会一直阻塞等待而无法发送响应给Consumer,显然这是不合理的。fetch.max.wait.ms参数用于指定Kafka的等待时间,默认值为500(ms)。如果Kafka中没有足够多的消息而满足不了fetch.min.bytes参数的要求,那么最终会等待500ms。这个参数的设定和Consumer与Kafka之间的延迟也有关系,如果业务应用对延迟敏感,那么可以适当调小这个参数。max.partition.fetch.bytes
这个参数用来配置从每个分区里返回给Consumer的最大数据量,默认值为1048576(B),即1MB。这个参数与fetch.max.bytes参数相似,只不过前者用来限制一次拉取中每个分区的消息大小,而后者用来限制一次拉取中整体消息的大小。同样,如果这个参数设定的值比消息的大小要小,那么也不会造成无法消费,Kafka为了保持消费逻辑的正常运转不会对此做强硬的限制。max.poll.records
这个参数用来配置Consumer在一次拉取中拉取的最大消息数,默认为500条。如果消息的大小都比较小,则可以适当调大这个参数值来提升一定的消费速度。connections.max.idle.ms
这个参数用来指定在多久之后关闭限制的连接,默认值是540000ms,即9分钟。exclude.internal.topics
Kafka中有两个内部的主题:consumer_offsets和transaction_state。exclude.internal.topics用来指定Kafka中的内部主题是否可以向消费者公开,默认值为true,那么只能使用subscribe(Collection)的方式而不能使用subscribe(Pattern)的方式来订阅内部主题,设置为false则没有这个限制。receive.buffer.bytes
这个参数用来设置Socket接收消息缓冲区的大小,默认值为65536B,即64KB。如果设置为-1,则使用操作系统的默认值。如果Consumer与Kafka处于不同的机房,则可以适当调大这个参数值。send.buffer.bytes
这个参数用来设置Socket发送消息缓冲区的大小,默认值为131072B,即128KB。与receive.buffer.bytes参数一样,如果设置为-1,则使用操作系统的默认值。request.timeout.ms
这个参数用来配置Consumer等待请求响应的最长时间,默认30000ms。metadata.max.age.ms
这个参数用来配置元数据的过期时间,默认值为300000ms,即5分钟。如果元数据在此参数所限定的时间范围内没有进行更新,则会被强制更新,即使没有任何分区变化或者有新的broker加入。reconnect.backoff.ms
这个参数用来配置尝试重新连接指定主机之前的等待时间(也成为退避时间),避免频繁地连接主机,默认值为50ms。这种机制适用于消费者向broker发送的所有请求。retry.backoff.ms
这个参数用来配置尝试重新发送失败的请求到指定的主题分区之前的等待(退避)时间,避免在某些故障情况下频繁地重复发送,默认值为100ms。isolation.level
这个参数用来配置消费者的事务隔离级别。字符串类型,有效值为”read_uncommitted”和”read_committed”,表示消费者所消费到的位置,如果设置为”read_committed”,那么消费者会忽略事务未提交的消息,即只能消费到LSO(LastStableOffset)的位置,默认情况下为”read_uncommitted”,即可以消费到HW(High Watermark)处的位置。部分消费者客户端的重要参数
主题端参数
优先副本的选举
kafka-prefferred-replica-election.sh 用来做分区自动平衡,一般使用path-to-json-file参数来分批、手动的执行优先副本的选举操作。
kafka-reassign-partitions.sh 用来做分区重分配
- 创建一个JSON文件(文件的名称假定为reassign.json),文件内容为要进行分区重分配的主题清单。示例如下:
- 根据这个json文件和指定所要分配的broker节点列表来生成一份候选的重分配方案。
bin/kafka-reassign-partitions.sh —zookeeper localhost:2181/kafka —generate —topics-to-move-json-file reassign.json —broker-list 0,2
- 执行具体的重分配动作
bin/kafka-reassign-partitions.sh —zookeeper localhost:2181/kafka —execute —reassignment-json-file project.json
- 验证查看分区重分配的进度。只需要将上面的execute替换为verify即可。(可选)
bin/kakfa-reassign-partitions.sh —zookeeper localhost:2181/kafka —verify —reassignment-json-file project.json
在第三步执行完后,主题的负载有可能不均衡,使用kafka-preferred-replica-election.sh脚本来执行一次优先副本的选举动作。
查询副本情况:bin/kafka-topics.sh —zookeeper localhost:2181/kafka —describe —topic topic-reassign
复制限流
分区重分配的第三步可以加上限流,命令如下:
bin/kafka-reassign-partitions.sh —zookeeper localhost:2181/kafka —execute —reassignment-json-file project.json —throttle 10
如何选择合适的分区数
性能测试工具
kafka本身提供的用于生产者性能测试的kafka-producer-perf-test.sh和用于消费者性能测试的kafka-consumer-perf-test.sh
bin/kafka-producer-perf-test.sh —topic topic-1 —num-records 1000000 —record-size 1024 —throughput -1 —producer-props bootstrap.servers=localhost:9092 acks=1
其中topic用来指定生产者发送消息的目标主题;num-records用来指定发送消息的总条数;record-size用来设置每条消息的字节数;producer-props参数用来指定生产者的配置,可同时指定多组配置,各组配置之间以空格分隔,与producer-props参数对应的还有一个producer.config参数,它用来指定生产者的配置文件;throughput用来进行限流控制,当设定的值小于0时不限流,当设置的值大于0时,当发送的吞吐量大于该值时就会被阻塞一段时间。
分区数越多吞吐量就越高吗?
随着分区数的增加,相应的吞吐量也会有所增长。一旦分区数超过了某个阈值之后,整体的吞吐量也是不升反降的,说明了分区数越多并不会使吞吐量一直增长。
分区数的上限
bin/kafka-topics.sh —zookeeper localhost:2181/kafka —create —topic topic-bomb —replication-factory 1 —partitions 10000
发现kafka服务已经崩溃,打开kafka的服务日志文件,会发现服务日志中出现大量的异常:Too many open files,这是一种常见的Linux系统错误,通常意味着文件描述符不足,它一般发生在创建线程、创建Socket、打开文件这些场景下。通过ulimit命令可以查看。
对于一个高并发、高性能的应用来说,1024或者4096的文件描述符限制未免太少,可以适当调大这个参数。比如使用ulimit -n 65525命令将上限提高至65535,这样足以应对大多数的应用情况,再高也完全没有必要了。
考量因素
从吞吐量方面考虑,增加合适的分区数可以再一定程度上提升整体吞吐量,但超过对应的阈值之后吞吐量不升反降。如果应用对吞吐量有一定程度上的要求,则建议在投入生产环境之前对同款硬件资源做一个完备的吞吐量相关的测试,以找到合适的分区数阈值区间。
一般情况下,根据预估的吞吐量及是否与key相关的规则来设定分区数即可,后期可以通过增加分区数、增加broker或分区重分配等手段来进行改进。如果一定要给一个准则,则建议将分区数设定为集群broker的倍数,即假定集群有3个broker节点,可以设定分区数为3、6、9等,至于倍数的选定可以参考预估的吞吐量。不过,如果集群中broker节点数很多,比如大几十或上百、上千,那么这种准则也不太适应,在选定分区数时进一步可以引入基架等参考因素。