1. 影响Flink性能较大的几个配置参数

版本:Apache Flink 1.11.1
jobmanager.memory.process.size: 1600m JobManager的总进程内存大小。这占用了JobManager进程中的所有内存使用,包括JVM元空间和其他开销。
taskmanager.memory.process.size:1728m 当前taskmanager整个进程的总内存大小,包括堆内存和堆外内存(状态存放在堆外内存)
如果想要排除JVM元空间和开销,使用Flink内存大小,不建议与taskmanager.memory.process.size同时设置
taskmanager.memory.flink.size: 1280m
taskmanager.numberOfTaskSlots: 1 任务槽,表示当前Task最大能同时执行的线程数量(并行的最大能力),执行一个独立任务或线程所需要计算资源的最小资源
parallelism.default: 1 并行度,表示真正执行时并行的个数
Slot 和 parallelism 的关系?
Slot 是指 TaskManager 最大能并发执行的能力。parallelism 是指 TaskManager 实际使用的并发能力。也是 Flink-Job 的实际并发能力。
并行度一定比任务槽小吗?不一定,并行度是默认值,是首先在别的地方没有配置的时候用这个,然后任务槽是针对一个tsakmanager来说的,只需要并行度 < slot * tsakmanager的个数
并行度的优先级(代码中设置 > 提交Job时的设置 > 配置文件中的设置)
taskmanager.memory.network.fraction: 指定任务管理器为排序、散列表和中间结果的缓存所保留的相对内存量。由于不同的程序可能需要不同大小的内存来缓存中间结果,因此taskmanager.memory.fraction值过小会导致内存缓冲区过小而无法缓存中间结果,从而显著降低性能。
taskmanager.memory.network.min 最小值 网络缓冲区的最小内存大小
taskmanager.memory.network.max 最大值 网络缓冲区的最大内存大小
akka.framesize 指定在JobManager和TaskManager之间发送的消息的最大值。
taskmanager.network.netty.sendReceiveBufferSize Netty发送和接收缓冲区大小。

2. Flink配置参数优化的实际应用

3. Flink集群的JobManager和TaskManager

作业管理器(JobManager)
JobManager是集群中的master,它是整个集群的协调者,负责接收Flink Job,协调检查点,故障恢复等,同时管理着TaskManager。

  1. JobManager接收待执行的应用程序,这个应用程序包含一个JobGraph和Jar包(包含所需要的类、库和其它资源)
  2. JobManger将JobGraph转成ExecutionGraph,ExecutionGraph包含可以并发执行的tasks
  3. JobManager 向 ResourceManager 申请需要的资源(TaskManager slots),一旦分配到足够的slots,则分发 tasks 到 TaskManager 执行。
  4. 在运行过程中, JobManager会负责所有需要中央协调的操作,比如说检查点(checkpoints) 的协调。

任务管理器(TaskManager)
TaskManager是实际负责执行计算的Worker,在其上执行Flink Job的一组Task,每个TaskManager负责管理其所在节点上的资源信息,如内存、磁盘、网络,在启动的时候将资源的状态向JobManager汇报。

  1. 启动之后,TaskManager 向 ResourceManager 注册 slots 数,当接收到 ResourceManager 的分配通知后,会向 JobManager 提供一个或多个slots。
  2. JobManager 将 tasks 分配到 slots 执行。
  3. 执行期间,不同的 TaskManager 之间会进行数据交换

资源管理器(ResourceManager)

  1. 主要负责管理任务管理器(TaskManager)的插槽(slot),TaskManger 插槽是Flink中定义的处理资源单元
  2. Flink为不同的环境和资源管理工具提供了不同资源管理器,比如YARN、Mesos、K8s,以及standalone部署。
  3. 当JobManager申请插槽资源时,ResourceManager会将有空闲插槽的TaskManager分配给JobManager。如果ResourceManager没有足够的插槽来满足JobManager的请求,它还可以向资源提供平台发起会话,以提供启动TaskManager进程的容器。

    4. Flink对于迟到数据和乱序数据怎么处理

    Flink中的 WaterMark 和 Window 机制解决了流式数据的乱序问题。
    对于因为延迟而顺序有误的数据,可以根据 eventTime 进行业务处理,主要办法是给定一个允许延迟的时间(WaterMark),在该时间范围内仍可接受处理延迟数据。到一个时间戳达到了窗口关闭时间,不应该立刻触发窗口计算,而是等待一段时间,等迟到的数据来了再关闭窗口 。

  4. watermark先处理短时间内的乱序数据

  5. 先输出一个近似正确的结果,再等待一段时间,来一个数据更新一个
  6. 窗口关了,window特殊处理

举一个学生秋游坐车的例子,预先订好9:00发车,但是有很多学生是9:00到9:01之间到达,因此将WaterMark 设置为9:01,等到9:01再关闭车门(关闭窗口)。这时候如果还有人迟到,就先往前开,等到学生来一个开车门上一个学生。一段时间后,发车。如果还有学生迟到,就坐下一辆车或者特殊处理一下。(可以先记录下来,后面再把缺失数据补上)
如果数据延迟非常严重呢?只使用WaterMark可以处理吗?那应该怎么解决?
使用 WaterMark+ EventTimeWindow 机制可以在一定程度上解决数据乱序的问题,但是,WaterMark 水位线也不是万能的,在某些情况下,数据延迟会非常严重,即使通过Watermark + EventTimeWindow也无法等到数据全部进入窗口再进行处理,因为窗口触发计算后,对于延迟到达的本属于该窗口的数据,Flink默认会将这些延迟严重的数据进行丢弃
那么如果想要让一定时间范围的延迟数据不会被丢弃,可以使用Allowed Lateness(允许迟到机制/侧道输出机制)设定一个允许延迟的时间和侧道输出对象来解决

即使用WaterMark + EventTimeWindow + Allowed Lateness方案(包含侧道输出),可以做到数据不丢失。
watermark 的设定
• 在 Flink 中,watermark 由应用程序开发人员生成,这通常需要对相应的领域有一定的了解
• 如果watermark设置延迟太久,收到结果速度可能就会很慢,解决办法是在水位线到达之前输出一个近似结果
• 而如果watermark到达得太早,则可能收到错误结果,不过 Flink 处理迟到数据的机制可以解决这个问题

5. Flink中的状态存储

Flink中由一个任务维护,并且用来计算某个结果的所有数据,都属于这个任务的状态,可以认为状态就是一个本地变量,可以被任务的业务逻辑访问 ,状态存放在本地内存。
Flink 内置的很多算子,包括源 source,数据存储 sink 都是有状态的。在 Flink 中,状态始终与特定算子相关联。Flink 会以 checkpoint 的形式对各个任务的状态进行快照,用于保证故障恢复时的状态一致性。
Flink在做计算的过程中经常需要存储中间状态,来避免数据丢失和状态恢复。状态的存储、访问以及维护,由一个可插入的组件决定,这个组件就叫做状态后端(state backend),状态后端主要负责两件事:本地的状态管理,以及将检查点(checkpoint)状态写入远程存储。
Flink提供了三种状态后端:MemoryStateBackend、FsStateBackend、RocksDBStateBackend。

  • MemoryStateBackend:内存级的状态后端,会将键控状态作为内存中的对象进行管理,将它们存储在Taskmanager 的 JVM 堆上,而将 checkpoint 存储在 JobManager 的内存中。特点是:快速,低延迟,但不稳定,如果JobManager挂掉
  • FsStateBackend:将 checkpoint 存到远程的持久化文件系统(FileSystem)上,对于本地状态,跟 MemoryStateBackend 一样,也会存在 TaskManager 的 JVM 堆上。 特点:同时拥有内存级的本地访问速度和很好的容错保证,但如果状态特别大,会出现OOM,这个时候可以选择扩容,如果扩不了容,选择RocksDB
  • RocksDBStateBackend:将所有状态序列化后,存入本地的 RocksDB 中。 特点:访问速度慢,但不会出现OOM问题


Flink跟其他的流计算引擎相比,最突出或者做的最好的就是状态的管理。什么是状态呢?
比如我们在平时的开发中,需要对数据进行count,sum,max等操作,这些中间的结果(即是状态)是需要保存的,因为要不断的更新,这些值或者变量就可以理解为是一种状态,拿读取kafka为例,我们需要记录数据读取的位置(即是偏移量),并保存offest,这时offest也可以理解为是一种状态。

6. Flink 和 Spark Streaming的区别

Flink是标准的实时处理引擎,而Spark Streaming 是微批(micro-batching)的模型。

  • 数据模型

– spark 采用 RDD 模型,spark streaming 的 DStream 实际上也就是一组小批数据 RDD 的集合
– flink 基本数据模型是数据流,以及事件(Event)序列 (不存在数据集)

  • 运行时架构

– spark 是批计算,将 DAG 划分为不同的 stage,一个完成后才可以计算下一个
– flink 是标准的流执行模式,一个事件在一个节点处理完后可以直接发往下一个节点进行处理

  • 时间机制

– Spark Streaming 只支持处理时间
– Flink 支持了流处理程序的三个时间语义:处理时间、事件时间、注入时间。同时也支持 watermark机制来处理滞后数据

  • 任务调度

– Spark Streaming 连续不断的生成微小的数据批次,构建有向无环图DAG,Spark Streaming 会依次创建 DStreamGraph、JobScheduler。
– Flink 根据用户提交的代码生成 StreamGraph,经过优化生成 JobGraph,然后提交给 JobManager进行处理,JobManager 会根据 JobGraph 生成 ExecutionGraph,ExecutionGraph 是 Flink 调度最核心的数据结构,JobManager 根据 ExecutionGraph 对 Job 进行调度,根据物理执行图部署到Taskmanager上形成具体的Task执行。

  • 容错机制

对于 Spark Streaming 任务,我们可以设置 checkpoint,然后假如发生故障并重启,我们可以从上次 checkpoint 之处恢复,但是这个行为只能使得数据不丢失,可能会重复处理,不能做到恰好一次处理语义。
– Flink 则使用两阶段提交协议来解决这个问题。
为什么使用 Flink 替代 Spark?
解答:主要考虑的是 flink 的低延迟、高吞吐量和对流式数据应用场景更好的支持;另外,flink 可以很好地处理乱序数据,而且可以保证 exactly-once 的状态一致性。

介绍两个框架的主要区别:
1)从流处理的角度来讲,Spark基于微批量处理,把流数据看成是一个个小的批处理数据块分别处理,所以延迟性只能做到秒级。而Flink基于每个事件处理,每当有新的数据输入都会立刻处理,是真正的流式计算,支持毫秒级计算。由于相同的原因,Spark只支持基于时间的窗口操作(处理时间或者事件时间),而Flink支持的窗口操作则非常灵活,不仅支持时间窗口,还支持基于数据本身的窗口(另外还支持基于time、count、session,以及data-driven的窗口操作),开发者可以自由定义想要的窗口操作。
2)从SQL 功能的角度来讲,Spark和Flink分别提供SparkSQL和Table APl提供SQL
3)交互支持。两者相比较,Spark对SQL支持更好,相应的优化、扩展和性能更好,而Flink在SQL支持方面还有很大提升空间。
4)从迭代计算的角度来讲,Spark对机器学习的支持很好,因为可以在内存中缓存中间计算结果来加速机器学习算法的运行。但是大部分机器学习算法其实是一个有环的数据流,在Spark中,却是用无环图来表示。而Flink支持在运行时间中的有环数据流,从而可以更有效的对机器学习算法进行运算。
5)从相应的生态系统角度来讲,Spark的社区无疑更加活跃。Spark可以说有着Apache旗下最多的开源贡献者,而且有很多不同的库来用在不同场景。而Flink由于较新,现阶段的开源社区不如Spark活跃,各种库的功能也不如Spark全面。但是Flink还在不断发展,各种功能也在逐渐完善。

6. Flink 三种时间语义

问题:Flink 三种时间语义是什么,分别说出应用场景?
1. Event Time:事件创建的时间。它通常由事件中的时间戳描述,例如采集的日志数据中,每一条日志都会记录自己的生成时间,Flink 通过时间戳分配器访问事件时间戳。场景:这是实际应用最常见的时间语义
2. Processing Time:执行操作算子的本地系统时间,与机器相关。场景:没有事件时间的情况下,或者对实时性要求超高的情况下。
3. Ingestion Time:数据进入Flink的时间 。场景:存在多个 Source Operator 的情况下,每个 Source Operator 可以使用自己本地系统时钟指派 Ingestion Time。后续基于时间相关的各种操作, 都会使用数据记录中的 Ingestion Time。

7. Flink checkpoint 与 spark 比较

问题:Flink 的 checkpoint 机制对比 spark 有什么不同和优势?
解答:spark streaming 的 checkpoint 仅仅是针对 driver 的故障恢复做了数据 和元数据的 checkpoint。而 flink 的 checkpoint 机制要复杂了很多,它采用的是轻量级的分布式快照,实现了每个算子的快照,及流动中的数据的快照。
参见链接:https://cloud.tencent.com/developer/article/1189624

问题:Flink 分布式快照 Checkpoint 的原理是什么?
解答: Flink的分布式快照是根据Chandy-Lamport算法量身定做的。简单来说就是持续创建分布式数据流及其状态的一致快照。 核心思想:是在 input source 端插入 barrier,控制 barrier 的同步 (分界线对齐)来实现 snapshot 的备份 和 exactly-once 语义。

问题:Flink是怎么保证容错恢复的时候保证数据没有丢失也没有数据的冗余呢?
checkpoint是使Flink 能从故障恢复的一种内部机制。检查点是 Flink 应用状态的一个一致性副本,包括了输入的读取位点。在发生故障时,Flink 通过从检查点加载应用程序状态来恢复,并从恢复的读取位点继续处理,就好像什么事情都没发生一样。Flink的状态存储在Flink的内部,这样做的好处就是不再依赖外部系统,降低了对外部系统的依赖。在Flink的内部。通过自身的进程去访问状态变量。同时会定期的做checkpoint持久化。把checkpoint存储在一个分布式的持久化系统中。如果发生故障。就会从最近的一次checkpoint中将整个流的状态进行恢复。

8. Flink exactly-once 的保证

问题:如果下级存储不支持事务,Flink 怎么保证 exactly-once?
端到端的 exactly-once 对 sink 要求比较高,具体实现主要有幂等写入和事务性写入两种方式。幂等写入的场景依赖于业务逻辑,更常见的是用事务性写入。而事务性写入又有预写日志(WAL)和两阶段提交(2PC)两种方式。如果外部系统不支持事务,那么可以用预写日志的方式,把结果数据先当成状态保存,然后在收到 checkpoint 完成的通知时,一次性写入 sink 系统。

问题:Flink 中 exactly-once 语义是如何实现的,状态是如何存储的?
Flink 依靠 checkpoint 机制来实现 exactly-once 语义,如果要实现端到端 的 exactly-once,还需要外部 source 和 sink 满足一定的条件。
比如:Flink+Kafka 端到端状态一致性的保证
• 内部 —— 利用 checkpoint 机制,把状态存盘,发生故障的时候可以恢复,保证内部的状态一致性
• source —— kafka consumer 作为 source,可以将偏移量保存下来,如果后续任务出现了故障,恢复的时候可以由连接器重置偏移量,重新消费数据,保证一致性
• sink —— kafka producer 作为sink,采用两阶段提交 sink,需要实现一个 TwoPhaseCommitSinkFunction

状态的存储通过状态后端来管理,Flink 中可以配置不同的状态后端。

下面通过Flink从Kafka中获取数据,来说下怎么管理offest实现exactly-once的。
Apache Flink中实现的Kafka消费者是一个有状态的算子(operator),它集成了Flink的检查点机制,它的状态是所有Kafka分区的读取偏移量。当一个检查点被触发时,每一个分区的偏移量都被存到了这个检查点中。Flink的检查点机制保证了所有operator task的存储状态都是一致的。这里的“一致的”意思是它们存储的状态都是基于相同的输入数据。当所有的operator task成功存储了它们的状态,一个检查点才算完成。因此,当从潜在的系统故障中恢复时,系统提供了excatly-once的状态更新语义。

9. Flink 怎么做压力测试和监控?

我们一般碰到的压力来自以下几个方面:
一,产生数据流的速度如果过快,而下游的算子消费不过来的话,会产生背压。 背压的监控可以使用 Flink Web UI(localhost:8081) 来可视化监控 Metrics,一旦报警就能知道。一般情况下背压问题的产生可能是由于 sink 这个操作符没有优化好,做一下优化就可以了。比如如果是写入 ElasticSearch, 那么可以改成批量写入,可以调大 ElasticSearch 队列的大小等等策略。
二,设置 watermark 的最大延迟时间这个参数,如果设置的过大,可能会造成内存的压力。可以设置最大延迟时间小一些,然后把迟到元素发送到侧输出流中去。晚一点更新结果。或者使用类似于 RocksDB 这样的状态后端, RocksDB 会开辟堆外存储空间,但 IO 速度会变慢,需要权衡。
三,还有就是滑动窗口的长度如果过长,而滑动距离很短的话,Flink 的性能会下降的很厉害。我们主要通过时间分片的方法,将每个元素只存入一个“重叠窗口”,这样就可以减少窗口处理中状态的写入。
参见链接: https://www.infoq.cn/article/sIhs_qY6HCpMQNblTI9M
四,状态后端使用 RocksDB,还没有碰到被撑爆的问题。
反压的危害
反压如果不能得到正确的处理,可能会影响到 checkpoint 时长和 state 大小,甚至可能会导致资源耗尽甚至系统崩溃。

  • 1)影响 checkpoint 时长:barrier 不会越过普通数据,数据处理被阻塞也会导致checkpoint barrier 流经整个数据管道的时长变长,导致 checkpoint 总体时间(End to End Duration)变长。
  • 2)影响 state 大小:barrier 对齐时,接受到较快的输入管道的 barrier 后,它后面数据会被缓存起来但不处理,直到较慢的输入管道的 barrier 也到达,这些被缓存的数据会被放到 state 里面,导致 checkpoint 变大。

这两个影响对于生产环境的作业来说是十分危险的,因为 checkpoint 是保证数据一致性的关键, checkpoint 时间变长有可能导致 checkpoint 超时失败,而 state 大小同样可能拖慢 checkpoint 甚至导致 OOM (使用 Heap-based StateBackend)或者物理内存使用超出容器资源(使用RocksDBStateBackend)的稳定性问题。

Flink是如何处理反压的?
Flink 内部是基于 producer-consumer 模型来进行消息传递的,Flink的反压设计也是基于这个模型。Flink 使用了高效有界的分布式阻塞队列,就像 Java 通用的阻塞队列(BlockingQueue)一样。下游消费者消费变慢,上游就会受到阻塞。

10. CEP

问题:Flink CEP 编程中当状态没有到达的时候会将数据保存在哪里?
在流式处理中,CEP 当然是要支持 EventTime 的,那么相对应的也要支持数据的迟到现象,也就是 watermark 的处理逻辑。CEP 对未匹配成功的事件序列的处理,和迟到数据是类似的。在 Flink CEP 的处理逻辑中,状态没有满足的和迟到的数据,都会存储在一个 Map 数据结构中,也就是说,如果我们限定判断事件序列的时长为 5 分钟,那么内存中就会存储 5 分钟的数据,这在我看来,也是对内存的极大损伤之一。

11. 实际场景问题

问题:怎么去重?考虑一个实时场景:双十一场景,滑动窗口长度为 1 小时,滑动距离为 10 秒钟,亿级用户,怎样计算 UV?
解答:使用类似于 scala 的 set 数据结构或者 redis 的 set 显然是不行的,因为可能有上亿个 Key,内存放不下。所以可以考虑使用布隆过滤器(Bloom Filter)来去重。

问题:Flink 程序在面对数据高峰期时如何处理?
解答:使用大容量的 Kafka 把数据先放到消息队列里面作为数据源,再使用 Flink 进行消费,不过这样会影响到一点实时性。

12. Spark SQL与Flink SQL的本质区别,各自的优缺点以及适用场景。

Spark SQL 的核心是Catalyst优化器,首先将SQL处理成未优化过的逻辑计划(Unresolved Logical Plan),其只包括数据结构,不包含任何数据信息。然后通过解析,形成解析后的逻辑计划([Analyzed] Logical Plan),这里节点上就会绑定各种信息。通过优化规则,形成优化后的逻辑计划(Optimized Logical Plan),这里会对一些低效的逻辑计划进行转换。逻辑计划之后,会进行物理执行就计划,物理计划阶段会将逻辑计划生成的子树进行进一步转化生成物理算子树,物理算子树上的节点会直接生成RDD或对RDD进行转化(transformation/execute)操作。
Flink SQL 是Fllink提供的SQL的SDK API。SQL是比Table更高阶的API,集成在Table library中提供,在流和批上都可以用此API开发业务。

  • 其完全依靠calcite(sql parser)去做语法解析,
  • validate后生成calcite logical plan.
  • 而Table API先自己生成table API的logical plan,再通过calcite relbuilder translation成calcite logical plan。
  • 使用calcite cost-based optimizor 进行优化。

也就是说和spark不同, flink 的SQL Parsing, Analysing, Optimizing都是托管给calcite(flink会加入一些optimze rules). Calcite 会基于优化规则来优化这些 Logical Plan,根据运行环境的不同会应用不同的优化规则(Flink提供了批的优化规则,和流的优化规则)。Calcite提供的内置优化规则(如条件下推,剪枝等),再基于flink定制的一些优化rules(根据是streaming还是batch选择rulue)去优化logical Plan。生成phsyical plan,基于flink里头的rules生成了DataStream Plan(Physical Plan)。

逻辑和spark类似,只不过calcite做了catalyst的事(sql parsing,analysis和optimizing)

Spark SQL是一个用于结构化数据处理的Spark模块。与基本的Spark RDD API不同,Spark SQL提供的接口为Spark提供了关于数据结构和正在执行的计算的更多信息。在内部,Spark SQL使用这些额外的信息来执行额外的优化。有几种与Spark SQL交互的方法,包括SQL和数据集API。
在计算结果时,使用相同的执行引擎,而不依赖于用于表示计算的API/语言。这种统一意味着开发人员可以很容易地在不同的api之间来回切换,这些api提供了最自然的方式来表达给定的转换。
Spark SQL的一个用途是执行SQL查询。Spark SQL还可以用于从现有Hive安装中读取数据。

Spark SQL 的优点:

  1. 易整合:将sql查询与spark程序无缝混合,可以使用java、scala、python、R等语言的API操作。
  2. 统一的数据访问 :以相同的方式连接到任何数据源( 包括hive、 json、 parquet、 jdbc等等)。
  3. 兼容hive.
  4. 标准的数据连接.
  5. 高效低延迟,查询速度快.

Spark SQL 的缺点

  1. 数据量过大(1T以上)时没有MapReduce并行处理的效果好;
  2. Spark是基于内存的,对内存要求比较高,程序设计不合理或处理数据庞大容易造成OOM异常.
  3. 少量的Hive高级查询在Spark SQL中是不支持的.

    Apache Flink是一个用于分布式流和批处理数据的开源平台。Flink的核心是一个流数据流引擎,它为数据流上的分布式计算提供数据分发、通信和容错功能。
    Table API是用于流和批处理的统一关系API。表API查询可以在批量或流式输入上运行而无需修改。Table API是SQL语言的超级集合,专门用于Apache Flink。Table API是Scala和Java的语言集成API。Table API查询不是像SQL中常见的那样将查询指定为String值,而是在Java或Scala中以嵌入语言的样式定义,并支持自动完成和语法验证等IDE支持。
    Flink SQL 的优缺点:
    Flink SQL优点:易于平台化、开发效率高、维度成本低等、查询速度快。
    Flink SQL缺陷:1. 语法和Hive SQL有一定区别;2. 自定义函数UDF不如Hive丰富,写UDF的频率高于 Hive。

    SparkSQL 的主要适用场景是处理结构化数据(较为规范的半结构化数据也可以处理)。

13. Flink SQL解析过程

Apache Calcite是一个动态数据管理框架 ,它具备很多典型数据库管理系统的功能,如SQL解析、SQL校验、SQL查询优化等,又省略了一些功能,如不存储相关数据,也不完全包含相关处理数据等。
Flink中的sql解析、sql校验和sql优化便是依赖calcite来完成的。
image.png

  1. 通过Parser解析器将传入的sql解析成一颗词法树,SqlNode作为树的节点
  2. 做词法的校验Validate,类型校验,元数据校验等等
  3. 将校验好的SqlNode树转换成对应的关系代数表达式,也是一颗树,RelNode作为节点
  4. 将RelNode关系代数表达式树,通过内置的两种优化器Volcano , Hep 优化关系代数表达式得到最优逻辑代数的一颗树,也是RelNode
  5. 最优的逻辑代数表达式(RelNode),会被转换成对应的可执行的物理执行计划(转换逻辑根据框架有所同),像Flink就转成他的Operator去运行


14. Flink消费kafka分区的数据时flink任务并行度之间的关系

  1. Source 端并行度的配置

数据源端是 Kafka,Source 的并行度设置为 Kafka 对应 Topic 的分区数。
如果已经等于 Kafka 的分区数,消费速度仍跟不上数据生产速度,考虑下 Kafka 要扩大分区,同时调大并行度等于分区数。
Flink 的一个并行度可以处理一至多个分区的数据,如果并行度多于 Kafka 的分区数,那么就会造成有的并行度空闲,浪费资源。

  1. Transform 端并行度的配置

➢ Keyby 之前的算子
一般不会做太重的操作,都是比如 map、filter、flatmap 等处理较快的算子,并行度可以和 source 保持一致。
➢ Keyby 之后的算子
如果并发较大,建议设置并行度为 2 的整数次幂,例如:128、256、512;小并发任务的并行度不一定需要设置成 2 的整数次幂;大并发任务如果没有 KeyBy,并行度也无需设置为 2 的整数次幂;

  1. Sink 端并行度的配置

Sink 端是数据流向下游的地方,可以根据 Sink 端的数据量及下游的服务抗压能力进行评估。如果 Sink 端是 Kafka,可以设为 Kafka 对应 Topic 的分区数。
Sink 端的数据量小,比较常见的就是监控告警的场景,并行度可以设置的小一些。
Source 端的数据量是最小的,拿到 Source 端流过来的数据后做了细粒度的拆分,数据量不断的增加,到 Sink 端的数据量就非常大。那么在 Sink 到下游的存储中间件的时候就需要提高并行度。
另外 Sink 端要与下游的服务进行交互,并行度还得根据下游的服务抗压能力来设置,如果在 Flink Sink 这端的数据量过大的话,且 Sink 处并行度也设置的很大,但下游的服务完全撑不住这么大的并发写入,可能会造成下游服务直接被写挂,所以最终还是要在 Sink 处的并行度做一定的权衡。

15. 如何动态修改Flink的配置,前提是Flink不能重启

Flink支持不同的重启策略,这些重启策略控制着job失败后如何重启

  1. 固定延迟重启策略:固定延迟重启策略会尝试一个给定的次数来重启Job,如果超过了最大的重启次数,Job最终将失败。在连续的两次重启尝试之间,重启策略会等待一个固定的时间。
  2. 失败率重启策略:失败率重启策略在Job失败后会重启,但是超过失败率后,Job会最终被认定失败。在两个连续的重启尝试之间,重启策略会等待一个固定的时间。
  3. 无重启策略:Job直接失败,不会尝试进行重启。

重启策略可以在flink-conf.yaml中配置,表示全局的配置。也可以在应用代码中动态指定,会覆盖全局配置
如果没有启用 checkpointing,则使用无重启 (no restart) 策略。如果启用了 checkpointing,但没有配置重启策略,则使用固定间隔 (fixed-delay) 策略

Flink 作业的配置一般都是通过在作业启动的时候通过参数传递的,或者通过读取配置文件的参数,在作业启动后初始化了之后如果再想更新作业的配置一般有两种解决方法:

  1. 改变启动参数或者改变配置文件,重启作业,让作业能够读取到修改后的配置
  2. 通过读取配置流(需要自定义 Source 读取配置),然后流和流连接起来
  3. 读取配置信息,从mysql或者redis
  4. Flink的广播机制 , 将一个流中的数据(通常是较少量的数据)广播到下游算子的所有并发实例中,实现真正的低延迟动态更新。


16. 如何确定Flink任务的合理并行度?

Flink任务并行度合理行一般根据峰值流量进行压测评估,并且根据集群负载情况留一定量的buffer资源。
1.如果数据源已经存在,则可以直接消费进行测试
2.如果数据源不存在,需要自行造压测数据进行测试
对于一个Flink任务来说,一般可以按照以下方式进行细粒度设置并行度:

  1. source并行度配置:以kafka为例,source的并行度一般设置为kafka对应的topic的分区数
  2. transform(比如flatmap、map、filter等算子)并行度的配置:这些算子一般不会做太重的操作,并行度可以和source保持一致,使得算子之间可以做到forward传输数据,不经过网络传输
  3. keyby之后的处理算子:建议最大并行度为此算子并行度的整数倍,这样可以使每个算子上的keyGroup是相同的,从而使得数据相对均匀shuffle到下游算子
  4. sink并行度的配置:sink是数据流向下游的地方,可以根据sink的数据量及下游的服务抗压能力进行评估。如果sink是kafka,可以设为kafka对应topic的分区数。注意sink并行度最好和kafka partition成倍数关系,否则可能会出现如到kafka partition数据不均匀的情况。但是大多数情况下sink算子并行度不需要特别设置,只需要和整个任务的并行度相同就行。

    17. Flink数据倾斜

    1. 判断是否存在数据倾斜
    通过 Flink Web UI 可以精确地看到每个 Subtask 处理了多少数据,即可判断出 Flink 任务是否存在数据倾斜。
    另外,有时 Checkpoint detail 里不同 SubTask 的 State size 也是一个分析数据倾斜的有用指标。
    2.数据倾斜的解决
    (1)keyBy 后的聚合操作存在数据倾斜
    使用 LocalKeyBy 的思想 :在 keyBy 上游算子数据发送之前,首先在上游算子的本地对数据进行聚合后,再发送到下游,使下游接收到的数据量大大减少,从而使得 keyBy 之后的聚合操作不再是任务的瓶颈。类MapReduce中Combiner的思想,但是这要求聚合操作必须是多条数据或者一批数据才能聚合,单条数据没有办法通过聚合来减少数据量。从 Flink LocalKeyBy 实现原理来讲,必然会存在一个积攒批次的过程,在上游算子中必须攒够一定的数据量,对这些数据聚合后再发送到下游。
    (2)keyBy 之前发生数据倾斜
    如果 keyBy 之前就存在数据倾斜,上游算子的某些实例可能处理的数据较多,某些实例可能处理的数据较少,产生该情况可能是因为数据源的数据本身就不均匀,例如由于某些原因 Kafka 的 topic 中某些 partition 的数据量较大,某些 partition 的数据量较少。对于不存在 keyBy 的 Flink 任务也会出现该情况。
    这种情况,需要让 Flink 任务强制进行 shuffle。使用 shuffle、rebalance 或 rescale 算子即可将数据均匀分配,从而解决数据倾斜的问题。
    (3)keyBy 后的窗口聚合操作存在数据倾斜
    因为使用了窗口,变成了有界数据(攒批)的处理,窗口默认是触发时才会输出一条结果发往下游,所以可以使用两阶段聚合的方式:
    ➢ 第一阶段聚合:key 拼接随机数前缀或后缀,进行 keyby、开窗、聚合
    注意:聚合完不再是 WindowedStream,要获取 WindowEnd 作为窗口标记作为第二阶段分组依据,避免不同窗口的结果聚合到一起)
    ➢ 第二阶段聚合:按照原来的 key 及 windowEnd 作 keyby、聚合

18. Flink调度

调度器是 Flink 作业执行的核心组件,管理作业执行的所有相关过程,包括 JobGraph 到 ExecutionGraph 的转换、作业生命周期管理(作业的发布、取消、停止)、作业的 Task 生命周期管理(Task 的发布、取消、停止)、资源申请与释放、作业和 Task 的 Failover 等。
调度有几个重要的组件:

  • 调度器:SchedulerNG 及其子类、实现类
  • 调度策略:SchedulingStrategy 及其实现类
  • 调度模式:ScheduleMode 包含流和批的调度,有各自不同的调度模式

调度器作用:
1)作业的生命周期管理,如作业的发布、挂起、取消
2)作业执行资源的申请、分配、释放
3)作业的状态管理,作业发布过程中的状态变化和作业异常时的 FailOver 等
4)作业的信息提供,对外提供作业的详细信息
ScheduleMode 决定如何启动 ExecutionGraph 中的 Task。Flink 提供 3 中调度模式:
1)Eager 调度
适用于流计算。一次性申请需要的所有资源,如果资源不足,则作业启动失败。
2)分阶段调度
LAZY_FROM_SOURCES 适用于批处理。从 SourceTask 开始分阶段调度,申请资源的时候,一次性申请本阶段所需要的所有资源。上游 Task 执行完毕后开始调度执行下游的 Task,读取上游的数据,执行本阶段的计算任务,执行完毕之后,调度后一个阶段的 Task,依次进行调度,直到作业完成。
3)分阶段 Slot 重用调度
LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST 适用于批处理。与分阶段调度基本一样,区别在于该模式下使用批处理资源申请模式,可以在资源不足的情况下执行作业,但是需要确保在本阶段的作业执行中没有 Shuffle 行为。目前视线中的 Eager 模式和 LAZY_FROM_SOURCES 模式的资源申请逻辑一样,LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST 是单独的资源申请逻辑。
图片.png调度策略有三种实现:

  • EagerSchedulingStrategy:适用于流计算,同时调度所有的 task
  • LazyFromSourcesSchedulingStrategy:适用于批处理,当输入数据准备好时(上游处
  • 理完)进行 vertices 调度。
  • PipelinedRegionSchedulingStrategy:以流水线的局部为粒度进行调度