多线程开发消费者实例

KafkaProducer 是线程安全的,但 KafkaConsumer 却是非线程安全的。所有的网络 I/O 处理都发生在用户主线程中,因此在使用过程中必须要确保线程安全。KafkaConsumer 的内部定义了一个 acquire() 方法用来检测当前是否只有一个线程在操作,若有其他线程正在操作则会抛出 ConcurrentModifcationException 异常。

KafkaConsumer 中的每个公用方法在执行所要执行的动作之前都会调用这个 acquire() 方法,不过 wakeup() 方法是个例外,acquire() 方法的具体定义如下:

  1. private void acquire() {
  2. long threadId = Thread.currentThread().getId();
  3. if (threadId != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId))
  4. throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");
  5. refcount.incrementAndGet();
  6. }
  7. private void release() {
  8. if (refcount.decrementAndGet() == 0)
  9. currentThread.set(NO_CURRENT_THREAD);
  10. }

由代码可知,acquire() 方法不会造成阻塞等待,可以将其看作一个轻量级的锁,它仅通过线程操作计数标记的方式来检测线程是否发生了并发操作,以此保证只有一个线程在操作。acquire() 方法与 release() 方法成对出现,表示相应的加锁和解锁操作。这两个方法都是私有方法,因此在实际应用中不需要我们显示调用,但了解其内部但机制后可以帮助我们编写正确、有效地程序逻辑。

KafkaConsumer 非线程安全并不意味着我们在消费消息时只能以单线程的方式执行。当我们以单线程的方式进行消息消费时,如果生产者发送消息的速度大于消费者处理消息的速度,那么就会有越来越多的消息得不到及时的消费,造成了一定的延迟。此外,由于 Kafka 中消息保留机制的作用,有些消息有可能在被消费之前就被清理了,从而造成消息的丢失。所以我们可以通过多线程的方式来提高整体的消息消费能力。

1. 实现方案

1.1 一个线程一个实例

多线程的实现方式有多种,第一种也是最常见的方式:线程封闭,即为每个线程实例化一个 KafkaConsumer 对象,负责完整的消息获取、处理流程。具体如下图所示:
image.png
一个线程对应一个 KafkaConsumer 实例,我们可以称之为消费线程。一个消费线程可以消费一个或多个分区中的消息,所有的消费线程都隶属于同一个消费组。这种方式实现简单,并且多个线程之间彼此没有任何交互,省去了很多保障线程安全方面的开销。但这种实现方式的并发度受限于分区的实际个数。当消费线程的个数大于分区数时,就有部分消费线程一直处于空闲的状态。

代码示例:

  1. public static void main(String[] args) {
  2. Properties properties = new Properties();
  3. properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
  4. properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  5. properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  6. properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group_demo");
  7. int consumerThreadNum = 4;
  8. for (int i = 0; i < consumerThreadNum; i++) {
  9. new KafkaConsumerThread(properties, Collections.singletonList("topic_demo")).start();
  10. }
  11. }
  12. private static class KafkaConsumerThread extends Thread {
  13. private final KafkaConsumer<String, String> consumer;
  14. public KafkaConsumerThread(Properties properties, Collection<String> topics) {
  15. this.consumer = new KafkaConsumer<>(properties);
  16. consumer.subscribe(topics);
  17. }
  18. @Override
  19. public void run() {
  20. try {
  21. ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(5000));
  22. // do logic
  23. } catch (Exception e) {
  24. e.printStackTrace();
  25. } finally {
  26. consumer.close();
  27. }
  28. }
  29. }

内部类 KafkaConsumerThread 代表消费线程,其内部包裹着一个独立的 KafkaConsumer 实例,消费线程的数量由 consumerThreadNum 变量指定。一般一个主题的分区数事先可以知晓,可以将 consumerThreadNum 设置成不大于分区数的值,如果不知道主题的分区数也可以通过 KafkaConsumer 类的 partitionsFor() 方法来间接获取,进而再设置合理的线程数量。

上面这种多线程的实现方式和开启多个消费进程的方式没有本质上的区别,优点是每个线程可以按顺序消费各个分区的消息。缺点也很明显,每个消费线程都要维护一个独立的TCP连接,如果分区数和线程数的值都很大,那么会造成系统比较大的开销。

1.2 多线程执行消息消费

一般而言,poll() 拉取消息的速度是相当快的,而整体消费的瓶劲是在处理消息的这一块,如果通过一定的方式来改进这一部分,那么就能带动整体消费的性能提升。我们可以考虑如下实现方式:
image.png
消费者程序可以使用单个或多个 KafkaConsumer 实例(一个实例一个线程)来获取消息,同时创建多个消息消费线程执行消息处理逻辑。消息处理可以交由特定的线程池来做,从而实现消息获取与处理的解耦。

该方案将任务切分成了消息获取消息处理两个部分,分别由不同的线程处理它们。这种方案的最大优势就在于它的高伸缩性,我们可以独立地调节消息获取的线程数,以及消息处理的线程数。如果是消息拉取速度慢,那么增加消息拉取的线程数即可;如果是消息处理速度慢,那么增加 Handler 线程数即可。

不过这种方案因为将消息获取和消息处理分开了,也就是说获取某条消息的线程不是处理该消息的线程,因此对于消息的顺序处理就比较困难了。并且该方案使得整个消息消费链路被拉长,最终导致正确位移提交会变得异常困难,结果就是可能会出现消息的重复消费。

代码示例:

  1. public static void main(String[] args) {
  2. Properties properties = new Properties();
  3. properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
  4. properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  5. properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  6. properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group_demo");
  7. KafkaConsumerThread consumerThread = new KafkaConsumerThread(properties,
  8. Collections.singletonList("topic_demo"), Runtime.getRuntime().availableProcessors());
  9. consumerThread.start();
  10. }
  11. // 消息拉取线程
  12. private static class KafkaConsumerThread extends Thread {
  13. private final KafkaConsumer<String, String> consumer;
  14. private final ExecutorService executorService;
  15. public KafkaConsumerThread(Properties properties, Collection<String> topics, int workThreadNum) {
  16. this.consumer = new KafkaConsumer<>(properties);
  17. this.consumer.subscribe(topics);
  18. this.executorService = new ThreadPoolExecutor(workThreadNum, workThreadNum, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(100));
  19. }
  20. @Override
  21. public void run() {
  22. try {
  23. ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(5000));
  24. if (!records.isEmpty()) {
  25. executorService.execute(new WorkRunnable(records));
  26. }
  27. } catch (Exception e) {
  28. e.printStackTrace();
  29. } finally {
  30. consumer.close();
  31. }
  32. }
  33. }
  34. // 消息处理线程
  35. private static class WorkRunnable implements Runnable {
  36. private final ConsumerRecords<String, String> records;
  37. public WorkRunnable(ConsumerRecords<String, String> records) {
  38. this.records = records;
  39. }
  40. @Override
  41. public void run() {
  42. for (ConsumerRecord<String, String> record : records) {
  43. // do logic
  44. }
  45. }
  46. }

上述代码中的 WorkRunnable 类是用来处理消息的,而 KafkaConsumerThread 类对应的是一个消费线程,里面通过线程池的方法来调用 WorkRunnable 处理一批批的消息。第二种实现方式相比第一种实现方式而言,除了横向扩展的能力,还可以减少 TCP 连接对系统资源的消耗,不过缺点就是对消息的顺序处理比较困难。

2. 位移提交

对于第一种实现方式而言,如果要做具体的位移提交,可以直接在 KafkaConsumerThread 中的 run() 方法里实现即可。而对于第二种实现方式,这里引入一个共享变量 offsets 来参与提交。
image.png
每一个处理消息的 WorkRunnable 类在处理完消息之后都将对应的消费位移保存到共享变量 offsets 中,KafkaConsumerThread 在每一次 poll() 方法后都读取 offsets 中的内容并对其进行位移提交。注意在实现的过程中对 offsets 读写需要加锁防止出现并发问题,并且在写入 offsets 时要注意位移覆盖的问题。

  1. private static class WorkRunnable implements Runnable {
  2. private final ConsumerRecords<String, String> records;
  3. public WorkRunnable(ConsumerRecords<String, String> records) {
  4. this.records = records;
  5. }
  6. @Override
  7. public void run() {
  8. // 按分区消费
  9. for (TopicPartition topicPartition : records.partitions()) {
  10. List<ConsumerRecord<String, String>> tpRecords = records.records(topicPartition);
  11. for (ConsumerRecord<String, String> tpRecord : tpRecords) {
  12. //
  13. }
  14. long lastConsumerOffset = tpRecords.get(tpRecords.size() - 1).offset();
  15. // offsets是共享变量,key为分区信息,value为位移信息
  16. synchronized (offsets) {
  17. if (!offsets.containsKey(topicPartition)) {
  18. offsets.put(topicPartition, new OffsetAndMetadata(lastConsumerOffset + 1));
  19. } else {
  20. // 避免位移覆盖
  21. long position = offsets.get(topicPartition).offset();
  22. if (position < lastConsumerOffset + 1) {
  23. offsets.put(topicPartition, new OffsetAndMetadata(lastConsumerOffset + 1));
  24. }
  25. }
  26. }
  27. }
  28. }
  29. }

上述代码的多线程实现方式性能高效,且能最大限度地保证数据不丢失,但这种位移提交的方式还是会有数据丢失的风险。对于同一个分区中的信息,假设第一个处理线程正在处理 offset 为 0~99 的消息,而另一个处理线程已经处理完成了 offset 为 100~199 的消息并进行了位移提交,此时如果第一个处理线程发生了异常,则之后的消费只能从 200 开始而无法再次消费 0~99 的消息,从而造成了消息丢失。

这里虽然针对位移覆盖做了一定的处理,但还没有解决异常情况下的位移覆盖问题,对此就要引入更加复杂的处理机制。这里再提供一种基于滑动窗口的解决思路。滑动窗口式的实现方式是:将拉取的消息暂存起来,多个消费线程可以拉取暂存的消息,这个用于暂存消息的缓存大小即为滑动窗口的大小,与上面通过 offsets 来记录消费位移的区别在于对消费位移的把控。
image.png
如图所示,每一个方格代表一个批次的消息,一个滑动窗口包含若干方格,startOffset 标注的是当前滑动窗口的起始位置,endOffset 标注的是末尾位置。每当 startOffset 指向的方格中的消息被消费完成,就可以提交这部分的位移,与此同时,窗口向前滑动一格,删除原来 startOffset 所指方格中对应的消息,并且拉取新的消息进入窗口。滑动窗口的大小固定,所以对应的用来暂存消息的缓存大小也就固定了。

方格大小和滑动窗口的大小同时决定了消费线程的并发数:一个方格对应一个消费线程,对于窗口大小固定的情况,方格越小并行度越高;对于方格大小固定的情况,窗口越大并行度越高。不过,若窗口设置得过大,不仅会增大内存的开销,而且在发生异常的情况下也会引起大量的重复消费。

如果一个方格内的消息无法被标记为消费完成,那么就会造成 startOffset 的悬停。为了使窗口能够继续向前滑动,那么就需要设定一个阈值,当 startOffset 悬停一定的时间后就对这部分消息进行本地重试消费,如果重试失败就转入重试队列,如果还不奏效就转入死信队列。如果需要消息高度可靠,也可以将无法进行业务逻辑的消息存入磁盘、数据库或 Kafka,然后继续消费下一条消息以保证整体消费进度合理推进,之后可以通过一个额外的处理任务来分析死信进而找出异常的原因。

消费者客户端参数配置

1. key.deserializer、value.deserializer

与生产者客户端中的 key.serializer 和 value.serializer 参数对应。消费者从 broker 端获取的消息格式都是字节数组(byte[])类型,所以需要执行相应的反序列化操作才能还原成原有的对象格式,这两个参数无默认值。注意这里必须填写序列化器的全限定名。

2. group.id

消费者隶属的消费者组的名称,默认为空字符串。如果设置为空,则会抛出异常。一般而言,这个参数需要设置成具有一定业务意义的名称。

3. fetch.min.bytes

该参数用来配置 Consumer 在一次拉取请求(调用 poll 方法)中能从 Kafka 中拉取的最小数据量,该参数默认是 1B。Kafka 在收到 Consumer 的拉取请求时,如果返回给 Consumer 的数据量小于该参数所配置的值的大小的话,它就需要进行等待,直到数据量满足这个参数的配置大小。一般可以适当调大这个参数的值以提高一定的吞吐量,不过也会造成额外的延迟。

4. fetch.max.bytes

该参数与 fatch.min.bytes 参数对应,它用来配置 Consumer 在一次拉取请求中从 Kafka 中能够拉取的最大数据量,默认值为 50(MB)。注意,该参数设定的不是绝对的最大值,如果在第一个非空分区中拉取的第一条消息大于该参数值,那么该消息仍然会返回,以确保消费者继续工作。

于此相关的,Kafka 中所能接收的最大消息的大小通过服务端参数 message.max.bytes 来设置。

5. fetch.max.wait.ms

这个参数也和 fetch.min.bytes 参数有关,如果 Kafka 仅仅参考 fatch.min.bytes 参数的要求,那么有可能会一直阻塞等待而无法发送响应给 Consumer,显然这是不合理的。fetch.max.wait.ms 参数用于指定 Kafka 的等待时间,默认是 500(ms)。如果 Kafka 中没有足够多的消息而满足不了 fetch.min.bytes 参数的要求,那么最终会等待 500 ms。这个参数的设定和 Consumer 与 Kafka 之间的延迟也有关系,如果业务应用对延迟敏感,那么可以适当调小这个参数。

6. max.poll.records

这个参数用来配置 Consumer 在一次拉取请求中拉取的最大消息数,默认值为 500 条。如果消息的大小都比较小,则可以适当调大这个参数值来提升一定的消费速度。

7. max.poll.interval.ms

当通过消费者组管理消费者时,该参数用来指定拉取消息线程最长的空闲时间,默认值为 5 分钟。如果超过这个时间间隔还没有发起 poll 操作,则消费者组认为该消费者已经离开了组,此时会进行重平衡操作。

8. request.timeout.ms

这个参数用来配置 Consumer 等待请求响应的最长时间,默认值为 30000(ms)。

9. session.timeout.ms

该参数表示 Consumer 在被认为死亡之前可以与服务器断开连接的时间,默认值是 10000(ms)。如果消费者没有在指定时间内发送心跳给群组协调器,那就会被认为是已经死亡,此时协调器就会触发重平衡,把它之前负责消费的分区分配给群组里的其他消费者。

10. heartbeat.interval.ms

当使用 kafka 的分组管理功能时,心跳到消费者协调器之间的预计时间。心跳用于确保消费者的会话保持活动状态,当有新消费者加入或离开组时方便重平衡。该值必须比 session.timeout.ms 参数值小,通常不高于 1/3。它可以调整得更低,以控制正常重新平衡的预期时间。

11. partition.assignment.strategy

分区会被分配给消费者组里的消费者,Kafka 有两个默认的分配策略:

  • Range:该策略会把主题的若干个连续的分区分配给消费者,可能分配不均匀。
  • RoundRobin:该策略把主题的所有分区逐个分配给消费者。一般如果所有消费者都订阅相同的主题,该策略会给所有消费者分配相同数量的分区(或最多就差一个分区)。