Kafka 2.8.0之后也可以配置不采用ZK

image.png

image.png

生产者如何提高吞吐量?

  • batch.size:批次大小,默认16k
  • linger.ms:等待时间,修改为5-100ms
  • compression.type:消息压缩,snappy
  • RecordAccumulator:缓冲区大小,修改为64m

    数据的可靠性

    image.png
    image.png

    分区副本包括Leader


    image.png

幂等性

image.png
幂等性默认开启,参数 enable.idempotence 默认为 true,false关闭。

事务

image.png

数据有序性

image.png

ZK中存储的Kafka信息

image.png

kafka Broker总体工作流程

image.png

kafka副本

image.png
image.png
image.png

kafka文件存储机制

image.png

kafka高效读写数据

image.png
4)页缓存+零拷贝
image.png

kafka消费方式

image.png
image.png

消费者组

image.png
image.png

image.png

消费者组消费流程

image.png

消费分区的分配以及再平衡

image.png

Range

image.png

RoundRobin

image.png

Sticky

image.png

offset

image.png

image.png
image.png

image.png
image.png

  1. public class CustomConsumerSeek {
  2. public static void main(String[] args) {
  3. // 0 配置信息
  4. Properties properties = new Properties();
  5. // 连接
  6. properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
  7. // key value 反序列化
  8. properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  9. properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  10. properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test2");
  11. // 1 创建一个消费者
  12. KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
  13. // 2 订阅一个主题
  14. ArrayList<String> topics = new ArrayList<>();
  15. topics.add("first");
  16. kafkaConsumer.subscribe(topics);
  17. Set<TopicPartition> assignment= new HashSet<>();
  18. while (assignment.size() == 0) {
  19. kafkaConsumer.poll(Duration.ofSeconds(1));
  20. // 获取消费者分区分配信息(有了分区分配信息才能开始消费)
  21. assignment = kafkaConsumer.assignment();
  22. }
  23. // 遍历所有分区,并指定 offset 从 1700 的位置开始消费
  24. for (TopicPartition tp: assignment) {
  25. kafkaConsumer.seek(tp, 1700);
  26. }
  27. // 3 消费该主题数据
  28. while (true) {
  29. ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
  30. for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
  31. System.out.println(consumerRecord);
  32. }
  33. }
  34. }
  35. }

指定时间消费

  1. public class CustomConsumerForTime {
  2. public static void main(String[] args) {
  3. // 0 配置信息
  4. Properties properties = new Properties();
  5. // 连接
  6. properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
  7. // key value 反序列化
  8. properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  9. properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  10. properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test2");
  11. // 1 创建一个消费者
  12. KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
  13. // 2 订阅一个主题
  14. ArrayList<String> topics = new ArrayList<>();
  15. topics.add("first");
  16. kafkaConsumer.subscribe(topics);
  17. Set<TopicPartition> assignment = new HashSet<>();
  18. while (assignment.size() == 0) {
  19. kafkaConsumer.poll(Duration.ofSeconds(1));
  20. // 获取消费者分区分配信息(有了分区分配信息才能开始消费)
  21. assignment = kafkaConsumer.assignment();
  22. }
  23. HashMap<TopicPartition, Long> timestampToSearch = new HashMap<>();
  24. // 封装集合存储,每个分区对应一天前的数据
  25. for (TopicPartition topicPartition : assignment) {
  26. timestampToSearch.put(topicPartition,
  27. System.currentTimeMillis() - 1 * 24 * 3600 * 1000);
  28. }
  29. // 获取从 1 天前开始消费的每个分区的 offset
  30. Map<TopicPartition, OffsetAndTimestamp> offsets = kafkaConsumer.offsetsForTimes(timestampToSearch);
  31. // 遍历每个分区,对每个分区设置消费时间。
  32. for (TopicPartition topicPartition : assignment) {
  33. OffsetAndTimestamp offsetAndTimestamp = offsets.get(topicPartition);
  34. // 根据时间指定开始消费的位置
  35. if (offsetAndTimestamp != null){
  36. kafkaConsumer.seek(topicPartition,
  37. offsetAndTimestamp.offset());
  38. }
  39. }
  40. // 3 消费该主题数据
  41. while (true) {
  42. ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
  43. for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
  44. System.out.println(consumerRecord);
  45. }
  46. }
  47. }
  48. }

重复消费与漏消费

image.png
image.png

数据积压

image.png

参考资料

01_尚硅谷大数据技术之Kafka.pdf
03_尚硅谷大数据技术之Kafka(生产调优手册)V3.3.pdf
04_尚硅谷大数据技术之Kafka(源码解析)V3.3.pdf