1. What Is a Kafka Transaction
Kafka introduced transactions in version 0.11.
A Kafka transaction groups a series of operations, namely a Producer's message sends and a Consumer's offset commits, into one atomic unit: the produced messages and the consumed offsets are committed together, or not at all.
2. Scenarios Supported by Kafka Transactions
- A Producer wraps a batch of messages in one transaction: either all of them are sent successfully, or none of them are.
Pseudocode:
```java
public void sendInTransaction() {
    //1. Initialize the transaction
    producer.initTransactions();
    //2. Begin the transaction
    producer.beginTransaction();
    //3. Send a message
    producer.send("topic", "message");
    //4. Commit the transaction
    producer.commitTransaction();
}
```
- A Producer's sends and a Consumer's offset commits are wrapped in one transaction; this consume-transform-produce pattern is typical of Kafka stream processing.
3. Kafka Transaction Features
Atomic writes
Messages within the same transaction either all succeed or all fail to be written, which guarantees atomicity even for writes that span multiple partitions.
The same guarantee applies when producing messages and committing consumed offsets inside one transaction:
a Consumer's offset commit is ultimately just a write to the internal __consumer_offsets topic, so committing an offset is itself a form of publishing a message.
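Outside of Spring, the raw Kafka client makes this explicit through `producer.sendOffsetsToTransaction`, which folds the consumed offsets into the producer's transaction. Below is a minimal sketch of the consume-transform-produce loop. The broker address, topic names `in`/`out`, group id, and transactional id are all illustrative assumptions, and the `groupMetadata()` overload requires kafka-clients 2.5+ and a running broker:

```java
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.*;

// Sketch only: assumes a broker at localhost:9092 and topics "in" / "out" already exist.
public class ConsumeTransformProduce {
    public static void main(String[] args) {
        Properties pp = new Properties();
        pp.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        pp.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        pp.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // Setting a transactional.id enables transactions (and implies idempotence)
        pp.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "ctp-1");

        Properties cp = new Properties();
        cp.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        cp.put(ConsumerConfig.GROUP_ID_CONFIG, "ctp-group");
        cp.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // offsets go through the transaction instead
        cp.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        cp.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        cp.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(pp);
             KafkaConsumer<String, String> consumer = new KafkaConsumer<>(cp)) {
            producer.initTransactions();
            consumer.subscribe(Collections.singletonList("in"));

            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            producer.beginTransaction();
            Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
            for (ConsumerRecord<String, String> r : records) {
                // Transform and forward each record
                producer.send(new ProducerRecord<>("out", r.key(), r.value().toUpperCase()));
                // Remember the next offset to consume for each partition
                offsets.put(new TopicPartition(r.topic(), r.partition()), new OffsetAndMetadata(r.offset() + 1));
            }
            // Consumed offsets and produced messages commit atomically, or not at all
            producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
            producer.commitTransaction();
        }
    }
}
```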
Reading transactional messages
If a Consumer sets isolation.level = read_committed, it only reads messages whose transactions have already been committed.
After a Producer commits a transaction, the messages in that transaction change from uncommitted to committed, and only then can such a Consumer pull them.
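In Spring Boot this maps onto the consumer configuration used later in this article; a minimal fragment (the property spelling follows Spring Boot's `spring.kafka.consumer.isolation-level` binding; the raw-property form below it is equivalent, so only one of the two is needed):

```yaml
spring:
  kafka:
    consumer:
      # only pull messages from committed transactions
      isolation-level: read_committed
      # or, equivalently, via the raw client property:
      # properties:
      #   isolation.level: read_committed
```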
Fencing zombie producers
This is essentially built on Producer idempotence.
With idempotence enabled, every message a Producer (identified by its PID) sends to a given partition of a given Topic carries a monotonically increasing Sequence Number, and the Broker likewise maintains an increasing Sequence Number per PID, per Topic, per Partition.
By comparing the Sequence Number carried by an incoming message against the one it maintains, the Broker can detect lost, duplicated, or out-of-order messages. For details, see the companion article: Kafka一些常见问题
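The broker-side comparison can be pictured with a small sketch. This is illustrative logic only, not actual broker source code; the class and method names are invented for the example:

```java
// Illustrative sketch: how a broker can classify an incoming batch using the
// per-(PID, topic, partition) sequence number it maintains.
public class SequenceCheck {

    public enum Result { DUPLICATE, IN_ORDER, OUT_OF_ORDER }

    // lastPersisted: highest sequence number already stored for this (PID, partition)
    // incoming: sequence number carried by the newly arrived batch
    public static Result classify(int lastPersisted, int incoming) {
        if (incoming <= lastPersisted) {
            return Result.DUPLICATE;     // retry of an already-written batch: drop it
        } else if (incoming == lastPersisted + 1) {
            return Result.IN_ORDER;      // the expected next batch: append it
        } else {
            return Result.OUT_OF_ORDER;  // a gap: reject the batch
        }
    }

    public static void main(String[] args) {
        System.out.println(classify(5, 5));  // DUPLICATE
        System.out.println(classify(5, 6));  // IN_ORDER
        System.out.println(classify(5, 8));  // OUT_OF_ORDER
    }
}
```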
All Producers configured with the same transactional.id are assigned the same PID. As a result, even if two such Producers publish the same message concurrently, the Broker persists only one copy, because the PIDs match.
4. How Kafka Transactions Work
To support transactions, Kafka introduced a new component: the Transaction Coordinator (TC).
It is mainly responsible for assigning PIDs and recording transaction state.
For details, see the referenced article.
5. Single Data Source: Spring Integration with Kafka, Transactions Enabled
Version requirements
Kafka has supported transactions since version 0.11; the corresponding Spring Boot version should be >= 2.1.x.
yml configuration
Transactions are enabled via the transaction-id-prefix property:
```yaml
# server config
server:
  port: 8089

# spring config
spring:
  kafka:
    bootstrap-servers: 120.48.107.224:9092
    producer:
      acks: -1
      transaction-id-prefix: transaction-producer-
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      retries: 3
      properties:
        enable.idempotence: true
        retry.backoff.ms: 100
    consumer:
      group-id: transaction-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      auto-offset-reset: earliest
      enable-auto-commit: true
```
Producer
We write two methods:
1. The first publishes a message normally.
2. The second publishes a message and then throws a runtime exception.
3. The Producer enables transactions via @Transactional(rollbackFor = RuntimeException.class).
4. Because Spring transactions rely on AOP proxies, the send logic must live in a separate bean from its caller; if both are written in the same class, Spring cannot intercept the call and apply transaction control.

```java
import com.ftc.transactionkafka.config.KafkaSendCallBackForString;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

import javax.annotation.Resource;

/**
 * @author: 冯铁城 [17615007230@163.com]
 * @date: 2022-09-19 18:07:06
 * @describe: Producer
 */
@Component
@Transactional(rollbackFor = RuntimeException.class)
public class Producer {

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    @Resource
    private KafkaSendCallBackForString kafkaSendCallBackForString;

    public void sendMessageNoError() {
        kafkaTemplate.send("transaction-topic", "transaction-message").addCallback(kafkaSendCallBackForString);
    }

    public void sendMessageWithError() {
        //1. Send the message
        kafkaTemplate.send("transaction-topic", "transaction-message").addCallback(kafkaSendCallBackForString);

        //2. Throw a runtime exception to trigger a rollback
        int errorNumber = 1 / 0;
    }
}
```
```java
import com.ftc.transactionkafka.producer.Producer;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class TransactionProducerTests {

    @Autowired
    private Producer producer;

    @Test
    void sendNoError() {
        producer.sendMessageNoError();
    }

    @Test
    void sendWithError() {
        producer.sendMessageWithError();
    }
}
```
Consumer
The Consumer needs no special code; it simply listens to the topic as usual.
```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * @author: 冯铁城 [17615007230@163.com]
 * @date: 2022-09-13 10:01:26
 * @describe: Consumer demo
 */
@Component
public class Consumer {

    @KafkaListener(topics = {"transaction-topic"}, groupId = "${spring.kafka.consumer.group-id}")
    public void listenerDemo(ConsumerRecord<String, String> record) {
        System.out.println("receive message:" + record.value());
    }
}
```
Verifying the transaction
Transactions enabled, publish without an exception
Producer: as shown below, the message is published successfully.
Consumer: as shown below, the message is consumed successfully.
Transactions enabled, publish with an exception
Producer: because transactions are enabled, the exception thrown during publishing surfaces as a transaction-rollback exception and the message is not published.
Consumer: as shown below, no message is received.
Transactions disabled, publish with an exception
To verify the transaction further, we disable it and try again.
yml configuration: as follows, with the transaction mechanism disabled
```yaml
# server config
server:
  port: 8089

# spring config
spring:
  kafka:
    bootstrap-servers: 120.48.107.224:9092
    producer:
      acks: -1
      # transaction-id-prefix: transaction-producer-
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      retries: 3
      properties:
        enable.idempotence: true
        retry.backoff.ms: 100
    consumer:
      group-id: transaction-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      auto-offset-reset: earliest
      enable-auto-commit: true
```
Producer: as shown below, an exception still occurs during publishing, but it is not a transaction exception.
Consumer: as shown below, the message is received successfully. This shows that it is the transaction that guarantees atomicity between publishing and receiving a message.
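As an aside, Spring Kafka also supports running a local transaction programmatically, without the AOP proxy, via KafkaTemplate.executeInTransaction. A minimal sketch, assuming the same transactional KafkaTemplate bean configured above (the class name here is invented for illustration):

```java
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

// Sketch: assumes the transactional KafkaTemplate bean from the yml configuration above.
@Component
public class ProgrammaticProducer {

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendInLocalTransaction() {
        // executeInTransaction begins a transaction, runs the callback,
        // commits on normal return, and aborts if the callback throws.
        kafkaTemplate.executeInTransaction(ops -> {
            ops.send("transaction-topic", "transaction-message");
            return true;
        });
    }
}
```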
6. Multiple Data Sources: Spring Integration with Kafka, Transactions Enabled
Version requirements
Kafka has supported transactions since version 0.11; the corresponding Spring Boot version should be >= 2.1.x.
yml configuration
Kafka transactions are again enabled via transaction-id-prefix.
```yaml
# spring config
spring:
  kafka:
    # primary data source
    primary:
      producer:
        bootstrap-servers: 120.48.107.224:9092
        ack: "-1"
        key-serializer: org.apache.kafka.common.serialization.StringSerializer
        value-serializer: org.apache.kafka.common.serialization.StringSerializer
        retries: 3
        idempotence: true
        transaction-id-prefix: primary-transaction-producer-
      consumer:
        bootstrap-servers: 120.48.107.224:9092
        group-id: multi-kafka-group-primary
        enable-auto-commit: true
        key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        auto-offset-reset: earliest
        poll-time: 3000
    # secondary data source
    secondary:
      producer:
        bootstrap-servers: 120.48.107.224:9092
        ack: "-1"
        key-serializer: org.apache.kafka.common.serialization.StringSerializer
        value-serializer: org.apache.kafka.common.serialization.StringSerializer
        retries: 3
        idempotence: true
      consumer:
        bootstrap-servers: 120.48.107.224:9092
        group-id: multi-kafka-group-secondary
        enable-auto-commit: true
        key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        auto-offset-reset: earliest
        poll-time: 3000
```
Configuration classes
See: [Springboot整合Kafka](https://www.yuque.com/u27809381/tsycgx/vgn8z2?view=doc_embed&inner=vv96V)

Producer
Notes:
1. The primary data source sends messages to partition 0 with transactions enabled.
2. The secondary data source sends messages to partition 1 without transactions.
3. Because Spring transactions rely on AOP proxies, the send logic must live in a separate bean from its caller; otherwise Spring cannot apply transaction control.

```java
import com.ftc.multi.config.KafkaSendCallBackForString;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

import javax.annotation.Resource;

/**
 * @author: 冯铁城 [17615007230@163.com]
 * @date: 2022-09-19 19:06:26
 * @describe: Transactional producer
 */
@Component
public class TransactionProducer {

    @Resource
    @Qualifier("primaryProducerTemplate")
    private KafkaTemplate<String, String> primaryTemplate;

    @Resource
    @Qualifier("secondaryProducerTemplate")
    private KafkaTemplate<String, String> secondaryTemplate;

    @Resource
    private KafkaSendCallBackForString kafkaSendCallBackForString;

    @Transactional(rollbackFor = RuntimeException.class, transactionManager = "primaryTransactionManager")
    public void sendMessageWithErrorPrimary() {
        //1. Send the message
        primaryTemplate.send("multi-kafka-topic", 0, "key", "primary-transaction-message").addCallback(kafkaSendCallBackForString);

        //2. Throw a runtime exception
        int errorNumber = 1 / 0;
    }

    public void sendMessageWithErrorSecondary() {
        //1. Send the message
        secondaryTemplate.send("multi-kafka-topic", 1, "key", "secondary-transaction-message").addCallback(kafkaSendCallBackForString);

        //2. Throw a runtime exception
        int errorNumber = 1 / 0;
    }
}
```
```java
package com.ftc.multi;

import com.ftc.multi.producer.TransactionProducer;
import lombok.SneakyThrows;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.util.concurrent.TimeUnit;

@SpringBootTest
class ProducerTests {

    @Autowired
    private TransactionProducer transactionProducer;

    @AfterEach
    @SneakyThrows({InterruptedException.class})
    void stopSeconds() {
        TimeUnit.MILLISECONDS.sleep(500);
    }

    @Test
    void sendWithErrorPrimary() {
        transactionProducer.sendMessageWithErrorPrimary();
    }

    @Test
    void sendWithErrorSecondary() {
        transactionProducer.sendMessageWithErrorSecondary();
    }
}
```
Consumer
Notes:
- The first listener consumes from the primary data source.
- The second listener consumes from the secondary data source.

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.stereotype.Component;

/**
 * @author: 冯铁城 [17615007230@163.com]
 * @date: 2022-09-08 16:06:48
 * @describe: Multi data source listeners
 */
@Component
public class Consumer {

    @KafkaListener(
            containerFactory = "primaryConsumerFactory",
            topicPartitions = {@TopicPartition(topic = "multi-kafka-topic", partitions = {"0"})}
    )
    public void primaryListener(ConsumerRecord<String, String> record) {
        System.out.println("primary source received:" + record.value());
    }

    @KafkaListener(
            containerFactory = "secondaryConsumerFactory",
            topicPartitions = {@TopicPartition(topic = "multi-kafka-topic", partitions = {"1"})}
    )
    public void secondaryListener(ConsumerRecord<String, String> record) {
        System.out.println("secondary source received:" + record.value());
    }
}
```
Verifying the transactions
Primary data source, send with an exception
Producer: as shown below, sending fails with a transaction-rollback exception.

Consumer: as shown below, the Consumer receives nothing; the transaction works.
Secondary data source, send with an exception
Producer: as shown below, sending fails, but with a non-transactional exception.

Consumer: as shown below, because the secondary data source does not use transactions, the Consumer still receives the message even though the send path threw an exception. This again shows that enabling transactions is what guarantees atomicity between publishing and receiving messages.