参考链接
kafka 中文文档:http://kafka.apachecn.org/quickstart.html#quickstart_multibroker
kafka docker 镜像使用:https://blog.csdn.net/boling_cavalry/article/details/85395080
kafka 集群: https://zhuanlan.zhihu.com/p/110905106
kafka 集群
version: '3'services:zookeeper:image: wurstmeister/zookeepercontainer_name: zookeeperports:- "2181:2181"networks:- kafka_netkafka1:image: wurstmeister/kafka:2.11-0.11.0.3container_name: kafka1ports:- "9092:9092"environment:KAFKA_BROKER_ID: 1# 对应 server.properties 中 advertised.listeners 配置KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:9092# 对应 server.properties 中 listeners 配置KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092KAFKA_ADVERTISED_PORT: 9092KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181volumes:- ./data/kafka1:/kafkadepends_on:- zookeepernetworks:- kafka_netkafka2:image: wurstmeister/kafka:2.11-0.11.0.3container_name: kafka2ports:- "9093:9092"environment:KAFKA_BROKER_ID: 2KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka2:9092KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092KAFKA_ADVERTISED_PORT: 9092KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181volumes:- ./data/kafka2:/kafkadepends_on:- zookeepernetworks:- kafka_netnetworks:kafka_net:driver: bridge
创建 topic
进入到容器
docker exec -it kafka /bin/bash
进入bin目录
cd opt/kafka/bin/
调用脚本创建 topic
kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic test
--zookeeper:zookeeper 的地址--replication-factor:副本数量--partitions:partitions 数量--topic:topic 名称
修改 topic
修改 partitions 数量
kafka-topics.sh --alter --zookeeper zookeeper:2181 --topic test --partitions 4
查看 topic 列表
> kafka-topics.sh --list --zookeeper zookeeper:2181test
启动 producer 发送消息到 topic
> kafka-console-producer.sh --broker-list kafka1:9092 --topic testThis is a MessageThis is another Message
启动 consomer
消费 topic 消息,并输出到控制台
> kafka-console-consumer.sh --bootstrap-server kafka1:9092 --topic test --from-beginningThis is a MessageThis is another Message
查看某个 topic 详情
> kafka-topics.sh --describe --zookeeper zookeeper:2181 --topic my-replicated-topicTopic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
leader:是负责给定分区所有读写操作的节点。每个节点都是随机选择的部分分区的领导者replicas:是复制分区日志的节点列表,不管这些节点是leader还是仅仅活着isr:是一组“同步”replicas,是replicas列表的子集,它活着并被指到leader
集群容错实例
创建一个副本为 2 的 topic
> kafka-topics.sh --create --zookeeper kafka1:2181 --replication-factor 2 --partitions 1 --topic my-replicated-topic
查看 topic 信息
> kafka-topics.sh --describe --zookeeper zookeeper:2181 --topic my-replicated-topicTopic:my-replicated-topic PartitionCount:1 ReplicationFactor:2 Configs:Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 2,1 Isr: 2,1
发表一些消息到新的 topic
> kafka-console-producer.sh --broker-list kafka1:9092 --topic my-replicated-topicmy test message 1my test message 2
消费消息
> kafka-console-consumer.sh --bootstrap-server kafka1:9092 --from-beginning --topic my-replicated-topic...my test message 1my test message 2^C
容错测试
broker2 现在是 leader,试着 kill 掉 broker2
> docker container stop kafka2
再次查看 topic 信息
> kafka-topics.sh --describe --zookeeper zookeeper:2181 --topic my-replicated-topicTopic:my-replicated-topic PartitionCount:1 ReplicationFactor:2 Configs:Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 2,1 Isr: 1
leader 变成了 broker1 ,而且 broker2 也不再副本集(Isr)中了 停掉一个节点后,my-replicated-topic 还能正常收发消息
kafka connect 使用
进入kafka目录,后续操作均在该目录下进行
cd opt/kafka
添加测试数据
echo -e "foo\nbar" > test.txt
启动两个独立运行(standlone)的连接器
> bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
查看运行结果
more test.sink.text
数据存储在Kafka topic
connect-test中,因此我们也可以运行一个 console consumer(控制台消费者)来查看 topic 中的数据(或使用custom consumer(自定义消费者)代码进行处理)
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning{"schema":{"type":"string","optional":false},"payload":"foo"}{"schema":{"type":"string","optional":false},"payload":"bar"}...
