1. Project repository

https://github.com/GuardFTC/Kafka-test.git

2. Create the Topic

As shown below, create a Topic named multi-thread-topic with 1 partition.
[screenshot: multi-thread-topic created with 1 partition]
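
The screenshot above uses a UI tool; if you prefer to create the topic in code, a minimal sketch with Kafka's AdminClient could look like the following (the broker address localhost:9092 and the replication factor of 1 are assumptions for a local setup, not part of the project):

```java
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Collections;
import java.util.Properties;

public class CreateTopicDemo {

    public static void main(String[] args) throws Exception {

        //Connect to an assumed local broker
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        //Create multi-thread-topic with 1 partition and replication factor 1
        try (AdminClient admin = AdminClient.create(props)) {
            admin.createTopics(Collections.singletonList(new NewTopic("multi-thread-topic", 1, (short) 1)))
                    .all()
                    .get();
        }
    }
}
```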

3. Create the thread pool configuration

Configure the thread pool using Spring's ThreadPoolTaskExecutor support.
Note that the task queue a Spring thread pool uses depends on the configured queue capacity: when the capacity is greater than 0 a LinkedBlockingQueue is used, and when it is 0 a SynchronousQueue is used, as illustrated in the sketch below.
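
A minimal sketch of that behavior (the class and variable names here are only for demonstration and are not part of the project):

```java
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

public class QueueCapacityDemo {

    public static void main(String[] args) {

        //Capacity > 0: tasks wait in a bounded LinkedBlockingQueue of that size
        ThreadPoolTaskExecutor queued = new ThreadPoolTaskExecutor();
        queued.setQueueCapacity(1024);
        queued.initialize();

        //Capacity = 0: a SynchronousQueue is used, so each task is handed directly to a worker thread
        ThreadPoolTaskExecutor handOff = new ThreadPoolTaskExecutor();
        handOff.setQueueCapacity(0);
        handOff.initialize();

        //Shut both pools down again
        queued.shutdown();
        handOff.shutdown();
    }
}
```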

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * @author: 冯铁城 [17615007230@163.com]
 * @date: 2022-09-13 14:04:56
 * @describe: Thread pool configuration
 */
@Configuration
@EnableAsync
public class ExecutorConfig {

    @Bean("kafkaListenerExecutor")
    public Executor saveDbExecutor() {

        //1.Create the thread pool
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

        //2.Core pool size
        int cpuCount = Runtime.getRuntime().availableProcessors();
        executor.setCorePoolSize(cpuCount);

        //3.Maximum pool size
        executor.setMaxPoolSize(cpuCount << 1);

        //4.Keep-alive time for non-core threads
        executor.setKeepAliveSeconds(60);

        //5.Queue capacity
        executor.setQueueCapacity(1024);

        //6.Thread name prefix
        executor.setThreadNamePrefix("kafka-listener-");

        //7.Rejection policy: throw an exception
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());

        //8.Initialize the pool
        executor.initialize();

        //9.Return the pool
        return executor;
    }
}
```

4. Create the thread pool task

Use the @Async annotation to make the processing method asynchronous. Note that @Async works through Spring's proxy mechanism, so the method must be called from another bean (here, the Consumer in step 7) rather than from within the same class.

```java
import lombok.SneakyThrows;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

import java.util.concurrent.TimeUnit;

/**
 * @author: 冯铁城 [17615007230@163.com]
 * @date: 2022-09-13 14:11:38
 * @describe: Business logic handler for the multi-threaded consumer
 */
@Component
@Async("kafkaListenerExecutor")
public class MultiThreadListenerService {

    /**
     * Handle the business logic
     *
     * @param record the message
     */
    @SneakyThrows(InterruptedException.class)
    public void doSomeThing(ConsumerRecord<String, String> record) {

        //1.Simulate time-consuming business logic
        TimeUnit.MILLISECONDS.sleep(1000);

        //2.Print to the console
        System.out.println(Thread.currentThread().getName() + " thread consumer received: " + record.value());
    }
}
```

5. Create an exception handler for async methods with no return value

```java
import cn.hutool.core.exceptions.ExceptionUtil;
import cn.hutool.log.StaticLog;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;

import java.lang.reflect.Method;

/**
 * @author: 冯铁城 [17615007230@163.com]
 * @date: 2022-09-13 14:33:13
 * @describe: Exception handling for async methods with no return value
 */
@Configuration
public class AsyncExceptionConfig implements AsyncConfigurer {

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return new SpringAsyncExceptionHandler();
    }

    static class SpringAsyncExceptionHandler implements AsyncUncaughtExceptionHandler {

        @Override
        public void handleUncaughtException(Throwable throwable, Method method, Object... objects) {
            StaticLog.error("[async method [{}] threw an exception: {}]", method.getName(), ExceptionUtil.getMessage(throwable.getCause()));
        }
    }
}
```

6. Write the send-result callback class

```java
import cn.hutool.log.StaticLog;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.kafka.core.KafkaProducerException;
import org.springframework.kafka.core.KafkaSendCallback;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;

/**
 * @author: 冯铁城 [17615007230@163.com]
 * @date: 2022-09-07 16:13:57
 * @describe: Callback handler for Kafka message publishing
 */
@Component
public class KafkaSendCallBackForString implements KafkaSendCallback<String, String> {

    @Override
    public void onSuccess(SendResult<String, String> result) {

        //1.Read the message attributes
        ProducerRecord<String, String> producerRecord = result.getProducerRecord();
        String topic = producerRecord.topic();
        Integer partition = producerRecord.partition();
        String key = producerRecord.key();
        String value = producerRecord.value();

        //2.Log the result
        StaticLog.info(LogTemplateEnum.KAFKA_SEND_SUCCESS_LOG.getTemplate(), topic, partition, key, value);
    }

    @Override
    public void onFailure(KafkaProducerException e) {

        //1.Read the message attributes
        ProducerRecord<String, String> failedProducerRecord = e.getFailedProducerRecord();
        String topic = failedProducerRecord.topic();
        Integer partition = failedProducerRecord.partition();
        String key = failedProducerRecord.key();
        String value = failedProducerRecord.value();

        //2.Log the result
        StaticLog.error(LogTemplateEnum.KAFKA_SEND_ERROR_LOG.getTemplate(), topic, partition, key, value);

        //3.Print the stack trace
        e.printStackTrace();

        //4.TODO custom failure handling, e.g. re-sending the message
    }
}
```
```java
import lombok.AllArgsConstructor;
import lombok.Getter;

/**
 * @author: 冯铁城 [17615007230@163.com]
 * @date: 2022-09-07 16:24:20
 * @describe: Log template enum
 */
@Getter
@AllArgsConstructor
public enum LogTemplateEnum {

    /**
     * Template for successful Kafka sends
     */
    KAFKA_SEND_SUCCESS_LOG("Kafka producer send success! topic:{} partition:{} key:{} value:{}"),

    /**
     * Template for failed Kafka sends
     */
    KAFKA_SEND_ERROR_LOG("Kafka producer send error! topic:{} partition:{} key:{} value:{}"),
    ;

    private final String template;
}
```

7. Create the multi-threaded consumer

```java
import com.ftc.multithread.service.MultiThreadListenerService;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * @author: 冯铁城 [17615007230@163.com]
 * @date: 2022-09-08 16:06:48
 * @describe: Listener for the multi-threaded consumer
 */
@Component
@RequiredArgsConstructor
public class Consumer {

    private final MultiThreadListenerService multiThreadListenerService;

    @KafkaListener(topics = {"multi-thread-topic"}, groupId = "multi-thread-group")
    public void listener(ConsumerRecord<String, String> record) {
        multiThreadListenerService.doSomeThing(record);
    }
}
```

8. Test

Send messages

As a test, send 100 messages in a row.

```java
import com.ftc.multithread.config.KafkaSendCallBackForString;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;

@SpringBootTest
class ProducerTests {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    private KafkaSendCallBackForString kafkaSendCallBackForString;

    @Test
    void sendMessage() {

        //1.Send messages in a loop
        for (int i = 1; i <= 100; i++) {
            kafkaTemplate.send("multi-thread-topic", i + "")
                    .addCallback(kafkaSendCallBackForString);
        }
    }
}
```

Receive messages

As shown below, the messages are consumed successfully on different threads.
[screenshot: console output showing messages consumed on different threads]

9. Summary

Offset commits

The multi-threaded consumer above splits the work across threads: the listener receives the message and the business logic runs on the thread pool. Offsets are committed automatically.
In a real project, processing a single message may take longer than the auto-commit interval. In that case you can switch to committing offsets manually. Manual commits are considerably harder to get right than auto-commit, though, so I will write that up in a later note when I actually run into it.
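
As a rough sketch of what a manual-commit listener looks like in spring-kafka (the class name and group id below are made up for illustration; it also assumes auto-commit is disabled and the listener ack-mode is set to manual, e.g. spring.kafka.consumer.enable-auto-commit=false and spring.kafka.listener.ack-mode=manual):

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
public class ManualAckConsumer {

    //Assumes enable-auto-commit=false and ack-mode=manual in the consumer/listener configuration
    @KafkaListener(topics = {"multi-thread-topic"}, groupId = "manual-ack-group")
    public void listener(ConsumerRecord<String, String> record, Acknowledgment ack) {

        //...process the message here...

        //Commit the offset only after processing has finished successfully
        ack.acknowledge();
    }
}
```

Once the processing itself runs on another thread, as in this note, the acknowledgment has to be deferred until that thread finishes, which is exactly what makes manual commits harder than auto-commit.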

Message ordering

From the console output we can see that although the messages were sent in order from 1 to 100, they were consumed out of order. This is a typical Kafka ordering problem caused by multi-threading on the consumer side.
Message ordering will be covered in the next note.