Kafka 消费方式
有两种消费方式,一种是 pull( 拉) 模式,一种是 push(推)模式。
Kafka 采用了 pull( 拉) 模式。
pull( 拉) 模式:Consumer 从 Broker 中主动拉取数据。
push(推)模式:Kafka 没有采用这种方式,因为由 Broker 决定消息发送的速率,很难适应所有消费者的消费速率。例如 Broker 推送的速度是 50m/s, Consumer1、Consumer2 就来不及处理消息。
pull 模式的不足之处是:如果 Kafka 没有数据,消费者可能会陷入循环中,一直返回空数据。
Kafka 消费者工作流程
消费者总体工作流程
消费者组原理
Consumer Group(CG):消费者组,由多个 Consumer 组成。
形成一个消费者组的条件是:消费者的 groupid 相同。
消费者组内每个消费者负责消费不同分区的数据,一个分区的数据只能由消费者组中的一个消费者消费。
一个消费者可以消费多个分区的数据。
如果向消费组中添加更多的消费者,超过主题分区数量,则有一部分消费者会闲置,不会接收任何消息。
消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
消费者组初始化流程
- 每个 Consumer 都发送 JoinGroup 请求
- 选出一个 Consumer 作为 Leader 消费者
- 把要消费的 Topic 情况发送给 Leader 消费者
- Leader 负责制定消费方案,为其他消费者指定消费的分区
- 把消费方案发给 Coordinator
- Coordinator 就把消费方案下发给各个 Consumer
Coordinator:辅助实现消费者组的初始化和分区的分配。
Coordinator 节点选择 = groupid 的 hashcode 值 % 50( consumer_offsets 的分区数量默认为 50)
例如: groupid 的 hashcode 值 = 1,1% 50 = 1,那么 consumer_offsets 主题的 1 号分区,在哪个Broker 上,就选择这个节点的 Coordinator 作为这个消费者组的 Leader。消费者组下的所有的消费者提交 offset 的时候就往这个分区去提交 offset。
每个消费者都会和 Coordinator 保持心跳(心跳时间默认为 3s),一旦超时(session.timeout.ms=45s),该消费者会被移除,并触发再平衡;或者消费者处理消息的时间过长(max.poll.interval.ms=5分钟),也会触发再平衡
消费者组详细消费流程
消费者重要参数
参数名称 | 描述 |
---|---|
bootstrap.servers | 向 Kafka 集群建立初始连接用到的 host/port 列表。 |
key.deserializer 和 value.deserializer | 指定接收消息的 key 和 value 的反序列化类型。 一定要写全类名。 |
group.id | 标记消费者所属的消费者组。一定要写。 |
enable.auto.commit | 消费者会自动周期性地向服务器提交偏移量。默认为 true。 |
auto.commit.interval.ms | 如果设置了 enable.auto.commit 的值为 true, 则该值定义了消费者偏移量向 Kafka 提交的频率,默认为 5s。 |
auto.offset.reset | 当 Kafka 中没有初始偏移量或当前偏移量在服务器中不存在时, (如,数据被删除了),该如何处理? - earliest:自动重置偏移量到最早的偏移量。 - latest:自动重置偏移量为最新的偏移量。默认为 latest。 - none:如果消费者原来的偏移量不存在,则向消费者抛异常。 - anything:向消费者抛异常。 |
offsets.topic.num.partitions | consumer_offsets 的分区数,默认为 50 个分区。 |
heartbeat.interval.ms | Kafka 消费者和 Coordinator 之间的心跳时间,默认为 3s。 该条目的值必须小于 session.timeout.ms , 也不应该高于session.timeout.ms 的 1/3。 |
session.timeout.ms | Kafka 消费者和 Coordinator 之间的连接超时时间,默认为 45s。 超过该值,该消费者被移除,消费者组执行再平衡。 |
max.poll.interval.ms | 消费者处理消息的最大时长,默认为 5 分钟。 超过该值,该消费者被移除,消费者组执行再平衡。 |
fetch.min.bytes | 消费者从 Broker 获取一批消息最小的字节数。默认为 1 个字节。 |
fetch.max.wait.ms | 如果没有从 Broker 获取到一批数据的最小字节数。 该时间到,仍然会返回数据。默认为 500ms。 |
fetch.max.bytes | 消费者从 Broker 获取一批消息最大的字节数。 如果服务器端一批次的数据大于该值(50m)仍然可以拉取回来这批数据,因此,这不是一个绝对最大值。 一批次的大小受 message.max.bytes (broker config)or max.message.bytes (topic config)影响。 默认为:52428800(50 m)。 |
max.poll.records | 一次 poll 拉取数据返回消息的最大条数,默认是 500 条。 |
partition.assignment.strategy | 消费者分区分配策略,可以选择的策略包括: Range、RoundRobin 、 Sticky 、CooperativeSticky 默认策略为 Range + CooperativeSticky。 Kafka 可以同时使用多个分区分配策略。 |
消费者 API
public class CustomCustomer {
public static void main(String[] args) {
// Kafka 的配置信息
Properties kafkaProperties = new Properties();
// 连接的 Kafka 服务器信息、key value 的反序列化方式、消费者组ID(必要信息)
kafkaProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "47.96.188.26:9092");
kafkaProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
kafkaProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 配置消费者组ID, 可任意起名
kafkaProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "myGroupId");
// 创建 Kafka 消费者对象
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(kafkaProperties);
List<String> topics = new ArrayList<>();
topics.add("myFirstTopic");
// 订阅要消费的主题(可订阅多个)
kafkaConsumer.subscribe(topics);
// 拉取数据
while (true) {
// 设置尝试拉取数据, 但没有数据时最多等待 1s
System.out.println("尝试拉取数据");
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
System.out.println("key = " + record.key());
System.out.println("value = " + record.value());
System.out.println("offset = " + record.offset());
}
}
}
}
List<TopicPartition> partitions = new ArrayList<>();
partitions.add(new TopicPartition("myFirstTopic", 0));
// 订阅指定主题的指定分区(可订阅多个)
kafkaConsumer.assign(partitions);
分区的分配以及再平衡
一个「消费者组」中有多个「消费者」,一个「主题」由多个「分区」组成,
现在的问题是:到底由哪个「消费者」来消费哪个「分区」的数据。
Kafka 有四种主流的分区分配策略: Range、RoundRobin、Sticky、CooperativeSticky。
可以通过配置参数 partition.assignment.strategy 修改分区的分配策略。
默认策略是:Range + CooperativeSticky。
Kafka 可以同时使用多个分区分配策略。
Java 程序设置分区分配策略的代码
// Kafka 的配置信息
Properties kafkaProperties = new Properties();
kafkaProperties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"全限定类名");
// 类名如下
// RoundRobinAssignor
// StickyAssignor
// StickyAssignor
// 创建 Kafka 消费者对象
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(kafkaProperties);
Range 以及再平衡
Range 是对每个 Topic 而言的。
首先对同一个 Topic 里面的分区按照序号进行排序,并对消费者按照字母顺序进行排序。
假如现在有 7 个分区,3 个消费者,排序后的分区将会是:0, 1, 2, 3, 4, 5, 6;消费者排序后将会是:C0, C1, C2。
通过 来决定每个消费者消费几个分区。如果除不尽,那么前面几个消费者将会多消费 1 个分区。
例如,,除不尽,那么消费者 C0 便会多消费 1 个分区。
,除不尽,那么 C0 和 C1 分别多消费一个。
注意:如果只是针对 1 个 Topic 而言,C0 消费者多消费 1 个分区影响不是很大。但是如果有 N 多个 Topic,那么针对每个 Topic,消费者 C0 都将多消费 1 个分区,Topic 越多,C0 消费的分区会比其他消费者明显多消费 N 个分区。容易产生数据倾斜!
Range 分区分配再平衡案例
停掉 0 号消费者,快速重新发送消息观看结果(45s 以内,越快越好)。
结果发现:
1 号消费者:消费到 3、4 号分区数据。
2 号消费者:消费到 5、6 号分区数据。
0 号消费者的任务会整体被分配到 1 号消费者或者 2 号消费者。
说明:0 号消费者挂掉后,消费者组需要按照超时时间 45s 来判断它是否退出,所以需要等待,时间到了 45s 后,判断它真的退出就会把任务分配给其他 Broker 执行。
再次重新发送消息观看结果(45s 以后)。
结果发现:
1 号消费者:消费到 0、1、2、3 号分区数据。
2 号消费者:消费到 4、5、6 号分区数据。
说明:消费者 0 已经被踢出消费者组,所以重新按照 Range 方式分配。
RoundRobin 以及再平衡
RoundRobin 是针对集群中所有 Topic 而言。
RoundRobin 轮询分区策略,是把「所有的分区」和「所有的消费者」都列出来,然后按照 hashcode 进行排序,最后通过轮询算法来分配「分区」给到「各个消费者」。
RoundRobin 分区分配再平衡案例
停止掉 0 号消费者,快速重新发送消息观看结果(45s 以内,越快越好)。
结果发现:
1 号消费者:消费到 2、5 号分区数据
2 号消费者:消费到 4、1 号分区数据
0 号消费者的任务会按照 RoundRobin 的方式,把数据轮询分成 0 、6 和 3 号分区数据,分别由 1 号消费者或者 2 号消费者消费。
说明:0 号消费者挂掉后,消费者组需要按照超时时间 45s 来判断它是否退出,所以需要等待,时间到了 45s 后,判断它真的退出就会把任务分配给其他 broker 执行。
再次重新发送消息观看结果(45s 以后)。
结果发现:
1 号消费者:消费到 0、2、4、6 号分区数据
2 号消费者:消费到 1、3、5 号分区数据
说明:消费者 0 已经被踢出消费者组,所以重新按照 RoundRobin 方式分配。
Sticky 以及再平衡
粘性分区定义:可以理解为分配的结果带有“粘性”。即在执行一次新的分配之前, 考虑上一次分配的结果,尽量少的调整分配的变动,可以节省大量的开销。
Kafka 从 0.11.x 版本开始引入粘性分区这种分配策略,首先会尽量均衡的放置分区到消费者上面,在出现同一消费者组内消费者出现问题的时候,会尽量保持原有分配的分区不变化。
offset 位移
offset 的默认维护位置
Kafka0.9 版本之前, Consumer 默认将 offset 保存在 Zookeeper 中。
从 0.9 版本开始,Consumer 默认将 offset 保存在 Kafka 的一个内置 Topic 中,该 Topic 为 consumer_offsets。
consumer_offsets 主题里面采用 key 和 value 的方式存储数据。key 是 group.id+topic+ 分区号,value 就是当前 offset 的值。每隔一段时间, Kafka 内部会对这个 Topic 进行 compact,也就是每个 group.id+topic+分区号就保留最新数据。
自动提交 & 手动提交 offset
为了使我们能够专注于自己的业务逻辑,Kafka 提供了自动提交 offset 的功能。
自动提交 offset 的相关参数:
- enable.auto.commit:是否开启自动提交 offset 功能,默认是 true
- auto.commit.interval.ms:自动提交 offset 的时间间隔,默认是 5s
虽然自动提交 offset 十分简单便利,但由于其是基于时间提交的,开发人员难以把握 offset 提交的时机。因此 Kafka 还提供了手动提交 offset 的 API。
手动提交 offset 的方法有两种分别是:commitSync(同步提交)和 commitAsync(异步提交)。
- commitSync(同步提交):必须等待 offset 提交完毕,再去消费下一批数据。
- commitAsync(异步提交) :发送完提交 offset 请求后,就开始消费下一批数据了。
同步提交 & 异步提交 两者的
相同点是:都会将本次提交的一批数据最高的偏移量提交;
不同点是:同步提交阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败);而异步提交则没有失败重试机制,故有可能提交失败。
由于同步提交 offset 有失败重试机制,故更加可靠,但是由于一直等待提交结果,提交的效率比较低。
虽然同步提交 offset 更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此吞吐量会受到很大的影响。因此更多的情况下,会选用异步提交 offset 的方式。
// Kafka 的配置信息
Properties kafkaProperties = new Properties();
// 设置不自动提交 offset
kafkaProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
// 创建 Kafka 消费者对象
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(kafkaProperties);
// 同步提交
kafkaConsumer.commitSync();
// 异步提交
kafkaConsumer.commitAsync();
指定 Offset 消费 & 指定时间消费
参数 auto.offset.reset = earliest | latest | none 默认是 latest。
当 Kafka 中没有初始偏移量(消费者组第一次消费)或服务器上不存在当前偏移量时(例如该数据已被删除),该怎么办?
- earliest:自动将偏移量重置为最早的偏移量,—from-beginning。
- latest(默认值):自动将偏移量重置为最新偏移量。
- none:如果未找到消费者组的先前偏移量,则向消费者抛出异常。
public class CustomCustomer {
public static void main(String[] args) {
// Kafka 的配置信息
Properties kafkaProperties = new Properties();
// 连接的 Kafka 服务器信息、key value 的反序列化方式、消费者组ID(必要信息)
kafkaProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "47.96.188.26:9092");
kafkaProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
kafkaProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 配置消费者组ID, 可任意起名
kafkaProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "myGroupId");
// 创建 Kafka 消费者对象
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(kafkaProperties);
List<String> topics = new ArrayList<>();
topics.add("myFirstTopic");
// 订阅要消费的主题(可订阅多个)
kafkaConsumer.subscribe(topics);
Set<TopicPartition> topicPartitions = new HashSet<>();
while (topicPartitions.size() == 0) {
// 不明白这里为什么要拉取一次数据,但是如果把这行代码注释掉,程序运行到下面一行就会停住
kafkaConsumer.poll(Duration.ofSeconds(1));
// 获取消费者分区分配信息(有了分区分配信息才能开始消费)
topicPartitions = kafkaConsumer.assignment();
}
// 遍历所有分区,并指定 offset 从 100 的位置开始消费
for (TopicPartition topicPartition : topicPartitions) {
kafkaConsumer.seek(topicPartition, 100);
}
// 拉取数据
while (true) {
// 设置尝试拉取数据, 但没有数据时最多等待 1s
System.out.println("尝试拉取数据");
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
System.out.println("key = " + record.key());
System.out.println("value = " + record.value());
System.out.println("offset = " + record.offset());
}
}
}
}
在生产环境中,会遇到最近消费的几个小时数据异常,想重新按照时间消费。
例如要求按照时间消费前一天的数据,怎么处理?
Set<TopicPartition> topicPartitions = new HashSet<>();
while (topicPartitions.size() == 0) {
// 不明白这里为什么要拉取一次数据,但是如果把这行代码注释掉,程序运行到下面一行就会停住
kafkaConsumer.poll(Duration.ofSeconds(1));
// 获取消费者分区分配信息(有了分区分配信息才能开始消费)
topicPartitions = kafkaConsumer.assignment();
}
HashMap<TopicPartition, Long> timestampToSearch = new HashMap<>();
// 封装集合存储,每个分区对应一天前的数据
for (TopicPartition topicPartition : topicPartitions) {
timestampToSearch.put(topicPartition,
System.currentTimeMillis() - 1 * 24 * 3600 * 1000);
}
// 获取从 1 天前开始消费的每个分区的 offset
Map<TopicPartition, OffsetAndTimestamp> offsets
= kafkaConsumer.offsetsForTimes(timestampToSearch);
// 遍历消费者分区分配信息,对每个分区设置消费时间。
for (TopicPartition topicPartition : topicPartitions) {
OffsetAndTimestamp offsetAndTimestamp = offsets.get(topicPartition);
// 根据时间指定开始消费的位置
if (offsetAndTimestamp != null) {
kafkaConsumer.seek(topicPartition, offsetAndTimestamp.offset());
}
}
漏消费 & 重复消费
重复消费:已经消费了数据,但是 offset 没提交。
漏消费:先提交 offset 后消费,有可能会造成数据的漏消费。
思考:怎么能做到既不漏消费也不重复消费呢?解决办法:消费者事务。
消费者事务
如果想完成 Consumer 端的精准一次性消费,那么需要 Kafka 消费端将消费过程和提交 offset 过程做原子绑定。此时我们需要将 Kafka 的 offset 保存到支持事务的自定义介质( 比如 MySQL)。这部分知识会在后续项目部分涉及。
数据积压(消费者如何提高吞吐量)
如果是 Kafka 消费能力不足,则可以考虑增加 Topic 的分区数,并且同时提升消费组的消费者数量,消费者数= 分区数。(两者缺一不可)
如果是下游的数据处理不及时:提高每批次拉取的数量。批次拉取数据过少(),使处理的数据小于生产的数据,也会造成数据积压。
对于数据积压问题,可以修改参数:
fetch.max.bytes | 消费者从 Broker 获取一批消息最大的字节数。 如果服务器端一批次的数据大于该值(50m)仍然可以拉取回来这批数据,因此,这不是一个绝对最大值。 一批次的大小受 message.max.bytes (broker config)or max.message.bytes (topic config)影响。 默认为:52428800(50 m)。 |
---|---|
max.poll.records | 一次 poll 拉取数据返回消息的最大条数,默认是 500 条。 |