集群方案
- 操作系统, 选linux
I/O模型: Kafka 客户端底层使用了 Java 的 selector,selector 在 Linux 上的实现机制是 epoll,而在 Windows 平台上的实现机制是 select。因此在这一点上将 Kafka 部署在 Linux 上是有优势的,因为能够获得更高效的 I/O 性能。
补充知识点: 主流I/O模型:
- 阻塞式I/O, 如java中Socket对象的阻塞模式
- 非阻塞式I/O, 如java中Socket对象的非阻塞模式
- I/O多路复用, 如LInux中的select函数, epoll系统调用介于词条和下一条之间
- 信号驱动I/O
- 异步I/O, 如Window系统的IOCP线程模型
数据网络传输效率: 在 Linux 部署 Kafka 能够享受到零拷贝技术所带来的快速数据传输特性。
零拷贝(Zero Copy)技术,就是当数据在磁盘和网络进行传输时避免昂贵的内核态数据拷贝从而实现快速的数据传输。Linux 平台实现了这样的零拷贝机制,但有些令人遗憾的是在 Windows 平台上必须要等到 Java 8 的 60 更新版本才能“享受”到这个福利。
社区支持度: Windows 上的 Bug 一般是不会修复的
- 磁盘, 推荐机械磁盘
- Kafka使用磁盘都是顺序读写, SSD没有太大的性能优势, 机械磁盘性价比更突出
- 机械磁盘易损坏等缺陷, 可通过Kafka在软件层面的机制来保证
- 追求性价比的公司可以不搭建 RAID,使用普通磁盘组成存储空间即可。
- 磁盘容量, 需考虑 新增消息数, 消息留存时间, 平均消息大小, 备份数, 是否启用压缩等因素
示例: 假设你所在公司有个业务每天需要向 Kafka 集群发送 1 亿条消息,每条消息保存两份以防止数据丢失,另外消息默认保存两周时间。现在假设消息的平均大小是 1KB,那么你能说出你的 Kafka 集群需要为这个业务预留多少磁盘空间吗?
我们来计算一下:每天 1 亿条 1KB 大小的消息,保存两份且留存两周的时间,那么总的空间大小就等于 1 亿 1KB 2 / 1000 / 1000 = 200GB。一般情况下 Kafka 集群除了消息数据还有其他类型的数据,比如索引数据等,故我们再为这些数据预留出 10% 的磁盘空间,因此总的存储容量就是 220GB。既然要保存两周,那么整体容量即为 220GB 14,大约 3TB 左右。Kafka 支持数据的压缩,假设压缩比是 0.75,那么最后你需要规划的存储空间就是 0.75 3 = 2.25TB。
- 带宽, 本质是服务器数量
假设你公司的机房环境是千兆网络,即 1Gbps,现在你有个业务,其业务目标或 SLA 是在 1 小时内处理 1TB 的业务数据。那么问题来了,你到底需要多少台 Kafka 服务器来完成这个业务呢?
通常假设Kafka使用70%的带宽资源, 即单台服务器最大带宽资源为700Mb, 通常情况会预留2/3的资源(2/3较为保守, 实际可降低), 即使用带宽为700/3~240Mb, 需求为1h处理1TB, 即1s处理2336MB, 即需要2336/240~10台服务器, 若消息需要复制两份, 则总服务器为10*3=30台.
总结
集群参数
Broker端
存储信息相关
log.dirs: broker文件目录配置
- 指定 Broker 需要使用的若干个文件目录路径, 无默认值
相似参数log.dir只能指定一个路径, 建议只配置dirs, 将目录挂载到不同的物理磁盘上, 一是提升读写性能, 二是能实现故障转移(Failover)
从1.1开始, Kafka的Failover机制会将坏掉的磁盘上的数据自动转移到其他正常的磁盘, 且Broker能正常工作
示例: /home/kafka1,/home/kafka2,/home/kafka3
Broker连接相关
listeners: 监听器配置
advertised.listeners: Broker用于对外发布的监听器配置
- 示例: CONTROLLER: //localhost:9092, 格式为<协议名称,主机名,端口号>
- 协议名称可能是标准的名字,比如 PLAINTEXT 表示明文传输、SSL 表示使用 SSL 或 TLS 加密传输等;也可能是你自己定义的协议名字,比如CONTROLLER
- 一旦你自己定义了协议名称,你必须还要指定listener.security.protocol.map参数告诉这个协议底层使用了哪种安全协议,比如指定listener.security.protocol.map=CONTROLLER:PLAINTEXT表示CONTROLLER这个自定义协议底层使用明文不加密传输数据。
- 主机名, 建议Broker端和Client应用端都使用主机名, 而不是IP, 因为Broker源码中也用的主机名
Topic相关
auto.create.topics.enable:是否允许自动创建 Topic
- 建议false
unclean.leader.election.enable:是否允许 Unclean Leader 选举
- 建议false, 不让落后太多的副本竞选leader
auto.leader.rebalance.enable:是否允许定期进行 Leader 选举
- 建议false, 换leader成本非常高
数据留存相关
log.retention.{hours|minutes|ms}:消息数据保存时间
- 这是个“三兄弟”,都是控制一条消息数据被保存多长时间。从优先级上来说 ms 设置最高、minutes 次之、hours 最低
- 通常设置hours, 如log.retention.hours=168表示默认保存 7 天的数据,自动删除 7 天前的数据
log.retention.bytes:指定 Broker 为消息保存的总磁盘容量大小
- 默认-1, 不做限制, 在saas场景下可用于对租户做限制
message.max.bytes:控制 Broker 能够接收的最大消息大小
- 默认1000012<1MB, 建议线上设为较大的值
zk相关
zookeeper是分布式协调框架, 负责协调管理并保存Kafka集群的所有元数据信息
zookeeper.connect: zk连接配置
- 示例: zk1:2181,zk2:2181,zk3:2181, 2181是zk的默认端口
- zk有个chroot的概念, 类似别名, 以实现多个kafka集群使用同一套zk集群时的隔离, 示例: 当有2个kafka集群使用同一个zk时, 可以分别这样配置zk1:2181,zk2:2181,zk3:2181/kafka1, zk1:2181,zk2:2181,zk3:2181/kafka2
(注意chroot只需在最后写一次)
Topic级别参数
若同时设置了Broker以及Topic参数, Topic级别参数会覆盖全局Broker参数的值
retention.ms:规定了该 Topic 消息被保存的时长
- 默认是 7 天,即该 Topic 只保存最近 7 天的消息。一旦设置了这个值,它会覆盖掉 Broker 端的全局参数值。
retention.bytes:规定了要为该 Topic 预留多大的磁盘空间
- 默认值-1不限制, 多租户时有用, 与全局参数作用相似
max.message.bytes: Kafka Broker 能够正常接收该 Topic 的最大消息大小
修改 Topic 级 max.message.bytes,还要考虑以下两个
还要修改 Broker的 replica.fetch.max.bytes 保证复制正常
消费还要修改配置 fetch.message.max.bytes
补充: topic级别参数可以在创建时和修改时设置, 建议始终使用第二种方式设置
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic transaction --partitions 1 --replication-factor 1 --config retention.ms=15552000000 --config max.message.bytes=5242880
bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name transaction --alter --add-config max.message.bytes=10485760
JVM参数
Kafka 服务器端代码是用 Scala 语言编写,终归还是编译成 Class 文件在 JVM 上运行
设置JVM堆为6GB, 默认1GB太小, Kafka Broker 在与客户端进行交互时会在 JVM 堆上创建大量的 ByteBuffer 实例,Heap Size 不能太小
如果 Broker 所在机器的 CPU 资源非常充裕,建议使用 CMS 收集器。启用方法是指定-XX:+UseCurrentMarkSweepGC。否则,使用吞吐量收集器。开启方法是指定-XX:+UseParallelGC。
如果你在使用 Java 8,那么可以手动设置使用 G1 收集器
设置方式, 启动Broker前, 指定环境变量即可
$> export KAFKA_HEAP_OPTS=--Xms6g --Xmx6g
$> export KAFKA_JVM_PERFORMANCE_OPTS= -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true
$> bin/kafka-server-start.sh config/server.properties
操作系统参数
文件描述符限制, ulimit -n, 通常情况下将它设置成一个超大的值是合理的做法,比如ulimit -n 1000000, 不设置可能会经常看到“Too many open files”的错误。
文件系统类型, 根据官网的测试报告,XFS 的性能要强于 ext4,所以生产环境最好还是使用 XFS
Swappiness, 建议将 swappniess 配置成一个接近 0 但不为 0 的值,比如 1。因为一旦设置成 0,当物理内存耗尽时,操作系统会触发 OOM killer 这个组件,它会随机挑选一个进程然后 kill 掉,即根本不给用户任何的预警。但如果设置成一个比较小的值,当开始使用 swap 空间时,你至少能够观测到 Broker 性能开始出现急剧下降,从而给你进一步调优和诊断问题的时间
提交时间或者说是 Flush 落盘时间, 向 Kafka 发送数据并不是真要等数据被写入磁盘才会认为成功,而是只要数据被写入到操作系统的页缓存(Page Cache)上就可以了,随后操作系统根据 LRU 算法会定期将页缓存上的“脏”数据落盘到物理磁盘上。这个定期就是由提交时间来确定的,默认是 5 秒。一般情况下我们会认为这个时间太频繁了,可以适当地增加提交间隔来降低物理磁盘的写操作.