1. What is a Kafka Transaction
Kafka introduced transactions in version 0.11.
A Kafka transaction groups a series of produce operations and, optionally, consumer offset commits into a single atomic unit: the produced messages and the committed offsets either all succeed or all fail together.
2. Scenarios Kafka Transactions Support
- A producer wraps several messages in one transaction; either all of them are sent successfully, or none of them are.
Pseudocode:
```java
public void sendInTransaction() {
    //1. Initialize the transaction (once per producer)
    producer.initTransactions();
    //2. Begin the transaction
    producer.beginTransaction();
    try {
        //3. Send messages
        producer.send(new ProducerRecord<>("topic", "message"));
        //4. Commit the transaction
        producer.commitTransaction();
    } catch (Exception e) {
        //5. Roll back on failure so none of the messages become visible
        producer.abortTransaction();
    }
}
```
- A producer's sends and a consumer's offset commits are wrapped in one transaction; this is the consume-transform-produce pattern typically used in Kafka stream processing.
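In the same pseudocode style, the consume-transform-produce loop looks roughly as follows. This is a sketch only: `transform` and `currentOffsets` are hypothetical helpers, while `sendOffsetsToTransaction` is the kafka-clients call that folds the offset commit into the transaction:

```java
producer.initTransactions();
while (running) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    producer.beginTransaction();
    try {
        for (ConsumerRecord<String, String> record : records) {
            //Transform each input record and forward it (hypothetical helper)
            producer.send(new ProducerRecord<>("out-topic", transform(record.value())));
        }
        //Commit the consumed offsets inside the same transaction (hypothetical helper builds the offset map)
        producer.sendOffsetsToTransaction(currentOffsets(records), "transaction-group");
        producer.commitTransaction();
    } catch (Exception e) {
        //Either everything above becomes visible, or none of it does
        producer.abortTransaction();
    }
}
```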
3. Properties of Kafka Transactions
Atomic writes
Messages in the same transaction either all get written successfully or all fail, and this atomicity holds even when the writes span multiple partitions.
The same guarantee applies when producing messages and committing consumer offsets in a single transaction.
This is because consumer offset commits are ultimately writes to the internal topic named __consumer_offsets; committing an offset is itself just publishing a message.
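The all-or-nothing guarantee can be sketched with a toy buffer. Note this models only the guarantee, not the mechanism: real Kafka appends uncommitted messages to the log immediately and uses commit/abort markers plus read_committed filtering rather than staging them.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of an atomic cross-partition write: messages are staged per partition
// and only become visible together on commit; abort discards all of them.
public final class ToyTransactionBuffer {
    private final Map<Integer, List<String>> staged = new HashMap<>();
    private final Map<Integer, List<String>> committedLogs = new HashMap<>();

    public void send(int partition, String message) {
        staged.computeIfAbsent(partition, p -> new ArrayList<>()).add(message);
    }

    /** All staged messages, across every partition, become visible at once. */
    public void commit() {
        staged.forEach((p, msgs) ->
                committedLogs.computeIfAbsent(p, k -> new ArrayList<>()).addAll(msgs));
        staged.clear();
    }

    /** None of the staged messages ever become visible. */
    public void abort() {
        staged.clear();
    }

    public List<String> committed(int partition) {
        return committedLogs.getOrDefault(partition, List.of());
    }
}
```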
Reading transactional messages
A consumer configured with isolation.level = read_committed only reads messages whose transactions have already been committed.
When the producer commits a transaction, the state of that transaction's messages changes from uncommitted to committed, at which point read_committed consumers are able to fetch them.
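In a Spring Boot application this isolation level can be set through the consumer's additional properties. A config fragment, assuming Spring Boot's standard spring.kafka keys (note that Kafka's default is read_uncommitted):

```yaml
spring:
  kafka:
    consumer:
      properties:
        isolation.level: read_committed
```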
Fencing zombie producers
This is essentially built on producer idempotence.
With idempotence enabled, every message a producer (identified by its PID) sends to a given topic partition carries a monotonically increasing sequence number, and the broker likewise maintains a sequence number per PID per topic partition.
By comparing the sequence number on an incoming message against the one it maintains, the broker can detect lost, duplicated, and out-of-order messages. For details, see: Kafka一些常见问题 (common Kafka questions).
All producers configured with the same transactional.id are assigned the same PID, so even if two such producers publish the same message concurrently, the broker persists only one copy.
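The broker-side sequence check described above can be sketched as a toy model (an illustration of the rule, not Kafka's actual implementation): a batch is appended only when its sequence number is exactly one past the last accepted one for that (PID, partition).

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of broker-side idempotence: for each (PID, topic-partition) the broker
// remembers the last accepted sequence number and only appends a batch whose
// sequence is exactly last + 1; duplicates and gaps are rejected.
public final class ToyBrokerLog {
    private final Map<String, Integer> lastSeq = new HashMap<>();

    /** Returns true if the batch is appended, false if it is a duplicate or out of order. */
    public boolean tryAppend(long pid, String topicPartition, int sequence) {
        String key = pid + "@" + topicPartition;
        int expected = lastSeq.getOrDefault(key, -1) + 1;
        if (sequence != expected) {
            return false; // duplicate (sequence too low) or gap (sequence too high)
        }
        lastSeq.put(key, sequence);
        return true;
    }
}
```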
4. How Kafka Transactions Work
To support transactions, Kafka introduces a new component, the Transaction Coordinator (TC), which is mainly responsible for assigning PIDs and tracking transaction state.
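The PID assignment and zombie fencing can be sketched with a toy coordinator (again an illustration of the mechanism, not Kafka's implementation): the same transactional.id always maps back to the same PID, but every re-registration bumps an epoch, so writes carrying an older epoch, i.e. from a zombie producer instance, are rejected.

```java
import java.util.HashMap;
import java.util.Map;

// Toy Transaction Coordinator: stable PID per transactional.id, epoch bumped on
// every registration so stale (zombie) producer instances get fenced off.
public final class ToyCoordinator {
    private final Map<String, Long> pids = new HashMap<>();
    private final Map<String, Integer> epochs = new HashMap<>();
    private long nextPid = 0;

    /** Models initTransactions(): same transactional.id gets the same PID, with a bumped epoch. */
    public long registerPid(String txnId) {
        epochs.merge(txnId, 0, (old, zero) -> old + 1);
        return pids.computeIfAbsent(txnId, id -> nextPid++);
    }

    public int currentEpoch(String txnId) {
        return epochs.get(txnId);
    }

    /** A write from an older epoch comes from a zombie producer and is rejected. */
    public boolean acceptWrite(String txnId, int producerEpoch) {
        return epochs.getOrDefault(txnId, -1) == producerEpoch;
    }
}
```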
For more detail, see the linked reference.
5. Single Datasource: Enabling Transactions with Spring + Kafka
### Version
Kafka supports transactions since version 0.11; the corresponding Spring Boot version should be >= 2.1.X.
### yml configuration
Transactions are enabled via the transaction-id-prefix property:
```yaml
# server config
server:
  port: 8089

# spring config
spring:
  kafka:
    bootstrap-servers: 120.48.107.224:9092
    producer:
      acks: -1
      transaction-id-prefix: transaction-producer-
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      retries: 3
      properties:
        enable.idempotence: true
        retry.backoff.ms: 100
    consumer:
      group-id: transaction-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      auto-offset-reset: earliest
      enable-auto-commit: true
```
### Producer
The producer below defines two methods:
1. The first publishes a message normally.
2. The second publishes a message and then throws a runtime exception.
3. The producer enables transactions via the @Transactional(rollbackFor = RuntimeException.class) annotation.
4. Because Spring transactions rely on AOP proxies, the sending logic and the code that calls it are kept in separate classes; if they lived in the same class, the self-invocation would bypass the proxy and Spring could not apply transaction control.
```java
import com.ftc.transactionkafka.config.KafkaSendCallBackForString;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

import javax.annotation.Resource;

/**
 * @author: 冯铁城 [17615007230@163.com]
 * @date: 2022-09-19 18:07:06
 * @describe: producer
 */
@Component
@Transactional(rollbackFor = RuntimeException.class)
public class Producer {

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    @Resource
    private KafkaSendCallBackForString kafkaSendCallBackForString;

    public void sendMessageNoError() {
        kafkaTemplate.send("transaction-topic", "transaction-message")
                .addCallback(kafkaSendCallBackForString);
    }

    public void sendMessageWithError() {
        //1. Send the message
        kafkaTemplate.send("transaction-topic", "transaction-message")
                .addCallback(kafkaSendCallBackForString);

        //2. Throw a runtime exception
        int errorNumber = 1 / 0;
    }
}
```
```java
import com.ftc.transactionkafka.producer.Producer;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class TransactionProducerTests {

    @Autowired
    private Producer producer;

    @Test
    void sendNoError() {
        producer.sendMessageNoError();
    }

    @Test
    void sendWithError() {
        producer.sendMessageWithError();
    }
}
```
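As an aside, the self-invocation caveat in note 4 above comes from how Spring's proxy-based AOP works. A plain JDK dynamic proxy (no Spring or Kafka involved; all names here are illustrative) shows the effect: only calls that go through the proxy get the begin/commit wrapping, while a direct call on the target, which is what self-invocation amounts to, does not.

```java
import java.lang.reflect.Proxy;

interface MessageSender {
    void send(String message);
}

class PlainSender implements MessageSender {
    final StringBuilder log = new StringBuilder();

    @Override
    public void send(String message) {
        log.append("send:").append(message).append(";");
    }
}

public class ProxyDemo {
    // Wrap a sender in a proxy that adds "begin/commit" around each call,
    // analogous to how Spring wraps @Transactional beans.
    static MessageSender transactional(PlainSender target) {
        return (MessageSender) Proxy.newProxyInstance(
                MessageSender.class.getClassLoader(),
                new Class<?>[]{MessageSender.class},
                (proxy, method, args) -> {
                    target.log.append("begin;");
                    Object result = method.invoke(target, args);
                    target.log.append("commit;");
                    return result;
                });
    }

    public static void main(String[] args) {
        PlainSender raw = new PlainSender();
        MessageSender proxied = transactional(raw);

        proxied.send("a"); // through the proxy: begin/commit added
        raw.send("b");     // direct call (like self-invocation): no proxy logic

        System.out.println(raw.log); // begin;send:a;commit;send:b;
    }
}
```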
### Consumer
The consumer needs no special code; it simply listens to the topic:
```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * @author: 冯铁城 [17615007230@163.com]
 * @date: 2022-09-13 10:01:26
 * @describe: consumer demo
 */
@Component
public class Consumer {

    @KafkaListener(topics = {"transaction-topic"}, groupId = "${spring.kafka.consumer.group-id}")
    public void listenerDemo(ConsumerRecord<String, String> record) {
        System.out.println("receive message:" + record.value());
    }
}
```
### Verifying the transaction
Transactions enabled, publish without an exception
- Producer: as shown below, the message is published successfully.
- Consumer: as shown below, the message is consumed successfully.

Transactions enabled, publish with an exception
- Producer: because transactions are enabled, the exception raised during publishing surfaces as a transaction-rollback exception and the message is not published.
- Consumer: as shown below, no message is received.

Transactions disabled, publish with an exception
To verify the transaction further, disable it and try again.
yml configuration: as shown below, transactions are disabled by commenting out transaction-id-prefix:
```yaml
# server config
server:
  port: 8089

# spring config
spring:
  kafka:
    bootstrap-servers: 120.48.107.224:9092
    producer:
      acks: -1
      # transaction-id-prefix: transaction-producer-
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      retries: 3
      properties:
        enable.idempotence: true
        retry.backoff.ms: 100
    consumer:
      group-id: transaction-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      auto-offset-reset: earliest
      enable-auto-commit: true
```
- Producer: as shown below, an exception still occurs during publishing, but it is no longer a transaction exception.
- Consumer: as shown below, the message is received anyway. This shows that it is the transaction that guarantees atomicity between publishing and receiving messages.
6. Multiple Datasources: Enabling Transactions with Spring + Kafka
### Version
Kafka supports transactions since version 0.11; the corresponding Spring Boot version should be >= 2.1.X.
### yml configuration
Transactions are again enabled via transaction-id-prefix, here only for the primary datasource:
```yaml
# spring config
spring:
  kafka:
    # primary datasource
    primary:
      producer:
        bootstrap-servers: 120.48.107.224:9092
        ack: "-1"
        key-serializer: org.apache.kafka.common.serialization.StringSerializer
        value-serializer: org.apache.kafka.common.serialization.StringSerializer
        retries: 3
        idempotence: true
        transaction-id-prefix: primary-transaction-producer-
      consumer:
        bootstrap-servers: 120.48.107.224:9092
        group-id: multi-kafka-group-primary
        enable-auto-commit: true
        key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        auto-offset-reset: earliest
        poll-time: 3000
    # secondary datasource
    secondary:
      producer:
        bootstrap-servers: 120.48.107.224:9092
        ack: "-1"
        key-serializer: org.apache.kafka.common.serialization.StringSerializer
        value-serializer: org.apache.kafka.common.serialization.StringSerializer
        retries: 3
        idempotence: true
      consumer:
        bootstrap-servers: 120.48.107.224:9092
        group-id: multi-kafka-group-secondary
        enable-auto-commit: true
        key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        auto-offset-reset: earliest
        poll-time: 3000
```
### Configuration classes
See: [Springboot整合Kafka](https://www.yuque.com/u27809381/tsycgx/vgn8z2?view=doc_embed&inner=vv96V)
### Producer
Notes:
1. The primary datasource sends messages to partition 0 and has transactions enabled.
2. The secondary datasource sends messages to partition 1 and does not enable transactions.
3. Because Spring transactions rely on AOP proxies, the sending logic and the code that calls it are kept in separate classes; if they lived in the same class, self-invocation would bypass the proxy and Spring could not apply transaction control.
```java
import com.ftc.multi.config.KafkaSendCallBackForString;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

import javax.annotation.Resource;

/**
 * @author: 冯铁城 [17615007230@163.com]
 * @date: 2022-09-19 19:06:26
 * @describe: transactional producer
 */
@Component
public class TransactionProducer {

    @Resource
    @Qualifier("primaryProducerTemplate")
    private KafkaTemplate<String, String> primaryTemplate;

    @Resource
    @Qualifier("secondaryProducerTemplate")
    private KafkaTemplate<String, String> secondaryTemplate;

    @Resource
    private KafkaSendCallBackForString kafkaSendCallBackForString;

    @Transactional(rollbackFor = RuntimeException.class, transactionManager = "primaryTransactionManager")
    public void sendMessageWithErrorPrimary() {
        //1. Send the message
        primaryTemplate.send("multi-kafka-topic", 0, "key", "primary-transaction-message")
                .addCallback(kafkaSendCallBackForString);

        //2. Throw a runtime exception
        int errorNumber = 1 / 0;
    }

    public void sendMessageWithErrorSecondary() {
        //1. Send the message
        secondaryTemplate.send("multi-kafka-topic", 1, "key", "secondary-transaction-message")
                .addCallback(kafkaSendCallBackForString);

        //2. Throw a runtime exception
        int errorNumber = 1 / 0;
    }
}
```
```java
package com.ftc.multi;

import com.ftc.multi.producer.TransactionProducer;
import lombok.SneakyThrows;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.util.concurrent.TimeUnit;

@SpringBootTest
class ProducerTests {

    @Autowired
    private TransactionProducer transactionProducer;

    @AfterEach
    @SneakyThrows({InterruptedException.class})
    void stopSeconds() {
        TimeUnit.MILLISECONDS.sleep(500);
    }

    @Test
    void sendWithErrorPrimary() {
        transactionProducer.sendMessageWithErrorPrimary();
    }

    @Test
    void sendWithErrorSecondary() {
        transactionProducer.sendMessageWithErrorSecondary();
    }
}
```
### Consumer
Notes:
- The first listener consumes the primary datasource (partition 0).
- The second listener consumes the secondary datasource (partition 1).
```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.stereotype.Component;

/**
 * @author: 冯铁城 [17615007230@163.com]
 * @date: 2022-09-08 16:06:48
 * @describe: multi-datasource listeners
 */
@Component
public class Consumer {

    // topicPartitions already names the topic, so the topics attribute is omitted:
    // @KafkaListener treats topics and topicPartitions as mutually exclusive.
    @KafkaListener(
            containerFactory = "primaryConsumerFactory",
            topicPartitions = {@TopicPartition(topic = "multi-kafka-topic", partitions = {"0"})}
    )
    public void primaryListener(ConsumerRecord<String, String> record) {
        System.out.println("primary datasource received: " + record.value());
    }

    @KafkaListener(
            containerFactory = "secondaryConsumerFactory",
            topicPartitions = {@TopicPartition(topic = "multi-kafka-topic", partitions = {"1"})}
    )
    public void secondaryListener(ConsumerRecord<String, String> record) {
        System.out.println("secondary datasource received: " + record.value());
    }
}
```
### Verifying the transaction
Primary datasource, send with an exception
- Producer: as shown below, sending fails with a transaction-rollback exception.
- Consumer: as shown below, no message is received; the transaction holds.

Secondary datasource, send with an exception
- Producer: as shown below, sending fails, but with a non-transactional exception.
- Consumer: as shown below, because the secondary datasource does not enable transactions, the consumer still receives the message even though an exception occurred during publishing. This again shows that it is the transaction that guarantees atomicity between publishing and receiving messages.