SpringBoot & Kafka
添加依赖
<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.4.0.RELEASE</version></dependency>
添加配置
vi application.properties
内容如下:
# kafka configspring.kafka.producer.bootstrap-servers=127.0.0.1:9092
消息发送
@ Autowiredprivete KafkaTemplate template;private static final String topic = "topicName";/** 发送消息 **/@GetMapping("/send/{input}")public String sendToKafka(@PathVariable String input){this.template.send(topic,input);return "send success";}
消息接收
@KafkaListener(id="",topic = "topicName",groupId = "groupName")public void listener(String input){System.out.println(input);}
事务处理
添加配置
vi application.properties
内容如下:
# 事务id要唯一spring.kafka.producer.transaction-id-prefix = kafka_tx
注解方式使用事务
@Autowiredprivete KafkaTemplate template;private static final String topic = "topicName";/** 发送消息 **/@GetMapping("/send/{input}")@Transactional(rollbackFor = RuntimeException.class)public String sendToKafka(@PathVariable String input){this.template.send(topic,input);return "send success";}
executeInTransaction方式使用事务
@Autowiredprivete KafkaTemplate template;private static final String topic = "topicName";/** 发送消息 **/@GetMapping("/send/{input}")public String sendToKafka(@PathVariable String input){this.template.send(topic,input);template.executeInTransaction(t->t.send(topic,input);if("error".queals(input)){throw new RuntimeException("is error");}t.send(topic,input+"anthor");return true;);return "send success";}
Producer API
发送流程
异步发送
Producer<String, String> producer = new KafkaProducer<>(props);for (int i = 0; i < 10; i++) {producer.send(new ProducerRecord<String, String>("test", "polaris:"+Integer.toString(i)));}
同步发送
Producer<String, String> producer = new KafkaProducer<>(props);for (int i = 0; i < 10; i++) {producer.send(new ProducerRecord<String, String>("test", "polaris:"+Integer.toString(i))).get();}
Consumer API
自动提交offset
offset完成提交,在延时时间内,如果消费者进程出现异常,则可能丢失消息。若消费速度快,offset提交慢,在延时时间内,消费者进程出现异常,则下次重启会获取到之前已经处理过的数据,造成重复问题。
// 自动提交offsetprops.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);// 自动提交延时props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");// ....while (true) {ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records)System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());}
手动提交offset
处理完消息后,如果提交offset异常,则下次重启会获取到之前已经处理过的数据,造成重复问题。
(1)异步提交。
// 关闭自动提交offsetprops.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);// ....while (true) {ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());}//异步提交consumer.commitAsync(new OffsetCommitCallback() {@Overridepublic void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {if (exception != null) {System.err.println("Commit failed for" + offsets);}}});}
(2)同步提交。
// 关闭自动提交offsetprops.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);// ....while (true) {ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());}consumer.commitSync();}
自定义存储offset
无论是同步提交还是异步提交offset,都有可能会造成数据的漏消费或者重复消费。先提交offset后消费,有可能造成数据的漏消费;而先消费后提交offset,有可能会造成数据的重复消费。Kafka 0.9版本之前,offset存储在zookeeper,0.9版本及之后,默认将offset存储在Kafka的一个内置的topic中。除此之外,Kafka还可以选择自定义存储offset。
要实现自定义存储offset,需要借助ConsumerRebalanceListener,以下为示例代码,其中提交和获取offset的方法,需要根据所选的offset存储系统自行实现。
import org.apache.kafka.clients.consumer.*;import org.apache.kafka.common.TopicPartition;import java.util.*;public class CustomConsumer {private static Map<TopicPartition, Long> currentOffset = new HashMap<>();public static void main(String[] args) {//创建配置信息Properties props = new Properties();//Kafka集群props.put("bootstrap.servers", "192.168.0.101:9092,192.168.0.102:9092,192.168.0.103:9092");//消费者组,只要group.id相同,就属于同一个消费者组props.put("group.id", "test");//关闭自动提交offsetprops.put("enable.auto.commit", "false");//Key和Value的反序列化类props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");//创建一个消费者KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);//消费者订阅主题consumer.subscribe(Arrays.asList("first"), new ConsumerRebalanceListener() {//该方法会在Rebalance之前调用@Overridepublic void onPartitionsRevoked(Collection<TopicPartition> partitions) {commitOffset(currentOffset);}//该方法会在Rebalance之后调用@Overridepublic void onPartitionsAssigned(Collection<TopicPartition> partitions) {currentOffset.clear();for (TopicPartition partition : partitions) {consumer.seek(partition, getOffset(partition));//定位到最近提交的offset位置继续消费}}});while (true) {ConsumerRecords<String, String> records = consumer.poll(100);//消费者拉取数据for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());currentOffset.put(new TopicPartition(record.topic(), record.partition()), record.offset());}commitOffset(currentOffset);//异步提交}}//获取某分区的最新offsetprivate static long getOffset(TopicPartition partition) {return 0;}//提交该消费者所有分区的offsetprivate static void commitOffset(Map<TopicPartition, Long> currentOffset) {}}
参考:https://blog.csdn.net/qq_38704184/article/details/103200513
Interceptor API
Producer拦截器(interceptor)是在Kafka 0.10版本被引入的,主要用于实现clients端的定制化控制逻辑。
对于producer而言,interceptor使得用户在消息发送前以及producer回调逻辑前有机会对消息做一些定制化需求,比如修改消息等。同时,producer允许用户指定多个interceptor按序作用于同一条消息从而形成一个拦截链(interceptor chain)。Intercetpor的实现接口是org.apache.kafka.clients.producer.ProducerInterceptor。
/*** 自定义生产者* @description* 拦截器应用<br/>* 实现一个简单的双interceptor组成的拦截链。<br/>* 第一个interceptor会在消息发送前将时间戳信息加到消息value的最前部;<br/>* 第二个interceptor会在消息发送后更新成功发送消息数或失败发送消息数。*/public class InterceptorProducer {public static void main(String[] args) throws Exception {// ...List<String> interceptors = new ArrayList<>();interceptors.add("com.lonton.t8.bigdata.demo.kafka.interceptor.TimeInterceptor");interceptors.add("com.lonton.t8.bigdata.demo.kafka.interceptor.CounterInterceptor");props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);// ....// 一定要关闭producer,这样才会调用interceptor的close方法producer.close();}}
public class TimeInterceptor implements ProducerInterceptor<String, String> {@Overridepublic void configure(Map<String, ?> configs) {}@Overridepublic ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {// 创建一个新的record,把时间戳写入消息体的最前部return new ProducerRecord(record.topic(), record.partition(), record.timestamp(), record.key(),System.currentTimeMillis() + "," + record.value().toString());}@Overridepublic void onAcknowledgement(RecordMetadata metadata, Exception exception) {}@Overridepublic void close() {}}
public class CounterInterceptor implements ProducerInterceptor<String, String> {private int errorCounter = 0;private int successCounter = 0;@Overridepublic void configure(Map<String, ?> configs) {}@Overridepublic ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {return record;}@Overridepublic void onAcknowledgement(RecordMetadata metadata, Exception exception) {// 统计成功和失败的次数if (exception == null) {successCounter++;} else {errorCounter++;}}@Overridepublic void close() {// 保存结果System.out.println("Successful sent: " + successCounter);System.out.println("Failed sent: " + errorCounter);}}
Connector API
Connect是Kafka的一部分,是一种用于在Kafka和其他系统之间可扩展的、可靠的流式传输数据的工具。
它为在Kafka和外部数据存储系统之间移动数据提供了一种可靠且可伸缩的方案。
它使得能够快速定义将大量数据集合移入和移出Kafka的连接器变得简单。 Kafka Connect可以获取整个数
据库或从所有应用程序服务器收集指标到Kafka主题,使数据可用于低延迟的流处理。导出作业可以将数据从
Kafka topic传输到二次存储和查询系统,或者传递到批处理系统以进行离线分析。
Streams API
Kafka Streams。Apache Kafka开源项目的一个组成部分。是一个功能强大,易于使用的库。用于在Kafka上构建高可分布式、拓展性,容错的应用程序。
参考
OrcHome:Kafka Streams开发者指南
https://www.orchome.com/335
