- 和生产者不同的是,构建 KafkaConsumer 实例时是不会创建任何 TCP 连接
创建时机
- TCP 连接是在调用 KafkaConsumer.poll 方法时被创建的
发起 FindCoordinator 请求时
- 该请求是确定管理它的 Coordinator 所在的 Broker,且首次获取元数据
- 发送对象是集群中的任意 Broker
- 优化点是 Consumer 主观上认为的负载最少的 Broker
- 发送对象是集群中的任意 Broker
- 当第三类 TCP 连接创建时,该连接会被代替,之后在定期请求元数据时,都采用第三类 TCP 连接
连接协调者时
- 获得 FindCoordinator 请求的响应后,Consumer 会创建连向 Coordinator 所在的 Broker 的 Socket 连接
- 组协调请求和真正的数据获取请求如何使用不同的 Socket 连接
- 由
Integer.MAX_VALUE
减去协调者所在 Broker 的真实 ID 计算得来的
- 由
消费数据时
- Consumer 会为每个要消费的分区创建与该分区 Leader replication 所在 Broker 连接的 TCP
- 主要看有多少个主分片
关闭时机
主动关闭
- 手动调用
KafkaConsumer.close()
方法,或者是执行Kill
命令,不论是 Kill -2 还是 Kill -9
被动关闭
- Kafka 自动关闭是由消费者端参数
connection.max.idle.ms
控制的- 该参数现在的默认值是 9 分钟,即如果某个 Socket 连接上连续 9 分钟都没有任何请求,那么消费者会强行关闭这个 Socket 连接。
- 当第三类 TCP 连接(消费数据时)成功创建后,消费者程序就会废弃第一类 TCP 连接
- 之后在定期请求元数据时,都采用第三类 TCP 连接