前言

该文档主要是提供一些安装的参考,帮住开发人员以及运维人员能快速的很好的安装kafka,方便后面的开发测试与发布。

一 集群环境规划

先设计好,申请好资源,才能实践,这是我们一般公司的工作流程。

1.1 OS的选型

这块其实没啥可选的,就是linux ,linux ,linux,重要的事情说三遍,目前我已经很少看到windows做服务器的。但是作为一个技术人员要态度严谨的阐述其中的因果关系。

1.1.1 I/O 模型的使用

目前主流的I/O模型 有五种:阻塞 I/O、非阻塞 I/O、I/O多路复用、信号驱动 I/O 和异步 I/O
linux epoll(比select模型高级 底层优化了select 函数,取消了轮询机制,取而代之的是回调机制)属于 I/O多路复用模型。
window IOCP 模型 属于真正意义上的异步 I/O模型。
kafka clients 底层采用 Java 的 Selector 机制 ,他在linux上使用的是epoll ,而在windows平台上,Java NIO 的 Selector 底层是使用 select 模型而非IOCP 实现的(Java NI02 才是使用 IOCP 实现),所以linux上部署kafka要比windw性能高些(epoll>select)

1.1.2 数据网络传输效率

kafka 调用 linux 的sendfile的Zero Copy(零拷贝)技术
虽然windows也提供了TransmitFiie 函数来支持Zero Copy(零拷贝)技术,单是jdk8u60(FiieChannel 的 transferTo 方法调用读函数)才支持。
具体详情参考:JDK bug:http://bugs.java.com/view_bug.do?bug_id=8064407
如果是使用jdk7的项目显然无法利用零拷贝技术提升性能。
综合以上两点差异以及目前主流服务器通常在 Linux 上部署的事实,强烈推荐 Kafka 的生产环境集群首选 Linux 操作系统。当然至于是 Linux 的哪个发行版,者熟悉的即可。

1.2 磁盘的选型

如果问哪个因素对 Kafka 性能最重要?磁盘无疑是排名靠前的答案。众所周知,Kafka是大量使用磁盘的。
SSD 通常有着极低的寻道时间(seek time) 和存取时间 (access time),性能上的优势很大,但同时也有着非常高的成本。因此在规划 Kafka 线上环境时,就需要根据公司自身的实际条件进行有针对性的选型。

1.3 磁盘容量的规划

这是一个非常经典的规划问题。如前所述,Kafka的每条消息都保存在实际的物理磁盘中,这些消息默认会被broker保存一段时间之后清除。这 段时间是可以配置的,因此我们可以根据自身实际业务场景和存储需求来大致计算线上环境所需的磁盘容量。
kafka主要考虑五个因素

  • 新增消息数 [1亿条数据/天]
  • 消息留存时间 [7天]
  • 平均消息大小 [平均每条1kb]
  • 副本数 [2]
  • 是否启用压缩 [0.5压缩比]
    求案例结果值:

1亿X1kb/1000/1000=200G (每天的数据) ==> 200X(1+10%)=210G(10%是预留空间) ==> 210X7=1.5T(7天的容量) ==> 1.5X0.5=0.75T (压缩比是0.5)
当然上面只是一个计算方式,我们可以根据自己的业务进行调整数值即可。

1.4 内存规划

很多人的误区,觉得内存没有啥必要,因为kafka的数据都是写到磁盘上的,其实不然,kafka之所以有较高的吞吐量是因为他很好的利用页缓存(page cache)。
Kafka 虽然会持久化每条消息,但其实这个工作都是底层的文件系统来完成的,Kafka仅仅将消息写入page cache而己,之后将消息“冲刷”到磁盘的任务完全交由操作系统来完成。另外consumer在读取消息时也会首先尝试从该区域中查找,如果直接命中则完全不用执行耗 时的物理I/O操作,从而提升了consumer 的整体性能。不论是缓冲己发送消息还是待读取消 息,操作系统都要先幵辟一块内存区域用于存放接收的Kafka 消息,因此这块内存区域大小的 设置对于Kafka 的性能就显得尤为关键了。
kafka 对于 Java 堆内存的使用反而不是很多,因为 Kafka 中的消息通 常都属于“朝生夕灭”的对象实例,可以很快地垃圾回收(GC) 。一般情况下,broker所需的堆内存都不会超过6GB。所以对于一台16GB内存的机器而言,文件系统page cache的大小甚至可以达到10GB!
除以上这些考量之外,用户还需要把 page cache 大小与实际线上环境中设置的日志段大小 相比较。假设单个日志段文件大小设置为 10GB, 那么你至少应该给予 page cache 10GB 以上的内存空间。这样,待消费的消息有很大概率会保 存在页缓存中,故consumer能够直接命中页缓存而无须执行缓慢的磁盘 I/O 读操作。

总之对于内存规划的建议如下。

  • 尽量分配更多的内存给操作系统的 page cache。
  • 不要为broker设置过大的堆内存,最好不超过 6GB。
  • page cache 大小至少要大于一个日志段的大小。

    1.5 CPU规划

    比起磁盘和内存,CPU 于 Kafka 而言并没有那么重要— 严格来说,Kafka 不属于计算密集型(CPU七ound) 的系统,因此对于 CPU 需要记住一点就可以了:追求多核而非高时钟频 率。

简单来说,Kafka 的机器有 16 个 CPU 核这件事情比该机器 CPU 时钟高达 4GHz 更加重要,
因为Kafka可能无法充分利用这4GHz的频率,但几乎肯定会用满16个CPU核。Kafka broker 通常会创建几十个后台线程,再加上多个垃圾回收线程,多核系统显然是最佳的配置选择。

当然,凡事皆有例外。若 clients 端启用了消息压缩,那么除了要为 clients 机器分配足够 的 CPU 资源外, broker 端也有可能需要大量的 CPU 资源—尽管 Kafka 0.10.0.0 改进了在 broker 端的消息处理,免除了解压缩消息的负担以节省磁盘占用和网络带宽,但并非所有情况 下都可以避免这种解压缩(比如 clients 端和 broker 端配置的消息版本号不匹配)。若出现这 种情况,用户就需要为 broker 端的机器也配置充裕的 CPU 资源。

基于以上的判断依据,我们对 CPU 资源规划的建议如下。

  • 使用多核系统,CPU 核数最好大于 8。
  • 如果使用Kafka0.10.0.0之前的版本或clients端与broker端消息版本不一致(若无显式 配置,这种情况多半由 clients 和 broker 版本不一致造成),则考虑多配置一些资源以
    防止消息解压缩操作消耗过多 CPU

    1.6 带宽规划

    对于 Kafka 这种在网络间传输大量数据的分布式数据管道而言,带宽资源至关重要,并且 特别容易成为系统的瓶颈,因此一个快速且稳定的网络是 Kafka 集群搭建的前提条件。低延时 的网络以及高带宽有助于实现 Kafka 集群的高吞吐量以及用户请求处理低延时。
    当前主流的网络环境皆是使用以太网,带宽主要也有两种:lGb/s 和 10Gb/s,即平时所说的千兆位网络和万兆位网络。无论是哪种带宽,对于大多数的 Kafka 集群来说都足矣了。
    根据带宽求机器资源: ``` 假设:1GB/s带宽 ,1TB数据/每小时=290MB/s kafka能消耗带宽为70%,则是710Mb/s ,当然这是最高带宽,但是万一碰到突发流量基本网卡就被“打满”,因此我们取1/3 则获取一个保守的数据 240mb/s (当然这是个保守的,可以酌情考虑)

290X8=2336Mb/s(MB=>>mb)

2336/10=233mb/s则需要10 台broker 然后保存两份则是20台broker

  1. 关于带宽资源方面的规划,用户还需要注意的是尽量避免使用跨机房的网络环境,特别是 那些跨城市甚至是跨大洲的网络。因为这些网络条件下请求的延时将会非常高,不管是 broker 端还是 clients 端都需要额外做特定的配置才能适应<br />综合上述内容,我们对带宽资源规划的建议如下。
  2. - 尽量使用高速网络。
  3. - 根据自身网络条件和带宽来评估 Kafka 集群机器数量。
  4. - 避免使用跨机房网络。
  5. <a name="08a4ac5d"></a>
  6. ## 1.7 典型线上环境配置
  7. 下面给出一份典型的线上环境配置,用户可以参考这份配置以及结合自己的实际情况进行二次调整:
  8. - CPU 24
  9. - 内存 32GB
  10. - 磁盘 lTB 7200 SAS 盘两块
  11. - 带宽1Gb/s
  12. - ulimit -n 1000000
  13. - Socket Buffer 至少64KB,适用于跨机房网络传输
  14. <a name="5ac65cc5"></a>
  15. # 二 伪分布式环境安装[适合开发环境]
  16. 单节点伪分布式环境是指集群由一台 ZooKeeper 服务器和一台 Kafka broker 服务器组成。<br />![image.png](https://cdn.nlark.com/yuque/0/2020/png/1502688/1592986811763-4fc1e336-e102-4524-ab0b-68acaf308137.png#align=left&display=inline&height=285&margin=%5Bobject%20Object%5D&name=image.png&originHeight=570&originWidth=1778&size=322395&status=done&style=none&width=889)
  17. <a name="0b1f1f12"></a>
  18. ## 2.1 linux 伪分布式环境安装
  19. <a name="c8989e4d"></a>
  20. ### 2.1.1 安装Java环境
  21. Java 虚拟机版本使用 Java 8, 并且使用比较 成熟的 Oracle 公司的 HotSpot 虚拟机。

centos 7 执行 新增文件夹

cd /usr/local/ mkdir java

上传文件到linux

scp jdk-8u241-linux-x64.tar.gz root@192.168.64.201:/usr/local/java

解压重命名

tar -zxvf jdk-8u241-linux-x64.tar.gz mv jdk1.8.0_241 jdk1.8

配置环境变量

vi /etc/profile

JAVA_HOME=/usr/local/java/jdk1.8 JRE_HOME=/usr/local/java/jdk1.8/jre PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib export JAVA_HOME JRE_HOME PATH CLASSPATH

刷新配置

source /etc/profile

测试安装结果

java -version


安装完毕<br />![image.png](https://cdn.nlark.com/yuque/0/2020/png/1502688/1592986845480-b0d10594-771e-425c-98b8-e35b45edd8d9.png#align=left&display=inline&height=278&margin=%5Bobject%20Object%5D&name=image.png&originHeight=556&originWidth=1824&size=157803&status=done&style=none&width=912)
<a name="fe2f8a5e"></a>
### 2.1.2 安装zookeeper环境
```bash
#centos 7 执行 新增文件夹
cd /usr/local/
mkdir zookeeper

#上传文件到linux
scp zookeeper-3.4.14.tar.gz root@192.168.64.201:/usr/local/zookeeper

#解压重命名
 tar -zxvf zookeeper-3.4.14.tar.gz
#编辑配置文件
 cp zoo_sample.cfg zoo.cfg

#新增文件夹
mkdir -p /usr/local/zookeeper/data

#更新里面的配置 zoo.cfg
dataDir=/usr/local/zookeeper/data

#测试是否安装成功
cd /usr/local/zookeeper/zookeeper-3.4.14/bin
./zkServer.sh start
./zkServer.sh status 
ps -ef | grep zookeeper

#设置zookeeper开机自启动
cd /etc/init.d

vi zookeeper 

#脚本
#!/bin/bash 
#chkconfig:2345 20 90 
#description:zookeeper 
#processname:zookeeper 
ZK_PATH=/usr/local/zookeeper/zookeeper-3.4.14
export JAVA_HOME=/usr/local/java/jdk1.8
case $1 in 
        start) sudo $ZK_PATH/bin/zkServer.sh start;; 
        stop) sudo $ZK_PATH/bin/zkServer.sh stop;; 
        status) sudo $ZK_PATH/bin/zkServer.sh status;; 
        restart) sudo $ZK_PATH/bin/zkServer.sh restart;; 
        *) echo "require start|stop|status|restart" ;; 
esac

#注册为服务
chkconfig --add zookeeper
chkconfig zookeeper on
chkconfig --list

#测试验证 
service zookeeper start
chmod +x zookeeper

安装完毕
image.png

2.1.3 安装单节点Kafka集群

#centos 7 执行 新增文件夹
mkdir -p /usr/local/kafka

#上传文件到linux
scp kafka_2.12-2.5.0.tgz root@192.168.64.201:/usr/local/kafka

#解压重命名
 tar -zxvf kafka_2.12-2.5.0.tgz

#创建数据文件夹 
mkdir -p /usr/local/kafka/data-log

#配置配置地址:/usr/local/kafka/kafka_2.12-2.5.0/config/server.properties

log.dirs=/usr/local/kafka/data-log

#启动kafka
bin/kafka-server-start# sh -daemon config/server* properties

#查看 
jps


# 将kafka的开机脚本文件新建到/etc/inti.d目录下
vi /etc/init.d/kafka

# 修改执行的权限 
chmod 755 /etc/init.d/kafka

#配置开机并启动
#!/bin/sh
#chkconfig: 2345 10 90
#description: kafka  service
#设置java安装路径
export JAVA_HOME=/usr/local/java/jdk1.8
KA_PATH=/usr/local/kafka/kafka_2.12-2.5.0
#切换到kafka的解压目录
cd $KA_PATH/bin
echo "kafka start .............."
./kafka-server-start.sh -daemon ../config/server.properties
echo "kafka end ................"

#注册为服务
chkconfig --add kafka
chkconfig kafka on

#验证测试
service kafka start

安装完毕
image.png

2.2 docker 伪分布式环境安装

2.2.1 docker 安装 zookeeper

docker pull zookeeper:3.5


  docker run --name:zookeeper -p 2181:2181 -d \
  --net=zk-net \
  --ip 172.19.0.100 \
  zookeeper:3.5

2.2.2 docker 安装 kafka

docker pull wurstmeister/kafka

 docker run -d --name kafka \
-p 9092:9092 \
--net=zk-net \
--ip 172.19.0.101 \
-e KAFKA_BROKER_ID=0 \
-e KAFKA_ZOOKEEPER_CONNECT=172.19.0.100:2181 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.19.0.101:9092 \
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -t wurstmeister/kafka

三 多节点环境安装[适合生产环境]

单节点的 Kafka 伪集群应付日常的应用开发或是功能验证绰绰有余,但若在生产环境中直接使用则无法充分利用 Kafka 提供的分布式特性,比如负载均衡和故障转移等。此外,Kafka 还具有优秀的准线性扩容的能力,因此用户可以很容易地扩展 kafka 节点数量以应对不断增长 的消息处理需求。同时由于 Kafka 提供了完备的备份机制,多节点集群天然地为用户提供高可用保障,极大地降低了人工维护的成本。
从本质上来说,多节点 Kafka 集群由一套多节点 ZooKeeper 集群和一套多节点 Kafka 集群组成.image.png

3.1 linux多节点环境安装

3.1.1 安装多节点 ZooKeeper 集群

ZooKeeper 集群通常被称为一个 ensemble。只要这个 ensemble 中的大多数节点存活,那么 ZooKeeper 集群就能正常提供服务。显然,既然是大多数, 那么最好使用奇数个服务器,即 2n+ 1 个服务器。如果使用偶数个服务器则通常会浪费一台服务器的资源。
基于上面的规则,一个生产环境中最少的 ZooKeeper 集群节点数量是 3

好了,在搭建之前我们先了解配置文件参数的意义,下面是一份典型的配置
zoo.cfg文件

tickTime=2000
dataDir=/usr/zookeeper/data_dir
clientPort=2181
initLimit=5
syncLimit=2
server l=zkl:2888:3888
server 2=zk2:2888:3888
server 3=zk3;2888:3888

比较关键的参数含义如下。

  • tickTime:ZooKeeper 最小的时间单位,用于丈量心跳时间和超时时间等。通常设置成 默认值 2 秒即可。
  • dataDir:非常重要的参数!ZooKeeper 会在内存中保存系统快照,并定期写入该路径 指定的文件夹中。生产环境中需要注意该文件夹的磁盘占用情况。
  • clientPort:ZooKeeper 监听客户端连接的端口,一般设置成默认值 2181 即可。
  • initLimit:指定follower节点初始时连接leader节点的最大tick次数。假设是5,表示 follower 必须要在 5 x tickTime 时间内 (默认是 10 秒)连接上 leaden 否则将被视为 超时。
  • syncLimit:设定了 follower 节点与 leader 节点进行同步的最大时间。与 initLimit 类似,
    它也是以 tickTime 为单位进行指定的。
  • server.X=host:port:port;配置文件中的最后3行都是这种形式的。这里的X必须是一个全局唯一的数字,且需要与myid文件中的数字相对应(关于myid 文件的设置稍后会做详细讨论)。一般设置 X 值为 1~255 之间的整数。这行的后面还配置了两个端口, 通常是2888和3888。第一个端口用于使follower节点连接 leader节点,而第二个端口则用于leader选举。

下面以三个节点集群为案例:
先看配置文件
zoo1.cfg

ticklime=2000
dataDir=/usr/local/zookeeper/data-log/zk1
clientPort=2181 
initLimit=5 
syncLimit =2
server.1=127.0.0.1:2888:3888
server.2=127.0.0.1:2889:3889
server.3=127.0.0.1:2890:3890

zoo2.cfg

ticklime=2000
dataDir=/usr/local/zookeeper/data-log/zk2
clientPort=2182 
initLimit=5 
syncLimit =2
server.1=127.0.0.1:2888:3888
server.2=127.0.0.1:2889:3889
server.3=127.0.0.1:2890:3890

zoo3.cfg

ticklime=2000
dataDir=/usr/local/zookeeper/data-log/zk3
clientPort=2183
initLimit=5 
syncLimit =2
server.1=127.0.0.1:2888:3888
server.2=127.0.0.1:2889:3889
server.3=127.0.0.1:2890:3890

上面的配置文件中我们分别选取了 2181、2182 和 2183 这 3 个端口。如果是多机器安装方案,指定相同的端口号也是可以的,只要确保没有端口冲突就行,这也包括配置文件中的所有其他端口

下面设置 myid

echo "1" > /usr/local/zookeeper/data-log/zk1/myid
echo "2" > /usr/local/zookeeper/data-log/zk2/myid
echo "3" > /usr/local/zookeeper/data-log/zk3/myid

启动命令

bin/zkServer.sh start conf/zoo1.cfg
bin/zkServer.sh start conf/zoo2.cfg
bin/zkServer.sh start conf/zoo3.cfg

测试是否成功
jps
安装完毕
image.png

值得一提的是,如果是多台机器只是端口不同而已,其他一致。

3.1.2 安装多节点 Kafka

安装多节点的 Kafka 比安装多节点的 ZooKeeper 要简单得多,我们只需要创建多份配置文 件,然后指定它们启动 Kafka 服务即可。本例中依然使用一台机器来模拟一个 3 节点 Kafka 集 群的搭建
config/server1.properties

broker.id=0
delete.topic.enable=true 
listeners=PLAINTEXT://192.168.64.202:9092
log.dirs=/usr/local/kafka/data-log/kafka1
zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
unclean.leader.election.enable=false
zookeeper.connection.timeout.ms=6000

config/server2.properties

broker.id=1
delete.topic.enable=true 
listeners=PLAINTEXT://192.168.64.202:9093
log.dirs=/usr/local/kafka/data-log/kafka2
zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
unclean.leader.election.enable=false
zookeeper.connection.timeout.ms=6000

config/server3.properties

broker.id=2
delete.topic.enable=true 
listeners=PLAINTEXT://192.168.64.202:9094
log.dirs=/usr/local/kafka/data-log/kafka3
zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
unclean.leader.election.enable=false
zookeeper.connection.timeout.ms=6000

创建 Kafka 配置文件之后,剩下的工作就很简单了,只需要执行下列命令启动 Kafka broker 服务器

bin/kafka-server-start.sh -daemon config/server1.properties
bin/kafka-server-start.sh -daemon config/server2.properties
bin/kafka-server-start.sh -daemon config/server3.properties

安装完成
jps
image.png

3.2 docker多节点环境安装

因为 docker 提供了编排技术,我这里主要是编写编排文件,因为如果我们要一个一个run,太麻烦了,不利于维护发布。

3.2.1 编写编排文件

vi docker-compose.yaml

version: '2'

services:
  zoo2:
    image: zookeeper:3.5
    restart: always
    hostname: zoo2
    container_name: zoo2
    privileged: true
    ports:
      - 2184:2181
    volumes:
      - /docker/zookeeper/zk2/conf/zoo.cfg:/conf/zoo.cfg
      - /docker/zookeeper/zk2/data:/data
      - /docker/zookeeper/zk2/logs:/datalog
    environment:
      ZOO_MY_ID: 1
      ZOO_SERVERS: server.1=0.0.0.0:2888:3888 server.2=zoo3:2888:3888 server.3=zoo4:2888:3888
    networks:
      zk-net:
        ipv4_address: 172.19.0.2
  zoo3:
    image: zookeeper:3.5
    restart: always
    hostname: zoo3
    container_name: zoo3
    privileged: true
    ports:
      - 2185:2181
    volumes:
      - /docker/zookeeper/zk3/conf/zoo.cfg:/conf/zoo.cfg
      - /docker/zookeeper/zk3/data:/data
      - /docker/zookeeper/zk3/logs:/datalog
    environment:
      ZOO_MY_ID: 2
      ZOO_SERVERS: server.1=zoo2:2888:3888 server.2=0.0.0.0:2888:3888 server.3=zoo4:2888:3888
    networks:
      zk-net:
        ipv4_address: 172.19.0.3
  zoo4:
    image: zookeeper:3.5
    restart: always
    hostname: zoo4
    container_name: zoo4
    privileged: true
    ports:
      - 2186:2181
    volumes:
      - /docker/zookeeper/zk4/conf/zoo.cfg:/conf/zoo.cfg
      - /docker/zookeeper/zk4/data:/data
      - /docker/zookeeper/zk4/logs:/datalog
    environment:
      ZOO_MY_ID: 3
      ZOO_SERVERS: server.1=zoo2:2888:3888 server.2=zoo3:2888:3888 server.3=0.0.0.0:2888:3888
    networks:
      zk-net:
        ipv4_address: 172.19.0.4

  kafka1:
    image: kafka
    restart: always
    hostname: kafka1
    container_name: kafka1
    privileged: true
    ports:
      - 9092:9092
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 172.19.0.14
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_ZOOKEEPER_CONNECT: zoo2:2181,zoo3:2181,zoo4:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://172.19.0.14:9092
      KAFKA_LISTENERS: PLAINTEXT://172.19.0.14:9092
    volumes:
      - /docker/kafka/kafka1/logs:/kafka
    external_links:
      - zoo2
      - zoo3
      - zoo4
    networks:
      zk-net:
        ipv4_address: 172.19.0.14

  kafka2:
    image: kafka
    restart: always
    hostname: kafka2
    container_name: kafka2
    privileged: true
    ports:
      - 9093:9093
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 172.19.0.15
      KAFKA_ADVERTISED_PORT: 9093
      KAFKA_ZOOKEEPER_CONNECT: zoo2:2181,zoo3:2181,zoo4:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://172.19.0.15:9093
      KAFKA_LISTENERS: PLAINTEXT://172.19.0.15:9093
    volumes:
      - /docker/kafka/kafka2/logs:/kafka
    external_links:
      - zoo2
      - zoo3
      - zoo4
    networks:
      zk-net:
        ipv4_address: 172.19.0.15

  kafka3:
    image: kafka
    restart: always
    hostname: kafka3
    container_name: kafka3
    privileged: true
    ports:
      - 9094:9094
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 172.19.0.16 
      KAFKA_ADVERTISED_PORT: 9094
      KAFKA_ZOOKEEPER_CONNECT: zoo2:2181,zoo3:2181,zoo4:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://172.19.0.16:9094
      KAFKA_LISTENERS: PLAINTEXT://172.19.0.16:9094
    volumes:
      - /docker/kafka/kafka3/logs:/kafka
    external_links:
      - zoo2
      - zoo3
      - zoo4
    networks:
      zk-net:
        ipv4_address: 172.19.0.16

  kafka-manager:
    image: kafka-manager
    restart: always
    container_name: kafa-manager
    hostname: kafka-manager
    privileged: true
    ports:
      - 9000:9000
    links:            # 连接本compose文件创建的container
      - kafka1
      - kafka2
      - kafka3
    external_links:   # 连接本compose文件以外的container
      - zoo2
      - zoo3
      - zoo4
    environment:
      ZK_HOSTS: zoo2:2181,zoo3:2181,zoo4:2181
      KAFKA_BROKERS: 172.19.0.14:9092,172.19.0.15:9093,172.19.0.16:9094
      APPLICATION_SECRET: letmein
      KM_ARGS: -Djava.net.preferIPv4Stack=true
    networks:
      zk-net:
        ipv4_address: 172.19.0.17

networks:
  zk-net:
    external:
      name: zk-net

这里主要配置两个集群,一个是zookeeper ,另一个是kafka的集群。

3.2.2 创建相关操作

  • 创建网卡

docker network create —subnet=172.19.0.1/24 zk-net

  • 创建 zk 配置文件
    mkdir -p /docker/zookeeper/zk2/conf

    tickTime=2000
    initLimit=5
    syncLimit=2
    dataDir=/data
    dataLogDir=/datalog
    clientPort=2181
    maxClientCnxns=60
    server.1=172.19.0.2:2888:3888
    server.2=172.19.0.3:2888:3888
    server.3=172.19.0.4:2888:3888
    

    mkdir -p /docker/zookeeper/zk3/conf

    tickTime=2000
    initLimit=5
    syncLimit=2
    dataDir=/data
    dataLogDir=/datalog
    clientPort=2181
    maxClientCnxns=60
    server.1=172.19.0.2:2888:3888
    server.2=172.19.0.3:2888:3888
    server.3=172.19.0.4:2888:3888
    

    mkdir -p /docker/zookeeper/zk4/conf

    tickTime=2000
    initLimit=5
    syncLimit=2
    dataDir=/data
    dataLogDir=/datalog
    clientPort=2181
    maxClientCnxns=60
    server.1=172.19.0.2:2888:3888
    server.2=172.19.0.3:2888:3888
    server.3=172.19.0.4:2888:3888
    
  • 创建 zk 挂目录

mkdir -p /docker/zookeeper/zk2/data
mkdir -p /docker/zookeeper/zk2/log
mkdir -p /docker/zookeeper/zk3/data
mkdir -p /docker/zookeeper/zk3/log
mkdir -p /docker/zookeeper/zk4/data
mkdir -p /docker/zookeeper/zk4/log

  • 创建 kafka 挂载目录

mkdir -p /docker/kafka/kafka1/logs
mkdir -p /docker/kafka/kafka2/logs
mkdir -p /docker/kafka/kafka3/logs

3.2.3 启动kafka集群

docker-compose up -d

四 部署验证

成功搭建起多节点的 Kafka 集群还不够,我们还需要验证线上环境是没有错误且可以使用的。下面将从以下几个方面分别来验证 Kafka 集群部署的正确性。

  • 测试 topic 创建与删除
  • 测试消息的生产与发送
  • 生产者吞吐量测试
  • 消费者吞吐量测试

    4.1 测试topic创建与删除

#创建 topic
bin/kafka-topics.sh --create --bootstrap-server 192.168.0.112:9092,192.168.0.112:9093,192.168.0.112:9094 --replication-factor 1 --partitions 1 --topic test
#查看 topic 信息
bin/kafka-topics.sh --describe  --bootstrap-server localhost:9092,localhost:9093,localhost:9094 test
#查看topic 列表
bin/kafka-topics.sh --list   --bootstrap-server 192.168.0.112:9092,192.168.0.112:9093,192.168.0.112:9094 
#删除topic 注意delete.topic.enable=true 才能删除,否则只能是待删除:Topic xxx is marked for deletion
bin/kafka-topics.sh --delete --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --topic test

docker-compose.yml
有些时候,可能发现topic没有被成功删除,这可能是topic分区数过多或数据过多的原因。Kafka 当前只能一个分区一个分区 地删除数据,无法做到同时删除,因此我们可以查询底层的文件系统来判断删除操作是否在正 常执行。如果发现删除操作停滞了,这通常表明该 Kafka 集群有问题,此时便需要进一步地详
查才能确定无法删除的真正原因,比如是否有分区正处于被分配过程中等

4.2 测试消息发送与消费

#发送消息
bin/kafka-console-producer.sh --broker-list  192.168.0.112:9092,192.168.0.112:9093,192.168.0.112:9094 --topic test
hello,world
hello,kafka

#消费消息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --topic test --from-beginning

4.3 生产者吞吐量测试

#kafka自带脚本测试吞吐量  num-records:压测消息数,record-size:1000字节,throughput:每秒发送2000条
bin/kafka-producer-perf-test.sh  --topic test --num-records 100000 --record-size 1000  --throughput 2000 --producer-props bootstrap.servers=localhost:9092,localhost:9093,localhost:9094

image.png
注意这只是我电脑虚拟机集群在一个电脑上,而且手机分享的带宽

表明Kafka producer的平均吞吐量是1.9MB/S,即占用 1.9X8=16Mb/s 左右的带宽,平均每秒能发送 1999 条消息,平均延吋是 2.36 秒,最大延时是1.34/1000秒,平均有50%的消息发送需要花费1/1000秒,95%的消息发送需要花费1/1000秒,99%的消息发送需要花费34/1000 秒,而 99,9%的消息发送需要花费83/1000 秒。

4.4 消费者吞吐量测试

bin/kafka-consumer-perf-test.sh --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --fetch-size 1000 --messages 100000 --topic test --threads 1

image.png
注意这只是我电脑虚拟机集群在一个电脑上,而且手机分享的带宽
表明本次测试共消费95MB 左右的消息,吞吐量大约是 5.9MB/S

五 重要参数设置说明

前面我们通过 Kafka 提供的各种脚本工具验证了 Kafka 集群能够正常工作。接下来,需要
讨论 Kafka 集群涉及的各方面参数,主要包括以下几种参数。

  • broker 端参数。
  • topic 级別参数。
  • GC 配置参数。
  • JVM参数。

    5.1 broker 端参数

    broker 端参数需要在 Kafka 目录下的 config/server.properties 文件中进行设置。当前对于绝 大多数的 broker 端参数而言,Kafka 尚不支持动态修改—这就是说,如果要新增、修改,抑或是删除某些 broker 参数的话,需要重启对应的 broker 服务器。下面就让我们来看看主要的 参数配置。
    broker.id
    Kafka 使用唯一的一个整数来标识每个 broker, 这就是 broker.id。该参数默认是-1。如果不指定,Kafka会自动生成一个唯一值。总之,不管用户指定什么都必须保证该值在 Kafka 集群中是唯一的,不能与其他 broker 冲突。在实际使用中,推
    荐使用从 0 开始的数字序列,如 0、1、2⋯⋯
    log.dirs
    非常重要的参数!该参数指定了 Kafka持久化消息的目录。若待保存的消息数量非常多,那么最好确保该文件夹下有充足的磁盘空间。该参数可以设置多个目 录,以逗号分隔,比如/home/kafkal,/home/kafka2。在实际使用过程中,指定多个目录 的做法通常是被推荐的,因为这样 Kafka 可以把负载均匀地分配到多个目录下。
    zookeeper.connect
    同样是非常重要的参数。如果说前两个参数还有默认值可以使用 的话(虽然极其不推荐将默认值应用到线上环境),那么此参数则完全没有默认值,是必须要设置的。
    listeners
    该参数主要用于客户端连接broker使用,可以认为是broker端开放给 clients 的监听端口。如果不指定主机名,则表示绑定默认网卡;如果主机名是 0.0.0.0, 则表示 绑定所有网卡。Kafka 当前支持的协议类型包括 PLAINTEXT、SSL 及 SASL_SSL 等
    unclean.leader.election.enable
    是否开启 unclean leader选举。何为 unclean leader选举?
    。ISR 中的所有副本都有资格随时成为新的 leader,
    但若 ISR 变空而此时 leader 又宕机了,Kafka 应读如何选举新的 leader 呢?为了不影响Kafka服务,该参数默认值是false,即表明如果发生这种情况,Kafka不允许从剩下存活的非ISR副本中选择一个当 leader。因为如果允许,这样做固然可以让 Kafka 继续提供服务给 clients, 但会造成消息数据的丟失,而在一般的用户使用场景中,数据不丢 失是基本的业务需求,因此设置此参数为 false 显得很有必要。
    delete.topic.enable
    否允许 Kafka 删除 topic,推荐可以删除,设置为true。
    log.retention{hours|minutes|ms}
    这组参数控制了消息数据的留存时间,它们是“三兄弟”。如果同时设置,优先选取 ms 的设置,minutes 次之,hours 最后。有了这 3 个 参数,用户可以很方便地在 3 个时间维度上设置日志的留存时间。默认的留存时间是 7 天,即 Kafka 只会保存最近 7 天的数据,并自动删除 7 天前的数据。当前较新版本的 Kafka 会根据消息的时间戳信息进行留存与否的判断。对于没有时间戳的老版本消息 格式,Kafka 会根据日志文件的最近修改时间(last modified time) 进行判断。可以这 样说,这组参数定义的是时间维度上的留存策略。实际线上环境中,需要根据用户的
    业务需求进行设置。保存消息很长时间的业务通常都需要设置一个较大的值
    log.retention.bytes
    如果说上面那组参数定义了时间维度上的留存策略,那么这个参 —
    数便定义了空间维度上的留存策略,即它控制着 Kafka 集群需要为每个消息日志保存多大的数据。对于大小超过该参数的分区日志而言,Kafka 会自动清理该分区的过期
    日志段文件。该参数默认值是-1,表示Kafka永远不会根据消息日志文件总大小来删除日志。和上面的参数一样,生产环境中需要根据实际业务场景设置该参数的值。
    min.insync.replicas
    该参数主要配合 producer 端的acks 参数配合使用的,当acks=-1时才有意义。他指定了broker端必须成功响应clients消息发送的最少副本数,假如broker无法满足该条件,则clients的消息发送,并不会被视为发送成功。实际应用中推荐为-1。举一个例子,假设某个 topic 的每个分区的副本数是 3, 那么推荐设置该参数为 2, 这样我们就能够容忍一台 broker 宕机而不影响服务;若设置参数为 3, 那么只要任何一台 broker 宕机,整个Kafka集群将无法继续提供服务。因此用户需要在高可用和数据一致性之间取得平衡。
    num.network.threads
    一个非常重要的参数。它控制了一个broker在后台用于处理网 络请求的线程数,默认是 3。通常情况下,broker 启动时会创建多个线程处理来自其他broker和clients发送过来的各种请求。注意,这里的“处理”其实只是负责转发请求, 它会将接收到的请求转发到后面的处理线程中。在真实的环境中,用户需要不断地监 控 NetworkProcessorAvgldlePercent JMX 指标。如果该指标持续低于 0.3,建议适 当增加该参数的值。
    num.io.threads
    这个参数就是控制 broker 端实际处理网络请求的线程数,默认值是 8, 即 Kafka broker 默认创建 8 个线程以轮询方式不停地监听转发过来的网络请求并进 行实时处理。Kafka 同样也为请求处理提供了一个 JMX 监控指标 Request HandlerAvgldlePercent。如果发现该指标持续低于 0.3,则可以考虑适当增加该参数的值。
    message.max.bytes
    Kafka broker能够接收的最大消息大小,默认是977KB,还达不到1MB,可见是非常小的,实际场景中,突破1MB大小的消息十分常见。因此要综合考虑最大消息的尺寸,并设置该参数值。
    以上是较为重要的参数,其他参数参见:http://kafka.apache.org/documentation/#configuration

    5.2 topic 级别参数

    指覆盖 broker 端全局参数。每个不同的 topic 都可以设置自己的参数值
    delete.retention.ms
    每个 topic 可以设置自己的日志留存时间以覆盖全局默认值
    max.message.bytes
    覆盖全局的 message.max.bytes, 即为每个 topic 指定不同的最大消息尺寸
    retention.bytes
    覆盖全局的 log.retention.bytes,每个 topic 设置不同的日志留存尺寸

    5.3 GC 参数

    Kafka broker端代码虽然是用Scala语言编写的,但终归要编译为.class文件在JVM上运行。
    既然是 JVM 上面的应用,垃圾回收(garbage collection, GC) 参数的设置就显得非常重要
    推荐始终使用 G1 收集器,不论是在 broker 端还是在 clients 端

    5.4 JVM参数

    在实际使用中,通常为 broker 设置不超过 6GB 的堆空间。以下 就是一份典型的生产环境中的 JVM 参数列表

    -Xmx6g -Xms6g -XX:MetaspaceSize=96m -XX:+DseGlGC -XX:MaxGCPauseMillis-20
    -XX:InitiatingHeapOccupancyPercent=35 -XX:GlHeapRegionSize=16M -XX: MinMe- taspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80
    

    5.5 os参数

    通常情况下,Kafka 并不需要太多的 0S 级别的参数调优,但依然有一些 OS 参数是必须要 调整的。

  • 文件描述符限制

Kafka 会频繁地创建并修改文件系统中的文件,这包括消息的口志
文件、索引文件及各种元数据管理文件等.
因此在实际使用场景中最好首先增大进程能够打开的最大文件描述符上限, 比如设置一个很大的值,如 100000。具体设置方法为 ulimit -n 100000

  • Socket 缓冲区大小

这里指的是 0S 级别的 Socket 缓冲区大小,而非 Kafka 自己提供 的 Socket 缓冲区参数。事实上,Kafka 自己的参数将其设置为 64KB,这对于普通的内 网环境而言通常是足够的,因为内网环境下往返时间(round-trip time,RTT) 一般都 很低,不会产生过多的数据堆积在 Socket 缓冲区中,但对于那些跨地区的数据传输而 言,仅仅增加 Kafka 参数就不够了,因为前者也受限于 0S 级别的设置。因此如果是 做远距离的数据传输,那么建议将 OS 级别的 Socket 缓冲区调大,比如增加到 128KB, 甚至更大。

  • 最好使用Ext4 或 XFS文件系统

其实 Kafka 操作的都是普通文件,并没有依赖于特 定的文件系统,但依然推荐使用 Ext4 或 XFS 文件系统,特别是 XFS 通常都有着更好的性能。这种性能的提升主要影响的是 Kafka 的写入性能。根据官网的测试报告,使 用 XFS 的写入时间大约是 160 毫秒,而使用 Ext4 大约是 250 毫秒。因此生产环境中最 好使用 XFS 文件系统。

  • 关闭 swap

大幅度降低对swap空间的使用,以免极大地拉 低性能

  • 设置更长的flush时间

我们知道 Kafka 依赖 OS 页缓存的“刷盘”功能实现消息真正 写入物理磁盘,默认的刷盘间隔是 5 秒。通常情况下,这个间隔太短了,适当增加该 值可以在很大程度上提升 OS 物理写入操作的性能。Linkedlri 公司自己就将该值设置为
2 分钟以增加整体的物理写入吞吐量

总结

我们着重讲述了如何搭建 Kafka 生产环境以及设置 Kafka 集群需要考虑的各方面参数,并且针对如何评估 Kafka 线上集群分别从各个方面进行了探讨。相信各位在阅读完之后己经可以独立地根据自身业务的需要搭建起 Kafka生产集群。