本文由 简悦 SimpRead 转码, 原文地址 littlemagic.blog.csdn.net

部署和资源问题

(0) JDK 版本过低

这不是个显式错误,但是 JDK 版本过低很有可能会导致 Flink 作业出现各种莫名其妙的问题,因此在生产环境中建议采用 JDK 8 的较高 update(我们使用的是 181)。

(1) Could not build the program from JAR file

该信息不甚准确,因为绝大多数情况下都不是 JAR 包本身有毛病,而是在作业提交过程中出现异常退出了。因此需要查看本次提交产生的客户端日志(默认位于 $FLINK_HOME/logs 目录下),再根据其中的信息定位并解决问题。

(2) ClassNotFoundException/NoSuchMethodError/IncompatibleClassChangeError/…

一般都是因为用户依赖第三方包的版本与 Flink 框架依赖的版本有冲突导致。如果是采用 Maven 做项目管理的话,可参照我之前写的这篇文章来解决冲突。

(3) Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster

就是字面意思,YARN 集群内没有足够的资源启动 Flink 作业。检查一下当前 YARN 集群的状态、正在运行的 YARN App 以及 Flink 作业所处的队列,释放一些资源或者加入新的资源。

(4) java.util.concurrent.TimeoutException: Slot allocation request timed out

slot 分配请求超时,是因为 TaskManager 申请资源时无法正常获得,按照上一条的思路检查即可。

(5) org.apache.flink.util.FlinkException: The assigned slot was removed

TaskManager 的 Container 因为使用资源超限被 kill 掉了。首先需要保证每个 slot 分配到的内存量足够,特殊情况下可以手动配置 SlotSharingGroup 来减少单个 slot 中共享 Task 的数量。如果资源没问题,那么多半就是程序内部发生了内存泄露。建议仔细查看 TaskManager 日志,并按处理 JVM OOM 问题的常规操作来排查。

(6) java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id timed out

TaskManager 心跳超时。有可能是 TaskManager 已经失败,如果没有失败,那么有可能是因为网络不好导致 JobManager 没能收到心跳信号,或者 TaskManager 忙于 GC,无法发送心跳信号。JobManager 会重启心跳超时的 TaskManager,如果频繁出现此异常,应该通过日志进一步定位问题所在。

Flink on YARN 的其他问题,还可以参考这篇,非常有帮助。

作业问题

(1) org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator

该异常几乎都是由于程序业务逻辑有误,或者数据流里存在未处理好的脏数据导致的,继续向下追溯异常栈一般就可以看到具体的出错原因,比较常见的如 POJO 内有空字段,或者抽取事件时间的时间戳为 null 等。

(2) java.lang.IllegalStateException: Buffer pool is destroyed || Memory manager has been shut down

很多童鞋拿着这两条异常信息来求助,但实际上它们只是表示 BufferPool、MemoryManager 这些 Flink 运行时组件被销毁,亦即作业已经失败。具体的原因多种多样,根据经验,一般是上一条描述的情况居多(即 Could not forward element to next operator 错误会伴随出现),其次是 JDK 版本问题。具体情况还是要根据 TaskManager 日志具体分析。

(3) akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://…]] after [10000 ms]

Akka 超时导致,一般有两种原因:一是集群负载比较大或者网络比较拥塞,二是业务逻辑同步调用耗时的外部服务。如果负载或网络问题无法彻底缓解,需考虑调大akka.ask.timeout参数的值(默认只有 10 秒);另外,调用外部服务时尽量异步操作(Async I/O)。

(4) java.io.IOException: Too many open files

这个异常我们应该都不陌生,首先检查系统ulimit -n的文件描述符限制,再注意检查程序内是否有资源(如各种连接池的连接)未及时释放。值得注意的是,Flink 使用 RocksDB 状态后端也有可能会抛出这个异常,此时需修改 flink-conf.yaml 中的state.backend.rocksdb.files.open参数,如果不限制,可以改为 - 1。
关于文件描述符的一些有趣知识,可以参见之前我写的这一篇

(5) org.apache.flink.api.common.function.InvalidTypesException: The generic type parameters of ‘’ are missing

在 Flink 内使用 Java Lambda 表达式时,由于类型擦除造成的副作用(详情见这篇文章),注意调用 returns() 方法指定被擦除的类型。

检查点和状态问题

(1) Received checkpoint barrier for checkpoint before completing current checkpoint . Skipping current checkpoint

在当前检查点还未做完时,收到了更新的检查点的 barrier,表示当前检查点不再需要而被取消掉,一般不需要特殊处理。

(2) Checkpoint expired before completing

首先应检查CheckpointConfig.setCheckpointTimeout()方法设定的检查点超时,如果设的太短,适当改长一点。另外就是考虑发生了反压或数据倾斜,或者 barrier 对齐太慢。具体思路不再赘述,看官可以参考这篇文章,非常详细。

(3) org.apache.flink.util.StateMigrationException: The new state serializer cannot be incompatible

我们知道 Flink 的状态是按 key 组织并保存的,如果程序逻辑内改了 keyBy() 逻辑或者 key 的序列化逻辑,就会导致检查点 / 保存点的数据无法正确恢复。所以如果必须要改 key 相关的东西,就弃用之前的状态数据吧。

(4) org.apache.flink.util.StateMigrationException: The new serializer for a MapState requires state migration in order for the job to proceed. However, migration for MapState currently isn’t supported

在 1.9 之前的 Flink 版本中,如果我们使用 RocksDB 状态后端,并且更改了自用 MapState 的 schema,恢复作业时会抛出此异常,表示不支持更改 schema。这个问题已经在 FLINK-11947 解决,升级版本即可。