1. 引入依赖
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build for the Kafka producer API demo.
     NOTE: the original <project> tag had no whitespace between its xmlns /
     xsi attributes, which is malformed XML and rejected by Maven's parser. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>kafka-producer-api</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <!-- Kafka producer/consumer client API -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.11.0.0</version>
        </dependency>
        <!-- Kafka server-side classes (Scala 2.12 build); only needed for
             broker/admin utilities, not for the plain producer API -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.12</artifactId>
            <version>0.11.0.0</version>
        </dependency>
    </dependencies>
</project>
2. 发送数据
1)异步发送数据
package com.example.kafka.producer;import org.apache.kafka.clients.producer.*;import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;/*** 需要用到的类:* KafkaProducer:* 需要创建一个生产者对象,用来发送数据* ProducerConfig:* 获取所需的一系列配置参数* ProducerRecord:* 每条数据都要封装成一个ProducerRecord对象*/public class TestProducerApi {public static void main(String[] args) {// 1. 创建配置信息Properties properties = new Properties();// 集群信息properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "Hadoop102:9092, Hadoop103:9092, Hadoop104:9092");// 键序列化properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 值序列化properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 应答机制properties.put(ProducerConfig.ACKS_CONFIG, "all");// 重试次数properties.put(ProducerConfig.RETRIES_CONFIG, 3);// 批量发送properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);// 请求延迟properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);// 2. 创建生产者Producer<String, String> producer = new KafkaProducer<String, String>(properties);// 3. 准备数据String topic = "first";String value = "hello kafka";ProducerRecord record = new ProducerRecord(topic, value);// 4. 发送数据(异步)producer.send(record);// 5. 关闭生产者producer.close();}}
2)异步发送数据 + 回调函数
package com.example.kafka.producer;import org.apache.kafka.clients.producer.*;import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;/*** 需要用到的类:* KafkaProducer:* 需要创建一个生产者对象,用来发送数据* ProducerConfig:* 获取所需的一系列配置参数* ProducerRecord:* 每条数据都要封装成一个ProducerRecord对象*/public class TestProducerApi {public static void main(String[] args) {// 1. 创建配置信息Properties properties = new Properties();// 集群信息properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "Hadoop102:9092, Hadoop103:9092, Hadoop104:9092");// 键序列化properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 值序列化properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 应答机制properties.put(ProducerConfig.ACKS_CONFIG, "all");// 重试次数properties.put(ProducerConfig.RETRIES_CONFIG, 3);// 批量发送properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);// 请求延迟properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);// 2. 创建生产者Producer<String, String> producer = new KafkaProducer<String, String>(properties);// 3. 准备数据String topic = "first";String value = "hello kafka";ProducerRecord record = new ProducerRecord(topic, value);// 4. 发送数据(异步 + 回调)producer.send(record, new Callback() {// 回调方法@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception exception) {System.out.println("当前数据发送到的分区:" + recordMetadata.partition());System.out.println("当前数据的偏移量:" + recordMetadata.offset());}});// 5. 关闭生产者producer.close();}}
3)同步发送数据
package com.example.kafka.producer;import org.apache.kafka.clients.producer.*;import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;/*** 需要用到的类:* KafkaProducer:* 需要创建一个生产者对象,用来发送数据* ProducerConfig:* 获取所需的一系列配置参数* ProducerRecord:* 每条数据都要封装成一个ProducerRecord对象*/public class TestProducerApi {public static void main(String[] args) {// 1. 创建配置信息Properties properties = new Properties();// 集群信息properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "Hadoop102:9092, Hadoop103:9092, Hadoop104:9092");// 键序列化properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 值序列化properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 应答机制properties.put(ProducerConfig.ACKS_CONFIG, "all");// 重试次数properties.put(ProducerConfig.RETRIES_CONFIG, 3);// 批量发送properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);// 请求延迟properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);// 2. 创建生产者Producer<String, String> producer = new KafkaProducer<String, String>(properties);// 3. 准备数据String topic = "first";String value = "hello kafka";ProducerRecord record = new ProducerRecord(topic, value);// 4. 发送数据(同步)try {producer.send(record).get();}catch (Exception exception) {}// 5. 关闭生产者producer.close();}}
3. 指定分区
将数据发送到指定分区,可以保证同一分区内数据的有序性
1)方式一(分区号固定)
package com.example.kafka.producer;import org.apache.kafka.clients.producer.*;import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;/*** 需要用到的类:* KafkaProducer:* 需要创建一个生产者对象,用来发送数据* ProducerConfig:* 获取所需的一系列配置参数* ProducerRecord:* 每条数据都要封装成一个ProducerRecord对象*/public class TestProducerApi {public static void main(String[] args) {// 1. 创建配置信息Properties properties = new Properties();// 集群信息properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "Hadoop102:9092, Hadoop103:9092, Hadoop104:9092");// 键序列化properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 值序列化properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 应答机制properties.put(ProducerConfig.ACKS_CONFIG, "all");// 重试次数properties.put(ProducerConfig.RETRIES_CONFIG, 3);// 批量发送properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);// 请求延迟properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);// 2. 创建生产者Producer<String, String> producer = new KafkaProducer<String, String>(properties);// 3. 准备数据String topic = "first";String value = "hello kafka";// 指定分区ProducerRecord record = new ProducerRecord(topic, 1, null, value);// 4. 发送数据(异步 + 回调)producer.send(record, new Callback() {// 回调方法@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception exception) {System.out.println("当前数据发送到的分区:" + recordMetadata.partition());System.out.println("当前数据的偏移量:" + recordMetadata.offset());}});// 5. 关闭生产者producer.close();}}
2)方式二(分区号不固定)
首先:通过 PARTITIONER_CLASS_CONFIG 属性指定 Partitioner 接口的实现类
package com.example.kafka.producer;import org.apache.kafka.clients.producer.*;import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;/*** 需要用到的类:* KafkaProducer:* 需要创建一个生产者对象,用来发送数据* ProducerConfig:* 获取所需的一系列配置参数* ProducerRecord:* 每条数据都要封装成一个ProducerRecord对象*/public class TestProducerApi {public static void main(String[] args) {// 1. 创建配置信息Properties properties = new Properties();// 集群信息properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "Hadoop102:9092, Hadoop103:9092, Hadoop104:9092");// 键序列化properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 值序列化properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 应答机制properties.put(ProducerConfig.ACKS_CONFIG, "all");// 重试次数properties.put(ProducerConfig.RETRIES_CONFIG, 3);// 批量发送properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);// 请求延迟properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);// 指定分区properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class.getName());// 2. 创建生产者Producer<String, String> producer = new KafkaProducer<String, String>(properties);// 3. 准备数据String topic = "first";String value = "hello kafka";// 指定分区ProducerRecord record = new ProducerRecord(topic, value);// 4. 发送数据(异步 + 回调)producer.send(record, new Callback() {// 回调方法@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception exception) {System.out.println("当前数据发送到的分区:" + recordMetadata.partition());System.out.println("当前数据的偏移量:" + recordMetadata.offset());}});// 5. 关闭生产者producer.close();}}
其次:创建一个 Partitioner 实现类,并重写其中的 partition() 方法,指定分区号
package com.example.kafka.producer;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

/**
 * Demo Partitioner that sends every record to partition 2, regardless of
 * topic, key, or value. Registered via ProducerConfig.PARTITIONER_CLASS_CONFIG.
 */
public class MyPartitioner implements Partitioner {

    /**
     * Decides the target partition for a record.
     *
     * @return always partition 2 in this demo
     */
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        return 2;
    }

    /** Nothing to clean up. */
    @Override
    public void close() {
    }

    /** No configuration properties are consumed. */
    @Override
    public void configure(Map<String, ?> configs) {
    }
}
