getTriggerExecutions()

  • [x] getTriggerExecutions()

    1. // TODO 检查是否需要触发的所有任务都在运行。如果不是,请中止检查点
    2. /**
    3. * Check if all tasks that we need to trigger are running. If not, abort the checkpoint.
    4. *
    5. * @return the executions need to be triggered.
    6. *
    7. * @throws CheckpointException the exception fails checking
    8. */
    9. private Execution[] getTriggerExecutions() throws CheckpointException {
    10. // TODO 创建一个 Execution 数组
    11. Execution[] executions = new Execution[tasksToTrigger.length];
    12. // TODO 遍历 ExecutionVertex
    13. for (int i = 0; i < tasksToTrigger.length; i++) {
    14. // TODO 从 ExecutionVertex 中 顶点任务或者当前任务 [该字段不能为空,如果为 null 代表并非所有必须的任务都在运行 ]
    15. Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt();
    16. if (ee == null) {
    17. LOG.info("当前并非所有必需的任务都在运行");
    18. LOG.info(
    19. "Checkpoint triggering task {} of job {} is not being executed at the moment. Aborting checkpoint.",
    20. tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
    21. job);
    22. throw new CheckpointException(
    23. CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
    24. // TODO 如果 Task 生命周期 为 RUNNING , 将 ExecutionVertex 填充进 Execution 数组
    25. } else if (ee.getState() == ExecutionState.RUNNING) {
    26. executions[i] = ee;
    27. } else {
    28. // TODO 不为 null, 状态也不是 Running , 可能是 Task 生命周期中的其他状态 不为 Running 则不能运行
    29. LOG.info(
    30. "Checkpoint triggering task {} of job {} is not in state {} but {} instead. Aborting checkpoint.",
    31. tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
    32. job,
    33. ExecutionState.RUNNING,
    34. ee.getState());
    35. throw new CheckpointException(
    36. CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
    37. }
    38. }
    39. return executions;
    40. }