JavaSpringKafka

消息监听容器

1、KafkaMessageListenerContainer

由spring提供用于监听以及拉取消息,并将这些消息按指定格式转换后交给由@KafkaListener注解的方法处理,相当于一个消费者;
看看其整体代码结构:
2022-06-09-09-03-51-525023.png

  • 可以发现其入口方法为doStart(),往上追溯到实现了SmartLifecycle接口,很明显,由spring管理其start和stop操作;
  • ListenerConsumer,内部真正拉取消息消费的是这个结构,其 实现了Runable接口,简言之,它就是一个后台线程轮训拉取并处理消息
  • doStart方法中会创建ListenerConsumer并交给线程池处理
  • 以上步骤就开启了消息监听过程

    KafkaMessageListenerContainer#doStart

    1. protected void doStart() {
    2. if (isRunning()) {
    3. return;
    4. }
    5. if (this.clientIdSuffix == null) { // stand-alone container
    6. checkTopics();
    7. }
    8. ContainerProperties containerProperties = getContainerProperties();
    9. checkAckMode(containerProperties);
    10. ......
    11. // 创建ListenerConsumer消费者并放入到线程池中执行
    12. this.listenerConsumer = new ListenerConsumer(listener, listenerType);
    13. setRunning(true);
    14. this.startLatch = new CountDownLatch(1);
    15. this.listenerConsumerFuture = containerProperties
    16. .getConsumerTaskExecutor()
    17. .submitListenable(this.listenerConsumer);
    18. ......
    19. }

    KafkaMessageListenerContainer.ListenerConsumer#run

    1. public void run() { // NOSONAR complexity
    2. .......
    3. Throwable exitThrowable = null;
    4. while (isRunning()) {
    5. try {
    6. // 拉去消息并处理消息
    7. pollAndInvoke();
    8. }
    9. catch (@SuppressWarnings(UNUSED) WakeupException e) {
    10. ......
    11. }
    12. ......
    13. }
    14. wrapUp(exitThrowable);
    15. }

    2、ConcurrentMessageListenerContainer

    并发消息监听,相当于创建消费者;其底层逻辑仍然是通过KafkaMessageListenerContainer实现处理;从实现上看就是在KafkaMessageListenerContainer上做了层包装,有多少的concurrency就创建多个KafkaMessageListenerContainer,也就是concurrency个消费者
    2022-06-09-09-03-51-634004.png

    ConcurrentMessageListenerContainer#doStart

    1. protected void doStart() {
    2. if (!isRunning()) {
    3. checkTopics();
    4. ......
    5. setRunning(true);
    6. for (int i = 0; i < this.concurrency; i++) {
    7. KafkaMessageListenerContainer<K, V> container =
    8. constructContainer(containerProperties, topicPartitions, i);
    9. String beanName = getBeanName();
    10. container.setBeanName((beanName != null ? beanName : "consumer") + "-" + i);
    11. ......
    12. if (isPaused()) {
    13. container.pause();
    14. }
    15. // 这里调用KafkaMessageListenerContainer启动相关监听方法
    16. container.start();
    17. this.containers.add(container);
    18. }
    19. }
    20. }

    @KafkaListener底层监听原理

    上面已经介绍了KafkaMessageListenerContainer的作用是拉取并处理消息,但还缺少关键的一步,即 如何将业务逻辑与KafkaMessageListenerContainer的处理逻辑联系起来?
    那么这个桥梁就是@KafkaListener注解

  • KafkaListenerAnnotationBeanPostProcessor,从后缀BeanPostProcessor就可以知道这是Spring IOC初始化bean相关的操作,当然这里也是;此类会扫描带@KafkaListener注解的类或者方法,通过 KafkaListenerContainerFactory工厂创建对应的KafkaMessageListenerContainer,并调用start方法启动监听,也就是这样打通了这条路…

    Spring Boot 自动加载kafka相关配置

    1、KafkaAutoConfiguration

  • 自动生成kafka相关配置,比如当缺少这些bean的时候KafkaTemplate、ProducerListener、ConsumerFactory、ProducerFactory等,默认创建bean实例

    2、KafkaAnnotationDrivenConfiguration

  • 主要是针对于spring-kafka提供的注解背后的相关操作,比如 @KafkaListener;

  • 在开启了@EnableKafka注解后,spring会扫描到此配置并创建缺少的bean实例,比如当配置的工厂beanName不是kafkaListenerContainerFactory的时候,就会默认创建一个beanName为kafkaListenerContainerFactory的实例,这也是为什么在springboot中不用定义consumer的相关配置也可以通过@KafkaListener正常的处理消息

    生产配置

    1、单条消息处理

    ```java @Configuration @EnableKafka public class KafkaConfig {

    @Bean KafkaListenerContainerFactory>

    1. kafkaListenerContainerFactory() {
    2. ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
    3. new ConcurrentKafkaListenerContainerFactory<>();
    4. factory.setConsumerFactory(consumerFactory());
    5. factory.setConcurrency(3);
    6. factory.getContainerProperties().setPollTimeout(3000);
    7. return factory;

    }

    @Bean public ConsumerFactory consumerFactory() {

    1. return new DefaultKafkaConsumerFactory<>(consumerConfigs());

    }

    @Bean public Map consumerConfigs() {

    1. Map<String, Object> props = new HashMap<>();
    2. props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
    3. ...
    4. return props;

    } }

@KafkaListener(id = “myListener”, topics = “myTopic”, autoStartup = “${listen.auto.start:true}”, concurrency = “${listen.concurrency:3}”) public void listen(String data) { … }

  1. <a name="Esk6D"></a>
  2. #### 2、批量处理
  3. ```java
  4. @Configuration
  5. @EnableKafka
  6. public class KafkaConfig {
  7. @Bean
  8. public KafkaListenerContainerFactory<?, ?> batchFactory() {
  9. ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
  10. new ConcurrentKafkaListenerContainerFactory<>();
  11. factory.setConsumerFactory(consumerFactory());
  12. factory.setBatchListener(true); // <<<<<<<<<<<<<<<<<<<<<<<<<
  13. return factory;
  14. }
  15. @Bean
  16. public ConsumerFactory<Integer, String> consumerFactory() {
  17. return new DefaultKafkaConsumerFactory<>(consumerConfigs());
  18. }
  19. @Bean
  20. public Map<String, Object> consumerConfigs() {
  21. Map<String, Object> props = new HashMap<>();
  22. props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
  23. ...
  24. return props;
  25. }
  26. }
  27. @KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
  28. public void listen(List<String> list) {
  29. ...
  30. }

3、同一个消费组支持单条和批量处理

场景:
生产上最初都采用单条消费模式,随着量的积累,部分topic常常出现消息积压,最开始通过新增消费者实例和分区来提升消费端的能力;一段时间后又开始出现消息积压,由此便从代码层面通过批量消费来提升消费能力。

4、只对部分topic做批量消费处理

简单的说就是需要配置批量消费和单条记录消费(从单条消费逐步向批量消费演进)

  • 假设最开始就是配置的单条消息处理的相关配置,原配置基本不变
  • 然后新配置 批量消息监听KafkaListenerContainerFactory

    1. @Configuration
    2. @EnableKafka
    3. public class KafkaConfig {
    4. @Bean(name = [batchListenerContainerFactory])
    5. public KafkaListenerContainerFactory<?, ?> batchFactory() {
    6. ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
    7. new ConcurrentKafkaListenerContainerFactory<>();
    8. factory.setConsumerFactory(consumerFactory());
    9. // 开启批量处理
    10. factory.setBatchListener(true);
    11. return factory;
    12. }
    13. @Bean(name = [batchConsumerFactory])
    14. public ConsumerFactory<Integer, String> consumerFactory() {
    15. return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    16. }
    17. @Bean(name = [batchConsumerConfig])
    18. public Map<String, Object> consumerConfigs() {
    19. Map<String, Object> props = new HashMap<>();
    20. props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
    21. ...
    22. return props;
    23. }
    24. }

    注意点:

  • 如果自定义的ContainerFactory其beanName不是kafkaListenerContainerFactory,spring会通过KafkaAnnotationDrivenConfiguration创建新的bean实例,所以需要注意的是最终的@KafkaListener会使用到哪个ContainerFactory

  • 单条或在批量处理的ContainerFactory可以共存,默认会使用beanName为kafkaListenerContainerFactory的bean实例,因此可以为batch container Factory实例指定不同的beanName,并在@KafkaListener使用的时候指定containerFactory即可

    总结

  • spring为了将kafka融入其生态,方便在spring大环境下使用kafka,开发了spring-kafa这一模块,本质上是为了帮助开发者更好的以spring的方式使用kafka

  • @KafkaListener就是这么一个工具,在同一个项目中既可以有单条的消息处理,也可以配置多条的消息处理,稍微改变下配置即可实现,很是方便
  • 当然,@KafkaListener单条或者多条消息处理仍然是spring自行封装处理,与kafka-client客户端的拉取机制无关;比如一次性拉取50条消息,对于单条处理来说就是循环50次处理,而多条消息处理则可以一次性处理50条;本质上来说这套逻辑都是spring处理的,并不是说单条消费就是通过kafka-client一次只拉取一条消息
  • 在使用过程中需要注意spring自动的创建的一些bean实例,当然也可以覆盖其自动创建的实例以满足特定的需求场景

调试及相关源码版本:

  • org.springframework.boot:2.3.3.RELEASE
  • spring-kafka:2.5.4.RELEASE