Environment: CentOS 7. Software: kafka_2.11-2.4.1.tgz. Installation steps: download and install, configure, deploy, verify.

Before installing Kafka, install ZooKeeper, which provides the coordination service Kafka depends on. See the earlier article on ZooKeeper cluster deployment.
Cluster plan:

hadoop001 hadoop002 hadoop003
zookeeper zookeeper zookeeper
kafka kafka kafka

Download and Install

Download kafka_2.11-2.4.1.tgz from the official Kafka website.

  tar -zxvf kafka_2.11-2.4.1.tgz
  mv kafka_2.11-2.4.1 kafka

Configuration

Create a logs folder under the kafka directory. The server.properties below points log.dirs at it, so it will hold the topic partition data (Kafka's commit log), not only runtime log files.

kafka/config/server.properties

broker.id must be unique within the cluster; change it on hadoop002 and hadoop003 accordingly. The listeners address is host-specific too, so it must also be adjusted on each node.

  mkdir kafka/logs
  cd kafka/config
  vi server.properties

  ############################# Server Basics #############################
  broker.id=0
  # Listener address
  listeners=PLAINTEXT://hadoop001:9092
  ############################# Socket Server Settings #############################
  # Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
  #listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
  delete.topic.enable=true
  # The number of threads that the server uses for receiving requests from the network and sending responses to the network
  num.network.threads=3
  # The number of threads that the server uses for processing requests, which may include disk I/O
  num.io.threads=8
  # The send buffer (SO_SNDBUF) used by the socket server
  socket.send.buffer.bytes=102400
  # The receive buffer (SO_RCVBUF) used by the socket server
  socket.receive.buffer.bytes=102400
  # The maximum size of a request that the socket server will accept (protection against OOM)
  socket.request.max.bytes=104857600
  ############################# Log Basics #############################
  # A comma separated list of directories under which to store log files
  log.dirs=/root/kafka/logs
  # The default number of log partitions per topic. More partitions allow greater
  # parallelism for consumption, but this will also result in more files across
  # the brokers.
  num.partitions=1
  # The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
  # This value is recommended to be increased for installations with data dirs located in RAID array.
  num.recovery.threads.per.data.dir=1
  ############################# Zookeeper #############################
  # Zookeeper connection string (see zookeeper docs for details).
  # This is a comma separated host:port pairs, each corresponding to a zk
  # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
  # You can also append an optional chroot string to the urls to specify the
  # root directory for all kafka znodes.
  zookeeper.connect=hadoop001:2181,hadoop002:2181,hadoop003:2181
  # Timeout in ms for connecting to zookeeper
  zookeeper.connection.timeout.ms=6000
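For reference, only two lines should differ between the three brokers once the files are in place. The id assignment below (0/1/2) is this guide's assumption; any mapping works as long as every broker.id is unique:

```
# hadoop002: kafka/config/server.properties
broker.id=1
listeners=PLAINTEXT://hadoop002:9092

# hadoop003: kafka/config/server.properties
broker.id=2
listeners=PLAINTEXT://hadoop003:9092
```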

Note: if the broker fails to start with an error about broker.id (for example, a mismatch with the id recorded in the data directory's meta.properties), change broker.id as the error message suggests.

Configure environment variables

Append to /etc/profile (or ~/.bashrc) on each node, then apply with source /etc/profile:

  export KAFKA_HOME=/root/kafka
  export PATH=$PATH:$KAFKA_HOME/bin

Deployment

Distribute the installation. (If a broker has already been started once, remove kafka/logs before copying, so the broker.id recorded in its meta.properties does not travel with it.)

  scp -r /root/kafka/ hadoop002:/root/
  scp -r /root/kafka/ hadoop003:/root/
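The per-host edits after scp can be scripted. This is a sketch, not part of the original guide: the host-to-id mapping (hadoop001 to 0, hadoop002 to 1, hadoop003 to 2) and the use of GNU sed's -i flag are assumptions. The example below runs against a scratch file so it can be tried safely:

```shell
# Hypothetical fixup: give each broker a unique broker.id and its own
# listener address after the install has been copied to a new host.
fix_broker_conf() {
  local conf=$1 host=$2 id
  case "$host" in                 # assumed host-to-id mapping
    hadoop001) id=0 ;;
    hadoop002) id=1 ;;
    hadoop003) id=2 ;;
    *) echo "unknown host: $host" >&2; return 1 ;;
  esac
  sed -i "s/^broker\.id=.*/broker.id=$id/" "$conf"               # GNU sed
  sed -i "s#^listeners=.*#listeners=PLAINTEXT://$host:9092#" "$conf"
}

# Try it on a scratch copy first:
printf 'broker.id=0\nlisteners=PLAINTEXT://hadoop001:9092\n' > /tmp/sp.test
fix_broker_conf /tmp/sp.test hadoop002
grep -E '^(broker.id|listeners)=' /tmp/sp.test
# broker.id=1
# listeners=PLAINTEXT://hadoop002:9092
```

On the real hosts the call would be fix_broker_conf /root/kafka/config/server.properties "$(hostname)".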

Start and Stop

Start (run on every node): kafka-server-start.sh server.properties. The trailing & backgrounds the shell job; alternatively, the script's -daemon option detaches the broker from the terminal.

  /root/kafka/bin/kafka-server-start.sh /root/kafka/config/server.properties &

If startup succeeds, jps lists a Kafka process on each of the three nodes.
Stop: kafka-server-stop.sh. The script takes no arguments; it locates the broker process and sends it SIGTERM.

  /root/kafka/bin/kafka-server-stop.sh

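With passwordless ssh between the nodes, starting and stopping all three brokers from one machine can be wrapped in a small helper. This is a sketch under that assumption (host names and paths as above); a DRY_RUN switch prints the commands instead of running them, which is handy for checking the loop first:

```shell
# Hypothetical cluster helper: start or stop Kafka on every node.
kf_cluster() {
  local action=$1 h cmd
  for h in hadoop001 hadoop002 hadoop003; do
    case "$action" in
      start) cmd='/root/kafka/bin/kafka-server-start.sh -daemon /root/kafka/config/server.properties' ;;
      stop)  cmd='/root/kafka/bin/kafka-server-stop.sh' ;;
      *) echo "usage: kf_cluster start|stop" >&2; return 1 ;;
    esac
    if [ "${DRY_RUN:-0}" = 1 ]; then
      echo "ssh $h $cmd"          # dry run: just show the command
    else
      ssh "$h" "$cmd"
    fi
  done
}

DRY_RUN=1 kf_cluster start        # prints the three ssh commands
```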

Test

  kafka/bin/kafka-topics.sh --create --bootstrap-server hadoop001:9092 \
    --replication-factor 3 \
    --partitions 1 --topic my-replicated-topic

  kafka/bin/kafka-topics.sh --describe --bootstrap-server hadoop001:9092 --topic my-replicated-topic

The --describe output should show one partition with replication factor 3, listing the partition's leader broker, its replicas, and the in-sync replica (ISR) set across the three brokers.
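As a further smoke test (not in the original write-up), a message can be pushed through the console producer and read back with the console consumer; this requires the running cluster. Note that in Kafka 2.4 the console producer still takes --broker-list rather than --bootstrap-server:

```shell
# Produce one message to the replicated topic...
echo "hello kafka" | /root/kafka/bin/kafka-console-producer.sh \
  --broker-list hadoop001:9092 --topic my-replicated-topic

# ...then read it back from the beginning of the topic.
/root/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server hadoop001:9092 --topic my-replicated-topic \
  --from-beginning --max-messages 1
```

If the consumer prints the message even when hadoop001's broker is later stopped (point the commands at another broker), the replication factor of 3 is doing its job.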