Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.
The environment in this walkthrough is built on Docker and Docker Compose.
1. Create the directories.
mkdir -p kafka/zoo1/data
mkdir -p kafka/zoo1/datalog
mkdir -p kafka/zoo1/logs
mkdir -p kafka/zoo2/data
mkdir -p kafka/zoo2/datalog
mkdir -p kafka/zoo2/logs
mkdir -p kafka/zoo3/data
mkdir -p kafka/zoo3/datalog
mkdir -p kafka/zoo3/logs
mkdir -p kafka/kafka1
mkdir -p kafka/kafka2
mkdir -p kafka/kafka3
The kafka directory then looks like this:
.
├── docker-compose.yml
├── kafka1
├── kafka2
├── kafka3
├── zoo1
│   ├── data
│   ├── datalog
│   └── logs
├── zoo2
│   ├── data
│   ├── datalog
│   └── logs
└── zoo3
    ├── data
    ├── datalog
    └── logs
2. Create docker-compose.yml and place it in the root of the kafka directory.
version: '3.8'

networks:
  app-tier:
    driver: bridge

services:
  zoo1:
    container_name: zoo1
    hostname: zoo1
    image: zookeeper:3.7
    restart: always
    ports:
      - '2181:2181'
    environment:
      ZOO_MY_ID: 1
      ZOO_SERVERS: server.1=zoo1:2888:3888;2181 server.2=zoo2:2888:3888;2181 server.3=zoo3:2888:3888;2181
      ZOO_LOG4J_PROP: INFO,ROLLINGFILE
    volumes:
      - ./zoo1/data:/data
      - ./zoo1/datalog:/datalog
      - ./zoo1/logs:/logs
    networks:
      - app-tier

  zoo2:
    container_name: zoo2
    hostname: zoo2
    image: zookeeper:3.7
    restart: always
    ports:
      - '2182:2181'
    environment:
      ZOO_MY_ID: 2
      ZOO_SERVERS: server.1=zoo1:2888:3888;2181 server.2=zoo2:2888:3888;2181 server.3=zoo3:2888:3888;2181
      ZOO_LOG4J_PROP: INFO,ROLLINGFILE
    volumes:
      - ./zoo2/data:/data
      - ./zoo2/datalog:/datalog
      - ./zoo2/logs:/logs
    networks:
      - app-tier

  zoo3:
    container_name: zoo3
    hostname: zoo3
    image: zookeeper:3.7
    restart: always
    ports:
      - '2183:2181'
    environment:
      ZOO_MY_ID: 3
      ZOO_SERVERS: server.1=zoo1:2888:3888;2181 server.2=zoo2:2888:3888;2181 server.3=zoo3:2888:3888;2181
      ZOO_LOG4J_PROP: INFO,ROLLINGFILE
    volumes:
      - ./zoo3/data:/data
      - ./zoo3/datalog:/datalog
      - ./zoo3/logs:/logs
    networks:
      - app-tier

  kafka1:
    container_name: kafka1
    hostname: kafka1
    image: bitnami/kafka:2.8.1
    restart: always
    ports:
      - '9091:9091'
    volumes:
      - ./kafka1:/bitnami/kafka
    environment:
      - KAFKA_BROKER_ID=1
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9091
      # Replace 192.168.3.77 with the IP address of your Docker host
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://192.168.3.77:9091
      # Inside the app-tier network every ZooKeeper node listens on 2181;
      # 2182 and 2183 are only the host-side port mappings
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zoo1:2181,zoo2:2181,zoo3:2181
      - KAFKA_HEAP_OPTS=-Xmx512m -Xms512m
    networks:
      - app-tier
    depends_on:
      - zoo1
      - zoo2
      - zoo3

  kafka2:
    container_name: kafka2
    hostname: kafka2
    image: bitnami/kafka:2.8.1
    restart: always
    ports:
      - '9092:9092'
    volumes:
      - ./kafka2:/bitnami/kafka
    environment:
      - KAFKA_BROKER_ID=2
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://192.168.3.77:9092
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zoo1:2181,zoo2:2181,zoo3:2181
      - KAFKA_HEAP_OPTS=-Xmx512m -Xms512m
    networks:
      - app-tier
    depends_on:
      - zoo1
      - zoo2
      - zoo3

  kafka3:
    container_name: kafka3
    hostname: kafka3
    image: bitnami/kafka:2.8.1
    restart: always
    ports:
      - '9093:9093'
    volumes:
      - ./kafka3:/bitnami/kafka
    environment:
      - KAFKA_BROKER_ID=3
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://192.168.3.77:9093
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zoo1:2181,zoo2:2181,zoo3:2181
      - KAFKA_HEAP_OPTS=-Xmx512m -Xms512m
    networks:
      - app-tier
    depends_on:
      - zoo1
      - zoo2
      - zoo3

  kafdrop:
    container_name: kafdrop
    hostname: kafdrop
    image: obsidiandynamics/kafdrop:3.27.0
    restart: always
    ports:
      - '9094:9000'
    environment:
      - KAFKA_BROKERCONNECT=kafka1:9091,kafka2:9092,kafka3:9093
      - JVM_OPTS=-Xms32m -Xmx64m
      - SERVER_SERVLET_CONTEXTPATH=/
    networks:
      - app-tier
    depends_on:
      - kafka1
      - kafka2
      - kafka3
3. With the preparation done, start the cluster and try it out.
3.1 Start Kafka.
docker-compose up
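Once the containers are up, a plain TCP connect from the host is a quick way to confirm that the published broker ports are reachable. The following is a minimal sketch using only the JDK. The class `BrokerPortCheck` and its methods are hypothetical helpers, not part of Kafka, and an open port only shows that something is listening, not that the broker is healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Kafka: checks that host:port pairs accept TCP connections.
class BrokerPortCheck {

    // Splits a comma-separated "host:port" list (the same shape as bootstrap-servers).
    static List<InetSocketAddress> parse(String servers) {
        List<InetSocketAddress> result = new ArrayList<>();
        for (String entry : servers.split(",")) {
            String trimmed = entry.trim();
            int colon = trimmed.lastIndexOf(':');
            if (colon < 0) {
                throw new IllegalArgumentException("Expected host:port but got: " + trimmed);
            }
            String host = trimmed.substring(0, colon);
            int port = Integer.parseInt(trimmed.substring(colon + 1));
            // createUnresolved avoids a DNS lookup while parsing
            result.add(InetSocketAddress.createUnresolved(host, port));
        }
        return result;
    }

    // Returns true if a TCP connection can be opened within timeoutMillis.
    static boolean isReachable(InetSocketAddress address, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(address.getHostString(), address.getPort()), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Default matches the advertised listeners used in this article; pass your own list as the first argument
        String servers = args.length > 0
                ? args[0]
                : "192.168.3.77:9091,192.168.3.77:9092,192.168.3.77:9093";
        for (InetSocketAddress address : parse(servers)) {
            System.out.println(address.getHostString() + ":" + address.getPort()
                    + " reachable=" + isReachable(address, 2000));
        }
    }
}
```

If a port reports `reachable=false`, check the output of `docker-compose up` for that broker and verify that the advertised IP matches your Docker host.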
3.2 kafdrop - a web UI for viewing Kafka.
kafdrop is the last service defined in docker-compose.yml.
Open a browser and go to: http://192.168.3.77:9094/
3.3 Offset Explorer - The Ultimate UI Tool for Kafka.
See the appendix for the download link.
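4. Spring project integration.
Topic
Partition
Each Kafka topic can have multiple partitions. Within one consumer group, each partition is consumed by only one consumer, so messages within a partition are consumed in order. Ordering is not guaranteed across partitions.
To send messages to a specific partition, define your own routing rule by implementing the Partitioner interface.
4.1 pom.xml dependencies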
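The routing decision itself is small: map a key to a partition index so that the same key always lands in the same partition. The sketch below shows only that idea and does not use the Kafka API. The class `KeyRoutingSketch` and the method `partitionFor` are hypothetical names, and `Arrays.hashCode` is an assumed stand-in for the hash function (Kafka's built-in partitioner uses murmur2 over the key bytes).

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Illustration only: the kind of decision a partitioner makes, without the Kafka API.
class KeyRoutingSketch {

    // Maps a key to a partition index in the range [0, numPartitions).
    // Assumption: Arrays.hashCode stands in for the real hash function.
    static int partitionFor(String key, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        int hash = Arrays.hashCode(keyBytes);
        // floorMod keeps the result non-negative even when the hash is negative
        return Math.floorMod(hash, numPartitions);
    }

    public static void main(String[] args) {
        int partitions = 10;
        for (String key : new String[] {"order-1", "order-2", "order-1"}) {
            System.out.println(key + " -> partition " + partitionFor(key, partitions));
        }
    }
}
```

In a real project the same decision would live in the `partition(...)` method of a class implementing `org.apache.kafka.clients.producer.Partitioner`, registered through the producer's `partitioner.class` setting.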
<!-- In the <properties> section -->
<kafka.version>2.4.1</kafka.version>
<jackson.version>2.12.5</jackson.version>

<!-- In the <dependencies> section -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <exclusions>
        <!-- Exclude the transitive kafka-clients so the version pinned below is used -->
        <exclusion>
            <artifactId>kafka-clients</artifactId>
            <groupId>org.apache.kafka</groupId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>${kafka.version}</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>${jackson.version}</version>
</dependency>
4.2 Custom Kafka serializer and deserializer
4.2.1 Custom object serializer KafkaJsonSerializer
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.SneakyThrows;
import org.apache.kafka.common.serialization.Serializer;

/**
 * Custom object serializer
 *
 * @author menglt
 * @since 2021/11/20 15:52
 */
public class KafkaJsonSerializer implements Serializer<Object> {

    // ObjectMapper is thread-safe once configured, so one shared instance is enough
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    @Override
    @SneakyThrows
    public byte[] serialize(String topic, Object data) {
        // Pass null through so a null value is not turned into the JSON text "null"
        if (data == null) {
            return null;
        }
        // writeValueAsBytes always encodes as UTF-8, independent of the platform charset
        return OBJECT_MAPPER.writeValueAsBytes(data);
    }

    // configure, serialize(topic, headers, data) and close keep the interface defaults
}
4.2.2 Custom object deserializer KafkaJsonDeserializer
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.SneakyThrows;
import org.apache.kafka.common.serialization.Deserializer;

/**
 * Custom object deserializer
 *
 * @author menglt
 * @since 2021/11/20 15:52
 */
public class KafkaJsonDeserializer implements Deserializer<JsonNode> {

    // ObjectMapper is thread-safe once configured, so one shared instance is enough
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    @Override
    @SneakyThrows
    public JsonNode deserialize(String topic, byte[] data) {
        // A record with a null value has no bytes to parse
        if (data == null) {
            return null;
        }
        return OBJECT_MAPPER.readTree(data);
    }

    // configure, deserialize(topic, headers, data) and close keep the interface defaults
}
4.3 application.properties
4.3.1 Producer
# kafka
spring.kafka.bootstrap-servers=192.168.3.77:9091,192.168.3.77:9092,192.168.3.77:9093
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=com.your.project.kafka.KafkaJsonSerializer
4.3.2 Consumer
# kafka
spring.kafka.bootstrap-servers=192.168.3.77:9091,192.168.3.77:9092,192.168.3.77:9093
spring.kafka.consumer.group-id=your-project-group-id
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=com.your.project.kafka.KafkaJsonDeserializer
spring.kafka.listener.missing-topics-fatal=false
4.4 SpringBootApplication
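import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.TopicBuilder;

// Enable Kafka listener support
@EnableKafka
@SpringBootApplication
public class YourProjectApplication {

    public static void main(String[] args) {
        SpringApplication springApplication = new SpringApplication(YourProjectApplication.class);
        springApplication.run(args);
    }

    // Creates the topic at startup if it does not exist yet
    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("your-project-log-topic")
                .partitions(10)
                .replicas(1)
                .build();
    }
}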
4.5 KafkaTemplate
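import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;

// Hypothetical wrapper class; YourProjectLog is a placeholder for your own message type
@Service
public class YourProjectLogProducer {

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    // send() is asynchronous; the returned future completes once the send result is known
    public ListenableFuture<SendResult<String, Object>> send(YourProjectLog yourProjectLog) {
        return kafkaTemplate.send("your-project-log-topic", yourProjectLog);
    }
}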
4.6 @KafkaListener
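// Requires a logger named log on the enclosing class, for example via Lombok's @Slf4j
// The parameter type matches KafkaJsonDeserializer, which produces a JsonNode
@KafkaListener(topics = "your-project-log-topic", groupId = "your-project-group-id")
public void processMessage(JsonNode content) {
    log.info("receiving message: {}", content);
}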
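When several instances of this listener run with the same groupId, the 10 partitions of the topic are divided among them, and each partition has exactly one owner in the group. The sketch below illustrates that property with a simple round-robin rule. It is an assumption made for illustration, not Kafka's actual assignor, and the class `GroupAssignmentSketch` and the instance names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustration only: dividing partitions among the consumers of one group.
class GroupAssignmentSketch {

    // Assigns partition p to consumer (p mod consumerCount), so every partition has exactly one owner.
    static Map<String, List<Integer>> assign(List<String> consumers, int numPartitions) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String consumer : consumers) {
            assignment.put(consumer, new ArrayList<>());
        }
        if (consumers.isEmpty()) {
            return assignment;
        }
        for (int partition = 0; partition < numPartitions; partition++) {
            String owner = consumers.get(partition % consumers.size());
            assignment.get(owner).add(partition);
        }
        return assignment;
    }

    public static void main(String[] args) {
        List<String> consumers = Arrays.asList("instance-a", "instance-b", "instance-c");
        // 10 matches the partition count of your-project-log-topic in this article
        System.out.println(assign(consumers, 10));
    }
}
```

With 10 partitions and three instances, one instance ends up with four partitions and the other two with three each. Running more instances than partitions would leave the extra instances idle.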
5. Appendix.
5.1 Offset Explorer download link.
5.2 Kafka documentation.
5.3 Integrating Kafka through the Logstash plugin.
5.4 Spring for Apache Kafka.
5.5 More spring-kafka configuration properties.
