Test instructions
Start the project.
Sending two messages normally
Send a GET request from Postman:
http://localhost:8080/testSendMsg?flag=1
Both consumers receive their messages.
Sending two messages with an exception thrown in between
Send a GET request from Postman:
http://localhost:8080/testSendMsg?flag=0
Because an exception is thrown between the two sends, the first message is rolled back and neither consumer receives anything; this is the atomicity of the transaction.
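If you prefer to trigger the two scenarios from code instead of Postman, a minimal sketch using Spring's RestTemplate is shown below (the class name TransactionDemoClient is purely illustrative and assumes the application is already running locally):

import org.springframework.web.client.RestTemplate;

// Hypothetical helper class, only for illustration; it simply calls the two test URLs.
public class TransactionDemoClient {

    public static void main(String[] args) {
        RestTemplate rest = new RestTemplate();
        // Scenario 1: both messages are sent and both consumers print them
        String ok = rest.getForObject("http://localhost:8080/testSendMsg?flag=1", String.class);
        System.out.println("flag=1 -> " + ok);
        // Scenario 2: the exception aborts the transaction and the controller returns "false"
        String failed = rest.getForObject("http://localhost:8080/testSendMsg?flag=0", String.class);
        System.out.println("flag=0 -> " + failed);
    }
}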
Maven dependencies (pom.xml)
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.0.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.lzx</groupId>
    <artifactId>springboot_kafka_mybatis</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>springboot_kafka_mybatis</name>
    <description>Kafka Study for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.2.0.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
application.properties configuration file
server.port=8080
spring.kafka.bootstrap-servers=zjj101:9092,zjj102:9092,zjj103:9092
spring.kafka.producer.retries=1
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
spring.kafka.consumer.group-id=etl
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.max-poll-records=1000
spring.kafka.consumer.auto-commit-interval=1000
spring.kafka.consumer.enable-auto-commit=true
Configuration class
package com.lzx.kafka.config;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.transaction.KafkaTransactionManager;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
public class KafkaConfiguration {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private Boolean autoCommit;

    @Value("${spring.kafka.consumer.auto-commit-interval}")
    private Integer autoCommitInterval;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Value("${spring.kafka.consumer.max-poll-records}")
    private Integer maxPollRecords;

    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;

    @Value("${spring.kafka.producer.retries}")
    private Integer retries;

    @Value("${spring.kafka.producer.batch-size}")
    private Integer batchSize;

    @Value("${spring.kafka.producer.buffer-memory}")
    private Integer bufferMemory;

    /**
     * Producer configuration
     */
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        // acks=all is required for the transactional/idempotent producer
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        // Broker addresses
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // Retries; 0 disables the retry mechanism
        props.put(ProducerConfig.RETRIES_CONFIG, retries);
        // Batch size in bytes
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
        // Linger for 1 ms; batching reduces the number of produce requests and improves throughput
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        // Total memory (bytes) the producer may use to buffer records waiting to be sent
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
        // Key serializer
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // Value serializer
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

//    /**
//     * Non-transactional producer factory
//     */
//    @Bean
//    public ProducerFactory<String, String> producerFactory() {
//        return new DefaultKafkaProducerFactory<>(producerConfigs());
//    }

    /**
     * Producer factory with Kafka transactions enabled
     */
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        DefaultKafkaProducerFactory<String, String> factory = new DefaultKafkaProducerFactory<>(producerConfigs());
        // transactionCapable() only reports whether transactions are enabled;
        // setting the transaction id prefix below is what actually enables them
        factory.transactionCapable();
        // Prefix used to generate transactional.id values
        factory.setTransactionIdPrefix("tran-");
        return factory;
    }

    @Bean
    public KafkaTransactionManager<String, String> transactionManager(ProducerFactory<String, String> producerFactory) {
        return new KafkaTransactionManager<>(producerFactory);
    }

    /**
     * Producer template
     */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    /**
     * Consumer configuration
     */
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        /*
        // Auto commit
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        // Auto-commit interval
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        */
        // Group id
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        // Broker addresses
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        // Session and request timeouts
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000);
        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);
        // Key deserializer
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // Value deserializer
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

    /**
     * Batch listener container factory
     */
    @Bean
    public KafkaListenerContainerFactory<?> batchFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
        // Batch consumption; the maximum batch size is bounded by ConsumerConfig.MAX_POLL_RECORDS_CONFIG
        factory.setBatchListener(true);
        return factory;
    }
}
Controller
package com.lzx.kafka.controller;

import com.lzx.kafka.producer.KfkaProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.ResponseBody;

@Controller
public class KafkaTestController {

    @Autowired
    private KfkaProducer producer;

    /**
     * flag=0 throws an exception between the two sends; flag=1 does not.
     * 张俊杰 2021-01-29 13:12
     */
    @GetMapping("/testSendMsg")
    @ResponseBody
    public String testSendMsg(String flag) {
        try {
            producer.send(flag);
        } catch (Exception e) {
            e.printStackTrace();
            return "false";
        }
        return "success";
    }
}
Producer
package com.lzx.kafka.producer;

import com.lzx.kafka.handler.KafkaSendResultHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.concurrent.ListenableFuture;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.InterruptedException;
import java.util.concurrent.TimeoutException;

@Component
public class KfkaProducer {

    final KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    KafkaSendResultHandler producerListener;

    public KfkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    /**
     * Sends two messages inside a single Kafka transaction.
     */
    @Transactional
    public void send(String flag) throws ExecutionException, InterruptedException, TimeoutException {
        // Register the send-result callback
        kafkaTemplate.setProducerListener(producerListener);

        this.doSent("topic1", "11111111111");

        // flag=0 simulates a failure between the two sends, which aborts the transaction
        if ("0".equals(flag)) {
            int a = 1 / 0;
        }

        this.doSent("topic2", "2222222");

        /*
        // Synchronous send: if send() takes longer than the timeout passed to get(),
        // a TimeoutException is thrown; note that the message may still be delivered successfully.
        kafkaTemplate.send(topic, "test send message timeout").get(1, TimeUnit.MICROSECONDS);
        */

        // Sends are asynchronous; if the process exits too early the callback may never run,
        // so wait briefly before returning.
        Thread.sleep(1000);
    }

    private void doSent(String topic, String data) {
        System.out.println("Producer: sending to " + topic + ", payload: " + data);
        // send() is asynchronous by default and returns a future
        ListenableFuture<SendResult<String, String>> send = kafkaTemplate.send(topic, data);
    }
}
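Besides @Transactional, spring-kafka also supports a local-transaction style via KafkaTemplate.executeInTransaction. A minimal sketch of how the same two sends could be wrapped that way (sendInLocalTransaction is just an illustrative method name, assumed to live in the same class so kafkaTemplate is available):

    public void sendInLocalTransaction(String flag) {
        // Everything inside the callback runs in one Kafka transaction;
        // any exception thrown here aborts it, so neither record is committed.
        kafkaTemplate.executeInTransaction(operations -> {
            operations.send("topic1", "11111111111");
            if ("0".equals(flag)) {
                throw new RuntimeException("simulated failure between the two sends");
            }
            operations.send("topic2", "2222222");
            return true;
        });
    }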
Consumer
package com.lzx.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class KafkaReceiverErro {

    /**
     * Consumer 1
     * 张俊杰 2021-01-29 13:13
     */
    @KafkaListener(topics = "topic1", groupId = "test1")
    public void listen0(ConsumerRecord<String, String> record) {
        System.out.println("Consumer for " + record.topic() + ", received: " + record.value());
    }

    /**
     * Consumer 2
     * 张俊杰 2021-01-29 13:15
     */
    @KafkaListener(topics = "topic2", groupId = "test1")
    public void listen2(ConsumerRecord<String, String> record) {
        System.out.println("Consumer for " + record.topic() + ", received: " + record.value());
    }
}
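The configuration class defines a batchFactory bean, but neither listener above actually uses it. A hedged sketch of what a listener wired to that factory could look like (KafkaBatchReceiver and the batch-demo group id are hypothetical names; topic1 is reused from the demo):

package com.lzx.kafka.consumer;

import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

// Illustrative batch listener; containerFactory refers to the batchFactory bean in KafkaConfiguration.
@Component
public class KafkaBatchReceiver {

    @KafkaListener(topics = "topic1", groupId = "batch-demo", containerFactory = "batchFactory")
    public void listenBatch(List<ConsumerRecord<String, String>> records) {
        System.out.println("Received a batch of " + records.size() + " records");
        for (ConsumerRecord<String, String> record : records) {
            System.out.println("  " + record.topic() + " -> " + record.value());
        }
    }
}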
Send-result callback class
package com.lzx.kafka.handler;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.stereotype.Component;

/**
 * Send-result callback
 */
@Component
public class KafkaSendResultHandler implements ProducerListener {

    private static final Logger log = LoggerFactory.getLogger(KafkaSendResultHandler.class);

    @Override
    public void onSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata) {
        log.info("Message send success : " + producerRecord.toString());
    }

    @Override
    public void onError(ProducerRecord producerRecord, Exception exception) {
        log.error("Message send error : " + producerRecord.toString(), exception);
    }
}
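The producer above calls kafkaTemplate.setProducerListener(producerListener) on every send, but the listener only needs to be registered once. A small sketch of how the kafkaTemplate bean method in KafkaConfiguration could be adjusted instead (the extra constructor parameter is the only change, and is an alternative to the in-method registration, not the demo's original approach):

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate(KafkaSendResultHandler resultHandler) {
        KafkaTemplate<String, String> template = new KafkaTemplate<>(producerFactory());
        // Register the send-result callback once at configuration time
        template.setProducerListener(resultHandler);
        return template;
    }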
Startup class
package com.lzx.kafka;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class KafkaStudyApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaStudyApplication.class, args);
    }
}
Entity class
package com.lzx.kafka.entity;

import lombok.Data;

import java.util.Date;

@Data
public class Message {
    private Long id;
    private String msg;
    private Date sendTime;
}
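The Message entity is not used by the producer in this demo. If it were to be sent, one simple option is to serialize it to JSON with Jackson (already on the classpath via spring-boot-starter-web) and reuse the existing String-based template. A minimal sketch under that assumption; MessageJsonProducer is a hypothetical class, not part of the original project:

package com.lzx.kafka.producer;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.lzx.kafka.entity.Message;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

// Hypothetical component, only to show one way the Message entity could be put on a topic.
@Component
public class MessageJsonProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;
    private final ObjectMapper mapper = new ObjectMapper();

    public MessageJsonProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void send(Message message) throws Exception {
        // Serialize the entity to JSON and reuse the existing StringSerializer
        String json = mapper.writeValueAsString(message);
        kafkaTemplate.send("topic1", json);
    }
}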
