Broker

基础配置

  1. log.dirs:kafka保存消息的目录,存放分区数据。一般不使用默认的,根据消息的数量,单独配置一个或多个目录逗号隔开
  2. zookeeper.connect:zk连接参数,示例:zk1:2181,zk2:2181,zk3:2181/kafka

    Topic相关

  3. auto.create.topics.enable:是否自动创建topic,默认为true

  4. unclean.leader.election.enable:非ISR中的Follow副本是否也能被选举为Leader副本,默认为true,建议修改为false
  5. auto.leader.rebalance.enable:是否允许定期进行 Leader 选举,默认为true,建议修改为false
  6. num.partitions:Topic的分区数量

    副本同步相关

  7. min.insync.replicas:指定 “应答写入成功” 所需要的最低副本数。需要根据业务进行设计,如果设置过大,很可能导致消息频繁发送失败

  8. replica.lag.time.max.ms:指定Follow副本与Leader副本的最大失联时间,超过这个时间,Follow副本就会被踢出ISR

    消息持久化相关

  9. log.retention.{hours|minutes|ms}:控制一条消息数据被保存多长时间。从优先级上来说 ms 设置最高、minutes 次之、hours 最低。实际应用中还是hour较多一点

  10. log.retention.bytes:指定 Broker 为消息保存的总磁盘容量大小。默认值-1,代表不限制大小
  11. message.max.bytes:控制 Broker 能够接收的最大消息大小。
  12. log.segment.bytes:指定一个Segment的大小,默认为1G
  13. log.index.interval.bytes:指定了在.log文件写入多少数据,就要在.index文件写一条索引,默认是4KB,即写4kb的数据然后在索引里写1条索引。
  14. log.cleanup.policy:log文件定期处理策略,默认值delete,即定期删除。也可设置为compac,代表定期压缩
  15. log.flush.interval.messages:日志分区中消息数达到多少时,刷新到磁盘 ,默认值为10000
  16. log.flush.interval.ms:日志保存最长多少毫秒被刷新到磁盘,无默认值
  17. log.flush.scheduler.interval.ms:日志刷到磁盘的频率,默认值1000

    Consumer相关

  18. group.min.session.timeout.ms:踢出分组的最小session超时时间

  19. group.max.session.timeout.ms:踢出分组的最大session超时时间

    Producer

  20. bootstrap.servers:Kafka服务端地址,配置格式host1:port1;host2:port2…

  21. acks:Producer如何确认消息已发布成功,包括0|1|-1(ALL)3个值,默认值1。具体参考:Kafka消息的发布与消费原理
  22. key.serializer:消息Key的序列化形式
  23. value.serializer:消息Value的序列化形式
  24. enable.idempotence:是否开启幂等性,默认false。具体参考:Kafka的一些常见问题。需要注意的是,开启幂等后,acks参数必须设置为-1
  25. max.in.flight.requests.per.connection :客户端在单个连接上能够发送的未响应请求的个数。默认值5, 设置此值是1表示kafka broker在响应请求之前client不能再向同一个broker发送请求。
  26. request.timeout.ms:请求响应超时时间,默认值305000
  27. retries:响应失败重试次数,默认值为0
  28. buffer.memory:生产者内存缓冲区大小,默认32M
  29. batch.size:设置批量提交的数据大小,默认是16k,当积压的消息达到这个值的时候就会统一发送(发往同一分区的消息)
  30. linger.ms:这个设置是为发送设置一定是延迟来收集更多的消息,默认大小是0ms(就是有消息就立即发送)
  31. compression.type:默认情况下,消息发送时不会被压缩。该参数可以设置为snappy|gzip|lz4 |zstd,它指定了消息被发送给broker 之前使用哪一种压缩算法进行压缩。

    Consumer

  32. bootstrap.servers:Kafka服务端地址,配置格式host1:port1;host2:port2…

  33. client.id:客户端ID标识
  34. group.id:消费者组
  35. key.descrializer:消息Key的反序列化形式
  36. value.descrializer:消息Value的反序列化形式
  37. session.timeout.ms:Consumer session超时时间,默认为30s。该值必须在group.min.session.timeout.ms和group.max.session.timeout.ms之间
  38. heartbeat.interval.ms:Consumer心跳超时时间,默认为3s,该值必须小于session.timeout.ms
  39. enable.auto.commit:是否周期性的自动提交,默认值为true
  40. auto.commit.interval.ms:自动提交频率,默认值5s
  41. connections.max.idle.ms:链接空闲超时时间,默认值5分钟
  42. max.poll.interval.ms:最大poll消息时间间隔,默认值5分钟,超过该时间即认为当前Consumer不可用
  43. max.poll.records: Consumer每次调用poll()时取到的records的最大数。默认值500
  44. request.timeout.ms:请求响应超时时间,默认值305000