1.项目地址
https://github.com/GuardFTC/Kafka-test.git
2.创建Topic
如图,创建一个名为multi-thread-topic的Topic,并指定分区为1
3.创建线程池配置
结合Spring对于线程池的支持,对线程池进行配置
Spring中线程池的任务队列,取决于对于队列容量的配置,当配置>0时,使用LinkedBlockingQueue,当配置=0时,使用SynchronousQueue
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
/**
 * @author: 冯铁城 [17615007230@163.com]
 * @date: 2022-09-13 14:04:56
 * @describe: Thread-pool configuration backing the async Kafka listener processing.
 */
@Configuration
@EnableAsync
public class ExecutorConfig {

    /**
     * Builds the executor used by {@code @Async("kafkaListenerExecutor")} methods.
     *
     * @return an initialized {@link ThreadPoolTaskExecutor}
     */
    @Bean("kafkaListenerExecutor")
    public Executor saveDbExecutor() {
        // Size the core pool to the number of available CPUs.
        final int coreSize = Runtime.getRuntime().availableProcessors();

        ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
        pool.setCorePoolSize(coreSize);
        // Allow bursting up to twice the core size.
        pool.setMaxPoolSize(coreSize * 2);
        // Idle non-core threads are reclaimed after 60 seconds.
        pool.setKeepAliveSeconds(60);
        // Capacity > 0, so Spring backs the pool with a LinkedBlockingQueue.
        pool.setQueueCapacity(1024);
        // Name prefix makes the pool's threads easy to spot in logs/dumps.
        pool.setThreadNamePrefix("kafka-listener-");
        // Reject with an exception once both the queue and max pool are full.
        pool.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
        pool.initialize();
        return pool;
    }
}
4.创建线程池任务
使用@Async标签,进行异步方法的配置操作
import lombok.SneakyThrows;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
/**
 * @author: 冯铁城 [17615007230@163.com]
 * @date: 2022-09-13 14:11:38
 * @describe: Business handler for the multi-threaded consumer; every public
 * method runs asynchronously on the "kafkaListenerExecutor" pool.
 */
@Component
@Async("kafkaListenerExecutor")
public class MultiThreadListenerService {

    /**
     * Handles one consumed Kafka record.
     *
     * @param record the message to process
     */
    @SneakyThrows(InterruptedException.class)
    public void doSomeThing(ConsumerRecord<String, String> record) {
        // Simulate one second of business work (1s == 1000ms as before).
        TimeUnit.SECONDS.sleep(1);

        // Show which pool thread consumed the record.
        String threadName = Thread.currentThread().getName();
        System.out.println(threadName + "线程消费者_监听获取数据:" + record.value());
    }
}
5.创建异步无返回值任务异常捕捉器
import cn.hutool.core.exceptions.ExceptionUtil;
import cn.hutool.log.StaticLog;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import java.lang.reflect.Method;
/**
* @author: 冯铁城 [17615007230@163.com]
* @date: 2022-09-13 14:33:13
* @describe: 异步无返回值方法-异常处理
*/
@Configuration
public class AsyncExceptionConfig implements AsyncConfigurer {
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return new SpringAsyncExceptionHandler();
}
static class SpringAsyncExceptionHandler implements AsyncUncaughtExceptionHandler {
@Override
public void handleUncaughtException(Throwable throwable, Method method, Object... objects) {
StaticLog.error("[异步方法:[{}]出现异常:{}]", method.getName(), ExceptionUtil.getMessage(throwable.getCause()));
}
}
}
6.编写发送结果处理类
import cn.hutool.log.StaticLog;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.kafka.core.KafkaProducerException;
import org.springframework.kafka.core.KafkaSendCallback;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
/**
* @author: 冯铁城 [17615007230@163.com]
* @date: 2022-09-07 16:13:57
* @describe: Kafka发布消息回调处理逻辑实现类
*/
@Component
public class KafkaSendCallBackForString implements KafkaSendCallback<String, String> {
@Override
public void onSuccess(SendResult<String, String> result) {
//1.获取消息属性
ProducerRecord<String, String> producerRecord = result.getProducerRecord();
String topic = producerRecord.topic();
Integer partition = producerRecord.partition();
String key = producerRecord.key();
String value = producerRecord.value();
//2.打印日志
StaticLog.info(LogTemplateEnum.KAFKA_SEND_SUCCESS_LOG.getTemplate(), topic, partition, key, value);
}
@Override
public void onFailure(KafkaProducerException e) {
//1.获取消息属性
ProducerRecord<String, String> failedProducerRecord = e.getFailedProducerRecord();
String topic = failedProducerRecord.topic();
Integer partition = failedProducerRecord.partition();
String key = failedProducerRecord.key();
String value = failedProducerRecord.value();
//2.打印日志
StaticLog.error(LogTemplateEnum.KAFKA_SEND_ERROR_LOG.getTemplate(), topic, partition, key, value);
//3.异常堆栈信息输出
e.printStackTrace();
//4.TODO 可进行自定义的异常逻辑,比如重新发送消息等操作
}
}
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
 * @author: 冯铁城 [17615007230@163.com]
 * @date: 2022-09-07 16:24:20
 * @describe: 日志模板枚举 — log message templates for producer send callbacks.
 */
public enum LogTemplateEnum {

    /**
     * Kafka消息发送成功日志模板
     */
    KAFKA_SEND_SUCCESS_LOG("Kafka producer send success! topic:{} partition:{} key:{} value:{}"),

    /**
     * Kafka消息发送失败日志模板
     */
    KAFKA_SEND_ERROR_LOG("Kafka producer send error! topic:{} partition:{} key:{} value:{}");

    /** The placeholder-style ({}) log template text. */
    private final String template;

    LogTemplateEnum(String template) {
        this.template = template;
    }

    public String getTemplate() {
        return template;
    }
}
7.创建多线程消费者
import com.ftc.multithread.service.MultiThreadListenerService;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
/**
 * @author: 冯铁城 [17615007230@163.com]
 * @date: 2022-09-08 16:06:48
 * @describe: 多线程消费者数据源监听 — single listener thread that hands each
 * record off to the async business service.
 */
@Component
public class Consumer {

    /** Async handler; its doSomeThing runs on the kafkaListenerExecutor pool. */
    private final MultiThreadListenerService listenerService;

    public Consumer(MultiThreadListenerService listenerService) {
        this.listenerService = listenerService;
    }

    /**
     * Receives one record and delegates immediately, so the listener thread is
     * free to poll the next message while the pool does the slow work.
     *
     * @param record the consumed message
     */
    @KafkaListener(topics = {"multi-thread-topic"}, groupId = "multi-thread-group")
    public void listener(ConsumerRecord<String, String> record) {
        listenerService.doSomeThing(record);
    }
}
8.测试
发送消息
测试,连续发送100条消息
import com.ftc.multithread.config.KafkaSendCallBackForString;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
@SpringBootTest
class ProducerTests {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    private KafkaSendCallBackForString kafkaSendCallBackForString;

    /**
     * Publishes the messages "1" through "100" to multi-thread-topic, attaching
     * the shared callback to log each send result.
     */
    @Test
    void sendMessage() {
        for (int messageNo = 1; messageNo <= 100; messageNo++) {
            String payload = String.valueOf(messageNo);
            kafkaTemplate.send("multi-thread-topic", payload).addCallback(kafkaSendCallBackForString);
        }
    }
}
接收消息
9.总结
偏移量的提交
上述多线程消费者,是通过消费者与处理逻辑分开线程进行处理的。并且消费者偏移量的提交机制,采用的是自动提交。
在实际的项目中,有可能遇到单条消息的处理耗时超过自动提交偏移量周期的情况。这种时候我们可以采用手动提交偏移量的方式。当然,手动提交偏移量的处理难度,是要远远高于自动提交偏移量的,这个后续遇到的话,我再整理到笔记里面吧
消息的乱序问题
在控制台打印我们可以看到,实际上我们发送消息的顺序是从1~100,但是实际上消息的消费过程是无序的。这就是一个典型的消费者端多线程引起的Kafka乱序问题
关于消息乱序的问题,我们放在下一章笔记处理