Background
Environment
- Flink 1.11
- JDK 8
- MySQL 8
Crash log
2020-08-19 02:30:01,082 INFO org.apache.flink.runtime.taskmanager.Task [] - Registering task at network: CHAIN DataSource (at createInput(ExecutionEnvironment.java:605) (org.apache.flink.connector.jdbc.JdbcInputFormat)) -> Map (Key Extractor) (1/1) (0cc5169838d25900d33cb6978ad2ab5c) [DEPLOYING].
2020-08-19 02:30:01,082 INFO org.apache.flink.runtime.taskmanager.Task [] - Registering task at network: CHAIN DataSource (at createInput(ExecutionEnvironment.java:605) (org.apache.flink.connector.jdbc.JdbcInputFormat)) -> Map (Key Extractor) (1/1) (e35e98c04fb641b5e8ddc3f18f7c5241) [DEPLOYING].
2020-08-19 02:30:01,083 INFO org.apache.flink.runtime.taskmanager.Task [] - CHAIN DataSource (at createInput(ExecutionEnvironment.java:605) (org.apache.flink.connector.jdbc.JdbcInputFormat)) -> Map (Key Extractor) (1/1) (e35e98c04fb641b5e8ddc3f18f7c5241) switched from DEPLOYING to RUNNING.
2020-08-19 02:30:01,083 INFO org.apache.flink.runtime.taskmanager.Task [] - CHAIN DataSource (at createInput(ExecutionEnvironment.java:605) (org.apache.flink.connector.jdbc.JdbcInputFormat)) -> Map (Key Extractor) (1/1) (0cc5169838d25900d33cb6978ad2ab5c) switched from DEPLOYING to RUNNING.
2020-08-19 02:30:01,104 WARN org.apache.flink.metrics.MetricGroup [] - The operator name DataSource (at createInput(ExecutionEnvironment.java:605) (org.apache.flink.connector.jdbc.JdbcInputFormat)) exceeded the 80 characters length limit and was truncated.
2020-08-19 02:30:01,104 WARN org.apache.flink.metrics.MetricGroup [] - The operator name DataSource (at createInput(ExecutionEnvironment.java:605) (org.apache.flink.connector.jdbc.JdbcInputFormat)) exceeded the 80 characters length limit and was truncated.
2020-08-19 02:30:02,535 ERROR org.apache.flink.runtime.taskexecutor.TaskManagerRunner [] - Fatal error occurred while executing the TaskManager. Shutting it down...
java.lang.OutOfMemoryError: Metaspace. The metaspace out-of-memory error has occurred. This can mean two things: either the job requires a larger size of JVM metaspace to load classes or there is a class loading leak. In the first case 'taskmanager.memory.jvm-metaspace.size' configuration option should be increased. If the error persists (usually in cluster after several job (re-)submissions) then there is probably a class loading leak in user code or some of its dependencies which has to be investigated and fixed. The task executor has to be shutdown...
    at java.lang.ClassLoader.defineClass1(Native Method) ~[?:1.8.0_121]
    at java.lang.ClassLoader.defineClass(ClassLoader.java:763) ~[?:1.8.0_121]
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) ~[?:1.8.0_121]
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:467) ~[?:1.8.0_121]
    at java.net.URLClassLoader.access$100(URLClassLoader.java:73) ~[?:1.8.0_121]
    at java.net.URLClassLoader$1.run(URLClassLoader.java:368) ~[?:1.8.0_121]
    at java.net.URLClassLoader$1.run(URLClassLoader.java:362) ~[?:1.8.0_121]
    at java.security.AccessController.doPrivileged(Native Method) ~[?:1.8.0_121]
    at java.net.URLClassLoader.findClass(URLClassLoader.java:361) ~[?:1.8.0_121]
    at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:71) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48) [flink-dist_2.11-1.11.0.jar:1.11.0]
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357) [?:1.8.0_121]
    at com.mysql.cj.jdbc.NonRegisteringDriver.connect(NonRegisteringDriver.java:199) [blob_p-01458c5a327275241c4ac2c8256594929fe0438a-3f8b768c5fd5f31caa33179fabad5ed7:?]
    at java.sql.DriverManager.getConnection(DriverManager.java:664) [?:1.8.0_121]
    at java.sql.DriverManager.getConnection(DriverManager.java:247) [?:1.8.0_121]
    at org.apache.flink.connector.jdbc.JdbcInputFormat.openInputFormat(JdbcInputFormat.java:148) [blob_p-01458c5a327275241c4ac2c8256594929fe0438a-3f8b768c5fd5f31caa33179fabad5ed7:?]
    at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:143) [flink-dist_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) [flink-dist_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) [flink-dist_2.11-1.11.0.jar:1.11.0]
    at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
2020-08-19 02:30:02,537 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Stopping TaskExecutor akka.tcp://flink@127.0.0.1:36986/user/rpc/taskmanager_0.
2020-08-19 02:30:02,537 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Close ResourceManager connection 0288ae05d22818283091f6970871e1ff.
2020-08-19 02:30:02,708 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Close JobManager connection for job cfde75e38a14a9fe7933c0ff6ecdaa2c.
2020-08-19 02:30:02,708 INFO org.apache.flink.runtime.taskmanager.Task [] - Attempting to fail task externally CHAIN DataSource (at createInput(ExecutionEnvironment.java:605) (org.apache.flink.connector.jdbc.JdbcInputFormat)) -> Map (Key Extractor) (1/1) (e35e98c04fb641b5e8ddc3f18f7c5241).
2020-08-19 02:30:02,708 WARN org.apache.flink.runtime.taskmanager.Task [] - CHAIN DataSource (at createInput(ExecutionEnvironment.java:605) (org.apache.flink.connector.jdbc.JdbcInputFormat)) -> Map (Key Extractor) (1/1) (e35e98c04fb641b5e8ddc3f18f7c5241) switched from RUNNING to FAILED.
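Analysis
Metaspace holds class metadata. If too many classes get loaded and none of them can be unloaded, it eventually runs out of memory. I started with two hypotheses:
- Jobs are submitted so quickly that memory from the previous run has not been reclaimed before the next allocation
- Something keeps holding on to the Class objects after they are created, so they can never be released
The job is only submitted once every half hour, so I ruled out the first hypothesis right away.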
So what keeps holding on to the Class objects? I used Arthas to take a look at the ClassLoaders:
[arthas@22458]$ classloader
 name                                          numberOfInstances  loadedCountTotal
 org.apache.flink.util.ChildFirstClassLoader   26                 10110
 sun.misc.Launcher$AppClassLoader              1                  6976
 BootstrapClassLoader                          1                  2892
 com.taobao.arthas.agent.ArthasClassloader     1                  1379
 sun.reflect.DelegatingClassLoader             292                292
 sun.misc.Launcher$ExtClassLoader              1                  5
 sun.reflect.misc.MethodUtil                   1                  1
Affect(row-cnt:7) cost in 8 ms.
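Wow, there are 26 instances of ChildFirstClassLoader, and a pile of DelegatingClassLoader instances as well. The former is the ClassLoader created each time a job is submitted so that applications stay isolated from each other, much like Tomcat's WebAppClassLoader. The latter is used when objects are created through reflection. After I forced a GC from JVisualVM, the number of DelegatingClassLoader instances went down, but the number of ChildFirstClassLoader instances did not change. That more or less pins it down: something created on every job submission is never shut down. At this point I was stuck, because I could not tell from the code what was left open. Later I found a few suggestions in the official documentation: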
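Roughly: dependencies shared by several jobs should go straight into the /lib directory. Each job's jar file is loaded by its own isolated ClassLoader, so bundling those dependencies into every job's jar file can cause a memory leak. The documentation also gives a few suggestions for tracking down a Metaspace OOM: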
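- Make sure functions/sources/sinks shut down all the threads they start
- Avoid caching objects anywhere that outlives the lifecycle of the functions/sources/sinks
And then I was out of ideas again, until I noticed a large number of mysql-cj-abandoned-connection-cleanup threads in the heap dump. What on earth is that?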
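If taking a heap dump is inconvenient, the same symptom can be checked from inside the JVM by counting live threads with that name. The following is only a minimal sketch: the class ThreadCensus and its methods are hypothetical names, and the threads it starts are stand-ins that merely borrow the name, not real MySQL cleanup threads.

```java
import java.util.concurrent.CountDownLatch;

final class ThreadCensus {
    /** Counts live threads in this JVM whose name equals the given name. */
    static long countThreadsNamed(String name) {
        // getAllStackTraces() returns a snapshot keyed by every live thread.
        return Thread.getAllStackTraces().keySet().stream()
                .filter(t -> t.getName().equals(name))
                .count();
    }

    /** Starts n daemon threads with the given name that wait until the latch is released. */
    static void startParkedDaemons(String name, int n, CountDownLatch release) {
        for (int i = 0; i < n; i++) {
            Thread t = new Thread(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, name);
            t.setDaemon(true);
            t.start();
        }
    }

    public static void main(String[] args) {
        String name = "mysql-cj-abandoned-connection-cleanup";
        CountDownLatch release = new CountDownLatch(1);
        // Simulate three job submissions that each left one thread behind.
        startParkedDaemons(name, 3, release);
        System.out.println(name + " threads alive: " + countThreadsNamed(name));
        release.countDown();
    }
}
```

A count that grows by one after every job submission would point at the same leak that the heap dump shows.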

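I looked it up: it is the thread that closes abandoned MySQL connections. That is interesting, so why does this thread never end? Is it a daemon thread? I opened the source and, sure enough, it is (abridged):
    static {
        cleanupThreadExcecutorService = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "mysql-cj-abandoned-connection-cleanup");
            t.setDaemon(true);
            return t;
        });
    }
A JVM does not exit while any user thread is still running; once only daemon threads are left, it exits and the daemon threads die with it. But Flink's TaskManager is a long-running JVM that never exits, so this daemon thread never ends either. As a result, the ClassLoader that created the AbandonedConnectionCleanupThread thread can never be collected, which eventually leads to the Metaspace OOM.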
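The link between a thread that never ends and a ClassLoader that cannot be collected can be reproduced in a few lines. This is a sketch under simplifying assumptions: PinnedLoaderDemo and the thread name are made up, an empty URLClassLoader stands in for a job's ChildFirstClassLoader, and the thread is tied to the loader only through its context ClassLoader, which is just one of the ways a live thread can reference a loader.

```java
import java.lang.ref.WeakReference;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.concurrent.CountDownLatch;

final class PinnedLoaderDemo {
    /** Starts a waiting daemon thread bound to a fresh loader; returns a weak reference to that loader. */
    static WeakReference<ClassLoader> startLeakyThread(CountDownLatch release) {
        // Stand-in for the per-job ClassLoader; no strong reference is kept after this method returns.
        ClassLoader jobLoader = new URLClassLoader(new URL[0], PinnedLoaderDemo.class.getClassLoader());
        Thread t = new Thread(() -> {
            try {
                release.await(); // keeps the thread alive, like a cleanup loop that never finishes
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "fake-cleanup-thread");
        t.setDaemon(true);
        t.setContextClassLoader(jobLoader);
        t.start();
        return new WeakReference<>(jobLoader);
    }

    /** Returns true if the loader is still reachable after a GC while the thread is alive. */
    static boolean loaderSurvivesGc() {
        CountDownLatch release = new CountDownLatch(1);
        WeakReference<ClassLoader> ref = startLeakyThread(release);
        System.gc();
        // The live thread is a GC root and references the loader, so the weak reference is not cleared.
        boolean alive = ref.get() != null;
        release.countDown();
        return alive;
    }

    public static void main(String[] args) {
        System.out.println("loader still reachable while thread runs: " + loaderSurvivesGc());
    }
}
```

Being a daemon does not help here: the daemon flag only matters when the JVM decides whether to exit, and a TaskManager JVM does not exit between jobs.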
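That explains the problem almost completely, but one piece is still missing. Why is a new AbandonedConnectionCleanupThread created on every job submission? In other words, why is the Driver created again each time? Because both the registered Driver and the AbandonedConnectionCleanupThread thread object belong to a different ClassLoader for every job. The jobs know nothing about each other, so every submission creates new ones.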
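Solution
With the cause found, and matching Flink's official advice exactly, I simply followed that advice: take the MySQL jar out of the job jar and put it in the /lib directory as a shared dependency. That classpath should be loaded by the AppClassLoader. Under the parent delegation model, every child ClassLoader whose parent is the AppClassLoader (such as ChildFirstClassLoader) can see the MySQL jar, so the Driver is registered only once, there is only one AbandonedConnectionCleanupThread, and every job can finish cleanly.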
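The effect of moving a class into a shared parent can be illustrated with plain JDK class loaders. This is a hedged sketch, not Flink's actual loading code: SharedParentDemo and FakeDriver are hypothetical names, FakeDriver's static initializer stands in for the Driver's static block that starts the cleanup thread, and two empty URLClassLoader instances play the role of two jobs whose jars do not contain the driver.

```java
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.concurrent.atomic.AtomicInteger;

final class SharedParentDemo {
    // Counts how many times the stand-in driver's static initializer runs.
    static final AtomicInteger INIT_COUNT = new AtomicInteger();

    // Hypothetical stand-in for a JDBC driver whose static block starts a cleanup thread.
    static final class FakeDriver {
        static {
            INIT_COUNT.incrementAndGet();
        }
    }

    /** Loads FakeDriver through two "job" loaders sharing one parent; true if both see the same Class. */
    static boolean sameClassForBothJobs() {
        ClassLoader parent = SharedParentDemo.class.getClassLoader();
        String name = FakeDriver.class.getName(); // a class literal does not trigger initialization
        try (URLClassLoader jobA = new URLClassLoader(new URL[0], parent);
             URLClassLoader jobB = new URLClassLoader(new URL[0], parent)) {
            // Neither job loader can find the class itself, so both end up with the parent's copy.
            Class<?> a = Class.forName(name, true, jobA);
            Class<?> b = Class.forName(name, true, jobB);
            return a == b;
        } catch (ClassNotFoundException | IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("same Class for both jobs: " + sameClassForBothJobs());
        System.out.println("static initializer runs: " + INIT_COUNT.get());
    }
}
```

If each job jar bundled its own copy of the driver instead, a child-first loader would define a separate Class per job, and the static initializer (and with it the cleanup thread) would run once per submission.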
