安装启动zookeeper

docker run -d —name zookeeper -p 2181:2181 -v /etc/localtime:/etc/localtime wurstmeister/zookeeper
image.png

安裝启动kafka

docker run -d —name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=192.168.186.130:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.186.130:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -t wurstmeister/kafka
image.png
查看zookeeper和kafka是否安装成功
image.png

创建topic test

进入kafka容器,
docker exec -it kafka /bin/sh
cd /opt/kafka_2.13-2.7.0/bin (进入自己的版本)
./kafka-console-producer.sh —broker-list localhost:9092 —topic test
image.png

SpringBoot集成Kafka

添加依赖:

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter</artifactId>
  4. </dependency>
  5. <!--https://search.maven.org/artifact/org.springframework.kafka/spring-kafka/2.6.10/jar-->
  6. <dependency>
  7. <groupId>org.springframework.kafka</groupId>
  8. <artifactId>spring-kafka</artifactId>
  9. <version>2.6.10</version>
  10. </dependency>
  11. <dependency>
  12. <groupId>org.springframework.boot</groupId>
  13. <artifactId>spring-boot-starter-test</artifactId>
  14. <scope>test</scope>
  15. </dependency>

配置kafka
application.yml

  1. server:
  2. port: 8080
  3. servlet:
  4. context-path: /demo
  5. spring:
  6. kafka:
  7. bootstrap-servers: 192.168.186.130:9092 #kafka地址
  8. producer:
  9. retries: 0
  10. batch-size: 16384
  11. buffer-memory: 33554432
  12. key-serializer: org.apache.kafka.common.serialization.StringSerializer
  13. value-serializer: org.apache.kafka.common.serialization.StringSerializer
  14. consumer:
  15. group-id: javabb-demo-mq-kafka
  16. # 手动提交
  17. enable-auto-commit: false
  18. auto-offset-reset: latest
  19. key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  20. value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  21. properties:
  22. session.timeout.ms: 60000
  23. listener:
  24. log-container-config: false
  25. concurrency: 5
  26. # 手动提交
  27. ack-mode: manual_immediate

定义常量:kafkaConsts

  1. public class KafkaConsts {
  2. /**
  3. * 默认分区大小
  4. */
  5. public final static Integer DEFAULT_PARTITION_NUM = 3;
  6. /**
  7. * Topic 名称
  8. */
  9. public final static String TOPIC_TEST = "test";
  10. }

自定义kafka配置:KafkaConfig

  1. @Configuration
  2. @EnableConfigurationProperties({KafkaProperties.class})
  3. @EnableKafka
  4. @AllArgsConstructor
  5. public class KafkaConfig {
  6. private final KafkaProperties kafkaProperties;
  7. @Bean
  8. public KafkaTemplate<String, String> kafkaTemplate() {
  9. return new KafkaTemplate<>(producerFactory());
  10. }
  11. @Bean
  12. public ProducerFactory<String, String> producerFactory() {
  13. return new DefaultKafkaProducerFactory<>(kafkaProperties.buildProducerProperties());
  14. }
  15. @Bean
  16. public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
  17. ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
  18. factory.setConsumerFactory(consumerFactory());
  19. factory.setConcurrency(KafkaConsts.DEFAULT_PARTITION_NUM);
  20. factory.setBatchListener(true);
  21. factory.getContainerProperties().setPollTimeout(3000);
  22. return factory;
  23. }
  24. @Bean
  25. public ConsumerFactory<String, String> consumerFactory() {
  26. return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties());
  27. }
  28. @Bean("ackContainerFactory")
  29. public ConcurrentKafkaListenerContainerFactory<String, String> ackContainerFactory() {
  30. ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
  31. factory.setConsumerFactory(consumerFactory());
  32. factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
  33. factory.setConcurrency(KafkaConsts.DEFAULT_PARTITION_NUM);
  34. return factory;
  35. }
  36. }

创建消息消费者

  1. @Slf4j
  2. @Component
  3. public class MessageHandle {
  4. @KafkaListener(topics = KafkaConsts.TOPIC_TEST, containerFactory = "ackContainerFactory")
  5. public void handleMessage(ConsumerRecord record, Acknowledgment acknowledgment) {
  6. try {
  7. String message = (String) record.value();
  8. log.info("收到消息: {}", message);
  9. } catch (Exception e) {
  10. log.error(e.getMessage(), e);
  11. } finally {
  12. // 手动提交 offset
  13. acknowledgment.acknowledge();
  14. }
  15. }
  16. }

Springboot启动类

  1. @SpringBootApplication
  2. public class KafkaApplication {
  3. public static void main(String[] args) {
  4. SpringApplication.run(KafkaApplication.class, args);
  5. }
  6. }

启动KafkaApplication

创建测试类,发送消息测试

  1. @RunWith(SpringRunner.class)
  2. @SpringBootTest(classes = KafkaApplication.class)
  3. public class kafkaTest {
  4. @Autowired
  5. private KafkaTemplate<String, String> kafkaTemplate;
  6. /**
  7. * 测试发送消息
  8. */
  9. @Test
  10. public void testSend() {
  11. kafkaTemplate.send(KafkaConsts.TOPIC_TEST, "hello,kafka...");
  12. }
  13. }

image.png
在服务器中也可以发送消息测试
./kafka-console-producer.sh —broker-list localhost:9092 —topic test
输入消息。
image.png