date: 2021-11-21title: kafka集群部署 #标题
tags: kafka集群部署 #标签
categories: MQ # 分类

本文用于记录下从0到1搭建一个贴近生产的kafka集群。

环境准备

OS hostname IP
CentOS 7.7 kafka1 192.168.20.2
CentOS 7.7 kafka2 192.168.20.3
CentOS 7.7 kafka3 192.168.20.4

配置hosts文件

  1. $ cat >> /etc/hosts << EOF # 配置hosts文件,以便可以相互解析
  2. > 192.168.20.2 kafka1
  3. > 192.168.20.3 kafka2
  4. > 192.168.20.4 kafka3
  5. > EOF
  6. # 将hosts文件分发到其他节点
  7. $ for i in kafka{2..3};do rsync -avz /etc/hosts ${i}:/etc/;done

配置zookeeper集群

不管kafka是单机部署也好,群集也好,其都依托于zookeeper(接下来简称为zk),每个kakfa节点(以下简称为broker)都会到zk上进行注册,所以,要有一个kafka集群,必须现配置zk集群。

至于kafka和zk的关系,网上有大量的文档,这里就不重复造轮子了。

接下来,没有特别说明的,只在kafka1上进行配置。

下载

  1. # kafka的二进制包囊括了zk,所以直接下载kafka即可。
  2. $ cd /opt/
  3. $ wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.5.0/kafka_2.13-2.5.0.tgz
  4. # 解压
  5. $ tar zxf kafka_2.13-2.5.0.tgz
  6. $ mv kafka_2.13-2.5.0 kafka

配置zk

  1. $ mkdir /data/zk -p # 创建用于存放数据的目录
  2. # 修改配置文件
  3. $ pwd
  4. /opt/kafka
  5. $ rm -rf bin/windows/ # 删除windows中使用的脚本
  6. $ egrep -v '^#|^$' config/zookeeper.properties # 修改后的配置文件如下
  7. # 注意每一行配置后不要有空格,否则启动时可能会报错
  8. dataDir=/data/zk
  9. clientPort=2181
  10. maxClientCnxns=0
  11. tickTime=2000
  12. initLimit=10
  13. syncLimit=5
  14. quorumListenOnAllIPs=true
  15. server.1=kafka1:2888:3888
  16. server.2=kafka2:2888:3888
  17. server.3=kafka3:2888:3888
  18. $ echo 1 > /data/zk/myid # 设置zk的id

上述配置的解释:

  • dataDir:zk数据存放目录
  • clientPort:client连接zk时需要指定的端口
  • maxClientCnxns:该参数表示允许客户端最大连接数。如果设置为0则表示不做限制。
  • tickTime:该参数表示Zookeeper服务之间进行心跳监测的间隔时间,单位是毫秒。设置为2000,表示每隔2秒,Zookeeper服务器之间会进行一次心跳监测。
  • initLimit:该参数表示Zookeeper集群中的Follower在启动时需要在多少个心跳时间内从Leader同步数据。设置为10,表示要在10个心跳时间内,也就是在20秒内,要完成Leader数据的同步(如果数据量大,可以适当的将此指调大)。
  • syncLimit:该参数表示超过多少个心跳时间收不到Follower的响应,Leader就认为此Follower已经下线。设置为5,表示在5个心跳时间内,也就是判断Follower是否存活的响应时间是10秒。
  • quorumListenOnAllIPs:该参数设置为true,配置为true可以避免入坑(尤其是多网卡主机),Zookeeper服务器将监听所有可用IP地址的连接。他会影响ZAB协议和快速Leader选举协议。默认是false。
  • server.x=IP:Port1:Port2:指定zk节点列表。

    • x:表示节点的编号,此编号需要写入到zk数据存放目录下,以myid命名的文件。
    • IP:可以指定IP也可以是主机名。
    • Port1表示该Zookeeper集群中的Follower节点与Leader节点通讯时使用的端口。作为Leader时监听该端口。
    • Port2表示选举新的Leader时,Zookeeper节点之间互相通信的端口,比如当Leader挂掉时,其余服务器会互相通信,选出新的Leader。Leader和Follower都会监听该端口。

至此,第一台主机的zk配置完成,但是现在先不启动,我们先接着把第一台主机的kafka也配置好。

配置kafka

  1. $ cat config/server.properties # kafka配置文件如下
  2. broker.id=0
  3. listeners=PLAINTEXT://:9092
  4. auto.create.topics.enable=true
  5. delete.topic.enable=true
  6. num.network.threads=15
  7. num.io.threads=30
  8. socket.send.buffer.bytes=102400
  9. socket.receive.buffer.bytes=102400
  10. socket.request.max.bytes=104857600
  11. log.dirs=/data/kafka/log
  12. num.partitions=3
  13. num.recovery.threads.per.data.dir=3
  14. default.replication.factor=2
  15. num.replica.fetchers=2
  16. log.retention.hours=168
  17. log.segment.bytes=1073741824
  18. log.retention.check.interval.ms=300000
  19. zookeeper.connect=kafka1:2181,kafka2:2181,kafka3:2181
  20. zookeeper.connection.timeout.ms=10000
  21. group.initial.rebalance.delay.ms=0
  22. $ mkdir /data/kafka/log -p # 创建所需目录

kafka重要配置

kafka有以下一些参数,需要重点关注下,才能搭建出一个强壮的kafka集群。

  • auto.create.topics.enable:该配置项默认值是true,但在生产环境最好设置为false。这样可以控制创建Topic的人以及创建时间。
  • background.threads:该配置项默认值是10,既整个Kafka在执行各种任务时会启动的线程数。如果你的CPU很强劲,那么可以将线程数设大一点。
  • delete.topic.enable:该配置项默认值是false,可以根据实际需求改变,在生产环境还是建议保持默认值,这样至少不会出现Topic被误删的情况。
  • log.flush.interval.messages:该配置项最好保持默认值,把这个任务交给操作系统的文件系统去处理。
  • log.retention.hours:日志文件保留的时间默认是168小时,即7天。这个配置可以根据具体业务需求而定。
  • message.max.bytes:每条Message或一批次Message的大小默认是1MB。这个配置也要根据具体需求而定,比如带宽的情况。
  • min.insync.replicas:该配置项的默认值是1,既在acks=all时,最少得有一个Replica进行确认回执。建议在生产环境配置为2,保证数据的完整性。
  • num.io.threads:处理I/O操作的线程数,默认是8个线程。如果觉得在这个环节达到了瓶颈,那么可以适当调整该参数。
  • num.network.threads:处理网络请求和响应的线程数,默认是3个线程。如果觉得在这个环节达到了瓶颈,那么可以适当调整该参数。
  • num.recovery.threads.per.data.dir:每个数据目录启用几个线程来处理,这里的线程数和数据目录数是乘积关系,并且只在Broker启动或关闭时使用。默认值是1,根据实际情况配置数据目录数,从而判断该配置项应该如何设置。
  • num.replica.fetchers:该配置项影响Replicas同步数据的速度,默认值是1,如果发现Replicas同步延迟较大,可以提升该配置项。
  • offsets.retention.minutes:Offset保留的时间,默认值是1440,既24小时。在生产环境建议将该配置项设大一点,比如设置为1个月,保证消费数据的完整性。
  • unclean.leader.election.enable:该配置项的作用是,指定是否可以将非ISR的Replicas选举为Leader,默认值为false。在生产环境建议保持默认值,防止数据丢失。
  • zookeeper.session.timeout.ms:Zookeeper会话超时时间,默认值为6000。按实际情况而定,通常情况下保持60秒即可。
  • default.replication.factor:默认Replication Factor为1,建议设置为2或者3,以保证数据完整性和整个集群的健壮性。
  • num.partitions:Topic默认的Partition数,默认是1,建议设置为3或者6,以保证数据完整性和整个集群的健壮性。
  • group.initial.rebalance.delay.ms:当Consumer Group新增或减少Consumer时,重新分配Topic Partition的延迟时间。
    以上是比较重要,需要我们根据实际情况额外关注的配置项。

配置其他节点

现在将kafka1主机上修改好的配置文件都发送到其余两节点,然后修改冲突。

  1. $ for i in kafka{2..3};do rsync -avz /opt/kafka ${i}:/opt/;done
  2. $ for i in kafka{2..3};do rsync -avz /data ${i}:/;done
  3. # 注,我这里的data目录下只有kafka和zk所用的东西,请结合实际来分发

配置kafka2

  1. $ echo 2 > /data/zk/myid # 修改zk的myid
  2. # 修改kakfa的id号
  3. $ sed -i 's#broker.id=0#broker.id=1#g' /opt/kafka/config/server.properties

配置kafka3

同样是修改冲突即可。

  1. $ echo 3 > /data/zk/myid
  2. $ sed -i 's#broker.id=0#broker.id=2#g' /opt/kafka/config/server.properties

启动各个程序

启动zk

各个节点都需要执行以下命令

  1. # 启动zk
  2. $ cd /opt/kafka/bin
  3. $ ./zookeeper-server-start.sh ../config/zookeeper.properties &
  4. $ ss -lnpt | egrep '2181|3888|2888' # 只有leader才会监听2888端口
  5. LISTEN 0 50 :::2181 :::* users:(("java",pid=58948,fd=118))
  6. LISTEN 0 50 :::2888 :::* users:(("java",pid=58948,fd=130))
  7. LISTEN 0 50 :::3888 :::* users:(("java",pid=58948,fd=127))
  8. [

通过zk CLI验证

我们还可以通过Zookeeper Client连接到集群来检验。我们选择任意一台服务器,首先连接kafka1主机。

  1. $ ./zookeeper-shell.sh kafka1:2181 # 连接到kafka1
  2. create /my_zNode "some data" # 连接成功后,创建一个zNode
  3. Created /my_zNode
  4. ls / # 查看节点中所有znode
  5. [my_zNode, zookeeper]
  6. $ ./zookeeper-shell.sh kafka2:2181 # 再连接到kaka2
  7. ls / # 同样可以查看到my_zNode
  8. [my_zNode, zookeeper]
  9. get /my_zNode # 查看my_zNode中的数据
  10. some data
  11. $ ./zookeeper-shell.sh kafka3:2181 # 再连接到kafka3节点
  12. ls / # 查看
  13. [my_zNode, zookeeper]
  14. get /my_zNode # 获取数据
  15. some data
  16. set /my_zNode "new data" # 修改my_zNode中的数据
  17. get /my_zNode # 确认数据已修改
  18. new data
  19. $ ./zookeeper-shell.sh kafka1:2181 # 再连接到kafka1,确定数据修改
  20. get /my_zNode
  21. new data

上面的过程虽然比较繁琐,但是充分说明了我们的Zookeeper集群是搭建成功的。无论从哪个Zookeeper节点创建的zNode,都可以同步到集群中的其他节点。无论从哪个Zookeeper节点修改的zNode中的数据,也可以同步到起群中的其他节点。

启动kafka

以下指令需要在所有节点都执行。

  1. $ cd /opt/kafka/bin/
  2. $ ./kafka-server-start.sh -daemon ../config/server.properties

kafka常用指令

当三个节点都启动后,并且确认9092端口在监听,那么就可以执行下面的指令,来测试kafka是否正常了。

  1. # 显示topic列表
  2. bin/kafka-topics.sh --zookeeper kafka1:2181,kafka2:2181,kafka3:2181 --list
  3. 也可以从一个节点上查看。下面简写查看一个节点。
  4. # 创建一个topic,并指定topic属性(副本数、分区数等)
  5. bin/kafka-topics.sh --create --zookeeper kafka1:2181 --replication-factor 1 --partitions 3 --topic test
  6. # --partitions(分区)应等于或大于消费者,--replication-factor(副本数)不能大于kafka集群内主机节点
  7. # 查看某个topic的状态
  8. bin/kafka-topics.sh --zookeeper kafka1:2181 --topic test --describe
  9. # 生产消息
  10. bin/kafka-console-producer.sh --broker-list kafka1:9092 --topic test
  11. # 消费消息
  12. bin/kafka-console-consumer.sh --bootstrap-server PLAINTEXT://kafka1:9092 --topic test
  13. # 查看实时消息,如果从头看可在后面加 --from-beginning
  14. kafka-console-consumer.sh --bootstrap-server kafka1:9092 --topic test --from-beginning
  15. # 删除topic
  16. bin/kafka-topics.sh --delete --zookeeper kafka1:2181 --topic test

安装kafka管理工具

kafka有一个管理工具叫kafka-manager,它支持管理多个集群、选择副本、副本重新分配以及创建Topic。同时,这个管理工具也是一个非常好的可以快速浏览这个集群的工具。

kafka manager有如下功能:

  • 管理多个kafka集群
  • 便捷的检查kafka集群状态(topics,brokers,备份分布情况,分区分布情况)
  • 选择你要运行的副本
  • 基于当前分区状况进行
  • 可以选择topic配置并创建topic(0.8.1.1和0.8.2的配置不同)
  • 删除topic(只支持0.8.2以上的版本并且要在broker配置中设置delete.topic.enable=true)
  • Topic list会指明哪些topic被删除(在0.8.2以上版本适用)
  • 为已存在的topic增加分区
  • 为已存在的topic更新配置
  • 在多个topic上批量重分区
  • 在多个topic上批量重分区(可选partition broker位置)

强烈建议使用docker来部署这个kafka-manager工具,编译安装的坑实在跳不过去,也不纠结了,doker一条命令搞定。如下:

  1. $ docker run -itd --rm -p 9000:9000 -e ZK_HOSTS="192.168.20.2:2181,192.168.20.3:2181,192.168.20.4:2181" -e APPLICATION_SECRET=letmein sheepkiller/kafka-manager

此项目托管在github上,可以参阅github

docker启动后,访问9000端口,界面如下:

kafka集群部署 - 图1

关于kafka-manager的配置使用,可以参阅详细解析 kafka manager 的使用

kafka开启jmx polling

在使用kafka-amnager时,有一个功能为Enable JMX Polling,该部分直接影响部分 kafka broker 和 topic 监控指标指标的获取,那么如何开启此功能呢?方法有两种。如下:

方法1:

启动kafka时增加JMX_PORT=9988。如下:

  1. $ JMX_PORT=9988 ./kafka-server-start.sh -daemon ../config/server.properties

方法2:

修改kafka-run-class.sh脚本,第一行增加JMX_PORT=9988即可。

不管是哪种方法,都只是定义了一个变量而已,剩下的事情交给程序去做就好。

附加:k8s部署kafka-manager

在k8s中部署kafka-manager的yml文件如下(可根据实际情况来修改):

  1. apiVersion: v1
  2. kind: Service
  3. metadata:
  4. name: iov-sjjh-kafka-manager
  5. namespace: iov-sjjh
  6. labels:
  7. name: iov-sjjh-kafka-manager
  8. spec:
  9. type: NodePort
  10. ports:
  11. - name: service
  12. port: 9000
  13. nodePort: 30200
  14. targetPort: 9000
  15. selector:
  16. name: iov-sjjh-kafka-manager
  17. ---
  18. apiVersion: apps/v1beta1
  19. kind: Deployment
  20. metadata:
  21. name: iov-sjjh-kafka-manager
  22. namespace: iov-sjjh
  23. spec:
  24. replicas: 1
  25. template:
  26. metadata:
  27. labels:
  28. name: iov-sjjh-kafka-manager
  29. spec:
  30. containers:
  31. - name: iov-sjjh-kafka-manager
  32. image: sheepkiller/kafka-manager:latest
  33. imagePullPolicy: IfNotPresent
  34. ports:
  35. - containerPort: 9000
  36. resources:
  37. limits:
  38. memory: 1G
  39. cpu: 2
  40. requests:
  41. memory: 1G
  42. cpu: 2
  43. env:
  44. - name: "ZK_HOSTS"
  45. value: "172.16.2.74:2181,172.16.2.75:2181,172.16.2.76:2181"
  46. - name: "KAFKA_MANAGER_AUTH_ENABLED"
  47. value: "true"
  48. - name: "KAFKA_MANAGER_USERNAME"
  49. value: "iov_sjjh"
  50. - name: "KAFKA_MANAGER_PASSWORD"
  51. value: "iov_sjjh123"