拦截器
- 生产者和消费者两端有一个相同的参数,名字叫 interceptor.classes,它指定的是一组类的列表,每个类就是特定逻辑的拦截器实现类
- 指定拦截器类时要指定它们的全限定名
```java
Properties props = new Properties();
List
interceptors = new ArrayList<>(); interceptors.add(“全类名”); // 拦截器 props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
<a name="pNATS"></a>
### 生产端
- 拦截器实现类要继承` org.apache.kafka.clients.producer.ProducerInterceptor` 接口
```java
public interface ProducerInterceptor<K, V> extends Configurable {
// 该方法会在消息发送之前被调用
ProducerRecord<K, V> onSend(ProducerRecord<K, V> var1);
/*
该方法会在消息成功提交或发送失败之后被调用。
1. onAcknowledgement 的调用要早于 callback 的调用。
2. 这个方法和 onSend 不是在同一个线程中被调用的,要保证线程安全
3. 该方法处在 Producer 发送的主路径中,所以最好别放一些太重的逻辑进去,
否则 Producer TPS 直线下降。
*/
void onAcknowledgement(RecordMetadata var1, Exception var2);
void close();
}
消费端
实现
org.apache.kafka.clients.consumer.ConsumerInterceptor
接口 ```java public interface ConsumerInterceptorextends Configurable, AutoCloseable { // 该方法在消息返回给 Consumer 程序之前调用 ConsumerRecords onConsume(ConsumerRecords var1); // Consumer 在提交位移之后调用该方法。 // 通常你可以在该方法中做一些记账类的动作,比如打日志等。 void onCommit(Map
var1); void close(); }
```