References

Kafka documentation (Chinese): http://kafka.apachecn.org/quickstart.html#quickstart_multibroker
Using the Kafka Docker image: https://blog.csdn.net/boling_cavalry/article/details/85395080
Kafka cluster setup: https://zhuanlan.zhihu.com/p/110905106

Kafka cluster

Save the following as docker-compose.yml, then bring the cluster up with `docker-compose up -d`:

```yaml
version: '3'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    networks:
      - kafka_net
  kafka1:
    image: wurstmeister/kafka:2.11-0.11.0.3
    container_name: kafka1
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      # corresponds to advertised.listeners in server.properties
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:9092
      # corresponds to listeners in server.properties
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - ./data/kafka1:/kafka
    depends_on:
      - zookeeper
    networks:
      - kafka_net
  kafka2:
    image: wurstmeister/kafka:2.11-0.11.0.3
    container_name: kafka2
    ports:
      - "9093:9092"
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka2:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - ./data/kafka2:/kafka
    depends_on:
      - zookeeper
    networks:
      - kafka_net
networks:
  kafka_net:
    driver: bridge
```

Create a topic

  • Enter the container (the compose file names it kafka1)

    ```shell
    docker exec -it kafka1 /bin/bash
    ```

  • Change to the bin directory

    ```shell
    cd /opt/kafka/bin/
    ```

  • Run the topic-creation script

    ```shell
    kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic test
    ```

    --zookeeper: the ZooKeeper address; --replication-factor: number of replicas (cannot exceed the number of brokers); --partitions: number of partitions; --topic: the topic name

Modify a topic

Increase the number of partitions (Kafka only allows the partition count to grow, never to shrink):

```shell
kafka-topics.sh --alter --zookeeper zookeeper:2181 --topic test --partitions 4
```

List topics

```shell
> kafka-topics.sh --list --zookeeper zookeeper:2181
test
```

Start a producer and send messages to the topic

```shell
> kafka-console-producer.sh --broker-list kafka1:9092 --topic test
This is a Message
This is another Message
```

Start a consumer

Consume messages from the topic and print them to the console:

```shell
> kafka-console-consumer.sh --bootstrap-server kafka1:9092 --topic test --from-beginning
This is a Message
This is another Message
```

Describe a topic

```shell
> kafka-topics.sh --describe --zookeeper zookeeper:2181 --topic my-replicated-topic
Topic:my-replicated-topic  PartitionCount:1  ReplicationFactor:3  Configs:
  Topic: my-replicated-topic  Partition: 0  Leader: 1  Replicas: 1,2,0  Isr: 1,2,0
```

(This sample output comes from the official quickstart's three-broker cluster; with the two-broker compose file above you would see at most two replicas.)

  • leader: the node responsible for all reads and writes of the given partition. Each node is the leader for a randomly chosen share of the partitions.
  • replicas: the list of nodes that replicate the partition's log, regardless of whether they are the leader or even currently alive.
  • isr: the set of "in-sync" replicas, the subset of the replicas list that is alive and caught up to the leader.
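These fields can be pulled out of a describe line with ordinary shell tools; a toy sketch against the sample output above (the parsing is ad hoc for this fixed format, not a Kafka API):

```shell
# One data line as printed by kafka-topics.sh --describe (sample from above)
line='Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0'

# Extract the Leader id and the Isr list with grep/cut (ad hoc parsing)
leader=$(printf '%s\n' "$line" | grep -o 'Leader: [0-9]*' | cut -d' ' -f2)
isr=$(printf '%s\n' "$line" | grep -o 'Isr: [0-9,]*' | cut -d' ' -f2)

echo "leader=$leader isr=$isr"   # prints: leader=1 isr=1,2,0
```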

Cluster fault-tolerance example

Create a topic with 2 replicas:

```shell
> kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 2 --partitions 1 --topic my-replicated-topic
```

Inspect the topic:

```shell
> kafka-topics.sh --describe --zookeeper zookeeper:2181 --topic my-replicated-topic
Topic:my-replicated-topic  PartitionCount:1  ReplicationFactor:2  Configs:
  Topic: my-replicated-topic  Partition: 0  Leader: 2  Replicas: 2,1  Isr: 2,1
```

Publish some messages to the new topic:

```shell
> kafka-console-producer.sh --broker-list kafka1:9092 --topic my-replicated-topic
my test message 1
my test message 2
```

Consume the messages:

```shell
> kafka-console-consumer.sh --bootstrap-server kafka1:9092 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
^C
```

Fault-tolerance test

broker2 is currently the leader, so try killing broker2:

```shell
> docker container stop kafka2
```

Inspect the topic again:

```shell
> kafka-topics.sh --describe --zookeeper zookeeper:2181 --topic my-replicated-topic
Topic:my-replicated-topic  PartitionCount:1  ReplicationFactor:2  Configs:
  Topic: my-replicated-topic  Partition: 0  Leader: 1  Replicas: 2,1  Isr: 1
```

The leader has switched to broker1, and broker2 is no longer in the in-sync replica set (Isr). Even with one node down, my-replicated-topic can still send and receive messages normally.
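What happened to the Isr can be mimicked with a trivial shell sketch (toy logic for illustration only; Kafka itself tracks in-sync replicas through ZooKeeper): the stopped broker's id drops out of the in-sync list, while the full replica assignment stays unchanged.

```shell
replicas="2,1"   # assignment from the describe output above; never changes
stopped="2"      # the broker we stopped

# Recompute the in-sync set by dropping the stopped id (toy logic)
isr=$(printf '%s\n' "$replicas" | tr ',' '\n' | grep -v "^${stopped}$" | paste -sd, -)

echo "Replicas: $replicas Isr: $isr"   # prints: Replicas: 2,1 Isr: 1
```

Restarting the broker (`docker container start kafka2`) lets it catch up and rejoin the Isr.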

Using Kafka Connect

Enter the Kafka directory; all subsequent commands are run from there:

```shell
cd /opt/kafka
```

Add some test data:

```shell
echo -e "foo\nbar" > test.txt
```

Start two connectors in standalone mode:

```shell
> bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
```
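The two connector property files ship with the Kafka distribution; for reference, connect-file-source.properties looks roughly like this (paraphrased from the stock config, so check your copy for the exact contents):

```properties
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test.txt
topic=connect-test
```

The matching sink connector reads from the connect-test topic and writes to test.sink.txt, which is why the next step inspects that file.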

Check the result:

```shell
more test.sink.txt
```

The data is also stored in the Kafka topic connect-test, so we can run a console consumer to view it (or process it with custom consumer code):

```shell
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}
...
```
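Each record is wrapped in the JSON envelope added by Connect's default JsonConverter; the raw payloads can be recovered with a quick sed pass (ad hoc for this fixed format, not a general JSON parser):

```shell
# Two records exactly as the console consumer printed them above
records='{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}'

# Strip the envelope, keeping only the payload value of each line
printf '%s\n' "$records" | sed 's/.*"payload":"\([^"]*\)".*/\1/'
# prints:
# foo
# bar
```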