整体架构

前面讲到消息在真正发往 Kafka 之前,有可能需要经历拦截器、序列化器和分区器等一系列的作用,那么在此之后又会发生什么呢?我们先来看一下生产者客户端的整体架构:
image.png
整个生产者客户端由两个线程协调运行,这两个线程分别为主线程和 Sender 线程(发送线程)。在主线程中由 KafkaProducer 创建消息,然后通过可能的拦截器、序列化器和分区器的作用之后缓存到消息累加器 RecordAccumulator 中。Sender 线程负责从 RecordAccumulator 中获取消息并将其发送到 Kafka 中。

1. RecordAccumulator

RecordAccumulator 主要用来缓存消息以便 Sender 线程可以进行批量发送,从而减少网络传输的资源消耗进而提升性能。RecordAccumulator 缓存的大小通过生产者客户端参数 buffer.memory 控制,默认值为 32 MB。如果生产者发送消息的速度超过发送到服务器的速度,则会导致生产者空间不足,此时 KafkaProducer 的 send 方法会被被阻塞,阻塞时间取决于参数 max.block.ms 的配置,默认为 60 秒。超时后则抛出异常。

  1. public final class RecordAccumulator {
  2. private final BufferPool free;
  3. private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;
  4. ......
  5. }

主线程发送过来的消息都会被追加到 RecordAccumulator 的某个双端队列中,RecordAccumulator 内部为每个分区都维护了一个双端队列,队列中的内容就是 ProducerBatch,即 Deque。消息写入缓存时会追回到双端队列的尾部;Sender 读取消息时会从双端队列的头部读取。

注意:ProducerBatch 是指一个消息批次,ProducerRecord 会被包含在 ProducerBatch 中,这样可以使字节的使用更加紧凑,同时将较小的 ProducerRecord 拼凑成一个较大的 ProducerBatch 也可以减少网络请求的次数以提升整体的吞量。如果生产者客户端需要向很多分区发送消息,则可以将 buffer.memory 参数适当地调大以增加整体的吞吐量。

消息追加到双端队列尾部:

  1. private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers,
  2. Callback callback, Deque<ProducerBatch> deque, long nowMs) {
  3. // 获取双端队列尾部元素
  4. ProducerBatch last = deque.peekLast();
  5. if (last != null) {
  6. // 向ProducerBatch追加消息并获取写入的Future
  7. FutureRecordMetadata future = last.tryAppend(timestamp, key, value, headers, callback, nowMs);
  8. if (future == null)
  9. last.closeForRecordAppends();
  10. else
  11. return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false, false);
  12. }
  13. return null;
  14. }

消息在网络上都是以字节(Byte)形式传输的,在发送之前需要创建一块内存区域来保存对应的消息。在 Kafka 生产者客户端中,通过 java.io.ByteBuffer 实现消息内存的创建和释放。不过频繁的创建和释放是比较耗费资源的,因此 RecordAccumulator 内部自己实现了一个 BufferPool,它主要是用来实现 ByteBuffer 的复用,以实现缓存的高效利用。不过 BufferPool 只针对特定大小的 ByteBuffer 进行管理,而其它大小的 ByteBuffer 不会缓存进 BufferPool 中,这个特定的大小由 batch.size 参数控制,默认值为 16 KB。

ProducerBatch 的大小和 batch.size 参数也有着密切的关系。当一条消息流入 RecordAccumulator 时,会先寻找与消息分区所对应的双端队列,如果没有则创建。再从这个双端队列的尾部获取一个 ProducerBatch,如果没有则创建,判断 ProducerBatch 中是否还可以写入这个 ProdcucerRecord,如果可以则写入,如果不可以则需要创建一个新的 ProducerBatch。在新建 ProducerBatch 时需要评估这条消息的大小是否超过 batch.size 参数的大小,如果不超过就以 batch.size 参数指定的大小来创建 ProducerBatch,这样在使用完这段内存区域后可以通过 BufferPool 的管理来进行复用;如果超过就以这条消息的大小创建 ProducerBatch,但这段内存区域就不会被复用了。

RecordAccumulator 的 append 方法实现:

  1. // 分配可用内存,size为消息的大小
  2. buffer = free.allocate(size, maxTimeToBlock);
  3. // 追加到缓冲区
  4. MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
  5. // 新建ProducerBatch并添加到双端队列
  6. ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, nowMs);
  7. dq.addLast(batch);
  8. // 回收内存
  9. free.deallocate(buffer);

2. Sender

Sender 线程在创建 KafkaProducer 时启动,它会与 bootstrap.servers 参数指定的所有 Broker 建立连接,并在一个死循环中不断读取 RecordAccumulator 中缓存的批次消息发送给 broker,直到客户端被关闭。

  1. public class Sender implements Runnable {
  2. /* the state of each nodes connection */
  3. private final KafkaClient client;
  4. /* the record accumulator that batches records */
  5. private final RecordAccumulator accumulator;
  6. /* the maximum request size to attempt to send to the server */
  7. private final int maxRequestSize;
  8. /* the number of acknowledgements to request from the server */
  9. private final short acks;
  10. /* the number of times to retry a failed request before giving up */
  11. private final int retries;
  12. /* the max time to wait for the server to respond to the request*/
  13. private final int requestTimeoutMs;
  14. /* The max time to wait before retrying a request which has failed */
  15. private final long retryBackoffMs;
  16. // A per-partition queue of batches ordered by creation time for tracking the in-flight batches
  17. private final Map<TopicPartition, List<ProducerBatch>> inFlightBatches;
  18. ......
  19. }

Sender 从 RecordAccumulator 中获取缓存的消息后,会进一步将原本 <分区,Deque> 的保存形式转变成 > 的形式,其中 Node 表示 Kafka 集群的 broker 节点。对于网络连接来说,生产者客户端是与具体的 broker 节点建立连接,也就是向具体的 broker 节点发送消息,而并不关心消息属于哪一个分区;而对于 KafkaProducer 的应用逻辑而言,我们只关注向哪个分区中发送哪些消息,而不管这个分区 leader 位于哪个 broker 上,所以这里需要做一个应用逻辑层面到网络 I/O 层面的转换。

  1. private long sendProducerData(long now) {
  2. Cluster cluster = metadata.fetch();
  3. // 获取待发送消息对应分区的节点
  4. RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
  5. ......
  6. // 获取要发送的消息集合,其中 key 为 NodeId
  7. Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
  8. // 添加到InFlight缓冲区
  9. addToInflightBatches(batches);
  10. ......
  11. // 发送生产者消息
  12. sendProduceRequests(batches, now);
  13. ......
  14. }

在转换成 > 的形式后,Sender 还会进一步封装成 的形式,这样就可以将 Request 请求发往各个 Node 了,这里的 Request 是指 Kafka 的各种协议请求,对于消息发送而言就是指具体的 ProducerRequest

  1. /**
  2. * Create a produce request from the given record batches
  3. */
  4. private void sendProduceRequest(long now, int destination, short acks, int timeout, List<ProducerBatch> batches) {
  5. Map<TopicPartition, MemoryRecords> produceRecordsByPartition = new HashMap<>(batches.size());
  6. final Map<TopicPartition, ProducerBatch> recordsByPartition = new HashMap<>(batches.size());
  7. ......
  8. for (ProducerBatch batch : batches) {
  9. TopicPartition tp = batch.topicPartition;
  10. MemoryRecords records = batch.records();
  11. ......
  12. produceRecordsByPartition.put(tp, records);
  13. recordsByPartition.put(tp, batch);
  14. }
  15. // 构建ProduceRequest请求体
  16. ProduceRequest.Builder requestBuilder = ProduceRequest.Builder.forMagic(minUsedMagic, acks, timeout,
  17. produceRecordsByPartition, transactionalId);
  18. // 构建请求回调函数,执行成功则清除缓冲区数据,执行失败则重试或抛异常
  19. RequestCompletionHandler callback = new RequestCompletionHandler() {
  20. public void onComplete(ClientResponse response) {
  21. handleProduceResponse(response, recordsByPartition, time.milliseconds());
  22. }
  23. };
  24. // 发送网络请求到broker
  25. String nodeId = Integer.toString(destination);
  26. ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0,
  27. requestTimeoutMs, callback);
  28. client.send(clientRequest, now);
  29. }

3. InFlightRequests

请求在从 Sender 线程发往 Kafka 之前还会保存到 InFlightRequests 中,InFlightRequests 保存对象的具体形式为 Map>,其中 key 表示节点的 ID,它的主要作用是缓存了已经发出去但还没有收到响应的请求。

NetworkClient 的 doSend 方法逻辑:

  1. private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now, AbstractRequest request) {
  2. String destination = clientRequest.destination();
  3. RequestHeader header = clientRequest.makeHeader(request.version());
  4. ......
  5. Send send = request.toSend(destination, header);
  6. InFlightRequest inFlightRequest = new InFlightRequest(
  7. clientRequest,
  8. header,
  9. isInternalRequest,
  10. request,
  11. send,
  12. now);
  13. // 添加到InFlightRequests中
  14. this.inFlightRequests.add(inFlightRequest);
  15. selector.send(send);
  16. }
  17. public void add(NetworkClient.InFlightRequest request) {
  18. String destination = request.destination;
  19. Deque<NetworkClient.InFlightRequest> reqs = this.requests.get(destination);
  20. if (reqs == null) {
  21. reqs = new ArrayDeque<>();
  22. this.requests.put(destination, reqs);
  23. }
  24. // 添加到双端队列头部
  25. reqs.addFirst(request);
  26. inFlightRequestCount.incrementAndGet();
  27. }

与此同时,InFlightRequests 还提供了许多管理类的方法,并且通过配置参数还可以限制每个连接(也就是客户端与 Node 之间的连接)最多缓存的请求数。这个配置参数为 max.in.flight.requests.per.connection,默认值为 5,即每个连接最多只能缓存 5 个未响应的请求,超过该数值之后就不能向这个连接发送更多的请求了,除非有缓存的请求收到了响应。

4. Selector

Kafka 客户端与 broker 的交互采用了 NIO 的非阻塞读写。当发送生产者数据时,会根据主题分区对应的 Node 获取远程 broker 的地址,如果此时客户端没有与该 broker 创建连接,则会尝试建立连接。

  1. private void initiateConnect(Node node, long now) {
  2. String nodeConnectionId = node.idString();
  3. try {
  4. connectionStates.connecting(nodeConnectionId, now, node.host(), clientDnsLookup);
  5. InetAddress address = connectionStates.currentAddress(nodeConnectionId);
  6. selector.connect(nodeConnectionId,
  7. new InetSocketAddress(address, node.port()),
  8. this.socketSendBuffer,
  9. this.socketReceiveBuffer);
  10. }
  11. ......
  12. }
  13. public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException {
  14. ensureNotRegistered(id);
  15. SocketChannel socketChannel = SocketChannel.open();
  16. SelectionKey key = null;
  17. try {
  18. configureSocketChannel(socketChannel, sendBufferSize, receiveBufferSize);
  19. // 建立socket连接
  20. boolean connected = doConnect(socketChannel, address);
  21. // 注册到底层 NIO Selector 上
  22. key = registerChannel(id, socketChannel, SelectionKey.OP_CONNECT);
  23. ......
  24. }
  25. ......
  26. }
  27. protected SelectionKey registerChannel(String id, SocketChannel socketChannel, int interestedOps) throws IOException {
  28. SelectionKey key = socketChannel.register(nioSelector, interestedOps);
  29. // 把底层socket封装成KafkaChannel,用来执行读写操作
  30. KafkaChannel channel = buildAndAttachKafkaChannel(socketChannel, id, key);
  31. // 维护节点与channel之间的映射关系
  32. this.channels.put(id, channel);
  33. // 更新channle的空闲时间,channle对应的连接超过指定时间则会被关闭
  34. if (idleExpiryManager != null)
  35. idleExpiryManager.update(channel.id(), time.nanoseconds());
  36. return key;
  37. }

从代码中可以看到,Selector 为每个节点维护了一个 KafkaChannel,KafkaChannel 封装了底层的 socket 用来进行读写事件,生产者请求发送到 Selector 指定的 KafkaChannel 后就会返回了,此时客户端还未真正将请求发送给 broker,请求会在 Sender 线程的 poll 轮训中发送给 broker。

  1. /**
  2. * Queue the given request for sending in the subsequent {@link #poll(long)} calls
  3. * @param send The request to send
  4. */
  5. public void send(Send send) {
  6. String connectionId = send.destination();
  7. // 根据节点ID获取关联的channel
  8. KafkaChannel channel = openOrClosingChannelOrFail(connectionId);
  9. ......
  10. // send里维护了ByteBuffer,里面存储了要发送的数据
  11. channel.setSend(send);
  12. ......
  13. }
  14. public void setSend(Send send) {
  15. this.send = send;
  16. // 注册一个可写事件
  17. this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
  18. }

在 Sender 线程中会不断轮训执行 Selector 的 poll 方法,在其 poll 方法中会调用底层 NIO 的 selector 组件获取可读或可写的 channel。

Kafka 自己实现的 Selector 的 poll 方法逻辑:

  1. public void poll(long timeout) throws IOException {
  2. ......
  3. // 检查是否有已经就绪的事件
  4. int numReadyKeys = select(timeout);
  5. if (numReadyKeys > 0 || !immediatelyConnectedKeys.isEmpty() || dataInBuffers) {
  6. // 获取可以处理的SelectionKey
  7. Set<SelectionKey> readyKeys = this.nioSelector.selectedKeys();
  8. ......
  9. // 处理读写事件,写事件就是发送生产者请求
  10. pollSelectionKeys(readyKeys, false, endSelect);
  11. // 处理完成后清除所有选定的键,以便它们可以在下一次轮训中被选中
  12. readyKeys.clear();
  13. ......
  14. } else {
  15. madeReadProgressLastPoll = true; //no work is also "progress"
  16. }
  17. ......
  18. }

Kafka 客户端与 broker 建立的 TCP 连接的存活时间可以通过 connections.max.idle.ms 参数控制。默认情况下该参数值是 9 分钟,即如果在 9 分钟内没有任何请求流过某个 TCP 连接,那么 Kafka 会主动关闭该 TCP 连接。用户可以在 Producer 端设置该参数为 -1 以禁掉这种主动关闭的机制。

元数据的更新

元数据是指 Kafka 集群的元数据,这些元数据具体记录了集群中有哪些主题,这些主题有哪些分区,每个分区的 leader 副本分配在哪个节点上,follow 副本分配在哪些节点上,哪些副本在 AR、ISR 等集合中,集群中有哪些节点,控制器节点是哪一个等等信息。

当客户端中没有需要使用的元数据信息时,比如给一个不存在的主题发送消息时,Broker 会告诉 Producer 这个主题不存在。此时 Producer 会给 Kafka 集群发送请求尝试获取最新的元数据信息。其次 Producer 会通过 metadata.max.age.ms 参数定期更新元数据信息,默认值是 5 分钟,即不管集群那边是否有变化,Producer 每隔五分钟都会强制刷新一次元数据以保证它可以最及时的数据。

元数据的更新操作是在客户端内部进行的,对客户端的外部使用者不可见。当需要更新元数据时,会先挑选出 leastLoadedNode(负载最小的节点),然后向这个 Node 发送 MetadataRequest 请求来获取具体的元数据信息。这个更新操作是由 Sender 线程发起的,在创建完 MetadataRequest 后同样会存入 InFlightRequests,之后的步骤和发送消息时是类似的。

  1. @Override
  2. public List<ClientResponse> poll(long timeout, long now) {
  3. // 更新集群元数据
  4. long metadataTimeout = metadataUpdater.maybeUpdate(now);
  5. try {
  6. this.selector.poll(Utils.min(timeout, metadataTimeout, defaultRequestTimeoutMs));
  7. } catch (IOException e) {
  8. log.error("Unexpected error during I/O", e);
  9. }
  10. ......
  11. return responses;
  12. }