消费者与消费者组

消费者(consumer)需要订阅Kafka中的主题,并且从主题里拉取消息。Kafka的消费者通过拉模式来消费消息。与其他消息中间件不同的是,Kafka的消费者还有一层消费组(consumer group)的概念。每个消费者会归属一个消费者组,每条消息只会投递到消费组中的某个消费者。
消费者与消费者组:

image.png
消费者与消费组这种模型可以让整体的消费能力具备伸缩性,我们可以增加(减少)消费者的个数来提高(或降低)系统的消费能力。对于固定数量的分区而言,不停的增加消费者并不能一直提升消费能力。
Kafka默认的分配策略下:如果消费者个数太多大于分区的个数,则会出现某些消费者分配不到分区,此时增加消费者无意义(可通过参数partition.assignment.strategy配置)

组内有3个消费者:
image.png
组内消费者过多:
image.png**
对于消费中间件而言,一般有2中消息投递模式:点对点(Point-to-Point,P2P)模式与发布订阅模式(Pub/Sub).
点对点是基于队列的,消息生产者发送消息到队列,消费者从队列接收消息。
发布/订阅模式定义了如何小一个内容节点发布和订阅消息,这个内容节点成为主题(Topic),主题可以认为是消息传递的中介,消息发布者将消息发布到某个主题,而消息订阅者从主题中订阅消息.主题使得消息的订阅者与发布者互相保持独立,不需要耦合,不需要接触即可保证消息的传递。发布/订阅模式在消息的一对多广播时采用。
Kafka通过消费者与消费组同时支持两种消息投递模式。

  • 如果所有的消费者都属于同一个消费组,那么所有的消息都会被均衡的投递给每个消费者,即每条消息只会被一个消费者处理,这相当于的点对点模式。
  • 如果所有的消费者都隶属于不同的消费组,那么所有的消息都会被广播给所有的消费组,即每条消息会被所有的消费组处理,这就相当于发布订阅模式。

消费组是一个逻辑上的概念,他将旗下的消费组归为一类,每个消费者只隶属与一个消费组。每个消费组都会有一个固定的名称,消费者在进行消费前需要配置group.id来指定消费组的名称,默认为空字符串。

消费者并非逻辑上的概念,是实际的应用实例(如JVM实例),可以是一个线程或一个进程。同一个消费者组内的消费者一般部署到不同的机器上,当然也可以部署到同一台物理机上,通过Docker来隔离。

客户端开发

一般步骤

一般正常的消费者,其消费逻辑分为几个步骤:
1)配置消费者客户端参数即创建相应的消费者实例
2)订阅主题
3)拉取消息并消费
4)提交消费位移(手动提交或自动提交)
5)关闭消费者实例(可以跟随应用来关闭)

  1. /**
  2. * 手动同步提交
  3. * 同步提交:commitSync(); 阻塞等待-影响消费吞吐量,提交失败会抛出异常,自动重试
  4. * 异步提交:commitAsync(); 异步可能会失败,导致消费出现问题
  5. * @throws InterruptedException
  6. */
  7. public static void manCommitOffsetWithSync() throws InterruptedException {
  8. // 1. 消费者配置
  9. Properties properties = new Properties();
  10. properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
  11. // 自动提交offset
  12. properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
  13. // 提交offset的时间,单位ms,即1秒钟提交一次
  14. properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
  15. // 指定k-v反序列化规则
  16. properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
  17. properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
  18. // 指定消费者组
  19. properties.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
  20. // 指定clientId
  21. properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "my-client-demo");
  22. // 2. 创建消费者
  23. KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
  24. try {
  25. // 订阅主题
  26. consumer.subscribe(Collections.singletonList("quickstart-events"));
  27. while (isRunning.get()){
  28. // 拉取数据,指定轮询时间为1秒
  29. ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
  30. for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
  31. System.out.println(consumerRecord.toString());
  32. }
  33. // 同步提交
  34. consumer.commitSync();
  35. TimeUnit.SECONDS.sleep(1);
  36. }
  37. } catch (Exception e) {
  38. log.error("consumer error", e);
  39. } finally {
  40. consumer.close();
  41. }
  42. }

其他消费者参数

除了必需设置的消费者参数,如bootstrap.servers, group.id, key.deserializer, value.deserializer 等,一般还需要知晓一些额外的消费者参数,这些参数大多都有合适的默认值,通常不需要修改。

参数 说明
fetch.min.bytes 该属性指定了消费者从服务器获取记录的最小字节数。broker 在收到消费者的数据请求时, 如果可用的数据量小于 fetch.min.bytes 指定的大小,那么它会等到有足够的可用数据时 才把它返回给消费者。这样可以降低消费者和 broker 的工作负载,因为它们在主题不是很 活跃的时候(或者一天里的低谷时段)就不需要来来回回地处理消息。如果没有很多可用 数据,但消费者的 CPU 使用率却很高,那么就需要把该属性的值设得比默认值大。如果 消费者的数量比较多,把该属性的值设置得大一点可以降低 broker 的工作负载。
fetch.max.wait.ms 我们通过 fetch.min.bytes 告诉 Kafka,等到有足够的数据时才把它返回给消费者。而 fetch.max.wait.ms 则用于指定 broker 的等待时间,默认是 500ms。如果没有足够的数据流入 Kafka,消费者获取最小数据量的要求就得不到满足,最终导致 500ms 的延迟。如果要降低 潜在的延迟(为了满足 SLA),可以把该参数值设置得小一些。如果 fetch.max.wait.ms 被设 为 100ms,并且 fetch.min.bytes 被设为 1MB,那么 Kafka 在收到消费者的请求后,要么返 回 1MB 数据,要么在 100ms 后返回所有可用的数据,就看哪个条件先得到满足。
max.partition.fetch.bytes 该属性指定了服务器从每个分区里返回给消费者的最大字节数。它的默认值是 1MB,也就是说,KafkaConsumer.poll() 方法从每个分区里返回的记录最多不超过 max.partition. fetch.bytes 指定的字节。如果一个主题有 20 个分区和 5 个消费者,那么每个消费者需要 至少 4MB 的可用内存来接收记录。在为消费者分配内存时,可以给它们多分配一些,因 为如果群组里有消费者发生崩溃,剩下的消费者需要处理更多的分区。max.partition. fetch.bytes 的值必须比 broker 能够接收的最大消息的字节数(通过 max.message.size 属 性配置)大,否则消费者可能无法读取这些消息,导致消费者一直挂起重试。在设置该属
性时,另一个需要考虑的因素是消费者处理数据的时间。消费者需要频繁调用 poll() 方法 来避免会话过期和发生分区再均衡,如果单次调用 poll() 返回的数据太多,消费者需要更 多的时间来处理,可能无法及时进行下一个轮询来避免会话过期。如果出现这种情况,可 以把 max.partition.fetch.bytes 值改小,或者延长会话过期时间。
session.timeout.ms 该属性指定了消费者在被认为死亡之前可以与服务器断开连接的时间,默认是 3s。如 果消费者没有在 session.timeout.ms 指定的时间内发送心跳给群组协调器,就被认为 已经死亡,协调器就会触发再均衡,把它的分区分配给群组里的其他消费者。该属性与 heartbeat.interval.ms 紧 密 相 关。heartbeat.interval.ms 指 定 了 poll() 方 法 向 协 调 器 发送心跳的频率,session.timeout.ms 则指定了消费者可以多久不发送心跳。所以,一 般需要同时修改这两个属性,heartbeat.interval.ms 必须比 session.timeout.ms 小,一 般是 session.timeout.ms 的三分之一。如果 session.timeout.ms 是 3s,那么 heartbeat. interval.ms 应该是 1s。把 session.timeout.ms 值设得比默认值小,可以更快地检测和恢 复崩溃的节点,不过长时间的轮询或垃圾收集可能导致非预期的再均衡。把该属性的值设 置得大一些,可以减少意外的再均衡,不过检测节点崩溃需要更长的时间。
auto.offset.reset 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下(因消费者长 时间失效,包含偏移量的记录已经过时并被删除)该作何处理。它的默认值是 latest,意 思是说,在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之 后生成的记录)。另一个值是 earliest,意思是说,在偏移量无效的情况下,消费者将从 起始位置读取分区的记录。
enable.auto.commit 该属性指定了消费者是否自动提交偏移 量,默认值是 true。为了尽量避免出现重复数据和数据丢失,可以把它设为 false,由自 己控制何时提交偏移量。如果把它设为 true,还可以通过配置 auto.commit.interval.ms 属性来控制提交的频率。
partition.assignment.strategy 分区会被分配给群组里的消费者,PartitionAssignor根据给定的消费者和主题,决定哪些分区应该被分配给哪个消费者。Kafka 有两个默认的分配策略:
- Range

该策略会把主题的若干个连续的分区分配给消费者。假设消费者 C1 和消费者 C2 同时 订阅了主题 T1 和主题 T2,并且每个主题有 3 个分区。那么消费者 C1 有可能分配到这 两个主题的分区 0 和分区 1,而消费者 C2 分配到这两个主题的分区 2。因为每个主题 拥有奇数个分区,而分配是在主题内独立完成的,第一个消费者最后分配到比第二个消 费者更多的分区。只要使用了 Range 策略,而且分区数量无法被消费者数量整除,就会 出现这种情况。

- RoundRobin
该策略把主题的所有分区逐个分配给消费者。如果使用 RoundRobin 策略来给消费者 C1 和消费者 C2 分配分区,那么消费者 C1 将分到主题 T1 的分区 0 和分区 2 以及主题 T2 的分区 1,消费者 C2 将分配到主题 T1 的分区 1 以及主题 T2 的分区 0 和分区 2。一般 来说,如果所有消费者都订阅相同的主题(这种情况很常见),RoundRobin 策略会给所 有消费者分配相同数量的分区(或最多就差一个分区)。

可以通过设置 partition.assignment.strategy 来选择分区策略。默认使用的是 org. apache.kafka.clients.consumer.RangeAssignor,这个类实现了 Range 策略,不过也可以 把它改成 org.apache.kafka.clients.consumer.RoundRobinAssignor。我们还可以使用自定 义策略,在这种情况下,partition.assignment.strategy 属性的值就是自定义类的名字 | | client.id | 该属性可以是任意字符串,broker 用它来标识从客户端发送过来的消息,通常被用在日志、 度量指标和配额里。 | | max.poll.records | 该属性用于控制单次调用 call() 方法能够返回的记录数量,可以帮你控制在轮询里需要处 理的数据量 | | receive.buffer.bytes 和 send.buffer.bytes | socket 在读写数据时用到的 TCP 缓冲区也可以设置大小。如果它们被设为 -1,就使用操 作系统的默认值。如果生产者或消费者与 broker 处于不同的数据中心内,可以适当增大这 些值,因为跨数据中心的网络一般都有比较高的延迟和比较低的带宽。 |

位移提交

对于Kafka的分区而言,写入消息成功后每条消息都会一个偏移量(offset), 用来表示消息具体的位置。而对于Kafka消费者而言,也有一个消费者offset, 一般称为位移,表示当前消费者消费分区消息的位置。

Kafka通过poll方法每次拉取一批消息,返回未消费过的消息集,因此Kafka需要针对每个分区的消费者记录它上次消费确认过的位移(consumer-offset),并且这个上次消费位置需要持久化存储,便于某个消费者下线后重启,或再均衡时同一分区分配给其他消费者,都需要重新拉取该分区上次消费的位移。

为了能持久化这个消费位移,Kafka消费者需要显示告诉Broker,这个过程称为位移提交。在旧版本的Kafka中,提交的位移是保存在ZooKeeper中,由于ZK不适合做高并发、大数量的消息消息写,因此在新版的Kafka中,消费位移存储在Kafka内部主题:__consumer_offsets中,消费者提交位移时,直接向这个主题发送提交消息,其可靠性保证通过Kafka的分区与副本机制来保证。

提交消息位移

由于消费者是批量拉取消息,消费完成后,会找到该批次中最后一条消息的位置然后取出它的偏移量,增加1后并做提交,表示下一条要开始消费的消息位移。但是在提交消费位移的过程中,如果处理不当,会出现重复消费或消息的丢失的情况。
**
如下图所示,x表示某一次拉取分区消息最大的偏移量,当前已消费到x这个位置,表示当前消费者的消费位移为x

image.png
但是当前消费者需要提交的位移为:x+1, 对应与图中的position。在Kafka中的API中,用如下方法来获取下一条消息的位置

  1. Consumer#position(org.apache.kafka.common.TopicPartition)

1)上次提交的位移后,消费者拉取新的批次消息后,还未处理完发生宕机或再均衡,则消息会被重复投递
image.png
2)上次提交位移后,消费者拉取新的批次消息后,还未消费完就提交了本次批次消息,但是发生宕机或再均衡,则会丢失消息
image.png
所以,处理位移的方式对消息可靠性也有很大影响,但是Kafka 提供了多种API来提交位移。

自动提交
消费者设置enable.auto.commit=true 时,Kafka会在一定的间隔时间周期内,自动提交本次拉取批次消息。间隔时间通过设置auto.commit.interval.ms ,默认为5s.
注意:这种方式存在消息重复的情况,消费者还未提交消息时发生了宕机或再均衡,则消息会被重复拉取。

手动提交**

  1. KafkaConsumer#commitSync() // 同步提交
  2. KafkaConsumer#commitAsync() // 异步提交

再均衡(Rebalance)

Kafka通过消费者组来管理一组消费者实例,这种机制具有很好的扩展性且具有很好的容错性。Kafka的消费组通过再均衡协议来平衡组内的消费者,Rebalance的触发条件有3个:

  1. 组成员数发生变更。比如有新的Consumer实例加入组或者离开组,抑或是有Consumer实例崩溃被“踢出”组。
  2. 订阅主题数发生变更。Consumer Group可以使用正则表达式的方式订阅主题,比如consumer.subscribe(Pattern.compile(“t.*c”))就表明该Group订阅所有以字母t开头、字母c结尾的主题。在Consumer Group的运行过程中,你新创建了一个满足这样条件的主题,那么该Group就会发生Rebalance。
  3. 订阅主题的分区数发生变更。Kafka当前只能允许增加一个主题的分区数。当分区数增加时,就会触发订阅该主题的所有Group开启Rebalance。

再均衡让消费者具备高可用与扩展性,使得主题可以很安全的增加消费者或减少消费者。但是再均衡发生期间,组内的所有消费者都会暂停消费,也就是说在暂停这个时间内,消费组变得不可用。另外当某个分区被重新分配给其他消费者时,之前的状态也会丢失,例如上个消费者的消费位移,导致消息可能重复消费。一般情况下,我们希望能尽量避免发生再均衡。

消费者客户端订阅某个主题时,可指定一个再均衡监听器。再均衡监听器会在再均衡生命周期中回调相应的动作,用以执行一些准备或收尾的动作。如果容忍再均衡带来的重复消费问题,可以通过再均衡监听器实现里把当前消费位移做持久化或手动提交一下。

  1. public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener);
  2. public interface ConsumerRebalanceListener {
  3. void onPartitionsRevoked(Collection<TopicPartition> partitions);
  4. void onPartitionsAssigned(Collection<TopicPartition> partitions);
  5. }

onPartitionsRevoked(Collection<TopicPartition> partitions) 这个方法会在再均衡发生之前和消费者停止读取消息之后被调用,参数partitions 表示再均衡前所分配的分区信息。可以用这个方法来处理消费位移的提交或保存,用来避免一些不必要的重复消费。

void onPartitionsAssigned(Collection<TopicPartition> partitions) 这个方法会在重新分配分区和消费者开始新消费之前被调用。partitions表示再均衡后分配的分区信息。

  1. KafkaConsumer<String, String> consumer = new KafkaConsumer<>(...);
  2. Map<TopicPartition, OffsetAndMetadata> currentOffsets =new HashMap<>() ;
  3. consumer.subscribe(Arrays.asList("demo-topic"), new ConsumerRebalanceListener() {
  4. @Override
  5. public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
  6. // 处理方式1:手动提交一次位移
  7. consumer.commitSync(currentOffsets);
  8. currentOffsets .clear();
  9. // 处理方式2:持久化到DB
  10. // save(currentOffsets);
  11. }
  12. @Override
  13. public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
  14. for (TopicPartition tp : partitions) {
  15. consumer.seek(tp, getOffsetFromDB(tp));
  16. }
  17. }
  18. });
  19. try {
  20. while (isRunning.get()) {
  21. ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));// 1秒轮询一次
  22. for (ConsumerRecord<String, String> record : records) {
  23. // 处理消费逻辑
  24. System.out.println(record);
  25. // 这里记录每个分区主题的下一个提交位移
  26. // 由于Key是TopicPartition,因此相同分区的消息的Offset会被覆盖,直到取最后一条消息的提交位移
  27. currentOffsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1)) ;
  28. }
  29. consumer.commitAsync(currentOffsets, null);
  30. }
  31. } finally {
  32. consumer.close();
  33. }

消费者拦截器

与生产者类似,消费者可通过配置参数:org.apache.kafka.clients.consumer.ConsumerConfig#INTERCEPTOR_CLASSES_CONFIG 来指定消费者拦截器,实现的接口为:

  1. public interface ConsumerInterceptor<K, V> extends Configurable, AutoCloseable {
  2. /**
  3. This is called just before the records are returned by poll
  4. This method is allowed to modify consumer records, in which case the new records will be
  5. * returned. There is no limitation on number of records that could be returned from this
  6. * method. I.e., the interceptor can filter the records or generate new records.
  7. Any exception thrown by this method will be caught by the caller, logged, but not propagated to the client
  8. */
  9. public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);
  10. /**
  11. * This is called when offsets get committed.
  12. * <p>
  13. * Any exception thrown by this method will be ignored by the caller.
  14. *
  15. * @param offsets A map of offsets by partition with associated metadata
  16. */
  17. public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);
  18. }

Kafka Consumer 拉取到消息后,会在调用poll方法之前调用拦截器的onConsume()方法来对消息做定制处理,比如修改消息的内容,过滤消息。如果抛出异常,则会被捕获,不会在继续传递出来。

多线程消费

KafkaProducer是线程安全的,然而KafkaConsumer却是非线程安全的。KafkaConsumer内部通过acquire方法来检测是否只有一个线程在操作,如有其它线程调用KafkaConsuemr的方法,则抛出异常,acquire方法定义:

  1. // currentThread holds the threadId of the current thread accessing KafkaConsumer
  2. // and is used to prevent multi-threaded access
  3. private static final long NO_CURRENT_THREAD = -1L
  4. private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);
  5. // refcount is used to allow reentrant access by the thread who has acquired currentThread
  6. private final AtomicInteger refcount = new AtomicInteger(0);
  7. private void acquire() {
  8. long threadId = Thread.currentThread().getId();
  9. if (threadId != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId))
  10. throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");
  11. refcount.incrementAndGet();
  12. }

acuire方法实现了一个JVM级别的轻量级锁的功能,内部通过一个原子变量AtomicLong对象操作计数的方式来检测是否发生了并发,以此用来保证只有一个线程在操作。accquire方法与release方法组合使用,用以实现加锁与解锁操作。release方法如下:

  1. /**
  2. * Release the light lock protecting the consumer from multi-threaded access.
  3. */
  4. private void release() {
  5. if (refcount.decrementAndGet() == 0)
  6. currentThread.set(NO_CURRENT_THREAD);
  7. }

虽然KafkaConsumer对象是非线程安全的,但是仍然可以采用多线程方案来消费消息。

第一种:线程封闭

线程封闭即每个线程实例化一个KafkaConsumer对象
image.png
参考:指定的线程数量可通过获取分区数量,一般不建议大于分区数量,获取分区数量可通过如下方法获取:

List partitionsFor(String topic)

简要实现如下:

  1. public void oneKafkaConsumerPerThread() {
  2. Properties props = initConfig(true);
  3. int consumerThreadNum = 4;
  4. for(int i=0;i<consumerThreadNum;i++) {
  5. new KafkaConsumerPerThread(props, "demo-topic").start();
  6. }
  7. }
  8. private static class KafkaConsumerPerThread extends Thread {
  9. KafkaConsumer<String, String> consumer;
  10. public KafkaConsumerPerThread(Properties props, String topic) {
  11. this.consumer = new KafkaConsumer<>(props);
  12. this.consumer.subscribe(Arrays.asList(topic));
  13. }
  14. @Override
  15. public void run() {
  16. try {
  17. while (isRunning.get()) {
  18. ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));// 1秒轮询一次
  19. for (ConsumerRecord<String, String> record : records) {
  20. // 处理消费逻辑
  21. System.out.println(record);
  22. }
  23. consumer.commitAsync();
  24. }
  25. } finally {
  26. consumer.commitSync();
  27. consumer.close();
  28. }
  29. }
  30. }

第二种:线程池消费

image.png
简要实现如下:

  1. public void threadPoolConsume() {
  2. new MultiThreadConsumePool(initConfig(false),
  3. Runtime.getRuntime().availableProcessors(),
  4. "demo-topic").start();;
  5. }
  6. private static class MultiThreadConsumePool extends Thread {
  7. private ExecutorService executor;
  8. private KafkaConsumer<String, String> consumer;
  9. private int consumerThreadNum;
  10. public MultiThreadConsumePool(Properties props, int consumerThreadNum, String topic) {
  11. consumer = new KafkaConsumer<>(props);
  12. consumer.subscribe(Collections.singleton(topic));
  13. this.consumerThreadNum = consumerThreadNum;
  14. executor =new ThreadPoolExecutor(consumerThreadNum, consumerThreadNum, 0L, TimeUnit.MILLISECONDS,
  15. new ArrayBlockingQueue<>(1000) , new ThreadPoolExecutor. CallerRunsPolicy() ) ;
  16. }
  17. @Override
  18. public void run() {
  19. while (isRunning.get()) {
  20. while (isRunning.get()) {
  21. ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));// 1秒轮询一次
  22. if (!records.isEmpty()) {
  23. executor.submit(new Runnable() {
  24. @Override
  25. public void run() {
  26. // 消费Records
  27. }
  28. });
  29. }
  30. }
  31. }
  32. }
  33. }

对比

image.png