https://kafka.apache.org/documentation/#dynamicbrokerconfigs

  • 在 Dynamic Update Mode 列
    • 该列有 3 类值,分别是 read-only、per-broker 和 cluster-wide
    • read-only。被标记为 read-only 的参数和原来的参数行为一样,只有重启 Broker,才能令修改生效
    • per-broker。被标记为 per-broker 的参数属于动态参数,修改它之后,只会在对应的 Broker 上生效
    • cluster-wide。被标记为 cluster-wide 的参数也属于动态参数,修改它之后,会在整个集群范围内生效,也就是说,对所有 Broker 都生效。你也可以为具体的 Broker 修改 cluster-wide 参数
  • 优先级
    • per-broker 参数 > cluster-wide 参数 > static 参数 > Kafka 默认值

原理

  • Kafka 将动态 Broker 参数保存在 ZooKeeper 中
    • 使用永久节点

image.png

  • changes 是用来实时监测动态参数变更的,不会保存参数值;topics 是用来保存 Kafka 主题级别参数的
    • 虽然它们不属于动态 Broker 端参数,但其实它们也是能够动态变更的
  • usersclients 则是用于动态调整客户端配额(Quota)的 znode 节点
    • 所谓配额,是指 Kafka 运维人员限制连入集群的客户端的吞吐量或者是限定它们使用的 CPU 资源。
  • /config/brokers 才是真正保存动态 Broker 参数的地方。该 znode 下有两大类子节点
    • 第一类子节点就只有一个,它有个固定的名字叫 < default >,保存的是前面说过的 cluster-wide范围的动态参数
    • 另一类则以 broker.id 为名,保存的是特定 Broker 的 per-broker 范围参数。由于是 per-broker 范围,因此这类子节点可能存在多个

**

配置

  • kafka-configs 脚本

    查看

    集群层面

    1. kafka-configs.sh --bootstrap-server kafka-host:port \
    2. --entity-type brokers --entity-default --describe

单体层面

  1. kafka-configs.sh --bootstrap-server kafka-host:port \
  2. --entity-type brokers --entity-name <broker.id> --describe

创建

集群层面

  1. kafka-configs.sh --bootstrap-server kafka-host:port \
  2. --entity-type brokers --entity-default \
  3. --alter --add-config unclean.leader.election.enable=true
  • 如果要设置 cluster-wide 范围的动态参数,需要显式指定 entity-default

单体层面

  1. kafka-configs.sh --bootstrap-server kafka-host:port \
  2. --entity-type brokers --entity-name <broker.id> \
  3. --alter --add-config unclean.leader.election.enable=false

删除

  • 删除动态参数要指定 **delete-config**

    集群层面

    1. kafka-configs.sh --bootstrap-server kafka-host:port \
    2. --entity-type brokers --entity-default \
    3. --alter --delete-config unclean.leader.election.enable

单体层面

  1. kafka-configs.sh --bootstrap-server kafka-host:port \
  2. --entity-type brokers --entity-name <broker.id> \
  3. --alter --delete-config unclean.leader.election.enable

可选参数

  1. kafka-configs.bat
  2. This tool helps to manipulate and describe entity config for a topic, client, user or broker
  3. Option Description
  4. ------ -----------
  5. --add-config <String> Key Value pairs of configs to add.
  6. Square brackets can be used to group
  7. values which contain commas: 'k1=v1,
  8. k2=[v1,v2,v2],k3=v3'. The following
  9. is a list of valid configurations:
  10. For entity-type 'topics':
  11. cleanup.policy
  12. compression.type
  13. delete.retention.ms
  14. file.delete.delay.ms
  15. flush.messages
  16. flush.ms
  17. follower.replication.throttled.
  18. replicas
  19. index.interval.bytes
  20. leader.replication.throttled.replicas
  21. max.compaction.lag.ms
  22. max.message.bytes
  23. message.downconversion.enable
  24. message.format.version
  25. message.timestamp.difference.max.ms
  26. message.timestamp.type
  27. min.cleanable.dirty.ratio
  28. min.compaction.lag.ms
  29. min.insync.replicas
  30. preallocate
  31. retention.bytes
  32. retention.ms
  33. segment.bytes
  34. segment.index.bytes
  35. segment.jitter.ms
  36. segment.ms
  37. unclean.leader.election.enable
  38. For entity-type 'brokers':
  39. advertised.listeners
  40. background.threads
  41. compression.type
  42. follower.replication.throttled.rate
  43. leader.replication.throttled.rate
  44. listener.security.protocol.map
  45. listeners
  46. log.cleaner.backoff.ms
  47. log.cleaner.dedupe.buffer.size
  48. log.cleaner.delete.retention.ms
  49. log.cleaner.io.buffer.load.factor
  50. log.cleaner.io.buffer.size
  51. log.cleaner.io.max.bytes.per.second
  52. log.cleaner.max.compaction.lag.ms
  53. log.cleaner.min.cleanable.ratio
  54. log.cleaner.min.compaction.lag.ms
  55. log.cleaner.threads
  56. log.cleanup.policy
  57. log.flush.interval.messages
  58. log.flush.interval.ms
  59. log.index.interval.bytes
  60. log.index.size.max.bytes
  61. log.message.downconversion.enable
  62. log.message.timestamp.difference.max.
  63. ms
  64. log.message.timestamp.type
  65. log.preallocate
  66. log.retention.bytes
  67. log.retention.ms
  68. log.roll.jitter.ms
  69. log.roll.ms
  70. log.segment.bytes
  71. log.segment.delete.delay.ms
  72. max.connections
  73. max.connections.per.ip
  74. max.connections.per.ip.overrides
  75. message.max.bytes
  76. metric.reporters
  77. min.insync.replicas
  78. num.io.threads
  79. num.network.threads
  80. num.recovery.threads.per.data.dir
  81. num.replica.fetchers
  82. principal.builder.class
  83. replica.alter.log.dirs.io.max.bytes.
  84. per.second
  85. sasl.enabled.mechanisms
  86. sasl.jaas.config
  87. sasl.kerberos.kinit.cmd
  88. sasl.kerberos.min.time.before.relogin
  89. sasl.kerberos.principal.to.local.rules
  90. sasl.kerberos.service.name
  91. sasl.kerberos.ticket.renew.jitter
  92. sasl.kerberos.ticket.renew.window.
  93. factor
  94. sasl.login.refresh.buffer.seconds
  95. sasl.login.refresh.min.period.seconds
  96. sasl.login.refresh.window.factor
  97. sasl.login.refresh.window.jitter
  98. sasl.mechanism.inter.broker.protocol
  99. ssl.cipher.suites
  100. ssl.client.auth
  101. ssl.enabled.protocols
  102. ssl.endpoint.identification.algorithm
  103. ssl.key.password
  104. ssl.keymanager.algorithm
  105. ssl.keystore.location
  106. ssl.keystore.password
  107. ssl.keystore.type
  108. ssl.protocol
  109. ssl.provider
  110. ssl.secure.random.implementation
  111. ssl.trustmanager.algorithm
  112. ssl.truststore.location
  113. ssl.truststore.password
  114. ssl.truststore.type
  115. unclean.leader.election.enable
  116. For entity-type 'users':
  117. SCRAM-SHA-256
  118. SCRAM-SHA-512
  119. consumer_byte_rate
  120. producer_byte_rate
  121. request_percentage
  122. For entity-type 'clients':
  123. consumer_byte_rate
  124. producer_byte_rate
  125. request_percentage
  126. Entity types 'users' and 'clients' may
  127. be specified together to update
  128. config for clients of a specific
  129. user.
  130. --alter Alter the configuration for the entity.
  131. --bootstrap-server <String: server to The Kafka server to connect to. This
  132. connect to> is required for describing and
  133. altering broker configs.
  134. --command-config <String: command Property file containing configs to be
  135. config property file> passed to Admin Client. This is used
  136. only with --bootstrap-server option
  137. for describing and altering broker
  138. configs.
  139. --delete-config <String> config keys to remove 'k1,k2'
  140. --describe List configs for the given entity.
  141. --entity-default Default entity name for
  142. clients/users/brokers (applies to
  143. corresponding entity type in command
  144. line)
  145. --entity-name <String> Name of entity (topic name/client
  146. id/user principal name/broker id)
  147. --entity-type <String> Type of entity
  148. (topics/clients/users/brokers/broker-
  149. loggers)
  150. --force Suppress console prompts
  151. --help Print usage information.
  152. --version Display Kafka version.
  153. --zookeeper <String: urls> REQUIRED: The connection string for
  154. the zookeeper connection in the form
  155. host:port. Multiple URLS can be
  156. given to allow fail-over.

常用的 动态参数 修改

log.retention.ms

修改日志留存时间应该算是一个比较高频的操作,毕竟,我们不可能完美地预估所有业务的消息留存时长。虽然该参数有对应的主题级别参数可以设置,但拥有在全局层面上动态变更的能力,依然是一个很好的功能亮点

num.io.threads 和 num.network.threads。

这是我们在前面提到的两组线程池。就我个人而言,我觉得这是动态 Broker 参数最实用的场景了。毕竟,在实际生产环境中,Broker 端请求处理能力经常要按需扩容。如果没有动态 Broker 参数,我们是无法做到这一点的

与 SSL 相关的参数

主要是 4 个参数(ssl.keystore.type、ssl.keystore.location、ssl.keystore.password 和 ssl.key.password)。允许动态实时调整它们之后,我们就能创建那些过期时间很短的 SSL 证书。每当我们调整时,Kafka 底层会重新配置 Socket 连接通道并更新 Keystore。新的连接会使用新的 Keystore,阶段性地调整这组参数,有利于增加安全性

num.replica.fetchers

这也是我认为的最实用的动态 Broker 参数之一。Follower 副本拉取速度慢,在线上 Kafka 环境中一直是一个老大难的问题。针对这个问题,常见的做法是增加该参数值,确保有充足的线程可以执行 Follower 副本向 Leader 副本的拉取。现在有了动态参数,你不需要再重启 Broker,就能立即在 Follower 端生效,因此我说这是很实用的应用场景