生产者客户端架构图
原理分析
消息发送流程
1)整个生产者客户端由两个线程协调运行,分别为主线程(调用Producer的线程)与Sender线程。在主线程中初始化KafkaProducer实例,创建消息(ProducerRecord),然后依次执行发送端消息拦截器、消息序列化器、消息分区器,最终同主题分区的消息会加入到相同的一个批次里,并缓存到消息收集器里(RecordAccumulator),存储形式为如下:
private final ConcurrentMap
> batches;
Sender线程从消息收集器里的batches对象里取可发送的批次,然后调用KafkaClient网络组件发送到主题分区对应的Broker里。
2) RecordAccumulator主要用来缓冲消息, 以便Sender线程可批量发送,进而减少网络传输的资源消耗, 以提升性能与发送吞吐量。缓冲区的大小可通过生产者客户端参数buffer.memory
配置,默认为32MB.
如果生产者发送消息的速度超过发送到服务器的速度,则会导致发送的缓冲空间不足,这时候主线程调用KafkaProducer的send方法可能会阻塞,或抛出异常。可通过参数max.block.ms
配置最大阻塞时间,默认为60秒. 内部通过加重入锁来控制内存分配,如果未获取到内存则被加入到Lock关联的Condition队列中等待,超过max.block.ms
后,则抛出中断异常。
3)主线程中发送的ProducerRecord 执行分区策略后,得到一个TopicPartition,然后会被放到RecordAccumulator的一个Map中,如代码:
KafkaProducer#doSend() {
...
int partition = partition(record, serializedKey, serializedValue, cluster);
TopicPartition tp = new TopicPartition(record.topic(), partition); // 创建Record时指定了Topic
}
RecordAccumulator中内部为每个TopicPartition维护了一个Deque双端队列,队列的内容存储的是:ProducerBatch,即Deque
如果生产者客户端需要向很多主题分区发送消息,可以适当调大buffer.memory
参数以提高吞吐量。
关键代码:
放入队列尾部
RecordAccumulator#tryAppend() {
ProducerBatch last = deque.peekLast();
if (last != null) {
FutureRecordMetadata future = last.tryAppend(timestamp, key, value, headers, callback, nowMs);
}
}
从队列头开始读:
RecordAccumulator#ready() {
...
ProducerBatch batch = deque.peekFirst();
if (batch != null) {
TopicPartition part = entry.getKey();
Node leader = cluster.leaderFor(part);
}
...
}
由于消息在网络上是以字节Byte的形式传输, 因此RecordAccumulator内部也有一个块单独的内存用来存储消息体,Kafka直接基于JDK原生的NIO来实现字节流的内存与网络控制,减少依赖第三方库,避免引入不稳定的因素。通过java.nio.ByteBuffer
来实现内存的分配与释放, 由于频繁的创建与释放内存比较浪费资源,因此RecordAccumulator内部维护了一个BufferPool, 它主要用于复用ByteBuffer,以实现缓存的高效利用。不过BufferPool只针对特定大小的ByteBuffer进行管理,而其他大小的ByteBuffer不会缓存到BufferPool, 通过batch.size
参数来配置这个特定大小的ByteBuffer,默认为值为16KB. 我们可以适当调整这个参数,以便多缓存一些消息.(Kafka建议不要修改这个参数大于默认值)
同时,batch.size
的大小与ProducerBatch的关系很密切。当一条消息(ProducerRecord)进入RecordAccumulator,先通过主题分区检查是否存在对应的双端队列,没有则会创建一个双端队列。然后从该双端的尾部获取一个ProducerBatch(没有则新建),查看该ProducerBatch是否还可以写入消息,如果可以则写入。如果不可写入则需创建一个新的ProducerBatch,在创建时会评估消息是否大于batch.size
如不超过,则以batch.size
指定的大小来创建一个ProducerBatch. 这样在使用完这段内存区域后,可以通过BufferPool的管理来实现复用。如果单个消息体的大小超过了batch.size, 则就已评估后的大小来创建ProducerBatch, 那么这段内存区域不会纳入BufferPool的管理,不会被复用。关键代码如下:
RecordAccumulator.append() {
....
byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
log.trace("Allocating a new {} byte message buffer for topic {} partition {} with remaining timeout {}ms", size, tp.topic(), tp.partition(), maxTimeToBlock);
buffer = free.allocate(size, maxTimeToBlock);
....
}
4) Sender从RecordAccumulator中获取缓存的消息时,取出的形式为:
在转换为
发送到Broker的Request还会进一步保存在InFligntRequest
可以理解为正在发送的请求列表,InFligntRequest保存的Request形式为
除此之外,InFligntRequest内部提供了很多控制方法,可通过配置参数限制每个连接(客户端与Node)最多缓存的请求数,这个配置参数为:max.in.flignt.request.per.connection
,默认值为5,即每个连接最多缓存5个未响应的请求。超过该数值,Kakfa的网络组件则不会向这个连接发送更多的请求,进而转入等待中。除非有缓存的请求收到了响应。通过比较Deque
获取最小负载节点(leastLoadedNode)
InFligntRequest
除了做请求限制外,还可以用于获取最小负载的节点。通过每个Node在InFligntRequest中未完成响应的请求可以判断当前Node的负载情况,如果未完成请求的数量越大,则仍未该Node的负载越大. 如:
可以看到Node1的负载相对较小,因此可作为可选的节点,选择负载较小的节点,可以避免网络拥塞等异常可以快速发送请求并获取响应。一般用于发送元数据更新请求,消费者组播协议。
相关代码逻辑:
org.apache.kafka.clients.NetworkClient#leastLoadedNode() {
int offset = this.randOffset.nextInt(nodes.size());
for (int i = 0; i < nodes.size(); i++) {
int idx = (offset + i) % nodes.size();
Node node = nodes.get(idx);
if (canSendRequest(node.idString(), now)) {
// 优先选择
int currInflight = this.inFlightRequests.count(node.idString());
if (currInflight == 0) {
// if we find an established connection with no in-flight requests we can stop right away
log.trace("Found least loaded node {} connected with no in-flight requests", node);
return node;
} else if (currInflight < inflight) {
// otherwise if this is the best we have found so far, record that
inflight = currInflight;
foundReady = node;
}
}
}