1. task-instance queued但不执行

  1. def queue_command(self,
  2. task_instance: TaskInstance,
  3. command: CommandType,
  4. priority: int = 1,
  5. queue: Optional[str] = None, ):
  6. """Queues command to task"""
  7. if task_instance.key not in self.queued_tasks and task_instance.key not in self.running:
  8. self.log.info("Adding to queue: %s", command)
  9. self.queued_tasks[task_instance.key] = (command, priority, queue, task_instance)
  10. else:
  11. self.log.error("could not queue task %s", task_instance.key)
  • task提交过程会按照下面逻辑进行调度
    • scheduler发送当前taskInstance的请求进行对应的queue
    • 触发 queue_command 判断 taskInstance 状态
      • 判断当前 taskInstance 的状态,是否在 set=(queued_task, running) 状态中
      • 如果在上面状态中,进入 queued 状态,准备执行
      • 如果步在上述状态,报错 "cound not queue task taskInstance" ,但是当前状态下 webserver 的上对应 taskInstance 的状态也是 queued 的状态
    • 正常状态的 taskInstance 在Pool和queue并发没有问题的状态下会正常执行

2. celeryExecutor执行任务异常

表现:通过celery调度的Executor接收到task之后不能正常的执行,一直处于queued状态。web-UI上表现为任务pool中存在可用的slot,但是还有任务会卡在queued_slot中
原因:celery的预取机制导致的问题

  • celery的预取机制通过 worker_multiplier_prefetch 参数控制,代表的是Executor上每个celery进程可以预取的任务数。
    • 例如, worker_concurrency=4 AND worker_multiplier_prefetch = 2 代表每个Executor除了可以同时执行4个任务外,还可以预测4个任务在Executor上进行等待执行

解决方法:取消预取机制

  • worker_multiplier_prefetch = 1
  • task_act_late = True