A Flink job frequently fails with the following stack trace and cannot recover normally:

    java.lang.IllegalStateException
        at org.apache.flink.util.Preconditions.checkState(Preconditions.java:179)
        at org.apache.flink.core.fs.SafetyNetCloseableRegistry.<init>(SafetyNetCloseableRegistry.java:75)
        at org.apache.flink.core.fs.FileSystemSafetyNet.initializeSafetyNetForThread(FileSystemSafetyNet.java:89)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:620)
        at java.lang.Thread.run(Thread.java:882)

Looking at the code:

    SafetyNetCloseableRegistry() {
        super(new IdentityHashMap<>());
        synchronized (REAPER_THREAD_LOCK) {
            if (0 == GLOBAL_SAFETY_NET_REGISTRY_COUNT) {
                Preconditions.checkState(null == REAPER_THREAD);
                REAPER_THREAD = new CloseableReaperThread();
                REAPER_THREAD.start();
            }
            ++GLOBAL_SAFETY_NET_REGISTRY_COUNT;
        }
    }

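This constructor runs when a task is created and builds a SafetyNetCloseableRegistry object. The SafetyNetCloseableRegistry class has two static variables, and every change to either of them is guarded by a single static lock. The error above means that the global counter GLOBAL_SAFETY_NET_REGISTRY_COUNT was 0 while the thread variable REAPER_THREAD was still non-null. Yet the code looks thread-safe: only the constructor and the close method modify these variables, and both do so while holding REAPER_THREAD_LOCK.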

    /** Singleton reaper thread takes care of all registries in VM. */
    private static CloseableReaperThread REAPER_THREAD = null;
    /** Global count of all instances of SafetyNetCloseableRegistry. */
    private static int GLOBAL_SAFETY_NET_REGISTRY_COUNT = 0;
    private static final Object REAPER_THREAD_LOCK = new Object();

The close method:

    public void close() throws IOException {
        try {
            super.close();
        }
        finally {
            synchronized (REAPER_THREAD_LOCK) {
                --GLOBAL_SAFETY_NET_REGISTRY_COUNT;
                if (0 == GLOBAL_SAFETY_NET_REGISTRY_COUNT) {
                    REAPER_THREAD.interrupt();
                    REAPER_THREAD = null;
                }
            }
        }
    }

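So where does it go wrong? Look again at the initialization block in the constructor. The lock does protect it, but an exception can still be thrown inside it, and when that happens the thread variable and the global counter can get out of step. That is exactly what happened here. REAPER_THREAD is assigned before REAPER_THREAD.start() is called, so if start() throws, the field stays non-null while the counter is never incremented, and the next constructor call fails the checkState. The log contains the exception below: starting the newly created thread failed, possibly because of a thread leak or exhausted system resources, and that failure is what led to this strange error.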

    timestamp__)) -> Calc(sele... (1/2) (25688531b03f89c1ea7e7a3c569721bf) switched from DEPLOYING to FAILED.
    java.lang.OutOfMemoryError: unable to create new native thread
        at java.lang.Thread.start0(Native Method)
        at java.lang.Thread.start(Thread.java:850)
        at org.apache.flink.core.fs.SafetyNetCloseableRegistry.<init>(SafetyNetCloseableRegistry.java:77)
        at org.apache.flink.core.fs.FileSystemSafetyNet.initializeSafetyNetForThread(FileSystemSafetyNet.java:89)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:620)
        at java.lang.Thread.run(Thread.java:882)
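
The failure sequence can be reproduced outside Flink with a small simulation. The sketch below is not Flink code. The class `ReaperStateSketch`, its fields, the `open` and `attempt` methods, the injected thread factory, and the `resetOnFailure` flag are all hypothetical names made up for illustration, and the OutOfMemoryError is thrown by hand rather than by the JVM. It only mirrors the shape of the constructor shown earlier: the thread field is assigned, then `start()` fails. With `resetOnFailure` set, the field is rolled back before the error propagates, which is one possible way to keep the field and the counter consistent, assuming nothing else depends on the stale reference.

```java
import java.util.function.Supplier;

class ReaperStateSketch {
    // Simplified stand-ins for the static fields of SafetyNetCloseableRegistry.
    static Thread reaperThread = null;
    static int registryCount = 0;
    static final Object LOCK = new Object();

    // Mirrors the constructor logic. threadFactory and resetOnFailure are
    // hypothetical additions used only to drive this simulation.
    static void open(Supplier<Thread> threadFactory, boolean resetOnFailure) {
        synchronized (LOCK) {
            if (registryCount == 0) {
                if (reaperThread != null) {
                    throw new IllegalStateException("reaper thread is set while count is 0");
                }
                reaperThread = threadFactory.get();
                try {
                    reaperThread.start();
                } catch (Throwable t) {
                    if (resetOnFailure) {
                        // Roll back so the next attempt starts from a clean state.
                        reaperThread = null;
                    }
                    throw t;
                }
            }
            ++registryCount;
        }
    }

    // A thread whose start() always fails, simulating native thread exhaustion.
    static Thread failingThread() {
        return new Thread() {
            @Override
            public void start() {
                throw new OutOfMemoryError("unable to create new native thread (simulated)");
            }
        };
    }

    // Runs one open() attempt and reports the outcome as a simple class name.
    static String attempt(boolean resetOnFailure) {
        try {
            open(ReaperStateSketch::failingThread, resetOnFailure);
            return "ok";
        } catch (Throwable t) {
            return t.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        // Without rollback: the first failure poisons the static state.
        System.out.println(attempt(false)); // OutOfMemoryError
        System.out.println(attempt(false)); // IllegalStateException

        // With rollback: each attempt fails with the original error only.
        reaperThread = null;
        System.out.println(attempt(true));  // OutOfMemoryError
        System.out.println(attempt(true));  // OutOfMemoryError
    }
}
```

In the first pair of calls the second attempt reports IllegalStateException, matching the stack trace at the top of this post: the original OutOfMemoryError appears once, and every later task start on the same TaskManager then fails the precondition check instead.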