1.项目地址

https://github.com/GuardFTC/Kafka-test.git

2.前言

Springboot 多线程消费者内容中,我们了解到,当消费者端基于多线程处理消息时,出现了消息的乱序现象。实际上,为了实现Kafka消息的顺序消费,无论是Producer,Broker,Consumer都需要做出相应的处理
以下实现都是基于Kafka一些常见问题中的理论实现

3.Topic单分区方案

适用场景

项目中只有一类需要被有序处理的消息

Broker

从broker角度,为了实现消息的顺序性,必须要确保的就是一类有序消息存储到同一分区。因为Kafka可以确保一个Partition有序,但是整个Topic无法保证有序性
因此,我们创建Topic时,指定当前Topic只有一个Partition即可

创建Topic

如图,创建一个名为order-message-topic-v1的Topic,并指定分区数=1
image.png

Producer

从Producer角度,需要确保的就是,避免因为异常重试机制,导致的消息重发乱序
因此Producer需开启幂等配置(enable.idempotence: true),yml配置如下

  1. #服务器配置
  2. server:
  3. port: 8086
  4. #spring配置
  5. spring:
  6. kafka:
  7. bootstrap-servers: 120.48.107.224:9092
  8. producer:
  9. acks: -1
  10. key-serializer: org.apache.kafka.common.serialization.StringSerializer
  11. value-serializer: org.apache.kafka.common.serialization.StringSerializer
  12. retries: 3
  13. properties:
  14. enable.idempotence: true

代码示例

如下,循环发送0~99共100条消息,并且发送时无需指定Partition或Key
期望是消费者顺序打印0~99

  1. package com.ftc.ordermessage;
  2. import com.ftc.ordermessage.config.KafkaSendCallBackForString;
  3. import org.junit.jupiter.api.Test;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.boot.test.context.SpringBootTest;
  6. import org.springframework.kafka.core.KafkaTemplate;
  7. @SpringBootTest
  8. class ProducerTests {
  9. @Autowired
  10. private KafkaTemplate<String, String> kafkaTemplate;
  11. @Autowired
  12. private KafkaSendCallBackForString kafkaSendCallBackForString;
  13. @Test
  14. void sendOrderMessageV1() {
  15. //1.循环发送消息
  16. for (int i = 0; i < 100; i++) {
  17. //2.发送消息
  18. kafkaTemplate.send("order-message-topic-v1", i + "")
  19. .addCallback(kafkaSendCallBackForString);
  20. }
  21. }
  22. }

Consumer

从Consumer角度出发,避免因为多线程导致的,消息顺序拉取,但是消息没有被顺序执行。
而且因为是一类有序消息,所以也无需采用多线程的方案
消费者组里面的消费者数量可以为多个,也可以为一个,最终通过Rebalance机制,可以保证某个时刻只有一个消费者在消费当前消息

代码示例

  1. import org.apache.kafka.clients.consumer.ConsumerRecord;
  2. import org.springframework.kafka.annotation.KafkaListener;
  3. import org.springframework.stereotype.Component;
  4. /**
  5. * @author: 冯铁城 [17615007230@163.com]
  6. * @date: 2022-09-13 10:01:26
  7. * @describe: 顺序消息消费者V1
  8. */
  9. @Component
  10. public class OrderMessageConsumerV1 {
  11. @KafkaListener(topics = {"order-message-topic-v1"}, groupId = "${spring.kafka.consumer.group-id}")
  12. public void listenerN1(ConsumerRecord<String, String> record) {
  13. System.out.println("receive message N1:" + record.value());
  14. }
  15. @KafkaListener(topics = {"order-message-topic-v1"}, groupId = "${spring.kafka.consumer.group-id}")
  16. public void listenerN2(ConsumerRecord<String, String> record) {
  17. System.out.println("receive message N2:" + record.value());
  18. }
  19. }

验证

Producer

如图,消息发布成功
image.png

Consumer

如图,本次消息由ConsumerN1处理,因为是单线程处理,所以实现了消息的顺序消费
image.png

4.Topic多分区方案

适用场景

项目中有多类需要被有序处理的消息,为了避免多类消息拥堵在一个Partition中,采用多Partition方案

Broker

该方案下我们创建一个多分区的Topic

创建Topic

如下,创建一个名为order-message-topic-v2的Topic,并指定分区数=3image.png

Producer

多分区时,除了开启Producer幂等,在发送消息时,还需要指定不同类型消息的Key,每种类型的消息对应一个Key,
Kafka会通过Hash(Key)%总分区数来确定该消息要被发布到哪个分区,因此,实际上最终一类有序消息还是会被持久化到一个Partition

代码示例

我们设置2类有序消息,

  1. 0~99的数字消息,key=Integer
  2. a~z的字母消息,key=CHARACTER ```java import com.ftc.ordermessage.config.KafkaSendCallBackForString; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.kafka.core.KafkaTemplate;

@SpringBootTest class ProducerTests {

  1. @Autowired
  2. private KafkaTemplate<String, String> kafkaTemplate;
  3. @Autowired
  4. private KafkaSendCallBackForString kafkaSendCallBackForString;
  5. @Test
  6. void sendOrderMessageV2() {
  7. //1.发送数字类型消息
  8. for (int i = 0; i < 100; i++) {
  9. //2.发送消息
  10. kafkaTemplate.send("order-message-topic-v2", "Integer", i + "")
  11. .addCallback(kafkaSendCallBackForString);
  12. }
  13. //3.发送字母类型消息
  14. for (int i = 0; i < 26; i++) {
  15. //4.发送消息
  16. kafkaTemplate.send("order-message-topic-v2", "CHARACTER", (char) (97 + i) + "")
  17. .addCallback(kafkaSendCallBackForString);
  18. }
  19. }

}

  1. <a name="dAy7Q"></a>
  2. ### Consumer
  3. 从Consumer角度出发,因为多类消息存储到了一个Topic中,所以Consumer需要用多线程来处理多类消息,并且每一类消息都由一个线程处理,再结合任务队列,即可实现顺序消费消息
  4. <a name="Wi74a"></a>
  5. #### 代码示例
  6. <a name="lzv7K"></a>
  7. ##### 创建3个单例线程
  8. ```java
  9. import org.springframework.context.annotation.Bean;
  10. import org.springframework.context.annotation.Configuration;
  11. import org.springframework.scheduling.annotation.EnableAsync;
  12. import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
  13. import java.util.concurrent.Executor;
  14. import java.util.concurrent.ThreadPoolExecutor;
  15. /**
  16. * @author: 冯铁城 [17615007230@163.com]
  17. * @date: 2022-09-13 14:04:56
  18. * @describe: 线程池配置
  19. */
  20. @Configuration
  21. @EnableAsync
  22. public class ExecutorConfig {
  23. @Bean("kafkaListenerExecutor-1")
  24. public Executor singleExecutorNumber1() {
  25. return getSingleExecutor(1);
  26. }
  27. @Bean("kafkaListenerExecutor-2")
  28. public Executor singleExecutorNumber2() {
  29. return getSingleExecutor(2);
  30. }
  31. @Bean("kafkaListenerExecutor-3")
  32. public Executor singleExecutorNumber3() {
  33. return getSingleExecutor(3);
  34. }
  35. /**
  36. * 获取单例线程池
  37. *
  38. * @param number 线程池序号
  39. * @return 单例线程池
  40. */
  41. private Executor getSingleExecutor(int number) {
  42. //1.定义线程池
  43. ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
  44. //2.定义核心线程数
  45. executor.setCorePoolSize(1);
  46. //3.最大线程数
  47. executor.setMaxPoolSize(1);
  48. //4.设置额外线程存活时间
  49. executor.setKeepAliveSeconds(60);
  50. //5.队列大小
  51. executor.setQueueCapacity(1024);
  52. //6.线程池中的线程名前缀
  53. executor.setThreadNamePrefix("kafka-listener-" + number + "-");
  54. //7.拒绝策略:异常抛出策略
  55. executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
  56. //8.初始化线程池
  57. executor.initialize();
  58. //9.返回线程池
  59. return executor;
  60. }
  61. }

创建对应3个处理逻辑
  1. import org.apache.kafka.clients.consumer.ConsumerRecord;
  2. import org.springframework.scheduling.annotation.Async;
  3. import org.springframework.stereotype.Component;
  4. /**
  5. * @author: 冯铁城 [17615007230@163.com]
  6. * @date: 2022-09-13 14:11:38
  7. * @describe: 多线程消费者逻辑处理类
  8. */
  9. @Component
  10. public class ListenerService {
  11. @Async("kafkaListenerExecutor-1")
  12. public void doSomeThingNumber1(ConsumerRecord<String, String> record) {
  13. doSomeThing(record);
  14. }
  15. @Async("kafkaListenerExecutor-2")
  16. public void doSomeThingNumber2(ConsumerRecord<String, String> record) {
  17. doSomeThing(record);
  18. }
  19. @Async("kafkaListenerExecutor-3")
  20. public void doSomeThingNumber3(ConsumerRecord<String, String> record) {
  21. doSomeThing(record);
  22. }
  23. /**
  24. * 处理业务逻辑
  25. *
  26. * @param record 消息
  27. */
  28. private void doSomeThing(ConsumerRecord<String, String> record) {
  29. System.out.println(Thread.currentThread().getName() + "线程消费者_监听获取数据:" + record.value());
  30. }
  31. }

Consumer通过取模法将一类消息交给一个线程
  1. import com.ftc.ordermessage.service.ListenerService;
  2. import lombok.RequiredArgsConstructor;
  3. import org.apache.kafka.clients.consumer.ConsumerRecord;
  4. import org.springframework.kafka.annotation.KafkaListener;
  5. import org.springframework.stereotype.Component;
  6. /**
  7. * @author: 冯铁城 [17615007230@163.com]
  8. * @date: 2022-09-13 10:01:26
  9. * @describe: 顺序消息消费者V2
  10. */
  11. @Component
  12. @RequiredArgsConstructor
  13. public class OrderMessageConsumerV2 {
  14. private final ListenerService listenerService;
  15. @KafkaListener(topics = {"order-message-topic-v2"}, groupId = "order-message-group-2")
  16. public void listenerN1(ConsumerRecord<String, String> record) {
  17. //1.打印消费者信息
  18. System.out.println("receive message N1");
  19. //2.消费消息
  20. consumerMessage(record);
  21. }
  22. @KafkaListener(topics = {"order-message-topic-v2"}, groupId = "order-message-group-2")
  23. public void listenerN2(ConsumerRecord<String, String> record) {
  24. //1.打印消费者信息
  25. System.out.println("receive message N2");
  26. //2.消费消息
  27. consumerMessage(record);
  28. }
  29. @KafkaListener(topics = {"order-message-topic-v2"}, groupId = "order-message-group-2")
  30. public void listenerN3(ConsumerRecord<String, String> record) {
  31. //1.打印消费者信息
  32. System.out.println("receive message N3");
  33. //2.消费消息
  34. consumerMessage(record);
  35. }
  36. /**
  37. * 消费消息
  38. *
  39. * @param record 消息
  40. */
  41. private void consumerMessage(ConsumerRecord<String, String> record) {
  42. //1.获取消息Key
  43. String key = record.key();
  44. //2.hash(Key)与总线程数取模
  45. int number = key.hashCode() % 3;
  46. //3.分配消息
  47. if (0 == number) {
  48. listenerService.doSomeThingNumber1(record);
  49. } else if (1 == number) {
  50. listenerService.doSomeThingNumber2(record);
  51. } else {
  52. listenerService.doSomeThingNumber3(record);
  53. }
  54. }
  55. }

验证

Producer

如下,消息发送成功
image.png

Consumer

如下,数字类以及字母类消息均被有序消费
image.png
image.png

5.总结

综上所述,无论是多分区还是单分区,多线程还是单线程,Kafka确保消息顺序消费的根本原则就是:

  1. Producer开启幂等,并从业务逻辑上确保不会发送乱序的消息
  2. Broker确保一类有序消息全部持久化到一个分区
  3. Consumer确保一个线程处理一类有序消息