1. Kafka API
2. Producer API
Kafka's producer sends messages asynchronously. The send path involves two threads — the main thread and the Sender thread — plus a shared buffer, the RecordAccumulator. The main thread appends messages to the RecordAccumulator, and the Sender thread continuously pulls batches from the RecordAccumulator and sends them to the Kafka brokers.
2.1. Asynchronous Send API
Import the dependency:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.4.1</version>
</dependency>
API usage:
package com.atguigu.producer;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class Producer {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // Producer configuration
        Properties properties = new Properties();
        properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");   // key serializer class
        properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // value serializer class
        properties.setProperty("acks", "all");                // ack level
        properties.setProperty("bootstrap.servers", "hadoop102:9092");
        properties.setProperty("buffer.memory", "33554432");  // RecordAccumulator buffer size
        properties.setProperty("retries", "1");               // number of retries
        properties.setProperty("batch.size", "16384");        // batch size
        properties.setProperty("linger.ms", "1");             // wait time
        // A batch is sent to the broker once it reaches 16384 bytes; if that size is not reached, it is still sent after waiting 1 ms.
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        // Send data with the producer
        for (int i = 0; i < 100; i++) {
            producer.send(
                    // Wrap the message in a ProducerRecord
                    new ProducerRecord<>("first", Integer.toString(i), "Value" + i), new Callback() {
                        // Callback
                        @Override
                        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                            if (e == null) {
                                System.out.println(recordMetadata);
                            }
                        }
                    });
            System.out.println("Sent message " + i);
        }
        // Release resources
        producer.close();
    }
}
The callback is optional; you can also call send without passing one.
The callback is invoked asynchronously when the producer receives the ack. It takes two parameters, a RecordMetadata and an Exception: if the Exception is null the message was sent successfully; if it is not null the send failed.
Note: failed sends are retried automatically, so there is no need to retry manually in the callback.
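Since Callback has a single abstract method, the callback can also be written as a lambda. A minimal sketch that only logs failures (topic and value chosen to match the example above):

producer.send(new ProducerRecord<>("first", "key", "value"), (metadata, exception) -> {
    if (exception != null) {
        exception.printStackTrace();   // the send failed even after the configured retries
    } else {
        System.out.println(metadata);  // partition and offset the record was written to
    }
});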
2.2. Synchronous Send API
Synchronous sending means that after a message is sent, the current thread blocks until the ack is returned.
Because send returns a Future, we can get the same effect as synchronous sending simply by calling get() on that Future.
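At its core this is just chaining get() onto send(); a minimal sketch (the surrounding producer setup is the same as in the full example below):

RecordMetadata metadata = producer.send(new ProducerRecord<>("first", "key", "value")).get(); // blocks until the ack arrives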
package com.atguigu.producer;

import org.apache.kafka.clients.producer.*;

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class Producer {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // Producer configuration
        Properties properties = new Properties();
        properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");   // key serializer class
        properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // value serializer class
        properties.setProperty("acks", "all");                // ack level
        // properties.put(ProducerConfig.ACKS_CONFIG, "1");   // ProducerConfig holds constants for all the config keys
        properties.setProperty("bootstrap.servers", "hadoop102:9092");
        properties.setProperty("buffer.memory", "33554432");  // RecordAccumulator buffer size
        properties.setProperty("retries", "1");               // number of retries
        properties.setProperty("batch.size", "16384");        // batch size
        properties.setProperty("linger.ms", "1");             // wait time
        // A batch is sent to the broker once it reaches 16384 bytes; if that size is not reached, it is still sent after waiting 1 ms.
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        // Send data with the producer
        for (int i = 0; i < 100; i++) {
            Future<RecordMetadata> future = producer.send(
                    // Wrap the message in a ProducerRecord
                    new ProducerRecord<>("first", Integer.toString(i), "Value" + i), new Callback() {
                        // Callback
                        @Override
                        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                            if (e == null) {
                                System.out.println(recordMetadata);
                            }
                        }
                    });
            RecordMetadata recordMetadata = future.get(); // block until the ack returns before sending the next record
            System.out.println("Sent message " + i);
        }
        // Release resources
        producer.close();
    }
}
3. Consumer API
3.1. Automatic Offset Commit
The properties file loaded by the consumer:
bootstrap.servers=hadoop102:9092
group.id=test
enable.auto.commit=true
auto.commit.interval.ms=1000
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
auto.offset.reset=earliest
# Defaults to latest (start after the last record); earliest starts from the beginning
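The same settings can also be built in code instead of being loaded from a file, using the constants in ConsumerConfig; a minimal sketch with the values above:

// import org.apache.kafka.clients.consumer.ConsumerConfig; import java.util.Properties;
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");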
The consumer class:
package com.atguigu.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.io.IOException;
import java.util.Collections;
import java.util.Properties;

public class Consumer {
    public static void main(String[] args) throws IOException, InterruptedException {
        // Build the consumer from the properties file
        Properties properties = new Properties();
        properties.load(Consumer.class.getClassLoader().getResourceAsStream("conusumer1.properties"));
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        // Subscribe to the topic
        consumer.subscribe(Collections.singleton("first"));
        while (true) {
            ConsumerRecords<String, String> poll = consumer.poll(2000); // pull from the topic, waiting up to 2000 ms
            if (poll.count() == 0) {
                Thread.sleep(100);
            }
            for (ConsumerRecord<String, String> record : poll) {
                System.out.println(record);
            }
        }
        // Close the consumer (unreachable while the loop runs forever)
        // consumer.close();
    }
}
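A side note: in kafka-clients 2.4.1 the poll(long) overload used above is deprecated; the preferred form takes a java.time.Duration and is otherwise a drop-in replacement:

ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(2000)); // requires import java.time.Duration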
3.2. Manual Offset Commit
Although automatic offset commit is simple and convenient, it is time-based, which makes it hard for developers to control exactly when offsets are committed. Kafka therefore also provides an API for committing offsets manually.
The properties file:
bootstrap.servers=hadoop102:9092
group.id=test
enable.auto.commit=false
# Whether offsets are committed automatically; defaults to true. Committed offsets are stored on the broker side.
auto.commit.interval.ms=1000
# How often offsets are auto-committed; defaults to 5000 ms (irrelevant here because auto commit is disabled)
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
auto.offset.reset=earliest
# Defaults to latest (start after the last record); earliest starts from the beginning
The consumer class:
package com.atguigu.consumer;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;

import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

public class Consumer {
    public static void main(String[] args) throws IOException, InterruptedException {
        // Build the consumer from the properties file
        Properties properties = new Properties();
        properties.load(Consumer.class.getClassLoader().getResourceAsStream("conusumer1.properties"));
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        // Subscribe to the topic
        consumer.subscribe(Collections.singleton("first"));
        while (true) {
            ConsumerRecords<String, String> poll = consumer.poll(2000); // pull from the topic, waiting up to 2000 ms
            // ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(2000)); // preferred, non-deprecated overload
            if (poll.count() == 0) {
                Thread.sleep(100);
            }
            for (ConsumerRecord<String, String> record : poll) {
                System.out.println(record);
            }
            // consumer.commitSync(); // manual synchronous commit
            consumer.commitAsync(new OffsetCommitCallback() {
                // Callback invoked when the commit completes
                @Override
                public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
                    if (e != null) {
                        System.out.println("Commit failed for " + map);
                    }
                }
            }); // manual asynchronous commit
        }
        // Close the consumer (unreachable while the loop runs forever)
        // consumer.close();
    }
}
There are two ways to commit offsets manually: commitSync (synchronous) and commitAsync (asynchronous). Both commit the highest offset of the batch returned by the current poll. The difference is that commitSync blocks the current thread until the commit succeeds and retries automatically on failure (it can still fail for uncontrollable reasons), whereas commitAsync has no retry mechanism, so the commit may fail.
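A common way to combine the two — sketched here under the assumption of a hypothetical running flag that is cleared on shutdown — is to use the fast commitAsync inside the poll loop and fall back to a blocking commitSync just before closing, so the last position is committed reliably:

try {
    while (running) {                       // hypothetical flag, cleared e.g. by a shutdown hook
        ConsumerRecords<String, String> records = consumer.poll(2000);
        for (ConsumerRecord<String, String> record : records) {
            System.out.println(record);
        }
        consumer.commitAsync();             // fast, does not retry on failure
    }
} finally {
    try {
        consumer.commitSync();              // blocking, retries until it succeeds or hits an unrecoverable error
    } finally {
        consumer.close();
    }
}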
3.2.1. Analysis of Missed and Duplicate Consumption
Whether offsets are committed synchronously or asynchronously, data can still be missed or consumed twice. Committing the offset before processing the data can cause missed consumption; processing the data first and committing the offset afterwards can cause duplicate consumption.
Solution: the only real fix is to bind the processing of the data and the offset commit together as a single atomic operation, for example by storing the offsets in the same external store, and the same transaction, as the processed results (see the sketch below).
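A sketch of that idea, assuming a relational database reachable through a java.sql.Connection and two hypothetical tables, results(value) and offsets(topic, part, next_offset); the REPLACE INTO syntax is MySQL-style:

// import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.SQLException;
// import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords;
static void processAndCommitAtomically(Connection conn, ConsumerRecords<String, String> records) throws SQLException {
    conn.setAutoCommit(false);
    try {
        for (ConsumerRecord<String, String> record : records) {
            try (PreparedStatement ps = conn.prepareStatement("INSERT INTO results(value) VALUES (?)")) {
                ps.setString(1, record.value());     // store the processed result
                ps.executeUpdate();
            }
            try (PreparedStatement ps = conn.prepareStatement(
                    "REPLACE INTO offsets(topic, part, next_offset) VALUES (?, ?, ?)")) {
                ps.setString(1, record.topic());
                ps.setInt(2, record.partition());
                ps.setLong(3, record.offset() + 1);  // next offset to consume
                ps.executeUpdate();
            }
        }
        conn.commit();       // results and offsets become visible together, or not at all
    } catch (SQLException e) {
        conn.rollback();     // neither the results nor the offsets are persisted
        throw e;
    }
}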
3.3. Custom Offset Storage
package com.atguigu.consumer;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.io.*;
import java.util.*;

/*
 * Consumer that stores offsets in a custom medium (a local file)
 */
public class ConsumerManual {
    // Records the offset for each topic-partition
    private static Map<TopicPartition, Long> offset = new HashMap<>();
    private static String file = "d:/offset";

    public static void main(String[] args) throws IOException {
        // Build the consumer from the properties file
        Properties properties = new Properties();
        properties.load(Consumer.class.getClassLoader().getResourceAsStream("conusumer1.properties"));
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        // Subscribe to the topic and register a rebalance listener
        consumer.subscribe(Collections.singleton("first"), new ConsumerRebalanceListener() {
            // Called before the partitions are reassigned
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> collection) {
                // Commit the old offsets
                commit();
            }

            // Called after the partitions have been assigned
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                // Load the offsets for the newly assigned partitions
                readOffset(collection);
                for (TopicPartition partition : collection) {
                    Long os = offset.get(partition);
                    if (os == null) {
                        consumer.seek(partition, 0);
                    } else {
                        consumer.seek(partition, os);
                    }
                }
            }
        });
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(2000);
            // Consume the records and record their offsets together
            for (ConsumerRecord<String, String> record : records) {
                // Consume
                System.out.println(record);
                // After consuming, remember the next offset to read for this partition
                offset.put(
                        new TopicPartition(record.topic(), record.partition()), record.offset() + 1);
            }
            commit();
        }
    }

    /**
     * Read the offsets from the custom medium into the cache
     *
     * @param collection the partitions assigned to this consumer
     */
    private static void readOffset(Collection<TopicPartition> collection) {
        ObjectInputStream objectInputStream = null;
        Map<TopicPartition, Long> temp;
        try {
            objectInputStream = new ObjectInputStream(new FileInputStream(file));
            temp = (Map<TopicPartition, Long>) objectInputStream.readObject();
        } catch (Exception e) {
            temp = new HashMap<>();
        } finally {
            if (objectInputStream != null) {
                try {
                    objectInputStream.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
        // Out of all stored offsets, keep only those for the partitions assigned to us
        for (TopicPartition partition : collection) {
            offset.put(partition, temp.get(partition));
        }
    }

    /**
     * Write the cached offsets back to the custom medium
     */
    private static void commit() {
        // First read all previously stored offsets from the file
        ObjectInputStream objectInputStream = null;
        Map<TopicPartition, Long> temp;
        try {
            objectInputStream = new ObjectInputStream(new FileInputStream(file));
            temp = (Map<TopicPartition, Long>) objectInputStream.readObject();
        } catch (Exception e) {
            temp = new HashMap<>();
        } finally {
            if (objectInputStream != null) {
                try {
                    objectInputStream.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
        // Merge in the offsets from this consumer
        temp.putAll(offset);
        // Write the merged offsets back out
        ObjectOutputStream objectOutputStream = null;
        try {
            objectOutputStream = new ObjectOutputStream(new FileOutputStream(file));
            objectOutputStream.writeObject(temp);
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (objectOutputStream != null) {
                try {
                    objectOutputStream.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
4. Custom Interceptor
An interceptor implements the ProducerInterceptor interface.
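For reference, this is the shape of the interface in org.apache.kafka.clients.producer (the configure(Map<String, ?>) method is inherited from Configurable):

public interface ProducerInterceptor<K, V> extends Configurable {
    ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);             // called before the record is serialized and partitioned
    void onAcknowledgement(RecordMetadata metadata, Exception exception); // called when the broker acks the record or the send fails
    void close();                                                          // called when the producer is closed
}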
4.1. Using an Interceptor to Count Successful and Failed Sends
package com.atguigu.interceptor;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Map;

/**
 * Counts how many messages were sent successfully and how many failed
 */
public class CountInterceptor implements ProducerInterceptor<String, String> {
    private long success = 0;
    private long fail = 0;

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {
        return producerRecord;
    }

    /**
     * Called after the ACK is received; update the counters
     *
     * @param recordMetadata metadata of the acknowledged record
     * @param e              null if the send succeeded
     */
    @Override
    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
        if (e == null) {
            success++;
        } else {
            fail++;
        }
    }

    @Override
    public void close() {
        System.out.println(success + " messages succeeded");
        System.out.println(fail + " messages failed");
    }

    @Override
    public void configure(Map<String, ?> map) {
    }
}
4.2. Using an Interceptor to Rewrite the Value as Custom Prefix + Timestamp + Value
package com.atguigu.interceptor;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Map;

public class TimeInterceptor implements ProducerInterceptor<String, String> {
    // Custom prefix
    private String prefix;

    /**
     * Rewrites the record; the timestamp could also be modified here
     *
     * @param producerRecord the original record
     * @return the modified record
     */
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {
        // A ProducerRecord can only be read, not modified, so we create a new record and copy the fields over
        return new ProducerRecord<String, String>(
                producerRecord.topic(),
                producerRecord.partition(),
                producerRecord.timestamp(),
                producerRecord.key(),
                prefix + System.currentTimeMillis() + producerRecord.value(),
                producerRecord.headers()
        );
    }

    /**
     * Called after the ACK is received
     *
     * @param recordMetadata metadata of the acknowledged record
     * @param e              null if the send succeeded
     */
    @Override
    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
    }

    /**
     * Called when the producer is closed
     */
    @Override
    public void close() {
    }

    /**
     * Called when the interceptor is configured
     *
     * @param map the producer configuration
     */
    @Override
    public void configure(Map<String, ?> map) {
        // Read the prefix from the producer configuration
        prefix = (String) map.get("prefix");
    }
}
4.3. Registering the Custom Interceptors on the Producer
package com.atguigu.interceptor;

import org.apache.kafka.clients.producer.*;

import java.util.ArrayList;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class Producer {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // Producer configuration
        Properties properties = new Properties();
        properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");   // key serializer class
        properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // value serializer class
        properties.setProperty("acks", "all");                // ack level
        properties.setProperty("bootstrap.servers", "hadoop102:9092");
        properties.setProperty("buffer.memory", "33554432");  // RecordAccumulator buffer size
        properties.setProperty("retries", "1");               // number of retries
        properties.setProperty("batch.size", "16384");        // batch size
        properties.setProperty("linger.ms", "1");             // wait time
        // List of custom interceptors
        ArrayList<String> interceptors = new ArrayList<>();
        interceptors.add("com.atguigu.interceptor.TimeInterceptor");  // interceptors run in the order they are added
        interceptors.add("com.atguigu.interceptor.CountInterceptor"); // values are fully qualified class names
        properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors); // register them with the producer
        // Custom prefix read by TimeInterceptor.configure
        properties.setProperty("prefix", "myPrefix-");
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        // Send data with the producer
        for (int i = 0; i < 10; i++) {
            Future<RecordMetadata> future = producer.send(
                    // Wrap the message in a ProducerRecord
                    new ProducerRecord<>("first", Integer.toString(i), "Value" + i), new Callback() {
                        // Callback
                        @Override
                        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                            if (e == null) {
                                System.out.println(recordMetadata);
                            }
                        }
                    });
            RecordMetadata recordMetadata = future.get(); // block until the ack returns before sending the next record
            System.out.println("Sent message " + i);
        }
        // Release resources
        producer.close();
    }
}