getTriggerExecutions()
[x]
getTriggerExecutions()
// TODO 检查是否需要触发的所有任务都在运行。如果不是,请中止检查点
/**
* Check if all tasks that we need to trigger are running. If not, abort the checkpoint.
*
* @return the executions need to be triggered.
*
* @throws CheckpointException the exception fails checking
*/
private Execution[] getTriggerExecutions() throws CheckpointException {
// TODO 创建一个 Execution 数组
Execution[] executions = new Execution[tasksToTrigger.length];
// TODO 遍历 ExecutionVertex
for (int i = 0; i < tasksToTrigger.length; i++) {
// TODO 从 ExecutionVertex 中 顶点任务或者当前任务 [该字段不能为空,如果为 null 代表并非所有必须的任务都在运行 ]
Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt();
if (ee == null) {
LOG.info("当前并非所有必需的任务都在运行");
LOG.info(
"Checkpoint triggering task {} of job {} is not being executed at the moment. Aborting checkpoint.",
tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
job);
throw new CheckpointException(
CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
// TODO 如果 Task 生命周期 为 RUNNING , 将 ExecutionVertex 填充进 Execution 数组
} else if (ee.getState() == ExecutionState.RUNNING) {
executions[i] = ee;
} else {
// TODO 不为 null, 状态也不是 Running , 可能是 Task 生命周期中的其他状态 不为 Running 则不能运行
LOG.info(
"Checkpoint triggering task {} of job {} is not in state {} but {} instead. Aborting checkpoint.",
tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
job,
ExecutionState.RUNNING,
ee.getState());
throw new CheckpointException(
CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
}
}
return executions;
}