Official website

1. Installation

1.1 Environment planning

| IP | Hostname | Services |
| --- | --- | --- |
| 10.0.0.71 | kafka-node01 | Kafka, ZooKeeper |
| 10.0.0.72 | kafka-node02 | Kafka, ZooKeeper |
| 10.0.0.73 | kafka-node03 | Kafka, ZooKeeper |

1.2 Set the hostnames

Set the hostname on each of the three hosts, then add all three entries to /etc/hosts on every host.

    # Run each command on its own host only
    hostnamectl set-hostname kafka-node01   # on 10.0.0.71
    hostnamectl set-hostname kafka-node02   # on 10.0.0.72
    hostnamectl set-hostname kafka-node03   # on 10.0.0.73

    # Append the cluster entries to /etc/hosts (run on all three hosts)
    cat >> /etc/hosts << EOF
    10.0.0.71 kafka-node01
    10.0.0.72 kafka-node02
    10.0.0.73 kafka-node03
    EOF
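Re-running the `cat >> /etc/hosts` step appends the same entries again. A minimal sketch of an idempotent alternative follows; `add_host_entry` is a hypothetical helper name, and it only checks whether the hostname already appears as a name field in the file.

```bash
# Append "IP HOSTNAME" to a hosts-style file unless HOSTNAME is already listed.
add_host_entry() {
  local file="$1" ip="$2" name="$3"
  # Look for an exact match of the hostname in any field after the IP column
  if ! awk -v n="$name" '
         { for (i = 2; i <= NF; i++) if ($i == n) found = 1 }
         END { exit !found }
       ' "$file" 2>/dev/null; then
    printf '%s %s\n' "$ip" "$name" >> "$file"
  fi
}
```

For example, `add_host_entry /etc/hosts 10.0.0.71 kafka-node01` can be run repeatedly without creating duplicate lines.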

1.3 Install the JDK

    # Install OpenJDK 11
    yum -y install java-11-openjdk-devel

1.5 Deploy the Kafka cluster

Download page

1.5.1 Download and install

    # Download
    wget https://mirrors.cnnic.cn/apache/kafka/3.1.0/kafka_2.13-3.1.0.tgz -P /opt/src/
    # Extract
    tar xvf /opt/src/kafka_2.13-3.1.0.tgz -C /opt/
    # Create a version-independent symlink
    ln -s /opt/kafka_2.13-3.1.0 /opt/kafka
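Because the archive comes from a mirror, it can be worth checking it against the SHA-512 digest that the Apache project publishes for the release before extracting it. The following is a sketch only: `verify_sha512` is a hypothetical helper, it assumes GNU coreutils `sha512sum` is installed, and the expected digest must be copied by hand from the official release page.

```bash
# Return 0 if the SHA-512 digest of FILE equals EXPECTED (hex, case-insensitive).
verify_sha512() {
  local file="$1" expected="$2" actual
  actual="$(sha512sum "$file" | awk '{print $1}')"
  # Normalise both digests to lower case before comparing
  [ "$(printf '%s' "$actual" | tr 'A-F' 'a-f')" = "$(printf '%s' "$expected" | tr 'A-F' 'a-f')" ]
}
```

Typical use would be `verify_sha512 /opt/src/kafka_2.13-3.1.0.tgz "<published digest>" || echo "checksum mismatch"`, where the digest placeholder is filled in from the release page.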

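1.5.2 Edit the ZooKeeper configuration

Note: the myid value is different on each machine and must match the N of that machine's server.N entry.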

    # Create the configuration file (identical on all three hosts)
    cat > /opt/kafka/config/zookeeper.properties << EOF
    tickTime=2000
    initLimit=10
    syncLimit=5
    dataDir=/data/zookeeper
    dataLogDir=/opt/kafka/logs
    clientPort=2181
    server.1=10.0.0.71:2888:3888
    server.2=10.0.0.72:2888:3888
    server.3=10.0.0.73:2888:3888
    EOF
    # Create the data directory
    mkdir -p /data/zookeeper
    # Create the myid file (a different value on each host)
    [root@kafka-node01 ~]# echo 1 > /data/zookeeper/myid
    [root@kafka-node02 ~]# echo 2 > /data/zookeeper/myid
    [root@kafka-node03 ~]# echo 3 > /data/zookeeper/myid
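Since each myid has to agree with the server.N line for that host, the value can be derived from the configuration file instead of typed by hand. This is a rough sketch under that assumption; `myid_for_ip` is a hypothetical helper and expects the `server.N=IP:port:port` layout used above.

```bash
# Print N for the "server.N=IP:..." line in CONFIG whose IP equals the given address.
myid_for_ip() {
  local config="$1" ip="$2"
  awk -v ip="$ip" '
    /^server\.[0-9]+=/ {
      split($0, kv, "=")        # kv[1] = "server.N", kv[2] = "IP:2888:3888"
      split(kv[2], addr, ":")   # addr[1] = "IP"
      if (addr[1] == ip) { sub(/^server\./, "", kv[1]); print kv[1] }
    }
  ' "$config"
}
```

On kafka-node01 this could be used as `myid_for_ip /opt/kafka/config/zookeeper.properties 10.0.0.71 > /data/zookeeper/myid`.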

1.5.3 Edit the Kafka configuration

  • kafka-node01

```bash
# Set the broker id
sed -ri 's#^(broker.id).*#\1=1#' /opt/kafka/config/server.properties
# Set the log directory
sed -ri 's#^(log.dirs).*#\1=/data/kafka-logs#' /opt/kafka/config/server.properties
# Point Kafka at the ZooKeeper ensemble (under the /kafka chroot)
sed -ri 's#^(zookeeper.connect=).*#\1kafka-node01:2181,kafka-node02:2181,kafka-node03:2181/kafka#' /opt/kafka/config/server.properties
# Copy the file to the other machines
scp /opt/kafka/config/server.properties kafka-node02:/opt/kafka/config/server.properties
scp /opt/kafka/config/server.properties kafka-node03:/opt/kafka/config/server.properties
```

  • kafka-node02

```bash
# Set the broker id
sed -ri 's#^(broker.id).*#\1=2#' /opt/kafka/config/server.properties
```

  • kafka-node03

```bash
# Set the broker id
sed -ri 's#^(broker.id).*#\1=3#' /opt/kafka/config/server.properties
```
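The `sed` substitutions only change a key that is already present in server.properties; if a key is missing or commented out, nothing is written. One possible way to cover both cases is sketched below. `set_prop` is a hypothetical helper, not a Kafka tool, and it assumes values contain no backslashes (awk would interpret them as escapes).

```bash
# Set KEY=VALUE in a properties file: replace the existing line, or append if absent.
set_prop() {
  local file="$1" key="$2" value="$3" tmp
  tmp="$(mktemp)" || return 1
  awk -v k="$key" -v v="$value" '
    index($0, k "=") == 1 { print k "=" v; found = 1; next }  # line starts with "KEY="
    { print }
    END { if (!found) print k "=" v }                          # key was absent: append it
  ' "$file" > "$tmp" && cat "$tmp" > "$file"   # cat keeps the original owner and mode
  rm -f "$tmp"
}
```

For instance, `set_prop /opt/kafka/config/server.properties broker.id 2` would do the same job as the `sed` line for kafka-node02. Matching on the literal `KEY=` prefix means `zookeeper.connect` does not touch `zookeeper.connection.timeout.ms`.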

1.5.4 Configure the JVM

Set the JVM heap sizes in the two start scripts. Reference values: ZooKeeper 3G; Kafka 4-8G; Elasticsearch (es) no more than half of the server's memory. After editing, the scripts should contain:

```bash
[root@kafka-node01 ~]# grep 'KAFKA_HEAP_OPTS' /opt/kafka/bin/zookeeper-server-start.sh
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx3G -Xms3G"

[root@kafka-node01 ~]# grep 'KAFKA_HEAP_OPTS' /opt/kafka/bin/kafka-server-start.sh
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx4G -Xms4G"
```
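The `if [ "x$KAFKA_HEAP_OPTS" = "x" ]` guard in the output means the scripts apply their built-in heap size only when the variable is empty, so a value exported by the caller takes precedence. The sketch below reproduces that guard in isolation to make the behaviour easy to see; `resolve_heap_opts` is a hypothetical name used only for illustration.

```bash
# Mimic the guard from the start scripts: use the caller's KAFKA_HEAP_OPTS if set,
# otherwise fall back to the default passed as the first argument.
resolve_heap_opts() {
  local default="$1"
  if [ "x${KAFKA_HEAP_OPTS:-}" = "x" ]; then
    echo "$default"
  else
    echo "$KAFKA_HEAP_OPTS"
  fi
}
```

In practice this suggests that setting the variable at launch, for example `KAFKA_HEAP_OPTS="-Xmx4G -Xms4G" kafka-server-start.sh -daemon /opt/kafka/config/server.properties`, is an alternative to editing the scripts, which would otherwise be overwritten on upgrade.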

1.5.5 Start ZooKeeper and Kafka

  • Option 1: start directly with the scripts

```bash
# Add the environment variables
cat > /etc/profile.d/kafka.sh <<EOF
KAFKA_HOME=/opt/kafka
PATH=\$PATH:/opt/kafka/bin
export KAFKA_HOME PATH
EOF
# Load the environment variables
source /etc/profile.d/kafka.sh
# Start the services (ZooKeeper first, then Kafka)
zookeeper-server-start.sh -daemon /opt/kafka/config/zookeeper.properties
kafka-server-start.sh -daemon /opt/kafka/config/server.properties
```
  • Option 2: start with systemctl

```bash
# Create the ZooKeeper unit file
cat > /usr/lib/systemd/system/zookeeper.service <<EOF
[Unit]
Description=Apache Zookeeper server
Documentation=http://zookeeper.apache.org
Requires=network.target remote-fs.target
After=network.target remote-fs.target

[Service]
Type=simple
User=root
Group=root
ExecStart=/opt/kafka/bin/zookeeper-server-start.sh /opt/kafka/config/zookeeper.properties
ExecStop=/opt/kafka/bin/zookeeper-server-stop.sh

[Install]
WantedBy=multi-user.target
EOF

# Create the Kafka unit file
cat > /usr/lib/systemd/system/kafka.service <<EOF
[Unit]
Description=Apache Kafka server
Documentation=http://kafka.apache.org/documentation.html
Requires=network.target remote-fs.target
After=network.target remote-fs.target zookeeper.service

[Service]
Type=simple
User=root
Group=root
ExecStart=/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
ExecStop=/opt/kafka/bin/kafka-server-stop.sh
# (file size)
LimitFSIZE=infinity
# (cpu time)
LimitCPU=infinity
# (virtual memory size)
LimitAS=infinity
# (locked-in-memory size)
LimitMEMLOCK=infinity
# (open files)
LimitNOFILE=64000
# (processes/threads)
LimitNPROC=64000

[Install]
WantedBy=multi-user.target
EOF

# Make systemd pick up the new unit files
systemctl daemon-reload

# Enable and start
systemctl enable --now zookeeper
systemctl enable --now kafka

# Check the status
systemctl status zookeeper
systemctl status kafka
```

Note: start ZooKeeper before Kafka, and stop Kafka before ZooKeeper.
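When the services are started from a script, the ordering rule can be enforced by waiting until ZooKeeper accepts connections before launching Kafka. The following is a minimal sketch of a generic retry helper; `wait_until` is a hypothetical name, and the readiness probe in the usage note is an assumption that relies on bash's `/dev/tcp` support.

```bash
# Retry a command about once per second until it succeeds or TIMEOUT attempts have failed.
wait_until() {
  local timeout_s="$1" elapsed=0
  shift
  until "$@"; do
    elapsed=$((elapsed + 1))
    # Give up once the number of failed attempts reaches the timeout
    [ "$elapsed" -ge "$timeout_s" ] && return 1
    sleep 1
  done
  return 0
}
```

One way to use it: `wait_until 60 bash -c 'exec 3<>/dev/tcp/127.0.0.1/2181' && kafka-server-start.sh -daemon /opt/kafka/config/server.properties`. An open port shows only that ZooKeeper is listening, not that the ensemble has elected a leader, so treat this as a coarse check.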

2. Common operations

2.1 Topic operations

```bash
# Run from /opt/kafka
# Create a topic
./bin/kafka-topics.sh --create --topic test-events --bootstrap-server kafka-node01:9092,kafka-node02:9092,kafka-node03:9092
# Describe a specific topic
./bin/kafka-topics.sh --describe --topic test-events --bootstrap-server kafka-node01:9092,kafka-node02:9092,kafka-node03:9092
# List all topics
./bin/kafka-topics.sh --list --bootstrap-server kafka-node01:9092,kafka-node02:9092,kafka-node03:9092
# Delete a topic
./bin/kafka-topics.sh --delete --topic test-events --bootstrap-server kafka-node01:9092,kafka-node02:9092,kafka-node03:9092
```
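Every topic command repeats the same long `--bootstrap-server` list. A small helper can build that string once; this is only a convenience sketch, and `bootstrap_servers` is a hypothetical function name.

```bash
# Join host names into a "host:port,host:port,..." bootstrap list.
bootstrap_servers() {
  local port="$1" out="" host
  shift
  for host in "$@"; do
    # Prefix a comma only when the list already has an entry
    out="${out:+$out,}${host}:${port}"
  done
  echo "$out"
}
```

It could then be used as `BS="$(bootstrap_servers 9092 kafka-node01 kafka-node02 kafka-node03)"` followed by `./bin/kafka-topics.sh --create --topic test-events --partitions 3 --replication-factor 3 --bootstrap-server "$BS"`. The partition and replication values here are illustrative; without those flags the broker defaults apply.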

2.2 Producer operations

    # Write events to the topic (one event per input line; press Ctrl-C to exit)
    ./bin/kafka-console-producer.sh --topic test-events --bootstrap-server localhost:9092
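The console producer reads events from standard input, so test data can be piped in rather than typed. Below is a small sketch of a generator for numbered test events; `gen_events` and the `event-N` payload format are made up for illustration.

```bash
# Print N numbered test events, one per line.
gen_events() {
  local n="$1" i=1
  while [ "$i" -le "$n" ]; do
    printf 'event-%d\n' "$i"
    i=$((i + 1))
  done
}
```

For example, `gen_events 100 | ./bin/kafka-console-producer.sh --topic test-events --bootstrap-server localhost:9092` should send 100 events and exit when the input ends.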