1,非法配置异常
如果您看到从 TaskExecutorProcessUtils 或 JobManagerProcessUtils 抛出的
IllegalConfigurationException,通常表明存在无效的配置值(例如负内存大小、大于 1 的分数等)或配置冲突。请重新配置内存参数。
2,Java 堆空间异常
如果报 OutOfMemoryError: Java heap space 异常,通常表示 JVM Heap 太小。可以尝试通过增加总内存来增加 JVM 堆大小。也可以直接为 TaskManager 增加任务堆内存或为 JobManager 增加 JVM 堆内存。
还可以为 TaskManagers 增加框架堆内存,但只有在确定 Flink 框架本身需要更多内存时才应该更改此选项。
3,直接缓冲存储器异常
如果报 OutOfMemoryError: Direct buffer memory 异常,通常表示 JVM 直接内存限制太小或存在直接内存泄漏。检查用户代码或其他外部依赖项是否使用了 JVM 直接内存,以及它是否被正确考虑。可以尝试通过调整直接堆外内存来增加其限制。可以参考如何为 TaskManagers、 JobManagers 和 Flink 设置的 JVM 参数配置堆外内存。
4,元空间异常
如果报 OutOfMemoryError: Metaspace 异常,通常表示 JVM 元空间限制配置得太小。您可以尝试加大 JVM 元空间 TaskManagers 或 JobManagers 选项。
5,网络缓冲区数量不足
如果报 IOException: Insufficient number of network buffers 异常,这仅与TaskManager 相关。通常表示配置的网络内存大小不够大。您可以尝试增加网络内存。
6,超出容器内存异常
如果 Flink 容器尝试分配超出其请求大小(Yarn 或 Kubernetes)的内存,这通常表明 Flink 没有预留足够的本机内存。当容器被部署环境杀死时,可以通过使用外部监控系统或从错误消息中观察到这一点。
如果在 JobManager 进程中遇到这个问题,还可以通过设置排除可能的 JVM Direct Memory 泄漏的选项来开启 JVM Direct Memory 的限制:
jobmanager.memory.enable-jvm-direct-memory-limit: true
如果想手动多分一部分内存给 RocksDB 来防止超用,预防在云原生的环境因 OOM 被 K8S kill,可将 JVM OverHead 内存调大。
之所以不调大 Task Off-Heap,是由于目前 Task Off-Heap 是和 Direct Memeory 混在一起的,即使调大整体,也并不一定会分给 RocksDB 来做 Buffer,所以我们推荐通过调整 JVM OverHead 来解决内存超用的问题。
7,Checkpoint 失败
8,Checkpoint 慢
9,Kafka 动态发现分区
当 FlinkKafkaConsumer 初始化时,每个 subtask 会订阅一批 partition,但是当 Flink 任务运行过程中,如果被订阅的 topic 创建了新的 partition,FlinkKafkaConsumer 如何实现动态发现新创建的 partition 并消费呢?
在使用 FlinkKafkaConsumer 时,可以开启 partition 的动态发现。通过 Properties 指定参数开启(单位是毫秒):
FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS
该参数表示间隔多久检测一次是否有新创建的 partition。默认值是 Long 的最小值, 表示不开启,大于 0 表示开启。开启时会启动一个线程根据传入的 interval 定期获取 Kafka 最新的元数据,新 partition 对应的那一个 subtask 会自动发现并从 earliest 位置开始消费,新创建的 partition 对其他 subtask 并不会产生影响。
代码如下所示:
properties.setProperty(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVA
L_MILLIS, 30 * 1000 + "");
10,Watermark 不更新
如果数据源中的某一个分区/分片在一段时间内未发送事件数据,则意味着
WatermarkGenerator 也不会获得任何新数据去生成 watermark。我们称这类数据源为空闲输入或空闲源。在这种情况下,当某些其他分区仍然发送事件数据的时候就会出现问题。
比如 Kafka 的 Topic 中,由于某些原因,造成个别 Partition 一直没有新的数据。由于下游算子 watermark 的计算方式是取所有不同的上游并行数据源 watermark 的最小值,则其 watermark 将不会发生变化,导致窗口、定时器等不会被触发。
为了解决这个问题,你可以使用 WatermarkStrategy 来检测空闲输入并将其标记为空闲状态。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "hadoop102:9092,hadoop103:9092,hadoop104:9092");
properties.setProperty("group.id", "fffffffffff");
FlinkKafkaConsumer<String> kafkaSourceFunction = new FlinkKafkaConsumer<>("flinktest",new SimpleStringSchema(),properties);
kafkaSourceFunction.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofMinutes(2)).withIdleness(Duration.ofMinutes(5)));
env.addSource(kafkaSourceFunction)
11,依赖冲突
ClassNotFoundException/NoSuchMethodError/IncompatibleClassChangeError/…
一般都是因为用户依赖第三方包的版本与 Flink 框架依赖的版本有冲突导致。根据报错信息中的类名,定位到冲突的 jar 包,idea可以借助 maven helper插件查找冲突的有哪些。打包插件建议使用 maven-shade-plugin。
12,超出文件描述符限制
java.io.IOException: Too many open files
首先检查 Linux 系统 ulimit -n 的文件描述符限制,再注意检查程序内是否有资源(如各种连接池的连接)未及时释放。值得注意的是,低版本 Flink 使用 RocksDB 状态后端也有 可 能 会 抛 出 这 个 异 常 , 此 时 需 修 改 flink-conf.yaml 中 的state.backend.rocksdb.files.open 参数,如果不限制,可以改为-1(1.13 默认就是-1)。
13,脏数据导致数据转发失败
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
该异常几乎都是由于程序业务逻辑有误,或者数据流里存在未处理好的脏数据导致的,继续向下追溯异常栈一般就可以看到具体的出错原因,比较常见的如 POJO 内有空字段,或者抽取事件时间的时间戳为 null 等。
14,通讯超时
akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://…]] after [10000 ms]
Akka 超时导致,一般有两种原因:一是集群负载比较大或者网络比较拥塞,二是业务逻辑同步调用耗时的外部服务。如果负载或网络问题无法彻底缓解,需考虑调大akka.ask.timeout 参数的值(默认只有 10 秒);另外,调用外部服务时尽量异步操作(Async I/O)。