- 基于发布、订阅模式的消息队列
基础架构

工作流程

- producer 生产的消息追加到log文件末尾,每条数据都有自己的offset
文件存储机制
分片+索引


“.index”文件存储大量的索引信息
“.log”文件存储大量的数据
索引文件中的元数据指向对应数据文件中message的物理偏移地址。
生产者
分区策略

- 有partition,直接partition值
- 没有partition,但有key 取 hash(key)%partition
- 都没有 积攒成batch 发送到随机新的分区
数据可靠性

- 副本同步策略
- 半数以上 2n+1 最坏情况n台故障
- 全部完成同步
- ISR 形成小圈子 小圈子完成同步即可。 长时间未同步 踢出圈子。 选举也是圈子中选
- ack应答
- 0 producer 不等待broker ack
- 1 producer 等待broker ack,leader落盘成功后即返回ack(不等待follower同步)
- -1(all) leader和ISR中的follower全部落盘后才返回ack
- 故障细节
At Least Once + 幂等性= Exactly Once
幂等性:无论producer发送多少重复数据 server只持久化一条
消费者
- 分区分配策略
- roundrobin
- range
- offset维护 —— consumer自己记录自己消费到了哪个offset
高速读写
- 顺序写磁盘
- 应用Pagecache
- I/O Scheduler小块组装成大块
- I/O Scheduler写操作重新顺序排好
- 充分利用空闲内存
- 生产与消费速度相当时,数据直接通过PageCache交换,无需经过物理磁盘
- 重启进程PageCache可用
- 零复制

ProducerAPI
package com.Kafka;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

/**
 * JUnit examples of the Kafka producer API: asynchronous send without a callback,
 * asynchronous send with a callback, and synchronous send.
 *
 * <p>Every test sends 100 records to the topic {@code "first"}, using the loop index
 * (as a string) for both key and value. The producer is created in {@link #init()}
 * before each test and closed in {@link #close()} afterwards.
 */
public class producer {

    /** Producer configuration, populated in {@link #init()}. */
    Properties properties = null;

    /** Producer under test; stays {@code null} if {@link #init()} fails before construction. */
    KafkaProducer<String, String> producer = null;

    /** Builds the producer configuration and creates the producer. */
    @Before
    public void init() {
        properties = new Properties();
        // Kafka cluster (broker list)
        properties.put("bootstrap.servers", "node01:9092");
        properties.put("acks", "all");
        // Number of retries
        properties.put("retries", 1);
        // Batch size
        properties.put("batch.size", 16384);
        // Linger (wait) time
        properties.put("linger.ms", 1);
        // RecordAccumulator buffer size
        properties.put("buffer.memory", 33554432);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<String, String>(properties);
    }

    /*
     * Asynchronous sends:
     *   - without a callback
     *   - with a callback
     */

    /** Sends 100 records asynchronously and ignores the returned futures. */
    @Test
    public void withoutcallback() {
        for (int i = 0; i < 100; i++) {
            String text = Integer.toString(i);
            producer.send(new ProducerRecord<String, String>("first", text, text));
        }
    }

    /**
     * Sends 100 records asynchronously with a callback that prints the record offset on
     * success and the stack trace on failure.
     */
    @Test
    public void withcallback() {
        for (int i = 0; i < 100; i++) {
            String text = Integer.toString(i);
            producer.send(new ProducerRecord<String, String>("first", text, text), new Callback() {
                // Callback invoked asynchronously when the producer receives the ack.
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception == null) {
                        System.out.println("success->" + metadata.offset());
                    } else {
                        exception.printStackTrace();
                    }
                }
            });
        }
    }

    /*
     * Synchronous send
     */

    /**
     * Sends 100 records one at a time, blocking on each returned future before sending
     * the next record.
     *
     * @throws ExecutionException   if a send fails
     * @throws InterruptedException if the thread is interrupted while waiting on a send
     */
    @Test
    public void sync() throws ExecutionException, InterruptedException {
        for (int i = 0; i < 100; i++) {
            String text = Integer.toString(i);
            producer.send(new ProducerRecord<String, String>("first", text, text)).get();
        }
    }

    /** Closes the producer after each test. */
    @After
    public void close() {
        // init() may have thrown before the producer was constructed; skip the close in
        // that case so the original failure is not accompanied by a NullPointerException.
        if (producer != null) {
            producer.close();
        }
    }
}
ConsumerAPI
package com.Kafka;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.Arrays;
import java.util.Map;
import java.util.Properties;

/**
 * JUnit examples of the Kafka consumer API: automatic offset commit, manual synchronous
 * commit and manual asynchronous commit.
 *
 * <p>Every test subscribes to the topic {@code "first"} as group {@code "test"}. The shared
 * configuration is built in {@link #init()}; each test sets {@code enable.auto.commit}
 * itself and then creates its own consumer, which {@link #close()} closes afterwards.
 */
public class comsumer {

    /** Consumer configuration shared by all tests, populated in {@link #init()}. */
    Properties properties = null;

    /** Consumer under test; created inside each test method, {@code null} until then. */
    KafkaConsumer<String, String> comsumer = null;

    /** Builds the configuration common to all tests; does not create the consumer. */
    @Before
    public void init() {
        properties = new Properties();
        // Kafka cluster (broker list)
        properties.put("bootstrap.servers", "node01:9092");
        properties.put("group.id", "test");
        // "enable.auto.commit" is deliberately not set here: each test sets it itself.
        properties.put("auto.commit.interval.ms", "1000");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    }

    /*
     * Offset commit modes:
     *   - automatic commit
     *   - manual commit
     *       - synchronous
     *       - asynchronous
     */

    /**
     * Polls and prints records with automatic offset commit enabled.
     *
     * <p>The polling loop has no exit condition, so this test runs until it is stopped
     * externally or a call inside the loop throws.
     */
    @Test
    public void autoSubmit() {
        properties.put("enable.auto.commit", "true");
        comsumer = new KafkaConsumer<String, String>(properties);
        comsumer.subscribe(Arrays.asList("first"));
        while (true) {
            ConsumerRecords<String, String> records = comsumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n",
                        record.offset(), record.key(), record.value());
            }
        }
    }

    /** Polls once, prints the records, then commits the offsets synchronously. */
    @Test
    public void manualSyncSubmit() {
        properties.put("enable.auto.commit", "false");
        comsumer = new KafkaConsumer<String, String>(properties);
        comsumer.subscribe(Arrays.asList("first"));
        ConsumerRecords<String, String> records = comsumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset = %d, key = %s, value = %s%n",
                    record.offset(), record.key(), record.value());
        }
        comsumer.commitSync();
    }

    /**
     * Polls once, prints the records, then commits the offsets asynchronously. A failed
     * commit is reported from the callback.
     */
    @Test
    public void manualAsyncSubmit() {
        properties.put("enable.auto.commit", "false");
        comsumer = new KafkaConsumer<String, String>(properties);
        comsumer.subscribe(Arrays.asList("first"));
        ConsumerRecords<String, String> records = comsumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset = %d, key = %s, value = %s%n",
                    record.offset(), record.key(), record.value());
        }
        comsumer.commitAsync(new OffsetCommitCallback() {
            @Override
            public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
                if (e != null) {
                    System.out.println("Commit fail for " + map);
                    // Report the cause as well, as the producer example's callback does;
                    // without it the reason for the failed commit is lost.
                    e.printStackTrace();
                }
            }
        });
    }

    /** Closes the consumer after each test. */
    @After
    public void close() {
        // The consumer is created inside each test; if construction threw, the field is
        // still null and closing it would add a NullPointerException to the real failure.
        if (comsumer != null) {
            comsumer.close();
        }
    }
}
