getTriggerExecutions()
[x]
getTriggerExecutions()// TODO 检查是否需要触发的所有任务都在运行。如果不是,请中止检查点/*** Check if all tasks that we need to trigger are running. If not, abort the checkpoint.** @return the executions need to be triggered.** @throws CheckpointException the exception fails checking*/private Execution[] getTriggerExecutions() throws CheckpointException {// TODO 创建一个 Execution 数组Execution[] executions = new Execution[tasksToTrigger.length];// TODO 遍历 ExecutionVertexfor (int i = 0; i < tasksToTrigger.length; i++) {// TODO 从 ExecutionVertex 中 顶点任务或者当前任务 [该字段不能为空,如果为 null 代表并非所有必须的任务都在运行 ]Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt();if (ee == null) {LOG.info("当前并非所有必需的任务都在运行");LOG.info("Checkpoint triggering task {} of job {} is not being executed at the moment. Aborting checkpoint.",tasksToTrigger[i].getTaskNameWithSubtaskIndex(),job);throw new CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);// TODO 如果 Task 生命周期 为 RUNNING , 将 ExecutionVertex 填充进 Execution 数组} else if (ee.getState() == ExecutionState.RUNNING) {executions[i] = ee;} else {// TODO 不为 null, 状态也不是 Running , 可能是 Task 生命周期中的其他状态 不为 Running 则不能运行LOG.info("Checkpoint triggering task {} of job {} is not in state {} but {} instead. Aborting checkpoint.",tasksToTrigger[i].getTaskNameWithSubtaskIndex(),job,ExecutionState.RUNNING,ee.getState());throw new CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);}}return executions;}
