Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.


The environment is built on Docker and Docker Compose.

1. Create the directories.

```bash
mkdir -p kafka/zoo1/data
mkdir -p kafka/zoo1/datalog
mkdir -p kafka/zoo1/logs
mkdir -p kafka/zoo2/data
mkdir -p kafka/zoo2/datalog
mkdir -p kafka/zoo2/logs
mkdir -p kafka/zoo3/data
mkdir -p kafka/zoo3/datalog
mkdir -p kafka/zoo3/logs
mkdir -p kafka/kafka1
mkdir -p kafka/kafka2
mkdir -p kafka/kafka3
```

The kafka directory then looks like this:

```
.
├── docker-compose.yml
├── kafka1
├── kafka2
├── kafka3
├── zoo1
│   ├── data
│   ├── datalog
│   └── logs
├── zoo2
│   ├── data
│   ├── datalog
│   └── logs
└── zoo3
    ├── data
    ├── datalog
    └── logs
```

2. Create the docker-compose.yml file in the root directory. The advertised listeners below use the host IP 192.168.3.77; replace it with your own host IP.

```yaml
version: '3.8'

networks:
  app-tier:
    driver: bridge

services:
  zoo1:
    container_name: zoo1
    hostname: zoo1
    image: zookeeper:3.7
    restart: always
    ports:
      - 2181:2181
    environment:
      ZOO_MY_ID: 1
      ZOO_SERVERS: server.1=zoo1:2888:3888;2181 server.2=zoo2:2888:3888;2181 server.3=zoo3:2888:3888;2181
      ZOO_LOG4J_PROP: INFO,ROLLINGFILE
    volumes:
      - ./zoo1/data:/data
      - ./zoo1/datalog:/datalog
      - ./zoo1/logs:/logs
    networks:
      - app-tier

  zoo2:
    container_name: zoo2
    hostname: zoo2
    image: zookeeper:3.7
    restart: always
    ports:
      - 2182:2181
    environment:
      ZOO_MY_ID: 2
      ZOO_SERVERS: server.1=zoo1:2888:3888;2181 server.2=zoo2:2888:3888;2181 server.3=zoo3:2888:3888;2181
      ZOO_LOG4J_PROP: INFO,ROLLINGFILE
    volumes:
      - ./zoo2/data:/data
      - ./zoo2/datalog:/datalog
      - ./zoo2/logs:/logs
    networks:
      - app-tier

  zoo3:
    container_name: zoo3
    hostname: zoo3
    image: zookeeper:3.7
    restart: always
    ports:
      - 2183:2181
    environment:
      ZOO_MY_ID: 3
      ZOO_SERVERS: server.1=zoo1:2888:3888;2181 server.2=zoo2:2888:3888;2181 server.3=zoo3:2888:3888;2181
      ZOO_LOG4J_PROP: INFO,ROLLINGFILE
    volumes:
      - ./zoo3/data:/data
      - ./zoo3/datalog:/datalog
      - ./zoo3/logs:/logs
    networks:
      - app-tier

  kafka1:
    container_name: kafka1
    hostname: kafka1
    image: bitnami/kafka:2.8.1
    restart: always
    ports:
      - '9091:9091'
    volumes:
      - ./kafka1:/bitnami/kafka
    environment:
      - KAFKA_BROKER_ID=1
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9091
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://192.168.3.77:9091
      # Inside the app-tier network every ZooKeeper node listens on 2181;
      # 2182/2183 are only host-side port mappings.
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zoo1:2181,zoo2:2181,zoo3:2181
      - KAFKA_HEAP_OPTS=-Xmx512m -Xms512m
    networks:
      - app-tier
    depends_on:
      - zoo1
      - zoo2
      - zoo3

  kafka2:
    container_name: kafka2
    hostname: kafka2
    image: bitnami/kafka:2.8.1
    restart: always
    ports:
      - '9092:9092'
    volumes:
      - ./kafka2:/bitnami/kafka
    environment:
      - KAFKA_BROKER_ID=2
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://192.168.3.77:9092
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zoo1:2181,zoo2:2181,zoo3:2181
      - KAFKA_HEAP_OPTS=-Xmx512m -Xms512m
    networks:
      - app-tier
    depends_on:
      - zoo1
      - zoo2
      - zoo3

  kafka3:
    container_name: kafka3
    hostname: kafka3
    image: bitnami/kafka:2.8.1
    restart: always
    ports:
      - '9093:9093'
    volumes:
      - ./kafka3:/bitnami/kafka
    environment:
      - KAFKA_BROKER_ID=3
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://192.168.3.77:9093
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zoo1:2181,zoo2:2181,zoo3:2181
      - KAFKA_HEAP_OPTS=-Xmx512m -Xms512m
    networks:
      - app-tier
    depends_on:
      - zoo1
      - zoo2
      - zoo3

  kafdrop:
    container_name: kafdrop
    hostname: kafdrop
    image: obsidiandynamics/kafdrop:3.27.0
    restart: always
    ports:
      - 9094:9000
    environment:
      - KAFKA_BROKERCONNECT=kafka1:9091,kafka2:9092,kafka3:9093
      - JVM_OPTS=-Xms32m -Xmx64m
      - SERVER_SERVLET_CONTEXTPATH=/
    networks:
      - app-tier
    depends_on:
      - kafka1
      - kafka2
      - kafka3
```

3. With the preparation above complete, let's move on to hands-on practice.

3.1 Start Kafka.

```bash
docker-compose up
```

Append -d to run the cluster in the background.

3.2 Kafdrop - a web UI for Kafka.

Kafdrop is the last service defined in docker-compose.yml. Open a browser and visit http://192.168.3.77:9094/ to inspect brokers, topics, and messages.

3.3 Offset Explorer - The Ultimate UI Tool for Kafka.

See the appendix for the download link.

4. Spring project integration.

Topic and Partition are the two core concepts here. A Kafka topic can have multiple partitions; within a consumer group, each partition is assigned to at most one consumer, which guarantees that messages within a partition are consumed in order. To route messages to a specific partition, you can define your own distribution rule by implementing the Partitioner interface, as sketched below.
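A minimal sketch of such a Partitioner, assuming routing by the key's hash (the class name and the modulo rule are illustrative, not part of the original setup):

```java
import java.util.Map;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

// Hypothetical example: route each record to a partition derived from its key.
public class YourProjectPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionCountForTopic(topic);
        if (key == null) {
            return 0; // keyless records all land on partition 0 in this sketch
        }
        // Mask the sign bit rather than Math.abs, which overflows for Integer.MIN_VALUE
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // no configuration needed for this sketch
    }

    @Override
    public void close() {
        // nothing to clean up
    }
}
```

With Spring Boot, it could be registered through the generic producer property passthrough, e.g. spring.kafka.producer.properties.partitioner.class=com.your.project.kafka.YourProjectPartitioner.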

4.1 pom.xml dependencies

```xml
<properties>
    <kafka.version>2.4.1</kafka.version>
    <jackson.version>2.12.5</jackson.version>
</properties>

<!-- Exclude the transitive kafka-clients from spring-kafka so its version can be pinned explicitly -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <exclusions>
        <exclusion>
            <artifactId>kafka-clients</artifactId>
            <groupId>org.apache.kafka</groupId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>${kafka.version}</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>${jackson.version}</version>
</dependency>
```

4.2 Custom Kafka serializer and deserializer

4.2.1 Custom object serializer: KafkaJsonSerializer

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.SneakyThrows;
import org.apache.kafka.common.serialization.Serializer;

import java.nio.charset.StandardCharsets;

/**
 * Custom object serializer.
 *
 * @author menglt
 * @since 2021/11/20 15:52
 */
public class KafkaJsonSerializer implements Serializer<Object> {

    // ObjectMapper is thread-safe and expensive to build, so share one instance
    // instead of creating a new one per record.
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    @Override
    @SneakyThrows
    public byte[] serialize(String topic, Object data) {
        return OBJECT_MAPPER.writeValueAsString(data).getBytes(StandardCharsets.UTF_8);
    }

    // configure(), close() and the Headers overload keep their interface defaults.
}
```

4.2.2 Custom object deserializer: KafkaJsonDeserializer

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.SneakyThrows;
import org.apache.kafka.common.serialization.Deserializer;

/**
 * Custom object deserializer.
 *
 * @author menglt
 * @since 2021/11/20 15:52
 */
public class KafkaJsonDeserializer implements Deserializer<JsonNode> {

    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    @Override
    @SneakyThrows
    public JsonNode deserialize(String topic, byte[] bytes) {
        return OBJECT_MAPPER.readTree(bytes);
    }

    // configure(), close() and the Headers overload keep their interface defaults.
}
```
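A quick way to sanity-check the pair before wiring them into Kafka is to round-trip an object through them. A minimal sketch, assuming it sits in the same package as the two classes above (the class name and sample payload are illustrative; Map.of requires Java 9+):

```java
import java.util.Map;

import com.fasterxml.jackson.databind.JsonNode;

// Hypothetical smoke test: serialize a map to JSON bytes, then read it back.
public class SerializerRoundTrip {

    public static void main(String[] args) {
        KafkaJsonSerializer serializer = new KafkaJsonSerializer();
        KafkaJsonDeserializer deserializer = new KafkaJsonDeserializer();

        byte[] bytes = serializer.serialize("any-topic", Map.of("level", "INFO", "msg", "hello"));
        JsonNode node = deserializer.deserialize("any-topic", bytes);

        System.out.println(node.get("msg").asText()); // prints: hello
    }
}
```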

4.3 application.properties

4.3.1 Producer

```properties
# kafka
spring.kafka.bootstrap-servers=192.168.3.77:9091,192.168.3.77:9092,192.168.3.77:9093
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=com.your.project.kafka.KafkaJsonSerializer
```

4.3.2 Consumer

```properties
# kafka
spring.kafka.bootstrap-servers=192.168.3.77:9091,192.168.3.77:9092,192.168.3.77:9093
spring.kafka.consumer.group-id=your-project-group-id
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=com.your.project.kafka.KafkaJsonDeserializer
spring.kafka.listener.missing-topics-fatal=false
```

4.4 SpringBootApplication

```java
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.TopicBuilder;

// Enable Kafka listener support
@EnableKafka
@SpringBootApplication
public class YourProjectApplication {

    public static void main(String[] args) {
        SpringApplication springApplication = new SpringApplication(YourProjectApplication.class);
        springApplication.run(args);
    }

    // Create the topic at startup if it does not exist yet
    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("your-project-log-topic").partitions(10).replicas(1).build();
    }
}
```

4.5 KafkaTemplate

```java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;

@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;

// Send a log object; KafkaJsonSerializer turns it into JSON bytes
YourProjectLog yourProjectLog = new YourProjectLog();
ListenableFuture<SendResult<String, Object>> listenableFuture =
        kafkaTemplate.send("your-project-log-topic", yourProjectLog);
```
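kafkaTemplate.send is asynchronous; if you care whether the send actually reached the broker, you can register a callback on the returned future. A minimal sketch, assuming a Lombok @Slf4j log field as in the listener below (the log statements are illustrative):

```java
import org.springframework.util.concurrent.ListenableFutureCallback;

listenableFuture.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
    @Override
    public void onSuccess(SendResult<String, Object> result) {
        // RecordMetadata tells you which partition and offset the record landed on
        log.info("sent to partition {} at offset {}",
                result.getRecordMetadata().partition(),
                result.getRecordMetadata().offset());
    }

    @Override
    public void onFailure(Throwable ex) {
        log.error("failed to send message", ex);
    }
});
```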

4.6 @KafkaListener

```java
// KafkaJsonDeserializer yields JsonNode, so the payload parameter is typed accordingly
@KafkaListener(topics = "your-project-log-topic", groupId = "your-project-group-id")
public void processMessage(JsonNode content) {
    log.info("receiving message: " + content);
}
```
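The topicPartitions attribute is how you pin a listener to specific partitions instead of subscribing to the whole topic (topics and topicPartitions are mutually exclusive). A minimal sketch; the partition choice and method name are illustrative:

```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.TopicPartition;

// Hypothetical: consume only partitions 0 and 1 of the topic
@KafkaListener(
        groupId = "your-project-group-id",
        topicPartitions = @TopicPartition(
                topic = "your-project-log-topic",
                partitions = {"0", "1"}))
public void processPartitionMessage(JsonNode content) {
    log.info("receiving message from a pinned partition: " + content);
}
```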

5. Appendix.

5.1 Offset Explorer download link

5.2 Kafka documentation: http://kafka.apache.org/28/documentation.html

5.3 Logstash plugin integration with Kafka

5.4 Spring for Apache Kafka

5.5 More spring-kafka configuration parameters
