环境:Centos7 软件:kafka_2.11-2.4.1.tgz 安装步骤: 下载安装 配置 部署 验证
安装kafka前,需要安装zookeeper为其提供分布式服务。可参考以往文章Zookeeper-集群部署。
集群规划:
hadoop001 | hadoop002 | hadoop003 |
---|---|---|
zookeeper | zookeeper | zookeeper |
kafka | kafka | kafka |
下载安装
tar -zxvf kafka_2.11-2.4.1.tgz
mv kafka_2.11-2.4.1 kafka
配置
kafka目录下创建logs文件夹,用于保存kafka运行过程中产生的日志文件。
kafka/config/server.properties
broker.id 必须唯一,在hadoop002和hadoop003中分别修改即可
mkdir kafka/logs
cd kafka/config
vi server.properties
############################# Server Basics #############################
broker.id=0
# 监听地址
listeners=PLAINTEXT://hadoop001:9092
############################# Socket Server Settings #############################
# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
delete.topic.enable=true
# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3
# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
############################# Log Basics #############################
# A comma separated list of directories under which to store log files
log.dirs=/root/kafka/logs
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1
# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1
############################# Zookeeper #############################
# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=hadoop001:2181,hadoop002:2181,hadoop003:2181
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000
配置环境变量
export KAFKA_HOME=/root/kafka
export Path=$PATH:$KAFKA_HOME/bin
部署
分发程序
scp -r /root/kafka/ hadoop002:/root/
scp -r /root/kafka/ hadoop003:/root/
启动与停止
启动: kafka-server-start.sh server.properties
/root/kafka/bin/kafka-server-start.sh /root/kafka/config/server.properties &
启动成功:
停止:kafka-server-stop.sh stop
/root/kafka/bin/kafka-server-stop.sh stop
测试
kafka/bin/kafka-topics.sh --create --bootstrap-server hadoop001:9092 \
--replication-factor 3 \
--partitions 1 --topic my-replicated-topic
kafka/bin/kafka-topics.sh --describe --bootstrap-server hadoop001:9092 --topic my-replicated-topic
结果如图: