资源规划
| 组件 | bigdata-node1 | bigdata-node2 | bigdata-node3 |
|---|---|---|---|
| OS | centos7.6 | centos7.6 | centos7.6 |
| JDK | jvm | jvm | jvm |
| Zookeeper | QuorumPeerMain | QuorumPeerMain | QuorumPeerMain |
| Kafka | kafka | kafka | kafka |
安装介质
版本:kafka_2.11-0.11.0.3.tgz
下载:http://kafka.apache.org/downloads.html
环境准备
安装JDK
安装ZooKeeper
参考:《CentOS7.6-安装ZooKeeper-3.4.10》
安装Kafka
解压缩
# 先在节点bigdata-node3上安装,之后分发到bigdata-node1、bigdata-node2cd /sharetar -zxvf kafka_2.11-0.11.0.3.tgz -C ~/modules/rm kafka_2.11-0.11.0.3.tgz
创建相关目录
# kafka数据目录mkdir -p ~/modules/kafka_2.11-0.11.0.3/tmp/kafka-logschmod -R a+w ~/modules/kafka_2.11-0.11.0.3/tmp/kafka-logs
配置
- server.properties
配置如下:vi ~/modules/kafka_2.11-0.11.0.3/config/server.properties
2.zookeeper.properties# 节点唯一标识,注意该值为整形,建议设置为IP最后一段broker.id=103# 默认端口号listeners=PLAINTEXT://192.168.0.103:9092advertised.listeners=PLAINTEXT://192.168.0.103:9092# Kafka数据目录log.dirs=/home/vagrant/modules/kafka_2.11-0.11.0.3/tmp/kafka-logs# 配置Zookeeperzookeeper.connect=192.168.0.101:2181,192.168.0.102:2181,192.168.0.103:2181# 配置删除属性delete.topic.enable=true
配置如下:vi ~/modules/kafka_2.11-0.11.0.3/config/zookeeper.properties
3.consumer.properties#Zookeeper的数据存储路径与Zookeeper集群配置保持一致dataDir=/home/vagrant/modules/zookeeper-3.4.10/data
配置如下:vi ~/modules/kafka_2.11-0.11.0.3/config/consumer.properties
4.producer.properties#配置Zookeeper地址zookeeper.connect=192.168.0.101:2181,192.168.0.102:2181,192.168.0.103:2181
配置如下:vi ~/modules/kafka_2.11-0.11.0.3/config/producer.properties
5.kafka-run-class.sh#配置Kafka集群地址bootstrap.servers=192.168.0.101:9092,192.168.0.102:9092,192.168.0.103:9092
配置如下:vi ~/modules/kafka_2.11-0.11.0.3/bin/kafka-run-class.sh
# 行首新增JAVA_HOME配置export JAVA_HOME=/home/vagrant/modules/jdk1.8.0_221
分发Kafka
cd ~/modules/scp -r kafka_2.11-0.11.0.3 vagrant@bigdata-node1:~/modules/scp -r kafka_2.11-0.11.0.3 vagrant@bigdata-node2:~/modules/
修改分发节点的server.properties。
bigdata-node1节点配置如下:vi ~/modules/kafka_2.11-0.11.0.3/config/server.properties
bigdata-node2节点配置如下:broker.id=101listeners=PLAINTEXT://192.168.0.101:9092advertised.listeners=PLAINTEXT://192.168.0.101:9092
broker.id=102listeners=PLAINTEXT://192.168.0.102:9092advertised.listeners=PLAINTEXT://192.168.0.102:9092
验证Kafka
1.启动Zookeeper集群。
2.启动Kafka集群。
3.创建topic。(任意Kafka集群节点)cd ~/modules/kafka_2.11-0.11.0.3/# 启动(每个节点)bin/kafka-server-start.sh config/server.properties >/dev/null 2>&1 &bin/kafka-server-start.sh -daemon config/server.properties# 停止(每个节点)bin/kafka-server-stop.sh config/server.properties
4.查看topic列表。(任意Kafka集群节点)cd ~/modules/kafka_2.11-0.11.0.3/bin/kafka-topics.sh --zookeeper 192.168.0.101:2181,192.168.0.102:2181,192.168.0.103:2181 --create --topic test --replication-factor 1 --partitions 3
5.生产者生成数据cd ~/modules/kafka_2.11-0.11.0.3/bin/kafka-topics.sh --zookeeper 192.168.0.101:2181,192.168.0.102:2181,192.168.0.103:2181 --list
6.消费者消费数据cd ~/modules/kafka_2.11-0.11.0.3/bin/kafka-console-producer.sh --broker-list 192.168.0.101:9092,192.168.0.102:9092,192.168.0.103:9092 --topic test
7.删除topiccd ~/modules/kafka_2.11-0.11.0.3/# --from-beginning 从头开始消费,不加该参数则从最新的消息开始消费,之前的丢弃# --bootstrap-server 将在kafka集群上创建一个名称为“__consumer_offsets”的topic,50个分区,1个副本,用于存放消费者偏移量bin/kafka-console-consumer.sh --bootstrap-server 192.168.0.101:9092,192.168.0.102:9092,192.168.0.103:9092 --topic test --from-beginning
cd ~/modules/kafka_2.11-0.11.0.3/bin/kafka-topics.sh --delete --zookeeper 192.168.0.101:2181,192.168.0.102:2181,192.168.0.103:2181 --topic test
