概述

Kafka 的网络底层是基于 Java NIO 实现,考虑维护成本,尽量减少对其它项目的依赖,它并没有使用当下流行的网络基础设施框架 Netty,而是自己实现了一套基于 Java NIO 的异步非阻塞网络模型。本章,我们会通过源码深入了解客户端的各个组件的功能以及口味工业级的异步非阻塞网络框架有哪些细节。
Client 端网络层层次结构图.png
上图是客户端网络层层次结构图,里面包含了构建客户端异步非阻塞模型的核心组件,由下到上依次做简要介绍:

  1. 第一层:JDK NIO。熟悉 Java NIO 网络编程的同学都应该十分了解。这里就不做说明。
  2. 第二层:传输层。这里 Kafka 是将 SSL 握手动作抽象出来。Kafka 支持多种安全传输协议更多详见官方文档。安全协议的验证是由 SslTransportLayer 实现类完成,而我们通常不需要加密传输,一般使用 PlaintextTransportLayer。它们都持有 Java 的 SelectionKey 和 SocketChannel,这是不是很熟悉呢?
  3. 第三层:KafkaChannel。这个对象持有传输层的实现类,因此,可以向底层的 SocketChannel 发送数据。抽象这一层主要是有两个非常重要的变量,分别是表示从通道中读到的数据对象 receive 和写入通道的数据对象 send,通过这两个对象可以判断数据读取/写入即时状态。这也是 Kafka 处理”粘包”的关键。
  4. 第四层:Kafka Selector。这个并非是 Java 的 Selector,所以我特别在前面加上 Kafka。我们知道,Kafka 的客户端会和多个 Broker 端建立连接(因为可能需要向多个主题分区发送数据),因此,需要一个”大管家”管理 KafkaChannel,于是 Kafka Selector 应运而生。除了保存每个 KafkaChannel 对象外,它还持有 Java Selector,这样就能通过轮询获取感兴趣的事件集合。网络 I/O 事件的核心处理逻辑就是在 Selector 中实现的。
  5. 第五层:KafkaClient。这是客户端直接交互的类,KafkaClient 接口定义了非常常用的 API,比如判断节点是否就绪、发送数据、执行 I/O 操作、断开某个节点的 Socket 连接等等。对客户端屏蔽了底层网络实现细节。
  6. 第六层:Sender 线程。Client 端只有一个网络 I/O 线程,通过多路复用技术和多个 Broker 进行数据交互。它在 while() 循环中不断从消息缓冲区抽取数据并发送给 Broker。

通过上面的图示与概述,相信读者应该对 Client 端的网络组件有大致的了解,下面会从源码讲解组件之间是如何串和数据是如何流转的。

生产者-消费者模型 Sender 线程

概述是由下到上讲述各个组件的功能,接下来是由上到下通过源码方式讲述各个组件具体实现逻辑。
org.apache.kafka.clients.producer.internals.Sender 线程是我们的突破口,正如节点标题所说,它是构成生产者-消费者模型的一部分。很明显,它属于消费者,不断从 RecordAccumulator 消息缓冲区中获取消息批次然后发送给 Broker 端。核心源码如下:

  1. // org.apache.kafka.clients.producer.internals.Sender
  2. public class Sender implements Runnable {
  3. @Override
  4. public void run() {
  5. log.debug("Starting Kafka producer I/O thread.");
  6. // main loop, runs until close is called
  7. while (running) {
  8. try {
  9. // #1 不断从缓存队列中获取消息批次并发送给Broker
  10. runOnce();
  11. } catch (Exception e) {
  12. log.error("Uncaught error in kafka producer I/O thread: ", e);
  13. }
  14. }
  15. }
  16. /**
  17. * Sender线程核心逻辑。逻辑十分清晰:
  18. * 1.如果有事务管理器,则需要进行事务处理,比如生成PID、事务ID等。
  19. * 事务管理器比较麻烦,详见 https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging
  20. * 2.从{@link RecordAccumulator}中抽取数据。这里,需要明确一点,存在{@link RecordAccumulator}是以分区为key,value就是该分区的消息数据。但是对Sender来说,
  21. * 它是以Broker节点为粒度,所以需要转换为key为Broker ID,value为消息数据,这个消息数据包含多个主题。
  22. * 3.执行网络I/O操作,对SocketChannel进行读/写操作。将数据发送给Broker端、接收来自上一次发送的ACK响应。
  23. */
  24. void runOnce() {
  25. // #1 阶段一:事务管理器处理事务/幂等(如果有必要的话)
  26. if (transactionManager != null) {
  27. try {
  28. // #1-1
  29. transactionManager.maybeResolveSequences();
  30. // #1-2 校验事务管理器状态,如果有致命异常,则不能继续发送数据
  31. if (transactionManager.hasFatalError()) {
  32. RuntimeException lastError = transactionManager.lastError();
  33. if (lastError != null)
  34. maybeAbortBatches(lastError);
  35. client.poll(retryBackoffMs, time.milliseconds());
  36. return;
  37. }
  38. // #1-3 检查当前Producer是否需要一个新的PId,如果需要,
  39. // 则组装InitProducerId请求并放入「pendingRequests」队列中等待发送
  40. transactionManager.bumpIdempotentEpochAndResetIdIfNeeded();
  41. // #1-4 如果事务管理器中有需要发送的请求,则先将请求发送出现
  42. if (maybeSendAndPollTransactionalRequest()) {
  43. // 返回
  44. return;
  45. }
  46. } catch (AuthenticationException e) {
  47. // This is already logged as error, but propagated here to perform any clean ups.
  48. log.trace("Authentication exception while processing transactional request", e);
  49. transactionManager.authenticationFailed(e);
  50. }
  51. }
  52. // 获取当前时间戳
  53. long currentTimeMs = time.milliseconds();
  54. // #2 阶段二:从消息缓冲区中(RecordAccumulator)抽取待消息批次,
  55. // 并放入指定KafkaChannel变量中,等待发送
  56. long pollTimeout = sendProducerData(currentTimeMs);
  57. // #3 阶段三:真正执行网络I/O操作,对底层SocketChannel进行读/写等I/O操作
  58. client.poll(pollTimeout, currentTimeMs);
  59. }
  60. }

Sender 实现 Runnable,意味着它是一个线程。runOnce() 才是 Sender 类的核心逻辑:

  1. 事务管理器处理事务相关操作,比如生成 PID、事务 ID 等。详见 kafka 官方文档对事务说明
  2. 从消息缓冲区(RecordAccumulator)抽取可发送的消息批次。并放入到 KafkaChannel#send 变量中,等待数据网络 I/O 执行。
  3. 执行网络 I/O 操作。向底层 SocketChannel 读取/写入二进制数据。

可见,Sender 线程作为消费者,不断从消息缓冲区中获取消息批次,并将消息调用 KafkaClient#poll(Time) 方法执行网络 I/O。

Sender 线程从消息缓冲区中抽取数据

主要核心方法是 Sender#sendProducerData,最重要的目的是从 RecordAccumulator 缓冲区中抽取数据并放入对应的 KafkaChannel#send 变量中,等待网络 I/O 执行。

  1. // org.apache.kafka.clients.producer.internals.Sender#sendProducerData
  2. /**
  3. * 从{@link RecordAccumulator} 中抽取可发送的消息批次,并放入 {@link KafkaChannel#send} 变量中
  4. *
  5. * 1. 过滤未知分区Leader的主题列表。需要重新从Broker获取这些主题的元数据信息。
  6. * 2. 遍历「数据已就绪」的节点列表,挨个判断KafkaChannel是否可以发送数据(连接是否就绪、inFlightReqeust数量等等)
  7. * 3. 从「消息缓冲区 {@link RecordAccumulator}」中抽取待发送批次,并以节点为单位转化为 Map<Integer, List<ProducerBatch>> 形式,
  8. * 表示我要向节点发送这么一批数据。数据抽取也是有讲究的,每个节点抽取的数据量不能超过 {@link #maxRequestSize}
  9. * 4. 将批次信息写入{@link #inFlightBatches}变量中,表示即将发送或已发送但未收到 ACK 的消息批次。
  10. * 5. 处理已过期的消息批次,分为事务和非事务批次处理。对于非事务的处理方式较为简单:1.更新ProducerBatch状态 2.从「inFlightBatches」中移除 3.从RecordAccumulator移除
  11. * 6. 将将待发送的消息批次写入{@link KafkaChannel#send}变量中,等待写入底层的 {@link java.nio.channels.SocketChannel}
  12. *
  13. * @param now 当前时间
  14. * @return
  15. */
  16. private long sendProducerData(long now) {
  17. // #1 获取集群元数据
  18. Cluster cluster = metadata.fetch();
  19. // #2 过滤得到可发送数据的节点列表、未知分区Leader的主题列表
  20. RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
  21. // #3 如果存在分区Leader未知的主题列表,那么可能的原因是主题被删除了或该分区正在发生副本Leader选举,会出现无Leader的真空期。
  22. // 此时客户端需要做的事情就是向Broker请求该主题的元数据信息,并有这些主题的数据暂时不会发送给Broker
  23. if (!result.unknownLeaderTopics.isEmpty()) {
  24. for (String topic : result.unknownLeaderTopics)
  25. this.metadata.add(topic, now);
  26. // 更新刚才添加的主题的元数据信息
  27. this.metadata.requestUpdate();
  28. }
  29. // #4 遍历已就绪节点集合,再次判断节点是否可以发送数据
  30. Iterator<Node> iter = result.readyNodes.iterator();
  31. long notReadyTimeout = Long.MAX_VALUE;
  32. while (iter.hasNext()) {
  33. Node node = iter.next();
  34. // #4-1 判断节点是否准备好发送数据(Socket连接状态、是否被限流等全局因素考虑)
  35. if (!this.client.ready(node, now)) {
  36. // 节点没有准备好,则从当前列表中移除,数据留待下次连接就绪后再发送
  37. iter.remove();
  38. // 计算并更新下次Sender唤醒时间
  39. notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));
  40. }
  41. }
  42. // #5 从缓存队列中(RecordAccumulator)抽取数据,得到结果集:Map<Broker ID, 发送给该Broker的批次列表>
  43. Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
  44. // #6 将批次添加到Sender线程的「inFlightBatches」中
  45. addToInflightBatches(batches);
  46. // #7 保证单个分区的顺序性,前提是「max.in.flight.requests.per.connection=1」,
  47. // 这会让发往该分区的请求看起来是阻塞式I/O操作:即发送请求1->接收请求1响应->发送请求2->接收请求2响应。
  48. // 虽然能保证单个分区的顺序性,但是会造成分区吞吐量严重下降。
  49. // 如果一定要确保单分区的顺序性,则可以使用幂等Producer实现
  50. if (guaranteeMessageOrder) {
  51. // #7-1 告诉RecordAccumulator,下次抽取消息批次时跳过这些分区,
  52. // 因为它们每次只能发送一个消息批次,必须等待收到ACK后才能发送下一个批次
  53. for (List<ProducerBatch> batchList : batches.values()) {
  54. for (ProducerBatch batch : batchList)
  55. // 静默该分区
  56. this.accumulator.mutePartition(batch.topicPartition);
  57. }
  58. }
  59. // #8 重置下一个消息批次过期时间
  60. accumulator.resetNextBatchExpiryTime();
  61. // #9 从飞行队列「inFlightBatches」中获取「过期」的消息批次列表
  62. List<ProducerBatch> expiredInflightBatches = getExpiredInflightBatches(now);
  63. // #10 从消息缓冲区中获取「过期」的消息批次列表
  64. List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(now);
  65. // #11 汇总两个过期批次,统一处理
  66. expiredBatches.addAll(expiredInflightBatches);
  67. // #12 处理「过期」的消息批次,如果有事务,则进行事务相关的错误处理
  68. for (ProducerBatch expiredBatch : expiredBatches) {
  69. String errorMessage = "Expiring " + expiredBatch.recordCount + " record(s) for " +
  70. expiredBatch.topicPartition + ":" + (now - expiredBatch.createdMs) + " ms has passed since batch creation";
  71. // #12-1 对于过期的消息批次,无事务处理:1.更新ProducerBatch状态 2.从「inFlightBatches」中移除 3.从RecordAccumulator移除
  72. failBatch(expiredBatch, -1, NO_TIMESTAMP, new TimeoutException(errorMessage), false);
  73. if (transactionManager != null && expiredBatch.inRetry()) {
  74. // #12-2 关于事务方面的处理
  75. transactionManager.markSequenceUnresolved(expiredBatch);
  76. }
  77. }
  78. sensors.updateProduceRequestMetrics(batches);
  79. // #12 计算发送超时时间。
  80. // ① 有可发送的批次,则置为0,这样可以立即循环并尝试发送更多数据。
  81. // ② 没有可发送的批次,超时时间将是「下一批到期时间」和「检查数据可用性的延迟时间」之间的最小值
  82. // ③ 节点可能由于 lingering、回退等原因,有尚未发送的数据。
  83. long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
  84. pollTimeout = Math.min(pollTimeout, this.accumulator.nextExpiryTimeMs() - now);
  85. pollTimeout = Math.max(pollTimeout, 0);
  86. if (!result.readyNodes.isEmpty()) {
  87. pollTimeout = 0;
  88. }
  89. // #13 将待发送的消息批次写入KafkaChannel#send变量中,等待发送
  90. sendProduceRequests(batches, now);
  91. // #4 返回执行poll(time)轮询的超时时间
  92. return pollTimeout;
  93. }

为了更好地理解相关 API 调用顺序,这里贴一幅图方便大家理解:
Sender线程API调用流程图.png
核心方法是步骤 #13,这个方法做以下操作:

  1. 为保证向下兼容,需要将消息转换为节点所能支持的最小版本号所对应的消息格式。
  2. 构建 ClientReqeust 对象,该对象包含目标的 Broker ID、收到 ACK 响应后执行的回调函数、请求头和请求体。

    注意:由于本章节重点是讲述客户端网络组件之间的串联关系,对于其它重要的但非本章重点的内容只做一点概述。

  1. // org.apache.kafka.clients.producer.internals.Sender#sendProduceRequest
  2. /**
  3. * 1.Magic版本处理。如果有不满足最小版本号的消息批次,为保证向下兼容,需要对每个消息进行转换。
  4. * 2.构建ProduceRequestData.TopicProduceDataCollection,存放分区消息批次
  5. * 3.构建ProduceRequest.Builder对象,包含ACKS、请求超时时间、事务ID、分区消息批次
  6. * 4.构建ClientRequest对象,节点ID、请求Builder对象、请求超时时间、响应成功后的回调函数
  7. * 5.将ClientRequest对象交给 {@link KafkaClient#send(ClientRequest, long)}实例
  8. *
  9. * @param now 现在时间戳
  10. * @param destination 目标节点ID
  11. * @param acks acks配置,详见 {@link org.apache.kafka.clients.producer.ProducerConfig#ACKS_CONFIG},-1=all
  12. * @param timeout 响应超时时间,详见 {@link org.apache.kafka.clients.CommonClientConfigs#REQUEST_TIMEOUT_MS_CONFIG}
  13. * @param batches 消息批次集合,集合内的批次都发往同一个节点Node
  14. */
  15. private void sendProduceRequest(long now, int destination, short acks, int timeout, List<ProducerBatch> batches) {
  16. if (batches.isEmpty())
  17. return;
  18. // 分区对应批次。在同一个节点中,每个分区只会对应一个批次
  19. final Map<TopicPartition, ProducerBatch> recordsByPartition = new HashMap<>(batches.size());
  20. // #1 找到消息的最小版本号
  21. byte minUsedMagic = apiVersions.maxUsableProduceMagic();
  22. for (ProducerBatch batch : batches) {
  23. if (batch.magic() < minUsedMagic)
  24. minUsedMagic = batch.magic();
  25. }
  26. // 记录节点对应的所有分区数据
  27. ProduceRequestData.TopicProduceDataCollection tpd = new ProduceRequestData.TopicProduceDataCollection();
  28. // #2 遍历批次,如果版本号不匹配,为了保证向下兼容会对每条消息进行转换。
  29. for (ProducerBatch batch : batches) {
  30. TopicPartition tp = batch.topicPartition;
  31. // #2-1 获取记录
  32. MemoryRecords records = batch.records();
  33. // #2-2 为保证消息格式向下兼容,需要将所有的消息记录转化为最低版本号
  34. // 如果有必要,可以向下转换到使用的最小魔法值。
  35. // 一般来说,在生产者构建批处理和Sender线程发送请求之间可能会有延迟,所以我们可能根据已过时的元数据来选择消息格式。
  36. // 在最坏的情况下,由于我们乐观地选择使用新的消息格式,但发现Broker并不支持,所以我们需要在发送前在客户端进行向下转换。
  37. // 这是为了处理围绕集群升级的边缘情况,因为在这种情况下,并非所有Broker都支持相同的消息格式版本,肯定会有出入的。
  38. // 例如,如果一个分区从一个支持新的Magic版本的Broker迁移到一个不支持新的Magic版本的Broker,那么我们就需要对消息进行转换。
  39. if (!records.hasMatchingMagic(minUsedMagic))
  40. records = batch.records().downConvert(minUsedMagic, 0, time).records();
  41. // #2-3 根据主题名称从「tpd」对象中获取消息数据
  42. ProduceRequestData.TopicProduceData tpData = tpd.find(tp.topic());
  43. if (tpData == null) {
  44. // #2-4 如果没有则创建新的对象
  45. tpData = new ProduceRequestData.TopicProduceData().setName(tp.topic());
  46. // #2-5 将对象添加到tpd
  47. tpd.add(tpData);
  48. }
  49. // #2-6 将消息批次添加到tpData对象
  50. tpData.partitionData().add(new ProduceRequestData.PartitionProduceData()
  51. .setIndex(tp.partition())
  52. .setRecords(records));
  53. recordsByPartition.put(tp, batch);
  54. }
  55. // #3 从事务管理器中获取事务ID
  56. String transactionalId = null;
  57. if (transactionManager != null && transactionManager.isTransactional()) {
  58. transactionalId = transactionManager.transactionalId();
  59. }
  60. // #4 创建一个请求体Builder(Builer设计模式),用于构建ProduceRequest对象
  61. ProduceRequest.Builder requestBuilder = ProduceRequest.forMagic(minUsedMagic,
  62. new ProduceRequestData()
  63. .setAcks(acks) // ACKS配置
  64. .setTimeoutMs(timeout) // 请求超时时间
  65. .setTransactionalId(transactionalId) // 事务ID
  66. .setTopicData(tpd)); // 发往Broker的所有分区数据
  67. // #5 构建响应处理器,当成功收到ACK响应后,会回调这个函数
  68. RequestCompletionHandler callback = response -> handleProduceResponse(response, recordsByPartition, time.milliseconds());
  69. String nodeId = Integer.toString(destination);
  70. // #6 使用ClientRequest对象包装请求节点ID、Builder、响应回调函数、是否期望收到Broker的ACK响应等数据
  71. ClientRequest clientRequest =
  72. client.newClientRequest(nodeId, requestBuilder, now, acks != 0, requestTimeoutMs, callback);
  73. // #7 将ClientRequest对象交给KafkaClient处理
  74. client.send(clientRequest, now);
  75. }

上面的源码重点就是构建 ClientRequet 对象,这个对象包含一个请求的所有元数据信息和批次消息,当然,Kafka 为向下兼容也做了很多努力。步骤 #7 将已构建好的 ClientRequest 对象交给 KakfaClient 处理。

KafkaClient 顶层接口 统一抽象

接口 KfkaClient 定义了很多十分好用的 API,比如我想找到最小负载的节点,可以调用 leastLoadedNode(long now); 比如我要发送数据,可调用 send(ClientRequest, long); 手动执行 I/O 操作,可调用 poll(long, long) 等等。其中最重要的两个方法当属 sendpoll

  1. // org.apache.kafka.clients.KafkaClient
  2. /**
  3. * 定义数据发送、数据接收、相关判断等接口。主要实现是 {@link NetworkClient}
  4. * ① 判断节点状态(是否已经准备就绪)
  5. * ② 判断连接等待时间。根据规则计算连接等待时间。
  6. * ③ 判断节点是否已经断开连接了。
  7. * ④ 将 {@link ClientRequest} 对象放入缓存
  8. * ⑤ 真正执行I/O操作,将缓存中的数据发往节点或读取节点返回的数据,并放入相应的对象中。
  9. *
  10. * 这个是一个高级的API接口,定义了数据发送、触发I/O操作、判断等一系列方法。
  11. */
  12. public interface KafkaClient extends Closeable {
  13. /**
  14. * 这个方法很重要,是客户端发送消息的入口。
  15. * 我们之前说过,Sender线程会按规则从 {@link org.apache.kafka.clients.producer.internals.RecordAccumulator}
  16. * 抽取数据,并按<node id, List<ProducerBatch>>形式组装数据,
  17. * 然后由 {@link org.apache.kafka.clients.producer.internals.Sender#sendProduceRequest(long, int, short, int, List)} 方法创建 {@link ClientRequest},
  18. * 最后调用这个API向SocketChannel写入数据(当然,这个写入不是由 {@link KafkaClient} 实现类完成,而是 {@link org.apache.kafka.common.network.TransportLayer} 完成) 。
  19. *
  20. * 这个方法主要目的是将待发送的 {@param request} 封装为 {@link org.apache.kafka.common.network.Send} 对象,
  21. * 然后再把该对象写入 {@link org.apache.kafka.common.network.KafkaChannel#send} 中。
  22. *
  23. * @param request 待发送请求对象
  24. * @param now 当前时间戳
  25. */
  26. void send(ClientRequest request, long now);
  27. /**
  28. * {@link #send(ClientRequest, long)} 方法将数据已经放入指定位置({@link org.apache.kafka.common.network.KafkaChannel#send}),
  29. * 这个方法本质就是调用JDK底层的 {@link java.nio.channels.SocketChannel#write(ByteBuffer)}将 {@link KafkaChannel#send}的数据写入通道中。
  30. * 当然,这个方法并非只做这么一件事情,还要接收从节点返回过来的Response消息,在收到响应后做收尾工作,比如触发相关回调函数。
  31. *
  32. * @param timeout 超时时间
  33. * @param now 当前时间戳
  34. * @return
  35. */
  36. List<ClientResponse> poll(long timeout, long now);
  37. // ...
  38. }

NetworkClient 是 KafkaClient 唯一实现类,熟悉一下内部变量:

  1. /**
  2. * 定义数据发送、数据接收、相关判断等接口。
  3. */
  4. public class NetworkClient implements KafkaClient {
  5. // 用于执行底层网络I/O请求/响应
  6. // 这个对象是Kafka的轮询器,用于
  7. private final Selectable selector;
  8. // 元数据更新类
  9. private final MetadataUpdater metadataUpdater;
  10. // 集群节点连接状态
  11. private final ClusterConnectionStates connectionStates;
  12. // 按节点ID分类,缓存「in-flight」的请求
  13. private final InFlightRequests inFlightRequests;
  14. // 配置 SO_SNDBUF(发送缓冲区大小),默认值:131072
  15. private final int socketSendBuffer;
  16. // 配置 SO_RCVBUF (接收缓冲区),默认值:32768
  17. private final int socketReceiveBuffer;
  18. // 每个响应超时时间,默认值:30000
  19. private final int defaultRequestTimeoutMs;
  20. // Socket重连退避时间,默认值:50
  21. private final long reconnectBackoffMs;
  22. /**
  23. * 是否需要和 broker 商进行版本协调。默认值:true
  24. * 由于Kafka大版本之间存在不同协议。因此,需要在发送数据之前进行版本协调,
  25. * 否则出现错误或broker端不接收发送端发送的数据。
  26. * 在发送数据前会发送一个 {@link ApiVersionsRequest} 类型的请求
  27. */
  28. private final boolean discoverBrokerVersions;
  29. // API版本号
  30. private final ApiVersions apiVersions;
  31. /**
  32. * 记录需要进行版本协调的节点。
  33. * 当和节点完成TCP/SSL连接后,下一步就是和节点协调版本。
  34. */
  35. private final Map<String, ApiVersionsRequest.Builder> nodesNeedingApiVersionsFetch = new HashMap<>();
  36. /**
  37. * 存储中止发送的请求
  38. */
  39. private final List<ClientResponse> abortedSends = new LinkedList<>();
  40. // ...
  41. }

个人觉得重要的变量是 selectorinFlightRequestconnectionStates

inFlightReqeust

这是说说 inFlightRequest,飞行集合我们在 Sender 见过,它是存储分区对应的消息批次,而在 NetworkClient 的这个是存储节点对应的请求体,源码如下图所示:

  1. // org.apache.kafka.clients.InFlightRequests
  2. final class InFlightRequests {
  3. /** 默认值:5 */
  4. private final int maxInFlightRequestsPerConnection;
  5. private final Map<String, Deque<NetworkClient.InFlightRequest>> requests = new HashMap<>();
  6. // others
  7. }

这个队列存在的意义有以下几点:

  1. 根据 Deque 数量判断节点负载情况。数量越小,说明节点负载越小。
  2. 限流。如果 Deque 数量超出 maxInFlightRequestsPerConnection,那么就不允许向该发送数据。Sender 线程在抽取数据时会通过这个队列判断是否可以抽取该节点的消息。
  3. 处理超时请求。如果 Deque 存在超时的请求,那么 kafka 会主动关闭该节点的 Socket 连接,并处理相关请求。
  4. 保证响应是有序接收的。当收到来自 Broker 端的 ACK 响应,这个响应就对应 Deque#peekFirst() 的请求。

    doSend

    1. // org.apache.kafka.clients.NetworkClient#doSend(org.apache.kafka.clients.ClientRequest, boolean, long, org.apache.kafka.common.requests.AbstractRequest)
    2. /**
    3. * 1.根据版本号生成请求头
    4. * 2.根据请求头和请求体构建 {@link Send} 子类
    5. * 3.生成 {@link InFlightRequest} 请求并放入 {@link #inFlightRequests} 队列中
    6. * 4.委托 {@link org.apache.kafka.common.network.Selector} 发送请求
    7. *
    8. * @param clientRequest 待发送的请求
    9. * @param isInternalRequest 是否属于内部请求
    10. * @param now 当前时间戳
    11. * @param request 请求体
    12. */
    13. private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now, AbstractRequest request) {
    14. String destination = clientRequest.destination();
    15. // #1 生成请求头
    16. RequestHeader header = clientRequest.makeHeader(request.version());
    17. // #2 根据请求头构建Send对象
    18. Send send = request.toSend(header);
    19. // #3 生成 InFlightRequest 并放入「in-flight」队列中
    20. // NetworkClient对象中也有「in-flight」的设计,可以通过这个对象判断Client与Broker的连接是否拥堵
    21. InFlightRequest inFlightRequest = new InFlightRequest(
    22. clientRequest,
    23. header,
    24. isInternalRequest,
    25. request,
    26. send,
    27. now);
    28. this.inFlightRequests.add(inFlightRequest);
    29. // #4 委托Selector发送数据,入参是目标的Broker ID和待发送的数据
    30. selector.send(new NetworkSend(clientRequest.destination(), send));
    31. }

    方法 doSend() 将请求体加入 InFlightRequest 对象中,然后构建 NetworkSend 对象交给 Selector 发送。

    kafka Selectable

    Kafka 使用 Selectable 来管理多个 KafkaChannel,核心功能是监控 KafkaChannel 数据读/写状态。当然,还有数据写入、执行 I/O 操作等方法。

    1. // org.apache.kafka.common.network.Selectable
    2. /**
    3. * 该接口定义一个异步、多通道网络I/O接口。
    4. * 我们时刻记住,Kafka的网络模型(无论客户端还是服务端)都是基于JDK NIO 异步非阻塞模型,所以它需要不断轮询并处理相关I/O。
    5. * 对于 {@link Selectable} 而言,相关 API 调用顺序是
    6. * // 1.建立TCP连接 {@link #connect(String, InetSocketAddress, int, int)}
    7. * // 在一个while(true)轮询并处理I/O数据
    8. * while(true) {
    9. * // 2.发送数据
    10. * send();
    11. * // 3.执行I/O
    12. * poll();
    13. * // 4.回调已完成发送的请求的回调函数
    14. * completedSends();
    15. * // 5.回调已成功接收的响应的回调函数
    16. * completedReceives()
    17. * }
    18. *
    19. *
    20. * 这个网络接口很有特点:
    21. * ① 一个 {@link Selectable} 实例对象掌管多个 {@link KafkaChannel} 连接通道,即一个对象掌管多个 TCP 连接。详见 {@link Selector} 实现类。
    22. * 因此,由于它们之间的操作是独立的,因此管控相对较容易。
    23. * ② {@link NetworkSend} 对象包含消息所发往的节点ID,方法 {@link #send(NetworkSend)} 根据节点ID从缓存中选择合适的 {@link KafkaChannel} 并写入缓存,
    24. * 此时数据并未没有写入底层的 {@link java.nio.channels.SocketChannel}。
    25. * ③ 方法 {@link #poll(long)} 才会触发底层I/O操作的核心方法,它会触发真正的I/O操作。
    26. */
    27. public interface Selectable {
    28. /**
    29. * See {@link #connect(String, InetSocketAddress, int, int) connect()}
    30. */
    31. int USE_DEFAULT_BUFFER_SIZE = -1;
    32. /**
    33. * 和目标节点建立Socket连接
    34. *
    35. * @param id 待连接的目标节点的Broker ID
    36. * @param address 待连接IP地址
    37. * @param sendBufferSize TCP发送缓冲区大小
    38. * @param receiveBufferSize TCP接收缓冲区大小
    39. * @throws IOException
    40. */
    41. void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException;
    42. /**
    43. * 调用 {@link java.nio.channels.Selector#wakeup()} 方法
    44. */
    45. void wakeup();
    46. /**
    47. * 关闭Kafka轮询器,并关闭该轮询器绑定的所有Socket连接
    48. */
    49. void close();
    50. /**
    51. * 关闭指定Socket连接
    52. *
    53. * @param id 待关闭的节点ID
    54. */
    55. void close(String id);
    56. /**
    57. * 将待发送的请求体对象放到对应的 {@link KafkaChannel#send} 变量中。
    58. * 此时还不会将数据写入底层的 SocketChannel,当调用 {@link #poll(long)} 后才会真正执行I/O操作。
    59. *
    60. * @param send 包装待发送的请求体
    61. */
    62. void send(NetworkSend send);
    63. /**
    64. * 执行底层的I/O操作
    65. * 将{@link KafkaChannel#send}变量的数据写入SocketChannel。
    66. * 或从SocketChannel读取数据并写入{@link KafkaChannel#receive}变量
    67. *
    68. * @param timeout 超时时间
    69. * @throws IOException
    70. */
    71. void poll(long timeout) throws IOException;
    72. /**
    73. * 获取当前Selector管理的所有KafkaChannel完成发送的 {@link KafkaChannel#send}变量值,返回一个List集合。
    74. * 这是由上次调用 {@link #poll(long)} 所生成。
    75. * 我们需要明确:{@link Selectable} 实例管理多个{@link KafkaChannel},也就是说,
    76. * 底层的 {@link KafkaChannel} 数据的接收和发送都是由Selectable实现类统一调度的。
    77. * 那Selectable是如何知道哪些通道事件已就绪了?答案就是Selectable持有{@link java.nio.channels.Selector},
    78. * 它所管理的通道会将相关事件注册到该Selector对象中,这样,Selectable就拥有感知通道的I/O事件。
    79. *
    80. * @return
    81. */
    82. List<NetworkSend> completedSends();
    83. /**
    84. * "收割"已完成数据接收的 {@link KafkaChannel#receive} 变量,
    85. * 这是由上次调用 {@link #poll(long)} 所生成。
    86. * @return
    87. */
    88. Collection<NetworkReceive> completedReceives();
    89. /**
    90. * 返回在最近一次 {@link #poll(long)} 方法产生的连接已断开的数据集合
    91. * @return
    92. */
    93. Map<String, ChannelState> disconnected();
    94. /**
    95. * 返回在最近一次 {@link #poll(long)} 方法产生的连接已完成的节点ID集合
    96. * @return
    97. */
    98. List<String> connected();
    99. // others
    100. }

    Selector 是 Selectable 唯一实现类,看看它有哪些重要的变量:

    1. // org.apache.kafka.common.network.Selector
    2. public class Selector implements Selectable, AutoCloseable {
    3. public static final long NO_IDLE_TIMEOUT_MS = -1;
    4. public static final int NO_FAILED_AUTHENTICATION_DELAY = 0;
    5. /**
    6. * 关闭模式枚举类
    7. */
    8. private enum CloseMode {
    9. /**
    10. * 优雅关闭:处理剩余收到的缓冲区中的数据,并通知连接已关闭
    11. */
    12. GRACEFUL(true),
    13. /**
    14. * 仅通知:丢弃缓冲区内的所有数据,仅通知连接已关闭
    15. */
    16. NOTIFY_ONLY(true),
    17. /**
    18. * 直接关闭:丢弃缓冲区内的所有数据,不做任何通知
    19. */
    20. DISCARD_NO_NOTIFY(false);
    21. boolean notifyDisconnect;
    22. CloseMode(boolean notifyDisconnect) {
    23. this.notifyDisconnect = notifyDisconnect;
    24. }
    25. }
    26. private final Logger log;
    27. // JDK 底层的轮询器。一个轮询器可以轮询多个 {@link SocketChannel} 通道
    28. private final java.nio.channels.Selector nioSelector;
    29. // 缓存Kafka Selector所管理的 {@link KafkaChannel}
    30. private final Map<String, KafkaChannel> channels;
    31. // 显示对KafkaChannel静默
    32. private final Set<KafkaChannel> explicitlyMutedChannels;
    33. // 表示缓存不够了
    34. private boolean outOfMemory;
    35. // 保存成功写入底层 SocketChannel 的对象请求体(由 {@link NetworkSend} 对象封装)
    36. private final List<NetworkSend> completedSends;
    37. // 保存从SocketChannel读取的完整的数据
    38. private final LinkedHashMap<String, NetworkReceive> completedReceives;
    39. private final Set<SelectionKey> immediatelyConnectedKeys;
    40. // 正在关闭的KafkaChannel。为了优雅关闭
    41. private final Map<String, KafkaChannel> closingChannels;
    42. /**
    43. * SocketChannel的内核Buffer还有未读取的数据,
    44. * 待下次调用 {@link #poll(long)} 时优先从这些通道中读取
    45. */
    46. private Set<SelectionKey> keysWithBufferedRead;
    47. /**
    48. * 一次「poll()」方法调用过程中发现的已断开的Socket连接
    49. */
    50. private final Map<String, ChannelState> disconnected;
    51. /**
    52. * 一次「poll()」方法调用过程中新建立的连接
    53. */
    54. private final List<String> connected;
    55. /**
    56. * 一次「poll()」过程中向哪些「Node」节点发送失败
    57. */
    58. private final List<String> failedSends;
    59. // 时间工具
    60. private final Time time;
    61. private final SelectorMetrics sensors;
    62. /**
    63. * 创建 {@link KafkaChannel} 的构造器(Builder),
    64. * 根据配置创建 {@link TransportLayer} 接口所对应的实现类。
    65. * 如果有安全要求,配置了SSL等安全协议,则会生成 {@link SslTransportLayer},
    66. * 如果没有安全要求,则会生成 {@link PlaintextTransportLayer}
    67. */
    68. private final ChannelBuilder channelBuilder;
    69. /**
    70. * 内存对象池,分配 ByteBuffer 对象
    71. */
    72. private final MemoryPool memoryPool;
    73. private final IdleExpiryManager idleExpiryManager;
    74. // 默认值:-1
    75. private final int maxReceiveSize;
    76. private final boolean recordTimePerConnection;
    77. private final LinkedHashMap<String, DelayedAuthenticationFailureClose> delayedClosingChannels;
    78. private final long lowMemThreshold;
    79. private final int failedAuthenticationDelayMs;
    80. /**
    81. * 指示上次调用{@link #poll(long)}后在读取数据方面是否取得进展。
    82. * 这个参数被用来避免当内存容量不足以读取更多的数据时出现小循环。
    83. *
    84. * 默认是true,表示上次{@link #poll(long)} 在读数据方面取得进展,否则为false
    85. */
    86. private boolean madeReadProgressLastPoll = true;
    87. // other
    88. }

    下面的列表是比较重要的变量:

变量 作用
java.nio.channels.Selector nioSelector JDK 底层轮询器,轮询 Client 所有 SocketChannel 的感兴趣事件
Map channels Client 所有 KafakChannel,key为 通道所连接的 Broker ID,因此,我们就可以通过 Broker ID 获取相应的 KafkaChannel
NetworkSend NetworkSend 是对 ClientRequest 的二进制数据数据的表示形式,它是关注二进制数据,直接可以和 SocketChannel 打交道,内部有变量记录数据大小、已写入字节数、剩余字节数等元数据。
List completedSends 保存将数据全部写入 SocketChannel 的 NetworkSend 对象。遍历 channels 列表,收集 NetworkSend 剩余字节数为 0 的对象即可。因为 NetworkSend 对象保存 destinationId,所以只有 List 存储即可
LinkedHashMap completedReceives 保存从 SocketChannel 接收完整的 NetworkReceive 对象。NetworkReceive 是处理 TCP “粘包”的关键类。
Set keysWithBufferedRead SocketChannel 的内核 Buffer 还有未读取的数据,需要将它记录下来,下次poll()操作时优先从这些通道中读取数据
ChannelBuilder channelBuilder 根据配置文件构建 TransportLayer 特定的实现类。
如果启用 SSL,则会生成 SslTransportLayer,否则生成 PlaintextTransportLayer
MemoryPool memoryPool ByteBuffer 对象池
IdleExpiryManager idleExpiryManager 根据 LRU 算法关闭空闲的 Socket 连接,减少资源浪费

再看核心方法 send(NetworkSend)

  1. // org.apache.kafka.common.network.Selector#send
  2. /**
  3. * 1.从缓存中获取{@link KafkaChannel}
  4. * 2.将 {@link NetworkSend}对象写入 {@link KafkaChannel#send}变量中
  5. *
  6. * @param send 包装待发送的请求体
  7. */
  8. public void send(NetworkSend send) {
  9. // #1 获取目标节点Broker ID
  10. String connectionId = send.destinationId();
  11. // #2 根据Broker ID从缓存中获取KafkaChannel
  12. KafkaChannel channel = openOrClosingChannelOrFail(connectionId);
  13. if (closingChannels.containsKey(connectionId)) {
  14. this.failedSends.add(connectionId);
  15. } else {
  16. try {
  17. // #3 将NetworkSend写入KafkaChannel#send变量中
  18. channel.setSend(send);
  19. } catch (Exception e) {
  20. // #4 无论出现什么异常,都需要关闭KafkChannel,并设置相关状态
  21. channel.state(ChannelState.FAILED_SEND);
  22. this.failedSends.add(connectionId);
  23. close(channel, CloseMode.DISCARD_NO_NOTIFY);
  24. if (!(e instanceof CancelledKeyException)) {
  25. log.error("Unexpected exception during send, closing connection {} and rethrowing exception {}", connectionId, e);
  26. throw e;
  27. }
  28. }
  29. }
  30. }
  1. 根据 Broker ID 从变量 channelsclosingChannels 获取 kafakChannel 对象。
  2. 如果 closingChannels 包含该 Broker ID,说明该通道正要关闭,不允许发送数据。
  3. 对于正常的通道则将待发送的对象写入 KafkaChannel#send 变量中。

怎么样,其实核心步骤就是将对象 NetworkSend 写入 KafkaChannel#send 变量中。
到这里,待发送的数据准备到位了,接下来我们就可以执行网络 I/O 操作。

I/O 操作

NetworkClient#poll

Sender 线程调用 KafkaClient#poll() 方法,就表示执行网络 I/O 操作。我们看看 NetworkClient#poll() 源码是如何实现的:

  1. // org.apache.kafka.clients.NetworkClient#poll
  2. /**
  3. * 对底层的 Socket 进行读/写操作
  4. *
  5. * @param timeout 超时时间。实际值可能是 timeout、reqeust timeout 和 metdata timeout 的最小值
  6. * @param now 当前时间
  7. * @return 接收到的响应集合
  8. */
  9. @Override
  10. public List<ClientResponse> poll(long timeout, long now) {
  11. // 确保NetworkClient状态处于「ACTIVE」
  12. ensureActive();
  13. // #1 处于由于不支持的版本号而出现异常或连接断开的请求
  14. if (!abortedSends.isEmpty()) {
  15. List<ClientResponse> responses = new ArrayList<>();
  16. // #1-1 将响应添加到response集合中,并清除「abortedSends」
  17. handleAbortedSends(responses);
  18. // #1-2 执行回调函数
  19. completeResponses(responses);
  20. // #1-3 返回
  21. return responses;
  22. }
  23. // #2 如果需要,则进行「元数据更新」操作。maybeUpdate(now)只是组装请求体,并不执行I/O操作,并返回超时时间
  24. // 向负载最小的节点发送「MetadataRequest」请求
  25. long metadataTimeout = metadataUpdater.maybeUpdate(now);
  26. try {
  27. // #2-1 执行网络I/O操作
  28. this.selector.poll(Utils.min(timeout, metadataTimeout, defaultRequestTimeoutMs));
  29. } catch (IOException e) {
  30. log.error("Unexpected error during I/O", e);
  31. }
  32. // process completed actions
  33. long updatedNow = this.time.milliseconds();
  34. List<ClientResponse> responses = new ArrayList<>();
  35. // #3 处理已完成数据发送的KafkaChannel。这是"收割"上一次调用poll()函数成功发送的请求
  36. handleCompletedSends(responses, updatedNow);
  37. // #4 处理成功收到Broker的ACK响应
  38. handleCompletedReceives(responses, updatedNow);
  39. // #5 处理已断开的Socket连接
  40. handleDisconnections(responses, updatedNow);
  41. // #6 处理新创建的Socket连接
  42. handleConnections();
  43. // #7 如果某个Socket连接刚创建,那么首先就要发送一个「APIVersion」请求
  44. handleInitiateApiVersionRequests(updatedNow);
  45. // #8 处理超时的Socket连接
  46. handleTimedOutConnections(responses, updatedNow);
  47. // #9 处理超时的请求
  48. handleTimedOutRequests(responses, updatedNow);
  49. // #10 执行回调函数
  50. completeResponses(responses);
  51. // #11 返回
  52. return responses;
  53. }

逻辑十分清晰明了,画个图看看:
#NwtworkClient#poll(long) 执行流程示意图.png
最核心的方法是 selector.poll(time),源码如下:

  1. // org.apache.kafka.common.network.Selector#poll
  2. /**
  3. * △ 基于Java的NIO网络编程模型,
  4. * 处理 {@link SocketChannel} 的OP_CONNECT、OP_WRITE、OP_READ等I/O事件。
  5. *
  6. * @param timeout 需要等待的超时时间
  7. * @throws IllegalArgumentException
  8. * @throws IllegalStateException
  9. */
  10. @Override
  11. public void poll(long timeout) throws IOException {
  12. if (timeout < 0) throw new IllegalArgumentException("timeout should be >= 0");
  13. boolean madeReadProgressLastCall = madeReadProgressLastPoll;
  14. // #1 清除所有与上次「poll()」调用的全部缓存数据
  15. clear();
  16. // #2 判断Socket Buffer中是否还有数据
  17. boolean dataInBuffers = !keysWithBufferedRead.isEmpty();
  18. // #3 设置超时时间
  19. if (!immediatelyConnectedKeys.isEmpty() || (madeReadProgressLastCall && dataInBuffers))
  20. timeout = 0;
  21. // #4 判断缓存压力是否仍然存在
  22. if (!memoryPool.isOutOfMemory() && outOfMemory) {
  23. // #4-1 走到这里,就说明之前迫于缓存压力而无法发送数据的情况已经过去了(缓解)
  24. // 现在可以发送数据了。遍历当前Selector管理的所有KafkaChannel,并将它们取消静默
  25. for (KafkaChannel channel : channels.values()) {
  26. if (channel.isInMutableState() && !explicitlyMutedChannels.contains(channel)) {
  27. channel.maybeUnmute();
  28. }
  29. }
  30. // #4-2 更新状态
  31. outOfMemory = false;
  32. }
  33. long startSelect = time.nanoseconds();
  34. // #5 调用JDK的Selector#selectNow()/select(time)获取已就绪事件数量
  35. // 记住,在NIO编程中,只有注册相关事件才会得到SelectionKey
  36. int numReadyKeys = select(timeout);
  37. long endSelect = time.nanoseconds();
  38. this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());
  39. // #6 处理I/O事件,只要一个条件成立就会执行:
  40. // 1.I/O事件数量>0 2.存在可瞬时连接的Socket连接 3.Socket Buffer仍存在数据暂未被读取
  41. if (numReadyKeys > 0 || !immediatelyConnectedKeys.isEmpty() || dataInBuffers) {
  42. // #6-1 获取就绪事件集合
  43. Set<SelectionKey> readyKeys = this.nioSelector.selectedKeys();
  44. // #6-2 从Socket Buffer仍存在数据的通道中抽取数据。这种情况是因为数据到了内核的Buffer中,但是Kafka没有足够的内存接收,
  45. // 所以这批数据一直存在内核的Buffer,等待下次轮询时读取
  46. if (dataInBuffers) {
  47. // 1.将「keysWithBufferedRead」集合一分为二,
  48. // 剔除本轮「readyKeys」,然后对剩余的「SelectionKey」进行相应的I/O处理
  49. keysWithBufferedRead.removeAll(readyKeys);
  50. Set<SelectionKey> toPoll = keysWithBufferedRead;
  51. keysWithBufferedRead = new HashSet<>();
  52. // 2.I/O处理
  53. pollSelectionKeys(toPoll, false, endSelect);
  54. }
  55. // #7 内核Buffer已经存在数据或底层SocketChannel已准备发送数据,那么对SocketChannel进行读/写操作
  56. pollSelectionKeys(readyKeys, false, endSelect);
  57. // #8 别忘记清除这些SelectionKey,否则会出大问题
  58. readyKeys.clear();
  59. // #9 瞬时建立的连接单独处理
  60. pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
  61. // #10 清除瞬时建立的连接集合
  62. immediatelyConnectedKeys.clear();
  63. } else {
  64. // #11 本次poll()在读数据操作中没有取得进展
  65. madeReadProgressLastPoll = true;
  66. }
  67. long endIo = time.nanoseconds();
  68. this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());
  69. // #12 关闭延迟的通道
  70. completeDelayedChannelClose(endIo);
  71. // #13 根据LRU算法策略关闭最旧的Socket连接
  72. maybeCloseOldestConnection(endSelect);
  73. }
  1. 步骤 #5 就是调用 Java 的 Selector#select() 方法从内核中获取就绪的事件数量。
  2. 可能由于 ByteBuffer 对象池容量不够而导致没有空间容纳内核已经接收到的数据,Kafka 使用 keysWithBufferedRead 记录这个 SelectionKey,随后会优先从这些 SocketChannel 获取数据(步骤 #6-2)。
  3. 步骤 #7 的方法是 Selector 的核心方法,它会根据已就绪的 I/O 事件对 SocketChannel 建立Socket连接/读/写等操作。这是 Kafka 网络编程核心方法。
  4. 其余步骤与网络编程关联较小,所以就不在这里进行说明。

    处理 I/O 事件 pollSelectionKeys

    客户端的 SocketChannel 感兴趣的 I/O 事件共有三类,分别是
  • OP_CONNECT:客户端主题和 Broker 建立连接。
  • OP_READ:SocketChannel 可读。
  • OP_WRITE:SocketChannel 可写。

相关源码如下:

  1. // org.apache.kafka.common.network.Selector#pollSelectionKeys
  2. /**
  3. * {@link Selector} 核心方法,用于I/O操作
  4. * Selector对每个 {@link SocketChannel} 通道雨露均沾
  5. *
  6. * @param selectionKeys 待处理的 {@link SelectionKey} 集合,可以从该对象中获取底层的 {@link SocketChannel}
  7. * @param isImmediatelyConnected 是否处理瞬时连上的Socket连接
  8. * @param currentTimeNanos 当前时间戳
  9. */
  10. void pollSelectionKeys(Set<SelectionKey> selectionKeys, boolean isImmediatelyConnected, long currentTimeNanos) {
  11. // #1 determineHandlingOrder方法打乱集合,避免相关通道处于饥饿
  12. for (SelectionKey key : determineHandlingOrder(selectionKeys)) {
  13. // #2 获取KafkaChannel,与底层的java.nio.channels.SocketChannel一一对应,代表一个TCP连接
  14. KafkaChannel channel = channel(key);
  15. long channelStartTimeNanos = recordTimePerConnection ? time.nanoseconds() : 0;
  16. boolean sendFailed = false;
  17. String nodeId = channel.id();
  18. sensors.maybeRegisterConnectionMetrics(nodeId);
  19. // #3 更新空闲连接时间
  20. if (idleExpiryManager != null)
  21. idleExpiryManager.update(nodeId, currentTimeNanos);
  22. try {
  23. // #4 处理瞬时连上或OP_CONNECT事件的Socket连接
  24. if (isImmediatelyConnected || key.isConnectable()) {
  25. if (channel.finishConnect()) {
  26. // #4-1 底层TCP连接已建立,Selector对象记录已建连的节点ID
  27. this.connected.add(nodeId);
  28. this.sensors.connectionCreated.record();
  29. } else {
  30. // #4-2 连接未就绪,跳过
  31. continue;
  32. }
  33. }
  34. // #5 底层TCP通道已连接(isConnected()返回true)但是还未完成身份认证(ready()返回false)
  35. // Kafka提供多种安全认证机制,主要分为SSL和SASL2两大类。其中SASL是基于账号密码的认证方式
  36. if (channel.isConnected() && !channel.ready()) {
  37. // #5-1 ①完成TCP或TCP+SSL握手; ② 完成身份认证
  38. channel.prepare();
  39. // #5-2 再次判断通道是否已准备就绪(通道就绪包含两个条件:①底层TCP/TCP+SSL已建立连接;②通道Kakfa身份认证)
  40. if (channel.ready()) {
  41. long readyTimeMs = time.milliseconds();
  42. boolean isReauthentication = channel.successfulAuthentications() > 1;
  43. // 记录认证延迟
  44. if (isReauthentication) {
  45. // 超过一次认证成功
  46. sensors.successfulReauthentication.record(1.0, readyTimeMs);
  47. if (channel.reauthenticationLatencyMs() == null)
  48. log.warn("Should never happen: re-authentication latency for a re-authenticated channel was null; continuing...");
  49. else
  50. sensors.reauthenticationLatency.record(channel.reauthenticationLatencyMs().doubleValue(), readyTimeMs);
  51. } else {
  52. sensors.successfulAuthentication.record(1.0, readyTimeMs);
  53. if (!channel.connectedClientSupportsReauthentication()) sensors.successfulAuthenticationNoReauth.record(1.0, readyTimeMs);
  54. }
  55. log.debug("Successfully {} authenticated with {}", isReauthentication ? "re-" : "", channel.socketDescription());
  56. }
  57. }
  58. // #6 更新KafkaChannel通道状态
  59. if (channel.ready() && channel.state() == ChannelState.NOT_CONNECTED)
  60. channel.state(ChannelState.READY);
  61. // 身份认证相关处理
  62. Optional<NetworkReceive> responseReceivedDuringReauthentication = channel.pollResponseReceivedDuringReauthentication();
  63. responseReceivedDuringReauthentication.ifPresent(receive -> {
  64. long currentTimeMs = time.milliseconds();
  65. addToCompletedReceives(channel, receive, currentTimeMs);
  66. });
  67. // #7 尝试从通道中读取数据
  68. if (channel.ready()
  69. && (key.isReadable() || channel.hasBytesBuffered())
  70. && !hasCompletedReceive(channel)
  71. && !explicitlyMutedChannels.contains(channel)) {
  72. attemptRead(channel);
  73. }
  74. // #8 对于SslTransportLayer而言有用,而PlainTransportLayer则固定返回false
  75. if (channel.hasBytesBuffered()) {
  76. // 这个通道有字节排在我们无法读取的中间缓冲区中(可能是因为没有内存)。
  77. // 可能的情况是底层套接字不会在下一次 poll() 中出现,因此我们需要记住该通道以进行下一次轮询调用,否则数据可能永远停留在所述缓冲区中。
  78. // 如果我们尝试处理缓冲数据并且没有任何进展,则会清除通道缓冲状态以避免每次检查的开销。
  79. keysWithBufferedRead.add(key);
  80. }
  81. // #9 尝试向SocketChannel中写入数据
  82. long nowNanos = channelStartTimeNanos != 0 ? channelStartTimeNanos : currentTimeNanos;
  83. try {
  84. attemptWrite(key, channel, nowNanos);
  85. } catch (Exception e) {
  86. sendFailed = true;
  87. throw e;
  88. }
  89. // #10 如果SocketChannel为非法,则主动关闭,避免资源泄漏
  90. if (!key.isValid())
  91. close(channel, CloseMode.GRACEFUL);
  92. } catch (Exception e) {
  93. String desc = channel.socketDescription();
  94. if (e instanceof IOException) {
  95. log.debug("Connection with {} disconnected", desc, e);
  96. } else if (e instanceof AuthenticationException) {
  97. boolean isReauthentication = channel.successfulAuthentications() > 0;
  98. if (isReauthentication)
  99. sensors.failedReauthentication.record();
  100. else
  101. sensors.failedAuthentication.record();
  102. String exceptionMessage = e.getMessage();
  103. if (e instanceof DelayedResponseAuthenticationException)
  104. exceptionMessage = e.getCause().getMessage();
  105. log.info("Failed {}authentication with {} ({})", isReauthentication ? "re-" : "",
  106. desc, exceptionMessage);
  107. } else {
  108. log.warn("Unexpected error from {}; closing connection", desc, e);
  109. }
  110. if (e instanceof DelayedResponseAuthenticationException)
  111. maybeDelayCloseOnAuthenticationFailure(channel);
  112. else
  113. close(channel, sendFailed ? CloseMode.NOTIFY_ONLY : CloseMode.GRACEFUL);
  114. } finally {
  115. maybeRecordTimePerConnection(channel, channelStartTimeNanos);
  116. }
  117. }
  118. }

画了这个函数执行流程图:
Selector 处理IO事件流程图.png

  1. 首先处理 OP_CONNECT 事件,由于是非阻塞的 SocketChannel,因此,调用 SocketChannel#connect() 方法后会立即返回,运气好可能会秒连接,而 Kafak 针对秒连也做了相应处理(使用 immediatelyConnectedKeys 变量存储秒连接的 SelectionKey),一连来说,连接建立是需要一定时间,因此,通常 #connect() 方法会返回 false。需要等待下一轮调用 SocketChannel#finishConnect() 判断是否连接成功。如果连接成功,则返回 true,否则,继续在下一轮接着判断,直到抛出 IOException 异常或成功连接。当通道成功连接后,马上要做的事情就是移除 OP_CONNETC 事件和添加 OP_READ 事件。
  2. 紧接着更新 KafkaChannel 的状态,将状态变更为 READY,意味着通道可以接收和发送数据了。
  3. 其它就和正常的 NIO 编程一样,处理 OP_READOP_WRITE 等 I/O 事件。

从源码中我们没有看到 Kafka 网络框架对 Epoll BUG 的修复,他们认为这是 JDK 的锅。

接收数据如何处理粘包问题?

Netty 解决粘包有四种方式:

  1. Fixed Length。定长的内容。这种解析很快,但协议不灵活。
  2. 特定分隔符。比如以 aaa 结尾。
  3. 基于换行符 \n\r\n 对文件进行分割。
  4. 基于特定的长度字段。这种方式告诉接收方内容长度,接收方收到该长度的数据就可认为是完整的数据了。

Kafka 是使用第四种方式,基于特定的长度字段,因为是自己实现的 RPC 协议,所以定义也特别简单:前 4 个字节表示后面内容的长度。
处理粘包.png
二进制数据的读取操作是在 NetworkReceive#readFrom(SocketChannel) 方法中:

  1. // org.apache.kafka.common.network.NetworkReceive#readFrom
  2. /**
  3. * 从 {@link SocketChannel} 通道中读取数据
  4. *
  5. * @param channel
  6. * @return
  7. * @throws IOException
  8. */
  9. public long readFrom(ScatteringByteChannel channel) throws IOException {
  10. // 本轮读取字节数
  11. int read = 0;
  12. // 如果「size」还有空位,说明未读满4个字节,继续从
  13. if (size.hasRemaining()) {
  14. // 继续填满「size」ByteBuffer
  15. int bytesRead = channel.read(size);
  16. if (bytesRead < 0)
  17. // 抛出异常
  18. throw new EOFException();
  19. // 记录读取字节数
  20. read += bytesRead;
  21. // 如果「size」还有空位,说明还没有填满,只能下次再读了
  22. // 如果「size」没有空位,说明内容长度已经确定,我们读取数据就有目标值,
  23. // 这就是解决TCP粘包的关键步骤了
  24. if (!size.hasRemaining()) {
  25. size.rewind();
  26. // 读取后的内容长度值
  27. int receiveSize = size.getInt();
  28. if (receiveSize < 0)
  29. throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + ")");
  30. // 校验长度,屏蔽恶意请求
  31. if (maxSize != UNLIMITED && receiveSize > maxSize)
  32. throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + " larger than " + maxSize + ")");
  33. // 内容长度可能为0
  34. requestedBufferSize = receiveSize;
  35. if (receiveSize == 0) {
  36. buffer = EMPTY_BUFFER;
  37. }
  38. }
  39. }
  40. // 已确定内容长度,但还未给内容分配ByteBuffer对象以接收数据
  41. if (buffer == null && requestedBufferSize != -1) {
  42. // 向对象池申请ByteBuffer
  43. buffer = memoryPool.tryAllocate(requestedBufferSize);
  44. // 申请失败,说明当前内存吃紧,只能跳过,等待下一轮poll()方法的调用
  45. if (buffer == null)
  46. log.trace("Broker low on memory - could not allocate buffer of size {} for source {}", requestedBufferSize, source);
  47. }
  48. // 成功申请到ByteBuffer对象
  49. if (buffer != null) {
  50. // 从SocketChannel读取数据以填充ByteBuffer对象
  51. int bytesRead = channel.read(buffer);
  52. if (bytesRead < 0)
  53. throw new EOFException();
  54. read += bytesRead;
  55. }
  56. // 记录此次成功读取的字节数
  57. return read;
  58. }
  1. 首先,判断重要的表示内容长度的 4 个字节是否读取成功。变量 size 就是用来存储这 4 个字节。成功读取的特征是 size.hasRemaining() = false。如果没有读完,则继续读。必须知道内容长度才能进行下一步操作。
  2. 已知内容长度,那么就可以根据长度值向 MemoryPool 对象池申请 ByteBuffer 对象,申请操作是有约束的,如果内存不够,则返回 null,本次读取操作就会跳过,等待下一轮 Selector#poll() 调用才能继续尝试从 SocketChannel 读取数据。
  3. 成功申请到 ByteBuffer,就直接调用 SocketChannel#read(ByteBuffer) 将数据填满 ByteBuffer 对象。

那怎么才算得到一个完整的响应呢? 是通过 complete() 方法完成的:

  1. /**
  2. * 判断是否成功得到一个完整的响应体
  3. *
  4. * @return
  5. */
  6. @Override
  7. public boolean complete() {
  8. return !size.hasRemaining() // 4个字节读完
  9. && buffer != null // ByteBuffer对象不能为空,即便是0负载的也会有指向EMPTY_BUFFER
  10. && !buffer.hasRemaining(); // 负载ByteBuffer没有空闲位置,因为ByteBuffer大小等于内容长度
  11. }

总结

写了两天,终于把 kafka 客户端网络组件写完了。有些细节并没有讲清楚,比如如何进行 SSL 握手。不过这不影响理解 Kafka 客户端网络组件。本章单纯地涉及到网络相关的知识,对于 Kafka Producer 业务相关的没有讲太多。希望大家能从这篇文章中对 Kafka 客户端网络组件有个更宏观的认识。