1. 引入依赖

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0"
  3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  5. <modelVersion>4.0.0</modelVersion>
  6. <groupId>org.example</groupId>
  7. <artifactId>kafka-producer-api</artifactId>
  8. <version>1.0-SNAPSHOT</version>
  9. <properties>
  10. <maven.compiler.source>8</maven.compiler.source>
  11. <maven.compiler.target>8</maven.compiler.target>
  12. </properties>
  13. <dependencies>
  14. <dependency>
  15. <groupId>org.apache.kafka</groupId>
  16. <artifactId>kafka-clients</artifactId>
  17. <version>0.11.0.0</version>
  18. </dependency>
  19. <dependency>
  20. <groupId>org.apache.kafka</groupId>
  21. <artifactId>kafka_2.12</artifactId>
  22. <version>0.11.0.0</version>
  23. </dependency>
  24. </dependencies>
  25. </project>

2. 发送数据

1)异步发送数据

  1. package com.example.kafka.producer;
  2. import org.apache.kafka.clients.producer.*;
  3. import org.apache.kafka.common.serialization.StringSerializer;
  4. import java.util.Properties;
  5. /**
  6. * 需要用到的类:
  7. * KafkaProducer:
  8. * 需要创建一个生产者对象,用来发送数据
  9. * ProducerConfig:
  10. * 获取所需的一系列配置参数
  11. * ProducerRecord:
  12. * 每条数据都要封装成一个ProducerRecord对象
  13. */
  14. public class TestProducerApi {
  15. public static void main(String[] args) {
  16. // 1. 创建配置信息
  17. Properties properties = new Properties();
  18. // 集群信息
  19. properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "Hadoop102:9092, Hadoop103:9092, Hadoop104:9092");
  20. // 键序列化
  21. properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  22. // 值序列化
  23. properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  24. // 应答机制
  25. properties.put(ProducerConfig.ACKS_CONFIG, "all");
  26. // 重试次数
  27. properties.put(ProducerConfig.RETRIES_CONFIG, 3);
  28. // 批量发送
  29. properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
  30. // 请求延迟
  31. properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
  32. // 2. 创建生产者
  33. Producer<String, String> producer = new KafkaProducer<String, String>(properties);
  34. // 3. 准备数据
  35. String topic = "first";
  36. String value = "hello kafka";
  37. ProducerRecord record = new ProducerRecord(topic, value);
  38. // 4. 发送数据(异步)
  39. producer.send(record);
  40. // 5. 关闭生产者
  41. producer.close();
  42. }
  43. }

2)异步发送数据 + 回调函数

  1. package com.example.kafka.producer;
  2. import org.apache.kafka.clients.producer.*;
  3. import org.apache.kafka.common.serialization.StringSerializer;
  4. import java.util.Properties;
  5. /**
  6. * 需要用到的类:
  7. * KafkaProducer:
  8. * 需要创建一个生产者对象,用来发送数据
  9. * ProducerConfig:
  10. * 获取所需的一系列配置参数
  11. * ProducerRecord:
  12. * 每条数据都要封装成一个ProducerRecord对象
  13. */
  14. public class TestProducerApi {
  15. public static void main(String[] args) {
  16. // 1. 创建配置信息
  17. Properties properties = new Properties();
  18. // 集群信息
  19. properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "Hadoop102:9092, Hadoop103:9092, Hadoop104:9092");
  20. // 键序列化
  21. properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  22. // 值序列化
  23. properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  24. // 应答机制
  25. properties.put(ProducerConfig.ACKS_CONFIG, "all");
  26. // 重试次数
  27. properties.put(ProducerConfig.RETRIES_CONFIG, 3);
  28. // 批量发送
  29. properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
  30. // 请求延迟
  31. properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
  32. // 2. 创建生产者
  33. Producer<String, String> producer = new KafkaProducer<String, String>(properties);
  34. // 3. 准备数据
  35. String topic = "first";
  36. String value = "hello kafka";
  37. ProducerRecord record = new ProducerRecord(topic, value);
  38. // 4. 发送数据(异步 + 回调)
  39. producer.send(record, new Callback() {
  40. // 回调方法
  41. @Override
  42. public void onCompletion(RecordMetadata recordMetadata, Exception exception) {
  43. System.out.println("当前数据发送到的分区:" + recordMetadata.partition());
  44. System.out.println("当前数据的偏移量:" + recordMetadata.offset());
  45. }
  46. });
  47. // 5. 关闭生产者
  48. producer.close();
  49. }
  50. }

3)同步发送数据

  1. package com.example.kafka.producer;
  2. import org.apache.kafka.clients.producer.*;
  3. import org.apache.kafka.common.serialization.StringSerializer;
  4. import java.util.Properties;
  5. /**
  6. * 需要用到的类:
  7. * KafkaProducer:
  8. * 需要创建一个生产者对象,用来发送数据
  9. * ProducerConfig:
  10. * 获取所需的一系列配置参数
  11. * ProducerRecord:
  12. * 每条数据都要封装成一个ProducerRecord对象
  13. */
  14. public class TestProducerApi {
  15. public static void main(String[] args) {
  16. // 1. 创建配置信息
  17. Properties properties = new Properties();
  18. // 集群信息
  19. properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "Hadoop102:9092, Hadoop103:9092, Hadoop104:9092");
  20. // 键序列化
  21. properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  22. // 值序列化
  23. properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  24. // 应答机制
  25. properties.put(ProducerConfig.ACKS_CONFIG, "all");
  26. // 重试次数
  27. properties.put(ProducerConfig.RETRIES_CONFIG, 3);
  28. // 批量发送
  29. properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
  30. // 请求延迟
  31. properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
  32. // 2. 创建生产者
  33. Producer<String, String> producer = new KafkaProducer<String, String>(properties);
  34. // 3. 准备数据
  35. String topic = "first";
  36. String value = "hello kafka";
  37. ProducerRecord record = new ProducerRecord(topic, value);
  38. // 4. 发送数据(同步)
  39. try {
  40. producer.send(record).get();
  41. }catch (Exception exception) {
  42. }
  43. // 5. 关闭生产者
  44. producer.close();
  45. }
  46. }

3. 指定分区

将数据发送到指定分区,保证数据一致性

1)方式一(分页号固定)

  1. package com.example.kafka.producer;
  2. import org.apache.kafka.clients.producer.*;
  3. import org.apache.kafka.common.serialization.StringSerializer;
  4. import java.util.Properties;
  5. /**
  6. * 需要用到的类:
  7. * KafkaProducer:
  8. * 需要创建一个生产者对象,用来发送数据
  9. * ProducerConfig:
  10. * 获取所需的一系列配置参数
  11. * ProducerRecord:
  12. * 每条数据都要封装成一个ProducerRecord对象
  13. */
  14. public class TestProducerApi {
  15. public static void main(String[] args) {
  16. // 1. 创建配置信息
  17. Properties properties = new Properties();
  18. // 集群信息
  19. properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "Hadoop102:9092, Hadoop103:9092, Hadoop104:9092");
  20. // 键序列化
  21. properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  22. // 值序列化
  23. properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  24. // 应答机制
  25. properties.put(ProducerConfig.ACKS_CONFIG, "all");
  26. // 重试次数
  27. properties.put(ProducerConfig.RETRIES_CONFIG, 3);
  28. // 批量发送
  29. properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
  30. // 请求延迟
  31. properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
  32. // 2. 创建生产者
  33. Producer<String, String> producer = new KafkaProducer<String, String>(properties);
  34. // 3. 准备数据
  35. String topic = "first";
  36. String value = "hello kafka";
  37. // 指定分区
  38. ProducerRecord record = new ProducerRecord(topic, 1, null, value);
  39. // 4. 发送数据(异步 + 回调)
  40. producer.send(record, new Callback() {
  41. // 回调方法
  42. @Override
  43. public void onCompletion(RecordMetadata recordMetadata, Exception exception) {
  44. System.out.println("当前数据发送到的分区:" + recordMetadata.partition());
  45. System.out.println("当前数据的偏移量:" + recordMetadata.offset());
  46. }
  47. });
  48. // 5. 关闭生产者
  49. producer.close();
  50. }
  51. }

2)方式二(分区号不固定)

首先:通过 PARTITIONER_CLASS_CONFIG 属性指定 Partitioner 接口的实现类

  1. package com.example.kafka.producer;
  2. import org.apache.kafka.clients.producer.*;
  3. import org.apache.kafka.common.serialization.StringSerializer;
  4. import java.util.Properties;
  5. /**
  6. * 需要用到的类:
  7. * KafkaProducer:
  8. * 需要创建一个生产者对象,用来发送数据
  9. * ProducerConfig:
  10. * 获取所需的一系列配置参数
  11. * ProducerRecord:
  12. * 每条数据都要封装成一个ProducerRecord对象
  13. */
  14. public class TestProducerApi {
  15. public static void main(String[] args) {
  16. // 1. 创建配置信息
  17. Properties properties = new Properties();
  18. // 集群信息
  19. properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "Hadoop102:9092, Hadoop103:9092, Hadoop104:9092");
  20. // 键序列化
  21. properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  22. // 值序列化
  23. properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  24. // 应答机制
  25. properties.put(ProducerConfig.ACKS_CONFIG, "all");
  26. // 重试次数
  27. properties.put(ProducerConfig.RETRIES_CONFIG, 3);
  28. // 批量发送
  29. properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
  30. // 请求延迟
  31. properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
  32. // 指定分区
  33. properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class.getName());
  34. // 2. 创建生产者
  35. Producer<String, String> producer = new KafkaProducer<String, String>(properties);
  36. // 3. 准备数据
  37. String topic = "first";
  38. String value = "hello kafka";
  39. // 指定分区
  40. ProducerRecord record = new ProducerRecord(topic, value);
  41. // 4. 发送数据(异步 + 回调)
  42. producer.send(record, new Callback() {
  43. // 回调方法
  44. @Override
  45. public void onCompletion(RecordMetadata recordMetadata, Exception exception) {
  46. System.out.println("当前数据发送到的分区:" + recordMetadata.partition());
  47. System.out.println("当前数据的偏移量:" + recordMetadata.offset());
  48. }
  49. });
  50. // 5. 关闭生产者
  51. producer.close();
  52. }
  53. }

其次:创建一个 Partitioner 实现类,并重写其中的 partition() 方法,指定分区号

  1. package com.example.kafka.producer;
  2. import org.apache.kafka.clients.producer.Partitioner;
  3. import org.apache.kafka.common.Cluster;
  4. import java.util.Map;
  5. public class MyPartitioner implements Partitioner {
  6. @Override
  7. public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {
  8. return 2;
  9. }
  10. @Override
  11. public void close() {
  12. }
  13. @Override
  14. public void configure(Map<String, ?> map) {
  15. }
  16. }