概念

Consumer消费数据时的可靠性是很容易保证的,因为数据在Kafka中是持久化的,故不用担心数据丢失问题。
由于consumer在消费过程中可能会出现断电宕机等故障,consumer恢复后,需要从故障前的位置的继续消费,所以consumer需要实时记录自己消费到了哪个offset,以便故障恢复后继续消费。
所以offset的维护是Consumer消费数据是必须考虑的问题。

依赖

  1. <dependency>
  2. <groupId>org.apache.kafka</groupId>
  3. <artifactId>kafka-clients</artifactId>
  4. <version>0.11.0.0</version>
  5. </dependency>

**

需要用到的类:


KafkaConsumer:需要创建一个消费者对象,用来消费数据
ConsumerConfig:获取所需的一系列配置参数
ConsuemrRecord**:每条数据都要封装成一个ConsumerRecord对象

自动提交offset

为了使我们能够专注于自己的业务逻辑,Kafka提供了自动提交offset的功能。
自动提交offset的相关参数:

消费者消费到了什么地方,这件事情是由Kafka自己维护的.我们consumer只需要关注自己逻辑即可,这个offset是kafka每隔一段时间内部topic提交一份offset

这种提交方式的问题:

就是采用提交的方式,假如说设置5秒自动提交一次,当你消费了3秒之后Kafka挂掉了,生产者长时间没有收到ack会重新发送,这时候由于重新选leader之后,重新接收数据, 这样就会出现数据重复的情况(重复消费.).
解决方式: 你可以在consumer业务代码手动去重,比如说用redis去重复等等.当然还有别的方式

enable.auto.commit:是否开启自动提交offset功能
auto.commit.interval.ms:自动提交offset的时间间隔

  1. package com.consumer;
  2. import org.apache.kafka.clients.consumer.ConsumerRecord;
  3. import org.apache.kafka.clients.consumer.ConsumerRecords;
  4. import org.apache.kafka.clients.consumer.KafkaConsumer;
  5. import java.util.Arrays;
  6. import java.util.Properties;
  7. public class CustomConsumer {
  8. public static void main(String[] args) {
  9. Properties props = new Properties();
  10. props.put("bootstrap.servers", "zjj101:9092");
  11. props.put("group.id", "test");
  12. //enable.auto.commit:是否开启自动提交offset功能
  13. props.put("enable.auto.commit", "true");
  14. //auto.commit.interval.ms:自动提交offset的时间间隔
  15. props.put("auto.commit.interval.ms", "1000");
  16. props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  17. props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  18. KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
  19. //订阅 名字为 first的 topic ,可以订阅好几个topic
  20. consumer.subscribe(Arrays.asList("first"));
  21. while (true) {
  22. //拉取时间,如果拉取时间超过这个时间还没拉取出来数据就算拉失败了
  23. ConsumerRecords<String, String> records = consumer.poll(100);
  24. for (ConsumerRecord<String, String> record : records) {
  25. System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
  26. //输出: offset = 400, key = 0, value = 0
  27. }
  28. }
  29. }
  30. }

手动同步提交offset

虽然自动提交offset十分简介便利,但由于其是基于时间提交的,开发人员难以把握offset提交的时机。因此Kafka还提供了手动提交offset的API。
手动提交offset的方法有两种:分别是commitSync(同步提交)和commitAsync(异步提交)。两者的相同点是,都会将本次poll的一批数据最高的偏移量提交;不同点是,commitSync阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败);而commitAsync则没有失败重试机制,故有可能提交失败。

同步提交慢,但是安全,异步提交快,但是不安全.

1)手动同步提交offset

每次消费完一批数据就手动给ack提交一次.

由于同步提交offset有失败重试机制,故更加可靠,以下为同步提交offset的示例。

props.put(“enable.auto.commit”, “false”);
调用commitSync()方法提交.

  1. package com.consumer;
  2. import org.apache.kafka.clients.consumer.ConsumerRecord;
  3. import org.apache.kafka.clients.consumer.ConsumerRecords;
  4. import org.apache.kafka.clients.consumer.KafkaConsumer;
  5. import java.util.Arrays;
  6. import java.util.Properties;
  7. public class CustomComsumer2 {
  8. public static void main(String[] args) {
  9. Properties props = new Properties();
  10. //Kafka集群
  11. props.put("bootstrap.servers", "zjj101:9092");
  12. //消费者组,只要group.id相同,就属于同一个消费者组
  13. props.put("group.id", "test");
  14. props.put("enable.auto.commit", "false");//关闭自动提交offset
  15. props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  16. props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  17. KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
  18. consumer.subscribe(Arrays.asList("first"));//消费者订阅主题
  19. while (true) {
  20. //消费者拉取数据
  21. ConsumerRecords<String, String> records = consumer.poll(100);
  22. for (ConsumerRecord<String, String> record : records) {
  23. System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
  24. }
  25. //同步提交,当前线程会阻塞直到offset提交成功
  26. consumer.commitSync();
  27. }
  28. }
  29. }

2)异步提交offset

虽然同步提交offset更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此吞吐量会收到很大的影响。因此更多的情况下,会选用异步提交offset的方式。
以下为异步提交offset的示例:

  1. package com.consumer;
  2. import org.apache.kafka.clients.consumer.*;
  3. import org.apache.kafka.common.TopicPartition;
  4. import java.util.Arrays;
  5. import java.util.Map;
  6. import java.util.Properties;
  7. public class CustomConsumer3 {
  8. public static void main(String[] args) {
  9. Properties props = new Properties();
  10. //Kafka集群
  11. props.put("bootstrap.servers", "zjj101:9092");
  12. //消费者组,只要group.id相同,就属于同一个消费者组
  13. props.put("group.id", "test");
  14. //关闭自动提交offset
  15. props.put("enable.auto.commit", "false");
  16. props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  17. props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  18. KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
  19. consumer.subscribe(Arrays.asList("first"));//消费者订阅主题
  20. while (true) {
  21. ConsumerRecords<String, String> records = consumer.poll(100);//消费者拉取数据
  22. for (ConsumerRecord<String, String> record : records) {
  23. System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
  24. }
  25. //异步提交
  26. consumer.commitAsync(new OffsetCommitCallback() {
  27. @Override
  28. public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
  29. if (exception != null) {
  30. System.err.println("Commit failed for" + offsets);
  31. }
  32. }
  33. });
  34. }
  35. }
  36. }