Test instructions

Start the project.

Sending two messages normally

Send a GET request with Postman:

  http://localhost:8080/testSendMsg?flag=1

Both consumers receive their messages.

Throwing an exception between the two sends

Send a GET request with Postman:

  http://localhost:8080/testSendMsg?flag=0

Because an exception is thrown between the two sends, the first message is rolled back and neither consumer receives anything. This is the atomicity of the transaction.

pom dependencies

  <?xml version="1.0" encoding="UTF-8"?>
  <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-parent</artifactId>
      <version>2.1.0.RELEASE</version>
      <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.lzx</groupId>
    <artifactId>springboot_kafka_mybatis</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>springboot_kafka_mybatis</name>
    <description>Kafka Study for Spring Boot</description>
    <properties>
      <java.version>1.8</java.version>
    </properties>
    <dependencies>
      <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
      </dependency>
      <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <version>2.2.0.RELEASE</version>
      </dependency>
      <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
      </dependency>
    </dependencies>
    <build>
      <plugins>
        <plugin>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-maven-plugin</artifactId>
        </plugin>
      </plugins>
    </build>
  </project>

application.properties configuration file

  server.port=8080
  spring.kafka.bootstrap-servers=zjj101:9092,zjj102:9092,zjj103:9092
  spring.kafka.producer.retries=1
  spring.kafka.producer.batch-size=16384
  spring.kafka.producer.buffer-memory=33554432
  spring.kafka.consumer.group-id=etl
  spring.kafka.consumer.auto-offset-reset=earliest
  spring.kafka.consumer.max-poll-records=1000
  spring.kafka.consumer.auto-commit-interval=1000
  spring.kafka.consumer.enable-auto-commit=true
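
One caveat about the rollback test: Kafka consumers default to isolation.level=read_uncommitted, in which case records written by aborted transactions can still be delivered. For the consumers to stay silent when the transaction is rolled back, the consumer side generally needs read_committed. A minimal sketch of the extra setting (not part of the original configuration), using Spring Boot's pass-through prefix for additional consumer properties:

  # Sketch: deliver only records from committed transactions
  spring.kafka.consumer.properties.isolation.level=read_committed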

Configuration class

  package com.lzx.kafka.config;

  import org.apache.kafka.clients.consumer.ConsumerConfig;
  import org.apache.kafka.clients.producer.ProducerConfig;
  import org.apache.kafka.common.serialization.StringDeserializer;
  import org.apache.kafka.common.serialization.StringSerializer;
  import org.springframework.beans.factory.annotation.Value;
  import org.springframework.context.annotation.Bean;
  import org.springframework.context.annotation.Configuration;
  import org.springframework.kafka.annotation.EnableKafka;
  import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
  import org.springframework.kafka.config.KafkaListenerContainerFactory;
  import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
  import org.springframework.kafka.core.DefaultKafkaProducerFactory;
  import org.springframework.kafka.core.KafkaTemplate;
  import org.springframework.kafka.core.ProducerFactory;
  import org.springframework.kafka.transaction.KafkaTransactionManager;

  import java.util.HashMap;
  import java.util.Map;

  @Configuration
  @EnableKafka
  public class KafkaConfiguration {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private Boolean autoCommit;

    @Value("${spring.kafka.consumer.auto-commit-interval}")
    private Integer autoCommitInterval;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Value("${spring.kafka.consumer.max-poll-records}")
    private Integer maxPollRecords;

    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;

    @Value("${spring.kafka.producer.retries}")
    private Integer retries;

    @Value("${spring.kafka.producer.batch-size}")
    private Integer batchSize;

    @Value("${spring.kafka.producer.buffer-memory}")
    private Integer bufferMemory;

    /**
     * Producer configuration.
     */
    @Bean
    public Map<String, Object> producerConfigs() {
      Map<String, Object> props = new HashMap<>();
      // acks=all: wait for the full set of in-sync replicas to acknowledge (required for transactions)
      props.put(ProducerConfig.ACKS_CONFIG, "all");
      // broker addresses
      props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
      // number of retries; 0 disables retrying
      props.put(ProducerConfig.RETRIES_CONFIG, retries);
      // batch size in bytes
      props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
      // linger for 1 ms so records can be batched, reducing the number of send requests
      props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
      // total memory (bytes) the producer may use to buffer records waiting to be sent
      props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
      // key serializer
      props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
      // value serializer
      props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
      return props;
    }

    // /**
    //  * Non-transactional producer factory.
    //  */
    // @Bean
    // public ProducerFactory<String, String> producerFactory() {
    //   return new DefaultKafkaProducerFactory<>(producerConfigs());
    // }

    /**
     * Producer factory with Kafka transactions enabled.
     */
    @Bean
    public ProducerFactory<String, String> producerFactory() {
      DefaultKafkaProducerFactory<String, String> factory = new DefaultKafkaProducerFactory<>(producerConfigs());
      // setting a transactionIdPrefix switches the factory into transactional mode;
      // the prefix is used to generate each producer's transactional.id
      factory.setTransactionIdPrefix("tran-");
      return factory;
    }

    @Bean
    public KafkaTransactionManager<String, String> transactionManager(ProducerFactory<String, String> producerFactory) {
      return new KafkaTransactionManager<>(producerFactory);
    }

    /**
     * Producer template.
     */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
      return new KafkaTemplate<>(producerFactory());
    }

    /**
     * Consumer configuration.
     */
    @Bean
    public Map<String, Object> consumerConfigs() {
      Map<String, Object> props = new HashMap<>();
      /* // auto-commit offsets
      props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
      // auto-commit interval
      props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100"); */
      // consumer group id
      props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
      props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
      // broker addresses
      props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
      props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
      // session and request timeouts
      props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000);
      props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);
      // key deserializer
      props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
      // value deserializer
      props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
      return props;
    }

    /**
     * Batch consumer listener container factory.
     */
    @Bean
    public KafkaListenerContainerFactory<?> batchFactory() {
      ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
      factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
      // enable batch listening; the batch size is bounded by ConsumerConfig.MAX_POLL_RECORDS_CONFIG
      factory.setBatchListener(true);
      return factory;
    }
  }

Controller

  package com.lzx.kafka.controller;

  import com.lzx.kafka.producer.KfkaProducer;
  import org.springframework.beans.factory.annotation.Autowired;
  import org.springframework.stereotype.Controller;
  import org.springframework.web.bind.annotation.GetMapping;
  import org.springframework.web.bind.annotation.ResponseBody;

  @Controller
  public class KafkaTestController {

    @Autowired
    private KfkaProducer producer;

    /**
     * flag=0 triggers an exception between the two sends; flag=1 sends both messages normally.
     * 张俊杰 2021-01-29 13:12
     */
    @GetMapping("/testSendMsg")
    @ResponseBody
    public String testSendMsg(String flag) {
      try {
        producer.send(flag);
      } catch (Exception e) {
        e.printStackTrace();
        return "false";
      }
      return "success";
    }
  }

Producer

  package com.lzx.kafka.producer;

  import com.lzx.kafka.handler.KafkaSendResultHandler;
  import org.springframework.beans.factory.annotation.Autowired;
  import org.springframework.kafka.core.KafkaTemplate;
  import org.springframework.kafka.support.SendResult;
  import org.springframework.stereotype.Component;
  import org.springframework.transaction.annotation.Transactional;
  import org.springframework.util.concurrent.ListenableFuture;

  import java.util.concurrent.ExecutionException;
  import java.util.concurrent.TimeoutException;

  @Component
  public class KfkaProducer {

    final KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    KafkaSendResultHandler producerListener;

    public KfkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
      this.kafkaTemplate = kafkaTemplate;
    }

    // @Transactional wraps both sends in a single Kafka transaction
    @Transactional
    public void send(String flag) throws ExecutionException, InterruptedException, TimeoutException {
      // register the send-result callback
      kafkaTemplate.setProducerListener(producerListener);
      this.doSend("topic1", "11111111111");
      // an exception between the two sends rolls the whole transaction back
      if ("0".equals(flag)) {
        int a = 1 / 0;
      }
      this.doSend("topic2", "2222222");
      /* // synchronous send: if send() takes longer than the timeout passed to get(), a TimeoutException
         // is thrown, but note that the message itself may still be delivered
      kafkaTemplate.send(topic, "test send message timeout").get(1, TimeUnit.MICROSECONDS); */
      // sends are asynchronous; exiting the process too early can prevent the callback from firing
      Thread.sleep(1000);
    }

    private void doSend(String topic, String data) {
      System.out.println("Producer: sending to " + topic + ", payload: " + data);
      // asynchronous by default; the returned future carries the send result
      ListenableFuture<SendResult<String, String>> send = kafkaTemplate.send(topic, data);
    }
  }
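
Besides the declarative @Transactional route shown above, spring-kafka's KafkaTemplate also exposes executeInTransaction for a local, programmatic transaction. A minimal sketch of the same two-send flow in that style (not part of the original project; it assumes the same transactional KafkaTemplate<String, String>):

  // Programmatic alternative to @Transactional: both sends commit or abort together.
  public void sendInLocalTransaction(String flag) {
    kafkaTemplate.executeInTransaction(operations -> {
      operations.send("topic1", "11111111111");
      if ("0".equals(flag)) {
        // throwing here aborts the Kafka transaction, so neither record is committed
        throw new RuntimeException("simulated failure between the two sends");
      }
      operations.send("topic2", "2222222");
      return true;
    });
  }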

Consumer

  package com.lzx.kafka.consumer;

  import org.apache.kafka.clients.consumer.ConsumerRecord;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  import org.springframework.kafka.annotation.KafkaListener;
  import org.springframework.stereotype.Component;

  @Component
  public class KafkaReceiverErro {

    /**
     * Consumer 1.
     * 张俊杰 2021-01-29 13:13
     */
    @KafkaListener(topics = "topic1", groupId = "test1")
    public void listen0(ConsumerRecord<String, String> record) {
      System.out.println("Consumer for " + record.topic() + ", received: " + record.value());
    }

    /**
     * Consumer 2.
     * 张俊杰 2021-01-29 13:15
     */
    @KafkaListener(topics = "topic2", groupId = "test1")
    public void listen2(ConsumerRecord<String, String> record) {
      System.out.println("Consumer for " + record.topic() + ", received: " + record.value());
    }
  }
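
Note that the two listeners above use the default auto-configured container factory; the batchFactory bean defined in the configuration class is never referenced. If batch consumption were wanted, a listener could point at it explicitly. A rough, hypothetical sketch (not in the original project):

  // Hypothetical batch listener wired to the batchFactory bean; with setBatchListener(true)
  // the method receives the whole poll (up to max-poll-records) as a list.
  @KafkaListener(topics = "topic1", groupId = "test1", containerFactory = "batchFactory")
  public void listenBatch(java.util.List<ConsumerRecord<String, String>> records) {
    records.forEach(r -> System.out.println("Batch consumer received from " + r.topic() + ": " + r.value()));
  }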

Message result callback class

  package com.lzx.kafka.handler;

  import org.apache.kafka.clients.producer.ProducerRecord;
  import org.apache.kafka.clients.producer.RecordMetadata;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  import org.springframework.kafka.support.ProducerListener;
  import org.springframework.stereotype.Component;

  /**
   * Send-result callback: logs the outcome of every producer send.
   */
  @Component
  public class KafkaSendResultHandler implements ProducerListener {

    private static final Logger log = LoggerFactory.getLogger(KafkaSendResultHandler.class);

    @Override
    public void onSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata) {
      log.info("Message send success : " + producerRecord.toString());
    }

    @Override
    public void onError(ProducerRecord producerRecord, Exception exception) {
      log.info("Message send error : " + producerRecord.toString());
    }
  }

Main application class

  package com.lzx.kafka;

  import org.springframework.boot.SpringApplication;
  import org.springframework.boot.autoconfigure.SpringBootApplication;

  @SpringBootApplication
  public class KafkaStudyApplication {

    public static void main(String[] args) {
      SpringApplication.run(KafkaStudyApplication.class, args);
    }
  }

Entity class

  package com.lzx.kafka.entity;

  import lombok.Data;

  import java.util.Date;

  @Data
  public class Message {
    private Long id;
    private String msg;
    private Date sendTime;
  }
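
The Message entity is not actually used in the flow above. Since the KafkaTemplate is typed <String, String>, one way to send it would be to serialize it to JSON first. A rough sketch, assuming Jackson's ObjectMapper (already on the classpath via spring-boot-starter-web) and the existing template; the helper name is made up for illustration:

  // Hypothetical helper inside the producer: serialize a Message to JSON and send it
  // through the existing String-typed template.
  // Assumed imports: com.fasterxml.jackson.databind.ObjectMapper,
  //                  com.fasterxml.jackson.core.JsonProcessingException
  private void sendMessageAsJson(Message message) throws JsonProcessingException {
    ObjectMapper mapper = new ObjectMapper();
    kafkaTemplate.send("topic1", mapper.writeValueAsString(message));
  }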

Gitee repository

https://gitee.com/crow1/springboot_kafka