• 基于发布、订阅模式的消息队列

基础架构

image.png

工作流程

image.png

  • producer 生产 追加到log文件,每条数据有offset

文件存储机制

分片+索引

image.png

image.png
“.index”文件存储大量的索引信息
“.log”文件存储大量的数据
索引文件中的元数据指向对应数据文件中message的物理偏移地址。

生产者

分区策略

image.png

  • 有partition,直接partition值
  • 没有partition,但有key 取 hash(key)%partition
  • 都没有 积攒成batch 发送到随机新的分区

数据可靠性

image.png

  • 副本同步策略
    • 半数以上副本完成同步即发ack:容忍 n 台故障需要 2n+1 个副本
    • 全部完成同步
  • ISR 形成小圈子 小圈子完成同步即可。 长时间未同步 踢出圈子。 选举也是圈子中选
  • ack应答
    • 0 producer 不等待broker ack
    • 1 leader 落盘后即返回ack(不等待follower同步)
    • -1 leader 和 follower 全部落盘后才返回ack
  • 故障细节

At Least Once + 幂等性= Exactly Once

幂等性:无论producer发送多少重复数据 server只持久化一条

消费者

  • 分区分配策略
    • roundrobin
    • range
  • offset维护 —— consumer自己记录自己消费到哪个offset

高速读写

  • 顺序写磁盘
  • 应用Pagecache
    • I/O Scheduler小块组装成大块
    • I/O Scheduler写操作重新顺序排好
    • 充分利用空闲内存
    • 读写速度相同时,无需访问磁盘
    • 重启进程PageCache可用
  • 零拷贝(zero-copy)

image.png

ProducerAPI

  1. package com.Kafka;
  2. import org.apache.kafka.clients.producer.Callback;
  3. import org.apache.kafka.clients.producer.KafkaProducer;
  4. import org.apache.kafka.clients.producer.ProducerRecord;
  5. import org.apache.kafka.clients.producer.RecordMetadata;
  6. import org.junit.After;
  7. import org.junit.Before;
  8. import org.junit.Test;
  9. import java.util.Properties;
  10. import java.util.concurrent.ExecutionException;
  11. public class producer {
  12. Properties properties = null;
  13. KafkaProducer<String, String> producer = null;
  14. @Before
  15. public void init() {
  16. properties = new Properties();
  17. properties.put("bootstrap.servers", "node01:9092");//kafka集群,broker-list
  18. properties.put("acks", "all");
  19. properties.put("retries", 1);//重试次数
  20. properties.put("batch.size", 16384);//批次大小
  21. properties.put("linger.ms", 1);//等待时间
  22. properties.put("buffer.memory", 33554432);//RecordAccumulator缓冲区大小
  23. properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  24. properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  25. producer = new KafkaProducer<String, String>(properties);
  26. }
  27. /***
  28. * 异步
  29. * 无回调
  30. * 有回调
  31. *
  32. */
  33. @Test
  34. public void withoutcallback() {
  35. for (int i = 0; i < 100; i++) {
  36. producer.send(new ProducerRecord<String, String>("first", Integer.toString(i), Integer.toString(i)));
  37. }
  38. }
  39. @Test
  40. public void withcallback() {
  41. for (int i = 0; i < 100; i++) {
  42. producer.send(new ProducerRecord<String, String>("first", Integer.toString(i), Integer.toString(i)), new Callback() {
  43. //回调函数,该方法会在Producer收到ack时调用,为异步调用
  44. @Override
  45. public void onCompletion(RecordMetadata metadata, Exception exception) {
  46. if (exception == null) {
  47. System.out.println("success->" + metadata.offset());
  48. } else {
  49. exception.printStackTrace();
  50. }
  51. }
  52. });
  53. }
  54. }
  55. /***
  56. * 同步
  57. *
  58. */
  59. @Test
  60. public void sync() throws ExecutionException, InterruptedException {
  61. for (int i = 0; i < 100; i++) {
  62. producer.send(new ProducerRecord<String, String>("first", Integer.toString(i), Integer.toString(i))).get();
  63. }
  64. }
  65. @After
  66. public void close() {
  67. producer.close();
  68. }
  69. }

ConsumerAPI

  1. package com.Kafka;
  2. import org.apache.kafka.clients.consumer.*;
  3. import org.apache.kafka.common.TopicPartition;
  4. import org.junit.After;
  5. import org.junit.Before;
  6. import org.junit.Test;
  7. import java.util.Arrays;
  8. import java.util.Map;
  9. import java.util.Properties;
  10. public class comsumer {
  11. Properties properties = null;
  12. KafkaConsumer<String, String> comsumer = null;
  13. @Before
  14. public void init() {
  15. properties = new Properties();
  16. properties.put("bootstrap.servers", "node01:9092");//kafka集群,broker-list
  17. properties.put("group.id", "test");
  18. // properties.put("enable.auto.commit", "true");
  19. properties.put("auto.commit.interval.ms", "1000");
  20. properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  21. properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  22. }
  23. /***
  24. * 自动提交offset
  25. * 手动提交offset
  26. * 同步提交
  27. * 异步提交
  28. *
  29. */
  30. @Test
  31. public void autoSubmit() {
  32. properties.put("enable.auto.commit", "true");
  33. comsumer = new KafkaConsumer<String, String>(properties);
  34. comsumer.subscribe(Arrays.asList("first"));
  35. while (true) {
  36. ConsumerRecords<String, String> records = comsumer.poll(100);
  37. for (ConsumerRecord<String, String> record : records) {
  38. System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
  39. }
  40. }
  41. }
  42. @Test
  43. public void manualSyncSubmit() {
  44. properties.put("enable.auto.commit", "false");
  45. comsumer = new KafkaConsumer<String, String>(properties);
  46. comsumer.subscribe(Arrays.asList("first"));
  47. ConsumerRecords<String, String> records = comsumer.poll(100);
  48. for (ConsumerRecord<String, String> record : records) {
  49. System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
  50. }
  51. comsumer.commitSync();
  52. }
  53. @Test
  54. public void manualAsyncSubmit() {
  55. properties.put("enable.auto.commit", "false");
  56. comsumer = new KafkaConsumer<String, String>(properties);
  57. comsumer.subscribe(Arrays.asList("first"));
  58. ConsumerRecords<String, String> records = comsumer.poll(100);
  59. for (ConsumerRecord<String, String> record : records) {
  60. System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
  61. }
  62. comsumer.commitAsync(new OffsetCommitCallback() {
  63. @Override
  64. public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
  65. if (e != null) {
  66. System.out.println("Commit fail for " + map);
  67. }
  68. }
  69. });
  70. }
  71. @After
  72. public void close() {
  73. comsumer.close();
  74. }
  75. }