## 1. Project repository
https://github.com/GuardFTC/Kafka-test.git
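## 2. Preface
In the article "Spring Boot multi-threaded consumer" we saw that messages were processed out of order once the consumer handed them to multiple threads. To consume Kafka messages in order, the Producer, the Broker, and the Consumer each have to do their part.

The implementations below are based on the theory covered in the article "Some common Kafka questions".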
## 3. Single-partition Topic approach
### Applicable scenario
The project has only one class of messages that must be processed in order.
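### Broker
On the broker side, ordering requires that all messages of one ordered class be stored in the same partition: Kafka guarantees order within a single Partition, but not across the partitions of a Topic.

So when creating the Topic, it is enough to give it exactly one Partition.
#### Create the Topic
As shown in the figure, create a Topic named `order-message-topic-v1` with the partition count set to 1.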
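### Producer
On the Producer side, the goal is to keep the retry-on-failure mechanism from resending messages out of order.

The Producer therefore enables idempotence (`enable.idempotence: true`). With idempotence on, the broker checks a sequence number on every batch, so a retried batch can be neither appended out of sequence nor appended twice. The yml configuration is as follows: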
```yaml
# Server configuration
server:
  port: 8086

# Spring configuration
spring:
  kafka:
    bootstrap-servers: 120.48.107.224:9092
    producer:
      # -1 is equivalent to "all"
      acks: -1
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      retries: 3
      properties:
        enable.idempotence: true
```
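To see why retries can reorder messages and how a sequence check prevents it, here is a minimal sketch. It is a simplified model and not Kafka's actual broker code: the class `IdempotentSequenceSketch` and its methods are hypothetical, each batch is reduced to its sequence number, and a rejected batch is assumed to be retried later.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Simplified, hypothetical model of one partition's log; not Kafka's real broker code.
class IdempotentSequenceSketch {

    // Without a sequence check, the log simply reflects arrival order.
    static List<Integer> plainLog(List<Integer> arrivals) {
        return new ArrayList<>(arrivals);
    }

    // With a sequence check, the log only accepts the next expected sequence number.
    // Assumes every sequence number eventually arrives; otherwise the loop would not end.
    static List<Integer> sequencedLog(List<Integer> arrivals) {
        Deque<Integer> pending = new ArrayDeque<>(arrivals);
        List<Integer> log = new ArrayList<>();
        int expected = 0;
        while (!pending.isEmpty()) {
            int seq = pending.poll();
            if (seq == expected) {
                // In sequence: append and advance
                log.add(seq);
                expected++;
            } else if (seq > expected) {
                // Ahead of sequence: rejected, modeled as a later retry
                pending.add(seq);
            }
            // seq < expected: duplicate of an already appended batch, dropped
        }
        return log;
    }

    public static void main(String[] args) {
        // Batch 0 is lost on its first attempt, batch 1 arrives, then batch 0 is retried twice.
        List<Integer> arrivals = Arrays.asList(1, 0, 0, 2);
        System.out.println("plain log:     " + plainLog(arrivals));
        System.out.println("sequenced log: " + sequencedLog(arrivals));
    }
}
```

In this model the plain log keeps the reordered, duplicated arrivals, while the sequenced log ends up as 0, 1, 2.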
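#### Code example
The test below sends 100 messages, 0 through 99, in a loop; no Partition or Key needs to be specified when sending.

The consumer is expected to print 0 through 99 in order.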
```java
package com.ftc.ordermessage;

import com.ftc.ordermessage.config.KafkaSendCallBackForString;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;

@SpringBootTest
class ProducerTests {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    private KafkaSendCallBackForString kafkaSendCallBackForString;

    @Test
    void sendOrderMessageV1() {

        //1. Send the messages in a loop
        for (int i = 0; i < 100; i++) {

            //2. Send one message (no Key, no Partition)
            kafkaTemplate.send("order-message-topic-v1", i + "")
                    .addCallback(kafkaSendCallBackForString);
        }
    }
}
```
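### Consumer
On the Consumer side, the thing to avoid is multi-threading: messages would still be pulled in order, but they would no longer be processed in order.

Since there is only one class of ordered messages, a multi-threaded design is unnecessary anyway.

The consumer group may contain one consumer or several. A partition is assigned to at most one consumer in a group, and Rebalance hands it to another member if that consumer leaves, so at any moment only one consumer is consuming these messages.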
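The "one partition, one owner" rule can be sketched as follows. This is a simplified round-robin model with hypothetical names (`PartitionOwnershipSketch`, `assign`), not one of Kafka's real assignors; which listener actually receives the partition in a live group is decided by Kafka, not by this code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of partition ownership inside one consumer group.
class PartitionOwnershipSketch {

    // Gives each partition to exactly one consumer, round-robin over the group members.
    // Assumes the consumer list is not empty.
    static Map<String, List<Integer>> assign(List<String> consumers, int partitionCount) {
        Map<String, List<Integer>> ownership = new LinkedHashMap<>();
        for (String consumer : consumers) {
            ownership.put(consumer, new ArrayList<>());
        }
        for (int partition = 0; partition < partitionCount; partition++) {
            String owner = consumers.get(partition % consumers.size());
            ownership.get(owner).add(partition);
        }
        return ownership;
    }

    public static void main(String[] args) {
        // One partition, two listeners: only one of them owns partition 0, the other stays idle.
        System.out.println(assign(Arrays.asList("listenerN1", "listenerN2"), 1));
        // If the owner leaves the group, a rebalance gives partition 0 to the remaining listener.
        System.out.println(assign(Arrays.asList("listenerN2"), 1));
    }
}
```

With a single partition, adding more consumers to the group adds standby capacity rather than parallelism, which is why ordering is preserved.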
#### Code example
```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * @author: 冯铁城 [17615007230@163.com]
 * @date: 2022-09-13 10:01:26
 * @describe: Ordered-message consumer V1
 */
@Component
public class OrderMessageConsumerV1 {

    @KafkaListener(topics = {"order-message-topic-v1"}, groupId = "${spring.kafka.consumer.group-id}")
    public void listenerN1(ConsumerRecord<String, String> record) {
        System.out.println("receive message N1:" + record.value());
    }

    @KafkaListener(topics = {"order-message-topic-v1"}, groupId = "${spring.kafka.consumer.group-id}")
    public void listenerN2(ConsumerRecord<String, String> record) {
        System.out.println("receive message N2:" + record.value());
    }
}
```
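### Verification
#### Producer
#### Consumer
As shown in the figure, the messages in this run were handled by ConsumerN1; because processing is single-threaded, they were consumed in order.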
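## 4. Multi-partition Topic approach
### Applicable scenario
The project has several classes of messages that must each be processed in order. To keep them from piling up in a single Partition, use multiple Partitions.
### Broker
#### Create the Topic
As shown below, create a Topic named `order-message-topic-v2` with the partition count set to 3.
### Producer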
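With multiple partitions, enabling Producer idempotence is not enough: each message must also be sent with a Key that identifies its class, one Key per class of ordered messages.

For a keyed message, Kafka's default partitioner picks the partition as hash(Key) % partition count, where the hash is a murmur2 hash of the serialized Key. Every message of one ordered class therefore still ends up persisted in a single Partition. Different Keys may land in the same Partition; that does not affect the order within each class.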
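The Key-to-Partition mapping can be illustrated with a small sketch. Note the assumptions: `KeyPartitionSketch` and `partitionFor` are hypothetical names, and `Arrays.hashCode` is only a stand-in for the hash Kafka really uses, so the partition numbers printed here will generally differ from the ones Kafka picks. What carries over is the property that matters for ordering: the same Key always maps to the same Partition as long as the partition count does not change.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical sketch of key-based partition selection; the hash is a stand-in, not Kafka's.
class KeyPartitionSketch {

    static int partitionFor(String key, int partitionCount) {
        // Hash the serialized key, as a string serializer would produce it
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        // floorMod keeps the result in [0, partitionCount) even when the hash is negative
        return Math.floorMod(Arrays.hashCode(keyBytes), partitionCount);
    }

    public static void main(String[] args) {
        for (String key : new String[]{"Integer", "CHARACTER"}) {
            System.out.println(key + " -> partition " + partitionFor(key, 3));
        }
    }
}
```

Because the result depends on the partition count, adding partitions to an existing Topic can move a Key to a different Partition, which is worth keeping in mind when ordering matters.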
#### Code example
We define two classes of ordered messages:
- numeric messages 0~99, key=Integer
- letter messages a~z, key=CHARACTER

```java
import com.ftc.ordermessage.config.KafkaSendCallBackForString;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;

@SpringBootTest
class ProducerTests {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    private KafkaSendCallBackForString kafkaSendCallBackForString;

    @Test
    void sendOrderMessageV2() {

        //1. Send the numeric messages
        for (int i = 0; i < 100; i++) {

            //2. Send one message with key "Integer"
            kafkaTemplate.send("order-message-topic-v2", "Integer", i + "")
                    .addCallback(kafkaSendCallBackForString);
        }

        //3. Send the letter messages
        for (int i = 0; i < 26; i++) {

            //4. Send one message with key "CHARACTER" (97 is 'a')
            kafkaTemplate.send("order-message-topic-v2", "CHARACTER", (char) (97 + i) + "")
                    .addCallback(kafkaSendCallBackForString);
        }
    }
}
```
### Consumer
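On the Consumer side, several classes of messages now share one Topic, so the Consumer uses multiple threads to process them. Each class is always handled by the same single thread, and that thread works through its own task queue in FIFO order, which yields in-order consumption for each class.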
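The idea of "one thread plus one queue per class of message" can be sketched with the JDK alone. This is a minimal illustration under stated assumptions, not the project's Spring code: `KeyedDispatchSketch` and `dispatch` are hypothetical names, and records are modeled as plain `{key, value}` string pairs.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: route each key to one fixed single-thread executor.
class KeyedDispatchSketch {

    // Returns, per key, the values in the order in which they were processed.
    static Map<String, List<String>> dispatch(List<String[]> records, int threads) {
        ExecutorService[] executors = new ExecutorService[threads];
        for (int i = 0; i < threads; i++) {
            // Each executor has one worker thread and a FIFO task queue
            executors[i] = Executors.newSingleThreadExecutor();
        }
        Map<String, List<String>> processed = new ConcurrentHashMap<>();
        for (String[] entry : records) {
            String key = entry[0];
            String value = entry[1];
            // The same key always selects the same executor
            int index = Math.floorMod(key.hashCode(), threads);
            executors[index].execute(() -> processed
                    .computeIfAbsent(key, k -> Collections.synchronizedList(new ArrayList<>()))
                    .add(value));
        }
        try {
            // Stop accepting tasks and wait for the queued ones to finish
            for (ExecutorService executor : executors) {
                executor.shutdown();
                executor.awaitTermination(5, TimeUnit.SECONDS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        return processed;
    }

    public static void main(String[] args) {
        List<String[]> records = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            // Interleave the two classes of messages, as they would arrive from the Topic
            records.add(new String[]{"Integer", String.valueOf(i)});
            records.add(new String[]{"CHARACTER", String.valueOf((char) ('a' + i))});
        }
        System.out.println(dispatch(records, 3));
    }
}
```

The two classes may interleave with each other in any way, but within one Key the values keep their submission order, because a single worker thread takes them from its queue one at a time.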
#### Code example
##### Create three single-thread executors
```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * @author: 冯铁城 [17615007230@163.com]
 * @date: 2022-09-13 14:04:56
 * @describe: Thread pool configuration
 */
@Configuration
@EnableAsync
public class ExecutorConfig {

    @Bean("kafkaListenerExecutor-1")
    public Executor singleExecutorNumber1() {
        return getSingleExecutor(1);
    }

    @Bean("kafkaListenerExecutor-2")
    public Executor singleExecutorNumber2() {
        return getSingleExecutor(2);
    }

    @Bean("kafkaListenerExecutor-3")
    public Executor singleExecutorNumber3() {
        return getSingleExecutor(3);
    }

    /**
     * Build a single-thread pool
     *
     * @param number index of the thread pool
     * @return single-thread pool
     */
    private Executor getSingleExecutor(int number) {

        //1. Create the thread pool
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

        //2. Core pool size: one thread
        executor.setCorePoolSize(1);

        //3. Maximum pool size: one thread
        executor.setMaxPoolSize(1);

        //4. Keep-alive time for extra threads, in seconds
        executor.setKeepAliveSeconds(60);

        //5. Task queue capacity
        executor.setQueueCapacity(1024);

        //6. Thread name prefix
        executor.setThreadNamePrefix("kafka-listener-" + number + "-");

        //7. Rejection policy: throw an exception when the queue is full
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());

        //8. Initialize the thread pool
        executor.initialize();

        //9. Return the thread pool
        return executor;
    }
}
```
##### Create the three matching handler methods
```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

/**
 * @author: 冯铁城 [17615007230@163.com]
 * @date: 2022-09-13 14:11:38
 * @describe: Business logic for the multi-threaded consumer
 */
@Component
public class ListenerService {

    @Async("kafkaListenerExecutor-1")
    public void doSomeThingNumber1(ConsumerRecord<String, String> record) {
        doSomeThing(record);
    }

    @Async("kafkaListenerExecutor-2")
    public void doSomeThingNumber2(ConsumerRecord<String, String> record) {
        doSomeThing(record);
    }

    @Async("kafkaListenerExecutor-3")
    public void doSomeThingNumber3(ConsumerRecord<String, String> record) {
        doSomeThing(record);
    }

    /**
     * Handle the business logic
     *
     * @param record the message
     */
    private void doSomeThing(ConsumerRecord<String, String> record) {
        System.out.println(Thread.currentThread().getName() + " consumer thread received: " + record.value());
    }
}
```
##### The Consumer routes each class of message to one thread by modulo
```java
import com.ftc.ordermessage.service.ListenerService;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.Objects;

/**
 * @author: 冯铁城 [17615007230@163.com]
 * @date: 2022-09-13 10:01:26
 * @describe: Ordered-message consumer V2
 */
@Component
@RequiredArgsConstructor
public class OrderMessageConsumerV2 {

    private final ListenerService listenerService;

    @KafkaListener(topics = {"order-message-topic-v2"}, groupId = "order-message-group-2")
    public void listenerN1(ConsumerRecord<String, String> record) {

        //1. Print which consumer received the message
        System.out.println("receive message N1");

        //2. Consume the message
        consumerMessage(record);
    }

    @KafkaListener(topics = {"order-message-topic-v2"}, groupId = "order-message-group-2")
    public void listenerN2(ConsumerRecord<String, String> record) {

        //1. Print which consumer received the message
        System.out.println("receive message N2");

        //2. Consume the message
        consumerMessage(record);
    }

    @KafkaListener(topics = {"order-message-topic-v2"}, groupId = "order-message-group-2")
    public void listenerN3(ConsumerRecord<String, String> record) {

        //1. Print which consumer received the message
        System.out.println("receive message N3");

        //2. Consume the message
        consumerMessage(record);
    }

    /**
     * Consume a message
     *
     * @param record the message
     */
    private void consumerMessage(ConsumerRecord<String, String> record) {

        //1. Get the message Key
        String key = record.key();

        //2. hash(Key) modulo the number of threads. floorMod keeps the result in 0..2
        //   even for a negative hashCode; Objects.hashCode maps a null Key to 0
        int number = Math.floorMod(Objects.hashCode(key), 3);

        //3. Hand the message to the thread that owns this Key
        if (0 == number) {
            listenerService.doSomeThingNumber1(record);
        } else if (1 == number) {
            listenerService.doSomeThingNumber2(record);
        } else {
            listenerService.doSomeThingNumber3(record);
        }
    }
}
```
### Verification
#### Producer
#### Consumer
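## 5. Summary
In short, whether the Topic has one partition or many, and whether the Consumer is single-threaded or multi-threaded, ordered consumption in Kafka rests on the same principles:
- The Producer enables idempotence and, at the business-logic level, never sends messages out of order
- The Broker persists all messages of one ordered class to a single partition
- The Consumer processes each class of ordered messages on a single thread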