Java SpringBoot Kafka

一、介绍

结合生产环境的真实案例,以SpringBoot技术框架为基础,向大家介绍 kafka 的使用以及如何实现数据高吞吐!

二、程序实践

2.1、添加 kafka 依赖包

本次项目的SpringBoot版本为2.1.5.RELEASE,依赖的 kafka 的版本为2.2.6.RELEASE

  1. <!--kafka-->
  2. <dependency>
  3. <groupId>org.springframework.kafka</groupId>
  4. <artifactId>spring-kafka</artifactId>
  5. <version>2.2.6.RELEASE</version>
  6. </dependency>

2.2、添加 kafka 配置变量

当添加完了依赖包之后,只需要在application.properties中添加 kafka 配置变量,基本上就可以正常使用了。

  1. # 指定kafka server的地址,集群配多个,中间,逗号隔开
  2. spring.kafka.bootstrap-servers=197.168.25.196:9092
  3. #重试次数
  4. spring.kafka.producer.retries=3
  5. #批量发送的消息数量
  6. spring.kafka.producer.batch-size=1000
  7. #32MB的批处理缓冲区
  8. spring.kafka.producer.buffer-memory=33554432
  9. #默认消费者组
  10. spring.kafka.consumer.group-id=crm-microservice-newperformance
  11. #最早未被消费的offset
  12. spring.kafka.consumer.auto-offset-reset=earliest
  13. #批量一次最大拉取数据量
  14. spring.kafka.consumer.max-poll-records=4000
  15. #是否自动提交
  16. spring.kafka.consumer.enable-auto-commit=true
  17. #自动提交时间间隔,单位ms
  18. spring.kafka.consumer.auto-commit-interval=1000

2.3、创建一个消费者

  1. @Component
  2. public class BigDataTopicListener {
  3. private static final Logger log = LoggerFactory.getLogger(BigDataTopicListener.class);
  4. /**
  5. * 监听kafka数据
  6. * @param consumerRecords
  7. * @param ack
  8. */
  9. @KafkaListener(topics = {"big_data_topic"})
  10. public void consumer(ConsumerRecord<?, ?> consumerRecord) {
  11. log.info("收到bigData推送的数据'{}'", consumerRecord.toString());
  12. //...
  13. //db.save(consumerRecord);//插入或者更新数据
  14. }
  15. }

2.4、模拟对方推送数据测试

  1. @RunWith(SpringRunner.class)
  2. @SpringBootTest
  3. public class KafkaProducerTest {
  4. @Autowired
  5. private KafkaTemplate<String, String> kafkaTemplate;
  6. @Test
  7. public void testSend(){
  8. for (int i = 0; i < 5000; i++) {
  9. Map<String, Object> map = new LinkedHashMap<>();
  10. map.put("datekey", 20210610);
  11. map.put("userid", i);
  12. map.put("salaryAmount", i);
  13. //向kafka的big_data_topic主题推送数据
  14. kafkaTemplate.send("big_data_topic", JSONObject.toJSONString(map));
  15. }
  16. }
  17. }

起初,通过这种单条数据消费方式,进行测试程序没太大毛病!
但是,当上到生产之后,发现一个很大的问题,就是消费1000万条数据,至少需要3个小时,结果导致数据看板一直没数据。
痛定思痛,决定改成批量消费模型,怎么操作呢,请看下面!

2.5、将 kafka 的消费模式改成批量消费

首先,创建一个KafkaConfiguration配置类,内容如下!

  1. @Configuration
  2. public class KafkaConfiguration {
  3. @Value("${spring.kafka.bootstrap-servers}")
  4. private String bootstrapServers;
  5. @Value("${spring.kafka.producer.retries}")
  6. private Integer retries;
  7. @Value("${spring.kafka.producer.batch-size}")
  8. private Integer batchSize;
  9. @Value("${spring.kafka.producer.buffer-memory}")
  10. private Integer bufferMemory;
  11. @Value("${spring.kafka.consumer.group-id}")
  12. private String groupId;
  13. @Value("${spring.kafka.consumer.auto-offset-reset}")
  14. private String autoOffsetReset;
  15. @Value("${spring.kafka.consumer.max-poll-records}")
  16. private Integer maxPollRecords;
  17. @Value("${spring.kafka.consumer.batch.concurrency}")
  18. private Integer batchConcurrency;
  19. @Value("${spring.kafka.consumer.enable-auto-commit}")
  20. private Boolean autoCommit;
  21. @Value("${spring.kafka.consumer.auto-commit-interval}")
  22. private Integer autoCommitInterval;
  23. /**
  24. * 生产者配置信息
  25. */
  26. @Bean
  27. public Map<String, Object> producerConfigs() {
  28. Map<String, Object> props = new HashMap<>();
  29. props.put(ProducerConfig.ACKS_CONFIG, "0");
  30. props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
  31. props.put(ProducerConfig.RETRIES_CONFIG, retries);
  32. props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
  33. props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
  34. props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
  35. props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  36. props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  37. return props;
  38. }
  39. /**
  40. * 生产者工厂
  41. */
  42. @Bean
  43. public ProducerFactory<String, String> producerFactory() {
  44. return new DefaultKafkaProducerFactory<>(producerConfigs());
  45. }
  46. /**
  47. * 生产者模板
  48. */
  49. @Bean
  50. public KafkaTemplate<String, String> kafkaTemplate() {
  51. return new KafkaTemplate<>(producerFactory());
  52. }
  53. /**
  54. * 消费者配置信息
  55. */
  56. @Bean
  57. public Map<String, Object> consumerConfigs() {
  58. Map<String, Object> props = new HashMap<>();
  59. props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
  60. props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
  61. props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
  62. props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
  63. props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
  64. props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
  65. props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
  66. props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  67. props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  68. return props;
  69. }
  70. /**
  71. * 消费者批量工厂
  72. */
  73. @Bean
  74. public KafkaListenerContainerFactory<?> batchFactory() {
  75. ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
  76. factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
  77. //设置并发量,小于或等于Topic的分区数
  78. factory.setConcurrency(batchConcurrency);
  79. factory.getContainerProperties().setPollTimeout(1500);
  80. factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
  81. //设置为批量消费,每个批次数量在Kafka配置参数中设置ConsumerConfig.MAX_POLL_RECORDS_CONFIG
  82. factory.setBatchListener(true);
  83. return factory;
  84. }
  85. }

同时,新增一个spring.kafka.consumer.batch.concurrency变量,用来设置并发数,通过这个参数可以指定几个线程来实现消费。
application.properties配置文件中,添加如下变量

  1. #批消费并发量,小于或等于Topic的分区数
  2. spring.kafka.consumer.batch.concurrency = 3
  3. #设置每次批量拉取的最大数量为4000
  4. spring.kafka.consumer.max-poll-records=4000
  5. #设置自动提交改成false
  6. spring.kafka.consumer.enable-auto-commit=false

最后,将单个消费方法改成批量消费方法模式

  1. @Component
  2. public class BigDataTopicListener {
  3. private static final Logger log = LoggerFactory.getLogger(BigDataTopicListener.class);
  4. /**
  5. * 监听kafka数据(批量消费)
  6. * @param consumerRecords
  7. * @param ack
  8. */
  9. @KafkaListener(topics = {"big_data_topic"}, containerFactory = "batchFactory")
  10. public void batchConsumer(List<ConsumerRecord<?, ?>> consumerRecords, Acknowledgment ack) {
  11. long start = System.currentTimeMillis();
  12. //...
  13. //db.batchSave(consumerRecords);//批量插入或者批量更新数据
  14. //手动提交
  15. ack.acknowledge();
  16. log.info("收到bigData推送的数据,拉取数据量:{},消费时间:{}ms", consumerRecords.size(), (System.currentTimeMillis() - start));
  17. }
  18. }

此时,消费性能大大的提升,数据处理的非常快,500万条数据,最多 30 分钟就全部消费完毕了。
本例中的消费微服务,生产环境部署了3台服务器,同时big_data_topic主题的分区数为3,因此并发数设置为3比较合适。
随着推送的数据量不断增加,如果觉得消费速度还不够,可以重新设置每次批量拉取的最大数量,活着横向扩展微服务的集群实例数量和 topic 的分区数,以此来加快数据的消费速度。
但是,如果在单台机器中,每次批量拉取的最大数量过大,大对象也会很大,会造成频繁的 gc 告警!
因此,在实际的使用过程中,每次批量拉取的最大数量并不是越大越好,根据当前服务器的硬件配置,调节到合适的阀值,才是最优的选择!