Resource Planning

Component | bigdata-node1 | bigdata-node2 | bigdata-node3 |
---|---|---|---|
OS | CentOS 7.6 | CentOS 7.6 | CentOS 7.6 |
JDK | JVM | JVM | JVM |
ZooKeeper | QuorumPeerMain | QuorumPeerMain | QuorumPeerMain |
Kafka | Kafka | Kafka | Kafka |
Installation Media

Version: kafka_2.11-0.11.0.3.tgz
Download: http://kafka.apache.org/downloads.html
Environment Preparation

Install the JDK
Install ZooKeeper
Reference: 《CentOS7.6-安装ZooKeeper-3.4.10》

Install Kafka

Extract the archive
# Install on node bigdata-node3 first, then distribute to bigdata-node1 and bigdata-node2
cd /share
tar -zxvf kafka_2.11-0.11.0.3.tgz -C ~/modules/
rm kafka_2.11-0.11.0.3.tgz
Create the required directories

# Kafka data directory
mkdir -p ~/modules/kafka_2.11-0.11.0.3/tmp/kafka-logs
chmod -R a+w ~/modules/kafka_2.11-0.11.0.3/tmp/kafka-logs
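The broker will fail at startup if its data directory is missing or not writable, so it is worth checking right after the mkdir/chmod above. A minimal sketch; the mktemp scratch path below stands in for ~/modules/kafka_2.11-0.11.0.3/tmp/kafka-logs:

```shell
#!/bin/sh
# Sketch: confirm the Kafka data directory exists and is writable before
# starting the broker. A scratch directory stands in for the real path.
DATA_DIR="$(mktemp -d)/kafka-logs"

mkdir -p "$DATA_DIR"
chmod -R a+w "$DATA_DIR"

if [ -d "$DATA_DIR" ] && [ -w "$DATA_DIR" ]; then
  echo "data dir ready"
else
  echo "data dir NOT writable" >&2
fi
```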
Configuration

1. server.properties

Configure as follows: vi ~/modules/kafka_2.11-0.11.0.3/config/server.properties

# Unique broker ID; note this must be an integer, and using the last octet of the node's IP is recommended
broker.id=103
# Default port
listeners=PLAINTEXT://192.168.0.103:9092
advertised.listeners=PLAINTEXT://192.168.0.103:9092
# Kafka data directory
log.dirs=/home/vagrant/modules/kafka_2.11-0.11.0.3/tmp/kafka-logs
# ZooKeeper connection string
zookeeper.connect=192.168.0.101:2181,192.168.0.102:2181,192.168.0.103:2181
# Allow topic deletion
delete.topic.enable=true

2. zookeeper.properties

Configure as follows: vi ~/modules/kafka_2.11-0.11.0.3/config/zookeeper.properties

# ZooKeeper data directory; must match the ZooKeeper cluster configuration
dataDir=/home/vagrant/modules/zookeeper-3.4.10/data

3. consumer.properties

Configure as follows: vi ~/modules/kafka_2.11-0.11.0.3/config/consumer.properties

# ZooKeeper addresses
zookeeper.connect=192.168.0.101:2181,192.168.0.102:2181,192.168.0.103:2181

4. producer.properties

Configure as follows: vi ~/modules/kafka_2.11-0.11.0.3/config/producer.properties

# Kafka cluster addresses
bootstrap.servers=192.168.0.101:9092,192.168.0.102:9092,192.168.0.103:9092

5. kafka-run-class.sh

Configure as follows: vi ~/modules/kafka_2.11-0.11.0.3/bin/kafka-run-class.sh

# Add a JAVA_HOME setting at the top of the file
export JAVA_HOME=/home/vagrant/modules/jdk1.8.0_221
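The same zookeeper.connect value is repeated across server.properties and consumer.properties above, so a typo in one file is easy to miss. A minimal sanity-check sketch; check_key is a hypothetical helper (not a Kafka tool), and the scratch files below stand in for the real config files:

```shell
#!/bin/sh
# Sketch: verify that two properties files agree on a given key before
# starting the cluster. check_key is a hypothetical helper, not a Kafka tool.
check_key() {
  key=$1; f1=$2; f2=$3
  a=$(grep "^$key=" "$f1")
  b=$(grep "^$key=" "$f2")
  if [ "$a" = "$b" ]; then
    echo "$key: consistent"
  else
    echo "$key: MISMATCH ($a vs $b)"
  fi
}

# Demo with scratch files standing in for config/server.properties and
# config/consumer.properties:
d=$(mktemp -d)
zk="zookeeper.connect=192.168.0.101:2181,192.168.0.102:2181,192.168.0.103:2181"
echo "$zk" > "$d/server.properties"
echo "$zk" > "$d/consumer.properties"
check_key zookeeper.connect "$d/server.properties" "$d/consumer.properties"
```

Point the two file arguments at the real config paths to check an actual installation.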
Distribute Kafka
cd ~/modules/
scp -r kafka_2.11-0.11.0.3 vagrant@bigdata-node1:~/modules/
scp -r kafka_2.11-0.11.0.3 vagrant@bigdata-node2:~/modules/
Modify server.properties on the nodes that received the distribution.

On bigdata-node1, configure as follows: vi ~/modules/kafka_2.11-0.11.0.3/config/server.properties

broker.id=101
listeners=PLAINTEXT://192.168.0.101:9092
advertised.listeners=PLAINTEXT://192.168.0.101:9092

On bigdata-node2, configure as follows: vi ~/modules/kafka_2.11-0.11.0.3/config/server.properties

broker.id=102
listeners=PLAINTEXT://192.168.0.102:9092
advertised.listeners=PLAINTEXT://192.168.0.102:9092
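Hand-editing server.properties on every node invites drift; the per-node overrides above can also be applied with sed. A sketch using the bigdata-node1 values, demonstrated on a scratch copy of the bigdata-node3 settings rather than the real file:

```shell
#!/bin/sh
# Sketch: apply per-node overrides (broker.id, listeners) with sed instead
# of hand-editing. NODE_ID and NODE_IP are the bigdata-node1 values.
NODE_ID=101
NODE_IP=192.168.0.101

# Scratch copy standing in for the distributed server.properties:
tmp=$(mktemp)
cat > "$tmp" <<'EOF'
broker.id=103
listeners=PLAINTEXT://192.168.0.103:9092
advertised.listeners=PLAINTEXT://192.168.0.103:9092
EOF

# Rewrite the three node-specific settings in place:
sed -i \
  -e "s/^broker\.id=.*/broker.id=$NODE_ID/" \
  -e "s#^listeners=.*#listeners=PLAINTEXT://$NODE_IP:9092#" \
  -e "s#^advertised\.listeners=.*#advertised.listeners=PLAINTEXT://$NODE_IP:9092#" \
  "$tmp"

cat "$tmp"
```

Run the same sed against ~/modules/kafka_2.11-0.11.0.3/config/server.properties on each node (with that node's ID and IP) to apply the overrides for real.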
Verify Kafka

1. Start the ZooKeeper cluster.
2. Start the Kafka cluster. (every node)

cd ~/modules/kafka_2.11-0.11.0.3/
# Start (run on every node); either in the background:
bin/kafka-server-start.sh config/server.properties >/dev/null 2>&1 &
# or as a daemon:
bin/kafka-server-start.sh -daemon config/server.properties
# Stop (run on every node)
bin/kafka-server-stop.sh

3. Create a topic. (any Kafka cluster node)

cd ~/modules/kafka_2.11-0.11.0.3/
bin/kafka-topics.sh --zookeeper 192.168.0.101:2181,192.168.0.102:2181,192.168.0.103:2181 --create --topic test --replication-factor 1 --partitions 3

4. List topics. (any Kafka cluster node)

cd ~/modules/kafka_2.11-0.11.0.3/
bin/kafka-topics.sh --zookeeper 192.168.0.101:2181,192.168.0.102:2181,192.168.0.103:2181 --list

5. Produce messages.

cd ~/modules/kafka_2.11-0.11.0.3/
bin/kafka-console-producer.sh --broker-list 192.168.0.101:9092,192.168.0.102:9092,192.168.0.103:9092 --topic test

6. Consume messages.

cd ~/modules/kafka_2.11-0.11.0.3/
# --from-beginning reads the topic from the start; without it, only messages produced after the consumer starts are read and earlier ones are skipped
# --bootstrap-server causes Kafka to create a "__consumer_offsets" topic (50 partitions, 1 replica) to store consumer offsets
bin/kafka-console-consumer.sh --bootstrap-server 192.168.0.101:9092,192.168.0.102:9092,192.168.0.103:9092 --topic test --from-beginning

7. Delete the topic.

cd ~/modules/kafka_2.11-0.11.0.3/
bin/kafka-topics.sh --delete --zookeeper 192.168.0.101:2181,192.168.0.102:2181,192.168.0.103:2181 --topic test
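The long --zookeeper and --broker-list strings in the commands above are easy to mistype; they can be generated from a single node list instead. A small sketch; join_ports is an illustrative helper name, and the IPs are the ones used throughout this guide:

```shell
#!/bin/sh
# Build the ZooKeeper and broker connect strings used by the commands above
# from one node list. join_ports is an illustrative helper, not a Kafka tool.
NODES="192.168.0.101 192.168.0.102 192.168.0.103"

join_ports() {
  # join_ports PORT -> "ip1:PORT,ip2:PORT,ip3:PORT"
  port=$1
  out=""
  for ip in $NODES; do
    out="${out:+$out,}$ip:$port"
  done
  printf '%s\n' "$out"
}

ZK_CONNECT=$(join_ports 2181)     # for --zookeeper
BROKER_LIST=$(join_ports 9092)    # for --broker-list / --bootstrap-server
echo "$ZK_CONNECT"
echo "$BROKER_LIST"
```

Usage example: bin/kafka-topics.sh --zookeeper "$ZK_CONNECT" --list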