1. 安装
1.1 环境规划
| IP | Hostname | 部署服务 |
|---|---|---|
| 10.0.0.71 | kafka-node01 | Kafka,Zookeeper |
| 10.0.0.72 | kafka-node02 | Kafka,Zookeeper |
| 10.0.0.73 | kafka-node03 | Kafka,Zookeeper |
1.2 修改主机名称
分别修改三台主机的名称和/etc/hosts文件。
hostnamectl set-hostname kafka-node01hostnamectl set-hostname kafka-node02hostnamectl set-hostname kafka-node03# 修改hosts文件cat >> /etc/hosts << EOF10.0.0.71 kafka-node0110.0.0.72 kafka-node0210.0.0.73 kafka-node03EOF
1.3 安装JDK
# 安装jdk 1.11yum -y install java-11-openjdk-devel
1.5 部署Kafka集群
1.5.1 下载安装
# 下载wget https://mirrors.cnnic.cn/apache/kafka/3.1.0/kafka_2.13-3.1.0.tgz -P /opt/src/# 解压tar xvf /opt/src/kafka_2.13-3.1.0.tgz -C /opt/# 软连接ln -s /opt/kafka_2.13-3.1.0 /opt/kafka
1.4.2 修改Zookeeper配置文件
注意:每台机器的 myid 不同。
# 创建配置文件cat > /opt/kafka/config/zookeeper.properties << EOFtickTime=2000initLimit=10syncLimit=5dataDir=/data/zookeeperdatalogDir=/opt/kafka/logsclientPort=2181server.1=10.0.0.71:2888:3888server.2=10.0.0.72:2888:3888server.3=10.0.0.73:2888:3888EOF# 创建数据目录文件夹mkdir -p /data/zookeeper# 创建myid文件[root@kafka-node01 ~]# echo 1 > /data/zookeeper/myid[root@kafka-node02 ~]# echo 2 > /data/zookeeper/myid[root@kafka-node03 ~]# echo 3 > /data/zookeeper/myid
1.5.3 修改Kafka配置文件
修改log目录
sed -ri ‘s#^(log.dirs).*#\1=/data/kafka-logs#’ /opt/kafka/config/server.properties
修改Zookeeper集群
sed -ri ‘s#^(zookeeper.connect=).*#\1kafka-node01:2181,kafka-node02:2181,kafka-node03:2181/kafka#’ /opt/kafka/config/server.properties
拷贝文件到其他机器
scp /opt/kafka/config/server.properties kafka-node02:/opt/kafka/config/server.properties scp /opt/kafka/config/server.properties kafka-node03:/opt/kafka/config/server.properies
- kafka-node02```bash# 修改broker idsed -ri 's#^(broker.id).*#\1=2#' /opt/kafka/config/server.properties
- kafka-node03
# 修改broker idsed -ri 's#^(broker.id).*#\1=3#' /opt/kafka/config/server.properties
1.5.4 配置JVM
设置 jvm 参数。(参考值:zookeeper:3G,kafka:4-8G,es:不超过服务器内存的一半) ```bash [root@kafka-node01 ~]# grep ‘KAFKA_HEAP_OPTS’ /opt/kafka/bin/zookeeper-server-start.sh if [ “x$KAFKA_HEAP_OPTS” = “x” ]; then export KAFKA_HEAP_OPTS=”-Xmx3G -Xms3G”
[root@kafka-node01 ~]# grep ‘KAFKA_HEAP_OPTS’ /opt/kafka/bin/kafka-server-start.sh if [ “x$KAFKA_HEAP_OPTS” = “x” ]; then export KAFKA_HEAP_OPTS=”-Xmx4G -Xms4G”
<a name="k75mQ"></a>### 1.5.5 启动Zookeeper和Kafka- 方式一:脚本直接启动```bash# 添加环境变量cat > /etc/profile.d/kafka.sh <<EOFKAFKA_HOME=/opt/kafkaPATH=\$PATH:/opt/kafka/binexport KAFKA_HOME PATHEOF# 加载环境变量source /etc/profile.d/kafka.sh# 启动服务zookeeper-server-start.sh -daemon /opt/kafka/config/zookeeper.propertieskafka-server-start.sh -daemon /opt/kafka/config/server.properties
- 方式二:systemctl 启动
```bash
配置zookeeper启动文件
cat > /usr/lib/systemd/system/zookeeper.service <<EOF [Unit] Description=Apache Zookeeper server Documentation=http://zookeeper.apache.org Requires=network.target remote-fs.target After=network.target remote-fs.target
[Service] Type=simple User=root Group=root ExecStart=/opt/kafka/bin/zookeeper-server-start.sh /opt/kafka/config/zookeeper.properties ExecStop=/opt/kafka/bin/zookeeper-server-stop.sh
[Install] WantedBy=multi-user.target EOF
配置kafka启动文件
cat > /usr/lib/systemd/system/kafka.service <<EOF [Unit] Description=Apache Kafka server Documentation=http://kafka.apache.org/documentation.html Requires=network.target remote-fs.target After=network.target remote-fs.target zookeeper.service
[Service] Type=simple User=root Group=root ExecStart=/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties ExecStop=/opt/kafka/bin/kafka-server-stop.sh
(file size)
LimitFSIZE=infinity
(cpu time)
LimitCPU=infinity
(virtual memory size)
LimitAS=infinity
(locked-in-memory size)
LimitMEMLOCK=infinity
(open files)
LimitNOFILE=64000
(processes/threads)
LimitNPROC=64000 [Install] WantedBy=multi-user.target EOF
启动
systemctl enable —now zookeeper systemctl enable —now kafka
查看状态
systemctl status zookeeper systemctl status kafka
注意:启动时先启动zookeeper后启动kafka,停止时先停止kafka再停止zookeeper。
<a name="oi2ND"></a># 2. 常用操作<a name="QWxMN"></a>## 2.1 Topic相关操作```bash# 创建topic./bin/kafka-topics.sh --create --topic test-events --bootstrap-server kafka-node01:9092,kafka-node02:9092,kafka-node03:9092# 查看指定topic./bin/kafka-topics.sh --describe --topic test-events --bootstrap-server kafka-node01:9092,kafka-node02:9092,kafka-node03:9092# 列出所有topic./bin/kafka-topics.sh --list --bootstrap-server kafka-node01:9092,kafka-node02:9092,kafka-node03:9092# 删除topic./bin/kafka-topics.sh --delete --bootstrap-server kafka-node01:9092,kafka-node02:9092,kafka-node03:9092
2.2 生产者相关操作
# 写入event到topic./bin/kafka-console-producer.sh --topic test-events --bootstrap-server localhost:9092
