A Flink job frequently failed with the following stack trace and could not recover:
java.lang.IllegalStateException
    at org.apache.flink.util.Preconditions.checkState(Preconditions.java:179)
    at org.apache.flink.core.fs.SafetyNetCloseableRegistry.<init>(SafetyNetCloseableRegistry.java:75)
    at org.apache.flink.core.fs.FileSystemSafetyNet.initializeSafetyNetForThread(FileSystemSafetyNet.java:89)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:620)
    at java.lang.Thread.run(Thread.java:882)
Looking at the code:
SafetyNetCloseableRegistry() {
    super(new IdentityHashMap<>());
    synchronized (REAPER_THREAD_LOCK) {
        if (0 == GLOBAL_SAFETY_NET_REGISTRY_COUNT) {
            Preconditions.checkState(null == REAPER_THREAD);
            REAPER_THREAD = new CloseableReaperThread();
            REAPER_THREAD.start();
        }
        ++GLOBAL_SAFETY_NET_REGISTRY_COUNT;
    }
}
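This constructor runs when a task is created and builds a SafetyNetCloseableRegistry object. The class has two static variables, and every change to either of them is guarded by one static lock. The error above means that the global counter GLOBAL_SAFETY_NET_REGISTRY_COUNT was 0 while the thread variable REAPER_THREAD was still non-null. Yet the code looks thread-safe: only the constructor and the close method modify these variables, and both do so while holding REAPER_THREAD_LOCK.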
/** Singleton reaper thread takes care of all registries in VM. */
private static CloseableReaperThread REAPER_THREAD = null;
/** Global count of all instances of SafetyNetCloseableRegistry. */
private static int GLOBAL_SAFETY_NET_REGISTRY_COUNT = 0;
private static final Object REAPER_THREAD_LOCK = new Object();
The close method:
public void close() throws IOException {
    try {
        super.close();
    } finally {
        synchronized (REAPER_THREAD_LOCK) {
            --GLOBAL_SAFETY_NET_REGISTRY_COUNT;
            if (0 == GLOBAL_SAFETY_NET_REGISTRY_COUNT) {
                REAPER_THREAD.interrupt();
                REAPER_THREAD = null;
            }
        }
    }
}
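To see why the locking looks sufficient on its own, here is a minimal, self-contained Java model of the same reference-counting pattern. It is not Flink code: the class RefCountedReaperModel and its methods are hypothetical, and CloseableReaperThread is replaced by an idle daemon thread that exits when interrupted. As long as every constructor completes normally, many threads can open and close instances concurrently and the state always returns to a count of 0 with no thread held.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical model of the registry's static state; not Flink code. */
public class RefCountedReaperModel {
    private static final Object LOCK = new Object();
    private static Thread reaperThread = null; // plays the role of REAPER_THREAD
    private static int count = 0;              // plays the role of GLOBAL_SAFETY_NET_REGISTRY_COUNT

    /** Mirrors the constructor: the first instance starts the shared thread. */
    public RefCountedReaperModel() {
        synchronized (LOCK) {
            if (count == 0) {
                if (reaperThread != null) { // same condition as checkState(null == REAPER_THREAD)
                    throw new IllegalStateException("reaper thread exists while count == 0");
                }
                reaperThread = new Thread(() -> {
                    try {
                        Thread.sleep(Long.MAX_VALUE); // idle until close() interrupts it
                    } catch (InterruptedException e) {
                        // interrupted by the last close(): let the thread exit
                    }
                });
                reaperThread.setDaemon(true);
                reaperThread.start();
            }
            ++count;
        }
    }

    /** Mirrors close(): the last instance stops the shared thread. */
    public void close() {
        synchronized (LOCK) {
            --count;
            if (count == 0) {
                reaperThread.interrupt();
                reaperThread = null;
            }
        }
    }

    /** True when no instance is open and no reaper thread is held. */
    public static boolean idle() {
        synchronized (LOCK) {
            return count == 0 && reaperThread == null;
        }
    }

    /** Opens and closes instances from several threads; true if nothing went wrong. */
    public static boolean stressTest(int workers, int iterations) {
        AtomicInteger failures = new AtomicInteger();
        List<Thread> threads = new ArrayList<>();
        for (int w = 0; w < workers; w++) {
            Thread t = new Thread(() -> {
                for (int i = 0; i < iterations; i++) {
                    try {
                        new RefCountedReaperModel().close();
                    } catch (RuntimeException e) {
                        failures.incrementAndGet();
                    }
                }
            });
            threads.add(t);
            t.start();
        }
        try {
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return failures.get() == 0 && idle();
    }

    public static void main(String[] args) {
        System.out.println(stressTest(4, 200)); // prints "true"
    }
}
```

Every read and write of the two static fields happens inside the same synchronized block, so concurrent callers alone cannot desynchronize them.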
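So where is the problem? Look at the constructor again. Although it runs under the lock, it can still throw an exception, and if it throws part-way through, the thread variable and the global count no longer agree. That is exactly what happened here. The log contained the exception below: starting the reaper thread failed, possibly because of a thread leak or exhausted system resources (note SafetyNetCloseableRegistry.java:77, the start() call two lines after the checkState at line 75). By that point REAPER_THREAD had already been assigned, but ++GLOBAL_SAFETY_NET_REGISTRY_COUNT was never reached, and because the constructor failed, close() is never called for that object either. The count stays at 0 while REAPER_THREAD is non-null, so every later registry construction in the same JVM fails Preconditions.checkState(null == REAPER_THREAD) with the puzzling IllegalStateException shown at the top, and the job cannot recover: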
timestamp__)) -> Calc(sele... (1/2) (25688531b03f89c1ea7e7a3c569721bf) switched from DEPLOYING to FAILED.
java.lang.OutOfMemoryError: unable to create new native thread
    at java.lang.Thread.start0(Native Method)
    at java.lang.Thread.start(Thread.java:850)
    at org.apache.flink.core.fs.SafetyNetCloseableRegistry.<init>(SafetyNetCloseableRegistry.java:77)
    at org.apache.flink.core.fs.FileSystemSafetyNet.initializeSafetyNetForThread(FileSystemSafetyNet.java:89)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:620)
    at java.lang.Thread.run(Thread.java:882)
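The failure can be reproduced without Flink by injecting an exception from the thread start. The sketch below is a hypothetical model, not Flink's code: the class ReaperFailureModel and the failNextStart switch are illustrative assumptions, and a manually thrown OutOfMemoryError stands in for the real "unable to create new native thread". It compares the original order (assign the static field, then start the thread) with one possible mitigation, which is also an assumption rather than the upstream fix: start the thread through a local variable and publish it only after start() succeeds, so a failed start leaves the shared state untouched.

```java
/** Hypothetical model (not Flink code) of a failing start() inside the locked constructor. */
public class ReaperFailureModel implements AutoCloseable {
    private static final Object LOCK = new Object();
    private static Thread reaperThread = null; // plays the role of REAPER_THREAD
    private static int count = 0;              // plays the role of GLOBAL_SAFETY_NET_REGISTRY_COUNT
    /** Hypothetical failure-injection switch: the next start() call fails. */
    static boolean failNextStart = false;

    public ReaperFailureModel(boolean hardened) {
        synchronized (LOCK) {
            if (count == 0) {
                if (reaperThread != null) { // same condition as checkState(null == REAPER_THREAD)
                    throw new IllegalStateException("reaper thread exists while count == 0");
                }
                if (hardened) {
                    Thread t = newIdleDaemonThread();
                    start(t);         // may throw; shared state is still untouched
                    reaperThread = t; // publish only after a successful start
                } else {
                    reaperThread = newIdleDaemonThread(); // original order: publish first
                    start(reaperThread);                  // if this throws, ++count is skipped
                }
            }
            ++count;
        }
    }

    @Override
    public void close() {
        synchronized (LOCK) {
            --count;
            if (count == 0) {
                reaperThread.interrupt();
                reaperThread = null;
            }
        }
    }

    private static Thread newIdleDaemonThread() {
        Thread t = new Thread(() -> {
            try {
                Thread.sleep(Long.MAX_VALUE); // idle until interrupted by close()
            } catch (InterruptedException e) {
                // exit quietly
            }
        });
        t.setDaemon(true);
        return t;
    }

    private static void start(Thread t) {
        if (failNextStart) {
            failNextStart = false;
            // Simulates java.lang.OutOfMemoryError: unable to create new native thread
            throw new OutOfMemoryError("unable to create new native thread (simulated)");
        }
        t.start();
    }

    /** Returns true if a registry created after a failed start() can still be opened. */
    public static boolean recoversAfterFailedStart(boolean hardened) {
        synchronized (LOCK) { // reset the model between runs
            reaperThread = null;
            count = 0;
        }
        failNextStart = true;
        try {
            new ReaperFailureModel(hardened); // first task: start() fails
        } catch (OutOfMemoryError expected) {
            // the task fails to deploy, as in the log above
        }
        try (ReaperFailureModel next = new ReaperFailureModel(hardened)) {
            return true;  // the next task registers normally
        } catch (IllegalStateException e) {
            return false; // same failure as the reported stack trace
        }
    }

    public static void main(String[] args) {
        System.out.println("original order: " + recoversAfterFailedStart(false)); // prints "original order: false"
        System.out.println("hardened order: " + recoversAfterFailedStart(true));  // prints "hardened order: true"
    }
}
```

With the original order, the second registration hits the same IllegalStateException as the reported trace; with the reordered variant it succeeds. Reordering only keeps the registry from getting stuck; the underlying thread exhaustion still has to be tracked down separately.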
