1. 引入依赖
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the Kafka client examples in these notes. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>kafka-producer-api</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <!-- Compile for Java 8. -->
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <!-- The sources contain non-ASCII comments; pin the encoding so the build
             does not depend on the platform default charset. -->
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <!-- Java client library (provides the KafkaConsumer API used by the high-level example). -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.11.0.0</version>
        </dependency>
        <!-- Kafka core artifact built for Scala 2.12, kept at the same version as kafka-clients. -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.12</artifactId>
            <version>0.11.0.0</version>
        </dependency>
    </dependencies>
</project>
2. 消费数据
1)高级 API 消费数据
package com.example.kafka.producer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Arrays;
import java.util.Iterator;
import java.util.Properties;

/**
 * Minimal example of the high-level Kafka consumer API.
 *
 * <p>Subscribes to the topic {@code first} as a member of consumer group {@code terry},
 * with offsets committed automatically, and prints every record it receives to standard
 * output. The program loops forever; stop it by terminating the process.
 */
public class SeniorConsumer {

    /**
     * Maximum time, in milliseconds, a single {@code poll} call waits for data.
     * A very small value makes the loop spin on an idle topic.
     */
    private static final long POLL_TIMEOUT_MS = 100L;

    /**
     * Configures the consumer, subscribes to the topic and prints records in an endless loop.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // Build the consumer configuration.
        Properties properties = new Properties();
        // Brokers used for the initial connection to the cluster.
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "Hadoop102:9092, Hadoop103:9092, Hadoop104:9092");
        // Key deserializer.
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        // Value deserializer.
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        // Consumer group id.
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "terry");
        // Commit offsets automatically ...
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        // ... at this interval, in milliseconds.
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);

        // try-with-resources closes the consumer if the loop is left via an exception.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) {
            // Subscribe to the topic.
            consumer.subscribe(Arrays.asList("first"));

            while (true) {
                // Fetch the next batch of records.
                ConsumerRecords<String, String> records = consumer.poll(POLL_TIMEOUT_MS);
                for (ConsumerRecord<String, String> record : records) {
                    // Print the record.
                    System.out.println(record);
                }
            }
        }
    }
}
2)低级 API 消费数据
https://blog.csdn.net/brooks1024/article/details/104906077
3. 知识点排查
https://www.cnblogs.com/bainianminguo/p/12247158.html