Kafka 消费方式

有两种消费方式,一种是 pull( 拉) 模式,一种是 push(推)模式。
Kafka 采用了 pull( 拉) 模式。


pull( 拉) 模式:Consumer 从 Broker 中主动拉取数据。
push(推)模式:Kafka 没有采用这种方式,因为由 Broker 决定消息发送的速率,很难适应所有消费者的消费速率。例如 Broker 推送的速度是 50m/s, Consumer1、Consumer2 就来不及处理消息。
pull 模式的不足之处是:如果 Kafka 没有数据,消费者可能会陷入循环中,一直返回空数据。


image.png

Kafka 消费者工作流程

消费者总体工作流程

image.png

消费者组原理

Consumer Group(CG):消费者组,由多个 Consumer 组成。
形成一个消费者组的条件是:消费者的 groupid 相同。

消费者组内每个消费者负责消费不同分区的数据,一个分区的数据只能由消费者组中的一个消费者消费。
一个消费者可以消费多个分区的数据。
如果向消费组中添加更多的消费者,超过主题分区数量,则有一部分消费者会闲置,不会接收任何消息。
消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
image.png

image.png

消费者组初始化流程

  1. 每个 Consumer 都发送 JoinGroup 请求
  2. 选出一个 Consumer 作为 Leader 消费者
  3. 把要消费的 Topic 情况发送给 Leader 消费者
  4. Leader 负责制定消费方案,为其他消费者指定消费的分区
  5. 把消费方案发给 Coordinator
  6. Coordinator 就把消费方案下发给各个 Consumer

Coordinator:辅助实现消费者组的初始化和分区的分配。
Coordinator 节点选择 = groupid 的 hashcode 值 % 50( consumer_offsets 的分区数量默认为 50)
例如: groupid 的 hashcode 值 = 1,1% 50 = 1,那么 consumer_offsets 主题的 1 号分区,在哪个Broker 上,就选择这个节点的 Coordinator 作为这个消费者组的 Leader。消费者组下的所有的消费者提交 offset 的时候就往这个分区去提交 offset。

每个消费者都会和 Coordinator 保持心跳(心跳时间默认为 3s),一旦超时(session.timeout.ms=45s),该消费者会被移除,并触发再平衡;或者消费者处理消息的时间过长(max.poll.interval.ms=5分钟),也会触发再平衡
image.png

消费者组详细消费流程

image.png

消费者重要参数

参数名称 描述
bootstrap.servers 向 Kafka 集群建立初始连接用到的 host/port 列表。
key.deserializer 和 value.deserializer 指定接收消息的 key 和 value 的反序列化类型。
一定要写全类名。
group.id 标记消费者所属的消费者组。一定要写。
enable.auto.commit 消费者会自动周期性地向服务器提交偏移量。默认为 true
auto.commit.interval.ms 如果设置了 enable.auto.commit 的值为 true,
则该值定义了消费者偏移量向 Kafka 提交的频率,默认为 5s
auto.offset.reset 当 Kafka 中没有初始偏移量或当前偏移量在服务器中不存在时,
(如,数据被删除了),该如何处理?
- earliest:自动重置偏移量到最早的偏移量。
- latest:自动重置偏移量为最新的偏移量。默认为 latest
- none:如果消费者原来的偏移量不存在,则向消费者抛异常。
- anything:向消费者抛异常。
offsets.topic.num.partitions consumer_offsets 的分区数,默认为 50 个分区
heartbeat.interval.ms Kafka 消费者和 Coordinator 之间的心跳时间,默认为 3s
该条目的值必须小于 session.timeout.ms , 也不应该高于session.timeout.ms 的 1/3。
session.timeout.ms Kafka 消费者和 Coordinator 之间的连接超时时间,默认为 45s。
超过该值,该消费者被移除,消费者组执行再平衡。
max.poll.interval.ms 消费者处理消息的最大时长,默认为 5 分钟。
超过该值,该消费者被移除,消费者组执行再平衡。
fetch.min.bytes 消费者从 Broker 获取一批消息最小的字节数。默认为 1 个字节
fetch.max.wait.ms 如果没有从 Broker 获取到一批数据的最小字节数。
该时间到,仍然会返回数据。默认为 500ms
fetch.max.bytes 消费者从 Broker 获取一批消息最大的字节数。
如果服务器端一批次的数据大于该值(50m)仍然可以拉取回来这批数据,因此,这不是一个绝对最大值。
一批次的大小受 message.max.bytes (broker config)or max.message.bytes (topic config)影响。
默认为:52428800(50 m)
max.poll.records 一次 poll 拉取数据返回消息的最大条数,默认是 500 条。
partition.assignment.strategy 消费者分区分配策略,可以选择的策略包括: Range、RoundRobin 、 Sticky 、CooperativeSticky
默认策略为 Range + CooperativeSticky
Kafka 可以同时使用多个分区分配策略。

消费者 API

  1. public class CustomCustomer {
  2. public static void main(String[] args) {
  3. // Kafka 的配置信息
  4. Properties kafkaProperties = new Properties();
  5. // 连接的 Kafka 服务器信息、key value 的反序列化方式、消费者组ID(必要信息)
  6. kafkaProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "47.96.188.26:9092");
  7. kafkaProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  8. kafkaProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  9. // 配置消费者组ID, 可任意起名
  10. kafkaProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "myGroupId");
  11. // 创建 Kafka 消费者对象
  12. KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(kafkaProperties);
  13. List<String> topics = new ArrayList<>();
  14. topics.add("myFirstTopic");
  15. // 订阅要消费的主题(可订阅多个)
  16. kafkaConsumer.subscribe(topics);
  17. // 拉取数据
  18. while (true) {
  19. // 设置尝试拉取数据, 但没有数据时最多等待 1s
  20. System.out.println("尝试拉取数据");
  21. ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));
  22. for (ConsumerRecord<String, String> record : records) {
  23. System.out.println("key = " + record.key());
  24. System.out.println("value = " + record.value());
  25. System.out.println("offset = " + record.offset());
  26. }
  27. }
  28. }
  29. }
  1. List<TopicPartition> partitions = new ArrayList<>();
  2. partitions.add(new TopicPartition("myFirstTopic", 0));
  3. // 订阅指定主题的指定分区(可订阅多个)
  4. kafkaConsumer.assign(partitions);

分区的分配以及再平衡

一个「消费者组」中有多个「消费者」,一个「主题」由多个「分区」组成,
现在的问题是:到底由哪个「消费者」来消费哪个「分区」的数据。

Kafka 有四种主流的分区分配策略: Range、RoundRobin、Sticky、CooperativeSticky。
可以通过配置参数 partition.assignment.strategy 修改分区的分配策略。
默认策略是:Range + CooperativeSticky。
Kafka 可以同时使用多个分区分配策略。


Java 程序设置分区分配策略的代码

  1. // Kafka 的配置信息
  2. Properties kafkaProperties = new Properties();
  3. kafkaProperties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"全限定类名");
  4. // 类名如下
  5. // RoundRobinAssignor
  6. // StickyAssignor
  7. // StickyAssignor
  8. // 创建 Kafka 消费者对象
  9. KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(kafkaProperties);

image.png

Range 以及再平衡

Range 是对每个 Topic 而言的。
首先对同一个 Topic 里面的分区按照序号进行排序,并对消费者按照字母顺序进行排序。

假如现在有 7 个分区,3 个消费者,排序后的分区将会是:0, 1, 2, 3, 4, 5, 6;消费者排序后将会是:C0, C1, C2。
通过 Kafka 消费者 - 图8来决定每个消费者消费几个分区。如果除不尽,那么前面几个消费者将会多消费 1 个分区。
例如,Kafka 消费者 - 图9,除不尽,那么消费者 C0 便会多消费 1 个分区。 Kafka 消费者 - 图10,除不尽,那么 C0 和 C1 分别多消费一个。

注意:如果只是针对 1 个 Topic 而言,C0 消费者多消费 1 个分区影响不是很大。但是如果有 N 多个 Topic,那么针对每个 Topic,消费者 C0 都将多消费 1 个分区,Topic 越多,C0 消费的分区会比其他消费者明显多消费 N 个分区。容易产生数据倾斜!
image.png


Range 分区分配再平衡案例
停掉 0 号消费者,快速重新发送消息观看结果(45s 以内,越快越好)。
结果发现:
1 号消费者:消费到 3、4 号分区数据。
2 号消费者:消费到 5、6 号分区数据。
0 号消费者的任务会整体被分配到 1 号消费者或者 2 号消费者。
说明:0 号消费者挂掉后,消费者组需要按照超时时间 45s 来判断它是否退出,所以需要等待,时间到了 45s 后,判断它真的退出就会把任务分配给其他 Broker 执行。

再次重新发送消息观看结果(45s 以后)。
结果发现:
1 号消费者:消费到 0、1、2、3 号分区数据。
2 号消费者:消费到 4、5、6 号分区数据。
说明:消费者 0 已经被踢出消费者组,所以重新按照 Range 方式分配。

RoundRobin 以及再平衡

RoundRobin 是针对集群中所有 Topic 而言。
RoundRobin 轮询分区策略,是把「所有的分区」和「所有的消费者」都列出来,然后按照 hashcode 进行排序,最后通过轮询算法来分配「分区」给到「各个消费者」。
image.png


RoundRobin 分区分配再平衡案例
停止掉 0 号消费者,快速重新发送消息观看结果(45s 以内,越快越好)。
结果发现:
1 号消费者:消费到 2、5 号分区数据
2 号消费者:消费到 4、1 号分区数据
0 号消费者的任务会按照 RoundRobin 的方式,把数据轮询分成 0 、6 和 3 号分区数据,分别由 1 号消费者或者 2 号消费者消费。
说明:0 号消费者挂掉后,消费者组需要按照超时时间 45s 来判断它是否退出,所以需要等待,时间到了 45s 后,判断它真的退出就会把任务分配给其他 broker 执行。

再次重新发送消息观看结果(45s 以后)。
结果发现:
1 号消费者:消费到 0、2、4、6 号分区数据
2 号消费者:消费到 1、3、5 号分区数据
说明:消费者 0 已经被踢出消费者组,所以重新按照 RoundRobin 方式分配。

Sticky 以及再平衡

粘性分区定义:可以理解为分配的结果带有“粘性”。即在执行一次新的分配之前, 考虑上一次分配的结果,尽量少的调整分配的变动,可以节省大量的开销。
Kafka 从 0.11.x 版本开始引入粘性分区这种分配策略,首先会尽量均衡的放置分区到消费者上面,在出现同一消费者组内消费者出现问题的时候,会尽量保持原有分配的分区不变化。

offset 位移

offset 的默认维护位置

Kafka0.9 版本之前, Consumer 默认将 offset 保存在 Zookeeper 中。
从 0.9 版本开始,Consumer 默认将 offset 保存在 Kafka 的一个内置 Topic 中,该 Topic 为 consumer_offsets。

consumer_offsets 主题里面采用 key 和 value 的方式存储数据。key 是 group.id+topic+ 分区号,value 就是当前 offset 的值。每隔一段时间, Kafka 内部会对这个 Topic 进行 compact,也就是每个 group.id+topic+分区号就保留最新数据。
image.png

自动提交 & 手动提交 offset

为了使我们能够专注于自己的业务逻辑,Kafka 提供了自动提交 offset 的功能。
自动提交 offset 的相关参数:

  • enable.auto.commit:是否开启自动提交 offset 功能,默认是 true
  • auto.commit.interval.ms:自动提交 offset 的时间间隔,默认是 5s

image.png


虽然自动提交 offset 十分简单便利,但由于其是基于时间提交的,开发人员难以把握 offset 提交的时机。因此 Kafka 还提供了手动提交 offset 的 API。
手动提交 offset 的方法有两种分别是:commitSync(同步提交)和 commitAsync(异步提交)。

  • commitSync(同步提交):必须等待 offset 提交完毕,再去消费下一批数据。
  • commitAsync(异步提交) :发送完提交 offset 请求后,就开始消费下一批数据了。

同步提交 & 异步提交 两者的
相同点是:都会将本次提交的一批数据最高的偏移量提交;
不同点是:同步提交阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败);而异步提交则没有失败重试机制,故有可能提交失败。
image.png

由于同步提交 offset 有失败重试机制,故更加可靠,但是由于一直等待提交结果,提交的效率比较低。
虽然同步提交 offset 更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此吞吐量会受到很大的影响。因此更多的情况下,会选用异步提交 offset 的方式。

  1. // Kafka 的配置信息
  2. Properties kafkaProperties = new Properties();
  3. // 设置不自动提交 offset
  4. kafkaProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
  5. // 创建 Kafka 消费者对象
  6. KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(kafkaProperties);
  7. // 同步提交
  8. kafkaConsumer.commitSync();
  9. // 异步提交
  10. kafkaConsumer.commitAsync();

指定 Offset 消费 & 指定时间消费

参数 auto.offset.reset = earliest | latest | none 默认是 latest。
当 Kafka 中没有初始偏移量(消费者组第一次消费)或服务器上不存在当前偏移量时(例如该数据已被删除),该怎么办?

  • earliest:自动将偏移量重置为最早的偏移量,—from-beginning。
  • latest(默认值):自动将偏移量重置为最新偏移量。
  • none:如果未找到消费者组的先前偏移量,则向消费者抛出异常。

image.png


  1. public class CustomCustomer {
  2. public static void main(String[] args) {
  3. // Kafka 的配置信息
  4. Properties kafkaProperties = new Properties();
  5. // 连接的 Kafka 服务器信息、key value 的反序列化方式、消费者组ID(必要信息)
  6. kafkaProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "47.96.188.26:9092");
  7. kafkaProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  8. kafkaProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  9. // 配置消费者组ID, 可任意起名
  10. kafkaProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "myGroupId");
  11. // 创建 Kafka 消费者对象
  12. KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(kafkaProperties);
  13. List<String> topics = new ArrayList<>();
  14. topics.add("myFirstTopic");
  15. // 订阅要消费的主题(可订阅多个)
  16. kafkaConsumer.subscribe(topics);
  17. Set<TopicPartition> topicPartitions = new HashSet<>();
  18. while (topicPartitions.size() == 0) {
  19. // 不明白这里为什么要拉取一次数据,但是如果把这行代码注释掉,程序运行到下面一行就会停住
  20. kafkaConsumer.poll(Duration.ofSeconds(1));
  21. // 获取消费者分区分配信息(有了分区分配信息才能开始消费)
  22. topicPartitions = kafkaConsumer.assignment();
  23. }
  24. // 遍历所有分区,并指定 offset 从 100 的位置开始消费
  25. for (TopicPartition topicPartition : topicPartitions) {
  26. kafkaConsumer.seek(topicPartition, 100);
  27. }
  28. // 拉取数据
  29. while (true) {
  30. // 设置尝试拉取数据, 但没有数据时最多等待 1s
  31. System.out.println("尝试拉取数据");
  32. ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));
  33. for (ConsumerRecord<String, String> record : records) {
  34. System.out.println("key = " + record.key());
  35. System.out.println("value = " + record.value());
  36. System.out.println("offset = " + record.offset());
  37. }
  38. }
  39. }
  40. }

在生产环境中,会遇到最近消费的几个小时数据异常,想重新按照时间消费。
例如要求按照时间消费前一天的数据,怎么处理?

  1. Set<TopicPartition> topicPartitions = new HashSet<>();
  2. while (topicPartitions.size() == 0) {
  3. // 不明白这里为什么要拉取一次数据,但是如果把这行代码注释掉,程序运行到下面一行就会停住
  4. kafkaConsumer.poll(Duration.ofSeconds(1));
  5. // 获取消费者分区分配信息(有了分区分配信息才能开始消费)
  6. topicPartitions = kafkaConsumer.assignment();
  7. }
  8. HashMap<TopicPartition, Long> timestampToSearch = new HashMap<>();
  9. // 封装集合存储,每个分区对应一天前的数据
  10. for (TopicPartition topicPartition : topicPartitions) {
  11. timestampToSearch.put(topicPartition,
  12. System.currentTimeMillis() - 1 * 24 * 3600 * 1000);
  13. }
  14. // 获取从 1 天前开始消费的每个分区的 offset
  15. Map<TopicPartition, OffsetAndTimestamp> offsets
  16. = kafkaConsumer.offsetsForTimes(timestampToSearch);
  17. // 遍历消费者分区分配信息,对每个分区设置消费时间。
  18. for (TopicPartition topicPartition : topicPartitions) {
  19. OffsetAndTimestamp offsetAndTimestamp = offsets.get(topicPartition);
  20. // 根据时间指定开始消费的位置
  21. if (offsetAndTimestamp != null) {
  22. kafkaConsumer.seek(topicPartition, offsetAndTimestamp.offset());
  23. }
  24. }

漏消费 & 重复消费

重复消费:已经消费了数据,但是 offset 没提交。
漏消费:先提交 offset 后消费,有可能会造成数据的漏消费。

image.png

思考:怎么能做到既不漏消费也不重复消费呢?解决办法:消费者事务。

消费者事务

如果想完成 Consumer 端的精准一次性消费,那么需要 Kafka 消费端将消费过程和提交 offset 过程做原子绑定。此时我们需要将 Kafka 的 offset 保存到支持事务的自定义介质( 比如 MySQL)。这部分知识会在后续项目部分涉及。
image.png

数据积压(消费者如何提高吞吐量)

如果是 Kafka 消费能力不足,则可以考虑增加 Topic 的分区数,并且同时提升消费组的消费者数量,消费者数= 分区数。(两者缺一不可)
image.png


如果是下游的数据处理不及时:提高每批次拉取的数量。批次拉取数据过少(Kafka 消费者 - 图20),使处理的数据小于生产的数据,也会造成数据积压。
image.png


对于数据积压问题,可以修改参数:

fetch.max.bytes 消费者从 Broker 获取一批消息最大的字节数。
如果服务器端一批次的数据大于该值(50m)仍然可以拉取回来这批数据,因此,这不是一个绝对最大值。
一批次的大小受 message.max.bytes (broker config)or max.message.bytes (topic config)影响。
默认为:52428800(50 m)
max.poll.records 一次 poll 拉取数据返回消息的最大条数,默认是 500 条。