概念

  • 生产者拦截器允许你在发送消息前以及消息提交成功后植入你的拦截器逻辑
  • 消费者拦截器支持在消费消息前以及提交位移后编写特定逻辑
  • 两种拦截器都支持链的方式,可以将一组拦截器串连成一个大的拦截器,Kafka 会按照添加顺序依次执行拦截器逻辑

使用场景

Kafka 拦截器可以应用于包括客户端监控、端到端系统性能检测、消息审计等多种功能在内的场景

自定义拦截器步骤

  1. 生产者继承org.apache.kafka.clients.producer.ProducerInterceptor 接口, 实现方法
  • onSend, 该方法会在消息发送之前被调用
  • onAcknowledgement, 该方法会在消息成功提交或发送失败之后被调用
    • onAcknowledgement 的调用要早于 callback
    • 这个方法和 onSend 不是在同一个线程中被调用的,因此如果你在这两个方法中调用了某个共享可变对象,一定要保证线程安全
    • 这个方法处在 Producer 发送的主路径中,所以最好别放一些太重的逻辑进去,否则Producer TPS 直线下降

消费者同理, 继承org.apache.kafka.clients.consumer.ConsumerInterceptor 接口, 实现方法

  • onConsume:该方法在消息返回给 Consumer 程序之前调用
  • onCommit:Consumer 在提交位移之后调用该方法。通常你可以在该方法中做一些记账类的动作,比如打日志等。
  1. 设置方式, 设置interceptor.classes=拦截器的全路径类名, 示例: ```java

Properties props = new Properties(); List interceptors = new ArrayList<>(); interceptors.add(“com.yourcompany.kafkaproject.interceptors.AddTimestampInterceptor”); // 拦截器1 interceptors.add(“com.yourcompany.kafkaproject.interceptors.UpdateCounterInterceptor”); // 拦截器2 props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors); …… ```