知识点

  • 同步发送
  • 异步发送
  • 批量发送
  • 超时重发
  • 消息网络协议

疑问:

  • 什么时候获取集群元数据
  • 对集群元数据如何处理

对生产者来说,分区的数量以及 Leader 副本的分布是动态变化的。

  • 情况一:在运行过程中,Leader 副本随时有可能出现故障进而导致 Leader 副本的重新选举。新的 Leader 副本会在其他 Broker 上继续对外提供服务。
  1. 查询 Broker 获取元数据信息:明确 Topic 分区数量以及所在位置
  2. 经分区算法确定目标分区
  3. 与目标分区所在的 Broker 建立连接,向此 Broker 发送数据
  4. 消息如何保证分区有序性
  5. 消息如何重试
  6. 消息大小如何配置
  7. 限流
  8. 重分区如何处理
  9. 压缩数据是如何处理,在哪个线程压缩

语义
http://kafka.apache.org/documentation/#theconsumer
在 0.11.0.0 之前,Kafka 只能保证 at-least-once 语义,消息可能会重复写入日志文件中。
从 0.11.0.0 开始,Kafak 生产者支持幂等发送选项,这可以避免消息重复写入日志文件。实现这一方式是通过 broker 为每个 producer 分配一个 ID 以及生产者为每条消息分配序列号(sequence number),同时这个版本支持使用事务保证发送多个 Topic 的强一致性。所有消息要么都成功写入,要么都不写入。

Kafka生产者源码已经看得七七八八了。可以将生产者部分的源码一分为二:消息的创建和消息的发送。Kafka生产者只有两个线程,主线程用于消息的创建,Sender线程用于消息的发送。我们在阅读源码的时候做到心中有数,这样就不会迷失在代码的世界了。
消息的创建非常简单,示例如下:

  1. // #1 创建生产者
  2. KafkaProducer<String, String> producer = new KafkaProducer<>(props);
  3. // #2 创建消息
  4. ProducerRecord<String, String> r1 = new ProducerRecord<>(TOPIC3, 1, null, "HELLO WORLD_par_1");
  5. // #3 发送消息
  6. producer.send(r1);

Kafka 提供简易的 API 让用户轻松可以向节点发送数据。当用户调用 KafkaProducer#send(ProducerRecord<K,V>) 方法时,其实只是触发了消息的创建这一步。它的实现逻辑非常简单,内部有一个被称为消息累加器的 RecordAccumulator 对象,相当于是待发送消息的缓冲池,消息第一站是进入到这里。Kafka 并非来一条消息就发送一条,而是以批次为概念,将大大小小的消息组成多个批次发送,这样既能减少网络连接的消耗,也能提高整体吞吐量。当然,在数据准备队列我们需要明确消息发往哪个地方,也就有了分区器,不同策略有不同的分区器实例,对于有 key 的消息,通过 murmur2 哈希算法计算得到分区 ID,而无 key 的消息现在默认使用黏性分区算法,这样能提高吞吐量。使用分区器得需要集群的元数据,如果发送元数据缺失,则会设置元数据更新标志位,Sender 每次轮询都会判断是否需要进行元数据更新操作。
Sender 线程处理的事情比较多,包含消息发送和元数据更新操作等。Sender 线程最重要的功能就是接收和发送数据,Sender 线程管控多条 TCP 连接。那么保证这些消息有序且无差错发送是一件令人头痛的事情。Kafka 将消息的发送细分为两个阶段,分别是消息的准备和执行 I/O 操作。消息的准备会根据通道连接状态、通道数据发送状态、通道是否被限流等一系列状态得出本轮是否需要为当前通道准备数据。如果需要,则记录通道ID,如果不需要则跳过。数据准备阶段其实挺复杂的,因为如果加入幂等性、事务的话,需要判断的条件更加复杂。这等到讲解源码的时候再说吧。数据准备好,则会被封装为对象放到 KafkaChannel 对应的位置,并等待发送。
触发数据发送的是 Network.poll 方法,其实 Sender 很多事情都是委托这个类来完成的。具体在源码中分析,由于底层的 SocketChannel 是非阻塞通道,所以一次发送多少数据完全是不确定的,这里就会记录没有完成数据发送的通道 ID,等待下轮发送数据。读取数据和发送数据原理类似。由于非阻塞通道的不确定性,可能本轮 poll 并没有发送一个完整的请求或收到一个完整的响应,这是可能存在的情况。而且在轮询中可能遇到连接断开、连接超时等情况,我们都需要进行相应处理,比如重试呀或向用户抛出异常等方式。当我们收到一个响应后,就可以执行收尾工作,执行回调函数。这样一次请求和响应就算完成了。
关于生产者,Kakfa 是如何提高吞吐量,我总结有以下几点:

  1. 以批为最小单位。将单条消息整合成若干批次发送。
  2. 单条连接可以同时存在多个未收到 ACK 的请求。