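Environment: CentOS 7. Software: kafka_2.11-2.4.1.tgz. Installation steps: download and install, configure, deploy, verify.
Before installing Kafka, install ZooKeeper, which provides the distributed coordination service Kafka depends on. See the earlier article "Zookeeper-集群部署" (ZooKeeper cluster deployment).
Cluster plan: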
| hadoop001 | hadoop002 | hadoop003 |
|---|---|---|
| zookeeper | zookeeper | zookeeper |
| kafka | kafka | kafka |
Download and install

    tar -zxvf kafka_2.11-2.4.1.tgz
    mv kafka_2.11-2.4.1 kafka
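Configuration
Create a logs folder under the kafka directory to hold the log files Kafka produces while running; log.dirs below points to it, so the topic partition data is stored there.
Edit kafka/config/server.properties.
broker.id must be unique for each broker, so change it on hadoop002 and hadoop003. The hostname in listeners must likewise match the host each broker runs on.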

    mkdir kafka/logs
    cd kafka/config
    vi server.properties

    ############################# Server Basics #############################
    broker.id=0
    # Listener address
    listeners=PLAINTEXT://hadoop001:9092

    ############################# Socket Server Settings #############################
    # Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
    #listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
    delete.topic.enable=true
    # The number of threads that the server uses for receiving requests from the network and sending responses to the network
    num.network.threads=3
    # The number of threads that the server uses for processing requests, which may include disk I/O
    num.io.threads=8
    # The send buffer (SO_SNDBUF) used by the socket server
    socket.send.buffer.bytes=102400
    # The receive buffer (SO_RCVBUF) used by the socket server
    socket.receive.buffer.bytes=102400
    # The maximum size of a request that the socket server will accept (protection against OOM)
    socket.request.max.bytes=104857600

    ############################# Log Basics #############################
    # A comma separated list of directories under which to store log files
    log.dirs=/root/kafka/logs
    # The default number of log partitions per topic. More partitions allow greater
    # parallelism for consumption, but this will also result in more files across
    # the brokers.
    num.partitions=1
    # The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
    # This value is recommended to be increased for installations with data dirs located in RAID array.
    num.recovery.threads.per.data.dir=1

    ############################# Zookeeper #############################
    # Zookeeper connection string (see zookeeper docs for details).
    # This is a comma separated host:port pairs, each corresponding to a zk
    # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
    # You can also append an optional chroot string to the urls to specify the
    # root directory for all kafka znodes.
    zookeeper.connect=hadoop001:2181,hadoop002:2181,hadoop003:2181
    # Timeout in ms for connecting to zookeeper
    zookeeper.connection.timeout.ms=6000

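Since broker.id and the listener hostname differ per node, the per-host edit can be scripted instead of done by hand in vi. The following is only a sketch under assumptions: the helper name make_broker_config is hypothetical, and so are the ids 1 and 2 for hadoop002 and hadoop003 (any unique integers would do).

```shell
# Hypothetical helper: write a per-broker copy of server.properties.
# Arguments: <template file> <broker id> <hostname> <output file>
make_broker_config() {
    template="$1"
    id="$2"
    host="$3"
    out="$4"
    # Rewrite only the active (uncommented) broker.id and listeners lines;
    # every other setting is copied through unchanged.
    sed -e "s|^broker\.id=.*|broker.id=${id}|" \
        -e "s|^listeners=.*|listeners=PLAINTEXT://${host}:9092|" \
        "$template" > "$out"
}
```

For example, `make_broker_config /root/kafka/config/server.properties 1 hadoop002 /tmp/server-hadoop002.properties` would produce a file that could then be copied over the config on hadoop002.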
Configure environment variables

    export KAFKA_HOME=/root/kafka
    export PATH=$PATH:$KAFKA_HOME/bin
Deployment
Distribute the program

    scp -r /root/kafka/ hadoop002:/root/
    scp -r /root/kafka/ hadoop003:/root/
Start and stop
Start: kafka-server-start.sh server.properties

    /root/kafka/bin/kafka-server-start.sh /root/kafka/config/server.properties &

Successful startup (screenshot):


Stop: kafka-server-stop.sh stop

    /root/kafka/bin/kafka-server-stop.sh stop
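The start and stop commands above have to be run on each of the three nodes. A minimal sketch of doing that from one machine follows. It assumes passwordless SSH to every host, Kafka installed at /root/kafka on each, and java reachable in a non-interactive SSH session; the function name kafka_cluster and the KAFKA_HOSTS variable are hypothetical. It uses the -daemon option of kafka-server-start.sh in place of a trailing &, so the broker keeps running after the SSH session closes.

```shell
# Hypothetical helper: start or stop the broker on every node over SSH.
# Assumes passwordless SSH and Kafka installed at /root/kafka on each host.
KAFKA_HOSTS="hadoop001 hadoop002 hadoop003"

kafka_cluster() {
    action="$1"
    for host in $KAFKA_HOSTS; do
        case "$action" in
            start)
                # -daemon runs the broker in the background, detached from the SSH session
                ssh "$host" "/root/kafka/bin/kafka-server-start.sh -daemon /root/kafka/config/server.properties"
                ;;
            stop)
                # The stop script is called without arguments here; it does not read any
                ssh "$host" "/root/kafka/bin/kafka-server-stop.sh"
                ;;
            *)
                echo "usage: kafka_cluster start|stop" >&2
                return 1
                ;;
        esac
    done
}
```

Usage would be `kafka_cluster start` or `kafka_cluster stop`.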
Test

    kafka/bin/kafka-topics.sh --create --bootstrap-server hadoop001:9092 \
        --replication-factor 3 \
        --partitions 1 --topic my-replicated-topic

    kafka/bin/kafka-topics.sh --describe --bootstrap-server hadoop001:9092 --topic my-replicated-topic

Result (screenshot):


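Beyond describing the topic, a quick end-to-end check is to publish one message and read it back with the console tools shipped in kafka/bin. This is a sketch only: the function name kafka_smoke_test and the KAFKA_BIN variable are hypothetical, and it assumes all brokers are running. Note that the console producer in this Kafka version takes --broker-list.

```shell
# Hypothetical smoke test: publish one message and read it back.
# Assumes the brokers are running and KAFKA_BIN holds the Kafka 2.4.1 scripts.
KAFKA_BIN="${KAFKA_BIN:-/root/kafka/bin}"

kafka_smoke_test() {
    topic="${1:-my-replicated-topic}"
    # Send a single message read from standard input
    echo "hello kafka" | "$KAFKA_BIN/kafka-console-producer.sh" \
        --broker-list hadoop001:9092 --topic "$topic"
    # Read from the start of the topic; stop after one message or 10 seconds
    "$KAFKA_BIN/kafka-console-consumer.sh" \
        --bootstrap-server hadoop001:9092 --topic "$topic" \
        --from-beginning --max-messages 1 --timeout-ms 10000
}
```

Running `kafka_smoke_test` with no argument targets the my-replicated-topic topic created above; if the cluster is healthy, the consumer should print the message that was just sent.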