Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.
The environment is built with Docker and Docker Compose.
1. Create the directories.
mkdir -p kafka/zoo1/data
mkdir -p kafka/zoo1/datalog
mkdir -p kafka/zoo1/logs
mkdir -p kafka/zoo2/data
mkdir -p kafka/zoo2/datalog
mkdir -p kafka/zoo2/logs
mkdir -p kafka/zoo3/data
mkdir -p kafka/zoo3/datalog
mkdir -p kafka/zoo3/logs
mkdir -p kafka/kafka1
mkdir -p kafka/kafka2
mkdir -p kafka/kafka3
The kafka directory now looks like this:
.
├── docker-compose.yml
├── kafka1
├── kafka2
├── kafka3
├── zoo1
│   ├── data
│   ├── datalog
│   └── logs
├── zoo2
│   ├── data
│   ├── datalog
│   └── logs
└── zoo3
    ├── data
    ├── datalog
    └── logs
2. Create the docker-compose.yml file in the kafka root directory. Replace 192.168.3.77 below with the LAN IP of your Docker host.
version: '3.8'

networks:
  app-tier:
    driver: bridge

services:
  zoo1:
    container_name: zoo1
    hostname: zoo1
    image: zookeeper:3.7
    restart: always
    ports:
      - 2181:2181
    environment:
      ZOO_MY_ID: 1
      ZOO_SERVERS: server.1=zoo1:2888:3888;2181 server.2=zoo2:2888:3888;2181 server.3=zoo3:2888:3888;2181
      ZOO_LOG4J_PROP: INFO,ROLLINGFILE
    volumes:
      - ./zoo1/data:/data
      - ./zoo1/datalog:/datalog
      - ./zoo1/logs:/logs
    networks:
      - app-tier

  zoo2:
    container_name: zoo2
    hostname: zoo2
    image: zookeeper:3.7
    restart: always
    ports:
      - 2182:2181
    environment:
      ZOO_MY_ID: 2
      ZOO_SERVERS: server.1=zoo1:2888:3888;2181 server.2=zoo2:2888:3888;2181 server.3=zoo3:2888:3888;2181
      ZOO_LOG4J_PROP: INFO,ROLLINGFILE
    volumes:
      - ./zoo2/data:/data
      - ./zoo2/datalog:/datalog
      - ./zoo2/logs:/logs
    networks:
      - app-tier

  zoo3:
    container_name: zoo3
    hostname: zoo3
    image: zookeeper:3.7
    restart: always
    ports:
      - 2183:2181
    environment:
      ZOO_MY_ID: 3
      ZOO_SERVERS: server.1=zoo1:2888:3888;2181 server.2=zoo2:2888:3888;2181 server.3=zoo3:2888:3888;2181
      ZOO_LOG4J_PROP: INFO,ROLLINGFILE
    volumes:
      - ./zoo3/data:/data
      - ./zoo3/datalog:/datalog
      - ./zoo3/logs:/logs
    networks:
      - app-tier

  kafka1:
    container_name: kafka1
    hostname: kafka1
    image: bitnami/kafka:2.8.1
    restart: always
    ports:
      - '9091:9091'
    volumes:
      - ./kafka1:/bitnami/kafka
    environment:
      - KAFKA_BROKER_ID=1
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9091
      # Advertise the host's LAN IP so clients outside Docker can connect.
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://192.168.3.77:9091
      # Inside the app-tier network every ZooKeeper node listens on 2181;
      # 2182/2183 exist only as host-side port mappings.
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zoo1:2181,zoo2:2181,zoo3:2181
      - KAFKA_HEAP_OPTS=-Xmx512m -Xms512m
    networks:
      - app-tier
    depends_on:
      - zoo1
      - zoo2
      - zoo3

  kafka2:
    container_name: kafka2
    hostname: kafka2
    image: bitnami/kafka:2.8.1
    restart: always
    ports:
      - '9092:9092'
    volumes:
      - ./kafka2:/bitnami/kafka
    environment:
      - KAFKA_BROKER_ID=2
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://192.168.3.77:9092
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zoo1:2181,zoo2:2181,zoo3:2181
      - KAFKA_HEAP_OPTS=-Xmx512m -Xms512m
    networks:
      - app-tier
    depends_on:
      - zoo1
      - zoo2
      - zoo3

  kafka3:
    container_name: kafka3
    hostname: kafka3
    image: bitnami/kafka:2.8.1
    restart: always
    ports:
      - '9093:9093'
    volumes:
      - ./kafka3:/bitnami/kafka
    environment:
      - KAFKA_BROKER_ID=3
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://192.168.3.77:9093
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zoo1:2181,zoo2:2181,zoo3:2181
      - KAFKA_HEAP_OPTS=-Xmx512m -Xms512m
    networks:
      - app-tier
    depends_on:
      - zoo1
      - zoo2
      - zoo3

  kafdrop:
    container_name: kafdrop
    hostname: kafdrop
    image: obsidiandynamics/kafdrop:3.27.0
    restart: always
    ports:
      - 9094:9000
    environment:
      - KAFKA_BROKERCONNECT=kafka1:9091,kafka2:9092,kafka3:9093
      - JVM_OPTS=-Xms32m -Xmx64m
      - SERVER_SERVLET_CONTEXTPATH=/
    networks:
      - app-tier
    depends_on:
      - kafka1
      - kafka2
      - kafka3
3. With the preparation above in place, it is time to run everything.
3.1 Start Kafka (append -d to run the stack in the background).
docker-compose up
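Once all seven containers are up, a few quick sanity checks (container names as defined above; the last command assumes zkCli.sh is on the PATH of the official zookeeper:3.7 image, which it is):
docker-compose ps
docker logs kafka1 | grep -i started
docker exec -it zoo1 zkCli.sh ls /brokers/ids
The final command should print [1, 2, 3] once all three brokers have registered with ZooKeeper.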
3.2 Kafdrop: a web UI for Kafka.
The last service defined in docker-compose.yml above is kafdrop.
Open a browser and visit: http://192.168.3.77:9094/
3.3 Offset Explorer: "The Ultimate UI Tool for Kafka".
See the appendix for the download link.
4. Spring integration.
Two concepts matter here: the topic and the partition.
Each Kafka topic can have multiple partitions. Within a consumer group, each partition is consumed by at most one consumer, which is what guarantees that messages are consumed in order within a partition.
If you need to send messages to a specific partition, you can define your own routing rule by implementing the Partitioner interface, as sketched below.
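A minimal sketch of such a partitioner (the class name FixedKeyPartitioner is hypothetical; register it with the producer property partitioner.class):

import java.util.Arrays;
import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

public class FixedKeyPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionCountForTopic(topic);
        if (keyBytes == null) {
            return 0; // keyless records all land on partition 0
        }
        // Mask the sign bit rather than using Math.abs, which overflows on Integer.MIN_VALUE.
        return (Arrays.hashCode(keyBytes) & Integer.MAX_VALUE) % numPartitions;
    }

    @Override
    public void configure(Map<String, ?> configs) { }

    @Override
    public void close() { }
}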
4.1 pom.xml dependencies. The kafka-clients bundled with spring-kafka is excluded and pinned explicitly; spring-kafka's own version is expected to come from the Spring Boot dependency management.
<properties>
    <kafka.version>2.4.1</kafka.version>
    <jackson.version>2.12.5</jackson.version>
</properties>

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <exclusions>
        <exclusion>
            <artifactId>kafka-clients</artifactId>
            <groupId>org.apache.kafka</groupId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>${kafka.version}</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>${jackson.version}</version>
</dependency>
4.2 Custom Kafka serializers
4.2.1 Custom object serializer: KafkaJsonSerializer
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.SneakyThrows;
import org.apache.kafka.common.serialization.Serializer;

/**
 * Custom JSON serializer for arbitrary objects.
 *
 * @author menglt
 * @since 2021/11/20 15:52
 */
public class KafkaJsonSerializer implements Serializer<Object> {

    // ObjectMapper is thread-safe; reuse a single instance instead of
    // building a new one per record.
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    @Override
    @SneakyThrows
    public byte[] serialize(String topic, Object data) {
        if (data == null) {
            return null;
        }
        // writeValueAsBytes always encodes UTF-8, unlike
        // writeValueAsString(...).getBytes(), which uses the platform charset.
        return OBJECT_MAPPER.writeValueAsBytes(data);
    }
}
4.2.2 Custom object deserializer: KafkaJsonDeserializer
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.SneakyThrows;
import org.apache.kafka.common.serialization.Deserializer;

/**
 * Custom JSON deserializer that exposes record values as Jackson trees.
 *
 * @author menglt
 * @since 2021/11/20 15:52
 */
public class KafkaJsonDeserializer implements Deserializer<JsonNode> {

    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    @Override
    @SneakyThrows
    public JsonNode deserialize(String topic, byte[] bytes) {
        if (bytes == null) {
            return null; // tombstone records carry a null value
        }
        return OBJECT_MAPPER.readTree(bytes);
    }
}
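A quick, self-contained round trip through the two codecs (the class name CodecRoundTripDemo is hypothetical; the topic argument is ignored by both implementations):

import com.fasterxml.jackson.databind.JsonNode;
import java.util.HashMap;
import java.util.Map;

public class CodecRoundTripDemo {
    public static void main(String[] args) {
        KafkaJsonSerializer serializer = new KafkaJsonSerializer();
        KafkaJsonDeserializer deserializer = new KafkaJsonDeserializer();

        Map<String, Object> payload = new HashMap<>();
        payload.put("level", "INFO");
        payload.put("msg", "hello");

        byte[] bytes = serializer.serialize("any-topic", payload);
        JsonNode tree = deserializer.deserialize("any-topic", bytes);
        System.out.println(tree.get("msg").asText()); // prints: hello
    }
}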
4.3 application.properties
4.3.1 Producer
# kafka
spring.kafka.bootstrap-servers=192.168.3.77:9091,192.168.3.77:9092,192.168.3.77:9093
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=com.your.project.kafka.KafkaJsonSerializer
4.3.2 Consumer
# kafka
spring.kafka.bootstrap-servers=192.168.3.77:9091,192.168.3.77:9092,192.168.3.77:9093
spring.kafka.consumer.group-id=your-project-group-id
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=com.your.project.kafka.KafkaJsonDeserializer
spring.kafka.listener.missing-topics-fatal=false
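Two settings worth noting: auto-offset-reset=earliest makes a brand-new consumer group start from the beginning of each partition rather than only seeing messages produced after it first connects, and missing-topics-fatal=false lets the application start even when a listened-to topic does not exist yet.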
4.4 SpringBootApplication
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.TopicBuilder;

// Enable Kafka support
@EnableKafka
@SpringBootApplication
public class YourProjectApplication {

    public static void main(String[] args) {
        SpringApplication.run(YourProjectApplication.class, args);
    }

    // Declare the topic; it is created on startup if it does not already exist.
    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("your-project-log-topic").partitions(10).replicas(1).build();
    }
}
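Spring Boot's auto-configured KafkaAdmin registers every NewTopic bean at startup. Since this cluster has three brokers, you may want a replication factor above 1 so the topic survives a broker outage; a minimal sketch with a hypothetical topic name:

// With 3 brokers, replication factor 3 keeps the topic available
// even if a single broker goes down.
@Bean
public NewTopic replicatedTopic() {
    return TopicBuilder.name("your-project-replicated-topic").partitions(10).replicas(3).build();
}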
4.5 KafkaTemplate
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;

@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;

// Sending is asynchronous; the future completes once the broker acknowledges the record.
YourProjectLog yourProjectLog = new YourProjectLog();
ListenableFuture<SendResult<String, Object>> listenableFuture =
        kafkaTemplate.send("your-project-log-topic", yourProjectLog);
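To react to the delivery outcome without blocking, attach a callback to the future (this assumes an slf4j log field like the one used in 4.6):

import org.springframework.util.concurrent.ListenableFutureCallback;

listenableFuture.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
    @Override
    public void onSuccess(SendResult<String, Object> result) {
        // RecordMetadata carries the partition and offset assigned by the broker.
        log.info("sent to partition {} at offset {}",
                result.getRecordMetadata().partition(),
                result.getRecordMetadata().offset());
    }

    @Override
    public void onFailure(Throwable ex) {
        log.error("send failed", ex);
    }
});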
4.6 @KafkaListener
import com.fasterxml.jackson.databind.JsonNode;
import org.springframework.kafka.annotation.KafkaListener;

// The parameter type matches what KafkaJsonDeserializer produces.
@KafkaListener(topics = "your-project-log-topic", groupId = "your-project-group-id")
public void processMessage(JsonNode content) {
    log.info("receiving message: " + content);
}
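To pin a listener to specific partitions instead of letting group rebalancing assign them, use the topicPartitions attribute; a sketch with illustrative partition numbers:

import org.springframework.kafka.annotation.TopicPartition;

// This listener reads only partitions 0 and 1 of the topic.
@KafkaListener(groupId = "your-project-group-id",
        topicPartitions = @TopicPartition(topic = "your-project-log-topic", partitions = {"0", "1"}))
public void processPinnedMessage(JsonNode content) {
    log.info("receiving message from a pinned partition: " + content);
}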
5. Appendix.
5.1 Offset Explorer download.
5.2 Kafka documentation.
5.3 Logstash plugin: Kafka integration.
5.4 Spring for Apache Kafka.
5.5 More spring-kafka configuration options.