测试说明
启动项目
正常发送两个消息
postman发送Get请求:
http://localhost:8080//testSendMsg?flag=1
此时两个consumer都会接到消息
发送两个消息间隔抛出异常
postman发送Get请求:
http://localhost:8080//testSendMsg?flag=0
发送两个消息之间出现异常,第一个消息会被回滚,此时两个consumer都不会接到消息, 这就是事务的原子性
pom依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.0.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.lzx</groupId>
<artifactId>springboot_kafka_mybatis</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>springboot_kafka_mybatis</name>
<description>Kafka Study for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.2.0.RELEASE</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
application.properties 配置文件
server.port=8080
spring.kafka.bootstrap-servers=zjj101:9092 zjj102:9092 zjj103:9092
spring.kafka.producer.retries=1
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
spring.kafka.consumer.group-id=etl
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.max-poll-records=1000
spring.kafka.consumer.auto-commit-interval=1000
spring.kafka.consumer.enable-auto-commit=true
配置类
package com.lzx.kafka.config;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.transaction.KafkaTransactionManager;
import java.util.HashMap;
import java.util.Map;
@Configuration
@EnableKafka
public class KafkaConfiguration {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.consumer.enable-auto-commit}")
private Boolean autoCommit;
@Value("${spring.kafka.consumer.auto-commit-interval}")
private Integer autoCommitInterval;
@Value("${spring.kafka.consumer.group-id}")
private String groupId;
@Value("${spring.kafka.consumer.max-poll-records}")
private Integer maxPollRecords;
@Value("${spring.kafka.consumer.auto-offset-reset}")
private String autoOffsetReset;
@Value("${spring.kafka.producer.retries}")
private Integer retries;
@Value("${spring.kafka.producer.batch-size}")
private Integer batchSize;
@Value("${spring.kafka.producer.buffer-memory}")
private Integer bufferMemory;
/**
* 生产者配置信息
*/
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap();
//重试,0为不启用重试机制
props.put(ProducerConfig.ACKS_CONFIG, "all");
//连接地址
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.RETRIES_CONFIG, retries);
//控制批处理大小,单位为字节
props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
//批量发送,延迟为1毫秒,启用该功能能有效减少生产者发送消息次数,从而提高并发量
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
//生产者可以使用的总内存字节来缓冲等待发送到服务器的记录
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
//键的序列化方式
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
//值的序列化方式
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
// /** kafka无事务模式
// * @return
// */
// @Bean
// public ProducerFactory<String, String> producerFactory() {
// return new DefaultKafkaProducerFactory<>(producerConfigs());
// }
/**
* 开启kafka事务
*
* @return
*/
@Bean
public ProducerFactory<String, String> producerFactory() {
DefaultKafkaProducerFactory factory = new DefaultKafkaProducerFactory<>(producerConfigs());
//在producerFactory中开启事务功能
factory.transactionCapable();
//TransactionIdPrefix是用来生成Transactional.id的前缀
factory.setTransactionIdPrefix("tran-");
return factory;
}
@Bean
public KafkaTransactionManager transactionManager(ProducerFactory producerFactory) {
KafkaTransactionManager manager = new KafkaTransactionManager(producerFactory);
return manager;
}
/**
* 生产者模板
*/
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
/**
* 消费者配置信息
*/
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap();
/* //是否自动提交
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
//自动提交的频率
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");*/
//GroupID
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
//连接地址
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
//Session超时设置
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000);
props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);
//键的反序列化方式
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
//值的反序列化方式
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}
/**
* 消费者批量工程
*/
@Bean
public KafkaListenerContainerFactory<?> batchFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
//设置为批量消费,每个批次数量在Kafka配置参数中设置ConsumerConfig.MAX_POLL_RECORDS_CONFIG
factory.setBatchListener(true);
return factory;
}
}
controller
package com.lzx.kafka.controller;
import com.lzx.kafka.producer.KfkaProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.ResponseBody;
@Controller
public class KafkaTestController {
@Autowired
private KfkaProducer producer;
/**
* 如果为0就抛异常,1就不抛异常,
* 张俊杰 2021年01月29日 13:12
*/
@GetMapping("/testSendMsg")
@ResponseBody
public String testSendMsg(String flag) {
try {
producer.send(flag);
} catch (Exception e) {
e.printStackTrace();
return "false";
}
return "success";
}
}
生产者
package com.lzx.kafka.producer;
import com.lzx.kafka.handler.KafkaSendResultHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.concurrent.ListenableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
@Component
public class KfkaProducer {
final KafkaTemplate<String, String> kafkaTemplate;
@Autowired
KafkaSendResultHandler producerListener;
public KfkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
//为kafka添加事务
@Transactional
//发送消息方法
public void send(String flag) throws ExecutionException, InterruptedException, TimeoutException {
//消息结果回调配置
kafkaTemplate.setProducerListener(producerListener);
this.doSent("topic1", "11111111111");
//默认同步发送
if ("0".equals(flag)) {
int a = 1 / 0;
}
this.doSent("topic2", "2222222");
/*//同步发送 当send方法耗时大于get方法所设定的参数时会抛出一个超时异常,但需要注意,这里仅抛出异常,消息还是会发送成功的
kafkaTemplate.send(topic, "test send message timeout").get(1, TimeUnit.MICROSECONDS);*/
//异步发送 发送时间较长的时候会导致进程提前关闭导致无法调用回调函数
Thread.sleep(1000);
}
private void doSent(String topic, String data) {
System.out.println("我是生产者, 我往: " + topic + "里面发送了数据,数据是: " + data);
//默认同步发送
ListenableFuture<SendResult<String, String>> send = kafkaTemplate.send(topic, data);
}
}
消费者
package com.lzx.kafka.consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class KafkaReceiverErro {
/**
* 消费者1
* 张俊杰 2021年01月29日 13:13
*/
@KafkaListener(topics = "topic1", groupId = "test1")
public void listen0(ConsumerRecord<String, String> record) {
System.out.println("我是消费者"+ record.topic()+",我消费的内容是 : " + record.value());
}
/**
* 消费者2
* 张俊杰 2021年01月29日 13:15
*/
@KafkaListener(topics = "topic2", groupId = "test1")
public void listen2(ConsumerRecord<String, String> record) {
System.out.println("我是消费者"+ record.topic()+",我消费的内容是 : " + record.value());
}
}
消息结果回调类
package com.lzx.kafka.handler;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.stereotype.Component;
/**
* 消息结果回调类
*/
@Component
public class KafkaSendResultHandler implements ProducerListener {
private static final Logger log = LoggerFactory.getLogger(KafkaSendResultHandler.class);
@Override
public void onSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata) {
log.info("Message send success : " + producerRecord.toString());
}
@Override
public void onError(ProducerRecord producerRecord, Exception exception) {
log.info("Message send error : " + producerRecord.toString());
}
}
启动类
package com.lzx.kafka;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class KafkaStudyApplication {
public static void main(String[] args) {
SpringApplication.run(KafkaStudyApplication.class, args);
}
}
实体类
package com.lzx.kafka.entity;
import lombok.Data;
import java.util.Date;
@Data
public class Message {
private Long id;
private String msg;
private Date sendTime;
}