从编程的角度而言,生产者就是负责向 Kafka 发送消息的应用程序。在 Kafka 的历史变迁中,一共有两个大版本的生产者客户端:第一个是于 Kafka 开源之初使用 Scala 语言编写的客户端,我们可以称之为旧生产者客户端;第二个是从 Kafka 0.9.x 版本开始推出的使用 Java 编写的客户端,我们可以称之为新生产者客户端,它弥补了旧版本客户端中存在的诸多设计缺陷。

客户端开发

  1. public class ProducerDemo {
  2. private static final String BROKER_LIST = "127.0.0.1:9092";
  3. private static final String TOPIC = "TestTopic";
  4. public static void main(String[] args) {
  5. KafkaProducer<String, String> producer = new KafkaProducer<>(initConfig());
  6. ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, "Hello Kafka");
  7. try {
  8. producer.send(record);
  9. } catch (Exception e) {
  10. e.printStackTrace();
  11. } finally {
  12. producer.close();
  13. }
  14. }
  15. private static Properties initConfig() {
  16. Properties properties = new Properties();
  17. properties.setProperty("bootstrap.servers", BROKER_LIST);
  18. properties.setProperty("key.serializer", StringSerializer.class.getName());
  19. properties.setProperty("value.serializer", StringSerializer.class.getName());
  20. return properties;
  21. }
  22. }

KafkaProducer 是线程安全的,可以在多个线程中共享单个 KafkaProducer 实例,也可以将 KafkaProducer 实例进行池化来供其他线程调用。此外,ProducerRecord 并不是单纯意义上的消息,它包含了多个属性,原本需要发送的与业务相关的消息体只是其中的一个 value 属性。ProducerRecord 类的定义如下:

  1. public class ProducerRecord<K, V> {
  2. // 主题
  3. private final String topic;
  4. // 分区号
  5. private final Integer partition;
  6. // 消息头
  7. private final Headers headers;
  8. // 键
  9. private final K key;
  10. // 值
  11. private final V value;
  12. // 消息时间戳
  13. private final Long timestamp;
  14. ......
  15. }

其中 topic 和 partition 分别代表消息要发往的主题和分区号。headers 是消息头,在 Kafka 0.11.x 版本开始引入这个属性,它大多用来设定一些与应用无关的信息。key 用来指定消息的键,它不仅是消息的附加信息,还可以用来计算分区号进而可以让消息发往特定的分区,同时有 key 的消息还可以支持日志压缩的功能。value 是指消息体,一般不为空,如果为空则表示特定的消息——墓碑消息。timestamp 是指消息的时间戳,它有 CreateTime 和 LogAppendTime 两种类型,前者表示消息创建的时间,后者表示消息追加到日志文件的时间。

生产者参数配置

1. bootstrap.servers

该参数用来指定生产者客户端连接 Kafka 集群所需的 broker 地址清单,格式为 host:port,可以设置一个或多个地址,中间用逗号隔开。注意这里并非需要所有的 broker 地址,因为生产者会从给定的 broker 里查找到其他 broker 的信息。不过建议至少要设置两个以上的 broker 地址信息,当其中任意一个宕机时,生产者仍然能够连接到 Kafka 集群上。

2. key.serializer、value.serializer

broker 端接收的消息必须以字节数组(byte[])的形式存在。生产者接口允许使用参数化类型,因此可以把 Java对象作为键和值发送给 broker。这样的代码具有良好的可读性,不过生产者需要知道如何把这些 Java 对象转换成字节数组。key.serializer 和 value.serializer 这两个参数就是分别用来指定 key 和 value 序列化操作的序列化器,注意这里必须填写序列化器的类的全限定名。

3. acks

这个参数用来指定分区中必须要有多少个副本收到这条消息,之后生产者才会认为这条消息是成功写入的。这个参数涉及消息的可靠性和吞吐量之间的权衡。acks 参数有三种类型的值(都是字符串类型):

acks = 1(默认)
生产者发送消息后,只要分区的 leader 副本成功写入消息,那么它就会收到来自服务器端的成功响应。如果消息无法写入 leader 副本,比如在 leader 副本崩溃、重新选举新的 leader 副本的过程中,那么生产者就会收到一个错误的响应,为了避免数据丢失,生产者可以选择重发消息。如果消息写入 leader 副本并返回成功响应给生产者且在被其他 follower 副本拉取该消息前这个 leader 副本崩溃了,此时消息还是会丢失,因为新选举的 leader 副本中并没有这条对应的消息。acks 设置为 1 是消息可靠性和吞吐量之间的折中方案。

acks = 0
生产者发送消息后不需要等待任何服务端的响应。如果在消息从发送到写入 Kafka 的过程中出现某些异常,导致 Kafka 并没有收到这条消息,那么生产者也无从得知,消息也就丢失了。在其他配置环境相同的情况下,acks 设置为 0 可以达到最大的吞吐量。

acks = all 或 acks = -1
生产者在消息发送后,需要等待 ISR 中的所有副本都成功写入消息后才能够收到来自服务端的成功响应。在其他配置环境相同的情况下,acks 设置为 -1 可以达到最强的可靠性。但这并不意味着消息就一定可靠,因为 ISR 中可能只有 leader 副本,这样就退化成了 acks = 1 的情况。要获得更高的消息可靠性需要配合 min.insync.replicas 等参数的联动。

4. max.request.size

这个参数用来限制生产者客户端能发送的消息的最大值,默认为 1MB。一般情况下,这个默认值可以满足大多数场景。修改时要注意该参数还涉及一些其他参数的联动,比如 broker 端的 message.max.bytes 参数,两边的配置最好可以匹配避免生产者发送的消息被 broker 拒绝。

5. retries、retry.backoff.ms

retries 参数用来配置生产者重试的次数,默认为 0,即在发生异常时不进行任何重试动作。消息在从生产者发出到成功写入服务器之前可能会发生一些临时性的异常,比如网络抖动、leader 副本的选举等,这种异常往往是可以自行恢复的,生产者可以通过配置 retries 大于 0 的值,以此通过内部重试来恢复而不是一味地将异常抛给生产者的应用程序。如果重试达到设定的次数,那么生产者就会放弃重试并返回异常。但并非所有异常都可以通过重试来解决,比如消息太大,超过了 max.request.size 参数配置的值,那这种就不会重试了。重试还和另一个参数 retry.backoff.ms 有关,该参数默认值为 100,用来设定两次重试之间的时间间隔,避免无效的频繁重试。

Kafka 可以保证同一个分区中的消息是有序的。如果生产者按照一定的顺序发送消息,那么这些消息也会顺序地写入分区,进而消费者也可以按照同样的顺序消费。但是,如果将 retries 参数配置为非零值,并且将 max.in.flight.requests.pre.connection 参数配置为大于 1 的值(默认是 5)时就可能会出现错序。因为如果第一批次消息写入失败,而第二批次消息写入成功,那么生产者会重试发送第一批次的消息,此时如果第一批次的消息写入成功,那么这两个批次的消息就出现了错序。一般而言,在需要保证消息顺序的场合建议把参数 max.in.flight.requests.pre.connection 配置为 1,但这样也会影响整体的吞吐量。

6. max.in.flight.requests.per.connection

这个参数指定了生产者客户端在收到服务器端响应之前可以发送多少个消息。它的值越高,就会占用越多的内存空间,不过也会提升吞吐量,默认值为 5。把它设为 1 可以保证消息是按照发送的顺序写入服务器的,表示 broker 在响应请求之前,生产者客户端不能再向同一个 broker 发送请求了。不过这样会严重影响生产者的吞吐量,所以只有在对消息的顺序有严格要求的情况下才能这么做。

7. compression.type

该参数用来指定消息的压缩方式,默认值为 none,即默认消息不会被压缩。该参数可以配置为 gzip、snappy 和 lz4,分别对应不同的压缩算法。对消息进行压缩可以极大地减少网络传输量、降低网络 I/O,从而提高整体的性能。但消息压缩是一种使用时间换空间的优化方式,如果对时延有一定的要求,则不推荐对消息进行压缩。

8. linger.ms、batch.size

linger.ms 参数用来指定生产者在发送 ProducerBatch 之前等待更多消息加入 ProducerBatch 的时间,默认值为 0,即只要有可用线程,生产者就会把消息发送出去,即使 ProducerBatch 里只有一个消息。batch.size 参数用来指定 ProducerBatch 可以复用内存区域的大小,如果消息大小超过该参数设定的值则会为该消息单独创建一块内存区域,这块内存区域不会被复用,这样会降低性能。

生产者客户端会在 ProducerBatch 被填满或超过等待时间后将这一批消息发送出去。把 linger.ms 设置成比 0 大的数,可以让生产者在发送 ProducerBatch 前等待更多的消息加入到这个 ProducerBatch 中,从而提升一定的吞吐量,但同时也加大消息的延迟。这个 linger.ms 参数与 TCP 协议中的 Nagle 算法有异曲同工之妙。

9. request.timeout.ms

这个参数用来配置 Producer 等待服务器返回响应的最长时间,默认值是 30000 (ms)。请求超时之后可以选择进行重试。生产者要么重试发送请求要么返回一个错误(抛出异常或执行回调)。

消息发送模式

在创建完生产者实例之后,接下来就是构建消息,即创建 ProducerRecord 对象。ProducerRecord 中的 topic 和 value 是必填项,其余属性是选填项。发送消息主要有三种模式:发后即忘、同步、异步。

1)发后即忘
上面代码示例中的那种发送方式就是发后即忘,它只管往 Kafka 中发送消息而不关心消息是否正确到达。在大多数情况下,这种发送方式没什么问题,不过在某些时候(比如发生不可重试的异常时)会造成消息的丢失。这种发送方式的性能最高,可靠性也最差。

2)同步发送
实际上 send 方法本身就是异步的,send 方法返回的 Future 对象可以使调用方稍后获得发送的结果。下面示例中调用了 get() 方法来阻塞等待 kafka 的响应,直到消息发送成功或异常。如果发生异常,则需要捕获并交由外层逻辑处理。

  1. try {
  2. Future<RecordMetadata> future = producer.send(record);
  3. RecordMetadata metadata = future.get();
  4. System.out.println(metadata.topic() + "-" + metadata.partition() + ":" + metadata.offset());
  5. } catch (Exception e) {
  6. e.printStackTrace();
  7. }

通过 get() 方法还可以获取一个 RecordMetadata 对象,在 RecordMetadata 对象里包含了消息的一些元数据信息,比如当前消息的主题、分区号、分区中的偏移量、时间戳等。KafkaProducer 中一般会发生两种类型的异常:可重试的异常和不可重试的异常。常见的可重试的异常有:

  • NetworkException:网络异常,有可能是由于网络瞬时故障而导致的,重试后可恢复
  • LeaderNotAvailableException:分区的 leader 副本不可用,该异常通常发生在 leader 副本下线而新的 leader 副本选举完成之前,重试后可恢复。
  • UnknownTopicOrPartitionException、NotEnoughReplicasException、NotCoordinatorException 等。

对于可重试的异常,如果配置了 retries 参数,那么只要在规定的重试次数内自行恢复了就不会抛出异常,该参数的默认为 0。如果达到重试次数仍没有恢复则抛出异常,进而外层逻辑就要处理异常了。不可重试的异常比如有 RecordTooLargeException 异常,暗示了所发送的消息太大。对于这种类型的异常,KafkaProducer 不会进行任何重试,直接抛出异常。

同步发送的方式可靠性高,要么消息被发送成功,要么发生异常。如果异常则可以捕获并进行相应的处理,而不会像发后即忘的方式直接造成消息的丢失。不过同步发送的方式性能会差很多,需要阻塞等待一条消息发送完之后才能发送下一条。

3)异步发送
异步发送一般是在 send() 方法里指定一个 Callback 的回调函数,Kafka 在返回响应时会调用该函数来实现异步的发送确认。Kafka 有响应就会触发回调,要么发送成功,要么抛出异常。

  1. producer.send(record, new Callback() {
  2. @Override
  3. public void onCompletion(RecordMetadata metadata, Exception exception) {
  4. if (exception != null) {
  5. exception.printStackTrace();
  6. } else {
  7. System.out.println(metadata.topic() + "-" + metadata.partition() + ":" + metadata.offset());
  8. }
  9. }
  10. });

onCompletion 方法的两个参数是互斥的,消息发送成功时,metadata 不为 null 而 exception 为 null;消息发送异常时,metadata 为 null 而 exception 不为 null。对于同一个分区而言,如果消息 A 先于消息 B 之前先发送,那么 KafkaProducer 就可以保证对应的 callbackA 在 callbackB 之前调用,即回调函数也保证分区有序。

消息序列化

生产者需要用序列化器(Serializer)把对象转换成字节数组才能通过网络发送给 Kafka。而在对侧,消费者需要用反序列化器(Deserializer)把从 Kafka 中收到的字节数组转换成相应的对象。

序列化器除了用于 String 类型的序列化器,还有 ByteArray、ByteBuffer、Bytes、Double、Integer、Long 这几种类型,它们都实现了 Serializer 接口,该接口定义如下:

  1. public interface Serializer<T> extends Closeable {
  2. // 配置当前类
  3. void configure(Map<String, ?> configs, boolean isKey);
  4. // 执行序列化操作
  5. byte[] serialize(String topic, T data);
  6. // 一般是空方法,如果实现了此方法,则必须确保此方法的幂等性,因为该方法会被多次调用
  7. @Override
  8. void close();
  9. }

生产者使用的序列化器和消费者使用的反序列化器是需要一一对应的,如果生产者使用了某种序列化器,而消费者使用了另一种序列化器,那么是无法解析出想要的数据的。如果 Kafka 客户端默认提供的这几种序列化器都无法满足应用需求,则可以选择使用如 Avro、JSON、Thrift 等通用的序列化工具来实现。

分区器

消息在通过 send 方法发往 broker 的过程中,有可能需要经过拦截器(Interceptor)、序列化器(Serializer)和分区器(Partition)的一系列作用后才能被真正地发往 broker。拦截器不是必须的,而序列化器是必须的。消息经过序列化之后就需要确定它要发往的分区了。
image.png
ProducerRecord 对象里可以指定 key 或 partition 字段,如果指定了 partition 字段,则不需要分区器干预,直接发往指定的分区。如果没有指定 partition,那就需要依赖分区器,如果消息中指定了 key 字段,则分区器会根据 key 来计算 partition 的值。如果 key 和 partition 都不存在,则分区器会采用默认的分区策略。

Kafka 提供的默认分区器是 DefaultPartitioner,分区策略为:如果 key 不为 null,则对 key 进行哈希(采用 MurmurHash2 算法,具备高运算性能及低碰撞率),最终根据得到的哈希值来计算分区号,拥有相同 key 的消息会被写入同一个分区。如果 key 为 null,则消息会以轮询的方式发往主题内的各个可用分区。

  1. public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
  2. if (keyBytes == null) {
  3. return stickyPartitionCache.partition(topic, cluster);
  4. }
  5. List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
  6. int numPartitions = partitions.size();
  7. // hash the keyBytes to choose a partition
  8. return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
  9. }

在不改变主题分区数量的情况下,key 与分区之间的映射可以保持不变。不过,一旦主题中增加了分区,就难以保证 key 与分区之间的映射关系了。除了使用 Kafka 提供的默认分区器进行分区分配,还可以使用自定义的分区器,自定义分区器需要实现 Partitioner 接口并配置 partitioner.class 客户端参数来显式指定这个分区。

  1. public interface Partitioner extends Configurable, Closeable {
  2. // 计算分区号
  3. public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
  4. public void close();
  5. }

这里的 topic、key、keyBytes、value 和 valueBytes 都属于消息数据,cluster 则是集群的元数据信息(比如当前 Kafka 集群共有多少主题、多少 Broker 等),通过这些信息可以实现功能丰富的分区器。

切记分区是实现负载均衡以及高吞吐量的关键,故在生产者这一端就要仔细盘算合适的分区策略,避免造成消息数据的倾斜,使得某些分区成为性能瓶颈,这样极易引发下游数据消费的性能下降。

生产者拦截器

拦截器(Interceptor)是早在 kafka 0.10.0.0 中就引入的一个功能,Kafka 一共有两种拦截器:生产者拦截器和消费者拦截器。生产者拦截器允许你在发送消息前以及消息提交成功后植入你的拦截器逻辑;而消费者拦截器支持在消费消息前以及提交位移后编写特定逻辑。

当前 Kafka 拦截器的设置方法是通过参数配置完成的。生产者和消费者两端都有一个相同的参数,名字叫做 interceptor.classes,它指定的是一组类的列表,每个类就是特定逻辑的拦截器实现类。

  1. Properties props = new Properties();
  2. List<String> interceptors = new ArrayList<>();
  3. interceptors.add("com.xxx.xxx.AddTimestampInterceptor"); // 拦截器1
  4. interceptors.add("com.xxx.xxx.UpdateCounterInterceptor"); // 拦截器2
  5. props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);

自定义 Producer 端的拦截器都要实现 ProducerInterceptor 接口,该接口定义如下:

  1. public interface ProducerInterceptor<K, V> extends Configurable {
  2. // 在将消息序列化和计算分区之前就会调用该方法来对消息进行相应的定制化操作
  3. public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
  4. // 在消息被应答之前或消息发送失败时调用该方法,优先于send方法中设定的callback之前执行。
  5. public void onAcknowledgement(RecordMetadata metadata, Exception exception);
  6. public void close();
  7. }

注意,onAcknowledgement 方法运行在 Producer 的 I/O 线程中,所以这个方法中的逻辑不要太重,否则会影响消息的发送速度。并且,该方法和 onSend 方法不是在同一个线程中被调用的,因此如果在这两个方法中调用了某个共享可变对象一定要保证线程安全。

拦截器支持将一组拦截器串连成一个大的拦截器,Kafka 会按照添加顺序依次执行拦截器逻辑。在执行拦截器链时,如果某个拦截器执行失败,那么下一个拦截器会接着从上一个执行成功的拦截器继续执行。

幂等性生产者

在 Kafka 中,Producer 默认不是幂等性的,因为只有 Broker 成功写入消息且 Producer 接收到了 Broker 的应答才会认为该消息成功发送。如果消息写入成功但 Producer 没有收到 Broker 的应答(比如网络瞬时抖动),那 Producer 就无法确定消息是否真的写入成功。因此只能重试,而这也会导致消息重复发送。

在 Kafka 0.11 版本后,我们可以创建幂等性 Producer 来避免消息的重复发送,通过设置 enable.idempotence 属性为 true 可以开启 Producer 幂等性。该参数设置成 true 后,Producer 自动升级成幂等性 Producer,其他所有代码逻辑都不需要改变,Kafka 自动帮你做消息去重。底层原理很简单,就是经典的用空间换时间,即在 Broker 端多保存一些字段,当 Producer 发送了具有相同字段值的消息后,Broker 能够自动知晓这些消息已经重复了,于是可以在后台默默地把它们丢弃掉。

但是,幂等性 Producer 也存在一定的局限性。首先,它只能保证单分区上的幂等性,即一个幂等性 Producer 能够保证某个主题的一个分区上不出现重复消息。其次,它只能实现单会话上的幂等性,不能实现跨会话的幂等性。当你重启了 Producer 进程之后,这种幂等性保证就丧失了。如果想实现多分区以及多会话上的消息无重复,则可以依赖事务型 Producer。这也是幂等性 Producer 和事务型 Producer 的最大区别!

事务型生产者

Kafka 自 0.11 版本开始提供了对事务的支持,目前主要是在 read committed 隔离级别上做事情。它能保证多条消息原子性地写入到目标分区,同时也能保证 Consumer 只能看到事务成功提交的消息。并且事务型 Producer 不惧进程重启,重启后 Kafka 依然保证它们发送消息的精确一次处理。

设置事务型 Producer 的方法也很简单,满足两个要求即可:

  • 和幂等性 Producer 一样,开启 enable.idempotence=true。
  • 设置 Producer 端参数 transactional.id,最好为其设置一个有意义的名字。

此外,在发送消息时还需要显式调用事务相关的 API,具体示例如下所示:

  1. // 事务的初始化
  2. producer.initTransactions();
  3. try {
  4. // 事务开始
  5. producer.beginTransaction();
  6. producer.send(record1);
  7. producer.send(record2);
  8. // 事务提交
  9. producer.commitTransaction();
  10. } catch (KafkaException e) {
  11. // 事务终止
  12. producer.abortTransaction();
  13. }

这段代码能够保证 record1 和 record2 被当作一个事务统一提交到 Kafka,要么它们全部提交成功,要么全部写入失败。实际上即使写入失败,Kafka 也会把它们写入到底层的日志中,即 Consumer 还是会看到这些消息。因此在 Consumer 端,读取事务型 Producer 发送的消息也需要设置 isolation.level 参数的值。当前这个参数有两个取值选项:

  • read_uncommitted:这是默认值,表明 Consumer 能够读取到 Kafka 写入的任何消息,不论提交事务还是终止事务,其写入的消息都可以读取。


  • read_committed:表明 Consumer 只会读取事务型 Producer 成功提交事务写入的消息。当然它也能看到非事务型 Producer 写入的所有消息。

Kafka 事务详细设计文档:https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF0wSw9ra8/edit