基于kafka2.x 版本
生产者局部架构图
Sender线程run()步骤:场景驱动方式cluster中已有元数据
1、获取元数据
2、判断哪些partitinon有消息要发送,获取分区号对应的broker主机
3、标志还没有拉取到元数据的topic
4、检查与要发送数据的主机网络是否建立好
5、有可能要发送的parttition有多个,有一些parttition的leader partition在同一台服务器上。
按照broker进行分区,同一个broker的partition为同一组
6、放弃超时的batch
7、创建发送消息的请求
8、执行网络操作的这个NetWordClient组件,包括:发送请求、接收响应
run()流程图:
Sender 发送流程中,大部分都是调用 RecordAccumulator 方法来实现其特定逻辑,重点对上述涉及到RecordAccumulator 的方法进行一个详细剖析
RecordAccumulator#ready 该方法主要就是根据缓存区中的消息,判断哪些分区已经达到发送条件。
public ReadyCheckResult ready(Cluster cluster, long nowMs) {
Set<Node> readyNodes = new HashSet<>();
long nextReadyCheckDelayMs = Long.MAX_VALUE;
Set<String> unknownLeaderTopics = new HashSet<>();
boolean exhausted = this.free.queued() > 0;
for (Map.Entry<TopicPartition, Deque<ProducerBatch>> entry : this.batches.entrySet()) { // @1
TopicPartition part = entry.getKey();
Deque<ProducerBatch> deque = entry.getValue();
Node leader = cluster.leaderFor(part); // @2
synchronized (deque) {
if (leader == null && !deque.isEmpty()) { // @3
// This is a partition for which leader is not known, but messages are available to send.
// Note that entries are currently not removed from batches when deque is empty.
unknownLeaderTopics.add(part.topic());
} else if (!readyNodes.contains(leader) && !isMuted(part, nowMs)) { // @4
ProducerBatch batch = deque.peekFirst();
if (batch != null) {
// waitedTimeMs=100ms 消息最多存多久发出去
long waitedTimeMs = batch.waitedTimeMs(nowMs);
boolean backingOff = batch.attempts() > 0 && waitedTimeMs < retryBackoffMs;
long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
// 如果队列大于1说明队列里至少有一个批次写满了;也可能只有一个批次,这个批次正好写满
boolean full = deque.size() > 1 || batch.isFull();
boolean expired = waitedTimeMs >= timeToWaitMs;
boolean sendable = full || expired || exhausted || closed || flushInProgress();
if (sendable && !backingOff) { // @5
readyNodes.add(leader);
} else {
long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
// Note that this results in a conservative estimate since an un-sendable partition may have
// a leader that will later be found to have sendable data. However, this is good enough
// since we'll just wake up and then sleep again for the remaining time.
nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
}
}
}
}
}
return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics);
}
代码@1:对生产者缓存区 ConcurrentHashMap
代码@2:从生产者元数据缓存中尝试查找分区(TopicPartition) 的 leader 信息,如果不存在,当将该 topic 添加到 unknownLeaderTopics (代码@3),稍后会发送元数据更新请求去 broker 端查找分区的路由信息。
代码@4:如果不在 readyNodes 中就需要判断是否满足条件,isMuted 与顺序消息有关,本文暂时不关注,在后面的顺序消息部分会重点探讨。
代码@5:这里就是判断是否准备好的条件,先一个一个来解读局部变量的含义。
- long waitedTimeMs
该 ProducerBatch 已等待的时长,等于当前时间戳 与 ProducerBatch 的 lastAttemptMs 之差,在 ProducerBatch 创建时或需要重试时会将当前的时间赋值给lastAttemptMs。 - retryBackoffMs
当发生异常时发起重试之前的等待时间,默认为 100ms,可通过属性 retry.backoff.ms 配置。 - batch.attempts()
该批次当前已重试的次数。 - backingOff
后台发送是否关闭,即如果需要重试并且等待时间小于 retryBackoffMs ,则 backingOff = true,也意味着该批次未准备好。 - timeToWaitMs
send 线程发送消息需要的等待时间,如果 backingOff 为 true,表示该批次是在重试,并且等待时间小于系统设置的需要等待时间,这种情况下 timeToWaitMs = retryBackoffMs 。否则需要等待的时间为 lingerMs。 - boolean full该批次是否已满,如果两个条件中的任意一个满足即为 true。
- Deque< ProducerBatch> 该队列的个数大于1,表示肯定有一个 ProducerBatch 已写满。
- ProducerBatch 已写满。
- boolean expired
是否过期,等于已经等待的时间是否大于需要等待的时间,如果把发送看成定时发送的话,expired 为 true 表示定时器已到达触发点,即需要执行。 - boolean exhausted
当前生产者缓存已不够,创建新的 ProducerBatch 时阻塞在申请缓存空间的线程大于0,此时应立即将缓存区中的消息立即发送到服务器。 - boolean sendable是否可发送。其满足下面的任意一个条件即可:
- 该批次已写满。(full = true)。
- 已等待系统规定的时长。(expired = true)
- 发送者内部缓存区已耗尽并且有新的线程需要申请(exhausted = true)。
- 该发送者的 close 方法被调用(close = true)。
- 该发送者的 flush 方法被调用。
RecordAccumulator#drain**
public Map<Integer, List<ProducerBatch>> drain(Cluster cluster, Set<Node> nodes, int maxSize, long now) { // @1
if (nodes.isEmpty())
return Collections.emptyMap();
Map<Integer, List<ProducerBatch>> batches = new HashMap<>();
for (Node node : nodes) {
List<ProducerBatch> ready = drainBatchesForOneNode(cluster, node, maxSize, now); // @2
batches.put(node.id(), ready);
}
return batches;
}
代码@1:我们首先来介绍该方法的参数:
- Cluster cluster
集群信息。 - Set< Node> nodes
已准备好的节点集合。 - int maxSize
一次请求最大的字节数。 - long now
当前时间。
代码@2:遍历所有节点,调用 drainBatchesForOneNode 方法抽取数据,
组装成 Map
RecordAccumulator#drainBatchesForOneNode
private List<ProducerBatch> drainBatchesForOneNode(Cluster cluster, Node node, int maxSize, long now) {
int size = 0;
List<PartitionInfo> parts = cluster.partitionsForNode(node.id()); // @1
List<ProducerBatch> ready = new ArrayList<>();
int start = drainIndex = drainIndex % parts.size(); // @2
do { // @3
PartitionInfo part = parts.get(drainIndex);
TopicPartition tp = new TopicPartition(part.topic(), part.partition());
this.drainIndex = (this.drainIndex + 1) % parts.size();
if (isMuted(tp, now))
continue;
Deque<ProducerBatch> deque = getDeque(tp); // @4
if (deque == null)
continue;
synchronized (deque) {
// invariant: !isMuted(tp,now) && deque != null
ProducerBatch first = deque.peekFirst(); // @5
if (first == null)
continue;
// first != null
boolean backoff = first.attempts() > 0 && first.waitedTimeMs(now) < retryBackoffMs; // @6
// Only drain the batch if it is not during backoff period.
if (backoff)
continue;
if (size + first.estimatedSizeInBytes() > maxSize && !ready.isEmpty()) { // @7
break;
} else {
if (shouldStopDrainBatchesForPartition(first, tp))
break;
// 这里省略与事务消息相关的代码,后续会重点学习。
batch.close(); // @8
size += batch.records().sizeInBytes();
ready.add(batch);
batch.drained(now);
}
}
} while (start != drainIndex);
return ready;
}
代码@1:根据 brokerId 获取该 broker 上的所有主分区。
代码@2:初始化 start。这里首先来阐述一下 start 与 drainIndex 。
- start 当前开始遍历的分区序号。
- drainIndex 上次抽取的队列索引后,这里主要是为了每个队列都是从零号分区开始抽取。
代码@3:循环从缓存区抽取对应分区中累积的数据。
代码@4:根据 topic + 分区号从生产者发送缓存区中获取已累积的双端Queue。
代码@5:从双端队列的头部获取一个元素。(消息追加时是追加到队列尾部)。
代码@6:如果当前批次是重试,并且还未到阻塞时间,则跳过该分区。
代码@7:如果当前已抽取的消息总大小 加上新的消息已超过 maxRequestSize,则结束抽取。
代码@8:将当前批次加入到已准备集合中,并关闭该批次,即不在允许向该批次中追加消息。
NetworkClient 的 poll 方法内部会调用 Selector 执行就绪事件的选择,并将抽取的消息通过网络发送到 Broker 服务器