Resource Planning

Component    bigdata-node1    bigdata-node2    bigdata-node3
OS           CentOS 7.6       CentOS 7.6       CentOS 7.6
JDK          JVM              JVM              JVM
Zookeeper    QuorumPeerMain   QuorumPeerMain   QuorumPeerMain
Kafka        Kafka            Kafka            Kafka

Installation Media

Version: kafka_2.11-0.11.0.3.tgz
Download: http://kafka.apache.org/downloads.html

Environment Preparation

Install the JDK

Reference: "CentOS 7.6 – Install JDK 1.8.221"

Install ZooKeeper

Reference: "CentOS 7.6 – Install ZooKeeper 3.4.10"

Install Kafka

Extract the Archive

  # Install on bigdata-node3 first, then distribute to bigdata-node1 and bigdata-node2
  cd /share
  tar -zxvf kafka_2.11-0.11.0.3.tgz -C ~/modules/
  rm kafka_2.11-0.11.0.3.tgz

Create the Data Directory

  # Kafka data directory
  mkdir -p ~/modules/kafka_2.11-0.11.0.3/tmp/kafka-logs
  chmod -R a+w ~/modules/kafka_2.11-0.11.0.3/tmp/kafka-logs

Configuration

  1. server.properties

    vi ~/modules/kafka_2.11-0.11.0.3/config/server.properties

    Set the following:

    # Unique broker ID; must be an integer. Using the last octet of the node's IP is recommended.
    broker.id=103
    # Listener on the default port
    listeners=PLAINTEXT://192.168.0.103:9092
    advertised.listeners=PLAINTEXT://192.168.0.103:9092
    # Kafka data directory
    log.dirs=/home/vagrant/modules/kafka_2.11-0.11.0.3/tmp/kafka-logs
    # Zookeeper connection string
    zookeeper.connect=192.168.0.101:2181,192.168.0.102:2181,192.168.0.103:2181
    # Allow topics to be deleted
    delete.topic.enable=true

  2. zookeeper.properties

    vi ~/modules/kafka_2.11-0.11.0.3/config/zookeeper.properties

    Set the following:

    # Zookeeper data directory; must match the Zookeeper cluster configuration
    dataDir=/home/vagrant/modules/zookeeper-3.4.10/data

  3. consumer.properties

    vi ~/modules/kafka_2.11-0.11.0.3/config/consumer.properties

    Set the following:

    # Zookeeper connection string
    zookeeper.connect=192.168.0.101:2181,192.168.0.102:2181,192.168.0.103:2181

  4. producer.properties

    vi ~/modules/kafka_2.11-0.11.0.3/config/producer.properties

    Set the following:

    # Kafka cluster address
    bootstrap.servers=192.168.0.101:9092,192.168.0.102:9092,192.168.0.103:9092

  5. kafka-run-class.sh

    vi ~/modules/kafka_2.11-0.11.0.3/bin/kafka-run-class.sh

    Set the following:

    # Add a JAVA_HOME export at the top of the script
    export JAVA_HOME=/home/vagrant/modules/jdk1.8.0_221
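Since the configuration above ties broker.id to the last octet of each node's IP, the value can be derived rather than hard-coded. A minimal sketch; the IP literal is just this guide's bigdata-node3 address, and on a real host you would obtain it with something like `hostname -I`:

```shell
# Derive broker.id from the last octet of this node's IP address.
ip=192.168.0.103             # e.g. from: hostname -I | awk '{print $1}'
broker_id=${ip##*.}          # strip everything up to the last dot
echo "broker.id=${broker_id}"    # prints broker.id=103
```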

Distribute Kafka

  cd ~/modules/
  scp -r kafka_2.11-0.11.0.3 vagrant@bigdata-node1:~/modules/
  scp -r kafka_2.11-0.11.0.3 vagrant@bigdata-node2:~/modules/

  Then edit server.properties on each target node:

  vi ~/modules/kafka_2.11-0.11.0.3/config/server.properties

  On bigdata-node1, set:

  broker.id=101
  listeners=PLAINTEXT://192.168.0.101:9092
  advertised.listeners=PLAINTEXT://192.168.0.101:9092

  On bigdata-node2, set:

  broker.id=102
  listeners=PLAINTEXT://192.168.0.102:9092
  advertised.listeners=PLAINTEXT://192.168.0.102:9092
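The per-node edits above can also be scripted instead of done by hand. A sketch under some assumptions: `patch_broker_conf` is a hypothetical helper (not part of Kafka), it assumes GNU sed's in-place `-i` flag, and it would be run once on each node after the scp:

```shell
# Hypothetical helper: rewrite the node-specific keys in server.properties
# so each node derives broker.id and its listeners from its own IP.
patch_broker_conf() {
  local ip=$1 conf=$2
  sed -i \
    -e "s/^broker\.id=.*/broker.id=${ip##*.}/" \
    -e "s#^listeners=.*#listeners=PLAINTEXT://${ip}:9092#" \
    -e "s#^advertised\.listeners=.*#advertised.listeners=PLAINTEXT://${ip}:9092#" \
    "$conf"
}

# Example (on bigdata-node1, after the scp):
# patch_broker_conf 192.168.0.101 ~/modules/kafka_2.11-0.11.0.3/config/server.properties
```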

Verify Kafka

  1. Start the Zookeeper cluster.

  2. Start the Kafka cluster (on every node):

    cd ~/modules/kafka_2.11-0.11.0.3/
    # Start: the two commands below are alternatives; use either one
    bin/kafka-server-start.sh config/server.properties >/dev/null 2>&1 &
    bin/kafka-server-start.sh -daemon config/server.properties
    # Stop (on every node)
    bin/kafka-server-stop.sh
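Note that kafka-server-start.sh returns before the broker is fully up, so commands issued immediately afterward can fail. One way to handle this is a small readiness check; this sketch is an assumption of this guide, not a Kafka tool, and relies on bash's /dev/tcp pseudo-device:

```shell
#!/usr/bin/env bash
# Poll a TCP port until it accepts connections or a timeout (seconds) expires.
wait_for_port() {
  local host=$1 port=$2 timeout=${3:-30} waited=0
  until (exec 3<>"/dev/tcp/${host}/${port}") 2>/dev/null; do
    sleep 1
    waited=$((waited + 1))
    if [ "$waited" -ge "$timeout" ]; then
      echo "timed out waiting for ${host}:${port}" >&2
      return 1
    fi
  done
  echo "${host}:${port} is up"
}

# Example: wait for the local broker after kafka-server-start.sh
# wait_for_port 192.168.0.103 9092 60
```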
  3. Create a topic (on any Kafka cluster node):

    cd ~/modules/kafka_2.11-0.11.0.3/
    bin/kafka-topics.sh --zookeeper 192.168.0.101:2181,192.168.0.102:2181,192.168.0.103:2181 --create --topic test --replication-factor 1 --partitions 3

  4. List topics (on any Kafka cluster node):

    cd ~/modules/kafka_2.11-0.11.0.3/
    bin/kafka-topics.sh --zookeeper 192.168.0.101:2181,192.168.0.102:2181,192.168.0.103:2181 --list
  5. Produce data:

    cd ~/modules/kafka_2.11-0.11.0.3/
    bin/kafka-console-producer.sh --broker-list 192.168.0.101:9092,192.168.0.102:9092,192.168.0.103:9092 --topic test

  6. Consume data:

    cd ~/modules/kafka_2.11-0.11.0.3/
    # --from-beginning: consume from the earliest offset; without it, only messages
    #   produced after the consumer starts are read
    # --bootstrap-server: the first consumer started this way creates an internal topic
    #   named "__consumer_offsets" (50 partitions, 1 replica) to store consumer offsets
    bin/kafka-console-consumer.sh --bootstrap-server 192.168.0.101:9092,192.168.0.102:9092,192.168.0.103:9092 --topic test --from-beginning
  7. Delete the topic:

    cd ~/modules/kafka_2.11-0.11.0.3/
    bin/kafka-topics.sh --delete --zookeeper 192.168.0.101:2181,192.168.0.102:2181,192.168.0.103:2181 --topic test