基于kafka2.x 版本

生产者局部架构图

image.png

Sender线程run()步骤:场景驱动方式cluster中已有元数据

1、获取元数据
2、判断哪些partitinon有消息要发送,获取分区号对应的broker主机
3、标志还没有拉取到元数据的topic
4、检查与要发送数据的主机网络是否建立好
5、有可能要发送的parttition有多个,有一些parttition的leader partition在同一台服务器上。
按照broker进行分区,同一个broker的partition为同一组
6、放弃超时的batch
7、创建发送消息的请求
8、执行网络操作的这个NetWordClient组件,包括:发送请求、接收响应

run()流程图:

image.png
Sender 发送流程中,大部分都是调用 RecordAccumulator 方法来实现其特定逻辑,重点对上述涉及到RecordAccumulator 的方法进行一个详细剖析
RecordAccumulator#ready 该方法主要就是根据缓存区中的消息,判断哪些分区已经达到发送条件。

  1. public ReadyCheckResult ready(Cluster cluster, long nowMs) {
  2. Set<Node> readyNodes = new HashSet<>();
  3. long nextReadyCheckDelayMs = Long.MAX_VALUE;
  4. Set<String> unknownLeaderTopics = new HashSet<>();
  5. boolean exhausted = this.free.queued() > 0;
  6. for (Map.Entry<TopicPartition, Deque<ProducerBatch>> entry : this.batches.entrySet()) { // @1
  7. TopicPartition part = entry.getKey();
  8. Deque<ProducerBatch> deque = entry.getValue();
  9. Node leader = cluster.leaderFor(part); // @2
  10. synchronized (deque) {
  11. if (leader == null && !deque.isEmpty()) { // @3
  12. // This is a partition for which leader is not known, but messages are available to send.
  13. // Note that entries are currently not removed from batches when deque is empty.
  14. unknownLeaderTopics.add(part.topic());
  15. } else if (!readyNodes.contains(leader) && !isMuted(part, nowMs)) { // @4
  16. ProducerBatch batch = deque.peekFirst();
  17. if (batch != null) {
  18. // waitedTimeMs=100ms 消息最多存多久发出去
  19. long waitedTimeMs = batch.waitedTimeMs(nowMs);
  20. boolean backingOff = batch.attempts() > 0 && waitedTimeMs < retryBackoffMs;
  21. long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
  22. // 如果队列大于1说明队列里至少有一个批次写满了;也可能只有一个批次,这个批次正好写满
  23. boolean full = deque.size() > 1 || batch.isFull();
  24. boolean expired = waitedTimeMs >= timeToWaitMs;
  25. boolean sendable = full || expired || exhausted || closed || flushInProgress();
  26. if (sendable && !backingOff) { // @5
  27. readyNodes.add(leader);
  28. } else {
  29. long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
  30. // Note that this results in a conservative estimate since an un-sendable partition may have
  31. // a leader that will later be found to have sendable data. However, this is good enough
  32. // since we'll just wake up and then sleep again for the remaining time.
  33. nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
  34. }
  35. }
  36. }
  37. }
  38. }
  39. return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics);
  40. }

代码@1:对生产者缓存区 ConcurrentHashMap> batches 遍历,从中挑选已准备好的消息批次。
代码@2:从生产者元数据缓存中尝试查找分区(TopicPartition) 的 leader 信息,如果不存在,当将该 topic 添加到 unknownLeaderTopics (代码@3),稍后会发送元数据更新请求去 broker 端查找分区的路由信息。
代码@4:如果不在 readyNodes 中就需要判断是否满足条件,isMuted 与顺序消息有关,本文暂时不关注,在后面的顺序消息部分会重点探讨。
代码@5:这里就是判断是否准备好的条件,先一个一个来解读局部变量的含义。

  • long waitedTimeMs
    该 ProducerBatch 已等待的时长,等于当前时间戳 与 ProducerBatch 的 lastAttemptMs 之差,在 ProducerBatch 创建时或需要重试时会将当前的时间赋值给lastAttemptMs。
  • retryBackoffMs
    当发生异常时发起重试之前的等待时间,默认为 100ms,可通过属性 retry.backoff.ms 配置。
  • batch.attempts()
    该批次当前已重试的次数。
  • backingOff
    后台发送是否关闭,即如果需要重试并且等待时间小于 retryBackoffMs ,则 backingOff = true,也意味着该批次未准备好。
  • timeToWaitMs
    send 线程发送消息需要的等待时间,如果 backingOff 为 true,表示该批次是在重试,并且等待时间小于系统设置的需要等待时间,这种情况下 timeToWaitMs = retryBackoffMs 。否则需要等待的时间为 lingerMs。
  • boolean full该批次是否已满,如果两个条件中的任意一个满足即为 true。
    • Deque< ProducerBatch> 该队列的个数大于1,表示肯定有一个 ProducerBatch 已写满。
    • ProducerBatch 已写满。
  • boolean expired
    是否过期,等于已经等待的时间是否大于需要等待的时间,如果把发送看成定时发送的话,expired 为 true 表示定时器已到达触发点,即需要执行。
  • boolean exhausted
    当前生产者缓存已不够,创建新的 ProducerBatch 时阻塞在申请缓存空间的线程大于0,此时应立即将缓存区中的消息立即发送到服务器。
  • boolean sendable是否可发送。其满足下面的任意一个条件即可:
    • 该批次已写满。(full = true)。
    • 已等待系统规定的时长。(expired = true)
    • 发送者内部缓存区已耗尽并且有新的线程需要申请(exhausted = true)。
    • 该发送者的 close 方法被调用(close = true)。
    • 该发送者的 flush 方法被调用。


RecordAccumulator#drain**

  1. public Map<Integer, List<ProducerBatch>> drain(Cluster cluster, Set<Node> nodes, int maxSize, long now) { // @1
  2. if (nodes.isEmpty())
  3. return Collections.emptyMap();
  4. Map<Integer, List<ProducerBatch>> batches = new HashMap<>();
  5. for (Node node : nodes) {
  6. List<ProducerBatch> ready = drainBatchesForOneNode(cluster, node, maxSize, now); // @2
  7. batches.put(node.id(), ready);
  8. }
  9. return batches;
  10. }

代码@1:我们首先来介绍该方法的参数:

  • Cluster cluster
    集群信息。
  • Set< Node> nodes
    已准备好的节点集合。
  • int maxSize
    一次请求最大的字节数。
  • long now
    当前时间。

代码@2:遍历所有节点,调用 drainBatchesForOneNode 方法抽取数据,
组装成 Map> batches。

RecordAccumulator#drainBatchesForOneNode

  1. private List<ProducerBatch> drainBatchesForOneNode(Cluster cluster, Node node, int maxSize, long now) {
  2. int size = 0;
  3. List<PartitionInfo> parts = cluster.partitionsForNode(node.id()); // @1
  4. List<ProducerBatch> ready = new ArrayList<>();
  5. int start = drainIndex = drainIndex % parts.size(); // @2
  6. do { // @3
  7. PartitionInfo part = parts.get(drainIndex);
  8. TopicPartition tp = new TopicPartition(part.topic(), part.partition());
  9. this.drainIndex = (this.drainIndex + 1) % parts.size();
  10. if (isMuted(tp, now))
  11. continue;
  12. Deque<ProducerBatch> deque = getDeque(tp); // @4
  13. if (deque == null)
  14. continue;
  15. synchronized (deque) {
  16. // invariant: !isMuted(tp,now) && deque != null
  17. ProducerBatch first = deque.peekFirst(); // @5
  18. if (first == null)
  19. continue;
  20. // first != null
  21. boolean backoff = first.attempts() > 0 && first.waitedTimeMs(now) < retryBackoffMs; // @6
  22. // Only drain the batch if it is not during backoff period.
  23. if (backoff)
  24. continue;
  25. if (size + first.estimatedSizeInBytes() > maxSize && !ready.isEmpty()) { // @7
  26. break;
  27. } else {
  28. if (shouldStopDrainBatchesForPartition(first, tp))
  29. break;
  30. // 这里省略与事务消息相关的代码,后续会重点学习。
  31. batch.close(); // @8
  32. size += batch.records().sizeInBytes();
  33. ready.add(batch);
  34. batch.drained(now);
  35. }
  36. }
  37. } while (start != drainIndex);
  38. return ready;
  39. }

代码@1:根据 brokerId 获取该 broker 上的所有主分区。
代码@2:初始化 start。这里首先来阐述一下 start 与 drainIndex 。

  • start 当前开始遍历的分区序号。
  • drainIndex 上次抽取的队列索引后,这里主要是为了每个队列都是从零号分区开始抽取。

代码@3:循环从缓存区抽取对应分区中累积的数据。
代码@4:根据 topic + 分区号从生产者发送缓存区中获取已累积的双端Queue。
代码@5:从双端队列的头部获取一个元素。(消息追加时是追加到队列尾部)。
代码@6:如果当前批次是重试,并且还未到阻塞时间,则跳过该分区。
代码@7:如果当前已抽取的消息总大小 加上新的消息已超过 maxRequestSize,则结束抽取。
代码@8:将当前批次加入到已准备集合中,并关闭该批次,即不在允许向该批次中追加消息。
NetworkClient 的 poll 方法内部会调用 Selector 执行就绪事件的选择,并将抽取的消息通过网络发送到 Broker 服务器