1.什么是Kafka事务

在Kafka0.11版本中,引入了Kafka事务这一概念。
Kafka事务是指,一系列Producer生产消息和Consumer消费消息在一个事务中进行,具备原子性,生产消息和Consumer偏移量同时成功或同时失败

2.Kafka事务支持的场景

  1. Producer生产消息封装在一个事务中,这些消息要么全部发送成功,要么全部发送失败

伪代码如下:

  1. public void sendInTransaction(){
  2. //1.初始化事务
  3. producer.initTransactions();
  4. //2.开启事务
  5. producer.beginTransaction();
  6. //3.发送消息
  7. producer.send("topic", "message");
  8. //4.提交事务
  9. producer.commitTransaction();
  10. }
  1. Producer生产消息与Consumer消费消息封装在一个事务中,通常用于Kafka的流式处理

    3.Kafka的特性

    原子写

    确保同一个事务中的消息要么全部写入成功,要不全部写入失败。保证跨分区写操作的原子性
    这一点同样适用于生产消息与消费消息并提交偏移量在同一个事务中执行
    因为Consumer提交偏移量最终也是会提交到名为consumer_offsets的内置Topic中,本质也是发布消息

    读事务消息

    Consumer如果设置了isolation.level = read_committed,那么它只会读取已经提交了的消息。
    Producer提交事务后,该事务中相关消息的状态会从uncommitted变更为committed,从而实现Consumer可以拉取到已提交的消息

    拒绝僵尸进程

    本质上是基于Producer幂等性
    开启Producer幂等后,每个PID对应每个Topic的每个Partition发送的消息都会携带一个自增的Sequence Number,而Broker对于每个PID的每个Topic的每个Partition同样会维护一个自增的Sequence Number。
    可以通过对比Producer发送消息的Sequence Numer和Broker维护的Sequence Number,来确定消息的丢失,重复,乱序问题。具体可参考:Kafka一些常见问题
    而所有具有相同Transaction-id的Producer都会被分配相同的PID。因此,即使两个相同的Producer同时发布重复的消息,因为PID一样,Broker也只会持久化一条

    4.Kafka事务的原理

    Kafka为了支持事务特性,引入一个新的组件:Transaction Coordinator(TC:事务协调者)。
    主要负责分配PID,记录事务状态等操作
    详情参考:

    5.单数据源-Spring整合Kafka开启事务

    版本

    Kafka从0.11版本开始支持事务,对应Springboot版本应 >=2.1.X

    yml配置

    通过transaction-id-prefix配置开启事务机制 ```yaml

    服务器配置

    server: port: 8089

spring配置

spring: kafka: bootstrap-servers: 120.48.107.224:9092 producer: acks: -1 transaction-id-prefix: transaction-producer- key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer retries: 3 properties: enable.idempotence: true retry.backoff.ms: 100 consumer: group-id: transaction-group key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer auto-offset-reset: earliest enable-auto-commit: true

  1. <a name="Rtako"></a>
  2. ### Producer
  3. 如下,编写两个方法
  4. 1. 第一个方法正常发布消息
  5. 2. 第二个方法发布消息后代码出现运行时异常
  6. 3. Producer通过@Transactional(rollbackFor = RuntimeException.class)注解开启事务
  7. 4. 基于Spring的AOP原理,我们将发送逻辑与实际发送分开写,写在一个类里面Spring无法通过AOP来进行事务的相关控制
  8. ```java
  9. import com.ftc.transactionkafka.config.KafkaSendCallBackForString;
  10. import org.springframework.kafka.core.KafkaTemplate;
  11. import org.springframework.stereotype.Component;
  12. import org.springframework.transaction.annotation.Transactional;
  13. import javax.annotation.Resource;
  14. /**
  15. * @author: 冯铁城 [17615007230@163.com]
  16. * @date: 2022-09-19 18:07:06
  17. * @describe: 生产者
  18. */
  19. @Component
  20. @Transactional(rollbackFor = RuntimeException.class)
  21. public class Producer {
  22. @Resource
  23. private KafkaTemplate<String, String> kafkaTemplate;
  24. @Resource
  25. private KafkaSendCallBackForString kafkaSendCallBackForString;
  26. public void sendMessageNoError() {
  27. kafkaTemplate.send("transaction-topic", "transaction-message")
  28. .addCallback(kafkaSendCallBackForString);
  29. }
  30. public void sendMessageWithError() {
  31. //1.发送消息
  32. kafkaTemplate.send("transaction-topic", "transaction-message")
  33. .addCallback(kafkaSendCallBackForString);
  34. //2.异常
  35. int errorNumber = 1 / 0;
  36. }
  37. }
  1. import com.ftc.transactionkafka.producer.Producer;
  2. import org.junit.jupiter.api.Test;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.boot.test.context.SpringBootTest;
  5. @SpringBootTest
  6. class TransactionProducerTests {
  7. @Autowired
  8. private Producer producer;
  9. @Test
  10. void sendNoError() {
  11. producer.sendMessageNoError();
  12. }
  13. @Test
  14. void sendWithError() {
  15. producer.sendMessageWithError();
  16. }
  17. }

Consumer

Consumer无特殊代码,正常监听Topic即可

  1. import org.apache.kafka.clients.consumer.ConsumerRecord;
  2. import org.springframework.kafka.annotation.KafkaListener;
  3. import org.springframework.stereotype.Component;
  4. /**
  5. * @author: 冯铁城 [17615007230@163.com]
  6. * @date: 2022-09-13 10:01:26
  7. * @describe: 消费者demo
  8. */
  9. @Component
  10. public class Consumer {
  11. @KafkaListener(topics = {"transaction-topic"}, groupId = "${spring.kafka.consumer.group-id}")
  12. public void listenerDemo(ConsumerRecord<String, String> record) {
  13. System.out.println("receive message:" + record.value());
  14. }
  15. }

验证事务

开启事务,无异常发布消息

Producer:如下图,消息发布成功
image.png
Consumer:如下图,消息消费成功
image.png

开启事务,有异常发布消息

Producer:因为开启事务,消息发布过程中出现异常,并且异常为事务回滚异常,消息发布失败
image.png
Consumer:如下图,并未接收到任何消息
image.png

关闭事务,有异常发布消息

为了进一步验证事务,我们关闭事务,再试一次
yml配置:如下,关闭事务机制

  1. #服务器配置
  2. server:
  3. port: 8089
  4. #spring配置
  5. spring:
  6. kafka:
  7. bootstrap-servers: 120.48.107.224:9092
  8. producer:
  9. acks: -1
  10. # transaction-id-prefix: transaction-producer-
  11. key-serializer: org.apache.kafka.common.serialization.StringSerializer
  12. value-serializer: org.apache.kafka.common.serialization.StringSerializer
  13. retries: 3
  14. properties:
  15. enable.idempotence: true
  16. retry.backoff.ms: 100
  17. consumer:
  18. group-id: transaction-group
  19. key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  20. value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  21. auto-offset-reset: earliest
  22. enable-auto-commit: true

Producer:如下图,发布过程依然出现异常,但是并不是事务异常
image.png
Consumer:如下图,消息接收成功,由此可以看出,开启事务可以保证,消息接收与发布的原子性
image.png

6.多数据源-Spring整合Kafka开启事务

版本

Kafka从0.11版本开始支持事务,对应Springboot版本应 >=2.1.X

yml配置

主要通过transaction-id-prefix开启Kafka事务

  1. 主数据源开启Kafka事务
  2. 备数据源不开启Kafka事务 ```yaml

    服务器配置

    server: port: 8082

spring配置

kafka:

主数据源配置

primary: producer: bootstrap-servers: 120.48.107.224:9092 ack: “-1” key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer retries: 3 idempotence: true transaction-id-prefix: primary-transaction-producer- consumer: bootstrap-servers: 120.48.107.224:9092 group-id: multi-kafka-group-primary enable-auto-commit: true key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer auto-offset-reset: earliest poll-time: 3000

从数据源配置

secondary: producer: bootstrap-servers: 120.48.107.224:9092 ack: “-1” key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer retries: 3 idempotence: true consumer: bootstrap-servers: 120.48.107.224:9092 group-id: multi-kafka-group-secondary enable-auto-commit: true key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer auto-offset-reset: earliest poll-time: 3000

  1. <a name="TksF3"></a>
  2. ### 配置类
  3. 参考:[Springboot整合Kafka](https://www.yuque.com/u27809381/tsycgx/vgn8z2?view=doc_embed&inner=vv96V)
  4. <a name="hfqwF"></a>
  5. ### Producer
  6. 注意点如下
  7. 1. 主数据源发送消息到0分区,并且开启事务
  8. 2. 备数据源发送消息到1分区,不开启事务
  9. 3. 基于Spring的AOP原理,我们将发送逻辑与实际发送分开写,写在一个类里面Spring无法通过AOP来进行事务的相关控制
  10. ```java
  11. import com.ftc.multi.config.KafkaSendCallBackForString;
  12. import org.springframework.beans.factory.annotation.Qualifier;
  13. import org.springframework.kafka.core.KafkaTemplate;
  14. import org.springframework.stereotype.Component;
  15. import org.springframework.transaction.annotation.Transactional;
  16. import javax.annotation.Resource;
  17. /**
  18. * @author: 冯铁城 [17615007230@163.com]
  19. * @date: 2022-09-19 19:06:26
  20. * @describe: 事务生产者
  21. */
  22. @Component
  23. public class TransactionProducer {
  24. @Resource
  25. @Qualifier("primaryProducerTemplate")
  26. private KafkaTemplate<String, String> primaryTemplate;
  27. @Resource
  28. @Qualifier("secondaryProducerTemplate")
  29. private KafkaTemplate<String, String> secondaryTemplate;
  30. @Resource
  31. private KafkaSendCallBackForString kafkaSendCallBackForString;
  32. @Transactional(rollbackFor = RuntimeException.class, transactionManager = "primaryTransactionManager")
  33. public void sendMessageWithErrorPrimary() {
  34. //1.发送消息
  35. primaryTemplate.send("multi-kafka-topic", 0, "key", "primary-transaction-message")
  36. .addCallback(kafkaSendCallBackForString);
  37. //2.出现异常
  38. int errorNumber = 1 / 0;
  39. }
  40. public void sendMessageWithErrorSecondary() {
  41. //1.发送消息
  42. secondaryTemplate.send("multi-kafka-topic", 1, "key", "secondary-transaction-message")
  43. .addCallback(kafkaSendCallBackForString);
  44. //2.出现异常
  45. int errorNumber = 1 / 0;
  46. }
  47. }
  1. package com.ftc.multi;
  2. import com.ftc.multi.producer.TransactionProducer;
  3. import lombok.SneakyThrows;
  4. import org.junit.jupiter.api.AfterEach;
  5. import org.junit.jupiter.api.Test;
  6. import org.springframework.beans.factory.annotation.Autowired;
  7. import org.springframework.boot.test.context.SpringBootTest;
  8. import java.util.concurrent.TimeUnit;
  9. @SpringBootTest
  10. class ProducerTests {
  11. @Autowired
  12. private TransactionProducer transactionProducer;
  13. @AfterEach
  14. @SneakyThrows({InterruptedException.class})
  15. void stopSeconds() {
  16. TimeUnit.MILLISECONDS.sleep(500);
  17. }
  18. @Test
  19. void sendWithErrorPrimary() {
  20. transactionProducer.sendMessageWithErrorPrimary();
  21. }
  22. @Test
  23. void sendWithErrorSecondary() {
  24. transactionProducer.sendMessageWithErrorSecondary();
  25. }
  26. }

Consumer

注意点如下

  1. 第一个消费者监听主数据源
  2. 第二个消费者监听备数据源 ```java import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.annotation.TopicPartition; import org.springframework.stereotype.Component;

/**

  • @author: 冯铁城 [17615007230@163.com]
  • @date: 2022-09-08 16:06:48
  • @describe: 多数据源监听 */ @Component public class Consumer {

    @KafkaListener(

    1. topics = {"multi-kafka-topic"},
    2. containerFactory = "primaryConsumerFactory",
    3. topicPartitions = {@TopicPartition(topic = "multi-kafka-topic", partitions = {"0"})}

    ) public void primaryListener(ConsumerRecord record) {

    1. System.out.println("主数据源_监听获取数据:" + record.value());

    }

    @KafkaListener(

    1. topics = {"multi-kafka-topic"},
    2. containerFactory = "secondaryConsumerFactory",
    3. topicPartitions = {@TopicPartition(topic = "multi-kafka-topic", partitions = {"1"})}

    ) public void secondaryListener(ConsumerRecord record) {

    1. System.out.println("备数据源_监听获取数据:" + record.value());

    } } ```

    验证事务

    主数据源发送异常

    Producer:如下图,消息发送异常,并且异常为事务回滚异常
    image.png
    Consumer:如下图,Consumer并未收到任何消息,事务成功
    image.png

    备数据源发送异常

    Producer:如下图,数据发送异常,异常为非事务异常
    image.png
    Consumer:如下图,因为备数据源未开启事务,即使数据发布过程中出现异常,Consumer依然可以接收到消息。由此可以看出,开启事务可以保证消息发布与接收的原子性
    image.png