学习链接:https://www.bilibili.com/video/BV1vr4y1677k?p=10&spm_id_from=pageDriver&vd_source=b9e4f35102d61e6d02e0a5e1bbfea480


1 生产者消息发送流程

1.1 发送原理

QQ截图20220701173651.png
将生产者的数据导入集群的流程:

  1. 在main线程中创建Producer对象,调用send方法发送数据,根据业务需求设置拦截器Interceptors,通过序列化器对数据进行序列化,用分区器规定数据发往哪个分区
  2. 发送到分区实际上是发送到某个缓存队列里,队列默认32m,每批16k(双端队列)
  3. sender线程主动拉取数据,拉取数据有两个条件:
    1. batch.size:数据积累到batch.size之后,sender才会发送数据,默认16k
    2. linger.ms:如果数据迟迟未达到batch.size,sender等待linger.ms设置的时间到了之后就发送数据,单位ms,默认值是0ms,表示没有延迟

以上两个条件满足一个就可以发送数据

  1. 每一个队列一个节点,发送时,如果没有应答,最多缓存5个请求
  2. 通过Selector打通底层链路,集群收到后进行副本同步,同步完应答,应答有3种级别:
    1. 0 生产者发送的数据,不需要等数据落盘应答
    2. 1 生产者发送的数据,Leader收到数据后应答
    3. -1(all) 生产者发送的数据,Leader和ISR队列里的所有节点收齐数据后应答
  3. 应答成功,清楚对应请求,同时清理分区数据
  4. 应答失败,重试

    1.2 生产者重要参数列表

    | 参数名称 | 描述 | | —- | —- | | bootstrap.servers | 生产者连接集群所需的broker地 址 清 单。例如hadoop102:9092,hadoop103:9092,hadoop104:9092,可以设置 1 个或者多个,中间用逗号隔开。这里并非需要所有的 broker 地址,因为生产者从给定的 broker 里查找到其他 broker 信息。 | | key.serializer 和 value.serializer | 指定发送消息的 key 和 value 的序列化类型。一定要写全类名。 | | buffer.memory | RecordAccumulator 缓冲区总大小,默认32m。 | | batch.size | 缓冲区一批数据最大值,默认 16k。适当增加该值,可以提高吞吐量,但是如果该值设置太大,会导致数据传输延迟增加。 | | linger.ms | 如果数据迟迟未达到 batch.size,sender 等待 linger.time之后就会发送数据。单位 ms,默认值是 0ms,表示没有延迟。生产环境建议该值大小为 5-100ms 之间。 | | acks | 0:生产者发送过来的数据,不需要等数据落盘应答。
    1:生产者发送过来的数据,Leader 收到数据后应答。
    -1(all):生产者发送过来的数据,Leader+和 isr 队列里面的所有节点收齐数据后应答。默认值是-1,-1 和all 是等价的 | | max.in.flight.requests.per.connection | 允许最多没有返回 ack 的次数,默认为 5,开启幂等性要保证该值是 1-5 的数字 | | retries | 当消息发送出现错误的时候,系统会重发消息。retries表示重试次数。默认是 int 最大值,2147483647。
    如果设置了重试,还想保证消息的有序性,需要设置MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1
    否则在重试此失败消息的时候,其他的消息可能发送成功了 | | retry.backoff.ms | 两次重试之间的时间间隔,默认是 100ms。 | | enable.idempotence | 是否开启幂等性,默认 true,开启幂等性 | | compression.type | 生产者发送的所有数据的压缩方式。默认是 none,也就是不压缩。
    支持压缩类型:none、gzip、snappy、lz4 和 zstd。 |

2 异步发送API

2.1 普通异步发送

创建 Kafka 生产者,采用异步的方式发送到 Kafka Broker
导入依赖

  1. <dependencies>
  2. <dependency>
  3. <groupId>org.apache.kafka</groupId>
  4. <artifactId>kafka-clients</artifactId>
  5. <version>3.0.0</version>
  6. </dependency>
  7. </dependencies>
  1. public class CustomProducer {
  2. public static void main(String[] args) {
  3. // 0. 配置
  4. Properties properties = new Properties();
  5. // 连接集群
  6. properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092");
  7. // 指定对应的key和value的序列化类型 下面两种等价
  8. // properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
  9. properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  10. properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
  11. // 1. 创建kafka生产者对象
  12. KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
  13. // 2. 发送数据
  14. for (int i = 0; i < 5; i++) {
  15. kafkaProducer.send(new ProducerRecord<>("first", "hello" + i));
  16. }
  17. // 3. 关闭资源
  18. kafkaProducer.close();
  19. }
  20. }

QQ截图20220701194153.png

2.2 带回调函数的异步发送

回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是元数据信息(RecordMetadata)和异常信息(Exception),如果 Exception 为 null,说明消息发送成功,如果 Exception 不为 null,说明消息发送失败。

  1. public class CustomProducerCallback {
  2. public static void main(String[] args) {
  3. // 0. 配置
  4. Properties properties = new Properties();
  5. // 连接集群
  6. properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092");
  7. // 指定对应的key和value的序列化类型 下面两种等价
  8. // properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
  9. properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  10. properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
  11. // 1. 创建kafka生产者对象
  12. KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
  13. // 2. 发送数据
  14. for (int i = 0; i < 5; i++) {
  15. kafkaProducer.send(new ProducerRecord<>("first", "hello" + i), new Callback() {
  16. @Override
  17. public void onCompletion(RecordMetadata recordMetadata, Exception e) {
  18. if (e == null) {
  19. System.out.println("主题:" + recordMetadata.topic() + " 分区:" + recordMetadata.partition());
  20. }
  21. }
  22. });
  23. }
  24. // 3. 关闭资源
  25. kafkaProducer.close();
  26. }
  27. }

QQ截图20220701195318.png

3 同步发送API

在异步发送的基础上,调用一下get()方法。

  1. public class CustomProducerSync {
  2. public static void main(String[] args) throws ExecutionException, InterruptedException {
  3. // 0. 配置
  4. Properties properties = new Properties();
  5. // 连接集群
  6. properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092");
  7. // 指定对应的key和value的序列化类型 下面两种等价
  8. // properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
  9. properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  10. properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
  11. // 1. 创建kafka生产者对象
  12. KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
  13. // 2. 发送数据
  14. for (int i = 0; i < 5; i++) {
  15. kafkaProducer.send(new ProducerRecord<>("first", "hello" + i)).get();
  16. }
  17. // 3. 关闭资源
  18. kafkaProducer.close();
  19. }
  20. }

4 生产者分区

4.1 分区的好处

  1. 便于合理使用存储资源,每个Partition在一个Broker上存储,可以把海量的数据按照分区切割成一块一块数据存储在多台Broker上。合理控制分区的任务,可以实现负载均衡的效果。
  2. 提高并行度,生产者可以以分区为单位发送数据,消费者可以以分区为单位消费数据。

    4.2 生产者发送消息的分区策略

  3. 默认的分区器DefaultPartitioner

QQ截图20220703201546.png
QQ截图20220703212253.png

  1. public class CustomProducerCallbackPartitions {
  2. public static void main(String[] args) {
  3. // 0. 配置
  4. Properties properties = new Properties();
  5. // 连接集群
  6. properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092");
  7. // 指定对应的key和value的序列化类型 下面两种等价
  8. // properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
  9. properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  10. properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
  11. // 关联自定义分区器
  12. properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.example.kafka.producer.MyPartitioner");
  13. // 1. 创建kafka生产者对象
  14. KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
  15. // 2. 发送数据
  16. for (int i = 0; i < 5; i++) {
  17. kafkaProducer.send(new ProducerRecord<>("first", "b","pppppp" + i), new Callback() {
  18. @Override
  19. public void onCompletion(RecordMetadata recordMetadata, Exception e) {
  20. if (e == null) {
  21. System.out.println("主题:" + recordMetadata.topic() + " 分区:" + recordMetadata.partition());
  22. }
  23. }
  24. });
  25. }
  26. // 3. 关闭资源
  27. kafkaProducer.close();
  28. }
  29. }

4.3 自定义分区器

  1. 需求:发送的数据如果包含hello,就发往0号分区,不包含,就发往1号分区
  2. 实现步骤

    1. 定义类实现Partitioner接口
    2. 重写partition()方法

      1. public class MyPartitioner implements Partitioner {
      2. @Override
      3. public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
      4. // 获取数据
      5. String msgValues = value.toString();
      6. int partition;
      7. if (msgValues.contains("hello")) {
      8. partition = 0;
      9. } else {
      10. partition = 1;
      11. }
      12. return partition;
      13. }
      14. @Override
      15. public void close() {
      16. }
      17. @Override
      18. public void configure(Map<String, ?> configs) {
      19. }
      20. }

      ```java public class CustomProducerCallbackPartitions { public static void main(String[] args) { // 0. 配置 Properties properties = new Properties(); // 连接集群 properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, “hadoop102:9092,hadoop103:9092”);

      // 指定对应的key和value的序列化类型 下面两种等价 // properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, “org.apache.kafka.common.serialization.StringSerializer”); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

      // 关联自定义分区器 properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, “com.example.kafka.producer.MyPartitioner”); // 1. 创建kafka生产者对象 KafkaProducer kafkaProducer = new KafkaProducer<>(properties);

      // 2. 发送数据 for (int i = 0; i < 5; i++) {

      1. kafkaProducer.send(new ProducerRecord<>("first", "b","pppppp" + i), new Callback() {
      2. @Override
      3. public void onCompletion(RecordMetadata recordMetadata, Exception e) {
      4. if (e == null) {
      5. System.out.println("主题:" + recordMetadata.topic() + " 分区:" + recordMetadata.partition());
      6. }
      7. }
      8. });

      }

      // 3. 关闭资源 kafkaProducer.close(); } }

<a name="kQGX6"></a>
# 5 生产者提高吞吐量

- batch.size:批次大小,默认16k 
- linger.ms:等待时间,修改为5-100ms
- compression.type:压缩snappy
- RecordAccumulator:缓冲区大小,修改为64m
```java
public class CustomProducerParameters {
    public static void main(String[] args) {
        // 0. 配置信息
        Properties properties = new Properties();

        // 连接kafka集群
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092");
        // 序列化
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // 缓冲区大小
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); // 32m

        // 批次大小
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);

        // linger.ms
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);

        // 压缩
        properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");

        // 1. 创建生产者
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

        // 2. 发送数据
        for (int i = 0; i < 5; i++) {
            kafkaProducer.send(new ProducerRecord<>("first", "hello" + i));
        }

        // 3. 关闭资源
        kafkaProducer.close();
    }
}

6 数据可靠性

  1. ack应答原理
    1. 0

QQ截图20220703212915.png

  1. 1

QQ截图20220703212933.png

  1. -1(all)

QQ截图20220703212950.png

  • Leader收到数据,所有Follower都开始同步数据,但有一个Follower,因为某种故障,迟迟不能与Leader进行同步问题

Leader维护了一个动态的in-sync replica set(ISR),意为和Leader保持同步的Follower+Leader集合(leader:0,isr:0,1,2)。
如果Follower长时间未向Leader发送通信请求或同步数据,则该Follower将被踢出ISR。该时间阈值由replica.lag.time.max.ms参数设定,默认30s。例如2超时,(leader:0, isr:0,1)。
这样就不用等长期联系不上或者已经故障的节点

  • 数据可靠性分析

如果分区副本设置为1个,或 者ISR里应答的最小副本数量( min.insync.replicas 默认为1)设置为1,和ack=1的效果是一样的,仍然有丢数的风险(leader:0,isr:0)。

  • 数据完全可靠条件 = ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2
  • 数据重复分析

QQ截图20220703213243.png

public class CustomProducerAcks {
    public static void main(String[] args) {
        // 0. 配置
        Properties properties = new Properties();
        // 连接集群
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092");

        // 指定对应的key和value的序列化类型 下面两种等价
        // properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

        // acks
        properties.put(ProducerConfig.ACKS_CONFIG, "1");

        // 重试次数retries,默认是int最大值 2147483647
        properties.put(ProducerConfig.RETRIES_CONFIG, 3);

        // 1. 创建kafka生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

        // 2. 发送数据
        for (int i = 0; i < 5; i++) {
            kafkaProducer.send(new ProducerRecord<>("first", "hello" + i));
        }

        // 3. 关闭资源
        kafkaProducer.close();
    }
}

7 数据去重

7.1 数据传递语义

  • 至少一次(At Least Once)= ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2
  • 最多一次(At Most Once)= ACK级别设置为0
  • 总结:

At Least Once可以保证数据不丢失,但是不能保证数据不重复;
At Most Once可以保证数据不重复,但是不能保证数据不丢失。

  • 精确一次(Exactly Once):对于一些非常重要的信息,比如和钱相关的数据,要求数据既不能重复也不丢失。

    7.2 幂等性

  1. 幂等性原理

幂等性就是指Producer不论向Broker发送多少次重复数据,Broker端都只会持久化一条,保证了不重复。
精确一次(Exactly Once) = 幂等性 + 至少一次( ack=-1 + 分区副本数>=2 + ISR最小副本数量>=2) 。
重复数据的判断标准:具有相同主键的消息提交时,Broker只会持久化一条。其 中PID是Kafka每次重启都会分配一个新的;Partition 表示分区号;Sequence Number是单调自增的。
所以幂等性只能保证的是在单分区单会话内不重复。

  1. 如何使用幂等性:开启参数enable.idempotence 默认为 true,false 关闭

    7.3 生产者事务

  2. Kafka事务原理

QQ截图20220703213612.png

  1. Kafka的事务一共有5个API

    // 1 初始化事务
    void initTransactions();
    // 2 开启事务
    void beginTransaction() throws ProducerFencedException;
    // 3 在事务内提交已经消费的偏移量(主要用于消费者)
    void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
    String consumerGroupId) throws 
    ProducerFencedException;
    // 4 提交事务
    void commitTransaction() throws ProducerFencedException;
    // 5 放弃事务(类似于回滚事务的操作)
    void abortTransaction() throws ProducerFencedException;
    
    public class CustomProducerTransactions {
     public static void main(String[] args) {
         // 0. 配置
         Properties properties = new Properties();
         // 连接集群
         properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092");
    
         // 指定对应的key和value的序列化类型 下面两种等价
         // properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
         properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
         properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    
         // 指定事务id
         properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transaction_id_01");
    
         // 1. 创建kafka生产者对象
         KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
    
         kafkaProducer.initTransactions();
         kafkaProducer.beginTransaction();
         try {
             // 2. 发送数据
             for (int i = 0; i < 5; i++) {
                 kafkaProducer.send(new ProducerRecord<>("first", "hello" + i));
             }
             kafkaProducer.commitTransaction();
         } catch (Exception e) {
             kafkaProducer.abortTransaction();
         }finally {
             // 3. 关闭资源
             kafkaProducer.close();
         }
     }
    }
    

    8 数据有序

    QQ截图20220703213809.png

    9 数据乱序

    1)kafka在1.x版本之前保证数据单分区有序,条件如下:
    max.in.flight.requests.per.connection=1(不需要考虑是否开启幂等性)。
    2)kafka在1.x及以后版本保证数据单分区有序,条件如下:
    (1)未开启幂等性
    max.in.flight.requests.per.connection需要设置为1。
    (2)开启幂等性
    max.in.flight.requests.per.connection需要设置小于等于5。
    原因说明:因为在kafka1.x以后,启用幂等后,kafka服务端会缓存producer发来的最近5个request的元数据,故无论如何,都可以保证最近5个request的数据都是有序的。
    QQ截图20220703214010.png