1、解压

从software目录下解压至module目录。

  1. [root@slave4 software]# ls
  2. apache-hive-3.1.2-bin.tar.gz hadoop-3.2.1.tar.gz kafka_2.12-2.4.1.tgz nacos-server-1.2.1.tar.gz spark-3.0.0-bin-hadoop3.2.tgz
  3. [root@slave4 software]# tar -xzvf kafka_2.12-2.4.1.tgz -C ../module/

-C 参数表示解压到指定目录。

2、配置

2.1 新建目录

进入kafka_2.12-2.4.1目录,新建datazk_data目录。

  1. [root@slave3 kafka_2.12-2.4.1]# mkdir data
  2. [root@slave3 kafka_2.12-2.4.1]# mkdir zk_data

2.2 修改配置文件

进入kafka_2.12-2.4.1/config目录,修改配置文件。

server.properties

  1. ...
  2. # A comma separated list of directories under which to store log files
  3. # kafka运行日志(数据)存放的路径。
  4. log.dirs=/opt/module/kafka_2.12-2.4.1/data
  5. # 开启删除Topic功能
  6. delete.topic.enable=true
  7. ...
  8. listeners=PLAINTEXT://填入当前节点的IP地址:9092
  9. # Hostname and port the broker will advertise to producers and consumers. If not set,
  10. # it uses the value for "listeners" if configured. Otherwise, it will use the value
  11. # returned from java.net.InetAddress.getCanonicalHostName().
  12. advertised.listeners=PLAINTEXT://填入当前节点的IP地址:9092
  13. ...

zookeeper.properties

  1. # the directory where the snapshot is stored.
  2. dataDir=/opt/module/kafka_2.12-2.4.1/zk_data

3、启动

3.1 启动进程

  1. 进入kafka_2.12-2.4.1安装目录
  2. 启动Zookeeper

    1. 启动:
    2. [root@slave3 kafka_2.12-2.4.1]# ./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
    3. 停止:
    4. [root@slave3 kafka_2.12-2.4.1]#./bin/zookeeper-server-stop.sh
  3. 启动Kafka

    1. 启动:
    2. [root@slave3 kafka_2.12-2.4.1]# ./bin/kafka-server-start.sh -daemon config/server.properties
    3. 停止:
    4. [root@slave3 kafka_2.12-2.4.1]# ./bin/kafka-server-stop.sh

    3.2 检查

    3.2.1 检查进程

    QuorumPeerMain是zookeeper的进程名称。
    image.png

    3.2.2 消息生产消费测试

    进入安装目录

  4. 创建Topic

    1. ./bin/kafka-topics.sh --create --zookeeper 当前节点hostname或者IP地址:2181 --topic test --partitions 1 --replication-factor 1

    2.查看Topic

    1. ./bin/kafka-topics.sh --list --zookeeper 当前节点hostname或者IP地址:2181

    3.创建生产者

    1. ./bin/kafka-console-producer.sh --topic test --broker-list 当前节点hostname或者IP地址:9092

    生产者:image.png
    4.打开一个新终端,创建消费者。

    1. ./bin/kafka-console-consumer.sh --topic test --bootstrap-server 当前节点hostname或者IP地址:9092

    消费者:image.png

  5. 发送接收消息

发送成功: image.png

消费成功:image.png