Reference: https://developer.aliyun.com/article/716134
Reference: https://juejin.im/entry/6844903829624848398
Images (the compose files below use the wurstmeister and sheepkiller images):
docker pull wurstmeister/zookeeper
docker pull wurstmeister/kafka
docker pull sheepkiller/kafka-manager
- First build the zookeeper cluster and create a network for it.
### Setting up the zookeeper cluster with docker-compose
Run the following to get a zookeeper cluster:
sudo docker-compose -f docker-compose.yml.zookeeper -p zookeeper up -d
Output:
Creating network "zookeeper" with the default driver
Creating zoo2 ... done
Creating zoo3 ... done
Creating zoo1 ... done
docker-compose.yml.zookeeper file:
```yaml
version: '2'
services:
  zoo1:
    image: wurstmeister/zookeeper
    restart: always
    hostname: zoo1
    container_name: zoo1
    privileged: true
    ports: # host:container port mapping
      - 2181:2181
    environment:
      TZ: Asia/Shanghai
      ZOO_MY_ID: 1     # node ID
      ZOO_PORT: 2181   # zookeeper port
      ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888 # list of zookeeper nodes
    networks:
      zookeeper:
        ipv4_address: 172.25.0.11
  zoo2:
    image: wurstmeister/zookeeper
    restart: always
    hostname: zoo2
    container_name: zoo2
    privileged: true
    ports:
      - 2182:2181
    environment:
      TZ: Asia/Shanghai
      ZOO_MY_ID: 2
      ZOO_PORT: 2181
      ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888
    networks:
      zookeeper:
        ipv4_address: 172.25.0.12
  zoo3:
    image: wurstmeister/zookeeper
    restart: always
    hostname: zoo3
    container_name: zoo3
    privileged: true
    ports:
      - 2183:2181
    environment:
      TZ: Asia/Shanghai
      ZOO_MY_ID: 3
      ZOO_PORT: 2181
      ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888
    networks:
      zookeeper:
        ipv4_address: 172.25.0.13
networks:
  zookeeper:
    ipam:
      config:
        - subnet: 172.25.0.0/16
          gateway: 172.25.0.1
```
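To sanity-check the ensemble, you can ask each node for its role. This is a minimal sketch; it assumes the wurstmeister/zookeeper image keeps its install under /opt/zookeeper-3.4.13 (the same path the version check later in this note reports).
```bash
# Each node should report "Mode: leader" or "Mode: follower" once the ensemble has formed.
for node in zoo1 zoo2 zoo3; do
  docker exec "$node" /opt/zookeeper-3.4.13/bin/zkServer.sh status
done
```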
### Setting up the kafka cluster with docker-compose
Some Kafka terminology:
- Broker: a message-processing node; one Kafka node is one broker, and several brokers form a Kafka cluster.
- Topic: a category of messages; page-view logs, click logs and so on each live in their own topic, and a Kafka cluster can serve many topics at the same time.
- Partition: a physical subdivision of a topic; a topic can be split into several partitions, each of which is an ordered queue.
- Segment: physically, a partition consists of multiple segment files.
- offset: each partition is an ordered, immutable sequence of messages that are continuously appended to it. Every message in a partition carries a sequential ID called the offset, which uniquely identifies it within that partition; offsets in each partition start at 0. (See the consumer example after this list.)
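As a quick illustration of partitions and offsets, the console consumer can read a single partition starting from a given offset. A minimal sketch, assuming the partopic topic that is created in the cluster test later in this note:
```bash
# Read partition 0 of "partopic" starting at offset 0 (run inside a broker container, under /opt/kafka).
unset JMX_PORT
./bin/kafka-console-consumer.sh --bootstrap-server broker1:9092 \
  --topic partopic --partition 0 --offset 0
```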
The kafka containers join the network created for the zookeeper cluster, so each one needs its own IP on that subnet.
Run the following command to get a kafka cluster:
`sudo docker-compose -f docker-compose.yml.kafka up -d`
docker-compose.yml.kafka file:
```yaml
version: '2'
services:
  broker1:
    image: wurstmeister/kafka
    restart: always
    hostname: broker1
    container_name: broker1
    privileged: true
    ports:
      - "9091:9092"
    environment:
      TZ: Asia/Shanghai
      KAFKA_BROKER_ID: 1
      KAFKA_LISTENERS: PLAINTEXT://broker1:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker1:9092
      KAFKA_ADVERTISED_HOST_NAME: broker1
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_ZOOKEEPER_CONNECT: zoo1:2181/kafka,zoo2:2181/kafka,zoo3:2181/kafka
      JMX_PORT: 9988
    external_links: # containers defined outside this compose file
      - zoo1
      - zoo2
      - zoo3
    working_dir: /opt/kafka
    networks:
      default:
        ipv4_address: 172.25.0.14
  broker2:
    image: wurstmeister/kafka
    restart: always
    hostname: broker2
    container_name: broker2
    privileged: true
    ports:
      - "9092:9092"
    environment:
      TZ: Asia/Shanghai
      KAFKA_BROKER_ID: 2
      KAFKA_LISTENERS: PLAINTEXT://broker2:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker2:9092
      KAFKA_ADVERTISED_HOST_NAME: broker2
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_ZOOKEEPER_CONNECT: zoo1:2181/kafka,zoo2:2181/kafka,zoo3:2181/kafka
      JMX_PORT: 9988
    external_links:
      - zoo1
      - zoo2
      - zoo3
    working_dir: /opt/kafka
    networks:
      default:
        ipv4_address: 172.25.0.15
  broker3:
    image: wurstmeister/kafka
    restart: always
    hostname: broker3
    container_name: broker3
    privileged: true
    ports:
      - "9093:9092"
    environment:
      TZ: Asia/Shanghai
      KAFKA_BROKER_ID: 3
      KAFKA_LISTENERS: PLAINTEXT://broker3:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker3:9092
      KAFKA_ADVERTISED_HOST_NAME: broker3
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_ZOOKEEPER_CONNECT: zoo1:2181/kafka,zoo2:2181/kafka,zoo3:2181/kafka
      JMX_PORT: 9988
    external_links:
      - zoo1
      - zoo2
      - zoo3
    working_dir: /opt/kafka
    networks:
      default:
        ipv4_address: 172.25.0.16
  kafka-manager:
    image: sheepkiller/kafka-manager:latest
    restart: always
    container_name: kafka-manager
    hostname: kafka-manager
    ports:
      - "9090:9000"
    links: # containers created by this compose file
      - broker1
      - broker2
      - broker3
    external_links: # containers defined outside this compose file
      - zoo1
      - zoo2
      - zoo3
    environment:
      TZ: Asia/Shanghai
      ZK_HOSTS: zoo1:2181/kafka,zoo2:2181/kafka,zoo3:2181/kafka
      KAFKA_BROKERS: broker1:9092,broker2:9092,broker3:9092
      APPLICATION_SECRET: letmein
      KM_ARGS: -Djava.net.preferIPv4Stack=true
    networks:
      default:
        ipv4_address: 172.25.0.10
networks:
  default:
    external: # reuse the already-created network; its name is <project dir>_default
      name: zookeeper_default
```
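Once the stack is up, a quick way to confirm that all three brokers are reachable is to query their API versions from inside one of the containers. A minimal sketch; it unsets JMX_PORT so the CLI tool does not collide with the broker's JMX port (9988):
```bash
# Prints one block of supported API versions per reachable broker.
docker exec -it broker1 bash -c \
  'unset JMX_PORT; /opt/kafka/bin/kafka-broker-api-versions.sh --bootstrap-server broker1:9092,broker2:9092,broker3:9092'
```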
### Verification
Open the kafka-manager web UI at http://<host-ip>:9090 (host port 9090 is mapped to 9000 in the container).
Add a cluster, fill in the Zookeeper cluster address (the ZK_HOSTS value above), scroll to the bottom and click Save.
Click the cluster you just added; you should see three brokers in it.
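If you prefer the command line, a simple HTTP check from the host (assuming curl is installed) confirms the UI is serving before you open a browser:
```bash
# Should return an HTTP response header from kafka-manager on the mapped port.
curl -I http://localhost:9090
```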
### Checking versions
root:~$ docker exec broker1 find / -name \*kafka_\* | head -1 | grep -o 'kafka[^\n]*'
kafka_2.13-2.6.0
root:~$ docker exec zoo1 pwd
/opt/zookeeper-3.4.13
Running the two commands above gives the kafka and zookeeper versions:
kafka_2.13-2.6.0 means Scala version 2.13 and Kafka version 2.6.0; the zookeeper version is 3.4.13.
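Alternatively, a quick sketch that assumes the bundled kafka-topics.sh supports the --version flag (it does in Kafka 2.x) and avoids grepping the install path:
```bash
# Prints the broker's Kafka version, e.g. "2.6.0 (Commit:...)".
docker exec broker1 bash -c 'unset JMX_PORT; /opt/kafka/bin/kafka-topics.sh --version'
```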
### Testing kafka
#### Single node
docker exec -it broker1 /bin/bash
cd /opt/kafka_2.13-2.6.0
Start a message producer:
unset JMX_PORT; ./bin/kafka-console-producer.sh --broker-list broker1:9092 --topic mykafka
Open a new shell and start a message consumer:
unset JMX_PORT; ./bin/kafka-console-consumer.sh --bootstrap-server broker1:9092 --topic mykafka --from-beginning
Type 123456 in the producer shell.
Check the consumer shell:
if 123456 shows up there, the message went through.
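The same test can be scripted without an interactive prompt by piping the message into the producer, for example:
```bash
# Assumes JMX_PORT is already unset in this shell (see above).
# Send one message, then read everything in the topic (Ctrl+C to stop the consumer).
echo "hello from a pipe" | ./bin/kafka-console-producer.sh --broker-list broker1:9092 --topic mykafka
./bin/kafka-console-consumer.sh --bootstrap-server broker1:9092 --topic mykafka --from-beginning
```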
#### Cluster
Create a topic with a replication factor of 2 and 2 partitions. Inside a kafka container, under /opt/kafka_2.13-2.6.0/, run:
unset JMX_PORT;./bin/kafka-topics.sh --create --zookeeper zoo1:2181/kafka,zoo2:2181/kafka,zoo3:2181/kafka --replication-factor 2 --partitions 2 --topic partopic
Notes: --replication-factor is the number of replicas; --partitions is the number of partitions; the replication factor must not exceed the number of brokers; the number of effective consumers in a group is at most the number of partitions (see the sketch below).
--zookeeper zoo1:2181/kafka corresponds to KAFKA_ZOOKEEPER_CONNECT in docker-compose.yml.
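To see the consumer/partition limit in action, start two console consumers in the same group; with 2 partitions, each consumer gets one, and a third member of the group would sit idle. A minimal sketch ("testgroup" is an arbitrary name):
```bash
# Run this in two separate shells inside a broker container.
unset JMX_PORT
./bin/kafka-console-consumer.sh --bootstrap-server broker1:9092 --topic partopic --group testgroup
```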
List the cluster's topics:
./bin/kafka-topics.sh --list --zookeeper zoo1:2181/kafka,zoo2:2181/kafka,zoo3:2181/kafka
Check the topic's status. Inside a kafka container, under /opt/kafka_2.13-2.6.0/, run:
./bin/kafka-topics.sh --describe --zookeeper zoo1:2181/kafka,zoo2:2181/kafka,zoo3:2181/kafka --topic partopic
Output:
Topic:partopic PartitionCount:2 ReplicationFactor:2 Configs:
Topic: partopic Partition: 0 Leader: 0 Replicas: 0,1 Isr: 0,1
Topic: partopic Partition: 1 Leader: 0 Replicas: 1,0 Isr: 0,1
This shows that the leader for each partition is broker 0, that replicas live on brokers 0 and 1, and that Isr lists the replicas that are currently in sync (alive). Now stop one of the brokers:
docker stop broker1
Then describe the topic again; the output becomes:
Topic:partopic PartitionCount:2 ReplicationFactor:2 Configs:
Topic: partopic Partition: 0 Leader: 0 Replicas: 0,1 Isr: 0
Topic: partopic Partition: 1 Leader: 0 Replicas: 1,0 Isr: 0
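After the stopped broker comes back, it catches up and rejoins the ISR; a quick check, mirroring the stop above:
```bash
# On the host: bring the broker back, then re-run the describe command inside a container;
# Isr should list both replicas again once the broker has caught up.
docker start broker1
```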
Producing and consuming can be done against any of the brokers.
Produce data:
unset JMX_PORT;./bin/kafka-console-producer.sh --broker-list broker1:9092,broker2:9092,broker3:9092 --topic partopic
Consume data:
unset JMX_PORT;./bin/kafka-console-consumer.sh --bootstrap-server broker1:9092,broker2:9092,broker3:9092 --topic partopic --from-beginning
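To see which consumer groups exist and how far they have read, kafka-consumer-groups.sh can be used. A minimal sketch, run inside a broker container with JMX_PORT unset:
```bash
# List all consumer groups known to the cluster...
./bin/kafka-consumer-groups.sh --bootstrap-server broker1:9092,broker2:9092,broker3:9092 --list
# ...then describe one of them to see per-partition offsets and lag ("testgroup" from the earlier sketch).
./bin/kafka-consumer-groups.sh --bootstrap-server broker1:9092,broker2:9092,broker3:9092 --describe --group testgroup
```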