方式一:下载安装包并配置
解压:tar -xzvf ...
修改配置文件
修改 config/ 目录下的 server.properties 的内容
修改的内容(Kafka 的运行日志存放的路径): log.dirs=/usr/local/software/kafka_2.12-3.0.0/kafka-logs
配置环境变量
在 /etc/profile.d/my_env.sh 文件中增加 kafka 环境变量配置,
并执行命令刷新环境变量:source /etc/profile
#KAFKA_HOME
export KAFKA_HOME=/usr/local/software/kafka_2.12-3.0.0
export PATH=$PATH:$KAFKA_HOME/bin
server.properties 配置文件解读
#broker 的全局唯一编号,不能重复,只能是数字。
broker.id=0
#处理网络请求的线程数量
num.network.threads=3
#用来处理磁盘 IO 的线程数量
num.io.threads=8
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的缓冲区大小
socket.request.max.bytes=104857600
#kafka 运行日志(数据)存放的路径,路径不需要提前创建, kafka 自动帮你创建,可以配置多个磁盘路径,路径与路径之间可以用 "," 分隔
log.dirs=/opt/module/kafka/datas
#topic 在当前 broker 上的分区个数
num.partitions=1
#用来恢复和清理 data 下数据的线程数量
num.recovery.threads.per.data.dir=1
# 每个 topic 创建时的副本数,默认时 1 个副本
offsets.topic.replication.factor=1
#segment 文件保留的最长时间,超时将被删除
log.retention.hours=168
#每个 segment 文件的大小,默认最大 1G
log.segment.bytes=1073741824
#检查过期数据的时间,默认 5 分钟检查一次是否数据过期
log.retention.check.interval.ms=300000
#配置连接 Zookeeper 集群地址(在 zk 根目录下创建/kafka, 方便管理)
zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka
操作 Kafka
首先启动 Zookeeper,然后启动 Kafka。
启动 Kafka 服务器:bin/kafka-server-start.sh -daemon config/server.properties &
(后台运行)
关闭 Kafka 服务器:bin/kafka-server-stop.sh
注意:停止 Kafka 集群时,一定要等 Kafka 所有节点进程全部停止后再停止 Zookeeper集群。
因为 Zookeeper 集群当中记录着 Kafka 集群相关信息, Zookeeper 集群一旦先停止,Kafka 集群就没有办法再获取停止进程的信息,只能手动杀死 Kafka 进程了。