1 生产者消息发送流程
1.1 发送原理

将生产者的数据导入集群的流程:
- 在main线程中创建Producer对象,调用send方法发送数据,根据业务需求设置拦截器Interceptors,通过序列化器对数据进行序列化,用分区器规定数据发往哪个分区
- 发送到分区实际上是发送到某个缓存队列里,队列默认32m,每批16k(双端队列)
- sender线程主动拉取数据,拉取数据有两个条件:
- batch.size:数据积累到batch.size之后,sender才会发送数据,默认16k
- linger.ms:如果数据迟迟未达到batch.size,sender等待linger.ms设置的时间到了之后就发送数据,单位ms,默认值是0ms,表示没有延迟
以上两个条件满足一个就可以发送数据
- 每一个队列一个节点,发送时,如果没有应答,最多缓存5个请求
- 通过Selector打通底层链路,集群收到后进行副本同步,同步完应答,应答有3种级别:
- 0 生产者发送的数据,不需要等数据落盘应答
- 1 生产者发送的数据,Leader收到数据后应答
- -1(all) 生产者发送的数据,Leader和ISR队列里的所有节点收齐数据后应答
- 应答成功,清楚对应请求,同时清理分区数据
- 应答失败,重试
1.2 生产者重要参数列表
| 参数名称 | 描述 | | —- | —- | | bootstrap.servers | 生产者连接集群所需的broker地 址 清 单。例如hadoop102:9092,hadoop103:9092,hadoop104:9092,可以设置 1 个或者多个,中间用逗号隔开。这里并非需要所有的 broker 地址,因为生产者从给定的 broker 里查找到其他 broker 信息。 | | key.serializer 和 value.serializer | 指定发送消息的 key 和 value 的序列化类型。一定要写全类名。 | | buffer.memory | RecordAccumulator 缓冲区总大小,默认32m。 | | batch.size | 缓冲区一批数据最大值,默认 16k。适当增加该值,可以提高吞吐量,但是如果该值设置太大,会导致数据传输延迟增加。 | | linger.ms | 如果数据迟迟未达到 batch.size,sender 等待 linger.time之后就会发送数据。单位 ms,默认值是 0ms,表示没有延迟。生产环境建议该值大小为 5-100ms 之间。 | | acks | 0:生产者发送过来的数据,不需要等数据落盘应答。
1:生产者发送过来的数据,Leader 收到数据后应答。
-1(all):生产者发送过来的数据,Leader+和 isr 队列里面的所有节点收齐数据后应答。默认值是-1,-1 和all 是等价的 | | max.in.flight.requests.per.connection | 允许最多没有返回 ack 的次数,默认为 5,开启幂等性要保证该值是 1-5 的数字 | | retries | 当消息发送出现错误的时候,系统会重发消息。retries表示重试次数。默认是 int 最大值,2147483647。
如果设置了重试,还想保证消息的有序性,需要设置MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1
否则在重试此失败消息的时候,其他的消息可能发送成功了 | | retry.backoff.ms | 两次重试之间的时间间隔,默认是 100ms。 | | enable.idempotence | 是否开启幂等性,默认 true,开启幂等性 | | compression.type | 生产者发送的所有数据的压缩方式。默认是 none,也就是不压缩。
支持压缩类型:none、gzip、snappy、lz4 和 zstd。 |
2 异步发送API
2.1 普通异步发送
创建 Kafka 生产者,采用异步的方式发送到 Kafka Broker
导入依赖
<dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.0.0</version></dependency></dependencies>
public class CustomProducer {public static void main(String[] args) {// 0. 配置Properties properties = new Properties();// 连接集群properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092");// 指定对应的key和value的序列化类型 下面两种等价// properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());// 1. 创建kafka生产者对象KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);// 2. 发送数据for (int i = 0; i < 5; i++) {kafkaProducer.send(new ProducerRecord<>("first", "hello" + i));}// 3. 关闭资源kafkaProducer.close();}}
2.2 带回调函数的异步发送
回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是元数据信息(RecordMetadata)和异常信息(Exception),如果 Exception 为 null,说明消息发送成功,如果 Exception 不为 null,说明消息发送失败。
public class CustomProducerCallback {public static void main(String[] args) {// 0. 配置Properties properties = new Properties();// 连接集群properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092");// 指定对应的key和value的序列化类型 下面两种等价// properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());// 1. 创建kafka生产者对象KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);// 2. 发送数据for (int i = 0; i < 5; i++) {kafkaProducer.send(new ProducerRecord<>("first", "hello" + i), new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if (e == null) {System.out.println("主题:" + recordMetadata.topic() + " 分区:" + recordMetadata.partition());}}});}// 3. 关闭资源kafkaProducer.close();}}
3 同步发送API
在异步发送的基础上,调用一下get()方法。
public class CustomProducerSync {public static void main(String[] args) throws ExecutionException, InterruptedException {// 0. 配置Properties properties = new Properties();// 连接集群properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092");// 指定对应的key和value的序列化类型 下面两种等价// properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());// 1. 创建kafka生产者对象KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);// 2. 发送数据for (int i = 0; i < 5; i++) {kafkaProducer.send(new ProducerRecord<>("first", "hello" + i)).get();}// 3. 关闭资源kafkaProducer.close();}}
4 生产者分区
4.1 分区的好处
- 便于合理使用存储资源,每个Partition在一个Broker上存储,可以把海量的数据按照分区切割成一块一块数据存储在多台Broker上。合理控制分区的任务,可以实现负载均衡的效果。
提高并行度,生产者可以以分区为单位发送数据,消费者可以以分区为单位消费数据。
4.2 生产者发送消息的分区策略
默认的分区器DefaultPartitioner


public class CustomProducerCallbackPartitions {public static void main(String[] args) {// 0. 配置Properties properties = new Properties();// 连接集群properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092");// 指定对应的key和value的序列化类型 下面两种等价// properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());// 关联自定义分区器properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.example.kafka.producer.MyPartitioner");// 1. 创建kafka生产者对象KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);// 2. 发送数据for (int i = 0; i < 5; i++) {kafkaProducer.send(new ProducerRecord<>("first", "b","pppppp" + i), new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if (e == null) {System.out.println("主题:" + recordMetadata.topic() + " 分区:" + recordMetadata.partition());}}});}// 3. 关闭资源kafkaProducer.close();}}
4.3 自定义分区器
- 需求:发送的数据如果包含hello,就发往0号分区,不包含,就发往1号分区
实现步骤
- 定义类实现Partitioner接口
重写partition()方法
public class MyPartitioner implements Partitioner {@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {// 获取数据String msgValues = value.toString();int partition;if (msgValues.contains("hello")) {partition = 0;} else {partition = 1;}return partition;}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}}
```java public class CustomProducerCallbackPartitions { public static void main(String[] args) { // 0. 配置 Properties properties = new Properties(); // 连接集群 properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, “hadoop102:9092,hadoop103:9092”);
// 指定对应的key和value的序列化类型 下面两种等价 // properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, “org.apache.kafka.common.serialization.StringSerializer”); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
// 关联自定义分区器 properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, “com.example.kafka.producer.MyPartitioner”); // 1. 创建kafka生产者对象 KafkaProducer
kafkaProducer = new KafkaProducer<>(properties); // 2. 发送数据 for (int i = 0; i < 5; i++) {
kafkaProducer.send(new ProducerRecord<>("first", "b","pppppp" + i), new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if (e == null) {System.out.println("主题:" + recordMetadata.topic() + " 分区:" + recordMetadata.partition());}}});
}
// 3. 关闭资源 kafkaProducer.close(); } }
<a name="kQGX6"></a>
# 5 生产者提高吞吐量
- batch.size:批次大小,默认16k
- linger.ms:等待时间,修改为5-100ms
- compression.type:压缩snappy
- RecordAccumulator:缓冲区大小,修改为64m
```java
public class CustomProducerParameters {
public static void main(String[] args) {
// 0. 配置信息
Properties properties = new Properties();
// 连接kafka集群
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092");
// 序列化
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 缓冲区大小
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); // 32m
// 批次大小
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
// linger.ms
properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
// 压缩
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
// 1. 创建生产者
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
// 2. 发送数据
for (int i = 0; i < 5; i++) {
kafkaProducer.send(new ProducerRecord<>("first", "hello" + i));
}
// 3. 关闭资源
kafkaProducer.close();
}
}
6 数据可靠性
- ack应答原理
- 0

- 1

- -1(all)

- Leader收到数据,所有Follower都开始同步数据,但有一个Follower,因为某种故障,迟迟不能与Leader进行同步问题
Leader维护了一个动态的in-sync replica set(ISR),意为和Leader保持同步的Follower+Leader集合(leader:0,isr:0,1,2)。
如果Follower长时间未向Leader发送通信请求或同步数据,则该Follower将被踢出ISR。该时间阈值由replica.lag.time.max.ms参数设定,默认30s。例如2超时,(leader:0, isr:0,1)。
这样就不用等长期联系不上或者已经故障的节点
- 数据可靠性分析
如果分区副本设置为1个,或 者ISR里应答的最小副本数量( min.insync.replicas 默认为1)设置为1,和ack=1的效果是一样的,仍然有丢数的风险(leader:0,isr:0)。
- 数据完全可靠条件 = ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2
- 数据重复分析

public class CustomProducerAcks {
public static void main(String[] args) {
// 0. 配置
Properties properties = new Properties();
// 连接集群
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092");
// 指定对应的key和value的序列化类型 下面两种等价
// properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
// acks
properties.put(ProducerConfig.ACKS_CONFIG, "1");
// 重试次数retries,默认是int最大值 2147483647
properties.put(ProducerConfig.RETRIES_CONFIG, 3);
// 1. 创建kafka生产者对象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
// 2. 发送数据
for (int i = 0; i < 5; i++) {
kafkaProducer.send(new ProducerRecord<>("first", "hello" + i));
}
// 3. 关闭资源
kafkaProducer.close();
}
}
7 数据去重
7.1 数据传递语义
- 至少一次(At Least Once)= ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2
- 最多一次(At Most Once)= ACK级别设置为0
- 总结:
At Least Once可以保证数据不丢失,但是不能保证数据不重复;
At Most Once可以保证数据不重复,但是不能保证数据不丢失。
- 幂等性原理
幂等性就是指Producer不论向Broker发送多少次重复数据,Broker端都只会持久化一条,保证了不重复。
精确一次(Exactly Once) = 幂等性 + 至少一次( ack=-1 + 分区副本数>=2 + ISR最小副本数量>=2) 。
重复数据的判断标准:具有
所以幂等性只能保证的是在单分区单会话内不重复。

Kafka的事务一共有5个API
// 1 初始化事务 void initTransactions(); // 2 开启事务 void beginTransaction() throws ProducerFencedException; // 3 在事务内提交已经消费的偏移量(主要用于消费者) void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) throws ProducerFencedException; // 4 提交事务 void commitTransaction() throws ProducerFencedException; // 5 放弃事务(类似于回滚事务的操作) void abortTransaction() throws ProducerFencedException;public class CustomProducerTransactions { public static void main(String[] args) { // 0. 配置 Properties properties = new Properties(); // 连接集群 properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092"); // 指定对应的key和value的序列化类型 下面两种等价 // properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // 指定事务id properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transaction_id_01"); // 1. 创建kafka生产者对象 KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties); kafkaProducer.initTransactions(); kafkaProducer.beginTransaction(); try { // 2. 发送数据 for (int i = 0; i < 5; i++) { kafkaProducer.send(new ProducerRecord<>("first", "hello" + i)); } kafkaProducer.commitTransaction(); } catch (Exception e) { kafkaProducer.abortTransaction(); }finally { // 3. 关闭资源 kafkaProducer.close(); } } }8 数据有序
9 数据乱序
1)kafka在1.x版本之前保证数据单分区有序,条件如下:
max.in.flight.requests.per.connection=1(不需要考虑是否开启幂等性)。
2)kafka在1.x及以后版本保证数据单分区有序,条件如下:
(1)未开启幂等性
max.in.flight.requests.per.connection需要设置为1。
(2)开启幂等性
max.in.flight.requests.per.connection需要设置小于等于5。
原因说明:因为在kafka1.x以后,启用幂等后,kafka服务端会缓存producer发来的最近5个request的元数据,故无论如何,都可以保证最近5个request的数据都是有序的。
