SpringBoot & Kafka
添加依赖
<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.4.0.RELEASE</version>
</dependency>
添加配置
vi application.properties
内容如下:
# kafka config
spring.kafka.producer.bootstrap-servers=127.0.0.1:9092
消息发送
@ Autowired
privete KafkaTemplate template;
private static final String topic = "topicName";
/** 发送消息 **/
@GetMapping("/send/{input}")
public String sendToKafka(@PathVariable String input){
this.template.send(topic,input);
return "send success";
}
消息接收
@KafkaListener(id="",topic = "topicName",groupId = "groupName")
public void listener(String input){
System.out.println(input);
}
事务处理
添加配置
vi application.properties
内容如下:
# 事务id要唯一
spring.kafka.producer.transaction-id-prefix = kafka_tx
注解方式使用事务
@Autowired
privete KafkaTemplate template;
private static final String topic = "topicName";
/** 发送消息 **/
@GetMapping("/send/{input}")
@Transactional(rollbackFor = RuntimeException.class)
public String sendToKafka(@PathVariable String input){
this.template.send(topic,input);
return "send success";
}
executeInTransaction方式使用事务
@Autowired
privete KafkaTemplate template;
private static final String topic = "topicName";
/** 发送消息 **/
@GetMapping("/send/{input}")
public String sendToKafka(@PathVariable String input){
this.template.send(topic,input);
template.executeInTransaction(t->
t.send(topic,input);
if("error".queals(input)){
throw new RuntimeException("is error");
}
t.send(topic,input+"anthor");
return true;
);
return "send success";
}
Producer API
发送流程
异步发送
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 10; i++) {
producer.send(new ProducerRecord<String, String>("test", "polaris:"+Integer.toString(i)));
}
同步发送
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 10; i++) {
producer.send(new ProducerRecord<String, String>("test", "polaris:"+Integer.toString(i))).get();
}
Consumer API
自动提交offset
offset完成提交,在延时时间内,如果消费者进程出现异常,则可能丢失消息。若消费速度快,offset提交慢,在延时时间内,消费者进程出现异常,则下次重启会获取到之前已经处理过的数据,造成重复问题。
// 自动提交offset
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
// 自动提交延时
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
// ....
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
手动提交offset
处理完消息后,如果提交offset异常,则下次重启会获取到之前已经处理过的数据,造成重复问题。
(1)异步提交。
// 关闭自动提交offset
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
// ....
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
//异步提交
consumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
if (exception != null) {
System.err.println("Commit failed for" + offsets);
}
}
});
}
(2)同步提交。
// 关闭自动提交offset
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
// ....
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
consumer.commitSync();
}
自定义存储offset
无论是同步提交还是异步提交offset,都有可能会造成数据的漏消费或者重复消费。先提交offset后消费,有可能造成数据的漏消费;而先消费后提交offset,有可能会造成数据的重复消费。Kafka 0.9版本之前,offset存储在zookeeper,0.9版本及之后,默认将offset存储在Kafka的一个内置的topic中。除此之外,Kafka还可以选择自定义存储offset。
要实现自定义存储offset,需要借助ConsumerRebalanceListener,以下为示例代码,其中提交和获取offset的方法,需要根据所选的offset存储系统自行实现。
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.util.*;
public class CustomConsumer {
private static Map<TopicPartition, Long> currentOffset = new HashMap<>();
public static void main(String[] args) {
//创建配置信息
Properties props = new Properties();
//Kafka集群
props.put("bootstrap.servers", "192.168.0.101:9092,192.168.0.102:9092,192.168.0.103:9092");
//消费者组,只要group.id相同,就属于同一个消费者组
props.put("group.id", "test");
//关闭自动提交offset
props.put("enable.auto.commit", "false");
//Key和Value的反序列化类
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//创建一个消费者
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
//消费者订阅主题
consumer.subscribe(Arrays.asList("first"), new ConsumerRebalanceListener() {
//该方法会在Rebalance之前调用
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
commitOffset(currentOffset);
}
//该方法会在Rebalance之后调用
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
currentOffset.clear();
for (TopicPartition partition : partitions) {
consumer.seek(partition, getOffset(partition));//定位到最近提交的offset位置继续消费
}
}
});
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);//消费者拉取数据
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
currentOffset.put(new TopicPartition(record.topic(), record.partition()), record.offset());
}
commitOffset(currentOffset);//异步提交
}
}
//获取某分区的最新offset
private static long getOffset(TopicPartition partition) {
return 0;
}
//提交该消费者所有分区的offset
private static void commitOffset(Map<TopicPartition, Long> currentOffset) {
}
}
参考:https://blog.csdn.net/qq_38704184/article/details/103200513
Interceptor API
Producer拦截器(interceptor)是在Kafka 0.10版本被引入的,主要用于实现clients端的定制化控制逻辑。
对于producer而言,interceptor使得用户在消息发送前以及producer回调逻辑前有机会对消息做一些定制化需求,比如修改消息等。同时,producer允许用户指定多个interceptor按序作用于同一条消息从而形成一个拦截链(interceptor chain)。Intercetpor的实现接口是org.apache.kafka.clients.producer.ProducerInterceptor。
/**
* 自定义生产者
* @description
* 拦截器应用<br/>
* 实现一个简单的双interceptor组成的拦截链。<br/>
* 第一个interceptor会在消息发送前将时间戳信息加到消息value的最前部;<br/>
* 第二个interceptor会在消息发送后更新成功发送消息数或失败发送消息数。
*/
public class InterceptorProducer {
public static void main(String[] args) throws Exception {
// ...
List<String> interceptors = new ArrayList<>();
interceptors.add("com.lonton.t8.bigdata.demo.kafka.interceptor.TimeInterceptor");
interceptors.add("com.lonton.t8.bigdata.demo.kafka.interceptor.CounterInterceptor");
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
// ....
// 一定要关闭producer,这样才会调用interceptor的close方法
producer.close();
}
}
public class TimeInterceptor implements ProducerInterceptor<String, String> {
@Override
public void configure(Map<String, ?> configs) {}
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
// 创建一个新的record,把时间戳写入消息体的最前部
return new ProducerRecord(record.topic(), record.partition(), record.timestamp(), record.key(),
System.currentTimeMillis() + "," + record.value().toString());
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {}
@Override
public void close() {}
}
public class CounterInterceptor implements ProducerInterceptor<String, String> {
private int errorCounter = 0;
private int successCounter = 0;
@Override
public void configure(Map<String, ?> configs) {}
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
return record;
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
// 统计成功和失败的次数
if (exception == null) {
successCounter++;
} else {
errorCounter++;
}
}
@Override
public void close() {
// 保存结果
System.out.println("Successful sent: " + successCounter);
System.out.println("Failed sent: " + errorCounter);
}
}
Connector API
Connect是Kafka的一部分,是一种用于在Kafka和其他系统之间可扩展的、可靠的流式传输数据的工具。
它为在Kafka和外部数据存储系统之间移动数据提供了一种可靠且可伸缩的方案。
它使得能够快速定义将大量数据集合移入和移出Kafka的连接器变得简单。 Kafka Connect可以获取整个数
据库或从所有应用程序服务器收集指标到Kafka主题,使数据可用于低延迟的流处理。导出作业可以将数据从
Kafka topic传输到二次存储和查询系统,或者传递到批处理系统以进行离线分析。
Streams API
Kafka Streams。Apache Kafka开源项目的一个组成部分。是一个功能强大,易于使用的库。用于在Kafka上构建高可分布式、拓展性,容错的应用程序。
参考
OrcHome:Kafka Streams开发者指南
https://www.orchome.com/335