调优目标

吞吐量和延时。

  1. 客户端版本和Broker端版本保持一致。否则可能丧失 Zero-Copy。
  2. 批次/微批处理。增加较小的时延,获取较大的吞吐。-
  3. Broker 端适当增加 num.replica.fetchers follower 同步的线程数。调优 GC 避免频繁发生 Full GC。
  4. Producer 端增大 batch.size,增大 liner.ms、
  5. 消费者端,采用多线程消费模型。

    1. 磁盘故障转移

    log.dirs 配置多个路径,比如 /home/h1,/home/h2。这是 Kafka 1.1 版本引入的强大功能。在之前,任意一块磁盘挂掉了,整个 Broker 进程也会挂掉。

    2. 通信设置

  • listeners:学名叫监听器,其实就是告诉外部连接者要通过什么协议访问指定主机名和端口开放的 Kafka 服务。
  • advertised.listeners:和 listeners 相比多了个 advertised。Advertised 的含义表示宣称的、公布的,就是说这组监听器是 Broker 用于对外发布的。
  • host.name/port:列出这两个参数就是想说你把它们忘掉吧,压根不要为它们指定值,毕竟都是过期的参数了。

监听器,它是由若干个逗号分隔的三元组,每个三元组的格式为 <协议名称, 主机名, 端口号>。协议包括内置协议:PLAINTEXT、SSL、TLS,也可以自定义协议 MY: //localhost:9092。一旦你指定了协议名称,你必须还指定 listener.security.protocol.map 参数告诉这个协议底层到底使用了哪种安全协议。比如 listener.security.protocol.map=MY:PLAINTEXT
还有一个建议就是主机名中建议使用主机名,而非 IP。

3. 安全

  • auto.create.topics.enable:是否允许自动创建 Topic。
  • unclean.leader.election.enable:是否允许 Unclean Leader 选举。 需要关闭。
  • auto.leader.rebalance.enable:是否允许定期进行 Leader 选举。这个应该是更换 Leader。

    4. 数据留存

  • log.retention.{hour|minutes|ms}:这是个“三兄弟”,都是控制一条消息数据被保存多长时间。从优先级上来说 ms 设置最高、minutes 次之、hour 最低。

  • log.retention.bytes:这是指定 Broker 为消息保存的总磁盘容量大小。
  • message.max.bytes:控制 Broker 能够接收的最大消息大小。

    5. Topic 级别参数

  • retention.ms:规定了该 Topic 消息被保存的时长。默认是 7 天,即该 Topic 只保存最近 7 天的消息。一旦设置了这个值,它会覆盖掉 Broker 端的全局参数值。

  • retention.bytes:规定了要为该 Topic 预留多大的磁盘空间。和全局参数作用相似,这个值通常在多租户的 Kafka 集群中会有用武之地。当前默认值是 -1,表示可以无限使用磁盘空间。

    6. JVM 参数

  • JVM 堆大小设置为 6GB

  • 收集器:如果 CPU 资源非常充裕,建议使用 CMS:-XX:+UseCurrentMarkSweepGC。否则使用吞吐量收集器:-XX:+UseParallelGC

    7. 操作系统

  • 文件描述符限制:ulimit -n 100000

  • 文件系统类型
  • Swappiness
  • 提交时间:脏页落盘。默认时间是 5s。一般情况下认为这个时间太频繁了,可以适当地增加提交间隔来降低物理磁盘写操作。

日均千亿 Kafka 频繁发生 ISR 变化

  1. 查看各节点日志,出现 Shrinking ISR,意味着 ISR 频繁收缩又扩张。使用 Kafka 监控发现该节点流出的流量不正常。
  2. 根据业务方反馈,那天增加了很多客户连接集群。

问题原因:突发增加这么多客户端连接,可能是 Broker 端处理不过来,导致请求阻塞,超时后被断开。因此才会出现 ISR 变化的同时还会出现连接断开的日志。
Processor 线程 num.network.threads(3),Handler 线程 num.io.threads(8)

生产端发送延迟特别高,吞吐量上不去,但集群负载低

  1. linger.ms=50
  2. batch.size=524288
  3. compression.type=lz4
  4. acks=1(用户要求消息至少要发送到分区 leader
  5. max.request.size=5242880
  6. buffer.memory=268435456
  1. 使用 Kafka 脚本进行压测,正常。
  2. 增大 buffer.memory(32MB),生产者消费缓冲池。那只能存储 8192 条数据。
  3. 增大 batch.size(16KB),客户端消息普遍是 4KB 左右,很容易填满。
  4. 增大 max.request.size(1MB),过于小,导致网络 I/O 次数增多,

    Kafka 重启失败

    通过日志发现大量主题索引文件损坏并且重建索引文件的警告信息。

    Kafka 消息堆积问题

  5. 查看消费者组情况。是否频繁发生分区重平衡。

  6. 消费者组是否经常断线离线。
  7. 消息处理耗时过长。导致心跳丢失。

监控框架

JMXTool,查看 kafka JMX 指标。

Kafka Manager

主题创建、Preferred Leader 选举。

JMXTrans + InfluxDB + Grafana

Confluent Control Center


最重要的集群参数配置

Broker 端主要有 7 方面相关的参数配置,分别是

  • 存储配置:log.dirlog.dirs
  • Broker 连接:listeneradvertised.listenerslistener.security.protocol.map
  • Topic 管理:auto.create.topic.enableunclean.leader.election.enableauto.leader.rebalance.enable
  • 日志留存:log.retention.{}log.retention.bytes、``
  • Topic 级别参数配置:log.retention.mslog.retention.byteslog.message.bytes
  • JVM 参数:-XX:+UseG1GC
  • 操作系统:文件描述符、Socket 发送/接收缓冲区、刷盘时间间隔、文件系统 | 类别 | 参数 | 说明 | 默认值 | 建议值 | | —- | —- | —- | —- | —- | | 存储配置 | log.dirs | 指定 Broker 需要使用的若干个文件目录路径 | /a,/b | 最好保证这些目录挂载到不同的物理磁盘上,有两个好处:① 多块磁盘同时读写有更高的吞吐量。② 能够实现故障转换。Kafka 1.1 引入的强大功能,坏掉磁盘上的数据自动会转移到其它正常的磁盘上,能保证磁盘正常工作。 | | | log.dir | | | | | Broker 连接 | listeners | 监听器,告诉外部连接者要通过什么协议访问指定主机名和端口 | 协议名称: // 主机名:端口号 | | | | advertised.listeners | 宣称的,这组监听器是对 Broker 用于外部发布的 | | 这个值是对外网访问的,如果在内网,则不需要设置,常见的玩法是,一个网卡用于外网,一般网卡用于内网,listeners 为内网,adver…为外网 | | | listener.security.protocol.map | 一旦自定义协议名称,必须指定这个协议到底底层使用了哪种安全协议 | PLAINTEXT、
    SSL、
    SASL_PLAINTEXT | | | Topic 管理 | auto.create.topic.enable | 是否允许自动创建主题 | false | 最好设置为 false,比如可能不小心写错了,创建主题的操作应该由运维严格把控 | | | unclean.leader.election.enable | 是否允许 unclean leader 选举 | false | | | | auto.leader.rebalance.enable | 是否允许定期进行 Leader 选举 | true | true:定期对一些 Topic 分区进行 Leader 重选举,当然,并非无脑触发,而是需要满足一定条件。但是更换 Leader 代价过高,建议生产环境为 false。阈值由 leader.imbalance.per.broker.percentage 配置,默认为 10,表示允许非 preferred-leader 所占的比例,如果在一个 Broker 超过这个阈值,就会触发 Leader 重平衡,将这个分区的 Leader 重新换回到 preferred-replica 中。
    | | 日志留存 | log.retention.{hour|minutes|ms} | 日志留存时间,ms>minutes>hour | 默认值 7 天 | 360(15天) | | | log.retention.bytes | 为消息保存的总磁盘容量大小 | -1 | | | | message.max.bytes | Broker 能够接收的最大消息的大小 | 1048588(1MB) | 这个数值太小了,可以设置大一点,仅仅是衡量 Broker 能够处理的最大消息大小 | | Topic级别 | retention.ms | 规定该 Topic 消息被保存的时长 | 7天 | 一旦设置该值,可以覆盖全局的设置 | | | retention.bytes | 规定该 Topic 预留多大的磁盘空间 | -1 | 一旦设置该值,可以覆盖全局的设置,在多租户环境下有用 | | | max.message.bytes | Broker 能够接收该主题的最大消息大小 | | 我们可以自行设定 | | 关于修改 Topic 参数,我们有两种方式:① 创建 Topic 时进行设置。② 修改 Topic 的设置。
    ./kafka-topic.sh —config retention.ms=15552000000 —config max.message.bytes=5242880 | | | | | | JVM 参数 | export KAKFA_HEAP_OPTS=—Xms6g —Xmx6g
    export KAKFA_JVM_PERFORMANCE_OPTS= -server -XX:+UseG1GC
    -XX:MaxGCPauseMillis=20
    -XX:InitiatingHeapOccupancyPercent=35:当老年代大小占整个堆大小百分比达到该阈值时,会触发一次 mixed GC
    -XX:+ExplicitGCInvokesConcurrent:System.gc() 调用是触发 Full GC,默认是 STW,如果找到这个参数,即并行 GC,可提高 Full GC 效率。
    | | | | | 操作系统 | ulimit -n | 文件描述符限制 | | | | |
    | 文件系统类型 | | ZFS> XFS -> ext4
    Kafka 使用 ZFS 报告 | | |
    | swappiness | 1 | 网卡将 swap 禁用掉以防止 kakfa 使用 swap 空间,但设置一个比 0 大的值反而更好,因为设为 0 时操作系统会会触发 OOM Killer,随机挑选一个进程然后 kill 掉,即根本不给用户任何预警。而设置成一个较小值,当开始使用 swap 空间,你至少能够观测到 Broker 性能开始出现下降,从而给你进一步调优和诊断问题的时间。 | | |
    | 提交时间 | | Flush 落盘时间,Kafka 写入数据是写入 Page Cache 就返回,应用层也会认为成功写入磁盘。随后操作系统会根据 LRU 算法会定期将页缓存上的脏数据落盘到物理磁盘文件中。默认时间是 5S,一般情况下,这个默认时间太频繁了,可以适当地增加提交间隔来降低物理磁盘的写操作。有人可能会认为,提高刷新间隔时间会增大数据丢失的风险,但是鉴于在软件层已经提供了多副本冗余机制,因此稍稍提高提交去换取性能是一个较合理的作法。 |