拦截器

  • 生产者和消费者两端有一个相同的参数,名字叫 interceptor.classes,它指定的是一组类的列表,每个类就是特定逻辑的拦截器实现类
  • 指定拦截器类时要指定它们的全限定名 ```java Properties props = new Properties(); List interceptors = new ArrayList<>(); interceptors.add(“全类名”); // 拦截器 props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
  1. <a name="pNATS"></a>
  2. ### 生产端
  3. - 拦截器实现类要继承` org.apache.kafka.clients.producer.ProducerInterceptor` 接口
  4. ```java
  5. public interface ProducerInterceptor<K, V> extends Configurable {
  6. // 该方法会在消息发送之前被调用
  7. ProducerRecord<K, V> onSend(ProducerRecord<K, V> var1);
  8. /*
  9. 该方法会在消息成功提交或发送失败之后被调用。
  10. 1. onAcknowledgement 的调用要早于 callback 的调用。
  11. 2. 这个方法和 onSend 不是在同一个线程中被调用的,要保证线程安全
  12. 3. 该方法处在 Producer 发送的主路径中,所以最好别放一些太重的逻辑进去,
  13. 否则 Producer TPS 直线下降。
  14. */
  15. void onAcknowledgement(RecordMetadata var1, Exception var2);
  16. void close();
  17. }

消费端

  • 实现 org.apache.kafka.clients.consumer.ConsumerInterceptor 接口 ```java public interface ConsumerInterceptor extends Configurable, AutoCloseable { // 该方法在消息返回给 Consumer 程序之前调用 ConsumerRecords onConsume(ConsumerRecords var1);

    // Consumer 在提交位移之后调用该方法。 // 通常你可以在该方法中做一些记账类的动作,比如打日志等。 void onCommit(Map var1);

    void close(); }

```