1. 引入依赖

  <?xml version="1.0" encoding="UTF-8"?>
  <!-- Maven module for the Kafka producer/consumer API examples in these notes. -->
  <project xmlns="http://maven.apache.org/POM/4.0.0"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
      <groupId>org.example</groupId>
      <artifactId>kafka-producer-api</artifactId>
      <version>1.0-SNAPSHOT</version>
      <properties>
          <!-- Compile for Java 8. -->
          <maven.compiler.source>8</maven.compiler.source>
          <maven.compiler.target>8</maven.compiler.target>
      </properties>
      <dependencies>
          <!-- New producer/consumer client API; this is all the high-level
               consumer example below needs. -->
          <dependency>
              <groupId>org.apache.kafka</groupId>
              <artifactId>kafka-clients</artifactId>
              <version>0.11.0.0</version>
          </dependency>
          <!-- Full Kafka (broker) artifact. NOTE(review): presumably only kept
               for the legacy low-level (SimpleConsumer) API example — confirm,
               and drop it if only the new client API is used. -->
          <dependency>
              <groupId>org.apache.kafka</groupId>
              <artifactId>kafka_2.12</artifactId>
              <version>0.11.0.0</version>
          </dependency>
      </dependencies>
  </project>

2. 消费数据

1)高级 API 消费数据

  1. package com.example.kafka.producer;
  2. import org.apache.kafka.clients.consumer.ConsumerConfig;
  3. import org.apache.kafka.clients.consumer.ConsumerRecord;
  4. import org.apache.kafka.clients.consumer.ConsumerRecords;
  5. import org.apache.kafka.clients.consumer.KafkaConsumer;
  6. import org.apache.kafka.common.serialization.StringDeserializer;
  7. import java.util.Arrays;
  8. import java.util.Iterator;
  9. import java.util.Properties;
  10. public class SeniorConsumer {
  11. public static void main(String[] args) {
  12. // 创建配置对象
  13. Properties properties = new Properties();
  14. // 集群信息
  15. properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "Hadoop102:9092, Hadoop103:9092, Hadoop104:9092");
  16. // 键序列化
  17. properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  18. // 值序列化
  19. properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  20. // 消费者组
  21. properties.put(ConsumerConfig.GROUP_ID_CONFIG, "terry");
  22. // 自动确认 offset
  23. properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
  24. // 自动确认 offset 时间间隔
  25. properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
  26. // 创建消费者
  27. KafkaConsumer<String ,String> consumer = new KafkaConsumer<String, String>(properties);
  28. // 订阅主题
  29. consumer.subscribe(Arrays.asList("first"));
  30. while (true) {
  31. // 拉取数据
  32. ConsumerRecords<String, String> records = consumer.poll(1);
  33. Iterator<ConsumerRecord<String, String>> iterator = records.iterator();
  34. for (ConsumerRecord record : records) {
  35. // 打印数据
  36. System.out.println(record);
  37. }
  38. }
  39. }
  40. }

2)低级 API 消费数据

  1. https://blog.csdn.net/brooks1024/article/details/104906077

3. 知识点排查

https://www.cnblogs.com/bainianminguo/p/12247158.html