A Flink job kept failing with the following stack trace, and its tasks could not recover:
java.lang.IllegalStateException
at org.apache.flink.util.Preconditions.checkState(Preconditions.java:179)
at org.apache.flink.core.fs.SafetyNetCloseableRegistry.<init>(SafetyNetCloseableRegistry.java:75)
at org.apache.flink.core.fs.FileSystemSafetyNet.initializeSafetyNetForThread(FileSystemSafetyNet.java:89)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:620)
at java.lang.Thread.run(Thread.java:882)
Looking at the code:
SafetyNetCloseableRegistry() {
    super(new IdentityHashMap<>());
    synchronized (REAPER_THREAD_LOCK) {
        if (0 == GLOBAL_SAFETY_NET_REGISTRY_COUNT) {
            Preconditions.checkState(null == REAPER_THREAD);
            REAPER_THREAD = new CloseableReaperThread();
            REAPER_THREAD.start();
        }
        ++GLOBAL_SAFETY_NET_REGISTRY_COUNT;
    }
}
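This constructor runs when a task starts and creates its SafetyNetCloseableRegistry object. The class has two static variables, REAPER_THREAD and GLOBAL_SAFETY_NET_REGISTRY_COUNT, and every change to them is guarded by one static lock. The error above means that GLOBAL_SAFETY_NET_REGISTRY_COUNT was 0 while REAPER_THREAD was still non-null. Yet the code looks thread-safe: only the constructor and the close method modify these fields, and both do so while holding REAPER_THREAD_LOCK.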
/** Singleton reaper thread takes care of all registries in VM. */
private static CloseableReaperThread REAPER_THREAD = null;
/** Global count of all instances of SafetyNetCloseableRegistry. */
private static int GLOBAL_SAFETY_NET_REGISTRY_COUNT = 0;
private static final Object REAPER_THREAD_LOCK = new Object();
The close method:
public void close() throws IOException {
    try {
        super.close();
    }
    finally {
        synchronized (REAPER_THREAD_LOCK) {
            --GLOBAL_SAFETY_NET_REGISTRY_COUNT;
            if (0 == GLOBAL_SAFETY_NET_REGISTRY_COUNT) {
                REAPER_THREAD.interrupt();
                REAPER_THREAD = null;
            }
        }
    }
}
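So where does it go wrong? Look again at the constructor: the lock rules out races, but it does not protect against an exception thrown inside the synchronized block. REAPER_THREAD is assigned before start() is called, and the counter is incremented only afterwards, so if start() throws, REAPER_THREAD stays non-null while GLOBAL_SAFETY_NET_REGISTRY_COUNT is still 0. From then on, each subsequent constructor call in that JVM fails the checkState. That is what happened here. The log contains the exception below: starting the reaper thread failed, possibly because of a thread leak or exhausted system resources, and that failure produced the puzzling IllegalStateException.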
timestamp__)) -> Calc(sele... (1/2) (25688531b03f89c1ea7e7a3c569721bf) switched from DEPLOYING to FAILED.
java.lang.OutOfMemoryError: unable to create new native thread
at java.lang.Thread.start0(Native Method)
at java.lang.Thread.start(Thread.java:850)
at org.apache.flink.core.fs.SafetyNetCloseableRegistry.<init>(SafetyNetCloseableRegistry.java:77)
at org.apache.flink.core.fs.FileSystemSafetyNet.initializeSafetyNetForThread(FileSystemSafetyNet.java:89)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:620)
at java.lang.Thread.run(Thread.java:882)
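The failure sequence can be reproduced outside Flink with a small sketch. Everything in it is hypothetical: the class ReaperStateSketch, its fields, and the simulated OutOfMemoryError are stand-ins for the static state of SafetyNetCloseableRegistry, not Flink code. registerOriginal mirrors the constructor quoted earlier; registerGuarded shows one possible hardening (publish the thread only after start() has succeeded), which is an assumption about how this could be avoided and not necessarily how Flink handles it.

```java
import java.util.function.Supplier;

public class ReaperStateSketch {
    // Hypothetical stand-ins for REAPER_THREAD, GLOBAL_SAFETY_NET_REGISTRY_COUNT
    // and REAPER_THREAD_LOCK.
    static Thread reaperThread = null;
    static int registryCount = 0;
    static final Object LOCK = new Object();

    /** Same order as the quoted constructor: assign the field, then start. */
    static void registerOriginal(Supplier<Thread> threadFactory) {
        synchronized (LOCK) {
            if (registryCount == 0) {
                if (reaperThread != null) {
                    throw new IllegalStateException();
                }
                reaperThread = threadFactory.get();
                reaperThread.start(); // if this throws, the field stays set
            }
            ++registryCount;
        }
    }

    /** Possible hardening: assign the field only after start() returned. */
    static void registerGuarded(Supplier<Thread> threadFactory) {
        synchronized (LOCK) {
            if (registryCount == 0) {
                if (reaperThread != null) {
                    throw new IllegalStateException();
                }
                Thread candidate = threadFactory.get();
                candidate.start(); // on failure, no shared state has changed
                reaperThread = candidate;
            }
            ++registryCount;
        }
    }

    /** A thread whose start() always fails, simulating resource exhaustion. */
    static Thread failingThread() {
        return new Thread() {
            @Override
            public void start() {
                throw new OutOfMemoryError("unable to create new native thread (simulated)");
            }
        };
    }

    /** A thread that starts normally and exits immediately. */
    static Thread workingThread() {
        return new Thread(() -> { });
    }

    /** Runs the action and reports "ok" or the simple name of what it threw. */
    static String attempt(Runnable action) {
        try {
            action.run();
            return "ok";
        } catch (Throwable t) {
            return t.getClass().getSimpleName();
        }
    }

    static void reset() {
        synchronized (LOCK) {
            reaperThread = null;
            registryCount = 0;
        }
    }

    public static void main(String[] args) {
        // Original order: one failed start poisons every later registration.
        System.out.println(attempt(() -> registerOriginal(ReaperStateSketch::failingThread)));
        System.out.println(attempt(() -> registerOriginal(ReaperStateSketch::workingThread)));

        reset();

        // Guarded order: the failed start leaves the state clean, so the retry works.
        System.out.println(attempt(() -> registerGuarded(ReaperStateSketch::failingThread)));
        System.out.println(attempt(() -> registerGuarded(ReaperStateSketch::workingThread)));
    }
}
```

With the original ordering the second call reports IllegalStateException even though its own thread would have started fine, which matches the two stack traces above. With the guarded ordering the second call succeeds. Whatever the ordering, the underlying cause, the process being unable to create native threads, still has to be tracked down separately.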