一、生产者

1、消息发送

数据生产流程

image.png
image.png

  1. Producer创建时,会创建一个Sender线程并设置为守护线程。
  2. 生产消息时,内部其实是异步流程;生产的消息先经过拦截器->序列化器->分区器,然后将消息缓存在缓冲区(该缓冲区也是在Producer创建时创建)。
  3. 批次发送的条件为:缓冲区数据大小达到batch.size或者linger.ms达到上限,哪个先达到就算哪个。
  4. 批次发送后,发往指定分区,然后落盘到broker;如果生产者配置了retrires参数大于0并且失败原因允许重试,那么客户端内部会对该消息进行重试。
  5. 落盘到broker成功,返回生产元数据给生产者。
  6. 元数据返回有两种方式:一种是通过阻塞直接返回,另一种是通过回调返回。

    必要参数配置

  • 生产中一般使用 avro 来进行数据的序列化
  • 由于 Kafka 中的数据都是字节数组,在将消息发送到 Kafka 之前需要先将数据序列化为字节数组
  • 序列化器的作用就是用于序列化要发送的消息的

Kafka使用 org.apache.kafka.common.serialization.Serializer 接口用于定义序列化器,将泛型指定类型的数据转换为字节数组
官方提供Serializer接口及其子接口和实现类:
org.apache.kafka.common.serialization.ByteArraySerializer
org.apache.kafka.common.serialization.ByteBufferSerializer
org.apache.kafka.common.serialization.BytesSerializer
org.apache.kafka.common.serialization.DoubleSerializer
org.apache.kafka.common.serialization.FloatSerializer
org.apache.kafka.common.serialization.IntegerSerializer
org.apache.kafka.common.serialization.StringSerializer
org.apache.kafka.common.serialization.LongSerializer
org.apache.kafka.common.serialization.ShortSerializer

  • 自定义序列化器

    • 需要实现 org.apache.kafka.common.serialization.Serializer<T> 接口,并实现其中的 serialize 方法
    • 实体类

      1. public class User {
      2. private Integer userId;
      3. private String username;
      4. ...
    • 序列化类

      1. public class UserSerializer implements Serializer<User> {
      2. @Override public void configure(Map<String, ?> configs, boolean isKey) {
      3. // do nothing
      4. }
      5. @Override public byte[] serialize(String topic, User data) {
      6. try {
      7. // 如果数据是null,则返回null
      8. if (data == null) return null;
      9. Integer userId = data.getUserId();
      10. String username = data.getUsername();
      11. int length = 0; byte[] bytes = null;
      12. if (null != username) {
      13. bytes = username.getBytes("utf-8");
      14. length = bytes.length;
      15. }
      16. ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + length);
      17. buffer.putInt(userId);
      18. buffer.putInt(length);
      19. buffer.put(bytes);
      20. return buffer.array();
      21. } catch (UnsupportedEncodingException e) {
      22. throw new SerializationException("序列化数据异常");
      23. }
      24. }
      25. @Override public void close() {
      26. // do nothing
      27. }
      28. }

      分区器

      默认(DefaultPartitioner)分区计算:

  1. 如果record提供了分区号,则使用record提供的分区号
  2. 如果record没有提供分区号,则使用key的序列化后的值的hash值对分区数量取模
  3. 如果record没有提供分区号,也没有提供key,则使用轮询的方式分配分区号

① 首先在可用的分区中分配分区号
② 如果没有可用的分区,则在该主题所有分区中分配分区号

自定义分区器
1. 首先开发Partitioner接口的实现类

  1. public interface Partitioner extends Configurable, Closeable {
  2. /**
  3. * 为指定的消息记录计算分区值
  4. *
  5. * @param topic 主题名称
  6. * @param key 根据该key的值进行分区计算,如果没有则为null。
  7. * @param keyBytes key的序列化字节数组,根据该数组进行分区计算。如果没有key,则为 null
  8. * @param value 根据value值进行分区计算,如果没有,则为null
  9. * @param valueBytes value的序列化字节数组,根据此值进行分区计算。如果没有,则为 null
  10. * @param cluster 当前集群的元数据
  11. */
  12. public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
  13. //...分区号的自定义处理
  14. };
  15. /**
  16. * 关闭分区器的时候调用该方法
  17. */
  18. public void close();
  19. }

2. 在KafkaProducer中进行设置:**configs.put("partitioner.class", "xxx.xx.Xxx.class")**
image.png

拦截器

  • 拦截器,interception【producer和consumer】是在Kafka 0.10版本被引入的,主要用于实现Client端的定制化控制逻辑
  • 对于Producer而言,Interceptor使得用户在消息发送前以及Producer回调逻辑前有机会对消息做一些定制化需求,比如修改消息等。同时,Producer允许用户指定多个Interceptor按序作用于同一条消息从而形成一个拦截链(interceptor chain)。Intercetpor 的实现接口是 org.apache.kafka.clients.producer.ProducerInterceptor,其定义的方法包括:
    • onSend(ProducerRecord):该方法封装进KafkaProducer.send方法中,即运行在用户主线程中。Producer确保在消息被序列化以计算分区前调用该方法。用户可以在该方法中对消息做任何操作,但最好保证不要修改消息所属的topic和分区,否则会影响目标分区的计算。
    • onAcknowledgement(RecordMetadata, Exception):该方法会在消息被应答之前或消息发送失败时调用,并且通常都是在Producer回调逻辑触发之前onAcknowledgement运行在Producer的IO线程中,因此不要在该方法中放入很重的逻辑,否则会拖慢Producer的消息发送效率。
    • close:关闭Interceptor,主要用于执行一些资源清理工作。

如前所述,Interceptor可能被运行在多个线程中,因此在具体实现时用户需要自行确保线程安全
另外倘若指定了多个Interceptor,则Producer将按照指定顺序调用它们,并仅仅是捕获每个Interceptor可能抛出的异常记录到错误日志中而非在向上传递。这在使用过程中要特别留意。

自定义拦截器
1. 实现 ProducerInterceptor 接口

  1. public class InterceptorOne<KEY, VALUE> implements ProducerInterceptor<KEY, VALUE> {
  2. private static final Logger LOGGER = LoggerFactory.getLogger(InterceptorOne.class);
  3. @Override public ProducerRecord<KEY, VALUE> onSend(ProducerRecord<KEY, VALUE> record) {
  4. System.out.println("拦截器1---go");
  5. // 此处根据业务需要对相关的数据作修改
  6. String topic = record.topic();
  7. Integer partition = record.partition();
  8. Long timestamp = record.timestamp();
  9. KEY key = record.key();
  10. VALUE value = record.value();
  11. Headers headers = record.headers();
  12. // 添加消息头
  13. headers.add("interceptor", "interceptorOne".getBytes());
  14. ProducerRecord<KEY, VALUE> newRecord = new ProducerRecord<KEY, VALUE>( topic, partition, timestamp, key,value, headers );
  15. return newRecord;
  16. }
  17. @Override public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
  18. System.out.println("拦截器1---back"); if (exception != null) {
  19. // 如果发生异常,记录日志中
  20. LOGGER.error(exception.getMessage());
  21. }
  22. }
  23. @Override public void close() { }
  24. @Override public void configure(Map<String, ?> configs) { }
  25. }
  1. KafkaProducer 的设置中设置自定义的拦截器
    image.png

    2、原理剖析

    image.png
    由上图可以看出:KafkaProducer有两个基本线程:
  • 主线程:负责消息创建,拦截器,序列化器,分区器等操作,并将消息追加到消息收集器RecoderAccumulator中;
    • 消息收集器RecoderAccumulator为每个分区都维护了一个Deque 类型的双端队列。
    • ProducerBatch 可以理解为是 ProducerRecord 的集合,批量发送有利于提升吞吐量,降低网络影响;
    • 由于生产者客户端使用 java.io.ByteBuffer 在发送消息之前进行消息保存,并维护了一个 BufferPool 实现 ByteBuffer 的复用;该缓存池只针对特定大小(batch.size 指定)的 ByteBuffer进行管理,对于消息过大的缓存,不能做到重复利用。每次追加一条ProducerRecord消息,会寻找/新建对应的双端队列,从其尾部获取一个ProducerBatch,判断当前消息的大小是否可以写入该批次中。若可以写入则写入;若不可以写入,则新建一个ProducerBatch,判断该消息大小是否超过客户端参数配置 batch.size 的值,不超过,则以 batch.size建立新的ProducerBatch,
    • 这样方便进行缓存重复利用;若超过,则以计算的消息大小建立对应的ProducerBatch ,缺点就是该内存不能被复用了。
  • Sender线程:

    • 该线程从消息收集器获取缓存的消息,将其处理为 的形式, Node 表示集群的broker节点。
    • 进一步将转化为形式,此时才可以向服务端发送数据。
    • 在发送之前,Sender线程将消息以 Map> 的形式保存到 InFlightRequests 中进行缓存,可以通过其获取 leastLoadedNode ,即当前Node中负载压力最小的一个,以实现消息的尽快发出。

      3、生产者参数配置【较完整】

      image.png
      image.png
      image.png
      image.png

      二、消费者

      1、概念入门

      消费者、消费组

  • 消费者从订阅的主题消费消息,消费消息的偏移量保存在Kafka的名字为 __consumer_offsets 的主题中,也可以将自己的偏移量存储到Zookeeper,需要设置offset.storage=zookeeper

    • 推荐使用Kafka来存储消费者的偏移量。因为 Zookeeper不适合高并发
  • 将多个从同一主题消费的消费者加入到一个消费组中,共享一个group_id

    • configs.put("group.id", "xxx");
    • 消费组均衡地给消费者分配分区,每个分区只由消费组中一个消费者消费
    • 如果向消费组中添加更多的消费者,超过主题分区数量,则有一部分消费者就会闲置,不会接收任何消息
      • 所以,必要时,需要为主题创建大量分区,在负载增长时可以加入更多的消费者,但是不要让消费者数量超过主题分区的数量
      • 向消费组添加消费者是横向扩展消费能力的主要方式
      • 横向扩展消费者和消费组不会对性能造成负面影响

        心跳机制

  • Kafka 的心跳是 Kafka ConsumerBroker 之间的健康检查,只有当 Broker Coordinator 正常时,Consumer 才会发送心跳

    broker端

  • sessionTimeoutMs 参数

  • broker 处理心跳的逻辑在 GroupCoordinator 类中:如果心跳超期, broker coordinator 会把消费者从 group 中移除,并触发 rebalance

    consumer端

  • sessionTimeoutMs参数,rebalanceTimeoutMs参数

  • 如果客户端发现心跳超期,客户端会标记 coordinator 为不可用,并阻塞心跳线程;如果超过了poll 消息的间隔超过了 rebalanceTimeoutMs,则 consumer 告知 broker 主动离开消费组,也会触发rebalance

    2、消息接收

    必要参数配置

    image.png
    image.png

    订阅

    主题和分区

  • Topic,Kafka用于分类管理消息的逻辑单元,类似与MySQL的数据库。

  • Partition,是Kafka下数据存储的基本单元,这个是物理上的概念。同一个topic的数据,会被分散的存储到多个partition中,这些partition可以在同一台机器上,也可以在多台机器上
    • 优势在于:有利于水平扩展,避免单台机器在磁盘空间和性能上的限制,同时可以通过复制来增加数据冗余性,提高容灾能力。为了做到均匀分布,通常partition的数量通常是Broker Server数量的整数倍
  • Consumer Group,同样是逻辑上的概念,是Kafka实现**单播广播**两种消息模型的手段。保证一个消费组获取到特定主题的全部的消息。
    • 在消费组内部,若干个消费者消费主题分区的消息,消费组可以保证一个主题的每个分区只被消费组中的一个消费者消费。

consumer 采用 pull 模式从 broker 中读取数据

  • 采用 pull 模式,consumer 可自主控制消费消息的速率, 可以自己控制消费方式(批量消费/逐条消费),还可以选择不同的提交方式从而实现不同的传输语义

    反序列化

  • Kafka 的 broker 中所有的消息都是字节数组,消费者获取到消息之后,需要先对消息进行反序列化处理,然后才能交给用户程序消费处理

  • 消费者的反序列化器包括key的和value的反序列化器

    • key.deserializer
    • value.deserializer
    • IntegerDeserializer
    • StringDeserializer

      自定义反序列化

  • 需要实现 org.apache.kafka.common.serialization.Deserializer<T> 接口

    com.lagou.kafka.demo.deserializer.UserDeserializer
    1. public class UserDeserializer implements Deserializer<User> {
    2. @Override public void configure(Map<String, ?> configs, boolean isKey) {
    3. }
    4. @Override public User deserialize(String topic, byte[] data) {
    5. ByteBuffer allocate = ByteBuffer.allocate(data.length);
    6. allocate.put(data);
    7. allocate.flip();
    8. int userId = allocate.getInt();
    9. int length = allocate.getInt();
    10. System.out.println(length);
    11. String username = new String(data, 8, length);
    12. return new User(userId, username);
    13. }
    14. @Override public void close() {
    15. }
    16. }

    地方
    1. public class MyConsumer {
    2. public static void main(String[] args) {
    3. Map<String, Object> configs = new HashMap<>();
    4. configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
    5. configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    6. configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, UserDeserializer.class);
    7. configs.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer1");
    8. configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    9. configs.put(ConsumerConfig.CLIENT_ID_CONFIG, "con1");
    10. KafkaConsumer<String, User> consumer = new KafkaConsumer<String, User>(configs);
    11. consumer.subscribe(Collections.singleton("tp_user_01"));
    12. ConsumerRecords<String, User> records = consumer.poll(Long.MAX_VALUE);
    13. records.forEach(new Consumer<ConsumerRecord<String, User>>() {
    14. @Override public void accept(ConsumerRecord<String, User> record) {
    15. System.out.println(record.value());
    16. }
    17. });
    18. // 关闭消费者 consumer.close();
    19. }
    20. }

    位移提交

    自动提交

  • Kafka Consumer 后台提交

  • 开启自动提交: enable.auto.commit=true
  • 配置自动提交间隔:Consumer端: auto.commit.interval.ms ,默认 5s ```java Map configs = new HashMap<>(); configs.put(“bootstrap.servers”, “node1:9092”); configs.put(“group.id”, “mygrp”);

// 设置偏移量自动提交。自动提交是默认值。这里做示例 configs.put(“enable.auto.commit”, “true”);

// 偏移量自动提交的时间间隔 configs.put(“auto.commit.interval.ms”, “3000”); configs.put(“key.deserializer”, StringDeserializer.class); configs.put(“value.deserializer”, StringDeserializer.class); KafkaConsumer consumer = new KafkaConsumer (configs); consumer.subscribe(Collections.singleton(“tp_demo_01”)); while (true) { ConsumerRecords records = consumer.poll(100); for (ConsumerRecord record : records) { System.out.println(record.topic() + “\t” + record.partition() + “\t” + record.offset() + “\t” + record.key() + “\t” + record.value()); } }

  1. <a name="QtD8B"></a>
  2. ##### 自动提交位移的顺序
  3. - 配置 `enable.auto.commit = true`
  4. - Kafka会保证在开始调用 `poll` 方法时,提交上次 poll 返回的所有消息
  5. - 因此自动提交不会出现消息丢失,但会 **重复消费**
  6. <a name="krVy7"></a>
  7. ##### 重复消费举例
  8. - Consumer 每 5s 提交 offset
  9. - 假设提交 offset 后的 3s 发生了 Rebalance
  10. - Rebalance 之后的所有 Consumer 从上一次提交的 offset 处继续消费
  11. - 因此 Rebalance 发生前 3s 的消息会被重复消费
  12. <a name="opiPX"></a>
  13. #### 同步提交和异步提交
  14. <a name="RzOxn"></a>
  15. ##### 同步提交
  16. - 使用 `KafkaConsumer#commitSync()`:会提交 `KafkaConsumer#poll()` 返回的最新 offset
  17. - 该方法为**同步操作**,等待直到 offset 被成功提交才返回
  18. ```java
  19. while (true) {
  20. ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
  21. process(records);
  22. // 处理消息
  23. try {
  24. consumer.commitSync();
  25. } catch (CommitFailedException e) {
  26. handle(e); // 处理提交失败异常
  27. }
  28. }
  • commitSync 在处理完所有消息之后
  • 手动同步提交可以控制 offset 提交的时机和频率
  • 手动同步提交会:

    • 调用 commitSync 时,Consumer 处于阻塞状态,直到 Broker 返回结果
    • 会影响 TPS(Transactions Per Second)
    • 可以选择拉长提交间隔,但有以下问题:
      • 会导致 Consumer 的提交频率下降
      • Consumer 重启后,会有更多的消息被消费【重复消费】
        异步提交
  • KafkaConsumer#commitAsync()

    1. while (true) {
    2. ConsumerRecords<String, String> records = consumer.poll(3_000);
    3. process(records); // 处理消息
    4. consumer.commitAsync((offsets, exception) -> {
    5. if (exception != null) {
    6. handle(exception);
    7. }
    8. });
    9. }
  • commitAsync 出现问题不会自动重试!!!

  • 处理方式:

    1. try {
    2. while(true) {
    3. ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
    4. process(records); // 处理消息
    5. commitAysnc(); // 使用异步提交规避阻塞
    6. }
    7. } catch(Exception e) {
    8. handle(e); // 处理异常
    9. } finally {
    10. try {
    11. consumer.commitSync(); // 最后一次提交使用同步阻塞式提交
    12. } finally {
    13. consumer.close();
    14. }
    15. }

    消费者位移管理

  • Kafka中,消费者根据消息的位移顺序消费消息

  • 消费者的位移由消费者管理,可以存储于zookeeper中,也可以存储于Kafka主题 __consumer_offsets
  • Kafka提供了消费者API,让消费者可以管理自己的位移

API如下:KafkaConsumer
image.png
image.png
image.png
image.png

API实战

  1. /**
  2. * # 生成消息文件
  3. * [root@node1 ~]# for i in `seq 60`; do echo "hello lagou $i" >> nm.txt; done
  4. * # 创建主题,三个分区,每个分区一个副本
  5. * [root@node1 ~]# kafka-topics.sh --zookeeper node1:2181/myKafka --create --topic tp_demo_01 --partitions 3 --replication-factor 1
  6. * # 将消息生产到主题中
  7. * [root@node1 ~]# kafka-console-producer.sh --broker-list node1:9092 -- topic tp_demo_01 < nm.txt
  8. *
  9. * 消费者位移管理
  10. */
  11. public class MyConsumerMgr1 {
  12. public static void main(String[] args) {
  13. Map<String, Object> configs = new HashMap<>();
  14. configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
  15. configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  16. configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  17. KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(configs);
  18. /**
  19. * 给当前消费者手动分配一系列主题分区。
  20. * 手动分配分区不支持增量分配,如果先前有分配分区,则该操作会覆盖之前的分配。
  21. * 如果给出的主题分区是空的,则等价于调用unsubscribe方法。
  22. * 手动分配主题分区的方法不使用消费组管理功能。当消费组成员变了,或者集群或主题 的元数据改变了,不会触发分区分配的再平衡。
  23. *
  24. * 手动分区分配assign(Collection)不能和自动分区分配 subscribe(Collection, ConsumerRebalanceListener)一起使用。
  25. * 如果启用了自动提交偏移量,则在新的分区分配替换旧的分区分配之前,会对旧的分区 分配中的消费偏移量进行异步提交。
  26. **/
  27. // consumer.assign(Arrays.asList(new TopicPartition("tp_demo_01", 0)));
  28. //
  29. // Set<TopicPartition> assignment = consumer.assignment();
  30. // for (TopicPartition topicPartition : assignment) {
  31. // System.out.println(topicPartition);
  32. // }
  33. // 获取对用户授权的所有主题分区元数据。该方法会对服务器发起远程调用。
  34. // Map<String, List<PartitionInfo>> stringListMap = consumer.listTopics();
  35. //
  36. // stringListMap.forEach((k, v) -> {
  37. // System.out.println("主题:" + k);
  38. // v.forEach(info -> {
  39. // System.out.println(info);
  40. // });
  41. // });
  42. // Set<String> strings = consumer.listTopics().keySet();
  43. //
  44. // strings.forEach(topicName -> {
  45. // System.out.println(topicName);
  46. // });
  47. // List<PartitionInfo> partitionInfos = consumer.partitionsFor("tp_demo_01");
  48. // for (PartitionInfo partitionInfo : partitionInfos) {
  49. // Node leader = partitionInfo.leader();
  50. // System.out.println(leader);
  51. // System.out.println(partitionInfo);
  52. //
  53. // 当前分区在线副本
  54. // Node[] nodes = partitionInfo.inSyncReplicas();
  55. //
  56. // 当前分区下线副本
  57. // Node[] nodes1 = partitionInfo.offlineReplicas();
  58. // }
  59. // 手动分配主题分区给当前消费者
  60. consumer.assign(Arrays.asList(
  61. new TopicPartition("tp_demo_01", 0),
  62. new TopicPartition("tp_demo_01", 1),
  63. new TopicPartition("tp_demo_01", 2)
  64. ));
  65. // 列出当前主题分配的所有主题分区
  66. // Set<TopicPartition> assignment = consumer.assignment();
  67. // assignment.forEach(k -> {
  68. // System.out.println(k);
  69. // });
  70. // 对于给定的主题分区,列出它们第一个消息的偏移量。
  71. // 注意,如果指定的分区不存在,该方法可能会永远阻塞。
  72. // 该方法不改变分区的当前消费者偏移量。
  73. // Map<TopicPartition, Long> topicPartitionLongMap = consumer.beginningOffsets(consumer.assignment());
  74. //
  75. // topicPartitionLongMap.forEach((k, v) -> {
  76. // System.out.println("主题:" + k.topic() + "\t分区:" + k.partition() + "偏移量\t" + v);
  77. // });
  78. // 将偏移量移动到每个给定分区的最后一个。
  79. // 该方法延迟执行,只有当调用过poll方法或position方法之后才可以使用。
  80. // 如果没有指定分区,则将当前消费者分配的所有分区的消费者偏移量移动到最后。
  81. // 如果设置了隔离级别为:isolation.level=read_committed,则会将分区的消费 偏移量移动到
  82. // 最后一个稳定的偏移量,即下一个要消费的消息现在还是未提交状态的事务消息。
  83. // consumer.seekToEnd(consumer.assignment());
  84. // 将给定主题分区的消费偏移量移动到指定的偏移量,即当前消费者下一条要消费的消息 偏移量。
  85. // 若该方法多次调用,则最后一次的覆盖前面的。
  86. // 如果在消费中间随意使用,可能会丢失数据。
  87. // consumer.seek(new TopicPartition("tp_demo_01", 1), 10);
  88. //
  89. //
  90. // 检查指定主题分区的消费偏移量
  91. // long position = consumer.position(new TopicPartition("tp_demo_01", 1));
  92. // System.out.println(position); consumer.seekToEnd(Arrays.asList(new TopicPartition("tp_demo_01", 1)));
  93. // 检查指定主题分区的消费偏移量
  94. long position = consumer.position(new TopicPartition("tp_demo_01", 1));
  95. System.out.println(position);
  96. //
  97. 关闭生产者 consumer.close();
  98. }
  99. }

再均衡

  • rebalance,也叫重平衡,是kafka为人诟病最多的一个点
  • 重平衡其实就是一个协议,它规定了如何让消费者组下的所有消费者来分配topic中的每一个分区
  • 比如一个topic有100个分区,一个消费者组内有20个消费者,在协调者的控制下让组内每一个消费者分配到5个分区,这个分配的过程就是重平衡。
  • 重平衡的触发条件主要有三个:
    1. 消费者组内成员发生变更,这个变更包括了增加和减少消费者,比如消费者宕机退出消费组。
    2. 主题的分区数发生变更,kafka目前只支持增加分区,当增加的时候就会触发重平衡
    3. 订阅的主题发生变化,当消费者组使用正则表达式订阅主题,而恰好又新建了对应的主题,就会触发重平衡

进行组内分区的分配的策略:

  • RangeAssignor
  • RoundRobinAssignor
  • StickyAssignor

Kafka提供了一个角色:Group Coordinator 来执行对于消费组的管理

  • Group Coordinator —— 每个消费组分配一个消费组协调器用于组管理和位移管理。当消费组的第一个消费者启动的时候,它会去和Kafka Broker确定谁是它们组的组协调器。之后该消费组内所有消费者和该组协调器协调通信
  • 如何确定coordinator:
    1. 确定消费组位移信息写入 __consumers_offsets 的哪个分区。具体计算公式:

__consumers_offsets partition# = Math.abs(groupId.hashCode() % groupMetadataTopicPartitionCount)
注意:groupMetadataTopicPartitionCount由 offsets.topic.num.partitions 指定,默认是50个分区
b. 该分区leader所在的broker就是组协调器

为什么说重平衡为人诟病呢?

  • 因为重平衡过程中,消费者无法从kafka消费消息,这对kafka的TPS影响极大,而如果kafka集内节点较多,比如数百个,那重平衡可能会耗时极多。数分钟到数小时都有可能,而这段时间kafka基本处于不可用状态。所以在实际环境中,应该尽量避免重平衡发生

如何避免重平衡?

  • 完全避免重平衡是不可能,因为你无法完全保证消费者不会故障。而消费者故障其实也是最常见的引发重平衡的地方,所以我们需要保证尽力避免消费者故障
    • 而其他几种触发重平衡的方式:增加分区、增加订阅的主题、增加消费者,更多的是主动控制
  • 如果消费者真正挂掉了,就没办法了,但实际中,会有一些情况我们可以避免,比如:kafka错误地认为一个正常的消费者已经挂掉了

    • 首先要知道哪些情况会出现错误判断挂掉的情况
      • 在分布式系统中,通常是通过心跳来维持分布式系统的,kafka也不例外。在分布式系统中,由于网络问题你不清楚没接收到心跳,是因为对方真正挂了还是只是因为负载过重没来得及发生心跳或是网络堵塞。所以一般会约定一个时间,超时即判定对方挂了。
        • 而在kafka消费者场景中,**session.timout.ms** 参数就是规定这个超时时间是多少
        • 还有一个参数,**heartbeat.interval.ms**,这个参数控制发送心跳的频率,频率越高越不容易被误判,但也会消耗更多资源
        • 最后一个参数,**max.poll.interval.ms**,消费者poll数据后,需要一些处理,再进行拉取。如果两次拉取时间间隔超过这个参数设置的值,那么消费者就会被踢出消费者组。这个参数的默认值是5分钟,而如果消费者接收到数据后会执行耗时的操作,则应该将其设置得大一些
        • 三个参数,
          • session.timout.ms 控制心跳超时时间
          • heartbeat.interval.ms 控制心跳发送频率
          • max.poll.interval.ms 控制poll的间隔
        • 这里给出一个相对较为合理的配置,如下:
          • session.timout.ms:设置为 6s
          • heartbeat.interval.ms:设置 2s
          • max.poll.interval.ms:推荐为消费者处理消息最长耗时再加 1 分钟

            消费者拦截器

  • 消费者在拉取了分区消息之后,要首先经过反序列化器对key和value进行反序列化处理。

  • 处理完之后,如果消费端设置了拦截器,则需要经过拦截器的处理之后,才能返回给消费者应用程序进行处理

    • 消费端定义消息拦截器,需要实现:
      • org.apache.kafka.clients.consumer.ConsumerInterceptor<K, V> 接口。
        1. 一个可插拔接口,允许拦截甚至更改消费者接收到的消息。首要的用例在于将第三方组件引入消费者应用程序,用于定制的监控、日志处理等。
        2. 该接口的实现类通过configre方法获取消费者配置的属性,如果消费者配置中没有指定clientID,还可以获取KafkaConsumer生成的clientId。
          • 获取的这个配置是跟其他拦截器共享的,需要保证不会在各个拦截器之间产生冲突。
        3. ConsumerInterceptor方法抛出的异常会被捕获、记录,但是不会向下传播。如果用户配置了错误的key或value类型参数,消费者不会抛出异常,而仅仅是记录下来
        4. ConsumerInterceptor回调发生在 org.apache.kafka.clients.consumer.KafkaConsumer#poll(long)方法同一个线程。

          拦截器代码实现

  • 一、二、三个拦截器依次执行

    1. public class OneInterceptor implements ConsumerInterceptor<String, String> {
    2. @Overridepublic ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
    3. // poll方法返回结果之前最后要调用的方法
    4. System.out.println("One -- 开始");
    5. // 消息不做处理,直接返回
    6. return records;
    7. }
    8. @Override public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
    9. // 消费者提交偏移量的时候,经过该方法
    10. System.out.println("One -- 结束");
    11. }
    12. @Override public void close() {
    13. // 用于关闭该拦截器用到的资源,如打开的文件,连接的数据库等
    14. }
    15. @Override public void configure(Map<String, ?> configs) {
    16. // 用于获取消费者的设置参数
    17. configs.forEach((k, v) -> {
    18. System.out.println(k + "\t" + v);
    19. });
    20. }
    21. }

    消费者使用拦截器

    1. public class MyConsumer {
    2. public static void main(String[] args) {
    3. Properties props = new Properties();
    4. props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
    5. props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    6. props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    7. props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "mygrp");
    8. // props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "myclient");
    9. // 如果在kafka中找不到当前消费者的偏移量,则设置为最旧的
    10. props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    11. // 配置拦截器
    12. // One -> Two -> Three,接收消息和发送偏移量确认都是这个顺序
    13. props.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, "com.lagou.kafka.demo.interceptor.OneInterceptor" +
    14. ",com.lagou.kafka.demo.interceptor.TwoInterceptor" +
    15. ",com.lagou.kafka.demo.interceptor.ThreeInterceptor" );
    16. KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
    17. // 订阅主题
    18. consumer.subscribe(Collections.singleton("tp_demo_01"));
    19. while (true) {
    20. final ConsumerRecords<String, String> records = consumer.poll(3_000);
    21. records.forEach(record -> {
    22. System.out.println(record.topic() +
    23. "\t" +
    24. record.partition() +
    25. "\t" +
    26. record.offset() + "\t" +
    27. record.key() + "\t" +
    28. record.value());
    29. });
    30. // consumer.commitAsync();
    31. // consumer.commitSync();
    32. }
    33. // consumer.close();
    34. }
    35. }

消费者参数配置补充

image.png
image.png
image.png
image.png
image.png
image.png
image.png
image.png

3、消费组管理

Rebalance Generation

  • 它表示Rebalance之后主题分区到消费组中消费者映射关系的一个版本,主要是用于保护消费组,隔离无效偏移量提交的。如上一个版本的消费者无法提交位移到新版本的消费组中,因为映射关系变了,消费的或许已经不是原来的那个分区了。每次group进行Rebalance之后,Generation号都会加1,表示消费组和分区的映射关系到了一个新版本
  • 如下图所示: Generation 1时group有3个成员,随后成员2退出组,消费组协调器触发Rebalance,消费组进入Generation 2,之后成员4加入,再次触发Rebalance,消费组进入Generation 3

image.png

协议(protocol)

  • kafka提供了5个协议来处理与消费组协调相关的问题:
    • Heartbeat请求:consumer需要定期给组协调器发送心跳来表明自己还活着
    • LeaveGroup请求:主动告诉组协调器我要离开消费组
    • SyncGroup请求:消费组Leader把分配方案告诉组内所有成员
    • JoinGroup请求:成员请求加入组
    • DescribeGroup请求:显示组的所有信息,包括成员信息,协议名称,分配方案,订阅信息等。通常该请求是给管理员使用
  • 组协调器在再均衡的时候主要用到了前面4种请求

    liveness

  • 消费者如何向消费组协调器证明自己还活着? 通过定时向消费组协调器发送Heartbeat请求。如果超过了设定的超时时间,那么协调器认为该消费者已经挂了。一旦协调器认为某个消费者挂了,那么它就会开启新一轮再均衡,并且在当前其他消费者的心跳响应中添加“REBALANCE_IN_PROGRESS”,告诉其他消费者:重新分配分区

    再均衡过程

    再均衡分为2步:Join和Sync

  1. Join, 加入组。所有成员都向消费组协调器发送JoinGroup请求,请求加入消费组。一旦所有成员都发送了JoinGroup请求,协调i器从中选择一个消费者担任Leader的角色,并把组成员信息以及订阅信息发给Leader。
  2. Sync,Leader开始分配消费方案,即哪个消费者负责消费哪些主题的哪些分区。一旦完成分配,Leader会将这个方案封装进SyncGroup请求中发给消费组协调器,非Leader也会发SyncGroup请求,只是内容为空。消费组协调器接收到分配方案之后会把方案塞进SyncGroup的response中发给各个消费者。

注:消费组的分区分配方案在客户端执行。Kafka交给客户端可以有更好的灵活性。Kafka默认提供三种分配策略:range和round-robin和sticky。可以通过消费者的参数: q partition.assignment.strategy 来实现自己分配策略。

消费组状态机

消费组协调器根据状态机对消费组做不同的处理:
image.png
说明:

  1. Dead:组内已经没有任何成员的最终状态,组的元数据也已经被组协调器移除了。这种状态响应各种请求都是一个response: UNKNOWN_MEMBER_ID
  2. Empty:组内无成员,但是位移信息还没有过期。这种状态只能响应JoinGroup请求
  3. PreparingRebalance:组准备开启新的rebalance,等待成员加入
  4. AwaitingSync:正在等待leader consumer将分配方案传给各个成员
  5. Stable:再均衡完成,可以开始消费。

    三、主题

    1、管理

  • kafka-topics.sh 中的选项

image.png
image.png
image.png

  • 对以上参数的解释

image.png
image.png
image.png

创建主题

  1. kafka-topics.sh --zookeeper localhost:2181/myKafka --create --topic topic_x - -partitions 1 --replication-factor 1
  2. kafka-topics.sh --zookeeper localhost:2181/myKafka --create --topic topic_test_02 --partitions 3 --replication-factor 1 --config max.message.bytes=1048576 --config segment.bytes=10485760

查看主题

  1. kafka-topics.sh --zookeeper localhost:2181/myKafka --list
  2. kafka-topics.sh --zookeeper localhost:2181/myKafka --describe --topic topic_x
  3. kafka-topics.sh --zookeeper localhost:2181/myKafka --topics-with-overrides -- describe

修改主题

  1. kafka-topics.sh --zookeeper localhost:2181/myKafka --create --topic topic_test_01 --partitions 2 --replication-factor 1
  2. kafka-topics.sh --zookeeper localhost:2181/myKafka --alter --topic topic_test_01 --config max.message.bytes=1048576
  3. kafka-topics.sh --zookeeper localhost:2181/myKafka --describe --topic topic_test_01
  4. kafka-topics.sh --zookeeper localhost:2181/myKafka --alter --topic topic_test_01 --config segment.bytes=10485760
  5. kafka-topics.sh --zookeeper localhost:2181/myKafka --alter --delete-config max.message.bytes --topic topic_test_01

删除主题

  1. kafka-topics.sh --zookeeper localhost:2181/myKafka --delete --topic topic_x

image.png

2、增加分区

  • 通过命令行工具操作,主题的分区只能增加,不能减少,否则会报错
    • 报错内容:ERROR org.apache.kafka.common.errors.InvalidPartitionsException: The number of partitions for a topic can only be increased. Topic myTop1 currently has 2 partitions, 1 would not be an increase.
  • 通过—alter修改主题的分区数,增加分区:
    1. kafka-topics.sh --zookeeper localhost/myKafka --alter --topic myTop1 -- partitions 2

    3、分区副本的分配【了解】

    副本分配的三个目标:
  1. 均衡地将副本分散于各个broker上
  2. 对于某个broker上分配的分区,它的其他副本在其他broker上
  3. 如果所有的broker都有机架信息,尽量将分区的各个副本分配到不同机架上的broker。

在不考虑机架信息的情况下:

  1. 第一个副本分区通过轮询的方式挑选一个broker,进行分配。该轮询从broker列表的随机位置进行轮询。
  2. 其余副本通过增加偏移进行分配。

    4、KafkaAdminClient应用

    说明
  • 除了使用Kafka的bin目录下的脚本工具来管理Kafka,还可以使用管理Kafka的API将某些管理查看的功能集成到系统中。在Kafka0.11.0.0版本之前,可以通过kafka-core包(Kafka的服务端,采用Scala编写)中的AdminClient和AdminUtils来实现部分的集群管理操作。Kafka0.11.0.0之后,又多了一个AdminClient,在kafka-client包下,一个抽象类,具体的实现是org.apache.kafka.clients.admin.KafkaAdminClient

功能与原理介绍

  • Kafka官网:The AdminClient API supports managing and inspecting topics, brokers, acls, and other Kafka objects
  • KafkaAdminClient包含了一下几种功能(以Kafka1.0.2版本为准):
  1. 创建主题:

createTopics(final Collection<NewTopic> newTopics, final CreateTopicsOptions options)

  1. 删除主题:

deleteTopics(final Collection<String> topicNames, DeleteTopicsOptions options)

  1. 列出所有主题:

listTopics(final ListTopicsOptions options)

  1. 查询主题:

describeTopics(final Collection<String> topicNames, DescribeTopicsOptions options)

  1. 查询集群信息:

describeCluster(DescribeClusterOptions options)

  1. 查询配置信息:

describeConfigs(Collection<ConfigResource> configResources, final DescribeConfigsOptions options)

  1. 修改配置信息:

alterConfigs(Map<ConfigResource, Config> configs, final AlterConfigsOptions options)

  1. 修改副本的日志目录:

alterReplicaLogDirs(Map<TopicPartitionReplica, String> replicaAssignment, final AlterReplicaLogDirsOptions options)

  1. 查询节点的日志目录信息:

describeLogDirs(Collection<Integer> brokers, DescribeLogDirsOptions options)

  1. 查询副本的日志目录信息:

describeReplicaLogDirs(Collection<TopicPartitionReplica> replicas, DescribeReplicaLogDirsOptions options)

  1. 增加分区:

createPartitions(Map<String, NewPartitions> newPartitions, final CreatePartitionsOptions options)

  • 内部原理是使用Kafka自定义的一套二进制协议来实现,详细可以参见Kafka协议


用到的参数:**
image.png
image.png
image.png
主要操作步骤:

  • 客户端根据方法的调用创建相应的协议请求,比如创建Topic的createTopics方法,其内部就是发送CreateTopicRequest请求。
  • 客户端发送请求至Kafka Broker。
  • Kafka Broker处理相应的请求并回执,比如与CreateTopicRequest对应的是 CreateTopicResponse。 客户端接收相应的回执并进行解析处理。
  • 和协议有关的请求和回执的类基本都在org.apache.kafka.common.requests包中,AbstractRequest和AbstractResponse是这些请求和响应类的两个父类。

综上,如果要自定义实现一个功能,只需要三个步骤:

  1. 自定义XXXOptions;
  2. 自定义XXXResult返回值;
  3. 自定义Call,然后挑选合适的XXXRequest和XXXResponse来实现Call类中的3个抽象方法。
  • 具体代码实现见课件

    5、偏移量管理

    Kafka 1.0.2,__consumer_offsets主题中保存各个消费组的偏移量。
    早期由zookeeper管理消费组的偏移量。

    查询方法

    通过原生 kafka 提供的工具脚本进行查询。

  • kafka-consumer-groups.sh

首先运行脚本,查看帮助:
image.png
image.png
image.png
image.png

四、分区

1、副本机制

Kafka在一定数量的服务器上对主题分区进行复制。
当集群中的一个broker宕机后系统可以自动故障转移到其他可用的副本上,不会造成数据丢失。

  1. 将复制因子为1的未复制主题称为复制主题。
  2. 主题的分区是复制的最小单元。
  3. 在非故障情况下,Kafka中的每个分区都有一个Leader副本和零个或多个Follower副本。
  4. 包括Leader副本在内的副本总数构成复制因子。
  5. 所有读取和写入都由Leader副本负责。
  6. 通常,分区比broker多,并且Leader分区在broker之间平均分配。

Follower分区像普通的Kafka消费者一样,消费来自Leader分区的消息,并将其持久化到自己的日志中。
允许Follower对日志条目拉取进行批处理
同步节点定义:

  1. 节点必须能够维持与ZooKeeper的会话(通过ZooKeeper的心跳机制)
  2. 对于Follower副本分区,它复制在Leader分区上的写入,并且不要延迟太多

Kafka提供的保证是,只要有至少一个同步副本处于活动状态,提交的消息就不会丢失

宕机如何恢复
(1)少部分副本宕机

  • 当leader宕机了,会从follower选择一个作为leader。当宕机的重新恢复时,会把之前commit的数据清空,重新从leader里pull数据。

(2)全部副本宕机

  • 当全部副本宕机了有两种恢复方式
  1. 等待ISR中的一个恢复后,并选它作为leader。(等待时间较长,降低可用性)
  2. 选择第一个恢复的副本作为新的leader,无论是否在ISR中。(并未包含之前leader commit的数据,因此造成数据丢失)

    2、Leader选举

    Leader副本和Follower副本之间的关系并不是固定不变的,在Leader所在的broker发生故障的时候,就需要进行分区的Leader副本和Follower副本之间的切换,需要选举Leader副本。
    如果某个分区所在的服务器出了问题,不可用,kafka会从该分区的其他的副本中选择一个作为新的Leader。之后所有的读写就会转移到这个新的Leader上。

如何选择新的leader:

  • 只有那些跟Leader保持同步的Follower才应该被选作新的Leader。
  • Kafka会在Zookeeper上针对每个Topic维护一个称为ISR(in-sync replica,已同步的副本)的集合,该集合中是一些分区的副本。
  • 只有当这些副本都跟Leader中的副本同步了之后,kafka才会认为消息已提交,并反馈给消息的生产者。
  • 如果这个集合有增减,kafka会更新zookeeper上的记录。
  • 显然通过ISR,kafka需要的冗余度较低,可以容忍的失败数比较高。假设某个topic有N+1个副本,kafka可以容忍N个服务器不可用

为什么不用少数服从多数的方法:

  • 少数服从多数是一种比较常见的一致性算发和Leader选举法。
  • 它的含义是只有超过半数的副本同步了,系统才会认为数据已同步;选择Leader时也是从超过半数的同步的副本中选择。
  • 这种算法需要较高的冗余度 ,跟Kafka比起来,浪费资源。

如果所有的**ISR**副本都失败了怎么办?
此时有两种方法可选:

  1. 等待ISR集合中的副本复活,
  2. 选择任何一个立即可用的副本,而这个副本不一定是在ISR集合中。
  • 需要设置 unclean.leader.election.enable=true
  • 这两种方法各有利弊,实际生产中按需选择。 如果要等待ISR副本复活,虽然可以保证一致性,但可能需要很长时间。而如果选择立即可用的副本,则很可能该副本并不一致。


总结:**

  • Kafka中Leader分区选举,通过维护一个动态变化的ISR集合来实现,一旦Leader分区丢掉,则从ISR中随机挑选一个副本做新的Leader分区。
  • 如果ISR中的副本都丢失了,则:
  1. 可以等待ISR中的副本任何一个恢复,接着对外提供服务,需要时间等待。
  2. 从OSR中选出一个副本做Leader副本,此时会造成数据丢失

    3、分区重新分配

  • 向已经部署好的Kafka集群里面添加机器,我们需要从已经部署好的Kafka节点中复制相应的配置文件,然后把里面的broker id修改成全局唯一的,最后启动这个节点即可将它加入到现有Kafka集群中。
  • 问题:新添加的Kafka节点并不会自动地分配数据,无法分担集群的负载,除非我们新建一个topic,需要手动将部分分区移到新添加的Kafka节点上,Kafka内部提供了相关的工具来重新分布某个topic的分区。
  • 在重新分布topic分区之前,我们先来看看现在topic的各个分区的分布位置:

image.png

  • 此处不需要zookeeper**,切记!!!**

image.png
image.png

  • 注意观察node11**上节点启动的时候的ClusterId,看和zookeeper节点上的ClusterId是否一致,如果是,证明node11和node1在同一个集群中。**
    • node11启动的Cluster ID:
      • image.png
    • zookeeper节点上的Cluster ID:
      • image.png
    • node1上查看zookeeper的节点信息:
      • image.png
      • 说明 node11 节点已经加入集群中了


  • 现在我们在现有集群的基础上再添加一个Kafka节点,然后使用Kafka自带的 kafka- reassign-partitions.sh 工具来重新分布分区。该工具有三种使用模式:

    • 1、**generate** 模式,给定需要重新分配的Topic,自动生成reassign plan(并不执行)
    • 2、**execute** 模式,根据指定的reassign plan重新分配Partition
    • 3、**verify** 模式,验证重新分配 Partition是否成功
  • 我们将分区3和4重新分布到broker1上,借助kafka-reassign-partitions.sh工具生成reassignplan,不过先得按照要求定义一个文件,里面说明哪些topic需要重新分区,文件内容如下:

    1. [root@node1 ~]# cat topics-to-move.json
    2. { "topics": [
    3. {
    4. "topic":"tp_re_01"
    5. }
    6. ],
    7. "version":1
    8. }
  • 然后使用 kafka-reassign-partitions.sh 工具生成 reassign plan

image.png

  • Proposed partition reassignment configuration下面生成的就是将分区重新分布到broker 1上的结果。我们将这些内容保存到名为result.json文件里面(文件名不重要,文件格式也不一定要以json为结尾,只要保证内容是 json 即可),然后执行这些reassign plan:

image.png

  • 校验reassign plan是否执行完成

image.png

  • 查看主题的细节:

image.png

  • 分区的分布的确和操作之前不一样了,broker 1上已经有分区分布上去了。使用 kafka-reassign-partitions.sh 工具生成的reassign plan只是一个建议,方便大家而已。其实我们自己完全可以编辑一个 reassign plan,然后执行它【如分区的配置节点可以按照我们的需要去设定】

    4、自动再均衡

  • 我们可以在新建主题的时候,手动指定主题各个Leader分区以及Follower分区的分配情况,即什么分区副本在哪个broker节点上。

  • 随着系统的运行,broker的宕机重启,会引发Leader分区和Follower分区的角色转换,最后可能Leader大部分都集中在少数几台broker上,由于Leader负责客户端的读写操作,此时集中Leader分区的少数几台服务器的网络I/O,CPU,以及内存都会很紧张。
  • Leader和Follower的角色转换会引起Leader副本在集群中分布不均衡,此时我们需要让Leader的分布重新恢复到一个均衡的状态。

  • 执行脚本:

    1. [root@node11 ~]# kafka-topics.sh --zookeeper node1:2181/myKafka --create -- topic tp_demo_03 --replica-assignment "0:1,1:0,0:1"
  • 上述脚本执行的结果是:创建了主题tp_demo_03,有三个分区,每个分区两个副本,Leader副本在列表中第一个指定的brokerId上,Follower副本在随后指定的brokerId上。

image.png

  • 然后模拟broker0宕机的情况,使得 节点1 中的副本都变成了leader副本

image.png

  • 重新启动 broker1 后,分区的分配情况没有改变,依然是上面的分配
  • 是否有一种方式,可以让Kafka自动帮我们进行修改?改为初始的副本分配?
    • 此时,用到了Kafka提供的自动再均衡脚本kafka``-``preferred``-``replica-election.sh
    • 该工具会让每个分区的Leader副本分配在合适的位置,让Leader分区和Follower分区在服务器之间均衡分配。
    • 如果该脚本仅指定zookeeper地址,则会对集群中所有的主题进行操作,自动再平衡。

具体操作:

  1. 创建preferred-replica.json,内容如下:

image.png

  1. 执行操作

image.png

  1. 查看操作结果
    • 发现恢复到了最初的分配情况

image.png

  • 之所以是这样的分配,是因为我们在创建主题的时候:

    • --replica-assignment ``"0:1,1:0,0:1"
    • 在逗号分割的每个数值对中排在前面的是Leader分区,后面的是副本分区。那么所谓的preferredreplica,就是排在前面的数字就是Leader副本应该在的brokerId。

      5、修改分区副本

  • 实际项目中,我们可能由于主题的副本因子设置的问题,需要重新设置副本因子或者由于集群的扩展,需要重新设置副本因子【指定副本的数量,包括leader副本和follow副本】。

    • topic一旦使用又不能轻易删除重建,因此动态增加副本因子就成为最终的选择。
  • 说明:kafka 1.0版本配置文件默认没有default.replication.factor=x, 因此如果创建topic时,不指定–replication-factor 想, 默认副本因子为1. 我们可以在自己的server.properties中配置上常用的副本因子,省去手动调整。例如设置default.replication.factor=3, 详细内容可参考官方文档https://kafka.apache.org/documentation/#replication

    1. 创建主题:
      • kafka-topics.sh --zookeeper node1:2181/myKafka --create --topic tp_re_02 --partitions 3 --replication-factor 1
    1. 查看主题细节:

image.png

    1. 修改副本因子
      • 注:不能用 —alter 来修改副本因子,会报错
        • image.png
      • 使用 kafka``-``reassign``-partitions.sh 修改副本因子:
      • ① 创建 increment-replication-factor.json

image.png

  • ② 执行分配
    • image.png
        1. 查看主题细节
  • 发现分配成功

image.png

6、分区分配策略

  • 在Kafka中,每个Topic会包含多个分区,默认情况下一个分区只能被一个消费组下面的一个消费者消费,这里就产生了分区分配的问题。Kafka中提供了多重分区分配算法(PartitionAssignor)的实现:RangeAssignorRoundRobinAssignorStickyAssignor【推荐】

    ① RangeAssignor【kafka默认】

  • PartitionAssignor接口用于用户定义实现分区分配算法,以实现Consumer之间的分区分配。

  • 消费组的成员订阅它们感兴趣的Topic并将这种订阅关系传递给作为订阅组协调者的Broker。协调者选择其中的一个消费者来执行这个消费组的分区分配并将分配结果转发给消费组内所有的消费者。

Kafka默认采用RangeAssignor的分配算法

  • RangeAssignor对每个Topic进行独立的分区分配。对于每一个Topic,首先对分区按照分区ID进行数值排序,然后订阅这个Topic的消费组的消费者再进行字典排序,之后尽量均衡的将分区分配给消费者。这里只能是尽量均衡,因为分区数可能无法被消费者数量整除,那么有一些消费者就会多分配到一些分区。

image.png

  • RangeAssignor策略的原理是按照消费者总数和分区总数进行整除运算来获得一个跨度,然后将分区按照跨度进行平均分配,以保证分区尽可能均匀地分配给所有的消费者。对于每一个Topic,RangeAssignor策略会将消费组内所有订阅这个Topic的消费者按照名称的字典序排序,然后为每个消费者划分固定的分区范围,如果不够平均分配,那么字典序靠前的消费者会被多分配一个分区。
  • 这种分配方式明显的一个问题是随着消费者订阅的Topic的数量的增加,不均衡的问题会越来越严重,比如上图中4个分区3个消费者的场景,C0会多分配一个分区。如果此时再订阅一个分区数为4的Topic,那么C0又会比C1、C2多分配一个分区,这样C0总共就比C1、C2多分配两个分区了,而且随着Topic的增加,这个情况会越来越严重。
  • 字典序靠前的消费组中的消费者比较“贪婪

    • image.png

      ② RoungRobinAssignor

  • RoundRobinAssignor的分配策略是将消费组内订阅的所有Topic的分区及所有消费者进行排序后尽量均衡的分配(RangeAssignor是针对单个Topic的分区进行排序分配的)。如果消费组内,消费者订阅的Topic列表是相同的(每个消费者都订阅了相同的Topic),那么分配结果是尽量均衡的(消费者之间分配到的分区数的差值不会超过1)。如果订阅的Topic列表是不同的,那么分配结果是不保证“尽量均衡”的,因为某些消费者不参与一些Topic的分配。

image.png

  • 相对于RangeAssignor,在订阅多个Topic的情况下,RoundRobinAssignor的方式能消费者之间尽量均衡的分配到分区(分配到的分区数的差值不会超过1——RangeAssignor的分配策略可能随着订阅的Topic越来越多,差值越来越大)。
  • 对于消费组内消费者订阅Topic不一致的情况:假设有两个个消费者分别为C0和C1,有2个TopicT1、T2,分别拥有3和2个分区,并且C0订阅T1和T2,C1订阅T2,那么RoundRobinAssignor的分配结果如下:

    • image.png

      ③ StickyAssignor

  • 尽管RoundRobinAssignor已经在RangeAssignor上做了一些优化来更均衡的分配分区,但是在一些情况下依旧会产生严重的分配偏差,比如消费组中订阅的Topic列表不相同的情况下。

  • 更核心的问题是无论是RangeAssignor,还是RoundRobinAssignor,当前的分区分配算法都没有考虑上一次的分配结果。显然,在执行一次新的分配之前,如果能考虑到上一次分配的结果,尽量少的调整分区分配的变动,显然是能节省很多开销的
    目标
  1. 分区的分配尽量的均衡
  2. 每一次重分配的结果尽量与上一次分配结果保持一致

    五、物理存储

    1、日志存储概述

  • Kafka 消息是以主题为单位进行归类,各个主题之间是彼此独立的,互不影响
  • 每个主题又可以分为一个或多个分区
  • 每个分区各自存在一个记录消息数据的日志文件

image.png

  • 上图中,创建了一个 tp_demo_01 主题,其存在6个 Parition,对应的每个Parition下存在一个 [Topic-Parition] 命名的消息日志文件。在理想情况下,数据流量分摊到各个 Parition 中,实现了负载均衡的效果。在分区日志文件中,你会发现很多类型的文件,比如: .index、.timestamp、.log、.snapshot 等
    • 其中,文件名一致的文件集合就称为 LogSement
    • image.png

      logSegment

  1. 分区日志文件中包含很多的 LogSegment
  2. Kafka 日志追加是顺序写入的
  3. LogSegment 可以减小日志文件的大小
  4. 进行日志删除的时候和数据查找的时候可以快速定位
  5. ActiveLogSegment 是活跃的日志分段,拥有文件拥有写入权限,其余的 LogSegment 只有只读的权限
  • 日志文件存在多种后缀文件,重点需要关注 .index.timestamp.log 三种类型

image.png

  • 每个 LogSegment 都有一个基准偏移量,表示当前 LogSegment 中 第一条消息的 offset
  • 偏移量是一个 64 位的长整形数,固定是20位数字,长度未达到,用 0 进行填补,索引文件和日志文件都由该作为文件名命名规则(00000000000000000000.index00000000000000000000.timestamp00000000000000000000.log)。
  • 如果日志文件名为 00000000000000000121.log ,则当前日志文件的一条数据偏移量就是 121(偏移量从 0 开始)

    image.png
    image.png
    配置项默认值说明

  • 偏移量索引文件用于记录消息偏移量与物理地址之间的映射关系。

  • 时间戳索引文件则根据时间戳查找对应的偏移量。
  • Kafka 中的索引文件是以稀疏索引的方式构造消息的索引,并不保证每一个消息在索引文件中都有对应的索引项。
  • 每当写入一定量的消息 时,偏移量索引文件和时间戳索引文件分别增加一个偏移量索引项和时间戳索引项。
  • 通过修改 log.index.interval.bytes 的值,改变索引项的密度

    切分文件

    当满足如下几个条件中的其中之一,就会触发文件的切分:
  1. 当前日志分段文件的大小超过了 broker 端参数 log.segment.bytes 配置的值。log.segment.bytes 参数的默认值为 1073741824,即 1GB。
  2. 当前日志分段中消息的最大时间戳与当前系统的时间戳的差值大于 log.roll.ms 或 log.roll.hours 参数配置的值。如果同时配置了 log.roll.ms 和 log.roll.hours 参数,那么 log.roll.ms 的优先级高。默认情况下,只配置了 log.roll.hours 参数,其值为168,即 7 天。
  3. 偏移量索引文件或时间戳索引文件的大小达到 broker 端参数 log.index.size.max.bytes 配置的值。 log.index.size.max.bytes 的默认值为 10485760,即 10MB。
  4. 追加的消息的偏移量与当前日志分段的偏移量之间的差值大于 Integer.MAX_VALUE ,即要追加的消息的偏移量不能转变为相对偏移量

为什么是 **Integer.MAX_VALUE **?

  • 1024 1024 1024=1073741824
  • 在偏移量索引文件中,每个索引项共占用 8 个字节,并分为两部分:
    • 相对偏移量:表示消息相对与基准偏移量的偏移量,占 4 个字节
    • 物理地址:消息在日志分段文件中对应的物理位置,也占 4 个字节
  • 4 个字节刚好对应 Integer.MAX_VALUE ,如果大于 Integer.MAX_VALUE ,则不能用 4 个字节进行表示了

索引文件切分过程:

  • 索引文件会根据 log.index.size.max.bytes 值进行预先分配空间,即文件创建的时候就是最大值
  • 当真正的进行索引文件切分的时候,才会将其裁剪到实际数据大小的文件。
  • 这一点是跟日志文件有所区别的地方。其意义降低了代码逻辑的复杂性

    2、日志存储

    索引

    ① 偏移量

  1. 位置索引保存在 index 文件中
  2. log日志默认每写入4Klog.index.interval.bytes设定的),会写入一条索引信息到index文件中,因此索引文件是稀疏索引不会为每条日志都建立索引信息
  3. log文件中的日志,是顺序写入的,由 message + 实际offset + position 组成
  4. 索引文件的数据结构则是由相对offset(4byte)+position(4byte)组成,由于保存的是相对第一个消息的相对offset,只需要4byte就可以了,可以节省空间,在实际查找后还需要计算回实际的offset,这对用户是透明的
  • 稀疏索引,索引密度不高,但是offset有序,二分查找的时间复杂度为O(lgN)

image.png

  • 可以通过如下命令解析 .index 文件

kafka-run-class.sh kafka.tools.DumpLogSegments ``--files ``00000000000000000000``.index ``--print-data-log ``| head

  • 注:offset 与 position 没有直接关系,因为会删除数据和清理日志

思考:如何查看偏移量为23**的消息?**

  • Kafka 中存在一个 ConcurrentSkipListMap 来保存在每个日志分段,通过跳跃表方式,定位到在 00000000000000000000.index ,通过二分法在偏移量索引文件中找到不大于 23 的最大索引项,即 offset 20 那栏,然后从日志分段文件中的物理位置为320 开始顺序查找偏移量为 23 的消息

② 时间戳

  • 在偏移量索引文件中,索引数据都是顺序记录 offset ,但时间戳索引文件中每个追加的索引时间戳必须大于之前追加的索引项,否则不予追加。在 Kafka 0.11.0.0 以后,消息信息中存在若干的时间戳信息。如果 broker 端参数 log.message.timestamp.type 设置为 LogAppendTIme ,那么时间戳必定能保持单调增长。反之如果是 CreateTime 则无法保证顺序。
    • 注:timestamp文件中的 offset 与 index 文件中的 relativeOffset 不是一一对应的,因为数据的写入是**各自追加**
  • 通过时间戳方式进行查找消息,需要通过查找时间戳索引和偏移量索引 两个文件
  • 时间戳索引索引格式:前八个字节表示时间戳,后四个字节表示偏移量
    • image.png


思考:查找时间戳为 1557554753430 **开始的消息?

  1. 查找该时间戳应该在哪个日志分段中。将1557554753430和每个日志分段中最大时间戳 largestTimeStamp 逐一对比,直到找到不小于1557554753430所对应的日志分段。日志分段中的 largestTimeStamp 的计算是:先查询该日志分段所对应时间戳索引文件,找到最后一条索引项,若最后一条索引项的时间戳字段值大于0,则取该值,否则取该日志分段的最近修改时间。
  2. 查找该日志分段的偏移量索引文件,查找该偏移量对应的物理地址。
  3. 日志文件中从 320 的物理位置开始查找不小于 1557554753430 数据

清理

① 日志删除

基于时间
  • 日志删除任务会根据 log.retention.hours/log.retention.minutes/log.retention.ms 设定日志保留的时间节点。如果超过该设定值,就需要进行删除。默认是 7 天, log.retention.ms 优先级最高。
  • Kafka 依据日志分段中最大的时间戳进行定位。
  • 首先要查询该日志分段所对应的时间戳索引文件,查找时间戳索引文件中最后一条索引项,若最后一条索引项的时间戳字段值大于 0,则取该值,否则取最近修改时间。

为什么不直接选最近修改时间呢?

  • 因为日志文件可以有意无意的被修改,并不能真实的反应日志分段的最大时间信息。

删除过程

  1. 从日志对象中所维护日志分段的跳跃表中移除待删除的日志分段,保证没有线程对这些日志分段进行读取操作。
  2. 这些日志分段所有文件添加 上 .delete 后缀。
  3. 交由一个以 “delete-file” 命名的延迟任务来删除这些 .delete 为后缀的文件。延迟执行时间可以通过 file.delete.delay.ms 进行设置

如果活跃的日志分段中也存在需要删除的数据时?

  • Kafka 会先切分出一个新的日志分段作为活跃日志分段,该日志分段不删除,删除原来的日志分段。
  • 先腾出地方,再删除

基于日志大小
  • 日志删除任务会检查当前日志的大小是否超过设定值。设定项为 log.retention.bytes ,单个日志分段的大小由 log.segment.bytes 进行设定

删除过程

  1. 计算需要被删除的日志总大小 (当前日志文件大小(所有分段)减去retention值)。
  2. 从日志文件第一个 LogSegment 开始查找可删除的日志分段的文件集合。
  3. 执行删除

基于偏移量
  • 根据日志分段的下一个日志分段的起始偏移量是否 <= 日志文件的起始偏移量,若是,则可以删除此日志分段。
  • 注:日志文件的起始偏移量并不一定等于第一个日志分段的基准偏移量,存在数据删除,可能与之相等的那条数据已经被删除了。

image.png


② 日志压缩策略【生产常用】

  • 日志压缩是Kafka的一种机制,可以提供较为细粒度的记录保留,而不是基于粗粒度的基于时间的保留。
  • 对于具有相同的Key,而数据不同,只保留最后一条数据,前面的数据在合适的情况下删除

应用场景:

  • 日志压缩特性,就实时计算来说,可以在异常容灾方面有很好的应用途径。比如,我们在Spark、Flink中做实时计算时,需要长期在内存里面维护一些数据,这些数据可能是通过聚合了一天或者一周的日志得到的,这些数据一旦由于异常因素(内存、网络、磁盘等)崩溃了,从头开始计算需要很长的时间。一个比较有效可行的方式就是定时将内存里的数据备份到外部存储介质中,当崩溃出现时,再从外部存储介质中恢复并继续计算。
  • 使用日志压缩来替代这些外部存储有哪些优势呢?
    • Kafka即是数据源又是存储工具,可以简化技术栈,降低维护成本
    • 使用外部存储介质的话,需要将存储的Key记录下来,恢复的时候再使用这些Key将数据取回,实现起来有一定的工程难度和复杂度。使用Kafka的日志压缩特性,只需要把数据写进Kafka,等异常出现恢复任务时再读回到内存就可以了
    • Kafka对于磁盘的读写做了大量的优化工作,比如磁盘顺序读写。相对于外部存储介质没有索引查询等工作量的负担,可以实现高性能。同时,Kafka的日志压缩机制可以充分利用廉价的
    • 磁盘,不用依赖昂贵的内存来处理,在性能相似的情况下,实现非常高的性价比(这个观点仅仅针对于异常处理和容灾的场景来说)

日志压缩方式的实现细节:

  • 主题的 cleanup.policy 需要设置为compact。
  • Kafka的后台线程会定时将Topic遍历两次:
  1. 记录每个key的hash值最后一次出现的偏移量
  2. 第二次检查每个offset对应的Key是否在后面的日志中出现过,如果出现了就删除对应的日志。
  • 日志压缩允许删除,除最后一个key之外,删除先前出现的所有该key对应的记录。在一段时间后从日志中清理,以释放空间。
  • 注:日志压缩与key有关,确保每个消息的key不为null

压缩是在Kafka后台通过定时重新打开Segment来完成的,Segment的压缩细节如下图所示:
image.png
日志压缩可以确保:

  1. 任何保持在日志头部以内的使用者都将看到所写的每条消息,这些消息将具有顺序偏移量。可以使用Topic的min.compaction.lag.ms属性来保证消息在被压缩之前必须经过的最短时间。也就是说,它为每个消息在(未压缩)头部停留的时间提供了一个下限。可以使用Topic的max.compaction.lag.ms属性来保证从收到消息到消息符合压缩条件之间的最大延时
  • 消息始终保持顺序,压缩永远不会重新排序消息,只是删除一些而已消息的偏移量永远不会改变,它是日志中位置的永久标识符
  • 从日志开始的任何使用者将至少看到所有记录的最终状态,按记录的顺序写入。另外,如果使用者在比Topic的log.cleaner.delete.retention.ms短的时间内到达日志的头部,则会看到已删除记录的所有delete标记。保留时间默认是24小时。

默认情况下,启动日志清理器,若需要启动特定Topic的日志清理,请添加特定的属性。配置日志清理器,总结如下:

  1. log.cleanup.policy 设置为 compact ,Broker的配置,影响集群中所有的Topic。
  2. log.cleaner.min.compaction.lag.ms ,用于防止对更新超过最小消息进行压缩,如果没有设置,除最后一个Segment之外,所有Segment都有资格进行压缩log.cleaner.max.compaction.lag.ms ,用于防止低生产速率的日志在无限制的时间内不压缩。
  • Kafka的日志压缩原理并不复杂,就是定时把所有的日志读取两遍,写一遍,而CPU的速度超过磁盘完全不是问题,只要日志的量对应的读取两遍和写入一遍的时间在可接受的范围内,那么它的性能就是可以接受的。

    3、磁盘存储

    零拷贝

  • kafka高性能,是多方面协同的结果,包括宏观架构、分布式partition存储、ISR数据同步、以及“无所不用其极”的高效利用磁盘/操作系统特性

  • 零拷贝并不是不需要拷贝,而是减少不必要的拷贝次数【磁盘和内核态之间的拷贝是无法避免的】。通常是说在IO读写过程中
  • nginx的高性能也有零拷贝的身影

  • 在传统 I/O 中,比如:读取文件,socket发送,传统方式实现是:先读取、再发送,实际经过1~4四次copy。

1、第一次:将磁盘文件,读取到操作系统内核缓冲区;
2、第二次:将内核缓冲区的数据,copy到application应用程序的buffer;
3、第三步:将application应用程序buffer中的数据,copy到socket网络发送缓冲区(属于操作系统内核的缓冲区);
4、第四次:将socket buffer的数据,copy到网络协议栈,由网卡进行网络传输。
image.png

  • 实际IO读写,需要进行IO中断,需要CPU响应中断(内核态到用户态转换),尽管引入DMA(Direct Memory Access,直接存储器访问)来接管CPU的中断请求,但四次copy是存在“不必要的拷贝”的。实际上并不需要第二个和第三个数据副本。数据可以直接从读缓冲区传输到套接字缓冲区。

  • kafka的两个过程:

1、网络数据持久化到磁盘 (Producer 到 Broker)
2、磁盘文件通过网络发送(Broker 到 Consumer)

  • 数据落盘通常都是非实时的,Kafka的数据并不是实时的写入硬盘,它充分利用了 现代操作系统分页存储 来利用内存提高I/O效率。

解析:**磁盘文件通过网络发送(Broker Consumer)**

  • 磁盘数据通过DMA(Direct Memory Access,直接存储器访问)拷贝到内核态 Buffer
  • 直接通过 DMA 拷贝到 NIC Buffer(socket buffer),无需 CPU 拷贝。
  • 除了减少数据拷贝外,整个读文件 ==> 网络发送由一个 sendfile 调用完成,整个过程只有两次上下文切换,因此大大提高了性能。
  • Java NIO对sendfile的支持就是FileChannel.transferTo()/transferFrom()
    • fileChannel.transferTo( position, count, socketChannel);
  • 把磁盘文件读取OS内核缓冲区后的fileChannel,直接转给socketChannel发送;底层就是sendfile。消费者从broker读取数据,就是由此实现

具体来看,Kafka 的数据传输通过 TransportLayer 来完成,其子类 PlaintextTransportLayer 通过 Java NIO 的 FileChannel 的 transferTo 和 transferFrom 方法实现零拷贝
image.png
注: transferTo 和 transferFrom 并不保证一定能使用零拷贝,需要操作系统支持。Linux 2.4+ 内核通过 sendfile 系统调用,提供了零拷贝


页缓存

  • 页缓存是操作系统实现的一种主要的磁盘缓存,以此用来减少对磁盘 I/O 的操作。具体来说,就是把磁盘中的数据缓存到内存中,把对磁盘的访问变为对内存的访问
  • Kafka接收来自socket buffer的网络数据,应用进程不需要中间处理、直接进行持久化时。可以使用mmap内存文件映射。
  • Memory Mapped Files
    • 简称mmap,简单描述其作用就是:将磁盘文件映射到内存, 用户通过修改内存就能修改磁盘文件。
    • 工作原理是直接利用操作系统的Page来实现磁盘文件到物理内存的直接映射。完成映射之后,对物理内存的操作会被同步到硬盘上(操作系统在适当的时候)。

image.png

  • 通过mmap,进程像读写硬盘一样读写内存(当然是虚拟机内存)。使用这种方式可以获取很大的 I/O 提升,省去了用户空间到内核空间复制的开销。
  • mmap也有一个很明显的缺陷:不可靠:写到mmap中的数据并没有被真正的写到硬盘,操作系统会在程序主动调用flush的时候才把数据真正的写到硬盘。
  • Kafka提供了一个参数 producer.type 来控制是不是主动flush;
  • 如果Kafka写入到mmap之后就立即 flush 然后再返回 Producer 叫同步(sync);
  • 写入mmap之后立即返回Producer不调用 flush 叫异步(async)。

Java NIO对文件映射的支持

  • Java NIO,提供了一个MappedByteBuffer 类可以用来实现内存映射
    • MappedByteBuffer只能通过调用 FileChannel 的 map() 取得
    • FileChannel.map()是抽象方法,具体实现是在 FileChannelImpl.map()可自行查看JDK源码,其map0()方法就是调用了Linux内核的mmap的API

image.png
image.png
使用 MappedByteBuffer**类 注意事项:**

  • mmap的文件映射,在 full gc 时才会进行释放。当close时,需要手动清除内存映射文件,可以反射调用sun.misc.Cleaner方法。

当一个进程准备**读取磁盘**上的文件内容时:

  1. 操作系统会先查看待读取的数据所在的页 (page)是否在页缓存(pagecache)中,如果存在(命中)则直接返回数据,从而避免了对物理磁盘的 I/O 操作;
  2. 如果没有命中,则操作系统会向磁盘发起读取请求并将读取的数据页存入页缓存,之后再将数据返回给进程。

如果一个进程需要将数据**写入磁盘**:

  1. 操作系统也会检测数据对应的页是否在页缓存中,如果不存在,则会先在页缓存中添加相应的页,最后将数据写入对应的页。
  2. 被修改过后的页也就变成了脏页【数据不一致】,操作系统会在合适的时间把脏页中的数据写入磁盘,以保持数据的一致性。
  • 对一个进程而言,它会在进程内部缓存处理所需的数据,然而这些数据有可能还缓存在操作系统的页缓存中,因此同一份数据有可能被缓存了两次。并且,除非使用Direct I/O的方式, 否则页缓存很难被禁止。
  • 当使用页缓存的时候,即使Kafka服务重启, 页缓存还是会保持有效,然而进程内的缓存却需要重建。这样也极大地简化了代码逻辑,因为维护页缓存和文件之间的一致性交由操作系统来负责,这样会比进程内维护更加安全有效。
  • Kafka**中大量使用了页缓存,这是** Kafka 实现高吞吐的重要因素之一。
  • 消息先被写入页缓存,由操作系统负责刷盘任务

顺序写入

  • 操作系统可以针对线性读写做深层次的优化,比如预读(read-ahead,提前将一个比较大的磁盘块读入内存) 和后写(write-behind,将很多小的逻辑写操作合并起来组成一个大的物理写操作)技术

image.png

  • Kafka 在设计时采用了文件追加的方式来写入消息,即只能在日志文件的尾部追加新的消息,并且也不允许修改已写入的消息,这种方式属于典型的顺序写盘的操作,所以就算 Kafka 使用磁盘作为存储介质,也能承载非常大的吞吐量。

mmap **sendfile: **

  1. Linux内核提供、实现零拷贝的API;
  2. sendfile 是将读到内核空间的数据,转到socket buffer,进行网络发送;
  3. mmap将磁盘文件映射到内存,支持读和写,对内存的操作会反映在磁盘文件上。
  4. RocketMQ 在消费消息时,使用了 mmap。kafka 使用了 sendFile。

Kafka**速度快的原因:**

  1. partition顺序读写,充分利用磁盘特性,这是基础;
  2. Producer生产的数据持久化到broker,采用mmap文件映射,实现顺序的快速写入;
  3. Customer从broker读取数据,采用sendfile,将磁盘文件读到OS内核缓冲区后,直接转到 socket buffer 进行网络发送。

六、稳定性

1、事务

① 事务场景

  1. 如producer发的多条消息组成一个事务这些消息需要对consumer同时可见或者同时不可见。
  2. producer可能会给多个topic,多个partition发消息,这些消息也需要能放在一个事务里面,这就形成了一个典型的分布式事务。
  3. kafka的应用场景经常是应用先消费一个topic,然后做处理再发到另一个topic,这个consume-transform-produce过程需要放到一个事务里面,比如在消息处理或者发送的过程中如果失败了,消费偏移量也不能提交。
  4. producer或者producer所在的应用可能会挂掉,新的producer启动以后需要知道怎么处理之前未完成的事务 。
  5. 在一个原子操作中,根据包含的操作类型,可以分为三种情况,前两种情况是事务引入的场景,最后一种没用。

    • 只有Producer生产消息;
    • 消费消息和生产消息并存,这个是事务场景中最常用的情况,就是我们常说的 consume-transform-produce 模式
    • 只有consumer消费消息,这种操作其实没有什么意义,跟使用手动提交效果一样,而且也不是事务属性引入的目的,所以一般不会使用这种情况

      ② 关键概念

  6. 因为producer发送消息可能是分布式事务,所以引入了常用的2PC,所以有事务协调者(Transaction Coordinator)。Transaction Coordinator和之前为了解决脑裂惊群问题引入的Group Coordinator在选举上类似。

  7. 事务管理中事务日志是必不可少的,kafka使用一个内部 topic 来保存事务日志,这个设计和之前使用内部topic保存偏移量的设计保持一致。事务日志是Transaction Coordinator管理的状态的持久化,因为不需要回溯事务的历史状态,所以事务日志只用保存最近的事务状态。__transaction_state
  8. 因为事务存在 commit abort 两种操作,而客户端又有 read committed 和 read uncommitted 两种隔离级别,所以消息队列必须能标识事务状态,这个被称作Control Message。
  9. producer 挂掉重启或者漂移到其它机器需要能关联的之前的未完成事务所以需要有一个唯一标识符来进行关联,这个就是TransactionalId,一个producer挂了,另一个有相同TransactionalId的producer能够接着处理这个事务未完成的状态。kafka目前没有引入全局序,所以也没有transaction id,这个TransactionalId是用户提前配置的。
  10. TransactionalId 能关联 producer,也需要避免两个使用相同 **TransactionalId producer 同时存在**,所以引入了 producer epoch 来保证对应一个TransactionalId只有一个活跃的producer

    ③ 事务语义

    多分区原子写入
  • 事务能够保证Kafka topic下每个分区的原子写入。事务中所有的消息都将被成功写入或者丢弃。
  • 首先,我们来考虑一下原子 读取-处理-写入 周期是什么意思。简而言之,这意味着如果某个应用程序在某个topic tp0的偏移量X处读取到了消息A,并且在对消息A进行了一些处理(如B = F(A))之后将消息B写入topic tp1,则只有当消息A和B被认为被成功地消费并一起发布,或者完全不发布时,整个读取过程写入操作是原子的
  • 现在,只有当消息A的偏移量X被标记为已消费,消息A才从topic tp0消费,消费到的数据偏移量(record offset)将被标记为提交偏移量(Committing offset)。在Kafka中,我们通过写入一个名为offsets topic的内部Kafka topic来记录offset commit。消息仅在其offset被提交给offsets topic时才被认为成功消费。
  • 由于 offset commit 只是对 Kafka topic 的另一次写入,并且由于消息仅在提交偏移量时被视为成功消费,所以跨多个主题和分区的原子写入也启用原子 读取-处理-写入 循环:提交偏移量X到offset topic和消息B到tp1的写入将是单个事务的一部分,所以整个步骤都是原子的。

粉碎**僵尸实例”**

  • 我们通过为每个事务Producer分配一个称为transactional.id的唯一标识符来解决僵尸实例的问题。在进程重新启动时能够识别相同的Producer实例。
  • API要求事务性Producer的第一个操作应该是在Kafka集群中显示注册transactional.id。 当注册的时候,Kafka broker用给定的transactional.id检查打开的事务并且完成处理。 Kafka也增加了一个与transactional.id相关的epoch。Epoch存储每个transactional.id内部元数据。
  • 一旦epoch被触发,任何具有相同的transactional.id和旧的epoch的生产者被视为僵尸,Kafka拒绝来自这些生产者的后续事务性写入。
  • 简而言之:Kafka可以保证Consumer最终只能消费非事务性消息已提交事务性消息。它将保留来自未完成事务的消息,并过滤掉已中止事务的消息。

事务消息定义
生产者可以显式地发起事务会话,在这些会话中发送(事务)消息,并提交或中止事务。有如下要求:

  1. 原子性:消费者的应用程序不应暴露于==未提交事务==的消息中。
  2. 持久性:Broker不能丢失任何已提交的事务。
  3. 排序:事务消费者应在每个分区中以原始顺序查看事务消息。
  4. 交织:每个分区都应该能够接收来自事务性生产者和非事务生产者的消息
  5. 事务中不应有重复的消息。

如果允许事务性和非事务性消息的交织,则非事务性和事务性消息的相对顺序将基于附加(对于非事务性消息)和最终提交(对于事务性消息)的相对顺序。

④ 事务配置

1、创建消费者代码,需要:

  • 将配置中的自动提交属性(auto.commit)进行关闭,而且在代码里面也不能使用手动提交 commitSync( ) 或者 commitAsync( )
  • 设置isolation.level:READ_COMMITTED或READ_UNCOMMITTED

2、创建生成者,需要:

  • 配置transactional.id属性
  • 配置enable.idempotence属性

    ⑤ 事务概览

    生产者将表示事务开始/结束/中止状态的事务控制消息发送给使用多阶段协议管理事务的高可用事务协调器。生产者将事务控制记录(开始/结束/中止)发送到事务协调器,并将事务的消息直接发送到目标数据分区。消费者需要了解事务并缓冲每个待处理的事务,直到它们到达其相应的结束(提交/中止)记录为止。

  • 事务组

  • 事务组中的生产者
  • 事务组的事务协调器
  • Leader brokers(事务数据所在分区的Broker)
  • 事务的消费者

    事务组

  • 事务组用于映射到特定的事务协调器(基于日志分区数字的哈希)。该组中的生产者需要配置为该组事务生产者。由于来自这些生产者的所有事务都通过此协调器进行,因此我们可以在这些事务生产者之间实现严格的有序。

    生产者ID和事务组状态

  • 事务生产者需要两个新参数:生产者ID生产组

  • 需要将生产者的输入状态与上一个已提交的事务相关联。这使事务生产者能够重试事务(通过为该事务重新创建输入状态;在我们的用例中通常是偏移量的向量)。
  • 可以使用消费者偏移量管理机制来管理这些状态。消费者偏移量管理器将每个键(consumergroup-topic-partition )与该分区的最后一个检查点偏移量和元数据相关联。在事务生产者中,我们保存消费者的偏移量,该偏移量与事务的提交点关联。此偏移提交记录(在consumer_offsets 主题中)应作为事务的一部分写入。即,存储消费组偏移量的consumer_offsets 主题分区将需要参与事务。因此,假定生产者在事务中间失败(事务协调器随后到期);当生产者恢复时,它可以发出偏移量获取请求,以恢复与最后提交的事务相关联的输入偏移量,并从该点恢复事务处理。
  • 为了支持此功能,我们需要对偏移量管理器和压缩的 __consumer_offsets 主题进行一些增强。

    • 首先,压缩的主题现在还将包含事务控制记录。我们将需要为这些控制记录提出剔除策略。
    • 其次,偏移量管理器需要具有事务意识;特别是,如果组与==待处理的事务==相关联,则偏移量提取请求应返回错误。

      事务协调器

      事务协调器是 __transaction_state 主题特定分区的Leader分区所在的Broker。它负责初始化、提交以及回滚事务。事务协调器在内存管理如下的状态:
  • 对应正在处理的事务的第一个消息的HW。事务协调器周期性地将HW写到ZK。

  • 事务控制日志中存储对应于日志HW的所有正在处理的事务:
  • 事务消息主题分区的列表。
    • 事务的超时时间。
    • 与事务关联的Producer ID。
  • 需要确保无论是什么样的保留策略(日志分区的删除还是压缩),都不能删除包含事务HW的日志分段。

    ⑨ 事务流程

  • ⑩ 事务的中止

    当事务生产者发送业务消息的时候如果发生异常,可以中止该事务。如果事务提交超时,事务协调器也会中止当前事务。

  • Producer:向事务协调器发送AbortTransaction(TxId)请求并等待响应。(一个没有异常的响应表示事务将会中止)

  • Coordinator:向事务控制主题追加PREPARE_ABORT(TxId)消息,然后向生产者发送响应。
  • Coordinator:向事务业务数据的目标主题的每个涉及到的Leader分区Broker发送AbortTransaction(TxId, partitions…)请求。(收到Leader分区Broker响应后,事务协调器中止动作跟上面的提交类似。)

    11. 基本事务流程的失败

  • 生产者发送BeginTransaction(TxId):的时候超时或响应中包含异常,生产者使用相同的TxId重试。

  • 生产者发送数据时的Broker错误:生产者应中止(然后重做)事务(使用新的TxId)。如果生产者没有中止事务,则协调器将在事务超时后中止事务。仅在可能已将请求数据附加并复制到Follower的错误的情况下才需要重做事务。例如,生产者请求超时将需要重做,而NotLeaderForPartitionException不需要重做。
  • 生产者发送CommitTransaction(TxId)请求超时或响应中包含异常,生产者使用相同的TxId重试事务。此时需要幂等性。

    12. 主题压缩

  • 压缩主题在压缩过程中会丢弃具有相同键的早期记录。如果这些记录是事务的一部分,这合法吗?这可能有点怪异,但可能不会太有害,因为在主题中使用压缩策略的理由是保留关键数据的最新更新。

  • 如果该应用程序正在(例如)更新某些表,并且事务中的消息对应于不同的键,则这种情况可能导致数据库视图不一致

    13. 事务相关配置

    Broker configs

    image.png
    image.png

    Producer configs

    image.png

    Consumer configs

    image.png

(1)幂等性

image.png
image.png

(2)事务操作

在Kafka事务中,一个原子性操作,根据操作类型可以分为3种情况。情况如下:

  • 只有Producer生产消息,这种场景需要事务的介入;
  • 消费消息和生产消息并存,比如Consumer&Producer模式,这种场景是一般Kafka项目中比较常见的模式,需要事务介入;
  • 只有Consumer消费消息,这种操作在实际项目中意义不大,和手动Commit Offsets的结果一样,而且这种场景不是事务的引入目的。

    事务 API

    ```java // 初始化事务,需要注意确保transation.id属性被分配 void initTransactions();

// 开启事务 void beginTransaction() throws ProducerFencedException;

// 为Consumer提供的在事务内Commit Offsets的操作 void sendOffsetsToTransaction(Map offsets, String consumerGroupId) throws ProducerFencedException;

// 提交事务 void commitTransaction() throws ProducerFencedException;

// 放弃事务,类似于回滚事务的操作 void abortTransaction() throws ProducerFencedException;

  1. <a name="FAS7L"></a>
  2. #### 案例1:单个Produce
  3. - 使用事务保证消息的仅一次发送
  4. ```java
  5. public class MyTransactionalProducer {
  6. public static void main(String[] args) {
  7. Map<String, Object> configs = new HashMap<>();
  8. configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
  9. configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  10. configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  11. // 提供客户端ID
  12. configs.put(ProducerConfig.CLIENT_ID_CONFIG, "tx_producer");
  13. // 事务ID
  14. configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my_tx_id");
  15. // 要求ISR都确认
  16. configs.put(ProducerConfig.ACKS_CONFIG, "all");
  17. KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configs);
  18. // 初始化事务
  19. producer.initTransactions();
  20. // 开启事务
  21. producer.beginTransaction();
  22. try {
  23. // producer.send(new ProducerRecord<>("tp_tx_01", "tx_msg_01"));
  24. producer.send(new ProducerRecord<>("tp_tx_01", "tx_msg_02"));
  25. // int i = 1 / 0;
  26. //
  27. 提交事务 producer.commitTransaction();
  28. } catch (Exception ex) {
  29. // 中止事务
  30. producer.abortTransaction();
  31. } finally {
  32. // 关闭生产者
  33. producer.close();
  34. }
  35. }
  36. }

案例2:消费者+生产者

  • 在 消费-转换-生产 模式,使用事务保证仅一次发送

    1. public class MyTransactional {
    2. public static KafkaProducer<String, String> getProducer() {
    3. Map<String, Object> configs = new HashMap<>();
    4. configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
    5. configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    6. configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    7. // 设置client.id
    8. configs.put(ProducerConfig.CLIENT_ID_CONFIG, "tx_producer_01");
    9. // 设置事务id
    10. configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tx_id_02");
    11. // 需要所有的ISR副本确认
    12. configs.put(ProducerConfig.ACKS_CONFIG, "all");
    13. // 启用幂等性
    14. configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
    15. KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configs);
    16. return producer;
    17. }
    18. public static KafkaConsumer<String, String> getConsumer(String consumerGroupId) {
    19. Map<String, Object> configs = new HashMap<>();
    20. configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
    21. configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    22. configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    23. // 设置消费组ID
    24. configs.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer_grp_02");
    25. // 不启用消费者偏移量的自动确认,也不要手动确认
    26. configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    27. configs.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer_client_02");
    28. configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    29. // 只读取已提交的消息
    30. // configs.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
    31. KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(configs);
    32. return consumer;
    33. }
    34. public static void main(String[] args) {
    35. String consumerGroupId = "consumer_grp_id_101";
    36. KafkaProducer<String, String> producer = getProducer();
    37. KafkaConsumer<String, String> consumer = getConsumer(consumerGroupId);
    38. // 事务的初始化
    39. producer.initTransactions();
    40. //订阅主题
    41. consumer.subscribe(Collections.singleton("tp_tx_01"));
    42. final ConsumerRecords<String, String> records = consumer.poll(1_000);
    43. // 开启事务
    44. producer.beginTransaction();
    45. try {Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<> ();
    46. for (ConsumerRecord<String, String> record : records) {
    47. System.out.println(record);
    48. producer.send(new ProducerRecord<String, String> ("tp_tx_out_01", record.key(), record.value()));
    49. offsets.put(
    50. new TopicPartition(record.topic(), record.partition()),
    51. new OffsetAndMetadata(record.offset() + 1)); // 偏移量表示下一条要消费的消息
    52. }
    53. // 将该消息的偏移量提交作为事务的一部分,随事务提交和回滚(不提交消费偏移量)
    54. producer.sendOffsetsToTransaction(offsets, consumerGroupId);
    55. // int i = 1 / 0;
    56. // 提交事务
    57. producer.commitTransaction();
    58. } catch (Exception e) {
    59. e.printStackTrace();
    60. // 回滚事务
    61. producer.abortTransaction();
    62. } finally {
    63. // 关闭资源
    64. producer.close();
    65. consumer.close();
    66. }
    67. }
    68. }

    2、控制器

  • Kafka集群包含若干个broker,broker.id指定broker的编号,编号不要重复。

  • Kafka集群上创建的主题,包含若干个分区。
  • 每个分区包含若干个副本,副本因子包括了Follower副本和Leader副本。
  • 副本又分为ISR(同步副本分区)和OSR(非同步副本分区)


  • 控制器就是一个broker,除了一般broker的功能,还负责Leader分区的选举

    broker选举

  • 集群里第一个启动的broker在Zookeeper中创建临时节点 <KafkaZkChroot>/controller

  • 其他broker在该控制器节点创建Zookeeper watch对象,使用Zookeeper的监听机制接收该节点的变更。
    • 即:Kafka通过Zookeeper的分布式锁特性选举集群控制器

下图中,节点 /myKafka/controller 是一个zookeeper临时节点,其中 “brokerid”:0 ,表示当前控制器是 broker.id 为 0 的broker
image.png

  • 每个新选出的控制器通过 Zookeeper 的条件递增操作获得一个全新的、数值更大的 controller epoch。其他 broker 在知道当前 controller epoch 后,如果收到由控制器发出的包含较旧epoch 的消息,就会忽略它们,以防止“脑裂”。
  • 比如当一个Leader副本分区所在的broker宕机,需要选举新的Leader副本分区,有可能两个具有不同纪元数字的控制器都选举了新的Leader副本分区,如果选举出来的Leader副本分区不一样,听谁的?脑裂了。有了纪元数字,直接使用纪元数字最新的控制器结果。

当控制器发现一个 broker 已经离开集群,那些失去Leader副本分区的Follower分区需要一个新 Leader(这些分区的首领刚好是在这个 broker 上)。

  1. 控制器需要知道哪个broker宕机了?
  2. 控制器需要知道宕机的broker上负责的时候哪些分区的Leader副本分区?

下图中, <KafkaChroot>/brokers/ids/0 保存该broker的信息,此节点为临时节点,如果 broker 节点宕机,该节点丢失。

  • 集群控制器负责监听 ids 节点,一旦节点子节点发送变化,集群控制器得到通知

image.png
结论:

  1. Kafka 使用 Zookeeper 的分布式锁选举控制器,并在节点加入集群或退出集群时通知控制器。
  2. 控制器负责在节点加入或离开集群时进行分区Leader选举。
  3. 控制器使用epoch 来避免“脑裂”【“脑裂”是指两个节点同时认为自己是当前的控制器】

    3、可靠性保证

    概念

  4. 创建Topic的时候可以指定 —replication-factor 3 ,表示分区的副本数,不要超过broker的数量

  5. Leader是负责读写的节点,而其他副本则是Follower。Producer只把消息发送到Leader,Follower定期地到Leader上Pull数据。
  6. ISR是Leader负责维护的与其保持同步的Replica列表,即当前活跃的副本列表。如果一个Follow落后太多,Leader会将它从ISR中移除。落后太多意思是该Follow复制的消息Follow长时间没有向Leader发送fetch请求(参数: replica.lag.time.max.ms 默认值:10000)。
  7. 为了保证可靠性,可以设置 acks=all 。Follower收到消息后,会像Leader发送ACK。一旦Leader收到了ISR中所有Replica的ACK,Leader就commit,那么Leader就向Producer发送ACK。

副本的分配:

  • 当某个topic的 —replication-factor 为N(N>1)时,每个Partition都有N个副本,称作replica。原则上是将replica均匀的分配到整个集群上。不仅如此,partition的分配也同样需要均匀分配,为了更好的负载均衡。

副本分配的三个目标:

  1. 均衡地将副本分散于各个broker上
  2. 对于某个broker上分配的分区,它的其他副本在其他broker上
  3. 如果所有的broker都有机架信息,尽量将分区的各个副本分配到不同机架上的broker。

在不考虑机架信息的情况下:

  1. 第一个副本分区通过轮询的方式挑选一个broker,进行分配。该轮询从broker列表的随机位置进行轮询。
  2. 其余副本通过增加偏移进行分配。

    失效副本

    失效副本的判定
    replica.lag.time.max.ms 默认大小为10000。
  • 当ISR中的一个Follower副本滞后Leader副本的时间超过参数 replica.lag.time.max.ms 指定的值时即判定为副本失效,需要将此Follower副本剔出除ISR。
  • 具体实现原理:当Follower副本将Leader副本的LEO之前的日志全部同步时,则认为该Follower副本已经追赶上Leader副本,此时更新该副本的lastCaughtUpTimeMs标识。
  • Kafka的副本管理器(ReplicaManager)启动时会启动一个副本过期检测的定时任务,而这个定时任务会定时检查当前时间与副本的lastCaughtUpTimeMs差值是否大于参数 replica.lag.time.max.ms 指定的值。
  • Kafka源码注释中说明了一般有两种情况会导致副本失效:
    1. 1. Follower副本进程卡住,在一段时间内没有向Leader副本发起同步请求,比如频繁的Full GC
    2. 1. Follower副本进程同步过慢,在一段时间内都无法追赶上Leader副本,比如IO开销过大。
  • 如果通过工具增加了副本因子,那么新增加的副本在赶上Leader副本之前也都是处于失效状态的。
  • 如果一个Follower副本由于某些原因(比如宕机)而下线,之后又上线,在追赶上Leader副本之前也是出于失效状态。
  • 失效副本的分区个数是用于衡量Kafka**性能指标的重要部分**。Kafka本身提供了一个相关的指标,即 UnderReplicatedPartitions,这个可以通过JMX访问:

    kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions

取值范围是>=0的整数。注意:如果Kafka集群正在做分区迁移(kafka-reassign-partitions.sh)的时候,这个值也会大于0。

副本复制

  • 日志复制算法(log replication algorithm)必须提供的基本保证是:如果它告诉客户端消息已被提交,而当前**Leader出现故障,新选出的Leader**也必须具有该消息。在出现故障时,Kafka会从挂掉 Leader 的ISR里面选择一个Follower作为这个分区新的Leader。
  • 每个分区的 leader 会维护一个in-sync replica(同步副本列表,又称 ISR)。当Producer向broker 发送消息,消息先写入到对应Leader分区,然后复制到这个分区的所有副本中。ACKS=ALL时,只有将消息成功复制到所有同步副本(ISR)后,这条消息才算被提交。

什么情况下会导致一个副本与** leader **失去同步:
一个副本与 leader 失去同步的原因有很多,主要包括:

  • 慢副本(Slow replica):follower replica 在一段时间内一直无法赶上 leader 的写进度。造成这种情况的最常见原因之一是 follower replica 上的 I/O瓶颈,导致它持久化日志的时间比它从 leader 消费消息的时间要长;
  • 卡住副本(Stuck replica):follower replica 在很长一段时间内停止从 leader 获取消息。这可能是以为 GC 停顿,或者副本出现故障;
  • 刚启动副本(Bootstrapping replica):当用户给某个主题增加副本因子时,新的 follower replicas 是不同步的,直到它跟上 leader 的日志。当副本落后于 leader 分区时,这个副本被认为是不同步或滞后的。在 Kafka中,副本的滞后于 Leader是根据 replica.lag.time.max.ms 来衡量。

如何确认某个副本处于**滞后状态**

  • 通过 replica.lag.time.max.ms 来检测卡住副本(Stuck replica)在所有情况下都能很好地工作。它跟踪 follower 副本没有向 leader 发送获取请求的时间,通过这个可以推断 follower 是否正常。
  • 另一方面,使用消息数量检测不同步慢副本(Slow replica)的模型只有在为单个主题或具有同类流量模式的多个主题设置这些参数时才能很好地工作,但我们发现它不能扩展到生产集群中所有主题。

4、一致性保证

概念

1. 水位标记

  • 水位或水印(watermark)一词,表示位置信息,即位移(offset)。Kafka源码中使用的名字是高水位,HW(high watermark)。

2. 副本角色

  • Kafka分区使用多个副本(replica)提供高可用。

3. LEO和HW

  • 每个分区副本对象都有两个重要的属性:LEOHW
    • LEO:即日志末端位移(log end offset),记录了该副本日志中下一条消息的位移值。如果 LEO=10,那么表示该副本保存了10条消息,位移值范围是[0, 9]。另外,Leader LEO和Follower LEO**的更新是有区别的**。
    • HW:即上面提到的水位值。对于同一个副本对象而言,其**HW值不会大于LEO值。小于等于HW值的所有消息都被认为是“已备份”的(replicated)。Leader副本和Follower**副本的HW更新不同

image.png

  • 上图中,HW值是7,表示位移是 0~7 的所有消息都已经处于“已提交状态”(committed),而LEO值是14,8~13的消息就是未完全备份(fully replicated)——为什么没有14?LEO指向的是下一条消息到来时的位移【还没到来】
  • 消费者无法消费分区下Leader副本中位移大于分区HW的消息
    Follower副本何时更新LEO
    Follower副本不停地向Leader副本所在的broker发送FETCH请求,一旦获取消息后写入自己的日志中进行备份。那么Follower副本的LEO是何时更新的呢?首先必须言明,Kafka有两套Follower副本 LEO:
  1. 一套LEO保存在Follower副本所在Broker的副本管理机中;
  2. 另一套LEO保存在Leader副本所在Broker的副本管理机中。Leader副本机器上保存了所有的follower**副本的**LEO

Kafka使用前者帮助Follower副本更新其HW值;利用后者帮助Leader副本更新其HW。

  1. Follower副本的本地LEO何时更新? Follower副本的LEO值就是日志的LEO值,每当新写入一条消息,**LEO**值就会被更新。当Follower发送FETCH请求后,Leader将数据返回给Follower,此时Follower开始Log写数据,从而自动更新LEO值。
  2. Leader端Follower的LEO何时更新? Leader端的Follower的LEO更新发生在**Leader在处理 Follower FETCH **请求时。一旦Leader接收到Follower发送的FETCH请求,它先从Log中读取相应的数据,给Follower返回数据前,先更新Follower的LEO。
    Follower副本何时更新HW
  • Follower更新**HW发生在其更新LEO**之后,一旦Follower向Log写完数据,尝试更新 自己的HW值。
  • 比较follower当前LEO值与FETCH响应中Leader的HW值,取两者的小者作为新的HW值。
  • 即:如果Follower的LEO大于Leader的HW,Follower HW值不会大于Leader的HW值。

    Leader副本何时更新LEO
  • 和Follower更新LEO相同,Leader写Log时自动更新自己的LEO值。

    Leader副本何时更新HW

    Leader的HW值就是分区HW值直接影响分区数据对消费者的可见性
    Leader会尝试去更新分区HW的四种情况:

  1. Follower副本成为Leader副本时:Kafka会尝试去更新分区HW。
  2. Broker崩溃导致副本被踢出ISR时:检查下分区HW值是否需要更新是有必要的。
  3. 生产者向Leader副本写消息时:因为写入消息会更新Leader的LEO,有必要检查HW值是否需要更新
  4. Leader处理Follower FETCH请求时:首先从Log读取数据,之后尝试更新分区HW值

结论:
当Kafka broker都正常工作时,分区HW值的更新时机有两个:

  1. Leader 处理PRODUCE请求时
  2. Leader 处理FETCH请求时。

  • Leader如何更新自己的HW值?Leader broker**上保存了一套Follower副本的LEO以及自己的**LEO。当尝试确定分区HW时,它会选出所有满足条件的副本,比较它们的LEO(包括Leader的LEO),并选择**最小的LEO**作为HW值

需要满足的条件,(二选一):

  1. 处于ISR中
  2. 副本LEO落后于Leader LEO的时长不大于 replica.lag.time.max.ms 参数值(默认是10s)

如果Kafka只判断第一个条件的话,确定分区HW值时就不会考虑这些未在ISR中的副本,但这些副本已经具备了“立刻进入ISR”的资格,因此就可能出现分区HW值越过ISR中副本LEO的情况——不允许。因为分区HW定义就是ISR中所有副本LEO的最小值

HW和LEO正常更新案例
  • 我们假设有一个 topic,单分区,副本因子是2,即一个 Leader 副本和一个 Follower 副本。当 producer 发送一条消息时,broker 端的副本到底会发生什么事情以及分区 HW 是如何被更新的呢?
    • 1、初始状态
      • 初始时Leader和Follower的HW和LEO都是0(严格来说源代码会初始化LEO为-1,不过这不影响之后的讨论)。Leader中的 Remote LEO 指的就是Leader端保存的Follower LEO,也被初始化成0。此时,生产者没有发送任何消息给Leader,而Follower已经开始不断地给Leader发送FETCH请求了,但因为没有数据因此什么都不会发生。值得一提的是,Follower发送过来的FETCH请求因为无数据而暂时会被寄存到Leader端的purgatory**(炼狱)**中,待500ms ( replica.fetch.wait.max.ms 参数)超时后会强制完成。倘若在寄存期间生产者发来数据,则Kafka会自动唤醒该FETCH请求,让Leader继续处理。

image.png

  • 2、follower发送fetch请求在Leader处理完produce请求之后
    • producer给该topic分区发送了一条消息,状态如下图所示:

image.png

  1. - 如上图所示,Leader接收到PRODUCE请求主要做两件事情:
  2. 1. 把消息写入Log,同时自动更新Leader自己的LEO
  3. 1. 尝试更新Leader HW值。假设此时Follower尚未发送FETCH请求,Leader端保存的Remote LEO依然是0,因此Leader会比较它自己的LEO值和Remote LEO值,发现最小值是0,与当前HW值相同,故不会更新分区HW值(仍为0
  4. - PRODUCE 请求处理完成后各值如下,Leader端的HW值依然是0,而LEO1Remote LEO也是0

image.png

  1. - **假设此时follower发送了FETCH请求**,则状态变更如下:

image.png

  1. - 本例中当follower发送FETCH请求时,Leader端的处理依次是:
  2. 1. 读取Log数据
  3. 1. 更新remote LEO = 0(为什么是0 因为此时Follower还没有写入这条消息。Leader如何确认Follower还未写入呢?这是通过Follower发来的FETCH请求中的`Fetch offset`来确定的)
  4. 1. 尝试更新分区HW:此时Leader LEO = 1Remote LEO = 0,故分区HW值= min(Leader LEO, Follower Remote LEO) = 0
  5. 1. 把数据和当前分区HW值(依然是0)发送给Follower副本
  6. - Follower副本接收到FETCH Response后依次执行下列操作:
  7. 1. 写入本地Log,同时更新Follower自己管理的 LEO1
  8. 1. 更新Follower HW:比较本地LEO FETCH Response 中的当前Leader HW值,取较小者,Follower HW = 0
  9. - 此时,第一轮FETCH RPC结束,我们会发现虽然LeaderFollower都已经在Log中保存了这条消息,但分区HW值尚未被更新,仍为0

image.png

  1. - **Follower第二轮FETCH**
  2. - 分区HW是在第二轮FETCH RPC中被更新的,如下图所示:

image.png

  1. - Follower发来了第二轮FETCH请求,Leader端接收到后仍然会依次执行下列操作:
  2. 1. 读取Log数据
  3. 1. 更新Remote LEO = 1(这次为什么是1了? 因为这轮FETCH RPC携带的fetch offset1,那么为什么这轮携带的就是1了呢,因为上一轮结束后Follower LEO被更新为1了)
  4. 1. 尝试更新分区HW:此时leader LEO = 1Remote LEO = 1,故分区HW值= min(Leader LEO, Follower Remote LEO) = **1**。
  5. 1. 把数据(实际上没有数据)和当前分区HW值(已更新为1)发送给Follower副本作为Response
  6. - 同样地,Follower副本接收到FETCH response后依次执行下列操作:
  7. 1. 写入本地Log,当然没东西可写,Follower LEO也不会变化,依然是1
  8. 1. 更新Follower HW:比较本地LEO和当前LeaderHW取小者。由于都是1,故更新follower HW= 1

image.png

  1. - 此时消息已经成功地被复制到LeaderFollowerLog中且分区HW1,表明消费者能够消费 offset = 0的消息
  • 3、FETCH请求保存在purgatory【炼狱】中,PRODUCE请求到来
    • 当Leader无法立即满足FECTH返回要求的时候(比如没有数据),那么该FETCH请求被暂存到Leader 端的purgatory中(炼狱),待时机成熟尝试再次处理。Kafka不会无限期缓存,默认有个超时时间(500ms),一旦超时时间已过,则这个请求会被强制完成。当寄存期间还没超时,生产者发送PRODUCE请求从而使之满足了条件以致被唤醒。此时,Leader端处理流程如下:
      1. Leader写Log(自动更新Leader LEO)
      2. 尝试唤醒在purgatory中寄存的FETCH请求
      3. 尝试更新分区HW
        HW和LEO异常
  • Kafka使用HW值来决定副本备份的进度,而HW**值的更新通常需要额外一轮**FETCH RPC才能完成。但这种设计是有问题的,可能引起的问题包括:
    1. 备份数据丢失
    2. 备份数据不一致
      Leader Epoch使用

Kafka解决方案

规避数据丢失

规避数据不一致

5、消息重复的场景及解决方案

消息重复和丢失是kafka中很常见的问题,主要发生在以下三个阶段:

  1. 生产者阶段
  2. broke 阶段
  3. 消费者阶段

    6、__consumer_offsets

    七、延时队列

  • 两个follower副本都已经拉取到了leader副本的最新位置,此时又向leader副本发送拉取请求,而leader副本并没有新的消息写入,那么此时leader副本该如何处理呢?可以直接返回空的拉取结果给follower副本,不过在leader副本一直没有新消息写入的情况下,follower副本会一直发送拉取请求,并且总收到空的拉取结果,消耗资源

八、重试队列