创建

  1. KafkaProducer 实例创建时启动 Sender 线程,从而创建与bootstrap.servers 中所有 Broker 的 TCP 连接。
    • 连接 bootstrap.servers 参数指定的所有 Broker
      • Producer bootstrap.servers 指定的 Broker 中的某一台发送 METADATA 请求,尝试获取集群的元数据信息,故没必要为 bootstrap.servers 指定所有的 Broker。
    • 这种设计似乎不太好,在对象构造器中启动线程会造成 this 指针的逃逸。理论上,Sender 线程完全能够观测到一个尚未构造完成的 KafkaProducer 实例。
  2. KafkaProducer 实例更新元数据信息之后,还会再次创建与集群中所有 Broker 的 TCP 连接。
    1. Producer 更新集群元数据信息的两个场景。
      1. 场景一:当 Producer 尝试给一个不存在的主题发送消息时,Broker 会告诉 Producer 说这个主题不存在。此时 Producer 会发送 METADATA 请求给 Kafka 集群,去尝试获取最新的元数据信息。
      2. 场景二:Producer 通过 metadata.max.age.ms 参数定期地去更新元数据信息。该参数的默认值是 300000,即 5 分钟,也就是说不管集群那边是否有变化,Producer 每 5 分钟都会强制刷新一次元数据以保证它是最及时的数据。
  3. 如果 Producer 端发送消息到某台 Broker 时发现没有与该 Broker 的 TCP 连接,那么也会立即创建连接。

关闭

  1. Producer 调用 close()
  2. 如果设置 Producer 端 connections.max.idle.ms 参数
    1. 默认 9分钟
    • 大于 0,则连接所有 Broker 中创建的 TCP 连接如果处于空闲会被自动关闭
    • -1,那么 TCP 连接将无法被关闭,即使 Broker 端使用 keepalive,但是发起者是 client,会导致这些连接变成 CLOSE_WAIT 连接