创建
- KafkaProducer 实例创建时启动 Sender 线程,从而创建与
bootstrap.servers
中所有 Broker 的 TCP 连接。- 连接
bootstrap.servers
参数指定的所有 Broker- Producer
bootstrap.servers
指定的 Broker 中的某一台发送 METADATA 请求,尝试获取集群的元数据信息,故没必要为 bootstrap.servers 指定所有的 Broker。
- Producer
- 这种设计似乎不太好,在对象构造器中启动线程会造成 this 指针的逃逸。理论上,Sender 线程完全能够观测到一个尚未构造完成的 KafkaProducer 实例。
- 连接
- KafkaProducer 实例更新元数据信息之后,还会再次创建与集群中所有 Broker 的 TCP 连接。
- Producer 更新集群元数据信息的两个场景。
- 场景一:当 Producer 尝试给一个不存在的主题发送消息时,Broker 会告诉 Producer 说这个主题不存在。此时 Producer 会发送 METADATA 请求给 Kafka 集群,去尝试获取最新的元数据信息。
- 场景二:Producer 通过
metadata.max.age.ms
参数定期地去更新元数据信息。该参数的默认值是 300000,即 5 分钟,也就是说不管集群那边是否有变化,Producer 每 5 分钟都会强制刷新一次元数据以保证它是最及时的数据。
- Producer 更新集群元数据信息的两个场景。
- 如果 Producer 端发送消息到某台 Broker 时发现没有与该 Broker 的 TCP 连接,那么也会立即创建连接。
关闭
- Producer 调用
close()
- 如果设置 Producer 端
connections.max.idle.ms
参数- 默认 9分钟
- 大于
0
,则连接所有 Broker 中创建的 TCP 连接如果处于空闲会被自动关闭 -1
,那么 TCP 连接将无法被关闭,即使 Broker 端使用 keepalive,但是发起者是 client,会导致这些连接变成CLOSE_WAIT
连接