生产者客户端架构图

image.png

原理分析

消息发送流程

1)整个生产者客户端由两个线程协调运行,分别为主线程(调用Producer的线程)与Sender线程。在主线程中初始化KafkaProducer实例,创建消息(ProducerRecord),然后依次执行发送端消息拦截器、消息序列化器、消息分区器,最终同主题分区的消息会加入到相同的一个批次里,并缓存到消息收集器里(RecordAccumulator),存储形式为如下:

private final ConcurrentMap> batches;

Sender线程从消息收集器里的batches对象里取可发送的批次,然后调用KafkaClient网络组件发送到主题分区对应的Broker里。

2) RecordAccumulator主要用来缓冲消息, 以便Sender线程可批量发送,进而减少网络传输的资源消耗, 以提升性能与发送吞吐量。缓冲区的大小可通过生产者客户端参数buffer.memory 配置,默认为32MB.
如果生产者发送消息的速度超过发送到服务器的速度,则会导致发送的缓冲空间不足,这时候主线程调用KafkaProducer的send方法可能会阻塞,或抛出异常。可通过参数max.block.ms配置最大阻塞时间,默认为60秒. 内部通过加重入锁来控制内存分配,如果未获取到内存则被加入到Lock关联的Condition队列中等待,超过max.block.ms后,则抛出中断异常。

3)主线程中发送的ProducerRecord 执行分区策略后,得到一个TopicPartition,然后会被放到RecordAccumulator的一个Map中,如代码:

  1. KafkaProducer#doSend() {
  2. ...
  3. int partition = partition(record, serializedKey, serializedValue, cluster);
  4. TopicPartition tp = new TopicPartition(record.topic(), partition); // 创建Record时指定了Topic
  5. }

RecordAccumulator中内部为每个TopicPartition维护了一个Deque双端队列,队列的内容存储的是:ProducerBatch,即Deque, 发送的单个消息会被追加到队列的尾部的ProducerBatch里, 读取时会从双端队列的头部开始读取。也就是说Kafka会把单个消息打包到一个批次里,批量发送,这样内部的消息体字节会更加紧凑。将小的消息拼凑成大的批次可以减少网络请求,提报吞吐量。
如果生产者客户端需要向很多主题分区发送消息,可以适当调大buffer.memory 参数以提高吞吐量。
关键代码:

  1. 放入队列尾部
  2. RecordAccumulator#tryAppend() {
  3. ProducerBatch last = deque.peekLast();
  4. if (last != null) {
  5. FutureRecordMetadata future = last.tryAppend(timestamp, key, value, headers, callback, nowMs);
  6. }
  7. }
  8. 从队列头开始读:
  9. RecordAccumulator#ready() {
  10. ...
  11. ProducerBatch batch = deque.peekFirst();
  12. if (batch != null) {
  13. TopicPartition part = entry.getKey();
  14. Node leader = cluster.leaderFor(part);
  15. }
  16. ...
  17. }

由于消息在网络上是以字节Byte的形式传输, 因此RecordAccumulator内部也有一个块单独的内存用来存储消息体,Kafka直接基于JDK原生的NIO来实现字节流的内存与网络控制,减少依赖第三方库,避免引入不稳定的因素。通过java.nio.ByteBuffer 来实现内存的分配与释放, 由于频繁的创建与释放内存比较浪费资源,因此RecordAccumulator内部维护了一个BufferPool, 它主要用于复用ByteBuffer,以实现缓存的高效利用。不过BufferPool只针对特定大小的ByteBuffer进行管理,而其他大小的ByteBuffer不会缓存到BufferPool, 通过batch.size 参数来配置这个特定大小的ByteBuffer,默认为值为16KB. 我们可以适当调整这个参数,以便多缓存一些消息.(Kafka建议不要修改这个参数大于默认值)

同时,batch.size 的大小与ProducerBatch的关系很密切。当一条消息(ProducerRecord)进入RecordAccumulator,先通过主题分区检查是否存在对应的双端队列,没有则会创建一个双端队列。然后从该双端的尾部获取一个ProducerBatch(没有则新建),查看该ProducerBatch是否还可以写入消息,如果可以则写入。如果不可写入则需创建一个新的ProducerBatch,在创建时会评估消息是否大于batch.size 如不超过,则以batch.size 指定的大小来创建一个ProducerBatch. 这样在使用完这段内存区域后,可以通过BufferPool的管理来实现复用。如果单个消息体的大小超过了batch.size, 则就已评估后的大小来创建ProducerBatch, 那么这段内存区域不会纳入BufferPool的管理,不会被复用。关键代码如下:

  1. RecordAccumulator.append() {
  2. ....
  3. byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
  4. int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
  5. log.trace("Allocating a new {} byte message buffer for topic {} partition {} with remaining timeout {}ms", size, tp.topic(), tp.partition(), maxTimeToBlock);
  6. buffer = free.allocate(size, maxTimeToBlock);
  7. ....
  8. }

4) Sender从RecordAccumulator中获取缓存的消息时,取出的形式为:>, 然后会进一步转换为>, 其中Node表示Kafka集群的Broker节点。对应Kafka的网络组件来说,他只关心与具体的Broker的连接,只关心向那个Broker发消息,并不关系消息在哪个分区,但是对于Kafka的Producer组件来说,他需要关心向那个分区发送哪些消息,所以在这里需要做一层应用逻辑层到网络I/O层的转换。

在转换为>后,Sender还会进一步封装为的对象,这样就可以将Request请求发往各个Node了。这里的Request是只Kafka的各种协议请求,对于消息发送而言就是ProducerRequest,其他还有元数据请求(MetadataRequest) 消费者请求等。

发送到Broker的Request还会进一步保存在InFligntRequest 可以理解为正在发送的请求列表,InFligntRequest保存的Request形式为>, 他的作用主要是缓存已经发出去但还没收到响应的请求(NodeId是一个字符串类型,表示节点的编号(Integer.toString(id))).
除此之外,InFligntRequest内部提供了很多控制方法,可通过配置参数限制每个连接(客户端与Node)最多缓存的请求数,这个配置参数为:max.in.flignt.request.per.connection,默认值为5,即每个连接最多缓存5个未响应的请求。超过该数值,Kakfa的网络组件则不会向这个连接发送更多的请求,进而转入等待中。除非有缓存的请求收到了响应。通过比较Deque的size与此参数的大小来判断对应的Node是否堆积了很多未响应的请求,如果真是如此,说明当前Node的负载较大或网络连接有问题,在继续发生请求会增大请求超时的可能。

获取最小负载节点(leastLoadedNode)

InFligntRequest 除了做请求限制外,还可以用于获取最小负载的节点。通过每个Node在InFligntRequest中未完成响应的请求可以判断当前Node的负载情况,如果未完成请求的数量越大,则仍未该Node的负载越大. 如:
image.png
可以看到Node1的负载相对较小,因此可作为可选的节点,选择负载较小的节点,可以避免网络拥塞等异常可以快速发送请求并获取响应。一般用于发送元数据更新请求,消费者组播协议。
相关代码逻辑:

  1. org.apache.kafka.clients.NetworkClient#leastLoadedNode() {
  2. int offset = this.randOffset.nextInt(nodes.size());
  3. for (int i = 0; i < nodes.size(); i++) {
  4. int idx = (offset + i) % nodes.size();
  5. Node node = nodes.get(idx);
  6. if (canSendRequest(node.idString(), now)) {
  7. // 优先选择
  8. int currInflight = this.inFlightRequests.count(node.idString());
  9. if (currInflight == 0) {
  10. // if we find an established connection with no in-flight requests we can stop right away
  11. log.trace("Found least loaded node {} connected with no in-flight requests", node);
  12. return node;
  13. } else if (currInflight < inflight) {
  14. // otherwise if this is the best we have found so far, record that
  15. inflight = currInflight;
  16. foundReady = node;
  17. }
  18. }
  19. }