本文介绍kafka-SDK的消费者消费消息示例的使用

准备配置

安装Java依赖库

在pom.xml中添加以下依赖。

  1. <dependency>
  2. <groupId>org.apache.kafka</groupId>
  3. <artifactId>kafka-clients</artifactId>
  4. <version>2.8.0</version>
  5. </dependency>

使用示例

推荐使用安服优的AfuKafkaConfig默认配置,需要先引入相关sdk详情可参照:SDK下载与使用,可以直接使用以下示例进行单Consumer订阅消息。
以下是创建单Consumer订阅消息程序AfuKafkaConsumerDemo.java。

  1. import com.afu.common.sdk.config.AfuKafkaConfig;
  2. import org.apache.kafka.clients.consumer.ConsumerConfig;
  3. import org.apache.kafka.clients.consumer.ConsumerRecord;
  4. import org.apache.kafka.clients.consumer.ConsumerRecords;
  5. import org.apache.kafka.clients.consumer.KafkaConsumer;
  6. import org.apache.kafka.clients.producer.ProducerConfig;
  7. import java.time.Duration;
  8. import java.util.*;
  9. public class AfuKafkaConsumerDemo {
  10. public static void main(String args[]) {
  11. AfuKafkaConfig kafkaConfig=new AfuKafkaConfig();
  12. Properties props = new Properties();
  13. props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfig.getBOOTSTRAP_SERVERS_CONFIG());
  14. //两次Poll之间的最大允许间隔。
  15. //消费者超过该值没有返回心跳,服务端判断消费者处于非存活状态,服务端将消费者从Consumer Group移除并触发Rebalance。
  16. //(AfuKafkaConfig配置为20(s))
  17. props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, kafkaConfig.getSESSION_TIMEOUT_MS_CONFIG());
  18. //每次Poll的最大数量。
  19. //注意该值不要改得太大,如果Poll太多数据,而不能在下次Poll之前消费完,则会触发一次负载均衡,产生卡顿。
  20. //(AfuKafkaConfig配置为20)
  21. props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaConfig.getMAX_POLL_RECORDS_CONFIG());
  22. //消息的反序列化方式。
  23. props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, kafkaConfig.getKEY_DESERIALIZER_CLASS_CONFIG());
  24. props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, kafkaConfig.getVALUE_DESERIALIZER_CLASS_CONFIG());
  25. //当前消费实例所属的消费组,请在控制台申请之后填写。
  26. //属于同一个组的消费实例,会负载消费消息。
  27. //(AfuKafkaConfig配置为"default_consumer_group")
  28. props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConfig.getGROUP_ID_CONFIG());
  29. //构造消费对象,也即生成一个消费实例。
  30. KafkaConsumer<String, String> consumer = new org.apache.kafka.clients.consumer.KafkaConsumer<>(props);
  31. //设置消费组订阅的消费组id(groupId),可以订阅多个。
  32. //如果GROUP_ID_CONFIG是一样,则订阅的groupId也建议设置成一样。
  33. List<String> subscribedGroupIds = new ArrayList<>();
  34. //如果需要订阅多个GroupId,则在这里添加进去即可。
  35. //每个GroupId需要先在安服优物联云平台进行创建,您可以查看后再填入。
  36. subscribedGroupIds.add("TestGroupId");
  37. consumer.subscribe(subscribedGroupIds);
  38. //循环消费消息。
  39. while (true){
  40. try {
  41. ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
  42. //必须在下次Poll之前消费完这些数据, 且总耗时不得超过SESSION_TIMEOUT_MS_CONFIG。
  43. //建议开一个单独的线程池来消费消息,然后异步返回结果。
  44. for (ConsumerRecord<String, String> record : records) {
  45. System.out.println(String.format("Consume partition:%d offset:%d", record.partition(), record.offset())+"value:"+record.value());
  46. }
  47. } catch (Exception e) {
  48. try {
  49. Thread.sleep(1000);
  50. } catch (Throwable ignore) {
  51. }
  52. e.printStackTrace();
  53. }
  54. }
  55. }
  56. }

当生产者发布消息后,该demo将进行单Consumer的消费