日话题:【2022-02-28 FLINK 系列 】

1. 在你们生产环境上,Flink 如何解决作业资源随着需求的变化而动态调整的?

一部分业务是将 Flink 任务运行在 K8S 容器上的,使用 Flink Reactive Mode 响应模式,将 Flink 集群部署到 K8S 容器中,通过 K8S 的 Pod 水平自动扩缩容组件来实现 Flink 作业资源 TaskManager 的动态改变。 K8S 的 Pod 水平自动缩放组件会监控所有 TaskManager pods 的 CPU 负载来相应的调整副本因子(replication factor)。当 CPU 负载升高时,autoscaler 会增加 TaskManager 资源来平摊压力;当负载降低时,autoscaler 会减少 TaskManager 资源。

背后原理 首先,在 Flink 1.14.0 最新版本中依旧无法做到 Job 在运行时动态调整并行度 不经历重启 直接拉起新的 Task 实例运行,目前都是基于重启恢复机制来实现的,因为涉及到状态管理。 在资源管理中,资源的获取包含两种模式:

  • Active 模式:主动式,Flink 可以主动申请、释放资源(通过与资源管理框架集成,如 Yarn、Mesos)。
  • Reactive 模式:被动响应式,该模式由外部系统来分配,释放资源,Flink 只是简单地对可用资源进行响应。(这种模式对于基于容器环境相当有意义,如 K8S )

现在 Flink 的资源管理模式是 Active 模式,由用户明确指定 Job 需要的资源,JobMaster 会向 ResourceManager 去请求用户指定的全量资源,当 ResourceManager 无法满足 JobMaster 的申请需求时,该 job 将无法正常启动,它会陷入失败重启->再次尝试申请的循环。 为了能够同时支持 Active/Reactive 这两种模式,Flink 1.13.0 版本引入声明式资源管理设计。 该功能与原先最大差别在于:JobMaster 不再去逐个请求 Slot,而是声明需要的资源情况,对资源要求是弹性范围的,不是固定的。 弹性资源包含四件套:(min, target, max, rs),每个元素的含义如下:

  • min : 执行 Job 的最小资源。
  • target : JobMaster 期望获得的常规资源需求,它是可变的,这是扩缩容的主要触发机制。
  • max : 最大资源期望。
  • rs : resource spec(资源描述信息)。

2. 模型推理过程中要是单个模型过大,如何发送到 kafka?

在 kafka 中,默认单条消息最大为 1 M,当单条消息长度超过 1 M 时,就会出现发送到 broker 失败,从而导致消息在 producer 的队列中一直累积,直到撑爆生产者的内存。所以在实际生产环境中,由于我们的单个模型是 5 M 多,所以刚开始出现上述的 发送到 broker 失败问题。 解决办法,只往 kafka 中发送消息体的 ID、 Location、Name,这样 消费者消费数据时,直接根据对应ID、Location 可以消费到模型。

3. Flink 集群的资源使用率是多少?资源使用率过低的话,K8S 应该怎么处理?

主要分离线作业资源使用率和流式作业资源使用率。根据具体公司的情况回复,比如:离线资源使用达到60%,流式作业资源使用达到40%。流式资源使用率过低,这时 K8S 会根据自动扩缩容机制,减少 Pod 节点,降低负载,降低 CPU,资源的利用率。