• 和生产者不同的是,构建 KafkaConsumer 实例时是不会创建任何 TCP 连接

**

创建时机

  • TCP 连接是在调用 KafkaConsumer.poll 方法时被创建的

发起 FindCoordinator 请求时

  • 该请求是确定管理它的 Coordinator 所在的 Broker,且首次获取元数据
    • 发送对象是集群中的任意 Broker
      • 优化点是 Consumer 主观上认为的负载最少的 Broker
  • 当第三类 TCP 连接创建时,该连接会被代替,之后在定期请求元数据时,都采用第三类 TCP 连接

连接协调者时

  • 获得 FindCoordinator 请求的响应后,Consumer 会创建连向 Coordinator 所在的 Broker 的 Socket 连接
  • 组协调请求和真正的数据获取请求如何使用不同的 Socket 连接
    • Integer.MAX_VALUE 减去协调者所在 Broker 的真实 ID 计算得来的

消费数据时

  • Consumer 会为每个要消费的分区创建与该分区 Leader replication 所在 Broker 连接的 TCP
    • 主要看有多少个主分片

关闭时机

主动关闭

  1. 手动调用 KafkaConsumer.close() 方法,或者是执行 Kill 命令,不论是 Kill -2 还是 Kill -9

被动关闭

  1. Kafka 自动关闭是由消费者端参数 connection.max.idle.ms控制的
    1. 该参数现在的默认值是 9 分钟,即如果某个 Socket 连接上连续 9 分钟都没有任何请求,那么消费者会强行关闭这个 Socket 连接。
  2. 当第三类 TCP 连接(消费数据时)成功创建后,消费者程序就会废弃第一类 TCP 连接
    1. 之后在定期请求元数据时,都采用第三类 TCP 连接