需求
实现一个简单的双interceptor组成的拦截链。第一个interceptor会在消息发送前将时间戳信息加到消息value的最前部;第二个interceptor会在消息发送后更新成功发送消息数或失败发送消息数。
增加时间戳拦截器
package com.interceptor;

import java.util.Map;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

/**
 * Producer interceptor that prefixes every record value with the current
 * time in epoch milliseconds followed by a comma, e.g. {@code "<millis>,<value>"}.
 */
public class TimeInterceptor implements ProducerInterceptor<String, String> {

    /** No configuration is required by this interceptor. */
    @Override
    public void configure(Map<String, ?> configs) {
    }

    /**
     * Builds a new record whose value starts with the current timestamp.
     *
     * @param record the record handed to the producer
     * @return a copy of {@code record} with the timestamp-prefixed value and the
     *         same topic, partition, record timestamp, key and headers; or
     *         {@code record} itself when its value is {@code null}
     */
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        String value = record.value();
        if (value == null) {
            // A null value cannot be prefixed; pass the record through untouched
            // instead of throwing a NullPointerException.
            return record;
        }
        // Create a new record with the timestamp written at the very front of the
        // message body. Headers are forwarded so they are not lost in the copy.
        return new ProducerRecord<>(
                record.topic(),
                record.partition(),
                record.timestamp(),
                record.key(),
                System.currentTimeMillis() + "," + value,
                record.headers());
    }

    /** Called after the ack is received; this interceptor does nothing here. */
    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
    }

    /** Called when the producer is closed; there is nothing to release. */
    @Override
    public void close() {
    }
}
统计发送成功和发送失败的消息数,并在producer关闭时打印这两个计数器
package com.interceptor;

import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

/**
 * Producer interceptor that counts acknowledged and failed sends and prints
 * both totals when the producer is closed.
 *
 * <p>The counters are atomic because acknowledgements may be delivered on a
 * different thread than the one that calls {@link #close()}.
 */
public class CounterInterceptor implements ProducerInterceptor<String, String> {

    private final AtomicLong errorCounter = new AtomicLong();
    private final AtomicLong successCounter = new AtomicLong();

    /** No configuration is required by this interceptor. */
    @Override
    public void configure(Map<String, ?> configs) {
    }

    /**
     * Leaves the record untouched.
     *
     * @param record the record handed to the producer
     * @return the same {@code record} instance
     */
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        return record;
    }

    /**
     * Counts the outcome of one send.
     *
     * @param metadata  metadata of the record; not used here
     * @param exception {@code null} when the send succeeded, otherwise the failure
     */
    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        // Tally successes and failures.
        if (exception == null) {
            successCounter.incrementAndGet();
        } else {
            errorCounter.incrementAndGet();
        }
    }

    /** Prints the final success and failure totals. */
    @Override
    public void close() {
        System.out.println("Successful sent: " + successCounter.get());
        System.out.println("Failed sent: " + errorCounter.get());
    }
}
生产者主程序
package com.interceptor;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

/**
 * Sends ten messages to the topic {@code first} through an interceptor chain
 * made of {@code TimeInterceptor} followed by {@code CounterInterceptor}.
 */
public class InterceptorProducer {

    /**
     * Configures the producer, registers the interceptor chain, sends the
     * messages and closes the producer.
     *
     * @param args unused
     * @throws Exception declared for compatibility; failures from the Kafka client propagate
     */
    public static void main(String[] args) throws Exception {
        // 1. Producer configuration.
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "zjj101:9092");
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");

        // 2. Build the interceptor chain; the list order is the invocation order.
        List<String> interceptors = new ArrayList<>();
        interceptors.add("com.interceptor.TimeInterceptor");
        interceptors.add("com.interceptor.CounterInterceptor");
        props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);

        String topic = "first";

        // 3. Send the messages. The producer MUST be closed so that each
        // interceptor's close() is invoked; try-with-resources guarantees this
        // even when a send throws.
        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 10; i++) {
                ProducerRecord<String, String> record = new ProducerRecord<>(topic, "message" + i);
                producer.send(record);
            }
        }
    }
}
消费者主程序
package com.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

/**
 * Console consumer that subscribes to the topic {@code first} and prints the
 * offset, key and value of every record it receives.
 */
public class CustomConsumer {

    /**
     * Polls the topic forever and prints each record to standard output.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "zjj101:9092");
        props.put("group.id", "test");
        // enable.auto.commit: whether offsets are committed automatically.
        props.put("enable.auto.commit", "true");
        // auto.commit.interval.ms: interval between automatic offset commits.
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // try-with-resources closes the consumer if subscribe/poll throws, so the
        // client is not leaked when the loop terminates abnormally.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Subscribe to the topic named "first"; several topics may be listed.
            consumer.subscribe(Arrays.asList("first"));

            while (true) {
                // NOTE(review): poll(long) is deprecated in newer kafka-clients in
                // favour of poll(Duration); kept as-is because the client version
                // used by this project is not visible here - confirm before changing.
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    // Example line from the original notes: offset = 400, key = 0, value = 0
                    System.out.printf("offset = %d, key = %s, value = %s%n",
                            record.offset(), record.key(), record.value());
                }
            }
        }
    }
}
如何测试
先启动消费者(CustomConsumer),再运行生产者(InterceptorProducer)。消费者打印出的每条消息的value前面都会带上时间戳;生产者关闭时会打印发送成功和发送失败的消息数。
